/* * @Description: 主流程类 + 任务管理相关操作 */ //#pragma once #ifndef __MutliSourceVideoProcess_H__ #define __MutliSourceVideoProcess_H__ #include #include #include "../FFNvDecoder/FFNvDecoderManager.h" #include "../FFNvDecoder/cuda_kernels.h" #include "VPTProcess.h" #include "common.h" #include "nvml.h" #include "opencv2/highgui/highgui.hpp" #include "time.h" #include #include #include #include #include "mvpt_process_assist.h" #include #include #include #include #include #include "../reprocessing_module/save_snapshot_reprocessing.h" #include "../reprocessing_module/snapshot_reprocessing.h" #include "task_param_manager.h" #ifdef POST_USE_RABBITMQ #include "../reprocessing_module/mq_manager.hpp" #endif #include "../ai_engine_module/ai_engine_module.h" #ifdef WITH_SECOND_PROCESS #include "../ai_engine_module/human_gather_statistics.h" #include "../ai_engine_module/pedestrian_safety_det.hpp" #include "../ai_engine_module/pedestrian_vehicle_retrograde.hpp" #include "../ai_engine_module/pedestrian_vehicle_trespass.hpp" #endif #include "ErrorInfo.h" #include #ifdef WITH_FACE_DET_SS #include "face_det_ai_engine.h" #endif #include "GpuRgbMemory.hpp" #ifdef _MSC_VER #ifdef _DEBUG #pragma comment(lib, "opencv_world310d.lib") #else #pragma comment(lib, "opencv_world310.lib") #endif #endif using namespace cv; // using namespace std; using std::map; using std::set; using std::vector; using std::list; #ifndef _MSC_VER #ifndef TRUE #define TRUE 1 #endif #ifndef FALSE #define FALSE 0 #endif #define Sleep(a) usleep((a)*1000) // typedef int BOOL; #define BOOL bool typedef unsigned int DWORD; typedef void *LPVOID; #endif #ifdef _DEBUG #define DEBUG_MSG(msg, ...) \ { \ printf("%s %s [%d]: ", __FILE__, __FUNCTION__, __LINE__); \ printf(msg, ##__VA_ARGS__); \ printf("\n"); \ } #else #define DEBUG_MSG(msg, ...) 
#endif

// Size and frame-count constants. Their exact use is not visible in this
// header — presumably consumed by the implementation file(s).
#define MAXLENGTH 416
#define MINLENGTH 224
//#define PROCESSHEIGHT 224
//#define PROCESSWIDTH 416
//#define DATASIZE PROCESSWIDTH * PROCESSHEIGHT * 3
#define THREAD_COUNT 30
#define SNAPSHOTFRAME 15
#define LOSTMAXFRAMECCOUNT 4
#define SCALE_OUT 10 // when judging a target box's initial position, expand outward a little beyond the minimum distance

// State of a single task.
enum TaskState {
  PLAY,
  PAUSE,
  FINISH,
  DECODEERROR // the decode thread may report an error; after an error that stream's decoding is ended directly
};

// Per-task bundle: decoder wrapper pointer, current state, and a GPU frame.
struct task_resource {
  DxDecoderWrap *taskcuvid;
  TaskState task_state;
  DxGPUFrame task_algorithm_data;
};

// One queued task operation.
// NOTE(review): the const char* members are non-owning here; the lifetime of
// the pointed-to strings is not visible in this header — confirm with callers.
struct Operator {
  string changeTaskID;
  const char *videoFileName;
  const char *resultFolderLittleName;
  const char *resultFolderName;
  int algor_counts;
  TaskOperator changeTaskOperator;
};

// User data handed to decode callbacks: the task parameters plus an opaque pointer.
struct decode_cbk_userdata{
  task_param _cur_task_param;
  void* opaque;
};

// Main processing class. Per the file's header description: main pipeline
// class plus task-management operations.
//
// NOTE(review): several declarations below appear to have lost their template
// argument lists in this view (e.g. `vector &`, `std::pair &`, `map`, `set`,
// `deque`, `list`, `std::atomic`). Restore them from the original header; the
// text here is reproduced as-is.
class CMultiSourceProcess {
public:
  CMultiSourceProcess();
  ~CMultiSourceProcess();

  int InitAlgorthim(tsl_aiplatform_param vptParam);
  // Returns the stored VPT_Handle_ pointer.
  void *GetVPT_Handle() const { return VPT_Handle_; };

#ifdef POST_USE_RABBITMQ
  int AddMqConn(mq_type_t mq_type, rabbitmq_conn_params_t mq_conn_param);
  int GetTaskStatus(const string taskID);
#endif

