#include "RabbitmqClientImpl.hpp" #include CRabbitmqClientImpl::CRabbitmqClientImpl() : m_strHostname("") , m_iPort(0) , m_strUser("") , m_strPasswd("") , m_iChannel(1) //默认用1号通道,通道无所谓 , m_pSock(NULL) , m_pConn(NULL) , m_max_reconnection(5) { } CRabbitmqClientImpl::~CRabbitmqClientImpl() { if (NULL != m_pConn) { Disconnect(); m_pConn = NULL; } } int CRabbitmqClientImpl::Connect(const string &strHostname, int iPort, const string &strUser, const string &strPasswd, const string &vhost) { m_strHostname = strHostname; m_iPort = iPort; m_strUser = strUser; m_strPasswd = strPasswd; m_strVHost = vhost; m_pConn = amqp_new_connection(); if (NULL == m_pConn) { spdlog::error("amqp new connection failed"); return -1; } m_pSock = amqp_tcp_socket_new(m_pConn); if (NULL == m_pSock) { spdlog::error("amqp tcp new socket failed"); return -2; } if (0 > amqp_socket_open(m_pSock, m_strHostname.c_str(), m_iPort)) { spdlog::error("amqp socket open failed"); return -3; } error_code_t err = ErrorMsg(amqp_login(m_pConn, m_strVHost.c_str(), 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, m_strUser.c_str(), m_strPasswd.c_str()), "Logging in"); if (error_code_t::NORMAL != err) return -4; return 0; } int CRabbitmqClientImpl::Disconnect() { if (NULL != m_pConn) { if (error_code_t::NORMAL != ErrorMsg(amqp_connection_close(m_pConn, AMQP_REPLY_SUCCESS), "Closing connection")) return -1; int ret_code = 0; if (0 > (ret_code = amqp_destroy_connection(m_pConn))) { fprintf(stderr, "%s: %s\n", "destory connection", amqp_error_string2(ret_code)); return -2; } m_pConn = NULL; } return 0; } error_code_t CRabbitmqClientImpl::Reconnection() { Disconnect(); Connect(m_strHostname, m_iPort, m_strUser, m_strPasswd, m_strVHost); } int CRabbitmqClientImpl::ExchangeDeclare(const string &strExchange, const string &strType, const bool durable) { amqp_channel_open(m_pConn, m_iChannel); amqp_bytes_t _exchange = amqp_cstring_bytes(strExchange.c_str()); amqp_bytes_t _type = amqp_cstring_bytes(strType.c_str()); int _passive= 0; amqp_exchange_declare(m_pConn, m_iChannel, _exchange, _type, _passive, durable, 0, 0, amqp_empty_table); if (error_code_t::NORMAL != ErrorMsg(amqp_get_rpc_reply(m_pConn), "exchange_declare")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -1; } amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return 0; } int CRabbitmqClientImpl::QueueDeclare(const string &strQueueName, const bool durable) { if(NULL == m_pConn) { spdlog::error("QueueDeclare m_pConn is null"); return -1; } amqp_channel_open(m_pConn, m_iChannel); amqp_bytes_t _queue = amqp_cstring_bytes(strQueueName.c_str()); int32_t _passive = 0; // int32_t _durable = 0; int32_t _exclusive = 0; //int32_t _auto_delete = 1; int32_t _auto_delete = 0; amqp_queue_declare(m_pConn, m_iChannel, _queue, _passive, durable, _exclusive, _auto_delete, amqp_empty_table); if (error_code_t::NORMAL != ErrorMsg(amqp_get_rpc_reply(m_pConn), "queue_declare")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -1; } amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return 0; } int CRabbitmqClientImpl::QueueBind(const string &strQueueName, const string &strExchange, const string &strBindKey) { if(NULL == m_pConn) { spdlog::error("QueueBind m_pConn is null"); return -1; } amqp_channel_open(m_pConn, m_iChannel); amqp_bytes_t _queue = amqp_cstring_bytes(strQueueName.c_str()); amqp_bytes_t _exchange = amqp_cstring_bytes(strExchange.c_str()); amqp_bytes_t _routkey = amqp_cstring_bytes(strBindKey.c_str()); amqp_queue_bind(m_pConn, m_iChannel, _queue, _exchange, _routkey, amqp_empty_table); if(error_code_t::NORMAL != ErrorMsg(amqp_get_rpc_reply(m_pConn), "queue_bind")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -1; } amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return 0; } int CRabbitmqClientImpl::QueueUnbind(const string &strQueueName, const string &strExchange, const string &strBindKey) { if(NULL == m_pConn) { spdlog::error("QueueUnbind m_pConn is null"); return -1; } amqp_channel_open(m_pConn, m_iChannel); amqp_bytes_t _queue = amqp_cstring_bytes(strQueueName.c_str()); amqp_bytes_t _exchange = amqp_cstring_bytes(strExchange.c_str()); amqp_bytes_t _routkey = amqp_cstring_bytes(strBindKey.c_str()); amqp_queue_unbind(m_pConn, m_iChannel, _queue, _exchange, _routkey, amqp_empty_table); if(error_code_t::NORMAL != ErrorMsg(amqp_get_rpc_reply(m_pConn), "queue_unbind")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -1; } amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return 0; } int CRabbitmqClientImpl::QueueDelete(const string &strQueueName, int iIfUnused) { if(NULL == m_pConn) { spdlog::error("QueueDelete m_pConn is null"); return -1; } amqp_channel_open(m_pConn, m_iChannel); if(error_code_t::NORMAL != ErrorMsg(amqp_get_rpc_reply(m_pConn), "open channel")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -2; } amqp_queue_delete(m_pConn, m_iChannel, amqp_cstring_bytes(strQueueName.c_str()), iIfUnused, 0); if(error_code_t::NORMAL != ErrorMsg(amqp_get_rpc_reply(m_pConn), "delete queue")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -3; } amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return 0; } int CRabbitmqClientImpl::Publish(const string &strMessage, const string &strExchange, const string &strRoutekey) { if (NULL == m_pConn) { spdlog::error("publish m_pConn is null, publish failed"); return -1; } #if 1 for (int i = 0; i < m_max_reconnection; ++i) { amqp_channel_open(m_pConn, m_iChannel); if (error_code_t::NORMAL == ErrorMsg(amqp_get_rpc_reply(m_pConn), "publish open channel")) goto _continue; spdlog::error("Publish try {:d} reconnection", i+1); amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); Reconnection(); } return -2; _continue: #else amqp_channel_open(m_pConn, m_iChannel); if (error_code_t::NORMAL != ErrorMsg(amqp_get_rpc_reply(m_pConn), "publish open channel")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -2; } #endif amqp_bytes_t message_bytes; message_bytes.len = strMessage.length(); message_bytes.bytes = (void *)(strMessage.c_str()); /* amqp_basic_properties_t props; props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; props.content_type = amqp_cstring_bytes(m_type.c_str()); props.delivery_mode = m_durable; // persistent delivery mode */ amqp_bytes_t exchange = amqp_cstring_bytes(strExchange.c_str()); amqp_bytes_t routekey = amqp_cstring_bytes(strRoutekey.c_str()); if (0 != amqp_basic_publish(m_pConn, m_iChannel, exchange, routekey, 0, 0, NULL, message_bytes)) { spdlog::error("publish amqp_basic_publish failed"); if (error_code_t::NORMAL != ErrorMsg(amqp_get_rpc_reply(m_pConn), "amqp_basic_publish")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -3; } } amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return 0; } int CRabbitmqClientImpl::Consumer(const string &strQueueName, RABBITMQ_CALLBACK callback, void *contex, struct timeval *timeout) { if (NULL == m_pConn) { spdlog::error("Consumer m_pConn is null, Consumer failed"); return -1; } #if 0 for (int i = 0; i < m_max_connection; ++i) { amqp_channel_open(m_pConn, m_iChannel); if (0 == ErrorMsg(amqp_get_rpc_reply(m_pConn), "consumer open channel")) goto _continue; spdlog::error("Consumer try {:d} reconnection", i+1); amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); Disconnect(); Connect(m_strHostname, m_iPort, m_strUser, m_strPasswd, m_strVHost); } return -2; _continue: #else amqp_channel_open(m_pConn, m_iChannel); if (error_code_t::NORMAL != ErrorMsg(amqp_get_rpc_reply(m_pConn), "Consumer open channel")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -2; } #endif int GetNum = 1; amqp_basic_qos(m_pConn, m_iChannel, 0, GetNum, 0); int ack = 1; // no_ack 是否需要确认消息后再从队列中删除消息 amqp_bytes_t queuename= amqp_cstring_bytes(strQueueName.c_str()); amqp_basic_consume(m_pConn, m_iChannel, queuename, amqp_empty_bytes, 0, ack, 0, amqp_empty_table); if (error_code_t::NORMAL != ErrorMsg(amqp_get_rpc_reply(m_pConn), "Consuming")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -3; } int hasget = 0; amqp_rpc_reply_t res; amqp_envelope_t envelope; while (1) { amqp_maybe_release_buffers(m_pConn); res = amqp_consume_message(m_pConn, &envelope, timeout, 0); if (AMQP_RESPONSE_NORMAL != res.reply_type) { spdlog::error("Consumer amqp_channel_close failed then continue"); #if 1 amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); if (0 == hasget) return -res.reply_type; else return 0; #endif } char *temp = new char[envelope.message.body.len + 1](); memcpy(temp, (char *)envelope.message.body.bytes, sizeof(char)*envelope.message.body.len); amqp_destroy_envelope(&envelope); callback(contex, temp); hasget++; usleep(1); } return 0; } int CRabbitmqClientImpl::Consumer_limit(const string &strQueueName, vector &message_array, int GetNum, struct timeval *timeout) { if (NULL == m_pConn) { spdlog::error("Consumer m_pConn is null, Consumer failed"); return -1; } #if 0 for (int i = 0; i < m_max_connection; ++i) { amqp_channel_open(m_pConn, m_iChannel); if (0 == ErrorMsg(amqp_get_rpc_reply(m_pConn), "consumer open channel")) goto _continue; spdlog::error("Consumer_limit try {:d} reconnection", i+1); amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); Disconnect(); Connect(m_strHostname, m_iPort, m_strUser, m_strPasswd, m_strVHost); } return -2; _continue: #else amqp_channel_open(m_pConn, m_iChannel); if (error_code_t::NORMAL != ErrorMsg(amqp_get_rpc_reply(m_pConn), "Consumer_limit open channel")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -2; } #endif amqp_basic_qos(m_pConn, m_iChannel, 0, GetNum, 0); int ack = 1; // no_ack 是否需要确认消息后再从队列中删除消息 amqp_bytes_t queuename= amqp_cstring_bytes(strQueueName.c_str()); amqp_basic_consume(m_pConn, m_iChannel, queuename, amqp_empty_bytes, 0, ack, 0, amqp_empty_table); if (error_code_t::NORMAL != ErrorMsg(amqp_get_rpc_reply(m_pConn), "Consuming")) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -3; } int hasget = 0; amqp_rpc_reply_t res; amqp_envelope_t envelope; while (GetNum > 0) { amqp_maybe_release_buffers(m_pConn); res = amqp_consume_message(m_pConn, &envelope, timeout, 0); if (AMQP_RESPONSE_NORMAL != res.reply_type) { spdlog::error("Consumer amqp_channel_close failed then continue"); #if 1 amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); if (0 == hasget) return -res.reply_type; else return 0; #endif } string str((char *)envelope.message.body.bytes, (char *)envelope.message.body.bytes + envelope.message.body.len); message_array.push_back(str); int rtn = amqp_basic_ack(m_pConn, m_iChannel, envelope.delivery_tag, 1); amqp_destroy_envelope(&envelope); if (rtn != 0) { amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS); return -4; } GetNum--; hasget++; usleep(1); } return 0; } error_code_t CRabbitmqClientImpl::ErrorMsg(amqp_rpc_reply_t x, char const *context) { switch (x.reply_type) { case AMQP_RESPONSE_NORMAL: return error_code_t::NORMAL; case AMQP_RESPONSE_NONE: spdlog::error("{:s}: missing RPC reply type!", context); return error_code_t::RESPONSE_NONE; case AMQP_RESPONSE_LIBRARY_EXCEPTION: spdlog::error("{:s}: {:s}", context, amqp_error_string2(x.library_error)); return error_code_t::LIBRARY_EXCEPTION; case AMQP_RESPONSE_SERVER_EXCEPTION: switch (x.reply.id) { case AMQP_CONNECTION_CLOSE_METHOD: { amqp_connection_close_t *m = (amqp_connection_close_t *)x.reply.decoded; std::printf("%s: server connection error %uh, message: %.*s", context, m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes); return error_code_t::CONNECTION_CLOSED; } case AMQP_CHANNEL_CLOSE_METHOD: { amqp_channel_close_t *m = (amqp_channel_close_t *)x.reply.decoded; std::printf("%s: server channel error %uh, message: %.*s", context, m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes); return error_code_t::CHANNEL_CLOSED; } default: std::printf("%s: unknown server error, method id 0x%08X", context, x.reply.id); return error_code_t::UNKNOWN; } } return error_code_t::NORMAL; } CRabbitmqClient* CRabbitmqClient::get_instance() { CRabbitmqClient* instance_ptr = nullptr; if (instance_ptr == nullptr) instance_ptr = new CRabbitmqClientImpl(); return instance_ptr; } void CRabbitmqClient::destory(CRabbitmqClient* instance) { if (instance != nullptr) { delete instance; instance = nullptr; } }