WebsocketClient.cpp 6.43 KB
#include "WebsocketClient.h"

#include "../common_header.h"

#include <fstream>

using namespace std;


using websocketpp::lib::placeholders::_1;
using websocketpp::lib::placeholders::_2;

void WebsocketClient::on_open(websocketpp::connection_hdl hdl)
{
    LOG_INFO( "连接sip服务器成功!");
    mbClosed = false;
}

void WebsocketClient::on_fail(websocketpp::connection_hdl h){
    LOG_INFO( "连接sip服务器失败!");
    mbClosed = true;
}

void WebsocketClient::on_close(websocketpp::connection_hdl h){
    LOG_INFO( "on_close");
    mbClosed = true;
}

void WebsocketClient::on_message(websocketpp::connection_hdl hdl, message_ptr msg)
{
     try {
        int ret = msg_parser(hdl,msg->get_payload());
        // 返回执行结果
        // ws_server.send(hdl, to_string(ret), websocketpp::frame::opcode::text);
    } catch (websocketpp::exception const & e) {
        LOG_ERROR( "Echo failed because:({})", e.what());
    }
}

int WebsocketClient::msg_parser(websocketpp::connection_hdl hdl, string msg) {
    vector<string> vec_msg = StringTools::split(msg, "|");
    if (vec_msg.size() <= 0)  {
        return -1;
    }

    int ret = -100;
    if ( StringTools::to_lower(vec_msg[1]) == "invite") {
        string rKey = vec_msg[0];
        ret = atoi(vec_msg[3].c_str());
        if (ret <= 0) {
            LOG_ERROR("invite failed:{}", msg);
            response_invite_failed(rKey);
        }
    } else if ( StringTools::to_lower(vec_msg[0]) == "bye") {
        if (StringTools::to_lower(vec_msg[1]) == "invite") {
            std::cout << "bye  invite: " << vec_msg[2] << std::endl;
        }
    } else {
        LOG_ERROR( "on_message called with hdl:{}  and message:{}",hdl.lock().get(), msg);
    }

    return ret;
}

bool WebsocketClient::readCfg() {
    std::ifstream cfgFile("./gb28181_cfg.xml");
    if(cfgFile.is_open()) {
        string strConfig;
        string str;
        while(cfgFile >> str)
        {
            strConfig += str;
        }
        CCatalogParser catPaser;
        mInfo = catPaser.DecodeServerConfig(strConfig.c_str());
    } else {
        LOG_ERROR("read sip server config file failed!");
        return false;
    }
    cfgFile.close();
    return true;
}

int WebsocketClient::init()
{
    if (!readCfg()) {
        return -1;
    }
    
    uri = "ws://" + mInfo.getWsIp() + ":" + to_string(mInfo.getWsPort());
    LOG_INFO("{}", uri);

    m_client.set_access_channels(websocketpp::log::alevel::all);
    m_client.clear_access_channels(websocketpp::log::alevel::frame_payload);
    m_client.clear_access_channels(websocketpp::log::alevel::frame_header);

    m_client.init_asio();


    m_client.set_message_handler(websocketpp::lib::bind(&WebsocketClient::on_message, this,::_1, ::_2));
    m_client.set_open_handler(websocketpp::lib::bind(&WebsocketClient::on_open, this, _1));
    m_client.set_fail_handler(websocketpp::lib::bind(&WebsocketClient::on_fail, this, _1));
    m_client.set_close_handler(websocketpp::lib::bind(&WebsocketClient::on_close, this, _1));

    m_client.start_perpetual();

    thread_=websocketpp::lib::make_shared<websocketpp::lib::thread>(&client::run,&m_client);

    if (connect() != 0) {
        return -2;
    }

    if (mbClosed) {
        return -2;
    }

    return 0;
}

int WebsocketClient::connect()
{
    websocketpp::lib::error_code ec;
    client::connection_ptr con = m_client.get_connection(uri, ec);
    if(ec) {
        std::cout << "could not create connection because: " << ec.message() << std::endl;
        return -1;
    }

    m_hdl = con->get_handle();
    auto ret = m_client.connect(con);
    auto status = con->get_state();;
    if (status < 2) {
        return 0;
    }

    return -1;
}

int WebsocketClient::reconnect()
{
    websocketpp::lib::error_code ec;
    client::connection_ptr con = m_client.get_connection(uri, ec);
    if(ec) {
        std::cout << "could not create connection because: " << ec.message() << std::endl;
        return -1;
    }

    m_hdl = con->get_handle();
    auto ret = m_client.connect(con);
    auto status = con->get_state();;
    if (status == 1) {
        return 0;
    }

    return -1;
}

int WebsocketClient::check_connect(){
    if (mbClosed) {
        if (reconnect() == 0) {
            return 0;
        }
    } else {
        return 0;
    }
    return -1;
}

void WebsocketClient::close()
{
    m_client.close(m_hdl, websocketpp::close::status::normal, "");
}

void WebsocketClient::terminate()
{
    m_client.stop_perpetual();
    thread_->join();
}

void WebsocketClient::send_text(std::string msg) {
    if (!m_hdl.expired()) {
        m_client.send(m_hdl, msg, websocketpp::frame::opcode::text);
    }
}

int WebsocketClient::GetMinRtpPort(){
    return mInfo.getMinRtpPort();
}
    
int WebsocketClient::GetMaxRtpPort(){
    return mInfo.getMaxRtpPort();
}

int WebsocketClient::InviteUdp(std::string sip_channel_id, int rtp_port, RTPReceiver* r) {
    if (check_connect() < 0) {
        return -1;
    }
    
    string str_invite = "invite|udp|" + sip_channel_id + "|" + to_string(rtp_port);
	send_text(str_invite);

    cache_receiver(sip_channel_id, rtp_port, r);

    return 0;
}

int WebsocketClient::InviteTcp(std::string sip_channel_id, int rtp_port, RTPReceiver* r) {
    if (check_connect() < 0) {
        return -1;
    }

    string str_invite = "invite|tcp|" + sip_channel_id + "|" + to_string(rtp_port);
	send_text(str_invite);

    cache_receiver(sip_channel_id, rtp_port, r);

    return 0;
}

int WebsocketClient::ByeInvite(std::string sip_channel_id, int rtp_port) {
    if (check_connect() < 0) {
        return -1;
    }

    string str_invite = "bye|invite|" + sip_channel_id + "|" + to_string(rtp_port);
	send_text(str_invite);

    return 0;
}

void WebsocketClient::cache_receiver(std::string sip_channel_id, int rtp_port, RTPReceiver* r) {
    std::lock_guard<std::mutex> l(m_receiver_map_mtx);
    string rKey = sip_channel_id + "_" + to_string(rtp_port);
    m_receiver_map[rKey] = r;
}

int WebsocketClient::DeleteReceiverPair(std::string sip_channel_id, int rtp_port) {
    std::lock_guard<std::mutex> l(m_receiver_map_mtx);
    string rKey = sip_channel_id + "_" + to_string(rtp_port);
    auto it = m_receiver_map.find(rKey);
    if (it != m_receiver_map.end()) {
        m_receiver_map.erase(it);
    }
}

void WebsocketClient::response_invite_failed(std::string rKey) {
    std::lock_guard<std::mutex> l(m_receiver_map_mtx);
    auto it = m_receiver_map.find(rKey);
    if (it != m_receiver_map.end() && it->second) {
        it->second->RequestStreamFailed();
    }
}