RTPUdpReceiver.cpp 6.19 KB

#include "RTPUdpReceiver.h"
#include <iostream>
#include <time.h>

#include <thread>
#include <chrono>

#include "../common_header.h"
#include "../websocket/WebsocketClient.h"


using namespace std;

#define BUFFERSIZE_1024     4096
#define BUFFERSIZE_GAP      4096//5120 //1024*5

namespace 
{
	const int kVideoFrameSize         = BUFFERSIZE_1024*BUFFERSIZE_1024*5*2;
	const int kRtpRecvBufferSize      = BUFFERSIZE_1024*BUFFERSIZE_1024*2;
	const uint16_t kInvalidPort       = 0;
}; // namespace

class UdpRTPSession : public RTPSession
{
public:
	UdpRTPSession() {}
	virtual ~UdpRTPSession() {}

private:
	virtual void OnRTPPacket(RTPPacket* pack, const RTPTime& receiverTime, const RTPAddress* senderAddress)
	{
		AddDestination(*senderAddress);
	}

	virtual void OnRTCPCompoundPacket(RTCPCompoundPacket *pack, const RTPTime &receivetime,const RTPAddress *senderaddress)
	{
		//AddDestination(*senderaddress);
		//const char* name = "hi~";
		//SendRTCPAPPPacket(0, (const uint8_t*)name, "keeplive", 8);

		//printf("send rtcp app");
	}
};

static int rtp_revc_thread_(void* param)
{
	if (!param)
	{
		return -1;
	}

	RTPUdpReceiver* self = (RTPUdpReceiver*)param;
	return self->OnRtpRecv();
}

static int connecting_thread_(void* param)
{
	if (!param) {
		return -1;
	}

	RTPUdpReceiver* self = (RTPUdpReceiver*)param;
	return self->CheckConnecting();
}

RTPUdpReceiver::RTPUdpReceiver() 
:m_bOpened(false)
, m_idleCount(-1)
,m_noDataCount(-1)
{
	m_bRtpExit = false;
	m_sessparamsPtr = new RTPSessionParams();
	m_transparamsPtr = new RTPUDPv4TransmissionParams();
	m_rtpSessionPtr = new UdpRTPSession();
}

RTPUdpReceiver::~RTPUdpReceiver()
{
	m_bRtpExit = true;

	if(nullptr != m_sessparamsPtr){
		delete m_sessparamsPtr;
		m_sessparamsPtr = nullptr;
	}

	if(nullptr != m_transparamsPtr){
		delete m_transparamsPtr;
		m_transparamsPtr = nullptr;
	}

	if (nullptr != m_connThreadPtr && m_connThreadPtr->joinable()) {
        m_connThreadPtr->join();
		delete m_connThreadPtr;
		m_connThreadPtr = nullptr;
    }

	if(nullptr != m_rtpSessionPtr){
		delete m_rtpSessionPtr;
		m_rtpSessionPtr = nullptr;
	}
}

bool RTPUdpReceiver::Open(string channel_id)
{
	m_SipChannelId = channel_id;

	m_rtp_port = allocRtpPort();
	if (m_rtp_port < 0) {
		return false;
	}

	m_sessparamsPtr->SetUsePollThread(true);
	m_sessparamsPtr->SetMinimumRTCPTransmissionInterval(10);
	m_sessparamsPtr->SetOwnTimestampUnit(1.0/90000.0);
	m_sessparamsPtr->SetAcceptOwnPackets(true);
	
	m_transparamsPtr->SetPortbase(m_rtp_port);
	m_transparamsPtr->SetRTPReceiveBuffer(kRtpRecvBufferSize);

    LOG_INFO("[{}] port: {}", m_SipChannelId, m_rtp_port);
	
	int err = m_rtpSessionPtr->Create(*m_sessparamsPtr, m_transparamsPtr);
	if (err != 0) {	
		LOG_ERROR("[{}] Create error: {}", m_SipChannelId, err);
		return false;
	}

	m_rtpThreadPtr = new std::thread(rtp_revc_thread_, this);
	if (nullptr == m_rtpThreadPtr) {
		LOG_ERROR("[{}] Create m_rtpThreadPtr error", m_SipChannelId);
		return false;
	}

	if (InitPS() != 0) {
		return false;
	}

	// InitPS()成功就得起该线程,因为ClosePsThread是在这里完成的
	m_connThreadPtr = new std::thread(connecting_thread_, this);
	if (nullptr == m_connThreadPtr) {
		LOG_ERROR("[{}] Create m_connThreadPtr error", m_SipChannelId);
		return false;
	}

	bool bReq = RequestStream();
	if (!bReq) {
		LOG_INFO("[{}] RequestStream failed !", m_SipChannelId);
		Close();
		return false;
	}

	m_bOpened = true;
	m_bNoData = false;
    LOG_INFO("[{}] Open ok", m_SipChannelId);

	return true;
}

