#include "RTPReceiver2.h"
#include "rtppacket.h"
// NOTE(review): the original file had a bare `#include` whose target was lost;
// the standard headers this translation unit actually uses are listed instead.
#include <cerrno>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include "../common_header.h"
#include "../websocket/WebsocketClient.h"
#ifdef __linux__
#include "arpa/inet.h"
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#endif
#include "Rtp.h"

// Upper bound used when appending payload to the player's buffer.
// NOTE(review): presumably matches the capacity of mPlayer->buffer - confirm.
const int MAX_RTP_BUFFER_SIZE = 1024 * 1024 * 10;

#define Server_cache_max_size 4194304  // 4 MiB (1 MiB = 1024 * 1024 = 1048576 bytes)
#define Server_rtp_max_size 1800       // capacity of mRecvRtpBuffer (one RTP packet)

namespace {

// Size of the big-endian length prefix that precedes each RTP packet on the TCP stream.
constexpr int kLengthPrefixSize = 2;

// Appends the payload of one RTP packet (everything after the first
// RTP_HEADER_SIZE bytes) to the player's buffer.
// The caller must guarantee packetSize >= RTP_HEADER_SIZE.
// Returns false, leaving the player untouched, when the payload does not fit
// below MAX_RTP_BUFFER_SIZE.
template <typename PlayerPtr>
bool appendRtpPayload(const PlayerPtr& player, const void* packet, int packetSize) {
    if ((player->bufferSize + packetSize - RTP_HEADER_SIZE) < MAX_RTP_BUFFER_SIZE) {
        std::memcpy(player->buffer + player->bufferSize,
                    static_cast<const char*>(packet) + RTP_HEADER_SIZE,
                    packetSize - RTP_HEADER_SIZE);
        player->bufferSize += packetSize - RTP_HEADER_SIZE;
        return true;
    }
    return false;
}

}  // namespace

// NOTE(review): the log calls in this file mix printf-style ("%d") and
// fmt-style ("{}") placeholders; only one of them can match what LOG_INFO /
// LOG_ERROR expect. The existing messages are left untouched - confirm and unify.

/// Allocates the TCP reassembly cache and the single-packet scratch buffer.
RTPReceiver2::RTPReceiver2() {
    mRecvCache = static_cast<uint8_t*>(std::malloc(Server_cache_max_size));
    mRecvRtpBuffer = static_cast<uint8_t*>(std::malloc(Server_rtp_max_size));
    // Start with empty buffers so parseTcpData never reads an indeterminate fill level.
    mRecvCacheSize = 0;
    mRecvRtpBufferSize = 0;
}

/// Releases the buffers allocated by the constructor.
RTPReceiver2::~RTPReceiver2() {
    if (mRecvCache) {
        std::free(mRecvCache);
        mRecvCache = nullptr;
    }
    if (mRecvRtpBuffer) {
        std::free(mRecvRtpBuffer);
        mRecvRtpBuffer = nullptr;
    }
}

/// Runs the receive loop on the calling thread until it exits.
/// @param ip    only logged; both servers bind to INADDR_ANY.
/// @param port  local port to bind.
/// @param isUdp true for the UDP receiver, false for the TCP receiver.
/// @return the result of the selected server: 0 after a normal shutdown,
///         -1 when the socket could not be set up.
int RTPReceiver2::init(const char* ip, uint16_t port, bool isUdp) {
    if (!isUdp) {
        LOG_INFO("tcp://%s:%d", ip, port);
        return startTcpServer(ip, port);
    }
    LOG_INFO("udp://%s:%d", ip, port);
    return startUdpServer(ip, port);
}

