#include "RTPUdpReceiver.h" #include #include #include #include #include "../common_header.h" #include "../websocket/WebsocketClient.h" using namespace std; #define BUFFERSIZE_1024 4096 #define BUFFERSIZE_GAP 4096//5120 //1024*5 namespace { const int kVideoFrameSize = BUFFERSIZE_1024*BUFFERSIZE_1024*5*2; const int kRtpRecvBufferSize = BUFFERSIZE_1024*BUFFERSIZE_1024*2; const uint16_t kInvalidPort = 0; }; // namespace class UdpRTPSession : public RTPSession { public: UdpRTPSession() {} virtual ~UdpRTPSession() {} private: virtual void OnRTPPacket(RTPPacket* pack, const RTPTime& receiverTime, const RTPAddress* senderAddress) { AddDestination(*senderAddress); } virtual void OnRTCPCompoundPacket(RTCPCompoundPacket *pack, const RTPTime &receivetime,const RTPAddress *senderaddress) { //AddDestination(*senderaddress); //const char* name = "hi~"; //SendRTCPAPPPacket(0, (const uint8_t*)name, "keeplive", 8); //printf("send rtcp app"); } }; static int rtp_revc_thread_(void* param) { if (!param) { return -1; } RTPUdpReceiver* self = (RTPUdpReceiver*)param; return self->OnRtpRecv(); } static int connecting_thread_(void* param) { if (!param) { return -1; } RTPUdpReceiver* self = (RTPUdpReceiver*)param; return self->CheckConnecting(); } RTPUdpReceiver::RTPUdpReceiver() :m_bOpened(false) , m_idleCount(-1) ,m_noDataCount(-1) { m_bRtpExit = false; m_sessparamsPtr = new RTPSessionParams(); m_transparamsPtr = new RTPUDPv4TransmissionParams(); m_rtpSessionPtr = new UdpRTPSession(); } RTPUdpReceiver::~RTPUdpReceiver() { m_bRtpExit = true; if(nullptr != m_sessparamsPtr){ delete m_sessparamsPtr; m_sessparamsPtr = nullptr; } if(nullptr != m_transparamsPtr){ delete m_transparamsPtr; m_transparamsPtr = nullptr; } if (nullptr != m_connThreadPtr && m_connThreadPtr->joinable()) { m_connThreadPtr->join(); delete m_connThreadPtr; m_connThreadPtr = nullptr; } if(nullptr != m_rtpSessionPtr){ delete m_rtpSessionPtr; m_rtpSessionPtr = nullptr; } } bool RTPUdpReceiver::Open(string channel_id) { m_SipChannelId = channel_id; m_rtp_port = allocRtpPort(); if (m_rtp_port < 0) { return false; } m_sessparamsPtr->SetUsePollThread(true); m_sessparamsPtr->SetMinimumRTCPTransmissionInterval(10); m_sessparamsPtr->SetOwnTimestampUnit(1.0/90000.0); m_sessparamsPtr->SetAcceptOwnPackets(true); m_transparamsPtr->SetPortbase(m_rtp_port); m_transparamsPtr->SetRTPReceiveBuffer(kRtpRecvBufferSize); LOG_INFO("[{}] port: {}", m_SipChannelId, m_rtp_port); int err = m_rtpSessionPtr->Create(*m_sessparamsPtr, m_transparamsPtr); if (err != 0) { LOG_ERROR("[{}] Create error: {}", m_SipChannelId, err); return false; } m_rtpThreadPtr = new std::thread(rtp_revc_thread_, this); if (nullptr == m_rtpThreadPtr) { LOG_ERROR("[{}] Create m_rtpThreadPtr error", m_SipChannelId); return false; } if (InitPS() != 0) { return false; } // InitPS()成功就得起该线程,因为ClosePsThread是在这里完成的 m_connThreadPtr = new std::thread(connecting_thread_, this); if (nullptr == m_connThreadPtr) { LOG_ERROR("[{}] Create m_connThreadPtr error", m_SipChannelId); return false; } bool bReq = RequestStream(); if (!bReq) { LOG_INFO("[{}] RequestStream failed !", m_SipChannelId); Close(); return false; } m_bOpened = true; m_bNoData = false; LOG_INFO("[{}] Open ok", m_SipChannelId); return true; } bool RTPUdpReceiver::IsOpened() { return m_bOpened; } void RTPUdpReceiver::Close() { m_bRtpExit = true; } // 收RTP包线程 int RTPUdpReceiver::OnRtpRecv() { if(nullptr == m_rtpSessionPtr){ return -1; } m_bRecvExit = false; LOG_INFO("[{}] OnRtpRecv started.", m_SipChannelId); while (!m_bRecvExit) { m_rtpSessionPtr->Poll(); m_rtpSessionPtr->BeginDataAccess(); if (m_rtpSessionPtr->GotoFirstSourceWithData()) { // LOG_INFO("OnRtpRecv GotoFirstSourceWithData --{}", m_SipChannelId); last_recv_ts = UtilTools::get_cur_time_ms(); m_idleCount = 0; m_noDataCount = 0; do { RTPPacket* packet; while ((packet = m_rtpSessionPtr->GetNextPacket()) != NULL) { m_bNoData = false; // LOG_INFO("OnRtpRecv GetNextPacket --{}", m_SipChannelId); int ret = ParsePacket(packet); m_rtpSessionPtr->DeletePacket(packet); if(ret != 0){ m_bRecvExit = true; } } } while (m_rtpSessionPtr->GotoNextSourceWithData()); } m_rtpSessionPtr->EndDataAccess(); std::this_thread::sleep_for(std::chrono::milliseconds(10)); } LOG_INFO("[{}] OnRtpRecv exited.", m_SipChannelId); return 0; } bool RTPUdpReceiver::RequestStream() { WebsocketClient* pClient = WebsocketClient::getInstance(); if (pClient){ if (pClient->InviteUdp(m_SipChannelId, m_rtp_port, this) < 0) { return false; } } return true; } int RTPUdpReceiver::CheckConnecting() { LOG_INFO("[{}] CheckConnecting started.", m_SipChannelId); int count = 0; while (!m_bRtpExit) { if (m_bNoData) { // bool bReq = RequestStream(); // if (!bReq) { // LOG_INFO("[{}] RequestStream failed !", m_SipChannelId); // } wait_times(50); // 等待5s count++; if (count > 60) { // 3min 依然没数据过来,则关闭 m_bRtpExit = true; break; } } else { m_bNoData = true; wait_times(100); // 等待10s, 10s之内正常有数据的情况 m_bNoData 已经被置为false } } m_bRecvExit = true; // 结束整个任务 WebsocketClient* pClient = WebsocketClient::getInstance(); if (pClient){ pClient->ByeInvite(m_SipChannelId, m_rtp_port); } LOG_DEBUG("[{}] ByeInvite", m_SipChannelId); // rtp接收线程退出 if (nullptr != m_rtpThreadPtr && m_rtpThreadPtr->joinable()) { m_rtpThreadPtr->join(); delete m_rtpThreadPtr; m_rtpThreadPtr = nullptr; } m_rtpSessionPtr->Destroy(); ClosePsThread(); m_bOpened = false; LOG_INFO("[{}] CheckConnecting exited.", m_SipChannelId); return 0; } // 对退出命令敏感的延时 bool RTPUdpReceiver::wait_times(int times) { int count_sleep = times; while (!m_bRtpExit) { count_sleep-- ; if (count_sleep <= 0) { count_sleep = times; break; } std::this_thread::sleep_for(std::chrono::milliseconds(100)); } }