// RTPUdpReceiver.cpp

#include "RTPUdpReceiver.h"
#include <iostream>
#include <time.h>

#include <thread>
#include <chrono>

#include "common_header.h"

using namespace std;

// Base unit, in bytes, from which the buffer sizes below are derived.
// NOTE: despite the "_1024" suffix the value is 4096.
#define BUFFERSIZE_1024     4096
// Earlier values tried here: 5120 (1024*5).
#define BUFFERSIZE_GAP      4096

namespace
{
	// 4096 * 4096 * 5 * 2 = 167,772,160.
	constexpr int kVideoFrameSize = BUFFERSIZE_1024 * BUFFERSIZE_1024 * 5 * 2;

	// 4096 * 4096 * 2 = 33,554,432; passed to SetRTPReceiveBuffer() in Open().
	constexpr int kRtpRecvBufferSize = BUFFERSIZE_1024 * BUFFERSIZE_1024 * 2;

	// Port number 0 is used as the "no port" sentinel.
	constexpr uint16_t kInvalidPort = 0;
} // namespace

// RTP session that registers the sender of every received RTP packet as a
// destination of this session.
class UdpRTPSession : public RTPSession
{
public:
	UdpRTPSession() {}
	virtual ~UdpRTPSession() {}

private:
	// Invoked by the base session for each incoming RTP packet.
	//   pack           - the received packet (unused here)
	//   receiverTime   - time of reception (unused here)
	//   senderAddress  - address the packet came from; may be null
	virtual void OnRTPPacket(RTPPacket* pack, const RTPTime& receiverTime, const RTPAddress* senderAddress)
	{
		// Per the JRTPLIB RTPSources documentation the sender address is
		// NULL for packets sent by the local participant, and Open() enables
		// SetAcceptOwnPackets(true), so such packets can reach this callback.
		// Dereferencing a null address would be undefined behaviour.
		if (senderAddress == nullptr)
		{
			return;
		}

		// The result is deliberately ignored; NOTE(review): a sender that was
		// already added is presumably reported as an error code - confirm.
		AddDestination(*senderAddress);
	}

	// Invoked by the base session for each incoming RTCP compound packet.
	// Intentionally a no-op; the disabled experiment below added the sender as
	// a destination and sent an RTCP APP "keeplive" packet.
	virtual void OnRTCPCompoundPacket(RTCPCompoundPacket *pack, const RTPTime &receivetime,const RTPAddress *senderaddress)
	{
		//AddDestination(*senderaddress);
		//const char* name = "hi~";
		//SendRTCPAPPPacket(0, (const uint8_t*)name, "keeplive", 8);

		//printf("send rtcp app");
	}
};

static int rtp_revc_thread_(void* param)
{
	if (!param)
	{
		return -1;
	}

	RTPUdpReceiver* self = (RTPUdpReceiver*)param;
	return self->OnRtpRecv();
}

// Builds a closed receiver: allocates the session parameters, the UDPv4
// transmission parameters and the RTP session object. Nothing is bound or
// started until Open() is called.
RTPUdpReceiver::RTPUdpReceiver() 
: m_bRtpExit(false)
, m_bOpened(false)
, m_idleCount(-1)
, m_noDataCount(-1)
{
	// Close() compares this pointer against nullptr before dereferencing it,
	// so it must never hold an indeterminate value. Assigned in the body
	// (not the initializer list) because the member's declaration is not
	// visible from this file.
	m_rtpThreadPtr = nullptr;

	m_sessparamsPtr = new RTPSessionParams();
	m_transparamsPtr = new RTPUDPv4TransmissionParams();
	m_rtpSessionPtr = new UdpRTPSession();
}

// Closes the receiver if it is still open, then frees the three objects
// allocated by the constructor.
RTPUdpReceiver::~RTPUdpReceiver()
{
	if (IsOpened())
	{
		Close();
	}

	// Deleting a null pointer is a no-op, so no explicit guards are required.
	delete m_sessparamsPtr;
	m_sessparamsPtr = nullptr;

	delete m_transparamsPtr;
	m_transparamsPtr = nullptr;

	delete m_rtpSessionPtr;
	m_rtpSessionPtr = nullptr;
}

