使用thrift实现订阅服务和发布服务详解编程语言

服务:订阅服务 market_subscriber 和 发布服务 market_publisher

功能:market_subscriber 能够向 market_publisher 请求订阅某些类型的消息,当 market_publisher 有该类型的消息时,需要把它推送给订阅服务。

流程:1. market_publisher 服务启动,监听端口,注册事件处理函数,处理订阅请求消息。

         2. market_subscriber 服务启动,监听端口,注册事件处理函数,处理接收推动来的消息。

         3. market_subscriber 向 market_publisher 发起订阅请求,market_publisher 根据订阅请求参数,长连接 market_subscriber 提供的消息接收端口。

         4. market_publisher 通过长连接向 market_subscriber 推送消息。

注意:1. market_publisher 到 market_subscriber 的长连接的维护:

            (1)market_subscriber 一定时间内未收到 market_publisher 的推送消息,尝试重新发起订阅请求。

            (2)market_publisher 推送订阅消息时,发现连接断开,尝试重连。 考虑 market_publisher 有重启的情况,收到的订阅请求参数需要做持久化。 ==> TODO:增加一个 market_subscriber 到 market_publisher 的取消订阅的请求。

实现:

1. market_subscriber.thrift  — market_subscriber 实现的服务接口

namespace cpp market_subscriber

namespace java market_subscriber

namespace perl market_subscriber

namespace php market_subscriber

struct Snapshot

{

    1: i32 nSecurityID;              ///< 证券ID

    2: i32 nTime;                    ///< 序号/时间/日期 HHMMSSmmm

    3: i32 nTradingPhaseCode;        ///< 0:开市前  1:开盘集合竞价 2:连续竞价 3:临时停盘 4:收盘集合竞价 5:集中竞价闭市  6:协议转让结束  7:闭市

    4: i32 nPreClosePx;              ///< 昨收价 * 10000

    5: i32 nOpenPx;                  ///< 开盘价 ..

    6: i32 nHighPx;                  ///< 最高价 ..

    7: i32 nLowPx;                   ///< 最低价 ..

    8: i32 nLastPx;                  ///< 最新价 ..

    9: i64 llTradeNum;               ///< 成交笔数

    10: i64 llVolume;                 ///< 成交量

    11: i64 llValue;                  ///< 成交额(*10000)

}   

service SubscriberService

{

    // 推送消息

    // Oneway means the client only makes request and does not listen for any response at all. Oneway methods must be void.

    oneway void sendSnapshot(1:list<Snapshot> lstSnapshot);

}

2. market_publisher.thrift — market_publisher 实现的服务接口

namespace cpp market_publisher

namespace java market_publisher

namespace perl market_publisher

namespace php market_publisher

struct SubscribeMarketParam

{

    1: string user_name;

    2: string password;

    3: i32 type; // 订阅类型

    4: string ip; // 接收推送数据的ip

    5: i16 port; // 接收推送数据的port

}

struct SubscribeMarketAck

{

    1: required i32 error_code; // 0,成功; 其它,失败

    2: optional string error_info;

}

struct GetStockBaseInfoParam

{

    1: required i32 stock_code;

}

struct GetStockBaseInfoAck

{

    1: required i32 error_code;

    2: optional string error_info;

    3: optional string stock_name;

}

service PublisherService

{

    // 订阅请求:订阅行情信息

    SubscribeMarketAck subscribeMarket(1:SubscribeMarketParam param);

    GetStockBaseInfoAck getStockBaseInfo(1:GetStockBaseInfoParam param);

}

3. subscriber_server.cpp

/*

 * Main.cpp

 */

#include “../gen-cpp/SubscriberService.h”

#include “../gen-cpp/PublisherService.h”

#include <thrift/protocol/TBinaryProtocol.h>

#include <thrift/server/TSimpleServer.h>

#include <thrift/transport/TSocket.h>

#include <thrift/transport/TServerSocket.h>

#include <thrift/transport/TBufferTransports.h>

#include <boost/thread/thread.hpp>

