Commit 7d1675e3281776d5458fd791d65903465dcdc13e

Authored by Hu Chunming
1 parent 28ace279

解码器添加解码结束和暂停状态判断接口;

将数据队列改为链表,支持多GPU;
添加AddMqConn 和 GetTaskStatus 接口
FFNvDecoder/FFNvDecoder.cpp
... ... @@ -340,6 +340,10 @@ bool FFNvDecoder::isFinished()
340 340 return m_bFinished;
341 341 }
342 342  
  343 +bool FFNvDecoder::isPausing(){
  344 + return m_bPause;
  345 +}
  346 +
343 347 bool FFNvDecoder::getResolution( int &width, int &height )
344 348 {
345 349 if (avctx != nullptr)
... ...
FFNvDecoder/FFNvDecoder.h
... ... @@ -54,6 +54,7 @@ public:
54 54  
55 55 bool isRunning();
56 56 bool isFinished();
  57 + bool isPausing();
57 58 bool getResolution( int &width, int &height );
58 59  
59 60 void setName(string nm);
... ...
FFNvDecoder/FFNvDecoderManager.cpp
... ... @@ -232,6 +232,40 @@ bool FFNvDecoderManager::isRunning(const string name){
232 232 return false;
233 233 }
234 234  
  235 +bool FFNvDecoderManager::isFinished(const string name){
  236 + if (name.empty())
  237 + {
  238 + cout << "name 为空!"<< endl;
  239 + return false;
  240 + }
  241 +
  242 + auto dec = decoderMap.find(name);
  243 + if (dec != decoderMap.end())
  244 + {
  245 + return dec->second->isFinished();
  246 + }
  247 +
  248 + cout << "没有找到name为" << name << "的解码器!" << endl;
  249 + return false;
  250 +}
  251 +
  252 +bool FFNvDecoderManager::isPausing(const string name){
  253 + if (name.empty())
  254 + {
  255 + cout << "name 为空!"<< endl;
  256 + return false;
  257 + }
  258 +
  259 + auto dec = decoderMap.find(name);
  260 + if (dec != decoderMap.end())
  261 + {
  262 + return dec->second->isPausing();
  263 + }
  264 +
  265 + cout << "没有找到name为" << name << "的解码器!" << endl;
  266 + return false;
  267 +}
  268 +
235 269 bool FFNvDecoderManager::setDecKeyframe(const string name, bool bKeyframe)
236 270 {
237 271 if (name.empty())
... ...
FFNvDecoder/FFNvDecoderManager.h
... ... @@ -147,6 +147,24 @@ public:
147 147 **************************************************/
148 148 bool isRunning(const string name);
149 149  
  150 + /**************************************************
  151 + * 接口:isFinished
  152 + * 功能:根据解码器名称判断解码器是否已经结束
  153 + * 参数:const string name 解码器名称
  154 + * 返回:解码已结束返回true,否则返回false
  155 + * 备注:
  156 + **************************************************/
  157 + bool isFinished(const string name);
  158 +
  159 + /**************************************************
  160 + * 接口:isPausing
  161 + * 功能:根据解码器名称判断解码器是否暂停
  162 + * 参数:const string name 解码器名称
  163 + * 返回:解码器处于暂停状态返回true,否则返回false
  164 + * 备注:
  165 + **************************************************/
  166 + bool isPausing(const string name);
  167 +
150 168 /**************************************************
151 169 * 接口:count
152 170 * 功能:获取正在运行的解码器数量
... ...
tsl_aiplatform/ai_platform/MultiSourceProcess.cpp
... ... @@ -254,7 +254,8 @@ bool CMultiSourceProcess::add_task_operation(task_param _cur_task_param){
254 254 // 保存新添加任务的配置参数
255 255 m_task_param_manager->add_task_param(task_id, _cur_task_param);
256 256  
257   - int input_image_width = 0, input_image_height = 0;
  257 + int input_image_width = 0;
  258 + int input_image_height = 0;
258 259 pDecManager->getResolution(config.name, input_image_width, input_image_height);
259 260  
260 261 #ifdef WITH_SECOND_PROCESS
... ... @@ -299,7 +300,7 @@ void CMultiSourceProcess::startProcessByGpuid(const string gpuid){
299 300 ThreadArg thread_arg = {gpuid, this};
300 301  
301 302 pthread_t* processThread = gpuProcessthreadMap[gpuid];
302   - if(processThread){
  303 + if(processThread == nullptr){
303 304 pthread_t* pTread = new pthread_t;
304 305 pthread_create(pTread,0,
305 306 [](void* arg)
... ... @@ -332,12 +333,12 @@ void CMultiSourceProcess::post_decode_thread(task_param _cur_task_param, AVFrame
332 333 do{
333 334 // TODO 本循环需要一个可以手动终止的开关
334 335 m_QueueMtx.lock();
335   - if(m_queueRgbData.size() >= 40){
  336 + if(m_RgbDataList.size() >= (20 * gpuProcessthreadMap.size() + 20)){
336 337 m_QueueMtx.unlock();
337 338 std::this_thread::sleep_for(std::chrono::milliseconds(1));
338 339 continue;
339 340 }
340   - m_queueRgbData.push(gpuMem);
  341 + m_RgbDataList.push_back(gpuMem);
341 342 m_QueueMtx.unlock();
342 343 break;
343 344 }while (true);
... ... @@ -401,21 +402,29 @@ void CMultiSourceProcess::algorthim_process_thread(const string gpuid){
401 402 /* step5. 凑齐的解码数据 拼batch */
402 403 m_QueueMtx.lock();
403 404  
404   - int batch_size = m_queueRgbData.size();
405   - if(batch_size > 20){
406   - batch_size = 20;
  405 + vector<GpuRgbMemory*> vec_gpuMem;
  406 + for (auto iter = m_RgbDataList.begin(); iter!=m_RgbDataList.end(); ){
  407 + GpuRgbMemory* gpuMem = *iter;
  408 + if(gpuMem->getGpuId() == gpuid){
  409 + vec_gpuMem.push_back(gpuMem);
  410 + iter = m_RgbDataList.erase(iter);
  411 + if(vec_gpuMem.size() >= 20){
  412 + break;
  413 + }
  414 + } else {
  415 + ++ iter;
  416 + }
407 417 }
408   -
  418 +
  419 + int batch_size = vec_gpuMem.size();
  420 +
409 421 vector<string> task_list;
410 422 sy_img *batch_img = new sy_img[batch_size];
411   - vector<GpuRgbMemory*> vec_gpuMem;
412 423 for (size_t i = 0; i < batch_size; i++){
413   - GpuRgbMemory* gpuMem = m_queueRgbData.front();
  424 + GpuRgbMemory* gpuMem = vec_gpuMem[i];
414 425 batch_img[i].set_data(gpuMem->getWidth(), gpuMem->getHeight(), gpuMem->getChannel(), gpuMem->getMem());
415 426 task_list.push_back(gpuMem->getId());
416 427 ++task_id_to_n_frame[gpuMem->getId()];
417   - vec_gpuMem.push_back(gpuMem);
418   - m_queueRgbData.pop();
419 428 }
420 429  
421 430 m_QueueMtx.unlock();
... ... @@ -1074,4 +1083,47 @@ void CMultiSourceProcess::algorthim_face_detect(vector<string>& task_list, sy_im
1074 1083 std::vector<onelevel_det_result>().swap(facedet_result);
1075 1084 }
1076 1085 #endif
  1086 +}
  1087 +
  1088 +/* MQ队列的初始化 */
  1089 +int CMultiSourceProcess::AddMqConn(mq_type_t mq_type, rabbitmq_conn_params_t mq_conn_param) {
  1090 + /* 初始化MQ队列 */
  1091 + if (!mq_manager_->add_conn(mq_type, mq_conn_param)) {
  1092 + LOG_ERROR("Connection MQ failed, ip: {} port: {} uname: {} passwd: {}", mq_conn_param.ip, mq_conn_param.port,
  1093 + mq_conn_param.uname, mq_conn_param.passwd);
  1094 + return MQ_CONN_ERROR;
  1095 + }
  1096 +
  1097 + /* 为报警类 绑定回调 传入mq_manager_.publish 内部直接调用*/
  1098 + if (mq_type_t::ALARM_MQ == mq_type)
  1099 + m_save_snapshot_reprocessing->set_callback(
  1100 + std::bind(&mq::Manager::publish, mq_manager_, mq_type, std::placeholders::_1, true));
  1101 +
  1102 + return SUCCESS;
  1103 +}
  1104 +
  1105 +/* 获取任务的状态 MQ返回 */
  1106 +int CMultiSourceProcess::GetTaskStatus(const string taskID) {
  1107 +
  1108 + FFNvDecoderManager* pDecManager = FFNvDecoderManager::getInstance();
  1109 +
  1110 + int status = 0;
  1111 + if(pDecManager->isPausing(taskID)){
  1112 + status = 2;
  1113 + }else if(pDecManager->isRunning(taskID)){
  1114 + status = 1;
  1115 + }
  1116 +
  1117 + std::vector<std::string> taskids;
  1118 + std::vector<int> statues;
  1119 + taskids.emplace_back(taskID);
  1120 + statues.emplace_back(status);
  1121 +
  1122 + if (!taskids.empty()) {
  1123 + auto json_str = helpers::gen_json::gen_task_status_json(taskids, statues);
  1124 + // mq_manager_->publish(mq_type_t::GET_TASK_MQ, json_str.c_str());
  1125 + mq_manager_->publish(mq_type_t::GET_TASK_MQ, json_str.c_str(),true);
  1126 + }
  1127 +
  1128 + return SUCCESS;
1077 1129 }
1078 1130 \ No newline at end of file
... ...
tsl_aiplatform/ai_platform/MultiSourceProcess.h
... ... @@ -21,6 +21,7 @@
21 21 #include <boost/thread/thread.hpp>
22 22 #include <queue>
23 23 #include <set>
  24 +#include <list>
24 25  
25 26 #include "mvpt_process_assist.h"
26 27 #include <atomic>
... ... @@ -67,6 +68,7 @@ using namespace cv;
67 68 using std::map;
68 69 using std::set;
69 70 using std::vector;
  71 +using std::list;
70 72  
71 73 #ifndef _MSC_VER
72 74 #ifndef TRUE
... ... @@ -291,7 +293,7 @@ private:
291 293  
292 294  
293 295 private:
294   - queue<GpuRgbMemory*> m_queueRgbData;
  296 + list<GpuRgbMemory*> m_RgbDataList;
295 297 std::mutex m_QueueMtx;
296 298 };
297 299  
... ...