bool RTPUdpReceiver::Open(uint16_t localPort)
{
	m_sessparamsPtr->SetUsePollThread(true);
	m_sessparamsPtr->SetMinimumRTCPTransmissionInterval(10);
	m_sessparamsPtr->SetOwnTimestampUnit(1.0/90000.0);
	m_sessparamsPtr->SetAcceptOwnPackets(true);
	
	m_transparamsPtr->SetPortbase(localPort);
	m_transparamsPtr->SetRTPReceiveBuffer(kRtpRecvBufferSize);

    LOG_INFO("[{}] port: {}", m_deviceID, localPort);
	
	int err = m_rtpSessionPtr->Create(*m_sessparamsPtr, m_transparamsPtr);
	if (err != 0)
	{	
		LOG_ERROR("[{}] Create error: {}", m_deviceID, err);
		return false;
	}

	m_rtpThreadPtr = new std::thread(rtp_revc_thread_, this);
	if (nullptr == m_rtpThreadPtr)
	{
		LOG_ERROR("[{}] Create m_rtpThreadPtr error", m_deviceID);
		return false;
	}
	

	if (InitPS() != 0)
	{
		return false;
	}

	m_bOpened = true;
    LOG_INFO("[{}] Open ok", m_deviceID);

	return true;
}

// Reports the open flag: set to true at the end of a successful Open() and
// back to false by Close().
bool RTPUdpReceiver::IsOpened()
{
	const bool opened = m_bOpened;
	return opened;
}

// Stops the receive thread, destroys the RTP session, shuts down the PS
// thread and marks the receiver as closed.
void RTPUdpReceiver::Close()
{
	// Request the OnRtpRecv() loop to finish.
	m_bRtpExit = true;

	// Wait for the RTP receive thread to exit, then release the thread object.
	const bool threadJoinable = (m_rtpThreadPtr != nullptr) && m_rtpThreadPtr->joinable();
	if (threadJoinable)
	{
		m_rtpThreadPtr->join();
		delete m_rtpThreadPtr;
		m_rtpThreadPtr = nullptr;
	}

	m_rtpSessionPtr->Destroy();

	ClosePsThread();

	m_bOpened = false;

	LOG_INFO("[{}] closed.", m_deviceID);
}

