Commit 294ef5bc750b52b65014c459ca566c87d1247d0f

Authored by Hu Chunming
1 parent 190b4e40

修复解码器内因线程退出顺序错误导致的显存泄漏问题;

添加count_running_task和close_all_task接口
src/ai_platform/MultiSourceProcess.cpp
@@ -462,7 +462,7 @@ bool CMultiSourceProcess::AddTask(task_param _cur_task_param){ @@ -462,7 +462,7 @@ bool CMultiSourceProcess::AddTask(task_param _cur_task_param){
462 // if(algor_face != algor_map->end()){ 462 // if(algor_face != algor_map->end()){
463 // const algor_basic_config_param_t *cur_param = ((algor_init_config_param_t *)(algor_face->second))->basic_param; 463 // const algor_basic_config_param_t *cur_param = ((algor_init_config_param_t *)(algor_face->second))->basic_param;
464 // LOG_INFO("face_snapshot, result_folder: {} result_folder_little: {}", cur_param->result_folder, cur_param->result_folder_little); 464 // LOG_INFO("face_snapshot, result_folder: {} result_folder_little: {}", cur_param->result_folder, cur_param->result_folder_little);
465 - // } 465 + // }
466 } 466 }
467 467
468 LOG_INFO("started task {} successed!", config.name); 468 LOG_INFO("started task {} successed!", config.name);
@@ -470,6 +470,11 @@ bool CMultiSourceProcess::AddTask(task_param _cur_task_param){ @@ -470,6 +470,11 @@ bool CMultiSourceProcess::AddTask(task_param _cur_task_param){
470 return true; 470 return true;
471 } 471 }
472 472
  473 +int CMultiSourceProcess::CountRunningTask() {
  474 + DecoderManager* pDecManager = DecoderManager::getInstance();
  475 + return pDecManager->count();
  476 +}
  477 +
