746db74c
Hu Chunming
实现recode
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
#include "FFRecoderTaskManager.h"
#include <chrono>
// Pairs a FFRecoderTaskManager pointer with a RecodeParam value.
// NOTE(review): this struct is not referenced anywhere in the visible part of
// this file — confirm whether it is still needed.
struct RecodeThreadParam {
// Manager instance associated with this parameter set.
FFRecoderTaskManager* _this;
// Recording parameters; RecodeParam is declared outside this file.
RecodeParam param;
};
/**
 * Returns the current system_clock time as a count of milliseconds since the
 * clock's epoch.
 */
static long get_cur_time() {
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    return std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count();
}
/**
 * Formats the current local time as "YYYY_MM_DD_HH_MM_SS".
 *
 * @return The formatted timestamp, or an empty string if the local time could
 *         not be obtained or formatted.
 */
static std::string get_cur_datetime()
{
    const std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

    // NOTE(review): std::localtime returns a pointer to shared static storage;
    // consider a re-entrant variant if this is called from several threads.
    const std::tm* local = std::localtime(&now);
    if (local == nullptr) {
        return std::string();
    }

    char buffer[40];
    // Pass the real buffer size; the previous code claimed 80 bytes for a
    // 40-byte buffer.
    const std::size_t written = std::strftime(buffer, sizeof(buffer), "%Y_%m_%d_%H_%M_%S", local);
    // strftime returns 0 when the result does not fit, which yields "" here.
    return std::string(buffer, written);
}
/** Returns true when AV_PKT_FLAG_KEY is set in the packet's flags. */
static bool is_key_frame(AVPacket *pkt) {
    const int key_bits = pkt->flags & AV_PKT_FLAG_KEY;
    return key_bits != 0;
}
/** Constructs a manager with no recorder thread; init() creates the thread. */
FFRecoderTaskManager::FFRecoderTaskManager()
    : m_recoder_thread(nullptr) {
}
/**
 * Destructor. Stops the recorder thread and releases cached packets.
 *
 * init() starts a thread that runs recode_thread() on this object, so the
 * thread must be joined before the object goes away. close() does nothing
 * further if it was already called (thread pointer is null, list is empty).
 */
FFRecoderTaskManager::~FFRecoderTaskManager(){
    LOG_DEBUG("~FFRecoderTaskManager()");
    close();
}
|
bf661eb0
Hu Chunming
录像文件保存优化
|
40
|
/**
 * Stores the input stream / codec context and starts the recorder thread.
 *
 * @param stream Input stream; its time_base is copied into m_time_base.
 * @param avctx  Codec context later passed to FFRecoder::init().
 * @return false if either argument is null, true otherwise.
 *
 * Calling init() again while a recorder thread already exists only refreshes
 * the stored stream parameters. It does not start a second thread: that would
 * leak the first std::thread and leave two workers consuming the same lists.
 */
bool FFRecoderTaskManager::init(AVStream* stream, AVCodecContext* avctx){
    if (stream == nullptr || avctx == nullptr) {
        return false;
    }

    m_time_base = stream->time_base;
    m_avctx = avctx;
    m_inStream = stream;

    if (m_recoder_thread != nullptr) {
        // Worker already running; reuse it.
        return true;
    }

    // The thread is joined and deleted in close().
    m_recoder_thread = new std::thread([this]() {
        recode_thread();
    });

    return true;
}
|
40ac568f
Hu Chunming
修改AVPacket复制方式,避免...
|
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
/**
 * Makes a deep copy of an AVPacket: a freshly allocated payload buffer plus
 * the timing fields, stream index, flags and side data.
 *
 * @param pkt Packet to copy.
 * @return A new packet owned by the caller, or nullptr if pkt is null, its
 *         payload is inconsistent, or any allocation fails.
 */
