// RTPReceiver.cpp
#include "RTPReceiver.h"

#include <exception>
#include <thread>
#include <utility>

#include "rtppacket.h"
#include "../logger.hpp"

// Base unit used to size the buffers in this file.
#define BUFFERSIZE_1024     1024
// Capacity, in bytes, of the buffer ParsePacket reassembles one frame into
// (1024 * 1024 * 5 * 2 = 10 MiB).
constexpr int kVideoFrameSize = 10 * BUFFERSIZE_1024 * BUFFERSIZE_1024;

// PS解包器回调
static int ReceivePESFunction(unsigned char streamid, void * data, int size, uint64_t pts, uint64_t localPts, bool key, void* param)
{
    if (NULL != data && size > 0)
    {
        ((RTPReceiver*)param)->OnPsDemux(streamid, (BYTE*)data, size, key, (uint64_t)pts, (uint64_t)localPts);
    }
    return 0;
}

static int ps_demuxer_thread_(void* param)
{
	if (!param)
	{
		return -1;
	}

	RTPReceiver* self = (RTPReceiver*)param;
	return self->OnPsProcess();
}

// Builds an idle receiver: no callbacks registered, no demux thread, exit
// flag cleared, and m_LastPTS set to -1, the value OnPsDemux tests for while
// it is still waiting for the first key frame.
// Also allocates the kVideoFrameSize-byte buffer ParsePacket copies payloads into.
// NOTE(review): offset, mark, lastPts and m_notToDecodCount are read elsewhere
// in this file but are not initialized here - confirm the header gives them
// default member initializers.
RTPReceiver::RTPReceiver()
    : m_LastPTS(-1),
      m_LastIsKeyFrame(0),
      m_SliceBuf(1024 * 1024),
      m_h264DataFunc(nullptr),
      m_hVodEndFunc(nullptr),
      m_usrParam(nullptr),
      m_bPsExit(false),
      m_psThreadPtr(nullptr)
{
    recvTmpBuf = new BYTE[kVideoFrameSize];
    m_LastStreamType = 0;
}

// Frees the reassembly buffer allocated by the constructor.
// NOTE(review): the demux thread object is not released here; presumably
// callers must invoke ClosePsThread() before destroying the receiver - confirm.
RTPReceiver::~RTPReceiver()
{
    // delete[] on a null pointer is a no-op, so no explicit null test is needed.
    delete[] recvTmpBuf;
}

// Registers the callback OnPsDemux invokes with each assembled frame.
// `param` is stored in m_usrParam and handed back as the callback's first
// argument. SetVodEndCallback writes the same m_usrParam slot.
void RTPReceiver::SetOutputCallback(CallBack_Stream cb, void* param)
{
	m_usrParam = param;
	m_h264DataFunc = cb;
}

// Registers the callback OnPsProcess invokes when the demux loop ends.
// `param` is stored in m_usrParam and handed to the callback as its argument.
// SetOutputCallback writes the same m_usrParam slot.
void RTPReceiver::SetVodEndCallback(CallBack_VodFileEnd cb, void* param)
{
	m_usrParam = param;
	m_hVodEndFunc = cb;
}

// Stores the request-stream callback. It is only stored here; none of the
// code visible in this file invokes it.
void RTPReceiver::SetRequestStreamCallback(CallBack_Request_Stream cb)
{
    m_callback_request_stream = cb;
}

// Registers ReceivePESFunction with the PS parser and starts the demux thread,
// which runs OnPsProcess() via ps_demuxer_thread_.
//
// Returns 0 on success, or when the demux thread has already been started,
// and -1 when the thread could not be created.
int RTPReceiver::InitPS(){
    if (nullptr != m_psThreadPtr) {
        // Starting again would overwrite the pointer to a live thread and leak it.
        LOG_INFO("[{}] InitPS skipped, ps demux thread already started", m_deviceID);
        return 0;
    }

    m_psParser.SetReceiveFunction(ReceivePESFunction, this);

    // Both `new` and the std::thread constructor report failure by throwing;
    // neither yields nullptr, so failures are translated to -1 here.
    try {
        m_psThreadPtr = new std::thread(ps_demuxer_thread_, this);
    } catch (const std::exception& e) {
        LOG_INFO("[{}] InitPS failed to start ps demux thread: {}", m_deviceID, e.what());
        return -1;
    }

    LOG_INFO("[{}] InitPS finished", m_deviceID);

    return 0;
}

