// RedisJobMonitor.h (4.03 KB)
#pragma once

#define WIN32_LEAN_AND_MEAN
#include "Windows.h"
#include "base/base.h"
#include "locker/Locker.h"

// Suffixes that are appended to a job UUID to build the Redis key names for
// that job (for example "<uuid>.status").

// Suffix for the process-id key. NOTE(review): presumably the key holding the
// id of the process that handles the job; no code in this header uses it.
#define KEY_JOB_PROCESS_ID_POSTFIX  ".pid"
// Suffix for the job status key.
#define KEY_JOB_STATUS_POSTFIX      ".status"
// Suffix for the job progress key.
#define KEY_JOB_PROGRESS_POSTFIX    ".progress"
// Suffix for the job timeout key.
#define KEY_JOB_TIMEOUT_POSTFIX     ".timeout"

// Forward declarations: let this header name the types below by pointer or
// reference without pulling in their full definitions.

// Opaque in this header. NOTE(review): presumably the hiredis connection
// context type - confirm against the implementation file.
struct redisContext;
class CRedisConnector;
class CBaseRedisJobMonitor;
// Declared only; its definition is not in this part of the file.
class CRedisJobMonitor;

// Job processing states.
// The numeric values are spelled out explicitly; they are the same values the
// enumerators receive from implicit sequential numbering starting at 0.
enum ENUM_JOB_STATUS
{
    JOB_STATUS_PENDING        = 0,  // waiting to be processed
    JOB_STATUS_PROCESSING     = 1,  // job is being processed
    JOB_STATUS_FREEZING       = 2,  // job is paused
    JOB_STATUS_FINISH_SUCCESS = 3,  // job finished successfully
    JOB_STATUS_FINISH_FAIL    = 4,  // job finished with a failure
    JOB_STATUS_INTERRUPTED    = 5,  // job was interrupted
    JOB_STATUS_TIMEOUT        = 6,  // job timed out
    JOB_STATUS_LOADED         = 7,  // job result has been loaded
    JOB_STATUS_UNKNOWN        = 8   // job status is unknown
};

//
// Redis server connection class.
// Only the interface is declared here; the member functions are defined
// outside this header.
//
class CRedisConnector
{
public:
    // Constructed from the Redis server address and port.
    explicit CRedisConnector(const std::string& redis_ip, int redis_port);
    virtual ~CRedisConnector();

    // Returns the Redis server connection handle.
    redisContext* get();

    // Tests whether the server connection is working.
    bool ping();

private:
    // Redis server connection handle (the original comment describes it as a
    // connection pool).
    redisContext* ctx_;
};

//
// Base class for Redis job monitors.
//
// Owns a single heap-allocated CRedisConnector (created by connect(), released
// by cleanup() and by the destructor) and stores the Redis key names derived
// from a job UUID by init().
//
// NOTE(review): no copy constructor or copy assignment is declared, so a
// compiler-generated copy would share connector_ptr_ with the source object
// and both destructors would delete it. Do not copy a connected instance.
//
class CBaseRedisJobMonitor
{
public:
    // Starts disconnected. redis_port_ is given a defined value so that
    // reconnect() called before connect() does not read an uninitialised int.
    CBaseRedisJobMonitor() : redis_port_(0), connector_ptr_(NULL)
    {
    }

    virtual ~CBaseRedisJobMonitor()
    {
        cleanup();
    }

    // Establishes the Redis server connection.
    //   redis_ip   - server address (default "127.0.0.1")
    //   redis_port - server port (default 6379)
    // The address and port are remembered for later reconnect() calls.
    // Returns the result of pinging the newly created connector.
    bool connect(const std::string& redis_ip = "127.0.0.1", int redis_port = 6379)
    {
        // Self-assignment is harmless here; reconnect() passes the members back in.
        redis_ip_ = redis_ip;
        redis_port_ = redis_port;

        // Release any connector left from an earlier connect(). Without this the
        // old object is leaked when the pointer is overwritten below. It also
        // leaves connector_ptr_ NULL if the allocation/constructor throws, so the
        // destructor never sees a dangling pointer.
        cleanup();

        connector_ptr_ = new CRedisConnector(redis_ip_, redis_port_);
        return connector_ptr_->ping();
    }

    // Tests whether the Redis connection is healthy.
    // Returns false when connect() has never been called. If the connector's
    // ping fails, one reconnect attempt is made and its result is returned.
    bool ping()
    {
        if (!connector_ptr_)
        {
            return false;
        }

        // The connection is broken: try to re-establish it.
        if (!connector_ptr_->ping())
        {
            return reconnect();
        }

        return true;
    }

    // Reconnects to the Redis server using the remembered address and port.
    // connect() disposes of the current connector before creating a new one.
    bool reconnect()
    {
        return connect(redis_ip_, redis_port_);
    }

    // Initialises the job: stores the UUID and derives the status, progress and
    // timeout key names from it.
    // NOTE(review): job_pid_key_ is not populated here although
    // KEY_JOB_PROCESS_ID_POSTFIX exists; confirm whether a derived class sets it.
    void init(const std::string& job_uuid)
    {
        job_uuid_ = job_uuid;
        job_status_key_ = job_uuid + KEY_JOB_STATUS_POSTFIX;
        job_progress_key_ = job_uuid + KEY_JOB_PROGRESS_POSTFIX;
        job_timeout_key_ = job_uuid + KEY_JOB_TIMEOUT_POSTFIX;
    }

    // Returns the job UUID passed to init() (empty if init() was never called).
    std::string get_job_id() const
    {
        return job_uuid_;
    }

    // Destroys the connector, if any, and resets the pointer so that repeated
    // calls are safe.
    void cleanup()
    {
        if (connector_ptr_)
        {
            delete connector_ptr_;
            connector_ptr_ = NULL;
        }
    }

public:
    // The static helpers below are only declared here; their definitions live
    // outside this header, so the signatures are kept exactly as they were.

    // Get the value stored under a job key (int / DWORD / string overloads).
    // NOTE(review): default_job_val is presumably what is returned when the key
    // cannot be read - confirm against the implementation.
    static int get_job_val(redisContext * ctx, const std::string& job_key, int default_job_val);

    static DWORD get_job_val(redisContext * ctx, const std::string& job_key, DWORD default_job_val);

    static std::string get_job_val(redisContext * ctx, const std::string& job_key, std::string default_job_val);

    // Set the value of a job key (int).
    static bool set_job_val(redisContext * ctx, const std::string& job_key, int job_val);
    // Set the value of a job key (DWORD, ulong).
    static bool set_job_val(redisContext * ctx, const std::string& job_key, DWORD job_val);
    // Set the value of a job key (string).
    static bool set_job_val(redisContext * ctx, const std::string& job_key, std::string job_val);

    // NOTE(review): by name, deletes a key - implementation not visible here.
    static bool del_key(redisContext * ctx, const std::string& job_key);

    // NOTE(review): by name, checks whether a key exists - implementation not
    // visible here.
    static bool exist_key(redisContext * ctx, const std::string& job_key);

protected:
    std::string redis_ip_;                      // Redis server address
    int redis_port_;                            // Redis server port
    Common::CMutex mutex_;                      // not used by any code in this header
    CRedisConnector * connector_ptr_;           // owned Redis connector (NULL when disconnected)

    std::string job_uuid_;                      // job id
    std::string job_pid_key_;                   // key for the id of the process handling the job
    std::string job_status_key_;                // job status key
    std::string job_progress_key_;              // job progress key
    std::string job_timeout_key_;               // job timeout key
};