// FFRecoderTaskManager.cpp
#include "FFRecoderTaskManager.h"
#include <chrono>
#include <ctime>
#include <mutex>
#include <string>
#include <thread>

// Bundles a manager pointer with the parameters of one recording task so that
// both can be passed around through a single pointer.
struct RecodeThreadParam {
    FFRecoderTaskManager* _this;  // manager the recording task belongs to
    RecodeParam param;            // parameters of the recording task
};

/**
 * Returns the current wall-clock time as whole milliseconds since the
 * std::chrono::system_clock epoch.
 *
 * The count is narrowed to `long` to keep the existing signature; on a
 * platform where `long` is 32 bits the value does not fit.
 */
static long get_cur_time() {
	const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
	const auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch);

	return static_cast<long>(millis.count());
}

/**
 * Formats the current local time as "YYYY_MM_DD_HH_MM_SS".
 *
 * @return The formatted timestamp, or an empty string when the local time
 *         cannot be obtained or does not fit the buffer.
 */
static std::string get_cur_datetime()
{
	const std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

	// NOTE(review): std::localtime returns a pointer to shared static storage,
	// so concurrent callers can overwrite each other's result.
	const std::tm* local = std::localtime(&now);
	if (local == nullptr) {
		return std::string();
	}

	char buffer[40];
	// Pass the real buffer size; strftime returns 0 when the output does not fit.
	const std::size_t written = std::strftime(buffer, sizeof(buffer), "%Y_%m_%d_%H_%M_%S", local);
	return std::string(buffer, written);
}

// Reports whether the packet has the AV_PKT_FLAG_KEY bit set in its flags.
static bool is_key_frame(AVPacket *pkt) {
    const auto key_bits = pkt->flags & AV_PKT_FLAG_KEY;
    return key_bits != 0;
}

// Nothing to set up here; the recording thread is started by init().
FFRecoderTaskManager::FFRecoderTaskManager() = default;

// Performs no cleanup of its own.
// NOTE(review): the recording thread and the packet cache are only released by
// close(); presumably callers must call close() before destruction - confirm.
FFRecoderTaskManager::~FFRecoderTaskManager() = default;

/**
 * Stores the stream parameters and starts the background recording thread,
 * which runs recode_thread2() until close() sets the exit flag.
 *
 * @param time_base  Time base passed to FFRecoder::init for every recording.
 * @param avctx      Codec context passed to FFRecoder::init for every
 *                   recording; it is stored as-is and never freed in this file.
 * @return Always true.
 */
bool FFRecoderTaskManager::init(AVRational time_base, AVCodecContext* avctx){
    m_time_base = time_base;
    m_avctx = avctx;

    // NOTE(review): calling init() again without close() overwrites
    // m_recoder_thread, so the earlier thread can no longer be joined.
    m_recoder_thread = new std::thread([this]() { recode_thread2(); });

    return true;
}

/**
 * Appends a copy of an incoming packet to the packet cache.
 *
 * @param pkt       Packet to cache; it is cloned, so the caller keeps its own
 *                  packet. A null packet is logged and ignored.
 * @param frame_nb  Frame number stored alongside the cached packet.
 *
 * Nothing is cached once the exit flag is set. While no recording task is
 * queued, the cache is trimmed from the front down to 1000 packets.
 */
void FFRecoderTaskManager::cache_pkt(AVPacket* pkt, long long frame_nb){
    if(m_bExit) {
        // Stop caching once the manager is shutting down.
        return;
    }

    if(pkt == nullptr) {
        LOG_ERROR("cache_pkt: null packet, frame_nb: {}", frame_nb);
        return;
    }

    // A single AVPacket is small, so clone it to stay decoupled from the
    // decoder's packet. This is done before taking the lock to keep the
    // critical section short.
    AVPacket *new_pkt = av_packet_clone(pkt);
    if(new_pkt == nullptr) {
        // av_packet_clone returns NULL on failure; never cache a null packet.
        LOG_ERROR("cache_pkt: av_packet_clone failed, frame_nb: {}", frame_nb);
        return;
    }

    DataPacket* newDataPkt = new DataPacket();
    newDataPkt->pkt = new_pkt;
    newDataPkt->frame_nb = frame_nb;
    newDataPkt->isKeyFrame = is_key_frame(pkt);
    if(newDataPkt->isKeyFrame){
        LOG_INFO("key frame_nb: {}", frame_nb);
    }

    std::lock_guard<std::mutex> l_pkt(m_pkt_list_mtx);
    m_pkt_list.emplace_back(newDataPkt);

    // Lock order here is packet list first, then task list.
    std::lock_guard<std::mutex> l_info(m_recoderinfo_list_mtx);
    if(m_recoderinfo_list.empty()){
        // With no pending task, keep at most 1000 cached packets.
        while(m_pkt_list.size() > 1000) {
            delete m_pkt_list.front();
            m_pkt_list.pop_front();
        }
    }
}

