// test_decoder.cpp (5.58 KB)
#include "./interface/DecoderManager.h"
#include <mutex>
#include <thread>
#include <chrono>
#include "../common/logger.hpp"

using namespace std;

// User data attached to a decode task and handed back to the decoder callbacks.
// decode_finished_cbk() prints task_id and then deletes the object.
struct decode_cbk_userdata{
  std::string task_id;     // name of the decode task this record belongs to
  void* opaque{nullptr};   // caller-defined payload; null unless explicitly set
  void* opaque1{nullptr};  // second caller-defined payload; null unless explicitly set
};

deque<DeviceMemory*> m_RgbDataList;
mutex m_DataListMtx; 

thread* m_pAlgorthimThread{nullptr};

bool m_bfinish{false};
int m_devId = 0;
const char* task_id = "test0";
int skip_frame_ = 5;
int m_batch_size = 20;


void algorthim_process_thread();

void snap_shot_thread();

/**
 * Decoder callback invoked for every decoded frame.
 *
 * Appends the frame to m_RgbDataList for the algorithm thread. While the queue
 * already holds 30 frames the call blocks (polling every 3 ms) so that the
 * decoder is throttled instead of the queue growing without bound.
 *
 * @param userPtr  user data registered for the task; not used here.
 * @param devFrame decoded frame. The consumer thread deletes every frame it
 *                 pops from the queue, so the frame is treated as owned by
 *                 this callback from here on.
 */
void post_decod_cbk(const void * userPtr, DeviceMemory* devFrame){
    (void)userPtr;

    // A null entry would be dereferenced by the consumer; never queue it.
    if (devFrame == nullptr) {
        return;
    }

    constexpr std::size_t kMaxQueuedFrames = 30;

    while (!m_bfinish) {
        {
            // RAII lock: the mutex is released even if push_back throws.
            std::lock_guard<std::mutex> lock(m_DataListMtx);
            if (m_RgbDataList.size() < kMaxQueuedFrames) {
                m_RgbDataList.push_back(devFrame);
                return;
            }
        }
        // Queue is full: back off without holding the lock, then retry.
        std::this_thread::sleep_for(std::chrono::milliseconds(3));
    }

    // Shutting down: the frame never reaches the consumer, so release it here
    // instead of leaking it.
    delete devFrame;
}

void decode_finished_cbk(const void * userPtr){
    decode_cbk_userdata* ptr = (decode_cbk_userdata*)userPtr;
    if (ptr!= nullptr){
        printf("task finished: %s \n", ptr->task_id.c_str());
        delete ptr;
        ptr = nullptr;
    }
}

bool create_task(string task_id) {
    
    // 创建解码任务
    DecoderManager* pDecManager = DecoderManager::getInstance();

    MgrDecConfig config;
    config.name = task_id;
    config.cfg.uri = "/data/share/data/Street.uvf";
    config.cfg.post_decoded_cbk = post_decod_cbk;
    config.cfg.decode_finished_cbk = decode_finished_cbk;
    config.cfg.force_tcp = true;  // rtsp用tcp
    config.cfg.gpuid = to_string(m_devId);
    config.cfg.skip_frame = skip_frame_;

    config.dec_type = DECODER_TYPE_DVPP;
    
    AbstractDecoder* dec = pDecManager->createDecoder(config);
    if (!dec){
        printf("创建解码器失败 \n");
        return false;
    }

    // decode_cbk_userdata* userPtr = new decode_cbk_userdata;
    // userPtr->task_id = string(task_id);
    // pDecManager->setPostDecArg(config.name, userPtr);
    // pDecManager->setFinishedDecArg(config.name, userPtr);

    dec->setSnapTimeInterval(1000);


    int input_image_width = 0;
    int input_image_height = 0;
    pDecManager->getResolution(config.name, input_image_width, input_image_height);

    pDecManager->startDecodeByName(config.name);
    // pDecManager->closeAllDecoder();

    return true;
}

int main(){

    set_default_logger(LogLevel(0), "test_decode","logs/main.log", 64 * 1024 * 1024, 64 * 1024 * 1024);
    LOG_INFO("编译时间:{} {}", __DATE__, __TIME__);

    printf("start... \n");

    DecoderManager* pDecManager = DecoderManager::getInstance();


    // 创建算法线程
    m_pAlgorthimThread = new thread([](void* arg) {
      algorthim_process_thread();
      return (void*)0;
    }
    , nullptr);

    thread snap_thread = thread([](void* arg) {
      snap_shot_thread();
      return (void*)0;
    }
    , nullptr);

    

    char ch = 'a';
    int task_id = 1;

    while (ch != 'q') {
        ch = getchar();
        switch (ch)
        {
        case '4':
            for (size_t i = 0; i < 4; i++) 
            {
                create_task(to_string(task_id));  
                task_id++;
                std::this_thread::sleep_for(std::chrono::seconds(1));
            }
            break;
        case 'a':
                create_task(to_string(task_id));  
                task_id++;
                std::this_thread::sleep_for(std::chrono::seconds(1));
            break;
        case 'c':
            pDecManager->closeAllDecoder();
            break;
        default:
            break;
        }
    }
}

void do_work(vector<DeviceMemory*> vec_gpuMem){
  for(int i=0;i < vec_gpuMem.size(); i++){
      DeviceMemory* mem = vec_gpuMem[i];
    //   printf("task:%s  width:%d  height:%d ts:%lld \n", mem->getId().c_str(), mem->getWidth(), mem->getHeight(), mem->getTimesstamp());
  }
}

void algorthim_process_thread(){

    while (true){
        if(m_bfinish){
            break;
        }

        vector<DeviceMemory*> vec_gpuMem;
        m_DataListMtx.lock();
        while (!m_RgbDataList.empty()){
            DeviceMemory* gpuMem = m_RgbDataList.front();
            if(gpuMem->getMem() == nullptr){
              // 错误数据,直接删除
              delete gpuMem;
              gpuMem = nullptr;
              printf("mem is null \n");
            } else {
              vec_gpuMem.push_back(gpuMem);
            }
            m_RgbDataList.pop_front();
            if(vec_gpuMem.size() >= m_batch_size){
                break;
            }
        }
        m_DataListMtx.unlock();

        if(vec_gpuMem.size() <= 0){
            std::this_thread::sleep_for(std::chrono::milliseconds(3));
            continue;
        }

        // do work
        do_work(vec_gpuMem);

        for(int i=0;i < vec_gpuMem.size(); i++){
            DeviceMemory* mem = vec_gpuMem[i];
            if(mem->getSize() <= 0){
              continue;
            }
            delete mem;
            mem = nullptr;
        }
        // vec_gpuMem.clear();

        vector<DeviceMemory*>().swap(vec_gpuMem);

    }

    printf("algorthim_process_thread exit. \n");
}

void snap_shot_thread(){
    DecoderManager* pDecManager = DecoderManager::getInstance();
      while (true){
        if(m_bfinish){
            break;
        }

        vector<DeviceMemory*> vec_devMem = pDecManager->timing_snapshot_all();
        for(auto devMem : vec_devMem){
            delete devMem;
            devMem = nullptr;
        }
        // vec_devMem.clear();

        vector<DeviceMemory*>().swap(vec_devMem);

        std::this_thread::sleep_for(std::chrono::milliseconds(600));

    }
}