Commit 372e629ffafe987b4fd090032566f5086dd43684

Authored by ming
1 parent 2d236ac4

gb28181支持TCP数据流

.vscode/launch.json
@@ -37,6 +37,24 @@ @@ -37,6 +37,24 @@
37 "ignoreFailures": true 37 "ignoreFailures": true
38 } 38 }
39 ] 39 ]
  40 + },{
  41 + "name": "jrtp",
  42 + "type": "cppdbg",
  43 + "request": "launch",
  44 + "program": "${workspaceFolder}/bin/lib/jrtp_exe",
  45 + "args": ["40030","t"],
  46 + "stopAtEntry": false,
  47 + "cwd": "${workspaceFolder}/bin/lib",
  48 + "environment": [],
  49 + "externalConsole": false,
  50 + "MIMode": "gdb",
  51 + "setupCommands": [
  52 + {
  53 + "description": "Enable pretty-printing for gdb",
  54 + "text": "-enable-pretty-printing",
  55 + "ignoreFailures": true
  56 + }
  57 + ]
40 } 58 }
41 ] 59 ]
42 } 60 }
43 \ No newline at end of file 61 \ No newline at end of file
.vscode/settings.json
@@ -61,6 +61,7 @@ @@ -61,6 +61,7 @@
61 "streambuf": "cpp", 61 "streambuf": "cpp",
62 "cfenv": "cpp", 62 "cfenv": "cpp",
63 "cinttypes": "cpp", 63 "cinttypes": "cpp",
64 - "__nullptr": "cpp" 64 + "__nullptr": "cpp",
  65 + "list": "cpp"
65 } 66 }
66 } 67 }
67 \ No newline at end of file 68 \ No newline at end of file
jrtp/Makefile 0 → 100644
  1 +XX = g++
  2 +
  3 +
  4 +PROJECT_ROOT= /mnt/data/cmhu/FFNvDecoder
  5 +
  6 +DEPEND_DIR = $(PROJECT_ROOT)/bin
  7 +SRC_ROOT = $(PROJECT_ROOT)/jrtp
  8 +THIRDPARTY_ROOT = $(PROJECT_ROOT)/3rdparty
  9 +
  10 +
  11 +TARGET= $(DEPEND_DIR)/lib/jrtp_exe
  12 +
  13 +
  14 +JRTP_ROOT = $(THIRDPARTY_ROOT)/jrtp_export
  15 +
  16 +
  17 +INCLUDE= -I $(SRC_ROOT)\
  18 + -I $(JRTP_ROOT)/jrtplib/include/jrtplib3 \
  19 + -I $(JRTP_ROOT)/jthread/include/jthread
  20 +
  21 +LIBSPATH= -L $(JRTP_ROOT)/jthread/lib -l:libjthread.a \
  22 + -L $(JRTP_ROOT)/jrtplib/lib -l:libjrtp.a
  23 +
  24 +
  25 +CFLAGS= -g -O0 -fPIC $(INCLUDE) -pthread -lrt -lz -std=c++11 -fvisibility=hidden -Wl,-Bsymbolic -ldl
  26 + # -DUNICODE -D_UNICODE
  27 +
  28 +
  29 +SRCS:=$(wildcard $(SRC_ROOT)/*.cpp)
  30 +OBJS = $(patsubst %.cpp, %.o, $(notdir $(SRCS)))
  31 +
  32 +
  33 +$(TARGET):$(OBJS) $(CU_OBJS)
  34 + rm -f $(TARGET)
  35 + $(XX) -o $@ $^ $(CFLAGS) $(LIBSPATH) $(LIBS) -Wwrite-strings
  36 + rm -f *.o
  37 +
  38 +%.o:$(SRC_ROOT)/%.cpp
  39 + $(XX) $(CFLAGS) -c $<
  40 +
  41 +clean:
  42 + rm -f *.o $(TARGET)
0 \ No newline at end of file 43 \ No newline at end of file
jrtp/example3.cpp 0 → 100644
  1 +/*
  2 + This IPv4 example listens for incoming packets and automatically adds destinations
  3 + for new sources.
  4 +*/
  5 +
  6 +#include "rtpsession.h"
  7 +#include "rtppacket.h"
  8 +#include "rtpudpv4transmitter.h"
  9 +#include "rtptcptransmitter.h"
  10 +#include "rtpipv4address.h"
  11 +#include "rtptcpaddress.h"
  12 +#include "rtpsessionparams.h"
  13 +#include "rtperrors.h"
  14 +#include "rtpsourcedata.h"
  15 +#include <stdlib.h>
  16 +#include <stdio.h>
  17 +#include <iostream>
  18 +#include <string>
  19 +
  20 +using namespace jrtplib;
  21 +using namespace std;
  22 +
  23 +//
  24 +// This function checks if there was a RTP error. If so, it displays an error
  25 +// message and exists.
  26 +//
  27 +
  28 +void checkerror(int rtperr)
  29 +{
  30 + if (rtperr < 0)
  31 + {
  32 + std::cout << "ERROR: " << RTPGetErrorString(rtperr) << std::endl;
  33 + exit(-1);
  34 + }
  35 +}
  36 +
  37 +//
  38 +// The new class routine
  39 +//
  40 +
  41 +class MyRTPSession : public RTPSession
  42 +{
  43 +protected:
  44 + void OnNewSource(RTPSourceData *dat)
  45 + {
  46 + if (dat->IsOwnSSRC())
  47 + return;
  48 +
  49 + uint32_t ip;
  50 + uint16_t port;
  51 +
  52 + if (dat->GetRTPDataAddress() != 0)
  53 + {
  54 + const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTPDataAddress());
  55 + ip = addr->GetIP();
  56 + port = addr->GetPort();
  57 + }
  58 + else if (dat->GetRTCPDataAddress() != 0)
  59 + {
  60 + const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTCPDataAddress());
  61 + ip = addr->GetIP();
  62 + port = addr->GetPort()-1;
  63 + }
  64 + else
  65 + return;
  66 +
  67 + RTPIPv4Address dest(ip,port);
  68 + AddDestination(dest);
  69 +
  70 + struct in_addr inaddr;
  71 + inaddr.s_addr = htonl(ip);
  72 + std::cout << "Adding destination " << std::string(inet_ntoa(inaddr)) << ":" << port << std::endl;
  73 + }
  74 +
  75 + void OnBYEPacket(RTPSourceData *dat)
  76 + {
  77 + if (dat->IsOwnSSRC())
  78 + return;
  79 +
  80 + uint32_t ip;
  81 + uint16_t port;
  82 +
  83 + if (dat->GetRTPDataAddress() != 0)
  84 + {
  85 + const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTPDataAddress());
  86 + ip = addr->GetIP();
  87 + port = addr->GetPort();
  88 + }
  89 + else if (dat->GetRTCPDataAddress() != 0)
  90 + {
  91 + const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTCPDataAddress());
  92 + ip = addr->GetIP();
  93 + port = addr->GetPort()-1;
  94 + }
  95 + else
  96 + return;
  97 +
  98 + RTPIPv4Address dest(ip,port);
  99 + DeleteDestination(dest);
  100 +
  101 + struct in_addr inaddr;
  102 + inaddr.s_addr = htonl(ip);
  103 + std::cout << "Deleting destination " << std::string(inet_ntoa(inaddr)) << ":" << port << std::endl;
  104 + }
  105 +
  106 + void OnRemoveSource(RTPSourceData *dat)
  107 + {
  108 + if (dat->IsOwnSSRC())
  109 + return;
  110 + if (dat->ReceivedBYE())
  111 + return;
  112 +
  113 + uint32_t ip;
  114 + uint16_t port;
  115 +
  116 + if (dat->GetRTPDataAddress() != 0)
  117 + {
  118 + const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTPDataAddress());
  119 + ip = addr->GetIP();
  120 + port = addr->GetPort();
  121 + }
  122 + else if (dat->GetRTCPDataAddress() != 0)
  123 + {
  124 + const RTPIPv4Address *addr = (const RTPIPv4Address *)(dat->GetRTCPDataAddress());
  125 + ip = addr->GetIP();
  126 + port = addr->GetPort()-1;
  127 + }
  128 + else
  129 + return;
  130 +
  131 + RTPIPv4Address dest(ip,port);
  132 + DeleteDestination(dest);
  133 +
  134 + struct in_addr inaddr;
  135 + inaddr.s_addr = htonl(ip);
  136 + std::cout << "Deleting destination " << std::string(inet_ntoa(inaddr)) << ":" << port << std::endl;
  137 + }
  138 +
  139 + void OnValidatedRTPPacket(RTPSourceData *srcdat, RTPPacket *rtppack, bool isonprobation, bool *ispackethandled)
  140 + {
  141 + printf("SSRC %x Got packet (%d bytes) in OnValidatedRTPPacket from source 0x%04x!\n", GetLocalSSRC(),
  142 + (int)rtppack->GetPayloadLength(), srcdat->GetSSRC());
  143 + DeletePacket(rtppack);
  144 + *ispackethandled = true;
  145 + }
  146 +
  147 + void OnRTCPSDESItem(RTPSourceData *srcdat, RTCPSDESPacket::ItemType t, const void *itemdata, size_t itemlength)
  148 + {
  149 + char msg[1024];
  150 +
  151 + memset(msg, 0, sizeof(msg));
  152 + if (itemlength >= sizeof(msg))
  153 + itemlength = sizeof(msg)-1;
  154 +
  155 + memcpy(msg, itemdata, itemlength);
  156 + printf("SSRC %x Received SDES item (%d): %s from SSRC %x\n", GetLocalSSRC(), (int)t, msg, srcdat->GetSSRC());
  157 + }
  158 +};
  159 +
  160 +class MyTCPTransmitter : public RTPTCPTransmitter
  161 +{
  162 +public:
  163 + MyTCPTransmitter() : RTPTCPTransmitter(0){ }
  164 +
  165 + void OnSendError(SocketType sock)
  166 + {
  167 + cout << ": Error sending over socket " << sock << ", removing destination" << endl;
  168 + DeleteDestination(RTPTCPAddress(sock));
  169 + }
  170 +
  171 + void OnReceiveError(SocketType sock)
  172 + {
  173 + cout << ": Error receiving from socket " << sock << ", removing destination" << endl;
  174 + DeleteDestination(RTPTCPAddress(sock));
  175 + }
  176 +};
  177 +
  178 +int udp_mode(int port){
  179 + MyRTPSession sess;
  180 +
  181 + std::string ipstr;
  182 + int status,i,num;
  183 +
  184 + num = 1000000*30;
  185 +
  186 + // Now, we'll create a RTP session, set the destination
  187 + // and poll for incoming data.
  188 +
  189 + RTPUDPv4TransmissionParams transparams;
  190 + RTPSessionParams sessparams;
  191 +
  192 + // IMPORTANT: The local timestamp unit MUST be set, otherwise
  193 + // RTCP Sender Report info will be calculated wrong
  194 + // In this case, we'll be just use 8000 samples per second.
  195 + sessparams.SetOwnTimestampUnit(1.0/8000.0);
  196 +
  197 + sessparams.SetAcceptOwnPackets(true);
  198 + transparams.SetPortbase(port);
  199 + status = sess.Create(sessparams,&transparams);
  200 + checkerror(status);
  201 +
  202 + std::cout << "begin.." << std::endl;
  203 +
  204 + for (i = 1 ; i <= num ; i++)
  205 + {
  206 + sess.BeginDataAccess();
  207 +
  208 + // check incoming packets
  209 + if (sess.GotoFirstSourceWithData())
  210 + {
  211 + do
  212 + {
  213 + RTPPacket *pack;
  214 +
  215 + while ((pack = sess.GetNextPacket()) != NULL)
  216 + {
  217 + // You can examine the data here
  218 + printf("Got packet !\n");
  219 +
  220 + // we don't longer need the packet, so
  221 + // we'll delete it
  222 + sess.DeletePacket(pack);
  223 + }
  224 + } while (sess.GotoNextSourceWithData());
  225 + }
  226 +
  227 + sess.EndDataAccess();
  228 +
  229 +#ifndef RTP_SUPPORT_THREAD
  230 + status = sess.Poll();
  231 + checkerror(status);
  232 +#endif // RTP_SUPPORT_THREAD
  233 +
  234 + RTPTime::Wait(RTPTime(0,1));
  235 + }
  236 +
  237 + sess.BYEDestroy(RTPTime(10,0),0,0);
  238 +
  239 + return 0;
  240 +}
  241 +
  242 +int tcp_mode(int port){
  243 + int nListener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  244 + if (nListener < 0)
  245 + {
  246 + return -1;
  247 + }
  248 +
  249 + sockaddr_in serverAddr;
  250 + memset(&serverAddr, 0, sizeof(sockaddr_in));
  251 + serverAddr.sin_family = AF_INET;
  252 + serverAddr.sin_port = htons(port);
  253 + serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
  254 + int nRet = bind(nListener, (sockaddr*)&serverAddr, sizeof(serverAddr));
  255 + if (nRet == -1)
  256 + {
  257 + return -1;
  258 + }
  259 +
  260 + if (listen(nListener, 1) == -1)
  261 + {
  262 + return -1;
  263 + }
  264 +
  265 + sockaddr_in clientAddr;
  266 + int nLen = sizeof(sockaddr_in);
  267 + int nServer = accept(nListener, (sockaddr*)&clientAddr, (socklen_t * ) &nLen);
  268 + if (nServer == -1)
  269 + {
  270 + return -1;
  271 + }
  272 +
  273 +
  274 + int nPackSize = 45678;
  275 + RTPSessionParams sessparams;
  276 + sessparams.SetProbationType(RTPSources::NoProbation);
  277 + sessparams.SetOwnTimestampUnit(90000.0 / 25.0);
  278 + sessparams.SetMaximumPacketSize(nPackSize + 64);
  279 +
  280 + MyTCPTransmitter trans;
  281 + int status = trans.Init(true);
  282 + status = trans.Create(65535, NULL);
  283 +
  284 + MyRTPSession sess;
  285 + status = sess.Create(sessparams, &trans);
  286 + if (status < 0)
  287 + {
  288 + std::cout << "create session error!!" << std::endl;
  289 + return -1;
  290 + }
  291 +
  292 + sess.AddDestination(RTPTCPAddress(nServer));
  293 +
  294 + std::cout << "begin.." << std::endl;
  295 +
  296 + while (true)
  297 + {
  298 + RTPTime::Wait(RTPTime(1,0));
  299 + }
  300 +
  301 +
  302 + while(true){
  303 + sess.BeginDataAccess();
  304 +
  305 + // check incoming packets
  306 + if (sess.GotoFirstSourceWithData())
  307 + {
  308 + do
  309 + {
  310 + RTPPacket *pack;
  311 +
  312 + while ((pack = sess.GetNextPacket()) != NULL)
  313 + {
  314 + // You can examine the data here
  315 + printf("Got packet !\n");
  316 +
  317 + // we don't longer need the packet, so
  318 + // we'll delete it
  319 + sess.DeletePacket(pack);
  320 + }
  321 + } while (sess.GotoNextSourceWithData());
  322 + }
  323 +
  324 + sess.EndDataAccess();
  325 +
  326 +#ifndef RTP_SUPPORT_THREAD
  327 + status = sess.Poll();
  328 + checkerror(status);
  329 +#endif // RTP_SUPPORT_THREAD
  330 +
  331 + RTPTime::Wait(RTPTime(1,0));
  332 + }
  333 +
  334 + sess.BYEDestroy(RTPTime(10,0),0,0);
  335 +
  336 + return 0;
  337 +}
  338 +//
  339 +// The main routine
  340 +//
  341 +
  342 +int main(int argc, char* argv[]){
  343 +
  344 + int port = atoi(argv[1]);
  345 + std::cout << "port:" << port << std::endl;
  346 +
  347 + while (true)
  348 + {
  349 + int ch = getchar();
  350 + if (ch == 'q')
  351 + {
  352 + break;
  353 + }
  354 +
  355 + switch (ch)
  356 + {
  357 + case 'u':
  358 + udp_mode(port);
  359 + break;
  360 + case 't':
  361 + tcp_mode(port);
  362 + break;
  363 + default:
  364 + break;
  365 + }
  366 +
  367 + /* code */
  368 + }
  369 +
  370 + return 0;
  371 +}
  372 +