// 收RTP包线程
int RTPUdpReceiver::OnRtpRecv()
{
	if(nullptr == m_rtpSessionPtr){
		return -1;
	}

	LOG_INFO("[{}] OnRtpRecv started.", m_deviceID);
	while (!m_bRtpExit)
	{
		//try
		//{
			m_rtpSessionPtr->Poll();
			m_rtpSessionPtr->BeginDataAccess();
			
			if (m_rtpSessionPtr->GotoFirstSourceWithData())
			{
				LOG_INFO("OnRtpRecv GotoFirstSourceWithData --{}", m_deviceID);
    			last_recv_ts = UtilTools::get_cur_time_ms();
				m_idleCount = 0;
				m_noDataCount = 0;
				do
				{
					RTPPacket* packet;
					while ((packet = m_rtpSessionPtr->GetNextPacket()) != NULL) 
					{
						LOG_INFO("OnRtpRecv GetNextPacket --{}", m_deviceID);
						int ret = ParsePacket(packet);
						m_rtpSessionPtr->DeletePacket(packet);
						
						if(ret != 0){
							m_bRtpExit = true;
						}
					}
				} while (m_rtpSessionPtr->GotoNextSourceWithData());
			}
			//else {
			//	if (m_idleCount != -1)
			//	{
			//		++m_idleCount;//流中断计数
			//	}
			//	if (m_noDataCount != 0)
			//	{
			//		--m_noDataCount;//没流计数
			//	}
			//	//if (m_idleCount > 3000) {
			//	//	     	m_hVodEndFunc(m_usrParam);
			//	//	    m_idleCount = 0;
			//	//历史流结束的时候,也会出现超时,这个是正常的
			//	if(m_usrParam && ((VideoSession *)GetUsrParam())->video_type() == VideoType::ERECORD)
			//	{
			//		if (m_idleCount > 10000)
			//		{
			//			//这里要判断下历史流是否结束,如果未结束,就设置为流中断
			//			//由于record_stream_status这个函数返回值不准确,所以增加一个进度条大于80%
			//			if(record_stream_status(((VideoSession *)GetUsrParam())->streamHandle()))
			//			{
   // 						LOG_INFO("************Record stream is finished**{}**m_progress = {}********", m_deviceID, ((VideoSession *)GetUsrParam())->progress());
			//				m_idleCount = -1;
			//				m_hVodEndFunc(m_usrParam);
   // 						record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle());
   // 						((VideoSession *)GetUsrParam())->streamHandle().clear();
			//			}
			//			else 
			//			{
			//				//如果此时进度大于80% 算完成吧
			//				if(((VideoSession *)GetUsrParam())->progress() > 0.80)
			//				{
   // 							LOG_INFO("************Record stream is overtime**{}**m_progress = {}********", m_deviceID, ((VideoSession *)GetUsrParam())->progress());

			//					m_idleCount = 0;
			//					m_hVodEndFunc(m_usrParam);
   // 							record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle());
   // 							((VideoSession *)GetUsrParam())->streamHandle().clear();
			//				}
			//				else
			//				{
			//					m_idleCount = -1;
   // 							//LOG_ERROR("************post ERROR_REALSTREAM_INTERRUPT to structure****{}********", m_deviceID);
			//					//发送流中断
			//					//throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Record time video streaming interruption!");
			//				}
			//			}
			//			 
   //                                                                         
			//		}
			//		
			//		if (m_noDataCount < -200000)//任务开始时没收到流
			//		{
   // 					//LOG_ERROR("************m_hVodEndFunc(m_usrParam)!!!m_hVodEndFunc(m_usrParam)********{}******", m_deviceID);	 
			//			m_noDataCount = -1;

			//			//throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Record time video streaming interruption2!");
			//			//m_hVodEndFunc(m_usrParam);
			//		}
			//	}
			//	else//实时任务断流
			//	//if (m_usrParam && ((VideoSession *)GetUsrParam())->video_type() == VideoType::EREAL)
			//	{
			//       
			//		//每超过3000次,发送一次send_vedio_eof 时长大约1.5s
			//		//若是30000,时长大约 18s
			//		if(m_idleCount > 30000)
			//		{
   // 					uint64_t cts = UtilTools::get_cur_time_ms();
   // 					float duration_not_recv = (cts - last_recv_ts) / 1000.0;
   // 					
   // 					//LOG_ERROR("************I haven't  got stream from hik gateway exceed {}s,send eof********{}******", duration_not_recv, m_deviceID);	 
			//			m_idleCount = -1;

			//			//throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Real time video streaming interruption!");
			//		} 
			//		
			//		if (m_noDataCount < -200000)//任务开始时没收到流
			//		{
   // 					//LOG_ERROR("************m_noDataCount < -200000********{}******", m_deviceID);	 
			//			m_noDataCount = -1;

			//			//throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Real time video streaming interruption2!");
			//		}
			//		
			//	}
			//}
		//}
  //      catch (GeneralException2& e)
		//{
  //  		//LOG_ERROR("---> video streaming interruption!<---{}, error: {}", m_deviceID, e.err_msg());

		//	byte_buffer bb(64);
  //  		bb << VasCmd::VAS_CMD_REALSTREAM_INTERRUPT << e.err_msg();

		//	if (m_usrParam)
		//	{
  //  			if (((VideoSession *)GetUsrParam())->msgChan()->is_valid()) {
  //                  try {
  //                      ((VideoSession *)GetUsrParam())->msgChan()->send_msg(bb.data_ptr(), bb.data_size());
  //                  }
  //      			catch (GeneralException2& e) {
  //          			//LOG_ERROR("[{}] send vas cmd VAS_CMD_REALSTREAM_INTERRUPT error: {}, {}", m_deviceID, e.err_code(), e.err_str());
  //      			}    			
  //              }

		//		//通知网关关闭句柄
		//		if(!((VideoSession *)GetUsrParam())->streamHandle().empty())
		//		{

  //  				LOG_INFO("---->Notify hisense gateway release handle = {} !<----{}", ((VideoSession *)GetUsrParam())->streamHandle().c_str(), m_deviceID);
  //  				if (((VideoSession *)GetUsrParam())->video_type() == VideoType::EREAL)
  //      				real_stream_stop(((VideoSession *)GetUsrParam())->streamHandle());
  //        
  //  				if (((VideoSession *)GetUsrParam())->video_type() == VideoType::ERECORD)
  //      				record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle());
  //                                                
		//			  //清理保活的句柄
		//			  ((VideoSession *)GetUsrParam())->streamHandle().clear();
		//		}
		//	}
  //    
		//	bb.bset(0);
		//
		//}
		m_rtpSessionPtr->EndDataAccess();

		std::this_thread::sleep_for(std::chrono::milliseconds(10));
	}

	LOG_INFO("[{}] OnRtpRecv exited.", m_deviceID);

	return 0;
}