#include "RTPTcpReceiver.h"
#include "common_header.h"

// Disabled RTPSession subclass that forwarded validated packets to the receiver
// from the session callbacks. Kept for reference only; the active code pulls
// packets from the session inside RTPTcpReceiver::OnRtpRecv() instead.
//
// class TcpRTPSession : public RTPSession
// {
// public:
//     void setReceiver(RTPTcpReceiver* r){
//         tcpReceiver = r;
//     }
// protected:
//     void OnValidatedRTPPacket(RTPSourceData *srcdat, RTPPacket *rtppack, bool isonprobation, bool *ispackethandled)
//     {
//         // printf("SSRC %x Got packet (%d bytes) in OnValidatedRTPPacket from source 0x%04x!\n", GetLocalSSRC(),
//         //     (int)rtppack->GetPayloadLength(), srcdat->GetSSRC());
//         LOG_DEBUG("SSRC {} Got packet ({} bytes) in OnValidatedRTPPacket from source {}}!\n", GetLocalSSRC(),
//             (int)rtppack->GetPayloadLength(), srcdat->GetSSRC());
//         if(nullptr != tcpReceiver){
//             tcpReceiver->ParsePacket(rtppack);
//         }
//         DeletePacket(rtppack);
//         *ispackethandled = true;
//     }
//     void OnRTCPSDESItem(RTPSourceData *srcdat, RTCPSDESPacket::ItemType t, const void *itemdata, size_t itemlength)
//     {
//         char msg[1024];
//         memset(msg, 0, sizeof(msg));
//         if (itemlength >= sizeof(msg))
//             itemlength = sizeof(msg)-1;
//         memcpy(msg, itemdata, itemlength);
//         // printf("SSRC %x Received SDES item (%d): %s from SSRC %x\n", GetLocalSSRC(), (int)t, msg, srcdat->GetSSRC());
//         LOG_DEBUG("SSRC {} Received SDES item ({}): {} from SSRC {}\n", GetLocalSSRC(), (int)t, msg, srcdat->GetSSRC());
//     }
// private:
//     RTPTcpReceiver* tcpReceiver{nullptr};
// };

/**
 * TCP transmitter that reacts to socket errors reported by the base class.
 *
 * On either error the failing socket is removed from the destination list.
 * On a send error the owning receiver (if set and not closing) is additionally
 * asked to reconnect.
 */
class MyTCPTransmitter : public RTPTCPTransmitter
{
public:
    MyTCPTransmitter() : RTPTCPTransmitter(nullptr) {}

    /// Registers the receiver to notify on send errors (not owned; may be nullptr).
    void setReceiver(RTPTcpReceiver* r) { tcpReceiver = r; }

    /// Called when sending on @p sock failed: drop the destination and trigger a reconnect.
    void OnSendError(SocketType sock) override
    {
        LOG_ERROR("Error sending over socket {}, removing destination", sock);
        DeleteDestination(RTPTCPAddress(sock));
        if (nullptr != tcpReceiver && !tcpReceiver->isClosing()) {
            tcpReceiver->ReConnect();
        }
    }

    /// Called when receiving on @p sock failed: drop the destination.
    /// NOTE(review): unlike OnSendError() this does not trigger a reconnect - confirm that is intended.
    void OnReceiveError(SocketType sock) override
    {
        LOG_ERROR("Error receiving over socket {}, removing destination", sock);
        DeleteDestination(RTPTCPAddress(sock));
    }

private:
    RTPTcpReceiver* tcpReceiver{nullptr};
};

/// Thread entry point: runs RTPTcpReceiver::OnRtpRecv() on the receiver passed in @p param.
/// Returns -1 when @p param is null, otherwise the result of OnRtpRecv().
static int rtp_revc_thread_(void* param)
{
    if (!param) {
        return -1;
    }
    return static_cast<RTPTcpReceiver*>(param)->OnRtpRecv();
}

