RabbitmqClientImpl.hpp 5.45 KB
#pragma once
#include "RabbitmqClient.hpp"

#include <spdlog/spdlog.h>
#include <string>
#include <cstring>
#include <vector>
#include "amqp_tcp_socket.h"
#include <memory>


using std::string;
using std::vector;

typedef void (*RABBITMQ_CALLBACK)(void* contex, const char * msg);

enum class error_code_t
{
	UNKNOWN = -1,
	NORMAL = 0,
	CONNECTION_CLOSED,
	CHANNEL_CLOSED,
	RESPONSE_NONE,
	LIBRARY_EXCEPTION,
};

class CRabbitmqClientImpl: virtual public CRabbitmqClient{
public:
    CRabbitmqClientImpl();
    ~CRabbitmqClientImpl();

    virtual int Connect(const string &strHostname, int iPort, const string &strUser, const string &strPasswd, const string &vhost = "/");
    virtual int Disconnect();

    /**
	*   @brief       ExchangeDeclare    声明exchange
	*	@param       [in]               strExchange
	*   @param       [in]               strType
	*   @return 等于0值代表成功创建exchange,小于0代表错误
	*/
    virtual int ExchangeDeclare(const string &strExchange, const string &strType, const bool durable = false);

    /**
	*   @brief       QueueDeclare                     声明消息队列
	*	@param       [in]               strQueueName  消息队列实例
	*   @param
	*   @return 等于0值代表成功创建queue,小于0代表错误
	*/
    virtual int QueueDeclare(const string &strQueueName, const bool durable = false);

    /**
	*   @brief       QueueBind                        将队列,交换机和绑定规则绑定起来形成一个路由表
	*	@param       [in]               strQueueName  消息队列
	*	@param       [in]               strExchange   交换机名称
	*	@param       [in]               strBindKey    路由名称  “msg.#” “msg.weather.**”
    *   @return 等于0值代表成功绑定,小于0代表错误
	*/
    virtual int QueueBind(const string &strQueueName, const string &strExchange, const string &strBindKey);

    /**
	*   @brief       QueueUnbind                      将队列,交换机和绑定规则绑定解除
	*	@param       [in]               strQueueName  消息队列
	*	@param       [in]               strExchange   交换机名称
	*	@param       [in]               strBindKey    路由名称  “msg.#” “msg.weather.**”
    *   @return 等于0值代表成功绑定,小于0代表错误
	*/
    virtual int QueueUnbind(const string &strQueueName, const string &strExchange, const string &strBindKey);

    /**
	*   @brief       QueueDelete                      删除消息队列。
	*	@param       [in]               strQueueName  消息队列名称
	*	@param       [in]               iIfUnused     消息队列是否在用,1 则论是否在用都删除
	*   @return 等于0值代表成功删除queue,小于0代表错误
	*/
    virtual int QueueDelete(const string &strQueueName, int iIfUnused);

    /**
	* @brief Publish  发布消息
	* @param [in] strMessage        消息实体
    * @param [in] strExchange       交换器
	* @param [in] strRoutekey       路由规则
    *   1.Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
    *   2.Fanout Exchange – 不处理路由键。将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
	*   3.Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
    *      因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”
	* @return 等于0值代表成功发送消息实体,小于0代表发送错误
	*/
    virtual int Publish(const string &strMessage, const string &strExchange, const string &strRoutekey);

    /**
	* @brief consumer  消费消息
	* @param [in]  strQueueName         队列名称
	* @param [out] message_array        获取的消息实体
    * @param [int] GetNum               需要取得的消息个数
	* @param [int] timeout              取得的消息是延迟,若为NULL,表示持续取,无延迟,阻塞状态
	* @return 等于0值代表成功,小于0代表错误,错误信息从ErrorReturn返回
	*/
    virtual int Consumer_limit(const string &strQueueName, vector<string> &message_array, int GetNum = 1, struct timeval *timeout = NULL);

        /**
	* @brief consumer  消费消息
	* @param [in]  strQueueName         队列名称
	* @param [out] message_array        获取的消息实体
    * @param [int] GetNum               需要取得的消息个数
	* @param [int] timeout              取得的消息是延迟,若为NULL,表示持续取,无延迟,阻塞状态
	* @return 等于0值代表成功,小于0代表错误,错误信息从ErrorReturn返回
	*/
    virtual int Consumer(const string &strQueueName, RABBITMQ_CALLBACK callback, void * contex, struct timeval *timeout = NULL);


    CRabbitmqClientImpl(const CRabbitmqClientImpl &lh) = delete;
    void operator=(const CRabbitmqClientImpl &lh) = delete;

private:
    error_code_t ErrorMsg(amqp_rpc_reply_t x, char const *context);

	error_code_t Reconnection();

    string                      m_strHostname;      // amqp主机
    int                         m_iPort;            // amqp端口
    string					    m_strUser;
    string					    m_strPasswd;
	string 						m_strVHost;
    int                         m_iChannel;
	int 						m_max_reconnection;

	amqp_socket_t               *m_pSock;
    amqp_connection_state_t     m_pConn;

};