// Inserts frame_nb into m_intask_frame_nb_list, keeping the list in ascending
// order: the value goes in front of the first element that is greater than it,
// or at the end when no such element exists (including the empty case).
// This function takes no lock itself.
void FFRecoderTaskManager::save_intask_frame_nb(unsigned long long frame_nb) {
    auto pos = m_intask_frame_nb_list.begin();
    while(pos != m_intask_frame_nb_list.end() && !(*pos > frame_nb)) {
        ++pos;
    }
    m_intask_frame_nb_list.insert(pos, frame_nb);
}

// Queues a recording task in m_recoderinfo_list under m_recoderinfo_list_mtx,
// keeping the list ordered by ascending frame_nb: the task goes in front of the
// first queued task with a greater frame_nb, or at the end when there is none
// (including the empty case).
void FFRecoderTaskManager::save_intask_recoderinfo(RecoderInfo info) {
    std::lock_guard<std::mutex> guard(m_recoderinfo_list_mtx);

    auto pos = m_recoderinfo_list.begin();
    while(pos != m_recoderinfo_list.end() && !(pos->frame_nb > info.frame_nb)) {
        ++pos;
    }
    m_recoderinfo_list.insert(pos, info);
}

// Picks the cached packet at which a recording for frame_nb should begin.
//
// The target is frame_nb - 375. If that is not positive, the first cached
// packet is returned. Otherwise the cache is scanned front to back while
// remembering the most recent key-frame packet (initially the first packet);
// as soon as the target lies between the remembered packet's frame_nb and the
// current packet's frame_nb (inclusive), the remembered packet is returned.
// If the scan never matches, the first cached packet is returned.
//
// m_pkt_list_mtx is held only for the duration of the search; the returned
// iterator is used by the caller after the lock has been released.
list<DataPacket*>::iterator FFRecoderTaskManager::getStartIterator(unsigned long long frame_nb){
    std::lock_guard<std::mutex> guard(m_pkt_list_mtx);

    const long long target_frame_nb = (long long)(frame_nb - 375);
    if(target_frame_nb <= 0) {
        return m_pkt_list.begin();
    }

    auto anchor = m_pkt_list.begin();
    for(auto cursor = m_pkt_list.begin(); cursor != m_pkt_list.end(); ++cursor) {
        if((*cursor)->isKeyFrame) {
            anchor = cursor;
        }

        if(target_frame_nb >= (*anchor)->frame_nb && target_frame_nb <= (*cursor)->frame_nb) {
            return anchor;
        }
    }

    return m_pkt_list.begin();
}

// Multi-threaded variant.
//
// Starts a dedicated thread that records one task via recode_thread(), and
// registers it in m_id_recoderTask under "<task_id>_<object_id>_<frame_nb>".
// The task's frame_nb is also added to the sorted in-task frame number list.
//
// m_task_creat_mtx is held for the whole call; the new thread acquires the
// same mutex first, so it does not proceed before the registration is complete.
//
// NOTE(review): the std::thread objects stored in m_id_recoderTask are not
// joined or deleted anywhere in this file.
void FFRecoderTaskManager::create_recode_task(AVRational time_base, AVCodecContext* avctx, RecoderInfo& recoderInfo){

    std::lock_guard<std::mutex> l(m_task_creat_mtx);

    RecodeParam recodeParam;
    recodeParam.time_base = time_base;
    recodeParam.avctx = avctx;
    recodeParam.recoderInfo = recoderInfo;

    // The parameters are captured by value, so the thread owns its own copy and
    // no heap-allocated argument block has to be freed afterwards.
    std::thread* worker = new std::thread(
        [this, recodeParam]() {
            this->recode_thread(recodeParam);
        });

    std::string id = recoderInfo.task_id + "_" + recoderInfo.object_id + "_" + std::to_string(recoderInfo.frame_nb);
    m_id_recoderTask[id] = worker;

    save_intask_frame_nb(recoderInfo.frame_nb);
}

// Queues a recording task for the background recording thread.
// Tasks submitted after the exit flag has been set are ignored.
void FFRecoderTaskManager::create_recode_task2(RecoderInfo& recoderInfo) {
    if(!m_bExit) {
        save_intask_recoderinfo(recoderInfo);
    }
}

