#include "RTPReceiver.h" #include "rtppacket.h" #include #include "common_header.h" #define BUFFERSIZE_1024 1024 const int kVideoFrameSize = BUFFERSIZE_1024*BUFFERSIZE_1024*5*2; // PS解包器回调 static int ReceivePESFunction(unsigned char streamid, void * data, int size, uint64_t pts, uint64_t localPts, bool key, void* param) { if (NULL != data && size > 0) { ((RTPReceiver*)param)->OnPsDemux(streamid, (BYTE*)data, size, key, (uint64_t)pts, (uint64_t)localPts); } return 0; } static int ps_demuxer_thread_(void* param) { if (!param) { return -1; } RTPReceiver* self = (RTPReceiver*)param; return self->OnPsProcess(); } RTPReceiver::RTPReceiver() :m_LastPTS(-1) , m_LastIsKeyFrame(0) , m_SliceBuf(1024*1024) , m_h264DataFunc(NULL) , m_hVodEndFunc(NULL) , m_usrParam(NULL) , m_bPsExit(false) , m_psThreadPtr(nullptr) { m_LastStreamType = 0; recvTmpBuf = new BYTE[kVideoFrameSize]; } RTPReceiver::~RTPReceiver(){ if(recvTmpBuf != nullptr){ delete[] recvTmpBuf; } } void RTPReceiver::SetOutputCallback(CallBack_Stream cb, void* param) { m_h264DataFunc = cb; m_usrParam = param; } void RTPReceiver::SetVodEndCallback(CallBack_VodFileEnd cb, void* param) { m_hVodEndFunc = cb; m_usrParam = param; } void RTPReceiver::SetRequestStreamCallback(CallBack_Request_Stream cb){ m_callback_request_stream = cb; } int RTPReceiver::InitPS(){ m_psParser.SetReceiveFunction(ReceivePESFunction, this); m_psThreadPtr = new std::thread(ps_demuxer_thread_, this); if(nullptr == m_psThreadPtr){ return -1; } LOG_INFO("[{}] InitPS finished", m_deviceID); return 0; } void RTPReceiver::ClosePsThread(){ LOG_INFO("[{}] 3.", m_deviceID); m_bPsExit = true; // PS解包线程退出 if (m_psThreadPtr->joinable()) { m_psThreadPtr->join(); delete m_psThreadPtr; m_psThreadPtr = nullptr; } LOG_INFO("[{}] ps demux thread quit", m_deviceID); } // 处理去除了PS头的数据 // 下级厂商发过来的流有可能是MPEG-4/H264/SVAC中的任意一种 // 国标标准规定, 编码方(下级)可以选择实现H264、MPEG4或者SVAC, 但解码方(上级) // 必须同时实现对H264、MPEG4和SVAC的支持 void RTPReceiver::OnPsDemux(unsigned char streamId, BYTE *data, int len, bool key, uint64_t pts, uint64_t localPts) { if (!m_h264DataFunc) { return; } if (-1 == m_LastPTS) { if (!key) { return; } m_LastPTS = pts; m_LastIsKeyFrame = key; m_LastStreamType = streamId; } // 音频数据不处理 if (0xC0 == streamId) { return; } ////add by mds 20190424 //if (m_notToDecodCount > 50000)//针对大华相机,可能会很久不调用解码 //{ // byte_buffer bb(64); // bb << ERROR_REALSTREAM_INTERRUPT << "This session have a long time no decoding"; // LOG_INFO("[{}] Long time no decoding!!!m_notToDecodCount=[{}]", m_deviceID, m_notToDecodCount); // // if (m_usrParam) // { // if (((VideoSession *)GetUsrParam())->msgChan()->is_valid()) // ((VideoSession *)GetUsrParam())->msgChan()->send_msg(bb.data_ptr(), bb.data_size()); // //通知网关关闭句柄 // if(!((VideoSession *)GetUsrParam())->streamHandle().empty()) // { // LOG_INFO("[{}] ---->Notify hisense gateway release handle = {} !<----", m_deviceID, ((VideoSession *)GetUsrParam())->streamHandle()); // if (((VideoSession *)GetUsrParam())->video_type() == EREAL) // real_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); // // if (((VideoSession *)GetUsrParam())->video_type() == ERECORD) // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); // // ((VideoSession *)GetUsrParam())->streamHandle().clear(); // } // // m_hVodEndFunc(m_usrParam); // } // // bb.bset(0); // return; //} if (m_LastPTS != pts) { m_notToDecodCount = 0; m_h264DataFunc(m_usrParam, streamId, (char *)m_SliceBuf.head(), m_SliceBuf.len(), m_LastIsKeyFrame, m_LastPTS, localPts); m_SliceBuf.reset(); m_LastPTS = pts; 
m_LastIsKeyFrame = key; m_LastStreamType = streamId; } m_notToDecodCount++; m_SliceBuf.add((char*)data, len); } // 解PS包线程 int RTPReceiver::OnPsProcess() { LOG_INFO("[{}] started.", m_deviceID); while (!m_bPsExit) { m_psFrameMutex.lock(); // LOG_DEBUG("[{}] PS frame size : {}", m_deviceID, m_psVideoFrames.size()); if (m_psVideoFrames.size() <= 0){ m_psFrameMutex.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(10)); continue; } Frame* frame = m_psVideoFrames.front(); m_psVideoFrames.pop(); m_psFrameMutex.unlock(); if (frame != nullptr) { int nRet = m_psParser.AddData(frame->buf_, frame->len_); if (nRet == -1) { LOG_INFO("m_psParser return -1--{}", m_deviceID); } else if (nRet == -2) { LOG_INFO("m_psParser return -2--{}", m_deviceID); } else if (nRet == -3) { LOG_INFO("m_psParser return -3--{}", m_deviceID); } delete frame; frame = nullptr; } } ClearPsVideoFrameList(); m_hVodEndFunc(m_usrParam); LOG_INFO("[{}] exited.", m_deviceID); return 0; } void RTPReceiver::SetDeviceID(string deviceID){ m_deviceID = deviceID; } int RTPReceiver::GetPsFrameListSize() { std::lock_guard l(m_psFrameMutex); return m_psVideoFrames.size(); } void RTPReceiver::ClearPsVideoFrameList() { std::lock_guard l(m_psFrameMutex); while (!m_psVideoFrames.empty()) { Frame* f = m_psVideoFrames.front(); delete f; m_psVideoFrames.pop(); } LOG_INFO("[{}] cleared ps video frame list!", m_deviceID); } int RTPReceiver::ParsePacket(RTPPacket* packet){ do { if (0 == packet->GetPayloadType()) { // 音频数据, 暂不处理 break; } // 判断是否收到完整的帧(有些厂商打的marker标记不准, 所以只能看时间戳来判断) uint32_t curPts = packet->GetTimestamp(); if (lastPts != 0 && curPts != lastPts) { mark = 1; } lastPts = curPts; int payloadLen = packet->GetPayloadLength(); if (offset + payloadLen > kVideoFrameSize) { offset = 0, mark = 0; break; } // LOG_DEBUG("[{}] ParsePacket GetPayloadLength", m_deviceID); if (mark) { BYTE* frameBuf = (BYTE*)malloc(sizeof(BYTE) * offset); if (!frameBuf) { offset = 0, mark = 0; break; } memcpy(frameBuf, recvTmpBuf, offset); if (!m_bPsExit){ std::lock_guard l(m_psFrameMutex); if (m_psVideoFrames.size() < 100) { // LOG_DEBUG("[{}]ParsePacket push", m_deviceID); m_psVideoFrames.push(new Frame(frameBuf, offset, false)); } else { free(frameBuf); } } else{ //若此时解码线程已经退出,不再往m_psVideoFrames推送帧,且退出当前线程 free(frameBuf); LOG_INFO("ParsePacket quit, device_id:{}", m_deviceID); return 1; } offset = 0; mark = 0; } memcpy(recvTmpBuf + offset, packet->GetPayloadData(), payloadLen); offset += payloadLen; } while (0); return 0; }
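
// ---------------------------------------------------------------------------
// Usage sketch (illustrative only; kept as a comment so it is not compiled).
// It shows the intended call order for this class: register the output
// callback, start the PS demux thread with InitPS(), feed reassembled RTP
// packets through ParsePacket(), and stop with ClosePsThread(). The
// CallBack_Stream signature below is inferred from how m_h264DataFunc is
// invoked in OnPsDemux() and may differ from the declaration in RTPReceiver.h;
// the device ID is a made-up example.
//
//   static void OnStream(void* user, unsigned char streamId, char* data,
//                        int len, bool key, uint64_t pts, uint64_t localPts)
//   {
//       // Hand the demuxed H.264 / MPEG-4 / SVAC frame to a decoder or writer.
//   }
//
//   void RunReceiver(RTPReceiver& rx)
//   {
//       rx.SetDeviceID("34020000001320000001");   // hypothetical GB28181 device ID
//       rx.SetOutputCallback(OnStream, nullptr);
//       if (rx.InitPS() != 0) {
//           return;
//       }
//       // For every RTPPacket* pkt delivered by the RTP session:
//       //     if (rx.ParsePacket(pkt) != 0) break;   // non-zero: demux thread has exited
//       rx.ClosePsThread();
//   }
//
// Design note: ParsePacket() runs on the RTP receive thread and only copies
// payload bytes and enqueues complete frames (bounded at 100 entries), while
// the heavier PS demuxing runs on the thread started by InitPS(); the queue
// guarded by m_psFrameMutex is what decouples the two.
// ---------------------------------------------------------------------------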