// FFRecoderTaskManager.cpp (10.3 KB)
#include "FFRecoderTaskManager.h"
#include <chrono>

// Bundles a manager pointer with a RecodeParam value.
// NOTE(review): nothing else in this file references this struct; presumably it was
// meant as a thread-start argument — confirm before removing.
struct RecodeThreadParam {
    FFRecoderTaskManager* _this;  // pointer to a manager instance
    RecodeParam param;            // recording parameters (type declared outside this file)
};

/**
 * @brief Current wall-clock time as milliseconds since the system_clock epoch.
 *
 * All chrono names are fully qualified so the function does not depend on a
 * `using namespace std` leaking in from a header.
 *
 * @return Millisecond count narrowed to `long`. On platforms where `long` is
 *         32 bits the value is truncated; the return type is kept for
 *         compatibility with existing callers.
 */
static long get_cur_time() {
    const auto now_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(
        std::chrono::system_clock::now());

    return static_cast<long>(now_ms.time_since_epoch().count());
}

/**
 * @brief Formats the current local time as "YYYY_MM_DD_HH_MM_SS".
 *
 * The buffer size handed to strftime is taken from the buffer itself (the
 * previous code claimed 80 bytes for a 40-byte array), and the string length
 * comes from strftime's return value rather than a hard-coded index.
 *
 * @return The formatted timestamp, or an empty string when the local time
 *         cannot be obtained or does not fit the buffer.
 *
 * NOTE(review): std::localtime returns a pointer to shared static storage and
 * may not be safe to call concurrently from several threads — confirm callers.
 */
static std::string get_cur_datetime()
{
    const auto now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

    const auto* local = std::localtime(&now);
    if (local == nullptr) {
        return std::string();
    }

    char buffer[40];
    // strftime returns the number of characters written (excluding the
    // terminator), or 0 when the result does not fit.
    const auto written = std::strftime(buffer, sizeof(buffer), "%Y_%m_%d_%H_%M_%S", local);
    return std::string(buffer, written);
}

// Reports whether the AV_PKT_FLAG_KEY bit is set in the packet's flags.
// The pointer is dereferenced unconditionally, so callers must pass a non-null packet.
static bool is_key_frame(AVPacket *pkt) {
    const auto key_bits = pkt->flags & AV_PKT_FLAG_KEY;
    return key_bits != 0;
}

// Puts the manager into an idle state: no worker threads, exit flags cleared,
// no MQ callback registered. Worker threads are only started by init().
FFRecoderTaskManager::FFRecoderTaskManager(){
    m_recoder_thread = nullptr;
    m_cache_thread = nullptr;

    m_bExit = false;
    m_bExitRecoderThread = false;

    // recode_thread() tests this callback before invoking it, so it must hold a
    // defined "empty" value even when set_mq_callback() is never called.
    // NOTE(review): redundant if the header already gives it a default initializer.
    mq_publish_func = nullptr;
}

// Shuts the manager down via close() (joins the cache thread and frees staged
// packets) before logging that the object has been destroyed.
FFRecoderTaskManager::~FFRecoderTaskManager(){
    this->close();
    LOG_DEBUG("~FFRecoderTaskManager()");
}

bool FFRecoderTaskManager::init(int w, int h, int fps, int bit_rate) {
    m_width = w;
    m_height = h;
    m_bit_rate = bit_rate;

    if (fps > 1) {
        m_fps = fps;
    } else {
        m_fps = 25;
    }

    m_cache_thread = new std::thread(
        [](void* arg) {
            FFRecoderTaskManager* _this=(FFRecoderTaskManager*)arg;
            if(_this != nullptr) {
                _this->pkt_cache_thread();
            }else{
                LOG_ERROR("recode 线程启动失败 !");
            }
            return (void*)0;
        }, this);

    return true;
}

/**
 * @brief Creates a new AVPacket holding a private copy of @p pkt's payload.
 *
 * Only the data bytes and the size are copied; pts/dts, stream index, flags
 * and side data are deliberately left at the defaults set by av_packet_alloc().
 * The former av_init_packet() call was dropped because av_packet_alloc()
 * already initialises the packet fields.
 *
 * @param pkt  Source packet.
 * @return A newly allocated packet, or nullptr when @p pkt is null/empty or an
 *         allocation fails.
 */
