2.Parallel.Invoke 主要用于任务的并行
Parallel.Invoke( params Action[] actions);
Parallel.Invoke(Action[] actions,TaskManager manager,TaskCreationOptions options);
static void Main() { try { Parallel.Invoke( BasicAction, () => { Console.WriteLine("Method=beta, Thread={0}", Thread.CurrentThread.ManagedThreadId); }, delegate () { Console.WriteLine("Method=gamma, Thread={0}", Thread.CurrentThread.ManagedThreadId); } ); } // No exception is expected in this example, but if one is still thrown from a task, // it will be wrapped in AggregateException and propagated to the main thread. catch (AggregateException e) { Console.WriteLine("An action has thrown an exception. THIS WAS UNEXPECTED./n{0}", e.InnerException.ToString()); } } static void BasicAction() { Console.WriteLine("Method=alpha, Thread={0}", Thread.CurrentThread.ManagedThreadId); }
static void Main(string[] args) { int[] nums = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }; Parallel.For(0, nums.Length, (i) => { Console.WriteLine("针对数组索引{0}对应的那个元素{1}的一些工作代码……ThreadId={2}", i, nums[i], Thread.CurrentThread.ManagedThreadId); }); Console.ReadKey(); }
using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; namespace AsyncDemo1 { class Program { static void Main(string[] args) { List<Student> students = new List<Student>(); students.Add(new Student() { StudentID=1, Name="张三", Age=18 }); students.Add(new Student() { StudentID = 2, Name = "李四", Age = 17 }); students.Add(new Student() { StudentID = 3, Name = "王五", Age = 19 }); Parallel.ForEach(students, (item)=> { Console.WriteLine($"学生编号:{item.StudentID},姓名:{item.Name},年龄:{item.Age}"); }); Console.ReadLine(); } } public class Student { public int StudentID { get; set; } public string Name { get; set; } public int Age { get; set; } } }
static void Main(string[] args) { ConcurrentBag<int> bag = new ConcurrentBag<int>(); var watch = Stopwatch.StartNew(); watch.Start(); Parallel.ForEach(Partitioner.Create(0, 3000000), tuple => { for (int m = tuple.Item1; m < tuple.Item2; m++) { bag.Add(m); } }); Console.WriteLine("并行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds); GC.Collect(); Console.ReadLine(); }
static void Main(string[] args) { List<Student> students = new List<Student>(); students.Add(new Student() { StudentID = 1, Name = "张三", Age = 18 }); students.Add(new Student() { StudentID = 2, Name = "李四", Age = 17 }); students.Add(new Student() { StudentID = 3, Name = "王五", Age = 19 }); Parallel.ForEach(students, new ParallelOptions() { MaxDegreeOfParallelism = 2 }, (item) => { Console.WriteLine($"学生编号:{item.StudentID},姓名:{item.Name},年龄:{item.Age}"); }); Console.ReadLine(); }
public class Student { public int ID { get; set; } public string Name { get; set; } public int Age { get; set; } public DateTime CreateTime { get; set; } } class Program { static void Main(string[] args) { var dic = LoadData(); Stopwatch watch = new Stopwatch(); watch.Start(); var query2 = (from n in dic.Values.AsParallel() where n.Age > 20 && n.Age < 25 select n).ToList(); watch.Stop(); Console.WriteLine("并行计算耗费时间:{0}", watch.ElapsedMilliseconds); Console.Read(); } public static ConcurrentDictionary<int, Student> LoadData() { ConcurrentDictionary<int, Student> dic = new ConcurrentDictionary<int, Student>(); ParallelOptions options = new ParallelOptions(); //指定使用的硬件线程数为4 options.MaxDegreeOfParallelism = 4; //预加载1500w条记录 Parallel.For(0, 15000000, options, (i) => { var single = new Student() { ID = i, Name = "hxc" + i, Age = i % 151, CreateTime = DateTime.Now.AddSeconds(i) }; dic.TryAdd(i, single); }); return dic; } }
Break: 当然这个是通知并行计算尽快的退出循环,比如并行计算正在迭代100,那么break后程序还会迭代所有小于100的。
static void Main(string[] args) { ConcurrentBag<int> bag = new ConcurrentBag<int>(); Parallel.For(0, 20000000, (i, state) => { if (bag.Count == 1000) { //state.Break(); state.Stop(); return; } bag.Add(i); }); Console.WriteLine("当前集合有{0}个元素。", bag.Count); }
static void Main(string[] args) { try { Parallel.Invoke(Run1, Run2); } catch (AggregateException ex) { foreach (var single in ex.InnerExceptions) { Console.WriteLine(single.Message); } } Console.WriteLine("结束了!"); //Console.Read(); } static void Run1() { Thread.Sleep(3000); throw new Exception("我是任务1抛出的异常"); } static void Run2() { Thread.Sleep(5000); throw new Exception("我是任务2抛出的异常"); }
private static readonly object locker = new object(); static void Main(string[] args) { List<string> errList = new List<string>(); Parallel.For(0, 10, (i) => { try { TestClass a = new TestClass(); a.Test(i); } catch (Exception ex) { lock (locker) { errList.Add(ex.Message); } //Console.WriteLine(ex.Message); //注:这里不再将错误抛出..... //throw ex; } }); int Index = 1; foreach (string err in errList) { Console.WriteLine("{0}、的错误:{1}", Index++, err); } }