using namespace ::apache::thrift;

using namespace ::apache::thrift::protocol;

using namespace ::apache::thrift::transport;

using namespace ::apache::thrift::server;

using boost::shared_ptr;

using namespace  ::market_subscriber;

using namespace  ::market_publisher;

class SubscriberServiceHandler : virtual public SubscriberServiceIf {

 public:

  SubscriberServiceHandler() {

    // Your initialization goes here

  }

  void sendSnapshot(const std::vector<Snapshot> & lstSnapshot) {

    // Your implementation goes here

    printf(“sendSnapshot/n”);

    std::cout << “Received snapshots’ number: ” << lstSnapshot.size() << std::endl;

    for (std::vector<Snapshot>::const_iterator iter = lstSnapshot.begin();

            iter != lstSnapshot.end(); iter++)

    {

        std::cout << “nSecurityID: ” << iter->nSecurityID << “/t”;

        std::cout << “nTime: ” << iter->nTime << “/t”;

        std::cout << std::endl;

    }

    sleep(10);

  }

};

const char* CLIENT_LISTNE_IP = “127.0.0.1”;

const short CLIENT_LISTNE_PORT = 9060;

void subscriberServiceThread()

{

    shared_ptr<SubscriberServiceHandler> handler(new SubscriberServiceHandler());

    shared_ptr<TProcessor> processor(new SubscriberServiceProcessor(handler));

    shared_ptr<TServerTransport> serverTransport(new TServerSocket(CLIENT_LISTNE_PORT));

    shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());

    shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

    TSimpleServer server(processor, serverTransport, transportFactory,

            protocolFactory);

    server.serve();

}

int main(int argc, char **argv)

{

    boost::thread thrd(&subscriberServiceThread);

    // wait for subscriberServiceThread ready

    sleep(3);

    boost::shared_ptr<TSocket> socket(new TSocket(“127.0.0.1”, 9090));

    boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));

    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));

    PublisherServiceClient client(protocol);

    try

    {

        transport->open();

        SubscribeMarketAck ack;

        SubscribeMarketParam param;

        param.__set_user_name(“mazhan”);

        param.__set_password(“123456”);

        param.__set_type(0);

        param.__set_ip(CLIENT_LISTNE_IP);

        param.__set_port(CLIENT_LISTNE_PORT);

        client.subscribeMarket(ack, param);

        std::cout << “subscribeMarket(), error code: ” << ack.error_code << std::endl;

        transport->close();

    }

    catch (TException& tx)

    {

        std::cout << “ERROR: ” << tx.what() << std::endl;

    }

    thrd.join();

    return 0;

}

4. pubsher_server.cpp

/*

 * Main.cpp

 */

#include “../gen-cpp/SubscriberService.h”

#include “../gen-cpp/PublisherService.h”

#include <thrift/protocol/TBinaryProtocol.h>

#include <thrift/server/TSimpleServer.h>

#include <thrift/transport/TSocket.h>

#include <thrift/transport/TServerSocket.h>

#include <thrift/transport/TBufferTransports.h>

#include <thrift/concurrency/ThreadManager.h>

#include <boost/thread/thread.hpp>

using namespace ::apache::thrift;

using namespace ::apache::thrift::protocol;

using namespace ::apache::thrift::transport;

using namespace ::apache::thrift::server;

using namespace ::apache::thrift::concurrency;

using boost::shared_ptr;

using namespace  ::market_publisher;

using namespace  ::market_subscriber;

std::list<boost::shared_ptr<SubscriberServiceClient> > g_lstSubscriberServiceClient;

boost::mutex g_mutexSubscriberServiceClient;

class PublisherServiceHandler : virtual public PublisherServiceIf {

 public:

  PublisherServiceHandler() {

    // Your initialization goes here

  }