static AVPacket* packet_clone(AVPacket* pkt) {
    if (pkt == nullptr || pkt->data == nullptr || pkt->size <= 0) {
        return nullptr;
    }

    AVPacket* new_pkt = av_packet_alloc();
    if (new_pkt == nullptr) {
        return nullptr;
    }

    // av_new_packet allocates the payload buffer and sets new_pkt->size.
    if (av_new_packet(new_pkt, pkt->size) < 0) {
        av_packet_free(&new_pkt);
        return nullptr;
    }

    memcpy(new_pkt->data, pkt->data, pkt->size);
    return new_pkt;
}

/**
 * @brief Copies a packet and appends it to the short staging list.
 *
 * The packet is cloned so the cache does not share buffers with the decoder.
 * All validation happens before the DataPacket wrapper is allocated, so the
 * early-return paths no longer leak the wrapper or the cloned packet.
 *
 * @param pkt       Packet to cache; null or empty packets are ignored.
 * @param frame_nb  Frame number stored alongside the packet.
 * @param dec_name  Decoder name, used only in the key-frame log line.
 */
void FFRecoderTaskManager::cache_pkt(AVPacket* pkt, long long frame_nb, string dec_name){
    if(m_bExit) {
        // The manager is shutting down: stop caching.
        return;
    }

    if (pkt == nullptr || pkt->data == nullptr || pkt->size <= 0) {
        return;
    }

    // A single AVPacket is small, so clone it to stay decoupled from the decoder.
    AVPacket *new_pkt = packet_clone(pkt);
    if (new_pkt == nullptr) {
        return;
    }
    if (new_pkt->data == nullptr || new_pkt->size <= 0) {
        av_packet_free(&new_pkt);
        return;
    }

    DataPacket* newDataPkt = new DataPacket();
    newDataPkt->pkt = new_pkt;
    newDataPkt->frame_nb = frame_nb;
    newDataPkt->isKeyFrame = is_key_frame(pkt);

    if (newDataPkt->isKeyFrame) {
        // frame_nb is a monotonically increasing value.
        LOG_INFO("[{}] - key frame_nb: {}", dec_name, frame_nb);
    }

    std::lock_guard<std::mutex> l_info(m_pkt_list_short_mtx);
    m_pkt_list_short.push_back(newDataPkt);
}

// Inserts a recording task into m_recoderinfo_list, keeping the list ordered
// by ascending frame_nb. The new task goes in front of the first entry whose
// frame_nb is strictly greater; if there is none (including an empty list) it
// is appended at the end. The list mutex is held for the whole operation.
void FFRecoderTaskManager::save_intask_recoderinfo(RecoderInfo info) {
    std::lock_guard<std::mutex> guard(m_recoderinfo_list_mtx);

    auto pos = m_recoderinfo_list.begin();
    while (pos != m_recoderinfo_list.end() && !(pos->frame_nb > info.frame_nb)) {
        pos++;
    }

    // Inserting at end() is equivalent to push_back.
    m_recoderinfo_list.insert(pos, info);
}

// Picks the packet in m_pkt_list from which a recording for `frame_nb` should start.
//
// The wanted start is 375 frames before `frame_nb`. While walking the list the
// most recent key-frame position is remembered; as soon as the wanted start
// lies between that key frame and the current packet (both inclusive), the
// key-frame position is returned. If the wanted start is not positive, or no
// such span is found, the beginning of the list is returned (which equals
// end() when the list is empty).
//
// This function takes no lock itself; recode_thread() calls it while holding
// m_pkt_list_mtx.
list<DataPacket*>::iterator FFRecoderTaskManager::getStartIterator(unsigned long long frame_nb){

    const long long wanted_start = (long long)(frame_nb - 375);
    if (wanted_start <= 0) {
        return m_pkt_list.begin();
    }

    auto last_key = m_pkt_list.begin();
    for (auto cursor = m_pkt_list.begin(); cursor != m_pkt_list.end(); cursor++) {
        if ((*cursor)->isKeyFrame) {
            last_key = cursor;
        }

        const bool at_or_after_key = wanted_start >= (*last_key)->frame_nb;
        const bool at_or_before_cursor = wanted_start <= (*cursor)->frame_nb;
        if (at_or_after_key && at_or_before_cursor) {
            return last_key;
        }
    }

    return m_pkt_list.begin();
}