/// Receives RTP datagrams and appends their payload to the player's buffer
/// until m_bRtpExit is set or recvfrom fails.
/// @param ip   unused; the socket binds to INADDR_ANY.
/// @param port local UDP port to bind.
/// @return 0 after the loop ends, -1 if the socket cannot be created or bound.
int RTPReceiver2::startUdpServer(const char* ip, uint16_t port) {
    (void)ip;

    const int server_fd = socket(AF_INET, SOCK_DGRAM, 0);  // AF_INET: IPv4, SOCK_DGRAM: UDP
    if (server_fd < 0) {
        std::printf("create socket fail!\n");
        return -1;
    }

    struct sockaddr_in ser_addr;
    std::memset(&ser_addr, 0, sizeof(ser_addr));
    ser_addr.sin_family = AF_INET;
    ser_addr.sin_addr.s_addr = htonl(INADDR_ANY);  // any local address, network byte order
    ser_addr.sin_port = htons(port);               // network byte order

    if (bind(server_fd, (struct sockaddr*)&ser_addr, sizeof(ser_addr)) < 0) {
        std::printf("socket bind fail!\n");
        close(server_fd);  // do not leak the descriptor on the error path
        return -1;
    }

    char recvBuf[10000];
    while (!m_bRtpExit) {
        struct sockaddr_in clent_addr;  // receives the sender's address
        socklen_t len = sizeof(clent_addr);
        // recvfrom blocks until a datagram arrives.
        const int recvBufSize = static_cast<int>(
            recvfrom(server_fd, recvBuf, sizeof(recvBuf), 0, (struct sockaddr*)&clent_addr, &len));
        if (recvBufSize <= 0) {
            std::printf("recieve data fail!\n");
            break;
        }
        if (recvBufSize < RTP_HEADER_SIZE) {
            // Too short to carry an RTP header; copying would use a negative length.
            std::printf("drop udp datagram shorter than rtp header, size=%d\n", recvBufSize);
            continue;
        }
        if (!appendRtpPayload(mPlayer, recvBuf, recvBufSize)) {
            LOG_ERROR("recvBufSize = {} over GB28181Player_buffer_max_size ", recvBufSize);
        }
    }

    close(server_fd);
    return 0;
}

/// Accepts one TCP client at a time and feeds everything it sends into
/// parseTcpData until m_bRtpExit is set.
/// accept() and recv() block, so the exit flag is only observed between calls.
/// @param ip   unused; the socket binds to INADDR_ANY.
/// @param port local TCP port to listen on.
/// @return 0 after the loop ends, -1 if the listening socket cannot be set up.
int RTPReceiver2::startTcpServer(const char* ip, uint16_t port) {
    (void)ip;

    const int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd == -1) {
        std::printf("create socket error: %s(errno: %d)\n", std::strerror(errno), errno);
        return -1;
    }

    struct sockaddr_in servaddr;
    std::memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(port);

    if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -1) {
        std::printf("bind socket error: %s(errno: %d)\n", std::strerror(errno), errno);
        close(listenfd);
        return -1;
    }
    if (listen(listenfd, 10) == -1) {
        std::printf("listen socket error: %s(errno: %d)\n", std::strerror(errno), errno);
        close(listenfd);
        return -1;
    }

    char recvBuf[10000];
    while (!m_bRtpExit) {
        LOG_INFO("阻塞监听新连接...");
        // Blocks until a client connects.
        const int clientFd = accept(listenfd, (struct sockaddr*)NULL, NULL);
        if (clientFd < 0) {
            LOG_ERROR("accept connection error");
            continue;
        }
        LOG_INFO("发现新连接:clientFd=%d", clientFd);

        // A partial frame left over from a previous connection would corrupt
        // the framing of this one, so every connection starts with an empty cache.
        mRecvCacheSize = 0;

        while (!m_bRtpExit) {
            const int recvBufSize = static_cast<int>(recv(clientFd, recvBuf, sizeof(recvBuf), 0));
            if (recvBufSize <= 0) {
                LOG_ERROR("::recv error: clientFd={},recvBufSize={}", clientFd, recvBufSize);
                break;
            }
            parseTcpData(recvBuf, recvBufSize);
        }

        close(clientFd);
        LOG_INFO("关闭连接 clientFd={}", clientFd);
    }

    close(listenfd);
    return 0;
}

