/*
   This IPv4 example listens for incoming packets and automatically adds
   destinations for new sources.

   It opens a TCP listening socket, accepts one client connection at a time,
   registers the accepted socket as the RTP destination of a JRTPLIB session
   and prints a line for every RTP packet received on it.
*/

#include "rtpsession.h"
#include "rtppacket.h"
#include "rtpudpv4transmitter.h"
#include "rtptcptransmitter.h"
#include "rtpipv4address.h"
#include "rtptcpaddress.h"
#include "rtpsessionparams.h"
#include "rtperrors.h"
#include "rtpsourcedata.h"

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <atomic>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <thread>

using namespace jrtplib;
using namespace std;

//
// This function checks if there was a RTP error. If so, it displays an error
// message and exits.
//
void checkerror(int rtperr)
{
    if (rtperr < 0)
    {
        std::cout << "ERROR: " << RTPGetErrorString(rtperr) << std::endl;
        exit(-1);
    }
}

//
// Returns the current system_clock time as a count of milliseconds since the
// clock's epoch. Only used to timestamp log output.
//
static long long get_cur_time()
{
    const auto nowMs =
        std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now());
    return nowMs.time_since_epoch().count();
}

//
// The new class routine
//
// class MyRTPSession : public RTPSession
// {
// protected:
//     void OnValidatedRTPPacket(RTPSourceData *srcdat, RTPPacket *rtppack, bool isonprobation, bool *ispackethandled)
//     {
//         // printf("timestamp: %ld SSRC %x Got packet (%d bytes) in OnValidatedRTPPacket from source 0x%04x!\n", get_cur_time(), GetLocalSSRC(),
//         //        (int)rtppack->GetPayloadLength(), srcdat->GetSSRC());
//         DeletePacket(rtppack);
//         *ispackethandled = true;
//     }
//
//     void OnRTCPSDESItem(RTPSourceData *srcdat, RTCPSDESPacket::ItemType t, const void *itemdata, size_t itemlength)
//     {
//         char msg[1024];
//
//         memset(msg, 0, sizeof(msg));
//         if (itemlength >= sizeof(msg))
//             itemlength = sizeof(msg)-1;
//
//         memcpy(msg, itemdata, itemlength);
//         // printf("SSRC %x Received SDES item (%d): %s from SSRC %x\n", GetLocalSSRC(), (int)t, msg, srcdat->GetSSRC());
//     }
//
//     virtual void OnRTPPacket(RTPPacket* pack, const RTPTime& receiverTime, const RTPAddress* senderAddress)
//     {
//         AddDestination(*senderAddress);
//     }
//
//     virtual void OnRTCPCompoundPacket(RTCPCompoundPacket *pack, const RTPTime &receivetime,const RTPAddress *senderaddress)
//     {
//         //AddDestination(*senderaddress);
//         //const char* name = "hi~";
//         //SendRTCPAPPPacket(0, (const uint8_t*)name, "keeplive", 8);
//         printf("send rtcp app");
//     }
// };

// True while an accepted client socket is registered as the session's
// destination. It is cleared by the transmitter error callbacks and set by
// thread_func(); it is atomic because those may run on different threads
// (NOTE(review): depends on whether the session uses JRTPLIB's poll thread).
std::atomic<bool> bSocket{false};

//
// TCP transmitter that drops a destination as soon as its socket reports an
// error, and signals thread_func() that a new connection must be accepted.
//
class MyTCPTransmitter : public RTPTCPTransmitter
{
public:
    MyTCPTransmitter() : RTPTCPTransmitter(0) { }

    void OnSendError(SocketType sock) override
    {
        cout << "timestamp: " << get_cur_time() << " : Error sending over socket " << sock
             << ", removing destination" << endl;
        DeleteDestination(RTPTCPAddress(sock));
        bSocket = false;
    }

    void OnReceiveError(SocketType sock) override
    {
        cout << ": Error receiving from socket " << sock << ", removing destination" << endl;
        DeleteDestination(RTPTCPAddress(sock));
        // Without this the worker would keep polling a session that has no
        // destination and never accept a replacement connection.
        bSocket = false;
    }
};

RTPSession sess;
MyTCPTransmitter* trans;
RTPSessionParams sessparams;