/// Thread entry point: runs RTPTcpReceiver::ListenFinish() on the receiver passed in @p param.
/// Returns -1 when @p param is null, otherwise the result of ListenFinish().
static int listen_finish_thread_(void* param)
{
    if (!param) {
        return -1;
    }
    return static_cast<RTPTcpReceiver*>(param)->ListenFinish();
}

/// Creates the session, session parameters and transmitter; nothing is opened yet.
RTPTcpReceiver::RTPTcpReceiver()
    : m_bRtpExit(false)
    , m_bOpened(false)
    , m_idleCount(-1)
    , m_noDataCount(-1)
    , m_nListener(-1)
    , m_bAccepted(false)
    , m_bClosing(false)
{
    m_rtpSessionPtr = new RTPSession();
    m_pSessparams = new RTPSessionParams();
    m_pTrans = new MyTCPTransmitter();
}

/// Stops the worker threads and releases the session, parameters and transmitter.
RTPTcpReceiver::~RTPTcpReceiver()
{
    // Always go through Close(): the receive thread can end the task on its own
    // (which clears m_bOpened via close_task()), and in that case the
    // listen-finish thread would otherwise still be joinable when its
    // std::thread member is destroyed, which calls std::terminate.
    Close();

    if (m_rtpSessionPtr != nullptr) {
        delete m_rtpSessionPtr;
        m_rtpSessionPtr = nullptr;
    }

    if (m_pSessparams != nullptr) {
        delete m_pSessparams;
        m_pSessparams = nullptr;
    }

    if (m_pTrans != nullptr) {
        delete m_pTrans;
        m_pTrans = nullptr;
    }
}

/**
 * Starts listening on @p localPort and launches the worker threads.
 * @return true on success, false when initSession() failed.
 */
bool RTPTcpReceiver::Open(uint16_t localPort)
{
    if (0 != initSession(localPort)) {
        return false;
    }

    m_bOpened = true;

    LOG_INFO("[{}] started.", m_deviceID);

    return true;
}

/// Returns the opened flag (and logs it).
bool RTPTcpReceiver::IsOpened()
{
    LOG_INFO("[{}] isopng:{} ", m_deviceID, m_bOpened);
    return m_bOpened;
}

/**
 * Requests shutdown and waits for the listen-finish thread, which in turn runs
 * close_task(). ListenFinish() polls the exit flag every 3 seconds, so this
 * call can block for about that long. Safe to call more than once.
 */
void RTPTcpReceiver::Close()
{
    m_bRtpExit = true;

    if (m_listenFinishThread.joinable()) {
        m_listenFinishThread.join();
    }
}

/// Performs the actual teardown; executed on the listen-finish thread.
void RTPTcpReceiver::close_task()
{
    m_bRtpExit = true;
    m_bClosing = true;
    // Pretend a peer is connected so OnRtpRecv() leaves its accept loop.
    m_bAccepted = true;

    LOG_DEBUG("[{}] 1.", m_deviceID);

    // Wait for the RTP receive thread to exit.
    if (m_rtpThread.joinable()) {
        m_rtpThread.join();
    }

    LOG_DEBUG("[{}] 2.", m_deviceID);

    ClosePsThread();

    m_bOpened = false;

    LOG_INFO("[{}] closed.", m_deviceID);
}

/// Returns true once close_task() has started.
bool RTPTcpReceiver::isClosing()
{
    return m_bClosing;
}

/**
 * Creates the non-blocking listening socket on @p localPort, configures and
 * creates the RTP session on top of the TCP transmitter, and starts the
 * receive and listen-finish threads.
 *
 * @return 0 on success, -1 on failure. On failure the listening socket is
 *         closed again and no thread has been started.
 */
