// RTPReceiver2.cpp (7.06 KB)
#include "RTPReceiver2.h"
#include "rtppacket.h"
#include <thread>

#include "../common_header.h"
#include "../websocket/WebsocketClient.h"

#ifdef __linux__
#include "arpa/inet.h"
#endif

#include "Rtp.h"

// Upper bound (10 MiB) that the receive paths in this file compare
// mPlayer->bufferSize against before appending more payload bytes.
const int MAX_RTP_BUFFER_SIZE = 1024*1024*10;

#define Server_cache_max_size 4194304 // 4 MiB = 4 * 1024 * 1024 = 4194304 bytes; size of the mRecvCache allocation
#define Server_rtp_max_size 1800 // size in bytes of the mRecvRtpBuffer allocation (one extracted RTP packet)


/// Allocates the two heap buffers used by the TCP receive path.
/// Both are released again in the destructor.
RTPReceiver2::RTPReceiver2()
{
	// Holds a single RTP packet after it has been cut out of the stream cache.
	mRecvRtpBuffer = static_cast<uint8_t*>(malloc(Server_rtp_max_size));

	// Accumulates raw bytes read from the TCP connection until whole frames are available.
	mRecvCache = static_cast<uint8_t*>(malloc(Server_cache_max_size));
}

/// Releases the buffers obtained in the constructor and clears the pointers.
RTPReceiver2::~RTPReceiver2(){
	// free() is a no-op on a null pointer, so no explicit guard is needed.
	free(mRecvRtpBuffer);
	mRecvRtpBuffer = nullptr;

	free(mRecvCache);
	mRecvCache = nullptr;
}

/**
 * Starts the receive server for the requested transport and blocks until it ends.
 *
 * @param ip    Address string, forwarded to the selected server routine.
 * @param port  Local port to listen on (host byte order).
 * @param isUdp true to run the UDP receiver, false to run the TCP receiver.
 * @return The value returned by startUdpServer() / startTcpServer().
 *         (Previously the function fell off the end without returning a
 *         value, which is undefined behaviour for a non-void function.)
 */
int RTPReceiver2::init(const char* ip, uint16_t port, bool isUdp) {
	if (!isUdp) {
		LOG_INFO("tcp://%s:%d", ip, port);
		return startTcpServer(ip, port);
	}

	LOG_INFO("udp://%s:%d", ip, port);
	return startUdpServer(ip, port);
}

/**
 * Runs a blocking UDP receive loop and appends the payload of every datagram
 * (everything after the first RTP_HEADER_SIZE bytes) to mPlayer->buffer.
 *
 * The loop ends when m_bRtpExit becomes true or recvfrom() fails. Note that
 * recvfrom() blocks, so the exit flag is only re-checked after a datagram
 * arrives.
 *
 * @param ip   Currently unused: the socket is always bound to INADDR_ANY.
 * @param port Local UDP port to bind (host byte order).
 * @return 0 after the receive loop ends, -1 if the socket could not be
 *         created or bound.
 */
int RTPReceiver2::startUdpServer(const char* ip, uint16_t port) {

	const int server_fd = socket(AF_INET, SOCK_DGRAM, 0); // AF_INET: IPv4; SOCK_DGRAM: UDP
	if (server_fd < 0)
	{
		printf("create socket fail!\n");
		return -1;
	}

	struct sockaddr_in ser_addr;
	memset(&ser_addr, 0, sizeof(ser_addr));
	ser_addr.sin_family = AF_INET;
	ser_addr.sin_addr.s_addr = htonl(INADDR_ANY); // any local address, converted to network byte order
	ser_addr.sin_port = htons(port);              // port number, converted to network byte order

	if (bind(server_fd, reinterpret_cast<struct sockaddr*>(&ser_addr), sizeof(ser_addr)) < 0)
	{
		printf("socket bind fail!\n");
		close(server_fd); // do not leak the descriptor on the error path
		return -1;
	}

	char recvBuf[10000];
	struct sockaddr_in clent_addr; // receives the sender's address for each datagram

	while (!m_bRtpExit)
	{
		socklen_t len = sizeof(clent_addr);
		// recvfrom blocks until a datagram is available.
		const int recvBufSize = static_cast<int>(
			recvfrom(server_fd, recvBuf, sizeof(recvBuf), 0,
			         reinterpret_cast<struct sockaddr*>(&clent_addr), &len));
		if (recvBufSize <= 0) {
			printf("recieve data fail!\n");
			break;
		}

		// A datagram shorter than the RTP header would yield a negative length,
		// which memcpy would interpret as a huge unsigned size. Drop it instead.
		const int payloadSize = recvBufSize - static_cast<int>(RTP_HEADER_SIZE);
		if (payloadSize < 0) {
			LOG_ERROR("recvBufSize = {} shorter than RTP_HEADER_SIZE ", recvBufSize);
			continue;
		}

		if ((mPlayer->bufferSize + payloadSize) < MAX_RTP_BUFFER_SIZE) {
			memcpy(mPlayer->buffer + mPlayer->bufferSize, recvBuf + RTP_HEADER_SIZE, payloadSize);
			mPlayer->bufferSize += payloadSize;
		} else {
			LOG_ERROR("recvBufSize = {} over GB28181Player_buffer_max_size ", recvBufSize);
		}
	}

	close(server_fd);

	return 0;
}

