Blame view

src/decoder/gb28181/rtp/RTPUdpReceiver.cpp 6.19 KB
c8285c8d   Hu Chunming   GB28181 UDP 有重大进展...
1
2
3
4
5
6
7
8
9
  
  #include "RTPUdpReceiver.h"
  #include <iostream>
  #include <time.h>
  
  #include <thread>
  #include <chrono>
  
  #include "../common_header.h"
69ee81f3   Hu Chunming   提交websocket clien...
10
  #include "../websocket/WebsocketClient.h"
c8285c8d   Hu Chunming   GB28181 UDP 有重大进展...
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
  
  
  using namespace std;
  
  // Base unit for the buffer sizes below.
  // NOTE: despite the "_1024" suffix, the value is 4096.
  #define BUFFERSIZE_1024     4096
  // The author's trailing note records 5120 (1024*5) as an alternative value.
  #define BUFFERSIZE_GAP      4096

  namespace
  {
      // 4096 * 4096 * 5 * 2 bytes; not referenced by the code visible in this file.
      constexpr int kVideoFrameSize = BUFFERSIZE_1024 * BUFFERSIZE_1024 * 5 * 2;

      // 4096 * 4096 * 2 bytes; passed to SetRTPReceiveBuffer() in RTPUdpReceiver::Open().
      constexpr int kRtpRecvBufferSize = BUFFERSIZE_1024 * BUFFERSIZE_1024 * 2;

      // Port value 0; not referenced by the code visible in this file.
      constexpr uint16_t kInvalidPort = 0;
  } // namespace
  
  /**
   * RTPSession subclass used by RTPUdpReceiver.
   *
   * For every RTP packet reported through OnRTPPacket() the sender's address
   * is handed to AddDestination(). RTCP compound packets are ignored.
   */
  class UdpRTPSession : public RTPSession
  {
  public:
      UdpRTPSession() {}
      virtual ~UdpRTPSession() {}

  private:
      /**
       * Called for each incoming RTP packet.
       *
       * @param pack           the received packet (unused here)
       * @param receiverTime   reception time (unused here)
       * @param senderAddress  address the packet came from; ignored when null
       */
      virtual void OnRTPPacket(RTPPacket* pack, const RTPTime& receiverTime, const RTPAddress* senderAddress)
      {
          // Guard against a missing sender address instead of dereferencing
          // a null pointer.
          if (nullptr == senderAddress)
          {
              return;
          }

          // Presumably registers the camera as a destination so that the
          // session's RTCP traffic reaches it - TODO confirm.
          AddDestination(*senderAddress);
      }

      /**
       * Called for each incoming RTCP compound packet. Intentionally does
       * nothing; the disabled experiment below is kept for reference.
       */
      virtual void OnRTCPCompoundPacket(RTCPCompoundPacket *pack, const RTPTime &receivetime,const RTPAddress *senderaddress)
      {
          //AddDestination(*senderaddress);
          //const char* name = "hi~";
          //SendRTCPAPPPacket(0, (const uint8_t*)name, "keeplive", 8);

          //printf("send rtcp app");
      }
  };
  
  static int rtp_revc_thread_(void* param)
  {
  	if (!param)
  	{
  		return -1;
  	}
  
  	RTPUdpReceiver* self = (RTPUdpReceiver*)param;
  	return self->OnRtpRecv();
  }
  
69ee81f3   Hu Chunming   提交websocket clien...
58
59
60
61
62
63
64
  static int connecting_thread_(void* param)
  {
  	if (!param) {
  		return -1;
  	}
  
  	RTPUdpReceiver* self = (RTPUdpReceiver*)param;
9a922a54   Hu Chunming   自动请求流修改到了sip服务端,解...
65
  	return self->CheckConnecting();
69ee81f3   Hu Chunming   提交websocket clien...
66
67
  }
  