int RTPTcpReceiver::initSession(int localPort)
{
    m_nListener = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);
    if (m_nListener < 0) {
        return -1;
    }

    // Releases the listening socket on every error path below.
    auto closeListener = [this]() {
        close(m_nListener);
        m_nListener = -1;
    };

    sockaddr_in serverAddr;
    memset(&serverAddr, 0, sizeof(sockaddr_in));
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_port = htons(localPort);
    serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);

    if (bind(m_nListener, reinterpret_cast<sockaddr*>(&serverAddr), sizeof(serverAddr)) == -1) {
        LOG_ERROR("[{}] 绑定端口失败: {}", m_deviceID, localPort);
        closeListener();
        return -1;
    }

    if (listen(m_nListener, 1) == -1) {
        LOG_ERROR("[{}] listen 失败", m_deviceID);
        closeListener();
        return -1;
    }

    const int nPackSize = 45678;
    m_pSessparams->SetProbationType(RTPSources::NoProbation);
    // NOTE(review): JRTPLIB expects the timestamp unit in seconds per tick
    // (e.g. 1.0/90000.0); 90000.0/25.0 looks unusual - confirm.
    m_pSessparams->SetOwnTimestampUnit(90000.0 / 25.0);
    m_pSessparams->SetMaximumPacketSize(nPackSize + 64);

    int status = m_pTrans->Init(false);
    if (status < 0) {
        LOG_ERROR("[{}] create session error: {}", m_deviceID, status);
        closeListener();
        return -1;
    }

    status = m_pTrans->Create(65535, nullptr);
    if (status < 0) {
        LOG_ERROR("[{}] create session error: {}", m_deviceID, status);
        closeListener();
        return -1;
    }

    m_pTrans->setReceiver(this);

    status = m_rtpSessionPtr->Create(*m_pSessparams, m_pTrans);
    if (status < 0) {
        // If status == -59, run `export LOGNAME=root`;
        // see https://blog.csdn.net/m0_37876242/article/details/128588162
        LOG_ERROR("[{}] create session error: {}", m_deviceID, status);
        closeListener();
        return -1;
    }

    m_rtpThread = std::thread(rtp_revc_thread_, this);
    m_listenFinishThread = std::thread(listen_finish_thread_, this);

    InitPS();

    // The stream itself is requested from OnRtpRecv(), not here.

    LOG_INFO("[{}] 初始化成功, congratulations !!!", m_deviceID);

    return 0;
}

/**
 * Body of the RTP receive thread.
 *
 * Until exit is requested it (1) makes sure a peer is connected - requesting
 * the stream via RequestStream() and accepting a TCP connection on the
 * listening socket - and (2) drains all pending packets from the session into
 * ParsePacket(), then polls the session.
 *
 * Gives up (and sets the exit flag) after more than 10 consecutive failed
 * stream requests. On exit the session is destroyed and both sockets closed.
 *
 * @return -1 when there is no session object, otherwise 0.
 */
