// tcp_server.cpp 5.9 KB
/*
   This IPv4 TCP example listens for an incoming connection, adds the accepted
   socket as an RTP destination, and prints a line for every packet received.
*/

#include "rtpsession.h"
#include "rtppacket.h"
#include "rtpudpv4transmitter.h"
#include "rtptcptransmitter.h"
#include "rtpipv4address.h"
#include "rtptcpaddress.h"
#include "rtpsessionparams.h"
#include "rtperrors.h"
#include "rtpsourcedata.h"
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <string>
#include <thread>
#include <chrono>
#include <unistd.h>

#include <netinet/tcp.h>  
#include <sys/types.h>
#include <sys/socket.h>

using namespace jrtplib;
using namespace std;

//
// This function checks if there was an RTP error. If so, it displays an error
// message and exits.
//

// Terminates the process when `rtperr` is a negative status code.
//
// rtperr  status value to inspect; values >= 0 are treated as success and
//         the function returns without doing anything.
//
// For a negative value the text produced by RTPGetErrorString() is written to
// stdout and the process ends with exit(-1).
void checkerror(int rtperr)
{
	// Success path first: nothing to report.
	if (rtperr >= 0)
		return;

	std::cout << "ERROR: " << RTPGetErrorString(rtperr) << std::endl;
	exit(-1);
}

// Returns the current system_clock time as whole milliseconds since the
// clock's epoch (sub-millisecond precision is truncated).
static long long get_cur_time(){
	const auto sinceEpoch = std::chrono::system_clock::now().time_since_epoch();

	return std::chrono::duration_cast<std::chrono::milliseconds>(sinceEpoch).count();
}

//
// The new class routine
//

// class MyRTPSession : public RTPSession
// {
// protected:

// 	void OnValidatedRTPPacket(RTPSourceData *srcdat, RTPPacket *rtppack, bool isonprobation, bool *ispackethandled)
// 	{
// 		// printf("timestamp: %ld  SSRC %x Got packet (%d bytes) in OnValidatedRTPPacket from source 0x%04x!\n", get_cur_time(), GetLocalSSRC(), 
// 			//    (int)rtppack->GetPayloadLength(), srcdat->GetSSRC());
// 		DeletePacket(rtppack);
// 		*ispackethandled = true;
// 	}

// 	void OnRTCPSDESItem(RTPSourceData *srcdat, RTCPSDESPacket::ItemType t, const void *itemdata, size_t itemlength)
// 	{
// 		char msg[1024];

// 		memset(msg, 0, sizeof(msg));
// 		if (itemlength >= sizeof(msg))
// 			itemlength = sizeof(msg)-1;

// 		memcpy(msg, itemdata, itemlength);
// 		// printf("SSRC %x Received SDES item (%d): %s from SSRC %x\n", GetLocalSSRC(), (int)t, msg, srcdat->GetSSRC());
// 	}

// 	virtual void OnRTPPacket(RTPPacket* pack, const RTPTime& receiverTime, const RTPAddress* senderAddress)
// 	{
// 		AddDestination(*senderAddress);
// 	}

// 	virtual void OnRTCPCompoundPacket(RTCPCompoundPacket *pack, const RTPTime &receivetime,const RTPAddress *senderaddress)
// 	{
// 		//AddDestination(*senderaddress);
// 		//const char* name = "hi~";
// 		//SendRTCPAPPPacket(0, (const uint8_t*)name, "keeplive", 8);

// 		printf("send rtcp app");
// 	}
// };

// Connection flag shared between thread_func() and MyTCPTransmitter.
// thread_func() sets it to true after it has accepted a client and added the
// socket as a destination; MyTCPTransmitter's error handling sets it back to
// false, which sends thread_func() back into its accept() loop.
// NOTE(review): plain bool with no synchronisation - fine only if every access
// happens on the same thread; confirm against how the session is polled.
bool bSocket = false;

class MyTCPTransmitter : public RTPTCPTransmitter
{
public:
	MyTCPTransmitter() : RTPTCPTransmitter(0){ }

	void OnSendError(SocketType sock)
	{
		cout << "timestamp: " << get_cur_time() << " : Error sending over socket " << sock << ", removing destination" << endl;
		DeleteDestination(RTPTCPAddress(sock));

		bSocket = false;
	}
	
	void OnReceiveError(SocketType sock)
	{
		cout << ": Error receiving from socket " << sock << ", removing destination" << endl;
		DeleteDestination(RTPTCPAddress(sock));
	}
};

// RTP session shared by tcp_mode(), which creates it, and thread_func(),
// which adds destinations to it, reads its packets and polls it.
RTPSession sess;
// Transmitter passed to sess.Create(); allocated in tcp_mode().
MyTCPTransmitter* trans;
// Session parameters; filled in by tcp_mode() before the session is created.
RTPSessionParams   sessparams;

