#pragma once #define WIN32_LEAN_AND_MEAN #include "Windows.h" #include "base/base.h" #include "locker/Locker.h" #define KEY_JOB_PROCESS_ID_POSTFIX ".pid" #define KEY_JOB_STATUS_POSTFIX ".status" #define KEY_JOB_PROGRESS_POSTFIX ".progress" #define KEY_JOB_TIMEOUT_POSTFIX ".timeout" struct redisContext; class CRedisConnector; class CBaseRedisJobMonitor; class CRedisJobMonitor; // 任务处理状态定义 enum ENUM_JOB_STATUS { JOB_STATUS_PENDING = 0, // 待处理 JOB_STATUS_PROCESSING, // 任务处理中 JOB_STATUS_FREEZING, // 任务暂停中 JOB_STATUS_FINISH_SUCCESS, // 任务处理成功 JOB_STATUS_FINISH_FAIL, // 任务处理失败 JOB_STATUS_INTERRUPTED, // 任务中断 JOB_STATUS_TIMEOUT, // 任务超时异常 JOB_STATUS_LOADED, // 任务结果已载入 JOB_STATUS_UNKNOWN // 任务状态未知 }; // // REDIS服务器连接类 // class CRedisConnector { public: explicit CRedisConnector(const std::string& redis_ip, int redis_port); virtual ~CRedisConnector(void); // 获取REDIS服务器连接句柄 redisContext * get(void); // 测试服务器是否连接成功 bool ping(void); private: redisContext * ctx_; // REDIS服务器连接池 }; // // REDIS任务监测器基类 // class CBaseRedisJobMonitor { public: CBaseRedisJobMonitor(void) : connector_ptr_(NULL) { } virtual ~CBaseRedisJobMonitor(void) { cleanup(); } // 建立REDIS服务器连接 inline bool connect(const std::string& redis_ip = "127.0.0.1", int redis_port = 6379) { redis_ip_ = redis_ip; redis_port_ = redis_port; connector_ptr_ = new CRedisConnector(redis_ip, redis_port); return connector_ptr_->ping(); } // 测试REDIS连接是否正常 inline bool ping(void) { if (connector_ptr_) { // 若连接异常,则重连 if (!connector_ptr_->ping()) { return reconnect(); } return TRUE; } return FALSE; } // 重新连接REDIS服务器 inline bool reconnect(void) { if (connector_ptr_) { delete connector_ptr_; } return connect(redis_ip_, redis_port_); } // 初始化任务 inline void init(const std::string& job_uuid) { job_uuid_ = job_uuid; job_status_key_ = job_uuid + KEY_JOB_STATUS_POSTFIX; job_progress_key_ = job_uuid + KEY_JOB_PROGRESS_POSTFIX; job_timeout_key_ = job_uuid + KEY_JOB_TIMEOUT_POSTFIX; } inline std::string get_job_id(void) { return job_uuid_; } inline void cleanup(void) { if (connector_ptr_) { delete connector_ptr_; connector_ptr_ = NULL; } } public: // 获取任务键值 static int get_job_val(redisContext * ctx, const std::string& job_key, int default_job_val); static DWORD get_job_val(redisContext * ctx, const std::string& job_key, DWORD default_job_val); static std::string get_job_val(redisContext * ctx, const std::string& job_key, std::string default_job_val); // 设置任务键值(int) static bool set_job_val(redisContext * ctx, const std::string& job_key, int job_val); // 设置任务键值(DWORD, ulong) static bool set_job_val(redisContext * ctx, const std::string& job_key, DWORD job_val); // 设置任务键值 static bool set_job_val(redisContext * ctx, const std::string& job_key, std::string job_val); static bool del_key(redisContext * ctx, const std::string& job_key); static bool exist_key(redisContext * ctx, const std::string& job_key); protected: std::string redis_ip_; // REDIS服务器地址 int redis_port_; // REDIS服务端口 Common::CMutex mutex_; CRedisConnector * connector_ptr_; // REDIS连接器指针 std::string job_uuid_; // 任务ID std::string job_pid_key_; // 任务处理进程id键 std::string job_status_key_; // 任务状态键 std::string job_progress_key_; // 任务进度键 std::string job_timeout_key_; // 任务超时键 };