ThreadLock.cs 35 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.Globalization;
  5. using System.Linq;
  6. using System.Text;
  7. using System.Threading;
  8. namespace HslCommunication.Core
  9. {
  10. #region 多线程同步协调类
  11. /// <summary>
  12. /// 线程的协调逻辑状态
  13. /// </summary>
  14. internal enum CoordinationStatus
  15. {
  16. /// <summary>
  17. /// 所有项完成
  18. /// </summary>
  19. AllDone,
  20. /// <summary>
  21. /// 超时
  22. /// </summary>
  23. Timeout,
  24. /// <summary>
  25. /// 任务取消
  26. /// </summary>
  27. Cancel
  28. }
  29. /// <summary>
  30. /// 一个线程协调逻辑类,详细参考书籍《CLR Via C#》page:681
  31. /// 这个类可惜没有报告进度的功能
  32. /// </summary>
  33. internal sealed class AsyncCoordinator
  34. {
  35. private int m_opCount = 1;
  36. private int m_statusReported = 0;
  37. private Action<CoordinationStatus> m_callback;
  38. private System.Threading.Timer m_timer;
  39. /// <summary>
  40. /// 每次的操作任务开始前必须调用该方法
  41. /// </summary>
  42. /// <param name="opsToAdd"></param>
  43. public void AboutToBegin(int opsToAdd = 1) => Interlocked.Add(ref m_opCount, opsToAdd);
  44. /// <summary>
  45. /// 在一次任务处理好操作之后,必须调用该方法
  46. /// </summary>
  47. public void JustEnded()
  48. {
  49. if (Interlocked.Decrement(ref m_opCount) == 0)
  50. {
  51. ReportStatus(CoordinationStatus.AllDone);
  52. }
  53. }
  54. /// <summary>
  55. /// 该方法必须在发起所有的操作之后调用
  56. /// </summary>
  57. /// <param name="callback">回调方法</param>
  58. /// <param name="timeout">超时时间</param>
  59. public void AllBegun(Action<CoordinationStatus> callback, int timeout = Timeout.Infinite)
  60. {
  61. m_callback = callback;
  62. if (timeout != Timeout.Infinite)
  63. {
  64. m_timer = new System.Threading.Timer(TimeExpired, null, timeout, Timeout.Infinite);
  65. }
  66. JustEnded();//修正一开始设置的初始值
  67. }
  68. /// <summary>
  69. /// 超时的方法
  70. /// </summary>
  71. /// <param name="o"></param>
  72. private void TimeExpired(object o) => ReportStatus(CoordinationStatus.Timeout);
  73. /// <summary>
  74. /// 取消任务的执行
  75. /// </summary>
  76. public void Cancel() => ReportStatus(CoordinationStatus.Cancel);
  77. /// <summary>
  78. /// 生成一次报告
  79. /// </summary>
  80. /// <param name="status">报告的状态</param>
  81. private void ReportStatus(CoordinationStatus status)
  82. {
  83. //只报告一次的限制
  84. if (Interlocked.Exchange(ref m_statusReported, 1) == 0)
  85. {
  86. m_callback(status);
  87. }
  88. }
  89. /// <summary>
  90. /// 乐观的并发方法模型,具体参照《CLR Via C#》page:686
  91. /// </summary>
  92. /// <param name="target">唯一的目标数据</param>
  93. /// <param name="change">修改数据的算法</param>
  94. /// <returns></returns>
  95. public static int Maxinum(ref int target, Func<int, int> change)
  96. {
  97. int currentVal = target, startVal, desiredVal;
  98. do
  99. {
  100. startVal = currentVal;//设置值
  101. //以下为业务逻辑,允许实现非常复杂的设置
  102. desiredVal = change(startVal);
  103. currentVal = Interlocked.CompareExchange(ref target, desiredVal, startVal);
  104. }
  105. while (startVal != currentVal);//更改失败就强制更新
  106. return desiredVal;
  107. }
  108. }
  109. #endregion
  110. #region 乐观并发模型的协调类
  111. /// <summary>
  112. /// 一个用于高性能,乐观并发模型控制操作的类,允许一个方法(隔离方法)的安全单次执行
  113. /// </summary>
  114. public sealed class HslAsyncCoordinator
  115. {
  116. /// <summary>
  117. /// 实例化一个对象,需要传入隔离执行的方法
  118. /// </summary>
  119. /// <param name="operater">隔离执行的方法</param>
  120. public HslAsyncCoordinator(Action operater)
  121. {
  122. action = operater;
  123. }
  124. /// <summary>
  125. /// 操作状态,0是未操作,1是操作中
  126. /// </summary>
  127. private int OperaterStatus = 0;
  128. /// <summary>
  129. /// 需要操作的次数
  130. /// </summary>
  131. private long Target = 0;
  132. /// <summary>
  133. /// 启动线程池执行隔离方法
  134. /// </summary>
  135. public void StartOperaterInfomation()
  136. {
  137. Interlocked.Increment(ref Target);
  138. if (Interlocked.CompareExchange(ref OperaterStatus, 1, 0) == 0)
  139. {
  140. //启动保存
  141. ThreadPool.QueueUserWorkItem(new WaitCallback(ThreadPoolOperater), null);
  142. }
  143. }
  144. private Action action = null;
  145. private void ThreadPoolOperater(object obj)
  146. {
  147. long currentVal = Target, startVal;
  148. long desiredVal = 0;
  149. do
  150. {
  151. startVal = currentVal;//设置值
  152. //以下为业务逻辑,允许实现非常复杂的设置
  153. action?.Invoke();
  154. //需要清零值的时候必须用下面的原子操作
  155. currentVal = Interlocked.CompareExchange(ref Target, desiredVal, startVal);
  156. }
  157. while (startVal != currentVal);//更改失败就强制更新
  158. //退出保存状态
  159. Interlocked.Exchange(ref OperaterStatus, 0);
  160. //最终状态确认
  161. if (Target != desiredVal) StartOperaterInfomation();
  162. }
  163. }
  164. #endregion
  165. #region 高性能的读写锁
  166. // 一个高性能的读写锁,由《CLR Via C#》作者Jeffrey Richter提供
  167. /// <summary>
  168. /// 一个高性能的读写锁,支持写锁定,读灵活,读时写锁定,写时读锁定
  169. /// </summary>
  170. public sealed class HslReadWriteLock : IDisposable
  171. {
  172. #region Lock State Management
  173. #if false
  174. private struct BitField {
  175. private Int32 m_mask, m_1, m_startBit;
  176. public BitField(Int32 startBit, Int32 numBits) {
  177. m_startBit = startBit;
  178. m_mask = unchecked((Int32)((1 << numBits) - 1) << startBit);
  179. m_1 = unchecked((Int32)1 << startBit);
  180. }
  181. public void Increment(ref Int32 value) { value += m_1; }
  182. public void Decrement(ref Int32 value) { value -= m_1; }
  183. public void Decrement(ref Int32 value, Int32 amount) { value -= m_1 * amount; }
  184. public Int32 Get(Int32 value) { return (value & m_mask) >> m_startBit; }
  185. public Int32 Set(Int32 value, Int32 fieldValue) { return (value & ~m_mask) | (fieldValue << m_startBit); }
  186. }
  187. private static BitField s_state = new BitField(0, 3);
  188. private static BitField s_readersReading = new BitField(3, 9);
  189. private static BitField s_readersWaiting = new BitField(12, 9);
  190. private static BitField s_writersWaiting = new BitField(21, 9);
  191. private static OneManyLockStates State(Int32 value) { return (OneManyLockStates)s_state.Get(value); }
  192. private static void State(ref Int32 ls, OneManyLockStates newState) {
  193. ls = s_state.Set(ls, (Int32)newState);
  194. }
  195. #endif
  196. private enum OneManyLockStates
  197. {
  198. Free = 0x00000000,
  199. OwnedByWriter = 0x00000001,
  200. OwnedByReaders = 0x00000002,
  201. OwnedByReadersAndWriterPending = 0x00000003,
  202. ReservedForWriter = 0x00000004,
  203. }
  204. private const Int32 c_lsStateStartBit = 0;
  205. private const Int32 c_lsReadersReadingStartBit = 3;
  206. private const Int32 c_lsReadersWaitingStartBit = 12;
  207. private const Int32 c_lsWritersWaitingStartBit = 21;
  208. // Mask = unchecked((Int32) ((1 << numBits) - 1) << startBit);
  209. private const Int32 c_lsStateMask = unchecked((Int32)((1 << 3) - 1) << c_lsStateStartBit);
  210. private const Int32 c_lsReadersReadingMask = unchecked((Int32)((1 << 9) - 1) << c_lsReadersReadingStartBit);
  211. private const Int32 c_lsReadersWaitingMask = unchecked((Int32)((1 << 9) - 1) << c_lsReadersWaitingStartBit);
  212. private const Int32 c_lsWritersWaitingMask = unchecked((Int32)((1 << 9) - 1) << c_lsWritersWaitingStartBit);
  213. private const Int32 c_lsAnyWaitingMask = c_lsReadersWaitingMask | c_lsWritersWaitingMask;
  214. // FirstBit = unchecked((Int32) 1 << startBit);
  215. private const Int32 c_ls1ReaderReading = unchecked((Int32)1 << c_lsReadersReadingStartBit);
  216. private const Int32 c_ls1ReaderWaiting = unchecked((Int32)1 << c_lsReadersWaitingStartBit);
  217. private const Int32 c_ls1WriterWaiting = unchecked((Int32)1 << c_lsWritersWaitingStartBit);
  218. private static OneManyLockStates State(Int32 ls) { return (OneManyLockStates)(ls & c_lsStateMask); }
  219. private static void SetState(ref Int32 ls, OneManyLockStates newState)
  220. {
  221. ls = (ls & ~c_lsStateMask) | ((Int32)newState);
  222. }
  223. private static Int32 NumReadersReading(Int32 ls) { return (ls & c_lsReadersReadingMask) >> c_lsReadersReadingStartBit; }
  224. private static void AddReadersReading(ref Int32 ls, Int32 amount) { ls += (c_ls1ReaderReading * amount); }
  225. private static Int32 NumReadersWaiting(Int32 ls) { return (ls & c_lsReadersWaitingMask) >> c_lsReadersWaitingStartBit; }
  226. private static void AddReadersWaiting(ref Int32 ls, Int32 amount) { ls += (c_ls1ReaderWaiting * amount); }
  227. private static Int32 NumWritersWaiting(Int32 ls) { return (ls & c_lsWritersWaitingMask) >> c_lsWritersWaitingStartBit; }
  228. private static void AddWritersWaiting(ref Int32 ls, Int32 amount) { ls += (c_ls1WriterWaiting * amount); }
  229. private static Boolean AnyWaiters(Int32 ls) { return (ls & c_lsAnyWaitingMask) != 0; }
  230. private static String DebugState(Int32 ls)
  231. {
  232. return String.Format(CultureInfo.InvariantCulture,
  233. "State={0}, RR={1}, RW={2}, WW={3}", State(ls),
  234. NumReadersReading(ls), NumReadersWaiting(ls), NumWritersWaiting(ls));
  235. }
  236. /// <summary>
  237. /// 返回本对象的描述字符串
  238. /// </summary>
  239. /// <returns>对象的描述字符串</returns>
  240. public override String ToString() { return DebugState(m_LockState); }
  241. #endregion
  242. #region State Fields
  243. private Int32 m_LockState = (Int32)OneManyLockStates.Free;
  244. // Readers wait on this if a writer owns the lock
  245. private Semaphore m_ReadersLock = new Semaphore(0, Int32.MaxValue);
  246. // Writers wait on this if a reader owns the lock
  247. private Semaphore m_WritersLock = new Semaphore(0, Int32.MaxValue);
  248. #endregion
  249. #region Construction
  250. /// <summary>
  251. /// 实例化一个读写锁的对象
  252. /// </summary>
  253. public HslReadWriteLock() : base() { }
  254. #endregion
  255. #region IDisposable Support
  256. private bool disposedValue = false; // 要检测冗余调用
  257. void Dispose(bool disposing)
  258. {
  259. if (!disposedValue)
  260. {
  261. if (disposing)
  262. {
  263. // TODO: 释放托管状态(托管对象)。
  264. }
  265. // TODO: 释放未托管的资源(未托管的对象)并在以下内容中替代终结器。
  266. // TODO: 将大型字段设置为 null。
  267. m_WritersLock.Close(); m_WritersLock = null;
  268. m_ReadersLock.Close(); m_ReadersLock = null;
  269. disposedValue = true;
  270. }
  271. }
  272. // TODO: 仅当以上 Dispose(bool disposing) 拥有用于释放未托管资源的代码时才替代终结器。
  273. // ~HslReadWriteLock() {
  274. // // 请勿更改此代码。将清理代码放入以上 Dispose(bool disposing) 中。
  275. // Dispose(false);
  276. // }
  277. // 添加此代码以正确实现可处置模式。
  278. /// <summary>
  279. /// 释放资源
  280. /// </summary>
  281. public void Dispose()
  282. {
  283. // 请勿更改此代码。将清理代码放入以上 Dispose(bool disposing) 中。
  284. Dispose(true);
  285. // TODO: 如果在以上内容中替代了终结器,则取消注释以下行。
  286. // GC.SuppressFinalize(this);
  287. }
  288. #endregion
  289. #region Writer members
  290. private Boolean m_exclusive;
  291. /// <summary>
  292. /// 根据读写情况请求锁
  293. /// </summary>
  294. /// <param name="exclusive">True为写请求,False为读请求</param>
  295. public void Enter(Boolean exclusive)
  296. {
  297. if (exclusive)
  298. {
  299. while (WaitToWrite(ref m_LockState)) m_WritersLock.WaitOne();
  300. }
  301. else
  302. {
  303. while (WaitToRead(ref m_LockState)) m_ReadersLock.WaitOne();
  304. }
  305. m_exclusive = exclusive;
  306. }
  307. private static Boolean WaitToWrite(ref Int32 target)
  308. {
  309. Int32 start, current = target;
  310. Boolean wait;
  311. do
  312. {
  313. start = current;
  314. Int32 desired = start;
  315. wait = false;
  316. switch (State(desired))
  317. {
  318. case OneManyLockStates.Free: // If Free -> OBW, return
  319. case OneManyLockStates.ReservedForWriter: // If RFW -> OBW, return
  320. SetState(ref desired, OneManyLockStates.OwnedByWriter);
  321. break;
  322. case OneManyLockStates.OwnedByWriter: // If OBW -> WW++, wait & loop around
  323. AddWritersWaiting(ref desired, 1);
  324. wait = true;
  325. break;
  326. case OneManyLockStates.OwnedByReaders: // If OBR or OBRAWP -> OBRAWP, WW++, wait, loop around
  327. case OneManyLockStates.OwnedByReadersAndWriterPending:
  328. SetState(ref desired, OneManyLockStates.OwnedByReadersAndWriterPending);
  329. AddWritersWaiting(ref desired, 1);
  330. wait = true;
  331. break;
  332. default:
  333. Debug.Assert(false, "Invalid Lock state");
  334. break;
  335. }
  336. current = Interlocked.CompareExchange(ref target, desired, start);
  337. } while (start != current);
  338. return wait;
  339. }
  340. /// <summary>
  341. /// 释放锁,将根据锁状态自动区分读写锁
  342. /// </summary>
  343. public void Leave()
  344. {
  345. Int32 wakeup;
  346. if (m_exclusive)
  347. {
  348. Debug.Assert((State(m_LockState) == OneManyLockStates.OwnedByWriter) && (NumReadersReading(m_LockState) == 0));
  349. // Pre-condition: Lock's state must be OBW (not Free/OBR/OBRAWP/RFW)
  350. // Post-condition: Lock's state must become Free or RFW (the lock is never passed)
  351. // Phase 1: Release the lock
  352. wakeup = DoneWriting(ref m_LockState);
  353. }
  354. else
  355. {
  356. var s = State(m_LockState);
  357. Debug.Assert((State(m_LockState) == OneManyLockStates.OwnedByReaders) || (State(m_LockState) == OneManyLockStates.OwnedByReadersAndWriterPending));
  358. // Pre-condition: Lock's state must be OBR/OBRAWP (not Free/OBW/RFW)
  359. // Post-condition: Lock's state must become unchanged, Free or RFW (the lock is never passed)
  360. // Phase 1: Release the lock
  361. wakeup = DoneReading(ref m_LockState);
  362. }
  363. // Phase 2: Possibly wake waiters
  364. if (wakeup == -1) m_WritersLock.Release();
  365. else if (wakeup > 0) m_ReadersLock.Release(wakeup);
  366. }
  367. // Returns -1 to wake a writer, +# to wake # readers, or 0 to wake no one
  368. private static Int32 DoneWriting(ref Int32 target)
  369. {
  370. Int32 start, current = target;
  371. Int32 wakeup = 0;
  372. do
  373. {
  374. Int32 desired = (start = current);
  375. // We do this test first because it is commonly true &
  376. // we avoid the other tests improving performance
  377. if (!AnyWaiters(desired))
  378. {
  379. SetState(ref desired, OneManyLockStates.Free);
  380. wakeup = 0;
  381. }
  382. else if (NumWritersWaiting(desired) > 0)
  383. {
  384. SetState(ref desired, OneManyLockStates.ReservedForWriter);
  385. AddWritersWaiting(ref desired, -1);
  386. wakeup = -1;
  387. }
  388. else
  389. {
  390. wakeup = NumReadersWaiting(desired);
  391. Debug.Assert(wakeup > 0);
  392. SetState(ref desired, OneManyLockStates.OwnedByReaders);
  393. AddReadersWaiting(ref desired, -wakeup);
  394. // RW=0, RR=0 (incremented as readers enter)
  395. }
  396. current = Interlocked.CompareExchange(ref target, desired, start);
  397. } while (start != current);
  398. return wakeup;
  399. }
  400. #endregion
  401. #region Reader members
  402. private static Boolean WaitToRead(ref Int32 target)
  403. {
  404. Int32 start, current = target;
  405. Boolean wait;
  406. do
  407. {
  408. Int32 desired = (start = current);
  409. wait = false;
  410. switch (State(desired))
  411. {
  412. case OneManyLockStates.Free: // If Free->OBR, RR=1, return
  413. SetState(ref desired, OneManyLockStates.OwnedByReaders);
  414. AddReadersReading(ref desired, 1);
  415. break;
  416. case OneManyLockStates.OwnedByReaders: // If OBR -> RR++, return
  417. AddReadersReading(ref desired, 1);
  418. break;
  419. case OneManyLockStates.OwnedByWriter: // If OBW/OBRAWP/RFW -> RW++, wait, loop around
  420. case OneManyLockStates.OwnedByReadersAndWriterPending:
  421. case OneManyLockStates.ReservedForWriter:
  422. AddReadersWaiting(ref desired, 1);
  423. wait = true;
  424. break;
  425. default:
  426. Debug.Assert(false, "Invalid Lock state");
  427. break;
  428. }
  429. current = Interlocked.CompareExchange(ref target, desired, start);
  430. } while (start != current);
  431. return wait;
  432. }
  433. // Returns -1 to wake a writer, +# to wake # readers, or 0 to wake no one
  434. private static Int32 DoneReading(ref Int32 target)
  435. {
  436. Int32 start, current = target;
  437. Int32 wakeup;
  438. do
  439. {
  440. Int32 desired = (start = current);
  441. AddReadersReading(ref desired, -1); // RR--
  442. if (NumReadersReading(desired) > 0)
  443. {
  444. // RR>0, no state change & no threads to wake
  445. wakeup = 0;
  446. }
  447. else if (!AnyWaiters(desired))
  448. {
  449. SetState(ref desired, OneManyLockStates.Free);
  450. wakeup = 0;
  451. }
  452. else
  453. {
  454. Debug.Assert(NumWritersWaiting(desired) > 0);
  455. SetState(ref desired, OneManyLockStates.ReservedForWriter);
  456. AddWritersWaiting(ref desired, -1);
  457. wakeup = -1; // Wake 1 writer
  458. }
  459. current = Interlocked.CompareExchange(ref target, desired, start);
  460. } while (start != current);
  461. return wakeup;
  462. }
  463. #endregion
  464. }
  465. #endregion
  466. #region 简单的混合锁
  467. /// <summary>
  468. /// 一个简单的混合线程同步锁,采用了基元用户加基元内核同步构造实现
  469. /// </summary>
  470. public sealed class SimpleHybirdLock : IDisposable
  471. {
  472. #region IDisposable Support
  473. private bool disposedValue = false; // 要检测冗余调用
  474. void Dispose(bool disposing)
  475. {
  476. if (!disposedValue)
  477. {
  478. if (disposing)
  479. {
  480. // TODO: 释放托管状态(托管对象)。
  481. }
  482. // TODO: 释放未托管的资源(未托管的对象)并在以下内容中替代终结器。
  483. // TODO: 将大型字段设置为 null。
  484. m_waiterLock.Close();
  485. disposedValue = true;
  486. }
  487. }
  488. // TODO: 仅当以上 Dispose(bool disposing) 拥有用于释放未托管资源的代码时才替代终结器。
  489. // ~SimpleHybirdLock() {
  490. // // 请勿更改此代码。将清理代码放入以上 Dispose(bool disposing) 中。
  491. // Dispose(false);
  492. // }
  493. // 添加此代码以正确实现可处置模式。
  494. /// <summary>
  495. /// 释放资源
  496. /// </summary>
  497. public void Dispose()
  498. {
  499. // 请勿更改此代码。将清理代码放入以上 Dispose(bool disposing) 中。
  500. Dispose(true);
  501. // TODO: 如果在以上内容中替代了终结器,则取消注释以下行。
  502. // GC.SuppressFinalize(this);
  503. }
  504. #endregion
  505. /// <summary>
  506. /// 基元用户模式构造同步锁
  507. /// </summary>
  508. private Int32 m_waiters = 0;
  509. /// <summary>
  510. /// 基元内核模式构造同步锁
  511. /// </summary>
  512. private AutoResetEvent m_waiterLock = new AutoResetEvent(false);
  513. /// <summary>
  514. /// 获取锁
  515. /// </summary>
  516. public void Enter()
  517. {
  518. if (Interlocked.Increment(ref m_waiters) == 1) return;//用户锁可以使用的时候,直接返回,第一次调用时发生
  519. //当发生锁竞争时,使用内核同步构造锁
  520. m_waiterLock.WaitOne();
  521. }
  522. /// <summary>
  523. /// 离开锁
  524. /// </summary>
  525. public void Leave()
  526. {
  527. if (Interlocked.Decrement(ref m_waiters) == 0) return;//没有可用的锁的时候
  528. m_waiterLock.Set();
  529. }
  530. }
  531. #endregion
  532. #region 多线程并发处理数据的类
  533. /*******************************************************************************
  534. *
  535. * 创建日期:2017年7月6日 08:30:56
  536. *
  537. *
  538. *******************************************************************************/
  539. /// <summary>
  540. /// 一个用于多线程并发处理数据的模型类,适用于处理数据量非常庞大的情况
  541. /// </summary>
  542. /// <typeparam name="T">等待处理的数据类型</typeparam>
  543. public sealed class SoftMultiTask<T>
  544. {
  545. /// <summary>
  546. /// 实例化一个数据处理对象
  547. /// </summary>
  548. /// <param name="dataList">数据处理列表</param>
  549. /// <param name="operater">数据操作方法,应该是相对耗时的任务</param>
  550. /// <param name="threadCount">需要使用的线程数</param>
  551. public SoftMultiTask(T[] dataList, Func<T, bool> operater, int threadCount = 10)
  552. {
  553. if (dataList == null)
  554. {
  555. throw new ArgumentNullException("dataList");
  556. }
  557. if (operater == null)
  558. {
  559. throw new ArgumentNullException("operater");
  560. }
  561. m_operater = operater;
  562. m_dataList = dataList;
  563. if (threadCount < 1) throw new ArgumentException("线程数不能小于1", "threadCount");
  564. m_threadCount = threadCount;
  565. //增加任务处理
  566. Interlocked.Add(ref m_opCount, dataList.Length);
  567. //增加线程处理
  568. Interlocked.Add(ref m_opThreadCount, threadCount);
  569. }
  570. /// <summary>
  571. /// 操作总数,判定操作是否完成
  572. /// </summary>
  573. private Int32 m_opCount = 0;
  574. /// <summary>
  575. /// 判断是否所有的线程是否处理完成
  576. /// </summary>
  577. private Int32 m_opThreadCount = 1;
  578. /// <summary>
  579. /// 准备启动的处理数据的线程数量
  580. /// </summary>
  581. private Int32 m_threadCount = 10;
  582. /// <summary>
  583. /// 指示多线程处理是否在运行中,防止冗余调用
  584. /// </summary>
  585. private Int32 m_runStatus = 0;
  586. /// <summary>
  587. /// 列表数据
  588. /// </summary>
  589. private T[] m_dataList = null;
  590. /// <summary>
  591. /// 需要操作的方法
  592. /// </summary>
  593. private Func<T, bool> m_operater = null;
  594. /// <summary>
  595. /// 一个双参数委托
  596. /// </summary>
  597. /// <param name="item"></param>
  598. /// <param name="ex"></param>
  599. public delegate void MultiInfo(T item, Exception ex);
  600. /// <summary>
  601. /// 用于报告进度的委托,当finish等于count时,任务完成
  602. /// </summary>
  603. /// <param name="finish">已完成操作数量</param>
  604. /// <param name="count">总数量</param>
  605. /// <param name="success">成功数量</param>
  606. /// <param name="failed">失败数量</param>
  607. public delegate void MultiInfoTwo(int finish, int count, int success, int failed);
  608. /// <summary>
  609. /// 异常发生时事件
  610. /// </summary>
  611. public event MultiInfo OnExceptionOccur;
  612. /// <summary>
  613. /// 报告处理进度时发生
  614. /// </summary>
  615. public event MultiInfoTwo OnReportProgress;
  616. /// <summary>
  617. /// 已处理完成数量,无论是否异常
  618. /// </summary>
  619. private int m_finishCount = 0;
  620. /// <summary>
  621. /// 处理完成并实现操作数量
  622. /// </summary>
  623. private int m_successCount = 0;
  624. /// <summary>
  625. /// 处理过程中异常数量
  626. /// </summary>
  627. private int m_failedCount = 0;
  628. /// <summary>
  629. /// 用于触发事件的混合线程锁
  630. /// </summary>
  631. private SimpleHybirdLock HybirdLock = new SimpleHybirdLock();
  632. /// <summary>
  633. /// 指示处理状态是否为暂停状态
  634. /// </summary>
  635. private bool m_isRunningStop = false;
  636. /// <summary>
  637. /// 指示系统是否需要强制退出
  638. /// </summary>
  639. private bool m_isQuit = false;
  640. /// <summary>
  641. /// 在发生错误的时候是否强制退出后续的操作
  642. /// </summary>
  643. private bool m_isQuitAfterException = false;
  644. #region 公开的接口
  645. /// <summary>
  646. /// 启动多线程进行数据处理
  647. /// </summary>
  648. public void StartOperater()
  649. {
  650. if (Interlocked.CompareExchange(ref m_runStatus, 0, 1) == 0)
  651. {
  652. for (int i = 0; i < m_threadCount; i++)
  653. {
  654. Thread thread = new Thread(new ThreadStart(ThreadBackground));
  655. thread.IsBackground = true;
  656. thread.Start();
  657. }
  658. JustEnded();
  659. }
  660. }
  661. /// <summary>
  662. /// 暂停当前的操作
  663. /// </summary>
  664. public void StopOperater()
  665. {
  666. if (m_runStatus == 1)
  667. {
  668. m_isRunningStop = true;
  669. }
  670. }
  671. /// <summary>
  672. /// 恢复暂停的操作
  673. /// </summary>
  674. public void ResumeOperater()
  675. {
  676. m_isRunningStop = false;
  677. }
  678. /// <summary>
  679. /// 直接手动强制结束操作
  680. /// </summary>
  681. public void EndedOperater()
  682. {
  683. if (m_runStatus == 1)
  684. {
  685. m_isQuit = true;
  686. }
  687. }
  688. /// <summary>
  689. /// 在发生错误的时候是否强制退出后续的操作
  690. /// </summary>
  691. public bool IsQuitAfterException
  692. {
  693. get
  694. {
  695. return m_isQuitAfterException;
  696. }
  697. set
  698. {
  699. m_isQuitAfterException = value;
  700. }
  701. }
  702. #endregion
  703. private void ThreadBackground()
  704. {
  705. while (true)
  706. {
  707. // 检测是否处于暂停的状态
  708. while (m_isRunningStop)
  709. {
  710. ;
  711. }
  712. // 提取处理的任务
  713. int index = Interlocked.Decrement(ref m_opCount);
  714. if (index < 0)
  715. {
  716. // 任务完成
  717. break;
  718. }
  719. else
  720. {
  721. T item = m_dataList[index];
  722. bool result = false;
  723. bool isException = false;
  724. try
  725. {
  726. if (!m_isQuit) result = m_operater(item);
  727. }
  728. catch (Exception ex)
  729. {
  730. isException = true;
  731. // 此处必须吞噬所有异常
  732. OnExceptionOccur?.Invoke(item, ex);
  733. // 是否需要退出处理
  734. if (m_isQuitAfterException) EndedOperater();
  735. }
  736. finally
  737. {
  738. // 保证了报告进度时数据的正确性
  739. HybirdLock.Enter();
  740. if (result) m_successCount++;
  741. if (isException) m_failedCount++;
  742. m_finishCount++;
  743. OnReportProgress?.Invoke(m_finishCount, m_dataList.Length, m_successCount, m_failedCount);
  744. HybirdLock.Leave();
  745. }
  746. }
  747. }
  748. JustEnded();
  749. }
  750. private void JustEnded()
  751. {
  752. if (Interlocked.Decrement(ref m_opThreadCount) == 0)
  753. {
  754. // 数据初始化
  755. m_finishCount = 0;
  756. m_failedCount = 0;
  757. m_successCount = 0;
  758. Interlocked.Exchange(ref m_opCount, m_dataList.Length);
  759. Interlocked.Exchange(ref m_opThreadCount, m_threadCount + 1);
  760. // 状态复位
  761. Interlocked.Exchange(ref m_runStatus, 0);
  762. m_isRunningStop = false;
  763. m_isQuit = false;
  764. }
  765. }
  766. }
  767. #endregion
  768. #region 弃用的双检锁
  769. /// <summary>
  770. /// 一个双检锁的示例,适合一些占内存的静态数据对象,获取的时候才实例化真正的对象
  771. /// </summary>
  772. [Obsolete(".NET 4.5才拥有Volatile类,目前该类还是不安全的")]
  773. internal sealed class Singleton
  774. {
  775. private static object m_lock = new object();
  776. private static Singleton SValue = null;
  777. public Singleton()
  778. {
  779. }
  780. public static Singleton GetSingleton()
  781. {
  782. if (SValue != null) return SValue;
  783. Monitor.Enter(m_lock);
  784. if (SValue == null)
  785. {
  786. //Singleton temp = new Singleton();
  787. //Volatile.Write(ref SValue, temp);
  788. //上述编译不通过,简单的使用下述过程
  789. SValue = new Singleton();
  790. }
  791. Monitor.Exit(m_lock);
  792. return SValue;
  793. }
  794. }
  795. #endregion
  796. #region 弃用的高级混合锁
  797. /// <summary>
  798. /// 一个高级的混合线程同步锁,采用了基元用户加基元内核同步构造实现,并包含了自旋和线程所有权
  799. /// </summary>
  800. [Obsolete("该类中间的一个类型才.NET4.0及以上才实现")]
  801. internal sealed class AdvancedHybirdLock : IDisposable
  802. {
  803. #region IDisposable Support
  804. private bool disposedValue = false; // 要检测冗余调用
  805. void Dispose(bool disposing)
  806. {
  807. if (!disposedValue)
  808. {
  809. if (disposing)
  810. {
  811. // TODO: 释放托管状态(托管对象)。
  812. }
  813. // TODO: 释放未托管的资源(未托管的对象)并在以下内容中替代终结器。
  814. // TODO: 将大型字段设置为 null。
  815. m_waiterLock.Close();
  816. disposedValue = true;
  817. }
  818. }
  819. // TODO: 仅当以上 Dispose(bool disposing) 拥有用于释放未托管资源的代码时才替代终结器。
  820. // ~SimpleHybirdLock() {
  821. // // 请勿更改此代码。将清理代码放入以上 Dispose(bool disposing) 中。
  822. // Dispose(false);
  823. // }
  824. // 添加此代码以正确实现可处置模式。
  825. /// <summary>
  826. /// 释放资源
  827. /// </summary>
  828. public void Dispose()
  829. {
  830. // 请勿更改此代码。将清理代码放入以上 Dispose(bool disposing) 中。
  831. Dispose(true);
  832. // TODO: 如果在以上内容中替代了终结器,则取消注释以下行。
  833. // GC.SuppressFinalize(this);
  834. }
  835. #endregion
  836. /// <summary>
  837. /// 基元用户模式构造同步锁
  838. /// </summary>
  839. private Int32 m_waiters = 0;
  840. /// <summary>
  841. /// 基元内核模式构造同步锁
  842. /// </summary>
  843. private AutoResetEvent m_waiterLock = new AutoResetEvent(false);
  844. /// <summary>
  845. /// 控制自旋的一个字段
  846. /// </summary>
  847. private Int32 m_spincount = 4000;
  848. /// <summary>
  849. /// 指出哪个线程拥有锁
  850. /// </summary>
  851. private Int32 m_owningThreadId = 0;
  852. /// <summary>
  853. /// 指示锁拥有了多少次
  854. /// </summary>
  855. private Int32 m_recursion = 0;
  856. /// <summary>
  857. /// 获取锁
  858. /// </summary>
  859. public void Enter()
  860. {
  861. Int32 threadId = Thread.CurrentThread.ManagedThreadId;
  862. if (threadId == m_owningThreadId)
  863. {
  864. m_recursion++;
  865. return;//如果调用线程已经拥有锁,就返回
  866. }
  867. //SpinWait spinwait
  868. if (Interlocked.Increment(ref m_waiters) == 1) return;//用户锁可以使用的时候,直接返回,第一次调用时发生
  869. //当发生锁竞争时,使用内核同步构造锁
  870. m_waiterLock.WaitOne();
  871. }
  872. /// <summary>
  873. /// 离开锁
  874. /// </summary>
  875. public void Leave()
  876. {
  877. if (Interlocked.Decrement(ref m_waiters) == 0) return;//没有可用的锁的时候
  878. m_waiterLock.Set();
  879. }
  880. }
  881. #endregion
  882. }