int RTPReceiver2::startTcpServer(const char* ip, uint16_t port) {

	int  listenfd, connfd;
    struct sockaddr_in  servaddr;
    char  buff[4096];
    int  n;

    if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1 ){
        printf("create socket error: %s(errno: %d)\n",strerror(errno),errno);
        return 0;
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(port);

    if( bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -1){
        printf("bind socket error: %s(errno: %d)\n",strerror(errno),errno);
        return 0;
    }

    if( listen(listenfd, 10) == -1){
        printf("listen socket error: %s(errno: %d)\n",strerror(errno),errno);
        return 0;
    }


	char recvBuf[10000];
	int  recvBufSize = 0;

	while (!m_bRtpExit)
	{
		LOG_INFO("阻塞监听新连接...");
		// 阻塞接收请求 start

		int clientFd = accept(listenfd, (struct sockaddr*)NULL, NULL);
		if (clientFd < 0) {
			LOG_ERROR("accept connection error");
			continue;
		}
		// 阻塞接收请求 end
		LOG_INFO("发现新连接:clientFd=%d", clientFd);
		
		while (!m_bRtpExit) {
			recvBufSize = recv(clientFd, recvBuf, sizeof(recvBuf), 0);
			if (recvBufSize <= 0) {
				LOG_ERROR("::recv error: clientFd={},recvBufSize={}", clientFd, recvBufSize);
				break;
			}

			parseTcpData(recvBuf, recvBufSize);
		}

		close(clientFd);
		LOG_INFO("关闭连接 clientFd={}", clientFd);

	}

	close(listenfd);
	return 0;
}

void RTPReceiver2::parseTcpData(char* recvBuf, int recvBufSize) {

	if ((mRecvCacheSize + recvBufSize) > Server_cache_max_size) {
		LOG_ERROR("超过缓冲容量上限,忽略本次读取的数据。mRecvCacheSize=%d,recvBufSize=%d",mRecvCacheSize, recvBufSize);
	
	}
	else {
		memcpy(mRecvCache + mRecvCacheSize, recvBuf, recvBufSize);
		mRecvCacheSize += recvBufSize;
	}
	//LOGI("cacheSize=%d,开始进入解析 ... ...", cacheSize);

	while (true) {

		if (mRecvCacheSize > 2) {

			bool success = false;

			if (mRecvCacheSize > 2) {
				mRecvRtpBufferSize = ntohs(*(int16_t*)(mRecvCache));
				if ((mRecvCacheSize - 2) >= mRecvRtpBufferSize) {
					success = true;
				}
			}

			if (success) {
				mRecvCacheSize -= 2;
				mRecvCacheSize -= mRecvRtpBufferSize;

				// 提取RTP
				memcpy(mRecvRtpBuffer, mRecvCache + 2, mRecvRtpBufferSize);
				memmove(mRecvCache, mRecvCache + 2 + mRecvRtpBufferSize, mRecvCacheSize);

				// RTP
				RtpHeader rtpHeader;
				parseRtpHeader(mRecvRtpBuffer, &rtpHeader);
				printf("get a rtp seq=%d,RtpBufferSize=%d,mRecvCacheSize=%d,marker=%d,timestamp=%d\n", 
					rtpHeader.seq, 
					mRecvRtpBufferSize, 
					mRecvCacheSize,rtpHeader.marker, rtpHeader.timestamp);


				// 将从mRecvCache提取出来的rtp字节流 mRecvRtpBuffer去掉RTP_HEADER_SIZE,存储到播放器缓存中
				if ((mPlayer->bufferSize + mRecvRtpBufferSize - RTP_HEADER_SIZE) < MAX_RTP_BUFFER_SIZE) {
					memcpy(mPlayer->buffer + mPlayer->bufferSize, mRecvRtpBuffer + RTP_HEADER_SIZE, mRecvRtpBufferSize - RTP_HEADER_SIZE);
					mPlayer->bufferSize += mRecvRtpBufferSize - RTP_HEADER_SIZE;
				}
				else {
					LOG_ERROR("recvBufSize = %d over MAX_RTP_BUFFER_SIZE ", recvBufSize);
				}

			}
			else {
				//LOGI("跳出解析:cacheSize=%d,pktSize=%d", cacheSize, pktSize);
				break;
			}
		}
		else {
			//LOGI("跳出解析:缓冲数据未发现完整数据包");
			break;
		}
	}
}

int RTPReceiver2::allocRtpPort() {

	WebsocketClient* pServer = WebsocketClient::getInstance();
	int MIN_RTP_PORT = pServer->GetMinRtpPort() ;
	int MAX_RTP_PORT = pServer->GetMaxRtpPort();

	int s_rtpPort = MIN_RTP_PORT;

	srand((unsigned int)time(NULL));
	s_rtpPort = MIN_RTP_PORT + (rand() % MIN_RTP_PORT);

	if (s_rtpPort % 2)
		++s_rtpPort;

	int count = 0;

	while (true)
	{
		if (s_rtpPort >= MAX_RTP_PORT) {
			s_rtpPort = MIN_RTP_PORT;
			count ++;
			if (count > 1) {
				LOG_ERROR("[{}] - 范围内没有可用的port", m_SipChannelId);
			}
		}

		int i = 0;
		for (; i < 2; i++) {
			sockaddr_in sRecvAddr;
			int s = socket(AF_INET, SOCK_DGRAM, 0);

			sRecvAddr.sin_family = AF_INET;        
			sRecvAddr.sin_addr.s_addr = htonl(INADDR_ANY);    
			sRecvAddr.sin_port = htons(s_rtpPort + i); 

			int nResult = bind(s, (sockaddr *)&sRecvAddr, sizeof(sRecvAddr));
			if (nResult != 0) {
				break;
			}

			nResult = close(s);
			if (nResult != 0) {
				LOG_ERROR("[{}] - closesocket failed : {}", m_SipChannelId, nResult);
				break;
			}
		}

		if (i == 2)
			break;

		s_rtpPort += 2;
	}

	return s_rtpPort;
}

void RTPReceiver2::RequestStreamFailed() {
	m_bRtpExit = true;
}