RTPTcpReceiver.cpp 7.23 KB
#include"RTPTcpReceiver.h"
#include "../logger.hpp"


static long long get_cur_time() {

	chrono::time_point<chrono::system_clock, chrono::milliseconds> tpMicro
		= chrono::time_point_cast<chrono::milliseconds>(chrono::system_clock::now());

	return tpMicro.time_since_epoch().count();
}

// class TcpRTPSession : public RTPSession
// {
// public:
//     void setReceiver(RTPTcpReceiver* r){
//         tcpReceiver = r;
//     }

// protected:
// 	void OnValidatedRTPPacket(RTPSourceData *srcdat, RTPPacket *rtppack, bool isonprobation, bool *ispackethandled)
// 	{
// 		// printf("SSRC %x Got packet (%d bytes) in OnValidatedRTPPacket from source 0x%04x!\n", GetLocalSSRC(), 
// 		// 	   (int)rtppack->GetPayloadLength(), srcdat->GetSSRC());

//         LOG_DEBUG("SSRC {} Got packet ({} bytes) in OnValidatedRTPPacket from source {}}!\n", GetLocalSSRC(), 
// 			   (int)rtppack->GetPayloadLength(), srcdat->GetSSRC());

//         if(nullptr != tcpReceiver){
//             tcpReceiver->ParsePacket(rtppack);
//         }
// 		DeletePacket(rtppack);
// 		*ispackethandled = true;
// 	}

// 	void OnRTCPSDESItem(RTPSourceData *srcdat, RTCPSDESPacket::ItemType t, const void *itemdata, size_t itemlength)
// 	{
// 		char msg[1024];

// 		memset(msg, 0, sizeof(msg));
// 		if (itemlength >= sizeof(msg))
// 			itemlength = sizeof(msg)-1;

// 		memcpy(msg, itemdata, itemlength);
// 		// printf("SSRC %x Received SDES item (%d): %s from SSRC %x\n", GetLocalSSRC(), (int)t, msg, srcdat->GetSSRC());
//         LOG_DEBUG("SSRC {} Received SDES item ({}): {} from SSRC {}\n", GetLocalSSRC(), (int)t, msg, srcdat->GetSSRC());
// 	}

// private:
//     RTPTcpReceiver* tcpReceiver{nullptr};
// };

class MyTCPTransmitter : public RTPTCPTransmitter
{
public:
    void setReceiver(RTPTcpReceiver* r){
        tcpReceiver = r;
    }

public:
	MyTCPTransmitter() : RTPTCPTransmitter(0){ }

	void OnSendError(SocketType sock)
	{
        LOG_ERROR("Error sending over socket {}, removing destination", sock);
		DeleteDestination(RTPTCPAddress(sock));
        if(nullptr != tcpReceiver && !tcpReceiver->isClosing()){
            tcpReceiver->RequestStream();
        }
	}
	
	void OnReceiveError(SocketType sock)
	{
        LOG_ERROR("Error receiving over socket {}, removing destination", sock);
		DeleteDestination(RTPTCPAddress(sock));
	}

private:
    RTPTcpReceiver* tcpReceiver{nullptr};
};

static int rtp_revc_thread_(void* param)
{
	if (!param)
	{
		return -1;
	}

	RTPTcpReceiver* self = (RTPTcpReceiver*)param;
	return self->OnRtpRecv();
}


RTPTcpReceiver::RTPTcpReceiver()
: m_bRtpExit(false)
, m_bOpened(false)
, m_idleCount(-1)
, m_noDataCount(-1)
, m_nListener(-1)
, m_bAccepted(false)
, m_bClosing(false)
{
    m_rtpSessionPtr = new RTPSession();
    m_pSessparams = new RTPSessionParams();
    m_pTrans = new MyTCPTransmitter();
}

RTPTcpReceiver::~RTPTcpReceiver(){
    if (IsOpened())
		Close();

    if(m_rtpSessionPtr != nullptr){
        delete m_rtpSessionPtr;
        m_rtpSessionPtr = nullptr;
    }

    if(m_pSessparams != nullptr){
        delete m_pSessparams;
        m_pSessparams = nullptr;
    }

    if(m_pTrans != nullptr){
        delete m_pTrans;
        m_pTrans = nullptr;
    }
}

bool RTPTcpReceiver::Open(uint16_t localPort){
    if(0 != initSession(localPort)){
        return false;
    }
    
    m_bOpened = true;

    LOG_INFO("[{}] started.", m_deviceID);

    return true;
}

bool RTPTcpReceiver::IsOpened(){
    LOG_INFO("[{}] isopng:{} ", m_deviceID, m_bOpened);
    return m_bOpened;
}