/// Appends freshly received TCP bytes to the reassembly cache and extracts
/// every complete frame. A frame is a 2-byte big-endian length followed by
/// that many bytes of RTP packet; the packet's payload is appended to the
/// player's buffer.
/// @param recvBuf     bytes just read from the socket.
/// @param recvBufSize number of valid bytes in recvBuf.
void RTPReceiver2::parseTcpData(char* recvBuf, int recvBufSize) {
    if (mRecvCache == nullptr || mRecvRtpBuffer == nullptr) {
        return;  // allocation failed in the constructor; nothing can be buffered
    }

    if (recvBuf != nullptr && recvBufSize > 0) {
        if ((mRecvCacheSize + recvBufSize) > Server_cache_max_size) {
            // The new bytes are dropped; the stream may lose framing after this.
            LOG_ERROR("超过缓冲容量上限,忽略本次读取的数据。mRecvCacheSize=%d,recvBufSize=%d", mRecvCacheSize, recvBufSize);
        } else {
            std::memcpy(mRecvCache + mRecvCacheSize, recvBuf, recvBufSize);
            mRecvCacheSize += recvBufSize;
        }
    }

    const int cachedSize = static_cast<int>(mRecvCacheSize);
    int consumed = 0;  // bytes at the front of the cache that have been fully handled

    // As in the original, a frame is only considered once more than the
    // length prefix itself is available.
    while ((cachedSize - consumed) > kLengthPrefixSize) {
        const uint8_t* frame = mRecvCache + consumed;
        // Decode byte-wise: the frame may start at an odd offset.
        const int packetSize = (frame[0] << 8) | frame[1];
        if ((cachedSize - consumed - kLengthPrefixSize) < packetSize) {
            break;  // the frame is not complete yet; wait for more data
        }
        consumed += kLengthPrefixSize + packetSize;
        const int remaining = cachedSize - consumed;

        // The length comes from the network: it must cover an RTP header and
        // fit the scratch buffer, otherwise the copies below would overflow.
        if (packetSize < RTP_HEADER_SIZE || packetSize > Server_rtp_max_size) {
            std::printf("drop rtp frame with invalid size=%d\n", packetSize);
            continue;
        }

        // Extract the RTP packet.
        mRecvRtpBufferSize = packetSize;
        std::memcpy(mRecvRtpBuffer, frame + kLengthPrefixSize, packetSize);

        RtpHeader rtpHeader;
        parseRtpHeader(mRecvRtpBuffer, &rtpHeader);
        std::printf("get a rtp seq=%d,RtpBufferSize=%d,mRecvCacheSize=%d,marker=%d,timestamp=%d\n",
                    rtpHeader.seq, packetSize, remaining, rtpHeader.marker, rtpHeader.timestamp);

        // Strip the RTP header and store the payload in the player's buffer.
        if (!appendRtpPayload(mPlayer, mRecvRtpBuffer, packetSize)) {
            LOG_ERROR("recvBufSize = %d over MAX_RTP_BUFFER_SIZE ", recvBufSize);
        }
    }

    // Compact the cache once per call instead of once per packet.
    if (consumed > 0) {
        mRecvCacheSize -= consumed;
        std::memmove(mRecvCache, mRecvCache + consumed, cachedSize - consumed);
    }
}

/// Picks an even UDP port whose successor is also free, inside the range
/// configured on WebsocketClient.
/// The ports are only probed (bound and released again), so another process
/// may still take them before the caller binds.
/// @return the even port of a free pair, or -1 when the configured range is
///         invalid or contains no free pair.
int RTPReceiver2::allocRtpPort() {
    WebsocketClient* pServer = WebsocketClient::getInstance();
    const int minPort = pServer->GetMinRtpPort();
    const int maxPort = pServer->GetMaxRtpPort();
    if (minPort <= 0 || maxPort <= minPort) {
        LOG_ERROR("[{}] - 范围内没有可用的port", m_SipChannelId);
        return -1;
    }

    // True when `port` can be bound on INADDR_ANY; the probe socket is always closed.
    const auto portIsFree = [this](int port) -> bool {
        if (port > 65535) {
            return false;
        }
        const int fd = socket(AF_INET, SOCK_DGRAM, 0);
        if (fd < 0) {
            return false;
        }
        sockaddr_in addr;
        std::memset(&addr, 0, sizeof(addr));
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
        addr.sin_port = htons(static_cast<uint16_t>(port));
        const bool bound = bind(fd, (sockaddr*)&addr, sizeof(addr)) == 0;
        const int closeResult = close(fd);
        if (closeResult != 0) {
            LOG_ERROR("[{}] - closesocket failed : {}", m_SipChannelId, closeResult);
            return false;
        }
        return bound;
    };

    // Start at a random even port inside [minPort, maxPort).
    std::srand((unsigned int)std::time(NULL));
    int port = minPort + (std::rand() % (maxPort - minPort));
    if (port % 2) {
        ++port;
    }

    int wraps = 0;
    while (true) {
        if (port >= maxPort) {
            port = minPort + (minPort % 2);  // restart at the first even port
            ++wraps;
            if (wraps > 1 || port >= maxPort) {
                // The whole range has been scanned once from its start.
                LOG_ERROR("[{}] - 范围内没有可用的port", m_SipChannelId);
                return -1;
            }
        }
        if (portIsFree(port) && portIsFree(port + 1)) {
            return port;
        }
        port += 2;
    }
}

/// Asks the receive loops to stop by setting m_bRtpExit.
/// NOTE(review): the loops read this flag without visible synchronisation -
/// confirm it is atomic if this is called from another thread.
void RTPReceiver2::RequestStreamFailed() {
    m_bRtpExit = true;
}