int RTPTcpReceiver::OnRtpRecv()
{
    if (nullptr == m_rtpSessionPtr) {
        return -1;
    }

    LOG_INFO("[{}] OnRtpRecv started, m_nListener : {}", m_deviceID, m_nListener);

    SocketType nServer = -1;   // currently connected peer socket, -1 if none

    LOG_INFO("[{}] Poll started.", m_deviceID);

    int reconn_times = 0;      // consecutive failed RequestStream() calls
    int reaccept_times = 0;    // consecutive accept() attempts without a peer
    bool bReconn = false;      // true while a stream request is outstanding

    // Requests the stream until it succeeds. Returns false when the thread must exit.
    auto requestStream = [&]() -> bool {
        while (!bReconn) {
            if (m_bRtpExit) {
                return false;
            }

            reconn_times++;
            if (reconn_times > 10) {
                // All 10 requests failed; end the task.
                m_bRtpExit = true;
                return false;
            }

            LOG_DEBUG("[{}] RequestStream...", m_deviceID);
            bReconn = RequestStream();
            if (bReconn) {
                LOG_DEBUG("[{}] RequestStream, True", m_deviceID);
                break;
            }
            LOG_DEBUG("[{}] RequestStream, False", m_deviceID);

            std::this_thread::sleep_for(std::chrono::seconds(3));
        }
        return true;
    };

    // Blocks until a peer is accepted. Returns false when the thread must exit.
    auto waitForPeer = [&]() -> bool {
        while (!m_bAccepted) {
            if (m_bRtpExit) {
                return false;
            }

            if (!requestStream()) {
                return false;
            }

            LOG_DEBUG("[{}] accepting...", m_deviceID);
            sockaddr_in clientAddr;
            socklen_t addrLen = sizeof(clientAddr);   // value-result argument, reset per call
            const SocketType accepted =
                accept(m_nListener, reinterpret_cast<sockaddr*>(&clientAddr), &addrLen);
            if (accepted < 0) {
                // The listener is non-blocking, so "no pending connection" also lands here.
                reaccept_times++;
                LOG_DEBUG("[{}] reaccept_times = {}", m_deviceID, reaccept_times);
                if (reaccept_times > 600) {
                    // 600 * 50 ms without a peer: request the stream again.
                    LOG_DEBUG("[{}] reaccept_times > 600", m_deviceID);
                    bReconn = false;
                    reaccept_times = 0;
                }
                std::this_thread::sleep_for(std::chrono::milliseconds(50));
                continue;
            }

            LOG_DEBUG("[{}] accept success", m_deviceID);

            // Release the previous peer socket (reconnect case) instead of leaking it.
            // DeleteDestination may report "not found" if the transmitter already
            // removed it; that result is intentionally ignored.
            if (nServer >= 0) {
                m_rtpSessionPtr->DeleteDestination(RTPTCPAddress(nServer));
                close(nServer);
                nServer = -1;
            }

            const int addStatus = m_rtpSessionPtr->AddDestination(RTPTCPAddress(accepted));
            if (addStatus < 0) {
                LOG_ERROR("[{}] AddDestination error: {}", m_deviceID, addStatus);
                close(accepted);
                continue;
            }

            nServer = accepted;
            m_bAccepted = true;
            bReconn = false;
            reconn_times = 0;
            reaccept_times = 0;

            LOG_INFO("[{}] nServer={}", m_deviceID, nServer);
            break;
        }
        return true;
    };

    while (!m_bRtpExit) {
        if (!waitForPeer()) {
            break;
        }

        m_rtpSessionPtr->BeginDataAccess();
        if (m_rtpSessionPtr->GotoFirstSourceWithData()) {
            do {
                RTPPacket* pack = nullptr;
                while ((pack = m_rtpSessionPtr->GetNextPacket()) != nullptr) {
                    // LOG_DEBUG("[{}] time: {} ", m_deviceID, UtilTools::get_cur_time_ms());
                    ParsePacket(pack);

                    m_rtpSessionPtr->DeletePacket(pack);
                }
            } while (m_rtpSessionPtr->GotoNextSourceWithData());
        }

        m_rtpSessionPtr->EndDataAccess();

        m_rtpSessionPtr->Poll();
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }

    m_rtpSessionPtr->Destroy();

    // 0 is a valid descriptor, so test against < 0 rather than <= 0.
    if (nServer >= 0) {
        close(nServer);
    }
    if (m_nListener >= 0) {
        close(m_nListener);
        m_nListener = -1;
    }

    LOG_INFO("[{}] OnRtpRecv exited.", m_deviceID);

    return 0;
}

/**
 * Body of the listen-finish thread: waits (polling every 3 seconds) until exit
 * is requested, then runs close_task().
 * @return always 0.
 */
int RTPTcpReceiver::ListenFinish()
{
    while (!m_bRtpExit) {
        std::this_thread::sleep_for(std::chrono::seconds(3));
    }

    close_task();

    return 0;
}

/**
 * Marks the peer as disconnected so OnRtpRecv() goes back to requesting the
 * stream and accepting a new connection.
 * @return always true.
 */
bool RTPTcpReceiver::ReConnect()
{
    m_bAccepted = false;
    return true;
}

/**
 * Asks the registered callback to request the stream for this device.
 * @return the callback's result, or false when no callback is set.
 */
bool RTPTcpReceiver::RequestStream()
{
    if (!m_callback_request_stream) {
        LOG_ERROR("[{}] RequestStream callback is not set", m_deviceID);
        return false;
    }
    return m_callback_request_stream(m_deviceID.c_str());
}