void RTPTcpReceiver::Close(){

    m_bClosing = true;

    m_bAccepted = true;
    m_bRtpExit = true;

    LOG_DEBUG("[{}] 1.", m_deviceID);
    
    // rtp接收线程退出
    if (m_rtpThread.joinable())
    {
        m_rtpThread.join();
    }

    LOG_DEBUG("[{}] 2.", m_deviceID);

	ClosePsThread();

    m_bOpened = false;
    
    LOG_INFO("[{}] closed.", m_deviceID);
}

bool RTPTcpReceiver::isClosing(){
    return m_bClosing;
}

int RTPTcpReceiver::initSession(int localPort){
    m_nListener = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);
	if (m_nListener < 0)
	{
		return  -1;
	}
 
	sockaddr_in   serverAddr;
	memset(&serverAddr, 0, sizeof(sockaddr_in));
	serverAddr.sin_family = AF_INET;
	serverAddr.sin_port = htons(localPort);
	serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
	int  nRet = bind(m_nListener, (sockaddr*)&serverAddr, sizeof(serverAddr));
	if (nRet == -1)
	{
        LOG_ERROR("[{}] 绑定端口失败: {}", m_deviceID, localPort);
		return  -1;
	}

	if (listen(m_nListener, 1) == -1)
	{
        LOG_ERROR("[{}] listen 失败", m_deviceID);
		return  -1;
	}

	int  nPackSize = 45678;
	m_pSessparams->SetProbationType(RTPSources::NoProbation);
	m_pSessparams->SetOwnTimestampUnit(90000.0 / 25.0);
	m_pSessparams->SetMaximumPacketSize(nPackSize + 64);

	int status = m_pTrans->Init(false);
	status = m_pTrans->Create(65535, NULL);
    m_pTrans->setReceiver(this);

	status = m_rtpSessionPtr->Create(*m_pSessparams, m_pTrans);
	if (status < 0)
	{
        LOG_ERROR("[{}] create session error!!", m_deviceID);
		return -1;
	}

    m_rtpThread = std::thread(rtp_revc_thread_, this);

    InitPS();

    bool bRet = RequestStream();
    if (!bRet)
    {
        LOG_INFO("[{}] 请求流失败!", m_deviceID);
        return -1;
    }

    LOG_INFO("[{}] 初始化成功, congratulations !!!", m_deviceID);

    return 0;
}

int RTPTcpReceiver::OnRtpRecv()
{
    if(nullptr == m_rtpSessionPtr){
        return -1;
    }

    LOG_INFO("[{}] OnRtpRecv started, m_nListener : {}", m_deviceID, m_nListener);

    sockaddr_in   clientAddr;
	int  nLen = sizeof(sockaddr_in);
	SocketType nServer = -1;
    
    LOG_INFO("[{}] Poll started.", m_deviceID);
    int status = -1;
    while(!m_bRtpExit){
        while(!m_bAccepted){
            LOG_DEBUG("[{}] accepting...", m_deviceID);
            nServer = accept(m_nListener, (sockaddr*)&clientAddr, (socklen_t * ) &nLen);
            if (-1 == nServer){
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
                continue;
            }
            m_rtpSessionPtr->AddDestination(RTPTCPAddress(nServer));
            m_bAccepted = true;

            LOG_INFO("[{}] nServer={}", m_deviceID, nServer);
            break;
        }

        m_rtpSessionPtr->BeginDataAccess();
        if (m_rtpSessionPtr->GotoFirstSourceWithData())
        {
            do
            {
                RTPPacket *pack;
                
                while ((pack = m_rtpSessionPtr->GetNextPacket()) != NULL)
                {
                    LOG_DEBUG("[{}] time: {} ", m_deviceID, get_cur_time());
                    ParsePacket(pack);
                    
                    m_rtpSessionPtr->DeletePacket(pack);
                }
            } while (m_rtpSessionPtr->GotoNextSourceWithData());
        }
        
        m_rtpSessionPtr->EndDataAccess();

        m_rtpSessionPtr->Poll();
		std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }

    m_rtpSessionPtr->Destroy();

    if(nServer > 0){
        close(nServer);
    }
    if(m_nListener > 0){
        close(m_nListener);
    }

    LOG_INFO("[{}] OnRtpRecv exited.", m_deviceID);

    return 0;
}

bool RTPTcpReceiver::RequestStream(){
    bool bConnect = m_callback_request_stream();
    if(!bConnect){
        Close();
        return false;
    }
    m_bAccepted = false;

    return true;
}