Commit 40ac568f6168b0d58ddd0e0de85d7167271ecd4c

Authored by Hu Chunming
1 parent 8cf128fc

修改AVPacket复制方式,避免泄漏;删除无用代码

src/decoder/dvpp/FFRecoderTaskManager.cpp
... ... @@ -46,36 +46,35 @@ bool FFRecoderTaskManager::init(AVStream* stream, AVCodecContext* avctx){
46 46 m_avctx = avctx;
47 47 m_inStream = stream;
48 48  
49   - m_recoder_thread = new std::thread(
50   - [](void* arg) {
51   - FFRecoderTaskManager* _this=(FFRecoderTaskManager*)arg;
52   - if(_this != nullptr) {
53   - _this->recode_thread2();
54   - }else{
55   - LOG_ERROR("recode 线程启动失败 !");
56   - }
57   - return (void*)0;
58   - }, this);
  49 + // m_recoder_thread = new std::thread(
  50 + // [](void* arg) {
  51 + // FFRecoderTaskManager* _this=(FFRecoderTaskManager*)arg;
  52 + // if(_this != nullptr) {
  53 + // _this->recode_thread();
  54 + // }else{
  55 + // LOG_ERROR("recode 线程启动失败 !");
  56 + // }
  57 + // return (void*)0;
  58 + // }, this);
59 59  
60 60 return true;
61 61 }
62 62  
63   -bool FFRecoderTaskManager::init3(AVRational time_base, AVCodecContext* avctx){
64   - m_time_base = time_base;
65   - m_avctx = avctx;
66   -
67   - m_recoder_thread = new std::thread(
68   - [](void* arg) {
69   - FFRecoderTaskManager* _this=(FFRecoderTaskManager*)arg;
70   - if(_this != nullptr) {
71   - _this->recode_thread3();
72   - }else{
73   - LOG_ERROR("recode 线程启动失败 !");
74   - }
75   - return (void*)0;
76   - }, this);
77   -
78   - return true;
  63 +static AVPacket* packet_clone(AVPacket* pkt) {
  64 + AVPacket *new_pkt = av_packet_alloc();
  65 + av_init_packet( new_pkt );
  66 + av_new_packet(new_pkt, pkt->size);
  67 + // new_pkt->data = (uint8_t *)av_malloc(pkt->size) ;
  68 + memcpy(new_pkt->data, pkt->data, pkt->size);
  69 + new_pkt->size = pkt->size;
  70 + new_pkt->pts = pkt->pts;
  71 + new_pkt->dts = pkt->dts;
  72 + new_pkt->stream_index = pkt->stream_index;
  73 + new_pkt->duration = pkt->duration;
  74 + new_pkt->pos = pkt->pos;
  75 + new_pkt->flags = pkt->flags;
  76 + av_copy_packet_side_data(new_pkt, pkt);
  77 + return new_pkt;
79 78 }
80 79  
81 80 void FFRecoderTaskManager::cache_pkt(AVPacket* pkt, long long frame_nb){
... ... @@ -87,18 +86,12 @@ void FFRecoderTaskManager::cache_pkt(AVPacket* pkt, long long frame_nb){
87 86 std::lock_guard<std::mutex> l_pkt(m_pkt_list_mtx);
88 87  
89 88 // 考虑到一个AVPacket中的数据并不很大,为减少与解码模块的耦合度,方便管理,这里做一个clone
90   - AVPacket *new_pkt = av_packet_alloc();
91   - av_init_packet( new_pkt );
92   - new_pkt->data = (uint8_t *)av_malloc(pkt->size) ;
93   - memcpy(new_pkt->data, pkt->data, pkt->size);
94   - new_pkt->size = pkt->size;
95   - new_pkt->pts = pkt->pts;
96   - new_pkt->dts = pkt->dts;
97   - av_copy_packet_side_data(new_pkt, pkt);
  89 + AVPacket *new_pkt = packet_clone(pkt);
98 90  
99 91 DataPacket* newDataPkt = new DataPacket();
100 92 newDataPkt->pkt = new_pkt;
101 93 newDataPkt->frame_nb = frame_nb;
  94 +
102 95 m_pkt_list.emplace_back(newDataPkt);
103 96  
104 97 if(is_key_frame(pkt)){
... ... @@ -121,59 +114,6 @@ void FFRecoderTaskManager::cache_pkt(AVPacket* pkt, long long frame_nb){
121 114 }
122 115 }
123 116  
124   -void FFRecoderTaskManager::cache_frame(AVFrame* frame, long long frame_nb){
125   - if(m_bExit) {
126   - // 任务退出了就不再缓存数据了
127   - return;
128   - }
129   -
130   - std::lock_guard<std::mutex> l_pkt(m_frame_list_mtx);
131   -
132   - // 考虑到一个AVPacket中的数据并不很大,为减少与解码模块的耦合度,方便管理,这里做一个clone
133   - AVFrame *new_pkt = av_frame_clone(frame);
134   -
135   - DataFrame* newFrame = new DataFrame();
136   - newFrame->frame = new_pkt;
137   - newFrame->frame_nb = frame_nb;
138   - m_frame_list.emplace_back(newFrame);
139   -
140   - // if(is_key_frame(pkt)){
141   - // // 越来越大的值
142   - // newFrame->isKeyFrame = true;
143   - // LOG_INFO("key frame_nb: {}", frame_nb);
144   - // } else {
145   - // newFrame->isKeyFrame = false;
146   - // }
147   -
148   - std::lock_guard<std::mutex> l_info(m_recoderinfo_list_mtx);
149   - if(m_recoderinfo_list.size() <= 0){
150   - // 没有任务的时候,维持500的长度
151   - while(m_frame_list.size() > 1000) {
152   - DataFrame* dataPkt = m_frame_list.front();
153   - delete dataPkt;
154   - dataPkt = nullptr;
155   - m_frame_list.pop_front();
156   - }
157   - }
158   -}
159   -
160   -void FFRecoderTaskManager::save_intask_frame_nb(unsigned long long frame_nb) {
161   - if(m_intask_frame_nb_list.size() <= 0) {
162   - m_intask_frame_nb_list.push_back(frame_nb);
163   - return;
164   - }
165   -
166   - for(auto it = m_intask_frame_nb_list.begin(); it != m_intask_frame_nb_list.end(); it++) {
167   - if(*it > frame_nb) {
168   - m_intask_frame_nb_list.insert(it, frame_nb);
169   - return;
170   - }
171   - }
172   -
173   - // 新 frame_nb 比所有的都大
174   - m_intask_frame_nb_list.push_back(frame_nb);
175   -}
176   -
177 117 void FFRecoderTaskManager::save_intask_recoderinfo(RecoderInfo info) {
178 118 std::lock_guard<std::mutex> l(m_recoderinfo_list_mtx);
179 119 if(m_recoderinfo_list.size() <= 0) {
... ... @@ -216,77 +156,7 @@ list&lt;DataPacket*&gt;::iterator FFRecoderTaskManager::getStartIterator(unsigned long
216 156 return m_pkt_list.begin();
217 157 }
218 158  
219   -list<DataPacket*>::iterator FFRecoderTaskManager::getEndIterator(unsigned long long frame_nb){
220   - std::lock_guard<std::mutex> l(m_pkt_list_mtx);
221   -
222   - auto it_first = m_pkt_list.end();
223   -
224   - auto it_second = m_pkt_list.begin();
225   - for(;it_second != m_pkt_list.end(); it_second++) {
226   - DataPacket* dataPkt = *it_second;
227   - if (dataPkt->isKeyFrame && dataPkt->frame_nb >= frame_nb){
228   - return it_second;
229   - }
230   - }
231   -
232   - return m_pkt_list.end();
233   -}
234   -
235   -list<DataFrame*>::iterator FFRecoderTaskManager::getStartIterator3(unsigned long long frame_nb) {
236   - std::lock_guard<std::mutex> l(m_frame_list_mtx);
237   -
238   - auto it_first = m_frame_list.begin();
239   -
240   - long long start_frame_nb = (long long)(frame_nb - 375);
241   - if(start_frame_nb <= 0) {
242   - return it_first;
243   - }
244   -
245   - auto it_second = m_frame_list.begin();
246   - for(;it_second != m_frame_list.end(); it_second++) {
247   - DataFrame* dataPkt = *it_second;
248   - if (dataPkt->frame_nb >= start_frame_nb){
249   - return it_second;
250   - }
251   - }
252   -
253   - return m_frame_list.begin();
254   -}
255   -
256   -// 多线程版
257   -void FFRecoderTaskManager::create_recode_task(AVRational time_base, AVCodecContext* avctx, RecoderInfo& recoderInfo){
258   -
259   - std::lock_guard<std::mutex> l(m_task_creat_mtx);
260   -
261   - RecodeParam recodeParam;
262   - recodeParam.time_base = time_base;
263   - recodeParam.avctx = avctx;
264   - recodeParam.recoderInfo = recoderInfo;
265   -
266   - RecodeThreadParam* threadParam = new RecodeThreadParam();
267   - threadParam->_this = this;
268   - threadParam->param = recodeParam;
269   - std::thread* recode_thread = new std::thread(
270   - [](void* arg) {
271   - RecodeThreadParam* threadParam = (RecodeThreadParam*)arg;
272   - if(threadParam != nullptr){
273   - FFRecoderTaskManager* _this=(FFRecoderTaskManager*)threadParam->_this;
274   - if(_this != nullptr) {
275   - _this->recode_thread(threadParam->param);
276   - }else{
277   - LOG_ERROR("recode 线程启动失败 !");
278   - }
279   - }
280   - return (void*)0;
281   - }, threadParam);
282   -
283   - std::string id = recoderInfo.task_id + "_" + recoderInfo.object_id + "_" + std::to_string(recoderInfo.frame_nb);
284   - m_id_recoderTask[id] = recode_thread;
285   -
286   - save_intask_frame_nb(recoderInfo.frame_nb);
287   -}
288   -
289   -void FFRecoderTaskManager::create_recode_task2(RecoderInfo& recoderInfo) {
  159 +void FFRecoderTaskManager::create_recode_task(RecoderInfo& recoderInfo) {
290 160 if(m_bExit) {
291 161 // 任务退出了就不再接收录制任务
292 162 return;
... ... @@ -294,44 +164,8 @@ void FFRecoderTaskManager::create_recode_task2(RecoderInfo&amp; recoderInfo) {
294 164 save_intask_recoderinfo(recoderInfo);
295 165 }
296 166  
297   -// 多线程版
298   -void FFRecoderTaskManager::recode_thread(RecodeParam recodeParam){
299   - {
300   - // 此处确保create_recode_task执行完成,m_id_recoderTask 已经保存当前线程信息
301   - std::lock_guard<std::mutex> l(m_task_creat_mtx);
302   - }
303   -
304   - RecoderInfo recoderInfo;
305   - recoderInfo = recodeParam.recoderInfo;
306   - std::string id = recoderInfo.task_id + "_" + recoderInfo.object_id + "_" + std::to_string(recoderInfo.frame_nb);
307   - string file_name = recoderInfo.recoderPath;
308   - FFRecoder ffrecoder;
309   - bool bInit = ffrecoder.init(m_inStream, recodeParam.avctx, file_name.c_str());
310   - if (!bInit) {
311   - LOG_ERROR("ffrecoder init error : {} {} {}", recoderInfo.task_id, recoderInfo.object_id, recoderInfo.frame_nb);
312   - m_id_recoderTask.erase(id);
313   - return;
314   - }
315   - LOG_INFO("record start, pkt_list size: {} id: {}", m_pkt_list.size(), id);
316   -
317   - int count = 0;
318   - for (auto it = m_pkt_list.begin(); it != m_pkt_list.end() && count < 500; ++it) {
319   - DataPacket* dataPkt = *it;
320   - AVPacket* pkt = dataPkt->pkt;
321   - ffrecoder.write_pkt(pkt);
322   - count++;
323   - }
324   -
325   - ffrecoder.flush();
326   - ffrecoder.uninit();
327   -
328   - m_id_recoderTask.erase(id);
329   -
330   - LOG_INFO("record end : {}", file_name);
331   -}
332   -
333   -void FFRecoderTaskManager::recode_thread2() {
334   - LOG_INFO("recode_thread2 start...");
  167 +void FFRecoderTaskManager::recode_thread() {
  168 + LOG_INFO("recode_thread start...");
335 169 while(true) {
336 170 if(m_bExit) {
337 171 break;
... ... @@ -361,10 +195,6 @@ void FFRecoderTaskManager::recode_thread2() {
361 195 auto it = m_pkt_list.begin();
362 196 while (it != it_data) {
363 197 DataPacket* dataPkt = m_pkt_list.front();
364   - // if(dataPkt->pkt != nullptr) {
365   - // av_packet_free(&dataPkt->pkt);
366   - // dataPkt->pkt = nullptr;
367   - // }
368 198 delete dataPkt;
369 199 dataPkt = nullptr;
370 200 m_pkt_list.pop_front();
... ... @@ -417,157 +247,7 @@ void FFRecoderTaskManager::recode_thread2() {
417 247 LOG_INFO("record end, total save: {} start_frame_nb: {} end_frame_nb: {} file_path: {}", count, start_frame_nb, end_frame_nb, file_name);
418 248 }
419 249  
420   - LOG_INFO("recode_thread2 end.");
421   -}
422   -
423   -void FFRecoderTaskManager::recode_thread3() {
424   - LOG_INFO("recode_thread3 start...");
425   - while(true) {
426   - if(m_bExit) {
427   - break;
428   - }
429   -
430   - m_recoderinfo_list_mtx.lock();
431   - if(m_recoderinfo_list.size() <= 0){
432   - m_recoderinfo_list_mtx.unlock();
433   - std::this_thread::sleep_for(std::chrono::milliseconds(3));
434   - continue;
435   - }
436   -
437   - auto it_param = m_recoderinfo_list.begin();
438   - RecoderInfo recoderinfo = *it_param;
439   - m_recoderinfo_list.pop_front();
440   - m_recoderinfo_list_mtx.unlock();
441   -
442   - auto it_data = getStartIterator3(recoderinfo.frame_nb);
443   - if(it_data == m_frame_list.end()) {
444   - std::this_thread::sleep_for(std::chrono::milliseconds(3));
445   - continue;
446   - }
447   -
448   - LOG_INFO("start frame_nb: {}", (*it_data)->frame_nb);
449   -
450   - m_frame_list_mtx.lock();
451   - auto it = m_frame_list.begin();
452   - while (it != it_data) {
453   - DataFrame* dataPkt = m_frame_list.front();
454   - delete dataPkt;
455   - dataPkt = nullptr;
456   - m_frame_list.pop_front();
457   - it = m_frame_list.begin();
458   - }
459   - m_frame_list_mtx.unlock();
460   -
461   - std::string id = recoderinfo.task_id + "_" + recoderinfo.object_id + "_" + std::to_string(recoderinfo.frame_nb);
462   - string file_name = recoderinfo.recoderPath;
463   - FFRecoder ffrecoder;
464   - bool bInit = ffrecoder.init(m_avctx->width, m_avctx->height, m_time_base, m_avctx, file_name.c_str());
465   - if (!bInit) {
466   - LOG_ERROR("ffrecoder init error : {} {} {}", recoderinfo.task_id, recoderinfo.object_id, recoderinfo.frame_nb);
467   - ffrecoder.uninit();
468   - continue;
469   - }
470   - LOG_INFO("record start, pkt_list size: {} id: {}", m_frame_list.size(), id);
471   -
472   - int count = 0;
473   - auto it_save = it_data;
474   - unsigned long long start_frame_nb = (*it_data)->frame_nb;
475   - unsigned long long end_frame_nb = (*it_data)->frame_nb;
476   - for (; it_save != m_frame_list.end() && count < 500; ++it_save) {
477   - DataFrame* dataPkt = *it_save;
478   - AVFrame* pkt = dataPkt->frame;
479   - ffrecoder.write_frame(pkt);
480   - count++;
481   - end_frame_nb = (*it_save)->frame_nb;
482   - }
483   -
484   - // ffrecoder.flush();
485   - ffrecoder.uninit();
486   -
487   - // 发送mq消息
488   - if(mq_publish_func) {
489   - mq_publish_func(recoderinfo.mq_info.c_str());
490   - }
491   -
492   - LOG_INFO("record end, total save: {} start_frame_nb: {} end_frame_nb: {} file_path: {}", count, start_frame_nb, end_frame_nb, file_name);
493   - }
494   -
495   - LOG_INFO("recode_thread3 end.");
496   -}
497   -
498   -void FFRecoderTaskManager::recode_thread4() {
499   - LOG_INFO("recode_thread4 start...");
500   - while(true) {
501   - if(m_bExit) {
502   - break;
503   - }
504   -
505   - m_recoderinfo_list_mtx.lock();
506   - if(m_recoderinfo_list.size() <= 0){
507   - m_recoderinfo_list_mtx.unlock();
508   - std::this_thread::sleep_for(std::chrono::milliseconds(3));
509   - continue;
510   - }
511   -
512   - auto it_param = m_recoderinfo_list.begin();
513   - RecoderInfo recoderinfo = *it_param;
514   - m_recoderinfo_list.pop_front();
515   - m_recoderinfo_list_mtx.unlock();
516   -
517   - auto it_data = getStartIterator(recoderinfo.frame_nb);
518   - if(it_data == m_pkt_list.end()) {
519   - std::this_thread::sleep_for(std::chrono::milliseconds(3));
520   - continue;
521   - }
522   -
523   - LOG_INFO("start frame_nb: {}", (*it_data)->frame_nb);
524   -
525   - m_pkt_list_mtx.lock();
526   - auto it = m_pkt_list.begin();
527   - while (it != it_data) {
528   - DataPacket* dataPkt = m_pkt_list.front();
529   - delete dataPkt;
530   - dataPkt = nullptr;
531   - m_pkt_list.pop_front();
532   - it = m_pkt_list.begin();
533   - }
534   - m_pkt_list_mtx.unlock();
535   -
536   - std::string id = recoderinfo.task_id + "_" + recoderinfo.object_id + "_" + std::to_string(recoderinfo.frame_nb);
537   - string file_name = recoderinfo.recoderPath;
538   - FFRecoder ffrecoder;
539   - bool bInit = ffrecoder.init(m_inStream, m_avctx, file_name.c_str());
540   - if (!bInit) {
541   - LOG_ERROR("ffrecoder init error : {} {} {}", recoderinfo.task_id, recoderinfo.object_id, recoderinfo.frame_nb);
542   - ffrecoder.uninit();
543   - continue;
544   - }
545   - LOG_INFO("record start, pkt_list size: {} id: {}", m_pkt_list.size(), id);
546   -
547   - int count = 0;
548   - auto it_save = it_data;
549   - unsigned long long start_frame_nb = (*it_data)->frame_nb;
550   - unsigned long long end_frame_nb = (*it_data)->frame_nb;
551   - for (; it_save != m_pkt_list.end() && count < 500; ++it_save) {
552   - DataPacket* dataPkt = *it_save;
553   - AVPacket* pkt = dataPkt->pkt;
554   - ffrecoder.write_pkt(pkt);
555   - count++;
556   - end_frame_nb = (*it_save)->frame_nb;
557   - }
558   -
559   - // ffrecoder.flush();
560   - ffrecoder.uninit();
561   -
562   - // 发送mq消息
563   - if(mq_publish_func) {
564   - mq_publish_func(recoderinfo.mq_info.c_str());
565   - }
566   -
567   - LOG_INFO("record end, total save: {} start_frame_nb: {} end_frame_nb: {} file_path: {}", count, start_frame_nb, end_frame_nb, file_name);
568   - }
569   -
570   - LOG_INFO("recode_thread4 end.");
  250 + LOG_INFO("recode_thread end.");
571 251 }
572 252  
573 253 void FFRecoderTaskManager::close() {
... ...
src/decoder/dvpp/FFRecoderTaskManager.h
... ... @@ -26,32 +26,20 @@ public:
26 26 virtual ~FFRecoderTaskManager();
27 27  
28 28 void cache_pkt(AVPacket* pkt, long long frame_nb);
29   - void create_recode_task(AVRational time_base, AVCodecContext* avctx, RecoderInfo& recoderInfo);
30 29  
31 30 bool init(AVStream* stream, AVCodecContext* avctx);
32   - void create_recode_task2(RecoderInfo& recoderInfo);
  31 + void create_recode_task(RecoderInfo& recoderInfo);
33 32  
34 33 void close();
35 34  
36 35 void set_mq_callback(mq_callback_t cb);
37 36  
38   - bool init3(AVRational time_base, AVCodecContext* avctx);
39   - void cache_frame(AVFrame* frame, long long frame_nb);
40   -
41 37 public:
42   - void recode_thread(RecodeParam param);
43   - list<DataPacket*>::iterator getStartIterator(unsigned long long frame_nb);
44   - list<DataPacket*>::iterator getEndIterator(unsigned long long frame_nb);
45   -
46   - void recode_thread2();
47   - void recode_thread4();
48   -
49   - list<DataFrame*>::iterator getStartIterator3(unsigned long long frame_nb);
50   - void recode_thread3();
  38 + void recode_thread();
51 39  
52 40 private:
53   - void save_intask_frame_nb(unsigned long long frame_nb);
54 41 void save_intask_recoderinfo(RecoderInfo info);
  42 + list<DataPacket*>::iterator getStartIterator(unsigned long long frame_nb);
55 43  
56 44 private:
57 45 std::queue<int> m_key_frame_interval;
... ...