一个独立的线程函数示例
#ifndef __PUSH_CACHE_THREAD_H__
#define __PUSH_CACHE_THREAD_H__
#include "util/tc_singleton.h"
#include "util/tc_thread.h"
#include "util/tc_config.h" //taf::TC_Config
#include "util/tc_thread_rwlock.h"
#include <vector>
#include <string>
#include "PushInterface.h"
#include "PushCache.h"
class CPushCacheThread :public taf::TC_Thread , public taf::TC_Singleton<CPushCacheThread>
{
public:
CPushCacheThread();
~CPushCacheThread();
bool initialize(const taf::TC_Config &conf);
void run();
void terminate();
public:
void pushToCache(const HQExtend::HPushUserMsgReq& oneMsg);
private:
std::queue<HQExtend::HPushUserMsgReq> m_msgQue;
taf::TC_ThreadLock m_ThreadLock;//线程锁
taf::TC_ThreadRWLocker m_rwLock;//读写锁
HQExtend::PushCachePrx m_pushCachePrx;
std::vector<std::string> m_vObjName;
};
#endif
#include "pushCacheThread.h"
#include "CountTimeApp.h" //TELL_TIME_COST_THIS
#include "servant/Application.h" //ServerConfig::LocalIp
#include "servant/taf_logger.h" //FDLOG
#include "Pusher.h"
using namespace std;
CPushCacheThread::CPushCacheThread()
{
FDLOG("debug") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|" << std::this_thread::get_id() << endl;
}
CPushCacheThread::~CPushCacheThread()
{
FDLOG("debug") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|" << std::this_thread::get_id() << endl;
terminate();
}
bool CPushCacheThread::initialize(const taf::TC_Config &conf)
{
bool bRet = false;
try
{
string pushCacheObj = conf.get("/conf/push<pushCache>");
if (!pushCacheObj.empty())
{ //预警历史缓存服务 接口
m_pushCachePrx = Communicator::getInstance()->stringToProxy<HQExtend::PushCachePrx>(pushCacheObj);
FDLOG("debug") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|"
<< "Push Cache prx init succ " << endl;
bRet = true;
}
return bRet;
}
catch (std::exception &ex)
{
LOG_ERROR << "Exception: " << ex.what() << endl;
}
catch (...)
{
LOG_ERROR << "Unkonwn exception" << endl;
}
return bRet;
}
void CPushCacheThread::run()
{
FDLOG("cast") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|"
<< " =========== run START =========== " << std::this_thread::get_id() << endl;
while (_running)
{
try
{
// do nothing...
std::queue<HQExtend::HPushUserMsgReq> tempMsgQue;
{
taf::TC_ThreadWLock wlock(m_rwLock);
tempMsgQue.swap(m_msgQue);
}
while (tempMsgQue.size() > 0)
{
HQExtend::HPushUserMsgReq req = tempMsgQue.front();
HQExtend::PushCachePrxCallbackPtr callback(new PushCacheCallback());
m_pushCachePrx->async_pushUserMsg(callback, req);
tempMsgQue.pop();
}
{
TC_ThreadLock::Lock lock(m_ThreadLock);
m_ThreadLock.timedWait(5000);
}
}
catch (std::exception &ex)
{
LOG_ERROR << "Exception:" << ex.what() << endl;
}
catch (...)
{
LOG_ERROR << "Unkown exception." << endl;
}
}
FDLOG("cast") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|"
<< " =========== run END =========== " << std::this_thread::get_id() << endl;
}
void CPushCacheThread::terminate()
{
FDLOG("cast") << __FILE__ << ":" << __LINE__ << ":" << __func__ << "|"
<< " =========== terminate =========== " << std::this_thread::get_id() << endl;
if (_running)
{
_running = false;
try
{
TC_ThreadLock::Lock lock(m_ThreadLock);
m_ThreadLock.notifyAll();
LOG_DEBUG << __FILE__ << " CPushCacheThread::terminate " << endl;
}
catch (std::exception &ex)
{
LOG_ERROR << __FILE__ << "Exception:" << ex.what() << endl;
}
catch (...)
{
LOG_ERROR << __FILE__ << "Unknown exception." << endl;
}
}
}
void CPushCacheThread::pushToCache(const HQExtend::HPushUserMsgReq &oneMsg)
{
taf::TC_ThreadWLock wlock(m_rwLock);
m_msgQue.push(oneMsg);
}
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/274068.html