  /* task api */
  bool add_task_operation(task_param _cur_task_param);
  int AddOperator(task_param tparam);
  void AddOperator(string taskID, int taskOper);
  void OperatorTask();
  // True when TaskOperatorQ is not empty.
  bool HasNewTask() const { return !TaskOperatorQ.empty(); }
  void PauseTask(const string taskID);
  void RestartTask(const string taskID);
  void FinishTask(const string taskID, const bool delete_snapshot);
  bool DeleteTaskQ(const string taskID);

  /* decode api */
  void FinishDecode(const string taskID);
  bool FinishDecode(std::pair &iter);

  /* sync api */
  int WaitAndPauseTask(const string taskID, const int max_timeout_ms);
  int WaitAndFinishTask(const string taskID, const int max_timeout_ms);
  int WaitAndRestartTask(const string taskID, const int max_timeout_ms);

  /* frame process api */
  int FinishProcessThread();
  int everyframe_process(vector &task_in_play_id, sy_img *images, vector &ol_det_result);

public:
  // Thread / callback entry points (names suggest so — TODO confirm in the .cpp).
  void algorthim_process_thread(const string gpuid);
  void post_decode_thread(task_param _cur_task_param, AVFrame * gpuFrame);
  void decode_finished_thread(task_param _cur_task_param);

private:
  void startProcessByGpuid(const string gpuid);
  bool task_has_vpt_algor(const std::string &task_id);
  bool task_has_face_algor(const std::string &task_id);
  void cuda_free_wrap(sy_img &img);

  // VPT
  void algorthim_vpt(vector& task_list, sy_img *batch_img);

  // Pedestrian safety analysis algorithm
  void algorthim_pedestrian_safety(vector& task_list, vector& vpt_interest_imgs, vector& vptResult);

  // Retrograde (wrong-way) & illegal trespass algorithm module
  void algorthim_retrograde_trespass(vector& vpt_interest_task_id, vector& vpt_interest_imgs, vector& vptResult ,vector>& deleteObjectID);

  // for snapshot algorithm. For targets whose trajectory has ended, return the
  // final result (currently returns the algorithm result + snapshot save path)
  void algorithm_snapshot(vector& vpt_interest_task_id, vector> deleteObjectID);

  // Fight / fall algorithm module
  void algorithm_fight_fall(vector& vpt_interest_task_id, vector& vpt_interest_imgs, vector& vptResult);

  // Takeaway (delivery) rider analysis module
  void algorithm_takeaway_member_cls(vector& vpt_interest_task_id, vector& vpt_interest_imgs, vector& vptResult);

  // Face detection snapshot algorithm module
  void algorthim_face_detect(vector& task_list, sy_img *batch_img);

  // Algorithm module that must run on the trajectory's end frame
  int endframe_obj_process(const OBJ_KEY &obj_key, algorithm_type_t algor_type);

  /* Implements snapshot saving (nothing is actually saved yet; the image is copied from GPU memory to host memory,
   * then either saved locally directly or pushed into a cache queue for asynchronous saving. The mode depends on the
   * requirement: alarm-type results need a synchronous save plus alarm, analysis-type results may be saved
   * asynchronously and then returned) */
  bool save_snapshot_process(const OBJ_KEY &obj_key, const algorithm_type_t &algorithm_type, const sy_img &ori_img,
                             const sy_img &roi_img, const long long id, const std::string &json_str,
                             bool enable_async = true, const bool ori_img_is_in_gpu = true,
                             const bool roi_img_is_in_gpu = true);

private:
  map gpuProcessthreadMap;
  boost::thread ProcessThread;
  std::mutex _tx_add_task;
  deque TaskOperatorQ;
  int capacity;
  double gpu_total_memory;
  boost::thread thread_;
  void *authority_handle{nullptr};

public: /* Strictly these should not be public, but the thread functions use the data below and writing a getter for each one would be too cumbersome */
  // NOTE(review): most members below have no in-class initializer; whether the
  // constructor sets them is not visible from this header.
  map system_all_tasks_;
  void *VPT_Handle_{nullptr};
  int section_batch_size_;
  int licence_status_;
  int thread_status_;
  int gpu_id_;
  int AddTaskSucFlag; // 0: initial state, 1: task added successfully, -1: adding the task failed
  int TaskInPlay;
  int TotalTask;
  set TaskInPlayID;
  map task_id_to_processed_frame_;
  // vector VPTResult;
  std::atomic ProcessFlag;
  bool SourceFlag;
  unsigned char *imgDataDevice;
  void *FrameTemp;
  char *mModeSnapshotVideo;
  char *mModeSnapshotLittle;
  string viewTaskID;
  map> objDelete;
  FINISH_CALLBACK taskFinishCallbackFunc;
  OBJECT_INFO_CALLBACK taskObjInfoCallbackFunc;
  bool beginSaveSnapshot;
  boost::thread_group saveSnapshotsThreadGroup;
  std::mutex taskMutex;
  std::condition_variable taskCondVar;
  bool AttributionAnalysis; // control flag: only one secondary attribute analysis is performed per analyzed frame
  snapshot_reprocessing *m_snapshot_reprocessing{nullptr};
  task_param_manager *m_task_param_manager{nullptr};

#ifdef WITH_SECOND_PROCESS
  //! TODO: use model manager to replace.
  ai_engine_module::fight_fall_cls::FightfallCls fight_fall_cls_;
  ai_engine_module::takeaway_member_classification::TakeawayMemberCls takeaway_member_;
  ai_engine_module::human_gather_statistics::human_gather_statistics m_human_gather_statistics;
  ai_engine_module::pedestrian_safety_det::PedestrianSafetyDetector pedestrian_safety_detector_;
  ai_engine_module::pedestrian_vehicle_retrograde::PedestrianVehicleRetrograde pedestrian_vehicle_retrograde_;
  ai_engine_module::pedestrian_vehicle_trespass::PedestrianVehicleTrespass pedestrian_vehicle_trespass_;
#endif

#ifdef POST_USE_RABBITMQ
  mq::Manager *mq_manager_{nullptr};
#endif

#ifdef WITH_FACE_DET_SS
  face_det_ai_engine m_face_det_ai_engine; // face detection
#endif

private:
  base_reprocessing_unit *m_save_snapshot_reprocessing{nullptr};

private:
  list m_RgbDataList;
  std::mutex m_QueueMtx;
};