473 bool CMultiSourceProcess::task_has_vpt_algor(const std::string &task_id){ 478 bool CMultiSourceProcess::task_has_vpt_algor(const std::string &task_id){
474 //! TODO: create enum iterator. 479 //! TODO: create enum iterator.
475 auto algor_map = m_task_param_manager->get_task_other_param(task_id); 480 auto algor_map = m_task_param_manager->get_task_other_param(task_id);
@@ -684,6 +689,11 @@ void CMultiSourceProcess::CloseAllTask(){ @@ -684,6 +689,11 @@ void CMultiSourceProcess::CloseAllTask(){
684 LOG_INFO("CloseAllTask exit."); 689 LOG_INFO("CloseAllTask exit.");
685 } 690 }
686 691
  692 +void CMultiSourceProcess::CloseAllTask2() {
  693 + DecoderManager* pDecManager = DecoderManager::getInstance();
  694 + pDecManager->closeAllDecoder();
  695 +}
  696 +
687 void CMultiSourceProcess::clear_finished_task(){// 清理已经结束的任务 697 void CMultiSourceProcess::clear_finished_task(){// 清理已经结束的任务
688 698
689 std::lock_guard<std::mutex> l1(m_FinishedTaskMtx); 699 std::lock_guard<std::mutex> l1(m_FinishedTaskMtx);
src/ai_platform/MultiSourceProcess.h
@@ -46,6 +46,9 @@ public: @@ -46,6 +46,9 @@ public:
46 bool FinishTask(const string taskID); 46 bool FinishTask(const string taskID);
47 void CloseAllTask(); 47 void CloseAllTask();
48 int SnapShot(task_param param); 48 int SnapShot(task_param param);
  49 + int CountRunningTask();
  50 +
  51 + void CloseAllTask2();
49 52
50 #ifdef POST_USE_RABBITMQ 53 #ifdef POST_USE_RABBITMQ
51 int AddMqConn(mq_type_t mq_type, rabbitmq_conn_params_t mq_conn_param); 54 int AddMqConn(mq_type_t mq_type, rabbitmq_conn_params_t mq_conn_param);
src/ai_platform/stl_aiplatform.cpp
@@ -95,6 +95,17 @@ int tsl_aiplatform_release(void **handle) @@ -95,6 +95,17 @@ int tsl_aiplatform_release(void **handle)
95 return SUCCESS; 95 return SUCCESS;
96 } 96 }
97 97
  98 +int count_running_task(void *handle)
  99 +{
  100 + CMultiSourceProcess* tools = (CMultiSourceProcess*)handle;
  101 + return tools->CountRunningTask();
  102 +}
  103 +
  104 +void close_all_task(void *handle)
  105 +{
  106 + CMultiSourceProcess* tools = (CMultiSourceProcess*)handle;
  107 + tools->CloseAllTask2();
  108 +}
98 109
99 const char* get_tsl_aiplatform_version() 110 const char* get_tsl_aiplatform_version()
100 { 111 {
src/ai_platform/stl_aiplatform.h
@@ -120,6 +120,11 @@ extern &quot;C&quot; @@ -120,6 +120,11 @@ extern &quot;C&quot;
120 TSL_AIPLATFORM_API int tsl_aiplatform_release(void **handle); 120 TSL_AIPLATFORM_API int tsl_aiplatform_release(void **handle);
121 121
122 122
  123 + TSL_AIPLATFORM_API int count_running_task(void *handle);
  124 +
  125 + TSL_AIPLATFORM_API void close_all_task(void *handle);
  126 +
  127 +
123 /************************************************************************* 128 /*************************************************************************
124 * FUNCTION: get_tsl_aiplatform_version 129 * FUNCTION: get_tsl_aiplatform_version
125 * PURPOSE: 获取SDK版本号 130 * PURPOSE: 获取SDK版本号
src/decoder/dvpp/DvppDataMemory.hpp
@@ -25,7 +25,7 @@ public: @@ -25,7 +25,7 @@ public:
25 25
26 ~DvppDataMemory(){ 26 ~DvppDataMemory(){
27 if (pHwRgb) { 27 if (pHwRgb) {
28 - int ret = acldvppFree((uint8_t*)pHwRgb); 28 + int ret = acldvppFree(pHwRgb);
29 if(ret != ACL_ERROR_NONE){ 29 if(ret != ACL_ERROR_NONE){
30 cout << "acldvppFree failed" << endl; 30 cout << "acldvppFree failed" << endl;
31 } 31 }
src/decoder/dvpp/DvppDecoder.cpp
@@ -335,6 +335,7 @@ void DvppDecoder::read_thread() { @@ -335,6 +335,7 @@ void DvppDecoder::read_thread() {
335 int frame_count = 0; 335 int frame_count = 0;
336 int ret = -1; 336 int ret = -1;
337 337
  338 + m_bExitDecodeThd = false;
338 pthread_t m_decode_thread; 339 pthread_t m_decode_thread;
339 pthread_create(&m_decode_thread,0, 340 pthread_create(&m_decode_thread,0,
340 [](void* arg) 341 [](void* arg)
@@ -429,8 +430,11 @@ void DvppDecoder::read_thread() { @@ -429,8 +430,11 @@ void DvppDecoder::read_thread() {
429 } 430 }
430 } 431 }
431 432
432 - m_bRunning=false; 433 + while(m_bRunning && m_pktQueue.size() > 0) {
  434 + std::this_thread::sleep_for(std::chrono::milliseconds(5));
  435 + }
433 436
  437 + m_bExitDecodeThd = true;
434 if(m_decode_thread != 0){ 438 if(m_decode_thread != 0){
435 pthread_join(m_decode_thread,0); 439 pthread_join(m_decode_thread,0);
436 } 440 }
@@ -444,6 +448,8 @@ void DvppDecoder::read_thread() { @@ -444,6 +448,8 @@ void DvppDecoder::read_thread() {
444 } 448 }
445 m_pktQueue_mutex.unlock(); 449 m_pktQueue_mutex.unlock();
446 450
  451 + m_bRunning=false;
  452 +
447 if(decode_finished_cbk) { 453 if(decode_finished_cbk) {
448 decode_finished_cbk(m_finishedDecArg); 454 decode_finished_cbk(m_finishedDecArg);
449 } 455 }
@@ -618,6 +624,7 @@ void DvppDecoder::decode_thread(){ @@ -618,6 +624,7 @@ void DvppDecoder::decode_thread(){
618 return; 624 return;
619 } 625 }
620 626
  627 + m_bExitDisplayThd = false;
621 pthread_t display_thread; 628 pthread_t display_thread;
622 pthread_create(&display_thread,0, 629 pthread_create(&display_thread,0,
623 [](void* arg) 630 [](void* arg)
@@ -655,7 +662,7 @@ void DvppDecoder::decode_thread(){ @@ -655,7 +662,7 @@ void DvppDecoder::decode_thread(){
655 662
656 uint64_t frame_count = 0; 663 uint64_t frame_count = 0;
657 bool bBreak = false; 664 bool bBreak = false;
658 - while (m_bRunning) { 665 + while (!m_bExitDecodeThd) {
659 if (m_bPause){ 666 if (m_bPause){
660 std::this_thread::sleep_for(std::chrono::milliseconds(3)); 667 std::this_thread::sleep_for(std::chrono::milliseconds(3));
661 continue; 668 continue;
@@ -694,18 +701,26 @@ void DvppDecoder::decode_thread(){ @@ -694,18 +701,26 @@ void DvppDecoder::decode_thread(){
694 701
695 CHECK_NOT_RETURN(aclvdecDestroyChannel(vdecChannelDesc), "aclvdecDestroyChannel failed"); 702 CHECK_NOT_RETURN(aclvdecDestroyChannel(vdecChannelDesc), "aclvdecDestroyChannel failed");
696 }while(0); 703 }while(0);
697 -  
698 - CHECK_NOT_RETURN(aclvdecDestroyChannelDesc(vdecChannelDesc), "aclvdecDestroyChannelDesc failed");  
699 704
  705 + // 退出 report thread
  706 + while(m_bRunning && m_vdecQueue.size() > 0) {
  707 + std::this_thread::sleep_for(std::chrono::milliseconds(5));
  708 + }
  709 + CHECK_NOT_RETURN(aclvdecDestroyChannelDesc(vdecChannelDesc), "aclvdecDestroyChannelDesc failed");
700 // report_thread 需后于destroy退出 710 // report_thread 需后于destroy退出
701 - m_bRunning = false;  
702 m_bExitReportThd = true; 711 m_bExitReportThd = true;
703 - CHECK_NOT_RETURN(pthread_join(report_thread, nullptr), "pthread_join failed");  
704 - CHECK_NOT_RETURN(pthread_join(display_thread, nullptr), "pthread_join failed"); 712 + CHECK_NOT_RETURN(pthread_join(report_thread, nullptr), "report_thread join failed");
  713 +
  714 + // 退出 display thread
  715 + while(m_bRunning && m_decoded_data_queue.size() > 0) {
  716 + std::this_thread::sleep_for(std::chrono::milliseconds(5));
  717 + }
  718 + m_bExitDisplayThd = true;
  719 + CHECK_NOT_RETURN(pthread_join(display_thread, nullptr), "display_thread join failed");
705 720
706 // 最后清理一遍未解码的数据 721 // 最后清理一遍未解码的数据
707 m_vdecQueue_mutex.lock(); 722 m_vdecQueue_mutex.lock();
708 - if(m_vdecQueue.size() > 0){ 723 + while(m_vdecQueue.size() > 0){
709 void* inputData = m_vdecQueue.front(); 724 void* inputData = m_vdecQueue.front();
710 acldvppFree(inputData); 725 acldvppFree(inputData);
711 inputData = nullptr; 726 inputData = nullptr;
@@ -872,7 +887,7 @@ bool DvppDecoder::sendVdecEos(aclvdecChannelDesc *vdecChannelDesc) { @@ -872,7 +887,7 @@ bool DvppDecoder::sendVdecEos(aclvdecChannelDesc *vdecChannelDesc) {
872 887
873 void DvppDecoder::display_thread(){ 888 void DvppDecoder::display_thread(){
874 LOG_INFO("[{}]- display_thread start...", m_dec_name); 889 LOG_INFO("[{}]- display_thread start...", m_dec_name);
875 - while(m_bRunning) { 890 + while(!m_bExitDisplayThd) {
876 m_decoded_data_queue_mtx.lock(); 891 m_decoded_data_queue_mtx.lock();
877 if(m_decoded_data_queue.size() <= 0) { 892 if(m_decoded_data_queue.size() <= 0) {
878 m_decoded_data_queue_mtx.unlock(); 893 m_decoded_data_queue_mtx.unlock();
@@ -898,7 +913,7 @@ void DvppDecoder::display_thread(){ @@ -898,7 +913,7 @@ void DvppDecoder::display_thread(){
898 mem = nullptr; 913 mem = nullptr;
899 } 914 }
900 915
901 - LOG_INFO("[{}]- display_thread end.", m_dec_name); 916 + LOG_INFO("[{}]- display_thread exit.", m_dec_name);
902 } 917 }
903 918
904 void DvppDecoder::release_dvpp(){ 919 void DvppDecoder::release_dvpp(){
src/decoder/dvpp/DvppDecoder.h
@@ -83,7 +83,10 @@ private: @@ -83,7 +83,10 @@ private:
83 bool m_bFinished{false}; 83 bool m_bFinished{false};
84 bool m_bRunning{false}; 84 bool m_bRunning{false};
85 bool m_bPause{false}; 85 bool m_bPause{false};
  86 +
86 bool m_bExitReportThd{false}; 87 bool m_bExitReportThd{false};
  88 + bool m_bExitDisplayThd{false};
  89 + bool m_bExitDecodeThd{false};
87 90
88 // 读取数据 91 // 读取数据
89 AVStream* stream{nullptr}; 92 AVStream* stream{nullptr};
src/decoder/dvpp/FFRecoderTaskManager.cpp
@@ -92,8 +92,6 @@ void FFRecoderTaskManager::cache_pkt(AVPacket* pkt, long long frame_nb){ @@ -92,8 +92,6 @@ void FFRecoderTaskManager::cache_pkt(AVPacket* pkt, long long frame_nb){
92 newDataPkt->pkt = new_pkt; 92 newDataPkt->pkt = new_pkt;
93 newDataPkt->frame_nb = frame_nb; 93 newDataPkt->frame_nb = frame_nb;
94 94
95 - m_pkt_list.emplace_back(newDataPkt);  
96 -  
97 if(is_key_frame(pkt)){ 95 if(is_key_frame(pkt)){
98 // 越来越大的值 96 // 越来越大的值
99 newDataPkt->isKeyFrame = true; 97 newDataPkt->isKeyFrame = true;
@@ -102,6 +100,8 @@ void FFRecoderTaskManager::cache_pkt(AVPacket* pkt, long long frame_nb){ @@ -102,6 +100,8 @@ void FFRecoderTaskManager::cache_pkt(AVPacket* pkt, long long frame_nb){
102 newDataPkt->isKeyFrame = false; 100 newDataPkt->isKeyFrame = false;
103 } 101 }
104 102
  103 + m_pkt_list.emplace_back(newDataPkt);
  104 +
105 std::lock_guard<std::mutex> l_info(m_recoderinfo_list_mtx); 105 std::lock_guard<std::mutex> l_info(m_recoderinfo_list_mtx);
106 if(m_recoderinfo_list.size() <= 0){ 106 if(m_recoderinfo_list.size() <= 0){
107 // 没有任务的时候,维持500的长度 107 // 没有任务的时候,维持500的长度
src/demo/demo.cpp
@@ -1105,11 +1105,11 @@ void test_gpu(int gpuID){ @@ -1105,11 +1105,11 @@ void test_gpu(int gpuID){
1105 // createTask(handle, algor_vec, 1); 1105 // createTask(handle, algor_vec, 1);
1106 // createTask(handle, algor_vec, 2); 1106 // createTask(handle, algor_vec, 2);
1107 // createTask(handle, algor_vec, 3); 1107 // createTask(handle, algor_vec, 3);
1108 - createTask(handle, algor_vec3, 4);  
1109 - createTask(handle, algor_vec3, 5);  
1110 - createTask(handle, algor_vec3, 6);  
1111 - createTask(handle, algor_vec3, 7);  
1112 - createTask(handle, algor_vec3, 8); 1108 + // createTask(handle, algor_vec3, 4);
  1109 + // createTask(handle, algor_vec3, 5);
  1110 + // createTask(handle, algor_vec3, 6);
  1111 + // createTask(handle, algor_vec3, 7);
  1112 + // createTask(handle, algor_vec3, 8);
1113 // createTask(handle, algor_vec3, 9); 1113 // createTask(handle, algor_vec3, 9);
1114 // createTask(handle, algor_vec3, 10); 1114 // createTask(handle, algor_vec3, 10);
1115 // createTask(handle, algor_vec3, 11); 1115 // createTask(handle, algor_vec3, 11);
@@ -1148,7 +1148,27 @@ void test_gpu(int gpuID){ @@ -1148,7 +1148,27 @@ void test_gpu(int gpuID){
1148 finish_task(handle, (char*)task_id2.data(), 0); 1148 finish_task(handle, (char*)task_id2.data(), 0);
1149 finish_task(handle, (char*)task_id1.data(), 0); 1149 finish_task(handle, (char*)task_id1.data(), 0);
1150 }*/ 1150 }*/
1151 - while (getchar() != 'q'); 1151 +
  1152 + char ch = 'a';
  1153 + while (ch != 'q') {
  1154 + ch = getchar();
  1155 + switch (ch)
  1156 + {
  1157 + case 'a':
  1158 + createTask(handle, algor_vec3, 4, false);
  1159 + createTask(handle, algor_vec3, 5, false);
  1160 + createTask(handle, algor_vec3, 6, false);
  1161 + createTask(handle, algor_vec3, 7, false);
  1162 + createTask(handle, algor_vec3, 8, false);
  1163 + break;
  1164 + case 'c':
  1165 + close_all_task(handle);
  1166 + break;
  1167 + default:
  1168 + break;
  1169 + }
  1170 +
  1171 + }
1152 1172
1153 // finish_task(handle, (char*)task_id.data(), 0); 1173 // finish_task(handle, (char*)task_id.data(), 0);
1154 1174