#include "RTPUdpReceiver.h" #include #include #include #include #include "common_header.h" using namespace std; #define BUFFERSIZE_1024 4096 #define BUFFERSIZE_GAP 4096//5120 //1024*5 namespace { const int kVideoFrameSize = BUFFERSIZE_1024*BUFFERSIZE_1024*5*2; const int kRtpRecvBufferSize = BUFFERSIZE_1024*BUFFERSIZE_1024*2; const uint16_t kInvalidPort = 0; }; // namespace class UdpRTPSession : public RTPSession { public: UdpRTPSession() {} virtual ~UdpRTPSession() {} private: virtual void OnRTPPacket(RTPPacket* pack, const RTPTime& receiverTime, const RTPAddress* senderAddress) { AddDestination(*senderAddress); } virtual void OnRTCPCompoundPacket(RTCPCompoundPacket *pack, const RTPTime &receivetime,const RTPAddress *senderaddress) { //AddDestination(*senderaddress); //const char* name = "hi~"; //SendRTCPAPPPacket(0, (const uint8_t*)name, "keeplive", 8); //printf("send rtcp app"); } }; static int rtp_revc_thread_(void* param) { if (!param) { return -1; } RTPUdpReceiver* self = (RTPUdpReceiver*)param; return self->OnRtpRecv(); } RTPUdpReceiver::RTPUdpReceiver() : m_bRtpExit(false) , m_bOpened(false) , m_idleCount(-1) ,m_noDataCount(-1) { m_sessparamsPtr = new RTPSessionParams(); m_transparamsPtr = new RTPUDPv4TransmissionParams(); m_rtpSessionPtr = new UdpRTPSession(); } RTPUdpReceiver::~RTPUdpReceiver() { if (IsOpened()) Close(); if(nullptr != m_sessparamsPtr){ delete m_sessparamsPtr; m_sessparamsPtr = nullptr; } if(nullptr != m_transparamsPtr){ delete m_transparamsPtr; m_transparamsPtr = nullptr; } if(nullptr != m_rtpSessionPtr){ delete m_rtpSessionPtr; m_rtpSessionPtr = nullptr; } } bool RTPUdpReceiver::Open(uint16_t localPort) { m_sessparamsPtr->SetUsePollThread(true); m_sessparamsPtr->SetMinimumRTCPTransmissionInterval(10); m_sessparamsPtr->SetOwnTimestampUnit(1.0/90000.0); m_sessparamsPtr->SetAcceptOwnPackets(true); m_transparamsPtr->SetPortbase(localPort); m_transparamsPtr->SetRTPReceiveBuffer(kRtpRecvBufferSize); LOG_INFO("[{}] port: {}", m_deviceID, localPort); int err = m_rtpSessionPtr->Create(*m_sessparamsPtr, m_transparamsPtr); if (err != 0) { LOG_ERROR("[{}] Create error: {}", m_deviceID, err); return false; } m_rtpThreadPtr = new std::thread(rtp_revc_thread_, this); if (nullptr == m_rtpThreadPtr) { LOG_ERROR("[{}] Create m_rtpThreadPtr error", m_deviceID); return false; } if (InitPS() != 0) { return false; } m_bOpened = true; LOG_INFO("[{}] Open ok", m_deviceID); return true; } bool RTPUdpReceiver::IsOpened() { return m_bOpened; } void RTPUdpReceiver::Close() { m_bRtpExit = true; // rtp接收线程退出 if (nullptr != m_rtpThreadPtr && m_rtpThreadPtr->joinable()) { m_rtpThreadPtr->join(); delete m_rtpThreadPtr; m_rtpThreadPtr = nullptr; } m_rtpSessionPtr->Destroy(); ClosePsThread(); m_bOpened = false; LOG_INFO("[{}] closed.", m_deviceID); } // 收RTP包线程 int RTPUdpReceiver::OnRtpRecv() { if(nullptr == m_rtpSessionPtr){ return -1; } LOG_INFO("[{}] OnRtpRecv started.", m_deviceID); while (!m_bRtpExit) { //try //{ m_rtpSessionPtr->Poll(); m_rtpSessionPtr->BeginDataAccess(); if (m_rtpSessionPtr->GotoFirstSourceWithData()) { LOG_INFO("OnRtpRecv GotoFirstSourceWithData --{}", m_deviceID); last_recv_ts = UtilTools::get_cur_time_ms(); m_idleCount = 0; m_noDataCount = 0; do { RTPPacket* packet; while ((packet = m_rtpSessionPtr->GetNextPacket()) != NULL) { LOG_INFO("OnRtpRecv GetNextPacket --{}", m_deviceID); int ret = ParsePacket(packet); m_rtpSessionPtr->DeletePacket(packet); if(ret != 0){ m_bRtpExit = true; } } } while (m_rtpSessionPtr->GotoNextSourceWithData()); } //else { // if (m_idleCount != 
            //else
            //{
            //    if (m_idleCount != -1)
            //    {
            //        ++m_idleCount; // stream-interruption counter
            //    }
            //    if (m_noDataCount != 0)
            //    {
            //        --m_noDataCount; // no-stream counter
            //    }
            //
            //    //if (m_idleCount > 3000) {
            //    //    m_hVodEndFunc(m_usrParam);
            //    //    m_idleCount = 0;
            //
            //    // A timeout also occurs when a recorded (history) stream ends; that is normal.
            //    if (m_usrParam && ((VideoSession *)GetUsrParam())->video_type() == VideoType::ERECORD)
            //    {
            //        if (m_idleCount > 10000)
            //        {
            //            // Check whether the recorded stream has really finished; if not, treat it
            //            // as a stream interruption. Because record_stream_status() is unreliable,
            //            // also accept the stream as finished when progress is above 80%.
            //            if (record_stream_status(((VideoSession *)GetUsrParam())->streamHandle()))
            //            {
            //                LOG_INFO("************Record stream is finished**{}**m_progress = {}********", m_deviceID, ((VideoSession *)GetUsrParam())->progress());
            //                m_idleCount = -1;
            //                m_hVodEndFunc(m_usrParam);
            //                record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle());
            //                ((VideoSession *)GetUsrParam())->streamHandle().clear();
            //            }
            //            else
            //            {
            //                // If progress is already above 80% at this point, treat it as finished.
            //                if (((VideoSession *)GetUsrParam())->progress() > 0.80)
            //                {
            //                    LOG_INFO("************Record stream is overtime**{}**m_progress = {}********", m_deviceID, ((VideoSession *)GetUsrParam())->progress());
            //                    m_idleCount = 0;
            //                    m_hVodEndFunc(m_usrParam);
            //                    record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle());
            //                    ((VideoSession *)GetUsrParam())->streamHandle().clear();
            //                }
            //                else
            //                {
            //                    m_idleCount = -1;
            //                    //LOG_ERROR("************post ERROR_REALSTREAM_INTERRUPT to structure****{}********", m_deviceID);
            //                    // Report the stream interruption.
            //                    //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Record time video streaming interruption!");
            //                }
            //            }
            //        }
            //
            //        if (m_noDataCount < -200000) // no stream received since the task started
            //        {
            //            //LOG_ERROR("************m_hVodEndFunc(m_usrParam)!!!m_hVodEndFunc(m_usrParam)********{}******", m_deviceID);
            //            m_noDataCount = -1;
            //            //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Record time video streaming interruption2!");
            //            //m_hVodEndFunc(m_usrParam);
            //        }
            //    }
            //    else // real-time task: stream interrupted
            //    //if (m_usrParam && ((VideoSession *)GetUsrParam())->video_type() == VideoType::EREAL)
            //    {
            //        // Every 3000 iterations (about 1.5 s) send_vedio_eof used to be sent once;
            //        // with 30000 iterations the interval is roughly 18 s.
            //        if (m_idleCount > 30000)
            //        {
            //            uint64_t cts = UtilTools::get_cur_time_ms();
            //            float duration_not_recv = (cts - last_recv_ts) / 1000.0;
            //
            //            //LOG_ERROR("************I haven't got stream from hik gateway exceed {}s,send eof********{}******", duration_not_recv, m_deviceID);
            //            m_idleCount = -1;
            //            //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Real time video streaming interruption!");
            //        }
            //
            //        if (m_noDataCount < -200000) // no stream received since the task started
            //        {
            //            //LOG_ERROR("************m_noDataCount < -200000********{}******", m_deviceID);
            //            m_noDataCount = -1;
            //            //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Real time video streaming interruption2!");
            //        }
            //    }
            //}
        //}
        //catch (GeneralException2& e)
        //{
        //    //LOG_ERROR("---> video streaming interruption!<---{}, error: {}", m_deviceID, e.err_msg());
        //    byte_buffer bb(64);
        //    bb << VasCmd::VAS_CMD_REALSTREAM_INTERRUPT << e.err_msg();
        //    if (m_usrParam)
        //    {
        //        if (((VideoSession *)GetUsrParam())->msgChan()->is_valid()) {
        //            try {
        //                ((VideoSession *)GetUsrParam())->msgChan()->send_msg(bb.data_ptr(), bb.data_size());
        //            }
        //            catch (GeneralException2& e) {
        //                //LOG_ERROR("[{}] send vas cmd VAS_CMD_REALSTREAM_INTERRUPT error: {}, {}", m_deviceID, e.err_code(), e.err_str());
        //            }
        //        }
        //        // Notify the gateway to release the handle.
        //        if (!((VideoSession *)GetUsrParam())->streamHandle().empty())
        //        {
        //            LOG_INFO("---->Notify hisense gateway release handle = {} !<----{}", ((VideoSession *)GetUsrParam())->streamHandle().c_str(), m_deviceID);
        //            if (((VideoSession *)GetUsrParam())->video_type() == VideoType::EREAL)
        //                real_stream_stop(((VideoSession *)GetUsrParam())->streamHandle());
        //
        //            if (((VideoSession *)GetUsrParam())->video_type() == VideoType::ERECORD)
        //                record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle());
        //
        //            // Clear the keep-alive handle.
        //            ((VideoSession *)GetUsrParam())->streamHandle().clear();
        //        }
        //    }
        //
        //    bb.bset(0);
        //}

        m_rtpSessionPtr->EndDataAccess();
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }

    LOG_INFO("[{}] OnRtpRecv exited.", m_deviceID);
    return 0;
}
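
/*
 * Usage sketch (illustrative only, not part of this translation unit's logic):
 * how a caller is expected to drive RTPUdpReceiver through the interface
 * implemented above. The port number is an arbitrary example value.
 *
 *     RTPUdpReceiver receiver;
 *     if (receiver.Open(30000)) {
 *         // RTP packets arrive on the internal thread and are handed to ParsePacket();
 *         // keep the receiver alive for as long as the stream is needed.
 *         // ...
 *         receiver.Close();  // joins the receive thread and destroys the RTP session
 *     }
 */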