#include "FFRecoderTaskManager.h" #include struct RecodeThreadParam { FFRecoderTaskManager* _this; RecodeParam param; }; static long get_cur_time() { chrono::time_point tpMicro = chrono::time_point_cast(chrono::system_clock::now()); return tpMicro.time_since_epoch().count(); } static string get_cur_datetime() { using namespace std; auto t = chrono::system_clock::to_time_t(chrono::system_clock::now()); char buffer[40]; strftime(buffer, 80, "%Y_%m_%d_%H_%M_%S", std::localtime(&t)); buffer[20] = '\0'; return buffer; } static bool is_key_frame(AVPacket *pkt) { return (pkt->flags & AV_PKT_FLAG_KEY) != 0; } FFRecoderTaskManager::FFRecoderTaskManager(){ m_recoder_thread = nullptr; m_cache_thread = nullptr; m_bExit = false; m_bExitRecoderThread = false; } FFRecoderTaskManager::~FFRecoderTaskManager(){ close(); LOG_DEBUG("~FFRecoderTaskManager()"); } bool FFRecoderTaskManager::init(int w, int h, int fps, int bit_rate) { m_width = w; m_height = h; m_bit_rate = bit_rate; if (fps > 1) { m_fps = fps; } else { m_fps = 25; } m_cache_thread = new std::thread( [](void* arg) { FFRecoderTaskManager* _this=(FFRecoderTaskManager*)arg; if(_this != nullptr) { _this->pkt_cache_thread(); }else{ LOG_ERROR("recode 线程启动失败 !"); } return (void*)0; }, this); return true; } static AVPacket* packet_clone(AVPacket* pkt) { AVPacket *new_pkt = av_packet_alloc(); av_init_packet( new_pkt ); av_new_packet(new_pkt, pkt->size); memcpy(new_pkt->data, pkt->data, pkt->size); new_pkt->size = pkt->size; // new_pkt->pts = pkt->pts; // new_pkt->dts = pkt->dts; // new_pkt->stream_index = pkt->stream_index; // new_pkt->duration = pkt->duration; // new_pkt->pos = pkt->pos; // new_pkt->flags = pkt->flags; // av_copy_packet_side_data(new_pkt, pkt); return new_pkt; } static AVPacket* copy_packet(const AVPacket* src) { AVPacket* dst = av_packet_alloc(); // 分配内存 if (!dst) { return NULL; } // 复制所有字段 av_packet_ref(dst, src); // 复制音视频数据 dst->data = (uint8_t*)av_malloc(src->size); memcpy(dst->data, src->data, src->size); dst->size = src->size; return dst; } void FFRecoderTaskManager::cache_pkt(AVPacket* pkt, long long frame_nb, string dec_name){ if(m_bExit) { // 任务退出了就不再缓存数据了 return; } // 考虑到一个AVPacket中的数据并不很大,为减少与解码模块的耦合度,方便管理,这里做一个clone // AVPacket *new_pkt = copy_packet(pkt); DataPacket* newDataPkt = new DataPacket(); newDataPkt->pkt = pkt; newDataPkt->frame_nb = frame_nb; if(is_key_frame(pkt)){ // 越来越大的值 newDataPkt->isKeyFrame = true; LOG_INFO("[{}] - key frame_nb: {}", dec_name, frame_nb); } else { newDataPkt->isKeyFrame = false; } AVPacket* npkt = newDataPkt->pkt; if(npkt == nullptr) { return ; } else if (npkt->data == nullptr || npkt->size <= 0){ return ; } std::lock_guard l_info(m_pkt_list_short_mtx); m_pkt_list_short.push_back(newDataPkt); } void FFRecoderTaskManager::save_intask_recoderinfo(RecoderInfo info) { std::lock_guard l(m_recoderinfo_list_mtx); if(m_recoderinfo_list.size() <= 0) { m_recoderinfo_list.push_back(info); return; } for(auto it = m_recoderinfo_list.begin(); it != m_recoderinfo_list.end(); it++) { if(it->frame_nb > info.frame_nb) { m_recoderinfo_list.insert(it, info); return; } } // 新 frame_nb 比所有的都大 m_recoderinfo_list.push_back(info); } list::iterator FFRecoderTaskManager::getStartIterator(unsigned long long frame_nb){ auto it_first = m_pkt_list.begin(); long long start_frame_nb = (long long)(frame_nb - 375); if(start_frame_nb <= 0) { return it_first; } auto it_second = m_pkt_list.begin(); for(;it_second != m_pkt_list.end(); it_second++) { DataPacket* dataPkt = *it_second; if(dataPkt->isKeyFrame) { it_first = it_second; } if(start_frame_nb >= (*it_first)->frame_nb && start_frame_nb <= (*it_second)->frame_nb) { return it_first; } } return m_pkt_list.begin(); } void FFRecoderTaskManager::create_recode_task(RecoderInfo& recoderInfo) { if(m_bExit) { // 任务退出了就不再接收录制任务 return; } save_intask_recoderinfo(recoderInfo); } void FFRecoderTaskManager::pkt_cache_thread() { LOG_INFO("pkt_cache_thread start..."); m_recoder_thread = new std::thread( [](void* arg) { FFRecoderTaskManager* _this=(FFRecoderTaskManager*)arg; if(_this != nullptr) { _this->recode_thread(); }else{ LOG_ERROR("recode 线程启动失败 !"); } return (void*)0; }, this); // 开始缓存 while(true) { if(m_bExit) { break; } std::this_thread::sleep_for(std::chrono::milliseconds(500)); m_pkt_list_short_mtx.lock(); m_pkt_list_mtx.lock(); for (size_t i = 0; i < m_pkt_list_short.size(); i++) { auto item = m_pkt_list_short.front(); m_pkt_list.push_back(item); m_pkt_list_short.pop_front(); } m_pkt_list_mtx.unlock(); m_pkt_list_short_mtx.unlock(); std::lock_guard l_info(m_recoderinfo_list_mtx); if(m_recoderinfo_list.size() <= 0){ // 没有任务的时候,维持500的长度 m_pkt_list_mtx.lock(); while(m_pkt_list.size() > 1000) { DataPacket* dataPkt = m_pkt_list.front(); delete dataPkt; dataPkt = nullptr; m_pkt_list.pop_front(); } m_pkt_list_mtx.unlock(); } } m_bExitRecoderThread = true; if (m_recoder_thread) { m_recoder_thread->join(); delete m_recoder_thread; m_recoder_thread = nullptr; } // 清空数据 while(!m_pkt_list.empty()) { DataPacket* dataPkt = m_pkt_list.front(); delete dataPkt; dataPkt = nullptr; m_pkt_list.pop_front(); } LOG_INFO("pkt_cache_thread end."); } void FFRecoderTaskManager::recode_thread() { LOG_INFO("recode_thread2 start..."); while(true) { if(m_bExitRecoderThread) { break; } m_recoderinfo_list_mtx.lock(); if(m_recoderinfo_list.size() <= 0){ m_recoderinfo_list_mtx.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(3)); continue; } auto it_param = m_recoderinfo_list.begin(); RecoderInfo recoderinfo = *it_param; m_recoderinfo_list_mtx.unlock(); do { std::lock_guard l_long(m_pkt_list_mtx); auto it_data = getStartIterator(recoderinfo.frame_nb); if(it_data == m_pkt_list.end()) { LOG_WARN("待保存信息已无相关数据包"); std::this_thread::sleep_for(std::chrono::milliseconds(3)); break; } auto it = m_pkt_list.begin(); while (it != it_data) { DataPacket* dataPkt = m_pkt_list.front(); delete dataPkt; dataPkt = nullptr; m_pkt_list.pop_front(); it = m_pkt_list.begin(); } LOG_INFO("start frame_nb: {}", (*it_data)->frame_nb); string file_name = recoderinfo.recoderPath; FFRecoder2 ffrecoder; bool bInit = ffrecoder.init(m_width, m_height, m_fps, m_bit_rate, file_name.c_str()); if (!bInit) { LOG_ERROR("ffrecoder init error : {} {} {}", recoderinfo.task_id, recoderinfo.object_id, recoderinfo.frame_nb); ffrecoder.uninit(); break; } LOG_DEBUG("record start, pkt_list size: {} task_id: {} object_id:{} frame_nb: {}", m_pkt_list.size(), recoderinfo.task_id, recoderinfo.object_id, recoderinfo.frame_nb); int count = 0; auto it_save = it_data; unsigned long long start_frame_nb = (*it_data)->frame_nb; unsigned long long end_frame_nb = (*it_data)->frame_nb; for (; it_save != m_pkt_list.end() && count < 500; ++it_save) { DataPacket* dataPkt = *it_save; if(dataPkt->frame_nb > recoderinfo.frame_nb) { break; } AVPacket* pkt = dataPkt->pkt; if(pkt == nullptr) { LOG_ERROR("{} pkt is nullptr", recoderinfo.task_id); continue; } else if (pkt->data == nullptr || pkt->size <= 0){ LOG_ERROR("{} pkt data is nullptr or size is {}", recoderinfo.task_id, pkt->size); continue; } ffrecoder.write_pkt(pkt); count++; end_frame_nb = (*it_save)->frame_nb; } ffrecoder.flush(); ffrecoder.uninit(); // 发送mq消息 if(mq_publish_func && recoderinfo.mq_info.length() > 0) { mq_publish_func(recoderinfo.mq_info.c_str()); // LOG_INFO("record save: {}", recoderinfo.mq_info.c_str()); } LOG_INFO("record end, total save: {} start_frame_nb: {} end_frame_nb: {} file_path: {}", count, start_frame_nb, end_frame_nb, file_name); } while (0); // m_recoderinfo_list 为空会触发 m_pkt_list size 大于1000时的删除操作, // 因此应当在本次m_pkt_list操作都完成之后再pop,避免这边在用m_pkt_list, 另一边在删除,从而导致崩溃 m_recoderinfo_list_mtx.lock(); m_recoderinfo_list.pop_front(); m_recoderinfo_list_mtx.unlock(); } LOG_INFO("recode_thread2 end."); } void FFRecoderTaskManager::close() { m_bExit = true; if (m_cache_thread) { m_cache_thread->join(); delete m_cache_thread; m_cache_thread = nullptr; } // 清空数据 while(!m_pkt_list_short.empty()) { DataPacket* dataPkt = m_pkt_list_short.front(); delete dataPkt; dataPkt = nullptr; m_pkt_list_short.pop_front(); } } void FFRecoderTaskManager::set_mq_callback(mq_callback_t cb) { mq_publish_func = cb; }