Commit 150d457dc81cd2a68789cc8492968227ddeeb49c

Authored by Hu Chunming
1 parent 8a3c2932

代码暂存,未完成

.vscode/settings.json
... ... @@ -79,7 +79,9 @@
79 79 "forward_list": "cpp",
80 80 "source_location": "cpp",
81 81 "slist": "cpp",
82   - "valarray": "cpp"
  82 + "valarray": "cpp",
  83 + "hash_map": "cpp",
  84 + "hash_set": "cpp"
83 85 },
84 86 "C_Cpp_Runner.cCompilerPath": "gcc",
85 87 "C_Cpp_Runner.cppCompilerPath": "g++",
... ...
src/decoder/gb28181/rtp/FFRtpParser.cpp 0 → 100644
  1 +//
  2 +// Created by bxc on 2023/4/18.
  3 +// 作者:北小菜
  4 +// 邮箱:bilibili_bxc@126.com
  5 +// 西瓜视频主页:https://www.ixigua.com/home/4171970536803763
  6 +// 哔哩哔哩主页:https://space.bilibili.com/487906612/
  7 +//
  8 +
  9 +#include "FFRtpParser.h"
  10 +#include "Utils.h"
  11 +#include <string.h>
  12 +
  13 +int avio_read_packet(void* opaque, uint8_t* buf, int buffsize){
  14 + FFRtpParser* player = (FFRtpParser*)opaque;
  15 +
  16 + int ret = 0;
  17 + if (player->bufferSize >= buffsize)
  18 + {
  19 + memcpy(buf, player->buffer, buffsize);
  20 + player->bufferSize = player->bufferSize - buffsize;
  21 + memmove(player->buffer, player->buffer + buffsize, player->bufferSize);
  22 + ret = buffsize;
  23 +
  24 + LOG_INFO("avio_read_packet=%d", buffsize);
  25 + }
  26 + return ret;
  27 +}
  28 +
  29 +FFRtpParser::FFRtpParser()
  30 +{
  31 +}
  32 +
  33 +FFRtpParser::~FFRtpParser()
  34 +{
  35 + if (mVideoCodecPar) {
  36 + avcodec_parameters_free(&mVideoCodecPar);
  37 + }
  38 + if (mVideoCodecCtx) {
  39 + avcodec_close(mVideoCodecCtx);
  40 + mVideoCodecCtx = nullptr;
  41 + }
  42 +
  43 + if (mFmtCtx) {
  44 + avformat_close_input(&mFmtCtx);
  45 + mFmtCtx = nullptr;
  46 + }
  47 +}
  48 +
  49 +bool FFRtpParser::probe()
  50 +{
  51 + mFmtCtx = avformat_alloc_context();
  52 +
  53 + unsigned char* avioBuff = (unsigned char*)av_malloc(1920 * 1080);
  54 + mAvioCtx = avio_alloc_context(avioBuff, sizeof(avioBuff), 0, this, avio_read_packet, NULL, NULL);
  55 + //探测流(获取码流格式)
  56 + if (av_probe_input_buffer2(mAvioCtx, (const AVInputFormat**)&mInputFmt, "", NULL, 0, 0) < 0){
  57 + LOG_ERROR("av_probe_input_buffer2 error");
  58 + return false;
  59 + }
  60 + mFmtCtx->pb = mAvioCtx;
  61 +
  62 + //配置流参数
  63 + //av_dict_set(&options, "fflags", "nobuffer", 0); //不缓存直接解码
  64 +
  65 + //打开流
  66 + if (avformat_open_input(&mFmtCtx, "", mInputFmt, &net_options) != 0)
  67 + {
  68 + LOG_ERROR("avformat_open_input error");
  69 + return false;
  70 + }
  71 + //获取流信息
  72 + if (avformat_find_stream_info(mFmtCtx, NULL) < 0)//?
  73 + {
  74 + LOG_ERROR("avformat_find_stream_info error");
  75 + return false;
  76 + }
  77 + //获取视频流
  78 + mVideoStream = av_find_best_stream(mFmtCtx, AVMEDIA_TYPE_VIDEO, -1, -1, NULL, 0);
  79 + if (mVideoStream < 0)
  80 + {
  81 + LOG_ERROR("av_find_best_stream error");
  82 + return false;
  83 + }
  84 + //获取解码信息
  85 + mVideoCodecPar = mFmtCtx->streams[mVideoStream]->codecpar;
  86 + const AVCodec* videoCodec = avcodec_find_decoder(mVideoCodecPar->codec_id);
  87 + if (!videoCodec){
  88 + LOG_ERROR("avcodec_find_decoder error");
  89 + return false;
  90 + }
  91 + mVideoCodecCtx = avcodec_alloc_context3(videoCodec);
  92 +
  93 + //codecpar为解码器上下文赋值
  94 + if (avcodec_parameters_to_context(mVideoCodecCtx, mVideoCodecPar) != 0)
  95 + {
  96 + LOG_ERROR("avcodec_parameters_to_context error");
  97 + return false;
  98 + }
  99 +
  100 + //设置解码器参数
  101 + //av_dict_set(&codec_options, "tune", "zero-latency", 0);//设置零延迟
  102 + //av_dict_set(&codec_options, "preset", "ultrafast", 0);//设置最模糊但是最快的解码方式
  103 + //av_dict_set(&codec_options, "x265-params", "qp=20", 0);//设置265量化参数
  104 + //量化参数:控制了视频帧中每一个宏区块(Macroblock)的压缩量。较大的数值,量化值更高,意味着更多的压缩,更低的质量,较小的数值代表相反的含义。
  105 +
  106 + //打开解码器
  107 + if (avcodec_open2(mVideoCodecCtx, videoCodec, &codec_options) < 0)
  108 + {
  109 + LOG_ERROR("avcodec_open2 error");
  110 + return false;
  111 + }
  112 + LOG_INFO("mVideoCodecCtx->width=%d,mVideoCodecCtx->height=%d", mVideoCodecCtx->width, mVideoCodecCtx->height);
  113 + return true;
  114 +}
  115 +
  116 +void FFRtpParser::play(){
  117 + LOG_INFO("start");
  118 +
  119 + AVPacket pkt;
  120 + while (av_read_frame(mFmtCtx, &pkt) >= 0) {
  121 + if (pkt.stream_index == mVideoStream){
  122 +
  123 + }
  124 + av_packet_unref(&pkt);
  125 + }
  126 + LOG_INFO("end");
  127 +}
