Commit c2ff6d2adf6c4cf758483285e17ebaf5bcefb97b

Authored by Hu Chunming
1 parent c027963f

初步实现ffmepg接收rtp流

src/decoder/dvpp/DvppDecoder.cpp
@@ -174,7 +174,7 @@ AVCodecContext* DvppDecoder::init_FFmpeg(FFDecConfig config){ @@ -174,7 +174,7 @@ AVCodecContext* DvppDecoder::init_FFmpeg(FFDecConfig config){
174 } 174 }
175 175
176 // 查找视频流信息 176 // 查找视频流信息
177 - const AVCodec *decoder = nullptr; 177 + AVCodec *decoder = nullptr;
178 video_index = av_find_best_stream(fmt_ctx, AVMEDIA_TYPE_VIDEO, -1, -1, &decoder, 0); 178 video_index = av_find_best_stream(fmt_ctx, AVMEDIA_TYPE_VIDEO, -1, -1, &decoder, 0);
179 if (video_index < 0) { 179 if (video_index < 0) {
180 LOG_ERROR("[{}]- Cannot find a video stream in the input file!", m_dec_name); 180 LOG_ERROR("[{}]- Cannot find a video stream in the input file!", m_dec_name);
src/decoder/dvpp/DvppRtpDecoder.cpp
@@ -224,13 +224,13 @@ bool DvppRtpDecoder::isSurport(FFDecConfig&amp; cfg){ @@ -224,13 +224,13 @@ bool DvppRtpDecoder::isSurport(FFDecConfig&amp; cfg){
224 } 224 }
225 225
226 bool DvppRtpDecoder::start(){ 226 bool DvppRtpDecoder::start(){
  227 +
  228 + m_bRunning = true;
227 229
228 if(!probe()) { 230 if(!probe()) {
229 return false; 231 return false;
230 } 232 }
231 233
232 - m_bRunning = true;  
233 -  
234 m_read_thread = new std::thread([](void* arg) 234 m_read_thread = new std::thread([](void* arg)
235 { 235 {
236 DvppRtpDecoder* a=(DvppRtpDecoder*)arg; 236 DvppRtpDecoder* a=(DvppRtpDecoder*)arg;
@@ -362,62 +362,82 @@ void DvppRtpDecoder::release_ffmpeg() { @@ -362,62 +362,82 @@ void DvppRtpDecoder::release_ffmpeg() {
362 362
363 void DvppRtpDecoder::CacheBuffer(uint8_t* recvBuf, int recvBufSize) { 363 void DvppRtpDecoder::CacheBuffer(uint8_t* recvBuf, int recvBufSize) {
364 if ((m_bufferSize + recvBufSize - RTP_HEADER_SIZE) < MAX_RTP_BUFFER_SIZE) { 364 if ((m_bufferSize + recvBufSize - RTP_HEADER_SIZE) < MAX_RTP_BUFFER_SIZE) {
365 - memcpy(m_buffer + m_bufferSize, recvBuf + RTP_HEADER_SIZE, recvBufSize - RTP_HEADER_SIZE);  
366 - m_bufferSize += recvBufSize - RTP_HEADER_SIZE;  
367 - } else { 365 + memcpy(m_buffer + m_bufferSize, recvBuf + RTP_HEADER_SIZE, recvBufSize - RTP_HEADER_SIZE);
  366 + m_bufferSize += recvBufSize - RTP_HEADER_SIZE;
  367 + } else {
368 LOG_WARN("recvBufSize = {} over MAX_RTP_BUFFER_SIZE ", recvBufSize); 368 LOG_WARN("recvBufSize = {} over MAX_RTP_BUFFER_SIZE ", recvBufSize);
369 } 369 }
370 } 370 }
371 371
372 int DvppRtpDecoder::ReadBuffer(uint8_t* buf, int buffsize) { 372 int DvppRtpDecoder::ReadBuffer(uint8_t* buf, int buffsize) {
373 int ret = 0; 373 int ret = 0;
  374 + int count = 0;
  375 + while(m_bRunning && (m_bufferSize < 4096 || m_bufferSize < buffsize)){
  376 + std::this_thread::sleep_for(std::chrono::milliseconds(10));
  377 + count++;
  378 + if (count >= 1000) {
  379 + // 只等待10s
  380 + return -1;
  381 + }
  382 + }
  383 +
374 if (m_bufferSize >= buffsize) { 384 if (m_bufferSize >= buffsize) {
375 memcpy(buf, m_buffer, buffsize); 385 memcpy(buf, m_buffer, buffsize);
376 m_bufferSize = m_bufferSize - buffsize; 386 m_bufferSize = m_bufferSize - buffsize;
377 memmove(m_buffer, m_buffer + buffsize, m_bufferSize); 387 memmove(m_buffer, m_buffer + buffsize, m_bufferSize);
378 ret = buffsize; 388 ret = buffsize;
379 -  
380 - LOG_DEBUG("avio_read_packet={}", buffsize);  
381 } 389 }
382 390
  391 + printf("m_bufferSize=%d buffsize=%d\n", m_bufferSize.load(), buffsize);
  392 +
383 return ret; 393 return ret;
384 } 394 }
385 395
386 bool DvppRtpDecoder::probe() { 396 bool DvppRtpDecoder::probe() {
  397 +
387 // todo: 此处可能有泄露 398 // todo: 此处可能有泄露
388 - unsigned char* avioBuff = (unsigned char*)av_malloc(7680 * 4320);  
389 - ioCtx = avio_alloc_context(avioBuff, sizeof(avioBuff), 0, this, avio_read_packet, NULL, NULL); 399 + unsigned char* avioBuff = (unsigned char*)av_malloc(MAX_RTP_BUFFER_SIZE);
  400 + AVIOContext *ioCtx = avio_alloc_context(avioBuff, sizeof(avioBuff), 0, this, avio_read_packet, NULL, NULL);
390 //探测流(获取码流格式) 401 //探测流(获取码流格式)
391 - const AVInputFormat* inputFmt;  
392 - int ret = av_probe_input_buffer2(ioCtx, &inputFmt, "", NULL, 0, 0); 402 + AVInputFormat* inputFmt = nullptr;
  403 + int ret = av_probe_input_buffer(ioCtx, &inputFmt, "", NULL, 0, 0);
393 if (ret < 0){ 404 if (ret < 0){
394 - LOG_ERROR("av_probe_input_buffer2 error: {}", ret); 405 + LOG_ERROR("av_probe_input_buffer error: {}", ret);
395 return false; 406 return false;
396 } 407 }
397 408
398 do{ 409 do{
399 fmt_ctx = avformat_alloc_context(); 410 fmt_ctx = avformat_alloc_context();
  411 +
  412 + fmt_ctx->probesize = 10000000;//5 000 000
  413 + fmt_ctx->flags |= AVFMT_FLAG_NOBUFFER;
  414 + av_opt_set(fmt_ctx->priv_data,"preset","ultrafast",0);
  415 +
  416 + //AV_TIME_BASE = 1000 000
  417 + fmt_ctx->max_analyze_duration = 90 * AV_TIME_BASE;
  418 +
400 fmt_ctx->pb = ioCtx; 419 fmt_ctx->pb = ioCtx;
401 420
402 421
403 AVDictionary* net_options{nullptr};//网络连接参数 422 AVDictionary* net_options{nullptr};//网络连接参数
404 -  
405 - //配置流参数  
406 - //av_dict_set(&net_options, "fflags", "nobuffer", 0); //不缓存直接解码 423 + // av_dict_set(&net_options, "fflags", "nobuffer", 0); //不缓存直接解码
  424 + av_dict_set( &net_options, "bufsize", "655360", 0 );
  425 + av_dict_set( &net_options, "stimeout", "30000000", 0 ); // 单位为 百万分之一秒
  426 + av_dict_set( &net_options, "max_delay", "500000", 0); //设置最大时延
407 427
408 //打开流 428 //打开流
409 - ret = avformat_open_input(&fmt_ctx, "", inputFmt, &net_options);  
410 - if (ret != 0)  
411 - { 429 + ret = avformat_open_input(&fmt_ctx, 0, 0, &net_options);
  430 + if (ret != 0) {
412 LOG_ERROR("avformat_open_input error: {}", ret); 431 LOG_ERROR("avformat_open_input error: {}", ret);
413 break; 432 break;
414 } 433 }
  434 +
415 //获取流信息 435 //获取流信息
416 - if (avformat_find_stream_info(fmt_ctx, NULL) < 0)//?  
417 - { 436 + if (avformat_find_stream_info(fmt_ctx, NULL) < 0) {
418 LOG_ERROR("avformat_find_stream_info error"); 437 LOG_ERROR("avformat_find_stream_info error");
419 break; 438 break;
420 } 439 }
  440 +
421 //获取视频流 441 //获取视频流
422 mVideoIndex = av_find_best_stream(fmt_ctx, AVMEDIA_TYPE_VIDEO, -1, -1, NULL, 0); 442 mVideoIndex = av_find_best_stream(fmt_ctx, AVMEDIA_TYPE_VIDEO, -1, -1, NULL, 0);
423 if (mVideoIndex < 0) 443 if (mVideoIndex < 0)
@@ -550,8 +570,8 @@ void DvppRtpDecoder::read_thread() { @@ -550,8 +570,8 @@ void DvppRtpDecoder::read_thread() {
550 if (result == AVERROR_EOF || result < 0){ 570 if (result == AVERROR_EOF || result < 0){
551 av_packet_free(&pkt); 571 av_packet_free(&pkt);
552 pkt = nullptr; 572 pkt = nullptr;
553 - LOG_WARN("[{}]- Failed to read frame!", m_dec_name);  
554 - break; 573 + // LOG_WARN("[{}]- Failed to read frame!", m_dec_name);
  574 + continue;
555 } 575 }
556 576
557 if (m_DvppCacheCounter.load() > m_cache_gop){ 577 if (m_DvppCacheCounter.load() > m_cache_gop){
src/decoder/dvpp/DvppRtpDecoder.h
@@ -106,7 +106,6 @@ private: @@ -106,7 +106,6 @@ private:
106 106
107 // 读取数据 107 // 读取数据
108 AVFormatContext *fmt_ctx{nullptr}; 108 AVFormatContext *fmt_ctx{nullptr};
109 - AVIOContext * ioCtx{nullptr};  
110 int mVideoIndex {-1}; 109 int mVideoIndex {-1};
111 AVPixelFormat pix_fmt; 110 AVPixelFormat pix_fmt;
112 AVCodecContext *avctx{nullptr}; 111 AVCodecContext *avctx{nullptr};
src/decoder/gb28181/rtp2/RTPReceiver2.cpp
@@ -184,7 +184,7 @@ int RTPReceiver2::tcp_server() { @@ -184,7 +184,7 @@ int RTPReceiver2::tcp_server() {
184 char buff[4096]; 184 char buff[4096];
185 int n; 185 int n;
186 186
187 - if( (listenfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1 ){ 187 + if( (listenfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP)) == -1 ){
188 printf("create socket error: %s(errno: %d)\n",strerror(errno),errno); 188 printf("create socket error: %s(errno: %d)\n",strerror(errno),errno);
189 return 0; 189 return 0;
190 } 190 }
@@ -204,23 +204,30 @@ int RTPReceiver2::tcp_server() { @@ -204,23 +204,30 @@ int RTPReceiver2::tcp_server() {
204 return 0; 204 return 0;
205 } 205 }
206 206
207 - char recvBuf[10000]; 207 + uint8_t recvBuf[10000];
208 int recvBufSize = 0; 208 int recvBufSize = 0;
209 209
  210 + bool bFilter = false;
  211 +
210 while (!m_bRtpExit) 212 while (!m_bRtpExit)
211 { 213 {
212 - LOG_INFO("阻塞监听新连接..."); 214 + // LOG_INFO("阻塞监听新连接...");
213 // 阻塞接收请求 start 215 // 阻塞接收请求 start
214 socklen_t len = sizeof(sockaddr); 216 socklen_t len = sizeof(sockaddr);
215 sockaddr_in accept_addr; 217 sockaddr_in accept_addr;
216 int clientFd = accept(listenfd, (struct sockaddr*)&accept_addr, &len); 218 int clientFd = accept(listenfd, (struct sockaddr*)&accept_addr, &len);
217 if (clientFd < 0) { 219 if (clientFd < 0) {
218 - LOG_WARN("accept connection error"); 220 + if (!bFilter) {
  221 + LOG_WARN("accept connection warn");
  222 + bFilter = true;
  223 + }
  224 +
219 std::this_thread::sleep_for(std::chrono::milliseconds(5)); 225 std::this_thread::sleep_for(std::chrono::milliseconds(5));
220 continue; 226 continue;
221 } 227 }
222 // 阻塞接收请求 end 228 // 阻塞接收请求 end
223 LOG_INFO("发现新连接:clientFd={}", clientFd); 229 LOG_INFO("发现新连接:clientFd={}", clientFd);
  230 + bFilter = false;
224 231
225 while (!m_bRtpExit) { 232 while (!m_bRtpExit) {
226 recvBufSize = recv(clientFd, recvBuf, sizeof(recvBuf), 0); 233 recvBufSize = recv(clientFd, recvBuf, sizeof(recvBuf), 0);
@@ -244,7 +251,7 @@ int RTPReceiver2::tcp_server() { @@ -244,7 +251,7 @@ int RTPReceiver2::tcp_server() {
244 return 0; 251 return 0;
245 } 252 }
246 253
247 -void RTPReceiver2::parseTcpData(char* recvBuf, int recvBufSize) { 254 +void RTPReceiver2::parseTcpData(uint8_t* recvBuf, int recvBufSize) {
248 255
249 if ((mRecvCacheSize + recvBufSize) > Server_cache_max_size) { 256 if ((mRecvCacheSize + recvBufSize) > Server_cache_max_size) {
250 LOG_ERROR("超过缓冲容量上限,忽略本次读取的数据。mRecvCacheSize=%d,recvBufSize=%d",mRecvCacheSize, recvBufSize); 257 LOG_ERROR("超过缓冲容量上限,忽略本次读取的数据。mRecvCacheSize=%d,recvBufSize=%d",mRecvCacheSize, recvBufSize);
@@ -277,10 +284,10 @@ void RTPReceiver2::parseTcpData(char* recvBuf, int recvBufSize) { @@ -277,10 +284,10 @@ void RTPReceiver2::parseTcpData(char* recvBuf, int recvBufSize) {
277 284
278 struct RtpHeader rtpHeader; 285 struct RtpHeader rtpHeader;
279 parseRtpHeader(mRecvRtpBuffer, &rtpHeader); 286 parseRtpHeader(mRecvRtpBuffer, &rtpHeader);
280 - printf("get a rtp seq=%d,RtpBufferSize=%d,mRecvCacheSize=%d,marker=%d,timestamp=%d\n",  
281 - rtpHeader.seq,  
282 - mRecvRtpBufferSize,  
283 - mRecvCacheSize,rtpHeader.marker, rtpHeader.timestamp); 287 + // printf("get a rtp seq=%d,RtpBufferSize=%d,mRecvCacheSize=%d,marker=%d,timestamp=%d\n",
  288 + // rtpHeader.seq,
  289 + // mRecvRtpBufferSize,
  290 + // mRecvCacheSize,rtpHeader.marker, rtpHeader.timestamp);
284 291
285 // buffer 抛出 292 // buffer 抛出
286 m_buffer_cbk(m_bufferParam, mRecvRtpBuffer, mRecvRtpBufferSize, rtpHeader.timestamp); 293 m_buffer_cbk(m_bufferParam, mRecvRtpBuffer, mRecvRtpBufferSize, rtpHeader.timestamp);
src/decoder/gb28181/rtp2/RTPReceiver2.h
@@ -36,7 +36,7 @@ public: @@ -36,7 +36,7 @@ public:
36 36
37 private: 37 private:
38 bool start_server(string channel_id, int port, bool isUdp); 38 bool start_server(string channel_id, int port, bool isUdp);
39 - void parseTcpData(char* recvBuf, int recvBufSize); 39 + void parseTcpData(uint8_t* recvBuf, int recvBufSize);
40 40
41 public: 41 public:
42 uint8_t* mRecvCache {nullptr}; 42 uint8_t* mRecvCache {nullptr};