c8285c8d   Hu Chunming   GB28181 UDP 有重大进展...
68
  /**
   * Creates a closed receiver and allocates the JRTPLIB session objects.
   * No socket is opened and no thread is started until Open() is called.
   */
  RTPUdpReceiver::RTPUdpReceiver()
  : m_bOpened(false)
  , m_idleCount(-1)
  , m_noDataCount(-1)
  {
      m_bRtpExit = false;

      // The destructor and CheckConnecting() read these members even when
      // Open() never ran, so give them a defined value here.
      // NOTE(review): the class declaration is not visible from this file;
      // if it already supplies default initializers these assignments are
      // redundant but harmless.
      m_bRecvExit = false;
      m_bNoData = false;
      m_rtpThreadPtr = nullptr;
      m_connThreadPtr = nullptr;

      m_sessparamsPtr = new RTPSessionParams();
      m_transparamsPtr = new RTPUDPv4TransmissionParams();
      m_rtpSessionPtr = new UdpRTPSession();
  }
  
  /**
   * Stops the worker threads and releases everything the constructor and
   * Open() allocated.
   */
  RTPUdpReceiver::~RTPUdpReceiver()
  {
      // Ask CheckConnecting() (and wait_times()) to stop.
      m_bRtpExit = true;

      // Wait for the watchdog before freeing anything: CheckConnecting()
      // still uses the session and joins the receive thread on its way out.
      if (nullptr != m_connThreadPtr) {
          if (m_connThreadPtr->joinable()) {
              m_connThreadPtr->join();
          }
          // Delete unconditionally so the std::thread object is not leaked
          // when it is no longer joinable.
          delete m_connThreadPtr;
          m_connThreadPtr = nullptr;
      }

      // Open() can fail after starting the receive thread but before the
      // watchdog exists; in that case nobody has stopped the receive thread
      // yet and it must not outlive this object.
      // NOTE(review): OnRtpRecv() resets m_bRecvExit when it starts, so this
      // assumes the thread has already entered its loop - confirm.
      if (nullptr != m_rtpThreadPtr) {
          m_bRecvExit = true;
          if (m_rtpThreadPtr->joinable()) {
              m_rtpThreadPtr->join();
          }
          delete m_rtpThreadPtr;
          m_rtpThreadPtr = nullptr;
      }

      if (nullptr != m_sessparamsPtr) {
          delete m_sessparamsPtr;
          m_sessparamsPtr = nullptr;
      }

      if (nullptr != m_transparamsPtr) {
          delete m_transparamsPtr;
          m_transparamsPtr = nullptr;
      }

      if (nullptr != m_rtpSessionPtr) {
          delete m_rtpSessionPtr;
          m_rtpSessionPtr = nullptr;
      }
  }
  
74d1e6a8   Hu Chunming   完成gb28181大体的代码,未完...
105
  bool RTPUdpReceiver::Open(string channel_id)
c8285c8d   Hu Chunming   GB28181 UDP 有重大进展...
106
  {
74d1e6a8   Hu Chunming   完成gb28181大体的代码,未完...
107
108
  	m_SipChannelId = channel_id;
  
69ee81f3   Hu Chunming   提交websocket clien...
109
110
  	m_rtp_port = allocRtpPort();
  	if (m_rtp_port < 0) {
74d1e6a8   Hu Chunming   完成gb28181大体的代码,未完...
111
112
  		return false;
  	}
74d1e6a8   Hu Chunming   完成gb28181大体的代码,未完...
113
  
c8285c8d   Hu Chunming   GB28181 UDP 有重大进展...
114
115
116
117
118
  	m_sessparamsPtr->SetUsePollThread(true);
  	m_sessparamsPtr->SetMinimumRTCPTransmissionInterval(10);
  	m_sessparamsPtr->SetOwnTimestampUnit(1.0/90000.0);
  	m_sessparamsPtr->SetAcceptOwnPackets(true);
  	
74d1e6a8   Hu Chunming   完成gb28181大体的代码,未完...
119
  	m_transparamsPtr->SetPortbase(m_rtp_port);
c8285c8d   Hu Chunming   GB28181 UDP 有重大进展...
120
121
  	m_transparamsPtr->SetRTPReceiveBuffer(kRtpRecvBufferSize);
  
74d1e6a8   Hu Chunming   完成gb28181大体的代码,未完...
122
      LOG_INFO("[{}] port: {}", m_SipChannelId, m_rtp_port);
c8285c8d   Hu Chunming   GB28181 UDP 有重大进展...
123
124
  	
  	int err = m_rtpSessionPtr->Create(*m_sessparamsPtr, m_transparamsPtr);
74d1e6a8   Hu Chunming   完成gb28181大体的代码,未完...
125
126
  	if (err != 0) {	
  		LOG_ERROR("[{}] Create error: {}", m_SipChannelId, err);
c8285c8d   Hu Chunming   GB28181 UDP 有重大进展...
127
128
129
130
  		return false;
  	}
  
  	m_rtpThreadPtr = new std::thread(rtp_revc_thread_, this);
74d1e6a8   Hu Chunming   完成gb28181大体的代码,未完...
131
132
  	if (nullptr == m_rtpThreadPtr) {
  		LOG_ERROR("[{}] Create m_rtpThreadPtr error", m_SipChannelId);
c8285c8d   Hu Chunming   GB28181 UDP 有重大进展...
133
134
  		return false;
  	}
c8285c8d   Hu Chunming   GB28181 UDP 有重大进展...
135
  
74d1e6a8   Hu Chunming   完成gb28181大体的代码,未完...
136
  	if (InitPS() != 0) {
c8285c8d   Hu Chunming   GB28181 UDP 有重大进展...
137
138
139
  		return false;
  	}
  
69ee81f3   Hu Chunming   提交websocket clien...
140
141
142
143
144
145
146
  	// InitPS()成功就得起该线程,因为ClosePsThread是在这里完成的
  	m_connThreadPtr = new std::thread(connecting_thread_, this);
  	if (nullptr == m_connThreadPtr) {
  		LOG_ERROR("[{}] Create m_connThreadPtr error", m_SipChannelId);
  		return false;
  	}
  
c3c38c3c   Hu Chunming   代码优化
147
148
149
150
151
152
  	bool bReq = RequestStream();
  	if (!bReq) {
  		LOG_INFO("[{}] RequestStream failed !", m_SipChannelId);
  		Close();
  		return false;
  	}
69ee81f3   Hu Chunming   提交websocket clien...
153
  
c8285c8d   Hu Chunming   GB28181 UDP 有重大进展...
154
  	m_bOpened = true;
69ee81f3   Hu Chunming   提交websocket clien...
155
  	m_bNoData = false;
74d1e6a8   Hu Chunming   完成gb28181大体的代码,未完...
156
      LOG_INFO("[{}] Open ok", m_SipChannelId);
c8285c8d   Hu Chunming   GB28181 UDP 有重大进展...
157
158
159
160
  
  	return true;
  }
  
