From 58f76d3f6d5a55c3cb6e45f9e9afa22d36ca3e9b Mon Sep 17 00:00:00 2001 From: cmhu <2657262686@qq.com> Date: Fri, 12 Apr 2024 16:41:05 +0800 Subject: [PATCH] 改用FFRecoder2 --- src/decoder/dvpp/DvppDecoder.cpp | 2 +- src/decoder/dvpp/FFRecoder2.cpp | 10 +++++++++- src/decoder/dvpp/FFRecoder2.h | 2 ++ src/decoder/dvpp/FFRecoderTaskManager.cpp | 136 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------------------ src/decoder/dvpp/FFRecoderTaskManager.h | 7 ++++++- 5 files changed, 112 insertions(+), 45 deletions(-) diff --git a/src/decoder/dvpp/DvppDecoder.cpp b/src/decoder/dvpp/DvppDecoder.cpp index bc16f29..a790466 100644 --- a/src/decoder/dvpp/DvppDecoder.cpp +++ b/src/decoder/dvpp/DvppDecoder.cpp @@ -197,7 +197,7 @@ AVCodecContext* DvppDecoder::init_FFmpeg(FFDecConfig config){ m_vdec_out_size = frame_width * frame_height * 3 / 2; #ifdef USE_VILLAGE - bool bRet = m_recoderManager.init(stream, avctx); + bool bRet = m_recoderManager.init2(frame_width, frame_height, m_fps, avctx->bit_rate); if (!bRet){ LOG_ERROR("[{}]- m_recoderManager 初始化失败!", m_dec_name); } diff --git a/src/decoder/dvpp/FFRecoder2.cpp b/src/decoder/dvpp/FFRecoder2.cpp index caefb3b..f9dd526 100644 --- a/src/decoder/dvpp/FFRecoder2.cpp +++ b/src/decoder/dvpp/FFRecoder2.cpp @@ -72,7 +72,8 @@ bool FFRecoder2::init(int w, int h, int fps, int bit_rate, const char* outfile_n av_opt_set(codec_ctx_->priv_data, "tune", "zerolatency", 0); // 打开解码器 - if (avcodec_open2(codec_ctx_, encoder, nullptr) < 0) { + int ret = avcodec_open2(codec_ctx_, encoder, nullptr); + if (ret < 0) { fprintf(stderr, "Open encoder failed!\n"); return false; } @@ -206,7 +207,14 @@ bool FFRecoder2::write_frame(const AVFrame* frame) return true; } +static double a2d(AVRational a) { + return a.den / a.num; +} + bool FFRecoder2::write_pkt(AVPacket* pkt) { + frame_nb++; + pkt->duration = int(a2d(codec_ctx_->time_base)); + pkt->pts = frame_nb; // 将pts缩放到输出流的time_base上 av_packet_rescale_ts(pkt, codec_ctx_->time_base, out_stream_->time_base); pkt->stream_index = out_stream_->index; diff --git a/src/decoder/dvpp/FFRecoder2.h b/src/decoder/dvpp/FFRecoder2.h index 2ac361c..309aa81 100644 --- a/src/decoder/dvpp/FFRecoder2.h +++ b/src/decoder/dvpp/FFRecoder2.h @@ -35,4 +35,6 @@ private: AVFormatContext* fmt_ctx_; AVStream* out_stream_; AVFrame* yuv_frame_; + + int frame_nb{0}; }; \ No newline at end of file diff --git a/src/decoder/dvpp/FFRecoderTaskManager.cpp b/src/decoder/dvpp/FFRecoderTaskManager.cpp index e6fd545..6e5dfd7 100644 --- a/src/decoder/dvpp/FFRecoderTaskManager.cpp +++ b/src/decoder/dvpp/FFRecoderTaskManager.cpp @@ -31,9 +31,16 @@ static bool is_key_frame(AVPacket *pkt) { FFRecoderTaskManager::FFRecoderTaskManager(){ m_recoder_thread = nullptr; + m_cache_thread = nullptr; + + m_bExit = false; + m_bExitRecoderThread = false; } FFRecoderTaskManager::~FFRecoderTaskManager(){ + + close(); + LOG_DEBUG("~FFRecoderTaskManager()"); } @@ -71,11 +78,11 @@ bool FFRecoderTaskManager::init2(int w, int h, int fps, int bit_rate) { m_fps = 25; } - m_recoder_thread = new std::thread( + m_cache_thread = new std::thread( [](void* arg) { FFRecoderTaskManager* _this=(FFRecoderTaskManager*)arg; if(_this != nullptr) { - _this->recode_thread2(); + _this->pkt_cache_thread(); }else{ LOG_ERROR("recode 线程启动失败 !"); } @@ -89,16 +96,15 @@ static AVPacket* packet_clone(AVPacket* pkt) { AVPacket *new_pkt = av_packet_alloc(); av_init_packet( new_pkt ); av_new_packet(new_pkt, pkt->size); - // new_pkt->data = (uint8_t *)av_malloc(pkt->size) ; memcpy(new_pkt->data, pkt->data, pkt->size); new_pkt->size = pkt->size; - new_pkt->pts = pkt->pts; - new_pkt->dts = pkt->dts; - new_pkt->stream_index = pkt->stream_index; - new_pkt->duration = pkt->duration; - new_pkt->pos = pkt->pos; - new_pkt->flags = pkt->flags; - av_copy_packet_side_data(new_pkt, pkt); + // new_pkt->pts = pkt->pts; + // new_pkt->dts = pkt->dts; + // new_pkt->stream_index = pkt->stream_index; + // new_pkt->duration = pkt->duration; + // new_pkt->pos = pkt->pos; + // new_pkt->flags = pkt->flags; + // av_copy_packet_side_data(new_pkt, pkt); return new_pkt; } @@ -124,19 +130,9 @@ void FFRecoderTaskManager::cache_pkt(AVPacket* pkt, long long frame_nb){ } else { newDataPkt->isKeyFrame = false; } - - m_pkt_list.emplace_back(newDataPkt); - - std::lock_guard l_info(m_recoderinfo_list_mtx); - if(m_recoderinfo_list.size() <= 0){ - // 没有任务的时候,维持500的长度 - while(m_pkt_list.size() > 1000) { - DataPacket* dataPkt = m_pkt_list.front(); - delete dataPkt; - dataPkt = nullptr; - m_pkt_list.pop_front(); - } - } + + std::lock_guard l_info(m_pkt_list_short_mtx); + m_pkt_list_short.push_back(newDataPkt); } void FFRecoderTaskManager::save_intask_recoderinfo(RecoderInfo info) { @@ -284,11 +280,76 @@ void FFRecoderTaskManager::recode_thread() { LOG_INFO("recode_thread end."); } +void FFRecoderTaskManager::pkt_cache_thread() { + LOG_INFO("pkt_cache_thread start..."); + + m_recoder_thread = new std::thread( + [](void* arg) { + FFRecoderTaskManager* _this=(FFRecoderTaskManager*)arg; + if(_this != nullptr) { + _this->recode_thread2(); + }else{ + LOG_ERROR("recode 线程启动失败 !"); + } + return (void*)0; + }, this); + + // 开始缓存 + while(true) { + if(m_bExit) { + break; + } + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + m_pkt_list_short_mtx.lock(); + m_pkt_list_mtx.lock(); + for (size_t i = 0; i < m_pkt_list_short.size(); i++) { + auto item = m_pkt_list_short.front(); + m_pkt_list.push_back(item); + m_pkt_list_short.pop_front(); + } + m_pkt_list_mtx.unlock(); + m_pkt_list_short_mtx.unlock(); + + + std::lock_guard l_info(m_recoderinfo_list_mtx); + if(m_recoderinfo_list.size() <= 0){ + // 没有任务的时候,维持500的长度 + m_pkt_list_mtx.lock(); + while(m_pkt_list.size() > 1000) { + DataPacket* dataPkt = m_pkt_list.front(); + delete dataPkt; + dataPkt = nullptr; + m_pkt_list.pop_front(); + } + m_pkt_list_mtx.unlock(); + } + } + + m_bExitRecoderThread = true; + + if (m_recoder_thread) { + m_recoder_thread->join(); + delete m_recoder_thread; + m_recoder_thread = nullptr; + } + + // 清空数据 + while(!m_pkt_list.empty()) { + DataPacket* dataPkt = m_pkt_list.front(); + delete dataPkt; + dataPkt = nullptr; + m_pkt_list.pop_front(); + } + + LOG_INFO("pkt_cache_thread end."); +} void FFRecoderTaskManager::recode_thread2() { LOG_INFO("recode_thread2 start..."); while(true) { - if(m_bExit) { + if(m_bExitRecoderThread) { break; } @@ -313,16 +374,7 @@ void FFRecoderTaskManager::recode_thread2() { LOG_INFO("start frame_nb: {}", (*it_data)->frame_nb); - m_pkt_list_mtx.lock(); - auto it = m_pkt_list.begin(); - while (it != it_data) { - DataPacket* dataPkt = m_pkt_list.front(); - delete dataPkt; - dataPkt = nullptr; - m_pkt_list.pop_front(); - it = m_pkt_list.begin(); - } - m_pkt_list_mtx.unlock(); + std::lock_guard l_long(m_pkt_list_mtx); string file_name = recoderinfo.recoderPath; @@ -377,24 +429,24 @@ void FFRecoderTaskManager::recode_thread2() { m_recoderinfo_list_mtx.unlock(); } - LOG_INFO("recode_thread end."); + LOG_INFO("recode_thread2 end."); } void FFRecoderTaskManager::close() { m_bExit = true; - if (m_recoder_thread) { - m_recoder_thread->join(); - delete m_recoder_thread; - m_recoder_thread = nullptr; + if (m_cache_thread) { + m_cache_thread->join(); + delete m_cache_thread; + m_cache_thread = nullptr; } // 清空数据 - while(!m_pkt_list.empty()) { - DataPacket* dataPkt = m_pkt_list.front(); + while(!m_pkt_list_short.empty()) { + DataPacket* dataPkt = m_pkt_list_short.front(); delete dataPkt; dataPkt = nullptr; - m_pkt_list.pop_front(); + m_pkt_list_short.pop_front(); } } diff --git a/src/decoder/dvpp/FFRecoderTaskManager.h b/src/decoder/dvpp/FFRecoderTaskManager.h index 32fa621..b9fcc8d 100644 --- a/src/decoder/dvpp/FFRecoderTaskManager.h +++ b/src/decoder/dvpp/FFRecoderTaskManager.h @@ -39,6 +39,7 @@ public: public: void recode_thread(); void recode_thread2(); + void pkt_cache_thread(); private: void save_intask_recoderinfo(RecoderInfo info); @@ -47,19 +48,23 @@ private: private: std::queue m_key_frame_interval; std::list m_keyframe_nb_list; - std::list m_pkt_list; + std::list m_pkt_list_short; // 临时缓存 + mutex m_pkt_list_short_mtx; + std::list m_pkt_list; //主要缓存 mutex m_pkt_list_mtx; std::list m_recoderinfo_list; mutex m_recoderinfo_list_mtx; bool m_bExit{false}; + bool m_bExitRecoderThread{false}; AVRational m_time_base; AVCodecContext* m_avctx; AVStream* m_inStream; thread* m_recoder_thread{nullptr}; + thread* m_cache_thread{nullptr}; mq_callback_t mq_publish_func; -- libgit2 0.21.4