static AVPacket* packet_clone(AVPacket* pkt) {
    if (pkt == nullptr || pkt->size < 0 || (pkt->size > 0 && pkt->data == nullptr)) {
        return nullptr;
    }

    // av_packet_alloc() already initialises the packet's fields, so no
    // separate av_init_packet() call is needed.
    AVPacket *new_pkt = av_packet_alloc();
    if (new_pkt == nullptr) {
        return nullptr;
    }

    // Allocates the payload buffer and sets new_pkt->data / new_pkt->size.
    if (av_new_packet(new_pkt, pkt->size) < 0) {
        av_packet_free(&new_pkt);
        return nullptr;
    }

    if (pkt->size > 0) {
        memcpy(new_pkt->data, pkt->data, pkt->size);
    }

    new_pkt->pts = pkt->pts;
    new_pkt->dts = pkt->dts;
    new_pkt->stream_index = pkt->stream_index;
    new_pkt->duration = pkt->duration;
    new_pkt->pos = pkt->pos;
    new_pkt->flags = pkt->flags;

    // NOTE(review): av_copy_packet_side_data is deprecated in newer FFmpeg
    // releases (av_packet_copy_props is the documented replacement) — confirm
    // against the FFmpeg version this project builds with.
    if (av_copy_packet_side_data(new_pkt, pkt) < 0) {
        av_packet_free(&new_pkt);
        return nullptr;
    }

    return new_pkt;
}
|
746db74c
Hu Chunming
实现recode
|
80
|
/**
 * Appends a copy of a packet to the packet cache.
 *
 * @param pkt      Packet to cache; it is cloned, so the caller keeps ownership
 *                 of the original. A null packet is ignored.
 * @param frame_nb Frame number stored alongside the cached packet.
 *
 * While no recording task is pending, the cache is trimmed from the front so
 * that it holds at most 1000 packets. Nothing is cached once m_bExit is set.
 */
void FFRecoderTaskManager::cache_pkt(AVPacket* pkt, long long frame_nb){
    if(m_bExit) {
        // The manager is shutting down; stop caching data.
        return;
    }

    if(pkt == nullptr) {
        return;
    }

    // A single AVPacket is not large, so it is cloned here to reduce coupling
    // with the decoder module and simplify ownership. The clone and the
    // allocation happen before the list lock is taken to keep the critical
    // section short.
    AVPacket *new_pkt = packet_clone(pkt);
    if(new_pkt == nullptr) {
        LOG_ERROR("cache_pkt: packet clone failed, frame_nb: {}", frame_nb);
        return;
    }

    DataPacket* newDataPkt = new DataPacket();
    newDataPkt->pkt = new_pkt;
    newDataPkt->frame_nb = frame_nb;
    newDataPkt->isKeyFrame = is_key_frame(pkt);
    if(newDataPkt->isKeyFrame) {
        // frame_nb values keep increasing.
        LOG_INFO("key frame_nb: {}", frame_nb);
    }

    // Lock order: packet list first, then task list.
    std::lock_guard<std::mutex> l_pkt(m_pkt_list_mtx);
    m_pkt_list.emplace_back(newDataPkt);

    std::lock_guard<std::mutex> l_info(m_recoderinfo_list_mtx);
    if(m_recoderinfo_list.empty()) {
        // No pending task: cap the cache at 1000 packets, dropping the oldest.
        static const size_t kMaxIdleCachedPackets = 1000;
        while(m_pkt_list.size() > kMaxIdleCachedPackets) {
            DataPacket* oldest = m_pkt_list.front();
            delete oldest;
            m_pkt_list.pop_front();
        }
    }
}
|
746db74c
Hu Chunming
实现recode
|
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
/**
 * Inserts a recording task into m_recoderinfo_list, keeping the list ordered
 * by ascending frame_nb.
 *
 * The task is placed before the first entry whose frame_nb is strictly
 * greater; if there is none (including the empty-list case) it is appended.
 */
void FFRecoderTaskManager::save_intask_recoderinfo(RecoderInfo info) {
    std::lock_guard<std::mutex> guard(m_recoderinfo_list_mtx);

    auto pos = m_recoderinfo_list.begin();
    while (pos != m_recoderinfo_list.end() && !(pos->frame_nb > info.frame_nb)) {
        ++pos;
    }

    // Inserting at end() is equivalent to push_back().
    m_recoderinfo_list.insert(pos, info);
}
/**
 * Finds the cached packet at which a recording for the given frame should
 * begin.
 *
 * The target is 375 frames before frame_nb. The list is scanned from the
 * front while remembering the most recent key-frame packet (initially the
 * first packet). As soon as the target lies between the remembered packet's
 * frame_nb and the current packet's frame_nb, the remembered packet is
 * returned.
 *
 * @param frame_nb Frame number of the recording task.
 * @return Iterator to the start packet; m_pkt_list.begin() when the target is
 *         not positive or no matching range is found (this equals end() for an
 *         empty list).
 *
 * The list mutex is only held during the search; the returned iterator is used
 * by the caller after the lock has been released.
 */
