diff --git a/.vscode/launch.json b/.vscode/launch.json index d15a7c0..8c146b1 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -37,6 +37,24 @@ "ignoreFailures": true } ] + },{ + "name": "jrtp", + "type": "cppdbg", + "request": "launch", + "program": "${workspaceFolder}/bin/lib/jrtp_exe", + "args": ["40030","t"], + "stopAtEntry": false, + "cwd": "${workspaceFolder}/bin/lib", + "environment": [], + "externalConsole": false, + "MIMode": "gdb", + "setupCommands": [ + { + "description": "Enable pretty-printing for gdb", + "text": "-enable-pretty-printing", + "ignoreFailures": true + } + ] } ] } \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json index b797d5c..69283fc 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -61,6 +61,7 @@ "streambuf": "cpp", "cfenv": "cpp", "cinttypes": "cpp", - "__nullptr": "cpp" + "__nullptr": "cpp", + "list": "cpp" } } \ No newline at end of file diff --git a/jrtp/Makefile b/jrtp/Makefile new file mode 100644 index 0000000..332116a --- /dev/null +++ b/jrtp/Makefile @@ -0,0 +1,42 @@ +XX = g++ + + +PROJECT_ROOT= /mnt/data/cmhu/FFNvDecoder + +DEPEND_DIR = $(PROJECT_ROOT)/bin +SRC_ROOT = $(PROJECT_ROOT)/jrtp +THIRDPARTY_ROOT = $(PROJECT_ROOT)/3rdparty + + +TARGET= $(DEPEND_DIR)/lib/jrtp_exe + + +JRTP_ROOT = $(THIRDPARTY_ROOT)/jrtp_export + + +INCLUDE= -I $(SRC_ROOT)\ + -I $(JRTP_ROOT)/jrtplib/include/jrtplib3 \ + -I $(JRTP_ROOT)/jthread/include/jthread + +LIBSPATH= -L $(JRTP_ROOT)/jthread/lib -l:libjthread.a \ + -L $(JRTP_ROOT)/jrtplib/lib -l:libjrtp.a + + +CFLAGS= -g -O0 -fPIC $(INCLUDE) -pthread -lrt -lz -std=c++11 -fvisibility=hidden -Wl,-Bsymbolic -ldl + # -DUNICODE -D_UNICODE + + +SRCS:=$(wildcard $(SRC_ROOT)/*.cpp) +OBJS = $(patsubst %.cpp, %.o, $(notdir $(SRCS))) + + +$(TARGET):$(OBJS) $(CU_OBJS) + rm -f $(TARGET) + $(XX) -o $@ $^ $(CFLAGS) $(LIBSPATH) $(LIBS) -Wwrite-strings + rm -f *.o + +%.o:$(SRC_ROOT)/%.cpp + $(XX) $(CFLAGS) -c $< + +clean: + rm -f *.o $(TARGET) \ No newline at end of file diff --git a/jrtp/example3.cpp b/jrtp/example3.cpp new file mode 100644 index 0000000..1ad653a --- /dev/null +++ b/jrtp/example3.cpp @@ -0,0 +1,372 @@ +/* + This IPv4 example listens for incoming packets and automatically adds destinations + for new sources. +*/ + +#include "rtpsession.h" +#include "rtppacket.h" +#include "rtpudpv4transmitter.h" +#include "rtptcptransmitter.h" +#include "rtpipv4address.h" +#include "rtptcpaddress.h" +#include "rtpsessionparams.h" +#include "rtperrors.h" +#include "rtpsourcedata.h" +#include +#include +#include +#include + +using namespace jrtplib; +using namespace std; + +// +// This function checks if there was a RTP error. If so, it displays an error +// message and exists. +// + +void checkerror(int rtperr) +{ + if (rtperr < 0) + { + std::cout << "ERROR: " << RTPGetErrorString(rtperr) << std::endl; + exit(-1); + } +} + +// +// The new class routine +// + +class MyRTPSession : public RTPSession +{ +protected: + void OnNewSource(RTPSourceData *dat) + { + if (dat->IsOwnSSRC()) + return; + + uint32_t ip; + uint16_t port; + + if (dat->GetRTPDataAddress() != 0) + { + const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTPDataAddress()); + ip = addr->GetIP(); + port = addr->GetPort(); + } + else if (dat->GetRTCPDataAddress() != 0) + { + const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTCPDataAddress()); + ip = addr->GetIP(); + port = addr->GetPort()-1; + } + else + return; + + RTPIPv4Address dest(ip,port); + AddDestination(dest); + + struct in_addr inaddr; + inaddr.s_addr = htonl(ip); + std::cout << "Adding destination " << std::string(inet_ntoa(inaddr)) << ":" << port << std::endl; + } + + void OnBYEPacket(RTPSourceData *dat) + { + if (dat->IsOwnSSRC()) + return; + + uint32_t ip; + uint16_t port; + + if (dat->GetRTPDataAddress() != 0) + { + const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTPDataAddress()); + ip = addr->GetIP(); + port = addr->GetPort(); + } + else if (dat->GetRTCPDataAddress() != 0) + { + const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTCPDataAddress()); + ip = addr->GetIP(); + port = addr->GetPort()-1; + } + else + return; + + RTPIPv4Address dest(ip,port); + DeleteDestination(dest); + + struct in_addr inaddr; + inaddr.s_addr = htonl(ip); + std::cout << "Deleting destination " << std::string(inet_ntoa(inaddr)) << ":" << port << std::endl; + } + + void OnRemoveSource(RTPSourceData *dat) + { + if (dat->IsOwnSSRC()) + return; + if (dat->ReceivedBYE()) + return; + + uint32_t ip; + uint16_t port; + + if (dat->GetRTPDataAddress() != 0) + { + const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTPDataAddress()); + ip = addr->GetIP(); + port = addr->GetPort(); + } + else if (dat->GetRTCPDataAddress() != 0) + { + const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTCPDataAddress()); + ip = addr->GetIP(); + port = addr->GetPort()-1; + } + else + return; + + RTPIPv4Address dest(ip,port); + DeleteDestination(dest); + + struct in_addr inaddr; + inaddr.s_addr = htonl(ip); + std::cout << "Deleting destination " << std::string(inet_ntoa(inaddr)) << ":" << port << std::endl; + } + + void OnValidatedRTPPacket(RTPSourceData *srcdat, RTPPacket *rtppack, bool isonprobation, bool *ispackethandled) + { + printf("SSRC %x Got packet (%d bytes) in OnValidatedRTPPacket from source 0x%04x!\n", GetLocalSSRC(), + (int)rtppack->GetPayloadLength(), srcdat->GetSSRC()); + DeletePacket(rtppack); + *ispackethandled = true; + } + + void OnRTCPSDESItem(RTPSourceData *srcdat, RTCPSDESPacket::ItemType t, const void *itemdata, size_t itemlength) + { + char msg[1024]; + + memset(msg, 0, sizeof(msg)); + if (itemlength >= sizeof(msg)) + itemlength = sizeof(msg)-1; + + memcpy(msg, itemdata, itemlength); + printf("SSRC %x Received SDES item (%d): %s from SSRC %x\n", GetLocalSSRC(), (int)t, msg, srcdat->GetSSRC()); + } +}; + +class MyTCPTransmitter : public RTPTCPTransmitter +{ +public: + MyTCPTransmitter() : RTPTCPTransmitter(0){ } + + void OnSendError(SocketType sock) + { + cout << ": Error sending over socket " << sock << ", removing destination" << endl; + DeleteDestination(RTPTCPAddress(sock)); + } + + void OnReceiveError(SocketType sock) + { + cout << ": Error receiving from socket " << sock << ", removing destination" << endl; + DeleteDestination(RTPTCPAddress(sock)); + } +}; + +int udp_mode(int port){ + MyRTPSession sess; + + std::string ipstr; + int status,i,num; + + num = 1000000*30; + + // Now, we'll create a RTP session, set the destination + // and poll for incoming data. + + RTPUDPv4TransmissionParams transparams; + RTPSessionParams sessparams; + + // IMPORTANT: The local timestamp unit MUST be set, otherwise + // RTCP Sender Report info will be calculated wrong + // In this case, we'll be just use 8000 samples per second. + sessparams.SetOwnTimestampUnit(1.0/8000.0); + + sessparams.SetAcceptOwnPackets(true); + transparams.SetPortbase(port); + status = sess.Create(sessparams,&transparams); + checkerror(status); + + std::cout << "begin.." << std::endl; + + for (i = 1 ; i <= num ; i++) + { + sess.BeginDataAccess(); + + // check incoming packets + if (sess.GotoFirstSourceWithData()) + { + do + { + RTPPacket *pack; + + while ((pack = sess.GetNextPacket()) != NULL) + { + // You can examine the data here + printf("Got packet !\n"); + + // we don't longer need the packet, so + // we'll delete it + sess.DeletePacket(pack); + } + } while (sess.GotoNextSourceWithData()); + } + + sess.EndDataAccess(); + +#ifndef RTP_SUPPORT_THREAD + status = sess.Poll(); + checkerror(status); +#endif // RTP_SUPPORT_THREAD + + RTPTime::Wait(RTPTime(0,1)); + } + + sess.BYEDestroy(RTPTime(10,0),0,0); + + return 0; +} + +int tcp_mode(int port){ + int nListener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (nListener < 0) + { + return -1; + } + + sockaddr_in serverAddr; + memset(&serverAddr, 0, sizeof(sockaddr_in)); + serverAddr.sin_family = AF_INET; + serverAddr.sin_port = htons(port); + serverAddr.sin_addr.s_addr = htonl(INADDR_ANY); + int nRet = bind(nListener, (sockaddr*)&serverAddr, sizeof(serverAddr)); + if (nRet == -1) + { + return -1; + } + + if (listen(nListener, 1) == -1) + { + return -1; + } + + sockaddr_in clientAddr; + int nLen = sizeof(sockaddr_in); + int nServer = accept(nListener, (sockaddr*)&clientAddr, (socklen_t * ) &nLen); + if (nServer == -1) + { + return -1; + } + + + int nPackSize = 45678; + RTPSessionParams sessparams; + sessparams.SetProbationType(RTPSources::NoProbation); + sessparams.SetOwnTimestampUnit(90000.0 / 25.0); + sessparams.SetMaximumPacketSize(nPackSize + 64); + + MyTCPTransmitter trans; + int status = trans.Init(true); + status = trans.Create(65535, NULL); + + MyRTPSession sess; + status = sess.Create(sessparams, &trans); + if (status < 0) + { + std::cout << "create session error!!" << std::endl; + return -1; + } + + sess.AddDestination(RTPTCPAddress(nServer)); + + std::cout << "begin.." << std::endl; + + while (true) + { + RTPTime::Wait(RTPTime(1,0)); + } + + + while(true){ + sess.BeginDataAccess(); + + // check incoming packets + if (sess.GotoFirstSourceWithData()) + { + do + { + RTPPacket *pack; + + while ((pack = sess.GetNextPacket()) != NULL) + { + // You can examine the data here + printf("Got packet !\n"); + + // we don't longer need the packet, so + // we'll delete it + sess.DeletePacket(pack); + } + } while (sess.GotoNextSourceWithData()); + } + + sess.EndDataAccess(); + +#ifndef RTP_SUPPORT_THREAD + status = sess.Poll(); + checkerror(status); +#endif // RTP_SUPPORT_THREAD + + RTPTime::Wait(RTPTime(1,0)); + } + + sess.BYEDestroy(RTPTime(10,0),0,0); + + return 0; +} +// +// The main routine +// + +int main(int argc, char* argv[]){ + + int port = atoi(argv[1]); + std::cout << "port:" << port << std::endl; + + while (true) + { + int ch = getchar(); + if (ch == 'q') + { + break; + } + + switch (ch) + { + case 'u': + udp_mode(port); + break; + case 't': + tcp_mode(port); + break; + default: + break; + } + + /* code */ + } + + return 0; +} + diff --git a/jrtp/tcp_server.cpp b/jrtp/tcp_server.cpp new file mode 100644 index 0000000..308b6db --- /dev/null +++ b/jrtp/tcp_server.cpp @@ -0,0 +1,244 @@ +/* + This IPv4 example listens for incoming packets and automatically adds destinations + for new sources. +*/ + +#include "rtpsession.h" +#include "rtppacket.h" +#include "rtpudpv4transmitter.h" +#include "rtptcptransmitter.h" +#include "rtpipv4address.h" +#include "rtptcpaddress.h" +#include "rtpsessionparams.h" +#include "rtperrors.h" +#include "rtpsourcedata.h" +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +using namespace jrtplib; +using namespace std; + +// +// This function checks if there was a RTP error. If so, it displays an error +// message and exists. +// + +void checkerror(int rtperr) +{ + if (rtperr < 0) + { + std::cout << "ERROR: " << RTPGetErrorString(rtperr) << std::endl; + exit(-1); + } +} + +static long long get_cur_time(){ + chrono::time_point tpMs + = chrono::time_point_cast(chrono::system_clock::now()); + + return tpMs.time_since_epoch().count(); +} + +// +// The new class routine +// + +// class MyRTPSession : public RTPSession +// { +// protected: + +// void OnValidatedRTPPacket(RTPSourceData *srcdat, RTPPacket *rtppack, bool isonprobation, bool *ispackethandled) +// { +// // printf("timestamp: %ld SSRC %x Got packet (%d bytes) in OnValidatedRTPPacket from source 0x%04x!\n", get_cur_time(), GetLocalSSRC(), +// // (int)rtppack->GetPayloadLength(), srcdat->GetSSRC()); +// DeletePacket(rtppack); +// *ispackethandled = true; +// } + +// void OnRTCPSDESItem(RTPSourceData *srcdat, RTCPSDESPacket::ItemType t, const void *itemdata, size_t itemlength) +// { +// char msg[1024]; + +// memset(msg, 0, sizeof(msg)); +// if (itemlength >= sizeof(msg)) +// itemlength = sizeof(msg)-1; + +// memcpy(msg, itemdata, itemlength); +// // printf("SSRC %x Received SDES item (%d): %s from SSRC %x\n", GetLocalSSRC(), (int)t, msg, srcdat->GetSSRC()); +// } + +// virtual void OnRTPPacket(RTPPacket* pack, const RTPTime& receiverTime, const RTPAddress* senderAddress) +// { +// AddDestination(*senderAddress); +// } + +// virtual void OnRTCPCompoundPacket(RTCPCompoundPacket *pack, const RTPTime &receivetime,const RTPAddress *senderaddress) +// { +// //AddDestination(*senderaddress); +// //const char* name = "hi~"; +// //SendRTCPAPPPacket(0, (const uint8_t*)name, "keeplive", 8); + +// printf("send rtcp app"); +// } +// }; + +bool bSocket = false; + +class MyTCPTransmitter : public RTPTCPTransmitter +{ +public: + MyTCPTransmitter() : RTPTCPTransmitter(0){ } + + void OnSendError(SocketType sock) + { + cout << "timestamp: " << get_cur_time() << " : Error sending over socket " << sock << ", removing destination" << endl; + DeleteDestination(RTPTCPAddress(sock)); + + bSocket = false; + } + + void OnReceiveError(SocketType sock) + { + cout << ": Error receiving from socket " << sock << ", removing destination" << endl; + DeleteDestination(RTPTCPAddress(sock)); + } +}; + +RTPSession sess; +MyTCPTransmitter* trans; +RTPSessionParams sessparams; + +int thread_func(void* param){ + + cout << "thread started..." << endl; + + int* p = (int*) param ; + sockaddr_in clientAddr; + int nLen = sizeof(sockaddr_in); + int nServer = -1;//accept(*p, (sockaddr*)&clientAddr, (socklen_t * ) &nLen); + // if (nServer == -1){ + // return -1; + // } + + cout << "timestamp: " << get_cur_time() << " while() start" << endl; + + while(true){ + + while (!bSocket) + { + nServer = accept(*p, (sockaddr*)&clientAddr, (socklen_t * ) &nLen); + if (nServer == -1) + { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + } + + cout << "nServer = " << nServer << endl; + sess.AddDestination(RTPTCPAddress(nServer)); + bSocket = true; + break; + } + + sess.BeginDataAccess(); + + // check incoming packets + if (sess.GotoFirstSourceWithData()) + { + do + { + RTPPacket *pack; + + while ((pack = sess.GetNextPacket()) != NULL) + { + // You can examine the data here + // printf("Got packet !\n"); + cout << "Got packet ! timestamp: " << get_cur_time() << endl; + + // we don't longer need the packet, so + // we'll delete it + sess.DeletePacket(pack); + } + } while (sess.GotoNextSourceWithData()); + } + + sess.EndDataAccess(); + + sess.Poll(); + + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + close(*p); + + // sess.BYEDestroy(RTPTime(10,0),0,0); +} + +int nListener = -1; +int tcp_mode(int port){ + nListener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (nListener < 0) + { + return -1; + } + + sockaddr_in serverAddr; + memset(&serverAddr, 0, sizeof(sockaddr_in)); + serverAddr.sin_family = AF_INET; + serverAddr.sin_port = htons(port); + serverAddr.sin_addr.s_addr = htonl(INADDR_ANY); + int nRet = bind(nListener, (sockaddr*)&serverAddr, sizeof(serverAddr)); + if (nRet == -1) + { + return -1; + } + + if (listen(nListener, 1) == -1) + { + return -1; + } + + int nPackSize = 45678; + + sessparams.SetProbationType(RTPSources::NoProbation); + sessparams.SetOwnTimestampUnit(90000.0 / 25.0); + sessparams.SetMaximumPacketSize(nPackSize + 64); + + trans = new MyTCPTransmitter(); + int status = trans->Init(false); + status = trans->Create(65535, NULL); + + status = sess.Create(sessparams, trans); + if (status < 0) + { + std::cout << "create session error!!" << std::endl; + return -1; + } + + std::cout << "begin.." << std::endl; + + thread* t = new std::thread(thread_func, &nListener); + + return 0; +} +// +// The main routine +// + +int main(int argc, char* argv[]){ + + tcp_mode(40032); + + while(getchar() =='q'); + + return 0; +} + diff --git a/src/AbstractDecoder.h b/src/AbstractDecoder.h index 41f5b32..8f696bc 100644 --- a/src/AbstractDecoder.h +++ b/src/AbstractDecoder.h @@ -32,6 +32,8 @@ typedef void(*POST_DECODE_CALLBACK)(const void * userPtr, AVFrame * gpuFrame); typedef void(*DECODE_FINISHED_CALLBACK)(const void* userPtr); +typedef bool(*DECODE_REQUEST_STREAM_CALLBACK)(); + struct FFDecConfig{ string uri; // 视频地址 POST_DECODE_CALLBACK post_decoded_cbk; // 解码数据回调接口 @@ -41,6 +43,7 @@ struct FFDecConfig{ int skip_frame{1}; // 跳帧数 int port; // gb28181接收数据的端口号 + DECODE_REQUEST_STREAM_CALLBACK request_stream_cbk; // gb28181请求流 }; enum DECODER_TYPE{ diff --git a/src/FFNvDecoder.cpp b/src/FFNvDecoder.cpp index 6426a24..4b20546 100644 --- a/src/FFNvDecoder.cpp +++ b/src/FFNvDecoder.cpp @@ -69,6 +69,9 @@ bool FFNvDecoder::init(FFDecConfig& cfg) m_bReal = true; } + post_decoded_cbk = cfg.post_decoded_cbk; + decode_finished_cbk = cfg.decode_finished_cbk; + return init(cfg.uri.c_str(), cfg.gpuid.c_str(),cfg.force_tcp); } diff --git a/src/FFNvDecoderManager.cpp b/src/FFNvDecoderManager.cpp index e461a09..edb4669 100644 --- a/src/FFNvDecoderManager.cpp +++ b/src/FFNvDecoderManager.cpp @@ -15,13 +15,12 @@ AbstractDecoder* FFNvDecoderManager::createDecoder(MgrDecConfig config){ if (config.cfg.post_decoded_cbk == nullptr || config.cfg.decode_finished_cbk== nullptr){ return nullptr; } - std::lock_guard l(m_mutex); auto it = decoderMap.find(config.name); if (it != decoderMap.end()){ - LOG_ERROR("已存在name所标记的解码器"); + LOG_ERROR("已存在name为{}的解码器", config.name); return nullptr; } @@ -41,8 +40,6 @@ AbstractDecoder* FFNvDecoderManager::createDecoder(MgrDecConfig config){ if (bRet) { dec->setName(config.name) ; - dec->post_decoded_cbk = config.cfg.post_decoded_cbk; - dec->decode_finished_cbk = config.cfg.decode_finished_cbk; decoderMap[config.name] = dec; LOG_INFO("[{}][{}]- 解码器初始化成功",config.name, config.cfg.uri); diff --git a/src/Makefile b/src/Makefile index 176b974..3da0ec5 100644 --- a/src/Makefile +++ b/src/Makefile @@ -30,7 +30,7 @@ INCLUDE= -I $(DEPEND_DIR)/include \ LIBSPATH= -L $(DEPEND_DIR)/lib -lavformat -lavcodec -lswscale -lavutil -lavfilter -lswresample -lavdevice \ -L $(CUDA_ROOT)/lib64 -lcuda -lcudart -lnvcuvid -lcurand -lcublas -lnvjpeg \ - -L $(SPDLOG_ROOT)/lib64 -l:libspdlog.a \ + -L $(SPDLOG_ROOT) -l:libspdlog.a \ -L $(JRTP_ROOT)/jthread/lib -l:libjthread.a \ -L $(JRTP_ROOT)/jrtplib/lib -l:libjrtp.a diff --git a/src/gb28181/FFGB28181Decoder.cpp b/src/gb28181/FFGB28181Decoder.cpp index 22a6ff0..debb325 100644 --- a/src/gb28181/FFGB28181Decoder.cpp +++ b/src/gb28181/FFGB28181Decoder.cpp @@ -12,6 +12,9 @@ extern "C" { #include "../logger.hpp" +#include"RTPTcpReceiver.h" +#include"RTPUdpReceiver.h" + #define ECLOSED 0 #define ECLOSING 1 #define ERUNNING 2 @@ -19,14 +22,14 @@ extern "C" { static void RTP_Stream_CallBack(void* userdata, int videoType, char* data, int len, int isKey, uint64_t pts, uint64_t localPts) { - FFGB28181Decoder* session = (FFGB28181Decoder*)userdata; - session->stream_callback(videoType, data, len, isKey, pts, localPts); + FFGB28181Decoder* decoder = (FFGB28181Decoder*)userdata; + decoder->stream_callback(videoType, data, len, isKey, pts, localPts); } static void RTP_Stream_End_CallBack(void* userdata) { - FFGB28181Decoder* session = (FFGB28181Decoder*)userdata; - session->stream_end_callback(); + FFGB28181Decoder* decoder = (FFGB28181Decoder*)userdata; + decoder->stream_end_callback(); } FFGB28181Decoder::FFGB28181Decoder() { @@ -59,20 +62,30 @@ void FFGB28181Decoder::close(){ m_status = ECLOSING; - LOG_INFO("real decode thread exit success 1--{}", m_dec_name); + LOG_DEBUG("real decode thread exit success 1--{}", m_dec_name); + + if(nullptr != m_rtpPtr){ + if (m_rtpPtr->IsOpened()) { + m_rtpPtr->Close(); + LOG_DEBUG("real decode thread exit success 2--{}", m_dec_name); + } - if (m_rtp.IsOpened()) { - m_rtp.Close(); - LOG_INFO("real decode thread exit success 4--{}", m_dec_name); + delete m_rtpPtr; + m_rtpPtr = nullptr; } - stream_end_callback(); + LOG_INFO("解码器关闭成功 --{}", m_dec_name); m_status = ECLOSED; } bool FFGB28181Decoder::init(FFDecConfig& cfg){ - if (m_rtp.IsOpened()){ + if(cfg.force_tcp){ + m_rtpPtr = new RTPTcpReceiver(); + }else{ + m_rtpPtr = new RTPUdpReceiver(); + } + if(nullptr == m_rtpPtr){ return false; } @@ -82,12 +95,23 @@ bool FFGB28181Decoder::init(FFDecConfig& cfg){ m_gpuid = atoi(cfg.gpuid.c_str()); - m_rtp.SetDeviceID(m_dec_name); + m_rtpPtr->SetDeviceID(m_dec_name); + + if(cfg.request_stream_cbk == nullptr){ + LOG_INFO("request_stream_cbk 为 nullptr -- {}", m_dec_name); + return false; + } + + post_decoded_cbk = cfg.post_decoded_cbk; + decode_finished_cbk = cfg.decode_finished_cbk; + m_rtpPtr->SetRequestStreamCallback(cfg.request_stream_cbk); m_port = cfg.port; m_cfg = cfg; + LOG_INFO("init - {} : ", m_dec_name, m_port); + return true; } @@ -95,10 +119,12 @@ bool FFGB28181Decoder::start() { m_status = ERUNNING; - m_rtp.SetOutputCallback(RTP_Stream_CallBack, this); - m_rtp.SetVodEndCallback(RTP_Stream_End_CallBack, this); + m_rtpPtr->SetOutputCallback(RTP_Stream_CallBack, this); + m_rtpPtr->SetVodEndCallback(RTP_Stream_End_CallBack, this); - return m_rtp.Open((uint16_t)m_port); + LOG_INFO("start - {} {}: ", m_dec_name, m_port); + + return m_rtpPtr->Open((uint16_t)m_port); } void FFGB28181Decoder::setDecKeyframe(bool bKeyframe){ @@ -135,6 +161,7 @@ void FFGB28181Decoder::stream_callback(int videoType, char* data, int len, int i AVDictionary *gpu_options = nullptr; if (m_pAVCodecCtx == nullptr) { + LOG_INFO("frame data is zero --{}", m_dec_name); if (VIDEO_TYPE_H264 == videoType) { if (m_gpuid >= 0){ m_pAVCodec = avcodec_find_decoder_by_name("h264_cuvid"); @@ -314,5 +341,5 @@ bool FFGB28181Decoder::isSurport(FFDecConfig& cfg){ } int FFGB28181Decoder::getCachedQueueLength(){ - return m_rtp.GetPsFrameListSize(); + return m_rtpPtr->GetPsFrameListSize(); } \ No newline at end of file diff --git a/src/gb28181/FFGB28181Decoder.h b/src/gb28181/FFGB28181Decoder.h index 2d848b0..12a085c 100644 --- a/src/gb28181/FFGB28181Decoder.h +++ b/src/gb28181/FFGB28181Decoder.h @@ -53,7 +53,7 @@ private: int m_gpuid {-1}; - RTPReceiver m_rtp; + RTPReceiver* m_rtpPtr; int m_port; uint64_t m_frameCount {}; diff --git a/src/gb28181/RTPReceiver.cpp b/src/gb28181/RTPReceiver.cpp index 7081de5..1be46d3 100644 --- a/src/gb28181/RTPReceiver.cpp +++ b/src/gb28181/RTPReceiver.cpp @@ -1,73 +1,50 @@ - #include "RTPReceiver.h" -#include -#include - -#include -#include - +#include "rtppacket.h" #include "../logger.hpp" - -using namespace std; +#include #define BUFFERSIZE_1024 1024 -#define BUFFERSIZE_GAP 1024//5120 //1024*5 +const int kVideoFrameSize = BUFFERSIZE_1024*BUFFERSIZE_1024*5*2; -namespace +// PS解包器回调 +static int ReceivePESFunction(unsigned char streamid, void * data, int size, uint64_t pts, uint64_t localPts, bool key, void* param) { - const int kH264EndFlag = 0x00000001; - const int kH264EndFlag_ = 0x000001; - const int kMpeg4IEndFlag = 0x000001B0; - const int kMpeg4PEndFlag = 0x000001B6; - const int kSvacEndFlag = 0x000001B6; - const int kVideoFrameSize = BUFFERSIZE_1024*BUFFERSIZE_1024*5*2; - const int kRtpRecvBufferSize = BUFFERSIZE_1024*BUFFERSIZE_1024*2; - const int kSockBufferSize = BUFFERSIZE_1024*BUFFERSIZE_1024*2; - const uint16_t kInvalidPort = 0; - - // PS解包器回调 - int ReceivePESFunction(unsigned char streamid, void * data, int size, uint64_t pts, uint64_t localPts, bool key, void* param) + if (NULL != data && size > 0) + { + ((RTPReceiver*)param)->OnPsDemux(streamid, (BYTE*)data, size, key, (uint64_t)pts, (uint64_t)localPts); + } + return 0; +} + +static int ps_demuxer_thread_(void* param) +{ + if (!param) { - if (NULL != data && size > 0) - { - ((RTPReceiver*)param)->OnPsDemux(streamid, (BYTE*)data, size, key, (uint64_t)pts, (uint64_t)localPts); - } - return 0; + return -1; } -}; // namespace - -static long long get_cur_time() { - // 获取操作系统当前时间点(精确到微秒) - chrono::time_point tpMicro - = chrono::time_point_cast(chrono::system_clock::now()); - // (微秒精度的)时间点 => (微秒精度的)时间戳 - return tpMicro.time_since_epoch().count(); + RTPReceiver* self = (RTPReceiver*)param; + return self->OnPsProcess(); } -RTPReceiver::RTPReceiver() -: m_localPort(kInvalidPort) -, m_bRtpExit(false) -, m_bPsExit(false) -, m_h264DataFunc(NULL) -, m_usrParam(NULL) -, m_bOpened(false) -, m_hVodEndFunc(NULL) -, m_LastPTS(-1) +RTPReceiver::RTPReceiver() +:m_LastPTS(-1) , m_LastIsKeyFrame(0) , m_SliceBuf(1024*1024) -, m_idleCount(-1) -,m_noDataCount(-1) +, m_h264DataFunc(NULL) +, m_hVodEndFunc(NULL) +, m_usrParam(NULL) +, m_bPsExit(false) +, m_psThreadPtr(nullptr) { - m_LastStreamType=0; + m_LastStreamType = 0; + recvTmpBuf = new BYTE[kVideoFrameSize]; } -RTPReceiver::~RTPReceiver() -{ - if (IsOpened()) - Close(); - - LOG_INFO("RTPReceiver::~RTPReceiver() destruct OK--{}", m_deviceID); +RTPReceiver::~RTPReceiver(){ + if(recvTmpBuf != nullptr){ + delete[] recvTmpBuf; + } } void RTPReceiver::SetOutputCallback(CallBack_Stream cb, void* param) @@ -82,376 +59,36 @@ void RTPReceiver::SetVodEndCallback(CallBack_VodFileEnd cb, void* param) m_usrParam = param; } -int RTPReceiver::rtp_revc_thread_(void* param) -{ - if (!param) - { - return -1; - } - - RTPReceiver* self = (RTPReceiver*)param; - return self->OnRtpRecv(); +void RTPReceiver::SetRequestStreamCallback(CallBack_Request_Stream cb){ + m_callback_request_stream = cb; } -int RTPReceiver::ps_demuxer_thread_(void* param) -{ - if (!param) - { - return -1; - } - - RTPReceiver* self = (RTPReceiver*)param; - return self->OnPsProcess(); -} - -int RTPReceiver::ps_decode_thread_(void* param) -{ - if (!param) - { - return -1; - } - - RTPReceiver* self = (RTPReceiver*)param; - return self->OnDecodeProcess(); -} - -bool RTPReceiver::Open(uint16_t localPort) -{ - LOG_INFO("--2---RTPReceiver::Open--{}", m_deviceID); - m_localPort = localPort; - - RTPSessionParams sessparams; - sessparams.SetUsePollThread(true); - sessparams.SetMinimumRTCPTransmissionInterval(10); - sessparams.SetOwnTimestampUnit(1.0/90000.0); - sessparams.SetAcceptOwnPackets(true); - - RTPUDPv4TransmissionParams transparams; - transparams.SetPortbase(m_localPort); - transparams.SetRTPReceiveBuffer(kRtpRecvBufferSize); - cout << "port: " << m_localPort << endl; - LOG_INFO("--3---RTPReceiver::Open--{}", m_deviceID); - - int err = m_rtpSession.Create(sessparams, &transparams); - LOG_INFO("--4---RTPReceiver::Open--{}", m_deviceID); - if (err != 0) - { - LOG_INFO("RTPReceiver::Open m_rtpSession.Create error: {}--{}", err, m_deviceID); - return false; - } - LOG_INFO("--5---RTPReceiver::Open--{}", m_deviceID); +int RTPReceiver::InitPS(){ + m_psParser.SetReceiveFunction(ReceivePESFunction, this); - m_bOpened = true; - LOG_INFO("RTPReceiver::Open ok--{}", m_deviceID); - - LOG_INFO("--1---RTPReceiver::Open--{}", m_deviceID); - m_rtpThread = std::thread(rtp_revc_thread_, this); - m_psThread = std::thread(ps_demuxer_thread_, this); + m_psThreadPtr = new std::thread(ps_demuxer_thread_, this); + if(nullptr == m_psThreadPtr){ + return -1; + } - return true; -} + LOG_INFO("[{}] InitPS finished", m_deviceID); -bool RTPReceiver::IsOpened() const -{ - return m_bOpened; + return 0; } -void RTPReceiver::Close() -{ - m_bRtpExit = true; +void RTPReceiver::ClosePsThread(){ + LOG_INFO("[{}] 3.", m_deviceID); m_bPsExit = true; - - // rtp接收线程退出 - if (m_rtpThread.joinable()) - { - m_rtpThread.join(); - } - m_rtpSession.Destroy(); - LOG_INFO("--2---RTPReceiver::Close rtp recv thread quit --{}", m_deviceID); - // PS解包线程退出 - if (m_psThread.joinable()) + if (m_psThreadPtr->joinable()) { - m_psThread.join(); + m_psThreadPtr->join(); + delete m_psThreadPtr; + m_psThreadPtr = nullptr; } - LOG_INFO("--3---RTPReceiver::Close ps demux thread quit--{}", m_deviceID); - - m_bOpened = false; -} - -int RTPReceiver::GetPsFrameListSize() -{ - std::lock_guard l(m_psFrameMutex); - return m_psVideoFrames.size(); -} - -void RTPReceiver::ClearPsVideoFrameList() -{ - std::lock_guard l(m_psFrameMutex); - while (!m_psVideoFrames.empty()) { - Frame* f = m_psVideoFrames.front(); - delete f; - m_psVideoFrames.pop(); - } - LOG_INFO("---->cleared ps video frame list!<----{}", m_deviceID); -} - -// 收RTP包线程 -int RTPReceiver::OnRtpRecv() -{ - uint32_t lastPts = 0; - uint64_t last_recv_ts{0}; - int offset = 0; - int mark = 0; - BYTE* recvTmpBuf = new BYTE[kVideoFrameSize]; - while (!m_bRtpExit) - { - //try - //{ - m_rtpSession.Poll(); - m_rtpSession.BeginDataAccess(); - if (m_rtpSession.GotoFirstSourceWithData()) - { - last_recv_ts = get_cur_time(); - m_idleCount = 0; - m_noDataCount = 0; - do - { - RTPPacket* packet; - while ((packet = m_rtpSession.GetNextPacket()) != NULL/* && !mark*/) - { - do { - if (0 == packet->GetPayloadType()) - { - // 音频数据, 暂不处理 - break; // goto skip_this_packet; - } - - // 判断是否收到完整的帧(有些厂商打的marker标记不准, 所以只能看时间戳来判断) - uint32_t curPts = packet->GetTimestamp(); - if (lastPts != 0 && curPts != lastPts) { - mark = 1; - } - lastPts = curPts; - - int payloadLen = packet->GetPayloadLength(); - if (offset + payloadLen > kVideoFrameSize) - { - offset = 0, mark = 0; - break; // goto skip_this_packet; - } - - if (mark) - { - BYTE* frameBuf = (BYTE*)malloc(sizeof(BYTE) * offset); - if (!frameBuf) { - offset = 0, mark = 0; - break; // goto skip_this_packet; - } - memcpy(frameBuf, recvTmpBuf, offset); - if (!m_bPsExit) - { - std::lock_guard l(m_psFrameMutex); - if (m_psVideoFrames.size() < 100) - { - m_psVideoFrames.push(new Frame(frameBuf, offset, false)); - } - else { - free(frameBuf); - } - } - else - { - //若此时解码线程已经退出,不再往m_psVideoFrames推送帧,且退出当前线程 - free(frameBuf); - LOG_INFO("OnPsProcess quit, device_id:{}", m_deviceID); - //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "recv video stream interruption!"); - } - offset = 0; - mark = 0; - } - - memcpy(recvTmpBuf + offset, packet->GetPayloadData(), payloadLen); - offset += payloadLen; - } while (0); - //skip_this_packet: - m_rtpSession.DeletePacket(packet); - } - } while (m_rtpSession.GotoNextSourceWithData()); - } - //else { - // if (m_idleCount != -1) - // { - // ++m_idleCount;//流中断计数 - // } - // if (m_noDataCount != 0) - // { - // --m_noDataCount;//没流计数 - // } - // //if (m_idleCount > 3000) { - // // m_hVodEndFunc(m_usrParam); - // // m_idleCount = 0; - // //历史流结束的时候,也会出现超时,这个是正常的 - // if(m_usrParam && ((VideoSession *)GetUsrParam())->video_type() == VideoType::ERECORD) - // { - // if (m_idleCount > 10000) - // { - // //这里要判断下历史流是否结束,如果未结束,就设置为流中断 - // //由于record_stream_status这个函数返回值不准确,所以增加一个进度条大于80% - // if(record_stream_status(((VideoSession *)GetUsrParam())->streamHandle())) - // { - // LOG_INFO("************Record stream is finished**{}**m_progress = {}********", m_deviceID, ((VideoSession *)GetUsrParam())->progress()); - // m_idleCount = -1; - // m_hVodEndFunc(m_usrParam); - // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); - // ((VideoSession *)GetUsrParam())->streamHandle().clear(); - // } - // else - // { - // //如果此时进度大于80% 算完成吧 - // if(((VideoSession *)GetUsrParam())->progress() > 0.80) - // { - // LOG_INFO("************Record stream is overtime**{}**m_progress = {}********", m_deviceID, ((VideoSession *)GetUsrParam())->progress()); - - // m_idleCount = 0; - // m_hVodEndFunc(m_usrParam); - // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); - // ((VideoSession *)GetUsrParam())->streamHandle().clear(); - // } - // else - // { - // m_idleCount = -1; - // //LOG_ERROR("************post ERROR_REALSTREAM_INTERRUPT to structure****{}********", m_deviceID); - // //发送流中断 - // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Record time video streaming interruption!"); - // } - // } - // - // - // } - // - // if (m_noDataCount < -200000)//任务开始时没收到流 - // { - // //LOG_ERROR("************m_hVodEndFunc(m_usrParam)!!!m_hVodEndFunc(m_usrParam)********{}******", m_deviceID); - // m_noDataCount = -1; - - // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Record time video streaming interruption2!"); - // //m_hVodEndFunc(m_usrParam); - // } - // } - // else//实时任务断流 - // //if (m_usrParam && ((VideoSession *)GetUsrParam())->video_type() == VideoType::EREAL) - // { - // - // //每超过3000次,发送一次send_vedio_eof 时长大约1.5s - // //若是30000,时长大约 18s - // if(m_idleCount > 30000) - // { - // uint64_t cts = get_cur_time(); - // float duration_not_recv = (cts - last_recv_ts) / 1000.0; - // - // //LOG_ERROR("************I haven't got stream from hik gateway exceed {}s,send eof********{}******", duration_not_recv, m_deviceID); - // m_idleCount = -1; - - // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Real time video streaming interruption!"); - // } - // - // if (m_noDataCount < -200000)//任务开始时没收到流 - // { - // //LOG_ERROR("************m_noDataCount < -200000********{}******", m_deviceID); - // m_noDataCount = -1; - - // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Real time video streaming interruption2!"); - // } - // - // } - //} - //} - // catch (GeneralException2& e) - //{ - // //LOG_ERROR("---> video streaming interruption!<---{}, error: {}", m_deviceID, e.err_msg()); - - // byte_buffer bb(64); - // bb << VasCmd::VAS_CMD_REALSTREAM_INTERRUPT << e.err_msg(); - - // if (m_usrParam) - // { - // if (((VideoSession *)GetUsrParam())->msgChan()->is_valid()) { - // try { - // ((VideoSession *)GetUsrParam())->msgChan()->send_msg(bb.data_ptr(), bb.data_size()); - // } - // catch (GeneralException2& e) { - // //LOG_ERROR("[{}] send vas cmd VAS_CMD_REALSTREAM_INTERRUPT error: {}, {}", m_deviceID, e.err_code(), e.err_str()); - // } - // } - - // //通知网关关闭句柄 - // if(!((VideoSession *)GetUsrParam())->streamHandle().empty()) - // { - - // LOG_INFO("---->Notify hisense gateway release handle = {} !<----{}", ((VideoSession *)GetUsrParam())->streamHandle().c_str(), m_deviceID); - // if (((VideoSession *)GetUsrParam())->video_type() == VideoType::EREAL) - // real_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); - // - // if (((VideoSession *)GetUsrParam())->video_type() == VideoType::ERECORD) - // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); - // - // //清理保活的句柄 - // ((VideoSession *)GetUsrParam())->streamHandle().clear(); - // } - // } - // - // bb.bset(0); - // - //} - m_rtpSession.EndDataAccess(); - RTPTime::Wait(RTPTime(0, 500)); - } - - delete [] recvTmpBuf; - - return 0; -} - -// 解PS包线程 -int RTPReceiver::OnPsProcess() -{ - while (!m_bPsExit) { - m_psFrameMutex.lock(); - if (m_psVideoFrames.size() <= 0){ - m_psFrameMutex.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - continue; - } - Frame* frame = m_psVideoFrames.front(); - m_psVideoFrames.pop(); - m_psFrameMutex.unlock(); - if (frame != nullptr) - { - int nRet = m_psParser.AddData(frame->buf_, frame->len_); - if (nRet == -1) - { - LOG_INFO("m_psParser return -1--{}", m_deviceID); - } - else if (nRet == -2) - { - LOG_INFO("m_psParser return -2--{}", m_deviceID); - } - else if (nRet == -3) - { - LOG_INFO("m_psParser return -3--{}", m_deviceID); - } - - delete frame; - frame = nullptr; - } - - } - - ClearPsVideoFrameList(); - - return 0; + + LOG_INFO("[{}] ps demux thread quit", m_deviceID); } // 处理去除了PS头的数据 @@ -531,7 +168,129 @@ void RTPReceiver::OnPsDemux(unsigned char streamId, BYTE *data, int len, bool ke m_SliceBuf.add((char*)data, len); } -int RTPReceiver::OnDecodeProcess() +// 解PS包线程 +int RTPReceiver::OnPsProcess() { + LOG_INFO("[{}] started.", m_deviceID); + while (!m_bPsExit) { + m_psFrameMutex.lock(); + LOG_DEBUG("[{}] PS frame size : {}", m_deviceID, m_psVideoFrames.size()); + if (m_psVideoFrames.size() <= 0){ + m_psFrameMutex.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + } + Frame* frame = m_psVideoFrames.front(); + m_psVideoFrames.pop(); + m_psFrameMutex.unlock(); + if (frame != nullptr) + { + int nRet = m_psParser.AddData(frame->buf_, frame->len_); + if (nRet == -1) + { + LOG_INFO("m_psParser return -1--{}", m_deviceID); + } + else if (nRet == -2) + { + LOG_INFO("m_psParser return -2--{}", m_deviceID); + } + else if (nRet == -3) + { + LOG_INFO("m_psParser return -3--{}", m_deviceID); + } + + delete frame; + frame = nullptr; + } + } + + ClearPsVideoFrameList(); + + m_hVodEndFunc(m_usrParam); + + LOG_INFO("[{}] exited.", m_deviceID); + return 0; } + +void RTPReceiver::SetDeviceID(string deviceID){ + m_deviceID = deviceID; +} + +int RTPReceiver::GetPsFrameListSize() +{ + std::lock_guard l(m_psFrameMutex); + return m_psVideoFrames.size(); +} + +void RTPReceiver::ClearPsVideoFrameList() +{ + std::lock_guard l(m_psFrameMutex); + while (!m_psVideoFrames.empty()) { + Frame* f = m_psVideoFrames.front(); + delete f; + m_psVideoFrames.pop(); + } + LOG_INFO("[{}] cleared ps video frame list!", m_deviceID); +} + +int RTPReceiver::ParsePacket(RTPPacket* packet){ + do { + + if (0 == packet->GetPayloadType()) + { + // 音频数据, 暂不处理 + break; + } + + // 判断是否收到完整的帧(有些厂商打的marker标记不准, 所以只能看时间戳来判断) + uint32_t curPts = packet->GetTimestamp(); + if (lastPts != 0 && curPts != lastPts) { + mark = 1; + } + lastPts = curPts; + + int payloadLen = packet->GetPayloadLength(); + if (offset + payloadLen > kVideoFrameSize) + { + offset = 0, mark = 0; + break; + } + + LOG_DEBUG("[{}] ParsePacket GetPayloadLength", m_deviceID); + + if (mark) + { + BYTE* frameBuf = (BYTE*)malloc(sizeof(BYTE) * offset); + if (!frameBuf) { + offset = 0, mark = 0; + break; + } + memcpy(frameBuf, recvTmpBuf, offset); + if (!m_bPsExit){ + std::lock_guard l(m_psFrameMutex); + if (m_psVideoFrames.size() < 100) + { + LOG_DEBUG("[{}]ParsePacket push", m_deviceID); + m_psVideoFrames.push(new Frame(frameBuf, offset, false)); + } + else { + free(frameBuf); + } + } + else{ + //若此时解码线程已经退出,不再往m_psVideoFrames推送帧,且退出当前线程 + free(frameBuf); + LOG_INFO("ParsePacket quit, device_id:{}", m_deviceID); + return 1; + } + offset = 0; + mark = 0; + } + + memcpy(recvTmpBuf + offset, packet->GetPayloadData(), payloadLen); + offset += payloadLen; + } while (0); + + return 0; +} \ No newline at end of file diff --git a/src/gb28181/RTPReceiver.h b/src/gb28181/RTPReceiver.h index 149ade2..8a7f8f9 100644 --- a/src/gb28181/RTPReceiver.h +++ b/src/gb28181/RTPReceiver.h @@ -1,188 +1,146 @@ -#ifndef _RTP_RECEIVER_H_ -#define _RTP_RECEIVER_H_ - - -#include "demuxer.h" -#include "buffer.h" - - -#include "rtpudpv4transmitter.h" -#include "rtpipv4address.h" -#include "rtpsessionparams.h" -#include "rtpsession.h" -#include "rtppacket.h" -#include -#include -#include -#include -#include -#include - - -#define OUTTIME_RTCP 30*1000 -#define PAYLOAD 99 -#define PAYLOAD_PS 96 -#define PAYLOAD_H264 98 -#define PAYLOAD_MP4 97 - -#define UDP_SIZE 1400 -#define MIN_PORT 10000 -#define MAX_PORT 60000 -#define RTP_MAX_PACKET_LEN 1450 - -using namespace jrtplib; -using namespace std; - -typedef unsigned char BYTE; - -/** 视频数据回调 -* -* @param videoType [in] 视频类型 音频-0xC0、h264-0x1B、MPEG4-0x01、SVAC-0x80 -* @param data [in] 视频数据 -* @param len [in] 视频数据长度 -* @param isKey [in] 是否为关键帧 -* @param pts [in] 时间戳 -*/ -typedef void(*CallBack_Stream)(void* userdata, int videoType, char* data, int len, int isKey, uint64_t pts, uint64_t localPts); - -/** 录像回放完成回调消息通知 -*/ -typedef void(*CallBack_VodFileEnd)(void* userdata); - -int AllocRtpPort(void); - -class MyRTPSession : public RTPSession -{ -public: - MyRTPSession() {} - virtual ~MyRTPSession() {} - -private: - virtual void OnRTPPacket(RTPPacket* pack, const RTPTime& receiverTime, const RTPAddress* senderAddress) - { - AddDestination(*senderAddress); - } - - virtual void OnRTCPCompoundPacket(RTCPCompoundPacket *pack, const RTPTime &receivetime,const RTPAddress *senderaddress) - { - //AddDestination(*senderaddress); - //const char* name = "hi~"; - //SendRTCPAPPPacket(0, (const uint8_t*)name, "keeplive", 8); - - //printf("send rtcp app"); - } -}; - -// 标识帧, 注意buffer需要自己开辟和释放 -struct Frame { - Frame() { buf_ = NULL; len_ = 0; } - ~Frame() { - if (buf_ != nullptr) - { - free(buf_); - buf_ = nullptr; - } - } - Frame(BYTE* buf, int len, bool key) : buf_(buf), len_(len), key_(key) {} - BYTE* buf_; - int len_; - bool key_{}; -}; - -class FrameToDecode -{ -public: - FrameToDecode() - : m_SliceBuf(0) - , m_localPts(0) - , m_LastPTS(-1) - , m_LastIsKeyFrame(0) {} - FrameToDecode(unsigned char m_streamId) - : m_SliceBuf(0) - , m_localPts(0) - , m_LastPTS(-1) - , m_LastIsKeyFrame(0) - , m_streamId (m_streamId) {} - - void operator=(FrameToDecode &temp) - { - m_SliceBuf = temp.m_SliceBuf; - m_streamId = temp.m_streamId; - m_localPts = temp.m_localPts; - m_LastPTS = temp.m_LastPTS; - m_LastIsKeyFrame = temp.m_LastIsKeyFrame; - } - - CBuffer m_SliceBuf; - unsigned char m_streamId{}; - uint64_t m_localPts; - uint64_t m_LastPTS; - bool m_LastIsKeyFrame; -}; - - -class RTPReceiver -{ - RTPReceiver(const RTPReceiver& other); - RTPReceiver& operator= (const RTPReceiver& other); - -public: - RTPReceiver(); - ~RTPReceiver(); - - bool Open(uint16_t localPort); - bool IsOpened() const; - void Close(); - - int GetPsFrameListSize(); - void ClearPsVideoFrameList(); - - void OnPsDemux(unsigned char streamId, BYTE *data, int len, bool key, uint64_t pts, uint64_t localPts); - - void SetOutputCallback(CallBack_Stream cb, void* param); - void SetVodEndCallback(CallBack_VodFileEnd cb, void* param); - CallBack_VodFileEnd GetVodEndFunc(){ return m_hVodEndFunc; } - - void *GetUsrParam(){ return m_usrParam; } - void SetDeviceID(string deviceID){this->m_deviceID = deviceID; } - -private: - static int rtp_revc_thread_(void* param); - static int ps_demuxer_thread_(void* param); - static int ps_decode_thread_(void* param); - - int OnRtpRecv(); - int OnPsProcess(); - int OnDecodeProcess(); - -private: - std::thread m_rtpThread; // RTP接收线程 - std::thread m_psThread; // PS解包线程 - - uint16_t m_localPort; // RTP接收端口 - MyRTPSession m_rtpSession; // RTP会话 - std::atomic_bool m_bRtpExit; // 标识RTP收包线程闭 - std::atomic_bool m_bPsExit; // 标识PS解包线程关闭 - std::queue m_psVideoFrames; - mutex m_psFrameMutex; - - CMpeg2Demux m_psParser; - - void* m_usrParam; - std::atomic_bool m_bOpened; - - CallBack_Stream m_h264DataFunc; // 视频流回调 - CallBack_VodFileEnd m_hVodEndFunc; // 录像流结束回调 - - CBuffer m_SliceBuf; - uint64_t m_LastPTS; - bool m_LastIsKeyFrame; - unsigned char m_LastStreamType; - int64_t m_idleCount; - int64_t m_noDataCount;//线程计数,用于打开流成功但是实际没流过来 - - string m_deviceID; - int64_t m_notToDecodCount{0};//线程计数,用来代表多长时间没有调用解码回调,针对大华相机 -}; - -#endif // _RTP_RECEIVER_H_ +#ifndef _RTP_RECEIVER_H_ +#define _RTP_RECEIVER_H_ + +#include "buffer.h" +#include "demuxer.h" +#include "rtppacket.h" +#include +#include +#include +#include +#include + +typedef unsigned char BYTE; + +using namespace jrtplib; +using namespace std; + +/** 视频数据回调 +* +* @param videoType [in] 视频类型 音频-0xC0、h264-0x1B、MPEG4-0x01、SVAC-0x80 +* @param data [in] 视频数据 +* @param len [in] 视频数据长度 +* @param isKey [in] 是否为关键帧 +* @param pts [in] 时间戳 +*/ +typedef void(*CallBack_Stream)(void* userdata, int videoType, char* data, int len, int isKey, uint64_t pts, uint64_t localPts); + +/** 录像回放完成回调消息通知 +*/ +typedef void(*CallBack_VodFileEnd)(void* userdata); + +/** + * 请求流 +*/ +typedef bool(*CallBack_Request_Stream)(); + +// 标识帧, 注意buffer需要自己开辟和释放 +struct Frame { + Frame() { buf_ = NULL; len_ = 0; } + ~Frame() { + if (buf_ != nullptr) + { + free(buf_); + buf_ = nullptr; + } + } + Frame(BYTE* buf, int len, bool key) : buf_(buf), len_(len), key_(key) {} + BYTE* buf_; + int len_; + bool key_{}; +}; + +class FrameToDecode +{ +public: + FrameToDecode() + : m_SliceBuf(0) + , m_localPts(0) + , m_LastPTS(-1) + , m_LastIsKeyFrame(0) {} + FrameToDecode(unsigned char m_streamId) + : m_SliceBuf(0) + , m_localPts(0) + , m_LastPTS(-1) + , m_LastIsKeyFrame(0) + , m_streamId (m_streamId) {} + + void operator=(FrameToDecode &temp) + { + m_SliceBuf = temp.m_SliceBuf; + m_streamId = temp.m_streamId; + m_localPts = temp.m_localPts; + m_LastPTS = temp.m_LastPTS; + m_LastIsKeyFrame = temp.m_LastIsKeyFrame; + } + + CBuffer m_SliceBuf; + unsigned char m_streamId{}; + uint64_t m_localPts; + uint64_t m_LastPTS; + bool m_LastIsKeyFrame; +}; + +class RTPReceiver{ + +public: + RTPReceiver(); + ~RTPReceiver(); + + virtual bool Open(uint16_t localPort) = 0; + virtual bool IsOpened() = 0; + virtual void Close() = 0; + + void SetVodEndCallback(CallBack_VodFileEnd cb, void* param); + + void SetOutputCallback(CallBack_Stream cb, void* param); + + void SetRequestStreamCallback(CallBack_Request_Stream cb); + + void SetDeviceID(string deviceID); + + int GetPsFrameListSize(); + +public: + void OnPsDemux(unsigned char streamId, BYTE *data, int len, bool key, uint64_t pts, uint64_t localPts); + int OnPsProcess(); + void ClearPsVideoFrameList(); + int ParsePacket(RTPPacket* packet); + +public: + int InitPS(); + void ClosePsThread(); + void *GetUsrParam(){ return m_usrParam; } + +public: + CBuffer m_SliceBuf; + uint64_t m_LastPTS; + bool m_LastIsKeyFrame; + unsigned char m_LastStreamType; + + int64_t m_notToDecodCount{0};//线程计数,用来代表多长时间没有调用解码回调,针对大华相机 + + void* m_usrParam; + CallBack_Stream m_h264DataFunc; // 视频流回调 + + std::queue m_psVideoFrames; + mutex m_psFrameMutex; + + string m_deviceID; + + CMpeg2Demux m_psParser; + std::atomic_bool m_bPsExit; // 标识PS解包线程关闭 + + uint32_t lastPts{0}; + uint64_t last_recv_ts{0}; + int offset{0}; + int mark{0}; + BYTE* recvTmpBuf{nullptr}; + + std::thread* m_psThreadPtr; // PS解包线程 + + CallBack_VodFileEnd m_hVodEndFunc; // 录像流结束回调 + CallBack_Request_Stream m_callback_request_stream; //请求流回调 +}; + +#endif // _RTP_RECEIVER_H_ \ No newline at end of file diff --git a/src/gb28181/RTPTcpReceiver.cpp b/src/gb28181/RTPTcpReceiver.cpp new file mode 100644 index 0000000..3e20a78 --- /dev/null +++ b/src/gb28181/RTPTcpReceiver.cpp @@ -0,0 +1,305 @@ +#include"RTPTcpReceiver.h" +#include "../logger.hpp" + + +static long long get_cur_time() { + + chrono::time_point tpMicro + = chrono::time_point_cast(chrono::system_clock::now()); + + return tpMicro.time_since_epoch().count(); +} + +// class TcpRTPSession : public RTPSession +// { +// public: +// void setReceiver(RTPTcpReceiver* r){ +// tcpReceiver = r; +// } + +// protected: +// void OnValidatedRTPPacket(RTPSourceData *srcdat, RTPPacket *rtppack, bool isonprobation, bool *ispackethandled) +// { +// // printf("SSRC %x Got packet (%d bytes) in OnValidatedRTPPacket from source 0x%04x!\n", GetLocalSSRC(), +// // (int)rtppack->GetPayloadLength(), srcdat->GetSSRC()); + +// LOG_DEBUG("SSRC {} Got packet ({} bytes) in OnValidatedRTPPacket from source {}}!\n", GetLocalSSRC(), +// (int)rtppack->GetPayloadLength(), srcdat->GetSSRC()); + +// if(nullptr != tcpReceiver){ +// tcpReceiver->ParsePacket(rtppack); +// } +// DeletePacket(rtppack); +// *ispackethandled = true; +// } + +// void OnRTCPSDESItem(RTPSourceData *srcdat, RTCPSDESPacket::ItemType t, const void *itemdata, size_t itemlength) +// { +// char msg[1024]; + +// memset(msg, 0, sizeof(msg)); +// if (itemlength >= sizeof(msg)) +// itemlength = sizeof(msg)-1; + +// memcpy(msg, itemdata, itemlength); +// // printf("SSRC %x Received SDES item (%d): %s from SSRC %x\n", GetLocalSSRC(), (int)t, msg, srcdat->GetSSRC()); +// LOG_DEBUG("SSRC {} Received SDES item ({}): {} from SSRC {}\n", GetLocalSSRC(), (int)t, msg, srcdat->GetSSRC()); +// } + +// private: +// RTPTcpReceiver* tcpReceiver{nullptr}; +// }; + +class MyTCPTransmitter : public RTPTCPTransmitter +{ +public: + void setReceiver(RTPTcpReceiver* r){ + tcpReceiver = r; + } + +public: + MyTCPTransmitter() : RTPTCPTransmitter(0){ } + + void OnSendError(SocketType sock) + { + LOG_ERROR("Error sending over socket {}, removing destination", sock); + DeleteDestination(RTPTCPAddress(sock)); + if(nullptr != tcpReceiver && !tcpReceiver->isClosing()){ + tcpReceiver->RequestStream(); + } + } + + void OnReceiveError(SocketType sock) + { + LOG_ERROR("Error receiving over socket {}, removing destination", sock); + DeleteDestination(RTPTCPAddress(sock)); + } + +private: + RTPTcpReceiver* tcpReceiver{nullptr}; +}; + +static int rtp_revc_thread_(void* param) +{ + if (!param) + { + return -1; + } + + RTPTcpReceiver* self = (RTPTcpReceiver*)param; + return self->OnRtpRecv(); +} + + +RTPTcpReceiver::RTPTcpReceiver() +: m_bRtpExit(false) +, m_bOpened(false) +, m_idleCount(-1) +, m_noDataCount(-1) +, m_nListener(-1) +, m_bAccepted(false) +, m_bClosing(false) +{ + m_rtpSessionPtr = new RTPSession(); + m_pSessparams = new RTPSessionParams(); + m_pTrans = new MyTCPTransmitter(); +} + +RTPTcpReceiver::~RTPTcpReceiver(){ + if (IsOpened()) + Close(); + + if(m_rtpSessionPtr != nullptr){ + delete m_rtpSessionPtr; + m_rtpSessionPtr = nullptr; + } + + if(m_pSessparams != nullptr){ + delete m_pSessparams; + m_pSessparams = nullptr; + } + + if(m_pTrans != nullptr){ + delete m_pTrans; + m_pTrans = nullptr; + } +} + +bool RTPTcpReceiver::Open(uint16_t localPort){ + if(0 != initSession(localPort)){ + return false; + } + + m_bOpened = true; + + LOG_INFO("[{}] started.", m_deviceID); + + return true; +} + +bool RTPTcpReceiver::IsOpened(){ + LOG_INFO("[{}] isopng:{} ", m_deviceID, m_bOpened); + return m_bOpened; +} + +void RTPTcpReceiver::Close(){ + + m_bClosing = true; + + m_bAccepted = true; + m_bRtpExit = true; + + LOG_DEBUG("[{}] 1.", m_deviceID); + + // rtp接收线程退出 + if (m_rtpThread.joinable()) + { + m_rtpThread.join(); + } + + LOG_DEBUG("[{}] 2.", m_deviceID); + + ClosePsThread(); + + m_bOpened = false; + + LOG_INFO("[{}] closed.", m_deviceID); +} + +bool RTPTcpReceiver::isClosing(){ + return m_bClosing; +} + +int RTPTcpReceiver::initSession(int localPort){ + m_nListener = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP); + if (m_nListener < 0) + { + return -1; + } + + sockaddr_in serverAddr; + memset(&serverAddr, 0, sizeof(sockaddr_in)); + serverAddr.sin_family = AF_INET; + serverAddr.sin_port = htons(localPort); + serverAddr.sin_addr.s_addr = htonl(INADDR_ANY); + int nRet = bind(m_nListener, (sockaddr*)&serverAddr, sizeof(serverAddr)); + if (nRet == -1) + { + LOG_ERROR("[{}] 绑定端口失败: {}", m_deviceID, localPort); + return -1; + } + + if (listen(m_nListener, 1) == -1) + { + LOG_ERROR("[{}] listen 失败", m_deviceID); + return -1; + } + + int nPackSize = 45678; + m_pSessparams->SetProbationType(RTPSources::NoProbation); + m_pSessparams->SetOwnTimestampUnit(90000.0 / 25.0); + m_pSessparams->SetMaximumPacketSize(nPackSize + 64); + + int status = m_pTrans->Init(false); + status = m_pTrans->Create(65535, NULL); + m_pTrans->setReceiver(this); + + status = m_rtpSessionPtr->Create(*m_pSessparams, m_pTrans); + if (status < 0) + { + LOG_ERROR("[{}] create session error!!", m_deviceID); + return -1; + } + + m_rtpThread = std::thread(rtp_revc_thread_, this); + + InitPS(); + + bool bRet = RequestStream(); + if (!bRet) + { + LOG_INFO("[{}] 请求流失败!", m_deviceID); + return -1; + } + + LOG_INFO("[{}] 初始化成功, congratulations !!!", m_deviceID); + + return 0; +} + +int RTPTcpReceiver::OnRtpRecv() +{ + if(nullptr == m_rtpSessionPtr){ + return -1; + } + + LOG_INFO("[{}] OnRtpRecv started, m_nListener : {}", m_deviceID, m_nListener); + + sockaddr_in clientAddr; + int nLen = sizeof(sockaddr_in); + SocketType nServer = -1; + + LOG_INFO("[{}] Poll started.", m_deviceID); + int status = -1; + while(!m_bRtpExit){ + while(!m_bAccepted){ + LOG_DEBUG("[{}] accepting...", m_deviceID); + nServer = accept(m_nListener, (sockaddr*)&clientAddr, (socklen_t * ) &nLen); + if (-1 == nServer){ + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + } + m_rtpSessionPtr->AddDestination(RTPTCPAddress(nServer)); + m_bAccepted = true; + + LOG_INFO("[{}] nServer={}", m_deviceID, nServer); + break; + } + + m_rtpSessionPtr->BeginDataAccess(); + if (m_rtpSessionPtr->GotoFirstSourceWithData()) + { + do + { + RTPPacket *pack; + + while ((pack = m_rtpSessionPtr->GetNextPacket()) != NULL) + { + LOG_DEBUG("[{}] time: {} ", m_deviceID, get_cur_time()); + ParsePacket(pack); + + m_rtpSessionPtr->DeletePacket(pack); + } + } while (m_rtpSessionPtr->GotoNextSourceWithData()); + } + + m_rtpSessionPtr->EndDataAccess(); + + m_rtpSessionPtr->Poll(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + m_rtpSessionPtr->Destroy(); + + if(nServer > 0){ + close(nServer); + } + if(m_nListener > 0){ + close(m_nListener); + } + + LOG_INFO("[{}] OnRtpRecv exited.", m_deviceID); + + return 0; +} + +bool RTPTcpReceiver::RequestStream(){ + bool bConnect = m_callback_request_stream(); + if(!bConnect){ + Close(); + return false; + } + m_bAccepted = false; + + return true; +} \ No newline at end of file diff --git a/src/gb28181/RTPTcpReceiver.h b/src/gb28181/RTPTcpReceiver.h new file mode 100644 index 0000000..aa78e58 --- /dev/null +++ b/src/gb28181/RTPTcpReceiver.h @@ -0,0 +1,85 @@ +#ifndef _RTP_TCP_RECEIVER_H_ +#define _RTP_TCP_RECEIVER_H_ + + +#include "demuxer.h" +#include "buffer.h" + +#include "rtpsession.h" +#include "rtptcptransmitter.h" +#include "rtpipv4address.h" +#include "rtptcpaddress.h" +#include "rtpsessionparams.h" +#include "rtperrors.h" +#include "rtpsourcedata.h" +#include "rtpsocketutil.h" +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "RTPReceiver.h" + + +#define OUTTIME_RTCP 30*1000 +#define PAYLOAD 99 +#define PAYLOAD_PS 96 +#define PAYLOAD_H264 98 +#define PAYLOAD_MP4 97 + +#define UDP_SIZE 1400 +#define MIN_PORT 10000 +#define MAX_PORT 60000 +#define RTP_MAX_PACKET_LEN 1450 + +using namespace jrtplib; +using namespace std; + + + +class TcpRTPSession; +class MyTCPTransmitter; + +class RTPTcpReceiver:public RTPReceiver +{ +public: + RTPTcpReceiver(); + ~RTPTcpReceiver(); + + bool Open(uint16_t localPort); + bool IsOpened(); + void Close(); + +public: + int OnRtpRecv(); + bool RequestStream(); + bool isClosing(); + +private: + int initSession(int localPort); + +private: + + std::atomic_bool m_bRtpExit; // 标识RTP收包线程闭 + + std::atomic_bool m_bOpened; + std::atomic_bool m_bAccepted; + std::atomic_bool m_bClosing; + + int64_t m_idleCount; + int64_t m_noDataCount;//线程计数,用于打开流成功但是实际没流过来 + + std::thread m_rtpThread; // RTP接收线程 + SocketType m_nListener; + + RTPSession* m_rtpSessionPtr; // RTP会话 + RTPSessionParams* m_pSessparams; + MyTCPTransmitter* m_pTrans; +}; + +#endif // _RTP_TCP_RECEIVER_H_ diff --git a/src/gb28181/RTPUdpReceiver.cpp b/src/gb28181/RTPUdpReceiver.cpp new file mode 100644 index 0000000..49818ea --- /dev/null +++ b/src/gb28181/RTPUdpReceiver.cpp @@ -0,0 +1,331 @@ + +#include "RTPUdpReceiver.h" +#include +#include + +#include +#include + +#include "../logger.hpp" + +using namespace std; + +#define BUFFERSIZE_1024 4096 +#define BUFFERSIZE_GAP 4096//5120 //1024*5 + +namespace +{ + const int kVideoFrameSize = BUFFERSIZE_1024*BUFFERSIZE_1024*5*2; + const int kRtpRecvBufferSize = BUFFERSIZE_1024*BUFFERSIZE_1024*2; + const uint16_t kInvalidPort = 0; +}; // namespace + +class UdpRTPSession : public RTPSession +{ +public: + UdpRTPSession() {} + virtual ~UdpRTPSession() {} + +private: + virtual void OnRTPPacket(RTPPacket* pack, const RTPTime& receiverTime, const RTPAddress* senderAddress) + { + AddDestination(*senderAddress); + } + + virtual void OnRTCPCompoundPacket(RTCPCompoundPacket *pack, const RTPTime &receivetime,const RTPAddress *senderaddress) + { + //AddDestination(*senderaddress); + //const char* name = "hi~"; + //SendRTCPAPPPacket(0, (const uint8_t*)name, "keeplive", 8); + + //printf("send rtcp app"); + } +}; + + +static long long get_cur_time() { + + chrono::time_point tpMicro + = chrono::time_point_cast(chrono::system_clock::now()); + + return tpMicro.time_since_epoch().count(); +} + +static int rtp_revc_thread_(void* param) +{ + if (!param) + { + return -1; + } + + RTPUdpReceiver* self = (RTPUdpReceiver*)param; + return self->OnRtpRecv(); +} + +RTPUdpReceiver::RTPUdpReceiver() +: m_bRtpExit(false) +, m_bOpened(false) +, m_idleCount(-1) +,m_noDataCount(-1) +{ + m_sessparamsPtr = new RTPSessionParams(); + m_transparamsPtr = new RTPUDPv4TransmissionParams(); + m_rtpSessionPtr = new UdpRTPSession(); +} + +RTPUdpReceiver::~RTPUdpReceiver() +{ + if (IsOpened()) + Close(); + + if(nullptr != m_sessparamsPtr){ + delete m_sessparamsPtr; + m_sessparamsPtr = nullptr; + } + + if(nullptr != m_transparamsPtr){ + delete m_transparamsPtr; + m_transparamsPtr = nullptr; + } + + if(nullptr != m_rtpSessionPtr){ + delete m_rtpSessionPtr; + m_rtpSessionPtr = nullptr; + } +} + +bool RTPUdpReceiver::Open(uint16_t localPort) +{ + m_sessparamsPtr->SetUsePollThread(true); + m_sessparamsPtr->SetMinimumRTCPTransmissionInterval(10); + m_sessparamsPtr->SetOwnTimestampUnit(1.0/90000.0); + m_sessparamsPtr->SetAcceptOwnPackets(true); + + m_transparamsPtr->SetPortbase(localPort); + m_transparamsPtr->SetRTPReceiveBuffer(kRtpRecvBufferSize); + + LOG_INFO("[{}] port: {}", m_deviceID, localPort); + + int err = m_rtpSessionPtr->Create(*m_sessparamsPtr, m_transparamsPtr); + if (err != 0) + { + LOG_ERROR("[{}] Create error: {}", m_deviceID, err); + return false; + } + + m_rtpThreadPtr = new std::thread(rtp_revc_thread_, this); + if (nullptr == m_rtpThreadPtr) + { + LOG_ERROR("[{}] Create m_rtpThreadPtr error", m_deviceID); + return false; + } + + + if (InitPS() != 0) + { + return false; + } + + m_bOpened = true; + LOG_INFO("[{}] Open ok", m_deviceID); + + return true; +} + +bool RTPUdpReceiver::IsOpened() +{ + return m_bOpened; +} + +void RTPUdpReceiver::Close() +{ + m_bRtpExit = true; + + // rtp接收线程退出 + if (nullptr != m_rtpThreadPtr && m_rtpThreadPtr->joinable()) + { + m_rtpThreadPtr->join(); + delete m_rtpThreadPtr; + m_rtpThreadPtr = nullptr; + } + m_rtpSessionPtr->Destroy(); + + ClosePsThread(); + + m_bOpened = false; + + LOG_INFO("[{}] closed.", m_deviceID); +} + +// 收RTP包线程 +int RTPUdpReceiver::OnRtpRecv() +{ + if(nullptr == m_rtpSessionPtr){ + return -1; + } + + LOG_INFO("[{}] OnRtpRecv started.", m_deviceID); + while (!m_bRtpExit) + { + //try + //{ + m_rtpSessionPtr->Poll(); + m_rtpSessionPtr->BeginDataAccess(); + + if (m_rtpSessionPtr->GotoFirstSourceWithData()) + { + LOG_INFO("OnRtpRecv GotoFirstSourceWithData --{}", m_deviceID); + last_recv_ts = get_cur_time(); + m_idleCount = 0; + m_noDataCount = 0; + do + { + RTPPacket* packet; + while ((packet = m_rtpSessionPtr->GetNextPacket()) != NULL) + { + LOG_INFO("OnRtpRecv GetNextPacket --{}", m_deviceID); + int ret = ParsePacket(packet); + m_rtpSessionPtr->DeletePacket(packet); + + if(ret != 0){ + m_bRtpExit = true; + } + } + } while (m_rtpSessionPtr->GotoNextSourceWithData()); + } + //else { + // if (m_idleCount != -1) + // { + // ++m_idleCount;//流中断计数 + // } + // if (m_noDataCount != 0) + // { + // --m_noDataCount;//没流计数 + // } + // //if (m_idleCount > 3000) { + // // m_hVodEndFunc(m_usrParam); + // // m_idleCount = 0; + // //历史流结束的时候,也会出现超时,这个是正常的 + // if(m_usrParam && ((VideoSession *)GetUsrParam())->video_type() == VideoType::ERECORD) + // { + // if (m_idleCount > 10000) + // { + // //这里要判断下历史流是否结束,如果未结束,就设置为流中断 + // //由于record_stream_status这个函数返回值不准确,所以增加一个进度条大于80% + // if(record_stream_status(((VideoSession *)GetUsrParam())->streamHandle())) + // { + // LOG_INFO("************Record stream is finished**{}**m_progress = {}********", m_deviceID, ((VideoSession *)GetUsrParam())->progress()); + // m_idleCount = -1; + // m_hVodEndFunc(m_usrParam); + // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); + // ((VideoSession *)GetUsrParam())->streamHandle().clear(); + // } + // else + // { + // //如果此时进度大于80% 算完成吧 + // if(((VideoSession *)GetUsrParam())->progress() > 0.80) + // { + // LOG_INFO("************Record stream is overtime**{}**m_progress = {}********", m_deviceID, ((VideoSession *)GetUsrParam())->progress()); + + // m_idleCount = 0; + // m_hVodEndFunc(m_usrParam); + // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); + // ((VideoSession *)GetUsrParam())->streamHandle().clear(); + // } + // else + // { + // m_idleCount = -1; + // //LOG_ERROR("************post ERROR_REALSTREAM_INTERRUPT to structure****{}********", m_deviceID); + // //发送流中断 + // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Record time video streaming interruption!"); + // } + // } + // + // + // } + // + // if (m_noDataCount < -200000)//任务开始时没收到流 + // { + // //LOG_ERROR("************m_hVodEndFunc(m_usrParam)!!!m_hVodEndFunc(m_usrParam)********{}******", m_deviceID); + // m_noDataCount = -1; + + // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Record time video streaming interruption2!"); + // //m_hVodEndFunc(m_usrParam); + // } + // } + // else//实时任务断流 + // //if (m_usrParam && ((VideoSession *)GetUsrParam())->video_type() == VideoType::EREAL) + // { + // + // //每超过3000次,发送一次send_vedio_eof 时长大约1.5s + // //若是30000,时长大约 18s + // if(m_idleCount > 30000) + // { + // uint64_t cts = get_cur_time(); + // float duration_not_recv = (cts - last_recv_ts) / 1000.0; + // + // //LOG_ERROR("************I haven't got stream from hik gateway exceed {}s,send eof********{}******", duration_not_recv, m_deviceID); + // m_idleCount = -1; + + // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Real time video streaming interruption!"); + // } + // + // if (m_noDataCount < -200000)//任务开始时没收到流 + // { + // //LOG_ERROR("************m_noDataCount < -200000********{}******", m_deviceID); + // m_noDataCount = -1; + + // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Real time video streaming interruption2!"); + // } + // + // } + //} + //} + // catch (GeneralException2& e) + //{ + // //LOG_ERROR("---> video streaming interruption!<---{}, error: {}", m_deviceID, e.err_msg()); + + // byte_buffer bb(64); + // bb << VasCmd::VAS_CMD_REALSTREAM_INTERRUPT << e.err_msg(); + + // if (m_usrParam) + // { + // if (((VideoSession *)GetUsrParam())->msgChan()->is_valid()) { + // try { + // ((VideoSession *)GetUsrParam())->msgChan()->send_msg(bb.data_ptr(), bb.data_size()); + // } + // catch (GeneralException2& e) { + // //LOG_ERROR("[{}] send vas cmd VAS_CMD_REALSTREAM_INTERRUPT error: {}, {}", m_deviceID, e.err_code(), e.err_str()); + // } + // } + + // //通知网关关闭句柄 + // if(!((VideoSession *)GetUsrParam())->streamHandle().empty()) + // { + + // LOG_INFO("---->Notify hisense gateway release handle = {} !<----{}", ((VideoSession *)GetUsrParam())->streamHandle().c_str(), m_deviceID); + // if (((VideoSession *)GetUsrParam())->video_type() == VideoType::EREAL) + // real_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); + // + // if (((VideoSession *)GetUsrParam())->video_type() == VideoType::ERECORD) + // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle()); + // + // //清理保活的句柄 + // ((VideoSession *)GetUsrParam())->streamHandle().clear(); + // } + // } + // + // bb.bset(0); + // + //} + m_rtpSessionPtr->EndDataAccess(); + + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + LOG_INFO("[{}] OnRtpRecv exited.", m_deviceID); + + return 0; +} + + diff --git a/src/gb28181/RTPUdpReceiver.h b/src/gb28181/RTPUdpReceiver.h new file mode 100644 index 0000000..172508f --- /dev/null +++ b/src/gb28181/RTPUdpReceiver.h @@ -0,0 +1,62 @@ +#ifndef _RTP_UDP_RECEIVER_H_ +#define _RTP_UDP_RECEIVER_H_ + +#include "rtpudpv4transmitter.h" +#include "rtpipv4address.h" +#include "rtpsessionparams.h" +#include "rtpsession.h" +#include +#include +#include +#include +#include + +#include "RTPReceiver.h" + + +#define OUTTIME_RTCP 30*1000 +#define PAYLOAD 99 +#define PAYLOAD_PS 96 +#define PAYLOAD_H264 98 +#define PAYLOAD_MP4 97 + +#define UDP_SIZE 1400 +#define MIN_PORT 10000 +#define MAX_PORT 60000 +#define RTP_MAX_PACKET_LEN 1450 + +using namespace jrtplib; +using namespace std; + + +class UdpRTPSession; + +class RTPUdpReceiver: public RTPReceiver +{ +public: + RTPUdpReceiver(); + ~RTPUdpReceiver(); + + virtual bool Open(uint16_t localPort); + virtual bool IsOpened() ; + virtual void Close() ; + +public: + int OnRtpRecv(); + +private: + std::thread* m_rtpThreadPtr; // RTP接收线程 + + UdpRTPSession* m_rtpSessionPtr; // RTP会话 + std::atomic_bool m_bRtpExit; // 标识RTP收包线程闭 + + std::atomic_bool m_bOpened; + + int64_t m_idleCount; + int64_t m_noDataCount;//线程计数,用于打开流成功但是实际没流过来 + + RTPSessionParams* m_sessparamsPtr; + RTPUDPv4TransmissionParams* m_transparamsPtr; +}; + +#endif // _RTP_UDP_RECEIVER_H_ diff --git a/src/gb28181/demuxer.h b/src/gb28181/demuxer.h index c863529..7e6ab5c 100644 --- a/src/gb28181/demuxer.h +++ b/src/gb28181/demuxer.h @@ -8,6 +8,9 @@ { CMpeg2Demux class. } { } {*******************************************************/ +#ifndef _DEMUXER_H_ +#define _DEMUXER_H_ + #include #include "buffer.h" @@ -46,8 +49,8 @@ //typedef long long INT64; //typedef unsigned long long UINT64; -typedef int ReceiveFunction(unsigned char streamType, void* data, int size, uint64_t pts, uint64_t localPts, bool bKey, void* userData);//esص -typedef int ReceiveFunction2(unsigned int streamtype, void * Data, int Size, uint64_t pts, bool iskeyfram, void* userdata);//psص +typedef int ReceiveFunction(unsigned char streamType, void* data, int size, uint64_t pts, uint64_t localPts, bool bKey, void* userData);//es�ص� +typedef int ReceiveFunction2(unsigned int streamtype, void * Data, int Size, uint64_t pts, bool iskeyfram, void* userdata);//ps�ص� static /*_inline*/ unsigned int asm_swap32(unsigned int x); static /*_inline*/ unsigned short asm_swap16(unsigned short x); @@ -77,4 +80,6 @@ public: int AddData(void * Data, int Size/*, DWORD pts*/); void SetReceiveFunction(ReceiveFunction * func, void* userdata); void SetReceiveFunction2(ReceiveFunction2 * func2, void* userdata2); -}; \ No newline at end of file +}; + +#endif // _DEMUXER_H_ \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 35aff9a..9a43f7f 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -28,8 +28,6 @@ #define MIN_RTP_PORT 10000 #define MAX_RTP_PORT 60000 -string data_home = "/mnt/f/fiss/data/"; - // ȡ MIN_RTP_PORT(10000)~MAX_RTP_PORT(60000)֮�������˿�(ż���������������˿ڿ���) int allocRtpPort() { @@ -90,7 +88,7 @@ int sum2 = 0; cudaStream_t stream[2]; -string data_home = "/mnt/data/cmhu/FFNvDecoder/data/"; +string data_home = "/data/tongtu/"; #define checkCudaErrors(S) do {CUresult status; \ @@ -173,7 +171,7 @@ void postDecoded(const void * userPtr, AVFrame * gpuFrame){ // cout << "decode successed ✿✿ヽ(°▽°)ノ✿ " << endl; int sum = sum1; - if (decoder->getName() == "dec1") + if (decoder->getName() == "dec0") { sum1 ++ ; sum = sum1; @@ -301,17 +299,17 @@ void decode_finished_cbk(const void* userPtr){ cout << "当前时间戳: " << get_cur_time() << endl; } +bool decode_request_stream_cbk(){ + cout << "需在此请求流" << endl; + return true; +} + // string test_uri = "rtmp://192.168.10.56:1935/objecteye/1"; // string test_uri = "/home/cmhu/data/output_800x480.mp4"; // string test_uri = "/home/cmhu/data/output_1920x1080.mp4"; // string test_uri = "rtsp://176.10.0.2:8554/stream"; // string test_uri = "/mnt/f/fiss/test_data/h265.mp4"; string test_uri = "rtsp://176.10.0.4:8554/stream"; -char* gpuid = "0"; -string test_uri = "ws://127.0.0.1:10000/sms/34020000002020000001/flv/hls/34020000001110005555_34020000001310005554.flv"; -// string test_uri = "rtsp://176.10.0.4:8554/stream"; - -char* gpu_id = "0"; void createDecode(int index, const char* gpu_id){ FFNvDecoderManager* pDecManager = FFNvDecoderManager::getInstance(); @@ -323,7 +321,7 @@ void createDecode(int index, const char* gpu_id){ config.cfg.force_tcp = true; config.dec_type = DECODER_TYPE_FFMPEG; - config.cfg.gpuid = gpuid; + config.cfg.gpuid = gpu_id; // if (index % 2 == 0) // { // config.cfg.gpuid = "0"; @@ -343,19 +341,20 @@ void createDecode(int index, const char* gpu_id){ pDecManager->startDecodeByName(config.name); } -void createGB28181Decode(int index, char* gpuid){ +void createGB28181Decode(int index, char* gpu_id, int port){ FFNvDecoderManager* pDecManager = FFNvDecoderManager::getInstance(); MgrDecConfig config; config.name = "dec" + to_string(index); config.cfg.uri = config.name; config.cfg.post_decoded_cbk = postDecoded; config.cfg.decode_finished_cbk = decode_finished_cbk; + config.cfg.request_stream_cbk = decode_request_stream_cbk; config.cfg.force_tcp = true; config.dec_type = DECODER_TYPE_GB28181; - config.cfg.port = 30012;//allocRtpPort(); + config.cfg.port = port;//allocRtpPort(); - config.cfg.gpuid = gpuid; + config.cfg.gpuid = gpu_id; AbstractDecoder* decoder = pDecManager->createDecoder(config); if (!decoder) @@ -376,23 +375,14 @@ void logFF(void *, int level, const char *fmt, va_list ap) int main(int argc, char* argv[]){ test_uri = argv[1]; - gpuid = argv[2]; - cout << test_uri << " gpu_id:" << gpu_id << endl; + char* gpuid = argv[2]; + int port = atoi(argv[3]); + cout << test_uri << " gpu_id:" << gpuid << " port:" << port << endl; // av_log_set_callback(&logFF); CheckCUDAProperty(atoi(gpuid)); - FFNvDecoderManager* pDecManager = FFNvDecoderManager::getInstance(); - - // int count = 99; - // for (size_t i = 0; i < count ; i++) - // { - // createDecode(i); - // } - - - pthread_t m_decode_thread; pthread_create(&m_decode_thread,0, [](void* arg) @@ -400,7 +390,7 @@ int main(int argc, char* argv[]){ // cudaSetDevice(atoi(gpuid)); while (true) { - std::this_thread::sleep_for(std::chrono::milliseconds(5000)); + std::this_thread::sleep_for(std::chrono::minutes(1)); FFNvDecoderManager* pDecManager = FFNvDecoderManager::getInstance(); int count = pDecManager->count(); cout << "当前时间:" << get_cur_time() << " 当前运行路数: " << pDecManager->count() << endl; @@ -410,8 +400,6 @@ int main(int argc, char* argv[]){ } ,nullptr); - - FFNvDecoderManager* pDecManager = FFNvDecoderManager::getInstance(); int i = 0; @@ -423,18 +411,17 @@ int main(int argc, char* argv[]){ { break; } - switch (ch) { case 'f': case 'F': - createDecode(i, gpu_id); + createDecode(i, gpuid); i++; break; case 'g': case 'G': - createGB28181Decode(i, gpu_id); + createGB28181Decode(i, gpuid, port); i++; break; case 'r':