RedisJobMonitor.h
4.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#pragma once
#define WIN32_LEAN_AND_MEAN
#include "Windows.h"
#include "base/base.h"
#include "locker/Locker.h"
#define KEY_JOB_PROCESS_ID_POSTFIX ".pid"
#define KEY_JOB_STATUS_POSTFIX ".status"
#define KEY_JOB_PROGRESS_POSTFIX ".progress"
#define KEY_JOB_TIMEOUT_POSTFIX ".timeout"
struct redisContext;
class CRedisConnector;
class CBaseRedisJobMonitor;
class CRedisJobMonitor;
// 任务处理状态定义
enum ENUM_JOB_STATUS
{
JOB_STATUS_PENDING = 0, // 待处理
JOB_STATUS_PROCESSING, // 任务处理中
JOB_STATUS_FREEZING, // 任务暂停中
JOB_STATUS_FINISH_SUCCESS, // 任务处理成功
JOB_STATUS_FINISH_FAIL, // 任务处理失败
JOB_STATUS_INTERRUPTED, // 任务中断
JOB_STATUS_TIMEOUT, // 任务超时异常
JOB_STATUS_LOADED, // 任务结果已载入
JOB_STATUS_UNKNOWN // 任务状态未知
};
//
// REDIS服务器连接类
//
class CRedisConnector
{
public:
explicit CRedisConnector(const std::string& redis_ip, int redis_port);
virtual ~CRedisConnector(void);
// 获取REDIS服务器连接句柄
redisContext * get(void);
// 测试服务器是否连接成功
bool ping(void);
private:
redisContext * ctx_; // REDIS服务器连接池
};
//
// REDIS任务监测器基类
//
class CBaseRedisJobMonitor
{
public:
CBaseRedisJobMonitor(void) : connector_ptr_(NULL)
{
}
virtual ~CBaseRedisJobMonitor(void)
{
cleanup();
}
// 建立REDIS服务器连接
inline bool connect(const std::string& redis_ip = "127.0.0.1", int redis_port = 6379)
{
redis_ip_ = redis_ip;
redis_port_ = redis_port;
connector_ptr_ = new CRedisConnector(redis_ip, redis_port);
return connector_ptr_->ping();
}
// 测试REDIS连接是否正常
inline bool ping(void)
{
if (connector_ptr_)
{
// 若连接异常,则重连
if (!connector_ptr_->ping())
{
return reconnect();
}
return TRUE;
}
return FALSE;
}
// 重新连接REDIS服务器
inline bool reconnect(void)
{
if (connector_ptr_)
{
delete connector_ptr_;
}
return connect(redis_ip_, redis_port_);
}
// 初始化任务
inline void init(const std::string& job_uuid)
{
job_uuid_ = job_uuid;
job_status_key_ = job_uuid + KEY_JOB_STATUS_POSTFIX;
job_progress_key_ = job_uuid + KEY_JOB_PROGRESS_POSTFIX;
job_timeout_key_ = job_uuid + KEY_JOB_TIMEOUT_POSTFIX;
}
inline std::string get_job_id(void)
{
return job_uuid_;
}
inline void cleanup(void)
{
if (connector_ptr_)
{
delete connector_ptr_;
connector_ptr_ = NULL;
}
}
public:
// 获取任务键值
static int get_job_val(redisContext * ctx, const std::string& job_key, int default_job_val);
static DWORD get_job_val(redisContext * ctx, const std::string& job_key, DWORD default_job_val);
static std::string get_job_val(redisContext * ctx, const std::string& job_key, std::string default_job_val);
// 设置任务键值(int)
static bool set_job_val(redisContext * ctx, const std::string& job_key, int job_val);
// 设置任务键值(DWORD, ulong)
static bool set_job_val(redisContext * ctx, const std::string& job_key, DWORD job_val);
// 设置任务键值
static bool set_job_val(redisContext * ctx, const std::string& job_key, std::string job_val);
static bool del_key(redisContext * ctx, const std::string& job_key);
static bool exist_key(redisContext * ctx, const std::string& job_key);
protected:
std::string redis_ip_; // REDIS服务器地址
int redis_port_; // REDIS服务端口
Common::CMutex mutex_;
CRedisConnector * connector_ptr_; // REDIS连接器指针
std::string job_uuid_; // 任务ID
std::string job_pid_key_; // 任务处理进程id键
std::string job_status_key_; // 任务状态键
std::string job_progress_key_; // 任务进度键
std::string job_timeout_key_; // 任务超时键
};