Commit 58f76d3f6d5a55c3cb6e45f9e9afa22d36ca3e9b

Authored by Hu Chunming
1 parent 77e1339c

改用FFRecoder2

src/decoder/dvpp/DvppDecoder.cpp
... ... @@ -197,7 +197,7 @@ AVCodecContext* DvppDecoder::init_FFmpeg(FFDecConfig config){
197 197 m_vdec_out_size = frame_width * frame_height * 3 / 2;
198 198  
199 199 #ifdef USE_VILLAGE
200   - bool bRet = m_recoderManager.init(stream, avctx);
  200 + bool bRet = m_recoderManager.init2(frame_width, frame_height, m_fps, avctx->bit_rate);
201 201 if (!bRet){
202 202 LOG_ERROR("[{}]- m_recoderManager 初始化失败!", m_dec_name);
203 203 }
... ...
src/decoder/dvpp/FFRecoder2.cpp
... ... @@ -72,7 +72,8 @@ bool FFRecoder2::init(int w, int h, int fps, int bit_rate, const char* outfile_n
72 72 av_opt_set(codec_ctx_->priv_data, "tune", "zerolatency", 0);
73 73  
74 74 // 打开解码器
75   - if (avcodec_open2(codec_ctx_, encoder, nullptr) < 0) {
  75 + int ret = avcodec_open2(codec_ctx_, encoder, nullptr);
  76 + if (ret < 0) {
76 77 fprintf(stderr, "Open encoder failed!\n");
77 78 return false;
78 79 }
... ... @@ -206,7 +207,14 @@ bool FFRecoder2::write_frame(const AVFrame* frame)
206 207 return true;
207 208 }
208 209  
  210 +static double a2d(AVRational a) {
  211 + return a.den / a.num;
  212 +}
  213 +
209 214 bool FFRecoder2::write_pkt(AVPacket* pkt) {
  215 + frame_nb++;
  216 + pkt->duration = int(a2d(codec_ctx_->time_base));
  217 + pkt->pts = frame_nb;
210 218 // 将pts缩放到输出流的time_base上
211 219 av_packet_rescale_ts(pkt, codec_ctx_->time_base, out_stream_->time_base);
212 220 pkt->stream_index = out_stream_->index;
... ...
src/decoder/dvpp/FFRecoder2.h
... ... @@ -35,4 +35,6 @@ private:
35 35 AVFormatContext* fmt_ctx_;
36 36 AVStream* out_stream_;
37 37 AVFrame* yuv_frame_;
  38 +
  39 + int frame_nb{0};
38 40 };
39 41 \ No newline at end of file
... ...
src/decoder/dvpp/FFRecoderTaskManager.cpp
... ... @@ -31,9 +31,16 @@ static bool is_key_frame(AVPacket *pkt) {
31 31  
32 32 FFRecoderTaskManager::FFRecoderTaskManager(){
33 33 m_recoder_thread = nullptr;
  34 + m_cache_thread = nullptr;
  35 +
  36 + m_bExit = false;
  37 + m_bExitRecoderThread = false;
34 38 }
35 39  
36 40 FFRecoderTaskManager::~FFRecoderTaskManager(){
  41 +
  42 + close();
  43 +
37 44 LOG_DEBUG("~FFRecoderTaskManager()");
38 45 }
39 46  
... ... @@ -71,11 +78,11 @@ bool FFRecoderTaskManager::init2(int w, int h, int fps, int bit_rate) {
71 78 m_fps = 25;
72 79 }
73 80  
74   - m_recoder_thread = new std::thread(
  81 + m_cache_thread = new std::thread(
75 82 [](void* arg) {
76 83 FFRecoderTaskManager* _this=(FFRecoderTaskManager*)arg;
77 84 if(_this != nullptr) {
78   - _this->recode_thread2();
  85 + _this->pkt_cache_thread();
79 86 }else{
80 87 LOG_ERROR("recode 线程启动失败 !");
81 88 }
... ... @@ -89,16 +96,15 @@ static AVPacket* packet_clone(AVPacket* pkt) {
89 96 AVPacket *new_pkt = av_packet_alloc();
90 97 av_init_packet( new_pkt );
91 98 av_new_packet(new_pkt, pkt->size);
92   - // new_pkt->data = (uint8_t *)av_malloc(pkt->size) ;
93 99 memcpy(new_pkt->data, pkt->data, pkt->size);
94 100 new_pkt->size = pkt->size;
95   - new_pkt->pts = pkt->pts;
96   - new_pkt->dts = pkt->dts;
97   - new_pkt->stream_index = pkt->stream_index;
98   - new_pkt->duration = pkt->duration;
99   - new_pkt->pos = pkt->pos;
100   - new_pkt->flags = pkt->flags;
101   - av_copy_packet_side_data(new_pkt, pkt);
  101 + // new_pkt->pts = pkt->pts;
  102 + // new_pkt->dts = pkt->dts;
  103 + // new_pkt->stream_index = pkt->stream_index;
  104 + // new_pkt->duration = pkt->duration;
  105 + // new_pkt->pos = pkt->pos;
  106 + // new_pkt->flags = pkt->flags;
  107 + // av_copy_packet_side_data(new_pkt, pkt);
102 108 return new_pkt;
103 109 }
104 110  
... ... @@ -124,19 +130,9 @@ void FFRecoderTaskManager::cache_pkt(AVPacket* pkt, long long frame_nb){
124 130 } else {
125 131 newDataPkt->isKeyFrame = false;
126 132 }
127   -
128   - m_pkt_list.emplace_back(newDataPkt);
129   -
130   - std::lock_guard<std::mutex> l_info(m_recoderinfo_list_mtx);
131   - if(m_recoderinfo_list.size() <= 0){
132   - // 没有任务的时候,维持500的长度
133   - while(m_pkt_list.size() > 1000) {
134   - DataPacket* dataPkt = m_pkt_list.front();
135   - delete dataPkt;
136   - dataPkt = nullptr;
137   - m_pkt_list.pop_front();
138   - }
139   - }
  133 +
  134 + std::lock_guard<std::mutex> l_info(m_pkt_list_short_mtx);
  135 + m_pkt_list_short.push_back(newDataPkt);
140 136 }
141 137  
142 138 void FFRecoderTaskManager::save_intask_recoderinfo(RecoderInfo info) {
... ... @@ -284,11 +280,76 @@ void FFRecoderTaskManager::recode_thread() {
284 280 LOG_INFO("recode_thread end.");
285 281 }
286 282  
  283 +void FFRecoderTaskManager::pkt_cache_thread() {
  284 + LOG_INFO("pkt_cache_thread start...");
  285 +
  286 + m_recoder_thread = new std::thread(
  287 + [](void* arg) {
  288 + FFRecoderTaskManager* _this=(FFRecoderTaskManager*)arg;
  289 + if(_this != nullptr) {
  290 + _this->recode_thread2();
  291 + }else{
  292 + LOG_ERROR("recode 线程启动失败 !");
  293 + }
  294 + return (void*)0;
  295 + }, this);
  296 +
  297 + // 开始缓存
  298 + while(true) {
  299 + if(m_bExit) {
  300 + break;
  301 + }
  302 +
  303 + std::this_thread::sleep_for(std::chrono::milliseconds(500));
  304 +
  305 + m_pkt_list_short_mtx.lock();
  306 + m_pkt_list_mtx.lock();
  307 + for (size_t i = 0; i < m_pkt_list_short.size(); i++) {
  308 + auto item = m_pkt_list_short.front();
  309 + m_pkt_list.push_back(item);
  310 + m_pkt_list_short.pop_front();
  311 + }
  312 + m_pkt_list_mtx.unlock();
  313 + m_pkt_list_short_mtx.unlock();
  314 +
  315 +
  316 + std::lock_guard<std::mutex> l_info(m_recoderinfo_list_mtx);
  317 + if(m_recoderinfo_list.size() <= 0){
  318 + // 没有任务的时候,维持500的长度
  319 + m_pkt_list_mtx.lock();
  320 + while(m_pkt_list.size() > 1000) {
  321 + DataPacket* dataPkt = m_pkt_list.front();
  322 + delete dataPkt;
  323 + dataPkt = nullptr;
  324 + m_pkt_list.pop_front();
  325 + }
  326 + m_pkt_list_mtx.unlock();
  327 + }
  328 + }
  329 +
  330 + m_bExitRecoderThread = true;
  331 +
  332 + if (m_recoder_thread) {
  333 + m_recoder_thread->join();
  334 + delete m_recoder_thread;
  335 + m_recoder_thread = nullptr;
  336 + }
  337 +
  338 + // 清空数据
  339 + while(!m_pkt_list.empty()) {
  340 + DataPacket* dataPkt = m_pkt_list.front();
  341 + delete dataPkt;
  342 + dataPkt = nullptr;
  343 + m_pkt_list.pop_front();
  344 + }
  345 +
  346 + LOG_INFO("pkt_cache_thread end.");
  347 +}
287 348  
288 349 void FFRecoderTaskManager::recode_thread2() {
289 350 LOG_INFO("recode_thread2 start...");
290 351 while(true) {
291   - if(m_bExit) {
  352 + if(m_bExitRecoderThread) {
292 353 break;
293 354 }
294 355  
... ... @@ -313,16 +374,7 @@ void FFRecoderTaskManager::recode_thread2() {
313 374  
314 375 LOG_INFO("start frame_nb: {}", (*it_data)->frame_nb);
315 376  
316   - m_pkt_list_mtx.lock();
317   - auto it = m_pkt_list.begin();
318   - while (it != it_data) {
319   - DataPacket* dataPkt = m_pkt_list.front();
320   - delete dataPkt;
321   - dataPkt = nullptr;
322   - m_pkt_list.pop_front();
323   - it = m_pkt_list.begin();
324   - }
325   - m_pkt_list_mtx.unlock();
  377 + std::lock_guard<std::mutex> l_long(m_pkt_list_mtx);
326 378  
327 379 string file_name = recoderinfo.recoderPath;
328 380  
... ... @@ -377,24 +429,24 @@ void FFRecoderTaskManager::recode_thread2() {
377 429 m_recoderinfo_list_mtx.unlock();
378 430 }
379 431  
380   - LOG_INFO("recode_thread end.");
  432 + LOG_INFO("recode_thread2 end.");
381 433 }
382 434  
383 435 void FFRecoderTaskManager::close() {
384 436 m_bExit = true;
385 437  
386   - if (m_recoder_thread) {
387   - m_recoder_thread->join();
388   - delete m_recoder_thread;
389   - m_recoder_thread = nullptr;
  438 + if (m_cache_thread) {
  439 + m_cache_thread->join();
  440 + delete m_cache_thread;
  441 + m_cache_thread = nullptr;
390 442 }
391 443  
392 444 // 清空数据
393   - while(!m_pkt_list.empty()) {
394   - DataPacket* dataPkt = m_pkt_list.front();
  445 + while(!m_pkt_list_short.empty()) {
  446 + DataPacket* dataPkt = m_pkt_list_short.front();
395 447 delete dataPkt;
396 448 dataPkt = nullptr;
397   - m_pkt_list.pop_front();
  449 + m_pkt_list_short.pop_front();
398 450 }
399 451 }
400 452  
... ...
src/decoder/dvpp/FFRecoderTaskManager.h
... ... @@ -39,6 +39,7 @@ public:
39 39 public:
40 40 void recode_thread();
41 41 void recode_thread2();
  42 + void pkt_cache_thread();
42 43  
43 44 private:
44 45 void save_intask_recoderinfo(RecoderInfo info);
... ... @@ -47,19 +48,23 @@ private:
47 48 private:
48 49 std::queue<int> m_key_frame_interval;
49 50 std::list<unsigned long long> m_keyframe_nb_list;
50   - std::list<DataPacket*> m_pkt_list;
  51 + std::list<DataPacket*> m_pkt_list_short; // 临时缓存
  52 + mutex m_pkt_list_short_mtx;
  53 + std::list<DataPacket*> m_pkt_list; //主要缓存
51 54 mutex m_pkt_list_mtx;
52 55  
53 56 std::list<RecoderInfo> m_recoderinfo_list;
54 57 mutex m_recoderinfo_list_mtx;
55 58  
56 59 bool m_bExit{false};
  60 + bool m_bExitRecoderThread{false};
57 61  
58 62 AVRational m_time_base;
59 63 AVCodecContext* m_avctx;
60 64 AVStream* m_inStream;
61 65  
62 66 thread* m_recoder_thread{nullptr};
  67 + thread* m_cache_thread{nullptr};
63 68  
64 69 mq_callback_t mq_publish_func;
65 70  
... ...