C#并行编程:并发集合


.NET Core在System.Collections.Concurrent命名空间下提供了线程安全的集合:

并发集合 非并发等价集合
ConcurrentStack<T> Stack<T>
ConcurrentQueue<T> Queue<T>
ConcurrentBag<T>
ConcurrentDictionary<TKey,TValue> Dictionary<TKey,TValue>

并发集合主要是针对高并发场景进行的优化。但是它们也可以单纯作为一般的线程安全的集合使用(替代用锁保护的一般集合)。但是使用时仍需注意:

  • 传统集合在非并发场景下的性能要高于并发集合。
  • 线程安全的集合并不能保证使用它们的代码是线程安全的
  • 在枚举并发集合时,如果另一个线程更新了集合的内容,不会抛出任何异常。相反,会得到一个新旧内容混合的结果。
  • List没有对应的并发版本。
  • ConcurrentStack、ConcurrentQueue和ConcurrentBag类的内部是使用链表实现的。因此,其内存利用不如非并发的Stack和Queue高效。但是它们适用于并发访问,因为链表更容易实现无锁算法或者少锁的算法。

因此,并发集合绝不仅仅是在普通集合上加了一把锁这么简单。例如,如果在单线程上执行以下代码:

var d = new ConcurrentDictionary<int, int>();
for (int i = 0; i < 1000000; i++)
{
    d[i] = 123;
}

它的运行速度会比一下代码慢三倍以上:

var d = new Dictionary<int, int>();
for (int i = 0; i < 1000000; i++)
{
    lock (d)
    {
        d[i] = 123;
    }
}

并发集合和传统集合的另一个不同之处是并发集合提供了原子的检测和执行操作,例如TryPop。其中大部分方法是通过IProducerConsumerCollection<T>接口统一起来的。

IProducerConsumerCollection<T>接口

IProducerConsumerCollection<T>接口代表了一个线程安全的生产者/消费者集合。ConcurrentStack<T>ConcurrentQueue<T>ConcurrentBag<T>均实现了这个接口。

IProducerConsumerCollection<T>扩展了ICollection接口,并添加了以下方法:

void CopyTo(T[] array, int index);
T[] ToArray();
bool TryAdd(T item);
bool TryTake([MaybeNullWhen(false)] out T item);

其中TryAdd和TryTake方法会测试添加和删除操作是否可以执行,如果可以则执行该操作。测试和执行是以原子方式执行的,因此无须像传统集合那样在操作时加锁。

TryTake方法在集合为空的情况下会返回false。TryAdd在现有的三个实现类中都必定能成功,并返回true。但如果自定义的集合不允许出现重复元素,则该方法应在欲添加元素已经存在的情况下返回false。

不同类型的TryTake方法执行的操作也各有差异:

  • 对于ConcurrentStack类,TryTake会删除最近添加的元素。
  • 对于ConcurrentQueue类,TryTake会删除最早添加的元素。
  • 对于ConcurrentBag类,哪个元素删除效率最高,TryTake方法就会删除哪个元素。

以上三个具体类都显式实现了TryTake和TryAdd方法,并用更加特殊命名的公有方法来提供相应的功能,例如TryDequeue和TryPop。

ConcurrentBag<T>类

生产者/消费者集合主要有两种使用场景:

  • 添加一个元素(“生产”)。
  • 检索一个元素并删除它(“消费”)。

栈和队列都是典型的生产者/消费者集合。这种集合在并行编程中非常重要,因为它们有利于实现高效的无锁设计。

ConcurrentBag<T>是一个无序的对象集合(而且集合中允许出现重复的对象)。如果我们不关心调用Take或者TryTake时所获得的元素的顺序,就可以使用ConcurrentBag <T>类。ConcurrentBag<T>的Add方法可以非常高效地在多个线程上被并行调用而几乎不会出现竞争。相反,ConcurrentQueue和ConcurrentStack的Add方法会造成一些竞争。

准确地说,Take方法将返回调用线程在集合中最近添加的元素。如果该线程上已经没有任何元素,它会返回其他线程(随机挑选)最近添加的元素。
当集合的并行操作大部分是添加元素,或者各个线程添加元素和移除元素数量基本平衡时,ConcurrentBag类是理想的选择。之前使用Parallel.ForEach实现的并行拼写检查器示例中就使用了ConcurrentBag

var misspellings = new ConcurrentBag<Tuple<int, string>>();
 
Parallel.ForEach(wordsToTest, (word, state, i) =>
{
    if (!wordLookup.Contains(word))
        misspellings.Add(Tuple.Create((int)i, word));
});

ConcurrentBag<T>不适用于实现生产者/消费者队列,因为元素的添加和移除操作是在不同的线程间执行的。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/288267.html

(0)
上一篇 2022年9月8日
下一篇 2022年9月8日

相关推荐

发表回复

登录后才能评论