網(wǎng)站首頁(yè) 編程語(yǔ)言 正文
一、并行概念
1、并行編程
在.NET 4中的并行編程是依賴Task Parallel Library(后面簡(jiǎn)稱為TPL) 實(shí)現(xiàn)的。在TPL中,最基本的執(zhí)行單元是task(中文可以理解為"任務(wù)"),一個(gè)task就代表了你要執(zhí)行的一個(gè)操作。你可以為你所要執(zhí)行的每一個(gè)操作定義一個(gè)task,TPL就負(fù)責(zé)創(chuàng)建線程來執(zhí)行你所定義的task,并且管理線程。TPL是面向task的,自動(dòng)的;而傳統(tǒng)的多線程是以人工為導(dǎo)向的。
現(xiàn)在已經(jīng)進(jìn)入了多核的時(shí)代,我們的程序如何更多的利用好硬件cpu,答案是并行處理。在.net4.0之前我們要開發(fā)并行的程序是非常的困難,在.net4.0中,在命名空間System.Threading.Tasks提供了方便的并行開發(fā)的類庫(kù)。
2、數(shù)據(jù)并行
數(shù)據(jù)并行指的是對(duì)源集合或數(shù)組的元素同時(shí)(即,并行)執(zhí)行相同操作的場(chǎng)景。 在數(shù)據(jù)并行操作中,對(duì)源集合進(jìn)行分區(qū),以便多個(gè)線程能夠同時(shí)在不同的網(wǎng)段上操作。
任務(wù)并行庫(kù) (TPL) 支持通過?System.Threading.Tasks.Parallel 類實(shí)現(xiàn)的數(shù)據(jù)并行。 此類對(duì) for 循環(huán)和 foreach 循環(huán)提供了基于方法的并行執(zhí)行。你為Parallel.For 或 Parallel.ForEach 循環(huán)編寫的循環(huán)邏輯與編寫連續(xù)循環(huán)的相似。 無需創(chuàng)建線程或列工作項(xiàng)。 在基本循環(huán)中,不需要加鎖。TPL 為你處理所有低級(jí)別的工作。
Parallel.For()和Parallel.ForEach()方法多次調(diào)用同一個(gè)方法,而Parallel.Invoke()方法允許同時(shí)調(diào)用不同的方法。
二、Parallel.Invoke():并行調(diào)用多個(gè)任務(wù) 。
例1:同時(shí)調(diào)用2個(gè)任務(wù)
static void Main(string[] args)
{
var watch = Stopwatch.StartNew();
Parallel.Invoke(Run1, Run2);
watch.Stop();
Console.WriteLine("我是并行開發(fā),總共耗時(shí):{0}", watch.ElapsedMilliseconds)
}
static void Run1()
{
Console.WriteLine("我是任務(wù)一,我跑了3s");
Thread.Sleep(3000);
}
static void Run2()
{
Console.WriteLine("我是任務(wù)二,我跑了5s");
Thread.Sleep(5000);
}
例2:說明并不是每個(gè)任務(wù)一個(gè)線程。
// 定義一個(gè)線程局部變量,返回其線程名
ThreadLocal<string> ThreadName = new ThreadLocal<string>(() =>
{
return "Thread" + Thread.CurrentThread.ManagedThreadId;
});
// 打印出當(dāng)前線程名的方法。
Action action = () =>
{
// 如果 ThreadName.IsValueCreated 為true,在這個(gè)線程上不是第一次運(yùn)行這個(gè)方法。
bool repeat = ThreadName.IsValueCreated;
Console.WriteLine("ThreadName = {0} {1}", ThreadName.Value, repeat ? "(repeat)" : "");
};
// 調(diào)用8個(gè)方法,你應(yīng)該會(huì)看到一些重復(fù)的線程名
Parallel.Invoke(action, action, action, action, action, action, action, action);
ThreadName.Dispose();
三、Parallel.For():?for 循環(huán)的并行運(yùn)算?
我們知道串行代碼中也有一個(gè)for,但是那個(gè)for并沒有用到多核,而Paraller.for它會(huì)在底層根據(jù)硬件線程的運(yùn)行狀況來充分的使用所有的可利用的硬件線程,注意這里的Parallel.for的步行是1。
在For()方法中,前兩個(gè)參數(shù)定義了循環(huán)的開頭和結(jié)束。示例從0迭代到9。第3個(gè)參數(shù)是一個(gè) Action<int>委托。整數(shù)參數(shù)是循環(huán)的迭代次數(shù),該參數(shù)被傳遞給Action < int >委托引用的方法。 Parallel.For方法的返回類型是ParallelLoopResult結(jié)構(gòu),它提供了循環(huán)是否結(jié)束的信息。
ParallelLoopResult result = Parallel.For(0, 10, i =>
{
Console.WriteLine("{0}, task: {1}, thread: {2}", i, Task.CurrentId, Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(10);
});
Console.WriteLine(result.IsCompleted);
首先先寫一個(gè)普通的循環(huán):
private void NormalFor()
{
for (var i = 0; i < 10000; i++)
{
for (var j = 0; j < 1000; j++)
{
for (var k = 0; k < 100; k++)
{
DoSomething();
}
}
}
}
再看一個(gè)并行的For語(yǔ)句:
private void ParallelFor()
{
Parallel.For(0, 10000, i =>
{
for (int j = 0; j < 1000; j++)
{
for (var k = 0; k < 100; k++)
{
DoSomething();
}
}
});
}
上面的例子中,只是將最外層的For語(yǔ)句替換成了Parallel.For,Parallel執(zhí)行速度可以提高近一倍。
四、Parallel.ForEach():foreach 循環(huán)的并行運(yùn)算?
private void NormalForeach()
{
foreach (var file in GetFiles())
{
DoSomething();
}
}
private void ParallelForeach()
{
Parallel.ForEach(GetFiles(), file => {
DoSomething();
});
}
ForEach的使用跟For使用幾乎是差不多了,只是在對(duì)非泛型的Collection進(jìn)行操作的時(shí)候,需要通過Cast方法進(jìn)行轉(zhuǎn)換。
ForEach的獨(dú)到之處就是可以將數(shù)據(jù)進(jìn)行分區(qū),每一個(gè)小區(qū)內(nèi)實(shí)現(xiàn)串行計(jì)算,分區(qū)采用Partitioner.Create實(shí)現(xiàn)。
for (int j = 1; j < 4; j++)
{
Console.WriteLine("\n第{0}次比較", j);
ConcurrentBag<int> bag = new ConcurrentBag<int>();
var watch = Stopwatch.StartNew();
watch.Start();
for (int i = 0; i < 3000000; i++)
{
bag.Add(i);
}
Console.WriteLine("串行計(jì)算:集合有:{0},總共耗時(shí):{1}", bag.Count, watch.ElapsedMilliseconds);
GC.Collect();
bag = new ConcurrentBag<int>();
watch = Stopwatch.StartNew();
watch.Start();
Parallel.ForEach(Partitioner.Create(0, 3000000), i =>
{
for (int m = i.Item1; m < i.Item2; m++)
{
bag.Add(m);
}
});
Console.WriteLine("并行計(jì)算:集合有:{0},總共耗時(shí):{1}", bag.Count, watch.ElapsedMilliseconds);
GC.Collect();
}
五、線程局部變量
下面這段代碼多次運(yùn)行每次的結(jié)果都不一樣,因?yàn)閠otal變量是公共的,而我們的程序是多個(gè)線程的加,而多個(gè)線程之間是不能把數(shù)據(jù)共享的。
public void NormalParallelTest()
{
int[] nums = Enumerable.Range(0, 1000000).ToArray();
long total = 0;
Parallel.For(0,nums.Length,i=>
{
total += nums[i];
});
Console.WriteLine("The total is {0}", total);
}
其實(shí)我們需要的是在每個(gè)線程中計(jì)算出一個(gè)和值,然后再進(jìn)行累加。我們來看看線程局部變量:
泛型方法Parallel.For<T>的原型:
public static ParallelLoopResult
For<TLocal>
(int fromInclusive, int toExclusive,
Func<TLocal> localInit,
Func<int, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal>
localFinally
);
- TLocal:線程變量的類型;第一個(gè)、第二個(gè)參數(shù)就不必多說了,就是起始值跟結(jié)束值。
- localInit:每個(gè)線程的線程局部變量初始值的設(shè)置;
- body:每次循環(huán)執(zhí)行的方法,其中方法的最后一個(gè)參數(shù)就是線程局部變量;
- localFinally:每個(gè)線程之后執(zhí)行的方法。
1、Parallel.For中定義局部變量:
從2開始,累加2個(gè),得49.
int[] nums = Enumerable.Range(0, 10).ToArray();
long total = 0;
Parallel.For<long>(0, nums.Length, () => { return 2; },
(j, loop, subtotal) =>//1、每次循環(huán)執(zhí)行的方法
{
subtotal += nums[j];
Console.WriteLine("主體: thread {1}, task {2},結(jié)果:{0}", j+ ":" +nums[j] + "-" + subtotal, Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
return subtotal;
},
(x) =>//2、每個(gè)線程執(zhí)行之后執(zhí)行的方法
{
Console.WriteLine(" 最終執(zhí)行:thread {1}, task {2},結(jié)果:{0} ", x, Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
Interlocked.Add(ref total, x);
});
Console.WriteLine("The total is {0}", total);
2、Parallel.Each中定義局部變量:
要注意的是,我們必須要使用ForEach<TSource, TLocal>,因?yàn)榈谝粋€(gè)參數(shù)表示的是迭代源的類型,第二個(gè)表示的是線程局部變量的類型,其方法的參數(shù)跟For是差不多的。
public void ForeachThreadLocalTest()
{
int[] nums = Enumerable.Range(0, 1000000).ToArray();
long total = 0;
Parallel.ForEach<int,long>(nums,()=>0,
(member,loopState,subTotal)=>//1、每次循環(huán)執(zhí)行的方法
{
subTotal += member;
return subTotal;
},
(perLocal)=>//2、每個(gè)線程執(zhí)行之后執(zhí)行的方法
Interlocked.Add(ref total,perLocal)
);
Console.WriteLine("The total is {0}", total);
}
六、Break、Stop中斷與停止線程
在并行循環(huán)的委托參數(shù)中提供了一個(gè)ParallelLoopState,該實(shí)例提供了Break和Stop方法來幫我們實(shí)現(xiàn)。
- Break“中斷”:表示完成當(dāng)前線程上當(dāng)前迭代之前的所有線程上的所有迭代,然后退出循環(huán)。(比如并行計(jì)算正在迭代100,那么break后程序還會(huì)迭代所有小于100的。)
- Stop“停止”:表示在方便的情況下盡快停止所有迭代。(比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。)
首先我們可以看到在Parallel.For的一個(gè)重載方法:
public static ParallelLoopResult
For
(int fromInclusive, int toExclusive, Action<int, ParallelLoopState > body)
在委托的最后一個(gè)參數(shù)類型為ParallelLoopState,而ParallelLoopState里面提供給我們兩個(gè)方法:Break、Stop來終止迭代。
private void StopLoop()
{
var Stack = new ConcurrentStack<string>();
Parallel.For(0, 10000, (i, loopState
) =>
{
if (i < 1000)
Stack.Push(i.ToString());
else
{
loopState.Stop();
return;
}
});
Console.WriteLine("Stop Loop Info:\n elements count:{0}", Stack.Count);
}
七、Cancel取消循環(huán)
在并行的循環(huán)中支持通過傳遞ParallelOptions參數(shù)中的CancellationToken進(jìn)行取消循環(huán)的控制,我們可以CancellationTokenSource實(shí)例化之后傳遞給ParallelOptions對(duì)象Cancellation值。下面來看個(gè)示例:
在For循環(huán)的實(shí)現(xiàn)代碼內(nèi)部,Parallel類驗(yàn)證CancellationToken 的結(jié)果,并取消操作。一旦取消操作,F(xiàn)or()方法就拋出個(gè)OperationCanceledException類型的異常,這是本例捕獲的異常。使用 CancellationTokeri可以注冊(cè)取消操作時(shí)的信息。為此,需要調(diào)用Register方法,并傳遞一個(gè)在取消 操作時(shí)調(diào)用的委托。
var cts = new CancellationTokenSource();
cts.Token.Register(() =>Console.WriteLine("*** token canceled"));
// start a task that sends a cancel after 500 ms
new Task(() =>
{
Thread.Sleep(500);
cts.Cancel(false);
}).Start();
try
{
ParallelLoopResult result =
Parallel.For(0, 100,
new <strong>ParallelOptions</strong>()
{
CancellationToken = cts.Token,
},
x =>
{
Console.WriteLine("loop {0} started", x);
int sum = 0;
for (int i = 0; i < 100; i++)
{
Thread.Sleep(2);
sum += i;
}
Console.WriteLine("loop {0} finished", x);
});
}
catch (OperationCanceledException ex)
{
Console.WriteLine(ex.Message);
}
八、Handel Exceptions異常處理
在處理并行循環(huán)的異常的與順序循環(huán)異常的處理是有所不同的,并行循環(huán)里面可能會(huì)一個(gè)異常在多個(gè)循環(huán)中出現(xiàn),或則一個(gè)線程上的異常導(dǎo)致另外一個(gè)線程上也出現(xiàn)異常。比較好的處理方式就是,首先獲取所有的異常最后通過AggregateException來包裝所有的循環(huán)的異常,循環(huán)結(jié)束后進(jìn)行throw。看一段示例代碼:
private void HandleNumbers(int[] numbers)
{
var exceptions = new ConcurrentQueue<Exception>();
Parallel.For(0, numbers.Length, i =>
{
try
{
if (numbers[i] > 10 && numbers[i] < 20)
{
throw new Exception(String.Format("numbers[{0}] betwewn 10 to 20",i));
}
}
catch (Exception e)
{
exceptions.Enqueue(e);
}
});
if (exceptions.Count > 0) throw new AggregateException(exceptions); }
測(cè)試方法:
public void HandleExceptions()
{
var numbers = Enumerable.Range(0, 10000).ToArray();
try
{
this.HandleNumbers(numbers);
}
catch(AggregateException exceptions)
{
foreach (var ex in exceptions.InnerExceptions)
{
Console.WriteLine(ex.Message);
}
}
}
對(duì)上面的方法說明下,在HandleNumbers方法中,就是一個(gè)小的demo如果元素的值出現(xiàn)在10-20之間就拋出異常。在上面我們的處理方法就是:在循環(huán)時(shí)通過隊(duì)列將所有的異常都集中起來,循環(huán)結(jié)束后來拋出一個(gè)AggregateException。
原文鏈接:https://www.cnblogs.com/springsnow/p/9405016.html
相關(guān)推薦
- 2022-09-02 Pytorch-LSTM輸入輸出參數(shù)方式_python
- 2022-12-13 pandas中merge()函數(shù)的用法解讀_python
- 2022-12-24 React重新渲染超詳細(xì)講解_React
- 2022-07-01 Python數(shù)據(jù)可視化繪圖實(shí)例詳解_python
- 2022-10-02 jQuery?編程之jQuery?屬性選擇器_jquery
- 2022-04-19 C語(yǔ)言進(jìn)階可變參數(shù)列表_C 語(yǔ)言
- 2022-07-13 spring-boot2.6.x兼容swagger2問題
- 2022-10-28 Go語(yǔ)言包和包管理詳解_Golang
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡(jiǎn)單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支