list<DataPacket*>::iterator FFRecoderTaskManager::getStartIterator(unsigned long long frame_nb){
    std::lock_guard<std::mutex> guard(m_pkt_list_mtx);

    long long wanted_nb = (long long)(frame_nb - 375);
    if (wanted_nb <= 0) {
        return m_pkt_list.begin();
    }

    auto last_key = m_pkt_list.begin();
    for (auto cur = m_pkt_list.begin(); cur != m_pkt_list.end(); ++cur) {
        if ((*cur)->isKeyFrame) {
            last_key = cur;
        }

        const bool in_range = wanted_nb >= (*last_key)->frame_nb && wanted_nb <= (*cur)->frame_nb;
        if (in_range) {
            return last_key;
        }
    }

    return m_pkt_list.begin();
}
|
40ac568f
Hu Chunming
修改AVPacket复制方式,避免...
|
159
|
/**
 * Queues a recording task via save_intask_recoderinfo().
 * Tasks are ignored once m_bExit is set.
 */
void FFRecoderTaskManager::create_recode_task(RecoderInfo& recoderInfo) {
    if (!m_bExit) {
        save_intask_recoderinfo(recoderInfo);
    }
}
|
40ac568f
Hu Chunming
修改AVPacket复制方式,避免...
|
167
168
|
void FFRecoderTaskManager::recode_thread() {
LOG_INFO("recode_thread start...");
|
746db74c
Hu Chunming
实现recode
|
169
|
while(true) {
|
d9fc3e82
Hu Chunming
recode添加colse功能和mq功能
|
170
171
172
173
|
if(m_bExit) {
break;
}
|
746db74c
Hu Chunming
实现recode
|
174
175
176
177
178
179
180
181
182
|
m_recoderinfo_list_mtx.lock();
if(m_recoderinfo_list.size() <= 0){
m_recoderinfo_list_mtx.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(3));
continue;
}
auto it_param = m_recoderinfo_list.begin();
RecoderInfo recoderinfo = *it_param;
|
746db74c
Hu Chunming
实现recode
|
183
184
|
m_recoderinfo_list_mtx.unlock();
|
b070c0fd
Hu Chunming
修正m_recoderinfo_l...
|
185
186
187
188
189
190
191
|
do
{
auto it_data = getStartIterator(recoderinfo.frame_nb);
if(it_data == m_pkt_list.end()) {
std::this_thread::sleep_for(std::chrono::milliseconds(3));
break;
}
|
746db74c
Hu Chunming
实现recode
|
192
|
|
b070c0fd
Hu Chunming
修正m_recoderinfo_l...
|
193
|
LOG_INFO("start frame_nb: {}", (*it_data)->frame_nb);
|
bf661eb0
Hu Chunming
录像文件保存优化
|
194
|
|
b070c0fd
Hu Chunming
修正m_recoderinfo_l...
|
195
196
197
198
199
200
201
202
203
204
|
m_pkt_list_mtx.lock();
auto it = m_pkt_list.begin();
while (it != it_data) {
DataPacket* dataPkt = m_pkt_list.front();
delete dataPkt;
dataPkt = nullptr;
m_pkt_list.pop_front();
it = m_pkt_list.begin();
}
m_pkt_list_mtx.unlock();
|
bf661eb0
Hu Chunming
录像文件保存优化
|
205
|
|
b070c0fd
Hu Chunming
修正m_recoderinfo_l...
|
206
207
208
209
210
211
212
|
string file_name = recoderinfo.recoderPath;
FFRecoder ffrecoder;
bool bInit = ffrecoder.init(m_inStream, m_avctx, file_name.c_str());
if (!bInit) {
LOG_ERROR("ffrecoder init error : {} {} {}", recoderinfo.task_id, recoderinfo.object_id, recoderinfo.frame_nb);
ffrecoder.uninit();
|
bf661eb0
Hu Chunming
录像文件保存优化
|
213
214
|
break;
}
|
b070c0fd
Hu Chunming
修正m_recoderinfo_l...
|
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
|
LOG_DEBUG("record start, pkt_list size: {} task_id: {} object_id:{} frame_nb: {}", m_pkt_list.size(), recoderinfo.task_id, recoderinfo.object_id, recoderinfo.frame_nb);
int count = 0;
auto it_save = it_data;
unsigned long long start_frame_nb = (*it_data)->frame_nb;
unsigned long long end_frame_nb = (*it_data)->frame_nb;
for (; it_save != m_pkt_list.end() && count < 500; ++it_save) {
DataPacket* dataPkt = *it_save;
if(dataPkt->frame_nb > recoderinfo.frame_nb) {
break;
}
AVPacket* pkt = dataPkt->pkt;
if(pkt == nullptr) {
LOG_ERROR("{} pkt is nullptr", recoderinfo.task_id);
continue;
} else if (pkt->data == nullptr || pkt->size <= 0){
LOG_ERROR("{} pkt data is nullptr", recoderinfo.task_id);
continue;
}
ffrecoder.write_pkt(pkt);
count++;
end_frame_nb = (*it_save)->frame_nb;
|
1b57a1c5
Hu Chunming
代码优化,避免可能的崩溃
|
238
|
}
|
bf661eb0
Hu Chunming
录像文件保存优化
|
239
|
|
b070c0fd
Hu Chunming
修正m_recoderinfo_l...
|
240
241
|
ffrecoder.flush_pkt();
ffrecoder.uninit();
|
bf661eb0
Hu Chunming
录像文件保存优化
|
242
|
|
b070c0fd
Hu Chunming
修正m_recoderinfo_l...
|
243
244
245
246
247
|
// 发送mq消息
if(mq_publish_func && recoderinfo.mq_info.length() > 0) {
mq_publish_func(recoderinfo.mq_info.c_str());
// LOG_INFO("record save: {}", recoderinfo.mq_info.c_str());
}
|
bf661eb0
Hu Chunming
录像文件保存优化
|
248
|
|
b070c0fd
Hu Chunming
修正m_recoderinfo_l...
|
249
250
251
252
253
254
255
256
|
LOG_INFO("record end, total save: {} start_frame_nb: {} end_frame_nb: {} file_path: {}", count, start_frame_nb, end_frame_nb, file_name);
} while (0);
// m_recoderinfo_list 为空会触发 m_pkt_list size 大于1000时的删除操作,
// 因此应当在本次m_pkt_list操作都完成之后再pop,避免这边在用m_pkt_list, 另一边在删除,从而导致崩溃
m_recoderinfo_list_mtx.lock();
m_recoderinfo_list.pop_front();
m_recoderinfo_list_mtx.unlock();
|
bf661eb0
Hu Chunming
录像文件保存优化
|
257
258
|
}
|
40ac568f
Hu Chunming
修改AVPacket复制方式,避免...
|
259
|
LOG_INFO("recode_thread end.");
|
d9fc3e82
Hu Chunming
recode添加colse功能和mq功能
|
260
261
262
263
264
265
266
|
}
void FFRecoderTaskManager::close() {
m_bExit = true;
if (m_recoder_thread) {
m_recoder_thread->join();
|
bc01d0b6
Hu Chunming
代码优化;主要是修改了截图逻辑,避...
|
267
|
delete m_recoder_thread;
|
d9fc3e82
Hu Chunming
recode添加colse功能和mq功能
|
268
269
270
271
272
273
274
275
276
277
278
279
280
281
|
m_recoder_thread = nullptr;
}
// 清空数据
while(!m_pkt_list.empty()) {
DataPacket* dataPkt = m_pkt_list.front();
delete dataPkt;
dataPkt = nullptr;
m_pkt_list.pop_front();
}
}
/**
 * Registers the callback that recode_thread() invokes with a task's mq_info
 * after its recording has been written.
 */
void FFRecoderTaskManager::set_mq_callback(mq_callback_t cb) {
    this->mq_publish_func = cb;
}
|