C++ 消息队列的实现详解 | 编程语言

(1)线程安全的消息队列的实现(ConcurrentQueue.h)

 
#ifndef NET_FRAME_CONCURRENT_QUEUE_H   
#define NET_FRAME_CONCURRENT_QUEUE_H   
#include <queue> 
 
#include <mutex>   
 
#include <condition_variable>   
 
template<class Type>   
 
/*消息队列实现*/   
 
class ConcurrentQueue {   
 
        ConcurrentQueue& operator=(const ConcurrentQueue&) = delete;   
 
        ConcurrentQueue(const ConcurrentQueue& other) = delete;   
 
public:   
 
        ConcurrentQueue() : _queue(), _mutex(), _condition() { }   
 
virtual ~ConcurrentQueue() { }   
 
void Push(Type record) {   
 
            std::lock_guard <std::mutex> lock(_mutex);   
 
            _queue.push(record);   
 
            _condition.notify_one();   
 
        }   
 
bool Pop(Type& record, bool isBlocked = true) {   
 
if (isBlocked) {   
 
                std::unique_lock <std::mutex> lock(_mutex);   
 
while (_queue.empty()) {   
 
                    _condition.wait(lock);   
 
                }   
 
            }   
 
else // If user wants to retrieve data in non-blocking mode   
 
            {   
 
                std::lock_guard <std::mutex> lock(_mutex);   
 
if (_queue.empty()) {   
 
return false;   
 
                }   
 
            }   
 
            record = std::move(_queue.front());   
 
            _queue.pop();   
 
return true;   
 
        }   
 
        int32_t Size() {   
 
            std::lock_guard <std::mutex> lock(_mutex);   
 
return _queue.size();   
 
        }   
 
bool Empty() {   
 
            std::lock_guard <std::mutex> lock(_mutex);   
 
return _queue.empty();   
 
        }   
 
private:   
 
        std::queue <Type> _queue;   
 
mutable std::mutex _mutex;   
 
        std::condition_variable _condition;   
 
    };   
 
#endif //NET_FRAME_CONCURRENT_QUEUE_H  

 

(2)基于上述消息队列的线程池的实现

头文件(.h)如下:

#ifndef NET_FRAME_THREAD_POOL_H   
 
#define NET_FRAME_THREAD_POOL_H   
 
#include "ConcurrentQueue.h"

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>
 
// Lower bound on the worker count: the ThreadPool constructor raises any
// smaller requested thread count to this value.
#define MIN_THREADS 10   
 
    /**
     * Fixed-size pool of worker threads fed by a ConcurrentQueue.
     *
     * Each record passed to Submit() is queued and later handed to the
     * handler on one of the worker threads. The pool is neither copyable nor
     * copy-assignable.
     *
     * Shutdown: the destructor raises a stop flag, wakes every worker and
     * joins it. Records still queued at that point are discarded without
     * being handled; a handler call already in progress is allowed to finish.
     *
     * @tparam Type  Record type. It must be default-constructible (workers
     *               pop into a default-constructed record, and shutdown
     *               queues default-constructed wake-up records).
     */
    template<class Type>
    class ThreadPool {

        ThreadPool& operator=(const ThreadPool&) = delete;

        ThreadPool(const ThreadPool& other) = delete;

    public:

        /**
         * Starts the worker threads.
         *
         * @param threads  Requested worker count; values below MIN_THREADS
         *                 are raised to MIN_THREADS.
         * @param handler  Callback invoked on a worker thread for every
         *                 submitted record.
         *
         * If starting a thread throws, the workers already started are
         * stopped and joined before the exception is rethrown.
         */
        ThreadPool(int32_t threads, std::function<void(Type& record)> handler);

        /** Stops and joins all workers; see the class comment for details. */
        virtual ~ThreadPool();

        /**
         * Queues a record for processing by one of the workers.
         *
         * @param record  Record to hand to the handler.
         */
        void Submit(Type record);

    private:

        // Body of every worker thread: pop a record, run the handler, repeat
        // until the stop flag is raised.
        void WorkerLoop();

        // Raises the stop flag, wakes every started worker and joins it.
        void StopWorkers();

    private:

        std::atomic<bool> _shutdown;                    // stop flag shared with the workers

        int32_t _threads;                               // effective worker count

        std::function<void(Type& record)> _handler;     // per-record callback

        std::vector <std::thread> _workers;             // started worker threads

        ConcurrentQueue <Type> _tasks;                  // pending records

    };

    template<class Type>
    ThreadPool<Type>::ThreadPool(int32_t threads, std::function<void(Type &record)> handler)
            : _shutdown(false),
              _threads(threads < MIN_THREADS ? MIN_THREADS : threads),
              _handler(std::move(handler)),
              _workers(),
              _tasks() {
        try {
            for (int32_t i = 0; i < _threads; ++i)
                _workers.emplace_back([this] { WorkerLoop(); });
        } catch (...) {
            // Destroying a joinable std::thread calls std::terminate, so the
            // workers that did start must be joined before propagating.
            StopWorkers();
            throw;
        }
    }

    template<class Type>
    ThreadPool<Type>::~ThreadPool() {
        StopWorkers();
    }

    template<class Type>
    void ThreadPool<Type>::Submit(Type record) {
        _tasks.Push(std::move(record));
    }

    template<class Type>
    void ThreadPool<Type>::WorkerLoop() {
        while (!_shutdown) {
            Type record;
            _tasks.Pop(record, true);
            // The pop may have been satisfied by a wake-up record queued by
            // StopWorkers(); once the flag is up nothing more is handled.
            if (_shutdown)
                break;
            _handler(record);
        }
    }

    template<class Type>
    void ThreadPool<Type>::StopWorkers() {
        _shutdown = true;
        // A worker blocked in Pop() only returns when a record arrives, and
        // each worker consumes at most one record after the flag is raised,
        // so one wake-up record per started worker is enough.
        typedef typename std::vector<std::thread>::size_type WorkerIndex;
        for (WorkerIndex i = 0; i < _workers.size(); ++i)
            _tasks.Push(Type());
        for (std::thread &worker: _workers) {
            if (worker.joinable())
                worker.join();
        }
    }
 
#endif //NET_FRAME_THREAD_POOL_H  

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/18481.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论