c8285c8d   Hu Chunming   GB28181 UDP 有重大进展...
161
162
163
164
165
166
167
168
  /**
   * @return the current value of m_bOpened: set to true at the end of a
   *         successful Open() and back to false when CheckConnecting() exits.
   */
  bool RTPUdpReceiver::IsOpened()
  {
      const bool opened = m_bOpened;
      return opened;
  }
  
  /**
   * Requests shutdown of the receiver.
   *
   * Only raises m_bRtpExit. The flag is polled by CheckConnecting() and
   * wait_times(); CheckConnecting() then stops the receive thread, destroys
   * the session and clears m_bOpened. This function does not wait for that.
   */
  void RTPUdpReceiver::Close()
  {
      m_bRtpExit = true;
  }
  
  // RTP包线程
  int RTPUdpReceiver::OnRtpRecv()
  {
  	if(nullptr == m_rtpSessionPtr){
  		return -1;
  	}
  
9a922a54   Hu Chunming   自动请求流修改到了sip服务端,解...
178
179
  	m_bRecvExit = false;
  
74d1e6a8   Hu Chunming   完成gb28181大体的代码,未完...
180
  	LOG_INFO("[{}] OnRtpRecv started.", m_SipChannelId);
9a922a54   Hu Chunming   自动请求流修改到了sip服务端,解...
181
  	while (!m_bRecvExit)
c8285c8d   Hu Chunming   GB28181 UDP 有重大进展...
182
  	{
9a922a54   Hu Chunming   自动请求流修改到了sip服务端,解...
183
184
185
186
187
188
189
190
191
192
  		m_rtpSessionPtr->Poll();
  		m_rtpSessionPtr->BeginDataAccess();
  		
  		if (m_rtpSessionPtr->GotoFirstSourceWithData())
  		{
  			// LOG_INFO("OnRtpRecv GotoFirstSourceWithData --{}", m_SipChannelId);
  			last_recv_ts = UtilTools::get_cur_time_ms();
  			m_idleCount = 0;
  			m_noDataCount = 0;
  			do
c8285c8d   Hu Chunming   GB28181 UDP 有重大进展...
193
  			{
9a922a54   Hu Chunming   自动请求流修改到了sip服务端,解...
194
195
  				RTPPacket* packet;
  				while ((packet = m_rtpSessionPtr->GetNextPacket()) != NULL) 
c8285c8d   Hu Chunming   GB28181 UDP 有重大进展...
196
  				{
9a922a54   Hu Chunming   自动请求流修改到了sip服务端,解...
197
198
199
200
201
202
203
  					m_bNoData = false;
  					// LOG_INFO("OnRtpRecv GetNextPacket --{}", m_SipChannelId);
  					int ret = ParsePacket(packet);
  					m_rtpSessionPtr->DeletePacket(packet);
  					
  					if(ret != 0){
  						m_bRecvExit = true;
c8285c8d   Hu Chunming   GB28181 UDP 有重大进展...
204
  					}
9a922a54   Hu Chunming   自动请求流修改到了sip服务端,解...
205
206
207
208
  				}
  			} while (m_rtpSessionPtr->GotoNextSourceWithData());
  		}
  
c8285c8d   Hu Chunming   GB28181 UDP 有重大进展...
209
210
211
212
213
  		m_rtpSessionPtr->EndDataAccess();
  
  		std::this_thread::sleep_for(std::chrono::milliseconds(10));
  	}
  
74d1e6a8   Hu Chunming   完成gb28181大体的代码,未完...
214
  	LOG_INFO("[{}] OnRtpRecv exited.", m_SipChannelId);
c8285c8d   Hu Chunming   GB28181 UDP 有重大进展...
215
216
217
218
  
  	return 0;
  }
  