0 128 \ No newline at end of file
... ...
src/decoder/gb28181/rtp/FFRtpParser.h 0 → 100644
  1 +//
  2 +// Created by bxc on 2023/4/18.
  3 +// 作者:北小菜
  4 +// 邮箱:bilibili_bxc@126.com
  5 +// 西瓜视频主页:https://www.ixigua.com/home/4171970536803763
  6 +// 哔哩哔哩主页:https://space.bilibili.com/487906612/
  7 +//
  8 +
  9 +#ifndef GB28181_RTP_FFRTPPARSER_H
  10 +#define GB28181_RTP_FFRTPPARSER_H
  11 +
  12 +#include <atomic>
  13 +
  14 +extern "C"
  15 +{
  16 + #include <libavcodec/avcodec.h>
  17 + #include <libavformat/avformat.h>
  18 + #include <libswscale/swscale.h>
  19 +}
  20 +
  21 +#define RtpParser_buffer_max_size 4194304 // 4M = 4 * 1024 * 1024 = 4194304 字节
  22 +
  23 +class FFRtpParser
  24 +{
  25 +public:
  26 + FFRtpParser();
  27 + ~FFRtpParser();
  28 +public:
  29 + bool probe();//阻塞式探测国标流并获取解码参数
  30 + void play();//在探测国标流成功以后,解码并渲染国标视频流
  31 +public:
  32 + std::atomic<char> buffer[RtpParser_buffer_max_size];
  33 + std::atomic_int bufferSize {0};
  34 +private:
  35 + AVFormatContext * mFmtCtx;
  36 + AVIOContext * mAvioCtx;
  37 + const AVInputFormat* mInputFmt;
  38 + int mVideoStream = -1;
  39 + AVCodecParameters * mVideoCodecPar;
  40 + AVCodecContext * mVideoCodecCtx;
  41 +
  42 + AVDictionary* net_options;//网络连接参数
  43 + AVDictionary* codec_options;//编码参数
  44 +
  45 +};
  46 +#endif //GB28181_RTP_FFRTPPARSER_H
