/*
 * @Description: Main pipeline class + task-management operations
 */

//#pragma once
#ifndef __MultiSourceVideoProcess_H__
#define __MultiSourceVideoProcess_H__

#include <iostream>
#include <stdlib.h>

#include "../DxDecoder/DxDecoderWrap.h"
#include "../DxDecoder/ImageSaveGPU.h"
#include "VPTProcess.h"
#include "common.h"
#include "nvml.h"
#include "opencv2/highgui/highgui.hpp"
#include "time.h"
#include <boost/thread/thread.hpp>
#include <deque>
#include <queue>
#include <set>
#include <string>

#include "mvpt_process_assist.h"
#include <atomic>
#include <boost/thread/thread_pool.hpp>
#include <mutex>
#include <thread>

#include "../reprocessing_module/save_snapshot_reprocessing.h"
#include "../reprocessing_module/snapshot_reprocessing.h"
#include "task_param_manager.h"

#ifdef POST_USE_RABBITMQ
#include "../reprocessing_module/mq_manager.hpp"
#endif

#include "../ai_engine_module/ai_engine_module.h"
#ifdef WITH_SECOND_PROCESS
#include "../ai_engine_module/human_gather_statistics.h"
#include "../ai_engine_module/pedestrian_safety_det.hpp"
#include "../ai_engine_module/pedestrian_vehicle_retrograde.hpp"
#include "../ai_engine_module/pedestrian_vehicle_trespass.hpp"
#endif

#include "ErrorInfo.h"
#include <condition_variable>

#ifdef WITH_FACE_DET_SS
#include "face_det_ai_engine.h"
#endif

#ifdef _MSC_VER
#ifdef _DEBUG
#pragma comment(lib, "opencv_world310d.lib")
#else
#pragma comment(lib, "opencv_world310.lib")
#endif
#endif
using namespace cv;
// using namespace std;
using std::deque;
using std::map;
using std::set;
using std::string;
using std::vector;

#ifndef _MSC_VER
#include <unistd.h> // usleep() for the Sleep macro below
#ifndef TRUE
#define TRUE 1
#endif
#ifndef FALSE
#define FALSE 0
#endif
#define Sleep(a) usleep((a)*1000)
// typedef    int    BOOL;
#define BOOL bool
typedef unsigned int DWORD;
typedef void *LPVOID;
#endif

#ifdef _DEBUG
#define DEBUG_MSG(msg, ...)                                                                                            \
  {                                                                                                                    \
    printf("%s %s [%d]: ", __FILE__, __FUNCTION__, __LINE__);                                                          \
    printf(msg, ##__VA_ARGS__);                                                                                        \
    printf("\n");                                                                                                      \
  }
#else
#define DEBUG_MSG(msg, ...)
#endif
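
// Usage sketch for DEBUG_MSG (the arguments below are illustrative only): in a debug
// build the call expands to a printf prefixed with file, function and line, e.g.
//   DEBUG_MSG("add task %s failed, ret = %d", taskID.c_str(), ret);
// In release builds the macro expands to nothing, so its arguments must not carry
// side effects the surrounding code depends on.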

#define MAXLENGTH 416
#define MINLENGTH 224

//#define PROCESSHEIGHT 224
//#define PROCESSWIDTH 416
//#define DATASIZE PROCESSWIDTH * PROCESSHEIGHT * 3
#define THREAD_COUNT 30
#define SNAPSHOTFRAME 15
#define LOSTMAXFRAMECCOUNT 4

#define SCALE_OUT 10 // when checking a target box's initial position, expand slightly beyond the minimum distance

enum TaskState {
  PLAY,
  PAUSE,
  FINISH,
  DECODEERROR // the decode thread may fail; after an error, decoding for that channel is terminated directly
};

struct task_resource {
  DxDecoderWrap *taskcuvid;
  TaskState task_state;
  DxGPUFrame task_algorithm_data;
};

struct Operator {
  string changeTaskID;
  const char *videoFileName;
  const char *resultFolderLittleName;
  const char *resultFolderName;
  int algor_counts;
  TaskOperator changeTaskOperator;
};

class CMultiSourceVideoProcess {
public:
  CMultiSourceVideoProcess();
  ~CMultiSourceVideoProcess();

  int InitAlgorthim(tsl_aiplatform_param vptParam);
  void *GetVPT_Handle() const {
    return VPT_Handle_;
  };

#ifdef POST_USE_RABBITMQ
  int AddMqConn(mq_type_t mq_type, rabbitmq_conn_params_t mq_conn_param);
  int GetTaskStatus(const string taskID);
#endif

  /* task api */
  bool add_task_operation(task_param _cur_task_param);

  int AddOperator(task_param tparam);
  void AddOperator(string taskID, int taskOper);

  void OperatorTask();
  bool HasNewTask() const {
    return !TaskOperatorQ.empty();
  }
  void PauseTask(const string taskID);
  void RestartTask(const string taskID);
  void FinishTask(const string taskID, const bool delete_snapshot);
  bool DeleteTaskQ(const string taskID);
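
  /* Usage sketch for the task API (a minimal illustration; the task_param fields and
   * the exact pause/finish semantics are assumptions based on the declarations above,
   * not guarantees of this header):
   *
   *   task_param tparam;                        // fill in stream source / algorithm settings
   *   if (mainProcess.add_task_operation(tparam)) {
   *     // the request is queued; OperatorTask() presumably applies it on the process thread
   *   }
   *   mainProcess.PauseTask(taskID);            // enqueue a pause request
   *   mainProcess.FinishTask(taskID, true);     // finish the task and drop its snapshots
   */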

  /* decode api */
  void FinishDecode(const string taskID);
  bool FinishDecode(std::pair<const std::string, task_resource> &iter);

  /* sync api */
  int WaitAndPauseTask(const string taskID, const int max_timeout_ms);
  int WaitAndFinishTask(const string taskID, const int max_timeout_ms);
  int WaitAndRestartTask(const string taskID, const int max_timeout_ms);
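
  /* Usage sketch for the sync API (assumption: 0 means success and a non-zero return
   * indicates a timeout or failure; the concrete codes presumably come from ErrorInfo.h):
   *
   *   // request a pause and block up to 3000 ms until the processing loop has
   *   // actually taken the task out of play
   *   int ret = mainProcess.WaitAndPauseTask(taskID, 3000);
   *   if (ret != 0) { ... handle the timeout or error ... }
   */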

  /* frame process api */
  int FinishProcessThread();
  int endframe_obj_process(const OBJ_KEY &obj_key, algorithm_type_t algor_type);
  int everyframe_process(set<string> &task_in_play_id, sy_img *images, vector<onelevel_det_result> &ol_det_result);
  bool save_snapshot_process(const OBJ_KEY &obj_key, const algorithm_type_t &algorithm_type, const sy_img &ori_img,
                             const sy_img &roi_img, const long long id, const std::string &json_str,
                             bool enable_async = true, const bool ori_img_is_in_gpu = true,
                             const bool roi_img_is_in_gpu = true);
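
  /* Note on save_snapshot_process (a hedged sketch; the argument names below are
   * illustrative): by default ori_img and roi_img are treated as GPU-resident and the
   * save is dispatched asynchronously, so a typical call only supplies the required
   * arguments, e.g.
   *
   *   save_snapshot_process(obj_key, algor_type, ori_img, roi_img, obj_id, json_str);
   *
   * Pass enable_async = false to save synchronously, or set the *_is_in_gpu flags to
   * false when the images live in host memory.
   */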

private:
  boost::thread ProcessThread;
  std::mutex _tx_add_task;

  deque<Operator> TaskOperatorQ;
  int capacity;

  double gpu_total_memory;
  boost::thread thread_;
  void *authority_handle{nullptr};

public: /* In principle these should not be public, but the thread functions use the data below and writing a getter for each would be overly cumbersome */
  map<string, task_resource> system_all_tasks_;

  void *VPT_Handle_{nullptr};

  int section_batch_size_;
  int licence_status_;
  int thread_status_;
  int gpu_id_;

  int AddTaskSucFlag; // 0: initial state, 1: task added successfully, -1: failed to add task
  int TaskInPlay;
  int TotalTask;
  set<string> TaskInPlayID;
  map<string, uint> task_id_to_processed_frame_;

  // vector<onelevel_det_result> VPTResult;
  std::atomic<bool> ProcessFlag;
  bool SourceFlag;
  unsigned char *imgDataDevice;
  void *FrameTemp;
  char *mModeSnapshotVideo;
  char *mModeSnapshotLittle;
  string viewTaskID;

  map<int, set<int>> objDelete;

  FINISH_CALLBACK taskFinishCallbackFunc;
  OBJECT_INFO_CALLBACK taskObjInfoCallbackFunc;

  bool beginSaveSnapshot;
  boost::thread_group saveSnapshotsThreadGroup;
  std::mutex taskMutex;
  std::condition_variable taskCondVar;
  bool AttributionAnalysis; // ensures that only one secondary attribute analysis runs per frame

  snapshot_reprocessing *m_snapshot_reprocessing{nullptr};
  task_param_manager *m_task_param_manager{nullptr};

#ifdef WITH_SECOND_PROCESS
  //! TODO: use model manager to replace.
  ai_engine_module::fight_fall_cls::FightfallCls fight_fall_cls_;
  ai_engine_module::takeaway_member_classification::TakeawayMemberCls takeaway_member_;
  ai_engine_module::human_gather_statistics::human_gather_statistics m_human_gather_statistics;
  ai_engine_module::pedestrian_safety_det::PedestrianSafetyDetector pedestrian_safety_detector_;
  ai_engine_module::pedestrian_vehicle_retrograde::PedestrianVehicleRetrograde pedestrian_vehicle_retrograde_;
  ai_engine_module::pedestrian_vehicle_trespass::PedestrianVehicleTrespass pedestrian_vehicle_trespass_;
#endif

#ifdef POST_USE_RABBITMQ
  mq::Manager *mq_manager_{nullptr};
#endif

#ifdef WITH_FACE_DET_SS
  face_det_ai_engine m_face_det_ai_engine; // face detection
#endif

private:
  base_reprocessing_unit *m_save_snapshot_reprocessing{nullptr};
};

static CMultiSourceVideoProcess mainProcess;

#if 0

struct CUVID_USERDATA {
	int		id;
	void* opaque;
};

struct CUVID_DATA {
	float* pData;
	int		nWidth;
	int		nHeight;
	int		nDatasize;
};


struct SNAPSHOT_PROCESS_UNIT
{
	vector<DxGPUFrame> imgBig;
	vector<DxGPUFrame> imgSmall;
	vector<onelevel_det_result> imgVPTResult;
};


struct VideoHeightWidth
{
	double height;
	double width;
};

struct VideoObjectSS
{
	unsigned char* obj_person;
	unsigned char* obj_bike;
	unsigned char* obj_car;

	VideoObjectSS() : obj_person(NULL), obj_bike(NULL), obj_car(NULL) {}

};
template <typename T> class MyAtomic;
template<typename T>
bool operator == (MyAtomic<T>& d1, T& d2);

template<typename T>
bool operator == (MyAtomic<T>& d1, MyAtomic<T>& d2);
template <typename T>
class MyAtomic
{
public:
	MyAtomic() {};
	MyAtomic(const T& d) { data.store(d); };
	MyAtomic(const MyAtomic& d) { data.store(d.data.load()); };
	MyAtomic& operator =(T d) { data.store(d); return *this; };
	MyAtomic& operator =(MyAtomic& d) { data.store(d.data.load()); return *this; };
	MyAtomic& operator +=(T d) { data.fetch_add(d); return *this; };
	MyAtomic& operator +=(MyAtomic& d) { data.fetch_add(d); return *this; };
	operator int() { return data.load(); }
	friend bool operator ==<T> (MyAtomic<T>& d1, T& d2);
	friend bool operator ==<T> (MyAtomic<T>& d1, MyAtomic<T>& d2);
private:
	std::atomic<T> data;
};
template<typename T>
bool operator == (MyAtomic<T>& d1, T& d2)
{
	if (d1.data.load() == d2)
		return true;
	else
		return false;
}

template<typename T>
bool operator == (MyAtomic<T>& d1, MyAtomic<T>& d2)
{
	if (d1.data.load() == d2.data.load())
		return true;
	else
		return false;
}
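
// Usage sketch for MyAtomic (illustration only): it wraps std::atomic<T> behind value
// semantics so frame counters can be copied, compared and read like a plain T, e.g.
//   MyAtomic<int> frameCount(0);
//   frameCount += 1;               // atomic fetch_add
//   int n = frameCount;            // implicit load via operator int()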

struct Task {
	int taskID;
	const char* taskFileSource;
	TaskState taskState;
	DxDecoderWrap* taskTcuvid;
	DxGPUFrame task_algorithm_data;      // the new framework does no resize; for now feed the backup original-size image into the algorithm
	float* taskDataToRT;
	bool taskHasBackup;
	//VideoHeightWidth taskHeightWidthRatio;
	VideoHeightWidth taskHeightWidth;
	MyAtomic<int> taskFrameCount;
	int taskTotalFrameCount;
	SNAPSHOT_CALLBACK taskObjCallbackFunc;
	REALTIME_CALLBACK taskRealTimeCallbackFunc;
	cv::Mat frameImage;
	char* folderNameLittle;
	char* folderName;
	sy_rect task_min_boxsize[DETECTTYPE];
};
#endif

#endif