// NOTE(review): a namespace-scope `static` object defined in a header has
// internal linkage, so each translation unit including this header gets its
// own separate CMultiSourceProcess instance — confirm that is intended.
static CMultiSourceProcess mainProcess;

// Everything inside this #if 0 is compiled out; it looks like legacy
// definitions kept for reference.
#if 0
struct CUVID_USERDATA {
  int id;
  void* opaque;
};

struct CUVID_DATA {
  float* pData;
  int nWidth;
  int nHeight;
  int nDatasize;
};

struct SNAPSHOT_PROCESS_UNIT {
  vector imgBig;
  vector imgSmall;
  vector imgVPTResult;
};

struct VideoHeightWidth {
  double height;
  double width;
};

struct VideoObjectSS {
  unsigned char* obj_person;
  unsigned char* obj_bike;
  unsigned char* obj_car;
  VideoObjectSS() : obj_person(NULL), obj_bike(NULL), obj_car(NULL) {}
};

// Thin wrapper around std::atomic exposing assignment, += and == operators.
// NOTE(review): as written, `operator +=(MyAtomic&)` passes the wrapper itself
// to fetch_add, and the MyAtomic/MyAtomic `operator ==` calls `d2.load()`,
// which this class does not declare — this would need fixing if re-enabled.
template class MyAtomic;
template bool operator == (MyAtomic& d1, T& d2);
template bool operator == (MyAtomic& d1, MyAtomic& d2);

template class MyAtomic {
public:
  MyAtomic() {};
  MyAtomic(const T& d) { data.store(d); };
  MyAtomic(const MyAtomic& d) { data.store(d.data.load()); };
  MyAtomic& operator =(T d) { data.store(d); return *this; };
  MyAtomic& operator =(MyAtomic& d) { data.store(d.data.load()); return *this; };
  MyAtomic& operator +=(T d) { data.fetch_add(d); return *this; };
  MyAtomic& operator +=(MyAtomic& d) { data.fetch_add(d); return *this; };
  operator int() { return data.load(); }
  friend bool operator == (MyAtomic& d1, T& d2);
  friend bool operator == (MyAtomic& d1, MyAtomic& d2);
private:
  std::atomic data;
};

template bool operator == (MyAtomic& d1, T& d2) {
  if (d1.data.load() == d2)
    return true;
  else
    return false;
}

template bool operator == (MyAtomic& d1, MyAtomic& d2) {
  if (d1.data.load() == d2.load())
    return true;
  else
    return false;
}

struct Task {
  int taskID;
  const char* taskFileSource;
  TaskState taskState;
  DxDecoderWrap* taskTcuvid;
  DxGPUFrame task_algorithm_data; // for the new framework no resize is done; for now the backup original-size image is fed into the algorithm
  float* taskDataToRT;
  bool taskHasBackup;
  //VideoHeightWidth taskHeightWidthRatio;
  VideoHeightWidth taskHeightWidth;
  MyAtomic taskFrameCount;
  int taskTotalFrameCount;
  SNAPSHOT_CALLBACK taskObjCallbackFunc;
  REALTIME_CALLBACK taskRealTimeCallbackFunc;
  cv::Mat frameImage;
  char* folderNameLittle;
  char* folderName;
  sy_rect task_min_boxsize[DETECTTYPE];
};
#endif

#endif