From 74d1e6a8da5c1dbac8f0eff3f1d353f9bcf24161 Mon Sep 17 00:00:00 2001 From: cmhu <2657262686@qq.com> Date: Wed, 3 Apr 2024 18:49:46 +0800 Subject: [PATCH] 完成gb28181大体的代码,未完成,bug可能很多 --- src/decoder/dvpp/DvppDecoder.cpp | 31 +++++++------------------------ src/decoder/dvpp/DvppStreamDecoder.cpp | 238 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------ src/decoder/dvpp/DvppStreamDecoder.h | 28 +++++++++++++++++++++++++--- src/decoder/gb28181/DvppGB28181Decoder.cpp | 370 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/decoder/gb28181/DvppGB28181Decoder.h | 106 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/decoder/gb28181/GB28181Provider.cpp | 502 ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- src/decoder/gb28181/GB28181Provider.h | 103 ------------------------------------------------------------------------------------------------------- src/decoder/gb28181/main.cpp | 11 ++++------- src/decoder/gb28181/rtp/RTPReceiver.cpp | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------- src/decoder/gb28181/rtp/RTPReceiver.h | 8 ++++---- src/decoder/gb28181/rtp/RTPTcpReceiver.cpp | 116 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------------------------------------------- src/decoder/gb28181/rtp/RTPTcpReceiver.h | 2 +- src/decoder/gb28181/rtp/RTPUdpReceiver.cpp | 69 ++++++++++++++++++++++++++++++++++++++++++--------------------------- src/decoder/gb28181/rtp/RTPUdpReceiver.h | 6 +++++- src/decoder/gb28181/sip/SipServer.cpp | 15 ++++++--------- src/decoder/gb28181/sip/SipServer.h | 16 ++++++++++++---- 16 files changed, 929 insertions(+), 776 deletions(-) create mode 100644 src/decoder/gb28181/DvppGB28181Decoder.cpp create mode 100644 src/decoder/gb28181/DvppGB28181Decoder.h delete mode 100644 src/decoder/gb28181/GB28181Provider.cpp delete mode 100644 src/decoder/gb28181/GB28181Provider.h diff --git a/src/decoder/dvpp/DvppDecoder.cpp b/src/decoder/dvpp/DvppDecoder.cpp index df9aafd..6a9b1ec 100644 --- a/src/decoder/dvpp/DvppDecoder.cpp +++ b/src/decoder/dvpp/DvppDecoder.cpp @@ -156,6 +156,12 @@ AVCodecContext* DvppDecoder::init_FFmpeg(FFDecConfig config){ if (avcodec_parameters_to_context(avctx, codecpar) < 0) break; + int enType = getVdecType(codecpar->codec_id, codecpar->profile); + if(-1 == enType) { + break; + } + m_enType = static_cast(enType); + const AVBitStreamFilter * filter = nullptr; if(codecpar->codec_id == AV_CODEC_ID_H264){ filter = av_bsf_get_by_name("h264_mp4toannexb"); @@ -166,12 +172,6 @@ AVCodecContext* DvppDecoder::init_FFmpeg(FFDecConfig config){ break; } - int enType = getVdecType(codecpar->codec_id, codecpar->profile); - if(-1 == enType) { - break; - } - m_enType = static_cast(enType); - int ret = av_bsf_alloc(filter, &h264bsfc); if (ret < 0){ break; @@ -799,25 +799,8 @@ void DvppDecoder::doVdppVdecCallBack(acldvppStreamDesc *input, acldvppPicDesc *o // 换成解码后数据, 这里这样做的是为了保证解码一直持续进行,避免后续操作阻碍文件读取和解码从而导致花屏 DvppDataMemory* mem = nullptr; if (m_bResize && (width > 1920 || height > 1080)) { - float srcRatio = width / (float)height; - float stdRatio = 1920.0 / 1080.0f ; - int outWidth = 1920; - int outHeight = 1080; - if (srcRatio > stdRatio) { - outHeight = static_cast(outWidth * (float)height / width) ; - if (outHeight % 2 == 1) - { - outHeight += 1; - } - } else if (srcRatio < stdRatio) { - outWidth = static_cast(outHeight * (float)width / height) ; - if (outWidth % 2 == 1) - { - outWidth += 1; - } - } - mem = m_vpcUtils.resize(output, outWidth, outHeight); + mem = m_vpcUtils.resize(output, out_frame_width, out_frame_height); if (mem) { acldvppFree(outputDataDev); outputDataDev = nullptr; diff --git a/src/decoder/dvpp/DvppStreamDecoder.cpp b/src/decoder/dvpp/DvppStreamDecoder.cpp index f60c783..d9392ee 100644 --- a/src/decoder/dvpp/DvppStreamDecoder.cpp +++ b/src/decoder/dvpp/DvppStreamDecoder.cpp @@ -43,9 +43,12 @@ DvppStreamDecoder::DvppStreamDecoder(/* args */) DvppStreamDecoder::~DvppStreamDecoder() { + Close(); } -bool DvppStreamDecoder::init_vdpp(FFDecConfig cfg) { +bool DvppStreamDecoder::Init(FFDecConfig cfg) { + + m_dec_name = cfg.dec_name; LOG_INFO("[{}]- Init device start...", m_dec_name); @@ -74,6 +77,8 @@ bool DvppStreamDecoder::init_vdpp(FFDecConfig cfg) { m_vpcUtils.init(m_deviceId); + decode_finished_cbk = cfg.decode_finished_cbk; + LOG_INFO("[{}]- init vdpp success! device:{} channel:{}", m_dec_name, m_deviceId, m_dvpp_channel); return true; }while(0); @@ -235,7 +240,10 @@ void DvppStreamDecoder::doVdppVdecCallBack(acldvppStreamDesc *input, acldvppPicD } if(mem){ + m_decoded_data_queue_mtx.lock(); m_decoded_data_queue.push(mem); + m_decoded_data_queue_mtx.unlock(); + bCached = true; } } @@ -251,6 +259,18 @@ void DvppStreamDecoder::doVdppVdecCallBack(acldvppStreamDesc *input, acldvppPicD CHECK_AND_RETURN_NOVALUE(acldvppDestroyPicDesc(output), "acldvppDestroyPicDesc failed"); } +DvppDataMemory* DvppStreamDecoder::GetFrame() { + DvppDataMemory* mem = nullptr; + m_decoded_data_queue_mtx.lock(); + if (m_decoded_data_queue.size() > 0) { + mem = m_decoded_data_queue.front(); + m_decoded_data_queue.pop(); + } + m_decoded_data_queue_mtx.unlock(); + + return mem; +} + bool DvppStreamDecoder::sendVdecEos(aclvdecChannelDesc *vdecChannelDesc) { // create stream desc acldvppStreamDesc *streamInputDesc = acldvppCreateStreamDesc(); @@ -277,7 +297,7 @@ bool DvppStreamDecoder::sendVdecEos(aclvdecChannelDesc *vdecChannelDesc) { return true; } -int DvppStreamDecoder::sendPkt(aclvdecChannelDesc *vdecChannelDesc, AVPacket* pkt, unsigned long long frame_nb) { +int DvppStreamDecoder::sendPkt(aclvdecChannelDesc *vdecChannelDesc, AVPacket* pkt, unsigned long long frame_nb, int vdec_out_size) { void *vdecInputbuf = nullptr; void *vdecOutputBuf = nullptr; @@ -296,7 +316,7 @@ int DvppStreamDecoder::sendPkt(aclvdecChannelDesc *vdecChannelDesc, AVPacket* pk break; } - ret = acldvppMalloc((void **)&vdecOutputBuf, m_vdec_out_size); + ret = acldvppMalloc((void **)&vdecOutputBuf, vdec_out_size); if(ret != ACL_ERROR_NONE){ LOG_ERROR("[{}]- acldvppMalloc failed", m_dec_name); break; @@ -315,7 +335,7 @@ int DvppStreamDecoder::sendPkt(aclvdecChannelDesc *vdecChannelDesc, AVPacket* pk CHECK_AND_BREAK(acldvppSetStreamDescData(input_stream_desc, vdecInputbuf), "acldvppSetStreamDescData failed"); CHECK_AND_BREAK(acldvppSetStreamDescSize(input_stream_desc, pkt->size), "acldvppSetStreamDescSize failed"); CHECK_AND_BREAK(acldvppSetPicDescData(output_pic_desc, vdecOutputBuf), "acldvppSetPicDescData failed"); - CHECK_AND_BREAK(acldvppSetPicDescSize(output_pic_desc, m_vdec_out_size), "acldvppSetPicDescSize failed"); + CHECK_AND_BREAK(acldvppSetPicDescSize(output_pic_desc, vdec_out_size), "acldvppSetPicDescSize failed"); Vdec_CallBack_UserData *user_data = NULL; user_data = new Vdec_CallBack_UserData; @@ -362,21 +382,36 @@ int DvppStreamDecoder::sendPkt(aclvdecChannelDesc *vdecChannelDesc, AVPacket* pk return -1; } -int DvppStreamDecoder::Decode(int videoType, char* data, int len, int isKey, uint64_t pts) { +int DvppStreamDecoder::SendData(int videoType, char* data, int len, int isKey, uint64_t pts) { + if (m_bExit) { + return -1; + } + + if (m_DvppCacheCounter.load() > 20) { + // 解码器解码不过来。实时流在此处的处理会导致花屏,这是由于解码器性能问题导致,无法避免 + // 实时流在这里处理是为了避免长时间不读取数据导致数据中断 + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + return -3; + } + + int ret = aclrtSetCurrentContext(m_context); + if(ret != ACL_ERROR_NONE){ + LOG_ERROR("[{}]- aclrtSetCurrentContext failed", m_dec_name); + return -2; + } if (vdecChannelDesc == nullptr) { vdecChannelDesc = aclvdecCreateChannelDesc(); if (vdecChannelDesc == nullptr) { LOG_ERROR("[{}]- aclvdecCreateChannelDesc failed", m_dec_name); - return; + return -2; } - pthread_t report_thread; int ret = pthread_create(&report_thread, nullptr, ReportThd, (void *)this); if(ret != 0){ LOG_ERROR("[{}]- pthread_create failed", m_dec_name); - return; + return -2; } acldvppStreamFormat enType = getVdecType(videoType); @@ -391,20 +426,187 @@ int DvppStreamDecoder::Decode(int videoType, char* data, int len, int isKey, uin CHECK_AND_BREAK(aclvdecCreateChannel(vdecChannelDesc), "aclvdecCreateChannel failed"); } - if (vdecChannelDesc) + AVPacket* pkt = av_packet_alloc(); + av_init_packet(pkt); + + pkt->size = len; + pkt->data = (uint8_t*)data; + + int ret = -2; + + do { - m_frame_nb++; + int vdec_out_size = parse_stream_info(videoType, pkt); + if (vdec_out_size <= 0) { + ret = -4; + break; + } - AVPacket* pkt = av_packet_alloc(); - av_init_packet(pkt); + if (vdecChannelDesc) + { + ret = av_bsf_send_packet(h264bsfc, pkt); + if(ret < 0) { + LOG_ERROR("[{}]- av_bsf_send_packet error!", m_dec_name); + ret = -3; + break;; + } + + int nSended = -1; + while ((ret = av_bsf_receive_packet(h264bsfc, pkt)) == 0) { + if(!m_bExit){ + break; + } + + m_frame_nb++; + + // dvpp 解码 + nSended = sendPkt(vdecChannelDesc, pkt, m_frame_nb, vdec_out_size); + } + + if(nSended < 0) { + // 执行出错,强行结束整个任务 + ret = -2; + } + + ret = 0; + } + } while (0); + + av_packet_free(&pkt); + pkt = nullptr; + + return ret; +} + + +int DvppStreamDecoder::parse_stream_info(int videoType, AVPacket* pkt) { + if (m_vdec_out_size > 0) { + return m_vdec_out_size; + } + + m_vdec_out_size = -1; + + AVCodecContext* avctx = nullptr; + const AVCodec* pAVCodec = nullptr; - pkt->size = len; - pkt->data = (uint8_t*)data; + try + { + if (0 == videoType) { + pAVCodec = avcodec_find_decoder(AV_CODEC_ID_H264); + LOG_INFO("m_avCodecName is H264"); + } else if (1 == videoType) { + pAVCodec = avcodec_find_decoder(AV_CODEC_ID_H265); + LOG_INFO("m_avCodecName is H265"); + } else{ + LOG_INFO("m_avCodecName is unknown, videoType is {}", videoType); + } + + if (!pAVCodec) { + LOG_ERROR("frameCallback frame decode error, ERROR_DECODER_NOT_FOUND"); + throw -2; + } + + avctx = avcodec_alloc_context3(pAVCodec); + + if (avcodec_open2(avctx, pAVCodec, nullptr) < 0) { + LOG_ERROR("avcodec_open2 failed!"); + throw -2; + } + + //开始解码 + int ret = avcodec_send_packet(avctx, pkt); + if (ret < 0) { + LOG_ERROR("Real stream视频解码失败,请检查视频设备{}: avcodec_send_packet failed. ret={}", m_dec_name, ret); + throw -3; + } + + if (frameW < 1) { + frameW = avctx->width; + frameH = avctx->height; + if (frameW <= 0 || frameH <= 0) { + LOG_ERROR("[{}] frame W or H is error! ({},{})", m_dec_name, frameW, frameH); + throw -1; + } + + const AVBitStreamFilter * filter = nullptr; + if(VIDEO_TYPE_H264 == videoType){ + filter = av_bsf_get_by_name("h264_mp4toannexb"); + }else if(VIDEO_TYPE_H265 == videoType){ + filter = av_bsf_get_by_name("hevc_mp4toannexb"); + }else { + LOG_ERROR("[{}]- codec_id is not supported!", m_dec_name); + throw -4; + } + + int ret = av_bsf_alloc(filter, &h264bsfc); + if (ret < 0){ + LOG_ERROR("av_bsf_alloc failed!"); + throw -2; + } + + avcodec_parameters_from_context(h264bsfc->par_in, avctx); + av_bsf_init(h264bsfc); + } + + m_fps = av_q2d(avctx->framerate); - // dvpp 解码 - int nSended = sendPkt(vdecChannelDesc, pkt, m_frame_nb); + AVFrame* frame = av_frame_alloc(); + do + { + ret = avcodec_receive_frame(avctx, frame); + if ((ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) || ret < 0){ + LOG_ERROR("{} - Failed to receive frame: {}", m_dec_name, ret); + break; + } + + if (frame->width != frameW || frame->height != frameH){ + LOG_ERROR("AVFrame is inconsistent: width is {}, height is {}; original frameW is {}, frameH is {}--{}", frame->width, frame->height, frameW, frameH , m_dec_name); + break; + } + + m_vdec_out_size = frame->width * frame->height * 3 / 2; + } while (0); + + av_frame_free(&frame); + frame = nullptr; + + } + catch(const int& iError) { + m_vdec_out_size = iError; + } catch(...) { + m_vdec_out_size = -1; + } + + if(avctx){ + avcodec_free_context(&avctx); + avctx = nullptr; + } + + return m_vdec_out_size; +} + +void DvppStreamDecoder::Close() { + m_bExit = true; + + if (vdecChannelDesc) { + sendVdecEos(vdecChannelDesc); + + CHECK_NOT_RETURN(aclvdecDestroyChannel(vdecChannelDesc), "aclvdecDestroyChannel failed"); + CHECK_NOT_RETURN(aclvdecDestroyChannelDesc(vdecChannelDesc), "aclvdecDestroyChannelDesc failed"); + vdecChannelDesc = nullptr; + + m_bExitReportThd = true; + CHECK_NOT_RETURN(pthread_join(report_thread, nullptr), "report_thread join failed"); + } + + release_dvpp(); + + if(h264bsfc){ + av_bsf_free(&h264bsfc); + h264bsfc = nullptr; + } - av_packet_free(&pkt); - pkt = nullptr; + if(decode_finished_cbk) { + decode_finished_cbk(m_finishedDecArg); } } \ No newline at end of file diff --git a/src/decoder/dvpp/DvppStreamDecoder.h b/src/decoder/dvpp/DvppStreamDecoder.h index d63cf72..691b3a4 100644 --- a/src/decoder/dvpp/DvppStreamDecoder.h +++ b/src/decoder/dvpp/DvppStreamDecoder.h @@ -14,9 +14,13 @@ public: DvppStreamDecoder(/* args */); ~DvppStreamDecoder(); - bool init_vdpp(FFDecConfig cfg); + bool Init(FFDecConfig cfg); - DataFrame* Decode(int videoType, char* data, int len, int isKey, uint64_t pts); + int SendData(int videoType, char* data, int len, int isKey, uint64_t pts); + + void Close(); + + DvppDataMemory* GetFrame(); public: void doProcessReport(); @@ -24,7 +28,9 @@ public: private: bool sendVdecEos(aclvdecChannelDesc *vdecChannelDesc); - int sendPkt(aclvdecChannelDesc *vdecChannelDesc, AVPacket* pkt, unsigned long long frame_nb); + int sendPkt(aclvdecChannelDesc *vdecChannelDesc, AVPacket* pkt, unsigned long long frame_nb, int vdec_out_size); + + int parse_stream_info(int videoType, AVPacket* pkt); private: string m_dec_name {""}; @@ -34,12 +40,28 @@ private: aclrtContext m_context{nullptr}; aclvdecChannelDesc *vdecChannelDesc {nullptr}; + pthread_t report_thread; + bool m_bExitReportThd{false}; int m_vdec_out_size {-1}; + int m_fps {-1}; + int frameW {-1}; + int frameH {-1}; VpcUtils m_vpcUtils; unsigned long m_frame_nb {0}; + std::atomic m_DvppCacheCounter{0}; + + queue m_decoded_data_queue; + mutex m_decoded_data_queue_mtx; + + bool m_bExit {false}; + + const void * m_finishedDecArg {nullptr}; + DECODE_FINISHED_CALLBACK decode_finished_cbk {nullptr}; + + AVBSFContext * h264bsfc{nullptr}; }; diff --git a/src/decoder/gb28181/DvppGB28181Decoder.cpp b/src/decoder/gb28181/DvppGB28181Decoder.cpp new file mode 100644 index 0000000..134fde1 --- /dev/null +++ b/src/decoder/gb28181/DvppGB28181Decoder.cpp @@ -0,0 +1,370 @@ +//#include "LOG_manager.h" +#include +#include "DvppGB28181Decoder.h" + + + +extern "C" { + #include "libavutil/avstring.h" + #include "libavformat/avformat.h" + #include "libswscale/swscale.h" +} + +#include"RTPTcpReceiver.h" +#include"RTPUdpReceiver.h" + +#include + +#include "common_header.h" + +// #include "../nvdec/FFCuContextManager.h" +// #include "../nvdec/GpuRgbMemory.hpp" +// #include "../nvdec/cuda_kernels.h" + +#define ECLOSED 0 +#define ECLOSING 1 +#define ERUNNING 2 +#define EPAUSE 3 + +static void RTP_Stream_CallBack(void* userdata, int videoType, char* data, int len, int isKey, uint64_t pts, uint64_t localPts) +{ + DvppGB28181Decoder* decoder = (DvppGB28181Decoder*)userdata; + decoder->stream_callback(videoType, data, len, isKey, pts, localPts); +} + +static void RTP_Stream_End_CallBack(void* userdata) +{ + DvppGB28181Decoder* decoder = (DvppGB28181Decoder*)userdata; + decoder->stream_end_callback(); +} + +DvppGB28181Decoder::DvppGB28181Decoder() { + m_frameSkip = 1; + m_dec_keyframe = false; + m_post_decode_thread = 0; +} + +DvppGB28181Decoder::~DvppGB28181Decoder() +{ + close(); + + if (m_pAVCodecCtx) { + avcodec_close(m_pAVCodecCtx); + avcodec_free_context(&m_pAVCodecCtx); + } + + m_dec_keyframe = false; + + LOG_INFO("destroy OK--{}", m_dec_name); +} + +void DvppGB28181Decoder::close(){ + if (m_status == ECLOSED || m_status == ECLOSING) return ; + + m_status = ECLOSING; + + LOG_DEBUG("real decode thread exit success 1--{}", m_dec_name); + + if(nullptr != m_rtpPtr){ + if (m_rtpPtr->IsOpened()) { + m_rtpPtr->Close(); + LOG_DEBUG("real decode thread exit success 2--{}", m_dec_name); + } + + delete m_rtpPtr; + m_rtpPtr = nullptr; + } + + if (gpu_options) av_dict_free(&gpu_options); + + if (m_post_decode_thread != 0) + { + pthread_join(m_post_decode_thread,0); + } + + while(mFrameQueue.size() > 0){ + AVFrame * gpuFrame = mFrameQueue.front(); + av_frame_free(&gpuFrame); + mFrameQueue.pop(); + } + + m_status = ECLOSED; + + LOG_INFO("解码器关闭成功 --{}", m_dec_name); +} + +bool DvppGB28181Decoder::init(FFDecConfig& cfg){ + if(cfg.force_tcp){ + m_rtpPtr = new RTPTcpReceiver(); + }else{ + m_rtpPtr = new RTPUdpReceiver(); + } + if(nullptr == m_rtpPtr){ + return false; + } + + m_dec_name = cfg.uri; + m_frameSkip = cfg.skip_frame; + if (m_frameSkip < 1) m_frameSkip = 1; + + m_gpuid = atoi(cfg.gpuid.c_str()); + + m_rtpPtr->SetOutputCallback(RTP_Stream_CallBack, this); + m_rtpPtr->SetVodEndCallback(RTP_Stream_End_CallBack, this); + + post_decoded_cbk = cfg.post_decoded_cbk; + decode_finished_cbk = cfg.decode_finished_cbk; + + if (!streamDecoder.Init(cfg)) { + return false; + } + + m_cfg = cfg; + + LOG_INFO("init - {} ", m_dec_name); + + return true; +} + +bool DvppGB28181Decoder::start() { + + m_status = ERUNNING; + + bool bRet = m_rtpPtr->Open(m_dec_name); + if(bRet){ + pthread_create(&m_post_decode_thread,0, + [](void* arg) + { + DvppGB28181Decoder* a=(DvppGB28181Decoder*)arg; + a->display_thread(); + return (void*)0; + } + ,this); + } + + LOG_ERROR("[{}] - rtp receiver open failed !", m_dec_name); + + return bRet; +} + +void DvppGB28181Decoder::setDecKeyframe(bool bKeyframe){ + m_dec_keyframe = bKeyframe; +} + +void DvppGB28181Decoder::stream_callback(int videoType, char* data, int len, int isKey, uint64_t pts, uint64_t localPts) { + if (m_status == EPAUSE) return; + + // 若设置为关键帧解码,非关键帧数据直接返回 + if(m_dec_keyframe && !isKey) return; + + if (len <= 0) { + if (data == nullptr && pts == -1) { + LOG_INFO("frame callback EOF!"); + post_decoded_cbk(m_postDecArg, nullptr); + return ; + } + LOG_INFO("frame data is zero --{}", m_dec_name); + return; + } + + streamDecoder.SendData(videoType, data, len, isKey, pts); + + // AVPacket* pkt = av_packet_alloc(); + // av_init_packet(pkt); + + // pkt->size = len; + // pkt->data = (uint8_t*)data; + + // // ffmpeg 解码 + // ff_decode(videoType, pkt); + + // av_packet_free(&pkt); + // pkt = nullptr; + +} + +int DvppGB28181Decoder::ff_decode(int videoType, AVPacket* pkt) { + + if (m_pAVCodecCtx == nullptr) { + LOG_INFO("frame data is zero --{}", m_dec_name); + if (VIDEO_TYPE_H264 == videoType) { + if (m_gpuid >= 0){ + m_pAVCodec = avcodec_find_decoder_by_name("h264_cuvid"); + } + else{ + m_pAVCodec = avcodec_find_decoder(AV_CODEC_ID_H264); + } + LOG_INFO("m_avCodecName is H264"); + } + else if (VIDEO_TYPE_H265 == videoType) + { + if (m_gpuid >= 0){ + m_pAVCodec = avcodec_find_decoder_by_name("hevc_cuvid"); + } + else{ + m_pAVCodec = avcodec_find_decoder(AV_CODEC_ID_H265); + } + LOG_INFO("m_avCodecName is H265"); + } + else{ + LOG_INFO("m_avCodecName is unknown, videoType is {}", videoType); + } + + if (!m_pAVCodec) + { + LOG_ERROR("frameCallback frame decode error, ERROR_DECODER_NOT_FOUND"); + return; + } + + m_pAVCodecCtx = avcodec_alloc_context3(m_pAVCodec); + + if (m_gpuid >= 0) { + char gpui[8] = { 0 }; + sprintf(gpui, "%d", m_gpuid); + av_dict_set(&gpu_options, "gpu", gpui, 0); + + m_pAVCodecCtx->get_format = get_hw_format; + + FFCuContextManager* pCtxMgr = FFCuContextManager::getInstance(); + m_pAVCodecCtx->hw_device_ctx = av_buffer_ref(pCtxMgr->getCuCtx(gpui)); + if (nullptr == m_pAVCodecCtx->hw_device_ctx){ + // TODO 这里应该抛出错误 + return ; + } + } + + if (avcodec_open2(m_pAVCodecCtx, m_pAVCodec, &gpu_options) < 0) + return; + } + + //开始解码 + int ret = avcodec_send_packet(m_pAVCodecCtx, pkt); + if (ret < 0) { + //send_exception(RunMessageType::E2002, e_msg); + LOG_ERROR("Real stream视频解码失败,请检查视频设备{}: avcodec_send_packet failed. ret={}", m_dec_name, ret); + return; + } + + if (frameW < 1) { + frameW = m_pAVCodecCtx->width; + frameH = m_pAVCodecCtx->height; + if (frameW <= 0 || frameH <= 0) { + LOG_ERROR("[{}] frame W or H is error! ({},{})", m_dec_name, frameW, frameH); + return; + } + } + + m_fps = av_q2d(m_pAVCodecCtx->framerate); + + AVFrame* gpuFrame = av_frame_alloc(); + ret = avcodec_receive_frame(m_pAVCodecCtx, gpuFrame); + if ((ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) || ret < 0){ + LOG_ERROR("{} - Failed to receive frame: {}", m_dec_name, ret); + av_frame_free(&gpuFrame); + gpuFrame = nullptr; + return; + } + + if (gpuFrame->width != frameW || gpuFrame->height != frameH){ + LOG_INFO("AVFrame is inconsistent: width is {}, height is {}; original frameW is {}, frameH is {}--{}", gpuFrame->width, gpuFrame->height, frameW, frameH , m_dec_name); + av_frame_free(&gpuFrame); + gpuFrame = nullptr; + return; + } + + av_frame_free(&gpuFrame); + gpuFrame = nullptr; +} + +void DvppGB28181Decoder::display_thread(){ + + int index = 0; + while (isRunning()) + { + auto mem = streamDecoder.GetFrame(); + if(mem) { + if ((m_frameSkip == 1 || index % m_frameSkip == 0) && post_decoded_cbk){ + post_decoded_cbk(m_postDecArg, mem); + } + + index++; + if(index >= 100000){ + index = 0; + } + } else { + delete mem; + mem = nullptr; + } + + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + LOG_INFO("display thread exited."); +} + +void DvppGB28181Decoder::stream_end_callback() +{ + LOG_INFO("send_video_eof--{}", m_dec_name); + + decode_finished_cbk(m_finishedDecArg); + + return; +} + +void DvppGB28181Decoder::setPostDecArg(const void* postDecArg){ + m_postDecArg = postDecArg; +} + +void DvppGB28181Decoder::setFinishedDecArg(const void* finishedDecArg){ + m_finishedDecArg = finishedDecArg; +} + +void DvppGB28181Decoder::pause() { + m_status = EPAUSE; + LOG_INFO("pause --{}", m_dec_name); +} + +void DvppGB28181Decoder::resume() { + m_status = ERUNNING; + LOG_INFO("resume --{}", m_dec_name); +} + +bool DvppGB28181Decoder::isRunning(){ + if (m_status == ECLOSED || m_status == ECLOSING){ + return false; + } + return true; +} + +bool DvppGB28181Decoder::isFinished(){ + if (m_status == ECLOSED || m_status == ECLOSING){ + return true; + } + return false; +} + +bool DvppGB28181Decoder::isPausing(){ + if (m_status == EPAUSE){ + return true; + } + return false; +} + +bool DvppGB28181Decoder::getResolution( int &width, int &height ){ + width = frameW; + height = frameH; + return true; +} + +float DvppGB28181Decoder::fps() { + return m_fps; +} + +bool DvppGB28181Decoder::isSurport(FFDecConfig& cfg){ + // 由于是否支持需要在拿到数据后才能断定,无法事先判断,所以这个地方默认返回true + return true; +} + +int DvppGB28181Decoder::getCachedQueueLength(){ + return m_rtpPtr->GetPsFrameListSize(); +} \ No newline at end of file diff --git a/src/decoder/gb28181/DvppGB28181Decoder.h b/src/decoder/gb28181/DvppGB28181Decoder.h new file mode 100644 index 0000000..6c36057 --- /dev/null +++ b/src/decoder/gb28181/DvppGB28181Decoder.h @@ -0,0 +1,106 @@ +#ifndef _GB28181_DECODER_H_ +#define _GB28181_DECODER_H_ + +#include +#include + +#include "RTPReceiver.h" +#include "../dvpp/DvppStreamDecoder.h" + +#include "common_header.h" +#include "../interface/AbstractDecoder.h" + + +struct AVFormatContext; +struct AVCodecContext; +struct AVCodec; +struct AVFrame; +struct AVDictionary; +struct AVPacket; + +using namespace std; + +class DvppGB28181Decoder: public AbstractDecoder +{ +public: + DvppGB28181Decoder(); + ~DvppGB28181Decoder(); + + bool init(FFDecConfig& cfg); + void close(); + bool start(); + void pause(); + void resume(); + + void setDecKeyframe(bool bKeyframe); + + bool isRunning(); + bool isFinished(); + bool isPausing(); + bool getResolution( int &width, int &height ); + + bool isSurport(FFDecConfig& cfg); + + int getCachedQueueLength(); + + float fps(); + + DECODER_TYPE getDecoderType(){ return DECODER_TYPE_GB28181; } + + void setName(string nm){ + m_dec_name = nm; + } + + string getName(){ + return m_dec_name; + } + + void setPostDecArg(const void* postDecArg); + void setFinishedDecArg(const void* finishedDecArg); + +public: + void stream_callback(int videoType, char* data, int len, int isKey, uint64_t pts, uint64_t localPts); + void stream_end_callback(); + void display_thread(); + +private: + int ff_decode(int videoType, AVPacket* pkt); + +private: + string m_dec_name; // 必须为28181编码 + FFDecConfig m_cfg; + + RTPReceiver* m_rtpPtr {nullptr}; + + uint64_t m_startPts {}; + uint64_t m_lastPts {}; //上一次pts的值 + uint64_t m_curPts {}; //当前的pts值 + uint64_t m_diffPts {}; + + uint32_t frameW {}, frameH {}; + float m_fps {}; + int m_frameSkip {}; + + int log_count {}; + + std::atomic_int m_status {}; + + pthread_t m_post_decode_thread; + const void * m_postDecArg; + POST_DECODE_CALLBACK post_decoded_cbk; // 解码数据回调接口 + + const void * m_finishedDecArg; + DECODE_FINISHED_CALLBACK decode_finished_cbk; + + queue mFrameQueue; + mutex m_queue_mutex; + mutex m_snapshot_mutex; + + bool m_dec_keyframe; + + DvppStreamDecoder streamDecoder; + + int m_gpuid {0}; +}; + +#endif // _GB28181_DECODER_H_ diff --git a/src/decoder/gb28181/GB28181Provider.cpp b/src/decoder/gb28181/GB28181Provider.cpp deleted file mode 100644 index abdc914..0000000 --- a/src/decoder/gb28181/GB28181Provider.cpp +++ /dev/null @@ -1,502 +0,0 @@ -//#include "LOG_manager.h" -#include -#include "FFGB28181Decoder.h" - - - -extern "C" { - #include "libavutil/avstring.h" - #include "libavformat/avformat.h" - #include "libswscale/swscale.h" -} - -#include"RTPTcpReceiver.h" -#include"RTPUdpReceiver.h" - -#include - -#include "common_header.h" - -#include "../nvdec/FFCuContextManager.h" -#include "../nvdec/GpuRgbMemory.hpp" -#include "../nvdec/cuda_kernels.h" - -#define ECLOSED 0 -#define ECLOSING 1 -#define ERUNNING 2 -#define EPAUSE 3 - -static void RTP_Stream_CallBack(void* userdata, int videoType, char* data, int len, int isKey, uint64_t pts, uint64_t localPts) -{ - FFGB28181Decoder* decoder = (FFGB28181Decoder*)userdata; - decoder->stream_callback(videoType, data, len, isKey, pts, localPts); -} - -static void RTP_Stream_End_CallBack(void* userdata) -{ - FFGB28181Decoder* decoder = (FFGB28181Decoder*)userdata; - decoder->stream_end_callback(); -} - -FFGB28181Decoder::FFGB28181Decoder() { - m_frameSkip = 1; - m_port = -1; - m_dec_keyframe = false; - m_post_decode_thread = 0; -} - -FFGB28181Decoder::~FFGB28181Decoder() -{ - close(); - - if (m_pAVCodecCtx) { - avcodec_close(m_pAVCodecCtx); - avcodec_free_context(&m_pAVCodecCtx); - } - - m_dec_keyframe = false; - - LOG_INFO("destroy OK--{}", m_dec_name); -} - -void FFGB28181Decoder::close(){ - if (m_status == ECLOSED || m_status == ECLOSING) return ; - - m_status = ECLOSING; - - LOG_DEBUG("real decode thread exit success 1--{}", m_dec_name); - - if(nullptr != m_rtpPtr){ - if (m_rtpPtr->IsOpened()) { - m_rtpPtr->Close(); - LOG_DEBUG("real decode thread exit success 2--{}", m_dec_name); - } - - delete m_rtpPtr; - m_rtpPtr = nullptr; - } - - if (gpu_options) av_dict_free(&gpu_options); - - if (m_post_decode_thread != 0) - { - pthread_join(m_post_decode_thread,0); - } - - while(mFrameQueue.size() > 0){ - AVFrame * gpuFrame = mFrameQueue.front(); - av_frame_free(&gpuFrame); - mFrameQueue.pop(); - } - - m_status = ECLOSED; - - LOG_INFO("解码器关闭成功 --{}", m_dec_name); -} - -bool FFGB28181Decoder::init(FFDecConfig& cfg){ - if(cfg.force_tcp){ - m_rtpPtr = new RTPTcpReceiver(); - }else{ - m_rtpPtr = new RTPUdpReceiver(); - } - if(nullptr == m_rtpPtr){ - return false; - } - - m_dec_name = cfg.uri; - m_frameSkip = cfg.skip_frame; - if (m_frameSkip < 1) m_frameSkip = 1; - - m_gpuid = atoi(cfg.gpuid.c_str()); - - m_rtpPtr->SetDeviceID(m_dec_name); - - if(cfg.request_stream_cbk == nullptr){ - LOG_INFO("request_stream_cbk 为 nullptr -- {}", m_dec_name); - return false; - } - - post_decoded_cbk = cfg.post_decoded_cbk; - decode_finished_cbk = cfg.decode_finished_cbk; - m_rtpPtr->SetRequestStreamCallback(cfg.request_stream_cbk); - - m_port = cfg.port; - - m_cfg = cfg; - - LOG_INFO("init - {} : ", m_dec_name, m_port); - - return true; -} - -bool FFGB28181Decoder::start() { - - m_status = ERUNNING; - - m_rtpPtr->SetOutputCallback(RTP_Stream_CallBack, this); - m_rtpPtr->SetVodEndCallback(RTP_Stream_End_CallBack, this); - - LOG_INFO("start - {} {}: ", m_dec_name, m_port); - - bool bRet = m_rtpPtr->Open((uint16_t)m_port); - if(bRet){ - pthread_create(&m_post_decode_thread,0, - [](void* arg) - { - FFGB28181Decoder* a=(FFGB28181Decoder*)arg; - a->post_decode_thread(); - return (void*)0; - } - ,this); - } - return bRet; -} - -void FFGB28181Decoder::setDecKeyframe(bool bKeyframe){ - m_dec_keyframe = bKeyframe; -} - -void FFGB28181Decoder::stream_callback(int videoType, char* data, int len, int isKey, uint64_t pts, uint64_t localPts) { - if (m_status == EPAUSE) return; - - // 若设置为关键帧解码,非关键帧数据直接返回 - if(m_dec_keyframe && !isKey) return; - - if (len <= 0) { - if (data == nullptr && pts == -1) { - LOG_INFO("frame callback EOF!"); - post_decoded_cbk(m_postDecArg, nullptr); - return ; - } - LOG_INFO("frame data is zero --{}", m_dec_name); - return; - } - - AVPacket* pkt = av_packet_alloc(); - av_init_packet(pkt); - - pkt->size = len; - pkt->data = (uint8_t*)data; - - // ffmpeg 解码 - ff_decode(videoType, pkt); - - av_packet_free(&pkt); - pkt = nullptr; - -} - -int FFGB28181Decoder::ff_decode(int videoType, AVPacket* pkt) { - if (m_pAVCodecCtx == nullptr) { - LOG_INFO("frame data is zero --{}", m_dec_name); - if (VIDEO_TYPE_H264 == videoType) { - if (m_gpuid >= 0){ - m_pAVCodec = avcodec_find_decoder_by_name("h264_cuvid"); - } - else{ - m_pAVCodec = avcodec_find_decoder(AV_CODEC_ID_H264); - } - LOG_INFO("m_avCodecName is H264"); - } - else if (VIDEO_TYPE_H265 == videoType) - { - if (m_gpuid >= 0){ - m_pAVCodec = avcodec_find_decoder_by_name("hevc_cuvid"); - } - else{ - m_pAVCodec = avcodec_find_decoder(AV_CODEC_ID_H265); - } - LOG_INFO("m_avCodecName is H265"); - } - else{ - LOG_INFO("m_avCodecName is unknown, videoType is {}", videoType); - } - - if (!m_pAVCodec) - { - LOG_ERROR("frameCallback frame decode error, ERROR_DECODER_NOT_FOUND"); - return; - } - - m_pAVCodecCtx = avcodec_alloc_context3(m_pAVCodec); - - if (m_gpuid >= 0) { - char gpui[8] = { 0 }; - sprintf(gpui, "%d", m_gpuid); - av_dict_set(&gpu_options, "gpu", gpui, 0); - - m_pAVCodecCtx->get_format = get_hw_format; - - FFCuContextManager* pCtxMgr = FFCuContextManager::getInstance(); - m_pAVCodecCtx->hw_device_ctx = av_buffer_ref(pCtxMgr->getCuCtx(gpui)); - if (nullptr == m_pAVCodecCtx->hw_device_ctx){ - // TODO 这里应该抛出错误 - return ; - } - } - - if (avcodec_open2(m_pAVCodecCtx, m_pAVCodec, &gpu_options) < 0) - return; - } - - //开始解码 - int ret = avcodec_send_packet(m_pAVCodecCtx, pkt); - if (ret < 0) { - //send_exception(RunMessageType::E2002, e_msg); - LOG_ERROR("Real stream视频解码失败,请检查视频设备{}: avcodec_send_packet failed. ret={}", m_dec_name, ret); - return; - } - - if (frameW < 1) { - frameW = m_pAVCodecCtx->width; - frameH = m_pAVCodecCtx->height; - if (frameW <= 0 || frameH <= 0) { - LOG_ERROR("[{}] frame W or H is error! ({},{})", m_dec_name, frameW, frameH); - av_packet_free(&pkt); - pkt = nullptr; - return; - } - } - // m_fps = m_pAVCodecCtx->pkt_timebase.den == 0 ? 25.0 : av_q2d(m_pAVCodecCtx->pkt_timebase); - m_fps = av_q2d(m_pAVCodecCtx->framerate); - // LOG_DEBUG("frameW {}--frameH {}", frameW, frameH); - - AVFrame* gpuFrame = av_frame_alloc(); - ret = avcodec_receive_frame(m_pAVCodecCtx, gpuFrame); - if ((ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) || ret < 0){ - LOG_ERROR("{} - Failed to receive frame: {}", m_dec_name, ret); - av_frame_free(&gpuFrame); - gpuFrame = nullptr; - return; - } - - if (gpuFrame->width != frameW || gpuFrame->height != frameH){ - LOG_INFO("AVFrame is inconsistent: width is {}, height is {}; original frameW is {}, frameH is {}--{}", gpuFrame->width, gpuFrame->height, frameW, frameH , m_dec_name); - av_frame_free(&gpuFrame); - gpuFrame = nullptr; - return; - } - - m_queue_mutex.lock(); - if(mFrameQueue.size() <= 10){ - mFrameQueue.push(gpuFrame); - }else{ - av_frame_free(&gpuFrame); - gpuFrame = nullptr; - } - m_queue_mutex.unlock(); -} - -void FFGB28181Decoder::post_decode_thread(){ - - int index = 0; - while (isRunning()) - { - if(mFrameQueue.size() > 0){ - std::lock_guard l(m_snapshot_mutex); - // 取队头数据 - m_queue_mutex.lock(); - AVFrame * gpuFrame = mFrameQueue.front(); - mFrameQueue.pop(); - m_queue_mutex.unlock(); - // 跳帧 - if (m_frameSkip == 1 || index % m_frameSkip == 0){ - post_decoded_cbk(m_postDecArg, convert2bgr(gpuFrame)); - } - - av_frame_free(&gpuFrame); - gpuFrame = nullptr; - - index++; - if(index >= 100000){ - index = 0; - } - } - } - - LOG_INFO("post decode thread exited."); -} - -void FFGB28181Decoder::stream_end_callback() -{ - LOG_INFO("send_video_eof--{}", m_dec_name); - - decode_finished_cbk(m_finishedDecArg); - - return; -} - -void FFGB28181Decoder::setPostDecArg(const void* postDecArg){ - m_postDecArg = postDecArg; -} - -void FFGB28181Decoder::setFinishedDecArg(const void* finishedDecArg){ - m_finishedDecArg = finishedDecArg; -} - -void FFGB28181Decoder::pause() { - m_status = EPAUSE; - LOG_INFO("pause --{}", m_dec_name); -} - -void FFGB28181Decoder::resume() { - m_status = ERUNNING; - LOG_INFO("resume --{}", m_dec_name); -} - -bool FFGB28181Decoder::isRunning(){ - if (m_status == ECLOSED || m_status == ECLOSING){ - return false; - } - return true; -} - -bool FFGB28181Decoder::isFinished(){ - if (m_status == ECLOSED || m_status == ECLOSING){ - return true; - } - return false; -} - -bool FFGB28181Decoder::isPausing(){ - if (m_status == EPAUSE){ - return true; - } - return false; -} - -bool FFGB28181Decoder::getResolution( int &width, int &height ){ - width = frameW; - height = frameH; - return true; -} - -float FFGB28181Decoder::fps() { - return m_fps; -} - -bool FFGB28181Decoder::isSurport(FFDecConfig& cfg){ - // 由于是否支持需要在拿到数据后才能断定,无法事先判断,所以这个地方默认返回true - return true; -} - -int FFGB28181Decoder::getCachedQueueLength(){ - return m_rtpPtr->GetPsFrameListSize(); -} - -DeviceRgbMemory* FFGB28181Decoder::convert2bgr(AVFrame * gpuFrame){ - if (gpuFrame != nullptr && gpuFrame->format == AV_PIX_FMT_CUDA ){ - LOG_DEBUG("decode task: gpuid: {} width: {} height: {}", m_cfg.gpuid, gpuFrame->width, gpuFrame->height); - GpuRgbMemory* gpuMem = new GpuRgbMemory(3, gpuFrame->width, gpuFrame->height, getName(), m_cfg.gpuid, false, true); - - do{ - if (gpuMem->getMem() == nullptr){ - LOG_ERROR("new GpuRgbMemory failed !!!"); - break; - } - - cudaSetDevice(atoi(m_cfg.gpuid.c_str())); - cuda_common::setColorSpace( ITU_709, 0 ); - cudaError_t cudaStatus = cuda_common::CUDAToBGR((CUdeviceptr)gpuFrame->data[0],(CUdeviceptr)gpuFrame->data[1], gpuFrame->linesize[0], gpuFrame->linesize[1], gpuMem->getMem(), gpuFrame->width, gpuFrame->height); - cudaDeviceSynchronize(); - if (cudaStatus != cudaSuccess) { - LOG_ERROR("CUDAToBGR failed failed !!!"); - break; - } - - return gpuMem; - }while(0); - - delete gpuMem; - gpuMem = nullptr; - } - - return nullptr; -} - -FFImgInfo* FFGB28181Decoder::snapshot(){ - - // 锁住停止队列消耗 - std::lock_guard l(m_snapshot_mutex); - - AVFrame * gpuFrame = nullptr; - - bool bFirst = true; - while(true){ - m_queue_mutex.lock(); - if(mFrameQueue.size() <= 0){ - m_queue_mutex.unlock(); - if(bFirst){ - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - bFirst = false; - continue; - }else{ - // 再进来说明前面已经等了 100 ms - // 100 ms都没有等到解码数据,则退出 - return nullptr; - } - } - - // 队列中数据大于1 - gpuFrame = mFrameQueue.front(); - m_queue_mutex.unlock(); - break; - } - - if (gpuFrame != nullptr && gpuFrame->format == AV_PIX_FMT_CUDA ){ - LOG_DEBUG("decode task: gpuid: {} width: {} height: {}", m_cfg.gpuid, gpuFrame->width, gpuFrame->height); - GpuRgbMemory* gpuMem = new GpuRgbMemory(3, gpuFrame->width, gpuFrame->height, getName(), m_cfg.gpuid , false, true); - - if (gpuMem->getMem() == nullptr){ - LOG_ERROR("new GpuRgbMemory failed !!!"); - return nullptr; - } - - cudaSetDevice(atoi(m_cfg.gpuid.c_str())); - cuda_common::setColorSpace( ITU_709, 0 ); - cudaError_t cudaStatus = cuda_common::CUDAToBGR((CUdeviceptr)gpuFrame->data[0],(CUdeviceptr)gpuFrame->data[1], gpuFrame->linesize[0], gpuFrame->linesize[1], gpuMem->getMem(), gpuFrame->width, gpuFrame->height); - cudaDeviceSynchronize(); - if (cudaStatus != cudaSuccess) { - LOG_ERROR("CUDAToBGR failed failed !!!"); - return nullptr; - } - - unsigned char * pHwRgb = gpuMem->getMem(); - int channel = gpuMem->getChannel(); - int width = gpuMem->getWidth(); - int height = gpuMem->getHeight(); - - if (pHwRgb != nullptr && channel > 0 && width > 0 && height > 0){ - int nSize = channel * height * width; - - LOG_INFO("channel:{} height:{} width:{}", channel, height, width); - // unsigned char* cpu_data = new unsigned char[nSize]; - - unsigned char* cpu_data = (unsigned char *)av_malloc(nSize * sizeof(unsigned char)); - - cudaMemcpy(cpu_data, pHwRgb, nSize * sizeof(unsigned char), cudaMemcpyDeviceToHost); - cudaDeviceSynchronize(); - - delete gpuMem; - gpuMem = nullptr; - - FFImgInfo* imgInfo = new FFImgInfo(); - imgInfo->dec_name = m_dec_name; - imgInfo->pData = cpu_data; - imgInfo->height = height; - imgInfo->width = width; - imgInfo->timestamp = UtilTools::get_cur_time_ms(); - imgInfo->index = m_index; - - m_index++; - - return imgInfo; - } - - delete gpuMem; - gpuMem = nullptr; - } - - return nullptr; -} \ No newline at end of file diff --git a/src/decoder/gb28181/GB28181Provider.h b/src/decoder/gb28181/GB28181Provider.h deleted file mode 100644 index 5cabf77..0000000 --- a/src/decoder/gb28181/GB28181Provider.h +++ /dev/null @@ -1,103 +0,0 @@ -#ifndef _GB28181_DECODER_H_ -#define _GB28181_DECODER_H_ - -#include "RTPReceiver.h" - -#include "common_header.h" -#include "../interface/AbstractDecoder.h" - -#include -#include - -struct AVFormatContext; -struct AVCodecContext; -struct AVCodec; -struct AVFrame; -struct AVDictionary; - -using namespace std; - -class FFGB28181Decoder: public AbstractDecoder -{ -public: - FFGB28181Decoder(); - ~FFGB28181Decoder(); - - bool init(FFDecConfig& cfg); - void close(); - bool start(); - void pause(); - void resume(); - - void setDecKeyframe(bool bKeyframe); - - bool isRunning(); - bool isFinished(); - bool isPausing(); - bool getResolution( int &width, int &height ); - - bool isSurport(FFDecConfig& cfg); - - int getCachedQueueLength(); - - float fps(); - - DECODER_TYPE getDecoderType(){ return DECODER_TYPE_GB28181; } - - FFImgInfo* snapshot(); - - void setName(string nm){ - m_dec_name = nm; - } - - string getName(){ - return m_dec_name; - } - - void setPostDecArg(const void* postDecArg); - void setFinishedDecArg(const void* finishedDecArg); - -public: - void stream_callback(int videoType, char* data, int len, int isKey, uint64_t pts, uint64_t localPts); - void stream_end_callback(); - void post_decode_thread(); - -private: - DeviceRgbMemory* convert2bgr(AVFrame * gpuFrame); - int ff_decode(); - -private: - string m_dec_name; - FFDecConfig m_cfg; - - RTPReceiver* m_rtpPtr; - int m_port; - - uint64_t m_startPts {}; - uint64_t m_lastPts {}; //上一次pts的值 - uint64_t m_curPts {}; //当前的pts值 - uint64_t m_diffPts {}; - - uint32_t frameW {}, frameH {}; - float m_fps {}; - int m_frameSkip {}; - - int log_count {}; - - std::atomic_int m_status {}; - - pthread_t m_post_decode_thread; - const void * m_postDecArg; - POST_DECODE_CALLBACK post_decoded_cbk; // 解码数据回调接口 - - const void * m_finishedDecArg; - DECODE_FINISHED_CALLBACK decode_finished_cbk; - - queue mFrameQueue; - mutex m_queue_mutex; - mutex m_snapshot_mutex; - - bool m_dec_keyframe; -}; - -#endif // _GB28181_DECODER_H_ diff --git a/src/decoder/gb28181/main.cpp b/src/decoder/gb28181/main.cpp index ebd2f01..de6ea1c 100644 --- a/src/decoder/gb28181/main.cpp +++ b/src/decoder/gb28181/main.cpp @@ -105,8 +105,6 @@ bool start_rtp(string deviceId, int m_port) { return false; } - m_rtpPtr->SetDeviceID(deviceId); - m_rtpPtr->SetRequestStreamCallback(RequestStream); @@ -115,7 +113,7 @@ bool start_rtp(string deviceId, int m_port) { m_rtpPtr->SetVodEndCallback(RTP_Stream_End_CallBack, nullptr); - bool bRet = m_rtpPtr->Open(m_port); + bool bRet = m_rtpPtr->Open(deviceId); if(bRet){ // pthread_create(&m_post_decode_thread,0, // [](void* arg) @@ -140,7 +138,7 @@ bool RequestStream(const char* deviceId, int rtp_port) { return false; } - int ret = sipServer.RequestInvite_TCP_a(vec_device[0], rtp_port); + int ret = sipServer.RequestInvite_TCP_a(vec_device[0].id.c_str(), rtp_port); if (ret > 0) { return true; @@ -152,7 +150,7 @@ bool RequestStream(const char* deviceId, int rtp_port) { int main(int argc, char *argv[]) { ServerInfo info( - "SY_SipServer", + "SY_Sip_Server", "12345678", "192.168.60.179", 15060, @@ -164,7 +162,6 @@ int main(int argc, char *argv[]) { sipServer.Init(info); - sipServer.Start(); std::this_thread::sleep_for(std::chrono::seconds(5)); @@ -191,7 +188,7 @@ int main(int argc, char *argv[]) { int rtp_port = 30026;//allocRtpPort(); start_rtp(vec_device[0].id, rtp_port); - sipServer.RequestInvite_UDP(vec_device[0], rtp_port); + sipServer.RequestInvite_UDP(vec_device[0].id.c_str(), rtp_port); } break; case 'b': diff --git a/src/decoder/gb28181/rtp/RTPReceiver.cpp b/src/decoder/gb28181/rtp/RTPReceiver.cpp index 0d3da4a..94bd575 100644 --- a/src/decoder/gb28181/rtp/RTPReceiver.cpp +++ b/src/decoder/gb28181/rtp/RTPReceiver.cpp @@ -4,8 +4,16 @@ #include "../common_header.h" +#ifdef __linux__ +#include "arpa/inet.h" +#endif + #define BUFFERSIZE_1024 1024 -const int kVideoFrameSize = BUFFERSIZE_1024*BUFFERSIZE_1024*5*2; + +const int kVideoFrameSize = BUFFERSIZE_1024*BUFFERSIZE_1024*5*2; + +const int MIN_RTP_PORT = 10000 ; +const int MAX_RTP_PORT = 60000; // PS解包器回调 static int ReceivePESFunction(unsigned char streamid, void * data, int size, uint64_t pts, uint64_t localPts, bool key, void* param) @@ -73,13 +81,13 @@ int RTPReceiver::InitPS(){ return -1; } - LOG_INFO("[{}] InitPS finished", m_deviceID); + LOG_INFO("[{}] InitPS finished", m_SipChannelId); return 0; } void RTPReceiver::ClosePsThread(){ - LOG_INFO("[{}] 3.", m_deviceID); + LOG_INFO("[{}] 3.", m_SipChannelId); m_bPsExit = true; // PS解包线程退出 if (m_psThreadPtr->joinable()) @@ -89,7 +97,7 @@ void RTPReceiver::ClosePsThread(){ m_psThreadPtr = nullptr; } - LOG_INFO("[{}] ps demux thread quit", m_deviceID); + LOG_INFO("[{}] ps demux thread quit", m_SipChannelId); } // 处理去除了PS头的数据 @@ -125,7 +133,7 @@ void RTPReceiver::OnPsDemux(unsigned char streamId, BYTE *data, int len, bool ke //{ // byte_buffer bb(64); // bb << ERROR_REALSTREAM_INTERRUPT << "This session have a long time no decoding"; - // LOG_INFO("[{}] Long time no decoding!!!m_notToDecodCount=[{}]", m_deviceID, m_notToDecodCount); + // LOG_INFO("[{}] Long time no decoding!!!m_notToDecodCount=[{}]", m_SipChannelId, m_notToDecodCount); // // if (m_usrParam) // { @@ -135,7 +143,7 @@ void RTPReceiver::OnPsDemux(unsigned char streamId, BYTE *data, int len, bool ke // //通知网关关闭句柄 // if(!((VideoSession *)GetUsrParam())->streamHandle().empty()) // { - // LOG_INFO("[{}] ---->Notify hisense gateway release handle = {} !<----", m_deviceID, ((VideoSession *)GetUsrParam())->streamHandle()); + // LOG_INFO("[{}] ---->Notify hisense gateway release handle = {} !<----", m_SipChannelId, ((VideoSession *)GetUsrParam())->streamHandle()); // if (((VideoSession *)GetUsrParam())->video_type() == EREAL) // real_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); @@ -172,10 +180,10 @@ void RTPReceiver::OnPsDemux(unsigned char streamId, BYTE *data, int len, bool ke // 解PS包线程 int RTPReceiver::OnPsProcess() { - LOG_INFO("[{}] started.", m_deviceID); + LOG_INFO("[{}] started.", m_SipChannelId); while (!m_bPsExit) { m_psFrameMutex.lock(); - // LOG_DEBUG("[{}] PS frame size : {}", m_deviceID, m_psVideoFrames.size()); + // LOG_DEBUG("[{}] PS frame size : {}", m_SipChannelId, m_psVideoFrames.size()); if (m_psVideoFrames.size() <= 0){ m_psFrameMutex.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(10)); @@ -189,15 +197,15 @@ int RTPReceiver::OnPsProcess() int nRet = m_psParser.AddData(frame->buf_, frame->len_); if (nRet == -1) { - LOG_INFO("m_psParser return -1--{}", m_deviceID); + LOG_INFO("m_psParser return -1--{}", m_SipChannelId); } else if (nRet == -2) { - LOG_INFO("m_psParser return -2--{}", m_deviceID); + LOG_INFO("m_psParser return -2--{}", m_SipChannelId); } else if (nRet == -3) { - LOG_INFO("m_psParser return -3--{}", m_deviceID); + LOG_INFO("m_psParser return -3--{}", m_SipChannelId); } delete frame; @@ -209,15 +217,11 @@ int RTPReceiver::OnPsProcess() m_hVodEndFunc(m_usrParam); - LOG_INFO("[{}] exited.", m_deviceID); + LOG_INFO("[{}] exited.", m_SipChannelId); return 0; } -void RTPReceiver::SetDeviceID(string deviceID){ - m_deviceID = deviceID; -} - int RTPReceiver::GetPsFrameListSize() { std::lock_guard l(m_psFrameMutex); @@ -232,7 +236,7 @@ void RTPReceiver::ClearPsVideoFrameList() delete f; m_psVideoFrames.pop(); } - LOG_INFO("[{}] cleared ps video frame list!", m_deviceID); + LOG_INFO("[{}] cleared ps video frame list!", m_SipChannelId); } int RTPReceiver::ParsePacket(RTPPacket* packet){ @@ -258,7 +262,7 @@ int RTPReceiver::ParsePacket(RTPPacket* packet){ break; } - // LOG_DEBUG("[{}] ParsePacket GetPayloadLength", m_deviceID); + // LOG_DEBUG("[{}] ParsePacket GetPayloadLength", m_SipChannelId); if (mark) { @@ -272,7 +276,7 @@ int RTPReceiver::ParsePacket(RTPPacket* packet){ std::lock_guard l(m_psFrameMutex); if (m_psVideoFrames.size() < 100) { - // LOG_DEBUG("[{}]ParsePacket push", m_deviceID); + // LOG_DEBUG("[{}]ParsePacket push", m_SipChannelId); m_psVideoFrames.push(new Frame(frameBuf, offset, false)); } else { @@ -282,7 +286,7 @@ int RTPReceiver::ParsePacket(RTPPacket* packet){ else{ //若此时解码线程已经退出,不再往m_psVideoFrames推送帧,且退出当前线程 free(frameBuf); - LOG_INFO("ParsePacket quit, device_id:{}", m_deviceID); + LOG_INFO("ParsePacket quit, device_id:{}", m_SipChannelId); return 1; } offset = 0; @@ -294,4 +298,44 @@ int RTPReceiver::ParsePacket(RTPPacket* packet){ } while (0); return 0; +} + +int RTPReceiver::allocRtpPort() { + + int s_rtpPort = MIN_RTP_PORT; + + srand((unsigned int)time(NULL)); + s_rtpPort = MIN_RTP_PORT + (rand() % MIN_RTP_PORT); + + if (s_rtpPort % 2) + ++s_rtpPort; + + while (true) + { + s_rtpPort = s_rtpPort >= MAX_RTP_PORT ? MIN_RTP_PORT : s_rtpPort; + + for (int i = 0; i < 2; i++) { + sockaddr_in sRecvAddr; + int s = socket(AF_INET, SOCK_DGRAM, 0); + + sRecvAddr.sin_family = AF_INET; + sRecvAddr.sin_addr.s_addr = htonl(INADDR_ANY); + sRecvAddr.sin_port = htons(s_rtpPort + i); + + int nResult = bind(s, (sockaddr *)&sRecvAddr, sizeof(sRecvAddr)); + if (nResult != 0) { + break; + } + + nResult = close(s); + if (nResult != 0) { + LOG_ERROR("[{}] - closesocket failed : {}", m_SipChannelId, nResult); + return -1; + } + } + + s_rtpPort += 2; + } + + return s_rtpPort; } \ No newline at end of file diff --git a/src/decoder/gb28181/rtp/RTPReceiver.h b/src/decoder/gb28181/rtp/RTPReceiver.h index 271f797..2342701 100644 --- a/src/decoder/gb28181/rtp/RTPReceiver.h +++ b/src/decoder/gb28181/rtp/RTPReceiver.h @@ -87,7 +87,7 @@ public: RTPReceiver(); virtual ~RTPReceiver(); - virtual bool Open(int localPort) = 0; + virtual bool Open(string channel_id) = 0; virtual bool IsOpened() = 0; virtual void Close() = 0; @@ -97,10 +97,10 @@ public: void SetRequestStreamCallback(CallBack_Request_Stream cb); - void SetDeviceID(string deviceID); - int GetPsFrameListSize(); + int allocRtpPort(); + public: void OnPsDemux(unsigned char streamId, BYTE *data, int len, bool key, uint64_t pts, uint64_t localPts); int OnPsProcess(); @@ -126,7 +126,7 @@ public: std::queue m_psVideoFrames; mutex m_psFrameMutex; - string m_deviceID; + string m_SipChannelId; int m_rtp_port{-1}; CMpeg2Demux m_psParser; diff --git a/src/decoder/gb28181/rtp/RTPTcpReceiver.cpp b/src/decoder/gb28181/rtp/RTPTcpReceiver.cpp index 51e662a..8b960bf 100644 --- a/src/decoder/gb28181/rtp/RTPTcpReceiver.cpp +++ b/src/decoder/gb28181/rtp/RTPTcpReceiver.cpp @@ -1,6 +1,7 @@ #include"RTPTcpReceiver.h" #include "../common_header.h" +#include "../sip/SipServer.h" // class TcpRTPSession : public RTPSession @@ -128,20 +129,29 @@ RTPTcpReceiver::~RTPTcpReceiver(){ } } -bool RTPTcpReceiver::Open(int localPort){ - if(0 != initSession(localPort)){ +bool RTPTcpReceiver::Open(string channel_id){ + m_SipChannelId = channel_id; + + int rtpPort = allocRtpPort(); + if (rtpPort < 0) { + return false; + } + + m_rtp_port = rtpPort; + + if(0 != initSession(m_rtp_port)){ return false; } m_bOpened = true; - LOG_INFO("[{}] started.", m_deviceID); + LOG_INFO("[{}] started.", m_SipChannelId); return true; } bool RTPTcpReceiver::IsOpened(){ - LOG_INFO("[{}] isopng:{} ", m_deviceID, m_bOpened); + LOG_INFO("[{}] isopng:{} ", m_SipChannelId, m_bOpened); return m_bOpened; } @@ -160,7 +170,7 @@ void RTPTcpReceiver::close_task(){ m_bAccepted = true; - LOG_DEBUG("[{}] 1.", m_deviceID); + LOG_DEBUG("[{}] 1.", m_SipChannelId); // rtp接收线程退出 if (m_rtpThread.joinable()) @@ -168,13 +178,13 @@ void RTPTcpReceiver::close_task(){ m_rtpThread.join(); } - LOG_DEBUG("[{}] 2.", m_deviceID); + LOG_DEBUG("[{}] 2.", m_SipChannelId); ClosePsThread(); m_bOpened = false; - LOG_INFO("[{}] closed.", m_deviceID); + LOG_INFO("[{}] closed.", m_SipChannelId); } bool RTPTcpReceiver::isClosing(){ @@ -196,13 +206,13 @@ int RTPTcpReceiver::initSession(int localPort){ int nRet = bind(m_nListener, (sockaddr*)&serverAddr, sizeof(serverAddr)); if (nRet == -1) { - LOG_ERROR("[{}] 绑定端口失败: {}", m_deviceID, localPort); + LOG_ERROR("[{}] 绑定端口失败: {}", m_SipChannelId, localPort); return -1; } if (listen(m_nListener, 1) == -1) { - LOG_ERROR("[{}] listen 失败", m_deviceID); + LOG_ERROR("[{}] listen 失败", m_SipChannelId); return -1; } @@ -219,25 +229,23 @@ int RTPTcpReceiver::initSession(int localPort){ if (status < 0) { // 若status = -59 ,需运行 export LOGNAME=root ,见 https://blog.csdn.net/m0_37876242/article/details/128588162 - LOG_ERROR("[{}] create session error: {}", m_deviceID, status); + LOG_ERROR("[{}] create session error: {}", m_SipChannelId, status); return -1; } - m_rtp_port = localPort; - m_rtpThread = std::thread(rtp_revc_thread_, this); m_listenFinishThread = std::thread(listen_finish_thread_, this); InitPS(); - // bool bRet = RequestStream(); - // if (!bRet) - // { - // LOG_INFO("[{}] 请求流失败!", m_deviceID); - // return -1; - // } + bool bRet = RequestStream(); + if (!bRet) + { + LOG_INFO("[{}] 请求流失败!", m_SipChannelId); + return -1; + } - LOG_INFO("[{}] 初始化成功, congratulations !!!", m_deviceID); + LOG_INFO("[{}] 初始化成功, congratulations !!!", m_SipChannelId); return 0; } @@ -248,13 +256,13 @@ int RTPTcpReceiver::OnRtpRecv() return -1; } - LOG_INFO("[{}] OnRtpRecv started, m_nListener : {}", m_deviceID, m_nListener); + LOG_INFO("[{}] OnRtpRecv started, m_nListener : {}", m_SipChannelId, m_nListener); sockaddr_in clientAddr; int nLen = sizeof(sockaddr_in); SocketType nServer = -1; - LOG_INFO("[{}] Poll started.", m_deviceID); + LOG_INFO("[{}] Poll started.", m_SipChannelId); int reconn_times = 0; int reaccept_times = 0; bool bReconn = false; @@ -264,49 +272,49 @@ int RTPTcpReceiver::OnRtpRecv() goto end_flag; } - // while (!bReconn){ - // if(m_bRtpExit){ - // goto end_flag; - // } - - // reconn_times++; - // if(reconn_times > 10){ - // // 10次请求都失败,结束任务 - // m_bRtpExit = true; - // goto end_flag; - // } - // LOG_DEBUG("[{}] RequestStream...", m_deviceID); - // bReconn = RequestStream(); - // if (bReconn){ - // LOG_DEBUG("[{}] RequestStream, True", m_deviceID); - // continue; - // } - // LOG_DEBUG("[{}] RequestStream, False", m_deviceID); + while (!bReconn){ + if(m_bRtpExit){ + goto end_flag; + } + + reconn_times++; + if(reconn_times > 10){ + // 10次请求都失败,结束任务 + m_bRtpExit = true; + goto end_flag; + } + LOG_DEBUG("[{}] RequestStream...", m_SipChannelId); + bReconn = RequestStream(); + if (bReconn){ + LOG_DEBUG("[{}] RequestStream, True", m_SipChannelId); + continue; + } + LOG_DEBUG("[{}] RequestStream, False", m_SipChannelId); - // std::this_thread::sleep_for(std::chrono::seconds(5)); - // } + std::this_thread::sleep_for(std::chrono::seconds(5)); + } - LOG_DEBUG("[{}] accepting...", m_deviceID); + LOG_DEBUG("[{}] accepting...", m_SipChannelId); nServer = accept(m_nListener, (sockaddr*)&clientAddr, (socklen_t * ) &nLen); if (-1 == nServer){ reaccept_times++; - LOG_DEBUG("[{}] reaccept_times = {}", m_deviceID, reaccept_times); + LOG_DEBUG("[{}] reaccept_times = {}", m_SipChannelId, reaccept_times); if(reaccept_times > 600){ - LOG_DEBUG("[{}] reaccept_times > 600", m_deviceID); + LOG_DEBUG("[{}] reaccept_times > 600", m_SipChannelId); bReconn = false; reaccept_times = 0; } std::this_thread::sleep_for(std::chrono::milliseconds(100)); continue; } - LOG_DEBUG("[{}] accept success", m_deviceID); + LOG_DEBUG("[{}] accept success", m_SipChannelId); m_rtpSessionPtr->AddDestination(RTPTCPAddress(nServer)); m_bAccepted = true; bReconn = false; reconn_times = 0; reaccept_times = 0; - LOG_INFO("[{}] nServer={}", m_deviceID, nServer); + LOG_INFO("[{}] nServer={}", m_SipChannelId, nServer); break; } @@ -319,7 +327,7 @@ int RTPTcpReceiver::OnRtpRecv() while ((pack = m_rtpSessionPtr->GetNextPacket()) != NULL) { - // LOG_DEBUG("[{}] time: {} ", m_deviceID, UtilTools::get_cur_time_ms()); + // LOG_DEBUG("[{}] time: {} ", m_SipChannelId, UtilTools::get_cur_time_ms()); ParsePacket(pack); m_rtpSessionPtr->DeletePacket(pack); @@ -344,7 +352,7 @@ end_flag: close(m_nListener); } - LOG_INFO("[{}] OnRtpRecv exited.", m_deviceID); + LOG_INFO("[{}] OnRtpRecv exited.", m_SipChannelId); return 0; } @@ -362,9 +370,11 @@ bool RTPTcpReceiver::ReConnect(){ } bool RTPTcpReceiver::RequestStream(){ - if (m_callback_request_stream){ - return m_callback_request_stream(m_deviceID.c_str(), m_rtp_port); - } - - return false; + SipServer* pServer = SipServer::getInstance(); + int ret = -1; + if (pServer){ + ret = pServer->RequestInvite_UDP(m_SipChannelId.c_str(), m_rtp_port); + } + + return (ret > 0) ; } \ No newline at end of file diff --git a/src/decoder/gb28181/rtp/RTPTcpReceiver.h b/src/decoder/gb28181/rtp/RTPTcpReceiver.h index 64045b0..4cfcd70 100644 --- a/src/decoder/gb28181/rtp/RTPTcpReceiver.h +++ b/src/decoder/gb28181/rtp/RTPTcpReceiver.h @@ -51,7 +51,7 @@ public: RTPTcpReceiver(); ~RTPTcpReceiver(); - bool Open(int localPort); + bool Open(string channel_id); bool IsOpened(); void Close(); diff --git a/src/decoder/gb28181/rtp/RTPUdpReceiver.cpp b/src/decoder/gb28181/rtp/RTPUdpReceiver.cpp index 10026c3..4021563 100644 --- a/src/decoder/gb28181/rtp/RTPUdpReceiver.cpp +++ b/src/decoder/gb28181/rtp/RTPUdpReceiver.cpp @@ -7,6 +7,7 @@ #include #include "../common_header.h" +#include "../sip/SipServer.h" using namespace std; @@ -86,44 +87,58 @@ RTPUdpReceiver::~RTPUdpReceiver() } } -bool RTPUdpReceiver::Open(int localPort) +bool RTPUdpReceiver::Open(string channel_id) { + m_SipChannelId = channel_id; + + int rtpPort = allocRtpPort(); + if (rtpPort < 0) { + return false; + } + m_rtp_port = rtpPort; + m_sessparamsPtr->SetUsePollThread(true); m_sessparamsPtr->SetMinimumRTCPTransmissionInterval(10); m_sessparamsPtr->SetOwnTimestampUnit(1.0/90000.0); m_sessparamsPtr->SetAcceptOwnPackets(true); - m_transparamsPtr->SetPortbase(localPort); + m_transparamsPtr->SetPortbase(m_rtp_port); m_transparamsPtr->SetRTPReceiveBuffer(kRtpRecvBufferSize); - LOG_INFO("[{}] port: {}", m_deviceID, localPort); + LOG_INFO("[{}] port: {}", m_SipChannelId, m_rtp_port); int err = m_rtpSessionPtr->Create(*m_sessparamsPtr, m_transparamsPtr); - if (err != 0) - { - LOG_ERROR("[{}] Create error: {}", m_deviceID, err); + if (err != 0) { + LOG_ERROR("[{}] Create error: {}", m_SipChannelId, err); return false; } m_rtpThreadPtr = new std::thread(rtp_revc_thread_, this); - if (nullptr == m_rtpThreadPtr) - { - LOG_ERROR("[{}] Create m_rtpThreadPtr error", m_deviceID); + if (nullptr == m_rtpThreadPtr) { + LOG_ERROR("[{}] Create m_rtpThreadPtr error", m_SipChannelId); return false; } - - if (InitPS() != 0) - { + if (InitPS() != 0) { return false; } m_bOpened = true; - LOG_INFO("[{}] Open ok", m_deviceID); + LOG_INFO("[{}] Open ok", m_SipChannelId); return true; } +bool RTPUdpReceiver::RequestStream() { + SipServer* pServer = SipServer::getInstance(); + int ret = -1; + if (pServer){ + ret = pServer->RequestInvite_UDP(m_SipChannelId.c_str(), m_rtp_port); + } + + return (ret > 0) ; +} + bool RTPUdpReceiver::IsOpened() { return m_bOpened; @@ -146,7 +161,7 @@ void RTPUdpReceiver::Close() m_bOpened = false; - LOG_INFO("[{}] closed.", m_deviceID); + LOG_INFO("[{}] closed.", m_SipChannelId); } // 收RTP包线程 @@ -156,7 +171,7 @@ int RTPUdpReceiver::OnRtpRecv() return -1; } - LOG_INFO("[{}] OnRtpRecv started.", m_deviceID); + LOG_INFO("[{}] OnRtpRecv started.", m_SipChannelId); while (!m_bRtpExit) { //try @@ -166,7 +181,7 @@ int RTPUdpReceiver::OnRtpRecv() if (m_rtpSessionPtr->GotoFirstSourceWithData()) { - // LOG_INFO("OnRtpRecv GotoFirstSourceWithData --{}", m_deviceID); + // LOG_INFO("OnRtpRecv GotoFirstSourceWithData --{}", m_SipChannelId); last_recv_ts = UtilTools::get_cur_time_ms(); m_idleCount = 0; m_noDataCount = 0; @@ -175,7 +190,7 @@ int RTPUdpReceiver::OnRtpRecv() RTPPacket* packet; while ((packet = m_rtpSessionPtr->GetNextPacket()) != NULL) { - // LOG_INFO("OnRtpRecv GetNextPacket --{}", m_deviceID); + // LOG_INFO("OnRtpRecv GetNextPacket --{}", m_SipChannelId); int ret = ParsePacket(packet); m_rtpSessionPtr->DeletePacket(packet); @@ -206,7 +221,7 @@ int RTPUdpReceiver::OnRtpRecv() // //由于record_stream_status这个函数返回值不准确,所以增加一个进度条大于80% // if(record_stream_status(((VideoSession *)GetUsrParam())->streamHandle())) // { - // LOG_INFO("************Record stream is finished**{}**m_progress = {}********", m_deviceID, ((VideoSession *)GetUsrParam())->progress()); + // LOG_INFO("************Record stream is finished**{}**m_progress = {}********", m_SipChannelId, ((VideoSession *)GetUsrParam())->progress()); // m_idleCount = -1; // m_hVodEndFunc(m_usrParam); // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); @@ -217,7 +232,7 @@ int RTPUdpReceiver::OnRtpRecv() // //如果此时进度大于80% 算完成吧 // if(((VideoSession *)GetUsrParam())->progress() > 0.80) // { - // LOG_INFO("************Record stream is overtime**{}**m_progress = {}********", m_deviceID, ((VideoSession *)GetUsrParam())->progress()); + // LOG_INFO("************Record stream is overtime**{}**m_progress = {}********", m_SipChannelId, ((VideoSession *)GetUsrParam())->progress()); // m_idleCount = 0; // m_hVodEndFunc(m_usrParam); @@ -227,7 +242,7 @@ int RTPUdpReceiver::OnRtpRecv() // else // { // m_idleCount = -1; - // //LOG_ERROR("************post ERROR_REALSTREAM_INTERRUPT to structure****{}********", m_deviceID); + // //LOG_ERROR("************post ERROR_REALSTREAM_INTERRUPT to structure****{}********", m_SipChannelId); // //发送流中断 // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Record time video streaming interruption!"); // } @@ -238,7 +253,7 @@ int RTPUdpReceiver::OnRtpRecv() // // if (m_noDataCount < -200000)//任务开始时没收到流 // { - // //LOG_ERROR("************m_hVodEndFunc(m_usrParam)!!!m_hVodEndFunc(m_usrParam)********{}******", m_deviceID); + // //LOG_ERROR("************m_hVodEndFunc(m_usrParam)!!!m_hVodEndFunc(m_usrParam)********{}******", m_SipChannelId); // m_noDataCount = -1; // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Record time video streaming interruption2!"); @@ -256,7 +271,7 @@ int RTPUdpReceiver::OnRtpRecv() // uint64_t cts = UtilTools::get_cur_time_ms(); // float duration_not_recv = (cts - last_recv_ts) / 1000.0; // - // //LOG_ERROR("************I haven't got stream from hik gateway exceed {}s,send eof********{}******", duration_not_recv, m_deviceID); + // //LOG_ERROR("************I haven't got stream from hik gateway exceed {}s,send eof********{}******", duration_not_recv, m_SipChannelId); // m_idleCount = -1; // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Real time video streaming interruption!"); @@ -264,7 +279,7 @@ int RTPUdpReceiver::OnRtpRecv() // // if (m_noDataCount < -200000)//任务开始时没收到流 // { - // //LOG_ERROR("************m_noDataCount < -200000********{}******", m_deviceID); + // //LOG_ERROR("************m_noDataCount < -200000********{}******", m_SipChannelId); // m_noDataCount = -1; // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Real time video streaming interruption2!"); @@ -275,7 +290,7 @@ int RTPUdpReceiver::OnRtpRecv() //} // catch (GeneralException2& e) //{ - // //LOG_ERROR("---> video streaming interruption!<---{}, error: {}", m_deviceID, e.err_msg()); + // //LOG_ERROR("---> video streaming interruption!<---{}, error: {}", m_SipChannelId, e.err_msg()); // byte_buffer bb(64); // bb << VasCmd::VAS_CMD_REALSTREAM_INTERRUPT << e.err_msg(); @@ -287,7 +302,7 @@ int RTPUdpReceiver::OnRtpRecv() // ((VideoSession *)GetUsrParam())->msgChan()->send_msg(bb.data_ptr(), bb.data_size()); // } // catch (GeneralException2& e) { - // //LOG_ERROR("[{}] send vas cmd VAS_CMD_REALSTREAM_INTERRUPT error: {}, {}", m_deviceID, e.err_code(), e.err_str()); + // //LOG_ERROR("[{}] send vas cmd VAS_CMD_REALSTREAM_INTERRUPT error: {}, {}", m_SipChannelId, e.err_code(), e.err_str()); // } // } @@ -295,7 +310,7 @@ int RTPUdpReceiver::OnRtpRecv() // if(!((VideoSession *)GetUsrParam())->streamHandle().empty()) // { - // LOG_INFO("---->Notify hisense gateway release handle = {} !<----{}", ((VideoSession *)GetUsrParam())->streamHandle().c_str(), m_deviceID); + // LOG_INFO("---->Notify hisense gateway release handle = {} !<----{}", ((VideoSession *)GetUsrParam())->streamHandle().c_str(), m_SipChannelId); // if (((VideoSession *)GetUsrParam())->video_type() == VideoType::EREAL) // real_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); // @@ -315,7 +330,7 @@ int RTPUdpReceiver::OnRtpRecv() std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - LOG_INFO("[{}] OnRtpRecv exited.", m_deviceID); + LOG_INFO("[{}] OnRtpRecv exited.", m_SipChannelId); return 0; } diff --git a/src/decoder/gb28181/rtp/RTPUdpReceiver.h b/src/decoder/gb28181/rtp/RTPUdpReceiver.h index fc6cc3b..965de8f 100644 --- a/src/decoder/gb28181/rtp/RTPUdpReceiver.h +++ b/src/decoder/gb28181/rtp/RTPUdpReceiver.h @@ -37,10 +37,12 @@ public: RTPUdpReceiver(); ~RTPUdpReceiver(); - virtual bool Open(int localPort); + virtual bool Open(string channel_id); virtual bool IsOpened() ; virtual void Close() ; + bool RequestStream(); + public: int OnRtpRecv(); @@ -58,6 +60,8 @@ private: RTPSessionParams* m_sessparamsPtr; RTPUDPv4TransmissionParams* m_transparamsPtr; + + string m_sip_channel_id; }; #endif // _RTP_UDP_RECEIVER_H_ diff --git a/src/decoder/gb28181/sip/SipServer.cpp b/src/decoder/gb28181/sip/SipServer.cpp index f1863f9..c70e3ff 100644 --- a/src/decoder/gb28181/sip/SipServer.cpp +++ b/src/decoder/gb28181/sip/SipServer.cpp @@ -73,6 +73,9 @@ SipServer::~SipServer() { bool SipServer::Init(ServerInfo info) { mInfo = info; LOG_INFO("{}:{}", mInfo.getIp(), mInfo.getPort()); + + m_event_loop_thread = new std::thread(event_loop_thread, this); + return true; } @@ -174,10 +177,6 @@ int SipServer::init_sip_server() { return 0; } -void SipServer::Start() { - m_event_loop_thread = new std::thread(event_loop_thread, this); -} - void SipServer::event_loop() { if(this->init_sip_server() !=0 ){ @@ -370,7 +369,7 @@ int SipServer::request_bye(eXosip_event_t* evtp) { return ret; } -int SipServer::RequestInvite_UDP(const DeviceInfo& device, int rtpPort) { +int SipServer::RequestInvite_UDP(const char* dst_channel, int rtpPort) { if (mClientMap.size() <= 0){ return -1; @@ -388,8 +387,6 @@ int SipServer::RequestInvite_UDP(const DeviceInfo& device, int rtpPort) { char sdp[2048] = { 0 }; char head[1024] = { 0 }; - const char* dst_channel = "34020000001310004065"; - sprintf(from, "sip:%s@%s:%d", mInfo.getSipId().c_str(), mInfo.getIp().c_str(), mInfo.getPort()); sprintf(to, "sip:%s@%s:%d", dst_channel, client->getIp().c_str(), client->getPort()); snprintf(sdp, 2048, @@ -430,7 +427,7 @@ int SipServer::RequestInvite_UDP(const DeviceInfo& device, int rtpPort) { return ret; } -int SipServer::RequestInvite_TCP_a(const DeviceInfo& device, int rtpPort) { +int SipServer::RequestInvite_TCP_a(const char* dst_channel, int rtpPort) { if (mClientMap.size() <= 0){ return -1; } @@ -447,7 +444,7 @@ int SipServer::RequestInvite_TCP_a(const DeviceInfo& device, int rtpPort) { char sdp[2048] = { 0 }; char head[1024] = { 0 }; - const char* dst_channel = "34020000001320000001"; + // const char* dst_channel = "34020000001320000001"; sprintf(from, "sip:%s@%s:%d", mInfo.getSipId().c_str(), mInfo.getIp().c_str(), mInfo.getPort()); sprintf(to, "sip:%s@%s:%d", dst_channel, client->getIp().c_str(), client->getPort()); diff --git a/src/decoder/gb28181/sip/SipServer.h b/src/decoder/gb28181/sip/SipServer.h index bad1c01..a0c6a8c 100644 --- a/src/decoder/gb28181/sip/SipServer.h +++ b/src/decoder/gb28181/sip/SipServer.h @@ -116,11 +116,19 @@ private: class SipServer { public: + static SipServer* getInstance(){ + static SipServer* singleton = nullptr; + if (singleton == nullptr){ + singleton = new SipServer(); + } + return singleton; + } + +public: SipServer(); ~SipServer(); -public: + bool Init(ServerInfo info); - void Start(); Client* GetClientByDevice(string device); void DeleteClientByDevice(string device); @@ -129,9 +137,9 @@ public: std::map GetClientMap(); - int RequestInvite_UDP(const DeviceInfo& device, int rtpPort); + int RequestInvite_UDP(const char* dst_channel, int rtpPort); - int RequestInvite_TCP_a(const DeviceInfo& device, int rtpPort); + int RequestInvite_TCP_a(const char* dst_channel, int rtpPort); void cacheCatalog(); -- libgit2 0.21.4