// Asks the PS demux thread to stop, waits for it to finish and frees the
// thread object. Does nothing beyond setting the exit flag when the thread
// was never started or has already been closed.
void RTPReceiver::ClosePsThread(){
    LOG_INFO("[{}] 3.", m_deviceID);
    m_bPsExit = true;

    if (nullptr != m_psThreadPtr)
    {
        // Wait for the PS demux thread to leave OnPsProcess().
        if (m_psThreadPtr->joinable())
        {
            m_psThreadPtr->join();
        }
        // Free the thread object even when it was not joinable so it cannot leak.
        delete m_psThreadPtr;
        m_psThreadPtr = nullptr;
    }

    LOG_INFO("[{}] ps demux thread quit", m_deviceID);
}

// Handles one payload whose PS header has already been removed, grouping
// payloads into frames by PTS.
//
// The stream sent by the downstream vendor may be any of MPEG-4, H264 or SVAC:
// the national (GB) standard lets the encoding side (downstream) implement any
// one of them, while the decoding side (upstream) must support all three.
//
// Payloads are appended to m_SliceBuf while the PTS stays the same. When a
// payload with a different PTS arrives, the buffered frame is passed to the
// output callback and a new frame is started. Nothing is buffered until the
// first key-frame payload has been seen.
//
// streamId  stream id of this payload; 0xC0 (audio) is ignored
// data/len  payload bytes and their length
// key       key-frame flag of this payload
// pts       presentation timestamp of this payload
// localPts  local timestamp, forwarded to the output callback
void RTPReceiver::OnPsDemux(unsigned char streamId, BYTE *data, int len, bool key, uint64_t pts, uint64_t localPts)
{
	if (!m_h264DataFunc)
	{
		return;
	}

	// Audio data is not processed. This is checked before the start-up logic
	// so that an audio payload can never seed the frame state below.
	if (0xC0 == streamId)
	{
		return;
	}

	if (-1 == m_LastPTS)
	{
		// Still waiting for the first key frame.
		if (!key)
		{
			return;
		}
		m_LastPTS = pts;
		m_LastIsKeyFrame = key;
		m_LastStreamType = streamId;
	}

	// A watchdog (added 2019-04-24 for Dahua cameras that may go a long time
	// without a decode call) used to sit here: once m_notToDecodCount exceeded
	// 50000 it sent ERROR_REALSTREAM_INTERRUPT on the session's message channel,
	// stopped the gateway stream handle and invoked m_hVodEndFunc. It was
	// already commented out, so only this summary is kept.

	if (m_LastPTS != pts)
	{
		m_notToDecodCount = 0;
		// The buffered bytes belong to the previous frame, so report the stream
		// id, key flag and PTS that were recorded for that frame.
		m_h264DataFunc(m_usrParam, m_LastStreamType, (char *)m_SliceBuf.head(), m_SliceBuf.len(), m_LastIsKeyFrame, m_LastPTS, localPts);

		m_SliceBuf.reset();

		m_LastPTS = pts;
		m_LastIsKeyFrame = key;
		m_LastStreamType = streamId;
	}

	m_notToDecodCount++;
	m_SliceBuf.add((char*)data, len);
}

// Body of the PS demux thread.
//
// Until m_bPsExit is set, pops frames from m_psVideoFrames and feeds them to
// m_psParser (which was given ReceivePESFunction as its receive callback in
// InitPS). Sleeps 10 ms whenever the queue is empty. After the loop it frees
// any frames still queued and invokes the VOD-end callback if one is set.
//
// Always returns 0.
int RTPReceiver::OnPsProcess()
{
    LOG_INFO("[{}] started.", m_deviceID);
    while (!m_bPsExit) {
        Frame* frame = nullptr;
        bool queueEmpty = false;
        {
            // Hold the queue lock only while inspecting and popping; the guard
            // releases it on every path out of this scope.
            std::lock_guard<std::mutex> l(m_psFrameMutex);
            LOG_DEBUG("[{}] PS frame size : {}", m_deviceID, m_psVideoFrames.size());
            queueEmpty = m_psVideoFrames.empty();
            if (!queueEmpty) {
                frame = m_psVideoFrames.front();
                m_psVideoFrames.pop();
            }
        }

        if (queueEmpty) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            continue;
        }
        if (frame == nullptr) {
            continue;
        }

        const int nRet = m_psParser.AddData(frame->buf_, frame->len_);
        // Only the codes -1, -2 and -3 are logged, as before.
        if (nRet >= -3 && nRet <= -1)
        {
            LOG_INFO("m_psParser return {}--{}", nRet, m_deviceID);
        }

        delete frame;
    }

    ClearPsVideoFrameList();

    // The constructor leaves this callback null until SetVodEndCallback is called.
    if (m_hVodEndFunc)
    {
        m_hVodEndFunc(m_usrParam);
    }

    LOG_INFO("[{}] exited.", m_deviceID);

    return 0;
}