bool RTPUdpReceiver::IsOpened()
{
	return m_bOpened;
}

void RTPUdpReceiver::Close()
{
    m_bRtpExit = true;
}

// 收RTP包线程
int RTPUdpReceiver::OnRtpRecv()
{
	if(nullptr == m_rtpSessionPtr){
		return -1;
	}

	m_bRecvExit = false;

	LOG_INFO("[{}] OnRtpRecv started.", m_SipChannelId);
	while (!m_bRecvExit)
	{
		m_rtpSessionPtr->Poll();
		m_rtpSessionPtr->BeginDataAccess();
		
		if (m_rtpSessionPtr->GotoFirstSourceWithData())
		{
			// LOG_INFO("OnRtpRecv GotoFirstSourceWithData --{}", m_SipChannelId);
			last_recv_ts = UtilTools::get_cur_time_ms();
			m_idleCount = 0;
			m_noDataCount = 0;
			do
			{
				RTPPacket* packet;
				while ((packet = m_rtpSessionPtr->GetNextPacket()) != NULL) 
				{
					m_bNoData = false;
					// LOG_INFO("OnRtpRecv GetNextPacket --{}", m_SipChannelId);
					int ret = ParsePacket(packet);
					m_rtpSessionPtr->DeletePacket(packet);
					
					if(ret != 0){
						m_bRecvExit = true;
					}
				}
			} while (m_rtpSessionPtr->GotoNextSourceWithData());
		}

		m_rtpSessionPtr->EndDataAccess();

		std::this_thread::sleep_for(std::chrono::milliseconds(10));
	}

	LOG_INFO("[{}] OnRtpRecv exited.", m_SipChannelId);

	return 0;
}

bool RTPUdpReceiver::RequestStream() {
	WebsocketClient* pClient = WebsocketClient::getInstance();
	if (pClient){
		if (pClient->InviteUdp(m_SipChannelId, m_rtp_port, this) < 0) {
			return false;
		}
	}

	return true;
}

int RTPUdpReceiver::CheckConnecting() {
	LOG_INFO("[{}] CheckConnecting started.", m_SipChannelId);

	int count = 0;
	while (!m_bRtpExit)
	{
		if (m_bNoData) {
			// bool bReq = RequestStream();
			// if (!bReq) {
			// 	LOG_INFO("[{}] RequestStream failed !", m_SipChannelId);
			// }

			wait_times(50); // 等待5s
			
			count++;

			if (count > 60) {
				// 3min 依然没数据过来,则关闭
				m_bRtpExit = true;
				break;
			}
		} else {
			m_bNoData = true;

			wait_times(100);	// 等待10s, 10s之内正常有数据的情况 m_bNoData 已经被置为false
		}
	}

	m_bRecvExit = true;

	// 结束整个任务
	WebsocketClient* pClient = WebsocketClient::getInstance();
	if (pClient){
		pClient->ByeInvite(m_SipChannelId, m_rtp_port);
	}

	LOG_DEBUG("[{}] ByeInvite", m_SipChannelId);
    
    // rtp接收线程退出
    if (nullptr != m_rtpThreadPtr && m_rtpThreadPtr->joinable())
    {
        m_rtpThreadPtr->join();
		delete m_rtpThreadPtr;
		m_rtpThreadPtr = nullptr;
    }

    m_rtpSessionPtr->Destroy();

	ClosePsThread();

    m_bOpened = false;

	LOG_INFO("[{}] CheckConnecting exited.", m_SipChannelId);

	return 0;
}

// 对退出命令敏感的延时
bool RTPUdpReceiver::wait_times(int times) {
	int count_sleep = times;
	while (!m_bRtpExit) {
		count_sleep-- ;
		if (count_sleep <= 0) {
			count_sleep = times;
			break;
		}
		std::this_thread::sleep_for(std::chrono::milliseconds(100));
	}
}