ZMQ第三方通讯库封装与使用

  ZeroMQ(简称ZMQ)是一个基于消息队列的多线程网络库,其对套接字类型、连接处理、帧、甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字。ZMQ是网络通信中新的一层,介于应用层和传输层之间(按照TCP/IP划分),其是一个可伸缩层,可并行运行,分散在分布式系统间。ZMQ不是单独的服务,而是一个嵌入式库,它封装了网络通信、消息队列、线程调度等功能,向上层提供简洁的API,应用程序通过加载库文件,调用API函数来实现高性能网络通信。
  手动编译ZMQ的教程可以参考ZMQ第三方通讯库的编译与使用
  为了使用方便,我们特地将ZMQ(version 4.2.5)封装成类,作为一个常用工具使用,暂时用到了“订阅-发布模式”和“请求-服务模式”。在此,将封装结果简单总结如下。

发布-订阅模式封装

  发布-订阅模式氛围发布者和订阅者,发布者向某个url上发布消息,而订阅者则直接订阅某个url上的消息,从而实现进程间的数据交换。

发布者封装

  发布者的构造函数需要创建上下文环境和相应的socket,并绑定到对应的url上,以发送数据。
  发布者声明ZmqSendUtil.h

#ifndef ZMQSENDUTIL_H
#define ZMQSENDUTIL_H

#include <string>
#include <cpp/zmq.hpp>
class ZmqSendUtil{
private:
    void *context;
    void *publisher;
    std::string url;

public:
    ZmqSendUtil();                              //默认构造方法
    ZmqSendUtil(void *context, void *publisher, const std::string &url);    //初始化构造方法
    virtual ~ZmqSendUtil();                     //析构函数
    int sendMsg(const std::string& info);       //发送消息
};

#endif

  发布者实现ZmqSendUtil.cpp

#include "ZmqSendUtil.h"
ZmqSendUtil::ZmqSendUtil() {}
ZmqSendUtil::ZmqSendUtil(void *context, void *publisher, const std::string &url) 
                        :context(context),publisher(publisher), url(url){
    //初始化上下文
    if (this->context == nullptr){
        this->context = zmq_ctx_new();
        assert(this->context != nullptr);
    }

    //获取socket对象
    if (this->publisher == nullptr){
        this->publisher = zmq_socket(this->context, ZMQ_PUB);
        assert(this->publisher != nullptr);
    }

    //socket绑定通信地址
    int result = zmq_bind(this->publisher, this->url.c_str());
    assert(result == 0);
}

ZmqSendUtil::~ZmqSendUtil() {
    zmq_close(this->publisher);
    zmq_ctx_destroy(this->context);
}