0 47 \ No newline at end of file
... ...
src/decoder/gb28181/rtp/RTPReceiver.cpp
... ... @@ -93,7 +93,7 @@ void RTPReceiver::ClosePsThread(){
93 93 LOG_INFO("[{}] 3.", m_SipChannelId);
94 94 m_bPsExit = true;
95 95 // PS解包线程退出
96   - if (m_psThreadPtr->joinable())
  96 + if (m_psThreadPtr && m_psThreadPtr->joinable())
97 97 {
98 98 m_psThreadPtr->join();
99 99 delete m_psThreadPtr;
... ...
src/decoder/gb28181/rtp/RTPReceiver2.cpp 0 → 100644
  1 +#include "RTPReceiver2.h"
  2 +#include "rtppacket.h"
  3 +#include <thread>
  4 +
  5 +#include "../common_header.h"
  6 +#include "../websocket/WebsocketClient.h"
  7 +
  8 +#ifdef __linux__
  9 +#include "arpa/inet.h"
  10 +#endif
  11 +
  12 +#include "Rtp.h"
  13 +
  14 +const int MAX_RTP_BUFFER_SIZE = 1024*1024*10;
  15 +
  16 +#define Server_cache_max_size 4194304 // 1M = 1 * 1024 * 1024 = 1048576 字节
  17 +#define Server_rtp_max_size 1800
  18 +
  19 +
  20 +RTPReceiver2::RTPReceiver2()
  21 +{
  22 + mRecvCache = (uint8_t*)malloc(Server_cache_max_size);
  23 + mRecvRtpBuffer = (uint8_t*)malloc(Server_rtp_max_size);
  24 +}
  25 +
  26 +RTPReceiver2::~RTPReceiver2(){
  27 + if (mRecvCache) {
  28 + free(mRecvCache);
  29 + mRecvCache = nullptr;
  30 + }
  31 +
  32 + if (mRecvRtpBuffer) {
  33 + free(mRecvRtpBuffer);
  34 + mRecvRtpBuffer = nullptr;
  35 + }
  36 +}
  37 +
  38 +int RTPReceiver2::init(const char* ip, uint16_t port, bool isUdp) {
  39 + if (!isUdp) {
  40 + LOG_INFO("tcp://%s:%d", ip, port);
  41 + startTcpServer(ip, port);
  42 + }
  43 + else {
  44 + LOG_INFO("udp://%s:%d", ip, port);
  45 + startUdpServer(ip, port);
  46 + }
  47 +}
  48 +
  49 +int RTPReceiver2::startUdpServer(const char* ip, uint16_t port) {
  50 +
  51 + int server_fd, ret;
  52 + struct sockaddr_in ser_addr;
  53 +
  54 + server_fd = socket(AF_INET, SOCK_DGRAM, 0); //AF_INET:IPV4;SOCK_DGRAM:UDP
  55 + if(server_fd < 0)
  56 + {
  57 + printf("create socket fail!\n");
  58 + return -1;
  59 + }
  60 +
  61 + memset(&ser_addr, 0, sizeof(ser_addr));
  62 + ser_addr.sin_family = AF_INET;
  63 + ser_addr.sin_addr.s_addr = htonl(INADDR_ANY); //IP地址,需要进行网络序转换,INADDR_ANY:本地地址
  64 + ser_addr.sin_port = htons(port); //端口号,需要网络序转换
  65 +
  66 + ret = bind(server_fd, (struct sockaddr*)&ser_addr, sizeof(ser_addr));
  67 + if(ret < 0)
  68 + {
  69 + printf("socket bind fail!\n");
  70 + return -1;
  71 + }
  72 +
  73 +
  74 + char recvBuf[10000];
  75 + int recvBufSize;
  76 +
  77 + socklen_t len;
  78 + struct sockaddr_in clent_addr; //clent_addr用于记录发送方的地址信息
  79 + while(!m_bRtpExit)
  80 + {
  81 + memset(recvBuf, 0, sizeof(recvBuf));
  82 + len = sizeof(clent_addr);
  83 + recvBufSize = recvfrom(server_fd, recvBuf, sizeof(recvBuf), 0, (struct sockaddr*)&clent_addr, &len); //recvfrom是拥塞函数,没有数据就一直拥塞
  84 + if(recvBufSize <= 0) {
  85 + printf("recieve data fail!\n");
  86 + break;
  87 + }
  88 +
  89 + if ((mPlayer->bufferSize + recvBufSize - RTP_HEADER_SIZE) < MAX_RTP_BUFFER_SIZE) {
  90 + memcpy(mPlayer->buffer + mPlayer->bufferSize, recvBuf + RTP_HEADER_SIZE, recvBufSize - RTP_HEADER_SIZE);
  91 + mPlayer->bufferSize += recvBufSize - RTP_HEADER_SIZE;
  92 + } else {
  93 + LOG_ERROR("recvBufSize = {} over GB28181Player_buffer_max_size ", recvBufSize);
  94 + }
  95 + }
  96 +
  97 + close(server_fd);
  98 +
  99 + return 0;
  100 +}
  101 +
  102 +int RTPReceiver2::startTcpServer(const char* ip, uint16_t port) {
  103 +
  104 + int listenfd, connfd;
  105 + struct sockaddr_in servaddr;
  106 + char buff[4096];
  107 + int n;
  108 +
  109 + if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1 ){
  110 + printf("create socket error: %s(errno: %d)\n",strerror(errno),errno);
  111 + return 0;
  112 + }
  113 +
  114 + memset(&servaddr, 0, sizeof(servaddr));
  115 + servaddr.sin_family = AF_INET;
  116 + servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
  117 + servaddr.sin_port = htons(port);
  118 +
  119 + if( bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -1){
  120 + printf("bind socket error: %s(errno: %d)\n",strerror(errno),errno);
  121 + return 0;
  122 + }
  123 +
  124 + if( listen(listenfd, 10) == -1){
  125 + printf("listen socket error: %s(errno: %d)\n",strerror(errno),errno);
  126 + return 0;
  127 + }
  128 +
  129 +
  130 + char recvBuf[10000];
  131 + int recvBufSize = 0;
  132 +
  133 + while (!m_bRtpExit)
  134 + {
  135 + LOG_INFO("阻塞监听新连接...");
  136 + // 阻塞接收请求 start
  137 +
  138 + int clientFd = accept(listenfd, (struct sockaddr*)NULL, NULL);
  139 + if (clientFd < 0) {
  140 + LOG_ERROR("accept connection error");
  141 + continue;
  142 + }
  143 + // 阻塞接收请求 end
  144 + LOG_INFO("发现新连接:clientFd=%d", clientFd);
  145 +
  146 + while (!m_bRtpExit) {
  147 + recvBufSize = recv(clientFd, recvBuf, sizeof(recvBuf), 0);
  148 + if (recvBufSize <= 0) {
  149 + LOG_ERROR("::recv error: clientFd={},recvBufSize={}", clientFd, recvBufSize);
  150 + break;
  151 + }
  152 +
  153 + parseTcpData(recvBuf, recvBufSize);
  154 + }
  155 +
  156 + close(clientFd);
  157 + LOG_INFO("关闭连接 clientFd={}", clientFd);
  158 +
  159 + }
  160 +
  161 + close(listenfd);
  162 + return 0;
  163 +}
  164 +
  165 +void RTPReceiver2::parseTcpData(char* recvBuf, int recvBufSize) {
  166 +
  167 + if ((mRecvCacheSize + recvBufSize) > Server_cache_max_size) {
  168 + LOG_ERROR("超过缓冲容量上限,忽略本次读取的数据。mRecvCacheSize=%d,recvBufSize=%d",mRecvCacheSize, recvBufSize);
  169 +
  170 + }
  171 + else {
  172 + memcpy(mRecvCache + mRecvCacheSize, recvBuf, recvBufSize);
  173 + mRecvCacheSize += recvBufSize;
  174 + }
  175 + //LOGI("cacheSize=%d,开始进入解析 ... ...", cacheSize);
  176 +
  177 + while (true) {
  178 +
  179 + if (mRecvCacheSize > 2) {
  180 +
  181 + bool success = false;
  182 +
  183 + if (mRecvCacheSize > 2) {
  184 + mRecvRtpBufferSize = ntohs(*(int16_t*)(mRecvCache));
  185 + if ((mRecvCacheSize - 2) >= mRecvRtpBufferSize) {
  186 + success = true;
  187 + }
  188 + }
  189 +
  190 + if (success) {
  191 + mRecvCacheSize -= 2;
  192 + mRecvCacheSize -= mRecvRtpBufferSize;
  193 +
  194 + // 提取RTP
  195 + memcpy(mRecvRtpBuffer, mRecvCache + 2, mRecvRtpBufferSize);
  196 + memmove(mRecvCache, mRecvCache + 2 + mRecvRtpBufferSize, mRecvCacheSize);
  197 +
  198 + // RTP
  199 + RtpHeader rtpHeader;
  200 + parseRtpHeader(mRecvRtpBuffer, &rtpHeader);
  201 + printf("get a rtp seq=%d,RtpBufferSize=%d,mRecvCacheSize=%d,marker=%d,timestamp=%d\n",
  202 + rtpHeader.seq,
  203 + mRecvRtpBufferSize,
  204 + mRecvCacheSize,rtpHeader.marker, rtpHeader.timestamp);
  205 +
  206 +
  207 + // 将从mRecvCache提取出来的rtp字节流 mRecvRtpBuffer去掉RTP_HEADER_SIZE,存储到播放器缓存中
  208 + if ((mPlayer->bufferSize + mRecvRtpBufferSize - RTP_HEADER_SIZE) < MAX_RTP_BUFFER_SIZE) {
  209 + memcpy(mPlayer->buffer + mPlayer->bufferSize, mRecvRtpBuffer + RTP_HEADER_SIZE, mRecvRtpBufferSize - RTP_HEADER_SIZE);
  210 + mPlayer->bufferSize += mRecvRtpBufferSize - RTP_HEADER_SIZE;
  211 + }
  212 + else {
  213 + LOG_ERROR("recvBufSize = %d over MAX_RTP_BUFFER_SIZE ", recvBufSize);
  214 + }
  215 +
  216 + }
  217 + else {
  218 + //LOGI("跳出解析:cacheSize=%d,pktSize=%d", cacheSize, pktSize);
  219 + break;
  220 + }
  221 + }
  222 + else {
  223 + //LOGI("跳出解析:缓冲数据未发现完整数据包");
  224 + break;
  225 + }
  226 + }
  227 +}
  228 +
  229 +int RTPReceiver2::allocRtpPort() {
  230 +
  231 + WebsocketClient* pServer = WebsocketClient::getInstance();
  232 + int MIN_RTP_PORT = pServer->GetMinRtpPort() ;
  233 + int MAX_RTP_PORT = pServer->GetMaxRtpPort();
  234 +
  235 + int s_rtpPort = MIN_RTP_PORT;
  236 +
  237 + srand((unsigned int)time(NULL));
  238 + s_rtpPort = MIN_RTP_PORT + (rand() % MIN_RTP_PORT);
  239 +
  240 + if (s_rtpPort % 2)
  241 + ++s_rtpPort;
  242 +
  243 + int count = 0;
  244 +
  245 + while (true)
  246 + {
  247 + if (s_rtpPort >= MAX_RTP_PORT) {
  248 + s_rtpPort = MIN_RTP_PORT;
  249 + count ++;
  250 + if (count > 1) {
  251 + LOG_ERROR("[{}] - 范围内没有可用的port", m_SipChannelId);
  252 + }
  253 + }
  254 +
  255 + int i = 0;
  256 + for (; i < 2; i++) {
  257 + sockaddr_in sRecvAddr;
  258 + int s = socket(AF_INET, SOCK_DGRAM, 0);
  259 +
  260 + sRecvAddr.sin_family = AF_INET;
  261 + sRecvAddr.sin_addr.s_addr = htonl(INADDR_ANY);
  262 + sRecvAddr.sin_port = htons(s_rtpPort + i);
  263 +
  264 + int nResult = bind(s, (sockaddr *)&sRecvAddr, sizeof(sRecvAddr));
  265 + if (nResult != 0) {
  266 + break;
  267 + }
  268 +
  269 + nResult = close(s);
  270 + if (nResult != 0) {
  271 + LOG_ERROR("[{}] - closesocket failed : {}", m_SipChannelId, nResult);
  272 + break;
  273 + }
  274 + }
  275 +
  276 + if (i == 2)
  277 + break;
  278 +
  279 + s_rtpPort += 2;
  280 + }
  281 +
  282 + return s_rtpPort;
  283 +}
  284 +
  285 +void RTPReceiver2::RequestStreamFailed() {
  286 + m_bRtpExit = true;
  287 +}
0 288 \ No newline at end of file
... ...
src/decoder/gb28181/rtp/RTPReceiver2.h 0 → 100644
  1 +#ifndef _RTP_RECEIVER_H_
  2 +#define _RTP_RECEIVER_H_
  3 +
  4 +#include <stdint.h>
  5 +#include <atomic>
  6 +#include <thread>
  7 +
  8 +using namespace std;
  9 +
  10 +
  11 +class RTPReceiver2{
  12 +
  13 +public:
  14 + RTPReceiver2();
  15 + virtual ~RTPReceiver2();
  16 +
  17 + int init(const char* ip, uint16_t port, bool isUdp);
  18 +
  19 + void RequestStreamFailed();
  20 +
  21 + int allocRtpPort();
  22 +
  23 +private:
  24 + int startUdpServer(const char* ip, uint16_t port);
  25 + int startTcpServer(const char* ip, uint16_t port);
  26 +
  27 + void parseTcpData(char* recvBuf, int recvBufSize);
  28 +
  29 +public:
  30 + uint8_t* mRecvCache {nullptr};
  31 + uint64_t mRecvCacheSize {0};
  32 +
  33 + uint8_t* mRecvRtpBuffer {nullptr}; // 从mRecvCache提取出来的rtp字节流
  34 + int16_t mRecvRtpBufferSize {0};// 从mRecvCache提取出来的rtp字节流总长度 (rtpHeader+rtpBody)
  35 +
  36 + bool m_bRtpExit {false};
  37 +};
  38 +
  39 +#endif // _RTP_RECEIVER_H_
0 40 \ No newline at end of file
... ...
src/decoder/gb28181/rtp/RTPTcpReceiver.cpp
... ... @@ -110,8 +110,8 @@ RTPTcpReceiver::RTPTcpReceiver()
110 110 }
111 111  
112 112 RTPTcpReceiver::~RTPTcpReceiver(){
113   - if (IsOpened())
114   - Close();
  113 +
  114 + Close();
115 115  
116 116 if(m_rtpSessionPtr != nullptr){
117 117 delete m_rtpSessionPtr;
... ... @@ -149,7 +149,7 @@ bool RTPTcpReceiver::Open(string channel_id){
149 149 }
150 150  
151 151 bool RTPTcpReceiver::IsOpened(){
152   - LOG_INFO("[{}] isopng:{} ", m_SipChannelId, m_bOpened);
  152 + LOG_INFO("[{}] isopen:{} ", m_SipChannelId, m_bOpened);
153 153 return m_bOpened;
154 154 }
155 155  
... ... @@ -176,8 +176,7 @@ void RTPTcpReceiver::close_task(){
176 176 LOG_DEBUG("[{}] 1.", m_SipChannelId);
177 177  
178 178 // rtp接收线程退出
179   - if (m_rtpThread.joinable())
180   - {
  179 + if (m_rtpThread.joinable()) {
181 180 m_rtpThread.join();
182 181 }
183 182  
... ... @@ -239,7 +238,9 @@ int RTPTcpReceiver::initSession(int localPort){
239 238 m_rtpThread = std::thread(rtp_revc_thread_, this);
240 239 m_listenFinishThread = std::thread(listen_finish_thread_, this);
241 240  
242   - InitPS();
  241 + if (InitPS() != 0) {
  242 + return false;
  243 + }
243 244  
244 245 bool bRet = RequestStream();
245 246 if (!bRet)
... ... @@ -253,6 +254,7 @@ int RTPTcpReceiver::initSession(int localPort){
253 254 return 0;
254 255 }
255 256  
  257 +// 图灵版本的请求
256 258 int RTPTcpReceiver::OnRtpRecv()
257 259 {
258 260 if(nullptr == m_rtpSessionPtr){
... ... @@ -360,6 +362,114 @@ end_flag:
360 362 return 0;
361 363 }
362 364  
  365 +
  366 +int RTPTcpReceiver::OnRtpRecv2()
  367 +{
  368 + if(nullptr == m_rtpSessionPtr){
  369 + return -1;
  370 + }
  371 +
  372 + LOG_INFO("[{}] OnRtpRecv started, m_nListener : {}", m_SipChannelId, m_nListener);
  373 +
  374 + sockaddr_in clientAddr;
  375 + int nLen = sizeof(sockaddr_in);
  376 + SocketType nServer = -1;
  377 +
  378 + LOG_INFO("[{}] Poll started.", m_SipChannelId);
  379 + int reconn_times = 0;
  380 + int reaccept_times = 0;
  381 + bool bReconn = false;
  382 + while(!m_bRtpExit){
  383 + while(!m_bAccepted){
  384 + if(m_bRtpExit){
  385 + goto end_flag;
  386 + }
  387 +
  388 + while (!bReconn){
  389 + if(m_bRtpExit){
  390 + goto end_flag;
  391 + }
  392 +
  393 + reconn_times++;
  394 + if(reconn_times > 10){
  395 + // 10次请求都失败,结束任务
  396 + m_bRtpExit = true;
  397 + goto end_flag;
  398 + }
  399 + LOG_DEBUG("[{}] RequestStream...", m_SipChannelId);
  400 + bReconn = RequestStream();
  401 + if (bReconn){
  402 + LOG_DEBUG("[{}] RequestStream, True", m_SipChannelId);
  403 + continue;
  404 + }
  405 + LOG_DEBUG("[{}] RequestStream, False", m_SipChannelId);
  406 +
  407 + std::this_thread::sleep_for(std::chrono::seconds(5));
  408 + }
  409 +
  410 + LOG_DEBUG("[{}] accepting...", m_SipChannelId);
  411 + nServer = accept(m_nListener, (sockaddr*)&clientAddr, (socklen_t * ) &nLen);
  412 + if (-1 == nServer){
  413 + reaccept_times++;
  414 + LOG_DEBUG("[{}] reaccept_times = {}", m_SipChannelId, reaccept_times);
  415 + if(reaccept_times > 600){
  416 + LOG_DEBUG("[{}] reaccept_times > 600", m_SipChannelId);
  417 + bReconn = false;
  418 + reaccept_times = 0;
  419 + }
  420 + std::this_thread::sleep_for(std::chrono::milliseconds(100));
  421 + continue;
  422 + }
  423 + LOG_DEBUG("[{}] accept success", m_SipChannelId);
  424 + m_rtpSessionPtr->AddDestination(RTPTCPAddress(nServer));
  425 + m_bAccepted = true;
  426 + bReconn = false;
  427 + reconn_times = 0;
  428 + reaccept_times = 0;
  429 +
  430 + LOG_INFO("[{}] nServer={}", m_SipChannelId, nServer);
  431 + break;
  432 + }
  433 +
  434 + m_rtpSessionPtr->BeginDataAccess();
  435 + if (m_rtpSessionPtr->GotoFirstSourceWithData())
  436 + {
  437 + do
  438 + {
  439 + RTPPacket *pack;
  440 +
  441 + while ((pack = m_rtpSessionPtr->GetNextPacket()) != NULL)
  442 + {
  443 + // LOG_DEBUG("[{}] time: {} ", m_SipChannelId, UtilTools::get_cur_time_ms());
  444 + ParsePacket(pack);
  445 +
  446 + m_rtpSessionPtr->DeletePacket(pack);
  447 + }
  448 + } while (m_rtpSessionPtr->GotoNextSourceWithData());
  449 + }
  450 +
  451 + m_rtpSessionPtr->EndDataAccess();
  452 +
  453 + m_rtpSessionPtr->Poll();
  454 + std::this_thread::sleep_for(std::chrono::milliseconds(10));
  455 + }
  456 +
  457 +end_flag:
  458 +
  459 + m_rtpSessionPtr->Destroy();
  460 +
  461 + if(nServer > 0){
  462 + close(nServer);
  463 + }
  464 + if(m_nListener > 0){
  465 + close(m_nListener);
  466 + }
  467 +
  468 + LOG_INFO("[{}] OnRtpRecv exited.", m_SipChannelId);
  469 +
  470 + return 0;
  471 +}
  472 +
363 473 int RTPTcpReceiver::ListenFinish(){
364 474 while(!m_bRtpExit){
365 475 std::this_thread::sleep_for(std::chrono::seconds(3));
... ...
src/decoder/gb28181/rtp/RTPTcpReceiver.h
... ... @@ -47,6 +47,7 @@ public:
47 47  
48 48 public:
49 49 int OnRtpRecv();
  50 + int OnRtpRecv2();
50 51 bool ReConnect();
51 52 int ListenFinish();
52 53 bool isClosing();
... ... @@ -72,6 +73,7 @@ private:
72 73  
73 74 std::thread m_listenFinishThread; // RTP接收线程
74 75  
  76 + std::atomic_bool m_bNoData{false};
75 77 };
76 78  
77 79 #endif // _RTP_TCP_RECEIVER_H_
... ...
src/decoder/gb28181/rtp/Rtp.cpp 0 → 100644
  1 +//
  2 +// Created by bxc on 2023/4/18.
  3 +// 作者:北小菜
  4 +// 邮箱:bilibili_bxc@126.com
  5 +// 西瓜视频主页:https://www.ixigua.com/home/4171970536803763
  6 +// 哔哩哔哩主页:https://space.bilibili.com/487906612/
  7 +//
  8 +
  9 +#include "Rtp.h"
  10 +#include <stdio.h>
  11 +#include <string.h>
  12 +
  13 +void rtpHeaderInit(struct RtpPacket* rtpPacket, uint8_t csrcLen, uint8_t extension,
  14 + uint8_t padding, uint8_t version, uint8_t payloadType, uint8_t marker,
  15 + uint16_t seq, uint32_t timestamp, uint32_t ssrc){
  16 + rtpPacket->rtpHeader.csrcLen = csrcLen;
  17 + rtpPacket->rtpHeader.extension = extension;
  18 + rtpPacket->rtpHeader.padding = padding;
  19 + rtpPacket->rtpHeader.version = version;
  20 + rtpPacket->rtpHeader.payloadType = payloadType;
  21 + rtpPacket->rtpHeader.marker = marker;
  22 + rtpPacket->rtpHeader.seq = seq;
  23 + rtpPacket->rtpHeader.timestamp = timestamp;
  24 + rtpPacket->rtpHeader.ssrc = ssrc;
  25 +}
  26 +int parseRtpHeader(uint8_t* headerBuf, struct RtpHeader* rtpHeader){
  27 + memset(rtpHeader,0,sizeof(*rtpHeader));
  28 + /* byte 0 */
  29 + rtpHeader->version = (headerBuf[0] & 0xC0) >> 6;
  30 + rtpHeader->padding = (headerBuf[0] & 0x20) >> 5;
  31 + rtpHeader->extension = (headerBuf[0] & 0x10) >> 4;
  32 + rtpHeader->csrcLen = (headerBuf[0] & 0x0F);
  33 + /* byte 1 */
  34 + rtpHeader->marker = (headerBuf[1] & 0x80) >> 7;
  35 + rtpHeader->payloadType = (headerBuf[1] & 0x7F);
  36 + /* bytes 2,3 */
  37 + rtpHeader->seq = ((headerBuf[2] & 0xFF) << 8) | (headerBuf[3] & 0xFF);
  38 + /* bytes 4-7 */
  39 + rtpHeader->timestamp = ((headerBuf[4] & 0xFF) << 24) | ((headerBuf[5] & 0xFF) << 16)
  40 + | ((headerBuf[6] & 0xFF) << 8)
  41 + | ((headerBuf[7] & 0xFF));
  42 + /* bytes 8-11 */
  43 + rtpHeader->ssrc = ((headerBuf[8] & 0xFF) << 24) | ((headerBuf[9] & 0xFF) << 16)
  44 + | ((headerBuf[10] & 0xFF) << 8)
  45 + | ((headerBuf[11] & 0xFF));
  46 +
  47 + return 0;
  48 +}
  49 +
... ...
src/decoder/gb28181/rtp/Rtp.h 0 → 100644
  1 +//
  2 +// Created by bxc on 2023/4/18.
  3 +// 作者:北小菜
  4 +// 邮箱:bilibili_bxc@126.com
  5 +// 西瓜视频主页:https://www.ixigua.com/home/4171970536803763
  6 +// 哔哩哔哩主页:https://space.bilibili.com/487906612/
  7 +//
  8 +
  9 +#ifndef GB28181PLAYER_RTP_H
  10 +#define GB28181PLAYER_RTP_H
  11 +
  12 +#include <stdint.h>
  13 +
  14 +#define RTP_VESION 2
  15 +#define RTP_PAYLOAD_TYPE_H264 96
  16 +#define RTP_PAYLOAD_TYPE_AAC 97
  17 +
  18 +#define RTP_HEADER_SIZE 12
  19 +#define RTP_MAX_SIZE 1400
  20 +
  21 +/*
  22 + *
  23 + * 0 1 2 3
  24 + * 7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0
  25 + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  26 + * |V=2|P|X| CC |M| PT | sequence number |
  27 + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  28 + * | timestamp |
  29 + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  30 + * | synchronization source (SSRC) identifier |
  31 + * +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
  32 + * | contributing source (CSRC) identifiers |
  33 + * : .... :
  34 + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  35 + *
  36 + */
  37 +struct RtpHeader{
  38 + /* byte 0 */
  39 + uint8_t csrcLen:4;
  40 + uint8_t extension:1;
  41 + uint8_t padding:1;
  42 + uint8_t version:2; // 最高2位
  43 +
  44 + /* byte 1 */
  45 + uint8_t payloadType:7;
  46 + uint8_t marker:1;
  47 +
  48 + /* bytes 2,3 */
  49 + uint16_t seq;
  50 +
  51 + /* bytes 4-7 */
  52 + uint32_t timestamp;
  53 +
  54 + /* bytes 8-11 */
  55 + uint32_t ssrc;
  56 +};
  57 +struct RtpPacket{
  58 + struct RtpHeader rtpHeader;
  59 + uint8_t payload[0];
  60 +};
  61 +
  62 +void rtpHeaderInit(struct RtpPacket* rtpPacket, uint8_t csrcLen, uint8_t extension,
  63 + uint8_t padding, uint8_t version, uint8_t payloadType, uint8_t marker,
  64 + uint16_t seq, uint32_t timestamp, uint32_t ssrc);
  65 +
  66 +int parseRtpHeader(uint8_t* headerBuf, struct RtpHeader* rtpHeader);
  67 +
  68 +#endif //GB28181PLAYER_RTP_H
  69 +
... ...
src/demo/demo.cpp
... ... @@ -917,7 +917,7 @@ string createTask(void *handle, std::vector&lt;algorithm_type_t&gt; algor_vec, int gi,
917 917 tparam.ipc_url = "rtsp://admin:admin@123456@192.168.60.176:554/cam/realmonitor?channel=1&subtype=0";
918 918 break;
919 919 case 1:
920   - tparam.ipc_url = "rtsp://122.97.218.170:8604/openUrl/V5nXRHa?params=eyJwcm90b2NhbCI6InJ0c3AiLCJjbGllbnRUeXBlIjoib3Blbl9hcGkiLCJleHByaWVUaW1lIjotMSwicHJvdG9jb2wiOiJydHNwIiwiZXhwaXJlVGltZSI6MzAwLCJlbmFibGVNR0MiOnRydWUsImV4cGFuZCI6InN0YW5kYXJkPXJ0c3Amc3RyZWFtZm9ybT1ydHAiLCJhIjoiMTBjZjM4N2JjY2Y5NDg3YzhjNWYzNjE2M2ViMWUyNTJ8MXwwfDEiLCJ0IjoxfQ==";
  920 + tparam.ipc_url = "rtsp://admin:ad123456@192.168.60.165:554/cam/realmonitor?channel=1&subtype=0";
921 921 break;
922 922 case 2:
923 923 tparam.ipc_url = "rtsp://admin:ad123456@192.168.10.166:554/cam/realmonitor?channel=1&subtype=0";
... ... @@ -1001,6 +1001,12 @@ string createTask(void *handle, std::vector&lt;algorithm_type_t&gt; algor_vec, int gi,
1001 1001 case 27:
1002 1002 tparam.ipc_url = "/data/share/data/Street_4k_265.mp4";
1003 1003 break;
  1004 + case 28:
  1005 + tparam.ipc_url = "http://111.200.42.56:8889/gajlqt.mp4";
  1006 + break;
  1007 + case 29:
  1008 + tparam.ipc_url = "http://192.168.60.179:10016/110.mp4";
  1009 + break;
1004 1010 default:
1005 1011 tparam.ipc_url = "/opt/share/data/Street.uvf";
1006 1012 break;
... ... @@ -1071,7 +1077,7 @@ string createTask_dvpp28181(void *handle, std::vector&lt;algorithm_type_t&gt; algor_ve
1071 1077 tparam.ipc_url = "34020000001310004065";
1072 1078 break;
1073 1079 case 1:
1074   - tparam.ipc_url = "34020000001310000001";
  1080 + tparam.ipc_url = "34020000001320000109";
1075 1081 break;
1076 1082 case 2:
1077 1083 tparam.ipc_url = "34020000001320000166";
... ... @@ -1147,6 +1153,90 @@ string createTask_dvpp28181(void *handle, std::vector&lt;algorithm_type_t&gt; algor_ve
1147 1153 return task_id_str;
1148 1154 }
1149 1155  
  1156 +string createTask_dvpp28181_tcp(void *handle, std::vector<algorithm_type_t> algor_vec, int gi, bool bFlag = true){
  1157 + task_param tparam;
  1158 +
  1159 + switch(gi){
  1160 + case 8:
  1161 + tparam.ipc_url = "34020000001310004065";
  1162 + break;
  1163 + case 9:
  1164 + tparam.ipc_url = "34020000001320000109";
  1165 + break;
  1166 + case 10:
  1167 + tparam.ipc_url = "34020000001320000166";
  1168 + break;
  1169 + case 11:
  1170 + tparam.ipc_url = "32120200002160000077";
  1171 + break;
  1172 + case 12:
  1173 + tparam.ipc_url = "34020000001320000207";
  1174 + break;
  1175 + case 13:
  1176 + tparam.ipc_url = "34020000001310000176";
  1177 + break;
  1178 + default:
  1179 + tparam.ipc_url = "34020000001310004065";
  1180 + break;
  1181 + }
  1182 +
  1183 + tparam.algor_counts = algor_vec.size();
  1184 + tparam.dec_type = 3;
  1185 + tparam.protocal = 1;
  1186 +
  1187 + if (bFlag){
  1188 + nTaskId = gi;
  1189 + }
  1190 +
  1191 + std::string task_id_str = "test_task_id_" + std::to_string(nTaskId);
  1192 + tparam.task_id = task_id_str.c_str();
  1193 +
  1194 + nTaskId++;
  1195 +
  1196 + tparam.algor_config_params = new algor_config_param[tparam.algor_counts];
  1197 +
  1198 + for (size_t idx = 0; idx < algor_vec.size(); ++idx)
  1199 + set_task_params(tparam, idx, algor_vec.at(idx));
  1200 +
  1201 + const int result_code = add_task(handle, tparam);
  1202 + if (result_code != 0)
  1203 + printf("[Error]: ");
  1204 + printf("--- task_id: %s result code: %d\n", tparam.task_id, result_code);
  1205 +
  1206 +
  1207 + // 释放参数
  1208 + for (size_t idx = 0; idx < algor_vec.size(); ++idx) {
  1209 + if(tparam.algor_config_params[idx].algor_type == algorithm_type_t::VIDEO_TIMING_SNAPSHOT) {
  1210 + algor_config_param_road_work* algor_param = (algor_config_param_road_work*)tparam.algor_config_params[idx].algor_init_config_param->algor_param;
  1211 + delete algor_param;
  1212 + algor_basic_config_param_t* basic_param = (algor_basic_config_param_t*)tparam.algor_config_params[idx].algor_init_config_param->basic_param;
  1213 + delete basic_param;
  1214 +
  1215 + algor_init_config_param_t* config_param = tparam.algor_config_params[idx].algor_init_config_param;
  1216 + delete config_param;
  1217 + } else if(tparam.algor_config_params[idx].algor_type == algorithm_type_t::VEHICLE_SOLIDLINETURNAROUND) {
  1218 + algor_config_param_manned_incident* algor_param = (algor_config_param_manned_incident*)tparam.algor_config_params[idx].algor_init_config_param->algor_param;
  1219 + delete algor_param;
  1220 + algor_basic_config_param_t* basic_param = (algor_basic_config_param_t*)tparam.algor_config_params[idx].algor_init_config_param->basic_param;
  1221 + delete basic_param;
  1222 +
  1223 + algor_init_config_param_t* config_param = tparam.algor_config_params[idx].algor_init_config_param;
  1224 + delete config_param;
  1225 + } else if(tparam.algor_config_params[idx].algor_type == algorithm_type_t::VEHICLE_SOLIDLINETURNAROUND) {
  1226 + algor_config_param_manned_incident* algor_param = (algor_config_param_manned_incident*)tparam.algor_config_params[idx].algor_init_config_param->algor_param;
  1227 + delete algor_param;
  1228 + algor_basic_config_param_t* basic_param = (algor_basic_config_param_t*)tparam.algor_config_params[idx].algor_init_config_param->basic_param;
  1229 + delete basic_param;
  1230 +
  1231 + algor_init_config_param_t* config_param = tparam.algor_config_params[idx].algor_init_config_param;
  1232 + delete config_param;
  1233 + }
  1234 + }
  1235 + delete[] tparam.algor_config_params;
  1236 +
  1237 + return task_id_str;
  1238 +}
  1239 +
1150 1240 void test_snapshot(void *handle){
1151 1241 task_param tparam;
1152 1242 tparam.ipc_url = "rtsp://admin:ad123456@192.168.60.165:554/cam/realmonitor?channel=1&subtype=0";
... ... @@ -1328,36 +1418,64 @@ init_mq_conn(handle);
1328 1418  
1329 1419  
1330 1420  
1331   -char ch = 'a';
1332   -while (ch != 'q') {
1333   - ch = getchar();
1334   - switch (ch)
  1421 +char ch[10] = "a";
  1422 +while (strcmp(ch,"q") != 0) {
  1423 + fgets(ch,10,stdin);
  1424 + int num = atoi(ch);
  1425 + switch (num)
1335 1426 {
1336   - case '0':
  1427 + case 0:
1337 1428 createTask_dvpp28181(handle, algor_vec, 0, false);
1338 1429 break;
1339   - case '1':
  1430 + case 1:
1340 1431 createTask_dvpp28181(handle, algor_vec, 1, false);
1341 1432 break;
1342   - case '2':
  1433 + case 2:
1343 1434 createTask_dvpp28181(handle, algor_vec, 2, false);
1344 1435 break;
1345   - case '3':
  1436 + case 3:
1346 1437 createTask_dvpp28181(handle, algor_vec, 3, false);
1347 1438 break;
1348   - case '4':
  1439 + case 4:
1349 1440 createTask_dvpp28181(handle, algor_vec, 4, false);
1350 1441 break;
1351   - case '5':
  1442 + case 5:
1352 1443 createTask_dvpp28181(handle, algor_vec, 5, false);
1353 1444 break;
1354   - case '6':
  1445 + case 6:
1355 1446 createTask(handle, algor_vec2, 2, false);
1356 1447 break;
1357   - case '7':
  1448 + case 7:
1358 1449 createTask(handle, algor_vec2, 0, false);
1359 1450 break;
1360   - case 'c':
  1451 + case 8:
  1452 + createTask_dvpp28181_tcp(handle, algor_vec, 8, false);
  1453 + break;
  1454 + case 9:
  1455 + createTask_dvpp28181_tcp(handle, algor_vec, 9, false);
  1456 + break;
  1457 + case 10:
  1458 + createTask_dvpp28181_tcp(handle, algor_vec, 10, false);
  1459 + break;
  1460 + case 11:
  1461 + createTask_dvpp28181_tcp(handle, algor_vec, 11, false);
  1462 + break;
  1463 + case 12:
  1464 + createTask_dvpp28181_tcp(handle, algor_vec, 12, false);
  1465 + break;
  1466 + case 13:
  1467 + createTask_dvpp28181_tcp(handle, algor_vec, 13, false);
  1468 + break;
  1469 + case 14:
  1470 + createTask(handle, algor_vec2, 28, false);
  1471 + break;
  1472 + case 15:
  1473 + createTask(handle, algor_vec2, 1, false);
  1474 + break;
  1475 + case 16:
  1476 + createTask(handle, algor_vec2, 29, false);
  1477 + break;
  1478 + case 100:
1361 1479 close_all_task(handle);
1362 1480 break;
1363 1481 default:
... ...