網(wǎng)站首頁(yè) 編程語(yǔ)言 正文
在并行計(jì)算中,不可避免的會(huì)碰到多個(gè)任務(wù)共享變量,實(shí)例,集合。雖然task自帶了兩個(gè)方法:task.ContinueWith()和Task.Factory.ContinueWhenAll()來(lái)實(shí)現(xiàn)任務(wù)串行化,但是這些簡(jiǎn)單的方法遠(yuǎn)遠(yuǎn)不能滿足我們實(shí)際的開(kāi)發(fā)需要,從.net 4.0開(kāi)始,類庫(kù)給我們提供了很多的類來(lái)幫助我們簡(jiǎn)化并行計(jì)算中復(fù)雜的數(shù)據(jù)同步問(wèn)題。
一、隔離執(zhí)行:不共享數(shù)據(jù),讓每個(gè)task都有一份自己的數(shù)據(jù)拷貝。
對(duì)數(shù)據(jù)共享問(wèn)題處理的方式是“分離執(zhí)行”,我們通過(guò)把每個(gè)Task執(zhí)行完成后的各自計(jì)算的值進(jìn)行最后的匯總,也就是說(shuō)多個(gè)Task之間不存在數(shù)據(jù)共享了,各自做各自的事,完全分離開(kāi)來(lái)。
1、傳統(tǒng)方式
每個(gè)Task執(zhí)行時(shí)不存在數(shù)據(jù)共享了,每個(gè)Task中計(jì)算自己值,最后我們匯總每個(gè)Task的Result。我們可以通過(guò)Task中傳遞的state參數(shù)來(lái)進(jìn)行隔離執(zhí)行:
int Sum = 0;
Task<int>[] tasks = new Task<int>[10];
for (int i = 0; i < 10; i++)
{
tasks[i] = new Task<int>((obj) =>
{
var start = (int)obj;
for (int j = 0; j < 1000; j++)
{
start = start + 1;
}
return start;
}, Sum);
tasks[i].Start();
}
Task.WaitAll(tasks);
for (var i = 0; i < 10; i++)
{
Sum += tasks[i].Result;
}
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
2、ThreadLocal類
在.Net中提供了System.Threading.ThreadLocal來(lái)創(chuàng)建分離。
ThreadLocal是一種提供線程本地存儲(chǔ)的類型,它可以給每個(gè)線程一個(gè)分離的實(shí)例,來(lái)提供每個(gè)線程單獨(dú)的數(shù)據(jù)結(jié)果。上面的程序我們可以使用TreadLocal:
int Sum = 0;
Task<int>[] tasks = new Task<int>[10];
var tl = new ThreadLocal<int>();
for (int i = 0; i < 10; i++)
{
tasks[i] = new Task<int>((obj) =>
{
tl.Value = (int)obj;
for (int j = 0; j < 1000; j++)
{
tl.Value++;
}
returntl.Value;
}, Sum);
tasks[i].Start();
}
Task.WaitAll(tasks);
for (var i = 0; i < 10; i++)
{
Sum += tasks[i].Result;
}
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
但是我們要注意的一點(diǎn)TreadLocal是針對(duì)每個(gè)線程的,不是針對(duì)每個(gè)Task的。一個(gè)Tread中可能有多個(gè)Task。
ThreadLocal類舉例:
static ThreadLocal<string> local;
static void Main()
{
//創(chuàng)建ThreadLocal并提供默認(rèn)值
local = new ThreadLocal<string>(() => "hehe");
//修改TLS的線程
Thread th = new Thread(() =>
{
local.Value = "Mgen";
Display();
});
th.Start();
th.Join();
Display();
}
//顯示TLS中數(shù)據(jù)值
static void Display()
{
Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, local.Value);
}
二、同步類型:通過(guò)調(diào)整task的執(zhí)行,有序的執(zhí)行task。
同步類型是一種用來(lái)調(diào)度Task訪問(wèn)臨界區(qū)域的一種特殊類型。在.Net 4.0中提供了多種同步類型給我們使用,主要分為:輕量級(jí)的、重量級(jí)的和等待處理型的,在下面我們會(huì)介紹常用的同步處理類型。
常用的同步類型
首先來(lái)看看.Net 4.0中常見(jiàn)的幾種同步類型以及處理的相關(guān)問(wèn)題:
同步類型以及解決問(wèn)題
- lock關(guān)鍵字、Montor類、SpinLock類:有序訪問(wèn)臨界區(qū)域
- Interlocked類:數(shù)值類型的增加或則減少
- Mutex類:交叉同步
- WaitAll方法:同步多個(gè)鎖定(主要是Task之間的調(diào)度)
- 申明性的同步(如Synchronization):使類中的所有的方法同步
1、Lock鎖
其實(shí)最簡(jiǎn)單同步類型的使用辦法就是使用lock關(guān)鍵字。在使用lock關(guān)鍵字時(shí),首先我們需要?jiǎng)?chuàng)建一個(gè)鎖定的object,而且這個(gè)object需要所有的task都能訪問(wèn),其次能我們需要將我們的臨界區(qū)域包含在lock塊中。我們之前例子中代碼可以這樣加上lock:
int Sum = 0;
Task[] tasks = new Task[10];
var obj = new Object(); for (int i = 0; i < 10; i++)
{
tasks[i] = new Task(() =>
{
for (int j = 0; j < 1000; j++)
{
lock (obj)
{ Sum = Sum + 1; }
}
});
tasks[i].Start();
}
Task.WaitAll(tasks);
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
其實(shí)lock關(guān)鍵字是使用Monitor的一種簡(jiǎn)短的方式,lock關(guān)鍵字自動(dòng)通過(guò)調(diào)用Monitor.Enter\Monitor.Exit方法來(lái)處理獲得鎖以及釋放鎖。
2、Interlocked 聯(lián)鎖
Interlocked通過(guò)使用操作系統(tǒng)或則硬件的一些特性提供了一些列高效的靜態(tài)的同步方法。其中主要提供了這些方法:Exchange、Add、Increment、CompareExchange四種類型的多個(gè)方法的重載。我們將上面的例子中使用Interlocked:
int Sum = 0;
Task[] tasks = new Task[10];
for (int i = 0; i < 10; i++)
{
tasks[i] = new Task(() =>
{
for (int j = 0; j < 1000; j++)
{
Interlocked.Increment(ref Sum);
}
});
tasks[i].Start();
}
Task.WaitAll(tasks);
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
3、Mutex互斥體
Mutex也是一個(gè)同步類型,在多個(gè)線程進(jìn)行訪問(wèn)的時(shí)候,它只向一個(gè)線程授權(quán)共享數(shù)據(jù)的獨(dú)立訪問(wèn)。我們可以通過(guò)Mutex中的WaitOne方法來(lái)獲取Mutex的所有權(quán),但是同時(shí)我們要注意的是,我們?cè)谝粋€(gè)線程中多少次調(diào)用過(guò)WaitOne方法,就需要調(diào)用多少次ReleaseMutex方法來(lái)釋放Mutex的占有。上面的例子我們通過(guò)Mutex這樣實(shí)現(xiàn):
int Sum = 0;
Task[] tasks = new Task[10];
var mutex = new Mutex();
for (int i = 0; i < 10; i++)
{
tasks[i] = new Task(() =>
{
for (int j = 0; j < 1000; j++)
{
bool lockAcquired = mutex.WaitOne(); try
{
Sum++;
}
finally
{
if (lockAcquired) mutex.ReleaseMutex();
}
}
});
tasks[i].Start();
}
Task.WaitAll(tasks);
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
三、申明性同步
我們可以通過(guò)使用Synchronization 特性來(lái)標(biāo)識(shí)一個(gè)類,從而使一個(gè)類型的字段以及方法都實(shí)現(xiàn)同步化。在使用Synchronization 時(shí),我們需要將我們的目標(biāo)同步的類繼承于System.ContextBoundObject類型。我們來(lái)看看之前的例子我們同步標(biāo)識(shí)Synchronization 的實(shí)現(xiàn):
static void Main(string[] args)
{
var sum = new SumClass();
Task[] tasks = new Task[10];
for (int i = 0; i < 10; i++)
{
tasks[i] = new Task(() =>
{
for (int j = 0; j < 1000; j++)
{
sum.Increment();
}
});
tasks[i].Start();
}
Task.WaitAll(tasks);
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, sum.GetSum());
}
[Synchronization]
class SumClass : ContextBoundObject
{
private int _Sum;
public void Increment()
{
_Sum++;
}
public int GetSum()
{
return _Sum;
}
}
四、并發(fā)集合
當(dāng)多個(gè)線程對(duì)某個(gè)非線程安全容器并發(fā)地進(jìn)行讀寫(xiě)操作時(shí),這些操作將導(dǎo)致不可預(yù)估的后果或者會(huì)導(dǎo)致報(bào)錯(cuò)。為了解決這個(gè)問(wèn)題我們可以使用lock關(guān)鍵字或者M(jìn)onitor類來(lái)給容器上鎖。但鎖的引入使得我們的代碼更加復(fù)雜,同時(shí)也帶來(lái)了更多的同步消耗。而.NET Framework 4提供的線程安全且可拓展的并發(fā)集合能夠使得我們的并行代碼更加容易編寫(xiě),此外,鎖的使用次數(shù)的減少也減少了麻煩的死鎖與競(jìng)爭(zhēng)條件的問(wèn)題。.NET Framework 4主要提供了如下幾種并發(fā)集合:BlockingCollection,ConcurrentBag,ConcurrentDictionary,ConcurrentQueue,ConcurrentStack。這些集合通過(guò)使用一種叫做比較并交換(compare and swap, CAS)指令和內(nèi)存屏障的技術(shù)來(lái)避免使用重量級(jí)的鎖。
在.Net 4.0中提供了很多并發(fā)的集合類型來(lái)讓我們處理數(shù)據(jù)同步的集合的問(wèn)題,這里面包括:
1.ConcurrentQueue:提供并發(fā)安全的隊(duì)列集合,以先進(jìn)先出的方式進(jìn)行操作;
2.ConcurrentStack:提供并發(fā)安全的堆棧集合,以先進(jìn)后出的方式進(jìn)行操作;
3.ConcurrentBag:提供并發(fā)安全的一種無(wú)序集合;
4.ConcurrentDictionary:提供并發(fā)安全的一種key-value類型的集合。
我們?cè)谶@里只做ConcurrentQueue的一個(gè)嘗試,并發(fā)隊(duì)列是一種線程安全的隊(duì)列集合,我們可以通過(guò)Enqueue()進(jìn)行排隊(duì)、TryDequeue()進(jìn)行出隊(duì)列操作:
for (var j = 0; j < 10; j++)
{
var queue = new ConcurrentQueue<int>();
var count = 0;
for (var i = 0; i < 1000; i++)
{
queue.Enqueue(i);
}
var tasks = new Task[10];
for (var i = 0; i < tasks.Length; i++)
{
tasks[i] = new Task(() =>
{
while (queue.Count > 0)
{
int item;
var isDequeue = queue.TryDequeue(out item);
if (isDequeue) Interlocked.Increment(ref count);
}
});
tasks[i].Start();
}
try
{
Task.WaitAll(tasks);
}
catch (AggregateException e)
{
e.Handle((ex) =>
{
Console.WriteLine("Exception Message:{0}", ex.Message);
return true;
});
}
Console.WriteLine("Dequeue items count :{0}", count);
}
五、Barrier(屏障同步)
barrier叫做屏障,就像下圖中的“紅色線”,如果我們的屏障設(shè)為4個(gè)task就認(rèn)為已經(jīng)滿了的話,那么執(zhí)行中先到的task必須等待后到的task,通知方式也就是barrier.SignalAndWait(),屏障中線程設(shè)置操作為new Barrier(4,(i)=>{})。SignalAndWait給我們提供了超時(shí)的重載,為了能夠取消后續(xù)執(zhí)行
//四個(gè)task執(zhí)行
static Task[] tasks = new Task[4];
static Barrier barrier = null;
static void Main(string[] args)
{
barrier = new Barrier(tasks.Length, (i) =>
{
Console.WriteLine("**********************************************************");
Console.WriteLine("\n屏障中當(dāng)前階段編號(hào):{0}\n", i.CurrentPhaseNumber);
Console.WriteLine("**********************************************************");
});
for (int j = 0; j < tasks.Length; j++)
{
tasks[j] = Task.Factory.StartNew((obj) =>
{
var single = Convert.ToInt32(obj);
LoadUser(single);
barrier.SignalAndWait();
LoadProduct(single);
barrier.SignalAndWait();
LoadOrder(single);
barrier.SignalAndWait();
}, j);
}
Task.WaitAll(tasks);
Console.WriteLine("指定數(shù)據(jù)庫(kù)中所有數(shù)據(jù)已經(jīng)加載完畢!");
Console.Read();
}
static void LoadUser(int num)
{
Console.WriteLine("當(dāng)前任務(wù):{0}正在加載User部分?jǐn)?shù)據(jù)!", num);
}
static void LoadProduct(int num)
{
Console.WriteLine("當(dāng)前任務(wù):{0}正在加載Product部分?jǐn)?shù)據(jù)!", num);
}
static void LoadOrder(int num)
{
Console.WriteLine("當(dāng)前任務(wù):{0}正在加載Order部分?jǐn)?shù)據(jù)!", num);
}
原文鏈接:https://www.cnblogs.com/springsnow/p/9409477.html
相關(guān)推薦
- 2022-06-02 ubuntu安裝jupyter并設(shè)置遠(yuǎn)程訪問(wèn)的實(shí)現(xiàn)_python
- 2022-08-13 Linux內(nèi)核中container_of宏定義講解
- 2022-07-16 結(jié)構(gòu)體通過(guò)成員變量獲取主結(jié)構(gòu)體地址(struct)
- 2022-12-25 React不使用requestIdleCallback實(shí)現(xiàn)調(diào)度原理解析_React
- 2022-09-29 數(shù)據(jù)設(shè)計(jì)之權(quán)限的實(shí)現(xiàn)_數(shù)據(jù)庫(kù)其它
- 2022-10-23 React路由中的redux和redux知識(shí)點(diǎn)拓展_React
- 2022-03-17 docker安裝rocketMQ和安裝過(guò)程中出現(xiàn)問(wèn)題的解決_docker
- 2022-05-21 redis數(shù)據(jù)一致性的實(shí)現(xiàn)示例_Redis
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過(guò)濾器
- Spring Security概述快速入門(mén)
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡(jiǎn)單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支