RTPReceiver2.cpp 8.18 KB
#include "RTPReceiver2.h"
#include "rtppacket.h"
#include <thread>

#include "../common_header.h"
#include "../websocket/WebsocketClient.h"

#ifdef __linux__
#include "arpa/inet.h"
#endif

#include "Rtp.h"

const int MAX_RTP_BUFFER_SIZE = 1024*1024*10;

#define Server_cache_max_size 4194304 // 1M = 1 * 1024 * 1024 = 1048576 字节
#define Server_rtp_max_size 1800


RTPReceiver2::RTPReceiver2()
{
    mRecvCache = (uint8_t*)malloc(Server_cache_max_size);
	mRecvRtpBuffer = (uint8_t*)malloc(Server_rtp_max_size);
}

RTPReceiver2::~RTPReceiver2(){
    if (mRecvCache) {
		free(mRecvCache);
		mRecvCache = nullptr;
	}

	if (mRecvRtpBuffer) {
		free(mRecvRtpBuffer);
		mRecvRtpBuffer = nullptr;
	}
}

void RTPReceiver2::SetOutputCallback(CallBack_Stream cb, void* param)
{
	m_buffer_cbk = cb;
	m_bufferParam = param;
}

void RTPReceiver2::SetVodEndCallback(CallBack_VodFileEnd cb, void* param)
{
    m_finish_cbk = cb;
	m_finishParam = param;
}

bool RTPReceiver2::Open(string channel_id, bool isUdp) {
    m_SipChannelId = channel_id;

    m_rtp_port = allocRtpPort();
	if (m_rtp_port < 0) {
		return false;
	}

	bool bReq = start_server(channel_id, m_rtp_port, isUdp);
	if (!bReq) {
		LOG_INFO("[{}] start_server failed !", m_SipChannelId);
		Close();
		return false;
	}
    
    m_bOpened = true;

    LOG_INFO("[{}] started.", m_SipChannelId);

    return true;
}

bool RTPReceiver2::IsOpened(){
    LOG_INFO("[{}] isopen:{} ", m_SipChannelId, m_bOpened);
    return m_bOpened;
}

void RTPReceiver2::Close(){
    m_bRtpExit = true;

    WebsocketClient* pServer = WebsocketClient::getInstance();
	if (pServer){
		pServer->ByeInvite(m_SipChannelId, m_rtp_port);
	}

	if(m_server_thread) {
		m_server_thread->join();
		delete m_server_thread;
		m_server_thread = nullptr;
	}
}

bool RTPReceiver2::start_server(string channel_id, int port, bool isUdp) {
	WebsocketClient* pClient = WebsocketClient::getInstance();
	if (pClient){

		if (isUdp) {
			m_server_thread = new std::thread([](void* arg) {
				RTPReceiver2* a=(RTPReceiver2*)arg;
				a->udp_server();
				return (void*)0;
			}, this);
		
			if (pClient->InviteUdp(channel_id, port, this) < 0) {
				return false;
			}

		} else {
			m_server_thread = new std::thread([](void* arg) {
				RTPReceiver2* a=(RTPReceiver2*)arg;
				a->tcp_server();
				return (void*)0;
			}, this);

			if (pClient->InviteTcp(channel_id, port, this) < 0) {
				return false;
			}
		}
	}

	return true;
}

int RTPReceiver2::udp_server() {

	uint16_t port = m_rtp_port;

	LOG_INFO("udp {}",port);

    int server_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); //AF_INET:IPV4;SOCK_DGRAM:UDP
    if(server_fd < 0)
    {
        printf("create socket fail!\n");
        return -1;
    }

	struct sockaddr_in ser_addr;
    memset(&ser_addr, 0, sizeof(ser_addr));
    ser_addr.sin_family = AF_INET;
    ser_addr.sin_addr.s_addr = htonl(INADDR_ANY); //IP地址,需要进行网络序转换,INADDR_ANY:本地地址
    ser_addr.sin_port = htons(port);  //端口号,需要网络序转换

    int ret = bind(server_fd, (struct sockaddr*)&ser_addr, sizeof(ser_addr));
    if(ret < 0) {
        printf("socket bind fail!\n");
        return -1;
    }

	uint8_t recvBuf[10000];
	int  recvBufSize;

    socklen_t len;
    struct sockaddr_in clent_addr;  //clent_addr用于记录发送方的地址信息
    while(!m_bRtpExit)
    {
        memset(recvBuf, 0, sizeof(recvBuf));
        len = sizeof(clent_addr);
        recvBufSize = recvfrom(server_fd, recvBuf, sizeof(recvBuf), 0, (struct sockaddr*)&clent_addr, &len);  //recvfrom是拥塞函数,没有数据就一直拥塞
        if(recvBufSize <= 0) {
            LOG_ERROR("recieve data fail!");
            break;
        }

		// buffer 抛出
		m_buffer_cbk(m_bufferParam, recvBuf, recvBufSize, 0);
    }

	close(server_fd);

	m_finish_cbk(m_finishParam);

	LOG_INFO("udp server exit.");

	return 0;
}

