diff --git a/FFNvDecoder/FFNvDecoder.cpp b/FFNvDecoder/FFNvDecoder.cpp index 1c3bdd0..4d1c4cb 100644 --- a/FFNvDecoder/FFNvDecoder.cpp +++ b/FFNvDecoder/FFNvDecoder.cpp @@ -340,6 +340,10 @@ bool FFNvDecoder::isFinished() return m_bFinished; } +bool FFNvDecoder::isPausing(){ + return m_bPause; +} + bool FFNvDecoder::getResolution( int &width, int &height ) { if (avctx != nullptr) diff --git a/FFNvDecoder/FFNvDecoder.h b/FFNvDecoder/FFNvDecoder.h index 971b036..d306c61 100644 --- a/FFNvDecoder/FFNvDecoder.h +++ b/FFNvDecoder/FFNvDecoder.h @@ -54,6 +54,7 @@ public: bool isRunning(); bool isFinished(); + bool isPausing(); bool getResolution( int &width, int &height ); void setName(string nm); diff --git a/FFNvDecoder/FFNvDecoderManager.cpp b/FFNvDecoder/FFNvDecoderManager.cpp index 2dab4f3..d42715d 100644 --- a/FFNvDecoder/FFNvDecoderManager.cpp +++ b/FFNvDecoder/FFNvDecoderManager.cpp @@ -232,6 +232,40 @@ bool FFNvDecoderManager::isRunning(const string name){ return false; } +bool FFNvDecoderManager::isFinished(const string name){ + if (name.empty()) + { + cout << "name 为空!"<< endl; + return false; + } + + auto dec = decoderMap.find(name); + if (dec != decoderMap.end()) + { + return dec->second->isFinished(); + } + + cout << "没有找到name为" << name << "的解码器!" << endl; + return false; +} + +bool FFNvDecoderManager::isPausing(const string name){ + if (name.empty()) + { + cout << "name 为空!"<< endl; + return false; + } + + auto dec = decoderMap.find(name); + if (dec != decoderMap.end()) + { + return dec->second->isPausing(); + } + + cout << "没有找到name为" << name << "的解码器!" << endl; + return false; +} + bool FFNvDecoderManager::setDecKeyframe(const string name, bool bKeyframe) { if (name.empty()) diff --git a/FFNvDecoder/FFNvDecoderManager.h b/FFNvDecoder/FFNvDecoderManager.h index 7415286..76f46b4 100644 --- a/FFNvDecoder/FFNvDecoderManager.h +++ b/FFNvDecoder/FFNvDecoderManager.h @@ -147,6 +147,24 @@ public: **************************************************/ bool isRunning(const string name); + /************************************************** + * 接口:isFinished + * 功能:根据解码器名称判断解码器是否已经结束 + * 参数:const string name 解码器名称 + * 返回:正在运行返回true,否则返回false + * 备注: + **************************************************/ + bool isFinished(const string name); + + /************************************************** + * 接口:isPausing + * 功能:根据解码器名称判断解码器是否暂停 + * 参数:const string name 解码器名称 + * 返回:正在运行返回true,否则返回false + * 备注: + **************************************************/ + bool isPausing(const string name); + /************************************************** * 接口:count * 功能:获取正在运行的解码器数量 diff --git a/tsl_aiplatform/ai_platform/MultiSourceProcess.cpp b/tsl_aiplatform/ai_platform/MultiSourceProcess.cpp index 6fc232f..137f8d2 100644 --- a/tsl_aiplatform/ai_platform/MultiSourceProcess.cpp +++ b/tsl_aiplatform/ai_platform/MultiSourceProcess.cpp @@ -254,7 +254,8 @@ bool CMultiSourceProcess::add_task_operation(task_param _cur_task_param){ // 保存新添加任务的配置参数 m_task_param_manager->add_task_param(task_id, _cur_task_param); - int input_image_width = 0, input_image_height = 0; + int input_image_width = 0; + int input_image_height = 0; pDecManager->getResolution(config.name, input_image_width, input_image_height); #ifdef WITH_SECOND_PROCESS @@ -299,7 +300,7 @@ void CMultiSourceProcess::startProcessByGpuid(const string gpuid){ ThreadArg thread_arg = {gpuid, this}; pthread_t* processThread = gpuProcessthreadMap[gpuid]; - if(processThread){ + if(processThread == nullptr){ pthread_t* pTread = new pthread_t; pthread_create(pTread,0, [](void* arg) @@ -332,12 +333,12 @@ void CMultiSourceProcess::post_decode_thread(task_param _cur_task_param, AVFrame do{ // TODO 本循环需要一个可以手动终止的开关 m_QueueMtx.lock(); - if(m_queueRgbData.size() >= 40){ + if(m_RgbDataList.size() >= (20 * gpuProcessthreadMap.size() + 20)){ m_QueueMtx.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(1)); continue; } - m_queueRgbData.push(gpuMem); + m_RgbDataList.push_back(gpuMem); m_QueueMtx.unlock(); break; }while (true); @@ -401,21 +402,29 @@ void CMultiSourceProcess::algorthim_process_thread(const string gpuid){ /* step5. 凑齐的解码数据 拼batch */ m_QueueMtx.lock(); - int batch_size = m_queueRgbData.size(); - if(batch_size > 20){ - batch_size = 20; + vector vec_gpuMem; + for (auto iter = m_RgbDataList.begin(); iter!=m_RgbDataList.end(); ){ + GpuRgbMemory* gpuMem = *iter; + if(gpuMem->getGpuId() == gpuid){ + vec_gpuMem.push_back(gpuMem); + iter = m_RgbDataList.erase(iter); + if(vec_gpuMem.size() >= 20){ + break; + } + } else { + ++ iter; + } } - + + int batch_size = vec_gpuMem.size(); + vector task_list; sy_img *batch_img = new sy_img[batch_size]; - vector vec_gpuMem; for (size_t i = 0; i < batch_size; i++){ - GpuRgbMemory* gpuMem = m_queueRgbData.front(); + GpuRgbMemory* gpuMem = vec_gpuMem[i]; batch_img[i].set_data(gpuMem->getWidth(), gpuMem->getHeight(), gpuMem->getChannel(), gpuMem->getMem()); task_list.push_back(gpuMem->getId()); ++task_id_to_n_frame[gpuMem->getId()]; - vec_gpuMem.push_back(gpuMem); - m_queueRgbData.pop(); } m_QueueMtx.unlock(); @@ -1074,4 +1083,47 @@ void CMultiSourceProcess::algorthim_face_detect(vector& task_list, sy_im std::vector().swap(facedet_result); } #endif +} + +/* MQ队列的初始化 */ +int CMultiSourceProcess::AddMqConn(mq_type_t mq_type, rabbitmq_conn_params_t mq_conn_param) { + /* 初始化MQ队列 */ + if (!mq_manager_->add_conn(mq_type, mq_conn_param)) { + LOG_ERROR("Connection MQ failed, ip: {} port: {} uname: {} passwd: {}", mq_conn_param.ip, mq_conn_param.port, + mq_conn_param.uname, mq_conn_param.passwd); + return MQ_CONN_ERROR; + } + + /* 为报警类 绑定回调 传入mq_manager_.publish 内部直接调用*/ + if (mq_type_t::ALARM_MQ == mq_type) + m_save_snapshot_reprocessing->set_callback( + std::bind(&mq::Manager::publish, mq_manager_, mq_type, std::placeholders::_1, true)); + + return SUCCESS; +} + +/* 获取任务的状态 MQ返回 */ +int CMultiSourceProcess::GetTaskStatus(const string taskID) { + + FFNvDecoderManager* pDecManager = FFNvDecoderManager::getInstance(); + + int status = 0; + if(pDecManager->isPausing(taskID)){ + status = 2; + }else if(pDecManager->isRunning(taskID)){ + status = 1; + } + + std::vector taskids; + std::vector statues; + taskids.emplace_back(taskID); + statues.emplace_back(status); + + if (!taskids.empty()) { + auto json_str = helpers::gen_json::gen_task_status_json(taskids, statues); + // mq_manager_->publish(mq_type_t::GET_TASK_MQ, json_str.c_str()); + mq_manager_->publish(mq_type_t::GET_TASK_MQ, json_str.c_str(),true); + } + + return SUCCESS; } \ No newline at end of file diff --git a/tsl_aiplatform/ai_platform/MultiSourceProcess.h b/tsl_aiplatform/ai_platform/MultiSourceProcess.h index 45d8d23..78677a6 100644 --- a/tsl_aiplatform/ai_platform/MultiSourceProcess.h +++ b/tsl_aiplatform/ai_platform/MultiSourceProcess.h @@ -21,6 +21,7 @@ #include #include #include +#include #include "mvpt_process_assist.h" #include @@ -67,6 +68,7 @@ using namespace cv; using std::map; using std::set; using std::vector; +using std::list; #ifndef _MSC_VER #ifndef TRUE @@ -291,7 +293,7 @@ private: private: - queue m_queueRgbData; + list m_RgbDataList; std::mutex m_QueueMtx; };