diff --git a/src/decoder/dvpp/DvppDecoder.cpp b/src/decoder/dvpp/DvppDecoder.cpp index 6cf41d4..4d76ee6 100644 --- a/src/decoder/dvpp/DvppDecoder.cpp +++ b/src/decoder/dvpp/DvppDecoder.cpp @@ -174,7 +174,7 @@ AVCodecContext* DvppDecoder::init_FFmpeg(FFDecConfig config){ } // 查找视频流信息 - const AVCodec *decoder = nullptr; + AVCodec *decoder = nullptr; video_index = av_find_best_stream(fmt_ctx, AVMEDIA_TYPE_VIDEO, -1, -1, &decoder, 0); if (video_index < 0) { LOG_ERROR("[{}]- Cannot find a video stream in the input file!", m_dec_name); diff --git a/src/decoder/dvpp/DvppRtpDecoder.cpp b/src/decoder/dvpp/DvppRtpDecoder.cpp index 842a502..e786192 100644 --- a/src/decoder/dvpp/DvppRtpDecoder.cpp +++ b/src/decoder/dvpp/DvppRtpDecoder.cpp @@ -224,13 +224,13 @@ bool DvppRtpDecoder::isSurport(FFDecConfig& cfg){ } bool DvppRtpDecoder::start(){ + + m_bRunning = true; if(!probe()) { return false; } - m_bRunning = true; - m_read_thread = new std::thread([](void* arg) { DvppRtpDecoder* a=(DvppRtpDecoder*)arg; @@ -362,62 +362,82 @@ void DvppRtpDecoder::release_ffmpeg() { void DvppRtpDecoder::CacheBuffer(uint8_t* recvBuf, int recvBufSize) { if ((m_bufferSize + recvBufSize - RTP_HEADER_SIZE) < MAX_RTP_BUFFER_SIZE) { - memcpy(m_buffer + m_bufferSize, recvBuf + RTP_HEADER_SIZE, recvBufSize - RTP_HEADER_SIZE); - m_bufferSize += recvBufSize - RTP_HEADER_SIZE; - } else { + memcpy(m_buffer + m_bufferSize, recvBuf + RTP_HEADER_SIZE, recvBufSize - RTP_HEADER_SIZE); + m_bufferSize += recvBufSize - RTP_HEADER_SIZE; + } else { LOG_WARN("recvBufSize = {} over MAX_RTP_BUFFER_SIZE ", recvBufSize); } } int DvppRtpDecoder::ReadBuffer(uint8_t* buf, int buffsize) { int ret = 0; + int count = 0; + while(m_bRunning && (m_bufferSize < 4096 || m_bufferSize < buffsize)){ + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + count++; + if (count >= 1000) { + // 只等待10s + return -1; + } + } + if (m_bufferSize >= buffsize) { memcpy(buf, m_buffer, buffsize); m_bufferSize = m_bufferSize - buffsize; memmove(m_buffer, m_buffer + buffsize, m_bufferSize); ret = buffsize; - - LOG_DEBUG("avio_read_packet={}", buffsize); } + printf("m_bufferSize=%d buffsize=%d\n", m_bufferSize.load(), buffsize); + return ret; } bool DvppRtpDecoder::probe() { + // todo: 此处可能有泄露 - unsigned char* avioBuff = (unsigned char*)av_malloc(7680 * 4320); - ioCtx = avio_alloc_context(avioBuff, sizeof(avioBuff), 0, this, avio_read_packet, NULL, NULL); + unsigned char* avioBuff = (unsigned char*)av_malloc(MAX_RTP_BUFFER_SIZE); + AVIOContext *ioCtx = avio_alloc_context(avioBuff, sizeof(avioBuff), 0, this, avio_read_packet, NULL, NULL); //探测流(获取码流格式) - const AVInputFormat* inputFmt; - int ret = av_probe_input_buffer2(ioCtx, &inputFmt, "", NULL, 0, 0); + AVInputFormat* inputFmt = nullptr; + int ret = av_probe_input_buffer(ioCtx, &inputFmt, "", NULL, 0, 0); if (ret < 0){ - LOG_ERROR("av_probe_input_buffer2 error: {}", ret); + LOG_ERROR("av_probe_input_buffer error: {}", ret); return false; } do{ fmt_ctx = avformat_alloc_context(); + + fmt_ctx->probesize = 10000000;//5 000 000 + fmt_ctx->flags |= AVFMT_FLAG_NOBUFFER; + av_opt_set(fmt_ctx->priv_data,"preset","ultrafast",0); + + //AV_TIME_BASE = 1000 000 + fmt_ctx->max_analyze_duration = 90 * AV_TIME_BASE; + fmt_ctx->pb = ioCtx; AVDictionary* net_options{nullptr};//网络连接参数 - - //配置流参数 - //av_dict_set(&net_options, "fflags", "nobuffer", 0); //不缓存直接解码 + // av_dict_set(&net_options, "fflags", "nobuffer", 0); //不缓存直接解码 + av_dict_set( &net_options, "bufsize", "655360", 0 ); + av_dict_set( &net_options, "stimeout", "30000000", 0 ); // 单位为 百万分之一秒 + av_dict_set( &net_options, "max_delay", "500000", 0); //设置最大时延 //打开流 - ret = avformat_open_input(&fmt_ctx, "", inputFmt, &net_options); - if (ret != 0) - { + ret = avformat_open_input(&fmt_ctx, 0, 0, &net_options); + if (ret != 0) { LOG_ERROR("avformat_open_input error: {}", ret); break; } + //获取流信息 - if (avformat_find_stream_info(fmt_ctx, NULL) < 0)//? - { + if (avformat_find_stream_info(fmt_ctx, NULL) < 0) { LOG_ERROR("avformat_find_stream_info error"); break; } + //获取视频流 mVideoIndex = av_find_best_stream(fmt_ctx, AVMEDIA_TYPE_VIDEO, -1, -1, NULL, 0); if (mVideoIndex < 0) @@ -550,8 +570,8 @@ void DvppRtpDecoder::read_thread() { if (result == AVERROR_EOF || result < 0){ av_packet_free(&pkt); pkt = nullptr; - LOG_WARN("[{}]- Failed to read frame!", m_dec_name); - break; + // LOG_WARN("[{}]- Failed to read frame!", m_dec_name); + continue; } if (m_DvppCacheCounter.load() > m_cache_gop){ diff --git a/src/decoder/dvpp/DvppRtpDecoder.h b/src/decoder/dvpp/DvppRtpDecoder.h index 8473c56..6a8cc2c 100644 --- a/src/decoder/dvpp/DvppRtpDecoder.h +++ b/src/decoder/dvpp/DvppRtpDecoder.h @@ -106,7 +106,6 @@ private: // 读取数据 AVFormatContext *fmt_ctx{nullptr}; - AVIOContext * ioCtx{nullptr}; int mVideoIndex {-1}; AVPixelFormat pix_fmt; AVCodecContext *avctx{nullptr}; diff --git a/src/decoder/gb28181/rtp2/RTPReceiver2.cpp b/src/decoder/gb28181/rtp2/RTPReceiver2.cpp index 5134e58..84565b5 100644 --- a/src/decoder/gb28181/rtp2/RTPReceiver2.cpp +++ b/src/decoder/gb28181/rtp2/RTPReceiver2.cpp @@ -184,7 +184,7 @@ int RTPReceiver2::tcp_server() { char buff[4096]; int n; - if( (listenfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1 ){ + if( (listenfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP)) == -1 ){ printf("create socket error: %s(errno: %d)\n",strerror(errno),errno); return 0; } @@ -204,23 +204,30 @@ int RTPReceiver2::tcp_server() { return 0; } - char recvBuf[10000]; + uint8_t recvBuf[10000]; int recvBufSize = 0; + bool bFilter = false; + while (!m_bRtpExit) { - LOG_INFO("阻塞监听新连接..."); + // LOG_INFO("阻塞监听新连接..."); // 阻塞接收请求 start socklen_t len = sizeof(sockaddr); sockaddr_in accept_addr; int clientFd = accept(listenfd, (struct sockaddr*)&accept_addr, &len); if (clientFd < 0) { - LOG_WARN("accept connection error"); + if (!bFilter) { + LOG_WARN("accept connection warn"); + bFilter = true; + } + std::this_thread::sleep_for(std::chrono::milliseconds(5)); continue; } // 阻塞接收请求 end LOG_INFO("发现新连接:clientFd={}", clientFd); + bFilter = false; while (!m_bRtpExit) { recvBufSize = recv(clientFd, recvBuf, sizeof(recvBuf), 0); @@ -244,7 +251,7 @@ int RTPReceiver2::tcp_server() { return 0; } -void RTPReceiver2::parseTcpData(char* recvBuf, int recvBufSize) { +void RTPReceiver2::parseTcpData(uint8_t* recvBuf, int recvBufSize) { if ((mRecvCacheSize + recvBufSize) > Server_cache_max_size) { LOG_ERROR("超过缓冲容量上限,忽略本次读取的数据。mRecvCacheSize=%d,recvBufSize=%d",mRecvCacheSize, recvBufSize); @@ -277,10 +284,10 @@ void RTPReceiver2::parseTcpData(char* recvBuf, int recvBufSize) { struct RtpHeader rtpHeader; parseRtpHeader(mRecvRtpBuffer, &rtpHeader); - printf("get a rtp seq=%d,RtpBufferSize=%d,mRecvCacheSize=%d,marker=%d,timestamp=%d\n", - rtpHeader.seq, - mRecvRtpBufferSize, - mRecvCacheSize,rtpHeader.marker, rtpHeader.timestamp); + // printf("get a rtp seq=%d,RtpBufferSize=%d,mRecvCacheSize=%d,marker=%d,timestamp=%d\n", + // rtpHeader.seq, + // mRecvRtpBufferSize, + // mRecvCacheSize,rtpHeader.marker, rtpHeader.timestamp); // buffer 抛出 m_buffer_cbk(m_bufferParam, mRecvRtpBuffer, mRecvRtpBufferSize, rtpHeader.timestamp); diff --git a/src/decoder/gb28181/rtp2/RTPReceiver2.h b/src/decoder/gb28181/rtp2/RTPReceiver2.h index eecc410..1124ebf 100644 --- a/src/decoder/gb28181/rtp2/RTPReceiver2.h +++ b/src/decoder/gb28181/rtp2/RTPReceiver2.h @@ -36,7 +36,7 @@ public: private: bool start_server(string channel_id, int port, bool isUdp); - void parseTcpData(char* recvBuf, int recvBufSize); + void parseTcpData(uint8_t* recvBuf, int recvBufSize); public: uint8_t* mRecvCache {nullptr};