// Queues a recording task unless the manager is shutting down, in which case
// the request is silently dropped.
void FFRecoderTaskManager::create_recode_task(RecoderInfo& recoderInfo) {
    if (!m_bExit) {
        save_intask_recoderinfo(recoderInfo);
    }
}

void FFRecoderTaskManager::pkt_cache_thread() {
    LOG_INFO("pkt_cache_thread start...");

     m_recoder_thread = new std::thread(
        [](void* arg) {
            FFRecoderTaskManager* _this=(FFRecoderTaskManager*)arg;
            if(_this != nullptr) {
                _this->recode_thread();
            }else{
                LOG_ERROR("recode 线程启动失败 !");
            }
            return (void*)0;
        }, this);

    // 开始缓存
    while(true) {
        if(m_bExit) {
            break;
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(500));

        m_pkt_list_short_mtx.lock();
        m_pkt_list_mtx.lock();
        for (size_t i = 0; i < m_pkt_list_short.size(); i++) {
            auto item = m_pkt_list_short.front();
            m_pkt_list.push_back(item);
            m_pkt_list_short.pop_front();
        }
        m_pkt_list_mtx.unlock();
        m_pkt_list_short_mtx.unlock();

        std::lock_guard<std::mutex> l_info(m_recoderinfo_list_mtx);
        if(m_recoderinfo_list.size() <= 0){
            // 没有任务的时候,维持500的长度
            m_pkt_list_mtx.lock();
            while(m_pkt_list.size() > 1000) {
                DataPacket* dataPkt = m_pkt_list.front();
                delete dataPkt;
                dataPkt = nullptr;
                m_pkt_list.pop_front();
            }
            m_pkt_list_mtx.unlock();
        }
    }

    m_bExitRecoderThread = true;

    if (m_recoder_thread) {
        m_recoder_thread->join();
        delete m_recoder_thread;
        m_recoder_thread = nullptr;
    }

    // 清空数据
    while(!m_pkt_list.empty()) {
        DataPacket* dataPkt = m_pkt_list.front();
        delete dataPkt;
        dataPkt = nullptr;
        m_pkt_list.pop_front();
    }

    LOG_INFO("pkt_cache_thread end.");
}

/**
 * @brief Body of the recorder thread started by pkt_cache_thread().
 *
 * Repeatedly takes the first queued task, locates the start packet with
 * getStartIterator(), frees the packets before it, and writes up to 500
 * packets (none with frame_nb beyond the task's frame_nb) to the task's file.
 * After the file is finalised the MQ callback is invoked with the task's
 * mq_info, and the task is removed from the queue.
 *
 * Changes from the previous version:
 *  - the finished task is removed by matching its frame_nb instead of
 *    pop_front(): save_intask_recoderinfo() may insert a task with a smaller
 *    frame_nb at the head while a recording is in progress, in which case
 *    pop_front() dropped the new, unprocessed task and the finished one was
 *    recorded again;
 *  - the MQ callback runs after m_pkt_list_mtx has been released, so the cache
 *    thread is not blocked for the duration of the callback.
 */