// Multi-threaded variant.
//
// Thread body for one recording task: writes up to 500 packets from the front
// of the packet cache into
// "<recoderDir>/recoder_<task_id>_<object_id>_<frame_nb>_<ms timestamp>.mp4",
// then removes the task's entry from m_id_recoderTask.
//
// The packet cache is read under m_pkt_list_mtx and the task map is modified
// under m_task_creat_mtx, because other threads mutate both.
// NOTE(review): cache_pkt() blocks while the packets are being written.
void FFRecoderTaskManager::recode_thread(RecodeParam recodeParam){
    {
        // Wait until create_recode_task has finished, so that m_id_recoderTask
        // already holds this thread's entry.
        std::lock_guard<std::mutex> l(m_task_creat_mtx);
    }

    const RecoderInfo& recoderInfo = recodeParam.recoderInfo;
    const std::string id = recoderInfo.task_id + "_" + recoderInfo.object_id + "_" + std::to_string(recoderInfo.frame_nb);
    const std::string file_name = recoderInfo.recoderDir + "/recoder_" + id + "_" + std::to_string(get_cur_time()) + ".mp4";

    // Removes this task's entry from the task map under the map's mutex.
    auto forget_task = [this, &id]() {
        std::lock_guard<std::mutex> l(m_task_creat_mtx);
        m_id_recoderTask.erase(id);
    };

    FFRecoder ffrecoder;
    bool bInit = ffrecoder.init(recodeParam.time_base, recodeParam.avctx, file_name.c_str());
    if (!bInit) {
        LOG_ERROR("ffrecoder init error : {} {} {}", recoderInfo.task_id, recoderInfo.object_id, recoderInfo.frame_nb);
        // Same cleanup as the init-failure path of recode_thread2.
        ffrecoder.uninit();
        forget_task();
        return;
    }

    {
        std::lock_guard<std::mutex> l_pkt(m_pkt_list_mtx);
        LOG_INFO("record start, pkt_list size: {} id: {}", m_pkt_list.size(), id);

        int count = 0;
        for (auto it = m_pkt_list.begin(); it != m_pkt_list.end() && count < 500; ++it) {
            ffrecoder.write_pkt((*it)->pkt);
            count++;
        }
    }

    ffrecoder.flush();
    ffrecoder.uninit();

    forget_task();

    LOG_INFO("record end : {}", file_name);
}

void FFRecoderTaskManager::recode_thread2() {
    LOG_INFO("recode_thread2 start...");
    while(true) {
        if(m_bExit) {
            break;
        }

        m_recoderinfo_list_mtx.lock();
        if(m_recoderinfo_list.size() <= 0){
            m_recoderinfo_list_mtx.unlock();
            std::this_thread::sleep_for(std::chrono::milliseconds(3));
            continue;
        }

        auto it_param = m_recoderinfo_list.begin();
        RecoderInfo recoderinfo = *it_param;
        m_recoderinfo_list.pop_front();
        m_recoderinfo_list_mtx.unlock();

        auto it_data = getStartIterator(recoderinfo.frame_nb);
        if(it_data == m_pkt_list.end()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(3));
            continue;
        }

        LOG_INFO("start frame_nb: {}", (*it_data)->frame_nb);

        m_pkt_list_mtx.lock();
        auto it = m_pkt_list.begin();
        while (it != it_data) {
            DataPacket* dataPkt = m_pkt_list.front();
            delete dataPkt;
            dataPkt = nullptr;
            m_pkt_list.pop_front();
            it = m_pkt_list.begin();
        }
        m_pkt_list_mtx.unlock();

        std::string id = recoderinfo.task_id + "_" + recoderinfo.object_id + "_" + std::to_string(recoderinfo.frame_nb);
        string file_name = recoderinfo.recoderDir + "/recoder_" + id + "_" + std::to_string(get_cur_time()) + ".mp4";
        FFRecoder ffrecoder;
        bool bInit = ffrecoder.init(m_time_base, m_avctx, file_name.c_str());
        if (!bInit) {
            LOG_ERROR("ffrecoder init error : {} {} {}", recoderinfo.task_id, recoderinfo.object_id, recoderinfo.frame_nb);
            ffrecoder.uninit();
            continue;
        }
        LOG_INFO("record start, pkt_list size: {} id: {}", m_pkt_list.size(), id);

        int count = 0;
        auto it_save = it_data;
        unsigned long long start_frame_nb = (*it_data)->frame_nb;
        unsigned long long end_frame_nb = (*it_data)->frame_nb;
        for (; it_save != m_pkt_list.end() && count < 500; ++it_save) {
            DataPacket* dataPkt = *it_save;
            AVPacket* pkt = dataPkt->pkt;
            ffrecoder.write_pkt(pkt);
            count++;
            end_frame_nb = (*it_save)->frame_nb;
        }

        // ffrecoder.flush();
        ffrecoder.uninit();

        // 发送mq消息
        if(mq_publish_func) {
            mq_publish_func(recoderinfo.mq_info.c_str());
        }

        LOG_INFO("record end, total save: {} start_frame_nb: {} end_frame_nb: {}  file_path: {}", count, start_frame_nb, end_frame_nb, file_name);
    }

    LOG_INFO("recode_thread2 end.");
}

void FFRecoderTaskManager::close() {
    m_bExit = true;

    if (m_recoder_thread) {
        m_recoder_thread->join();
        m_recoder_thread = nullptr;
    }

    // 清空数据
    while(!m_pkt_list.empty()) {
        DataPacket* dataPkt = m_pkt_list.front();
        delete dataPkt;
        dataPkt = nullptr;
        m_pkt_list.pop_front();
    }
}

// Registers the callback that recode_thread2() invokes with a task's mq_info
// after the task's recording has been written.
void FFRecoderTaskManager::set_mq_callback(mq_callback_t cb) {
    this->mq_publish_func = cb;
}