3d2ab595
Hu Chunming
支持gb28181
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
|
/*
This is an example of how you can create a single poll thread for
multiple transmitters and sessions, using a single pre-created
RTPAbortDescriptors instance
*/
#include "rtpconfig.h"
#include <iostream>
using namespace std;
#if defined(RTP_SUPPORT_THREAD) && defined(RTP_SUPPORT_IPV6)
#include "rtpsession.h"
#include "rtpudpv6transmitter.h"
#include "rtpipv6address.h"
#include "rtpsessionparams.h"
#include "rtperrors.h"
#include "rtplibraryversion.h"
#include "rtpsourcedata.h"
#include "rtpabortdescriptors.h"
#include "rtpselect.h"
#include "rtprandom.h"
#include <jthread/jthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <string>
#include <vector>
using namespace jrtplib;
using namespace jthread;
inline void checkerror(int rtperr)
{
if (rtperr < 0)
{
cerr << "ERROR: " << RTPGetErrorString(rtperr) << std::endl;
exit(-1);
}
}
class MyRTPSession : public RTPSession
{
public:
MyRTPSession(RTPRandom *rnd) : RTPSession(rnd) { }
~MyRTPSession() { }
protected:
void OnValidatedRTPPacket(RTPSourceData *srcdat, RTPPacket *rtppack, bool isonprobation, bool *ispackethandled)
{
printf("SSRC %x Got packet in OnValidatedRTPPacket from source 0x%04x!\n", GetLocalSSRC(), srcdat->GetSSRC());
DeletePacket(rtppack);
*ispackethandled = true;
}
void OnRTCPSDESItem(RTPSourceData *srcdat, RTCPSDESPacket::ItemType t, const void *itemdata, size_t itemlength)
{
char msg[1024];
memset(msg, 0, sizeof(msg));
if (itemlength >= sizeof(msg))
itemlength = sizeof(msg)-1;
memcpy(msg, itemdata, itemlength);
printf("SSRC %x Received SDES item (%d): %s from SSRC %x\n", GetLocalSSRC(), (int)t, msg, srcdat->GetSSRC());
}
};
class MyPollThread : public JThread
{
public:
MyPollThread(const vector<SocketType> &sockets, const vector<RTPSession *> &sessions)
: m_sockets(sockets), m_sessions(sessions)
{
if (m_mutex.Init() < 0)
{
cerr << "ERROR: unable to initialize mutex" << endl;
exit(-1);
}
m_stop = false;
}
void SignalStop()
{
m_mutex.Lock();
m_stop = true;
m_mutex.Unlock();
}
private:
void *Thread()
{
JThread::ThreadStarted();
vector<int8_t> flags(m_sockets.size());
bool done = false;
m_mutex.Lock();
done = m_stop;
m_mutex.Unlock();
while (!done)
{
double minInt = 10.0; // wait at most 10 secs
for (int i = 0 ; i < m_sessions.size() ; i++)
{
double nextInt = m_sessions[i]->GetRTCPDelay().GetDouble();
if (nextInt > 0 && nextInt < minInt)
minInt = nextInt;
else if (nextInt <= 0) // call the Poll function to make sure that RTCP packets are sent
{
//cout << "RTCP packet should be sent, calling Poll" << endl;
m_sessions[i]->Poll();
}
}
RTPTime waitTime(minInt);
//cout << "Waiting at most " << minInt << " seconds in select" << endl;
int status = RTPSelect(&m_sockets[0], &flags[0], m_sockets.size(), waitTime);
checkerror(status);
if (status > 0) // some descriptors were set
{
for (int i = 0 ; i < m_sockets.size() ; i++)
{
if (flags[i])
{
int idx = i/2; // two sockets per session
if (idx < m_sessions.size())
m_sessions[idx]->Poll();
}
}
}
m_mutex.Lock();
done = m_stop;
m_mutex.Unlock();
}
return 0;
}
JMutex m_mutex;
bool m_stop;
vector<SocketType> m_sockets;
vector<RTPSession *> m_sessions;
};
int main(void)
{
#ifdef RTP_SOCKETTYPE_WINSOCK
WSADATA dat;
WSAStartup(MAKEWORD(2,2),&dat);
#endif // RTP_SOCKETTYPE_WINSOCK
RTPAbortDescriptors abortDesc;
vector<SocketType> pollSockets;
checkerror(abortDesc.Init());
int numTrans = 5;
int portbaseBase = 6000;
vector<RTPUDPv6Transmitter *> transmitters;
for (int i = 0 ; i < numTrans ; i++)
{
RTPUDPv6TransmissionParams transParams;
transParams.SetPortbase(portbaseBase + i*2);
transParams.SetCreatedAbortDescriptors(&abortDesc);
RTPUDPv6Transmitter *pTrans = new RTPUDPv6Transmitter(0);
checkerror(pTrans->Init(true)); // We'll need thread safety
checkerror(pTrans->Create(64000, &transParams));
transmitters.push_back(pTrans);
}
vector<uint16_t> portBases;
vector<RTPSession *> sessions;
RTPRandom *rnd = RTPRandom::CreateDefaultRandomNumberGenerator();
for (int i = 0 ; i < transmitters.size() ; i++)
{
RTPUDPv6Transmitter *pTrans = transmitters[i];
RTPUDPv6TransmissionInfo *pInfo = static_cast<RTPUDPv6TransmissionInfo *>(pTrans->GetTransmissionInfo());
pollSockets.push_back(pInfo->GetRTPSocket());
pollSockets.push_back(pInfo->GetRTCPSocket());
portBases.push_back(pInfo->GetRTPPort());
pTrans->DeleteTransmissionInfo(pInfo);
RTPSession *pSess = new MyRTPSession(rnd); // make them all use the same random number generator
RTPSessionParams sessParams;
// We're going to use our own poll thread!
// IMPORTANT: don't use a single external RTPAbortDescriptors instance
// with the internal poll thread! It will cause threads to
// hang!
sessParams.SetUsePollThread(false);
sessParams.SetOwnTimestampUnit(1.0/8000.0);
checkerror(pSess->Create(sessParams, pTrans));
sessions.push_back(pSess);
}
// First, the pollSockets array will contain two sockets per session,
// and an extra entry will be added for the abort socket
pollSockets.push_back(abortDesc.GetAbortSocket());
// Let each session send to the next
uint8_t localHost[16] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1 };
for (int i = 0 ; i < sessions.size() ; i++)
{
uint16_t destPortbase = portBases[(i+1)%portBases.size()];
checkerror(sessions[i]->AddDestination(RTPIPv6Address(localHost, destPortbase)));
}
MyPollThread myPollThread(pollSockets, sessions);
if (myPollThread.Start() < 0)
{
cerr << "ERROR: couldn't start own poll thread" << endl;
return -1;
}
cout << "Own poll thread started" << endl;
cout << "Main thread will sleep for 30 seconds (the sessions should still send RTCP packets to each other)" << endl;
RTPTime::Wait(RTPTime(30.0));
myPollThread.SignalStop(); // will cause the thread to end after an iteration
while (myPollThread.IsRunning())
{
abortDesc.SendAbortSignal(); // will make sure the thread isn't waiting for incoming data
RTPTime::Wait(RTPTime(0.01));
}
cout << "Own poll thread ended, cleaning up..." << endl;
for (int i = 0 ; i < sessions.size() ; i++)
delete sessions[i];
for (int i = 0 ; i < transmitters.size() ; i++)
delete transmitters[i];
delete rnd;
#ifdef RTP_SOCKETTYPE_WINSOCK
WSACleanup();
#endif // RTP_SOCKETTYPE_WINSOCK
cout << "Done" << endl;
return 0;
}
#else
int main(void)
{
cout << "Thread support or IPv6 support was not enabled at compile time" << endl;
return 0;
}
#endif // RTP_SUPPORT_THREAD && RTP_SUPPORT_IPV6
|