jrtp/tcp_server.cpp 0 → 100644
  1 +/*
  2 + This IPv4 example listens for incoming packets and automatically adds destinations
  3 + for new sources.
  4 +*/
  5 +
  6 +#include "rtpsession.h"
  7 +#include "rtppacket.h"
  8 +#include "rtpudpv4transmitter.h"
  9 +#include "rtptcptransmitter.h"
  10 +#include "rtpipv4address.h"
  11 +#include "rtptcpaddress.h"
  12 +#include "rtpsessionparams.h"
  13 +#include "rtperrors.h"
  14 +#include "rtpsourcedata.h"
  15 +#include <stdlib.h>
  16 +#include <stdio.h>
  17 +#include <iostream>
  18 +#include <string>
  19 +#include <thread>
  20 +#include <chrono>
  21 +#include <unistd.h>
  22 +
  23 +#include <netinet/tcp.h>
  24 +#include <sys/types.h>
  25 +#include <sys/socket.h>
  26 +
  27 +using namespace jrtplib;
  28 +using namespace std;
  29 +
  30 +//
  31 +// This function checks if there was a RTP error. If so, it displays an error
  32 +// message and exists.
  33 +//
  34 +
  35 +void checkerror(int rtperr)
  36 +{
  37 + if (rtperr < 0)
  38 + {
  39 + std::cout << "ERROR: " << RTPGetErrorString(rtperr) << std::endl;
  40 + exit(-1);
  41 + }
  42 +}
  43 +
  44 +static long long get_cur_time(){
  45 + chrono::time_point<chrono::system_clock, chrono::milliseconds> tpMs
  46 + = chrono::time_point_cast<chrono::milliseconds>(chrono::system_clock::now());
  47 +
  48 + return tpMs.time_since_epoch().count();
  49 +}
  50 +
  51 +//
  52 +// The new class routine
  53 +//
  54 +
  55 +// class MyRTPSession : public RTPSession
  56 +// {
  57 +// protected:
  58 +
  59 +// void OnValidatedRTPPacket(RTPSourceData *srcdat, RTPPacket *rtppack, bool isonprobation, bool *ispackethandled)
  60 +// {
  61 +// // printf("timestamp: %ld SSRC %x Got packet (%d bytes) in OnValidatedRTPPacket from source 0x%04x!\n", get_cur_time(), GetLocalSSRC(),
  62 +// // (int)rtppack->GetPayloadLength(), srcdat->GetSSRC());
  63 +// DeletePacket(rtppack);
  64 +// *ispackethandled = true;
  65 +// }
  66 +
  67 +// void OnRTCPSDESItem(RTPSourceData *srcdat, RTCPSDESPacket::ItemType t, const void *itemdata, size_t itemlength)
  68 +// {
  69 +// char msg[1024];
  70 +
  71 +// memset(msg, 0, sizeof(msg));
  72 +// if (itemlength >= sizeof(msg))
  73 +// itemlength = sizeof(msg)-1;
  74 +
  75 +// memcpy(msg, itemdata, itemlength);
  76 +// // printf("SSRC %x Received SDES item (%d): %s from SSRC %x\n", GetLocalSSRC(), (int)t, msg, srcdat->GetSSRC());
  77 +// }
  78 +
  79 +// virtual void OnRTPPacket(RTPPacket* pack, const RTPTime& receiverTime, const RTPAddress* senderAddress)
  80 +// {
  81 +// AddDestination(*senderAddress);
  82 +// }
  83 +
  84 +// virtual void OnRTCPCompoundPacket(RTCPCompoundPacket *pack, const RTPTime &receivetime,const RTPAddress *senderaddress)
  85 +// {
  86 +// //AddDestination(*senderaddress);
  87 +// //const char* name = "hi~";
  88 +// //SendRTCPAPPPacket(0, (const uint8_t*)name, "keeplive", 8);
  89 +
  90 +// printf("send rtcp app");
  91 +// }
  92 +// };
  93 +
  94 +bool bSocket = false;
  95 +
  96 +class MyTCPTransmitter : public RTPTCPTransmitter
  97 +{
  98 +public:
  99 + MyTCPTransmitter() : RTPTCPTransmitter(0){ }
  100 +
  101 + void OnSendError(SocketType sock)
  102 + {
  103 + cout << "timestamp: " << get_cur_time() << " : Error sending over socket " << sock << ", removing destination" << endl;
  104 + DeleteDestination(RTPTCPAddress(sock));
  105 +
  106 + bSocket = false;
  107 + }
  108 +
  109 + void OnReceiveError(SocketType sock)
  110 + {
  111 + cout << ": Error receiving from socket " << sock << ", removing destination" << endl;
  112 + DeleteDestination(RTPTCPAddress(sock));
  113 + }
  114 +};
  115 +
  116 +RTPSession sess;
  117 +MyTCPTransmitter* trans;
  118 +RTPSessionParams sessparams;
  119 +
  120 +int thread_func(void* param){
  121 +
  122 + cout << "thread started..." << endl;
  123 +
  124 + int* p = (int*) param ;
  125 + sockaddr_in clientAddr;
  126 + int nLen = sizeof(sockaddr_in);
  127 + int nServer = -1;//accept(*p, (sockaddr*)&clientAddr, (socklen_t * ) &nLen);
  128 + // if (nServer == -1){
  129 + // return -1;
  130 + // }
  131 +
  132 + cout << "timestamp: " << get_cur_time() << " while() start" << endl;
  133 +
  134 + while(true){
  135 +
  136 + while (!bSocket)
  137 + {
  138 + nServer = accept(*p, (sockaddr*)&clientAddr, (socklen_t * ) &nLen);
  139 + if (nServer == -1)
  140 + {
  141 + std::this_thread::sleep_for(std::chrono::milliseconds(10));
  142 + continue;
  143 + }
  144 +
  145 + cout << "nServer = " << nServer << endl;
  146 + sess.AddDestination(RTPTCPAddress(nServer));
  147 + bSocket = true;
  148 + break;
  149 + }
  150 +
  151 + sess.BeginDataAccess();
  152 +
  153 + // check incoming packets
  154 + if (sess.GotoFirstSourceWithData())
  155 + {
  156 + do
  157 + {
  158 + RTPPacket *pack;
  159 +
  160 + while ((pack = sess.GetNextPacket()) != NULL)
  161 + {
  162 + // You can examine the data here
  163 + // printf("Got packet !\n");
  164 + cout << "Got packet ! timestamp: " << get_cur_time() << endl;
  165 +
  166 + // we don't longer need the packet, so
  167 + // we'll delete it
  168 + sess.DeletePacket(pack);
  169 + }
  170 + } while (sess.GotoNextSourceWithData());
  171 + }
  172 +
  173 + sess.EndDataAccess();
  174 +
  175 + sess.Poll();
  176 +
  177 + std::this_thread::sleep_for(std::chrono::milliseconds(10));
  178 + }
  179 +
  180 + close(*p);
  181 +
  182 + // sess.BYEDestroy(RTPTime(10,0),0,0);
  183 +}
  184 +
  185 +int nListener = -1;
  186 +int tcp_mode(int port){
  187 + nListener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  188 + if (nListener < 0)
  189 + {
  190 + return -1;
  191 + }
  192 +
  193 + sockaddr_in serverAddr;
  194 + memset(&serverAddr, 0, sizeof(sockaddr_in));
  195 + serverAddr.sin_family = AF_INET;
  196 + serverAddr.sin_port = htons(port);
  197 + serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
  198 + int nRet = bind(nListener, (sockaddr*)&serverAddr, sizeof(serverAddr));
  199 + if (nRet == -1)
  200 + {
  201 + return -1;
  202 + }
  203 +
  204 + if (listen(nListener, 1) == -1)
  205 + {
  206 + return -1;
  207 + }
  208 +
  209 + int nPackSize = 45678;
  210 +
  211 + sessparams.SetProbationType(RTPSources::NoProbation);
  212 + sessparams.SetOwnTimestampUnit(90000.0 / 25.0);
  213 + sessparams.SetMaximumPacketSize(nPackSize + 64);
  214 +
  215 + trans = new MyTCPTransmitter();
  216 + int status = trans->Init(false);
  217 + status = trans->Create(65535, NULL);
  218 +
  219 + status = sess.Create(sessparams, trans);
  220 + if (status < 0)
  221 + {
  222 + std::cout << "create session error!!" << std::endl;
  223 + return -1;
  224 + }
  225 +
  226 + std::cout << "begin.." << std::endl;
  227 +
  228 + thread* t = new std::thread(thread_func, &nListener);
  229 +
  230 + return 0;
  231 +}
  232 +//
  233 +// The main routine
  234 +//
  235 +
  236 +int main(int argc, char* argv[]){
  237 +
  238 + tcp_mode(40032);
  239 +
  240 + while(getchar() =='q');
  241 +
  242 + return 0;
  243 +}
  244 +
src/AbstractDecoder.h
@@ -32,6 +32,8 @@ typedef void(*POST_DECODE_CALLBACK)(const void * userPtr, AVFrame * gpuFrame); @@ -32,6 +32,8 @@ typedef void(*POST_DECODE_CALLBACK)(const void * userPtr, AVFrame * gpuFrame);
32 32
33 typedef void(*DECODE_FINISHED_CALLBACK)(const void* userPtr); 33 typedef void(*DECODE_FINISHED_CALLBACK)(const void* userPtr);
34 34
  35 +typedef bool(*DECODE_REQUEST_STREAM_CALLBACK)();
  36 +