// Stores the device id that this receiver puts in its log messages.
// The argument is taken by value, so it is moved into the member rather than
// copied a second time.
void RTPReceiver::SetDeviceID(string deviceID){
    m_deviceID = std::move(deviceID);
}

// Returns the number of frames currently waiting in m_psVideoFrames,
// read while holding the queue mutex.
int RTPReceiver::GetPsFrameListSize()
{
	std::lock_guard<std::mutex> guard(m_psFrameMutex);
	const int queued = static_cast<int>(m_psVideoFrames.size());
	return queued;
}

// Deletes every frame still waiting in m_psVideoFrames and empties the queue,
// all while holding the queue mutex.
void RTPReceiver::ClearPsVideoFrameList()
{
	std::lock_guard<std::mutex> guard(m_psFrameMutex);
	for (; !m_psVideoFrames.empty(); m_psVideoFrames.pop()) {
		delete m_psVideoFrames.front();
	}
    LOG_INFO("[{}] cleared ps video frame list!", m_deviceID);
}

// Reassembles RTP payloads into complete frames and queues them for the PS
// demux thread.
//
// Payloads are appended to recvTmpBuf. A change of RTP timestamp marks the
// bytes buffered so far as a complete frame (some vendors set the marker bit
// unreliably, so only the timestamp is used). That frame is copied into a
// heap buffer and pushed onto m_psVideoFrames, unless the queue already holds
// 100 frames, in which case the frame is dropped.
//
// packet  the RTP packet to consume; a null packet is ignored
//
// Returns 1 when a frame is complete but m_bPsExit is set (the demux thread
// is gone, so the caller should stop feeding packets), otherwise 0.
int RTPReceiver::ParsePacket(RTPPacket* packet){
    if (nullptr == packet)
    {
        return 0;
    }

    // Payload type 0 is audio, which is not processed for now.
    if (0 == packet->GetPayloadType())
    {
        return 0;
    }

    // Detect a completed frame from the timestamp change.
    const uint32_t curPts = packet->GetTimestamp();
    if (lastPts != 0 && curPts != lastPts) {
        mark = 1;
    }
    lastPts = curPts;

    const int payloadLen = packet->GetPayloadLength();

    LOG_DEBUG("[{}] ParsePacket GetPayloadLength", m_deviceID);

    // Flush the completed frame first: the current packet belongs to the NEXT
    // frame, so it must not count against the space used by the finished one.
    if (mark)
    {
        if (m_bPsExit)
        {
            // The demux thread has exited: stop queueing frames and tell the
            // caller to quit, before spending an allocation and a copy.
            LOG_INFO("ParsePacket quit, device_id:{}", m_deviceID);
            return 1;
        }

        // Skip empty frames: malloc(0) is implementation-defined and an empty
        // frame carries nothing to demux.
        if (offset > 0)
        {
            BYTE* frameBuf = (BYTE*)malloc(sizeof(BYTE) * offset);
            if (frameBuf)
            {
                memcpy(frameBuf, recvTmpBuf, offset);
                std::lock_guard<std::mutex> l(m_psFrameMutex);
                if (m_psVideoFrames.size() < 100)
                {
                    LOG_DEBUG("[{}]ParsePacket push", m_deviceID);
                    m_psVideoFrames.push(new Frame(frameBuf, offset, false));
                }
                else {
                    // Queue is full: drop this frame.
                    free(frameBuf);
                }
            }
            // On allocation failure only the finished frame is lost; the
            // current packet is still buffered below.
        }
        offset = 0;
        mark = 0;
    }

    // Reject a payload that cannot fit in what is left of recvTmpBuf. Written
    // as a subtraction so the comparison cannot overflow.
    if (payloadLen < 0 || payloadLen > kVideoFrameSize - offset)
    {
        offset = 0;
        mark = 0;
        return 0;
    }

    if (payloadLen > 0)
    {
        memcpy(recvTmpBuf + offset, packet->GetPayloadData(), payloadLen);
        offset += payloadLen;
    }

    return 0;
}