int RTPReceiver2::tcp_server() {

	uint16_t port = m_rtp_port;

	LOG_INFO("tcp {}", port);

	int  listenfd, connfd;
    struct sockaddr_in  servaddr;
    char  buff[4096];
    int  n;

    if( (listenfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1 ){
        printf("create socket error: %s(errno: %d)\n",strerror(errno),errno);
        return 0;
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(port);

    if( bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -1){
        printf("bind socket error: %s(errno: %d)\n",strerror(errno),errno);
        return 0;
    }

    if( listen(listenfd, SOMAXCONN) == -1){
        printf("listen socket error: %s(errno: %d)\n",strerror(errno),errno);
        return 0;
    }

	char recvBuf[10000];
	int  recvBufSize = 0;

	while (!m_bRtpExit)
	{
		LOG_INFO("阻塞监听新连接...");
		// 阻塞接收请求 start
		socklen_t  len = sizeof(sockaddr);
		sockaddr_in accept_addr;
		int clientFd = accept(listenfd, (struct sockaddr*)&accept_addr, &len);
		if (clientFd < 0) {
			LOG_WARN("accept connection error");
			std::this_thread::sleep_for(std::chrono::milliseconds(5));
			continue;
		}
		// 阻塞接收请求 end
		LOG_INFO("发现新连接:clientFd={}", clientFd);
		
		while (!m_bRtpExit) {
			recvBufSize = recv(clientFd, recvBuf, sizeof(recvBuf), 0);
			if (recvBufSize <= 0) {
				LOG_ERROR("::recv error: clientFd={},recvBufSize={}", clientFd, recvBufSize);
				break;
			}

			parseTcpData(recvBuf, recvBufSize);
		}

		close(clientFd);
		LOG_WARN("关闭连接 clientFd={}", clientFd);
	}

	close(listenfd);

	m_finish_cbk(m_finishParam);

	LOG_INFO("tcp server exit.");
	return 0;
}

void RTPReceiver2::parseTcpData(char* recvBuf, int recvBufSize) {

	if ((mRecvCacheSize + recvBufSize) > Server_cache_max_size) {
		LOG_ERROR("超过缓冲容量上限,忽略本次读取的数据。mRecvCacheSize=%d,recvBufSize=%d",mRecvCacheSize, recvBufSize);
	} else {
		memcpy(mRecvCache + mRecvCacheSize, recvBuf, recvBufSize);
		mRecvCacheSize += recvBufSize;
	}
	//LOGI("cacheSize=%d,开始进入解析 ... ...", cacheSize);

	while (true) {

		if (mRecvCacheSize > 2) {

			bool success = false;

			if (mRecvCacheSize > 2) {
				mRecvRtpBufferSize = ntohs(*(int16_t*)(mRecvCache));
				if ((mRecvCacheSize - 2) >= mRecvRtpBufferSize) {
					success = true;
				}
			}

			if (success) {
				mRecvCacheSize -= 2;
				mRecvCacheSize -= mRecvRtpBufferSize;

				// 提取RTP
				memcpy(mRecvRtpBuffer, mRecvCache + 2, mRecvRtpBufferSize);
				memmove(mRecvCache, mRecvCache + 2 + mRecvRtpBufferSize, mRecvCacheSize);

				struct RtpHeader rtpHeader;
				parseRtpHeader(mRecvRtpBuffer, &rtpHeader);
				printf("get a rtp seq=%d,RtpBufferSize=%d,mRecvCacheSize=%d,marker=%d,timestamp=%d\n", 
					rtpHeader.seq, 
					mRecvRtpBufferSize, 
					mRecvCacheSize,rtpHeader.marker, rtpHeader.timestamp);

				// buffer 抛出
				m_buffer_cbk(m_bufferParam, mRecvRtpBuffer, mRecvRtpBufferSize, rtpHeader.timestamp);

			} else {
				//LOGI("跳出解析:cacheSize=%d,pktSize=%d", cacheSize, pktSize);
				break;
			}
		} else {
			//LOGI("跳出解析:缓冲数据未发现完整数据包");
			break;
		}
	}
}

int RTPReceiver2::allocRtpPort() {

	WebsocketClient* pServer = WebsocketClient::getInstance();
	int MIN_RTP_PORT = pServer->GetMinRtpPort() ;
	int MAX_RTP_PORT = pServer->GetMaxRtpPort();

	int s_rtpPort = MIN_RTP_PORT;

	srand((unsigned int)time(NULL));
	s_rtpPort = MIN_RTP_PORT + (rand() % MIN_RTP_PORT);

	if (s_rtpPort % 2)
		++s_rtpPort;

	int count = 0;

	while (true)
	{
		if (s_rtpPort >= MAX_RTP_PORT) {
			s_rtpPort = MIN_RTP_PORT;
			count ++;
			if (count > 1) {
				LOG_ERROR("[{}] - 范围内没有可用的port", m_SipChannelId);
			}
		}

		int i = 0;
		for (; i < 2; i++) {
			sockaddr_in sRecvAddr;
			int s = socket(AF_INET, SOCK_DGRAM, 0);

			sRecvAddr.sin_family = AF_INET;        
			sRecvAddr.sin_addr.s_addr = htonl(INADDR_ANY);    
			sRecvAddr.sin_port = htons(s_rtpPort + i); 

			int nResult = bind(s, (sockaddr *)&sRecvAddr, sizeof(sRecvAddr));
			if (nResult != 0) {
				break;
			}

			nResult = close(s);
			if (nResult != 0) {
				LOG_ERROR("[{}] - closesocket failed : {}", m_SipChannelId, nResult);
				break;
			}
		}

		if (i == 2)
			break;

		s_rtpPort += 2;
	}

	return s_rtpPort;
}

void RTPReceiver2::RequestStreamFailed() {
	m_bRtpExit = true;
}