//
// Worker thread body.
//
// param points to the int holding the listening socket descriptor. The loop
// (re)accepts a client whenever no destination is active, then drains and
// discards every pending RTP packet, polls the session and sleeps 10 ms.
// The loop never terminates on its own; the return value is unused.
//
int thread_func(void* param)
{
    cout << "thread started..." << endl;

    const int* listener = static_cast<const int*>(param);
    int nServer = -1; // currently accepted client socket, -1 if none

    cout << "timestamp: " << get_cur_time() << " while() start" << endl;

    while (true)
    {
        while (!bSocket)
        {
            // The previous connection (if any) has already been removed from
            // the transmitter by an error callback; release its descriptor.
            if (nServer != -1)
            {
                ::close(nServer);
                nServer = -1;
            }

            sockaddr_in clientAddr;
            socklen_t addrLen = sizeof(clientAddr); // accept() overwrites it, so reset per call
            nServer = ::accept(*listener, reinterpret_cast<sockaddr*>(&clientAddr), &addrLen);
            if (nServer == -1)
            {
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
                continue;
            }

            cout << "nServer = " << nServer << endl;

            const int status = sess.AddDestination(RTPTCPAddress(nServer));
            if (status < 0)
            {
                std::cout << "ERROR: " << RTPGetErrorString(status) << std::endl;
                ::close(nServer);
                nServer = -1;
                continue;
            }

            bSocket = true;
            break;
        }

        sess.BeginDataAccess();

        // check incoming packets
        if (sess.GotoFirstSourceWithData())
        {
            do
            {
                RTPPacket* pack;

                while ((pack = sess.GetNextPacket()) != NULL)
                {
                    // You can examine the data here
                    cout << "Got packet ! timestamp: " << get_cur_time() << endl;

                    // we don't longer need the packet, so
                    // we'll delete it
                    sess.DeletePacket(pack);
                }
            } while (sess.GotoNextSourceWithData());
        }

        sess.EndDataAccess();

        // Return value deliberately ignored, as in the original example.
        sess.Poll();

        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }

    // Not reached while the loop above has no exit condition.
    ::close(*listener);
    // sess.BYEDestroy(RTPTime(10,0),0,0);
    return 0;
}

int nListener = -1;

//
// Sets up the TCP listener on the given port, creates the RTP session on top
// of a MyTCPTransmitter and starts the worker thread.
//
// Returns 0 on success and -1 on any failure. On failure the listening
// socket and the transmitter created here are released again.
//
int tcp_mode(int port)
{
    if (port < 0 || port > 65535)
    {
        return -1;
    }

    nListener = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (nListener < 0)
    {
        return -1;
    }

    sockaddr_in serverAddr;
    memset(&serverAddr, 0, sizeof(sockaddr_in));
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_port = htons(static_cast<unsigned short>(port));
    serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);

    // Explicit :: qualification: with `using namespace std;` an unqualified
    // bind() can resolve to std::bind instead of the socket call.
    if (::bind(nListener, reinterpret_cast<sockaddr*>(&serverAddr), sizeof(serverAddr)) == -1 ||
        ::listen(nListener, 1) == -1)
    {
        ::close(nListener);
        nListener = -1;
        return -1;
    }

    const int nPackSize = 45678;

    sessparams.SetProbationType(RTPSources::NoProbation);
    // The timestamp unit is expressed in seconds per timestamp tick, so a
    // 90 kHz clock is 1/90000 (the former 90000/25 was ticks per frame).
    sessparams.SetOwnTimestampUnit(1.0 / 90000.0);
    sessparams.SetMaximumPacketSize(nPackSize + 64);

    trans = new MyTCPTransmitter();

    // Stop at the first failing step instead of overwriting its status.
    // NOTE(review): Init(false) requests a non-thread-safe transmitter;
    // confirm this matches the session's poll-thread setting.
    int status = trans->Init(false);
    if (status >= 0)
    {
        status = trans->Create(65535, NULL);
    }
    if (status >= 0)
    {
        status = sess.Create(sessparams, trans);
    }
    if (status < 0)
    {
        std::cout << "create session error!!" << std::endl;
        std::cout << "ERROR: " << RTPGetErrorString(status) << std::endl;
        delete trans;
        trans = NULL;
        ::close(nListener);
        nListener = -1;
        return -1;
    }

    std::cout << "begin.." << std::endl;

    // The worker runs for the lifetime of the process, so detach it rather
    // than leaking a heap-allocated std::thread object.
    std::thread(thread_func, &nListener).detach();

    return 0;
}

//
// The main routine
//
int main(int argc, char* argv[])
{
    (void)argc;
    (void)argv;

    if (tcp_mode(40032) != 0)
    {
        std::cout << "ERROR: could not start TCP receiver" << std::endl;
        return 1;
    }

    // Keep running until the user enters 'q' (or stdin is closed).
    int ch;
    while ((ch = getchar()) != 'q' && ch != EOF)
    {
    }

    return 0;
}