#include "MultiSourceProcess.h" #include "../common/logger.hpp" #include #include #include #include #include #include "../decoder/interface/DecoderManager.h" #include "../decoder/interface/utiltools.hpp" #include "../util/vpc_util.h" #include "../util/crop_process.h" #include "../helpers/time_helper.hpp" #include "../helpers/os_helper.hpp" #include "../helpers/gen_json.hpp" #include "../reprocessing_module/save_snapshot_reprocessing.h" #include "macro_definition.h" #define VEHICLE_MULTI_BOXES using namespace std; struct decode_cbk_userdata{ string task_id; void* opaque; void* opaque1; }; /** * 注意: gpuFrame 在解码器设置的显卡上,后续操作要十分注意这一点,尤其是多线程情况 * */ void post_decod_cbk(const void * userPtr, DeviceMemory* devFrame){ decode_cbk_userdata* ptr = (decode_cbk_userdata*)userPtr; if (ptr!= nullptr) { CMultiSourceProcess* _this = (CMultiSourceProcess*)ptr->opaque; if(nullptr != _this){ _this->decoded_cbk(devFrame); } } } void decode_finished_cbk(const void * userPtr){ decode_cbk_userdata* ptr = (decode_cbk_userdata*)userPtr; if (ptr!= nullptr) { CMultiSourceProcess* _this = (CMultiSourceProcess*)ptr->opaque; if(nullptr != _this){ _this->task_finished(ptr->task_id); } } delete ptr; ptr = nullptr; } CMultiSourceProcess::CMultiSourceProcess(){ aclInit(nullptr); } CMultiSourceProcess::~CMultiSourceProcess(){ dvpp_crop_release(); aclFinalize(); } int CMultiSourceProcess::InitAlgorthim(tsl_aiplatform_param vptParam){ set_default_logger(LogLevel(vptParam.log_level), "multi_source_process", vptParam.log_path, vptParam.log_mem, vptParam.log_mem); LOG_INFO("编译时间:{} {}", __DATE__, __TIME__); skip_frame_ = 5; m_batch_size = 16; m_devId = vptParam.gpuid; VPTProcess_PARAM vparam; vparam.gpuid = m_devId; vparam.max_batch = m_batch_size; vparam.threshold = 0.4; aclrtSetDevice(m_devId); int ret = vpt_process.init(vparam); if (ret < 0){ return ret; } m_task_param_manager = task_param_manager::getInstance(); m_snapshot_reprocessing = snapshot_reprocessing::getInstance(); m_save_snapshot_reprocessing = new save_snapshot_reprocessing(m_devId); dvpp_crop_init(m_devId); m_pAlgorthimThread = new thread([](void* arg) { CMultiSourceProcess* process = (CMultiSourceProcess*)arg ; process->algorthim_process_thread(); return (void*)0; } , this); return 0; } #ifdef POST_USE_RABBITMQ /* MQ队列的初始化 */ int CMultiSourceProcess::AddMqConn(mq_type_t mq_type, rabbitmq_conn_params_t mq_conn_param) { /* 初始化MQ队列 */ if (!mq_manager_->add_conn(mq_type, mq_conn_param)) { LOG_ERROR("Connection MQ failed, ip: {} port: {} uname: {} passwd: {}", mq_conn_param.ip, mq_conn_param.port, mq_conn_param.uname, mq_conn_param.passwd); return MQ_CONN_ERROR; } /* 为报警类 绑定回调 传入mq_manager_.publish 内部直接调用*/ if (mq_type_t::ALARM_MQ == mq_type) m_save_snapshot_reprocessing->set_callback( std::bind(&mq::Manager::publish, mq_manager_, mq_type, std::placeholders::_1, true)); return SUCCESS; } /* 获取任务的状态 MQ返回 */ int CMultiSourceProcess::GetTaskStatus(const string taskID) { DecoderManager* pDecManager = DecoderManager::getInstance(); std::vector taskids; std::vector statues; if(pDecManager->isPausing(taskID)){ taskids.emplace_back(taskID); statues.emplace_back(2); }else if(pDecManager->isRunning(taskID)){ taskids.emplace_back(taskID); statues.emplace_back(1); } if (!taskids.empty()) { auto json_str = helpers::gen_json::gen_task_status_json(taskids, statues); mq_manager_->publish(mq_type_t::GET_TASK_MQ, json_str.c_str(),true); } return SUCCESS; } #endif bool CMultiSourceProcess::AddTask(task_param _cur_task_param){ DecoderManager* pDecManager = DecoderManager::getInstance(); const char* task_id = _cur_task_param.task_id; MgrDecConfig config; config.name = task_id; config.cfg.uri = _cur_task_param.ipc_url; config.cfg.post_decoded_cbk = post_decod_cbk; config.cfg.decode_finished_cbk = decode_finished_cbk; config.cfg.force_tcp = true; // rtsp用tcp config.cfg.gpuid = to_string(m_devId); config.cfg.skip_frame = skip_frame_; if (1 == _cur_task_param.dec_type){ config.cfg.port = _cur_task_param.port; config.dec_type = DECODER_TYPE_GB28181; config.cfg.uri = task_id; if(_cur_task_param.protocal == 0){ // 指定用udp协议 config.cfg.force_tcp = false; } config.cfg.request_stream_cbk = _cur_task_param.gb28181_request_stream_callback ; }else if (2 == _cur_task_param.dec_type){ config.dec_type = DECODER_TYPE_DVPP; }else { config.dec_type = DECODER_TYPE_FFMPEG; } AbstractDecoder* dec = pDecManager->createDecoder(config); if (!dec) { return false; } decode_cbk_userdata* userPtr = new decode_cbk_userdata; userPtr->task_id = string(task_id); userPtr->opaque = this; userPtr->opaque1 = dec; pDecManager->setPostDecArg(config.name, userPtr); pDecManager->setFinishedDecArg(config.name, userPtr); // pDecManager->setDecKeyframe(config.name, true); // 只对关键帧解码 // 保存新添加任务的配置参数 m_task_param_manager->add_task_param(task_id, _cur_task_param); int input_image_width = 0; int input_image_height = 0; pDecManager->getResolution(config.name, input_image_width, input_image_height); LOG_INFO("task_id: {} width: {} height:{}", task_id, input_image_width, input_image_height); // 所有参数都准备好之后再启动解码 bool bStart = pDecManager->startDecodeByName(config.name); if (!bStart){ LOG_INFO("started task {} failed!", config.name); pDecManager->closeDecoderByName(config.name); return false; } // 人车物跟踪 if (task_has_vpt_algor(task_id)) vpt_process.addTaskTracker(task_id, 1, 1, skip_frame_); m_FinishedTaskMtx.lock(); m_FinishedTaskMap[task_id] = false; m_FinishedTaskMtx.unlock(); LOG_INFO("started task {} successed!", config.name); return true; } bool CMultiSourceProcess::task_has_vpt_algor(const std::string &task_id){ //! TODO: create enum iterator. auto algor_map = m_task_param_manager->get_task_other_param(task_id); if (algor_map == nullptr) return false; return (algor_map->find(algorithm_type_t::HUMAN_GATHER) != algor_map->end() || algor_map->find(algorithm_type_t::HUMAN_SNAPSHOT) != algor_map->end() || algor_map->find(algorithm_type_t::NONMOTOR_VEHICLE_SNAPSHOT) != algor_map->end() || algor_map->find(algorithm_type_t::SMOKING_DET) != algor_map->end() || algor_map->find(algorithm_type_t::NO_REFLECTIVE_CLOTHING) != algor_map->end() || algor_map->find(algorithm_type_t::NO_SAFETY_HELMET) != algor_map->end() || algor_map->find(algorithm_type_t::CALL_PHONE_DET) != algor_map->end() || algor_map->find(algorithm_type_t::VEHICLE_SNAPSHOT) != algor_map->end() || algor_map->find(algorithm_type_t::TAKEAWAY_MEMBER_CLASSIFICATION) != algor_map->end() || algor_map->find(algorithm_type_t::PEDESTRIAN_FALL) != algor_map->end() || algor_map->find(algorithm_type_t::PEDESTRIAN_FIGHT) != algor_map->end() || algor_map->find(algorithm_type_t::PEDESTRIAN_RETROGRADE) != algor_map->end() || algor_map->find(algorithm_type_t::VEHICLE_RETROGRADE) != algor_map->end() || algor_map->find(algorithm_type_t::PEDESTRIAN_TRESPASS) != algor_map->end() || algor_map->find(algorithm_type_t::VEHICLE_TRESPASS) != algor_map->end()); } void CMultiSourceProcess::decoded_cbk(DeviceMemory* devFrame){ do{ if(m_bfinish){ break; } m_DataListMtx.lock(); if(m_RgbDataList.size() >= 30){ m_DataListMtx.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(3)); continue; } m_RgbDataList.push_back(devFrame); m_DataListMtx.unlock(); break; }while (true); } void CMultiSourceProcess::task_finished(const string task_id){ std::lock_guard l(m_FinishedTaskMtx); m_FinishedTaskMap[task_id] = true; LOG_INFO("task {} finished!", task_id); } bool CMultiSourceProcess::PauseTask(const string taskID){ return false; } bool CMultiSourceProcess::RestartTask(const string taskID){ return false; } bool CMultiSourceProcess::FinishTask(const string taskID){ DecoderManager* pDecManager = DecoderManager::getInstance(); return pDecManager->closeDecoderByName(taskID); } int CMultiSourceProcess::SnapShot(task_param param){ return 0; } void CMultiSourceProcess::CloseAllTask(){ m_bfinish = true; DecoderManager* pDecManager = DecoderManager::getInstance(); pDecManager->closeAllDecoder(); if(m_pAlgorthimThread){ m_pAlgorthimThread->join(); m_pAlgorthimThread = nullptr; } m_DataListMtx.lock(); while (!m_RgbDataList.empty()){ DeviceMemory* gpuMem = m_RgbDataList.front(); delete gpuMem; gpuMem = nullptr; m_RgbDataList.pop_front(); } m_DataListMtx.unlock(); int size = m_RgbDataList.size(); bool bEmpty = m_RgbDataList.empty(); LOG_INFO("CloseAllTask exit."); } void CMultiSourceProcess::clear_finished_task(){// 清理已经结束的任务 std::lock_guard l1(m_FinishedTaskMtx); std::lock_guard l2(m_DataListMtx); for (auto iter_finished = m_FinishedTaskMap.begin(); iter_finished!=m_FinishedTaskMap.end(); ){ if(iter_finished->second){ // 解码已经结束 // 判断数据对列中是否还有数据 string task_id = iter_finished->first; bool bFinished = true; for (auto iter = m_RgbDataList.begin(); iter!=m_RgbDataList.end(); ++ iter){ DeviceMemory* gpuMem = *iter; if(task_id == gpuMem->getId()){ bFinished = false; break; } } if (bFinished){ // 解码器已经结束,且数据队列中没有改任务的数据,则做最后任务清理工作 finish_task(task_id,false); iter_finished = m_FinishedTaskMap.erase(iter_finished); continue; } } ++ iter_finished; } } bool CMultiSourceProcess::finish_task(const string taskID, const bool delete_snapshot){ // 任务结束,关闭跟踪 if (!vpt_process.finishTaskTracker(taskID)) LOG_ERROR("Finish VPT Tracker failed, task_id: {}", taskID); #ifdef POST_USE_RABBITMQ auto json_str = helpers::gen_json::gen_office_task_heart_beat_json({taskID}); mq_manager_->publish(mq_type_t::HEART_BEAT_MQ, json_str.c_str(), true); #endif m_task_param_manager->delete_task_param(taskID); return true; } int CMultiSourceProcess::algorthim_process_thread(){ LOG_INFO("algorthim_process_thread start..."); ACL_CALL(aclrtSetDevice(m_devId), ACL_ERROR_NONE, 1); aclrtContext ctx; ACL_CALL(aclrtCreateContext(&ctx, m_devId), ACL_ERROR_NONE, 1); while (true){ if(m_bfinish){ break; } clear_finished_task(); vector vec_gpuMem; m_DataListMtx.lock(); while (!m_RgbDataList.empty()){ DeviceMemory* gpuMem = m_RgbDataList.front(); vec_gpuMem.push_back(gpuMem); m_RgbDataList.pop_front(); if(vec_gpuMem.size() >= m_batch_size){ break; } } m_DataListMtx.unlock(); if(vec_gpuMem.size() <= 0){ std::this_thread::sleep_for(std::chrono::milliseconds(3)); continue; } ACL_CALL(aclrtSetCurrentContext(ctx), ACL_ERROR_NONE, 1); algorthim_vpt(vec_gpuMem); for(int i=0;i < vec_gpuMem.size(); i++){ DeviceMemory* mem = vec_gpuMem[i]; if(mem->getSize() <= 0){ continue; } delete mem; mem = nullptr; } vec_gpuMem.clear(); } LOG_INFO("algorthim_process_thread exit."); return 0; } int CMultiSourceProcess::algorthim_vpt(vector vec_gpuMem){ vector vpt_interest_task_id; vector vpt_interest_imgs(0); vector vec_vptMem; for (int i = 0; i < vec_gpuMem.size(); i++) { DeviceMemory* mem = vec_gpuMem[i]; if (!task_has_vpt_algor(mem->getId())){ continue; } sy_img img; img.w_ = mem->getWidth(); img.h_ = mem->getHeight(); img.data_ = mem->getMem(); vpt_interest_imgs.push_back(img); vpt_interest_task_id.push_back(mem->getId()); vec_vptMem.push_back(mem); } /* 待检测的图片不为空 开始检测 */ if (!vpt_interest_imgs.empty()) { vector> deleteObjectID; deleteObjectID.resize(vpt_interest_task_id.size()); vector> unUsedResult; vector vptResult(0); /* 一级检测器,内部已完成跟踪操作 */ vpt_process.process_gpu(vpt_interest_imgs.data(), vpt_interest_task_id, vptResult, deleteObjectID, unUsedResult); // do det & track. m_snapshot_reprocessing->screen_effective_snapshot(vpt_interest_task_id, vptResult); #ifndef VEHICLE_MULTI_BOXES /* 快照优选(内部可实现不同的快照优选策略) */ m_snapshot_reprocessing->update_bestsnapshot(vpt_interest_task_id, vpt_interest_imgs.data(), vptResult, deleteObjectID); #else algorithm_vehicle_relult(vec_vptMem, vptResult, deleteObjectID); // send_locus_finished_msg(vpt_interest_task_id, deleteObjectID); #endif if(vptResult.size() > 0){ cout << vptResult[0].obj_count<< endl; } } return 0; } int CMultiSourceProcess::algorithm_vehicle_relult(vector vec_devMem, vector& vptResult, vector>& delete_object_id) { vector results = m_snapshot_reprocessing->get_vehicle_snapshot(vec_devMem, vptResult, skip_frame_); for (auto &result : results) { if(result.objs.size() <= 0){ continue; } auto task_id = result.task_id; auto task_other_params = m_task_param_manager->get_task_other_param(task_id); const auto &algor_other_params = task_other_params->find(algorithm_type_t::VEHICLE_SNAPSHOT); if (algor_other_params == task_other_params->end()) { LOG_ERROR("[Error] taskId {} not found algor {}", task_id.c_str(), (int)algorithm_type_t::VEHICLE_SNAPSHOT); continue; } const algor_basic_config_param_t *basic_param = algor_other_params->second->basic_param; std::string cur_timestamp_ms = std::to_string(helpers::timer::get_cur_time_ms()); const std::string fpath_origin = basic_param->result_folder + helpers::os::sep + task_id + "_" + std::to_string(result.objs.size()) + "_" + std::to_string(result.id) + "_" + cur_timestamp_ms + ".jpg"; ImgSaveInfo saveInfo; saveInfo.file_path = fpath_origin; saveInfo.img_info = dvpp_devMem2vpcImg(result.memPtr); m_save_snapshot_reprocessing->reprocessing_process_wo_locus_async(saveInfo); vector vec_obj_info_list = dvpp_crop_batch(result.memPtr, result.objs); if(vec_obj_info_list.size() != result.objs.size()){ LOG_ERROR("vpc_crop size error !"); dvpp_imgList_release(vec_obj_info_list); continue; } // 保存抠图并发MQ for(int i =0; i < result.objs.size(); i++){ video_object_info obj = result.objs[i]; std::string cur_timestamp_ms = std::to_string(helpers::timer::get_cur_time_ms()); const std::string fpath_roi = basic_param->result_folder_little + helpers::os::sep + task_id + "_" + std::to_string(obj.object_id) + "_" + cur_timestamp_ms + ".jpg"; video_object_snapshot new_obj_ss_info; new_obj_ss_info.analysisRes = nullptr; new_obj_ss_info.object_id = obj.object_id; new_obj_ss_info.obj_info.set_data(obj.index, obj.confidence, obj.left, obj.top, obj.right, obj.bottom); strcpy(new_obj_ss_info.task_id, task_id.c_str()); strcpy(new_obj_ss_info.video_image_path, fpath_origin.c_str()); strcpy(new_obj_ss_info.snapshot_image_path, fpath_roi.c_str()); new_obj_ss_info.nFinished = 0; string json_str = "1111";//helpers::gen_json::gen_multi_obj_json(algorithm_type_t::VEHICLE_SNAPSHOT, new_obj_ss_info); ImgSaveInfo save_info; save_info.file_path = fpath_roi; save_info.img_info = vec_obj_info_list[i]; save_info.json_str = json_str; m_save_snapshot_reprocessing->reprocessing_process_wo_locus_async(save_info); } vec_obj_info_list.clear(); } return 0; } void CMultiSourceProcess::send_locus_finished_msg(vector& vpt_interest_task_id, vector> deleteObjectID){ auto task_iter = vpt_interest_task_id.begin(); for (int i = 0; i < deleteObjectID.size(); i++, ++task_iter) // loop taskId. { string task_id = *task_iter; for (int &j : deleteObjectID[i]) // loop algor type. { OBJ_KEY obj_key = {task_id, j}; auto task_param_ptr = m_task_param_manager->get_task_algor_param(task_id); auto task_other_param_ptr = m_task_param_manager->get_task_other_param(task_id); // 该路任务开启了抓拍功能 开始抓拍保存;若未开启抓拍,清空显存资源 // if (task_param_ptr->vehicle_algors.find(algorithm_type_t::VEHICLE_SNAPSHOT) != task_param_ptr->vehicle_algors.end()) { // std::lock_guard l(m_total_mutex); // if (m_total_snapshot_info_multi_object.find(obj_key) != m_total_snapshot_info_multi_object.end()) { // video_object_snapshot new_obj_ss_info; // new_obj_ss_info.object_id = j; // new_obj_ss_info.nFinished = 1; // strcpy(new_obj_ss_info.task_id, task_id.c_str()); // string json_str = helpers::gen_json::gen_multi_obj_json(algorithm_type_t::VEHICLE_SNAPSHOT, new_obj_ss_info); // // 通知结束的轨迹 // m_save_snapshot_reprocessing->reprocessing_finish_locus_async(obj_key,json_str); // m_total_snapshot_info_multi_object.erase(obj_key); // } // } } } }