int ZmqSendUtil::sendMsg(const std::string& info) {
    int result=-1;
    if (this->context != nullptr && this->publisher != nullptr){
        result = zmq_send(this->publisher, info.c_str(), info.length(), 0);
    return result > 0;
}

订阅者封装

  订阅者的构造函数需要创建上下文环境和相应的socket,并链接到对应的url上,以接收数据。
  订阅者声明ZmqRecvUtil.h

#ifndef USV_ZMQRECVUTIL_H
#define USV_ZMQRECVUTIL_H

#include <string>
#include <cpp/zmq.hpp>
class ZmqRecvUtil{
private:
    void *context;
    void *subscriber;
    std::string url;

public:
    ZmqRecvUtil();              //默认构造方法
    ZmqRecvUtil(void *context, void *subscriber, const std::string &url);   //初始化构造方法
    virtual ~ZmqRecvUtil();     //析构函数
    std::string recvMsg();      //接收消息
};

#endif

  订阅者实现ZmqRecvUtil.cpp

#include "ZmqRecvUtil.h"

ZmqRecvUtil::ZmqRecvUtil() {}

ZmqRecvUtil::ZmqRecvUtil(void *context, void *subscriber, const std::string &url) 
                        :context(context),subscriber(subscriber),url(url){
    //初始化上下文context
    if (this->context == nullptr){
        this->context = zmq_ctx_new();  //如果上下文是空的,那么就初始化上下文
        assert(this->context != nullptr);
    }

    //获取socket对象
    if (this->subscriber == nullptr){   //模式为订阅发布模式 发布端单向发布数据也不管订阅端是否能接收...
        this->subscriber = zmq_socket(this->context, ZMQ_SUB); //如果订阅者是空的句柄那就创建一个不透明的套接字...
        assert(this->subscriber != nullptr);
    }

    //socket绑定通信地址  绑定特定的ip和端口号获取数据....
    int result = zmq_connect(this->subscriber, this->url.c_str());
    assert(result == 0);
    /*
     int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len);
     注意:除了以下属性,所有的属性均需要在对socket进行bind/connect操作之前设置:
     ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE, ZMQ_LINGER, ZMQ_ROUTER_HANDOVER,
     ZMQ_ROUTER_MANDATORY, ZMQ_PROBE_ROUTER, ZMQ_XPUB_VERBOSE, ZMQ_REQ_CORRELATE, and ZMQ_REQ_RELAXED
     特别的,安全的属性也可以在bind/connect操作之后生效,并且可以随时进行修改并影响之后的bind/connect操作。*/
    //设置socket可选项,其中“”参数为过滤器,用于过滤获取信息,这里默认全部接收
    result = zmq_setsockopt(this->subscriber, ZMQ_SUBSCRIBE, "", 0);
    assert(result == 0);
}

ZmqRecvUtil::~ZmqRecvUtil() {
    zmq_close(this->subscriber);
    zmq_ctx_destroy(this->context);
}

std::string ZmqRecvUtil::recvMsg() {
    //读取消息
    char infoBuffer[MAX_INFO_SIZE] = {0};
    int ret = zmq_recv(this->subscriber, infoBuffer, MAX_INFO_SIZE , 0);
    assert(ret != -1);
    std::string tmp="";                     //返回值不能使用infoBuffer强化类型转换,要重新构建字符串
    for (int i = 0; i < ret; ++i) {
        tmp+=infoBuffer[i];
    }
    return tmp;
}

注意事项

  在使用时发现,如果封装在类的内部或者再子线程中使用,请使用指针,在主线程中声明并定义了对应的发布者和订阅者之后再使用指针传入类或者线程中,否则会导致奇奇怪怪的错误。

请求-服务模式封装

  请求-服务模式分为服务端和客户端,服务端向客户端发送请求,服务端收到请求后形成相应的数据,然后发送给客户端。不同于发布-订阅模式,请求服务模式有先后顺序。客户端和服务端需要使用ZMQ的语法创建相应的资源,并使用语法规则执行接收和发送相关内容。

客户端封装

  客户端的构造函数需要创建上下文环境和相应的socket,并链接到对应的url上。然后是请求过程,需要指定请求的服务名称,最后接收服务端发回的数据。其声明和实现如下所示:
  客户端声明ZmqClientUtil.h

//
// Created by songjiahao on 2021/11/29.
//
#ifndef ZMQCLIENTUTIL_H
#define ZMQCLIENTUTIL_H

#include <string>
#include <cpp/zmq.hpp>
class ZmqClientUtil {
private:
    void *context;
    void *requester;
    std::string url;

public:
    std::string receive_content_string;

public:
    ZmqClientUtil();                //默认构造方法
    ZmqClientUtil(void *context, void *requester, const std::string &url);   //初始化构造方法
    virtual ~ZmqClientUtil();       //析构函数

public:
    void Request(const std::string &ServiceName);

    void client_recv_data();
};

#endif

  客户端实现ZmqClientUtil.cpp

//
// Created by songjiahao on 2021/11/29.
//

#include "ZmqClientUtil.h"
ZmqClientUtil::ZmqClientUtil() {

}
ZmqClientUtil::ZmqClientUtil(void *context, void *requester, const std::string &url)
                            :context(context),requester(requester),url(url) {
    //初始化上下文context
    if (this->context == nullptr){
        this->context = zmq_ctx_new();
        assert(this->context != nullptr);
    }

    //获取socket对象
    if (this->requester == nullptr){                            //模式为请求响应
        this->requester = zmq_socket(this->context, ZMQ_REQ);   //如果响应者是空的句柄那就创建一个不透明的套接字...
        assert(this->requester != nullptr);
    }

    //socket绑 定通信地址  绑定特定的ip和端口号获取数据....
    //zmq_bind (responder, "tcp://*:5555");
    int result = zmq_connect(this->requester, this->url.c_str());
    assert(result==0);
}

ZmqClientUtil::~ZmqClientUtil() {
    zmq_close (requester);
    zmq_ctx_destroy (context);
}

void ZmqClientUtil::Request(const std::string & ServiceName) {
    int ret=zmq_send(this->requester,ServiceName.c_str(),ServiceName.size(),0);
    assert(ret==ServiceName.size());
    client_recv_data();
}

void ZmqClientUtil::client_recv_data() {
    const int MAXLEN=1024;
    char buffer[MAXLEN]= {};
    int ret=zmq_recv(this->requester,buffer,MAXLEN,0);
    assert(ret!=-1);
    std::string result;
    for(int i=0;i<ret;i++)
        result.push_back(buffer[i]);
    receive_content_string=result;
}

服务端封装

  服务端也需要创建上下文环境和socket,并绑定到相应的url上。根据接收的请求不同,形成不同的数据以相应客户端的请求。
  服务端声明ZmqServerUtil.cpp

//
// Created by songjiahao on 2021/11/29.
//

#ifndef ZMQSERVERUTIL_H
#define ZMQSERVERUTIL_H

#include <string>
#include <cpp/zmq.hpp>
class ZmqServerUtil {
private:
    void *context;
    void *responder;
    std::string url;

public:
    ZmqServerUtil();  //默认构造方法
    ZmqServerUtil(void *context, void *responder, const std::string &url);   //初始化构造方法
    virtual ~ZmqServerUtil();     //析构函数

public:
    void Service(const std::string &ServiceName, std::string (*ServieceContentFunction)());
};

#endif

  服务端声明ZmqServerUtil.cpp

//
// Created by songjiahao on 2021/11/29.
//
#include "ZmqServerUtil.h"
ZmqServerUtil::ZmqServerUtil() {}

ZmqServerUtil::ZmqServerUtil(void *context, void *responder, const std::string &url)
                            :context(context),responder(responder),url(url) {
    //初始化上下文context
    if (this->context == nullptr){
        this->context = zmq_ctx_new();
        assert(this->context != nullptr);
    }

    //获取socket对象
    if (this->responder == nullptr){                            //模式为请求响应
        this->responder = zmq_socket(this->context, ZMQ_REP);   //如果响应者是空的句柄那就创建一个不透明的套接字...
        assert(this->responder != nullptr);
    }

    //socket绑定通信地址  绑定特定的ip和端口号获取数据....
    //zmq_bind (responder, "tcp://*:5555");
    int result = zmq_bind (this->responder, this->url.c_str());
    assert(result == 0);
}

ZmqServerUtil::~ZmqServerUtil() {
    zmq_close (responder);
    zmq_ctx_destroy (context);
}
//参数为服务名称和生成服务内容的函数指针
void ZmqServerUtil::Service(const std::string & ServiceName, std::string (*ServieceContentFunction)()) {
    bool start=true;
    const int MAXLEN = 1024;
    while(start) {
        char buffer[MAXLEN] = {};                       //用于接收请求服务的名称
        int ret = zmq_recv(this->responder, buffer, MAXLEN, 0);
        assert(ret != -1);
        std::string quaryName;                          //将请求服务的名称转换为string,为避免截断,逐字符处理
        for (int i = 0; i < ret; i++)
            quaryName.push_back(buffer[i]);
        Sleep(10);
        if (quaryName == ServiceName) {                 //根据请求服务的名声回复内容
            std::string ServieceContent=ServieceContentFunction();
            ret = zmq_send(this->responder, ServieceContent.c_str(), ServieceContent.size(), 0);
            assert(ret = ServieceContent.size());
        }
    }
}

使用测试

  这里为了测试在多线程中是否可用,我们将其封装在一个类中使用多线程进行测试,其中客户端和服务端分别在一个线程中进行处理,服务端每次发送带有编号的Hello World!作为回应信息。

#include <bits/stdc++.h>
#include "ZmqClientUtil.h"
#include "ZmqServerUtil.h"
using namespace std;
class TestClass{
private:
    whu::ZmqClientUtil *client;             //客户端指针
    whu::ZmqServerUtil *server;             //服务端指针
    std::thread* client_thread_;            //客户端线程
    std::thread* server_thread_;            //服务端线程
public:
    TestClass(){
        client=new whu::ZmqClientUtil(nullptr, nullptr,"tcp://127.0.0.1:8888");
        server=new whu::ZmqServerUtil(nullptr, nullptr,"tcp://127.0.0.1:8888");
    }
    ~TestClass(){
        delete client;
        delete server;
        delete client_thread_;
        delete server_thread_;
    }
    void CreatThread(){                     //创建线程
        client_thread_=new std::thread(&TestClass::Request,this);
        server_thread_=new std::thread(&TestClass::Service,this);
    }
    void JoinAll(){
        client_thread_->join();
        server_thread_->join();
    }
    void Request(){                         //客户端请求函数
        int i=1;
        string ServiceName="Say Hello";
        while(true){
            cout<<"Client Request "<<i<<" for "<<ServiceName<<endl;
            client->Request(ServiceName);
            cout<<"Client Received "<<i++<<" Msg : "<<client->receive_content_string<<endl;
            cout<<endl;
            Sleep(2000);                    //每隔2秒发送一次请求
        }
    }
    static std::string CreatString(){       //生成服务内容的函数
        static int i=1;
        return to_string(i++)+" Hello World!";
    }
    void Service(){
        int i=1;
        string ServiceName="Say Hello";
        while(true){
            server->Service(ServiceName,CreatString);
        }
    }
};
int main(){
    TestClass test;
    test.CreatThread();
    test.JoinAll();
    return 0;
}

  测试效果如下:

Client Request 1 for Say Hello
Client Received 1 Msg : 1 Hello World!

Client Request 2 for Say Hello
Client Received 2 Msg : 2 Hello World!

Client Request 3 for Say Hello
Client Received 3 Msg : 3 Hello World!

Client Request 4 for Say Hello
Client Received 4 Msg : 4 Hello World!

Client Request 5 for Say Hello
Client Received 5 Msg : 5 Hello World!

Client Request 6 for Say Hello
Client Received 6 Msg : 6 Hello World!

注意事项

  使用过程中发现,如果直接运行不会出现问题,如果再debug模式下,跳入构造函数的zmq_socket位置则会出现卡死,原因暂时未知,猜测可能与ZMQ自身内部实现有关。


当珍惜每一片时光~