網站首頁 編程語言 正文
在并行計算中,不可避免的會碰到多個任務共享變量,實例,集合。雖然task自帶了兩個方法:task.ContinueWith()和Task.Factory.ContinueWhenAll()來實現任務串行化,但是這些簡單的方法遠遠不能滿足我們實際的開發需要,從.net 4.0開始,類庫給我們提供了很多的類來幫助我們簡化并行計算中復雜的數據同步問題。
一、隔離執行:不共享數據,讓每個task都有一份自己的數據拷貝。
對數據共享問題處理的方式是“分離執行”,我們通過把每個Task執行完成后的各自計算的值進行最后的匯總,也就是說多個Task之間不存在數據共享了,各自做各自的事,完全分離開來。
1、傳統方式
每個Task執行時不存在數據共享了,每個Task中計算自己值,最后我們匯總每個Task的Result。我們可以通過Task中傳遞的state參數來進行隔離執行:
int Sum = 0;
Task<int>[] tasks = new Task<int>[10];
for (int i = 0; i < 10; i++)
{
tasks[i] = new Task<int>((obj) =>
{
var start = (int)obj;
for (int j = 0; j < 1000; j++)
{
start = start + 1;
}
return start;
}, Sum);
tasks[i].Start();
}
Task.WaitAll(tasks);
for (var i = 0; i < 10; i++)
{
Sum += tasks[i].Result;
}
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
2、ThreadLocal類
在.Net中提供了System.Threading.ThreadLocal來創建分離。
ThreadLocal是一種提供線程本地存儲的類型,它可以給每個線程一個分離的實例,來提供每個線程單獨的數據結果。上面的程序我們可以使用TreadLocal:
int Sum = 0;
Task<int>[] tasks = new Task<int>[10];
var tl = new ThreadLocal<int>();
for (int i = 0; i < 10; i++)
{
tasks[i] = new Task<int>((obj) =>
{
tl.Value = (int)obj;
for (int j = 0; j < 1000; j++)
{
tl.Value++;
}
returntl.Value;
}, Sum);
tasks[i].Start();
}
Task.WaitAll(tasks);
for (var i = 0; i < 10; i++)
{
Sum += tasks[i].Result;
}
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
但是我們要注意的一點TreadLocal是針對每個線程的,不是針對每個Task的。一個Tread中可能有多個Task。
ThreadLocal類舉例:
static ThreadLocal<string> local;
static void Main()
{
//創建ThreadLocal并提供默認值
local = new ThreadLocal<string>(() => "hehe");
//修改TLS的線程
Thread th = new Thread(() =>
{
local.Value = "Mgen";
Display();
});
th.Start();
th.Join();
Display();
}
//顯示TLS中數據值
static void Display()
{
Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, local.Value);
}
二、同步類型:通過調整task的執行,有序的執行task。
同步類型是一種用來調度Task訪問臨界區域的一種特殊類型。在.Net 4.0中提供了多種同步類型給我們使用,主要分為:輕量級的、重量級的和等待處理型的,在下面我們會介紹常用的同步處理類型。
常用的同步類型
首先來看看.Net 4.0中常見的幾種同步類型以及處理的相關問題:
同步類型以及解決問題
- lock關鍵字、Montor類、SpinLock類:有序訪問臨界區域
- Interlocked類:數值類型的增加或則減少
- Mutex類:交叉同步
- WaitAll方法:同步多個鎖定(主要是Task之間的調度)
- 申明性的同步(如Synchronization):使類中的所有的方法同步
1、Lock鎖
其實最簡單同步類型的使用辦法就是使用lock關鍵字。在使用lock關鍵字時,首先我們需要創建一個鎖定的object,而且這個object需要所有的task都能訪問,其次能我們需要將我們的臨界區域包含在lock塊中。我們之前例子中代碼可以這樣加上lock:
int Sum = 0;
Task[] tasks = new Task[10];
var obj = new Object(); for (int i = 0; i < 10; i++)
{
tasks[i] = new Task(() =>
{
for (int j = 0; j < 1000; j++)
{
lock (obj)
{ Sum = Sum + 1; }
}
});
tasks[i].Start();
}
Task.WaitAll(tasks);
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
其實lock關鍵字是使用Monitor的一種簡短的方式,lock關鍵字自動通過調用Monitor.Enter\Monitor.Exit方法來處理獲得鎖以及釋放鎖。
2、Interlocked 聯鎖
Interlocked通過使用操作系統或則硬件的一些特性提供了一些列高效的靜態的同步方法。其中主要提供了這些方法:Exchange、Add、Increment、CompareExchange四種類型的多個方法的重載。我們將上面的例子中使用Interlocked:
int Sum = 0;
Task[] tasks = new Task[10];
for (int i = 0; i < 10; i++)
{
tasks[i] = new Task(() =>
{
for (int j = 0; j < 1000; j++)
{
Interlocked.Increment(ref Sum);
}
});
tasks[i].Start();
}
Task.WaitAll(tasks);
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
3、Mutex互斥體
Mutex也是一個同步類型,在多個線程進行訪問的時候,它只向一個線程授權共享數據的獨立訪問。我們可以通過Mutex中的WaitOne方法來獲取Mutex的所有權,但是同時我們要注意的是,我們在一個線程中多少次調用過WaitOne方法,就需要調用多少次ReleaseMutex方法來釋放Mutex的占有。上面的例子我們通過Mutex這樣實現:
int Sum = 0;
Task[] tasks = new Task[10];
var mutex = new Mutex();
for (int i = 0; i < 10; i++)
{
tasks[i] = new Task(() =>
{
for (int j = 0; j < 1000; j++)
{
bool lockAcquired = mutex.WaitOne(); try
{
Sum++;
}
finally
{
if (lockAcquired) mutex.ReleaseMutex();
}
}
});
tasks[i].Start();
}
Task.WaitAll(tasks);
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
三、申明性同步
我們可以通過使用Synchronization 特性來標識一個類,從而使一個類型的字段以及方法都實現同步化。在使用Synchronization 時,我們需要將我們的目標同步的類繼承于System.ContextBoundObject類型。我們來看看之前的例子我們同步標識Synchronization 的實現:
static void Main(string[] args)
{
var sum = new SumClass();
Task[] tasks = new Task[10];
for (int i = 0; i < 10; i++)
{
tasks[i] = new Task(() =>
{
for (int j = 0; j < 1000; j++)
{
sum.Increment();
}
});
tasks[i].Start();
}
Task.WaitAll(tasks);
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, sum.GetSum());
}
[Synchronization]
class SumClass : ContextBoundObject
{
private int _Sum;
public void Increment()
{
_Sum++;
}
public int GetSum()
{
return _Sum;
}
}
四、并發集合
當多個線程對某個非線程安全容器并發地進行讀寫操作時,這些操作將導致不可預估的后果或者會導致報錯。為了解決這個問題我們可以使用lock關鍵字或者Monitor類來給容器上鎖。但鎖的引入使得我們的代碼更加復雜,同時也帶來了更多的同步消耗。而.NET Framework 4提供的線程安全且可拓展的并發集合能夠使得我們的并行代碼更加容易編寫,此外,鎖的使用次數的減少也減少了麻煩的死鎖與競爭條件的問題。.NET Framework 4主要提供了如下幾種并發集合:BlockingCollection,ConcurrentBag,ConcurrentDictionary,ConcurrentQueue,ConcurrentStack。這些集合通過使用一種叫做比較并交換(compare and swap, CAS)指令和內存屏障的技術來避免使用重量級的鎖。
在.Net 4.0中提供了很多并發的集合類型來讓我們處理數據同步的集合的問題,這里面包括:
1.ConcurrentQueue:提供并發安全的隊列集合,以先進先出的方式進行操作;
2.ConcurrentStack:提供并發安全的堆棧集合,以先進后出的方式進行操作;
3.ConcurrentBag:提供并發安全的一種無序集合;
4.ConcurrentDictionary:提供并發安全的一種key-value類型的集合。
我們在這里只做ConcurrentQueue的一個嘗試,并發隊列是一種線程安全的隊列集合,我們可以通過Enqueue()進行排隊、TryDequeue()進行出隊列操作:
for (var j = 0; j < 10; j++)
{
var queue = new ConcurrentQueue<int>();
var count = 0;
for (var i = 0; i < 1000; i++)
{
queue.Enqueue(i);
}
var tasks = new Task[10];
for (var i = 0; i < tasks.Length; i++)
{
tasks[i] = new Task(() =>
{
while (queue.Count > 0)
{
int item;
var isDequeue = queue.TryDequeue(out item);
if (isDequeue) Interlocked.Increment(ref count);
}
});
tasks[i].Start();
}
try
{
Task.WaitAll(tasks);
}
catch (AggregateException e)
{
e.Handle((ex) =>
{
Console.WriteLine("Exception Message:{0}", ex.Message);
return true;
});
}
Console.WriteLine("Dequeue items count :{0}", count);
}
五、Barrier(屏障同步)
barrier叫做屏障,就像下圖中的“紅色線”,如果我們的屏障設為4個task就認為已經滿了的話,那么執行中先到的task必須等待后到的task,通知方式也就是barrier.SignalAndWait(),屏障中線程設置操作為new Barrier(4,(i)=>{})。SignalAndWait給我們提供了超時的重載,為了能夠取消后續執行
//四個task執行
static Task[] tasks = new Task[4];
static Barrier barrier = null;
static void Main(string[] args)
{
barrier = new Barrier(tasks.Length, (i) =>
{
Console.WriteLine("**********************************************************");
Console.WriteLine("\n屏障中當前階段編號:{0}\n", i.CurrentPhaseNumber);
Console.WriteLine("**********************************************************");
});
for (int j = 0; j < tasks.Length; j++)
{
tasks[j] = Task.Factory.StartNew((obj) =>
{
var single = Convert.ToInt32(obj);
LoadUser(single);
barrier.SignalAndWait();
LoadProduct(single);
barrier.SignalAndWait();
LoadOrder(single);
barrier.SignalAndWait();
}, j);
}
Task.WaitAll(tasks);
Console.WriteLine("指定數據庫中所有數據已經加載完畢!");
Console.Read();
}
static void LoadUser(int num)
{
Console.WriteLine("當前任務:{0}正在加載User部分數據!", num);
}
static void LoadProduct(int num)
{
Console.WriteLine("當前任務:{0}正在加載Product部分數據!", num);
}
static void LoadOrder(int num)
{
Console.WriteLine("當前任務:{0}正在加載Order部分數據!", num);
}
原文鏈接:https://www.cnblogs.com/springsnow/p/9409477.html
相關推薦
- 2023-05-30 Jquery使用原生AJAX方法請求數據_jquery
- 2022-04-12 el-form表單驗證的一些方法總結
- 2022-09-13 C++?static詳解,類中的static用法說明_C 語言
- 2022-09-13 Python運算符的應用超全面詳細教程_python
- 2022-09-21 使用Python遍歷文件夾實現查找指定文件夾_python
- 2022-04-05 h5給input元素type=file的對象賦值報錯
- 2023-07-06 mybatis-plus 3.5.x Cannot resolve method ‘setUseDe
- 2023-01-17 Golang排序和查找使用方法介紹_Golang
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支