69ee81f3   Hu Chunming   提交websocket clien...
219
220
221
222
223
224
225
  /**
   * Asks the websocket client to invite the stream for m_SipChannelId onto
   * m_rtp_port.
   *
   * @return false when InviteUdp() returns a negative value, true otherwise
   */
  bool RTPUdpReceiver::RequestStream() {
      WebsocketClient* pClient = WebsocketClient::getInstance();
      if (nullptr == pClient) {
          // NOTE(review): without a client no invite is sent, yet success is
          // reported - confirm this is intended.
          return true;
      }

      return !(pClient->InviteUdp(m_SipChannelId, m_rtp_port, this) < 0);
  }
  
9a922a54   Hu Chunming   自动请求流修改到了sip服务端,解...
230
231
  int RTPUdpReceiver::CheckConnecting() {
  	LOG_INFO("[{}] CheckConnecting started.", m_SipChannelId);
69ee81f3   Hu Chunming   提交websocket clien...
232
233
234
235
236
  
  	int count = 0;
  	while (!m_bRtpExit)
  	{
  		if (m_bNoData) {
9a922a54   Hu Chunming   自动请求流修改到了sip服务端,解...
237
238
239
240
  			// bool bReq = RequestStream();
  			// if (!bReq) {
  			// 	LOG_INFO("[{}] RequestStream failed !", m_SipChannelId);
  			// }
69ee81f3   Hu Chunming   提交websocket clien...
241
  
9a922a54   Hu Chunming   自动请求流修改到了sip服务端,解...
242
  			wait_times(50); // 等待5s
69ee81f3   Hu Chunming   提交websocket clien...
243
244
245
  			
  			count++;
  
9a922a54   Hu Chunming   自动请求流修改到了sip服务端,解...
246
247
  			if (count > 60) {
  				// 3min 依然没数据过来,则关闭
69ee81f3   Hu Chunming   提交websocket clien...
248
249
250
251
252
253
254
255
256
257
  				m_bRtpExit = true;
  				break;
  			}
  		} else {
  			m_bNoData = true;
  
  			wait_times(100);	// 等待10s 10s之内正常有数据的情况 m_bNoData 已经被置为false
  		}
  	}
  
9a922a54   Hu Chunming   自动请求流修改到了sip服务端,解...
258
  	m_bRecvExit = true;
69ee81f3   Hu Chunming   提交websocket clien...
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
  
  	// 结束整个任务
  	WebsocketClient* pClient = WebsocketClient::getInstance();
  	if (pClient){
  		pClient->ByeInvite(m_SipChannelId, m_rtp_port);
  	}
  
  	LOG_DEBUG("[{}] ByeInvite", m_SipChannelId);
      
      // rtp接收线程退出
      if (nullptr != m_rtpThreadPtr && m_rtpThreadPtr->joinable())
      {
          m_rtpThreadPtr->join();
  		delete m_rtpThreadPtr;
  		m_rtpThreadPtr = nullptr;
      }
  
69ee81f3   Hu Chunming   提交websocket clien...
276
277
      m_rtpSessionPtr->Destroy();
  
69ee81f3   Hu Chunming   提交websocket clien...
278
279
280
281
  	ClosePsThread();
  
      m_bOpened = false;
  
9a922a54   Hu Chunming   自动请求流修改到了sip服务端,解...
282
  	LOG_INFO("[{}] CheckConnecting exited.", m_SipChannelId);
69ee81f3   Hu Chunming   提交websocket clien...
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
  
  	return 0;
  }
  
  // Delay that stays responsive to the exit request.
  /**
   * Sleeps in 100 ms slices and checks m_bRtpExit before every slice.
   *
   * @param times  number of slices to count; (times - 1) slices are actually
   *               slept, so 50 gives about 4.9 s and values <= 1 do not sleep
   * @return true when the whole delay elapsed, false when it ended because
   *         an exit was requested
   */
  bool RTPUdpReceiver::wait_times(int times) {
      for (int remaining = times - 1; remaining > 0; --remaining) {
          if (m_bRtpExit) {
              return false;
          }
          std::this_thread::sleep_for(std::chrono::milliseconds(100));
      }

      // The original fell off the end of this non-void function, which is
      // undefined behaviour; report whether the wait completed instead.
      return !m_bRtpExit;
  }