Commit d9fc3e828f043799738364fb39b43ead509e3b3c

Authored by Hu Chunming
1 parent beec83ee

recode添加colse功能和mq功能

src/ai_platform/MultiSourceProcess.cpp
... ... @@ -303,6 +303,10 @@ bool CMultiSourceProcess::AddTask(task_param _cur_task_param){
303 303 return false;
304 304 }
305 305  
  306 +#ifdef POST_USE_RABBITMQ
  307 + pDecManager->set_mq_callback(config.name, std::bind(&mq::Manager::publish, mq_manager_, mq_type, std::placeholders::_1, true));
  308 +#endif
  309 +
306 310 // 人车物跟踪
307 311 if (task_has_vpt_algor(task_id))
308 312 vpt_process.addTaskTracker(task_id, 1, 1, skip_frame_);
... ...
src/decoder/dvpp/DvppDecoder.cpp
... ... @@ -245,6 +245,8 @@ void DvppDecoder::close(){
245 245 if(m_read_thread != 0){
246 246 pthread_join(m_read_thread,0);
247 247 }
  248 +
  249 + m_recoderManager.close();
248 250 }
249 251  
250 252 void DvppDecoder::setPostDecArg(const void* postDecArg){
... ... @@ -886,4 +888,8 @@ void DvppDecoder::release_dvpp(){
886 888  
887 889 void DvppDecoder::doRecode(RecoderInfo& recoderInfo) {
888 890 m_recoderManager.create_recode_task2(recoderInfo);
  891 +}
  892 +
  893 +void DvppDecoder::set_mq_callback(mq_callback_t cb) {
  894 + m_recoderManager.set_mq_callback(cb);
889 895 }
890 896 \ No newline at end of file
... ...
src/decoder/dvpp/DvppDecoder.h
... ... @@ -54,6 +54,8 @@ public:
54 54  
55 55 void doRecode(RecoderInfo& recoderInfo);
56 56  
  57 + void set_mq_callback(mq_callback_t cb);
  58 +
57 59 public:
58 60 void doVdppVdecCallBack(acldvppStreamDesc *input, acldvppPicDesc *output, unsigned long long frame_nb);
59 61 void doProcessReport();
... ...
src/decoder/dvpp/DvppDecoderApi.cpp
... ... @@ -136,4 +136,10 @@ void DvppDecoderApi::doRecode(RecoderInfo& recoderInfo) {
136 136 if(m_pDecoder != nullptr){
137 137 return m_pDecoder->doRecode(recoderInfo);
138 138 }
  139 +}
  140 +
  141 +void DvppDecoderApi::set_mq_callback(std::function<bool(const char *msg)> mq_publish) {
  142 + if(m_pDecoder != nullptr){
  143 + return m_pDecoder->set_mq_callback(mq_publish);
  144 + }
139 145 }
140 146 \ No newline at end of file
... ...
src/decoder/dvpp/DvppDecoderApi.h
... ... @@ -41,6 +41,8 @@ public:
41 41 void setFinishedDecArg(const void* finishedDecArg);
42 42  
43 43 void doRecode(RecoderInfo& recoderInfo);
  44 +
  45 + void set_mq_callback(std::function<bool(const char *msg)> mq_publish);
44 46 private:
45 47 DvppDecoder* m_pDecoder;
46 48 };
47 49 \ No newline at end of file
... ...
src/decoder/dvpp/FFRecoder.cpp
... ... @@ -123,7 +123,7 @@ bool FFRecoder::init(int w, int h, AVRational time_base, AVCodecContext* avctx,
123 123  
124 124 bool FFRecoder::init(AVRational time_base, AVCodecContext* avctx, const char* outfile_name) {
125 125  
126   - codec_ctx_ = new AVCodecContext();
  126 + codec_ctx_ = (AVCodecContext*)av_malloc(sizeof(AVCodecContext));
127 127 avcodec_copy_context(codec_ctx_, avctx);
128 128 codec_ctx_->time_base = time_base;
129 129  
... ... @@ -244,6 +244,11 @@ void FFRecoder::update_pts(AVPacket* pkt) {
244 244 bool FFRecoder::write_pkt(AVPacket *pkt) {
245 245 char errbuf[64]{ 0 };
246 246  
  247 + // frame_number++;
  248 + // pkt->pts = av_rescale_q(frame_number, codec_ctx_->time_base, out_stream_->time_base);
  249 + // pkt->dts = pkt->pts;
  250 + // pkt->duration = av_rescale_q(1, codec_ctx_->time_base, out_stream_->time_base);
  251 +
247 252 av_packet_rescale_ts(pkt, codec_ctx_->time_base, out_stream_->time_base);
248 253 pkt->stream_index = out_stream_->index;
249 254 update_pts(pkt);
... ...
src/decoder/dvpp/FFRecoder.h
... ... @@ -49,4 +49,6 @@ private:
49 49 bool bFirstFrame;
50 50 int64_t last_src_pts;
51 51 int64_t last_pts;
  52 +
  53 + int64_t frame_number{0};
52 54 };
53 55 \ No newline at end of file
... ...
src/decoder/dvpp/FFRecoderTaskManager.cpp
... ... @@ -56,6 +56,11 @@ bool FFRecoderTaskManager::init(AVRational time_base, AVCodecContext* avctx){
56 56 }
57 57  
58 58 void FFRecoderTaskManager::cache_pkt(AVPacket* pkt, long long frame_nb){
  59 + if(m_bExit) {
  60 + // 任务退出了就不再缓存数据了
  61 + return;
  62 + }
  63 +
59 64 std::lock_guard<std::mutex> l_pkt(m_pkt_list_mtx);
60 65  
61 66 // 考虑到一个AVPacket中的数据并不很大,为减少与解码模块的耦合度,方便管理,这里做一个clone
... ... @@ -131,6 +136,14 @@ list&lt;DataPacket*&gt;::iterator FFRecoderTaskManager::getStartIterator(unsigned long
131 136 return it_first;
132 137 }
133 138  
  139 + // auto it_second = m_pkt_list.begin();
  140 + // for(;it_second != m_pkt_list.end(); it_second++) {
  141 + // DataPacket* dataPkt = *it_second;
  142 + // if (dataPkt->frame_nb >= start_frame_nb){
  143 + // return it_second;
  144 + // }
  145 + // }
  146 +
134 147 auto it_second = m_pkt_list.begin();
135 148 for(;it_second != m_pkt_list.end(); it_second++) {
136 149 DataPacket* dataPkt = *it_second;
... ... @@ -179,6 +192,10 @@ void FFRecoderTaskManager::create_recode_task(AVRational time_base, AVCodecConte
179 192 }
180 193  
181 194 void FFRecoderTaskManager::create_recode_task2(RecoderInfo& recoderInfo) {
  195 + if(m_bExit) {
  196 + // 任务退出了就不再接收录制任务
  197 + return;
  198 + }
182 199 save_intask_recoderinfo(recoderInfo);
183 200 }
184 201  
... ... @@ -219,8 +236,12 @@ void FFRecoderTaskManager::recode_thread(RecodeParam recodeParam){
219 236 }
220 237  
221 238 void FFRecoderTaskManager::recode_thread2() {
222   -
  239 + LOG_INFO("recode_thread2 start...");
223 240 while(true) {
  241 + if(m_bExit) {
  242 + break;
  243 + }
  244 +
224 245 m_recoderinfo_list_mtx.lock();
225 246 if(m_recoderinfo_list.size() <= 0){
226 247 m_recoderinfo_list_mtx.unlock();
... ... @@ -258,6 +279,7 @@ void FFRecoderTaskManager::recode_thread2() {
258 279 bool bInit = ffrecoder.init(m_time_base, m_avctx, file_name.c_str());
259 280 if (!bInit) {
260 281 LOG_ERROR("ffrecoder init error : {} {} {}", recoderinfo.task_id, recoderinfo.object_id, recoderinfo.frame_nb);
  282 + ffrecoder.uninit();
261 283 continue;
262 284 }
263 285 LOG_INFO("record start, pkt_list size: {} id: {}", m_pkt_list.size(), id);
... ... @@ -277,7 +299,34 @@ void FFRecoderTaskManager::recode_thread2() {
277 299 // ffrecoder.flush();
278 300 ffrecoder.uninit();
279 301  
  302 + // 发送mq消息
  303 + if(mq_publish_func) {
  304 + mq_publish_func(recoderinfo.mq_info.c_str());
  305 + }
  306 +
280 307 LOG_INFO("record end, total save: {} start_frame_nb: {} end_frame_nb: {} file_path: {}", count, start_frame_nb, end_frame_nb, file_name);
281 308 }
282 309  
  310 + LOG_INFO("recode_thread2 end.");
  311 +}
  312 +
  313 +void FFRecoderTaskManager::close() {
  314 + m_bExit = true;
  315 +
  316 + if (m_recoder_thread) {
  317 + m_recoder_thread->join();
  318 + m_recoder_thread = nullptr;
  319 + }
  320 +
  321 + // 清空数据
  322 + while(!m_pkt_list.empty()) {
  323 + DataPacket* dataPkt = m_pkt_list.front();
  324 + delete dataPkt;
  325 + dataPkt = nullptr;
  326 + m_pkt_list.pop_front();
  327 + }
  328 +}
  329 +
  330 +void FFRecoderTaskManager::set_mq_callback(mq_callback_t cb) {
  331 + mq_publish_func = cb;
283 332 }
284 333 \ No newline at end of file
... ...
src/decoder/dvpp/FFRecoderTaskManager.h
... ... @@ -18,6 +18,8 @@ struct RecodeParam {
18 18 AVCodecContext* avctx;
19 19 };
20 20  
  21 +typedef std::function<bool(const char *msg)> mq_callback_t;
  22 +
21 23 class FFRecoderTaskManager {
22 24 public:
23 25 FFRecoderTaskManager();
... ... @@ -29,6 +31,10 @@ public:
29 31 bool init(AVRational time_base, AVCodecContext* avctx);
30 32 void create_recode_task2(RecoderInfo& recoderInfo);
31 33  
  34 + void close();
  35 +
  36 + void set_mq_callback(mq_callback_t cb);
  37 +
32 38 public:
33 39 void recode_thread(RecodeParam param);
34 40 list<DataPacket*>::iterator getStartIterator(unsigned long long frame_nb);
... ... @@ -59,5 +65,7 @@ private:
59 65 AVRational m_time_base;
60 66 AVCodecContext* m_avctx;
61 67  
62   - thread* m_recoder_thread {nullptr};
  68 + thread* m_recoder_thread{nullptr};
  69 +
  70 + mq_callback_t mq_publish_func;
63 71 };
64 72 \ No newline at end of file
... ...
src/decoder/interface/AbstractDecoder.h
... ... @@ -5,6 +5,8 @@
5 5  
6 6 #include "../../ai_platform/common_header.h"
7 7  
  8 +#include <functional>
  9 +
8 10 using namespace std;
9 11  
10 12 class AbstractDecoder{
... ... @@ -42,6 +44,8 @@ public:
42 44  
43 45 virtual void doRecode(RecoderInfo& recoderInfo) = 0;
44 46  
  47 + virtual void set_mq_callback(std::function<bool(const char *msg)> mq_publish) = 0;
  48 +
45 49 public:
46 50 bool isSnapTime();
47 51  
... ...
src/decoder/interface/DecoderManager.cpp
... ... @@ -536,4 +536,22 @@ void DecoderManager::doRecode(RecoderInfo&amp; recoderInfo) {
536 536  
537 537 LOG_ERROR("没有找到name为{}的解码器",name);
538 538 return;
  539 +}
  540 +
  541 +void DecoderManager::set_mq_callback(const string name, std::function<bool(const char *msg)> mq_publish) {
  542 + if (name.empty()){
  543 + LOG_ERROR("name 为空!");
  544 + return;
  545 + }
  546 +
  547 + std::lock_guard<std::mutex> l(m_mutex);
  548 +
  549 + auto dec = decoderMap.find(name);
  550 + if (dec != decoderMap.end()){
  551 + dec->second->set_mq_callback(mq_publish);
  552 + return;
  553 + }
  554 +
  555 + LOG_ERROR("没有找到name为{}的解码器",name);
  556 + return;
539 557 }
540 558 \ No newline at end of file
... ...
src/decoder/interface/DecoderManager.h
... ... @@ -275,6 +275,8 @@ public:
275 275  
276 276 void doRecode(RecoderInfo& recoderInfo);
277 277  
  278 + void set_mq_callback(const string name, std::function<bool(const char *msg)> mq_publish);
  279 +
278 280 private:
279 281 DecoderManager(){}
280 282  
... ...
src/decoder/test_recoder.cpp
... ... @@ -15,13 +15,18 @@ deque&lt;DeviceMemory*&gt; m_RgbDataList;
15 15 mutex m_DataListMtx;
16 16  
17 17 thread* m_pAlgorthimThread{nullptr};
  18 +thread* m_recodeThread{nullptr};
18 19 bool m_bfinish{false};
19 20 int m_devId = 0;
20 21 const char* task_id = "test0";
21 22 int skip_frame_ = 5;
22 23 int m_batch_size = 20;
23 24  
  25 +deque<RecoderInfo> m_recoderinfo_queue;
  26 +mutex m_recoderinfo_queue_mtx;
  27 +
24 28 void algorthim_process_thread();
  29 +void recode_thread();
25 30 void algorthim_face_detect(vector<DeviceMemory*> vec_gpuMem);
26 31  
27 32 void post_decod_cbk(const void * userPtr, DeviceMemory* devFrame){
... ... @@ -90,6 +95,12 @@ int main(){
90 95 }
91 96 , nullptr);
92 97  
  98 + m_recodeThread = new thread([](void* arg) {
  99 + recode_thread();
  100 + return (void*)0;
  101 + }
  102 + , nullptr);
  103 +
93 104 pDecManager->startDecodeByName(config.name);
94 105  
95 106 while (getchar() != 'q');
... ... @@ -162,10 +173,31 @@ void algorthim_face_detect(vector&lt;DeviceMemory*&gt; vec_gpuMem) {
162 173 recoderInfo.object_id = std::to_string(obj_id);
163 174 recoderInfo.recoderDir = "./res/recode";
164 175 recoderInfo.frame_nb = mem->getFrameNb();
165   - DecoderManager* pDecManager = DecoderManager::getInstance();
166   - pDecManager->doRecode(recoderInfo);
  176 +
  177 + m_recoderinfo_queue_mtx.lock();
  178 + m_recoderinfo_queue.push_back(recoderInfo);
  179 + m_recoderinfo_queue_mtx.unlock();
167 180  
168 181 obj_id++;
169 182  
170 183 }
  184 +}
  185 +
  186 +void recode_thread() {
  187 + while(true) {
  188 +
  189 + m_recoderinfo_queue_mtx.lock();
  190 + if(m_recoderinfo_queue.size() <= 0) {
  191 + m_recoderinfo_queue_mtx.unlock();
  192 + std::this_thread::sleep_for(std::chrono::milliseconds(5));
  193 + continue;
  194 + }
  195 +
  196 + RecoderInfo info = m_recoderinfo_queue.front();
  197 + m_recoderinfo_queue.pop_front();
  198 + m_recoderinfo_queue_mtx.unlock();
  199 +
  200 + DecoderManager* pDecManager = DecoderManager::getInstance();
  201 + pDecManager->doRecode(info);
  202 + }
171 203 }
172 204 \ No newline at end of file
... ...