35 struct FFDecConfig{ 37 struct FFDecConfig{
36 string uri; // 视频地址 38 string uri; // 视频地址
37 POST_DECODE_CALLBACK post_decoded_cbk; // 解码数据回调接口 39 POST_DECODE_CALLBACK post_decoded_cbk; // 解码数据回调接口
@@ -41,6 +43,7 @@ struct FFDecConfig{ @@ -41,6 +43,7 @@ struct FFDecConfig{
41 int skip_frame{1}; // 跳帧数 43 int skip_frame{1}; // 跳帧数
42 44
43 int port; // gb28181接收数据的端口号 45 int port; // gb28181接收数据的端口号
  46 + DECODE_REQUEST_STREAM_CALLBACK request_stream_cbk; // gb28181请求流
44 }; 47 };
45 48
46 enum DECODER_TYPE{ 49 enum DECODER_TYPE{
src/FFNvDecoder.cpp
@@ -69,6 +69,9 @@ bool FFNvDecoder::init(FFDecConfig&amp; cfg) @@ -69,6 +69,9 @@ bool FFNvDecoder::init(FFDecConfig&amp; cfg)
69 m_bReal = true; 69 m_bReal = true;
70 } 70 }
71 71
  72 + post_decoded_cbk = cfg.post_decoded_cbk;
  73 + decode_finished_cbk = cfg.decode_finished_cbk;
  74 +
72 return init(cfg.uri.c_str(), cfg.gpuid.c_str(),cfg.force_tcp); 75 return init(cfg.uri.c_str(), cfg.gpuid.c_str(),cfg.force_tcp);
73 } 76 }
74 77
src/FFNvDecoderManager.cpp
@@ -15,13 +15,12 @@ AbstractDecoder* FFNvDecoderManager::createDecoder(MgrDecConfig config){ @@ -15,13 +15,12 @@ AbstractDecoder* FFNvDecoderManager::createDecoder(MgrDecConfig config){
15 if (config.cfg.post_decoded_cbk == nullptr || config.cfg.decode_finished_cbk== nullptr){ 15 if (config.cfg.post_decoded_cbk == nullptr || config.cfg.decode_finished_cbk== nullptr){
16 return nullptr; 16 return nullptr;
17 } 17 }
18 -  
19 18
20 std::lock_guard<std::mutex> l(m_mutex); 19 std::lock_guard<std::mutex> l(m_mutex);
21 20
22 auto it = decoderMap.find(config.name); 21 auto it = decoderMap.find(config.name);
23 if (it != decoderMap.end()){ 22 if (it != decoderMap.end()){
24 - LOG_ERROR("已存在name所标记的解码器"); 23 + LOG_ERROR("已存在name为{}的解码器", config.name);
25 return nullptr; 24 return nullptr;
26 } 25 }
27 26
@@ -41,8 +40,6 @@ AbstractDecoder* FFNvDecoderManager::createDecoder(MgrDecConfig config){ @@ -41,8 +40,6 @@ AbstractDecoder* FFNvDecoderManager::createDecoder(MgrDecConfig config){
41 if (bRet) 40 if (bRet)
42 { 41 {
43 dec->setName(config.name) ; 42 dec->setName(config.name) ;
44 - dec->post_decoded_cbk = config.cfg.post_decoded_cbk;  
45 - dec->decode_finished_cbk = config.cfg.decode_finished_cbk;  
46 decoderMap[config.name] = dec; 43 decoderMap[config.name] = dec;
47 44
48 LOG_INFO("[{}][{}]- 解码器初始化成功",config.name, config.cfg.uri); 45 LOG_INFO("[{}][{}]- 解码器初始化成功",config.name, config.cfg.uri);
src/Makefile
@@ -30,7 +30,7 @@ INCLUDE= -I $(DEPEND_DIR)/include \ @@ -30,7 +30,7 @@ INCLUDE= -I $(DEPEND_DIR)/include \
30 30
31 LIBSPATH= -L $(DEPEND_DIR)/lib -lavformat -lavcodec -lswscale -lavutil -lavfilter -lswresample -lavdevice \ 31 LIBSPATH= -L $(DEPEND_DIR)/lib -lavformat -lavcodec -lswscale -lavutil -lavfilter -lswresample -lavdevice \
32 -L $(CUDA_ROOT)/lib64 -lcuda -lcudart -lnvcuvid -lcurand -lcublas -lnvjpeg \ 32 -L $(CUDA_ROOT)/lib64 -lcuda -lcudart -lnvcuvid -lcurand -lcublas -lnvjpeg \
33 - -L $(SPDLOG_ROOT)/lib64 -l:libspdlog.a \ 33 + -L $(SPDLOG_ROOT) -l:libspdlog.a \
34 -L $(JRTP_ROOT)/jthread/lib -l:libjthread.a \ 34 -L $(JRTP_ROOT)/jthread/lib -l:libjthread.a \
35 -L $(JRTP_ROOT)/jrtplib/lib -l:libjrtp.a 35 -L $(JRTP_ROOT)/jrtplib/lib -l:libjrtp.a
36 36
src/gb28181/FFGB28181Decoder.cpp
@@ -12,6 +12,9 @@ extern &quot;C&quot; { @@ -12,6 +12,9 @@ extern &quot;C&quot; {
12 12
13 #include "../logger.hpp" 13 #include "../logger.hpp"
14 14
  15 +#include"RTPTcpReceiver.h"
  16 +#include"RTPUdpReceiver.h"
  17 +
15 #define ECLOSED 0 18 #define ECLOSED 0
16 #define ECLOSING 1 19 #define ECLOSING 1
17 #define ERUNNING 2 20 #define ERUNNING 2
@@ -19,14 +22,14 @@ extern &quot;C&quot; { @@ -19,14 +22,14 @@ extern &quot;C&quot; {
19 22
20 static void RTP_Stream_CallBack(void* userdata, int videoType, char* data, int len, int isKey, uint64_t pts, uint64_t localPts) 23 static void RTP_Stream_CallBack(void* userdata, int videoType, char* data, int len, int isKey, uint64_t pts, uint64_t localPts)
21 { 24 {
22 - FFGB28181Decoder* session = (FFGB28181Decoder*)userdata;  
23 - session->stream_callback(videoType, data, len, isKey, pts, localPts); 25 + FFGB28181Decoder* decoder = (FFGB28181Decoder*)userdata;
  26 + decoder->stream_callback(videoType, data, len, isKey, pts, localPts);
24 } 27 }
25 28
26 static void RTP_Stream_End_CallBack(void* userdata) 29 static void RTP_Stream_End_CallBack(void* userdata)
27 { 30 {
28 - FFGB28181Decoder* session = (FFGB28181Decoder*)userdata;  
29 - session->stream_end_callback(); 31 + FFGB28181Decoder* decoder = (FFGB28181Decoder*)userdata;
  32 + decoder->stream_end_callback();
30 } 33 }
31 34
32 FFGB28181Decoder::FFGB28181Decoder() { 35 FFGB28181Decoder::FFGB28181Decoder() {
@@ -59,20 +62,30 @@ void FFGB28181Decoder::close(){ @@ -59,20 +62,30 @@ void FFGB28181Decoder::close(){
59 62
60 m_status = ECLOSING; 63 m_status = ECLOSING;
61 64
62 - LOG_INFO("real decode thread exit success 1--{}", m_dec_name); 65 + LOG_DEBUG("real decode thread exit success 1--{}", m_dec_name);
  66 +
  67 + if(nullptr != m_rtpPtr){
  68 + if (m_rtpPtr->IsOpened()) {
  69 + m_rtpPtr->Close();
  70 + LOG_DEBUG("real decode thread exit success 2--{}", m_dec_name);
  71 + }
63 72
64 - if (m_rtp.IsOpened()) {  
65 - m_rtp.Close();  
66 - LOG_INFO("real decode thread exit success 4--{}", m_dec_name); 73 + delete m_rtpPtr;
  74 + m_rtpPtr = nullptr;
67 } 75 }
68 76
69 - stream_end_callback(); 77 + LOG_INFO("解码器关闭成功 --{}", m_dec_name);
70 78
71 m_status = ECLOSED; 79 m_status = ECLOSED;
72 } 80 }
73 81
74 bool FFGB28181Decoder::init(FFDecConfig& cfg){ 82 bool FFGB28181Decoder::init(FFDecConfig& cfg){
75 - if (m_rtp.IsOpened()){ 83 + if(cfg.force_tcp){
  84 + m_rtpPtr = new RTPTcpReceiver();
  85 + }else{
  86 + m_rtpPtr = new RTPUdpReceiver();
  87 + }
  88 + if(nullptr == m_rtpPtr){
76 return false; 89 return false;
77 } 90 }
78 91
@@ -82,12 +95,23 @@ bool FFGB28181Decoder::init(FFDecConfig&amp; cfg){ @@ -82,12 +95,23 @@ bool FFGB28181Decoder::init(FFDecConfig&amp; cfg){
82 95
83 m_gpuid = atoi(cfg.gpuid.c_str()); 96 m_gpuid = atoi(cfg.gpuid.c_str());
84 97
85 - m_rtp.SetDeviceID(m_dec_name); 98 + m_rtpPtr->SetDeviceID(m_dec_name);
  99 +
  100 + if(cfg.request_stream_cbk == nullptr){
  101 + LOG_INFO("request_stream_cbk 为 nullptr -- {}", m_dec_name);
  102 + return false;
  103 + }
  104 +
  105 + post_decoded_cbk = cfg.post_decoded_cbk;
  106 + decode_finished_cbk = cfg.decode_finished_cbk;
  107 + m_rtpPtr->SetRequestStreamCallback(cfg.request_stream_cbk);
86 108
87 m_port = cfg.port; 109 m_port = cfg.port;
88 110
89 m_cfg = cfg; 111 m_cfg = cfg;
90 112
  113 + LOG_INFO("init - {} : ", m_dec_name, m_port);
  114 +
91 return true; 115 return true;
92 } 116 }
93 117
@@ -95,10 +119,12 @@ bool FFGB28181Decoder::start() { @@ -95,10 +119,12 @@ bool FFGB28181Decoder::start() {
95 119
96 m_status = ERUNNING; 120 m_status = ERUNNING;
97 121
98 - m_rtp.SetOutputCallback(RTP_Stream_CallBack, this);  
99 - m_rtp.SetVodEndCallback(RTP_Stream_End_CallBack, this); 122 + m_rtpPtr->SetOutputCallback(RTP_Stream_CallBack, this);
  123 + m_rtpPtr->SetVodEndCallback(RTP_Stream_End_CallBack, this);
100 124
101 - return m_rtp.Open((uint16_t)m_port); 125 + LOG_INFO("start - {} {}: ", m_dec_name, m_port);
  126 +
  127 + return m_rtpPtr->Open((uint16_t)m_port);
102 } 128 }
103 129
104 void FFGB28181Decoder::setDecKeyframe(bool bKeyframe){ 130 void FFGB28181Decoder::setDecKeyframe(bool bKeyframe){
@@ -135,6 +161,7 @@ void FFGB28181Decoder::stream_callback(int videoType, char* data, int len, int i @@ -135,6 +161,7 @@ void FFGB28181Decoder::stream_callback(int videoType, char* data, int len, int i
135 AVDictionary *gpu_options = nullptr; 161 AVDictionary *gpu_options = nullptr;
136 162
137 if (m_pAVCodecCtx == nullptr) { 163 if (m_pAVCodecCtx == nullptr) {
  164 + LOG_INFO("frame data is zero --{}", m_dec_name);
138 if (VIDEO_TYPE_H264 == videoType) { 165 if (VIDEO_TYPE_H264 == videoType) {
139 if (m_gpuid >= 0){ 166 if (m_gpuid >= 0){
140 m_pAVCodec = avcodec_find_decoder_by_name("h264_cuvid"); 167 m_pAVCodec = avcodec_find_decoder_by_name("h264_cuvid");
@@ -314,5 +341,5 @@ bool FFGB28181Decoder::isSurport(FFDecConfig&amp; cfg){ @@ -314,5 +341,5 @@ bool FFGB28181Decoder::isSurport(FFDecConfig&amp; cfg){
314 } 341 }
315 342
316 int FFGB28181Decoder::getCachedQueueLength(){ 343 int FFGB28181Decoder::getCachedQueueLength(){
317 - return m_rtp.GetPsFrameListSize(); 344 + return m_rtpPtr->GetPsFrameListSize();
318 } 345 }
319 \ No newline at end of file 346 \ No newline at end of file
src/gb28181/FFGB28181Decoder.h
@@ -53,7 +53,7 @@ private: @@ -53,7 +53,7 @@ private:
53 53
54 int m_gpuid {-1}; 54 int m_gpuid {-1};
55 55
56 - RTPReceiver m_rtp; 56 + RTPReceiver* m_rtpPtr;
57 int m_port; 57 int m_port;
58 uint64_t m_frameCount {}; 58 uint64_t m_frameCount {};
59 59
src/gb28181/RTPReceiver.cpp
1 -  
2 #include "RTPReceiver.h" 1 #include "RTPReceiver.h"
3 -#include <iostream>  
4 -#include <time.h>  
5 -  
6 -#include <thread>  
7 -#include <chrono>  
8 - 2 +#include "rtppacket.h"
9 #include "../logger.hpp" 3 #include "../logger.hpp"
10 -  
11 -using namespace std; 4 +#include <thread>
12 5
13 #define BUFFERSIZE_1024 1024 6 #define BUFFERSIZE_1024 1024
14 -#define BUFFERSIZE_GAP 1024//5120 //1024*5 7 +const int kVideoFrameSize = BUFFERSIZE_1024*BUFFERSIZE_1024*5*2;
15 8
16 -namespace 9 +// PS解包器回调
  10 +static int ReceivePESFunction(unsigned char streamid, void * data, int size, uint64_t pts, uint64_t localPts, bool key, void* param)
17 { 11 {
18 - const int kH264EndFlag = 0x00000001;  
19 - const int kH264EndFlag_ = 0x000001;  
20 - const int kMpeg4IEndFlag = 0x000001B0;  
21 - const int kMpeg4PEndFlag = 0x000001B6;  
22 - const int kSvacEndFlag = 0x000001B6;  
23 - const int kVideoFrameSize = BUFFERSIZE_1024*BUFFERSIZE_1024*5*2;  
24 - const int kRtpRecvBufferSize = BUFFERSIZE_1024*BUFFERSIZE_1024*2;  
25 - const int kSockBufferSize = BUFFERSIZE_1024*BUFFERSIZE_1024*2;  
26 - const uint16_t kInvalidPort = 0;  
27 -  
28 - // PS解包器回调  
29 - int ReceivePESFunction(unsigned char streamid, void * data, int size, uint64_t pts, uint64_t localPts, bool key, void* param) 12 + if (NULL != data && size > 0)
  13 + {
  14 + ((RTPReceiver*)param)->OnPsDemux(streamid, (BYTE*)data, size, key, (uint64_t)pts, (uint64_t)localPts);
  15 + }
  16 + return 0;
  17 +}
  18 +
  19 +static int ps_demuxer_thread_(void* param)
  20 +{
  21 + if (!param)
30 { 22 {
31 - if (NULL != data && size > 0)  
32 - {  
33 - ((RTPReceiver*)param)->OnPsDemux(streamid, (BYTE*)data, size, key, (uint64_t)pts, (uint64_t)localPts);  
34 - }  
35 - return 0; 23 + return -1;
36 } 24 }
37 25
38 -}; // namespace  
39 -  
40 -static long long get_cur_time() {  
41 - // 获取操作系统当前时间点(精确到微秒)  
42 - chrono::time_point<chrono::system_clock, chrono::milliseconds> tpMicro  
43 - = chrono::time_point_cast<chrono::milliseconds>(chrono::system_clock::now());  
44 - // (微秒精度的)时间点 => (微秒精度的)时间戳  
45 - return tpMicro.time_since_epoch().count(); 26 + RTPReceiver* self = (RTPReceiver*)param;
  27 + return self->OnPsProcess();
46 } 28 }
47 29
48 -RTPReceiver::RTPReceiver()  
49 -: m_localPort(kInvalidPort)  
50 -, m_bRtpExit(false)  
51 -, m_bPsExit(false)  
52 -, m_h264DataFunc(NULL)  
53 -, m_usrParam(NULL)  
54 -, m_bOpened(false)  
55 -, m_hVodEndFunc(NULL)  
56 -, m_LastPTS(-1) 30 +RTPReceiver::RTPReceiver()
  31 +:m_LastPTS(-1)
57 , m_LastIsKeyFrame(0) 32 , m_LastIsKeyFrame(0)
58 , m_SliceBuf(1024*1024) 33 , m_SliceBuf(1024*1024)
59 -, m_idleCount(-1)  
60 -,m_noDataCount(-1) 34 +, m_h264DataFunc(NULL)
  35 +, m_hVodEndFunc(NULL)
  36 +, m_usrParam(NULL)
  37 +, m_bPsExit(false)
  38 +, m_psThreadPtr(nullptr)
61 { 39 {
62 - m_LastStreamType=0; 40 + m_LastStreamType = 0;
  41 + recvTmpBuf = new BYTE[kVideoFrameSize];
63 } 42 }
64 43
65 -RTPReceiver::~RTPReceiver()  
66 -{  
67 - if (IsOpened())  
68 - Close();  
69 -  
70 - LOG_INFO("RTPReceiver::~RTPReceiver() destruct OK--{}", m_deviceID); 44 +RTPReceiver::~RTPReceiver(){
  45 + if(recvTmpBuf != nullptr){
  46 + delete[] recvTmpBuf;
  47 + }
71 } 48 }
72 49
73 void RTPReceiver::SetOutputCallback(CallBack_Stream cb, void* param) 50 void RTPReceiver::SetOutputCallback(CallBack_Stream cb, void* param)
@@ -82,376 +59,36 @@ void RTPReceiver::SetVodEndCallback(CallBack_VodFileEnd cb, void* param) @@ -82,376 +59,36 @@ void RTPReceiver::SetVodEndCallback(CallBack_VodFileEnd cb, void* param)
82 m_usrParam = param; 59 m_usrParam = param;
83 } 60 }
84 61
85 -int RTPReceiver::rtp_revc_thread_(void* param)  
86 -{  
87 - if (!param)  
88 - {  
89 - return -1;  
90 - }  
91 -  
92 - RTPReceiver* self = (RTPReceiver*)param;  
93 - return self->OnRtpRecv(); 62 +void RTPReceiver::SetRequestStreamCallback(CallBack_Request_Stream cb){
  63 + m_callback_request_stream = cb;
94 } 64 }
95 65
96 -int RTPReceiver::ps_demuxer_thread_(void* param)  
97 -{  
98 - if (!param)  
99 - {  
100 - return -1;  
101 - }  
102 -  
103 - RTPReceiver* self = (RTPReceiver*)param;  
104 - return self->OnPsProcess();  
105 -}  
106 -  
107 -int RTPReceiver::ps_decode_thread_(void* param)  
108 -{  
109 - if (!param)  
110 - {  
111 - return -1;  
112 - }  
113 -  
114 - RTPReceiver* self = (RTPReceiver*)param;  
115 - return self->OnDecodeProcess();  
116 -}  
117 -  
118 -bool RTPReceiver::Open(uint16_t localPort)  
119 -{  
120 - LOG_INFO("--2---RTPReceiver::Open--{}", m_deviceID);  
121 - m_localPort = localPort;  
122 -  
123 - RTPSessionParams sessparams;  
124 - sessparams.SetUsePollThread(true);  
125 - sessparams.SetMinimumRTCPTransmissionInterval(10);  
126 - sessparams.SetOwnTimestampUnit(1.0/90000.0);  
127 - sessparams.SetAcceptOwnPackets(true);  
128 -  
129 - RTPUDPv4TransmissionParams transparams;  
130 - transparams.SetPortbase(m_localPort);  
131 - transparams.SetRTPReceiveBuffer(kRtpRecvBufferSize);  
132 - cout << "port: " << m_localPort << endl;  
133 - LOG_INFO("--3---RTPReceiver::Open--{}", m_deviceID);  
134 -  
135 - int err = m_rtpSession.Create(sessparams, &transparams);  
136 - LOG_INFO("--4---RTPReceiver::Open--{}", m_deviceID);  
137 - if (err != 0)  
138 - {  
139 - LOG_INFO("RTPReceiver::Open m_rtpSession.Create error: {}--{}", err, m_deviceID);  
140 - return false;  
141 - }  
142 - LOG_INFO("--5---RTPReceiver::Open--{}", m_deviceID); 66 +int RTPReceiver::InitPS(){
  67 +
143 m_psParser.SetReceiveFunction(ReceivePESFunction, this); 68 m_psParser.SetReceiveFunction(ReceivePESFunction, this);
144 69
145 - m_bOpened = true;  
146 - LOG_INFO("RTPReceiver::Open ok--{}", m_deviceID);  
147 -  
148 - LOG_INFO("--1---RTPReceiver::Open--{}", m_deviceID);  
149 - m_rtpThread = std::thread(rtp_revc_thread_, this);  
150 - m_psThread = std::thread(ps_demuxer_thread_, this); 70 + m_psThreadPtr = new std::thread(ps_demuxer_thread_, this);
  71 + if(nullptr == m_psThreadPtr){
  72 + return -1;
  73 + }
151 74
152 - return true;  
153 -} 75 + LOG_INFO("[{}] InitPS finished", m_deviceID);
154 76
155 -bool RTPReceiver::IsOpened() const  
156 -{  
157 - return m_bOpened; 77 + return 0;
158 } 78 }
159 79
160 -void RTPReceiver::Close()  
161 -{  
162 - m_bRtpExit = true; 80 +void RTPReceiver::ClosePsThread(){
  81 + LOG_INFO("[{}] 3.", m_deviceID);
163 m_bPsExit = true; 82 m_bPsExit = true;
164 -  
165 - // rtp接收线程退出  
166 - if (m_rtpThread.joinable())  
167 - {  
168 - m_rtpThread.join();  
169 - }  
170 - m_rtpSession.Destroy();  
171 - LOG_INFO("--2---RTPReceiver::Close rtp recv thread quit --{}", m_deviceID);  
172 -  
173 // PS解包线程退出 83 // PS解包线程退出
174 - if (m_psThread.joinable()) 84 + if (m_psThreadPtr->joinable())
175 { 85 {
176 - m_psThread.join(); 86 + m_psThreadPtr->join();
  87 + delete m_psThreadPtr;
  88 + m_psThreadPtr = nullptr;
177 } 89 }
178 - LOG_INFO("--3---RTPReceiver::Close ps demux thread quit--{}", m_deviceID);  
179 -  
180 - m_bOpened = false;  
181 -}  
182 -  
183 -int RTPReceiver::GetPsFrameListSize()  
184 -{  
185 - std::lock_guard<std::mutex> l(m_psFrameMutex);  
186 - return m_psVideoFrames.size();  
187 -}  
188 -  
189 -void RTPReceiver::ClearPsVideoFrameList()  
190 -{  
191 - std::lock_guard<std::mutex> l(m_psFrameMutex);  
192 - while (!m_psVideoFrames.empty()) {  
193 - Frame* f = m_psVideoFrames.front();  
194 - delete f;  
195 - m_psVideoFrames.pop();  
196 - }  
197 - LOG_INFO("---->cleared ps video frame list!<----{}", m_deviceID);  
198 -}  
199 -  
200 -// 收RTP包线程  
201 -int RTPReceiver::OnRtpRecv()  
202 -{  
203 - uint32_t lastPts = 0;  
204 - uint64_t last_recv_ts{0};  
205 - int offset = 0;  
206 - int mark = 0;  
207 - BYTE* recvTmpBuf = new BYTE[kVideoFrameSize];  
208 - while (!m_bRtpExit)  
209 - {  
210 - //try  
211 - //{  
212 - m_rtpSession.Poll();  
213 - m_rtpSession.BeginDataAccess();  
214 - if (m_rtpSession.GotoFirstSourceWithData())  
215 - {  
216 - last_recv_ts = get_cur_time();  
217 - m_idleCount = 0;  
218 - m_noDataCount = 0;  
219 - do  
220 - {  
221 - RTPPacket* packet;  
222 - while ((packet = m_rtpSession.GetNextPacket()) != NULL/* && !mark*/)  
223 - {  
224 - do {  
225 - if (0 == packet->GetPayloadType())  
226 - {  
227 - // 音频数据, 暂不处理  
228 - break; // goto skip_this_packet;  
229 - }  
230 -  
231 - // 判断是否收到完整的帧(有些厂商打的marker标记不准, 所以只能看时间戳来判断)  
232 - uint32_t curPts = packet->GetTimestamp();  
233 - if (lastPts != 0 && curPts != lastPts) {  
234 - mark = 1;  
235 - }  
236 - lastPts = curPts;  
237 -  
238 - int payloadLen = packet->GetPayloadLength();  
239 - if (offset + payloadLen > kVideoFrameSize)  
240 - {  
241 - offset = 0, mark = 0;  
242 - break; // goto skip_this_packet;  
243 - }  
244 -  
245 - if (mark)  
246 - {  
247 - BYTE* frameBuf = (BYTE*)malloc(sizeof(BYTE) * offset);  
248 - if (!frameBuf) {  
249 - offset = 0, mark = 0;  
250 - break; // goto skip_this_packet;  
251 - }  
252 - memcpy(frameBuf, recvTmpBuf, offset);  
253 - if (!m_bPsExit)  
254 - {  
255 - std::lock_guard<std::mutex> l(m_psFrameMutex);  
256 - if (m_psVideoFrames.size() < 100)  
257 - {  
258 - m_psVideoFrames.push(new Frame(frameBuf, offset, false));  
259 - }  
260 - else {  
261 - free(frameBuf);  
262 - }  
263 - }  
264 - else  
265 - {  
266 - //若此时解码线程已经退出,不再往m_psVideoFrames推送帧,且退出当前线程  
267 - free(frameBuf);  
268 - LOG_INFO("OnPsProcess quit, device_id:{}", m_deviceID);  
269 - //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "recv video stream interruption!");  
270 - }  
271 - offset = 0;  
272 - mark = 0;  
273 - }  
274 -  
275 - memcpy(recvTmpBuf + offset, packet->GetPayloadData(), payloadLen);  
276 - offset += payloadLen;  
277 - } while (0);  
278 - //skip_this_packet:  
279 - m_rtpSession.DeletePacket(packet);  
280 - }  
281 - } while (m_rtpSession.GotoNextSourceWithData());  
282 - }  
283 - //else {  
284 - // if (m_idleCount != -1)  
285 - // {  
286 - // ++m_idleCount;//流中断计数  
287 - // }  
288 - // if (m_noDataCount != 0)  
289 - // {  
290 - // --m_noDataCount;//没流计数  
291 - // }  
292 - // //if (m_idleCount > 3000) {  
293 - // // m_hVodEndFunc(m_usrParam);  
294 - // // m_idleCount = 0;  
295 - // //历史流结束的时候,也会出现超时,这个是正常的  
296 - // if(m_usrParam && ((VideoSession *)GetUsrParam())->video_type() == VideoType::ERECORD)  
297 - // {  
298 - // if (m_idleCount > 10000)  
299 - // {  
300 - // //这里要判断下历史流是否结束,如果未结束,就设置为流中断  
301 - // //由于record_stream_status这个函数返回值不准确,所以增加一个进度条大于80%  
302 - // if(record_stream_status(((VideoSession *)GetUsrParam())->streamHandle()))  
303 - // {  
304 - // LOG_INFO("************Record stream is finished**{}**m_progress = {}********", m_deviceID, ((VideoSession *)GetUsrParam())->progress());  
305 - // m_idleCount = -1;  
306 - // m_hVodEndFunc(m_usrParam);  
307 - // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle());  
308 - // ((VideoSession *)GetUsrParam())->streamHandle().clear();  
309 - // }  
310 - // else  
311 - // {  
312 - // //如果此时进度大于80% 算完成吧  
313 - // if(((VideoSession *)GetUsrParam())->progress() > 0.80)  
314 - // {  
315 - // LOG_INFO("************Record stream is overtime**{}**m_progress = {}********", m_deviceID, ((VideoSession *)GetUsrParam())->progress());  
316 -  
317 - // m_idleCount = 0;  
318 - // m_hVodEndFunc(m_usrParam);  
319 - // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle());  
320 - // ((VideoSession *)GetUsrParam())->streamHandle().clear();  
321 - // }  
322 - // else  
323 - // {  
324 - // m_idleCount = -1;  
325 - // //LOG_ERROR("************post ERROR_REALSTREAM_INTERRUPT to structure****{}********", m_deviceID);  
326 - // //发送流中断  
327 - // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Record time video streaming interruption!");  
328 - // }  
329 - // }  
330 - //  
331 - //  
332 - // }  
333 - //  
334 - // if (m_noDataCount < -200000)//任务开始时没收到流  
335 - // {  
336 - // //LOG_ERROR("************m_hVodEndFunc(m_usrParam)!!!m_hVodEndFunc(m_usrParam)********{}******", m_deviceID);  
337 - // m_noDataCount = -1;  
338 -  
339 - // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Record time video streaming interruption2!");  
340 - // //m_hVodEndFunc(m_usrParam);  
341 - // }  
342 - // }  
343 - // else//实时任务断流  
344 - // //if (m_usrParam && ((VideoSession *)GetUsrParam())->video_type() == VideoType::EREAL)  
345 - // {  
346 - //  
347 - // //每超过3000次,发送一次send_vedio_eof 时长大约1.5s  
348 - // //若是30000,时长大约 18s  
349 - // if(m_idleCount > 30000)  
350 - // {  
351 - // uint64_t cts = get_cur_time();  
352 - // float duration_not_recv = (cts - last_recv_ts) / 1000.0;  
353 - //  
354 - // //LOG_ERROR("************I haven't got stream from hik gateway exceed {}s,send eof********{}******", duration_not_recv, m_deviceID);  
355 - // m_idleCount = -1;  
356 -  
357 - // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Real time video streaming interruption!");  
358 - // }  
359 - //  
360 - // if (m_noDataCount < -200000)//任务开始时没收到流  
361 - // {  
362 - // //LOG_ERROR("************m_noDataCount < -200000********{}******", m_deviceID);  
363 - // m_noDataCount = -1;  
364 -  
365 - // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Real time video streaming interruption2!");  
366 - // }  
367 - //  
368 - // }  
369 - //}  
370 - //}  
371 - // catch (GeneralException2& e)  
372 - //{  
373 - // //LOG_ERROR("---> video streaming interruption!<---{}, error: {}", m_deviceID, e.err_msg());  
374 -  
375 - // byte_buffer bb(64);  
376 - // bb << VasCmd::VAS_CMD_REALSTREAM_INTERRUPT << e.err_msg();  
377 -  
378 - // if (m_usrParam)  
379 - // {  
380 - // if (((VideoSession *)GetUsrParam())->msgChan()->is_valid()) {  
381 - // try {  
382 - // ((VideoSession *)GetUsrParam())->msgChan()->send_msg(bb.data_ptr(), bb.data_size());  
383 - // }  
384 - // catch (GeneralException2& e) {  
385 - // //LOG_ERROR("[{}] send vas cmd VAS_CMD_REALSTREAM_INTERRUPT error: {}, {}", m_deviceID, e.err_code(), e.err_str());  
386 - // }  
387 - // }  
388 -  
389 - // //通知网关关闭句柄  
390 - // if(!((VideoSession *)GetUsrParam())->streamHandle().empty())  
391 - // {  
392 -  
393 - // LOG_INFO("---->Notify hisense gateway release handle = {} !<----{}", ((VideoSession *)GetUsrParam())->streamHandle().c_str(), m_deviceID);  
394 - // if (((VideoSession *)GetUsrParam())->video_type() == VideoType::EREAL)  
395 - // real_stream_stop(((VideoSession *)GetUsrParam())->streamHandle());  
396 - //  
397 - // if (((VideoSession *)GetUsrParam())->video_type() == VideoType::ERECORD)  
398 - // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle());  
399 - //  
400 - // //清理保活的句柄  
401 - // ((VideoSession *)GetUsrParam())->streamHandle().clear();  
402 - // }  
403 - // }  
404 - //  
405 - // bb.bset(0);  
406 - //  
407 - //}  
408 - m_rtpSession.EndDataAccess();  
409 - RTPTime::Wait(RTPTime(0, 500));  
410 - }  
411 -  
412 - delete [] recvTmpBuf;  
413 -  
414 - return 0;  
415 -}  
416 -  
417 -// 解PS包线程  
418 -int RTPReceiver::OnPsProcess()  
419 -{  
420 - while (!m_bPsExit) {  
421 - m_psFrameMutex.lock();  
422 - if (m_psVideoFrames.size() <= 0){  
423 - m_psFrameMutex.unlock();  
424 - std::this_thread::sleep_for(std::chrono::milliseconds(10));  
425 - continue;  
426 - }  
427 - Frame* frame = m_psVideoFrames.front();  
428 - m_psVideoFrames.pop();  
429 - m_psFrameMutex.unlock();  
430 - if (frame != nullptr)  
431 - {  
432 - int nRet = m_psParser.AddData(frame->buf_, frame->len_);  
433 - if (nRet == -1)  
434 - {  
435 - LOG_INFO("m_psParser return -1--{}", m_deviceID);  
436 - }  
437 - else if (nRet == -2)  
438 - {  
439 - LOG_INFO("m_psParser return -2--{}", m_deviceID);  
440 - }  
441 - else if (nRet == -3)  
442 - {  
443 - LOG_INFO("m_psParser return -3--{}", m_deviceID);  
444 - }  
445 -  
446 - delete frame;  
447 - frame = nullptr;  
448 - }  
449 -  
450 - }  
451 -  
452 - ClearPsVideoFrameList();  
453 -  
454 - return 0; 90 +
  91 + LOG_INFO("[{}] ps demux thread quit", m_deviceID);
455 } 92 }
456 93
457 // 处理去除了PS头的数据 94 // 处理去除了PS头的数据
@@ -531,7 +168,129 @@ void RTPReceiver::OnPsDemux(unsigned char streamId, BYTE *data, int len, bool ke @@ -531,7 +168,129 @@ void RTPReceiver::OnPsDemux(unsigned char streamId, BYTE *data, int len, bool ke
531 m_SliceBuf.add((char*)data, len); 168 m_SliceBuf.add((char*)data, len);
532 } 169 }
533 170
534 -int RTPReceiver::OnDecodeProcess() 171 +// 解PS包线程
  172 +int RTPReceiver::OnPsProcess()
535 { 173 {
  174 + LOG_INFO("[{}] started.", m_deviceID);
  175 + while (!m_bPsExit) {
  176 + m_psFrameMutex.lock();
  177 + LOG_DEBUG("[{}] PS frame size : {}", m_deviceID, m_psVideoFrames.size());
  178 + if (m_psVideoFrames.size() <= 0){
  179 + m_psFrameMutex.unlock();
  180 + std::this_thread::sleep_for(std::chrono::milliseconds(10));
  181 + continue;
  182 + }
  183 + Frame* frame = m_psVideoFrames.front();
  184 + m_psVideoFrames.pop();
  185 + m_psFrameMutex.unlock();
  186 + if (frame != nullptr)
  187 + {
  188 + int nRet = m_psParser.AddData(frame->buf_, frame->len_);
  189 + if (nRet == -1)
  190 + {
  191 + LOG_INFO("m_psParser return -1--{}", m_deviceID);
  192 + }
  193 + else if (nRet == -2)
  194 + {
  195 + LOG_INFO("m_psParser return -2--{}", m_deviceID);
  196 + }
  197 + else if (nRet == -3)
  198 + {
  199 + LOG_INFO("m_psParser return -3--{}", m_deviceID);
  200 + }
  201 +
  202 + delete frame;
  203 + frame = nullptr;
  204 + }
  205 + }
  206 +
  207 + ClearPsVideoFrameList();
  208 +
  209 + m_hVodEndFunc(m_usrParam);
  210 +
  211 + LOG_INFO("[{}] exited.", m_deviceID);
  212 +
536 return 0; 213 return 0;
537 } 214 }
  215 +
  216 +void RTPReceiver::SetDeviceID(string deviceID){
  217 + m_deviceID = deviceID;
  218 +}
  219 +
  220 +int RTPReceiver::GetPsFrameListSize()
  221 +{
  222 + std::lock_guard<std::mutex> l(m_psFrameMutex);
  223 + return m_psVideoFrames.size();
  224 +}
  225 +
  226 +void RTPReceiver::ClearPsVideoFrameList()
  227 +{
  228 + std::lock_guard<std::mutex> l(m_psFrameMutex);
  229 + while (!m_psVideoFrames.empty()) {
  230 + Frame* f = m_psVideoFrames.front();
  231 + delete f;
  232 + m_psVideoFrames.pop();
  233 + }
  234 + LOG_INFO("[{}] cleared ps video frame list!", m_deviceID);
  235 +}
  236 +
  237 +int RTPReceiver::ParsePacket(RTPPacket* packet){
  238 + do {
  239 +
  240 + if (0 == packet->GetPayloadType())
  241 + {
  242 + // 音频数据, 暂不处理
  243 + break;
  244 + }
  245 +
  246 + // 判断是否收到完整的帧(有些厂商打的marker标记不准, 所以只能看时间戳来判断)
  247 + uint32_t curPts = packet->GetTimestamp();
  248 + if (lastPts != 0 && curPts != lastPts) {
  249 + mark = 1;
  250 + }
  251 + lastPts = curPts;
  252 +
  253 + int payloadLen = packet->GetPayloadLength();
  254 + if (offset + payloadLen > kVideoFrameSize)
  255 + {
  256 + offset = 0, mark = 0;
  257 + break;
  258 + }
  259 +
  260 + LOG_DEBUG("[{}] ParsePacket GetPayloadLength", m_deviceID);
  261 +
  262 + if (mark)
  263 + {
  264 + BYTE* frameBuf = (BYTE*)malloc(sizeof(BYTE) * offset);
  265 + if (!frameBuf) {
  266 + offset = 0, mark = 0;
  267 + break;
  268 + }
  269 + memcpy(frameBuf, recvTmpBuf, offset);
  270 + if (!m_bPsExit){
  271 + std::lock_guard<std::mutex> l(m_psFrameMutex);
  272 + if (m_psVideoFrames.size() < 100)
  273 + {
  274 + LOG_DEBUG("[{}]ParsePacket push", m_deviceID);
  275 + m_psVideoFrames.push(new Frame(frameBuf, offset, false));
  276 + }
  277 + else {
  278 + free(frameBuf);
  279 + }
  280 + }
  281 + else{
  282 + //若此时解码线程已经退出,不再往m_psVideoFrames推送帧,且退出当前线程
  283 + free(frameBuf);
  284 + LOG_INFO("ParsePacket quit, device_id:{}", m_deviceID);
  285 + return 1;
  286 + }
  287 + offset = 0;
  288 + mark = 0;
  289 + }
  290 +
  291 + memcpy(recvTmpBuf + offset, packet->GetPayloadData(), payloadLen);
  292 + offset += payloadLen;
  293 + } while (0);
  294 +
  295 + return 0;
  296 +}
538 \ No newline at end of file 297 \ No newline at end of file
src/gb28181/RTPReceiver.h
1 -#ifndef _RTP_RECEIVER_H_  
2 -#define _RTP_RECEIVER_H_  
3 -  
4 -  
5 -#include "demuxer.h"  
6 -#include "buffer.h"  
7 -  
8 -  
9 -#include "rtpudpv4transmitter.h"  
10 -#include "rtpipv4address.h"  
11 -#include "rtpsessionparams.h"  
12 -#include "rtpsession.h"  
13 -#include "rtppacket.h"  
14 -#include <queue>  
15 -#include <iostream>  
16 -#include <atomic>  
17 -#include <thread>  
18 -#include <string>  
19 -#include <mutex>  
20 -  
21 -  
22 -#define OUTTIME_RTCP 30*1000  
23 -#define PAYLOAD 99  
24 -#define PAYLOAD_PS 96  
25 -#define PAYLOAD_H264 98  
26 -#define PAYLOAD_MP4 97  
27 -  
28 -#define UDP_SIZE 1400  
29 -#define MIN_PORT 10000  
30 -#define MAX_PORT 60000  
31 -#define RTP_MAX_PACKET_LEN 1450  
32 -  
33 -using namespace jrtplib;  
34 -using namespace std;  
35 -  
36 -typedef unsigned char BYTE;  
37 -  
38 -/** 视频数据回调  
39 -*  
40 -* @param videoType [in] 视频类型 音频-0xC0、h264-0x1B、MPEG4-0x01、SVAC-0x80  
41 -* @param data [in] 视频数据  
42 -* @param len [in] 视频数据长度  
43 -* @param isKey [in] 是否为关键帧  
44 -* @param pts [in] 时间戳  
45 -*/  
46 -typedef void(*CallBack_Stream)(void* userdata, int videoType, char* data, int len, int isKey, uint64_t pts, uint64_t localPts);  
47 -  
48 -/** 录像回放完成回调消息通知  
49 -*/  
50 -typedef void(*CallBack_VodFileEnd)(void* userdata);  
51 -  
52 -int AllocRtpPort(void);  
53 -  
54 -class MyRTPSession : public RTPSession  
55 -{  
56 -public:  
57 - MyRTPSession() {}  
58 - virtual ~MyRTPSession() {}  
59 -  
60 -private:  
61 - virtual void OnRTPPacket(RTPPacket* pack, const RTPTime& receiverTime, const RTPAddress* senderAddress)  
62 - {  
63 - AddDestination(*senderAddress);  
64 - }  
65 -  
66 - virtual void OnRTCPCompoundPacket(RTCPCompoundPacket *pack, const RTPTime &receivetime,const RTPAddress *senderaddress)  
67 - {  
68 - //AddDestination(*senderaddress);  
69 - //const char* name = "hi~";  
70 - //SendRTCPAPPPacket(0, (const uint8_t*)name, "keeplive", 8);  
71 -  
72 - //printf("send rtcp app");  
73 - }  
74 -};  
75 -  
76 -// 标识帧, 注意buffer需要自己开辟和释放  
77 -struct Frame {  
78 - Frame() { buf_ = NULL; len_ = 0; }  
79 - ~Frame() {  
80 - if (buf_ != nullptr)  
81 - {  
82 - free(buf_);  
83 - buf_ = nullptr;  
84 - }  
85 - }  
86 - Frame(BYTE* buf, int len, bool key) : buf_(buf), len_(len), key_(key) {}  
87 - BYTE* buf_;  
88 - int len_;  
89 - bool key_{};  
90 -};  
91 -  
92 -class FrameToDecode  
93 -{  
94 -public:  
95 - FrameToDecode()  
96 - : m_SliceBuf(0)  
97 - , m_localPts(0)  
98 - , m_LastPTS(-1)  
99 - , m_LastIsKeyFrame(0) {}  
100 - FrameToDecode(unsigned char m_streamId)  
101 - : m_SliceBuf(0)  
102 - , m_localPts(0)  
103 - , m_LastPTS(-1)  
104 - , m_LastIsKeyFrame(0)  
105 - , m_streamId (m_streamId) {}  
106 -  
107 - void operator=(FrameToDecode &temp)  
108 - {  
109 - m_SliceBuf = temp.m_SliceBuf;  
110 - m_streamId = temp.m_streamId;  
111 - m_localPts = temp.m_localPts;  
112 - m_LastPTS = temp.m_LastPTS;  
113 - m_LastIsKeyFrame = temp.m_LastIsKeyFrame;  
114 - }  
115 -  
116 - CBuffer m_SliceBuf;  
117 - unsigned char m_streamId{};  
118 - uint64_t m_localPts;  
119 - uint64_t m_LastPTS;  
120 - bool m_LastIsKeyFrame;  
121 -};  
122 -  
123 -  
124 -class RTPReceiver  
125 -{  
126 - RTPReceiver(const RTPReceiver& other);  
127 - RTPReceiver& operator= (const RTPReceiver& other);  
128 -  
129 -public:  
130 - RTPReceiver();  
131 - ~RTPReceiver();  
132 -  
133 - bool Open(uint16_t localPort);  
134 - bool IsOpened() const;  
135 - void Close();  
136 -  
137 - int GetPsFrameListSize();  
138 - void ClearPsVideoFrameList();  
139 -  
140 - void OnPsDemux(unsigned char streamId, BYTE *data, int len, bool key, uint64_t pts, uint64_t localPts);  
141 -  
142 - void SetOutputCallback(CallBack_Stream cb, void* param);  
143 - void SetVodEndCallback(CallBack_VodFileEnd cb, void* param);  
144 - CallBack_VodFileEnd GetVodEndFunc(){ return m_hVodEndFunc; }  
145 -  
146 - void *GetUsrParam(){ return m_usrParam; }  
147 - void SetDeviceID(string deviceID){this->m_deviceID = deviceID; }  
148 -  
149 -private:  
150 - static int rtp_revc_thread_(void* param);  
151 - static int ps_demuxer_thread_(void* param);  
152 - static int ps_decode_thread_(void* param);  
153 -  
154 - int OnRtpRecv();  
155 - int OnPsProcess();  
156 - int OnDecodeProcess();  
157 -  
158 -private:  
159 - std::thread m_rtpThread; // RTP接收线程  
160 - std::thread m_psThread; // PS解包线程  
161 -  
162 - uint16_t m_localPort; // RTP接收端口  
163 - MyRTPSession m_rtpSession; // RTP会话  
164 - std::atomic_bool m_bRtpExit; // 标识RTP收包线程闭  
165 - std::atomic_bool m_bPsExit; // 标识PS解包线程关闭  
166 - std::queue<Frame*> m_psVideoFrames;  
167 - mutex m_psFrameMutex;  
168 -  
169 - CMpeg2Demux m_psParser;  
170 -  
171 - void* m_usrParam;  
172 - std::atomic_bool m_bOpened;  
173 -  
174 - CallBack_Stream m_h264DataFunc; // 视频流回调  
175 - CallBack_VodFileEnd m_hVodEndFunc; // 录像流结束回调  
176 -  
177 - CBuffer m_SliceBuf;  
178 - uint64_t m_LastPTS;  
179 - bool m_LastIsKeyFrame;  
180 - unsigned char m_LastStreamType;  
181 - int64_t m_idleCount;  
182 - int64_t m_noDataCount;//线程计数,用于打开流成功但是实际没流过来  
183 -  
184 - string m_deviceID;  
185 - int64_t m_notToDecodCount{0};//线程计数,用来代表多长时间没有调用解码回调,针对大华相机  
186 -};  
187 -  
188 -#endif // _RTP_RECEIVER_H_ 1 +#ifndef _RTP_RECEIVER_H_
  2 +#define _RTP_RECEIVER_H_
  3 +
  4 +#include "buffer.h"
  5 +#include "demuxer.h"
  6 +#include "rtppacket.h"
  7 +#include <stdint.h>
  8 +#include <mutex>
  9 +#include <queue>
  10 +#include <atomic>
  11 +#include <thread>
  12 +
  13 +typedef unsigned char BYTE;
  14 +
  15 +using namespace jrtplib;
  16 +using namespace std;
  17 +
  18 +/** 视频数据回调
  19 +*
  20 +* @param videoType [in] 视频类型 音频-0xC0、h264-0x1B、MPEG4-0x01、SVAC-0x80
  21 +* @param data [in] 视频数据
  22 +* @param len [in] 视频数据长度
  23 +* @param isKey [in] 是否为关键帧
  24 +* @param pts [in] 时间戳
  25 +*/
  26 +typedef void(*CallBack_Stream)(void* userdata, int videoType, char* data, int len, int isKey, uint64_t pts, uint64_t localPts);
  27 +
  28 +/** 录像回放完成回调消息通知
  29 +*/
  30 +typedef void(*CallBack_VodFileEnd)(void* userdata);
  31 +
  32 +/**
  33 + * 请求流
  34 +*/
  35 +typedef bool(*CallBack_Request_Stream)();
  36 +
  37 +// 标识帧, 注意buffer需要自己开辟和释放
  38 +struct Frame {
  39 + Frame() { buf_ = NULL; len_ = 0; }
  40 + ~Frame() {
  41 + if (buf_ != nullptr)
  42 + {
  43 + free(buf_);
  44 + buf_ = nullptr;
  45 + }
  46 + }
  47 + Frame(BYTE* buf, int len, bool key) : buf_(buf), len_(len), key_(key) {}
  48 + BYTE* buf_;
  49 + int len_;
  50 + bool key_{};
  51 +};
  52 +
  53 +class FrameToDecode
  54 +{
  55 +public:
  56 + FrameToDecode()
  57 + : m_SliceBuf(0)
  58 + , m_localPts(0)
  59 + , m_LastPTS(-1)
  60 + , m_LastIsKeyFrame(0) {}
  61 + FrameToDecode(unsigned char m_streamId)
  62 + : m_SliceBuf(0)
  63 + , m_localPts(0)
  64 + , m_LastPTS(-1)
  65 + , m_LastIsKeyFrame(0)
  66 + , m_streamId (m_streamId) {}
  67 +
  68 + void operator=(FrameToDecode &temp)
  69 + {
  70 + m_SliceBuf = temp.m_SliceBuf;
  71 + m_streamId = temp.m_streamId;
  72 + m_localPts = temp.m_localPts;
  73 + m_LastPTS = temp.m_LastPTS;
  74 + m_LastIsKeyFrame = temp.m_LastIsKeyFrame;
  75 + }
  76 +
  77 + CBuffer m_SliceBuf;
  78 + unsigned char m_streamId{};
  79 + uint64_t m_localPts;
  80 + uint64_t m_LastPTS;
  81 + bool m_LastIsKeyFrame;
  82 +};
  83 +
  84 +class RTPReceiver{
  85 +
  86 +public:
  87 + RTPReceiver();
  88 + ~RTPReceiver();
  89 +
  90 + virtual bool Open(uint16_t localPort) = 0;
  91 + virtual bool IsOpened() = 0;
  92 + virtual void Close() = 0;
  93 +
  94 + void SetVodEndCallback(CallBack_VodFileEnd cb, void* param);
  95 +
  96 + void SetOutputCallback(CallBack_Stream cb, void* param);
  97 +
  98 + void SetRequestStreamCallback(CallBack_Request_Stream cb);
  99 +
  100 + void SetDeviceID(string deviceID);
  101 +
  102 + int GetPsFrameListSize();
  103 +
  104 +public:
  105 + void OnPsDemux(unsigned char streamId, BYTE *data, int len, bool key, uint64_t pts, uint64_t localPts);
  106 + int OnPsProcess();
  107 + void ClearPsVideoFrameList();
  108 + int ParsePacket(RTPPacket* packet);
  109 +
  110 +public:
  111 + int InitPS();
  112 + void ClosePsThread();
  113 + void *GetUsrParam(){ return m_usrParam; }
  114 +
  115 +public:
  116 + CBuffer m_SliceBuf;
  117 + uint64_t m_LastPTS;
  118 + bool m_LastIsKeyFrame;
  119 + unsigned char m_LastStreamType;
  120 +
  121 + int64_t m_notToDecodCount{0};//线程计数,用来代表多长时间没有调用解码回调,针对大华相机
  122 +
  123 + void* m_usrParam;
  124 + CallBack_Stream m_h264DataFunc; // 视频流回调
  125 +
  126 + std::queue<Frame*> m_psVideoFrames;
  127 + mutex m_psFrameMutex;
  128 +
  129 + string m_deviceID;
  130 +
  131 + CMpeg2Demux m_psParser;
  132 + std::atomic_bool m_bPsExit; // 标识PS解包线程关闭
  133 +
  134 + uint32_t lastPts{0};
  135 + uint64_t last_recv_ts{0};
  136 + int offset{0};
  137 + int mark{0};
  138 + BYTE* recvTmpBuf{nullptr};
  139 +
  140 + std::thread* m_psThreadPtr; // PS解包线程
  141 +
  142 + CallBack_VodFileEnd m_hVodEndFunc; // 录像流结束回调
  143 + CallBack_Request_Stream m_callback_request_stream; //请求流回调
  144 +};
  145 +
  146 +#endif // _RTP_RECEIVER_H_
189 \ No newline at end of file 147 \ No newline at end of file
src/gb28181/RTPTcpReceiver.cpp 0 → 100644
  1 +#include"RTPTcpReceiver.h"
  2 +#include "../logger.hpp"
  3 +
  4 +
  5 +static long long get_cur_time() {
  6 +
  7 + chrono::time_point<chrono::system_clock, chrono::milliseconds> tpMicro
  8 + = chrono::time_point_cast<chrono::milliseconds>(chrono::system_clock::now());
  9 +
  10 + return tpMicro.time_since_epoch().count();
  11 +}
  12 +
  13 +// class TcpRTPSession : public RTPSession
  14 +// {
  15 +// public:
  16 +// void setReceiver(RTPTcpReceiver* r){
  17 +// tcpReceiver = r;
  18 +// }
  19 +
  20 +// protected:
  21 +// void OnValidatedRTPPacket(RTPSourceData *srcdat, RTPPacket *rtppack, bool isonprobation, bool *ispackethandled)
  22 +// {
  23 +// // printf("SSRC %x Got packet (%d bytes) in OnValidatedRTPPacket from source 0x%04x!\n", GetLocalSSRC(),
  24 +// // (int)rtppack->GetPayloadLength(), srcdat->GetSSRC());
  25 +
  26 +// LOG_DEBUG("SSRC {} Got packet ({} bytes) in OnValidatedRTPPacket from source {}}!\n", GetLocalSSRC(),
  27 +// (int)rtppack->GetPayloadLength(), srcdat->GetSSRC());
  28 +
  29 +// if(nullptr != tcpReceiver){
  30 +// tcpReceiver->ParsePacket(rtppack);
  31 +// }
  32 +// DeletePacket(rtppack);
  33 +// *ispackethandled = true;
  34 +// }
  35 +
  36 +// void OnRTCPSDESItem(RTPSourceData *srcdat, RTCPSDESPacket::ItemType t, const void *itemdata, size_t itemlength)
  37 +// {
  38 +// char msg[1024];
  39 +
  40 +// memset(msg, 0, sizeof(msg));
  41 +// if (itemlength >= sizeof(msg))
  42 +// itemlength = sizeof(msg)-1;
  43 +
  44 +// memcpy(msg, itemdata, itemlength);
  45 +// // printf("SSRC %x Received SDES item (%d): %s from SSRC %x\n", GetLocalSSRC(), (int)t, msg, srcdat->GetSSRC());
  46 +// LOG_DEBUG("SSRC {} Received SDES item ({}): {} from SSRC {}\n", GetLocalSSRC(), (int)t, msg, srcdat->GetSSRC());
  47 +// }
  48 +
  49 +// private:
  50 +// RTPTcpReceiver* tcpReceiver{nullptr};
  51 +// };
  52 +
  53 +class MyTCPTransmitter : public RTPTCPTransmitter
  54 +{
  55 +public:
  56 + void setReceiver(RTPTcpReceiver* r){
  57 + tcpReceiver = r;
  58 + }
  59 +
  60 +public:
  61 + MyTCPTransmitter() : RTPTCPTransmitter(0){ }
  62 +
  63 + void OnSendError(SocketType sock)
  64 + {
  65 + LOG_ERROR("Error sending over socket {}, removing destination", sock);
  66 + DeleteDestination(RTPTCPAddress(sock));
  67 + if(nullptr != tcpReceiver && !tcpReceiver->isClosing()){
  68 + tcpReceiver->RequestStream();
  69 + }
  70 + }
  71 +
  72 + void OnReceiveError(SocketType sock)
  73 + {
  74 + LOG_ERROR("Error receiving over socket {}, removing destination", sock);
  75 + DeleteDestination(RTPTCPAddress(sock));
  76 + }
  77 +
  78 +private:
  79 + RTPTcpReceiver* tcpReceiver{nullptr};
  80 +};
  81 +
  82 +static int rtp_revc_thread_(void* param)
  83 +{
  84 + if (!param)
  85 + {
  86 + return -1;
  87 + }
  88 +
  89 + RTPTcpReceiver* self = (RTPTcpReceiver*)param;
  90 + return self->OnRtpRecv();
  91 +}
  92 +
  93 +
  94 +RTPTcpReceiver::RTPTcpReceiver()
  95 +: m_bRtpExit(false)
  96 +, m_bOpened(false)
  97 +, m_idleCount(-1)
  98 +, m_noDataCount(-1)
  99 +, m_nListener(-1)
  100 +, m_bAccepted(false)
  101 +, m_bClosing(false)
  102 +{
  103 + m_rtpSessionPtr = new RTPSession();
  104 + m_pSessparams = new RTPSessionParams();
  105 + m_pTrans = new MyTCPTransmitter();
  106 +}
  107 +
  108 +RTPTcpReceiver::~RTPTcpReceiver(){
  109 + if (IsOpened())
  110 + Close();
  111 +
  112 + if(m_rtpSessionPtr != nullptr){
  113 + delete m_rtpSessionPtr;
  114 + m_rtpSessionPtr = nullptr;
  115 + }
  116 +
  117 + if(m_pSessparams != nullptr){
  118 + delete m_pSessparams;
  119 + m_pSessparams = nullptr;
  120 + }
  121 +
  122 + if(m_pTrans != nullptr){
  123 + delete m_pTrans;
  124 + m_pTrans = nullptr;
  125 + }
  126 +}
  127 +
  128 +bool RTPTcpReceiver::Open(uint16_t localPort){
  129 + if(0 != initSession(localPort)){
  130 + return false;
  131 + }
  132 +
  133 + m_bOpened = true;
  134 +
  135 + LOG_INFO("[{}] started.", m_deviceID);
  136 +
  137 + return true;
  138 +}
  139 +
  140 +bool RTPTcpReceiver::IsOpened(){
  141 + LOG_INFO("[{}] isopng:{} ", m_deviceID, m_bOpened);
  142 + return m_bOpened;
  143 +}
  144 +
  145 +void RTPTcpReceiver::Close(){
  146 +
  147 + m_bClosing = true;
  148 +
  149 + m_bAccepted = true;
  150 + m_bRtpExit = true;
  151 +
  152 + LOG_DEBUG("[{}] 1.", m_deviceID);
  153 +
  154 + // rtp接收线程退出
  155 + if (m_rtpThread.joinable())
  156 + {
  157 + m_rtpThread.join();
  158 + }
  159 +
  160 + LOG_DEBUG("[{}] 2.", m_deviceID);
  161 +
  162 + ClosePsThread();
  163 +
  164 + m_bOpened = false;
  165 +
  166 + LOG_INFO("[{}] closed.", m_deviceID);
  167 +}
  168 +
  169 +bool RTPTcpReceiver::isClosing(){
  170 + return m_bClosing;
  171 +}
  172 +
  173 +int RTPTcpReceiver::initSession(int localPort){
  174 + m_nListener = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);
  175 + if (m_nListener < 0)
  176 + {
  177 + return -1;
  178 + }
  179 +
  180 + sockaddr_in serverAddr;
  181 + memset(&serverAddr, 0, sizeof(sockaddr_in));
  182 + serverAddr.sin_family = AF_INET;
  183 + serverAddr.sin_port = htons(localPort);
  184 + serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
  185 + int nRet = bind(m_nListener, (sockaddr*)&serverAddr, sizeof(serverAddr));
  186 + if (nRet == -1)
  187 + {
  188 + LOG_ERROR("[{}] 绑定端口失败: {}", m_deviceID, localPort);
  189 + return -1;
  190 + }
  191 +
  192 + if (listen(m_nListener, 1) == -1)
  193 + {
  194 + LOG_ERROR("[{}] listen 失败", m_deviceID);
  195 + return -1;
  196 + }
  197 +
  198 + int nPackSize = 45678;
  199 + m_pSessparams->SetProbationType(RTPSources::NoProbation);
  200 + m_pSessparams->SetOwnTimestampUnit(90000.0 / 25.0);
  201 + m_pSessparams->SetMaximumPacketSize(nPackSize + 64);
  202 +
  203 + int status = m_pTrans->Init(false);
  204 + status = m_pTrans->Create(65535, NULL);
  205 + m_pTrans->setReceiver(this);
  206 +
  207 + status = m_rtpSessionPtr->Create(*m_pSessparams, m_pTrans);
  208 + if (status < 0)
  209 + {
  210 + LOG_ERROR("[{}] create session error!!", m_deviceID);
  211 + return -1;
  212 + }
  213 +
  214 + m_rtpThread = std::thread(rtp_revc_thread_, this);
  215 +
  216 + InitPS();
  217 +
  218 + bool bRet = RequestStream();
  219 + if (!bRet)
  220 + {
  221 + LOG_INFO("[{}] 请求流失败!", m_deviceID);
  222 + return -1;
  223 + }
  224 +
  225 + LOG_INFO("[{}] 初始化成功, congratulations !!!", m_deviceID);
  226 +
  227 + return 0;
  228 +}
  229 +
  230 +int RTPTcpReceiver::OnRtpRecv()
  231 +{
  232 + if(nullptr == m_rtpSessionPtr){
  233 + return -1;
  234 + }
  235 +
  236 + LOG_INFO("[{}] OnRtpRecv started, m_nListener : {}", m_deviceID, m_nListener);
  237 +
  238 + sockaddr_in clientAddr;
  239 + int nLen = sizeof(sockaddr_in);
  240 + SocketType nServer = -1;
  241 +
  242 + LOG_INFO("[{}] Poll started.", m_deviceID);
  243 + int status = -1;
  244 + while(!m_bRtpExit){
  245 + while(!m_bAccepted){
  246 + LOG_DEBUG("[{}] accepting...", m_deviceID);
  247 + nServer = accept(m_nListener, (sockaddr*)&clientAddr, (socklen_t * ) &nLen);
  248 + if (-1 == nServer){
  249 + std::this_thread::sleep_for(std::chrono::milliseconds(10));
  250 + continue;
  251 + }
  252 + m_rtpSessionPtr->AddDestination(RTPTCPAddress(nServer));
  253 + m_bAccepted = true;
  254 +
  255 + LOG_INFO("[{}] nServer={}", m_deviceID, nServer);
  256 + break;
  257 + }
  258 +
  259 + m_rtpSessionPtr->BeginDataAccess();
  260 + if (m_rtpSessionPtr->GotoFirstSourceWithData())
  261 + {
  262 + do
  263 + {
  264 + RTPPacket *pack;
  265 +
  266 + while ((pack = m_rtpSessionPtr->GetNextPacket()) != NULL)
  267 + {
  268 + LOG_DEBUG("[{}] time: {} ", m_deviceID, get_cur_time());
  269 + ParsePacket(pack);
  270 +
  271 + m_rtpSessionPtr->DeletePacket(pack);
  272 + }
  273 + } while (m_rtpSessionPtr->GotoNextSourceWithData());
  274 + }
  275 +
  276 + m_rtpSessionPtr->EndDataAccess();
  277 +
  278 + m_rtpSessionPtr->Poll();
  279 + std::this_thread::sleep_for(std::chrono::milliseconds(10));
  280 + }
  281 +
  282 + m_rtpSessionPtr->Destroy();
  283 +
  284 + if(nServer > 0){
  285 + close(nServer);
  286 + }
  287 + if(m_nListener > 0){
  288 + close(m_nListener);
  289 + }
  290 +
  291 + LOG_INFO("[{}] OnRtpRecv exited.", m_deviceID);
  292 +
  293 + return 0;
  294 +}
  295 +
  296 +bool RTPTcpReceiver::RequestStream(){
  297 + bool bConnect = m_callback_request_stream();
  298 + if(!bConnect){
  299 + Close();
  300 + return false;
  301 + }
  302 + m_bAccepted = false;
  303 +
  304 + return true;
  305 +}
0 \ No newline at end of file 306 \ No newline at end of file
src/gb28181/RTPTcpReceiver.h 0 → 100644
  1 +#ifndef _RTP_TCP_RECEIVER_H_
  2 +#define _RTP_TCP_RECEIVER_H_
  3 +
  4 +
  5 +#include "demuxer.h"
  6 +#include "buffer.h"
  7 +
  8 +#include "rtpsession.h"
  9 +#include "rtptcptransmitter.h"
  10 +#include "rtpipv4address.h"
  11 +#include "rtptcpaddress.h"
  12 +#include "rtpsessionparams.h"
  13 +#include "rtperrors.h"
  14 +#include "rtpsourcedata.h"
  15 +#include "rtpsocketutil.h"
  16 +#include <stdlib.h>
  17 +#include <stdio.h>
  18 +#include <iostream>
  19 +#include <string>
  20 +
  21 +#include <queue>
  22 +#include <atomic>
  23 +#include <thread>
  24 +#include <mutex>
  25 +
  26 +#include "RTPReceiver.h"
  27 +
  28 +
  29 +#define OUTTIME_RTCP 30*1000
  30 +#define PAYLOAD 99
  31 +#define PAYLOAD_PS 96
  32 +#define PAYLOAD_H264 98
  33 +#define PAYLOAD_MP4 97
  34 +
  35 +#define UDP_SIZE 1400
  36 +#define MIN_PORT 10000
  37 +#define MAX_PORT 60000
  38 +#define RTP_MAX_PACKET_LEN 1450
  39 +
  40 +using namespace jrtplib;
  41 +using namespace std;
  42 +
  43 +
  44 +
  45 +class TcpRTPSession;
  46 +class MyTCPTransmitter;
  47 +
  48 +class RTPTcpReceiver:public RTPReceiver
  49 +{
  50 +public:
  51 + RTPTcpReceiver();
  52 + ~RTPTcpReceiver();
  53 +
  54 + bool Open(uint16_t localPort);
  55 + bool IsOpened();
  56 + void Close();
  57 +
  58 +public:
  59 + int OnRtpRecv();
  60 + bool RequestStream();
  61 + bool isClosing();
  62 +
  63 +private:
  64 + int initSession(int localPort);
  65 +
  66 +private:
  67 +
  68 + std::atomic_bool m_bRtpExit; // 标识RTP收包线程闭
  69 +
  70 + std::atomic_bool m_bOpened;
  71 + std::atomic_bool m_bAccepted;
  72 + std::atomic_bool m_bClosing;
  73 +
  74 + int64_t m_idleCount;
  75 + int64_t m_noDataCount;//线程计数,用于打开流成功但是实际没流过来
  76 +
  77 + std::thread m_rtpThread; // RTP接收线程
  78 + SocketType m_nListener;
  79 +
  80 + RTPSession* m_rtpSessionPtr; // RTP会话
  81 + RTPSessionParams* m_pSessparams;
  82 + MyTCPTransmitter* m_pTrans;
  83 +};
  84 +
  85 +#endif // _RTP_TCP_RECEIVER_H_
src/gb28181/RTPUdpReceiver.cpp 0 → 100644
  1 +
  2 +#include "RTPUdpReceiver.h"
  3 +#include <iostream>
  4 +#include <time.h>
  5 +
  6 +#include <thread>
  7 +#include <chrono>
  8 +
  9 +#include "../logger.hpp"
  10 +
  11 +using namespace std;
  12 +
  13 +#define BUFFERSIZE_1024 4096
  14 +#define BUFFERSIZE_GAP 4096//5120 //1024*5
  15 +
  16 +namespace
  17 +{
  18 + const int kVideoFrameSize = BUFFERSIZE_1024*BUFFERSIZE_1024*5*2;
  19 + const int kRtpRecvBufferSize = BUFFERSIZE_1024*BUFFERSIZE_1024*2;
  20 + const uint16_t kInvalidPort = 0;
  21 +}; // namespace
  22 +
  23 +class UdpRTPSession : public RTPSession
  24 +{
  25 +public:
  26 + UdpRTPSession() {}
  27 + virtual ~UdpRTPSession() {}
  28 +
  29 +private:
  30 + virtual void OnRTPPacket(RTPPacket* pack, const RTPTime& receiverTime, const RTPAddress* senderAddress)
  31 + {
  32 + AddDestination(*senderAddress);
  33 + }
  34 +
  35 + virtual void OnRTCPCompoundPacket(RTCPCompoundPacket *pack, const RTPTime &receivetime,const RTPAddress *senderaddress)
  36 + {
  37 + //AddDestination(*senderaddress);
  38 + //const char* name = "hi~";
  39 + //SendRTCPAPPPacket(0, (const uint8_t*)name, "keeplive", 8);
  40 +
  41 + //printf("send rtcp app");
  42 + }
  43 +};
  44 +
  45 +
  46 +static long long get_cur_time() {
  47 +
  48 + chrono::time_point<chrono::system_clock, chrono::milliseconds> tpMicro
  49 + = chrono::time_point_cast<chrono::milliseconds>(chrono::system_clock::now());
  50 +
  51 + return tpMicro.time_since_epoch().count();
  52 +}
  53 +
  54 +static int rtp_revc_thread_(void* param)
  55 +{
  56 + if (!param)
  57 + {
  58 + return -1;
  59 + }
  60 +
  61 + RTPUdpReceiver* self = (RTPUdpReceiver*)param;
  62 + return self->OnRtpRecv();
  63 +}
  64 +
  65 +RTPUdpReceiver::RTPUdpReceiver()
  66 +: m_bRtpExit(false)
  67 +, m_bOpened(false)
  68 +, m_idleCount(-1)
  69 +,m_noDataCount(-1)
  70 +{
  71 + m_sessparamsPtr = new RTPSessionParams();
  72 + m_transparamsPtr = new RTPUDPv4TransmissionParams();
  73 + m_rtpSessionPtr = new UdpRTPSession();
  74 +}
  75 +
  76 +RTPUdpReceiver::~RTPUdpReceiver()
  77 +{
  78 + if (IsOpened())
  79 + Close();
  80 +
  81 + if(nullptr != m_sessparamsPtr){
  82 + delete m_sessparamsPtr;
  83 + m_sessparamsPtr = nullptr;
  84 + }
  85 +
  86 + if(nullptr != m_transparamsPtr){
  87 + delete m_transparamsPtr;
  88 + m_transparamsPtr = nullptr;
  89 + }
  90 +
  91 + if(nullptr != m_rtpSessionPtr){
  92 + delete m_rtpSessionPtr;
  93 + m_rtpSessionPtr = nullptr;
  94 + }
  95 +}
  96 +
  97 +bool RTPUdpReceiver::Open(uint16_t localPort)
  98 +{
  99 + m_sessparamsPtr->SetUsePollThread(true);
  100 + m_sessparamsPtr->SetMinimumRTCPTransmissionInterval(10);
  101 + m_sessparamsPtr->SetOwnTimestampUnit(1.0/90000.0);
  102 + m_sessparamsPtr->SetAcceptOwnPackets(true);
  103 +
  104 + m_transparamsPtr->SetPortbase(localPort);
  105 + m_transparamsPtr->SetRTPReceiveBuffer(kRtpRecvBufferSize);
  106 +
  107 + LOG_INFO("[{}] port: {}", m_deviceID, localPort);
  108 +
  109 + int err = m_rtpSessionPtr->Create(*m_sessparamsPtr, m_transparamsPtr);
  110 + if (err != 0)
  111 + {
  112 + LOG_ERROR("[{}] Create error: {}", m_deviceID, err);
  113 + return false;
  114 + }
  115 +
  116 + m_rtpThreadPtr = new std::thread(rtp_revc_thread_, this);
  117 + if (nullptr == m_rtpThreadPtr)
  118 + {
  119 + LOG_ERROR("[{}] Create m_rtpThreadPtr error", m_deviceID);
  120 + return false;
  121 + }
  122 +
  123 +
  124 + if (InitPS() != 0)
  125 + {
  126 + return false;
  127 + }
  128 +
  129 + m_bOpened = true;
  130 + LOG_INFO("[{}] Open ok", m_deviceID);
  131 +
  132 + return true;
  133 +}
  134 +
  135 +bool RTPUdpReceiver::IsOpened()
  136 +{
  137 + return m_bOpened;
  138 +}
  139 +
  140 +void RTPUdpReceiver::Close()
  141 +{
  142 + m_bRtpExit = true;
  143 +
  144 + // rtp接收线程退出
  145 + if (nullptr != m_rtpThreadPtr && m_rtpThreadPtr->joinable())
  146 + {
  147 + m_rtpThreadPtr->join();
  148 + delete m_rtpThreadPtr;
  149 + m_rtpThreadPtr = nullptr;
  150 + }
  151 + m_rtpSessionPtr->Destroy();
  152 +
  153 + ClosePsThread();
  154 +
  155 + m_bOpened = false;
  156 +
  157 + LOG_INFO("[{}] closed.", m_deviceID);
  158 +}
  159 +
  160 +// 收RTP包线程
  161 +int RTPUdpReceiver::OnRtpRecv()
  162 +{
  163 + if(nullptr == m_rtpSessionPtr){
  164 + return -1;
  165 + }
  166 +
  167 + LOG_INFO("[{}] OnRtpRecv started.", m_deviceID);
  168 + while (!m_bRtpExit)
  169 + {
  170 + //try
  171 + //{
  172 + m_rtpSessionPtr->Poll();
  173 + m_rtpSessionPtr->BeginDataAccess();
  174 +
  175 + if (m_rtpSessionPtr->GotoFirstSourceWithData())
  176 + {
  177 + LOG_INFO("OnRtpRecv GotoFirstSourceWithData --{}", m_deviceID);
  178 + last_recv_ts = get_cur_time();
  179 + m_idleCount = 0;
  180 + m_noDataCount = 0;
  181 + do
  182 + {
  183 + RTPPacket* packet;
  184 + while ((packet = m_rtpSessionPtr->GetNextPacket()) != NULL)
  185 + {
  186 + LOG_INFO("OnRtpRecv GetNextPacket --{}", m_deviceID);
  187 + int ret = ParsePacket(packet);
  188 + m_rtpSessionPtr->DeletePacket(packet);
  189 +
  190 + if(ret != 0){
  191 + m_bRtpExit = true;
  192 + }
  193 + }
  194 + } while (m_rtpSessionPtr->GotoNextSourceWithData());
  195 + }
  196 + //else {
  197 + // if (m_idleCount != -1)
  198 + // {
  199 + // ++m_idleCount;//流中断计数
  200 + // }
  201 + // if (m_noDataCount != 0)
  202 + // {
  203 + // --m_noDataCount;//没流计数
  204 + // }
  205 + // //if (m_idleCount > 3000) {
  206 + // // m_hVodEndFunc(m_usrParam);
  207 + // // m_idleCount = 0;
  208 + // //历史流结束的时候,也会出现超时,这个是正常的
  209 + // if(m_usrParam && ((VideoSession *)GetUsrParam())->video_type() == VideoType::ERECORD)
  210 + // {
  211 + // if (m_idleCount > 10000)
  212 + // {
  213 + // //这里要判断下历史流是否结束,如果未结束,就设置为流中断
  214 + // //由于record_stream_status这个函数返回值不准确,所以增加一个进度条大于80%
  215 + // if(record_stream_status(((VideoSession *)GetUsrParam())->streamHandle()))
  216 + // {
  217 + // LOG_INFO("************Record stream is finished**{}**m_progress = {}********", m_deviceID, ((VideoSession *)GetUsrParam())->progress());
  218 + // m_idleCount = -1;
  219 + // m_hVodEndFunc(m_usrParam);
  220 + // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle());
  221 + // ((VideoSession *)GetUsrParam())->streamHandle().clear();
  222 + // }
  223 + // else
  224 + // {
  225 + // //如果此时进度大于80% 算完成吧
  226 + // if(((VideoSession *)GetUsrParam())->progress() > 0.80)
  227 + // {
  228 + // LOG_INFO("************Record stream is overtime**{}**m_progress = {}********", m_deviceID, ((VideoSession *)GetUsrParam())->progress());
  229 +
  230 + // m_idleCount = 0;
  231 + // m_hVodEndFunc(m_usrParam);
  232 + // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle());
  233 + // ((VideoSession *)GetUsrParam())->streamHandle().clear();
  234 + // }
  235 + // else
  236 + // {
  237 + // m_idleCount = -1;
  238 + // //LOG_ERROR("************post ERROR_REALSTREAM_INTERRUPT to structure****{}********", m_deviceID);
  239 + // //发送流中断
  240 + // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Record time video streaming interruption!");
  241 + // }
  242 + // }
  243 + //
  244 + //
  245 + // }
  246 + //
  247 + // if (m_noDataCount < -200000)//任务开始时没收到流
  248 + // {
  249 + // //LOG_ERROR("************m_hVodEndFunc(m_usrParam)!!!m_hVodEndFunc(m_usrParam)********{}******", m_deviceID);
  250 + // m_noDataCount = -1;
  251 +
  252 + // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Record time video streaming interruption2!");
  253 + // //m_hVodEndFunc(m_usrParam);
  254 + // }
  255 + // }
  256 + // else//实时任务断流
  257 + // //if (m_usrParam && ((VideoSession *)GetUsrParam())->video_type() == VideoType::EREAL)
  258 + // {
  259 + //
  260 + // //每超过3000次,发送一次send_vedio_eof 时长大约1.5s
  261 + // //若是30000,时长大约 18s
  262 + // if(m_idleCount > 30000)
  263 + // {
  264 + // uint64_t cts = get_cur_time();
  265 + // float duration_not_recv = (cts - last_recv_ts) / 1000.0;
  266 + //
  267 + // //LOG_ERROR("************I haven't got stream from hik gateway exceed {}s,send eof********{}******", duration_not_recv, m_deviceID);
  268 + // m_idleCount = -1;
  269 +
  270 + // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Real time video streaming interruption!");
  271 + // }
  272 + //
  273 + // if (m_noDataCount < -200000)//任务开始时没收到流
  274 + // {
  275 + // //LOG_ERROR("************m_noDataCount < -200000********{}******", m_deviceID);
  276 + // m_noDataCount = -1;
  277 +
  278 + // //throw GeneralException2(ERROR_REALSTREAM_INTERRUPT, "Real time video streaming interruption2!");
  279 + // }
  280 + //
  281 + // }
  282 + //}
  283 + //}
  284 + // catch (GeneralException2& e)
  285 + //{
  286 + // //LOG_ERROR("---> video streaming interruption!<---{}, error: {}", m_deviceID, e.err_msg());
  287 +
  288 + // byte_buffer bb(64);
  289 + // bb << VasCmd::VAS_CMD_REALSTREAM_INTERRUPT << e.err_msg();
  290 +
  291 + // if (m_usrParam)
  292 + // {
  293 + // if (((VideoSession *)GetUsrParam())->msgChan()->is_valid()) {
  294 + // try {
  295 + // ((VideoSession *)GetUsrParam())->msgChan()->send_msg(bb.data_ptr(), bb.data_size());
  296 + // }
  297 + // catch (GeneralException2& e) {
  298 + // //LOG_ERROR("[{}] send vas cmd VAS_CMD_REALSTREAM_INTERRUPT error: {}, {}", m_deviceID, e.err_code(), e.err_str());
  299 + // }
  300 + // }
  301 +
  302 + // //通知网关关闭句柄
  303 + // if(!((VideoSession *)GetUsrParam())->streamHandle().empty())
  304 + // {
  305 +
  306 + // LOG_INFO("---->Notify hisense gateway release handle = {} !<----{}", ((VideoSession *)GetUsrParam())->streamHandle().c_str(), m_deviceID);
  307 + // if (((VideoSession *)GetUsrParam())->video_type() == VideoType::EREAL)
  308 + // real_stream_stop(((VideoSession *)GetUsrParam())->streamHandle());
  309 + //
  310 + // if (((VideoSession *)GetUsrParam())->video_type() == VideoType::ERECORD)
  311 + // record_stream_stop(((VideoSession *)GetUsrParam())->streamHandle());
  312 + //
  313 + // //清理保活的句柄
  314 + // ((VideoSession *)GetUsrParam())->streamHandle().clear();
  315 + // }
  316 + // }
  317 + //
  318 + // bb.bset(0);
  319 + //
  320 + //}
  321 + m_rtpSessionPtr->EndDataAccess();
  322 +
  323 + std::this_thread::sleep_for(std::chrono::milliseconds(10));
  324 + }
  325 +
  326 + LOG_INFO("[{}] OnRtpRecv exited.", m_deviceID);
  327 +
  328 + return 0;
  329 +}
  330 +
  331 +
src/gb28181/RTPUdpReceiver.h 0 → 100644
  1 +#ifndef _RTP_UDP_RECEIVER_H_
  2 +#define _RTP_UDP_RECEIVER_H_
  3 +
  4 +#include "rtpudpv4transmitter.h"
  5 +#include "rtpipv4address.h"
  6 +#include "rtpsessionparams.h"
  7 +#include "rtpsession.h"
  8 +#include <queue>
  9 +#include <iostream>
  10 +#include <thread>
  11 +#include <string>
  12 +#include <mutex>
  13 +
  14 +#include "RTPReceiver.h"
  15 +
  16 +
  17 +#define OUTTIME_RTCP 30*1000
  18 +#define PAYLOAD 99
  19 +#define PAYLOAD_PS 96
  20 +#define PAYLOAD_H264 98
  21 +#define PAYLOAD_MP4 97
  22 +
  23 +#define UDP_SIZE 1400
  24 +#define MIN_PORT 10000
  25 +#define MAX_PORT 60000
  26 +#define RTP_MAX_PACKET_LEN 1450
  27 +
  28 +using namespace jrtplib;
  29 +using namespace std;
  30 +
  31 +
  32 +class UdpRTPSession;
  33 +
  34 +class RTPUdpReceiver: public RTPReceiver
  35 +{
  36 +public:
  37 + RTPUdpReceiver();
  38 + ~RTPUdpReceiver();
  39 +
  40 + virtual bool Open(uint16_t localPort);
  41 + virtual bool IsOpened() ;
  42 + virtual void Close() ;
  43 +
  44 +public:
  45 + int OnRtpRecv();
  46 +
  47 +private:
  48 + std::thread* m_rtpThreadPtr; // RTP接收线程
  49 +
  50 + UdpRTPSession* m_rtpSessionPtr; // RTP会话
  51 + std::atomic_bool m_bRtpExit; // 标识RTP收包线程闭
  52 +
  53 + std::atomic_bool m_bOpened;
  54 +
  55 + int64_t m_idleCount;
  56 + int64_t m_noDataCount;//线程计数,用于打开流成功但是实际没流过来
  57 +
  58 + RTPSessionParams* m_sessparamsPtr;
  59 + RTPUDPv4TransmissionParams* m_transparamsPtr;
  60 +};
  61 +
  62 +#endif // _RTP_UDP_RECEIVER_H_
src/gb28181/demuxer.h
@@ -8,6 +8,9 @@ @@ -8,6 +8,9 @@
8 { CMpeg2Demux class. } 8 { CMpeg2Demux class. }
9 { } 9 { }
10 {*******************************************************/ 10 {*******************************************************/
  11 +#ifndef _DEMUXER_H_
  12 +#define _DEMUXER_H_
  13 +
11 #include <stdint.h> 14 #include <stdint.h>
12 #include "buffer.h" 15 #include "buffer.h"
13 16
@@ -46,8 +49,8 @@ @@ -46,8 +49,8 @@
46 //typedef long long INT64; 49 //typedef long long INT64;
47 //typedef unsigned long long UINT64; 50 //typedef unsigned long long UINT64;
48 51
49 -typedef int ReceiveFunction(unsigned char streamType, void* data, int size, uint64_t pts, uint64_t localPts, bool bKey, void* userData);//es»Øµ÷  
50 -typedef int ReceiveFunction2(unsigned int streamtype, void * Data, int Size, uint64_t pts, bool iskeyfram, void* userdata);//ps»Øµ÷ 52 +typedef int ReceiveFunction(unsigned char streamType, void* data, int size, uint64_t pts, uint64_t localPts, bool bKey, void* userData);//es�ص�
  53 +typedef int ReceiveFunction2(unsigned int streamtype, void * Data, int Size, uint64_t pts, bool iskeyfram, void* userdata);//ps�ص�
51 54
52 static /*_inline*/ unsigned int asm_swap32(unsigned int x); 55 static /*_inline*/ unsigned int asm_swap32(unsigned int x);
53 static /*_inline*/ unsigned short asm_swap16(unsigned short x); 56 static /*_inline*/ unsigned short asm_swap16(unsigned short x);
@@ -77,4 +80,6 @@ public: @@ -77,4 +80,6 @@ public:
77 int AddData(void * Data, int Size/*, DWORD pts*/); 80 int AddData(void * Data, int Size/*, DWORD pts*/);
78 void SetReceiveFunction(ReceiveFunction * func, void* userdata); 81 void SetReceiveFunction(ReceiveFunction * func, void* userdata);
79 void SetReceiveFunction2(ReceiveFunction2 * func2, void* userdata2); 82 void SetReceiveFunction2(ReceiveFunction2 * func2, void* userdata2);
80 -};  
81 \ No newline at end of file 83 \ No newline at end of file
  84 +};
  85 +
  86 +#endif // _DEMUXER_H_
82 \ No newline at end of file 87 \ No newline at end of file
src/main.cpp
@@ -28,8 +28,6 @@ @@ -28,8 +28,6 @@
28 #define MIN_RTP_PORT 10000 28 #define MIN_RTP_PORT 10000
29 #define MAX_RTP_PORT 60000 29 #define MAX_RTP_PORT 60000
30 30
31 -string data_home = "/mnt/f/fiss/data/";  
32 -  
33 // ȡ MIN_RTP_PORT(10000)~MAX_RTP_PORT(60000)֮�������˿�(ż���������������˿ڿ���) 31 // ȡ MIN_RTP_PORT(10000)~MAX_RTP_PORT(60000)֮�������˿�(ż���������������˿ڿ���)
34 int allocRtpPort() { 32 int allocRtpPort() {
35 33
@@ -90,7 +88,7 @@ int sum2 = 0; @@ -90,7 +88,7 @@ int sum2 = 0;
90 88
91 cudaStream_t stream[2]; 89 cudaStream_t stream[2];
92 90
93 -string data_home = "/mnt/data/cmhu/FFNvDecoder/data/"; 91 +string data_home = "/data/tongtu/";
94 92
95 93
96 #define checkCudaErrors(S) do {CUresult status; \ 94 #define checkCudaErrors(S) do {CUresult status; \
@@ -173,7 +171,7 @@ void postDecoded(const void * userPtr, AVFrame * gpuFrame){ @@ -173,7 +171,7 @@ void postDecoded(const void * userPtr, AVFrame * gpuFrame){
173 // cout << "decode successed ✿✿ヽ(°▽°)ノ✿ " << endl; 171 // cout << "decode successed ✿✿ヽ(°▽°)ノ✿ " << endl;
174 172
175 int sum = sum1; 173 int sum = sum1;
176 - if (decoder->getName() == "dec1") 174 + if (decoder->getName() == "dec0")
177 { 175 {
178 sum1 ++ ; 176 sum1 ++ ;
179 sum = sum1; 177 sum = sum1;
@@ -301,17 +299,17 @@ void decode_finished_cbk(const void* userPtr){ @@ -301,17 +299,17 @@ void decode_finished_cbk(const void* userPtr){
301 cout << "当前时间戳: " << get_cur_time() << endl; 299 cout << "当前时间戳: " << get_cur_time() << endl;
302 } 300 }
303 301
  302 +bool decode_request_stream_cbk(){
  303 + cout << "需在此请求流" << endl;
  304 + return true;
  305 +}
  306 +
304 // string test_uri = "rtmp://192.168.10.56:1935/objecteye/1"; 307 // string test_uri = "rtmp://192.168.10.56:1935/objecteye/1";
305 // string test_uri = "/home/cmhu/data/output_800x480.mp4"; 308 // string test_uri = "/home/cmhu/data/output_800x480.mp4";
306 // string test_uri = "/home/cmhu/data/output_1920x1080.mp4"; 309 // string test_uri = "/home/cmhu/data/output_1920x1080.mp4";
307 // string test_uri = "rtsp://176.10.0.2:8554/stream"; 310 // string test_uri = "rtsp://176.10.0.2:8554/stream";
308 // string test_uri = "/mnt/f/fiss/test_data/h265.mp4"; 311 // string test_uri = "/mnt/f/fiss/test_data/h265.mp4";
309 string test_uri = "rtsp://176.10.0.4:8554/stream"; 312 string test_uri = "rtsp://176.10.0.4:8554/stream";
310 -char* gpuid = "0";  
311 -string test_uri = "ws://127.0.0.1:10000/sms/34020000002020000001/flv/hls/34020000001110005555_34020000001310005554.flv";  
312 -// string test_uri = "rtsp://176.10.0.4:8554/stream";  
313 -  
314 -char* gpu_id = "0";  
315 313
316 void createDecode(int index, const char* gpu_id){ 314 void createDecode(int index, const char* gpu_id){
317 FFNvDecoderManager* pDecManager = FFNvDecoderManager::getInstance(); 315 FFNvDecoderManager* pDecManager = FFNvDecoderManager::getInstance();
@@ -323,7 +321,7 @@ void createDecode(int index, const char* gpu_id){ @@ -323,7 +321,7 @@ void createDecode(int index, const char* gpu_id){
323 config.cfg.force_tcp = true; 321 config.cfg.force_tcp = true;
324 config.dec_type = DECODER_TYPE_FFMPEG; 322 config.dec_type = DECODER_TYPE_FFMPEG;
325 323
326 - config.cfg.gpuid = gpuid; 324 + config.cfg.gpuid = gpu_id;
327 // if (index % 2 == 0) 325 // if (index % 2 == 0)
328 // { 326 // {
329 // config.cfg.gpuid = "0"; 327 // config.cfg.gpuid = "0";
@@ -343,19 +341,20 @@ void createDecode(int index, const char* gpu_id){ @@ -343,19 +341,20 @@ void createDecode(int index, const char* gpu_id){
343 pDecManager->startDecodeByName(config.name); 341 pDecManager->startDecodeByName(config.name);
344 } 342 }
345 343
346 -void createGB28181Decode(int index, char* gpuid){ 344 +void createGB28181Decode(int index, char* gpu_id, int port){
347 FFNvDecoderManager* pDecManager = FFNvDecoderManager::getInstance(); 345 FFNvDecoderManager* pDecManager = FFNvDecoderManager::getInstance();
348 MgrDecConfig config; 346 MgrDecConfig config;
349 config.name = "dec" + to_string(index); 347 config.name = "dec" + to_string(index);
350 config.cfg.uri = config.name; 348 config.cfg.uri = config.name;
351 config.cfg.post_decoded_cbk = postDecoded; 349 config.cfg.post_decoded_cbk = postDecoded;
352 config.cfg.decode_finished_cbk = decode_finished_cbk; 350 config.cfg.decode_finished_cbk = decode_finished_cbk;
  351 + config.cfg.request_stream_cbk = decode_request_stream_cbk;
353 config.cfg.force_tcp = true; 352 config.cfg.force_tcp = true;
354 353
355 config.dec_type = DECODER_TYPE_GB28181; 354 config.dec_type = DECODER_TYPE_GB28181;
356 - config.cfg.port = 30012;//allocRtpPort(); 355 + config.cfg.port = port;//allocRtpPort();
357 356
358 - config.cfg.gpuid = gpuid; 357 + config.cfg.gpuid = gpu_id;
359 358
360 AbstractDecoder* decoder = pDecManager->createDecoder(config); 359 AbstractDecoder* decoder = pDecManager->createDecoder(config);
361 if (!decoder) 360 if (!decoder)
@@ -376,23 +375,14 @@ void logFF(void *, int level, const char *fmt, va_list ap) @@ -376,23 +375,14 @@ void logFF(void *, int level, const char *fmt, va_list ap)
376 int main(int argc, char* argv[]){ 375 int main(int argc, char* argv[]){
377 376
378 test_uri = argv[1]; 377 test_uri = argv[1];
379 - gpuid = argv[2];  
380 - cout << test_uri << " gpu_id:" << gpu_id << endl; 378 + char* gpuid = argv[2];
  379 + int port = atoi(argv[3]);
  380 + cout << test_uri << " gpu_id:" << gpuid << " port:" << port << endl;
381 381
382 // av_log_set_callback(&logFF); 382 // av_log_set_callback(&logFF);
383 383
384 CheckCUDAProperty(atoi(gpuid)); 384 CheckCUDAProperty(atoi(gpuid));
385 385
386 - FFNvDecoderManager* pDecManager = FFNvDecoderManager::getInstance();  
387 -  
388 - // int count = 99;  
389 - // for (size_t i = 0; i < count ; i++)  
390 - // {  
391 - // createDecode(i);  
392 - // }  
393 -  
394 -  
395 -  
396 pthread_t m_decode_thread; 386 pthread_t m_decode_thread;
397 pthread_create(&m_decode_thread,0, 387 pthread_create(&m_decode_thread,0,
398 [](void* arg) 388 [](void* arg)
@@ -400,7 +390,7 @@ int main(int argc, char* argv[]){ @@ -400,7 +390,7 @@ int main(int argc, char* argv[]){
400 // cudaSetDevice(atoi(gpuid)); 390 // cudaSetDevice(atoi(gpuid));
401 while (true) 391 while (true)
402 { 392 {
403 - std::this_thread::sleep_for(std::chrono::milliseconds(5000)); 393 + std::this_thread::sleep_for(std::chrono::minutes(1));
404 FFNvDecoderManager* pDecManager = FFNvDecoderManager::getInstance(); 394 FFNvDecoderManager* pDecManager = FFNvDecoderManager::getInstance();
405 int count = pDecManager->count(); 395 int count = pDecManager->count();
406 cout << "当前时间:" << get_cur_time() << " 当前运行路数: " << pDecManager->count() << endl; 396 cout << "当前时间:" << get_cur_time() << " 当前运行路数: " << pDecManager->count() << endl;
@@ -410,8 +400,6 @@ int main(int argc, char* argv[]){ @@ -410,8 +400,6 @@ int main(int argc, char* argv[]){
410 } 400 }
411 ,nullptr); 401 ,nullptr);
412 402
413 -  
414 -  
415 403
416 FFNvDecoderManager* pDecManager = FFNvDecoderManager::getInstance(); 404 FFNvDecoderManager* pDecManager = FFNvDecoderManager::getInstance();
417 int i = 0; 405 int i = 0;
@@ -423,18 +411,17 @@ int main(int argc, char* argv[]){ @@ -423,18 +411,17 @@ int main(int argc, char* argv[]){
423 { 411 {
424 break; 412 break;
425 } 413 }
426 -  
427 414
428 switch (ch) 415 switch (ch)
429 { 416 {
430 case 'f': 417 case 'f':
431 case 'F': 418 case 'F':
432 - createDecode(i, gpu_id); 419 + createDecode(i, gpuid);
433 i++; 420 i++;
434 break; 421 break;
435 case 'g': 422 case 'g':
436 case 'G': 423 case 'G':
437 - createGB28181Decode(i, gpu_id); 424 + createGB28181Decode(i, gpuid, port);
438 i++; 425 i++;
439 break; 426 break;
440 case 'r': 427 case 'r':