C# 流水线 生产者/消费者链 Producer/Consumer


<body>

  1. manager.cs

    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using bntu.pcm.plworker;
    using bntu.pcm.works;
    
    /*
     * bntu 是我的网名,在cnblogs上可以搜到我的博客:https://www.cnblogs.com/SimbaWang/
     * pcm  是producer/consumer manager的缩写,反正就是代表这个命名空间就是用来解决生产者消费者问题的
     */
    namespace bntu.pcm
    {
        /// <summary>
        /// 管理流水线工程(定义流水线,添加流程)
        /// </summary>
        class Manager
        {
            public static void Main(string[] args)
            {
                // 定义流水线工程,并为其添加流程
                Pipleline pipleline = new Pipleline(new BlockingCollection<string>());
                pipleline.AddItem<string, int>(ReadGrayImage.Read);
                pipleline.AddItem<int, double>(GenerateDeepthImage.Generate);
                pipleline.AddItem<double, double>(CalculateDeepthImage.Calculate);
    
                // 将整个流水线作为后台线程(这是因为前台线程是整个流水线的输入)
                Thread thread = new Thread(() => pipleline.PiplelineWork());
                thread.IsBackground = true;
                thread.Start();
    
                // 整个流水线的输入作为前台线程(这个while循环模拟相机不断输出图像)
                string image_path = GetImagePath();
                while (image_path != null)
                {
                    // 为整个流水线的输入缓冲区添加元素
                    pipleline.HIB.Add(image_path);
                    image_path = GetImagePath();
                }
            }
    
            // 模拟图片的编号
            private static int i = 0;
    
            /// <summary>
            /// 用于模拟相机输出图像
            /// </summary>
            /// <returns>模拟图像</returns>
            public static string GetImagePath()
            {
                return i++.ToString();
            }
        }
    }
    
  2. PiplelineItem.cs

    using System;
    using System.Collections.Concurrent;
    
    /*
     * bntu     是我的网名,在cnblogs上可以搜到我的博客:https://www.cnblogs.com/SimbaWang/
     * pcm      是producer/consumer manager的缩写,反正就是代表这个命名空间就是用来解决生产者消费者问题的
     * plworker 是pipleline worker的缩写,是“生/消”模型的特殊形式——即流水线模型
     */
    namespace bntu.pcm.plworker
    {
        /// <summary>
        /// 流水线上的流程定义
        /// </summary>
        /// <typeparam name="INPUT">该流程的输入缓冲区的类型</typeparam>
        /// <typeparam name="OUTPUT">该流程的输出缓冲区的类型</typeparam>
        public class PiplelineItem<INPUT, OUTPUT>
        {
            // 该流程的输出缓冲区
            public BlockingCollection<OUTPUT> output;
    
            public BlockingCollection<OUTPUT> Output { get => output; }
    
            /// <summary>
            /// 流程的构造函数
            /// </summary>
            public PiplelineItem()
            {
                // 流程的输出缓冲区由其自己定义,然后为后一流程提供一个作为输入缓冲区的接口
                this.output = new BlockingCollection<OUTPUT>();
            }
    
            /// <summary>
            /// 该流程的操作过程(从输入缓冲区取走,然后处理后放入输出缓冲区)
            /// </summary>
            /// <param name="input_buffer">该流程的输入缓冲区(上一流程的输出缓冲区)</param>
            /// <param name="handle">该流程中的具体操作</param>
            private void Action(BlockingCollection<INPUT> input_buffer, Func<INPUT, OUTPUT> handle)
            {
                try
                {
                    // 从输入缓冲区中取走元素
                    foreach (var item in input_buffer.GetConsumingEnumerable())
                    {
                        // 经过指定操作后放入输出缓冲区
                        this.output.Add(handle(item));
                    }
                }
                finally
                {
                    // 当输入缓冲区取空了就需要告知输出缓冲区没有新的元素进入,否则线程无法结束
                    this.output.CompleteAdding();
                }
            }
    
            /// <summary>
            /// 获取该流程的动作(也就是操作或者称为任务)
            /// </summary>
            /// <param name="input_buffer">该流程的输入缓冲区(上一流程的输出缓冲区)</param>
            /// <param name="handle">该流程中的具体操作</param>
            /// <returns>该流程的任务</returns>
            public Action GetPiplelineItemAction(object input_buffer, object handle)
            {
                // 在该pipleline_item中包含了input的信息,所以在PiplelineItem可以进行类型转换,而在Pipleline中就不行
                BlockingCollection<INPUT> _input_buffer = input_buffer as BlockingCollection<INPUT>;
                Func<INPUT, OUTPUT> _handle = handle as Func<INPUT, OUTPUT>;
                // this指针包含了模板信息,所以action就固定成Task了,而不再是Pipleline中的Task<dynamic>
                return () => this.Action(_input_buffer, _handle);
            }
        }
    }
    
  3. Pipleline.cs

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    
    /*
     * bntu     是我的网名,在cnblogs上可以搜到我的博客:https://www.cnblogs.com/SimbaWang/
     * pcm      是producer/consumer manager的缩写,反正就是代表这个命名空间就是用来解决生产者消费者问题的
     * plworker 是pipleline worker的缩写,是“生/消”模型的特殊形式——即流水线模型
     */
    namespace bntu.pcm.plworker
    {
        /// <summary>
        /// 流水线的结构定义
        /// </summary>
        public class Pipleline
        {
            // 整个流水线的输入缓冲区
            private dynamic head_input_buffer;
            // 流水线上的所有流程组成的列表,各个流程中执行的操作组成的刘表
            private List<object> pipleline_item_list, handle_list;
    
            public dynamic HIB { get => head_input_buffer; }
    
            /// <summary>
            /// 流水线构造函数
            /// </summary>
            /// <param name="head_input_buffer">第一个输入缓冲区,也是整个流水线的输入缓冲区</param>
            public Pipleline(dynamic head_input_buffer)
            {
                this.head_input_buffer = head_input_buffer;
                this.pipleline_item_list = new List<object>();
                this.handle_list = new List<object>();
            }
    
            /// <summary>
            /// 为流水线添加一个流程
            /// </summary>
            /// <typeparam name="INPUT">输入缓冲区的类型</typeparam>
            /// <typeparam name="OUTPUT">输出缓冲区的类型</typeparam>
            /// <param name="handle">该流程需要执行的操作</param>
            public void AddItem<INPUT, OUTPUT>(Func<INPUT, OUTPUT> handle)
            {
                this.pipleline_item_list.Add(new PiplelineItem<INPUT, OUTPUT>());
                this.handle_list.Add(handle);
            }
    
            /// <summary>
            /// 构建流水线工程(也就是把流水线的各个结点都联系起来)
            /// </summary>
            public void PiplelineWork()
            {
                // 缓冲区列表,任务列表,任务工厂
                var buffer_list = new List<object>();
                var task_list = new List<Task>();
                var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
    
                // 先将整个流水线的输入缓冲区加入流水线再说
                buffer_list.Add(this.head_input_buffer);
                // 浅浅用一波zip函数(没啥实质性用处,主要是方便)
                foreach ((dynamic pipleline_item, dynamic handle) in
                    this.pipleline_item_list.Zip(this.handle_list,
                    (pipleline_item, handle) => new KeyValuePair<dynamic, dynamic>(pipleline_item, handle)))
                {
                    // 获取该流程的输入缓冲区(其实就是上一个流程的输出缓冲区)
                    var input_buffer = buffer_list.Last();
                    // 将流程需要执行的操作作为一个新的任务,并将该线程(任务)加入任务列表
                    task_list.Add(taskFactory.StartNew(pipleline_item.GetPiplelineItemAction(input_buffer, handle)));
                    // 将该流程的输出缓冲区加入缓冲区列表,作为后一个流程的输入缓冲区
                    buffer_list.Add(pipleline_item.Output);
                }
    
                // 等待所有所需的结果(就是已经结束咧)
                Task.WaitAll(task_list.ToArray());
            }
        }
    }
    
  4. CalculateDeepthImage.cs

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    
    /*
     * bntu     是我的网名,在cnblogs上可以搜到我的博客:https://www.cnblogs.com/SimbaWang/
     * pcm      是producer/consumer manager的缩写,反正就是代表这个命名空间就是用来解决生产者消费者问题的
     * works    是流水线的各个流程
     */
    namespace bntu.pcm.works
    {
        /// <summary>
        /// 模拟进行深度图的计算
        /// </summary>
        class CalculateDeepthImage
        {
            public static double Calculate(double d)
            {
                Console.WriteLine("计算深度图像:" + d);
                Task.Delay(new Random().Next(300));
                return d;
            }
        }
    }
    
  5. GenerateDeepthImage.cs

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    
    /*
     * bntu     是我的网名,在cnblogs上可以搜到我的博客:https://www.cnblogs.com/SimbaWang/
     * pcm      是producer/consumer manager的缩写,反正就是代表这个命名空间就是用来解决生产者消费者问题的
     * works    是流水线的各个流程
     */
    namespace bntu.pcm.works
    {
        /// <summary>
        /// 模拟通过灰度图像产生深度图
        /// </summary>
        class GenerateDeepthImage
        {
            public static double Generate(int i)
            {
                Console.WriteLine("产生深度图像:" + i * 1.0);
                Task.Delay(new Random().Next(200));
                return i * 1.0;
            }
        }
    }
    
  6. ReadGrayImage.cs

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    
    /*
     * bntu     是我的网名,在cnblogs上可以搜到我的博客:https://www.cnblogs.com/SimbaWang/
     * pcm      是producer/consumer manager的缩写,反正就是代表这个命名空间就是用来解决生产者消费者问题的
     * works    是流水线的各个流程
     */
    namespace bntu.pcm.works
    {
        /// <summary>
        /// 模拟读取灰度图像
        /// </summary>
        class ReadGrayImage
        {
            public static int Read(string s)
            {
                Console.WriteLine("读取灰度图像:" + s);
                Task.Delay(new Random().Next(100));
                return Convert.ToInt32(s);
            }
        }
    }
    

</body>

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/288646.html

(0)
上一篇 2022年9月10日
下一篇 2022年9月10日

相关推荐

发表回复

登录后才能评论