#include "post_reprocessing.hpp"
#include "../common/logger.hpp"

#include <mutex>
#include <string>
#include <thread>
#include <utility>

#ifdef POST_USE_RABBITMQ
namespace mq
{

#ifdef _RBMQ_ASYNC_INTERFACE
namespace
{
/// Moves every element of `from` onto the back of `to`, leaving `from` empty.
/// The caller must hold whatever locks protect both queues.
template <class Queue>
void drain_pending(Queue &from, Queue &to)
{
    while (!from.empty()) {
        to.push(std::move(from.front()));
        from.pop();
    }
}
} // namespace
#endif

/// Creates an unconnected publisher; call init() before publishing.
/// With _RBMQ_ASYNC_INTERFACE defined, also starts the background worker
/// thread that sends messages queued by async_publish().
post_rabbitmq_reprocessing::post_rabbitmq_reprocessing()
    : rabbitmq_handle_(nullptr)
{
#ifdef _RBMQ_ASYNC_INTERFACE
    is_run_.store(true);
    run_thread_ = std::thread(&post_rabbitmq_reprocessing::run, this);
#endif
}

/// Stops the worker thread (async builds) and disconnects the client if one
/// was obtained by init().
post_rabbitmq_reprocessing::~post_rabbitmq_reprocessing()
{
#ifdef _RBMQ_ASYNC_INTERFACE
    {
        // Flip the flag while holding the mutex so the worker cannot miss the
        // wake-up between evaluating its predicate and going to sleep.
        std::lock_guard lk(run_mutex_);
        is_run_.store(false);
    }
    // The worker may be asleep on an empty queue; without this notification
    // join() below would block forever.
    run_cv_.notify_all();
    if (run_thread_.joinable())
        run_thread_.join();
#endif

    if (rabbitmq_handle_ != nullptr)
        rabbitmq_handle_->Disconnect();
}

/// Move-constructs from `other`, taking over its parameters and client handle.
/// In async builds the pending messages are taken over as well and a worker
/// thread is started for the new object, so async_publish() keeps working on
/// the moved-to instance.
post_rabbitmq_reprocessing::post_rabbitmq_reprocessing(post_rabbitmq_reprocessing &&other)
    : rabbitmq_handle_(nullptr)
{
    {
        // other's worker reads params_/rabbitmq_handle_ under this mutex.
        std::lock_guard lk(other.run_mutex_);
        params_ = std::move(other.params_);
        rabbitmq_handle_ = std::move(other.rabbitmq_handle_);
#ifdef _RBMQ_ASYNC_INTERFACE
        // Leave nothing queued behind: other's worker would otherwise try to
        // publish through a handle that has just been moved away.
        drain_pending(other.message_buffer_, message_buffer_);
#endif
    }
#ifdef _RBMQ_ASYNC_INTERFACE
    is_run_.store(true);
    run_thread_ = std::thread(&post_rabbitmq_reprocessing::run, this);
#endif
}

/// Move-assigns from `other`, taking over its parameters and client handle.
/// In async builds, messages still pending in `other` are appended to this
/// object's queue.
///
/// NOTE(review): the handle previously held by *this is replaced without an
/// explicit Disconnect(), as in the original code - confirm that is intended.
post_rabbitmq_reprocessing& post_rabbitmq_reprocessing::operator=(post_rabbitmq_reprocessing &&other)
{
    if (this == &other)
        return *this;

    {
        // Lock both objects (deadlock-free) so neither worker nor a concurrent
        // sync_publish() observes a half-moved state.
        std::scoped_lock lk(run_mutex_, other.run_mutex_);
        params_ = std::move(other.params_);
        rabbitmq_handle_ = std::move(other.rabbitmq_handle_);
#ifdef _RBMQ_ASYNC_INTERFACE
        drain_pending(other.message_buffer_, message_buffer_);
#endif
    }
#ifdef _RBMQ_ASYNC_INTERFACE
    // Wake our worker in case messages were transferred.
    run_cv_.notify_one();
#endif
    return *this;
}

/// Connects to the broker and declares/binds the exchange and queue described
/// by `params`.
///
/// @param params connection, exchange, queue and routing settings.
/// @return true when every step succeeded (and `params` was stored for later
///         publishes); false as soon as any step reports a non-zero code,
///         after logging it.
///
/// NOTE(review): the handle takes the pointer returned by
/// CRabbitmqClient::get_instance(); if that is a shared singleton and
/// rabbitmq_handle_ is an owning smart pointer, calling init() twice or
/// destroying this object may delete it - confirm ownership.
bool post_rabbitmq_reprocessing::init(const rabbitmq_conn_params_t &params)
{
    rabbitmq_handle_.reset(CRabbitmqClient::get_instance());

    // Connect to RabbitMQ, then declare the exchange and queue and bind them.
    if (const int ret = rabbitmq_handle_->Connect(params.ip, params.port, params.uname, params.passwd, params.vhost);
        ret != 0) {
        LOG_ERROR("RabbitMQ Connection {}:{} failed error code is {}!!!", params.ip, params.port, ret);
        return false;
    }

    if (const int ret = rabbitmq_handle_->ExchangeDeclare(params.exchange, params.exchange_type, params.durable_exchange);
        ret != 0) {
        LOG_ERROR("RabbitMQ Declare Exchange {} failed error code is {}!!!", params.exchange, ret);
        return false;
    }

    if (const int ret = rabbitmq_handle_->QueueDeclare(params.queue, params.durable_queue);
        ret != 0) {
        LOG_ERROR("RabbitMQ Declare Queue {} failed error code is {}!!!", params.queue, ret);
        return false;
    }

    if (const int ret = rabbitmq_handle_->QueueBind(params.queue, params.exchange, params.routing_key);
        ret != 0) {
        LOG_ERROR("RabbitMQ Queue Bind failed queue is {} exchange is {} route key is {} error code is {}!!!",
                  params.queue, params.exchange, params.routing_key, ret);
        return false;
    }

    // Remember the settings only once the broker side is fully set up.
    params_ = params;
    return true;
}

/// Publishes `msg` on the calling thread to the configured exchange and
/// routing key.
///
/// @return true when Publish() reports 0; false on failure or when init()
///         has not provided a client handle yet.
bool post_rabbitmq_reprocessing::sync_publish(const std::string &msg)
{
    if (nullptr == rabbitmq_handle_) {
        LOG_ERROR("call Init before please !!!");
        return false;
    }

    // Same mutex as the async worker, so publishes never overlap.
    std::lock_guard lk(run_mutex_);
    return rabbitmq_handle_->Publish(msg, params_.exchange, params_.routing_key) == 0;
}

#ifdef _RBMQ_ASYNC_INTERFACE
/// Queues `msg` for the worker thread and returns immediately.
///
/// @return true when the message was queued (not necessarily sent yet);
///         false when init() has not provided a client handle yet.
bool post_rabbitmq_reprocessing::async_publish(const std::string &msg)
{
    if (nullptr == rabbitmq_handle_) {
        LOG_ERROR("call Init before please !!!");
        return false;
    }

    std::lock_guard lk(run_mutex_);
    message_buffer_.push(msg);
    run_cv_.notify_one();
    return true;
}

/// Worker loop: waits for queued messages and publishes them one at a time
/// until is_run_ is cleared by the destructor.
void post_rabbitmq_reprocessing::run()
{
    while (is_run_.load()) {
        std::unique_lock lk(run_mutex_);
        // Wake for a new message OR for shutdown; waiting only on the queue
        // would leave the thread asleep forever when the object is destroyed
        // while idle.
        run_cv_.wait(lk, [this] { return !is_run_.load() || !message_buffer_.empty(); });
        if (message_buffer_.empty())
            continue; // woken for shutdown only; the loop condition exits

        // Move the message out instead of copying it; it is popped right away.
        std::string msg = std::move(message_buffer_.front());
        message_buffer_.pop();

        // Published while holding the mutex, which also serialises against
        // sync_publish() using the same handle.
        if (rabbitmq_handle_->Publish(msg, params_.exchange, params_.routing_key) != 0)
            LOG_ERROR("Publish {} failed!", msg);
        else
            LOG_INFO("Publish {} successful!", msg);
    }
}
#endif

} // namespace mq
#endif // #ifdef POST_USE_RABBITMQ