Commit 341effc6e169f1997a905785a7cf469cc8ded08a

Authored by Hu Chunming
1 parent 5f4935b7

等待时间优化

src/decoder/dvpp/DvppRtpDecoder.cpp
@@ -382,8 +382,8 @@ int DvppRtpDecoder::ReadBuffer(uint8_t* buf, int buffsize) { @@ -382,8 +382,8 @@ int DvppRtpDecoder::ReadBuffer(uint8_t* buf, int buffsize) {
382 } 382 }
383 std::this_thread::sleep_for(std::chrono::milliseconds(10)); 383 std::this_thread::sleep_for(std::chrono::milliseconds(10));
384 count++; 384 count++;
385 - if (count >= 3000) {  
386 - // 只等待30s 385 + if (count >= m_buffer_waiting_time) {
  386 + // 等待
387 return AVERROR(EIO); 387 return AVERROR(EIO);
388 } 388 }
389 } 389 }
@@ -398,27 +398,22 @@ int DvppRtpDecoder::ReadBuffer(uint8_t* buf, int buffsize) { @@ -398,27 +398,22 @@ int DvppRtpDecoder::ReadBuffer(uint8_t* buf, int buffsize) {
398 } 398 }
399 399
400 bool DvppRtpDecoder::probe() { 400 bool DvppRtpDecoder::probe() {
  401 +
  402 + m_buffer_waiting_time = 3000; //probe只等待30s,避免卡主任务添加
401 403
402 // todo: 此处可能有泄露 404 // todo: 此处可能有泄露
403 unsigned char* avioBuff = (unsigned char*)av_malloc(MAX_RTP_BUFFER_SIZE); 405 unsigned char* avioBuff = (unsigned char*)av_malloc(MAX_RTP_BUFFER_SIZE);
404 AVIOContext *ioCtx = avio_alloc_context(avioBuff, sizeof(avioBuff), 0, this, avio_read_packet, NULL, NULL); 406 AVIOContext *ioCtx = avio_alloc_context(avioBuff, sizeof(avioBuff), 0, this, avio_read_packet, NULL, NULL);
405 - //探测流(获取码流格式)  
406 - AVInputFormat* inputFmt = nullptr;  
407 - int ret = av_probe_input_buffer(ioCtx, &inputFmt, "", NULL, 0, 0);  
408 - if (ret < 0){  
409 - LOG_ERROR("av_probe_input_buffer error: {}", ret);  
410 - return false;  
411 - }  
412 407
413 do{ 408 do{
414 fmt_ctx = avformat_alloc_context(); 409 fmt_ctx = avformat_alloc_context();
415 410
416 - fmt_ctx->probesize = 10000000;//5 000 000  
417 - fmt_ctx->flags |= AVFMT_FLAG_NOBUFFER; 411 + // fmt_ctx->probesize = 10000000;//5 000 000
  412 + // fmt_ctx->flags |= AVFMT_FLAG_NOBUFFER;
418 av_opt_set(fmt_ctx->priv_data,"preset","ultrafast",0); 413 av_opt_set(fmt_ctx->priv_data,"preset","ultrafast",0);
419 414
420 //AV_TIME_BASE = 1000 000 415 //AV_TIME_BASE = 1000 000
421 - fmt_ctx->max_analyze_duration = 90 * AV_TIME_BASE; 416 + fmt_ctx->max_analyze_duration = 100 * AV_TIME_BASE;
422 417
423 fmt_ctx->pb = ioCtx; 418 fmt_ctx->pb = ioCtx;
424 419
@@ -428,9 +423,11 @@ bool DvppRtpDecoder::probe() { @@ -428,9 +423,11 @@ bool DvppRtpDecoder::probe() {
428 av_dict_set( &net_options, "bufsize", "655360", 0 ); 423 av_dict_set( &net_options, "bufsize", "655360", 0 );
429 av_dict_set( &net_options, "stimeout", "30000000", 0 ); // 单位为 百万分之一秒 424 av_dict_set( &net_options, "stimeout", "30000000", 0 ); // 单位为 百万分之一秒
430 av_dict_set( &net_options, "max_delay", "500000", 0); //设置最大时延 425 av_dict_set( &net_options, "max_delay", "500000", 0); //设置最大时延
  426 + av_dict_set( &net_options, "probesize", "50M", 0); //设置最大时延
  427 +
431 428
432 //打开流 429 //打开流
433 - ret = avformat_open_input(&fmt_ctx, 0, inputFmt, &net_options); 430 + int ret = avformat_open_input(&fmt_ctx, 0, 0, &net_options);
434 if (ret != 0) { 431 if (ret != 0) {
435 LOG_ERROR("avformat_open_input error: {}", ret); 432 LOG_ERROR("avformat_open_input error: {}", ret);
436 break; 433 break;
@@ -565,6 +562,8 @@ void DvppRtpDecoder::read_thread() { @@ -565,6 +562,8 @@ void DvppRtpDecoder::read_thread() {
565 CHECK_AND_BREAK(aclvdecSetChannelDescOutPicFormat(vdecChannelDesc, PIXEL_FORMAT_YUV_SEMIPLANAR_420), "aclvdecSetChannelDescOutPicFormat failed"); 562 CHECK_AND_BREAK(aclvdecSetChannelDescOutPicFormat(vdecChannelDesc, PIXEL_FORMAT_YUV_SEMIPLANAR_420), "aclvdecSetChannelDescOutPicFormat failed");
566 CHECK_AND_BREAK(aclvdecCreateChannel(vdecChannelDesc), "aclvdecCreateChannel failed"); 563 CHECK_AND_BREAK(aclvdecCreateChannel(vdecChannelDesc), "aclvdecCreateChannel failed");
567 564
  565 + m_buffer_waiting_time = 30000; //read最多等待5分钟没有数据
  566 +
568 unsigned long long frame_nb = 0; 567 unsigned long long frame_nb = 0;
569 while (m_bRunning){ 568 while (m_bRunning){
570 569
src/decoder/dvpp/DvppRtpDecoder.h
@@ -158,5 +158,7 @@ private: @@ -158,5 +158,7 @@ private:
158 void* m_finishParam; 158 void* m_finishParam;
159 CallBack_DecodeFinished m_finish_cbk; // 录像流结束回调 159 CallBack_DecodeFinished m_finish_cbk; // 录像流结束回调
160 160
  161 + int m_buffer_waiting_time{3000} ;
  162 +
161 }; 163 };
162 #endif //__DVPP_RTP_DECODER_H__ 164 #endif //__DVPP_RTP_DECODER_H__
163 \ No newline at end of file 165 \ No newline at end of file
src/decoder/gb28181/rtp2/RTPReceiver2.cpp
@@ -61,6 +61,8 @@ bool RTPReceiver2::Open(string channel_id, bool isUdp) { @@ -61,6 +61,8 @@ bool RTPReceiver2::Open(string channel_id, bool isUdp) {
61 return false; 61 return false;
62 } 62 }
63 63
  64 + m_bRtpExit = false;
  65 +
64 bool bReq = start_server(channel_id, m_rtp_port, isUdp); 66 bool bReq = start_server(channel_id, m_rtp_port, isUdp);
65 if (!bReq) { 67 if (!bReq) {
66 LOG_INFO("[{}] start_server failed !", m_SipChannelId); 68 LOG_INFO("[{}] start_server failed !", m_SipChannelId);
@@ -91,7 +93,6 @@ void RTPReceiver2::Close(){ @@ -91,7 +93,6 @@ void RTPReceiver2::Close(){
91 bool RTPReceiver2::start_server(string channel_id, int port, bool isUdp) { 93 bool RTPReceiver2::start_server(string channel_id, int port, bool isUdp) {
92 WebsocketClient* pClient = WebsocketClient::getInstance(); 94 WebsocketClient* pClient = WebsocketClient::getInstance();
93 if (pClient){ 95 if (pClient){
94 -  
95 if (isUdp) { 96 if (isUdp) {
96 m_server_thread = new std::thread([](void* arg) { 97 m_server_thread = new std::thread([](void* arg) {
97 RTPReceiver2* a=(RTPReceiver2*)arg; 98 RTPReceiver2* a=(RTPReceiver2*)arg;
@@ -125,7 +126,7 @@ int RTPReceiver2::udp_server() { @@ -125,7 +126,7 @@ int RTPReceiver2::udp_server() {
125 126
126 LOG_INFO("udp {}",port); 127 LOG_INFO("udp {}",port);
127 128
128 - int server_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); //AF_INET:IPV4;SOCK_DGRAM:UDP 129 + int server_fd = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK, IPPROTO_UDP); //AF_INET:IPV4;SOCK_DGRAM:UDP
129 if(server_fd < 0) 130 if(server_fd < 0)
130 { 131 {
131 printf("create socket fail!\n"); 132 printf("create socket fail!\n");
@@ -146,19 +147,28 @@ int RTPReceiver2::udp_server() { @@ -146,19 +147,28 @@ int RTPReceiver2::udp_server() {
146 147
147 uint8_t recvBuf[10000]; 148 uint8_t recvBuf[10000];
148 int recvBufSize; 149 int recvBufSize;
149 -  
150 socklen_t len; 150 socklen_t len;
151 struct sockaddr_in clent_addr; //clent_addr用于记录发送方的地址信息 151 struct sockaddr_in clent_addr; //clent_addr用于记录发送方的地址信息
  152 + long long last_time = get_cur_time_second();
152 while(!m_bRtpExit) 153 while(!m_bRtpExit)
153 { 154 {
154 memset(recvBuf, 0, sizeof(recvBuf)); 155 memset(recvBuf, 0, sizeof(recvBuf));
155 len = sizeof(clent_addr); 156 len = sizeof(clent_addr);
156 recvBufSize = recvfrom(server_fd, recvBuf, sizeof(recvBuf), 0, (struct sockaddr*)&clent_addr, &len); //recvfrom是拥塞函数,没有数据就一直拥塞 157 recvBufSize = recvfrom(server_fd, recvBuf, sizeof(recvBuf), 0, (struct sockaddr*)&clent_addr, &len); //recvfrom是拥塞函数,没有数据就一直拥塞
157 if(recvBufSize <= 0) { 158 if(recvBufSize <= 0) {
158 - LOG_ERROR("recieve data fail!");  
159 - break; 159 + long long cur_time = get_cur_time_second();
  160 + if (cur_time - last_time > 300) {
  161 + // 5分钟未能获取数据,则退出
  162 + m_bRtpExit = true;
  163 + break;
  164 + }
  165 +
  166 + std::this_thread::sleep_for(std::chrono::milliseconds(2));
  167 + continue;
160 } 168 }
161 169
  170 + last_time = get_cur_time_second();
  171 +
162 // buffer 抛出 172 // buffer 抛出
163 m_buffer_cbk(m_bufferParam, recvBuf, recvBufSize, 0); 173 m_buffer_cbk(m_bufferParam, recvBuf, recvBufSize, 0);
164 } 174 }