From 40ac568f6168b0d58ddd0e0de85d7167271ecd4c Mon Sep 17 00:00:00 2001 From: fiss <2657262686@qq.com> Date: Wed, 30 Aug 2023 15:19:16 +0800 Subject: [PATCH] 修改AVPacket复制方式,避免泄漏;删除无用代码 --- src/decoder/dvpp/FFRecoderTaskManager.cpp | 382 +++++++++++++++++++++++++++++++--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- src/decoder/dvpp/FFRecoderTaskManager.h | 18 +++--------------- 2 files changed, 34 insertions(+), 366 deletions(-) diff --git a/src/decoder/dvpp/FFRecoderTaskManager.cpp b/src/decoder/dvpp/FFRecoderTaskManager.cpp index 7847f10..d83f1c1 100644 --- a/src/decoder/dvpp/FFRecoderTaskManager.cpp +++ b/src/decoder/dvpp/FFRecoderTaskManager.cpp @@ -46,36 +46,35 @@ bool FFRecoderTaskManager::init(AVStream* stream, AVCodecContext* avctx){ m_avctx = avctx; m_inStream = stream; - m_recoder_thread = new std::thread( - [](void* arg) { - FFRecoderTaskManager* _this=(FFRecoderTaskManager*)arg; - if(_this != nullptr) { - _this->recode_thread2(); - }else{ - LOG_ERROR("recode 线程启动失败 !"); - } - return (void*)0; - }, this); + // m_recoder_thread = new std::thread( + // [](void* arg) { + // FFRecoderTaskManager* _this=(FFRecoderTaskManager*)arg; + // if(_this != nullptr) { + // _this->recode_thread(); + // }else{ + // LOG_ERROR("recode 线程启动失败 !"); + // } + // return (void*)0; + // }, this); return true; } -bool FFRecoderTaskManager::init3(AVRational time_base, AVCodecContext* avctx){ - m_time_base = time_base; - m_avctx = avctx; - - m_recoder_thread = new std::thread( - [](void* arg) { - FFRecoderTaskManager* _this=(FFRecoderTaskManager*)arg; - if(_this != nullptr) { - _this->recode_thread3(); - }else{ - LOG_ERROR("recode 线程启动失败 !"); - } - return (void*)0; - }, this); - - return true; +static AVPacket* packet_clone(AVPacket* pkt) { + AVPacket *new_pkt = av_packet_alloc(); + av_init_packet( new_pkt ); + av_new_packet(new_pkt, pkt->size); + // new_pkt->data = (uint8_t *)av_malloc(pkt->size) ; + memcpy(new_pkt->data, pkt->data, pkt->size); + new_pkt->size = pkt->size; + new_pkt->pts = pkt->pts; + new_pkt->dts = pkt->dts; + new_pkt->stream_index = pkt->stream_index; + new_pkt->duration = pkt->duration; + new_pkt->pos = pkt->pos; + new_pkt->flags = pkt->flags; + av_copy_packet_side_data(new_pkt, pkt); + return new_pkt; } void FFRecoderTaskManager::cache_pkt(AVPacket* pkt, long long frame_nb){ @@ -87,18 +86,12 @@ void FFRecoderTaskManager::cache_pkt(AVPacket* pkt, long long frame_nb){ std::lock_guard l_pkt(m_pkt_list_mtx); // 考虑到一个AVPacket中的数据并不很大,为减少与解码模块的耦合度,方便管理,这里做一个clone - AVPacket *new_pkt = av_packet_alloc(); - av_init_packet( new_pkt ); - new_pkt->data = (uint8_t *)av_malloc(pkt->size) ; - memcpy(new_pkt->data, pkt->data, pkt->size); - new_pkt->size = pkt->size; - new_pkt->pts = pkt->pts; - new_pkt->dts = pkt->dts; - av_copy_packet_side_data(new_pkt, pkt); + AVPacket *new_pkt = packet_clone(pkt); DataPacket* newDataPkt = new DataPacket(); newDataPkt->pkt = new_pkt; newDataPkt->frame_nb = frame_nb; + m_pkt_list.emplace_back(newDataPkt); if(is_key_frame(pkt)){ @@ -121,59 +114,6 @@ void FFRecoderTaskManager::cache_pkt(AVPacket* pkt, long long frame_nb){ } } -void FFRecoderTaskManager::cache_frame(AVFrame* frame, long long frame_nb){ - if(m_bExit) { - // 任务退出了就不再缓存数据了 - return; - } - - std::lock_guard l_pkt(m_frame_list_mtx); - - // 考虑到一个AVPacket中的数据并不很大,为减少与解码模块的耦合度,方便管理,这里做一个clone - AVFrame *new_pkt = av_frame_clone(frame); - - DataFrame* newFrame = new DataFrame(); - newFrame->frame = new_pkt; - newFrame->frame_nb = frame_nb; - m_frame_list.emplace_back(newFrame); - - // if(is_key_frame(pkt)){ - // // 越来越大的值 - // newFrame->isKeyFrame = true; - // LOG_INFO("key frame_nb: {}", frame_nb); - // } else { - // newFrame->isKeyFrame = false; - // } - - std::lock_guard l_info(m_recoderinfo_list_mtx); - if(m_recoderinfo_list.size() <= 0){ - // 没有任务的时候,维持500的长度 - while(m_frame_list.size() > 1000) { - DataFrame* dataPkt = m_frame_list.front(); - delete dataPkt; - dataPkt = nullptr; - m_frame_list.pop_front(); - } - } -} - -void FFRecoderTaskManager::save_intask_frame_nb(unsigned long long frame_nb) { - if(m_intask_frame_nb_list.size() <= 0) { - m_intask_frame_nb_list.push_back(frame_nb); - return; - } - - for(auto it = m_intask_frame_nb_list.begin(); it != m_intask_frame_nb_list.end(); it++) { - if(*it > frame_nb) { - m_intask_frame_nb_list.insert(it, frame_nb); - return; - } - } - - // 新 frame_nb 比所有的都大 - m_intask_frame_nb_list.push_back(frame_nb); -} - void FFRecoderTaskManager::save_intask_recoderinfo(RecoderInfo info) { std::lock_guard l(m_recoderinfo_list_mtx); if(m_recoderinfo_list.size() <= 0) { @@ -216,77 +156,7 @@ list::iterator FFRecoderTaskManager::getStartIterator(unsigned long return m_pkt_list.begin(); } -list::iterator FFRecoderTaskManager::getEndIterator(unsigned long long frame_nb){ - std::lock_guard l(m_pkt_list_mtx); - - auto it_first = m_pkt_list.end(); - - auto it_second = m_pkt_list.begin(); - for(;it_second != m_pkt_list.end(); it_second++) { - DataPacket* dataPkt = *it_second; - if (dataPkt->isKeyFrame && dataPkt->frame_nb >= frame_nb){ - return it_second; - } - } - - return m_pkt_list.end(); -} - -list::iterator FFRecoderTaskManager::getStartIterator3(unsigned long long frame_nb) { - std::lock_guard l(m_frame_list_mtx); - - auto it_first = m_frame_list.begin(); - - long long start_frame_nb = (long long)(frame_nb - 375); - if(start_frame_nb <= 0) { - return it_first; - } - - auto it_second = m_frame_list.begin(); - for(;it_second != m_frame_list.end(); it_second++) { - DataFrame* dataPkt = *it_second; - if (dataPkt->frame_nb >= start_frame_nb){ - return it_second; - } - } - - return m_frame_list.begin(); -} - -// 多线程版 -void FFRecoderTaskManager::create_recode_task(AVRational time_base, AVCodecContext* avctx, RecoderInfo& recoderInfo){ - - std::lock_guard l(m_task_creat_mtx); - - RecodeParam recodeParam; - recodeParam.time_base = time_base; - recodeParam.avctx = avctx; - recodeParam.recoderInfo = recoderInfo; - - RecodeThreadParam* threadParam = new RecodeThreadParam(); - threadParam->_this = this; - threadParam->param = recodeParam; - std::thread* recode_thread = new std::thread( - [](void* arg) { - RecodeThreadParam* threadParam = (RecodeThreadParam*)arg; - if(threadParam != nullptr){ - FFRecoderTaskManager* _this=(FFRecoderTaskManager*)threadParam->_this; - if(_this != nullptr) { - _this->recode_thread(threadParam->param); - }else{ - LOG_ERROR("recode 线程启动失败 !"); - } - } - return (void*)0; - }, threadParam); - - std::string id = recoderInfo.task_id + "_" + recoderInfo.object_id + "_" + std::to_string(recoderInfo.frame_nb); - m_id_recoderTask[id] = recode_thread; - - save_intask_frame_nb(recoderInfo.frame_nb); -} - -void FFRecoderTaskManager::create_recode_task2(RecoderInfo& recoderInfo) { +void FFRecoderTaskManager::create_recode_task(RecoderInfo& recoderInfo) { if(m_bExit) { // 任务退出了就不再接收录制任务 return; @@ -294,44 +164,8 @@ void FFRecoderTaskManager::create_recode_task2(RecoderInfo& recoderInfo) { save_intask_recoderinfo(recoderInfo); } -// 多线程版 -void FFRecoderTaskManager::recode_thread(RecodeParam recodeParam){ - { - // 此处确保create_recode_task执行完成,m_id_recoderTask 已经保存当前线程信息 - std::lock_guard l(m_task_creat_mtx); - } - - RecoderInfo recoderInfo; - recoderInfo = recodeParam.recoderInfo; - std::string id = recoderInfo.task_id + "_" + recoderInfo.object_id + "_" + std::to_string(recoderInfo.frame_nb); - string file_name = recoderInfo.recoderPath; - FFRecoder ffrecoder; - bool bInit = ffrecoder.init(m_inStream, recodeParam.avctx, file_name.c_str()); - if (!bInit) { - LOG_ERROR("ffrecoder init error : {} {} {}", recoderInfo.task_id, recoderInfo.object_id, recoderInfo.frame_nb); - m_id_recoderTask.erase(id); - return; - } - LOG_INFO("record start, pkt_list size: {} id: {}", m_pkt_list.size(), id); - - int count = 0; - for (auto it = m_pkt_list.begin(); it != m_pkt_list.end() && count < 500; ++it) { - DataPacket* dataPkt = *it; - AVPacket* pkt = dataPkt->pkt; - ffrecoder.write_pkt(pkt); - count++; - } - - ffrecoder.flush(); - ffrecoder.uninit(); - - m_id_recoderTask.erase(id); - - LOG_INFO("record end : {}", file_name); -} - -void FFRecoderTaskManager::recode_thread2() { - LOG_INFO("recode_thread2 start..."); +void FFRecoderTaskManager::recode_thread() { + LOG_INFO("recode_thread start..."); while(true) { if(m_bExit) { break; @@ -361,10 +195,6 @@ void FFRecoderTaskManager::recode_thread2() { auto it = m_pkt_list.begin(); while (it != it_data) { DataPacket* dataPkt = m_pkt_list.front(); - // if(dataPkt->pkt != nullptr) { - // av_packet_free(&dataPkt->pkt); - // dataPkt->pkt = nullptr; - // } delete dataPkt; dataPkt = nullptr; m_pkt_list.pop_front(); @@ -417,157 +247,7 @@ void FFRecoderTaskManager::recode_thread2() { LOG_INFO("record end, total save: {} start_frame_nb: {} end_frame_nb: {} file_path: {}", count, start_frame_nb, end_frame_nb, file_name); } - LOG_INFO("recode_thread2 end."); -} - -void FFRecoderTaskManager::recode_thread3() { - LOG_INFO("recode_thread3 start..."); - while(true) { - if(m_bExit) { - break; - } - - m_recoderinfo_list_mtx.lock(); - if(m_recoderinfo_list.size() <= 0){ - m_recoderinfo_list_mtx.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(3)); - continue; - } - - auto it_param = m_recoderinfo_list.begin(); - RecoderInfo recoderinfo = *it_param; - m_recoderinfo_list.pop_front(); - m_recoderinfo_list_mtx.unlock(); - - auto it_data = getStartIterator3(recoderinfo.frame_nb); - if(it_data == m_frame_list.end()) { - std::this_thread::sleep_for(std::chrono::milliseconds(3)); - continue; - } - - LOG_INFO("start frame_nb: {}", (*it_data)->frame_nb); - - m_frame_list_mtx.lock(); - auto it = m_frame_list.begin(); - while (it != it_data) { - DataFrame* dataPkt = m_frame_list.front(); - delete dataPkt; - dataPkt = nullptr; - m_frame_list.pop_front(); - it = m_frame_list.begin(); - } - m_frame_list_mtx.unlock(); - - std::string id = recoderinfo.task_id + "_" + recoderinfo.object_id + "_" + std::to_string(recoderinfo.frame_nb); - string file_name = recoderinfo.recoderPath; - FFRecoder ffrecoder; - bool bInit = ffrecoder.init(m_avctx->width, m_avctx->height, m_time_base, m_avctx, file_name.c_str()); - if (!bInit) { - LOG_ERROR("ffrecoder init error : {} {} {}", recoderinfo.task_id, recoderinfo.object_id, recoderinfo.frame_nb); - ffrecoder.uninit(); - continue; - } - LOG_INFO("record start, pkt_list size: {} id: {}", m_frame_list.size(), id); - - int count = 0; - auto it_save = it_data; - unsigned long long start_frame_nb = (*it_data)->frame_nb; - unsigned long long end_frame_nb = (*it_data)->frame_nb; - for (; it_save != m_frame_list.end() && count < 500; ++it_save) { - DataFrame* dataPkt = *it_save; - AVFrame* pkt = dataPkt->frame; - ffrecoder.write_frame(pkt); - count++; - end_frame_nb = (*it_save)->frame_nb; - } - - // ffrecoder.flush(); - ffrecoder.uninit(); - - // 发送mq消息 - if(mq_publish_func) { - mq_publish_func(recoderinfo.mq_info.c_str()); - } - - LOG_INFO("record end, total save: {} start_frame_nb: {} end_frame_nb: {} file_path: {}", count, start_frame_nb, end_frame_nb, file_name); - } - - LOG_INFO("recode_thread3 end."); -} - -void FFRecoderTaskManager::recode_thread4() { - LOG_INFO("recode_thread4 start..."); - while(true) { - if(m_bExit) { - break; - } - - m_recoderinfo_list_mtx.lock(); - if(m_recoderinfo_list.size() <= 0){ - m_recoderinfo_list_mtx.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(3)); - continue; - } - - auto it_param = m_recoderinfo_list.begin(); - RecoderInfo recoderinfo = *it_param; - m_recoderinfo_list.pop_front(); - m_recoderinfo_list_mtx.unlock(); - - auto it_data = getStartIterator(recoderinfo.frame_nb); - if(it_data == m_pkt_list.end()) { - std::this_thread::sleep_for(std::chrono::milliseconds(3)); - continue; - } - - LOG_INFO("start frame_nb: {}", (*it_data)->frame_nb); - - m_pkt_list_mtx.lock(); - auto it = m_pkt_list.begin(); - while (it != it_data) { - DataPacket* dataPkt = m_pkt_list.front(); - delete dataPkt; - dataPkt = nullptr; - m_pkt_list.pop_front(); - it = m_pkt_list.begin(); - } - m_pkt_list_mtx.unlock(); - - std::string id = recoderinfo.task_id + "_" + recoderinfo.object_id + "_" + std::to_string(recoderinfo.frame_nb); - string file_name = recoderinfo.recoderPath; - FFRecoder ffrecoder; - bool bInit = ffrecoder.init(m_inStream, m_avctx, file_name.c_str()); - if (!bInit) { - LOG_ERROR("ffrecoder init error : {} {} {}", recoderinfo.task_id, recoderinfo.object_id, recoderinfo.frame_nb); - ffrecoder.uninit(); - continue; - } - LOG_INFO("record start, pkt_list size: {} id: {}", m_pkt_list.size(), id); - - int count = 0; - auto it_save = it_data; - unsigned long long start_frame_nb = (*it_data)->frame_nb; - unsigned long long end_frame_nb = (*it_data)->frame_nb; - for (; it_save != m_pkt_list.end() && count < 500; ++it_save) { - DataPacket* dataPkt = *it_save; - AVPacket* pkt = dataPkt->pkt; - ffrecoder.write_pkt(pkt); - count++; - end_frame_nb = (*it_save)->frame_nb; - } - - // ffrecoder.flush(); - ffrecoder.uninit(); - - // 发送mq消息 - if(mq_publish_func) { - mq_publish_func(recoderinfo.mq_info.c_str()); - } - - LOG_INFO("record end, total save: {} start_frame_nb: {} end_frame_nb: {} file_path: {}", count, start_frame_nb, end_frame_nb, file_name); - } - - LOG_INFO("recode_thread4 end."); + LOG_INFO("recode_thread end."); } void FFRecoderTaskManager::close() { diff --git a/src/decoder/dvpp/FFRecoderTaskManager.h b/src/decoder/dvpp/FFRecoderTaskManager.h index 13a556f..a0d44d4 100644 --- a/src/decoder/dvpp/FFRecoderTaskManager.h +++ b/src/decoder/dvpp/FFRecoderTaskManager.h @@ -26,32 +26,20 @@ public: virtual ~FFRecoderTaskManager(); void cache_pkt(AVPacket* pkt, long long frame_nb); - void create_recode_task(AVRational time_base, AVCodecContext* avctx, RecoderInfo& recoderInfo); bool init(AVStream* stream, AVCodecContext* avctx); - void create_recode_task2(RecoderInfo& recoderInfo); + void create_recode_task(RecoderInfo& recoderInfo); void close(); void set_mq_callback(mq_callback_t cb); - bool init3(AVRational time_base, AVCodecContext* avctx); - void cache_frame(AVFrame* frame, long long frame_nb); - public: - void recode_thread(RecodeParam param); - list::iterator getStartIterator(unsigned long long frame_nb); - list::iterator getEndIterator(unsigned long long frame_nb); - - void recode_thread2(); - void recode_thread4(); - - list::iterator getStartIterator3(unsigned long long frame_nb); - void recode_thread3(); + void recode_thread(); private: - void save_intask_frame_nb(unsigned long long frame_nb); void save_intask_recoderinfo(RecoderInfo info); + list::iterator getStartIterator(unsigned long long frame_nb); private: std::queue m_key_frame_interval; -- libgit2 0.21.4