//
// Worker thread: serves one TCP client at a time on the listening socket.
//
// param   pointer to an int holding the listening socket descriptor.
// returns 0; in practice the service loop never terminates, so the statements
//         after it only keep the function well-formed.
//
// While bSocket is false the thread waits in accept() and registers the
// accepted socket with `sess` as a destination. It then prints a line for
// every received packet and calls sess.Poll(), pausing 10 ms per iteration.
//
int thread_func(void* param){

	std::cout << "thread started..." << std::endl;

	int* p = static_cast<int*>(param);
	int nClient = -1;	// socket of the client currently registered with sess

	std::cout << "timestamp: " << get_cur_time() << "  while() start" << std::endl;

	while(true){

		while (!bSocket)
		{
			// bSocket was cleared after the previous client's destination was
			// removed, so its descriptor can be released before a replacement
			// is accepted. Without this every reconnect leaks one descriptor.
			if (nClient != -1)
			{
				::close(nClient);
				nClient = -1;
			}

			sockaddr_in clientAddr;
			// accept() uses the length as an in/out argument of type socklen_t,
			// so it is re-initialised for every call.
			socklen_t nLen = sizeof(clientAddr);
			int nServer = ::accept(*p, reinterpret_cast<sockaddr*>(&clientAddr), &nLen);
			if (nServer == -1)
			{
				std::this_thread::sleep_for(std::chrono::milliseconds(10));
				continue;
			}

			std::cout << "nServer = " << nServer << std::endl;

			int status = sess.AddDestination(RTPTCPAddress(nServer));
			if (status < 0)
			{
				// The session did not take the socket: drop it and wait for
				// the next client instead of pretending to be connected.
				std::cout << "AddDestination failed, status = " << status << std::endl;
				::close(nServer);
				continue;
			}

			nClient = nServer;
			bSocket = true;
		}

		sess.BeginDataAccess();

		// check incoming packets
		if (sess.GotoFirstSourceWithData())
		{
			do
			{
				RTPPacket *pack;

				while ((pack = sess.GetNextPacket()) != NULL)
				{
					// You can examine the data here
					std::cout << "Got packet ! timestamp: " << get_cur_time() << std::endl;

					// the packet is no longer needed, so delete it
					sess.DeletePacket(pack);
				}
			} while (sess.GotoNextSourceWithData());
		}

		sess.EndDataAccess();

		// NOTE(review): the status of Poll() is ignored, as before. If the
		// session runs its own poll thread this call presumably just returns
		// an error - confirm against the JRTPLIB build in use.
		sess.Poll();

		std::this_thread::sleep_for(std::chrono::milliseconds(10));
	}

	::close(*p);

	// sess.BYEDestroy(RTPTime(10,0),0,0);
	return 0;
}

// Listening TCP socket created by tcp_mode(); a pointer to it is handed to
// thread_func(). Holds -1 until tcp_mode() has created the socket.
int nListener = -1;
int tcp_mode(int port){
	nListener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
	if (nListener < 0)
	{
		return  -1;
	}
 
	sockaddr_in   serverAddr;
	memset(&serverAddr, 0, sizeof(sockaddr_in));
	serverAddr.sin_family = AF_INET;
	serverAddr.sin_port = htons(port);
	serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
	int  nRet = bind(nListener, (sockaddr*)&serverAddr, sizeof(serverAddr));
	if (nRet == -1)
	{
		return  -1;
	}

	if (listen(nListener, 1) == -1)
	{
		return  -1;
	}
 
	int  nPackSize = 45678;
	
	sessparams.SetProbationType(RTPSources::NoProbation);
	sessparams.SetOwnTimestampUnit(90000.0 / 25.0);
	sessparams.SetMaximumPacketSize(nPackSize + 64);

	trans = new MyTCPTransmitter();
	int status = trans->Init(false);
	status = trans->Create(65535, NULL);

	status = sess.Create(sessparams, trans);
	if (status < 0)
	{
		std::cout << "create session error!!" << std::endl;
		return -1;
	}

	std::cout << "begin.." << std::endl;
	
	thread* t = new std::thread(thread_func, &nListener);

	return 0;
}
//
// The main routine
// 

//
// Program entry point: starts the TCP/RTP server on port 40032 and keeps the
// process alive until 'q' is read from stdin or stdin reaches end-of-file.
//
// returns 0 after a normal quit, 1 if the server could not be started.
//
int main(int argc, char* argv[]){

	(void)argc;
	(void)argv;

	if (tcp_mode(40032) != 0)
	{
		std::cout << "failed to start tcp server" << std::endl;
		return 1;
	}

	// Wait for 'q'. The previous condition (== 'q') left the loop on any other
	// key. EOF is tested too so that a closed stdin ends the wait instead of
	// spinning forever.
	// NOTE(review): the worker thread is still running when main returns.
	int ch;
	while ((ch = getchar()) != EOF && ch != 'q')
	{
	}

	return 0;
}