WebSocketServer.cpp 6.23 KB
#include <iostream>

#include "WebSocketServer.h"
#include "./Utils/StringTools.hpp"
#include "ConfigParser.hpp"
#include "./Utils/logger.hpp"

using websocketpp::lib::placeholders::_1;
using websocketpp::lib::placeholders::_2;
using websocketpp::lib::bind;


WebSocketServer::WebSocketServer(/* args */)
{
}

WebSocketServer::~WebSocketServer()
{
}

int WebSocketServer::parse_invite(vector<string>& vec_msg, std::string ip) {
    string net_type = "";
    if (StringTools::to_lower(vec_msg[1]) == "udp") {
        net_type = "udp";
    } else if (StringTools::to_lower(vec_msg[1]) == "tcp") {
        net_type = "tcp";
    }

    LOG_INFO("invite {}:{}    net type: {}", vec_msg[2], vec_msg[3], vec_msg[1]);
    int ret = -100;
    if ("udp" == net_type) {
        ret = sip_server.RequestInvite_UDP(vec_msg[2].c_str(), ip.c_str(), atoi(vec_msg[3].c_str()));
    } else if ("tcp" == net_type)  {
        ret = sip_server.RequestInvite_TCP_a(vec_msg[2].c_str(), ip.c_str(), atoi(vec_msg[3].c_str()));
    }

    return ret;
}

string WebSocketServer::get_ip_from_hdl(websocketpp::connection_hdl hdl) {
    server::connection_ptr con = ws_server.get_con_from_hdl(hdl);
    // 获取客户端IP地址
    auto addr = con->get_socket().remote_endpoint().address().to_string();
    std::string ip = addr.substr(addr.find_last_of(":")+1);
    LOG_DEBUG("ip:{}", ip);
    return ip;
}

int WebSocketServer::msg_parser(websocketpp::connection_hdl hdl, string msg) {
    vector<string> vec_msg = StringTools::split(msg, "|");
    if (vec_msg.size() <= 0)  {
        return -1;
    }

    string ip = get_ip_from_hdl(hdl);

    string response_msg = "";
    int ret = -100;
    if ( StringTools::to_lower(vec_msg[0]) == "invite") {
        ret = parse_invite(vec_msg, ip);
        string strKey = vec_msg[2] + "_" + vec_msg[3];
        if (ret > 0) {
            std::lock_guard<std::mutex> l_hdl(m_channelport_hdl_map_mtx);
            m_channelport_hdl_map[strKey] = hdl;
        }
        
        response_msg = strKey +"|" + vec_msg[0] + "|" + vec_msg[1] + "|" + to_string(ret);
        ws_server.send(hdl, response_msg, websocketpp::frame::opcode::text);
    } else if ( StringTools::to_lower(vec_msg[0]) == "bye") {
        if (StringTools::to_lower(vec_msg[1]) == "invite") {
            LOG_INFO("BYE invite: {}", vec_msg[2]);
            
            std::lock_guard<std::mutex> l_hdl(m_channelport_hdl_map_mtx);
            string strKey = vec_msg[2] + "_" + vec_msg[3];
            auto it = m_channelport_hdl_map.find(strKey);
            if (it != m_channelport_hdl_map.end()) {
                m_channelport_hdl_map.erase(it);
            }

            ret = sip_server.ByeInvite(vec_msg[2], ip, atoi(vec_msg[3].c_str()));
        }
    } else {
        LOG_ERROR("on_message called with hdl: {}  and message:{}", hdl.lock().get(), msg);
    }

    return ret;
}

void WebSocketServer::response_client(std::string sip_channel_id, int rtp_port, std::string net_type, int ret) {

    std::lock_guard<std::mutex> l_hdl(m_channelport_hdl_map_mtx);

    string strKey = sip_channel_id + "_" + to_string(rtp_port);
    auto it = m_channelport_hdl_map.find(strKey);
    if (it == m_channelport_hdl_map.end()) {
        return;
    }

    try {
        // invite
        string response_msg = strKey +"|invite|" + net_type + "|" + to_string(ret);
        ws_server.send(it->second, response_msg, websocketpp::frame::opcode::text);
    } catch(const std::exception& e) {
        // 存在一种可能:连接已经断开,但是sip那边请求失败,这种时候应该是会抛出异常。所以这里用try-cache,屏蔽此情况造成的问题
        LOG_WARN("response failed because::{}", e.what());
    }
}

// Define a callback to handle incoming messages
void WebSocketServer::on_message(websocketpp::connection_hdl hdl, message_ptr msg) {

    if (msg->get_payload() == "stop-listening") {
        ws_server.stop_listening();
        return;
    }

    try {
        msg_parser(hdl,msg->get_payload());
    } catch (websocketpp::exception const & e) {
        LOG_ERROR("Echo failed because::{}", e.what());
        ws_server.send(hdl, to_string(-101), websocketpp::frame::opcode::text);
    }
}

void WebSocketServer::on_open(websocketpp::connection_hdl hdl)
{
    LOG_INFO("连接sip服务器成功!");
}

void WebSocketServer::on_fail(websocketpp::connection_hdl h){
    LOG_WARN("连接sip服务器失败!");
}

void WebSocketServer::on_close(websocketpp::connection_hdl h){
    LOG_INFO("on_close:{}", h.lock().get());

    std::lock_guard<std::mutex> l_hdl(m_channelport_hdl_map_mtx);

    for (auto it = m_channelport_hdl_map.begin(); it != m_channelport_hdl_map.end(); )
    {
        server::connection_ptr con = ws_server.get_con_from_hdl(it->second);
        server::connection_ptr con_std = ws_server.get_con_from_hdl(h);
        if (con == con_std) {
            it = m_channelport_hdl_map.erase(it);
        } else {
            it++;
        }
    }
}


void WebSocketServer::start() {
    ConfigParser* pConfig = ConfigParser::getInstance();
    if (!pConfig->init())
    {
        LOG_ERROR("get config failed!");
        return;
    }

    ServerInfo info = pConfig->getServerCfg();
    
    sip_server.Init(&info, this);

    try {
        // Set logging settings
        ws_server.set_access_channels(websocketpp::log::alevel::all);
        ws_server.clear_access_channels(websocketpp::log::alevel::frame_payload);

        // Initialize Asio
        ws_server.init_asio();

        // Register our message handler
        ws_server.set_message_handler(bind(&WebSocketServer::on_message,this,::_1,::_2));
        ws_server.set_open_handler(bind(&WebSocketServer::on_open, this, _1));
        ws_server.set_fail_handler(bind(&WebSocketServer::on_fail, this, _1));
        ws_server.set_close_handler(bind(&WebSocketServer::on_close, this, _1));

        // Listen on port 9002
        int port = info.getWsPort();
        LOG_INFO("websocket server listen:{}", port);
        ws_server.listen(port);

        // Start the server accept loop
        ws_server.start_accept();

        // Start the ASIO io_service run loop
        ws_server.run();
    } catch (websocketpp::exception const & e) {
        LOG_ERROR("websocket start failed:{}", e.what());
    } catch (...) {
        LOG_ERROR("websocket start failed: other exception");
    }
}