#include "RTPReceiver2.h" #include "rtppacket.h" #include #include "../common_header.h" #include "../websocket/WebsocketClient.h" #ifdef __linux__ #include "arpa/inet.h" #endif #include "Rtp.h" const int MAX_RTP_BUFFER_SIZE = 1024*1024*10; #define Server_cache_max_size 4194304 // 1M = 1 * 1024 * 1024 = 1048576 字节 #define Server_rtp_max_size 1800 static long long get_cur_time_second(){ chrono::time_point tpMicro = chrono::time_point_cast(chrono::system_clock::now()); return tpMicro.time_since_epoch().count(); } RTPReceiver2::RTPReceiver2() { mRecvCache = (uint8_t*)malloc(Server_cache_max_size); mRecvRtpBuffer = (uint8_t*)malloc(Server_rtp_max_size); } RTPReceiver2::~RTPReceiver2(){ if (mRecvCache) { free(mRecvCache); mRecvCache = nullptr; } if (mRecvRtpBuffer) { free(mRecvRtpBuffer); mRecvRtpBuffer = nullptr; } } void RTPReceiver2::SetOutputCallback(CallBack_Stream cb, void* param) { m_buffer_cbk = cb; m_bufferParam = param; } void RTPReceiver2::SetVodEndCallback(CallBack_VodFileEnd cb, void* param) { m_finish_cbk = cb; m_finishParam = param; } bool RTPReceiver2::Open(string channel_id, bool isUdp) { m_SipChannelId = channel_id; m_rtp_port = allocRtpPort(); if (m_rtp_port < 0) { return false; } bool bReq = start_server(channel_id, m_rtp_port, isUdp); if (!bReq) { LOG_INFO("[{}] start_server failed !", m_SipChannelId); Close(); return false; } m_bOpened = true; LOG_INFO("[{}] started.", m_SipChannelId); return true; } bool RTPReceiver2::IsOpened(){ LOG_INFO("[{}] isopen:{} ", m_SipChannelId, m_bOpened); return m_bOpened; } void RTPReceiver2::Close(){ m_bRtpExit = true; WebsocketClient* pServer = WebsocketClient::getInstance(); if (pServer){ pServer->ByeInvite(m_SipChannelId, m_rtp_port); } if(m_server_thread) { m_server_thread->join(); delete m_server_thread; m_server_thread = nullptr; } } bool RTPReceiver2::start_server(string channel_id, int port, bool isUdp) { WebsocketClient* pClient = WebsocketClient::getInstance(); if (pClient){ if (isUdp) { m_server_thread = new std::thread([](void* arg) { RTPReceiver2* a=(RTPReceiver2*)arg; a->udp_server(); return (void*)0; }, this); if (pClient->InviteUdp(channel_id, port, this) < 0) { return false; } } else { m_server_thread = new std::thread([](void* arg) { RTPReceiver2* a=(RTPReceiver2*)arg; a->tcp_server(); return (void*)0; }, this); if (pClient->InviteTcp(channel_id, port, this) < 0) { return false; } } } return true; } int RTPReceiver2::udp_server() { uint16_t port = m_rtp_port; LOG_INFO("udp {}",port); int server_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); //AF_INET:IPV4;SOCK_DGRAM:UDP if(server_fd < 0) { printf("create socket fail!\n"); return -1; } struct sockaddr_in ser_addr; memset(&ser_addr, 0, sizeof(ser_addr)); ser_addr.sin_family = AF_INET; ser_addr.sin_addr.s_addr = htonl(INADDR_ANY); //IP地址,需要进行网络序转换,INADDR_ANY:本地地址 ser_addr.sin_port = htons(port); //端口号,需要网络序转换 int ret = bind(server_fd, (struct sockaddr*)&ser_addr, sizeof(ser_addr)); if(ret < 0) { printf("socket bind fail!\n"); return -1; } uint8_t recvBuf[10000]; int recvBufSize; socklen_t len; struct sockaddr_in clent_addr; //clent_addr用于记录发送方的地址信息 while(!m_bRtpExit) { memset(recvBuf, 0, sizeof(recvBuf)); len = sizeof(clent_addr); recvBufSize = recvfrom(server_fd, recvBuf, sizeof(recvBuf), 0, (struct sockaddr*)&clent_addr, &len); //recvfrom是拥塞函数,没有数据就一直拥塞 if(recvBufSize <= 0) { LOG_ERROR("recieve data fail!"); break; } // buffer 抛出 m_buffer_cbk(m_bufferParam, recvBuf, recvBufSize, 0); } close(server_fd); m_finish_cbk(m_finishParam); LOG_INFO("udp server exit."); return 0; } int RTPReceiver2::tcp_server() { uint16_t port = m_rtp_port; LOG_INFO("tcp {}", port); int listenfd, connfd; struct sockaddr_in servaddr; char buff[4096]; int n; if( (listenfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP)) == -1 ){ printf("create socket error: %s(errno: %d)\n",strerror(errno),errno); return 0; } memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(port); if( bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -1){ printf("bind socket error: %s(errno: %d)\n",strerror(errno),errno); return 0; } if( listen(listenfd, SOMAXCONN) == -1){ printf("listen socket error: %s(errno: %d)\n",strerror(errno),errno); return 0; } uint8_t recvBuf[10000]; int recvBufSize = 0; bool bFilter = false; long long last_time = get_cur_time_second(); while (!m_bRtpExit) { // LOG_INFO("阻塞监听新连接..."); // 阻塞接收请求 start socklen_t len = sizeof(sockaddr); sockaddr_in accept_addr; int clientFd = accept(listenfd, (struct sockaddr*)&accept_addr, &len); if (clientFd < 0) { if (!bFilter) { LOG_WARN("accept connection warn"); bFilter = true; } long long cur_time = get_cur_time_second(); if (cur_time - last_time > 300) { // 5分钟未能建立新连接,退出 m_bRtpExit = true; break; } std::this_thread::sleep_for(std::chrono::milliseconds(5)); continue; } // 阻塞接收请求 end last_time = get_cur_time_second(); LOG_INFO("发现新连接:clientFd={}", clientFd); bFilter = false; while (!m_bRtpExit) { recvBufSize = recv(clientFd, recvBuf, sizeof(recvBuf), 0); if (recvBufSize <= 0) { LOG_ERROR("::recv error: clientFd={},recvBufSize={}", clientFd, recvBufSize); break; } parseTcpData(recvBuf, recvBufSize); } close(clientFd); LOG_WARN("关闭连接 clientFd={}", clientFd); } close(listenfd); m_finish_cbk(m_finishParam); LOG_INFO("tcp server exit."); return 0; } void RTPReceiver2::parseTcpData(uint8_t* recvBuf, int recvBufSize) { if ((mRecvCacheSize + recvBufSize) > Server_cache_max_size) { LOG_ERROR("超过缓冲容量上限,忽略本次读取的数据。mRecvCacheSize=%d,recvBufSize=%d",mRecvCacheSize, recvBufSize); } else { memcpy(mRecvCache + mRecvCacheSize, recvBuf, recvBufSize); mRecvCacheSize += recvBufSize; } //LOGI("cacheSize=%d,开始进入解析 ... ...", cacheSize); while (true) { if (mRecvCacheSize > 2) { bool success = false; if (mRecvCacheSize > 2) { mRecvRtpBufferSize = ntohs(*(int16_t*)(mRecvCache)); if ((mRecvCacheSize - 2) >= mRecvRtpBufferSize) { success = true; } } if (success) { mRecvCacheSize -= 2; mRecvCacheSize -= mRecvRtpBufferSize; // 提取RTP memcpy(mRecvRtpBuffer, mRecvCache + 2, mRecvRtpBufferSize); memmove(mRecvCache, mRecvCache + 2 + mRecvRtpBufferSize, mRecvCacheSize); struct RtpHeader rtpHeader; parseRtpHeader(mRecvRtpBuffer, &rtpHeader); // printf("get a rtp seq=%d,RtpBufferSize=%d,mRecvCacheSize=%d,marker=%d,timestamp=%d\n", // rtpHeader.seq, // mRecvRtpBufferSize, // mRecvCacheSize,rtpHeader.marker, rtpHeader.timestamp); // buffer 抛出 m_buffer_cbk(m_bufferParam, mRecvRtpBuffer, mRecvRtpBufferSize, rtpHeader.timestamp); } else { //LOGI("跳出解析:cacheSize=%d,pktSize=%d", cacheSize, pktSize); break; } } else { //LOGI("跳出解析:缓冲数据未发现完整数据包"); break; } } } int RTPReceiver2::allocRtpPort() { WebsocketClient* pServer = WebsocketClient::getInstance(); int MIN_RTP_PORT = pServer->GetMinRtpPort() ; int MAX_RTP_PORT = pServer->GetMaxRtpPort(); int s_rtpPort = MIN_RTP_PORT; srand((unsigned int)time(NULL)); s_rtpPort = MIN_RTP_PORT + (rand() % MIN_RTP_PORT); if (s_rtpPort % 2) ++s_rtpPort; int count = 0; while (true) { if (s_rtpPort >= MAX_RTP_PORT) { s_rtpPort = MIN_RTP_PORT; count ++; if (count > 1) { LOG_ERROR("[{}] - 范围内没有可用的port", m_SipChannelId); } } int i = 0; for (; i < 2; i++) { sockaddr_in sRecvAddr; int s = socket(AF_INET, SOCK_DGRAM, 0); sRecvAddr.sin_family = AF_INET; sRecvAddr.sin_addr.s_addr = htonl(INADDR_ANY); sRecvAddr.sin_port = htons(s_rtpPort + i); int nResult = bind(s, (sockaddr *)&sRecvAddr, sizeof(sRecvAddr)); if (nResult != 0) { break; } nResult = close(s); if (nResult != 0) { LOG_ERROR("[{}] - closesocket failed : {}", m_SipChannelId, nResult); break; } } if (i == 2) break; s_rtpPort += 2; } return s_rtpPort; } void RTPReceiver2::RequestStreamFailed() { m_bRtpExit = true; }