Commit b83a105d49a954f143cd659f1b3298bef123f35a

Authored by Hu Chunming
1 parent b070c0fd

1. 修正是否实时流判断错误的问题;

2.添加display线程,以实现解码后跳帧,避免解码后的处理操作耗时过长阻塞读取数据包从而导致花屏
3. 优化解码失败可能造成的显存泄漏问题
src/decoder/dvpp/DvppDecoder.cpp
... ... @@ -17,7 +17,6 @@ struct Vdec_CallBack_UserData {
17 17  
18 18 DvppDecoder::DvppDecoder(){
19 19 m_read_thread = 0;
20   - m_decode_thread = 0;
21 20 m_cached_mem = nullptr;
22 21  
23 22 fmt_ctx = nullptr;
... ... @@ -76,8 +75,9 @@ AVCodecContext* DvppDecoder::init_FFmpeg(FFDecConfig config){
76 75 avformat_network_init();
77 76  
78 77 const char* uri = config.uri.c_str();
79   - fstream infile(uri);
80   - if (infile.is_open()){
  78 + fstream infile;
  79 + infile.open(uri, ios::in);
  80 + if (!infile.fail()){
81 81 m_bReal = false;
82 82 infile.close();
83 83 } else {
... ... @@ -335,6 +335,7 @@ void DvppDecoder::read_thread() {
335 335 int frame_count = 0;
336 336 int ret = -1;
337 337  
  338 + pthread_t m_decode_thread;
338 339 pthread_create(&m_decode_thread,0,
339 340 [](void* arg)
340 341 {
... ... @@ -393,7 +394,7 @@ void DvppDecoder::read_thread() {
393 394 bool bPushed = false;
394 395 while ((ret = av_bsf_receive_packet(h264bsfc, pkt)) == 0) {
395 396 if(pkt->size > g_pkt_size){
396   - LOG_ERROR("[{}]- pkt size 大于最大预设值, 为 {}!", m_dec_name, pkt->size);
  397 + LOG_ERROR("[{}]- pkt size 大于最大预设值!", m_dec_name);
397 398 break;
398 399 }
399 400  
... ... @@ -412,13 +413,12 @@ void DvppDecoder::read_thread() {
412 413 data_pkt->frame_nb = frame_nb;
413 414 m_pktQueue.push(data_pkt);
414 415 m_pktQueue_mutex.unlock();
415   - bPushed = true;
416 416  
  417 + bPushed = true;
417 418 frame_count++;
418 419 }
419 420  
420   - if(!bPushed)
421   - {
  421 + if(!bPushed){
422 422 av_packet_free(&pkt);
423 423 pkt = nullptr;
424 424 }
... ... @@ -509,6 +509,13 @@ static void VdecCallback(acldvppStreamDesc *input, acldvppPicDesc *output, void
509 509 }
510 510 }
511 511  
  512 +
  513 +static long get_cur_time_ms() {
  514 + chrono::time_point<chrono::system_clock, chrono::milliseconds> tpMicro
  515 + = chrono::time_point_cast<chrono::milliseconds>(chrono::system_clock::now());
  516 + return tpMicro.time_since_epoch().count();
  517 +}
  518 +
512 519 void DvppDecoder::doVdppVdecCallBack(acldvppStreamDesc *input, acldvppPicDesc *output, unsigned long long frame_nb){
513 520  
514 521 m_vdecQueue_mutex.lock();
... ... @@ -539,71 +546,58 @@ void DvppDecoder::doVdppVdecCallBack(acldvppStreamDesc *input, acldvppPicDesc *o
539 546 break;
540 547 }
541 548  
542   - if(width > 0 && height > 0 && outputSize > 0){
543   - DvppDataMemory* mem = new DvppDataMemory(width, width_stride, height, height_stride, outputSize, m_dec_name, to_string(m_dvpp_deviceId), false, frame_nb, (unsigned char *)outputDataDev);
544   - if(mem){
545   - if(post_decoded_cbk) {
546   - post_decoded_cbk(m_postDecArg, mem);
547   - } else {
548   - delete mem;
549   - mem = nullptr;
550   - }
  549 + bool bCached = false;
551 550  
552   - if(m_bSnapShoting){
553   - // 缓存snapshot
554   - std::unique_lock<std::mutex> locker(m_cached_mutex);
555   -
556   - m_cached_mem = new DvppDataMemory(-1, width, width_stride, height, height_stride, outputSize, m_dec_name, to_string(m_dvpp_deviceId), false, 0);
557   - if(m_cached_mem != nullptr){
558   - aclrtMemcpy(m_cached_mem->getMem(), outputSize, (unsigned char *)outputDataDev, outputSize, ACL_MEMCPY_DEVICE_TO_DEVICE);
  551 + if(width > 0 && height > 0 && outputSize > 0){
  552 + if (!m_bReal) {
  553 + while(m_bRunning) {
  554 + // 非实时流,即为文件情形,因为不存在花屏问题,为保证不丢帧,这里做个循环等待
  555 + m_decoded_data_queue_mtx.lock();
  556 + if(m_decoded_data_queue.size() > 5){
  557 + m_decoded_data_queue_mtx.unlock();
  558 + std::this_thread::sleep_for(std::chrono::milliseconds(5));
  559 + continue;
559 560 }
560   -
561   - locker.unlock();
562   - m_cached_cond.notify_one();
563   - m_bSnapShoting = false;
  561 + m_decoded_data_queue_mtx.unlock();
  562 + break;
564 563 }
565   - } else {
566   - LOG_ERROR("[{}]- DvppDataMemory 创建失败! ", m_dec_name, ret);
567   - acldvppFree(outputDataDev);
568   - outputDataDev = nullptr;
569 564 }
570 565  
571   - } else {
  566 + // cout << m_dec_name << " 解码时间间隔: " << get_cur_time_ms() - last_ts << endl;
  567 + // last_ts = get_cur_time_ms();
  568 +
  569 + // 换成解码后数据, 这里这样做的是为了保证解码一直持续进行,避免后续操作阻碍文件读取和解码从而导致花屏
  570 + m_decoded_data_queue_mtx.lock();
  571 + if(m_decoded_data_queue.size() <= 5) {
  572 + DvppDataMemory* mem = new DvppDataMemory(width, width_stride, height, height_stride, outputSize, m_dec_name, to_string(m_dvpp_deviceId), false, frame_nb, (unsigned char *)outputDataDev);
  573 + if(mem){
  574 + m_decoded_data_queue.push(mem);
  575 + bCached = true;
  576 + }
  577 + }
  578 + m_decoded_data_queue_mtx.unlock();
  579 +
  580 + if(m_bSnapShoting){
  581 + // 缓存snapshot
  582 + std::unique_lock<std::mutex> locker(m_cached_mutex);
  583 +
  584 + m_cached_mem = new DvppDataMemory(-1, width, width_stride, height, height_stride, outputSize, m_dec_name, to_string(m_dvpp_deviceId), false, 0);
  585 + if(m_cached_mem != nullptr){
  586 + aclrtMemcpy(m_cached_mem->getMem(), outputSize, (unsigned char *)outputDataDev, outputSize, ACL_MEMCPY_DEVICE_TO_DEVICE);
  587 + }
  588 +
  589 + locker.unlock();
  590 + m_cached_cond.notify_one();
  591 + m_bSnapShoting = false;
  592 + }
  593 +
  594 + }
  595 +
  596 + if(!bCached) {
572 597 LOG_WARN("[{}]- decode result error, width:{} width_stride:{} height:{} height_stride:{} size:{}", m_dec_name, width, width_stride, height, height_stride, outputSize);
573 598 acldvppFree(outputDataDev);
574 599 outputDataDev = nullptr;
575 600 }
576   -
577   - // DvppDataMemory* rgbMem = picConverter.convert2bgr(output, width, height, false);
578   - // if(rgbMem != nullptr){
579   - // #ifdef TEST_DECODER
580   - // // D2H
581   - // if(vdecHostAddr == nullptr){
582   - // CHECK_NOT_RETURN(aclrtMallocHost(&vdecHostAddr, width * height * 3), "aclrtMallocHost failed");
583   - // }
584   - // uint32_t data_size = rgbMem->getSize();
585   - // CHECK_AND_RETURN_NOVALUE(aclrtMemcpy(vdecHostAddr, data_size, rgbMem->getMem(), data_size, ACL_MEMCPY_DEVICE_TO_HOST), "D2H aclrtMemcpy failed");
586   -
587   - // // 保存vdec结果
588   - // if(count_frame > 45 && count_frame < 50)
589   - // {
590   - // string file_name = "./yuv_pic/vdec_out_"+ m_dec_name +".rgb" ;
591   - // FILE *outputFile = fopen(file_name.c_str(), "a");
592   - // if(outputFile){
593   - // fwrite(vdecHostAddr, data_size, sizeof(char), outputFile);
594   - // fclose(outputFile);
595   - // }
596   - // }
597   - // else if(count_frame > 50 && vdecHostAddr != nullptr){
598   - // CHECK_NOT_RETURN(aclrtFreeHost(vdecHostAddr), "aclrtFreeHost failed");
599   - // vdecHostAddr = nullptr;
600   - // }
601   - // count_frame++;
602   - // #endif
603   - // post_decoded_cbk(m_postDecArg, rgbMem);
604   - // }else{
605   - // LOG_ERROR("[{}]- convert2bgr failed !", m_dec_name);
606   - // }
607 601 }while(0);
608 602  
609 603 CHECK_AND_RETURN_NOVALUE(acldvppDestroyStreamDesc(input), "acldvppDestroyStreamDesc failed");
... ... @@ -624,6 +618,16 @@ void DvppDecoder::decode_thread(){
624 618 return;
625 619 }
626 620  
  621 + pthread_t display_thread;
  622 + pthread_create(&display_thread,0,
  623 + [](void* arg)
  624 + {
  625 + DvppDecoder* a=(DvppDecoder*)arg;
  626 + a->display_thread();
  627 + return (void*)0;
  628 + }
  629 + ,this);
  630 +
627 631 aclrtSetDevice(m_dvpp_deviceId);
628 632 aclrtContext ctx;
629 633 ret = aclrtCreateContext(&ctx, m_dvpp_deviceId);
... ... @@ -651,8 +655,7 @@ void DvppDecoder::decode_thread(){
651 655  
652 656 uint64_t frame_count = 0;
653 657 bool bBreak = false;
654   - while (m_bRunning)
655   - {
  658 + while (m_bRunning) {
656 659 if (m_bPause){
657 660 std::this_thread::sleep_for(std::chrono::milliseconds(3));
658 661 continue;
... ... @@ -662,10 +665,9 @@ void DvppDecoder::decode_thread(){
662 665 bBreak = true;
663 666 break;
664 667 }else if(ret == 1){
665   - std::this_thread::sleep_for(std::chrono::milliseconds(3));
666 668 continue;
667 669 }
668   -
  670 +
669 671 frame_count++;
670 672 }
671 673  
... ... @@ -699,6 +701,7 @@ void DvppDecoder::decode_thread(){
699 701 m_bRunning = false;
700 702 m_bExitReportThd = true;
701 703 CHECK_NOT_RETURN(pthread_join(report_thread, nullptr), "pthread_join failed");
  704 + CHECK_NOT_RETURN(pthread_join(display_thread, nullptr), "pthread_join failed");
702 705  
703 706 // 最后清理一遍未解码的数据
704 707 m_vdecQueue_mutex.lock();
... ... @@ -741,7 +744,7 @@ int DvppDecoder::sentFrame(aclvdecChannelDesc *vdecChannelDesc, uint64_t frame_c
741 744 data_pkt = m_pktQueue.front();
742 745 m_pktQueue.pop();
743 746 m_pktQueue_mutex.unlock();
744   -
  747 +
745 748 // 解码
746 749 void *vdecInputbuf = nullptr;
747 750 int ret = acldvppMalloc((void **)&vdecInputbuf, g_pkt_size);
... ... @@ -770,7 +773,6 @@ int DvppDecoder::sentFrame(aclvdecChannelDesc *vdecChannelDesc, uint64_t frame_c
770 773 return 2;
771 774 }
772 775  
773   - bool bRet = false;
774 776 acldvppStreamDesc *input_stream_desc = nullptr;
775 777 acldvppPicDesc *output_pic_desc = nullptr;
776 778 do{
... ... @@ -808,12 +810,10 @@ int DvppDecoder::sentFrame(aclvdecChannelDesc *vdecChannelDesc, uint64_t frame_c
808 810 user_data = nullptr;
809 811 break;
810 812 }
811   -
  813 +
812 814 m_vdecQueue.push(vdecInputbuf);
813 815 m_vdecQueue_mutex.unlock();
814 816  
815   - bRet = true;
816   -
817 817 return 0;
818 818 }while (0);
819 819  
... ... @@ -822,7 +822,7 @@ int DvppDecoder::sentFrame(aclvdecChannelDesc *vdecChannelDesc, uint64_t frame_c
822 822 data_pkt = nullptr;
823 823 }
824 824  
825   - if (!bRet){
  825 + if (vdecInputbuf){
826 826 acldvppFree(vdecInputbuf);
827 827 vdecInputbuf = nullptr;
828 828 }
... ... @@ -870,6 +870,37 @@ bool DvppDecoder::sendVdecEos(aclvdecChannelDesc *vdecChannelDesc) {
870 870 return true;
871 871 }
872 872  
  873 +void DvppDecoder::display_thread(){
  874 + LOG_INFO("[{}]- display_thread start...", m_dec_name);
  875 + while(m_bRunning) {
  876 + m_decoded_data_queue_mtx.lock();
  877 + if(m_decoded_data_queue.size() <= 0) {
  878 + m_decoded_data_queue_mtx.unlock();
  879 + std::this_thread::sleep_for(std::chrono::milliseconds(5));
  880 + continue;
  881 + }
  882 + DvppDataMemory* mem = m_decoded_data_queue.front();
  883 + m_decoded_data_queue.pop();
  884 + m_decoded_data_queue_mtx.unlock();
  885 +
  886 + if(post_decoded_cbk) {
  887 + post_decoded_cbk(m_postDecArg, mem);
  888 + } else {
  889 + delete mem;
  890 + mem = nullptr;
  891 + }
  892 + }
  893 +
  894 + while (m_decoded_data_queue.size() > 0){
  895 + DvppDataMemory* mem = m_decoded_data_queue.front();
  896 + m_decoded_data_queue.pop();
  897 + delete mem;
  898 + mem = nullptr;
  899 + }
  900 +
  901 + LOG_INFO("[{}]- display_thread end.", m_dec_name);
  902 +}
  903 +
873 904 void DvppDecoder::release_dvpp(){
874 905 if(m_context){
875 906 aclError ret = aclrtDestroyContext(m_context);
... ...
src/decoder/dvpp/DvppDecoder.h
... ... @@ -71,6 +71,8 @@ private:
71 71 bool sendVdecEos(aclvdecChannelDesc *vdecChannelDesc);
72 72 void release_dvpp();
73 73  
  74 + void display_thread();
  75 +
74 76 private:
75 77 FFDecConfig m_cfg;
76 78 string m_dec_name;
... ... @@ -109,7 +111,6 @@ private:
109 111 aclrtContext m_context{nullptr};
110 112 acldvppStreamFormat enType;
111 113  
112   - pthread_t m_decode_thread{0};
113 114 mutex m_vdecQueue_mutex;
114 115 queue<void*> m_vdecQueue;
115 116  
... ... @@ -125,4 +126,9 @@ private:
125 126 condition_variable m_cached_cond;
126 127  
127 128 FFRecoderTaskManager m_recoderManager;
  129 +
  130 + queue<DvppDataMemory*> m_decoded_data_queue;
  131 + mutex m_decoded_data_queue_mtx;
  132 +
  133 + long long last_ts {0};
128 134 };
129 135 \ No newline at end of file
... ...