diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7cc22e6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +bin/test_recoder +.vscode/launch.json +bin/logs/* +bin/res/* \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..10cb357 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,63 @@ +{ + // 使用 IntelliSense 了解相关属性。 + // 悬停以查看现有属性的描述。 + // 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "dvpp", + "type": "cppdbg", + "request": "launch", + "program": "${workspaceFolder}/bin/vpt_proj", + "args": ["/opt/cmhu/data/Street.uvf","0", "0", "1"], + "stopAtEntry": false, + "cwd": "${workspaceFolder}/bin", + "environment": [], + "externalConsole": false, + "MIMode": "gdb", + "setupCommands": [ + { + "description": "Enable pretty-printing for gdb", + "text": "-enable-pretty-printing", + "ignoreFailures": true + } + ] + },{ + "name": "test", + "type": "cppdbg", + "request": "launch", + "program": "${workspaceFolder}/bin/test", + "args": ["/opt/cmhu/data/Street.uvf","0", "0", "0"], + "stopAtEntry": false, + "cwd": "${workspaceFolder}/bin", + "environment": [], + "externalConsole": false, + "MIMode": "gdb", + "setupCommands": [ + { + "description": "Enable pretty-printing for gdb", + "text": "-enable-pretty-printing", + "ignoreFailures": true + } + ] + },{ + "name": "test_recoder", + "type": "cppdbg", + "request": "launch", + "program": "${workspaceFolder}/bin/test_recoder", + "args": ["/opt/cmhu/data/Street.uvf","0", "0", "0"], + "stopAtEntry": false, + "cwd": "${workspaceFolder}/bin", + "environment": [], + "externalConsole": false, + "MIMode": "gdb", + "setupCommands": [ + { + "description": "Enable pretty-printing for gdb", + "text": "-enable-pretty-printing", + "ignoreFailures": true + } + ] + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..c68926a --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,67 @@ +{ + "files.associations": { + "thread": "cpp", + "chrono": "cpp", + "array": "cpp", + "atomic": "cpp", + "bit": "cpp", + "*.tcc": "cpp", + "bitset": "cpp", + "cctype": "cpp", + "clocale": "cpp", + "cmath": "cpp", + "complex": "cpp", + "condition_variable": "cpp", + "cstdarg": "cpp", + "cstddef": "cpp", + "cstdint": "cpp", + "cstdio": "cpp", + "cstdlib": "cpp", + "cstring": "cpp", + "ctime": "cpp", + "cwchar": "cpp", + "cwctype": "cpp", + "deque": "cpp", + "list": "cpp", + "map": "cpp", + "set": "cpp", + "unordered_map": "cpp", + "vector": "cpp", + "exception": "cpp", + "algorithm": "cpp", + "functional": "cpp", + "iterator": "cpp", + "memory": "cpp", + "memory_resource": "cpp", + "numeric": "cpp", + "optional": "cpp", + "random": "cpp", + "ratio": "cpp", + "regex": "cpp", + "string": "cpp", + "string_view": "cpp", + "system_error": "cpp", + "tuple": "cpp", + "type_traits": "cpp", + "utility": "cpp", + "fstream": "cpp", + "future": "cpp", + "initializer_list": "cpp", + "iomanip": "cpp", + "iosfwd": "cpp", + "iostream": "cpp", + "istream": "cpp", + "limits": "cpp", + "mutex": "cpp", + "new": "cpp", + "ostream": "cpp", + "shared_mutex": "cpp", + "sstream": "cpp", + "stdexcept": "cpp", + "streambuf": "cpp", + "cfenv": "cpp", + "cinttypes": "cpp", + "typeinfo": "cpp", + "variant": "cpp" + } +} \ No newline at end of file diff --git a/src/ai_platform/common_header.h b/src/ai_platform/common_header.h index bec3a10..88f1dea 100755 --- a/src/ai_platform/common_header.h +++ b/src/ai_platform/common_header.h @@ -3,6 +3,7 @@ #include #include +#include struct point_t { int x, y; @@ -31,9 +32,10 @@ struct box_t { }; struct RecoderInfo { - string task_id; - string object_id; - bool bSave; + std::string recoderDir; + std::string task_id; + std::string object_id; + unsigned long long frame_nb; }; #endif // ___COMMON_HEADER_H__ \ No newline at end of file diff --git a/src/decoder/AbstractDecoder.o b/src/decoder/AbstractDecoder.o deleted file mode 100644 index 51bf2f4..0000000 --- a/src/decoder/AbstractDecoder.o +++ /dev/null diff --git a/src/decoder/Makefile b/src/decoder/Makefile index 9de7b05..adae0ec 100755 --- a/src/decoder/Makefile +++ b/src/decoder/Makefile @@ -1,12 +1,12 @@ XX = g++ -PROJECT_ROOT= /opt/cmhu/vpt_ascend +PROJECT_ROOT= /opt/cmhu/vpt_ascend_arm DEPEND_DIR = $(PROJECT_ROOT)/bin SRC_ROOT = $(PROJECT_ROOT)/src -TARGET= $(PROJECT_ROOT)/bin/test_face.so +TARGET= $(PROJECT_ROOT)/bin/test_recoder THIRDPARTY_ROOT = $(PROJECT_ROOT)/3rdparty SPDLOG_ROOT = $(THIRDPARTY_ROOT)/spdlog-1.9.2/release @@ -32,8 +32,7 @@ lib_dir=-L/usr/local/Ascend/ascend-toolkit/6.3.RC1/runtime/lib64 \ lib=-lacl_dvpp -lascendcl -lacl_dvpp_mpi -lruntime -lascendalog -lc_sec -lmsprofiler -lgert -lmmpa -lascend_hal -lexe_graph -lge_executor -lgraph -lprofapi -lascend_protobuf -lerror_manager -lhybrid_executor -lregister -ldavinci_executor -lge_common -lge_common_base \ -lplatform -lgraph_base -lqos_manager -LIBS= -L $(DEPEND_DIR) -lface_det_arm_with_log1\ - -L $(FFMPEG_ROOT)/lib -lavformat -lavcodec -lswscale -lavutil -lavfilter -lswresample -lavdevice \ +LIBS= -L $(FFMPEG_ROOT)/lib -lavformat -lavcodec -lswscale -lavutil -lavfilter -lswresample -lavdevice \ CXXFLAGS= -g -O0 -fPIC $(include_dir) $(lib_dir) $(lib) $(LIBS) $(DEFS) -lpthread -lrt -lz -fexceptions -std=c++11 -D_GLIBCXX_USE_CXX11_ABI=0 -fvisibility=hidden -Wall -Wno-deprecated -Wdeprecated-declarations -Wl,-Bsymbolic -ldl diff --git a/src/decoder/dvpp/DvppDataMemory.hpp b/src/decoder/dvpp/DvppDataMemory.hpp index 2ebe58a..dd73856 100755 --- a/src/decoder/dvpp/DvppDataMemory.hpp +++ b/src/decoder/dvpp/DvppDataMemory.hpp @@ -7,8 +7,8 @@ using namespace std; class DvppDataMemory : public DeviceMemory { public: - DvppDataMemory(int _channel, int _width, int _width_stride, int _height, int _height_stride, int _size, string _id, string _dev_id, bool _key_frame) - :DeviceMemory(_channel, _width, _width_stride, _height, _height_stride, _id, _dev_id, _key_frame, false){ + DvppDataMemory(int _channel, int _width, int _width_stride, int _height, int _height_stride, int _size, string _id, string _dev_id, bool _key_frame, unsigned long long frame_nb) + :DeviceMemory(_channel, _width, _width_stride, _height, _height_stride, _id, _dev_id, _key_frame, frame_nb, false){ data_size = _size; int ret = acldvppMalloc((void **)&pHwRgb, data_size); if(ret != ACL_ERROR_NONE){ @@ -16,8 +16,8 @@ public: } } - DvppDataMemory( int _width, int _width_stride, int _height, int _height_stride, int _size, string _id, string _dev_id, bool _key_frame, unsigned char * pHwData) - :DeviceMemory(3, _width, _width_stride, _height, _height_stride, _id, _dev_id, _key_frame, false){ + DvppDataMemory( int _width, int _width_stride, int _height, int _height_stride, int _size, string _id, string _dev_id, bool _key_frame, unsigned long long frame_nb, unsigned char * pHwData) + :DeviceMemory(3, _width, _width_stride, _height, _height_stride, _id, _dev_id, _key_frame, frame_nb, false){ data_size = _size; data_type = 1; pHwRgb = pHwData; diff --git a/src/decoder/dvpp/DvppDecoder.cpp b/src/decoder/dvpp/DvppDecoder.cpp index d30e5ba..19a364a 100755 --- a/src/decoder/dvpp/DvppDecoder.cpp +++ b/src/decoder/dvpp/DvppDecoder.cpp @@ -4,12 +4,14 @@ struct Vdec_CallBack_UserData { uint64_t frameId; + unsigned long long frame_nb; long startTime; long sendTime; // void* vdecOutputBuf; DvppDecoder* self; Vdec_CallBack_UserData() { frameId = 0; + frame_nb = 0; } }; @@ -148,6 +150,8 @@ AVCodecContext* DvppDecoder::init_FFmpeg(FFDecConfig config){ pix_fmt = (AVPixelFormat)codecpar->format; m_fps = av_q2d(stream ->avg_frame_rate); + m_recoderManager.init(stream->time_base, avctx); + LOG_INFO("[{}]- init ffmpeg success! input:{} frame_width:{} frame_height:{} fps:{} ", m_dec_name, input_file, frame_width, frame_height, m_fps); return avctx; @@ -336,6 +340,7 @@ void DvppDecoder::read_thread() { ,this); AVPacket* pkt = nullptr; + unsigned long long frame_nb = 0; while (m_bRunning){ if (!m_bReal){ @@ -372,6 +377,9 @@ void DvppDecoder::read_thread() { if (video_index == pkt->stream_index){ + frame_nb++; + m_recoderManager.cache_pkt(pkt, frame_nb); + ret = av_bsf_send_packet(h264bsfc, pkt); if(ret < 0) { LOG_ERROR("[{}]- av_bsf_send_packet error!", m_dec_name); @@ -392,7 +400,10 @@ void DvppDecoder::read_thread() { } m_pktQueue_mutex.lock(); - m_pktQueue.push(pkt); + DataPacket* data_pkt = new DataPacket(); + data_pkt->pkt = pkt; + data_pkt->frame_nb = frame_nb; + m_pktQueue.push(data_pkt); m_pktQueue_mutex.unlock(); bPushed = true; @@ -418,9 +429,9 @@ void DvppDecoder::read_thread() { m_pktQueue_mutex.lock(); while(m_pktQueue.size() > 0){ - pkt = m_pktQueue.front(); - av_packet_free(&pkt); - pkt = nullptr; + DataPacket* data_pkt = m_pktQueue.front(); + delete data_pkt; + data_pkt = nullptr; m_pktQueue.pop(); } m_pktQueue_mutex.unlock(); @@ -481,14 +492,14 @@ static void VdecCallback(acldvppStreamDesc *input, acldvppPicDesc *output, void DvppDecoder* self = userData->self; if(self != nullptr){ - self->doVdppVdecCallBack(input, output); + self->doVdppVdecCallBack(input, output, userData->frame_nb); } delete userData; userData = nullptr; } } -void DvppDecoder::doVdppVdecCallBack(acldvppStreamDesc *input, acldvppPicDesc *output){ +void DvppDecoder::doVdppVdecCallBack(acldvppStreamDesc *input, acldvppPicDesc *output, unsigned long long frame_nb){ m_vdecQueue_mutex.lock(); if(m_vdecQueue.size() > 0){ @@ -519,7 +530,7 @@ void DvppDecoder::doVdppVdecCallBack(acldvppStreamDesc *input, acldvppPicDesc *o } if(width > 0 && height > 0 && outputSize > 0){ - DvppDataMemory* mem = new DvppDataMemory(width, width_stride, height, height_stride, outputSize, m_dec_name, to_string(m_dvpp_deviceId), false, (unsigned char *)outputDataDev); + DvppDataMemory* mem = new DvppDataMemory(width, width_stride, height, height_stride, outputSize, m_dec_name, to_string(m_dvpp_deviceId), false, frame_nb, (unsigned char *)outputDataDev); if(mem){ if(post_decoded_cbk) { post_decoded_cbk(m_postDecArg, mem); @@ -532,7 +543,7 @@ void DvppDecoder::doVdppVdecCallBack(acldvppStreamDesc *input, acldvppPicDesc *o // 缓存snapshot std::unique_lock locker(m_cached_mutex); - m_cached_mem = new DvppDataMemory(-1, width, width_stride, height, height_stride, outputSize, m_dec_name, to_string(m_dvpp_deviceId), false); + m_cached_mem = new DvppDataMemory(-1, width, width_stride, height, height_stride, outputSize, m_dec_name, to_string(m_dvpp_deviceId), false, 0); if(m_cached_mem != nullptr){ aclrtMemcpy(m_cached_mem->getMem(), outputSize, (unsigned char *)outputDataDev, outputSize, ACL_MEMCPY_DEVICE_TO_DEVICE); } @@ -698,12 +709,12 @@ void DvppDecoder::decode_thread(){ LOG_INFO("[{}]- decode thread exit.", m_dec_name); } -#include -#include -#include +// #include +// #include +// #include -static int nRecoder = 0; -std::ofstream outfile; +// static int nRecoder = 0; +// std::ofstream outfile; int DvppDecoder::sentFrame(aclvdecChannelDesc *vdecChannelDesc, uint64_t frame_count){ @@ -716,14 +727,14 @@ int DvppDecoder::sentFrame(aclvdecChannelDesc *vdecChannelDesc, uint64_t frame_c } m_vdecQueue_mutex.unlock(); - AVPacket * pkt = nullptr; + DataPacket * data_pkt = nullptr; m_pktQueue_mutex.lock(); if(m_pktQueue.size() <= 0){ m_pktQueue_mutex.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(10)); return 1; } - pkt = m_pktQueue.front(); + data_pkt = m_pktQueue.front(); m_pktQueue.pop(); m_pktQueue_mutex.unlock(); @@ -732,8 +743,8 @@ int DvppDecoder::sentFrame(aclvdecChannelDesc *vdecChannelDesc, uint64_t frame_c int ret = acldvppMalloc((void **)&vdecInputbuf, g_pkt_size); if(ACL_ERROR_NONE != ret){ LOG_ERROR("[{}]- acldvppMalloc failed!, ret:{}", m_dec_name, ret); - av_packet_free(&pkt); - pkt = nullptr; + delete data_pkt; + data_pkt = nullptr; return 2; } @@ -754,12 +765,12 @@ int DvppDecoder::sentFrame(aclvdecChannelDesc *vdecChannelDesc, uint64_t frame_c // } - + AVPacket* pkt = data_pkt->pkt; ret = aclrtMemcpy(vdecInputbuf, pkt->size, pkt->data, pkt->size, ACL_MEMCPY_HOST_TO_DEVICE); if(ACL_ERROR_NONE != ret){ LOG_ERROR("[{}]- aclrtMemcpy failed", m_dec_name); - av_packet_free(&pkt); - pkt = nullptr; + delete data_pkt; + data_pkt = nullptr; return 2; } @@ -767,8 +778,8 @@ int DvppDecoder::sentFrame(aclvdecChannelDesc *vdecChannelDesc, uint64_t frame_c ret = acldvppMalloc((void **)&vdecOutputBuf, m_vdec_out_size); if(ret != ACL_ERROR_NONE){ LOG_ERROR("[{}]- acldvppMalloc failed", m_dec_name); - av_packet_free(&pkt); - pkt = nullptr; + delete data_pkt; + data_pkt = nullptr; return 2; } @@ -793,12 +804,13 @@ int DvppDecoder::sentFrame(aclvdecChannelDesc *vdecChannelDesc, uint64_t frame_c Vdec_CallBack_UserData *user_data = NULL; user_data = new Vdec_CallBack_UserData; user_data->frameId = frame_count; + user_data->frame_nb = data_pkt->frame_nb; // user_data->startTime = startTime; user_data->sendTime = UtilTools::get_cur_time_ms(); user_data->self = this; ret = aclvdecSendFrame(vdecChannelDesc, input_stream_desc, output_pic_desc, nullptr, reinterpret_cast(user_data)); - av_packet_free(&pkt); - pkt = nullptr; + delete data_pkt; + data_pkt = nullptr; if(ret != ACL_ERROR_NONE){ delete user_data; user_data = nullptr; @@ -813,9 +825,9 @@ int DvppDecoder::sentFrame(aclvdecChannelDesc *vdecChannelDesc, uint64_t frame_c return 0; }while (0); - if(pkt != nullptr){ - av_packet_free(&pkt); - pkt = nullptr; + if(data_pkt != nullptr){ + delete data_pkt; + data_pkt = nullptr; } // 报错情形 @@ -870,4 +882,8 @@ void DvppDecoder::release_dvpp(){ DvppSourceManager* pSrcMgr = DvppSourceManager::getInstance(); pSrcMgr->releaseChannel(m_dvpp_deviceId, m_dvpp_channel); +} + +void DvppDecoder::doRecode(RecoderInfo& recoderInfo) { + m_recoderManager.create_recode_task2(recoderInfo); } \ No newline at end of file diff --git a/src/decoder/dvpp/DvppDecoder.h b/src/decoder/dvpp/DvppDecoder.h index 2eea078..b460ead 100755 --- a/src/decoder/dvpp/DvppDecoder.h +++ b/src/decoder/dvpp/DvppDecoder.h @@ -8,6 +8,7 @@ #include #include +#include "FFRecoderTaskManager.h" using namespace std; @@ -51,8 +52,10 @@ public: int getCachedQueueLength(); + void doRecode(RecoderInfo& recoderInfo); + public: - void doVdppVdecCallBack(acldvppStreamDesc *input, acldvppPicDesc *output); + void doVdppVdecCallBack(acldvppStreamDesc *input, acldvppPicDesc *output, unsigned long long frame_nb); void doProcessReport(); private: @@ -96,7 +99,7 @@ private: bool m_dec_keyframe; mutex m_pktQueue_mutex; - queue m_pktQueue; + queue m_pktQueue; // 解码 int m_dvpp_deviceId {-1}; @@ -118,4 +121,6 @@ private: DvppDataMemory* m_cached_mem{nullptr}; mutex m_cached_mutex; condition_variable m_cached_cond; + + FFRecoderTaskManager m_recoderManager; }; \ No newline at end of file diff --git a/src/decoder/dvpp/DvppDecoderApi.cpp b/src/decoder/dvpp/DvppDecoderApi.cpp index d7b9f68..0a2e526 100755 --- a/src/decoder/dvpp/DvppDecoderApi.cpp +++ b/src/decoder/dvpp/DvppDecoderApi.cpp @@ -130,4 +130,10 @@ void DvppDecoderApi::setFinishedDecArg(const void* finishedDecArg){ if(m_pDecoder != nullptr){ return m_pDecoder->setFinishedDecArg(finishedDecArg); } +} + +void DvppDecoderApi::doRecode(RecoderInfo& recoderInfo) { + if(m_pDecoder != nullptr){ + return m_pDecoder->doRecode(recoderInfo); + } } \ No newline at end of file diff --git a/src/decoder/dvpp/DvppDecoderApi.h b/src/decoder/dvpp/DvppDecoderApi.h index 3f16555..5c02823 100755 --- a/src/decoder/dvpp/DvppDecoderApi.h +++ b/src/decoder/dvpp/DvppDecoderApi.h @@ -39,6 +39,8 @@ public: void setPostDecArg(const void* postDecArg); void setFinishedDecArg(const void* finishedDecArg); + + void doRecode(RecoderInfo& recoderInfo); private: DvppDecoder* m_pDecoder; }; \ No newline at end of file diff --git a/src/decoder/dvpp/FFRecoder.cpp1 b/src/decoder/dvpp/FFRecoder.cpp index 087cc1b..13546cc 100644 --- a/src/decoder/dvpp/FFRecoder.cpp1 +++ b/src/decoder/dvpp/FFRecoder.cpp @@ -3,14 +3,6 @@ #include #include #include -extern "C" { - #include - #include - #include - #include - #include - #include -} FFRecoder::FFRecoder() @@ -156,6 +148,8 @@ bool FFRecoder::init(AVRational time_base, AVCodecContext* avctx, const char* ou fprintf(stderr, "Write header failed!\n"); return false; } + + return true; } void FFRecoder::uninit() @@ -248,6 +242,8 @@ void FFRecoder::update_pts(AVPacket* pkt) { } bool FFRecoder::write_pkt(AVPacket *pkt) { + char errbuf[64]{ 0 }; + av_packet_rescale_ts(pkt, codec_ctx_->time_base, out_stream_->time_base); pkt->stream_index = out_stream_->index; update_pts(pkt); diff --git a/src/decoder/dvpp/FFRecoder.h1 b/src/decoder/dvpp/FFRecoder.h index 3339f3a..8dbcd71 100644 --- a/src/decoder/dvpp/FFRecoder.h1 +++ b/src/decoder/dvpp/FFRecoder.h @@ -1,13 +1,14 @@ #pragma once #include -class AVFrame; -class AVStream; -class AVCodecContext; -class AVFormatContext; -struct AVRational; -struct SwsContext; -struct AVPacket; +extern "C" { + #include + #include + #include + #include + #include + #include +} class FFRecoder { diff --git a/src/decoder/dvpp/FFRecoderTaskManager.cpp b/src/decoder/dvpp/FFRecoderTaskManager.cpp new file mode 100644 index 0000000..6523a63 --- /dev/null +++ b/src/decoder/dvpp/FFRecoderTaskManager.cpp @@ -0,0 +1,283 @@ +#include "FFRecoderTaskManager.h" +#include + +struct RecodeThreadParam { + FFRecoderTaskManager* _this; + RecodeParam param; +}; + +static long get_cur_time() { + + chrono::time_point tpMicro + = chrono::time_point_cast(chrono::system_clock::now()); + + return tpMicro.time_since_epoch().count(); +} + +static string get_cur_datetime() +{ + using namespace std; + auto t = chrono::system_clock::to_time_t(chrono::system_clock::now()); + + char buffer[40]; + strftime(buffer, 80, "%Y_%m_%d_%H_%M_%S", std::localtime(&t)); + buffer[20] = '\0'; + return buffer; +} + +static bool is_key_frame(AVPacket *pkt) { + return (pkt->flags & AV_PKT_FLAG_KEY) != 0; +} + +FFRecoderTaskManager::FFRecoderTaskManager(){ + +} + +FFRecoderTaskManager::~FFRecoderTaskManager(){ + +} + +bool FFRecoderTaskManager::init(AVRational time_base, AVCodecContext* avctx){ + m_time_base = time_base; + m_avctx = avctx; + + m_recoder_thread = new std::thread( + [](void* arg) { + FFRecoderTaskManager* _this=(FFRecoderTaskManager*)arg; + if(_this != nullptr) { + _this->recode_thread2(); + }else{ + LOG_ERROR("recode 线程启动失败 !"); + } + return (void*)0; + }, this); + + return true; +} + +void FFRecoderTaskManager::cache_pkt(AVPacket* pkt, long long frame_nb){ + std::lock_guard l_pkt(m_pkt_list_mtx); + + // 考虑到一个AVPacket中的数据并不很大,为减少与解码模块的耦合度,方便管理,这里做一个clone + AVPacket *new_pkt = av_packet_clone(pkt); + + DataPacket* newDataPkt = new DataPacket(); + newDataPkt->pkt = new_pkt; + newDataPkt->frame_nb = frame_nb; + m_pkt_list.emplace_back(newDataPkt); + + if(is_key_frame(pkt)){ + // 越来越大的值 + newDataPkt->isKeyFrame = true; + LOG_INFO("key frame_nb: {}", frame_nb); + } else { + newDataPkt->isKeyFrame = false; + } + + std::lock_guard l_info(m_recoderinfo_list_mtx); + if(m_recoderinfo_list.size() <= 0){ + // 没有任务的时候,维持500的长度 + while(m_pkt_list.size() > 1000) { + DataPacket* dataPkt = m_pkt_list.front(); + delete dataPkt; + dataPkt = nullptr; + m_pkt_list.pop_front(); + } + } +} + +void FFRecoderTaskManager::save_intask_frame_nb(unsigned long long frame_nb) { + if(m_intask_frame_nb_list.size() <= 0) { + m_intask_frame_nb_list.push_back(frame_nb); + return; + } + + for(auto it = m_intask_frame_nb_list.begin(); it != m_intask_frame_nb_list.end(); it++) { + if(*it > frame_nb) { + m_intask_frame_nb_list.insert(it, frame_nb); + return; + } + } + + // 新 frame_nb 比所有的都大 + m_intask_frame_nb_list.push_back(frame_nb); +} + +void FFRecoderTaskManager::save_intask_recoderinfo(RecoderInfo info) { + std::lock_guard l(m_recoderinfo_list_mtx); + if(m_recoderinfo_list.size() <= 0) { + m_recoderinfo_list.push_back(info); + return; + } + + for(auto it = m_recoderinfo_list.begin(); it != m_recoderinfo_list.end(); it++) { + if(it->frame_nb > info.frame_nb) { + m_recoderinfo_list.insert(it, info); + return; + } + } + + // 新 frame_nb 比所有的都大 + m_recoderinfo_list.push_back(info); +} + +list::iterator FFRecoderTaskManager::getStartIterator(unsigned long long frame_nb){ + std::lock_guard l(m_pkt_list_mtx); + + auto it_first = m_pkt_list.begin(); + + long long start_frame_nb = (long long)(frame_nb - 375); + if(start_frame_nb <= 0) { + return it_first; + } + + auto it_second = m_pkt_list.begin(); + for(;it_second != m_pkt_list.end(); it_second++) { + DataPacket* dataPkt = *it_second; + if(dataPkt->isKeyFrame) { + it_first = it_second; + } + if(start_frame_nb >= (*it_first)->frame_nb && start_frame_nb <= (*it_second)->frame_nb) { + return it_first; + } + } + + return m_pkt_list.begin(); +} + +// 多线程版 +void FFRecoderTaskManager::create_recode_task(AVRational time_base, AVCodecContext* avctx, RecoderInfo& recoderInfo){ + + std::lock_guard l(m_task_creat_mtx); + + RecodeParam recodeParam; + recodeParam.time_base = time_base; + recodeParam.avctx = avctx; + recodeParam.recoderInfo = recoderInfo; + + RecodeThreadParam* threadParam = new RecodeThreadParam(); + threadParam->_this = this; + threadParam->param = recodeParam; + std::thread* recode_thread = new std::thread( + [](void* arg) { + RecodeThreadParam* threadParam = (RecodeThreadParam*)arg; + if(threadParam != nullptr){ + FFRecoderTaskManager* _this=(FFRecoderTaskManager*)threadParam->_this; + if(_this != nullptr) { + _this->recode_thread(threadParam->param); + }else{ + LOG_ERROR("recode 线程启动失败 !"); + } + } + return (void*)0; + }, threadParam); + + std::string id = recoderInfo.task_id + "_" + recoderInfo.object_id + "_" + std::to_string(recoderInfo.frame_nb); + m_id_recoderTask[id] = recode_thread; + + save_intask_frame_nb(recoderInfo.frame_nb); +} + +void FFRecoderTaskManager::create_recode_task2(RecoderInfo& recoderInfo) { + save_intask_recoderinfo(recoderInfo); +} + +// 多线程版 +void FFRecoderTaskManager::recode_thread(RecodeParam recodeParam){ + { + // 此处确保create_recode_task执行完成,m_id_recoderTask 已经保存当前线程信息 + std::lock_guard l(m_task_creat_mtx); + } + + RecoderInfo recoderInfo; + recoderInfo = recodeParam.recoderInfo; + std::string id = recoderInfo.task_id + "_" + recoderInfo.object_id + "_" + std::to_string(recoderInfo.frame_nb); + string file_name = recoderInfo.recoderDir + "/recoder_" + id + "_" + std::to_string(get_cur_time()) + ".mp4"; + FFRecoder ffrecoder; + bool bInit = ffrecoder.init(recodeParam.time_base, recodeParam.avctx, file_name.c_str()); + if (!bInit) { + LOG_ERROR("ffrecoder init error : {} {} {}", recoderInfo.task_id, recoderInfo.object_id, recoderInfo.frame_nb); + m_id_recoderTask.erase(id); + return; + } + LOG_INFO("record start, pkt_list size: {} id: {}", m_pkt_list.size(), id); + + int count = 0; + for (auto it = m_pkt_list.begin(); it != m_pkt_list.end() && count < 500; ++it) { + DataPacket* dataPkt = *it; + AVPacket* pkt = dataPkt->pkt; + ffrecoder.write_pkt(pkt); + count++; + } + + ffrecoder.flush(); + ffrecoder.uninit(); + + m_id_recoderTask.erase(id); + + LOG_INFO("record end : {}", file_name); +} + +void FFRecoderTaskManager::recode_thread2() { + + while(true) { + m_recoderinfo_list_mtx.lock(); + if(m_recoderinfo_list.size() <= 0){ + m_recoderinfo_list_mtx.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + continue; + } + + auto it_param = m_recoderinfo_list.begin(); + RecoderInfo recoderinfo = *it_param; + m_recoderinfo_list.pop_front(); + m_recoderinfo_list_mtx.unlock(); + + auto it_data = getStartIterator(recoderinfo.frame_nb); + if(it_data == m_pkt_list.end()) { + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + continue; + } + + LOG_INFO("start frame_nb: {}", (*it_data)->frame_nb); + + m_pkt_list_mtx.lock(); + auto it = m_pkt_list.begin(); + while (it != it_data) { + DataPacket* dataPkt = m_pkt_list.front(); + delete dataPkt; + dataPkt = nullptr; + m_pkt_list.pop_front(); + it = m_pkt_list.begin(); + } + m_pkt_list_mtx.unlock(); + + std::string id = recoderinfo.task_id + "_" + recoderinfo.object_id + "_" + std::to_string(recoderinfo.frame_nb); + string file_name = recoderinfo.recoderDir + "/recoder_" + id + "_" + std::to_string(get_cur_time()) + ".mp4"; + FFRecoder ffrecoder; + bool bInit = ffrecoder.init(m_time_base, m_avctx, file_name.c_str()); + if (!bInit) { + LOG_ERROR("ffrecoder init error : {} {} {}", recoderinfo.task_id, recoderinfo.object_id, recoderinfo.frame_nb); + continue; + } + LOG_INFO("record start, pkt_list size: {} id: {}", m_pkt_list.size(), id); + + int count = 0; + auto it_save = it_data; + unsigned long long start_frame_nb = (*it_data)->frame_nb; + unsigned long long end_frame_nb = (*it_data)->frame_nb; + for (; it_save != m_pkt_list.end() && count < 500; ++it_save) { + DataPacket* dataPkt = *it_save; + AVPacket* pkt = dataPkt->pkt; + ffrecoder.write_pkt(pkt); + count++; + end_frame_nb = (*it_save)->frame_nb; + } + + // ffrecoder.flush(); + ffrecoder.uninit(); + + LOG_INFO("record end, total save: {} start_frame_nb: {} end_frame_nb: {} file_path: {}", count, start_frame_nb, end_frame_nb, file_name); + } + +} \ No newline at end of file diff --git a/src/decoder/dvpp/FFRecoderTaskManager.h b/src/decoder/dvpp/FFRecoderTaskManager.h new file mode 100644 index 0000000..b338f7d --- /dev/null +++ b/src/decoder/dvpp/FFRecoderTaskManager.h @@ -0,0 +1,63 @@ +#include "FFRecoder.h" + +#include "../../ai_platform/common_header.h" +#include "depend_headers.h" + +#include +#include +#include +#include +#include +#include + +using namespace std; + +struct RecodeParam { + AVRational time_base; + RecoderInfo recoderInfo; + AVCodecContext* avctx; +}; + +class FFRecoderTaskManager { +public: + FFRecoderTaskManager(); + virtual ~FFRecoderTaskManager(); + + void cache_pkt(AVPacket* pkt, long long frame_nb); + void create_recode_task(AVRational time_base, AVCodecContext* avctx, RecoderInfo& recoderInfo); + + bool init(AVRational time_base, AVCodecContext* avctx); + void create_recode_task2(RecoderInfo& recoderInfo); + +public: + void recode_thread(RecodeParam param); + list::iterator getStartIterator(unsigned long long frame_nb); + + void recode_thread2(); + +private: + void save_intask_frame_nb(unsigned long long frame_nb); + void save_intask_recoderinfo(RecoderInfo info); + +private: + std::queue m_key_frame_interval; + std::list m_keyframe_nb_list; + std::list m_pkt_list; + mutex m_pkt_list_mtx; + unsigned long long m_last_key_frame_nb; + + std::list m_intask_frame_nb_list; + + std::list m_recoderinfo_list; + mutex m_recoderinfo_list_mtx; + + bool m_bExit{false}; + + map m_id_recoderTask; + mutex m_task_creat_mtx; + + AVRational m_time_base; + AVCodecContext* m_avctx; + + thread* m_recoder_thread {nullptr}; +}; \ No newline at end of file diff --git a/src/decoder/dvpp/VpcPicConverter.cpp b/src/decoder/dvpp/VpcPicConverter.cpp index 136506a..af62ad4 100755 --- a/src/decoder/dvpp/VpcPicConverter.cpp +++ b/src/decoder/dvpp/VpcPicConverter.cpp @@ -41,7 +41,7 @@ DvppDataMemory* VpcPicConverter::convert2bgr(acldvppPicDesc *inputDesc_, int out int out_buf_height = ALIGN_UP(out_height, 2); int out_buf_size = out_buf_width * out_buf_height; - DvppDataMemory* rgbMem = new DvppDataMemory(3, out_buf_width, out_buf_width, out_buf_height, out_buf_height, out_buf_size, "", to_string(m_devId), key_frame); + DvppDataMemory* rgbMem = new DvppDataMemory(3, out_buf_width, out_buf_width, out_buf_height, out_buf_height, out_buf_size, "", to_string(m_devId), key_frame, 0); void *outBufferDev_ = (void*)rgbMem->getMem(); acldvppPicDesc *outputDesc_= acldvppCreatePicDesc(); diff --git a/src/decoder/dvpp/depend_headers.h b/src/decoder/dvpp/depend_headers.h index 84b2bc7..52fd0ce 100755 --- a/src/decoder/dvpp/depend_headers.h +++ b/src/decoder/dvpp/depend_headers.h @@ -35,4 +35,18 @@ extern "C" { #include "../interface/interface_headers.h" #include "../interface/utiltools.hpp" + +struct DataPacket { + AVPacket* pkt {nullptr}; + unsigned long long frame_nb{0}; + bool isKeyFrame{false}; + + ~DataPacket(){ + if(pkt != nullptr) { + av_packet_free(&pkt); + pkt = nullptr; + } + } +}; + #endif \ No newline at end of file diff --git a/src/decoder/interface/AbstractDecoder.h b/src/decoder/interface/AbstractDecoder.h index 5aea5d3..3ee91a4 100755 --- a/src/decoder/interface/AbstractDecoder.h +++ b/src/decoder/interface/AbstractDecoder.h @@ -3,6 +3,8 @@ #include "interface_headers.h" +#include "../../ai_platform/common_header.h" + using namespace std; class AbstractDecoder{ @@ -38,6 +40,8 @@ public: virtual void setPostDecArg(const void* postDecArg) = 0; virtual void setFinishedDecArg(const void* finishedDecArg) = 0; + virtual void doRecode(RecoderInfo& recoderInfo) = 0; + public: bool isSnapTime(); diff --git a/src/decoder/interface/DecoderManager.cpp b/src/decoder/interface/DecoderManager.cpp index 830dfe3..469a43c 100755 --- a/src/decoder/interface/DecoderManager.cpp +++ b/src/decoder/interface/DecoderManager.cpp @@ -517,3 +517,23 @@ DeviceMemory* DecoderManager::snapshot_out_task(string& uri, int devId) { return devMem; } + + +void DecoderManager::doRecode(RecoderInfo& recoderInfo) { + string name = recoderInfo.task_id; + if (name.empty()){ + LOG_ERROR("name 为空!"); + return; + } + + std::lock_guard l(m_mutex); + + auto dec = decoderMap.find(name); + if (dec != decoderMap.end()){ + dec->second->doRecode(recoderInfo); + return; + } + + LOG_ERROR("没有找到name为{}的解码器",name); + return; +} \ No newline at end of file diff --git a/src/decoder/interface/DecoderManager.h b/src/decoder/interface/DecoderManager.h index f2685ea..676c725 100755 --- a/src/decoder/interface/DecoderManager.h +++ b/src/decoder/interface/DecoderManager.h @@ -273,6 +273,8 @@ public: **************************************************/ vector timing_snapshot_all(); + void doRecode(RecoderInfo& recoderInfo); + private: DecoderManager(){} diff --git a/src/decoder/interface/DeviceMemory.hpp b/src/decoder/interface/DeviceMemory.hpp index 63420ae..d739cc5 100755 --- a/src/decoder/interface/DeviceMemory.hpp +++ b/src/decoder/interface/DeviceMemory.hpp @@ -10,7 +10,7 @@ using namespace std; class DeviceMemory{ public: - DeviceMemory(int _channel, int _width, int _width_stride, int _height, int _height_stride, string _id, string _dev_id, bool _key_frame, bool _isused){ + DeviceMemory(int _channel, int _width, int _width_stride, int _height, int _height_stride, string _id, string _dev_id, bool _key_frame, unsigned long long _frame_nb, bool _isused){ channel = _channel; width = _width; width_stride = _width_stride; @@ -21,6 +21,7 @@ public: id = _id; device_id = _dev_id; key_frame = _key_frame; + frame_nb = _frame_nb; timestamp = UtilTools::get_cur_time_ms(); } @@ -80,6 +81,10 @@ public: return key_frame; } + unsigned long long getFrameNb() { + return frame_nb; + } + public: int data_size; bool isused; @@ -94,6 +99,7 @@ public: int channel{3}; bool key_frame; long index; + unsigned long long frame_nb; }; #endif \ No newline at end of file diff --git a/src/decoder/test.cpp b/src/decoder/test.cpp1 index 06af9ce..06af9ce 100644 --- a/src/decoder/test.cpp +++ b/src/decoder/test.cpp1 diff --git a/src/decoder/test_recoder.cpp b/src/decoder/test_recoder.cpp new file mode 100644 index 0000000..85f2e1a --- /dev/null +++ b/src/decoder/test_recoder.cpp @@ -0,0 +1,171 @@ +#include "./interface/DecoderManager.h" +#include +#include +#include + +using namespace std; + +struct decode_cbk_userdata{ + string task_id; + void* opaque; + void* opaque1; +}; + +deque m_RgbDataList; +mutex m_DataListMtx; + +thread* m_pAlgorthimThread{nullptr}; +bool m_bfinish{false}; +int m_devId = 0; +const char* task_id = "test0"; +int skip_frame_ = 5; +int m_batch_size = 20; + +void algorthim_process_thread(); +void algorthim_face_detect(vector vec_gpuMem); + +void post_decod_cbk(const void * userPtr, DeviceMemory* devFrame){ + do{ + if(m_bfinish){ + break; + } + m_DataListMtx.lock(); + if(m_RgbDataList.size() >= 30){ + m_DataListMtx.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + continue; + } + m_RgbDataList.push_back(devFrame); + m_DataListMtx.unlock(); + break; + }while (true); +} + +void decode_finished_cbk(const void * userPtr){ + decode_cbk_userdata* ptr = (decode_cbk_userdata*)userPtr; + if (ptr!= nullptr){ + printf("task finished: %s \n", ptr->task_id.c_str()); + } + delete ptr; + ptr = nullptr; +} + +int main(){ + + // 创建解码任务 + DecoderManager* pDecManager = DecoderManager::getInstance(); + + MgrDecConfig config; + config.name = task_id; + config.cfg.uri = "rtsp://admin:ad123456@192.168.60.165:554/cam/realmonitor?channel=1&subtype=0"; + config.cfg.post_decoded_cbk = post_decod_cbk; + config.cfg.decode_finished_cbk = decode_finished_cbk; + config.cfg.force_tcp = true; // rtsp用tcp + config.cfg.gpuid = to_string(m_devId); + config.cfg.skip_frame = skip_frame_; + + config.dec_type = DECODER_TYPE_DVPP; + + AbstractDecoder* dec = pDecManager->createDecoder(config); + if (!dec){ + printf("创建解码器失败 \n"); + return false; + } + + decode_cbk_userdata* userPtr = new decode_cbk_userdata; + userPtr->task_id = string(task_id); + pDecManager->setPostDecArg(config.name, userPtr); + pDecManager->setFinishedDecArg(config.name, userPtr); + + + int input_image_width = 0; + int input_image_height = 0; + pDecManager->getResolution(config.name, input_image_width, input_image_height); + + + // 创建算法线程 + m_pAlgorthimThread = new thread([](void* arg) { + algorthim_process_thread(); + return (void*)0; + } + , nullptr); + + pDecManager->startDecodeByName(config.name); + + while (getchar() != 'q'); +} + +void algorthim_process_thread(){ + + while (true){ + if(m_bfinish){ + break; + } + + vector vec_gpuMem; + m_DataListMtx.lock(); + while (!m_RgbDataList.empty()){ + DeviceMemory* gpuMem = m_RgbDataList.front(); + if(gpuMem->getMem() == nullptr){ + // 错误数据,直接删除 + delete gpuMem; + gpuMem = nullptr; + printf("mem is null \n"); + } else { + vec_gpuMem.push_back(gpuMem); + } + m_RgbDataList.pop_front(); + if(vec_gpuMem.size() >= m_batch_size){ + break; + } + } + m_DataListMtx.unlock(); + + if(vec_gpuMem.size() <= 0){ + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + continue; + } + + algorthim_face_detect(vec_gpuMem); + + for(int i=0;i < vec_gpuMem.size(); i++){ + DeviceMemory* mem = vec_gpuMem[i]; + if(mem->getSize() <= 0){ + continue; + } + delete mem; + mem = nullptr; + } + vec_gpuMem.clear(); + + } + + printf("algorthim_process_thread exit. \n"); +} + +static int interval = 0; +static int obj_id = 0; + +void algorthim_face_detect(vector vec_gpuMem) { + interval ++ ; + + if(interval % 50 != 0) { + return; + } + + for(int i= 0; i < vec_gpuMem.size(); i++) { + DeviceMemory* mem = vec_gpuMem[i]; + string task_id = mem->getId(); + + RecoderInfo recoderInfo; + recoderInfo.task_id = task_id; + recoderInfo.object_id = std::to_string(obj_id); + recoderInfo.recoderDir = "./res/recode"; + recoderInfo.frame_nb = mem->getFrameNb(); + DecoderManager* pDecManager = DecoderManager::getInstance(); + pDecManager->doRecode(recoderInfo); + + obj_id++; + + } +} \ No newline at end of file