post_reprocessing.cpp 3.35 KB
#include "post_reprocessing.hpp"
#include "../common/logger.hpp"

#ifdef POST_USE_RABBITMQ


namespace mq
{

	post_rabbitmq_reprocessing::post_rabbitmq_reprocessing()
	: rabbitmq_handle_(nullptr)
	{
#ifdef _RBMQ_ASYNC_INTERFACE
		is_run_.store(true);
		run_thread_ = std::thread(std::bind(&post_rabbitmq_reprocessing::run, this));
#endif
	}

	post_rabbitmq_reprocessing::~post_rabbitmq_reprocessing()
	{
#ifdef _RBMQ_ASYNC_INTERFACE
		is_run_.store(false);

		if (run_thread_.joinable())
			run_thread_.join();
#endif
		if (rabbitmq_handle_ != nullptr)
			rabbitmq_handle_->Disconnect();
	}

	post_rabbitmq_reprocessing::post_rabbitmq_reprocessing(post_rabbitmq_reprocessing &&other)
	: params_(std::move(other.params_))
	, rabbitmq_handle_(std::move(other.rabbitmq_handle_))
	{

	}

	post_rabbitmq_reprocessing& post_rabbitmq_reprocessing::operator=(post_rabbitmq_reprocessing &&other)
	{
		if (this == &other)
			return *this;

		this->params_ = std::move(other.params_);
		this->rabbitmq_handle_ = std::move(other.rabbitmq_handle_);
		return *this;
	}

	bool post_rabbitmq_reprocessing::init(const rabbitmq_conn_params_t &params)
	{
		rabbitmq_handle_.reset(CRabbitmqClient::get_instance());
		/* connection rabbitMQ. */
		{
			int ret = -1;
			if (0 != (ret = rabbitmq_handle_->Connect(params.ip, params.port, params.uname, params.passwd, params.vhost)))
			{
				LOG_ERROR("RabbitMQ Connection {}:{} failed error code is {}!!!", params.ip, params.port, ret);
				return false;
			}

			if (0 != (ret = rabbitmq_handle_->ExchangeDeclare(params.exchange, params.exchange_type, params.durable_exchange)))
			{
				LOG_ERROR("RabbitMQ Declare Exchange {} failed error code is {}!!!", params.exchange, ret);
				return false;
			}

			if (0 != (ret = rabbitmq_handle_->QueueDeclare(params.queue, params.durable_queue)))
			{
				LOG_ERROR("RabbitMQ Declare Queue {} failed error code is {}!!!", params.queue, ret);
				return false;
			}

			if (0 != (ret = rabbitmq_handle_->QueueBind(params.queue, params.exchange, params.routing_key)))
			{
				LOG_ERROR("RabbitMQ Queue Bind failed queue is {} exchange is {} route key is {} error code is {}!!!", params.queue, params.exchange, params.routing_key, ret);
				return false;
			}
		}

		{
			params_ = params;
		}

		return true;
	}

	bool post_rabbitmq_reprocessing::sync_publish(const std::string &msg)
	{
		if (nullptr == rabbitmq_handle_)
		{
			LOG_ERROR("call Init before please !!!");
			return false;
		}
		std::lock_guard<std::mutex> lk(run_mutex_);
		return rabbitmq_handle_->Publish(msg, params_.exchange, params_.routing_key) == 0;
	}

#ifdef _RBMQ_ASYNC_INTERFACE
	bool post_rabbitmq_reprocessing::async_publish(const std::string &msg)
	{
		if (nullptr == rabbitmq_handle_)
		{
			LOG_ERROR("call Init before please !!!");
			return false;
		}
		std::lock_guard<std::mutex> lk(run_mutex_);
		message_buffer_.push(msg);
		run_cv_.notify_one();
		return true;
	}


	void post_rabbitmq_reprocessing::run()
	{
		while (is_run_.load())
		{
			std::unique_lock<std::mutex> lk(run_mutex_);
			run_cv_.wait(lk,
						[this] { return !this->message_buffer_.empty(); });
			std::string msg = message_buffer_.front();
			message_buffer_.pop();

			if (rabbitmq_handle_->Publish(msg, params_.exchange, params_.routing_key) != 0)
				LOG_ERROR("Publish {} failed!", msg);
			else
				LOG_INFO("Publish {} successful!", msg);
		}
	}
#endif

}

#endif  // #ifdef POST_USE_RABBITMQ