void FFRecoderTaskManager::recode_thread() {
    LOG_INFO("recode_thread2 start...");
    while (!m_bExitRecoderThread) {
        m_recoderinfo_list_mtx.lock();
        if (m_recoderinfo_list.empty()) {
            m_recoderinfo_list_mtx.unlock();
            std::this_thread::sleep_for(std::chrono::milliseconds(3));
            continue;
        }

        // Work on a copy; the entry stays queued until this task is finished.
        RecoderInfo recoderinfo = *m_recoderinfo_list.begin();
        m_recoderinfo_list_mtx.unlock();

        bool file_written = false;
        do
        {
            std::lock_guard<std::mutex> l_long(m_pkt_list_mtx);

            auto it_data = getStartIterator(recoderinfo.frame_nb);
            if (it_data == m_pkt_list.end()) {
                LOG_WARN("待保存信息已无相关数据包");
                std::this_thread::sleep_for(std::chrono::milliseconds(3));
                break;
            }

            // Packets ahead of the start position are no longer needed.
            while (m_pkt_list.begin() != it_data) {
                delete m_pkt_list.front();
                m_pkt_list.pop_front();
            }

            LOG_INFO("start frame_nb: {}", (*it_data)->frame_nb);

            string file_name = recoderinfo.recoderPath;

            FFRecoder2 ffrecoder;
            bool bInit = ffrecoder.init(m_width, m_height, m_fps, m_bit_rate, file_name.c_str());
            if (!bInit) {
                LOG_ERROR("ffrecoder init error : {} {} {}", recoderinfo.task_id, recoderinfo.object_id, recoderinfo.frame_nb);
                ffrecoder.uninit();
                break;
            }
            LOG_DEBUG("record start, pkt_list size: {} task_id: {}  object_id:{}  frame_nb: {}", m_pkt_list.size(), recoderinfo.task_id, recoderinfo.object_id, recoderinfo.frame_nb);

            int count = 0;
            unsigned long long start_frame_nb = (*it_data)->frame_nb;
            unsigned long long end_frame_nb = start_frame_nb;
            for (auto it_save = it_data; it_save != m_pkt_list.end() && count < 500; ++it_save) {
                DataPacket* dataPkt = *it_save;
                if(dataPkt->frame_nb > recoderinfo.frame_nb) {
                    // Do not record past the frame that triggered the task.
                    break;
                }
                AVPacket* pkt = dataPkt->pkt;
                if(pkt == nullptr) {
                    LOG_ERROR("{} pkt is nullptr", recoderinfo.task_id);
                    continue;
                } else if (pkt->data == nullptr || pkt->size <= 0){
                    LOG_ERROR("{} pkt data is nullptr", recoderinfo.task_id);
                    continue;
                }

                ffrecoder.write_pkt(pkt);
                count++;
                end_frame_nb = dataPkt->frame_nb;
            }

            ffrecoder.flush();
            ffrecoder.uninit();
            file_written = true;

            LOG_INFO("record end, total save: {} start_frame_nb: {} end_frame_nb: {}  file_path: {}", count, start_frame_nb, end_frame_nb, file_name);
        } while (0);

        // Publish the MQ message once the file is finalised, outside the packet-list lock.
        if(file_written && mq_publish_func && recoderinfo.mq_info.length() > 0) {
            mq_publish_func(recoderinfo.mq_info.c_str());
        }

        // An empty m_recoderinfo_list lets the cache thread trim m_pkt_list, so the
        // task is only removed once all work on m_pkt_list for it is finished.
        // The queue is ordered by frame_nb and tasks with an equal frame_nb are
        // inserted after existing ones, so the first entry with this frame_nb is
        // the task that was just processed.
        {
            std::lock_guard<std::mutex> l_info(m_recoderinfo_list_mtx);
            for (auto it = m_recoderinfo_list.begin(); it != m_recoderinfo_list.end(); ++it) {
                if (it->frame_nb == recoderinfo.frame_nb) {
                    m_recoderinfo_list.erase(it);
                    break;
                }
            }
        }
    }

    LOG_INFO("recode_thread2 end.");
}

void FFRecoderTaskManager::close() {
    m_bExit = true;

    if (m_cache_thread) {
        m_cache_thread->join();
        delete m_cache_thread;
        m_cache_thread = nullptr;
    }

    // 清空数据
    while(!m_pkt_list_short.empty()) {
        DataPacket* dataPkt = m_pkt_list_short.front();
        delete dataPkt;
        dataPkt = nullptr;
        m_pkt_list_short.pop_front();
    }
}

// Registers the callback that recode_thread() invokes with a task's mq_info
// after a recording has been written. The assignment takes no lock.
void FFRecoderTaskManager::set_mq_callback(mq_callback_t cb) {
    this->mq_publish_func = cb;
}