#include "RTPReceiver.h"
#include "rtppacket.h"
// The header name of one bare `#include` directive was lost from this file.
// The standard headers this translation unit demonstrably relies on
// (std::thread, std::mutex/std::lock_guard, std::chrono, memcpy, malloc/rand, time)
// are listed explicitly instead.
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <exception>
#include <mutex>
#include <thread>
#include "../common_header.h"
#include "../sip/SipServer.h"

#ifdef __linux__
#include "arpa/inet.h"
#endif

#define BUFFERSIZE_1024 1024

// Number of BYTE elements in the RTP reassembly buffer (recvTmpBuf): 1024 * 1024 * 5 * 2.
const int kVideoFrameSize = BUFFERSIZE_1024*BUFFERSIZE_1024*5*2;

/// PS demuxer callback, registered with m_psParser in RTPReceiver::InitPS().
///
/// Forwards one demuxed PES payload to RTPReceiver::OnPsDemux().
///
/// @param streamid  stream id reported by the demuxer
/// @param data      payload bytes; ignored when NULL
/// @param size      payload size; ignored when <= 0
/// @param pts       presentation timestamp of the payload
/// @param localPts  second timestamp passed through unchanged to OnPsDemux()
/// @param key       key-frame flag reported by the demuxer
/// @param param     the RTPReceiver instance given to SetReceiveFunction(); ignored when NULL
/// @return always 0
static int ReceivePESFunction(unsigned char streamid, void * data, int size, uint64_t pts, uint64_t localPts, bool key, void* param)
{
    // Same defensive contract as ps_demuxer_thread_: never dereference a null receiver.
    if (NULL == param) {
        return 0;
    }

    if (NULL != data && size > 0) {
        RTPReceiver* self = static_cast<RTPReceiver*>(param);
        self->OnPsDemux(streamid, static_cast<BYTE*>(data), size, key, pts, localPts);
    }
    return 0;
}

/// Thread entry point for the PS demux thread started by RTPReceiver::InitPS().
///
/// @param param  the RTPReceiver instance whose OnPsProcess() loop is to be run
/// @return -1 when param is NULL, otherwise the value returned by OnPsProcess()
static int ps_demuxer_thread_(void* param)
{
    if (!param) {
        return -1;
    }

    RTPReceiver* self = static_cast<RTPReceiver*>(param);
    return self->OnPsProcess();
}

/// Initializes the frame-tracking state and allocates the RTP reassembly buffer
/// (kVideoFrameSize BYTE elements).
// NOTE(review): lastPts, offset, mark and m_notToDecodCount are used elsewhere in this file
// but are not initialized here — confirm they have in-class initializers in RTPReceiver.h.
RTPReceiver::RTPReceiver()
    : m_LastPTS(-1)
    , m_LastIsKeyFrame(0)
    , m_SliceBuf(1024*1024)
    , m_h264DataFunc(NULL)
    , m_hVodEndFunc(NULL)
    , m_usrParam(NULL)
    , m_bPsExit(false)
    , m_psThreadPtr(nullptr)
{
    m_LastStreamType = 0;
    recvTmpBuf = new BYTE[kVideoFrameSize];
}

/// Releases the RTP reassembly buffer.
// NOTE(review): the demux thread is not stopped here; callers presumably must call
// ClosePsThread() before destroying the object — confirm.
RTPReceiver::~RTPReceiver()
{
    // delete[] on a null pointer is a no-op, so no explicit check is needed.
    delete[] recvTmpBuf;
    recvTmpBuf = nullptr;
}

/// Registers the callback that receives each reassembled video frame.
/// Note that this overwrites m_usrParam, which SetVodEndCallback() also writes.
void RTPReceiver::SetOutputCallback(CallBack_Stream cb, void* param)
{
    m_h264DataFunc = cb;
    m_usrParam = param;
}

/// Registers the callback invoked when the demux thread finishes.
/// Note that this overwrites m_usrParam, which SetOutputCallback() also writes.
void RTPReceiver::SetVodEndCallback(CallBack_VodFileEnd cb, void* param)
{
    m_hVodEndFunc = cb;
    m_usrParam = param;
}

/// Stores the stream-request callback.
void RTPReceiver::SetRequestStreamCallback(CallBack_Request_Stream cb)
{
    m_callback_request_stream = cb;
}

