From 341effc6e169f1997a905785a7cf469cc8ded08a Mon Sep 17 00:00:00 2001 From: Hu Chunming <2657262686@qq.com> Date: Mon, 5 Aug 2024 16:01:27 +0800 Subject: [PATCH] 等待时间优化 --- src/decoder/dvpp/DvppRtpDecoder.cpp | 25 ++++++++++++------------- src/decoder/dvpp/DvppRtpDecoder.h | 2 ++ src/decoder/gb28181/rtp2/RTPReceiver2.cpp | 20 +++++++++++++++----- 3 files changed, 29 insertions(+), 18 deletions(-) diff --git a/src/decoder/dvpp/DvppRtpDecoder.cpp b/src/decoder/dvpp/DvppRtpDecoder.cpp index f806b4e..9c7f6d8 100644 --- a/src/decoder/dvpp/DvppRtpDecoder.cpp +++ b/src/decoder/dvpp/DvppRtpDecoder.cpp @@ -382,8 +382,8 @@ int DvppRtpDecoder::ReadBuffer(uint8_t* buf, int buffsize) { } std::this_thread::sleep_for(std::chrono::milliseconds(10)); count++; - if (count >= 3000) { - // 只等待30s + if (count >= m_buffer_waiting_time) { + // 等待 return AVERROR(EIO); } } @@ -398,27 +398,22 @@ int DvppRtpDecoder::ReadBuffer(uint8_t* buf, int buffsize) { } bool DvppRtpDecoder::probe() { + + m_buffer_waiting_time = 3000; //probe只等待30s,避免卡主任务添加 // todo: 此处可能有泄露 unsigned char* avioBuff = (unsigned char*)av_malloc(MAX_RTP_BUFFER_SIZE); AVIOContext *ioCtx = avio_alloc_context(avioBuff, sizeof(avioBuff), 0, this, avio_read_packet, NULL, NULL); - //探测流(获取码流格式) - AVInputFormat* inputFmt = nullptr; - int ret = av_probe_input_buffer(ioCtx, &inputFmt, "", NULL, 0, 0); - if (ret < 0){ - LOG_ERROR("av_probe_input_buffer error: {}", ret); - return false; - } do{ fmt_ctx = avformat_alloc_context(); - fmt_ctx->probesize = 10000000;//5 000 000 - fmt_ctx->flags |= AVFMT_FLAG_NOBUFFER; + // fmt_ctx->probesize = 10000000;//5 000 000 + // fmt_ctx->flags |= AVFMT_FLAG_NOBUFFER; av_opt_set(fmt_ctx->priv_data,"preset","ultrafast",0); //AV_TIME_BASE = 1000 000 - fmt_ctx->max_analyze_duration = 90 * AV_TIME_BASE; + fmt_ctx->max_analyze_duration = 100 * AV_TIME_BASE; fmt_ctx->pb = ioCtx; @@ -428,9 +423,11 @@ bool DvppRtpDecoder::probe() { av_dict_set( &net_options, "bufsize", "655360", 0 ); av_dict_set( &net_options, "stimeout", "30000000", 0 ); // 单位为 百万分之一秒 av_dict_set( &net_options, "max_delay", "500000", 0); //设置最大时延 + av_dict_set( &net_options, "probesize", "50M", 0); //设置最大时延 + //打开流 - ret = avformat_open_input(&fmt_ctx, 0, inputFmt, &net_options); + int ret = avformat_open_input(&fmt_ctx, 0, 0, &net_options); if (ret != 0) { LOG_ERROR("avformat_open_input error: {}", ret); break; @@ -565,6 +562,8 @@ void DvppRtpDecoder::read_thread() { CHECK_AND_BREAK(aclvdecSetChannelDescOutPicFormat(vdecChannelDesc, PIXEL_FORMAT_YUV_SEMIPLANAR_420), "aclvdecSetChannelDescOutPicFormat failed"); CHECK_AND_BREAK(aclvdecCreateChannel(vdecChannelDesc), "aclvdecCreateChannel failed"); + m_buffer_waiting_time = 30000; //read最多等待5分钟没有数据 + unsigned long long frame_nb = 0; while (m_bRunning){ diff --git a/src/decoder/dvpp/DvppRtpDecoder.h b/src/decoder/dvpp/DvppRtpDecoder.h index 59d980b..063728f 100644 --- a/src/decoder/dvpp/DvppRtpDecoder.h +++ b/src/decoder/dvpp/DvppRtpDecoder.h @@ -158,5 +158,7 @@ private: void* m_finishParam; CallBack_DecodeFinished m_finish_cbk; // 录像流结束回调 + int m_buffer_waiting_time{3000} ; + }; #endif //__DVPP_RTP_DECODER_H__ \ No newline at end of file diff --git a/src/decoder/gb28181/rtp2/RTPReceiver2.cpp b/src/decoder/gb28181/rtp2/RTPReceiver2.cpp index d53b1c6..591d063 100644 --- a/src/decoder/gb28181/rtp2/RTPReceiver2.cpp +++ b/src/decoder/gb28181/rtp2/RTPReceiver2.cpp @@ -61,6 +61,8 @@ bool RTPReceiver2::Open(string channel_id, bool isUdp) { return false; } + m_bRtpExit = false; + bool bReq = start_server(channel_id, m_rtp_port, isUdp); if (!bReq) { LOG_INFO("[{}] start_server failed !", m_SipChannelId); @@ -91,7 +93,6 @@ void RTPReceiver2::Close(){ bool RTPReceiver2::start_server(string channel_id, int port, bool isUdp) { WebsocketClient* pClient = WebsocketClient::getInstance(); if (pClient){ - if (isUdp) { m_server_thread = new std::thread([](void* arg) { RTPReceiver2* a=(RTPReceiver2*)arg; @@ -125,7 +126,7 @@ int RTPReceiver2::udp_server() { LOG_INFO("udp {}",port); - int server_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); //AF_INET:IPV4;SOCK_DGRAM:UDP + int server_fd = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK, IPPROTO_UDP); //AF_INET:IPV4;SOCK_DGRAM:UDP if(server_fd < 0) { printf("create socket fail!\n"); @@ -146,19 +147,28 @@ int RTPReceiver2::udp_server() { uint8_t recvBuf[10000]; int recvBufSize; - socklen_t len; struct sockaddr_in clent_addr; //clent_addr用于记录发送方的地址信息 + long long last_time = get_cur_time_second(); while(!m_bRtpExit) { memset(recvBuf, 0, sizeof(recvBuf)); len = sizeof(clent_addr); recvBufSize = recvfrom(server_fd, recvBuf, sizeof(recvBuf), 0, (struct sockaddr*)&clent_addr, &len); //recvfrom是拥塞函数,没有数据就一直拥塞 if(recvBufSize <= 0) { - LOG_ERROR("recieve data fail!"); - break; + long long cur_time = get_cur_time_second(); + if (cur_time - last_time > 300) { + // 5分钟未能获取数据,则退出 + m_bRtpExit = true; + break; + } + + std::this_thread::sleep_for(std::chrono::milliseconds(2)); + continue; } + last_time = get_cur_time_second(); + // buffer 抛出 m_buffer_cbk(m_bufferParam, recvBuf, recvBufSize, 0); } -- libgit2 0.21.4