#include "RTPReceiver.h" #include #include #include #include #include "../logger.hpp" using namespace std; #define BUFFERSIZE_1024 1024 #define BUFFERSIZE_GAP 1024//5120 //1024*5 namespace { const int kH264EndFlag = 0x00000001; const int kH264EndFlag_ = 0x000001; const int kMpeg4IEndFlag = 0x000001B0; const int kMpeg4PEndFlag = 0x000001B6; const int kSvacEndFlag = 0x000001B6; const int kVideoFrameSize = BUFFERSIZE_1024*BUFFERSIZE_1024*5*2; const int kRtpRecvBufferSize = BUFFERSIZE_1024*BUFFERSIZE_1024*2; const int kSockBufferSize = BUFFERSIZE_1024*BUFFERSIZE_1024*2; const uint16_t kInvalidPort = 0; // PS解包器回调 int ReceivePESFunction(unsigned char streamid, void * data, int size, uint64_t pts, uint64_t localPts, bool key, void* param) { if (NULL != data && size > 0) { ((RTPReceiver*)param)->OnPsDemux(streamid, (BYTE*)data, size, key, (uint64_t)pts, (uint64_t)localPts); } return 0; } }; // namespace static long long get_cur_time() { // 获取操作系统当前时间点(精确到微秒) chrono::time_point tpMicro = chrono::time_point_cast(chrono::system_clock::now()); // (微秒精度的)时间点 => (微秒精度的)时间戳 return tpMicro.time_since_epoch().count(); } RTPReceiver::RTPReceiver() : m_localPort(kInvalidPort) , m_bRtpExit(false) , m_bPsExit(false) , m_h264DataFunc(NULL) , m_usrParam(NULL) , m_bOpened(false) , m_hVodEndFunc(NULL) , m_LastPTS(-1) , m_LastIsKeyFrame(0) , m_SliceBuf(1024*1024) , m_idleCount(-1) ,m_noDataCount(-1) { m_LastStreamType=0; } RTPReceiver::~RTPReceiver() { if (IsOpened()) Close(); LOG_INFO("RTPReceiver::~RTPReceiver() destruct OK--{}", m_deviceID); } void RTPReceiver::SetOutputCallback(CallBack_Stream cb, void* param) { m_h264DataFunc = cb; m_usrParam = param; } void RTPReceiver::SetVodEndCallback(CallBack_VodFileEnd cb, void* param) { m_hVodEndFunc = cb; m_usrParam = param; } int RTPReceiver::rtp_revc_thread_(void* param) { if (!param) { return -1; } RTPReceiver* self = (RTPReceiver*)param; return self->OnRtpRecv(); } int RTPReceiver::ps_demuxer_thread_(void* param) { if (!param) { return -1; } RTPReceiver* self = (RTPReceiver*)param; return self->OnPsProcess(); } int RTPReceiver::ps_decode_thread_(void* param) { if (!param) { return -1; } RTPReceiver* self = (RTPReceiver*)param; return self->OnDecodeProcess(); } bool RTPReceiver::Open(uint16_t localPort) { LOG_INFO("--2---RTPReceiver::Open--{}", m_deviceID); m_localPort = localPort; RTPSessionParams sessparams; sessparams.SetUsePollThread(true); sessparams.SetMinimumRTCPTransmissionInterval(10); sessparams.SetOwnTimestampUnit(1.0/90000.0); sessparams.SetAcceptOwnPackets(true); RTPUDPv4TransmissionParams transparams; transparams.SetPortbase(m_localPort); transparams.SetRTPReceiveBuffer(kRtpRecvBufferSize); cout << "port: " << m_localPort << endl; LOG_INFO("--3---RTPReceiver::Open--{}", m_deviceID); int err = m_rtpSession.Create(sessparams, &transparams); LOG_INFO("--4---RTPReceiver::Open--{}", m_deviceID); if (err != 0) { LOG_INFO("RTPReceiver::Open m_rtpSession.Create error: {}--{}", err, m_deviceID); return false; } LOG_INFO("--5---RTPReceiver::Open--{}", m_deviceID); m_psParser.SetReceiveFunction(ReceivePESFunction, this); m_bOpened = true; LOG_INFO("RTPReceiver::Open ok--{}", m_deviceID); LOG_INFO("--1---RTPReceiver::Open--{}", m_deviceID); m_rtpThread = std::thread(rtp_revc_thread_, this); m_psThread = std::thread(ps_demuxer_thread_, this); return true; } bool RTPReceiver::IsOpened() const { return m_bOpened; } void RTPReceiver::Close() { m_bRtpExit = true; m_bPsExit = true; // rtp接收线程退出 if (m_rtpThread.joinable()) { m_rtpThread.join(); } m_rtpSession.Destroy(); LOG_INFO("--2---RTPReceiver::Close rtp recv thread quit --{}", m_deviceID); // PS解包线程退出 if (m_psThread.joinable()) { m_psThread.join(); } LOG_INFO("--3---RTPReceiver::Close ps demux thread quit--{}", m_deviceID); m_bOpened = false; } int RTPReceiver::GetPsFrameListSize() { std::lock_guard l(m_psFrameMutex); return m_psVideoFrames.size(); } void RTPReceiver::ClearPsVideoFrameList() { std::lock_guard l(m_psFrameMutex); while (!m_psVideoFrames.empty()) { Frame* f = m_psVideoFrames.front(); delete f; m_psVideoFrames.pop(); } LOG_INFO("---->cleared ps video frame list!<----{}", m_deviceID); } // 收RTP包线程 int RTPReceiver::OnRtpRecv() { uint32_t lastPts = 0; uint64_t last_recv_ts{0}; int offset = 0; int mark = 0; BYTE* recvTmpBuf = new BYTE[kVideoFrameSize]; while (!m_bRtpExit) { //try //{ m_rtpSession.Poll(); m_rtpSession.BeginDataAccess(); if (m_rtpSession.GotoFirstSourceWithData()) { last_recv_ts = get_cur_time(); m_idleCount = 0; m_noDataCount = 0; do { RTPPacket* packet; while ((packet = m_rtpSession.GetNextPacket()) != NULL/* && !mark*/) { do { if (0 == packet->GetPayloadType()) { // 音频数据, 暂不处理 break; // goto skip_this_packet; } // 判断是否收到完整的帧(有些厂商打的marker标记不准, 所以只能看时间戳来判断) uint32_t curPts = packet->GetTimestamp(); if (lastPts != 0 && curPts != lastPts) { mark = 1; } lastPts = curPts; int payloadLen = packet->GetPayloadLength(); if (offset + payloadLen > kVideoFrameSize) { offset = 0, mark = 0; break; // goto skip_this_packet; } if (mark) { BYTE* frameBuf = (BYTE*)malloc(sizeof(BYTE) * offset); if (!frameBuf) { offset = 0, mark = 0; break; // goto skip_this_packet; } memcpy(frameBuf, recvTmpBuf, offset); if (!m_bPsExit) { std::lock_guard l(m_psFrameMutex); if (m_psVideoFrames.size() < 100) { m_psVideoFrames.push(new Frame(frameBuf, offset, false)); } else { free(frameBuf); } } else { //若此时解码线程已经退出,不再往m_psVideoFrames推送帧,且退出当前线程 free(frameBuf); LOG_INFO("OnPsProcess quit, device_id:{}", m_deviceID); //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "recv video stream interruption!"); } offset = 0; mark = 0; } memcpy(recvTmpBuf + offset, packet->GetPayloadData(), payloadLen); offset += payloadLen; } while (0); //skip_this_packet: m_rtpSession.DeletePacket(packet); } } while (m_rtpSession.GotoNextSourceWithData()); } //else { // if (m_idleCount != -1) // { // ++m_idleCount;//流中断计数 // } // if (m_noDataCount != 0) // { // --m_noDataCount;//没流计数 // } // //if (m_idleCount > 3000) { // // m_hVodEndFunc(m_usrParam); // // m_idleCount = 0; // //历史流结束的时候,也会出现超时,这个是正常的 // if(m_usrParam && ((VideoSession *)GetUsrParam())->video_type() == VideoType::ERECORD) // { // if (m_idleCount > 10000) // { // //这里要判断下历史流是否结束,如果未结束,就设置为流中断 // //由于record_stream_status这个函数返回值不准确,所以增加一个进度条大于80% // if(record_stream_status(((VideoSession *)GetUsrParam())->streamHandle())) // { // LOG_INFO("************Record stream is finished**{}**m_progress = {}********", m_deviceID, ((VideoSession *)GetUsrParam())->progress()); // m_idleCount = -1; // m_hVodEndFunc(m_usrParam); // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); // ((VideoSession *)GetUsrParam())->streamHandle().clear(); // } // else // { // //如果此时进度大于80% 算完成吧 // if(((VideoSession *)GetUsrParam())->progress() > 0.80) // { // LOG_INFO("************Record stream is overtime**{}**m_progress = {}********", m_deviceID, ((VideoSession *)GetUsrParam())->progress()); // m_idleCount = 0; // m_hVodEndFunc(m_usrParam); // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); // ((VideoSession *)GetUsrParam())->streamHandle().clear(); // } // else // { // m_idleCount = -1; // //LOG_ERROR("************post ERROR_REALSTREAM_INTERRUPT to structure****{}********", m_deviceID); // //发送流中断 // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Record time video streaming interruption!"); // } // } // // // } // // if (m_noDataCount < -200000)//任务开始时没收到流 // { // //LOG_ERROR("************m_hVodEndFunc(m_usrParam)!!!m_hVodEndFunc(m_usrParam)********{}******", m_deviceID); // m_noDataCount = -1; // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Record time video streaming interruption2!"); // //m_hVodEndFunc(m_usrParam); // } // } // else//实时任务断流 // //if (m_usrParam && ((VideoSession *)GetUsrParam())->video_type() == VideoType::EREAL) // { // // //每超过3000次,发送一次send_vedio_eof 时长大约1.5s // //若是30000,时长大约 18s // if(m_idleCount > 30000) // { // uint64_t cts = get_cur_time(); // float duration_not_recv = (cts - last_recv_ts) / 1000.0; // // //LOG_ERROR("************I haven't got stream from hik gateway exceed {}s,send eof********{}******", duration_not_recv, m_deviceID); // m_idleCount = -1; // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Real time video streaming interruption!"); // } // // if (m_noDataCount < -200000)//任务开始时没收到流 // { // //LOG_ERROR("************m_noDataCount < -200000********{}******", m_deviceID); // m_noDataCount = -1; // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Real time video streaming interruption2!"); // } // // } //} //} // catch (GeneralException2& e) //{ // //LOG_ERROR("---> video streaming interruption!<---{}, error: {}", m_deviceID, e.err_msg()); // byte_buffer bb(64); // bb << VasCmd::VAS_CMD_REALSTREAM_INTERRUPT << e.err_msg(); // if (m_usrParam) // { // if (((VideoSession *)GetUsrParam())->msgChan()->is_valid()) { // try { // ((VideoSession *)GetUsrParam())->msgChan()->send_msg(bb.data_ptr(), bb.data_size()); // } // catch (GeneralException2& e) { // //LOG_ERROR("[{}] send vas cmd VAS_CMD_REALSTREAM_INTERRUPT error: {}, {}", m_deviceID, e.err_code(), e.err_str()); // } // } // //通知网关关闭句柄 // if(!((VideoSession *)GetUsrParam())->streamHandle().empty()) // { // LOG_INFO("---->Notify hisense gateway release handle = {} !<----{}", ((VideoSession *)GetUsrParam())->streamHandle().c_str(), m_deviceID); // if (((VideoSession *)GetUsrParam())->video_type() == VideoType::EREAL) // real_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); // // if (((VideoSession *)GetUsrParam())->video_type() == VideoType::ERECORD) // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); // // //清理保活的句柄 // ((VideoSession *)GetUsrParam())->streamHandle().clear(); // } // } // // bb.bset(0); // //} m_rtpSession.EndDataAccess(); RTPTime::Wait(RTPTime(0, 500)); } delete [] recvTmpBuf; return 0; } // 解PS包线程 int RTPReceiver::OnPsProcess() { while (!m_bPsExit) { m_psFrameMutex.lock(); if (m_psVideoFrames.size() <= 0){ m_psFrameMutex.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(10)); continue; } Frame* frame = m_psVideoFrames.front(); m_psVideoFrames.pop(); m_psFrameMutex.unlock(); if (frame != nullptr) { int nRet = m_psParser.AddData(frame->buf_, frame->len_); if (nRet == -1) { LOG_INFO("m_psParser return -1--{}", m_deviceID); } else if (nRet == -2) { LOG_INFO("m_psParser return -2--{}", m_deviceID); } else if (nRet == -3) { LOG_INFO("m_psParser return -3--{}", m_deviceID); } delete frame; frame = nullptr; } } ClearPsVideoFrameList(); return 0; } // 处理去除了PS头的数据 // 下级厂商发过来的流有可能是MPEG-4/H264/SVAC中的任意一种 // 国标标准规定, 编码方(下级)可以选择实现H264、MPEG4或者SVAC, 但解码方(上级) // 必须同时实现对H264、MPEG4和SVAC的支持 void RTPReceiver::OnPsDemux(unsigned char streamId, BYTE *data, int len, bool key, uint64_t pts, uint64_t localPts) { if (!m_h264DataFunc) { return; } if (-1 == m_LastPTS) { if (!key) { return; } m_LastPTS = pts; m_LastIsKeyFrame = key; m_LastStreamType = streamId; } // 音频数据不处理 if (0xC0 == streamId) { return; } ////add by mds 20190424 //if (m_notToDecodCount > 50000)//针对大华相机,可能会很久不调用解码 //{ // byte_buffer bb(64); // bb << ERROR_REALSTREAM_INTERRUPT << "This session have a long time no decoding"; // LOG_INFO("[{}] Long time no decoding!!!m_notToDecodCount=[{}]", m_deviceID, m_notToDecodCount); // // if (m_usrParam) // { // if (((VideoSession *)GetUsrParam())->msgChan()->is_valid()) // ((VideoSession *)GetUsrParam())->msgChan()->send_msg(bb.data_ptr(), bb.data_size()); // //通知网关关闭句柄 // if(!((VideoSession *)GetUsrParam())->streamHandle().empty()) // { // LOG_INFO("[{}] ---->Notify hisense gateway release handle = {} !<----", m_deviceID, ((VideoSession *)GetUsrParam())->streamHandle()); // if (((VideoSession *)GetUsrParam())->video_type() == EREAL) // real_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); // // if (((VideoSession *)GetUsrParam())->video_type() == ERECORD) // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); // // ((VideoSession *)GetUsrParam())->streamHandle().clear(); // } // // m_hVodEndFunc(m_usrParam); // } // // bb.bset(0); // return; //} if (m_LastPTS != pts) { m_notToDecodCount = 0; m_h264DataFunc(m_usrParam, streamId, (char *)m_SliceBuf.head(), m_SliceBuf.len(), m_LastIsKeyFrame, m_LastPTS, localPts); m_SliceBuf.reset(); m_LastPTS = pts; m_LastIsKeyFrame = key; m_LastStreamType = streamId; } m_notToDecodCount++; m_SliceBuf.add((char*)data, len); } int RTPReceiver::OnDecodeProcess() { return 0; }