/// Hooks the PS parser up to this receiver and starts the PS demux thread.
///
/// @return 0 on success, -1 when the thread could not be created
int RTPReceiver::InitPS()
{
    m_psParser.SetReceiveFunction(ReceivePESFunction, this);

    // `new` and the std::thread constructor report failure by throwing, never by yielding
    // nullptr, so the failure path has to be a catch block for -1 to be reachable.
    try {
        m_psThreadPtr = new std::thread(ps_demuxer_thread_, this);
    } catch (const std::exception&) {
        return -1;
    }

    LOG_INFO("[{}] InitPS finished", m_SipChannelId);
    return 0;
}

/// Asks the PS demux thread to exit, waits for it and releases the thread object.
/// Safe to call when InitPS() never ran or when the thread was already closed.
void RTPReceiver::ClosePsThread()
{
    LOG_INFO("[{}] 3.", m_SipChannelId);
    m_bPsExit = true;

    // PS demux thread exit
    if (m_psThreadPtr != nullptr) {
        if (m_psThreadPtr->joinable()) {
            m_psThreadPtr->join();
        }
        // Free the thread object even when it was not joinable, otherwise it leaks.
        delete m_psThreadPtr;
        m_psThreadPtr = nullptr;
    }

    LOG_INFO("[{}] ps demux thread quit", m_SipChannelId);
}

