diff --git a/src/decoder/dvpp/DvppDecoder.cpp b/src/decoder/dvpp/DvppDecoder.cpp index 00a826d..2f5d946 100755 --- a/src/decoder/dvpp/DvppDecoder.cpp +++ b/src/decoder/dvpp/DvppDecoder.cpp @@ -150,7 +150,7 @@ AVCodecContext* DvppDecoder::init_FFmpeg(FFDecConfig config){ pix_fmt = (AVPixelFormat)codecpar->format; m_fps = av_q2d(stream ->avg_frame_rate); - m_recoderManager.init(stream->time_base, avctx); + m_recoderManager.init(stream, avctx); LOG_INFO("[{}]- init ffmpeg success! input:{} frame_width:{} frame_height:{} fps:{} ", m_dec_name, input_file, frame_width, frame_height, m_fps); @@ -379,9 +379,6 @@ void DvppDecoder::read_thread() { if (video_index == pkt->stream_index){ - frame_nb++; - m_recoderManager.cache_pkt(pkt, frame_nb); - ret = av_bsf_send_packet(h264bsfc, pkt); if(ret < 0) { LOG_ERROR("[{}]- av_bsf_send_packet error!", m_dec_name); @@ -401,6 +398,9 @@ void DvppDecoder::read_thread() { break; } + frame_nb++; + m_recoderManager.cache_pkt(pkt, frame_nb); + m_pktQueue_mutex.lock(); DataPacket* data_pkt = new DataPacket(); data_pkt->pkt = pkt; diff --git a/src/decoder/dvpp/FFRecoder.cpp b/src/decoder/dvpp/FFRecoder.cpp index ebed6a7..dd47502 100644 --- a/src/decoder/dvpp/FFRecoder.cpp +++ b/src/decoder/dvpp/FFRecoder.cpp @@ -121,11 +121,12 @@ bool FFRecoder::init(int w, int h, AVRational time_base, AVCodecContext* avctx, return true; } -bool FFRecoder::init(AVRational time_base, AVCodecContext* avctx, const char* outfile_name) { +bool FFRecoder::init(AVStream* stream, AVCodecContext* avctx, const char* outfile_name) { codec_ctx_ = (AVCodecContext*)av_malloc(sizeof(AVCodecContext)); avcodec_copy_context(codec_ctx_, avctx); - codec_ctx_->time_base = time_base; + codec_ctx_->time_base = stream->time_base; + m_inStream = stream; // [2] 创建输出上下文 avformat_alloc_output_context2(&fmt_ctx_, nullptr, nullptr, outfile_name); @@ -135,7 +136,8 @@ bool FFRecoder::init(AVRational time_base, AVCodecContext* avctx, const char* ou out_stream_->id = 0; out_stream_->codecpar->codec_tag = 0; avcodec_parameters_from_context(out_stream_->codecpar, codec_ctx_); - out_stream_->time_base = { 1,30 }; + // out_stream_->time_base = { 1,30 }; + out_stream_->time_base = stream->time_base; av_dump_format(fmt_ctx_, out_stream_->id, outfile_name, 1); @@ -244,14 +246,32 @@ void FFRecoder::update_pts(AVPacket* pkt) { bool FFRecoder::write_pkt(AVPacket *pkt) { char errbuf[64]{ 0 }; - // frame_number++; - // pkt->pts = av_rescale_q(frame_number, codec_ctx_->time_base, out_stream_->time_base); - // pkt->dts = pkt->pts; - // pkt->duration = av_rescale_q(1, codec_ctx_->time_base, out_stream_->time_base); + // av_packet_rescale_ts(pkt, codec_ctx_->time_base, out_stream_->time_base); + // update_pts(pkt); + // pkt->stream_index = out_stream_->index; + + // if(pkt->pts==AV_NOPTS_VALUE) + { + // printf("frame_index:%d\n", frame_index); + //Write PTS + AVRational time_base1 = codec_ctx_->time_base; + //Duration between 2 frames (us) + int64_t calc_duration = (double)AV_TIME_BASE / av_q2d(m_inStream->r_frame_rate); + //Parameters + pkt->pts = (double)(frame_index*calc_duration) / (double)(av_q2d(time_base1)*AV_TIME_BASE); + pkt->dts = pkt->pts; + pkt->duration = (double)calc_duration / (double)(av_q2d(time_base1)*AV_TIME_BASE); + frame_index++; + } + // Convert PTS/DTS + pkt->pts = av_rescale_q_rnd(pkt->pts, codec_ctx_->time_base, out_stream_->time_base, (enum AVRounding)(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX)); + pkt->dts = av_rescale_q_rnd(pkt->dts, codec_ctx_->time_base, out_stream_->time_base, (enum AVRounding)(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX)); + pkt->duration = av_rescale_q(pkt->duration, codec_ctx_->time_base, out_stream_->time_base); + pkt->pos = -1; - av_packet_rescale_ts(pkt, codec_ctx_->time_base, out_stream_->time_base); pkt->stream_index = out_stream_->index; - update_pts(pkt); + + // 将数据写入到输出流 int ret = av_interleaved_write_frame(fmt_ctx_, pkt); if (ret < 0) { diff --git a/src/decoder/dvpp/FFRecoder.h b/src/decoder/dvpp/FFRecoder.h index 001b2d6..0cfaa7c 100644 --- a/src/decoder/dvpp/FFRecoder.h +++ b/src/decoder/dvpp/FFRecoder.h @@ -24,7 +24,7 @@ public: bool flush(); // AVPacket 方式 - bool init(AVRational time_base, AVCodecContext* avctx, const char* outfile_name); + bool init(AVStream* stream, AVCodecContext* avctx, const char* outfile_name); bool write_pkt(AVPacket *pkt); private: @@ -50,5 +50,9 @@ private: int64_t last_src_pts; int64_t last_pts; - int64_t frame_number{0}; + int64_t first_pts; + int64_t first_dts; + + int64_t frame_index{0}; + AVStream* m_inStream; }; \ No newline at end of file diff --git a/src/decoder/dvpp/FFRecoderTaskManager.cpp b/src/decoder/dvpp/FFRecoderTaskManager.cpp index 7328d84..2ba2a0f 100644 --- a/src/decoder/dvpp/FFRecoderTaskManager.cpp +++ b/src/decoder/dvpp/FFRecoderTaskManager.cpp @@ -37,9 +37,10 @@ FFRecoderTaskManager::~FFRecoderTaskManager(){ } -bool FFRecoderTaskManager::init(AVRational time_base, AVCodecContext* avctx){ - m_time_base = time_base; +bool FFRecoderTaskManager::init(AVStream* stream, AVCodecContext* avctx){ + m_time_base = stream->time_base; m_avctx = avctx; + m_inStream = stream; m_recoder_thread = new std::thread( [](void* arg) { @@ -55,6 +56,24 @@ bool FFRecoderTaskManager::init(AVRational time_base, AVCodecContext* avctx){ return true; } +bool FFRecoderTaskManager::init3(AVRational time_base, AVCodecContext* avctx){ + m_time_base = time_base; + m_avctx = avctx; + + m_recoder_thread = new std::thread( + [](void* arg) { + FFRecoderTaskManager* _this=(FFRecoderTaskManager*)arg; + if(_this != nullptr) { + _this->recode_thread3(); + }else{ + LOG_ERROR("recode 线程启动失败 !"); + } + return (void*)0; + }, this); + + return true; +} + void FFRecoderTaskManager::cache_pkt(AVPacket* pkt, long long frame_nb){ if(m_bExit) { // 任务退出了就不再缓存数据了 @@ -91,6 +110,42 @@ void FFRecoderTaskManager::cache_pkt(AVPacket* pkt, long long frame_nb){ } } +void FFRecoderTaskManager::cache_frame(AVFrame* frame, long long frame_nb){ + if(m_bExit) { + // 任务退出了就不再缓存数据了 + return; + } + + std::lock_guard l_pkt(m_frame_list_mtx); + + // 考虑到一个AVPacket中的数据并不很大,为减少与解码模块的耦合度,方便管理,这里做一个clone + AVFrame *new_pkt = av_frame_clone(frame); + + DataFrame* newFrame = new DataFrame(); + newFrame->frame = new_pkt; + newFrame->frame_nb = frame_nb; + m_frame_list.emplace_back(newFrame); + + // if(is_key_frame(pkt)){ + // // 越来越大的值 + // newFrame->isKeyFrame = true; + // LOG_INFO("key frame_nb: {}", frame_nb); + // } else { + // newFrame->isKeyFrame = false; + // } + + std::lock_guard l_info(m_recoderinfo_list_mtx); + if(m_recoderinfo_list.size() <= 0){ + // 没有任务的时候,维持500的长度 + while(m_frame_list.size() > 1000) { + DataFrame* dataPkt = m_frame_list.front(); + delete dataPkt; + dataPkt = nullptr; + m_frame_list.pop_front(); + } + } +} + void FFRecoderTaskManager::save_intask_frame_nb(unsigned long long frame_nb) { if(m_intask_frame_nb_list.size() <= 0) { m_intask_frame_nb_list.push_back(frame_nb); @@ -158,6 +213,43 @@ list::iterator FFRecoderTaskManager::getStartIterator(unsigned long return m_pkt_list.begin(); } +list::iterator FFRecoderTaskManager::getEndIterator(unsigned long long frame_nb){ + std::lock_guard l(m_pkt_list_mtx); + + auto it_first = m_pkt_list.end(); + + auto it_second = m_pkt_list.begin(); + for(;it_second != m_pkt_list.end(); it_second++) { + DataPacket* dataPkt = *it_second; + if (dataPkt->isKeyFrame && dataPkt->frame_nb >= frame_nb){ + return it_second; + } + } + + return m_pkt_list.end(); +} + +list::iterator FFRecoderTaskManager::getStartIterator3(unsigned long long frame_nb) { + std::lock_guard l(m_frame_list_mtx); + + auto it_first = m_frame_list.begin(); + + long long start_frame_nb = (long long)(frame_nb - 375); + if(start_frame_nb <= 0) { + return it_first; + } + + auto it_second = m_frame_list.begin(); + for(;it_second != m_frame_list.end(); it_second++) { + DataFrame* dataPkt = *it_second; + if (dataPkt->frame_nb >= start_frame_nb){ + return it_second; + } + } + + return m_frame_list.begin(); +} + // 多线程版 void FFRecoderTaskManager::create_recode_task(AVRational time_base, AVCodecContext* avctx, RecoderInfo& recoderInfo){ @@ -211,7 +303,7 @@ void FFRecoderTaskManager::recode_thread(RecodeParam recodeParam){ std::string id = recoderInfo.task_id + "_" + recoderInfo.object_id + "_" + std::to_string(recoderInfo.frame_nb); string file_name = recoderInfo.recoderDir + "/recoder_" + id + "_" + std::to_string(get_cur_time()) + ".mp4"; FFRecoder ffrecoder; - bool bInit = ffrecoder.init(recodeParam.time_base, recodeParam.avctx, file_name.c_str()); + bool bInit = ffrecoder.init(m_inStream, recodeParam.avctx, file_name.c_str()); if (!bInit) { LOG_ERROR("ffrecoder init error : {} {} {}", recoderInfo.task_id, recoderInfo.object_id, recoderInfo.frame_nb); m_id_recoderTask.erase(id); @@ -260,6 +352,161 @@ void FFRecoderTaskManager::recode_thread2() { continue; } + auto it_end = getEndIterator(recoderinfo.frame_nb); + + LOG_INFO("start frame_nb: {}", (*it_data)->frame_nb); + + m_pkt_list_mtx.lock(); + auto it = m_pkt_list.begin(); + while (it != it_data) { + DataPacket* dataPkt = m_pkt_list.front(); + delete dataPkt; + dataPkt = nullptr; + m_pkt_list.pop_front(); + it = m_pkt_list.begin(); + } + m_pkt_list_mtx.unlock(); + + std::string id = recoderinfo.task_id + "_" + recoderinfo.object_id + "_" + std::to_string(recoderinfo.frame_nb); + string file_name = recoderinfo.recoderDir + "/recoder_" + id + "_" + std::to_string(get_cur_time()) + ".mp4"; + FFRecoder ffrecoder; + bool bInit = ffrecoder.init(m_inStream, m_avctx, file_name.c_str()); + if (!bInit) { + LOG_ERROR("ffrecoder init error : {} {} {}", recoderinfo.task_id, recoderinfo.object_id, recoderinfo.frame_nb); + ffrecoder.uninit(); + continue; + } + LOG_INFO("record start, pkt_list size: {} id: {}", m_pkt_list.size(), id); + + int count = 0; + auto it_save = it_data; + unsigned long long start_frame_nb = (*it_data)->frame_nb; + unsigned long long end_frame_nb = (*it_data)->frame_nb; + for (; it_save != m_pkt_list.end() && count < 500; ++it_save) { + DataPacket* dataPkt = *it_save; + if(dataPkt->frame_nb > recoderinfo.frame_nb) { + break; + } + AVPacket* pkt = dataPkt->pkt; + ffrecoder.write_pkt(pkt); + count++; + end_frame_nb = (*it_save)->frame_nb; + } + + // ffrecoder.flush(); + ffrecoder.uninit(); + + // 发送mq消息 + if(mq_publish_func) { + mq_publish_func(recoderinfo.mq_info.c_str()); + } + + LOG_INFO("record end, total save: {} start_frame_nb: {} end_frame_nb: {} file_path: {}", count, start_frame_nb, end_frame_nb, file_name); + } + + LOG_INFO("recode_thread2 end."); +} + +void FFRecoderTaskManager::recode_thread3() { + LOG_INFO("recode_thread2 start..."); + while(true) { + if(m_bExit) { + break; + } + + m_recoderinfo_list_mtx.lock(); + if(m_recoderinfo_list.size() <= 0){ + m_recoderinfo_list_mtx.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + continue; + } + + auto it_param = m_recoderinfo_list.begin(); + RecoderInfo recoderinfo = *it_param; + m_recoderinfo_list.pop_front(); + m_recoderinfo_list_mtx.unlock(); + + auto it_data = getStartIterator3(recoderinfo.frame_nb); + if(it_data == m_frame_list.end()) { + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + continue; + } + + LOG_INFO("start frame_nb: {}", (*it_data)->frame_nb); + + m_frame_list_mtx.lock(); + auto it = m_frame_list.begin(); + while (it != it_data) { + DataFrame* dataPkt = m_frame_list.front(); + delete dataPkt; + dataPkt = nullptr; + m_frame_list.pop_front(); + it = m_frame_list.begin(); + } + m_frame_list_mtx.unlock(); + + std::string id = recoderinfo.task_id + "_" + recoderinfo.object_id + "_" + std::to_string(recoderinfo.frame_nb); + string file_name = recoderinfo.recoderDir + "/recoder_" + id + "_" + std::to_string(get_cur_time()) + ".mp4"; + FFRecoder ffrecoder; + bool bInit = ffrecoder.init(m_avctx->width, m_avctx->height, m_time_base, m_avctx, file_name.c_str()); + if (!bInit) { + LOG_ERROR("ffrecoder init error : {} {} {}", recoderinfo.task_id, recoderinfo.object_id, recoderinfo.frame_nb); + ffrecoder.uninit(); + continue; + } + LOG_INFO("record start, pkt_list size: {} id: {}", m_frame_list.size(), id); + + int count = 0; + auto it_save = it_data; + unsigned long long start_frame_nb = (*it_data)->frame_nb; + unsigned long long end_frame_nb = (*it_data)->frame_nb; + for (; it_save != m_frame_list.end() && count < 500; ++it_save) { + DataFrame* dataPkt = *it_save; + AVFrame* pkt = dataPkt->frame; + ffrecoder.write_frame(pkt); + count++; + end_frame_nb = (*it_save)->frame_nb; + } + + // ffrecoder.flush(); + ffrecoder.uninit(); + + // 发送mq消息 + if(mq_publish_func) { + mq_publish_func(recoderinfo.mq_info.c_str()); + } + + LOG_INFO("record end, total save: {} start_frame_nb: {} end_frame_nb: {} file_path: {}", count, start_frame_nb, end_frame_nb, file_name); + } + + LOG_INFO("recode_thread2 end."); +} + +void FFRecoderTaskManager::recode_thread4() { + LOG_INFO("recode_thread2 start..."); + while(true) { + if(m_bExit) { + break; + } + + m_recoderinfo_list_mtx.lock(); + if(m_recoderinfo_list.size() <= 0){ + m_recoderinfo_list_mtx.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + continue; + } + + auto it_param = m_recoderinfo_list.begin(); + RecoderInfo recoderinfo = *it_param; + m_recoderinfo_list.pop_front(); + m_recoderinfo_list_mtx.unlock(); + + auto it_data = getStartIterator(recoderinfo.frame_nb); + if(it_data == m_pkt_list.end()) { + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + continue; + } + LOG_INFO("start frame_nb: {}", (*it_data)->frame_nb); m_pkt_list_mtx.lock(); @@ -276,7 +523,7 @@ void FFRecoderTaskManager::recode_thread2() { std::string id = recoderinfo.task_id + "_" + recoderinfo.object_id + "_" + std::to_string(recoderinfo.frame_nb); string file_name = recoderinfo.recoderDir + "/recoder_" + id + "_" + std::to_string(get_cur_time()) + ".mp4"; FFRecoder ffrecoder; - bool bInit = ffrecoder.init(m_time_base, m_avctx, file_name.c_str()); + bool bInit = ffrecoder.init(m_inStream, m_avctx, file_name.c_str()); if (!bInit) { LOG_ERROR("ffrecoder init error : {} {} {}", recoderinfo.task_id, recoderinfo.object_id, recoderinfo.frame_nb); ffrecoder.uninit(); diff --git a/src/decoder/dvpp/FFRecoderTaskManager.h b/src/decoder/dvpp/FFRecoderTaskManager.h index a750e82..13a556f 100644 --- a/src/decoder/dvpp/FFRecoderTaskManager.h +++ b/src/decoder/dvpp/FFRecoderTaskManager.h @@ -28,18 +28,26 @@ public: void cache_pkt(AVPacket* pkt, long long frame_nb); void create_recode_task(AVRational time_base, AVCodecContext* avctx, RecoderInfo& recoderInfo); - bool init(AVRational time_base, AVCodecContext* avctx); + bool init(AVStream* stream, AVCodecContext* avctx); void create_recode_task2(RecoderInfo& recoderInfo); void close(); void set_mq_callback(mq_callback_t cb); + bool init3(AVRational time_base, AVCodecContext* avctx); + void cache_frame(AVFrame* frame, long long frame_nb); + public: void recode_thread(RecodeParam param); list::iterator getStartIterator(unsigned long long frame_nb); + list::iterator getEndIterator(unsigned long long frame_nb); void recode_thread2(); + void recode_thread4(); + + list::iterator getStartIterator3(unsigned long long frame_nb); + void recode_thread3(); private: void save_intask_frame_nb(unsigned long long frame_nb); @@ -64,8 +72,12 @@ private: AVRational m_time_base; AVCodecContext* m_avctx; + AVStream* m_inStream; thread* m_recoder_thread{nullptr}; mq_callback_t mq_publish_func; + + std::list m_frame_list; + mutex m_frame_list_mtx; }; \ No newline at end of file diff --git a/src/decoder/dvpp/depend_headers.h b/src/decoder/dvpp/depend_headers.h index 52fd0ce..562a3b9 100755 --- a/src/decoder/dvpp/depend_headers.h +++ b/src/decoder/dvpp/depend_headers.h @@ -49,4 +49,17 @@ struct DataPacket { } }; +struct DataFrame { + AVFrame* frame {nullptr}; + unsigned long long frame_nb{0}; + bool isKeyFrame{false}; + + ~DataFrame(){ + if(frame != nullptr) { + av_frame_free(&frame); + frame = nullptr; + } + } +}; + #endif \ No newline at end of file diff --git a/src/decoder/test_recoder.cpp b/src/decoder/test_recoder.cpp index 769ca5e..7b44c83 100644 --- a/src/decoder/test_recoder.cpp +++ b/src/decoder/test_recoder.cpp @@ -29,6 +29,8 @@ void algorthim_process_thread(); void recode_thread(); void algorthim_face_detect(vector vec_gpuMem); +void test_recode_thread(); + void post_decod_cbk(const void * userPtr, DeviceMemory* devFrame){ do{ if(m_bfinish){ @@ -62,7 +64,7 @@ int main(){ MgrDecConfig config; config.name = task_id; - config.cfg.uri = "rtsp://admin:ad123456@192.168.60.165:554/cam/realmonitor?channel=1&subtype=0"; + config.cfg.uri = "rtsp://122.97.218.170:8604/openUrl/LBBYTra?params=eyJwcm90b2NhbCI6InJ0c3AiLCJjbGllbnRUeXBlIjoib3Blbl9hcGkiLCJleHByaWVUaW1lIjotMSwicHJvdG9jb2wiOiJydHNwIiwiZXhwaXJlVGltZSI6MzAwLCJlbmFibGVNR0MiOnRydWUsImV4cGFuZCI6InN0YW5kYXJkPXJ0c3Amc3RyZWFtZm9ybT1ydHAiLCJhIjoiOTgzYjRjMmUxMThlNGU1OTlkYThmMTI3NTkyMGViODV8MXwwfDEiLCJ0IjoxfQ=="; config.cfg.post_decoded_cbk = post_decod_cbk; config.cfg.decode_finished_cbk = decode_finished_cbk; config.cfg.force_tcp = true; // rtsp用tcp @@ -95,8 +97,14 @@ int main(){ } , nullptr); + // m_recodeThread = new thread([](void* arg) { + // recode_thread(); + // return (void*)0; + // } + // , nullptr); + m_recodeThread = new thread([](void* arg) { - recode_thread(); + test_recode_thread(); return (void*)0; } , nullptr); @@ -183,6 +191,43 @@ void algorthim_face_detect(vector vec_gpuMem) { } } +void test_recode_thread() { + unsigned long long frame_index = 0; + while(true) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + DeviceMemory* mem = nullptr; + m_DataListMtx.lock(); + while (!m_RgbDataList.empty()){ + DeviceMemory* gpuMem = m_RgbDataList.front(); + if(gpuMem->getMem() == nullptr){ + // 错误数据,直接删除 + delete gpuMem; + gpuMem = nullptr; + printf("mem is null \n"); + } else { + frame_index ++ ; + if (frame_index % 50 == 0) { + RecoderInfo recoderInfo; + recoderInfo.task_id = gpuMem->getId(); + recoderInfo.object_id = std::to_string(obj_id); + recoderInfo.recoderDir = "./res/recode"; + recoderInfo.frame_nb = gpuMem->getFrameNb(); + + DecoderManager* pDecManager = DecoderManager::getInstance(); + pDecManager->doRecode(recoderInfo); + + obj_id++; + } + delete gpuMem; + gpuMem = nullptr; + } + m_RgbDataList.pop_front(); + } + m_DataListMtx.unlock(); + } +} + void recode_thread() { while(true) {