一个独立的线程函数示例


 

 

一个独立的线程函数示例

#ifndef  __PUSH_CACHE_THREAD_H__
#define  __PUSH_CACHE_THREAD_H__
#include "util/tc_singleton.h"
#include "util/tc_thread.h"
#include "util/tc_config.h"  //taf::TC_Config 
#include "util/tc_thread_rwlock.h"
#include <vector>
#include <string>
#include "PushInterface.h"
#include "PushCache.h"

class CPushCacheThread :public taf::TC_Thread , public taf::TC_Singleton<CPushCacheThread>
{
public:
    CPushCacheThread();
    ~CPushCacheThread();

    bool initialize(const taf::TC_Config  &conf);
    void run();
    void terminate();

public:
    
    void pushToCache(const HQExtend::HPushUserMsgReq& oneMsg);

private:
    std::queue<HQExtend::HPushUserMsgReq> m_msgQue;

    taf::TC_ThreadLock m_ThreadLock;//线程锁 
    taf::TC_ThreadRWLocker m_rwLock;//读写锁
    HQExtend::PushCachePrx m_pushCachePrx;
    std::vector<std::string>  m_vObjName;
};


#endif 

 

#include "pushCacheThread.h"
#include "CountTimeApp.h"        //TELL_TIME_COST_THIS
#include "servant/Application.h" //ServerConfig::LocalIp
#include "servant/taf_logger.h"  //FDLOG
#include "Pusher.h"
using namespace std;

CPushCacheThread::CPushCacheThread()
{
    FDLOG("debug") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|" << std::this_thread::get_id() << endl;
}

CPushCacheThread::~CPushCacheThread()
{
    FDLOG("debug") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|" << std::this_thread::get_id() << endl;
    terminate();
}

bool CPushCacheThread::initialize(const taf::TC_Config &conf)
{
    bool bRet = false;
    try
    {
        string pushCacheObj = conf.get("/conf/push<pushCache>");
        if (!pushCacheObj.empty())
        { //预警历史缓存服务 接口
            m_pushCachePrx = Communicator::getInstance()->stringToProxy<HQExtend::PushCachePrx>(pushCacheObj);
            FDLOG("debug") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|"
                           << "Push Cache prx init succ " << endl;
            bRet = true;
        }
        return bRet;
    }
    catch (std::exception &ex)
    {
        LOG_ERROR << "Exception: " << ex.what() << endl;
    }
    catch (...)
    {
        LOG_ERROR << "Unkonwn exception" << endl;
    }

    return bRet;
}

void CPushCacheThread::run()
{
    FDLOG("cast") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|"
                  << " =========== run START =========== " << std::this_thread::get_id() << endl;
    while (_running)
    {
        try
        {
            // do nothing...
            std::queue<HQExtend::HPushUserMsgReq> tempMsgQue;
            {
                taf::TC_ThreadWLock wlock(m_rwLock);
                tempMsgQue.swap(m_msgQue);
            }
            while (tempMsgQue.size() > 0)
            {
                HQExtend::HPushUserMsgReq req = tempMsgQue.front();
                HQExtend::PushCachePrxCallbackPtr callback(new PushCacheCallback());
                m_pushCachePrx->async_pushUserMsg(callback, req);
                tempMsgQue.pop();
            }

            {
                TC_ThreadLock::Lock lock(m_ThreadLock);
                m_ThreadLock.timedWait(5000);
            }
        }
        catch (std::exception &ex)
        {
            LOG_ERROR << "Exception:" << ex.what() << endl;
        }
        catch (...)
        {
            LOG_ERROR << "Unkown exception." << endl;
        }
    }

    FDLOG("cast") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|"
                  << " =========== run END =========== " << std::this_thread::get_id() << endl;
}

void CPushCacheThread::terminate()
{
    FDLOG("cast") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|"
                  << " =========== terminate =========== " << std::this_thread::get_id() << endl;
    if (_running)
    {
        _running = false;
        try
        {
            TC_ThreadLock::Lock lock(m_ThreadLock);
            m_ThreadLock.notifyAll();
            LOG_DEBUG << __FILE__ << " CPushCacheThread::terminate " << endl;
        }
        catch (std::exception &ex)
        {
            LOG_ERROR << __FILE__ << "Exception:" << ex.what() << endl;
        }
        catch (...)
        {
            LOG_ERROR << __FILE__ << "Unknown exception." << endl;
        }
    }
}

void CPushCacheThread::pushToCache(const HQExtend::HPushUserMsgReq &oneMsg)
{
    taf::TC_ThreadWLock wlock(m_rwLock);
    m_msgQue.push(oneMsg);
}

 

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/274068.html

(0)
上一篇 2022年7月13日
下一篇 2022年7月13日

相关推荐

发表回复

登录后才能评论