網站首頁 編程語言 正文
一、并行概念
1、并行編程
在.NET 4中的并行編程是依賴Task Parallel Library(后面簡稱為TPL) 實現的。在TPL中,最基本的執行單元是task(中文可以理解為"任務"),一個task就代表了你要執行的一個操作。你可以為你所要執行的每一個操作定義一個task,TPL就負責創建線程來執行你所定義的task,并且管理線程。TPL是面向task的,自動的;而傳統的多線程是以人工為導向的。
現在已經進入了多核的時代,我們的程序如何更多的利用好硬件cpu,答案是并行處理。在.net4.0之前我們要開發并行的程序是非常的困難,在.net4.0中,在命名空間System.Threading.Tasks提供了方便的并行開發的類庫。
2、數據并行
數據并行指的是對源集合或數組的元素同時(即,并行)執行相同操作的場景。 在數據并行操作中,對源集合進行分區,以便多個線程能夠同時在不同的網段上操作。
任務并行庫 (TPL) 支持通過?System.Threading.Tasks.Parallel 類實現的數據并行。 此類對 for 循環和 foreach 循環提供了基于方法的并行執行。你為Parallel.For 或 Parallel.ForEach 循環編寫的循環邏輯與編寫連續循環的相似。 無需創建線程或列工作項。 在基本循環中,不需要加鎖。TPL 為你處理所有低級別的工作。
Parallel.For()和Parallel.ForEach()方法多次調用同一個方法,而Parallel.Invoke()方法允許同時調用不同的方法。
二、Parallel.Invoke():并行調用多個任務 。
例1:同時調用2個任務
static void Main(string[] args)
{
var watch = Stopwatch.StartNew();
Parallel.Invoke(Run1, Run2);
watch.Stop();
Console.WriteLine("我是并行開發,總共耗時:{0}", watch.ElapsedMilliseconds)
}
static void Run1()
{
Console.WriteLine("我是任務一,我跑了3s");
Thread.Sleep(3000);
}
static void Run2()
{
Console.WriteLine("我是任務二,我跑了5s");
Thread.Sleep(5000);
}
例2:說明并不是每個任務一個線程。
// 定義一個線程局部變量,返回其線程名
ThreadLocal<string> ThreadName = new ThreadLocal<string>(() =>
{
return "Thread" + Thread.CurrentThread.ManagedThreadId;
});
// 打印出當前線程名的方法。
Action action = () =>
{
// 如果 ThreadName.IsValueCreated 為true,在這個線程上不是第一次運行這個方法。
bool repeat = ThreadName.IsValueCreated;
Console.WriteLine("ThreadName = {0} {1}", ThreadName.Value, repeat ? "(repeat)" : "");
};
// 調用8個方法,你應該會看到一些重復的線程名
Parallel.Invoke(action, action, action, action, action, action, action, action);
ThreadName.Dispose();
三、Parallel.For():?for 循環的并行運算?
我們知道串行代碼中也有一個for,但是那個for并沒有用到多核,而Paraller.for它會在底層根據硬件線程的運行狀況來充分的使用所有的可利用的硬件線程,注意這里的Parallel.for的步行是1。
在For()方法中,前兩個參數定義了循環的開頭和結束。示例從0迭代到9。第3個參數是一個 Action<int>委托。整數參數是循環的迭代次數,該參數被傳遞給Action < int >委托引用的方法。 Parallel.For方法的返回類型是ParallelLoopResult結構,它提供了循環是否結束的信息。
ParallelLoopResult result = Parallel.For(0, 10, i =>
{
Console.WriteLine("{0}, task: {1}, thread: {2}", i, Task.CurrentId, Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(10);
});
Console.WriteLine(result.IsCompleted);
首先先寫一個普通的循環:
private void NormalFor()
{
for (var i = 0; i < 10000; i++)
{
for (var j = 0; j < 1000; j++)
{
for (var k = 0; k < 100; k++)
{
DoSomething();
}
}
}
}
再看一個并行的For語句:
private void ParallelFor()
{
Parallel.For(0, 10000, i =>
{
for (int j = 0; j < 1000; j++)
{
for (var k = 0; k < 100; k++)
{
DoSomething();
}
}
});
}
上面的例子中,只是將最外層的For語句替換成了Parallel.For,Parallel執行速度可以提高近一倍。
四、Parallel.ForEach():foreach 循環的并行運算?
private void NormalForeach()
{
foreach (var file in GetFiles())
{
DoSomething();
}
}
private void ParallelForeach()
{
Parallel.ForEach(GetFiles(), file => {
DoSomething();
});
}
ForEach的使用跟For使用幾乎是差不多了,只是在對非泛型的Collection進行操作的時候,需要通過Cast方法進行轉換。
ForEach的獨到之處就是可以將數據進行分區,每一個小區內實現串行計算,分區采用Partitioner.Create實現。
for (int j = 1; j < 4; j++)
{
Console.WriteLine("\n第{0}次比較", j);
ConcurrentBag<int> bag = new ConcurrentBag<int>();
var watch = Stopwatch.StartNew();
watch.Start();
for (int i = 0; i < 3000000; i++)
{
bag.Add(i);
}
Console.WriteLine("串行計算:集合有:{0},總共耗時:{1}", bag.Count, watch.ElapsedMilliseconds);
GC.Collect();
bag = new ConcurrentBag<int>();
watch = Stopwatch.StartNew();
watch.Start();
Parallel.ForEach(Partitioner.Create(0, 3000000), i =>
{
for (int m = i.Item1; m < i.Item2; m++)
{
bag.Add(m);
}
});
Console.WriteLine("并行計算:集合有:{0},總共耗時:{1}", bag.Count, watch.ElapsedMilliseconds);
GC.Collect();
}
五、線程局部變量
下面這段代碼多次運行每次的結果都不一樣,因為total變量是公共的,而我們的程序是多個線程的加,而多個線程之間是不能把數據共享的。
public void NormalParallelTest()
{
int[] nums = Enumerable.Range(0, 1000000).ToArray();
long total = 0;
Parallel.For(0,nums.Length,i=>
{
total += nums[i];
});
Console.WriteLine("The total is {0}", total);
}
其實我們需要的是在每個線程中計算出一個和值,然后再進行累加。我們來看看線程局部變量:
泛型方法Parallel.For<T>的原型:
public static ParallelLoopResult
For<TLocal>
(int fromInclusive, int toExclusive,
Func<TLocal> localInit,
Func<int, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal>
localFinally
);
- TLocal:線程變量的類型;第一個、第二個參數就不必多說了,就是起始值跟結束值。
- localInit:每個線程的線程局部變量初始值的設置;
- body:每次循環執行的方法,其中方法的最后一個參數就是線程局部變量;
- localFinally:每個線程之后執行的方法。
1、Parallel.For中定義局部變量:
從2開始,累加2個,得49.
int[] nums = Enumerable.Range(0, 10).ToArray();
long total = 0;
Parallel.For<long>(0, nums.Length, () => { return 2; },
(j, loop, subtotal) =>//1、每次循環執行的方法
{
subtotal += nums[j];
Console.WriteLine("主體: thread {1}, task {2},結果:{0}", j+ ":" +nums[j] + "-" + subtotal, Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
return subtotal;
},
(x) =>//2、每個線程執行之后執行的方法
{
Console.WriteLine(" 最終執行:thread {1}, task {2},結果:{0} ", x, Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
Interlocked.Add(ref total, x);
});
Console.WriteLine("The total is {0}", total);
2、Parallel.Each中定義局部變量:
要注意的是,我們必須要使用ForEach<TSource, TLocal>,因為第一個參數表示的是迭代源的類型,第二個表示的是線程局部變量的類型,其方法的參數跟For是差不多的。
public void ForeachThreadLocalTest()
{
int[] nums = Enumerable.Range(0, 1000000).ToArray();
long total = 0;
Parallel.ForEach<int,long>(nums,()=>0,
(member,loopState,subTotal)=>//1、每次循環執行的方法
{
subTotal += member;
return subTotal;
},
(perLocal)=>//2、每個線程執行之后執行的方法
Interlocked.Add(ref total,perLocal)
);
Console.WriteLine("The total is {0}", total);
}
六、Break、Stop中斷與停止線程
在并行循環的委托參數中提供了一個ParallelLoopState,該實例提供了Break和Stop方法來幫我們實現。
- Break“中斷”:表示完成當前線程上當前迭代之前的所有線程上的所有迭代,然后退出循環。(比如并行計算正在迭代100,那么break后程序還會迭代所有小于100的。)
- Stop“停止”:表示在方便的情況下盡快停止所有迭代。(比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。)
首先我們可以看到在Parallel.For的一個重載方法:
public static ParallelLoopResult
For
(int fromInclusive, int toExclusive, Action<int, ParallelLoopState > body)
在委托的最后一個參數類型為ParallelLoopState,而ParallelLoopState里面提供給我們兩個方法:Break、Stop來終止迭代。
private void StopLoop()
{
var Stack = new ConcurrentStack<string>();
Parallel.For(0, 10000, (i, loopState
) =>
{
if (i < 1000)
Stack.Push(i.ToString());
else
{
loopState.Stop();
return;
}
});
Console.WriteLine("Stop Loop Info:\n elements count:{0}", Stack.Count);
}
七、Cancel取消循環
在并行的循環中支持通過傳遞ParallelOptions參數中的CancellationToken進行取消循環的控制,我們可以CancellationTokenSource實例化之后傳遞給ParallelOptions對象Cancellation值。下面來看個示例:
在For循環的實現代碼內部,Parallel類驗證CancellationToken 的結果,并取消操作。一旦取消操作,For()方法就拋出個OperationCanceledException類型的異常,這是本例捕獲的異常。使用 CancellationTokeri可以注冊取消操作時的信息。為此,需要調用Register方法,并傳遞一個在取消 操作時調用的委托。
var cts = new CancellationTokenSource();
cts.Token.Register(() =>Console.WriteLine("*** token canceled"));
// start a task that sends a cancel after 500 ms
new Task(() =>
{
Thread.Sleep(500);
cts.Cancel(false);
}).Start();
try
{
ParallelLoopResult result =
Parallel.For(0, 100,
new <strong>ParallelOptions</strong>()
{
CancellationToken = cts.Token,
},
x =>
{
Console.WriteLine("loop {0} started", x);
int sum = 0;
for (int i = 0; i < 100; i++)
{
Thread.Sleep(2);
sum += i;
}
Console.WriteLine("loop {0} finished", x);
});
}
catch (OperationCanceledException ex)
{
Console.WriteLine(ex.Message);
}
八、Handel Exceptions異常處理
在處理并行循環的異常的與順序循環異常的處理是有所不同的,并行循環里面可能會一個異常在多個循環中出現,或則一個線程上的異常導致另外一個線程上也出現異常。比較好的處理方式就是,首先獲取所有的異常最后通過AggregateException來包裝所有的循環的異常,循環結束后進行throw。看一段示例代碼:
private void HandleNumbers(int[] numbers)
{
var exceptions = new ConcurrentQueue<Exception>();
Parallel.For(0, numbers.Length, i =>
{
try
{
if (numbers[i] > 10 && numbers[i] < 20)
{
throw new Exception(String.Format("numbers[{0}] betwewn 10 to 20",i));
}
}
catch (Exception e)
{
exceptions.Enqueue(e);
}
});
if (exceptions.Count > 0) throw new AggregateException(exceptions); }
測試方法:
public void HandleExceptions()
{
var numbers = Enumerable.Range(0, 10000).ToArray();
try
{
this.HandleNumbers(numbers);
}
catch(AggregateException exceptions)
{
foreach (var ex in exceptions.InnerExceptions)
{
Console.WriteLine(ex.Message);
}
}
}
對上面的方法說明下,在HandleNumbers方法中,就是一個小的demo如果元素的值出現在10-20之間就拋出異常。在上面我們的處理方法就是:在循環時通過隊列將所有的異常都集中起來,循環結束后來拋出一個AggregateException。
原文鏈接:https://www.cnblogs.com/springsnow/p/9405016.html
相關推薦
- 2022-03-16 C#中獲取二維數組的行數和列數以及多維數組各個維度的長度_C#教程
- 2022-05-08 ASP.NET中Web?API解決跨域問題_實用技巧
- 2022-05-24 C語言的strcpy函數你了解嗎_C 語言
- 2023-03-29 Android?Flutter中Offstage組件的使用教程詳解_Android
- 2022-12-11 C語言計算1/1+1/2+1/3+…+1/n的問題_C 語言
- 2022-10-25 Go語言實戰學習之流程控制詳解_Golang
- 2023-05-16 golang-gorm自動建表問題_Golang
- 2023-01-23 redis實現多級緩存同步方案詳解_Redis
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支