1.基本介绍
Parallel类是对线程的一个很好的抽象。该类位于System.Threading.Tasks命名空间中,提供了数据和任务并行性。
2.Parallel.Invoke 主要用于任务的并行
这个函数的功能和Task有些相似,就是并发执行一系列任务,然后等待所有完成。和Task比起来,省略了Task.WaitAll这一步,自然也缺少了Task的相关管理功能。它有两种形式:
Parallel.Invoke( params Action[] actions);
Parallel.Invoke(Action[] actions,TaskManager manager,TaskCreationOptions options);
static void Main()
{
try
{
Parallel.Invoke(
BasicAction,
() =>
{
Console.WriteLine("Method=beta, Thread={0}", Thread.CurrentThread.ManagedThreadId);
},
delegate ()
{
Console.WriteLine("Method=gamma, Thread={0}", Thread.CurrentThread.ManagedThreadId);
}
);
}
// No exception is expected in this example, but if one is still thrown from a task,
// it will be wrapped in AggregateException and propagated to the main thread.
catch (AggregateException e)
{
Console.WriteLine("An action has thrown an exception. THIS WAS UNEXPECTED./n{0}", e.InnerException.ToString());
}
}
static void BasicAction()
{
Console.WriteLine("Method=alpha, Thread={0}", Thread.CurrentThread.ManagedThreadId);
}
3.Parallel.For方法,主要用于处理针对数组元素的并行操作(数据的并行)
static void Main(string[] args)
{
int[] nums = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
Parallel.For(0, nums.Length, (i) =>
{
Console.WriteLine("针对数组索引{0}对应的那个元素{1}的一些工作代码……ThreadId={2}", i, nums[i], Thread.CurrentThread.ManagedThreadId);
});
Console.ReadKey();
}
4.Parallel.ForEach方法,主要用于处理泛型集合元素的并行操作(数据的并行)
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace AsyncDemo1
{
class Program
{
static void Main(string[] args)
{
List<Student> students = new List<Student>();
students.Add(new Student() {
StudentID=1,
Name="张三",
Age=18
});
students.Add(new Student()
{
StudentID = 2,
Name = "李四",
Age = 17
});
students.Add(new Student()
{
StudentID = 3,
Name = "王五",
Age = 19
});
Parallel.ForEach(students, (item)=>
{
Console.WriteLine($"学生编号:{item.StudentID},姓名:{item.Name},年龄:{item.Age}");
});
Console.ReadLine();
}
}
public class Student
{
public int StudentID { get; set; }
public string Name { get; set; }
public int Age { get; set; }
}
}
5.ForEach的独到之处就是可以将数据进行分区,每一个小区内实现串行计算,分区采用Partitioner.Create实现。
static void Main(string[] args)
{
ConcurrentBag<int> bag = new ConcurrentBag<int>();
var watch = Stopwatch.StartNew();
watch.Start();
Parallel.ForEach(Partitioner.Create(0, 3000000), tuple =>
{
for (int m = tuple.Item1; m < tuple.Item2; m++)
{
bag.Add(m);
}
});
Console.WriteLine("并行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);
GC.Collect();
Console.ReadLine();
}
6.ParallelOptions类
有时候我们的线程可能会跑遍所有的内核,为了提高其他应用程序的稳定性,就要限制参与的内核,正好ParallelOptions提供了MaxDegreeOfParallelism属性。
static void Main(string[] args)
{
List<Student> students = new List<Student>();
students.Add(new Student()
{
StudentID = 1,
Name = "张三",
Age = 18
});
students.Add(new Student()
{
StudentID = 2,
Name = "李四",
Age = 17
});
students.Add(new Student()
{
StudentID = 3,
Name = "王五",
Age = 19
});
Parallel.ForEach(students, new ParallelOptions() { MaxDegreeOfParallelism = 2 }, (item) =>
{
Console.WriteLine($"学生编号:{item.StudentID},姓名:{item.Name},年龄:{item.Age}");
});
Console.ReadLine();
}
public class Student
{
public int ID { get; set; }
public string Name { get; set; }
public int Age { get; set; }
public DateTime CreateTime { get; set; }
}
class Program
{
static void Main(string[] args)
{
var dic = LoadData();
Stopwatch watch = new Stopwatch();
watch.Start();
var query2 = (from n in dic.Values.AsParallel()
where n.Age > 20 && n.Age < 25
select n).ToList();
watch.Stop();
Console.WriteLine("并行计算耗费时间:{0}", watch.ElapsedMilliseconds);
Console.Read();
}
public static ConcurrentDictionary<int, Student> LoadData()
{
ConcurrentDictionary<int, Student> dic = new ConcurrentDictionary<int, Student>();
ParallelOptions options = new ParallelOptions();
//指定使用的硬件线程数为4
options.MaxDegreeOfParallelism = 4;
//预加载1500w条记录
Parallel.For(0, 15000000, options, (i) =>
{
var single = new Student()
{
ID = i,
Name = "hxc" + i,
Age = i % 151,
CreateTime = DateTime.Now.AddSeconds(i)
};
dic.TryAdd(i, single);
});
return dic;
}
}
7.中途退出并行循环
在串行代码中我们break一下就搞定了,但是并行就不是这么简单了,不过没关系,在并行循环的委托参数中提供了一个ParallelLoopState,该实例提供了Break和Stop方法来帮我们实现。
Break: 当然这个是通知并行计算尽快的退出循环,比如并行计算正在迭代100,那么break后程序还会迭代所有小于100的。
Stop:这个就不一样了,比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。
static void Main(string[] args)
{
ConcurrentBag<int> bag = new ConcurrentBag<int>();
Parallel.For(0, 20000000, (i, state) =>
{
if (bag.Count == 1000)
{
//state.Break();
state.Stop();
return;
}
bag.Add(i);
});
Console.WriteLine("当前集合有{0}个元素。", bag.Count);
}
8.并行中的异常处理
首先任务是并行计算的,处理过程中可能会产生n多的异常,那么如何来获取到这些异常呢?普通的Exception并不能获取到异常,然而为并行诞生的AggregateExcepation就可以获取到一组异常。
static void Main(string[] args)
{
try
{
Parallel.Invoke(Run1, Run2);
}
catch (AggregateException ex)
{
foreach (var single in ex.InnerExceptions)
{
Console.WriteLine(single.Message);
}
}
Console.WriteLine("结束了!");
//Console.Read();
}
static void Run1()
{
Thread.Sleep(3000);
throw new Exception("我是任务1抛出的异常");
}
static void Run2()
{
Thread.Sleep(5000);
throw new Exception("我是任务2抛出的异常");
}
还有另外一种就是直接在Parallel处理异常
private static readonly object locker = new object();
static void Main(string[] args)
{
List<string> errList = new List<string>();
Parallel.For(0, 10, (i) =>
{
try
{
TestClass a = new TestClass();
a.Test(i);
}
catch (Exception ex)
{
lock (locker)
{
errList.Add(ex.Message);
}
//Console.WriteLine(ex.Message);
//注:这里不再将错误抛出.....
//throw ex;
}
});
int Index = 1;
foreach (string err in errList)
{
Console.WriteLine("{0}、的错误:{1}", Index++, err);
}
}
参考链接:https://www.cnblogs.com/scmail81/p/9521096.html
https://docs.microsoft.com/zh-cn/dotnet/api/system.threading.tasks.parallel?view=net-6.0
原创文章,作者:254126420,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/276678.html