From 150d457dc81cd2a68789cc8492968227ddeeb49c Mon Sep 17 00:00:00 2001 From: Hu Chunming <2657262686@qq.com> Date: Mon, 22 Jul 2024 17:11:55 +0800 Subject: [PATCH] 代码暂存,未完成 --- .vscode/settings.json | 4 +++- src/decoder/gb28181/rtp/FFRtpParser.cpp | 127 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/decoder/gb28181/rtp/FFRtpParser.h | 46 ++++++++++++++++++++++++++++++++++++++++++++++ src/decoder/gb28181/rtp/RTPReceiver.cpp | 2 +- src/decoder/gb28181/rtp/RTPReceiver2.cpp | 287 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/decoder/gb28181/rtp/RTPReceiver2.h | 39 +++++++++++++++++++++++++++++++++++++++ src/decoder/gb28181/rtp/RTPTcpReceiver.cpp | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------ src/decoder/gb28181/rtp/RTPTcpReceiver.h | 2 ++ src/decoder/gb28181/rtp/Rtp.cpp | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ src/decoder/gb28181/rtp/Rtp.h | 69 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/demo/demo.cpp | 148 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------- 11 files changed, 872 insertions(+), 23 deletions(-) create mode 100644 src/decoder/gb28181/rtp/FFRtpParser.cpp create mode 100644 src/decoder/gb28181/rtp/FFRtpParser.h create mode 100644 src/decoder/gb28181/rtp/RTPReceiver2.cpp create mode 100644 src/decoder/gb28181/rtp/RTPReceiver2.h create mode 100644 src/decoder/gb28181/rtp/Rtp.cpp create mode 100644 src/decoder/gb28181/rtp/Rtp.h diff --git a/.vscode/settings.json b/.vscode/settings.json index 8cd21ac..e741e76 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -79,7 +79,9 @@ "forward_list": "cpp", "source_location": "cpp", "slist": "cpp", - "valarray": "cpp" + "valarray": "cpp", + "hash_map": "cpp", + "hash_set": "cpp" }, "C_Cpp_Runner.cCompilerPath": "gcc", "C_Cpp_Runner.cppCompilerPath": "g++", diff --git a/src/decoder/gb28181/rtp/FFRtpParser.cpp b/src/decoder/gb28181/rtp/FFRtpParser.cpp new file mode 100644 index 0000000..694e511 --- /dev/null +++ b/src/decoder/gb28181/rtp/FFRtpParser.cpp @@ -0,0 +1,127 @@ +// +// Created by bxc on 2023/4/18. +// 作者:北小菜 +// 邮箱:bilibili_bxc@126.com +// 西瓜视频主页:https://www.ixigua.com/home/4171970536803763 +// 哔哩哔哩主页:https://space.bilibili.com/487906612/ +// + +#include "FFRtpParser.h" +#include "Utils.h" +#include + +int avio_read_packet(void* opaque, uint8_t* buf, int buffsize){ + FFRtpParser* player = (FFRtpParser*)opaque; + + int ret = 0; + if (player->bufferSize >= buffsize) + { + memcpy(buf, player->buffer, buffsize); + player->bufferSize = player->bufferSize - buffsize; + memmove(player->buffer, player->buffer + buffsize, player->bufferSize); + ret = buffsize; + + LOG_INFO("avio_read_packet=%d", buffsize); + } + return ret; +} + +FFRtpParser::FFRtpParser() +{ +} + +FFRtpParser::~FFRtpParser() +{ + if (mVideoCodecPar) { + avcodec_parameters_free(&mVideoCodecPar); + } + if (mVideoCodecCtx) { + avcodec_close(mVideoCodecCtx); + mVideoCodecCtx = nullptr; + } + + if (mFmtCtx) { + avformat_close_input(&mFmtCtx); + mFmtCtx = nullptr; + } +} + +bool FFRtpParser::probe() +{ + mFmtCtx = avformat_alloc_context(); + + unsigned char* avioBuff = (unsigned char*)av_malloc(1920 * 1080); + mAvioCtx = avio_alloc_context(avioBuff, sizeof(avioBuff), 0, this, avio_read_packet, NULL, NULL); + //探测流(获取码流格式) + if (av_probe_input_buffer2(mAvioCtx, (const AVInputFormat**)&mInputFmt, "", NULL, 0, 0) < 0){ + LOG_ERROR("av_probe_input_buffer2 error"); + return false; + } + mFmtCtx->pb = mAvioCtx; + + //配置流参数 + //av_dict_set(&options, "fflags", "nobuffer", 0); //不缓存直接解码 + + //打开流 + if (avformat_open_input(&mFmtCtx, "", mInputFmt, &net_options) != 0) + { + LOG_ERROR("avformat_open_input error"); + return false; + } + //获取流信息 + if (avformat_find_stream_info(mFmtCtx, NULL) < 0)//? + { + LOG_ERROR("avformat_find_stream_info error"); + return false; + } + //获取视频流 + mVideoStream = av_find_best_stream(mFmtCtx, AVMEDIA_TYPE_VIDEO, -1, -1, NULL, 0); + if (mVideoStream < 0) + { + LOG_ERROR("av_find_best_stream error"); + return false; + } + //获取解码信息 + mVideoCodecPar = mFmtCtx->streams[mVideoStream]->codecpar; + const AVCodec* videoCodec = avcodec_find_decoder(mVideoCodecPar->codec_id); + if (!videoCodec){ + LOG_ERROR("avcodec_find_decoder error"); + return false; + } + mVideoCodecCtx = avcodec_alloc_context3(videoCodec); + + //codecpar为解码器上下文赋值 + if (avcodec_parameters_to_context(mVideoCodecCtx, mVideoCodecPar) != 0) + { + LOG_ERROR("avcodec_parameters_to_context error"); + return false; + } + + //设置解码器参数 + //av_dict_set(&codec_options, "tune", "zero-latency", 0);//设置零延迟 + //av_dict_set(&codec_options, "preset", "ultrafast", 0);//设置最模糊但是最快的解码方式 + //av_dict_set(&codec_options, "x265-params", "qp=20", 0);//设置265量化参数 + //量化参数:控制了视频帧中每一个宏区块(Macroblock)的压缩量。较大的数值,量化值更高,意味着更多的压缩,更低的质量,较小的数值代表相反的含义。 + + //打开解码器 + if (avcodec_open2(mVideoCodecCtx, videoCodec, &codec_options) < 0) + { + LOG_ERROR("avcodec_open2 error"); + return false; + } + LOG_INFO("mVideoCodecCtx->width=%d,mVideoCodecCtx->height=%d", mVideoCodecCtx->width, mVideoCodecCtx->height); + return true; +} + +void FFRtpParser::play(){ + LOG_INFO("start"); + + AVPacket pkt; + while (av_read_frame(mFmtCtx, &pkt) >= 0) { + if (pkt.stream_index == mVideoStream){ + + } + av_packet_unref(&pkt); + } + LOG_INFO("end"); +} \ No newline at end of file diff --git a/src/decoder/gb28181/rtp/FFRtpParser.h b/src/decoder/gb28181/rtp/FFRtpParser.h new file mode 100644 index 0000000..afa27b9 --- /dev/null +++ b/src/decoder/gb28181/rtp/FFRtpParser.h @@ -0,0 +1,46 @@ +// +// Created by bxc on 2023/4/18. +// 作者:北小菜 +// 邮箱:bilibili_bxc@126.com +// 西瓜视频主页:https://www.ixigua.com/home/4171970536803763 +// 哔哩哔哩主页:https://space.bilibili.com/487906612/ +// + +#ifndef GB28181_RTP_FFRTPPARSER_H +#define GB28181_RTP_FFRTPPARSER_H + +#include + +extern "C" +{ + #include + #include + #include +} + +#define RtpParser_buffer_max_size 4194304 // 4M = 4 * 1024 * 1024 = 4194304 字节 + +class FFRtpParser +{ +public: + FFRtpParser(); + ~FFRtpParser(); +public: + bool probe();//阻塞式探测国标流并获取解码参数 + void play();//在探测国标流成功以后,解码并渲染国标视频流 +public: + std::atomic buffer[RtpParser_buffer_max_size]; + std::atomic_int bufferSize {0}; +private: + AVFormatContext * mFmtCtx; + AVIOContext * mAvioCtx; + const AVInputFormat* mInputFmt; + int mVideoStream = -1; + AVCodecParameters * mVideoCodecPar; + AVCodecContext * mVideoCodecCtx; + + AVDictionary* net_options;//网络连接参数 + AVDictionary* codec_options;//编码参数 + +}; +#endif //GB28181_RTP_FFRTPPARSER_H \ No newline at end of file diff --git a/src/decoder/gb28181/rtp/RTPReceiver.cpp b/src/decoder/gb28181/rtp/RTPReceiver.cpp index 9f023af..ea00b6d 100644 --- a/src/decoder/gb28181/rtp/RTPReceiver.cpp +++ b/src/decoder/gb28181/rtp/RTPReceiver.cpp @@ -93,7 +93,7 @@ void RTPReceiver::ClosePsThread(){ LOG_INFO("[{}] 3.", m_SipChannelId); m_bPsExit = true; // PS解包线程退出 - if (m_psThreadPtr->joinable()) + if (m_psThreadPtr && m_psThreadPtr->joinable()) { m_psThreadPtr->join(); delete m_psThreadPtr; diff --git a/src/decoder/gb28181/rtp/RTPReceiver2.cpp b/src/decoder/gb28181/rtp/RTPReceiver2.cpp new file mode 100644 index 0000000..dbe9a87 --- /dev/null +++ b/src/decoder/gb28181/rtp/RTPReceiver2.cpp @@ -0,0 +1,287 @@ +#include "RTPReceiver2.h" +#include "rtppacket.h" +#include + +#include "../common_header.h" +#include "../websocket/WebsocketClient.h" + +#ifdef __linux__ +#include "arpa/inet.h" +#endif + +#include "Rtp.h" + +const int MAX_RTP_BUFFER_SIZE = 1024*1024*10; + +#define Server_cache_max_size 4194304 // 1M = 1 * 1024 * 1024 = 1048576 字节 +#define Server_rtp_max_size 1800 + + +RTPReceiver2::RTPReceiver2() +{ + mRecvCache = (uint8_t*)malloc(Server_cache_max_size); + mRecvRtpBuffer = (uint8_t*)malloc(Server_rtp_max_size); +} + +RTPReceiver2::~RTPReceiver2(){ + if (mRecvCache) { + free(mRecvCache); + mRecvCache = nullptr; + } + + if (mRecvRtpBuffer) { + free(mRecvRtpBuffer); + mRecvRtpBuffer = nullptr; + } +} + +int RTPReceiver2::init(const char* ip, uint16_t port, bool isUdp) { + if (!isUdp) { + LOG_INFO("tcp://%s:%d", ip, port); + startTcpServer(ip, port); + } + else { + LOG_INFO("udp://%s:%d", ip, port); + startUdpServer(ip, port); + } +} + +int RTPReceiver2::startUdpServer(const char* ip, uint16_t port) { + + int server_fd, ret; + struct sockaddr_in ser_addr; + + server_fd = socket(AF_INET, SOCK_DGRAM, 0); //AF_INET:IPV4;SOCK_DGRAM:UDP + if(server_fd < 0) + { + printf("create socket fail!\n"); + return -1; + } + + memset(&ser_addr, 0, sizeof(ser_addr)); + ser_addr.sin_family = AF_INET; + ser_addr.sin_addr.s_addr = htonl(INADDR_ANY); //IP地址,需要进行网络序转换,INADDR_ANY:本地地址 + ser_addr.sin_port = htons(port); //端口号,需要网络序转换 + + ret = bind(server_fd, (struct sockaddr*)&ser_addr, sizeof(ser_addr)); + if(ret < 0) + { + printf("socket bind fail!\n"); + return -1; + } + + + char recvBuf[10000]; + int recvBufSize; + + socklen_t len; + struct sockaddr_in clent_addr; //clent_addr用于记录发送方的地址信息 + while(!m_bRtpExit) + { + memset(recvBuf, 0, sizeof(recvBuf)); + len = sizeof(clent_addr); + recvBufSize = recvfrom(server_fd, recvBuf, sizeof(recvBuf), 0, (struct sockaddr*)&clent_addr, &len); //recvfrom是拥塞函数,没有数据就一直拥塞 + if(recvBufSize <= 0) { + printf("recieve data fail!\n"); + break; + } + + if ((mPlayer->bufferSize + recvBufSize - RTP_HEADER_SIZE) < MAX_RTP_BUFFER_SIZE) { + memcpy(mPlayer->buffer + mPlayer->bufferSize, recvBuf + RTP_HEADER_SIZE, recvBufSize - RTP_HEADER_SIZE); + mPlayer->bufferSize += recvBufSize - RTP_HEADER_SIZE; + } else { + LOG_ERROR("recvBufSize = {} over GB28181Player_buffer_max_size ", recvBufSize); + } + } + + close(server_fd); + + return 0; +} + +int RTPReceiver2::startTcpServer(const char* ip, uint16_t port) { + + int listenfd, connfd; + struct sockaddr_in servaddr; + char buff[4096]; + int n; + + if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1 ){ + printf("create socket error: %s(errno: %d)\n",strerror(errno),errno); + return 0; + } + + memset(&servaddr, 0, sizeof(servaddr)); + servaddr.sin_family = AF_INET; + servaddr.sin_addr.s_addr = htonl(INADDR_ANY); + servaddr.sin_port = htons(port); + + if( bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -1){ + printf("bind socket error: %s(errno: %d)\n",strerror(errno),errno); + return 0; + } + + if( listen(listenfd, 10) == -1){ + printf("listen socket error: %s(errno: %d)\n",strerror(errno),errno); + return 0; + } + + + char recvBuf[10000]; + int recvBufSize = 0; + + while (!m_bRtpExit) + { + LOG_INFO("阻塞监听新连接..."); + // 阻塞接收请求 start + + int clientFd = accept(listenfd, (struct sockaddr*)NULL, NULL); + if (clientFd < 0) { + LOG_ERROR("accept connection error"); + continue; + } + // 阻塞接收请求 end + LOG_INFO("发现新连接:clientFd=%d", clientFd); + + while (!m_bRtpExit) { + recvBufSize = recv(clientFd, recvBuf, sizeof(recvBuf), 0); + if (recvBufSize <= 0) { + LOG_ERROR("::recv error: clientFd={},recvBufSize={}", clientFd, recvBufSize); + break; + } + + parseTcpData(recvBuf, recvBufSize); + } + + close(clientFd); + LOG_INFO("关闭连接 clientFd={}", clientFd); + + } + + close(listenfd); + return 0; +} + +void RTPReceiver2::parseTcpData(char* recvBuf, int recvBufSize) { + + if ((mRecvCacheSize + recvBufSize) > Server_cache_max_size) { + LOG_ERROR("超过缓冲容量上限,忽略本次读取的数据。mRecvCacheSize=%d,recvBufSize=%d",mRecvCacheSize, recvBufSize); + + } + else { + memcpy(mRecvCache + mRecvCacheSize, recvBuf, recvBufSize); + mRecvCacheSize += recvBufSize; + } + //LOGI("cacheSize=%d,开始进入解析 ... ...", cacheSize); + + while (true) { + + if (mRecvCacheSize > 2) { + + bool success = false; + + if (mRecvCacheSize > 2) { + mRecvRtpBufferSize = ntohs(*(int16_t*)(mRecvCache)); + if ((mRecvCacheSize - 2) >= mRecvRtpBufferSize) { + success = true; + } + } + + if (success) { + mRecvCacheSize -= 2; + mRecvCacheSize -= mRecvRtpBufferSize; + + // 提取RTP + memcpy(mRecvRtpBuffer, mRecvCache + 2, mRecvRtpBufferSize); + memmove(mRecvCache, mRecvCache + 2 + mRecvRtpBufferSize, mRecvCacheSize); + + // RTP + RtpHeader rtpHeader; + parseRtpHeader(mRecvRtpBuffer, &rtpHeader); + printf("get a rtp seq=%d,RtpBufferSize=%d,mRecvCacheSize=%d,marker=%d,timestamp=%d\n", + rtpHeader.seq, + mRecvRtpBufferSize, + mRecvCacheSize,rtpHeader.marker, rtpHeader.timestamp); + + + // 将从mRecvCache提取出来的rtp字节流 mRecvRtpBuffer去掉RTP_HEADER_SIZE,存储到播放器缓存中 + if ((mPlayer->bufferSize + mRecvRtpBufferSize - RTP_HEADER_SIZE) < MAX_RTP_BUFFER_SIZE) { + memcpy(mPlayer->buffer + mPlayer->bufferSize, mRecvRtpBuffer + RTP_HEADER_SIZE, mRecvRtpBufferSize - RTP_HEADER_SIZE); + mPlayer->bufferSize += mRecvRtpBufferSize - RTP_HEADER_SIZE; + } + else { + LOG_ERROR("recvBufSize = %d over MAX_RTP_BUFFER_SIZE ", recvBufSize); + } + + } + else { + //LOGI("跳出解析:cacheSize=%d,pktSize=%d", cacheSize, pktSize); + break; + } + } + else { + //LOGI("跳出解析:缓冲数据未发现完整数据包"); + break; + } + } +} + +int RTPReceiver2::allocRtpPort() { + + WebsocketClient* pServer = WebsocketClient::getInstance(); + int MIN_RTP_PORT = pServer->GetMinRtpPort() ; + int MAX_RTP_PORT = pServer->GetMaxRtpPort(); + + int s_rtpPort = MIN_RTP_PORT; + + srand((unsigned int)time(NULL)); + s_rtpPort = MIN_RTP_PORT + (rand() % MIN_RTP_PORT); + + if (s_rtpPort % 2) + ++s_rtpPort; + + int count = 0; + + while (true) + { + if (s_rtpPort >= MAX_RTP_PORT) { + s_rtpPort = MIN_RTP_PORT; + count ++; + if (count > 1) { + LOG_ERROR("[{}] - 范围内没有可用的port", m_SipChannelId); + } + } + + int i = 0; + for (; i < 2; i++) { + sockaddr_in sRecvAddr; + int s = socket(AF_INET, SOCK_DGRAM, 0); + + sRecvAddr.sin_family = AF_INET; + sRecvAddr.sin_addr.s_addr = htonl(INADDR_ANY); + sRecvAddr.sin_port = htons(s_rtpPort + i); + + int nResult = bind(s, (sockaddr *)&sRecvAddr, sizeof(sRecvAddr)); + if (nResult != 0) { + break; + } + + nResult = close(s); + if (nResult != 0) { + LOG_ERROR("[{}] - closesocket failed : {}", m_SipChannelId, nResult); + break; + } + } + + if (i == 2) + break; + + s_rtpPort += 2; + } + + return s_rtpPort; +} + +void RTPReceiver2::RequestStreamFailed() { + m_bRtpExit = true; +} \ No newline at end of file diff --git a/src/decoder/gb28181/rtp/RTPReceiver2.h b/src/decoder/gb28181/rtp/RTPReceiver2.h new file mode 100644 index 0000000..4d247bf --- /dev/null +++ b/src/decoder/gb28181/rtp/RTPReceiver2.h @@ -0,0 +1,39 @@ +#ifndef _RTP_RECEIVER_H_ +#define _RTP_RECEIVER_H_ + +#include +#include +#include + +using namespace std; + + +class RTPReceiver2{ + +public: + RTPReceiver2(); + virtual ~RTPReceiver2(); + + int init(const char* ip, uint16_t port, bool isUdp); + + void RequestStreamFailed(); + + int allocRtpPort(); + +private: + int startUdpServer(const char* ip, uint16_t port); + int startTcpServer(const char* ip, uint16_t port); + + void parseTcpData(char* recvBuf, int recvBufSize); + +public: + uint8_t* mRecvCache {nullptr}; + uint64_t mRecvCacheSize {0}; + + uint8_t* mRecvRtpBuffer {nullptr}; // 从mRecvCache提取出来的rtp字节流 + int16_t mRecvRtpBufferSize {0};// 从mRecvCache提取出来的rtp字节流总长度 (rtpHeader+rtpBody) + + bool m_bRtpExit {false}; +}; + +#endif // _RTP_RECEIVER_H_ \ No newline at end of file diff --git a/src/decoder/gb28181/rtp/RTPTcpReceiver.cpp b/src/decoder/gb28181/rtp/RTPTcpReceiver.cpp index ae5ba99..4d101ef 100644 --- a/src/decoder/gb28181/rtp/RTPTcpReceiver.cpp +++ b/src/decoder/gb28181/rtp/RTPTcpReceiver.cpp @@ -110,8 +110,8 @@ RTPTcpReceiver::RTPTcpReceiver() } RTPTcpReceiver::~RTPTcpReceiver(){ - if (IsOpened()) - Close(); + + Close(); if(m_rtpSessionPtr != nullptr){ delete m_rtpSessionPtr; @@ -149,7 +149,7 @@ bool RTPTcpReceiver::Open(string channel_id){ } bool RTPTcpReceiver::IsOpened(){ - LOG_INFO("[{}] isopng:{} ", m_SipChannelId, m_bOpened); + LOG_INFO("[{}] isopen:{} ", m_SipChannelId, m_bOpened); return m_bOpened; } @@ -176,8 +176,7 @@ void RTPTcpReceiver::close_task(){ LOG_DEBUG("[{}] 1.", m_SipChannelId); // rtp接收线程退出 - if (m_rtpThread.joinable()) - { + if (m_rtpThread.joinable()) { m_rtpThread.join(); } @@ -239,7 +238,9 @@ int RTPTcpReceiver::initSession(int localPort){ m_rtpThread = std::thread(rtp_revc_thread_, this); m_listenFinishThread = std::thread(listen_finish_thread_, this); - InitPS(); + if (InitPS() != 0) { + return false; + } bool bRet = RequestStream(); if (!bRet) @@ -253,6 +254,7 @@ int RTPTcpReceiver::initSession(int localPort){ return 0; } +// 图灵版本的请求 int RTPTcpReceiver::OnRtpRecv() { if(nullptr == m_rtpSessionPtr){ @@ -360,6 +362,114 @@ end_flag: return 0; } + +int RTPTcpReceiver::OnRtpRecv2() +{ + if(nullptr == m_rtpSessionPtr){ + return -1; + } + + LOG_INFO("[{}] OnRtpRecv started, m_nListener : {}", m_SipChannelId, m_nListener); + + sockaddr_in clientAddr; + int nLen = sizeof(sockaddr_in); + SocketType nServer = -1; + + LOG_INFO("[{}] Poll started.", m_SipChannelId); + int reconn_times = 0; + int reaccept_times = 0; + bool bReconn = false; + while(!m_bRtpExit){ + while(!m_bAccepted){ + if(m_bRtpExit){ + goto end_flag; + } + + while (!bReconn){ + if(m_bRtpExit){ + goto end_flag; + } + + reconn_times++; + if(reconn_times > 10){ + // 10次请求都失败,结束任务 + m_bRtpExit = true; + goto end_flag; + } + LOG_DEBUG("[{}] RequestStream...", m_SipChannelId); + bReconn = RequestStream(); + if (bReconn){ + LOG_DEBUG("[{}] RequestStream, True", m_SipChannelId); + continue; + } + LOG_DEBUG("[{}] RequestStream, False", m_SipChannelId); + + std::this_thread::sleep_for(std::chrono::seconds(5)); + } + + LOG_DEBUG("[{}] accepting...", m_SipChannelId); + nServer = accept(m_nListener, (sockaddr*)&clientAddr, (socklen_t * ) &nLen); + if (-1 == nServer){ + reaccept_times++; + LOG_DEBUG("[{}] reaccept_times = {}", m_SipChannelId, reaccept_times); + if(reaccept_times > 600){ + LOG_DEBUG("[{}] reaccept_times > 600", m_SipChannelId); + bReconn = false; + reaccept_times = 0; + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + continue; + } + LOG_DEBUG("[{}] accept success", m_SipChannelId); + m_rtpSessionPtr->AddDestination(RTPTCPAddress(nServer)); + m_bAccepted = true; + bReconn = false; + reconn_times = 0; + reaccept_times = 0; + + LOG_INFO("[{}] nServer={}", m_SipChannelId, nServer); + break; + } + + m_rtpSessionPtr->BeginDataAccess(); + if (m_rtpSessionPtr->GotoFirstSourceWithData()) + { + do + { + RTPPacket *pack; + + while ((pack = m_rtpSessionPtr->GetNextPacket()) != NULL) + { + // LOG_DEBUG("[{}] time: {} ", m_SipChannelId, UtilTools::get_cur_time_ms()); + ParsePacket(pack); + + m_rtpSessionPtr->DeletePacket(pack); + } + } while (m_rtpSessionPtr->GotoNextSourceWithData()); + } + + m_rtpSessionPtr->EndDataAccess(); + + m_rtpSessionPtr->Poll(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + +end_flag: + + m_rtpSessionPtr->Destroy(); + + if(nServer > 0){ + close(nServer); + } + if(m_nListener > 0){ + close(m_nListener); + } + + LOG_INFO("[{}] OnRtpRecv exited.", m_SipChannelId); + + return 0; +} + int RTPTcpReceiver::ListenFinish(){ while(!m_bRtpExit){ std::this_thread::sleep_for(std::chrono::seconds(3)); diff --git a/src/decoder/gb28181/rtp/RTPTcpReceiver.h b/src/decoder/gb28181/rtp/RTPTcpReceiver.h index 1513fd7..8478ea6 100644 --- a/src/decoder/gb28181/rtp/RTPTcpReceiver.h +++ b/src/decoder/gb28181/rtp/RTPTcpReceiver.h @@ -47,6 +47,7 @@ public: public: int OnRtpRecv(); + int OnRtpRecv2(); bool ReConnect(); int ListenFinish(); bool isClosing(); @@ -72,6 +73,7 @@ private: std::thread m_listenFinishThread; // RTP接收线程 + std::atomic_bool m_bNoData{false}; }; #endif // _RTP_TCP_RECEIVER_H_ diff --git a/src/decoder/gb28181/rtp/Rtp.cpp b/src/decoder/gb28181/rtp/Rtp.cpp new file mode 100644 index 0000000..03e2a8a --- /dev/null +++ b/src/decoder/gb28181/rtp/Rtp.cpp @@ -0,0 +1,49 @@ +// +// Created by bxc on 2023/4/18. +// 作者:北小菜 +// 邮箱:bilibili_bxc@126.com +// 西瓜视频主页:https://www.ixigua.com/home/4171970536803763 +// 哔哩哔哩主页:https://space.bilibili.com/487906612/ +// + +#include "Rtp.h" +#include +#include + +void rtpHeaderInit(struct RtpPacket* rtpPacket, uint8_t csrcLen, uint8_t extension, + uint8_t padding, uint8_t version, uint8_t payloadType, uint8_t marker, + uint16_t seq, uint32_t timestamp, uint32_t ssrc){ + rtpPacket->rtpHeader.csrcLen = csrcLen; + rtpPacket->rtpHeader.extension = extension; + rtpPacket->rtpHeader.padding = padding; + rtpPacket->rtpHeader.version = version; + rtpPacket->rtpHeader.payloadType = payloadType; + rtpPacket->rtpHeader.marker = marker; + rtpPacket->rtpHeader.seq = seq; + rtpPacket->rtpHeader.timestamp = timestamp; + rtpPacket->rtpHeader.ssrc = ssrc; +} +int parseRtpHeader(uint8_t* headerBuf, struct RtpHeader* rtpHeader){ + memset(rtpHeader,0,sizeof(*rtpHeader)); + /* byte 0 */ + rtpHeader->version = (headerBuf[0] & 0xC0) >> 6; + rtpHeader->padding = (headerBuf[0] & 0x20) >> 5; + rtpHeader->extension = (headerBuf[0] & 0x10) >> 4; + rtpHeader->csrcLen = (headerBuf[0] & 0x0F); + /* byte 1 */ + rtpHeader->marker = (headerBuf[1] & 0x80) >> 7; + rtpHeader->payloadType = (headerBuf[1] & 0x7F); + /* bytes 2,3 */ + rtpHeader->seq = ((headerBuf[2] & 0xFF) << 8) | (headerBuf[3] & 0xFF); + /* bytes 4-7 */ + rtpHeader->timestamp = ((headerBuf[4] & 0xFF) << 24) | ((headerBuf[5] & 0xFF) << 16) + | ((headerBuf[6] & 0xFF) << 8) + | ((headerBuf[7] & 0xFF)); + /* bytes 8-11 */ + rtpHeader->ssrc = ((headerBuf[8] & 0xFF) << 24) | ((headerBuf[9] & 0xFF) << 16) + | ((headerBuf[10] & 0xFF) << 8) + | ((headerBuf[11] & 0xFF)); + + return 0; +} + diff --git a/src/decoder/gb28181/rtp/Rtp.h b/src/decoder/gb28181/rtp/Rtp.h new file mode 100644 index 0000000..f672ea4 --- /dev/null +++ b/src/decoder/gb28181/rtp/Rtp.h @@ -0,0 +1,69 @@ +// +// Created by bxc on 2023/4/18. +// 作者:北小菜 +// 邮箱:bilibili_bxc@126.com +// 西瓜视频主页:https://www.ixigua.com/home/4171970536803763 +// 哔哩哔哩主页:https://space.bilibili.com/487906612/ +// + +#ifndef GB28181PLAYER_RTP_H +#define GB28181PLAYER_RTP_H + +#include + +#define RTP_VESION 2 +#define RTP_PAYLOAD_TYPE_H264 96 +#define RTP_PAYLOAD_TYPE_AAC 97 + +#define RTP_HEADER_SIZE 12 +#define RTP_MAX_SIZE 1400 + +/* + * + * 0 1 2 3 + * 7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0|7 6 5 4 3 2 1 0 + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * |V=2|P|X| CC |M| PT | sequence number | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | timestamp | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | synchronization source (SSRC) identifier | + * +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + * | contributing source (CSRC) identifiers | + * : .... : + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * + */ +struct RtpHeader{ + /* byte 0 */ + uint8_t csrcLen:4; + uint8_t extension:1; + uint8_t padding:1; + uint8_t version:2; // 最高2位 + + /* byte 1 */ + uint8_t payloadType:7; + uint8_t marker:1; + + /* bytes 2,3 */ + uint16_t seq; + + /* bytes 4-7 */ + uint32_t timestamp; + + /* bytes 8-11 */ + uint32_t ssrc; +}; +struct RtpPacket{ + struct RtpHeader rtpHeader; + uint8_t payload[0]; +}; + +void rtpHeaderInit(struct RtpPacket* rtpPacket, uint8_t csrcLen, uint8_t extension, + uint8_t padding, uint8_t version, uint8_t payloadType, uint8_t marker, + uint16_t seq, uint32_t timestamp, uint32_t ssrc); + +int parseRtpHeader(uint8_t* headerBuf, struct RtpHeader* rtpHeader); + +#endif //GB28181PLAYER_RTP_H + diff --git a/src/demo/demo.cpp b/src/demo/demo.cpp index ef09d91..7cb0867 100755 --- a/src/demo/demo.cpp +++ b/src/demo/demo.cpp @@ -917,7 +917,7 @@ string createTask(void *handle, std::vector algor_vec, int gi, tparam.ipc_url = "rtsp://admin:admin@123456@192.168.60.176:554/cam/realmonitor?channel=1&subtype=0"; break; case 1: - tparam.ipc_url = "rtsp://122.97.218.170:8604/openUrl/V5nXRHa?params=eyJwcm90b2NhbCI6InJ0c3AiLCJjbGllbnRUeXBlIjoib3Blbl9hcGkiLCJleHByaWVUaW1lIjotMSwicHJvdG9jb2wiOiJydHNwIiwiZXhwaXJlVGltZSI6MzAwLCJlbmFibGVNR0MiOnRydWUsImV4cGFuZCI6InN0YW5kYXJkPXJ0c3Amc3RyZWFtZm9ybT1ydHAiLCJhIjoiMTBjZjM4N2JjY2Y5NDg3YzhjNWYzNjE2M2ViMWUyNTJ8MXwwfDEiLCJ0IjoxfQ=="; + tparam.ipc_url = "rtsp://admin:ad123456@192.168.60.165:554/cam/realmonitor?channel=1&subtype=0"; break; case 2: tparam.ipc_url = "rtsp://admin:ad123456@192.168.10.166:554/cam/realmonitor?channel=1&subtype=0"; @@ -1001,6 +1001,12 @@ string createTask(void *handle, std::vector algor_vec, int gi, case 27: tparam.ipc_url = "/data/share/data/Street_4k_265.mp4"; break; + case 28: + tparam.ipc_url = "http://111.200.42.56:8889/gajlqt.mp4"; + break; + case 29: + tparam.ipc_url = "http://192.168.60.179:10016/110.mp4"; + break; default: tparam.ipc_url = "/opt/share/data/Street.uvf"; break; @@ -1071,7 +1077,7 @@ string createTask_dvpp28181(void *handle, std::vector algor_ve tparam.ipc_url = "34020000001310004065"; break; case 1: - tparam.ipc_url = "34020000001310000001"; + tparam.ipc_url = "34020000001320000109"; break; case 2: tparam.ipc_url = "34020000001320000166"; @@ -1147,6 +1153,90 @@ string createTask_dvpp28181(void *handle, std::vector algor_ve return task_id_str; } +string createTask_dvpp28181_tcp(void *handle, std::vector algor_vec, int gi, bool bFlag = true){ + task_param tparam; + + switch(gi){ + case 8: + tparam.ipc_url = "34020000001310004065"; + break; + case 9: + tparam.ipc_url = "34020000001320000109"; + break; + case 10: + tparam.ipc_url = "34020000001320000166"; + break; + case 11: + tparam.ipc_url = "32120200002160000077"; + break; + case 12: + tparam.ipc_url = "34020000001320000207"; + break; + case 13: + tparam.ipc_url = "34020000001310000176"; + break; + default: + tparam.ipc_url = "34020000001310004065"; + break; + } + + tparam.algor_counts = algor_vec.size(); + tparam.dec_type = 3; + tparam.protocal = 1; + + if (bFlag){ + nTaskId = gi; + } + + std::string task_id_str = "test_task_id_" + std::to_string(nTaskId); + tparam.task_id = task_id_str.c_str(); + + nTaskId++; + + tparam.algor_config_params = new algor_config_param[tparam.algor_counts]; + + for (size_t idx = 0; idx < algor_vec.size(); ++idx) + set_task_params(tparam, idx, algor_vec.at(idx)); + + const int result_code = add_task(handle, tparam); + if (result_code != 0) + printf("[Error]: "); + printf("--- task_id: %s result code: %d\n", tparam.task_id, result_code); + + + // 释放参数 + for (size_t idx = 0; idx < algor_vec.size(); ++idx) { + if(tparam.algor_config_params[idx].algor_type == algorithm_type_t::VIDEO_TIMING_SNAPSHOT) { + algor_config_param_road_work* algor_param = (algor_config_param_road_work*)tparam.algor_config_params[idx].algor_init_config_param->algor_param; + delete algor_param; + algor_basic_config_param_t* basic_param = (algor_basic_config_param_t*)tparam.algor_config_params[idx].algor_init_config_param->basic_param; + delete basic_param; + + algor_init_config_param_t* config_param = tparam.algor_config_params[idx].algor_init_config_param; + delete config_param; + } else if(tparam.algor_config_params[idx].algor_type == algorithm_type_t::VEHICLE_SOLIDLINETURNAROUND) { + algor_config_param_manned_incident* algor_param = (algor_config_param_manned_incident*)tparam.algor_config_params[idx].algor_init_config_param->algor_param; + delete algor_param; + algor_basic_config_param_t* basic_param = (algor_basic_config_param_t*)tparam.algor_config_params[idx].algor_init_config_param->basic_param; + delete basic_param; + + algor_init_config_param_t* config_param = tparam.algor_config_params[idx].algor_init_config_param; + delete config_param; + } else if(tparam.algor_config_params[idx].algor_type == algorithm_type_t::VEHICLE_SOLIDLINETURNAROUND) { + algor_config_param_manned_incident* algor_param = (algor_config_param_manned_incident*)tparam.algor_config_params[idx].algor_init_config_param->algor_param; + delete algor_param; + algor_basic_config_param_t* basic_param = (algor_basic_config_param_t*)tparam.algor_config_params[idx].algor_init_config_param->basic_param; + delete basic_param; + + algor_init_config_param_t* config_param = tparam.algor_config_params[idx].algor_init_config_param; + delete config_param; + } + } + delete[] tparam.algor_config_params; + + return task_id_str; +} + void test_snapshot(void *handle){ task_param tparam; tparam.ipc_url = "rtsp://admin:ad123456@192.168.60.165:554/cam/realmonitor?channel=1&subtype=0"; @@ -1328,36 +1418,64 @@ init_mq_conn(handle); -char ch = 'a'; -while (ch != 'q') { - ch = getchar(); - switch (ch) +char ch[10] = "a"; +while (strcmp(ch,"q") != 0) { + fgets(ch,10,stdin); + int num = atoi(ch); + switch (num) { - case '0': + case 0: createTask_dvpp28181(handle, algor_vec, 0, false); break; - case '1': + case 1: createTask_dvpp28181(handle, algor_vec, 1, false); break; - case '2': + case 2: createTask_dvpp28181(handle, algor_vec, 2, false); break; - case '3': + case 3: createTask_dvpp28181(handle, algor_vec, 3, false); break; - case '4': + case 4: createTask_dvpp28181(handle, algor_vec, 4, false); break; - case '5': + case 5: createTask_dvpp28181(handle, algor_vec, 5, false); break; - case '6': + case 6: createTask(handle, algor_vec2, 2, false); break; - case '7': + case 7: createTask(handle, algor_vec2, 0, false); break; - case 'c': + case 8: + createTask_dvpp28181_tcp(handle, algor_vec, 8, false); + break; + case 9: + createTask_dvpp28181_tcp(handle, algor_vec, 9, false); + break; + case 10: + createTask_dvpp28181_tcp(handle, algor_vec, 10, false); + break; + case 11: + createTask_dvpp28181_tcp(handle, algor_vec, 11, false); + break; + case 12: + createTask_dvpp28181_tcp(handle, algor_vec, 12, false); + break; + case 13: + createTask_dvpp28181_tcp(handle, algor_vec, 13, false); + break; + case 14: + createTask(handle, algor_vec2, 28, false); + break; + case 15: + createTask(handle, algor_vec2, 1, false); + break; + case 16: + createTask(handle, algor_vec2, 29, false); + break; + case 100: close_all_task(handle); break; default: -- libgit2 0.21.4