// Handles data whose PS header has already been removed.
// The stream sent by a downstream vendor may be any one of MPEG-4/H264/SVAC.
// The national (GB) standard lets the encoding (downstream) side choose to implement
// H264, MPEG4 or SVAC, but the decoding (upstream) side
//
必须同时实现对H264、MPEG4和SVAC的支持 void RTPReceiver::OnPsDemux(unsigned char streamId, BYTE *data, int len, bool key, uint64_t pts, uint64_t localPts) { if (!m_h264DataFunc) { return; } if (-1 == m_LastPTS) { if (!key) { return; } m_LastPTS = pts; m_LastIsKeyFrame = key; m_LastStreamType = streamId; } // 音频数据不处理 if (0xC0 == streamId) { return; } ////add by mds 20190424 //if (m_notToDecodCount > 50000)//针对大华相机,可能会很久不调用解码 //{ // byte_buffer bb(64); // bb << ERROR_REALSTREAM_INTERRUPT << "This session have a long time no decoding"; // LOG_INFO("[{}] Long time no decoding!!!m_notToDecodCount=[{}]", m_SipChannelId, m_notToDecodCount); // // if (m_usrParam) // { // if (((VideoSession *)GetUsrParam())->msgChan()->is_valid()) // ((VideoSession *)GetUsrParam())->msgChan()->send_msg(bb.data_ptr(), bb.data_size()); // //通知网关关闭句柄 // if(!((VideoSession *)GetUsrParam())->streamHandle().empty()) // { // LOG_INFO("[{}] ---->Notify hisense gateway release handle = {} !<----", m_SipChannelId, ((VideoSession *)GetUsrParam())->streamHandle()); // if (((VideoSession *)GetUsrParam())->video_type() == EREAL) // real_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); // // if (((VideoSession *)GetUsrParam())->video_type() == ERECORD) // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); // // ((VideoSession *)GetUsrParam())->streamHandle().clear(); // } // // m_hVodEndFunc(m_usrParam); // } // // bb.bset(0); // return; //} if (m_LastPTS != pts) { int stream_type = 0; if (VIDEO_TYPE_H264 == streamId) { stream_type = 0; } else if(VIDEO_TYPE_H265 == streamId) { stream_type = 1; } else { LOG_ERROR("[{}] - video type not support!", m_SipChannelId); } m_notToDecodCount = 0; m_h264DataFunc(m_usrParam, stream_type, (char *)m_SliceBuf.head(), m_SliceBuf.len(), m_LastIsKeyFrame, m_LastPTS, localPts); m_SliceBuf.reset(); m_LastPTS = pts; m_LastIsKeyFrame = key; m_LastStreamType = streamId; } m_notToDecodCount++; m_SliceBuf.add((char*)data, len); } // 解PS包线程 int 
RTPReceiver::OnPsProcess() { LOG_INFO("[{}] started.", m_SipChannelId); while (!m_bPsExit) { m_psFrameMutex.lock(); // LOG_DEBUG("[{}] PS frame size : {}", m_SipChannelId, m_psVideoFrames.size()); if (m_psVideoFrames.size() <= 0){ m_psFrameMutex.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(10)); continue; } Frame* frame = m_psVideoFrames.front(); m_psVideoFrames.pop(); m_psFrameMutex.unlock(); if (frame != nullptr) { int nRet = m_psParser.AddData(frame->buf_, frame->len_); if (nRet == -1) { LOG_INFO("m_psParser return -1--{}", m_SipChannelId); } else if (nRet == -2) { LOG_INFO("m_psParser return -2--{}", m_SipChannelId); } else if (nRet == -3) { LOG_INFO("m_psParser return -3--{}", m_SipChannelId); } delete frame; frame = nullptr; } } ClearPsVideoFrameList(); m_hVodEndFunc(m_usrParam); LOG_INFO("[{}] exited.", m_SipChannelId); return 0; } int RTPReceiver::GetPsFrameListSize() { std::lock_guard l(m_psFrameMutex); return m_psVideoFrames.size(); } void RTPReceiver::ClearPsVideoFrameList() { std::lock_guard l(m_psFrameMutex); while (!m_psVideoFrames.empty()) { Frame* f = m_psVideoFrames.front(); delete f; m_psVideoFrames.pop(); } LOG_INFO("[{}] cleared ps video frame list!", m_SipChannelId); } int RTPReceiver::ParsePacket(RTPPacket* packet){ do { if (0 == packet->GetPayloadType()) { // 音频数据, 暂不处理 break; } // 判断是否收到完整的帧(有些厂商打的marker标记不准, 所以只能看时间戳来判断) uint32_t curPts = packet->GetTimestamp(); if (lastPts != 0 && curPts != lastPts) { mark = 1; } lastPts = curPts; int payloadLen = packet->GetPayloadLength(); if (offset + payloadLen > kVideoFrameSize) { offset = 0, mark = 0; break; } // LOG_DEBUG("[{}] ParsePacket GetPayloadLength", m_SipChannelId); if (mark) { BYTE* frameBuf = (BYTE*)malloc(sizeof(BYTE) * offset); if (!frameBuf) { offset = 0, mark = 0; break; } memcpy(frameBuf, recvTmpBuf, offset); if (!m_bPsExit){ std::lock_guard l(m_psFrameMutex); if (m_psVideoFrames.size() < 100) { // LOG_DEBUG("[{}]ParsePacket push", m_SipChannelId); 
m_psVideoFrames.push(new Frame(frameBuf, offset, false)); } else { free(frameBuf); } } else{ //若此时解码线程已经退出,不再往m_psVideoFrames推送帧,且退出当前线程 free(frameBuf); LOG_INFO("ParsePacket quit, device_id:{}", m_SipChannelId); return 1; } offset = 0; mark = 0; } memcpy(recvTmpBuf + offset, packet->GetPayloadData(), payloadLen); offset += payloadLen; } while (0); return 0; } int RTPReceiver::allocRtpPort() { SipServer* pServer = SipServer::getInstance(); int MIN_RTP_PORT = pServer->GetMinRtpPort() ; int MAX_RTP_PORT = pServer->GetMaxRtpPort(); int s_rtpPort = MIN_RTP_PORT; srand((unsigned int)time(NULL)); s_rtpPort = MIN_RTP_PORT + (rand() % MIN_RTP_PORT); if (s_rtpPort % 2) ++s_rtpPort; int count = 0; while (true) { if (s_rtpPort >= MAX_RTP_PORT) { s_rtpPort = MIN_RTP_PORT; count ++; if (count > 1) { LOG_ERROR("[{}] - 10000 到 60000 没有可用的port", m_SipChannelId); } } int i = 0; for (; i < 2; i++) { sockaddr_in sRecvAddr; int s = socket(AF_INET, SOCK_DGRAM, 0); sRecvAddr.sin_family = AF_INET; sRecvAddr.sin_addr.s_addr = htonl(INADDR_ANY); sRecvAddr.sin_port = htons(s_rtpPort + i); int nResult = bind(s, (sockaddr *)&sRecvAddr, sizeof(sRecvAddr)); if (nResult != 0) { break; } nResult = close(s); if (nResult != 0) { LOG_ERROR("[{}] - closesocket failed : {}", m_SipChannelId, nResult); break; } } if (i == 2) break; s_rtpPort += 2; } return s_rtpPort; }