diff --git a/src/decoder/gb28181/rtp/RTPUdpReceiver.cpp b/src/decoder/gb28181/rtp/RTPUdpReceiver.cpp index f05f838..69931bf 100644 --- a/src/decoder/gb28181/rtp/RTPUdpReceiver.cpp +++ b/src/decoder/gb28181/rtp/RTPUdpReceiver.cpp @@ -62,7 +62,7 @@ static int connecting_thread_(void* param) } RTPUdpReceiver* self = (RTPUdpReceiver*)param; - return self->IsConnecting(); + return self->CheckConnecting(); } RTPUdpReceiver::RTPUdpReceiver() @@ -175,161 +175,37 @@ int RTPUdpReceiver::OnRtpRecv() return -1; } + m_bRecvExit = false; + LOG_INFO("[{}] OnRtpRecv started.", m_SipChannelId); - while (!m_bRtpExit) + while (!m_bRecvExit) { - //try - //{ - m_rtpSessionPtr->Poll(); - m_rtpSessionPtr->BeginDataAccess(); - - if (m_rtpSessionPtr->GotoFirstSourceWithData()) + m_rtpSessionPtr->Poll(); + m_rtpSessionPtr->BeginDataAccess(); + + if (m_rtpSessionPtr->GotoFirstSourceWithData()) + { + // LOG_INFO("OnRtpRecv GotoFirstSourceWithData --{}", m_SipChannelId); + last_recv_ts = UtilTools::get_cur_time_ms(); + m_idleCount = 0; + m_noDataCount = 0; + do { - // LOG_INFO("OnRtpRecv GotoFirstSourceWithData --{}", m_SipChannelId); - last_recv_ts = UtilTools::get_cur_time_ms(); - m_idleCount = 0; - m_noDataCount = 0; - do + RTPPacket* packet; + while ((packet = m_rtpSessionPtr->GetNextPacket()) != NULL) { - RTPPacket* packet; - while ((packet = m_rtpSessionPtr->GetNextPacket()) != NULL) - { - m_bNoData = false; - // LOG_INFO("OnRtpRecv GetNextPacket --{}", m_SipChannelId); - int ret = ParsePacket(packet); - m_rtpSessionPtr->DeletePacket(packet); - - if(ret != 0){ - m_bRtpExit = true; - } + m_bNoData = false; + // LOG_INFO("OnRtpRecv GetNextPacket --{}", m_SipChannelId); + int ret = ParsePacket(packet); + m_rtpSessionPtr->DeletePacket(packet); + + if(ret != 0){ + m_bRecvExit = true; } - } while (m_rtpSessionPtr->GotoNextSourceWithData()); - } - //else { - // if (m_idleCount != -1) - // { - // ++m_idleCount;//流中断计数 - // } - // if (m_noDataCount != 0) - // { - // --m_noDataCount;//没流计数 - // } - // //if (m_idleCount > 3000) { - // // m_hVodEndFunc(m_usrParam); - // // m_idleCount = 0; - // //历史流结束的时候,也会出现超时,这个是正常的 - // if(m_usrParam && ((VideoSession *)GetUsrParam())->video_type() == VideoType::ERECORD) - // { - // if (m_idleCount > 10000) - // { - // //这里要判断下历史流是否结束,如果未结束,就设置为流中断 - // //由于record_stream_status这个函数返回值不准确,所以增加一个进度条大于80% - // if(record_stream_status(((VideoSession *)GetUsrParam())->streamHandle())) - // { - // LOG_INFO("************Record stream is finished**{}**m_progress = {}********", m_SipChannelId, ((VideoSession *)GetUsrParam())->progress()); - // m_idleCount = -1; - // m_hVodEndFunc(m_usrParam); - // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); - // ((VideoSession *)GetUsrParam())->streamHandle().clear(); - // } - // else - // { - // //如果此时进度大于80% 算完成吧 - // if(((VideoSession *)GetUsrParam())->progress() > 0.80) - // { - // LOG_INFO("************Record stream is overtime**{}**m_progress = {}********", m_SipChannelId, ((VideoSession *)GetUsrParam())->progress()); - - // m_idleCount = 0; - // m_hVodEndFunc(m_usrParam); - // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); - // ((VideoSession *)GetUsrParam())->streamHandle().clear(); - // } - // else - // { - // m_idleCount = -1; - // //LOG_ERROR("************post ERROR_REALSTREAM_INTERRUPT to structure****{}********", m_SipChannelId); - // //发送流中断 - // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Record time video streaming interruption!"); - // } - // } - // - // - // } - // - // if (m_noDataCount < -200000)//任务开始时没收到流 - // { - // //LOG_ERROR("************m_hVodEndFunc(m_usrParam)!!!m_hVodEndFunc(m_usrParam)********{}******", m_SipChannelId); - // m_noDataCount = -1; - - // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Record time video streaming interruption2!"); - // //m_hVodEndFunc(m_usrParam); - // } - // } - // else//实时任务断流 - // //if (m_usrParam && ((VideoSession *)GetUsrParam())->video_type() == VideoType::EREAL) - // { - // - // //每超过3000次,发送一次send_vedio_eof 时长大约1.5s - // //若是30000,时长大约 18s - // if(m_idleCount > 30000) - // { - // uint64_t cts = UtilTools::get_cur_time_ms(); - // float duration_not_recv = (cts - last_recv_ts) / 1000.0; - // - // //LOG_ERROR("************I haven't got stream from hik gateway exceed {}s,send eof********{}******", duration_not_recv, m_SipChannelId); - // m_idleCount = -1; - - // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Real time video streaming interruption!"); - // } - // - // if (m_noDataCount < -200000)//任务开始时没收到流 - // { - // //LOG_ERROR("************m_noDataCount < -200000********{}******", m_SipChannelId); - // m_noDataCount = -1; - - // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Real time video streaming interruption2!"); - // } - // - // } - //} - //} - // catch (GeneralException2& e) - //{ - // //LOG_ERROR("---> video streaming interruption!<---{}, error: {}", m_SipChannelId, e.err_msg()); - - // byte_buffer bb(64); - // bb << VasCmd::VAS_CMD_REALSTREAM_INTERRUPT << e.err_msg(); - - // if (m_usrParam) - // { - // if (((VideoSession *)GetUsrParam())->msgChan()->is_valid()) { - // try { - // ((VideoSession *)GetUsrParam())->msgChan()->send_msg(bb.data_ptr(), bb.data_size()); - // } - // catch (GeneralException2& e) { - // //LOG_ERROR("[{}] send vas cmd VAS_CMD_REALSTREAM_INTERRUPT error: {}, {}", m_SipChannelId, e.err_code(), e.err_str()); - // } - // } - - // //通知网关关闭句柄 - // if(!((VideoSession *)GetUsrParam())->streamHandle().empty()) - // { - - // LOG_INFO("---->Notify hisense gateway release handle = {} !<----{}", ((VideoSession *)GetUsrParam())->streamHandle().c_str(), m_SipChannelId); - // if (((VideoSession *)GetUsrParam())->video_type() == VideoType::EREAL) - // real_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); - // - // if (((VideoSession *)GetUsrParam())->video_type() == VideoType::ERECORD) - // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); - // - // //清理保活的句柄 - // ((VideoSession *)GetUsrParam())->streamHandle().clear(); - // } - // } - // - // bb.bset(0); - // - //} + } + } while (m_rtpSessionPtr->GotoNextSourceWithData()); + } + m_rtpSessionPtr->EndDataAccess(); std::this_thread::sleep_for(std::chrono::milliseconds(10)); @@ -351,24 +227,24 @@ bool RTPUdpReceiver::RequestStream() { return true; } -int RTPUdpReceiver::IsConnecting() { - LOG_INFO("[{}] IsConnecting started.", m_SipChannelId); +int RTPUdpReceiver::CheckConnecting() { + LOG_INFO("[{}] CheckConnecting started.", m_SipChannelId); int count = 0; while (!m_bRtpExit) { if (m_bNoData) { - bool bReq = RequestStream(); - if (!bReq) { - LOG_INFO("[{}] RequestStream failed !", m_SipChannelId); - } + // bool bReq = RequestStream(); + // if (!bReq) { + // LOG_INFO("[{}] RequestStream failed !", m_SipChannelId); + // } - wait_times(30); // 等待3s + wait_times(50); // 等待5s count++; - if (count > 10) { - // 30s 依然没数据过来,则关闭 + if (count > 60) { + // 3min 依然没数据过来,则关闭 m_bRtpExit = true; break; } @@ -379,7 +255,7 @@ int RTPUdpReceiver::IsConnecting() { } } - LOG_DEBUG("[{}] while", m_SipChannelId); + m_bRecvExit = true; // 结束整个任务 WebsocketClient* pClient = WebsocketClient::getInstance(); @@ -397,17 +273,13 @@ int RTPUdpReceiver::IsConnecting() { m_rtpThreadPtr = nullptr; } - LOG_DEBUG("[{}] m_rtpThreadPtr", m_SipChannelId); - m_rtpSessionPtr->Destroy(); - LOG_DEBUG("[{}] m_rtpSessionPtr", m_SipChannelId); - ClosePsThread(); m_bOpened = false; - LOG_INFO("[{}] IsConnecting exited.", m_SipChannelId); + LOG_INFO("[{}] CheckConnecting exited.", m_SipChannelId); return 0; } diff --git a/src/decoder/gb28181/rtp/RTPUdpReceiver.h b/src/decoder/gb28181/rtp/RTPUdpReceiver.h index 835e466..37675ad 100644 --- a/src/decoder/gb28181/rtp/RTPUdpReceiver.h +++ b/src/decoder/gb28181/rtp/RTPUdpReceiver.h @@ -33,7 +33,7 @@ public: public: int OnRtpRecv(); - int IsConnecting(); + int CheckConnecting(); private: bool wait_times(int times); @@ -56,6 +56,7 @@ private: string m_sip_channel_id; std::atomic_bool m_bNoData{false}; + bool m_bRecvExit{false}; }; #endif // _RTP_UDP_RECEIVER_H_