C# 提供了一些强大的工具来帮助开发人员管理这些挑战,其中之一就是 BlockingCollection<T>。 这篇博客将介绍 BlockingCollection 的基本概念、用法以及它在多线程环境中的优势。 什么是 BlockingCollection? 为什么使用 BlockingCollection? 线程安全:BlockingCollection 内部实现了锁机制,确保在多线程环境中操作集合时不会出现竞争条件。 BlockingCollection<int> blockingCollection = new BlockingCollection<int>(5); // 启动生产者任务 BlockingCollection<int> dataQueue = new BlockingCollection<int>(); // 启动消费者线程
多线程操作集合时,ConcurrentQueue 是我常用的,一直用得也挺爽,突然发现了 BlockingCollection,原来还可以更简单。。。 BlockingCollection<T> 是一个自带阻塞功能的线程安全集合类,和 ConcurrentQueue<T> 有点像,不同的是,BlockingCollection<T> 自带阻塞功能。 实例化 BlockingCollection<T> 时,可以传入 boundedCapacity 参数,设置集合的上限,集合中元素到达上限后,Add 方法将阻塞。 这是 MSDN 上的 例子: https://docs.microsoft.com/zh-cn/dotnet/standard/collections/thread-safe/blockingcollection-overview BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100); Task.Run(() => { while
我一开始是这么写的: BlockingCollection<string> blockingCollection = new BlockingCollection<string>(); blockingCollection.IsCompleted) { foreach (var b in blockingCollection.GetConsumingEnumerable ///// 后续补充 BlockingCollection的GetComsumingEnumerate方法跳出循环的标志是BlockingCollection的IsCompleteAdding为true 并且BlockingCollection集合的数据为空 ///// BlockingCollection<string> blockingCollection = new BlockingCollection blockingCollection.IsCompleted) { foreach (var b in blockingCollection.GetConsumingEnumerable
枚举:GetConsumingEnumerable和BlockingCollection本身 4. GetConsumingEnumerable和CompleteAdding 返回目录 1. 比如下面代码,试图将1-50加入到BlockingCollection,此时默认内部是ConcurrentBag,当然你可以指定任意IProducerConsumerCollection。 枚举:GetConsumingEnumerable和BlockingCollection本身 BlockingCollection有两种枚举方法,首先BlockingCollection本身继承自IEnumerable <T>,所以它自己就可以被foreach枚举,首先BlockingCollection包装了一个线程安全集合,那么它自己也是线程安全的,而当多个线程在同时修改或访问线程安全容器时,BlockingCollection 而BlockingCollection还有一个GetConsumingEnumerable方法,同样返回一个IEnumerable<T>,这个可枚举的集合背后的迭代器不同于BlockingCollection
在多线程环境下,使用BlockingCollection以及ConcurrentQueue来消费生产者生产的资源,这是我自己写的多生产者多消费者的作法,其实也是基于单个task下的阻塞队列的IsComplete 其实是在<<.NET 中的阻塞队列BlockingCollection的正确打开方式>>基础上做的,也没有什么,但是这是个好思路。后续尝试自己封装线程标志来做,不依靠FCL的阻塞队列。 <string> blockingCollection = new BlockingCollection<string>(); var t = new Task[50]; { foreach (var b in blockingCollection.GetConsumingEnumerable()) ); 参考: .Net中的并行编程-7.基于BlockingCollection实现高性能异步队列
StackOverflow 有人说自己的 Disruptor.NET 代码比 BlockingCollection 还有慢 2 倍,并且把完整代码贴出,楼下几个老外也的回复说了一堆,但是没研究出个所以然来 ,讨论到最后甚至说可能你的场景不适合 Disruptor,我对此表示怀疑,BlockingCollection 内部实现比较简单粗暴,必要时就加锁,取数据时用信号量等待添加操作完成,而 Disruptor 还有 BlockingCollection 里面的 while (! dataItems.IsCompleted) 写的也有问题,即使 BlockingCollection Producer 在循环中一直做添加操作,BlockingCollection 内部状态也并不是一直在添加状态中 ,这样导致添加循环还没做完,可是计时器的循环已经提前结束,导致 BlockingCollection 测得时间少于实际时间。
下面举一个使用线程安全集合的例子,使用的是BlockingCollection,个人觉得这个集合是够用了,其他集合和这个集合基本上大同小异,没什么大区别。 但是根据官方解释ConcurrentBag是适用于快速删除和添加,具体为什么他适用,而BlockingCollection不适用,我也没找到原因。 属性一:BlockingCollection.IsAddingCompleted,表示是否添加完成。 方法二:BlockingCollection.Add,添加一个实体 方法三:BlockingCollection.TryAdd,添加一个实体,我这里用的是这个方法,区别是,如果添加重复项,他会引发InvalidOperationException 方法四:BlockingCollection.Take,从集合中取一个值,注意,是真的取出来,取出来后,BlockingCollection.cout会减一。 运行结果如下: ?
限界:使用BlockingCollection(int boundedCapacity),设置boundedCapacity的值,当集合容量达到这个值得时候,向BlockingCollection添加元素的线程将会被阻塞 默认情况下,BlockingCollection封装了一个ConcurrentQueue。 图2 并行的流水线模式 说明: 常使用BlockingCollection<T>作为缓冲罐区队列。 流水线的速度近似等于流水线最慢阶段的速度。 4 使用方式 仅以ConcurrentBag和BlockingCollection为例,其他的并发集合与之相似。 <int> producerColl = new BlockingCollection<int>(); 11 //消费者集合 12 private static BlockingCollection<
❌这个例子永远获取一个线程池的线程,为了在BlockingCollection<T>上执行队列工作。 public class QueueProcessor { private readonly BlockingCollection<Message> _messageQueue = new BlockingCollection public class QueueProcessor { private readonly BlockingCollection<Message> _messageQueue = new BlockingCollection
引言 最近看一些代码的时候,发现有人用 System.Collections.Concurrent 下的 BlockingCollection 很便利的实现了生产者 - 消费者模式,这是之前没有注意到的 BlockingCollection BlockingCollection<T> 是专门用于生产者-消费者模式的并发集合。 下面看一下示例: Random random = new Random(); // 方式1:创建 BlockingCollection(默认不限制容量) var blockingCollection = new BlockingCollection<int>(); // 方式2:创建一个容量为 5 的集合 //var blockingCollection = new BlockingCollection <int>(2); // 方式3:创建 ConcurrentStack(后进先出) //var blockingCollection = new BlockingCollection<int>(new
} } } } class ShardedDictionary { Dictionary<string, string>[] _dics; BlockingCollection shardingFactor) { _dics = new Dictionary<string, string>[shardingFactor]; _workers = new BlockingCollection i++) { var dic = new Dictionary<string, string>(); var worker = new BlockingCollection }.Start(); } } private static void ExecWorker(Dictionary<string, string> dic, BlockingCollection
IProducerConsumerCollection<T>被上述集合类实现; BlockingCollection<T>实现了对IProducerConsumerCollection<T>实现类的包装 ,有上线和阻塞(block); BlockingCollection<T>默认封装的是ConcurrentQueue,如果要修改其封装的类型那么可以: var t= new BlockingCollection
这是从上文的<<图文并茂的生产者消费者应用实例demo>>整理总结出来的,具体就不说了,直接给出代码,注释我已经加了,原来的code请看<<.Net中的并行编程-7.基于BlockingCollection Console.WriteLine(ex.ToString()); } 封装的队列: public class ProcessQueue<T> { private BlockingCollection Exception, T> ProcessExceptionEvent; public ProcessQueue() { _queue = new BlockingCollection
为了解决这个问题,微软先后增加了 BlockingCollection[5] 和 BufferBlock[6] 两种数据结构,这里以前者为例,下面是一个典型的生产者-消费者模型: var bc = new BlockingCollection<int>(); // 生产者 var producer = Task.Run(() => { for (var i = 0; i < Count; i+ 与此同时,BlockingCollection<T> 和 BufferBlock<T> 都是线程安全的集合,这可以让我们在多线程环境下更加得心应手。 那么,你也许会问,既然我们已经有 BlockingCollection<T> 和 BufferBlock<T> 这样的数据结构,为什么我们还需要 Channel 呢? 所以,我们当时能想到的方案,是打算用 BlockingCollection 来做一个阻塞式的队列,换句话讲,就是从 NLog 或者 Log4Net中拿到日志以后,将这些日志全部放在 BlockingCollection
BlockingCollection<>类,其实我之前写的好些关于线程的文章都说到了这个类库,用到的地方也多。 原文说明 BlockingCollection 概述 (https://docs.microsoft.com/zh-cn/dotnet/standard/collections/thread-safe/ blockingcollection-overview ) 感慨一句,微软的好东西是真多,为什么不能像java那样轻易地被人发现使用呢?
ConcurrentQueue 对应非线程安全类型:Queue ConcurrentStack 对应非线程安全类型:Stack ConcurrentDictionary 对应非线程安全类型:Dictionary (2)BlockingCollection BlockingCollection 意为 阻塞集合。 线程安全的集合 可以转换为 阻塞集合,只要它实现了IProducerConsumerCollection接口BlockingCollection可以实现类似发布订阅的业务场景应用: 生产端Add进去发布的消息
我重新写了段代码,让ID自增和集合添加都放到锁里面: num = 1; total = 0; using (var q = new BlockingCollection var x = q.GroupBy(n => n).Where(o => o.Count() > 1); Console.WriteLine($"并发使用安全集合BlockingCollection Interlocked添加num,集合重复值:{x.Count()}个"); Console.ReadKey(); } 这里我测试了另外一个线程安全的集合BlockingCollection
于是乎,我重新写了段代码,让ID自增和集合添加都放到锁里面: num = 1; total = 0; using (var q = new BlockingCollection<int>()) { 并发之后值为:{num}"); var x = q.GroupBy(n => n).Where(o => o.Count() > 1); Console.WriteLine($"并发使用安全集合BlockingCollection +Interlocked添加num,集合重复值:{x.Count()}个"); Console.ReadKey(); } 这里我测试了另外一个线程安全的集合BlockingCollection,
性能 这里的基准测试我对比了三种类型,Channel, BufferBlock, BlockingCollection,分别写入了10000条数据,然后进行读取,发现 Channel 确实是表现比较好。 实际上还是使用 ConcurrentQueue做的封装, 使用起来更方便,对异步更友好,另外,.NET 5 其中的 Quic 内部就使用了Channel,CAP 也在新版本中使用 Channel 替换掉了之前的 BlockingCollection
给个例子: public sealed class CustomTaskScheduler : TaskScheduler, IDisposable { private BlockingCollection <Task> tasksCollection = new BlockingCollection<Task>(); private readonly Thread mainThread =