  void subscribeMarket(SubscribeMarketAck& _return, const SubscribeMarketParam& param) {

    // Your implementation goes here

    std::cout << “subscribeMarket, ip=” << param.ip << “, port=” << param.port << “.” << std::endl;

    boost::shared_ptr<TSocket> socket(new TSocket(param.ip, param.port));

    boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));

    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));

    boost::shared_ptr<SubscriberServiceClient> client(new SubscriberServiceClient(protocol));

    int error_code = 1; // fail to open

    try

    {

        transport->open();

        error_code = 0;

        { // add to subscribes list

            boost::mutex::scoped_lock lock(g_mutexSubscriberServiceClient);

            g_lstSubscriberServiceClient.push_back(client);

        }

    }

    catch (TException& e)

    {

        std::cout << “Exception: ” << e.what() << std::endl;

        _return.__set_error_info(e.what());

    }

    catch (std::exception& e)

    {

        std::cout << “Exception: ” << e.what() << std::endl;

        _return.__set_error_info(e.what());

    }

    catch (
使用thrift实现订阅服务和发布服务详解编程语言)

    {

        char buff[100];

        snprintf(buff, 99, “fail to open %s:%d.”, param.ip.c_str(), param.port);

        std::cout << “Exception: ” << buff << std::endl;

        _return.__set_error_info(buff);

    }

    _return.__set_error_code(error_code);

  }

  void getStockBaseInfo(GetStockBaseInfoAck& _return, const GetStockBaseInfoParam& param) {

    // Your implementation goes here

    printf(“getStockBaseInfo/n”);

  }

};

int32_t getCurTime()

{

    time_t t = time(0);

    char tmp[64];

    strftime(tmp, sizeof(tmp), “%H%M%S”, localtime(&t));

    return atoi(tmp);

}

// send markets to subscribers.

void publisherServiceThread()

{

    while (1)

    {

        std::vector<Snapshot> lstSnapshot;

        Snapshot snapshot;

        snapshot.nSecurityID = 100000001;

        snapshot.nTime = getCurTime() * 1000 + rand() % 1000;

        snapshot.nTradingPhaseCode = 2;

        snapshot.nPreClosePx = 240500;

        snapshot.nOpenPx = 250500;

        snapshot.nHighPx = 250800;

        snapshot.nLowPx = 240800;

        snapshot.nLastPx = 250300;

        snapshot.llTradeNum = 15000;

        snapshot.llVolume = 6000000;

        snapshot.llValue = 15030000000;

        lstSnapshot.push_back(snapshot);

        boost::mutex::scoped_lock lock(g_mutexSubscriberServiceClient);

        std::list<boost::shared_ptr<SubscriberServiceClient> >::iterator iter = g_lstSubscriberServiceClient.begin();

        while (iter != g_lstSubscriberServiceClient.end())

        {

            try

            {

                (*iter)->sendSnapshot(lstSnapshot);

                iter++;

            }

            catch (TException& e)

            {

                std::cout << “Exception: ” << e.what() << std::endl;

                iter = g_lstSubscriberServiceClient.erase(iter);

            }

            catch (std::exception& e)

            {

                std::cout << “Exception: ” << e.what() << std::endl;

                iter = g_lstSubscriberServiceClient.erase(iter);

            }

            catch (
使用thrift实现订阅服务和发布服务详解编程语言)

            {

                std::cout << “Exception: fail to call sendSnapshot().” << std::endl;

                iter = g_lstSubscriberServiceClient.erase(iter);

            }

        }

        sleep(3);

    }

}

int main(int argc, char **argv)

{

    int port = 9090;

    shared_ptr<PublisherServiceHandler> handler(new PublisherServiceHandler());

    shared_ptr<TProcessor> processor(new PublisherServiceProcessor(handler));

    shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));

    shared_ptr<TTransportFactory> transportFactory(

            new TBufferedTransportFactory());

    shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

    TSimpleServer server(processor, serverTransport, transportFactory,

            protocolFactory);

    boost::thread thrd(&publisherServiceThread);

    printf(“Starting the server
使用thrift实现订阅服务和发布服务详解编程语言/n”);

    server.serve();

    printf(“done./n”);

    return 0;

}

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/18486.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论