#include #include "WebSocketServer.h" #include "./Utils/StringTools.hpp" #include "ConfigParser.hpp" #include "./Utils/logger.hpp" using websocketpp::lib::placeholders::_1; using websocketpp::lib::placeholders::_2; using websocketpp::lib::bind; WebSocketServer::WebSocketServer(/* args */) { } WebSocketServer::~WebSocketServer() { } int WebSocketServer::parse_invite(vector& vec_msg, std::string ip) { string net_type = ""; if (StringTools::to_lower(vec_msg[1]) == "udp") { net_type = "udp"; } else if (StringTools::to_lower(vec_msg[1]) == "tcp") { net_type = "tcp"; } LOG_INFO("invite {}:{} net type: {}", vec_msg[2], vec_msg[3], vec_msg[1]); int ret = -100; if ("udp" == net_type) { ret = sip_server.RequestInvite_UDP(vec_msg[2].c_str(), ip.c_str(), atoi(vec_msg[3].c_str())); } else if ("tcp" == net_type) { ret = sip_server.RequestInvite_TCP_a(vec_msg[2].c_str(), ip.c_str(), atoi(vec_msg[3].c_str())); } return ret; } string WebSocketServer::get_ip_from_hdl(websocketpp::connection_hdl hdl) { server::connection_ptr con = ws_server.get_con_from_hdl(hdl); // 获取客户端IP地址 auto addr = con->get_socket().remote_endpoint().address().to_string(); std::string ip = addr.substr(addr.find_last_of(":")+1); LOG_DEBUG("ip:{}", ip); return ip; } int WebSocketServer::msg_parser(websocketpp::connection_hdl hdl, string msg) { vector vec_msg = StringTools::split(msg, "|"); if (vec_msg.size() <= 0) { return -1; } string ip = get_ip_from_hdl(hdl); string response_msg = ""; int ret = -100; if ( StringTools::to_lower(vec_msg[0]) == "invite") { ret = parse_invite(vec_msg, ip); string strKey = vec_msg[2] + "_" + vec_msg[3]; if (ret > 0) { std::lock_guard l_hdl(m_channelport_hdl_map_mtx); m_channelport_hdl_map[strKey] = hdl; } response_msg = strKey +"|" + vec_msg[0] + "|" + vec_msg[1] + "|" + to_string(ret); ws_server.send(hdl, response_msg, websocketpp::frame::opcode::text); } else if ( StringTools::to_lower(vec_msg[0]) == "bye") { if (StringTools::to_lower(vec_msg[1]) == "invite") { LOG_INFO("BYE invite: {}", vec_msg[2]); std::lock_guard l_hdl(m_channelport_hdl_map_mtx); string strKey = vec_msg[2] + "_" + vec_msg[3]; auto it = m_channelport_hdl_map.find(strKey); if (it != m_channelport_hdl_map.end()) { m_channelport_hdl_map.erase(it); } ret = sip_server.ByeInvite(vec_msg[2], ip, atoi(vec_msg[3].c_str())); } } else { LOG_ERROR("on_message called with hdl: {} and message:{}", hdl.lock().get(), msg); } return ret; } void WebSocketServer::response_client(std::string sip_channel_id, int rtp_port, std::string net_type, int ret) { std::lock_guard l_hdl(m_channelport_hdl_map_mtx); string strKey = sip_channel_id + "_" + to_string(rtp_port); auto it = m_channelport_hdl_map.find(strKey); if (it == m_channelport_hdl_map.end()) { return; } try { // invite string response_msg = strKey +"|invite|" + net_type + "|" + to_string(ret); ws_server.send(it->second, response_msg, websocketpp::frame::opcode::text); } catch(const std::exception& e) { // 存在一种可能:连接已经断开,但是sip那边请求失败,这种时候应该是会抛出异常。所以这里用try-cache,屏蔽此情况造成的问题 LOG_WARN("response failed because::{}", e.what()); } } // Define a callback to handle incoming messages void WebSocketServer::on_message(websocketpp::connection_hdl hdl, message_ptr msg) { if (msg->get_payload() == "stop-listening") { ws_server.stop_listening(); return; } try { msg_parser(hdl,msg->get_payload()); } catch (websocketpp::exception const & e) { LOG_ERROR("Echo failed because::{}", e.what()); ws_server.send(hdl, to_string(-101), websocketpp::frame::opcode::text); } } void WebSocketServer::on_open(websocketpp::connection_hdl hdl) { LOG_INFO("连接sip服务器成功!"); } void WebSocketServer::on_fail(websocketpp::connection_hdl h){ LOG_WARN("连接sip服务器失败!"); } void WebSocketServer::on_close(websocketpp::connection_hdl h){ LOG_INFO("on_close:{}", h.lock().get()); std::lock_guard l_hdl(m_channelport_hdl_map_mtx); for (auto it = m_channelport_hdl_map.begin(); it != m_channelport_hdl_map.end(); ) { server::connection_ptr con = ws_server.get_con_from_hdl(it->second); server::connection_ptr con_std = ws_server.get_con_from_hdl(h); if (con == con_std) { it = m_channelport_hdl_map.erase(it); } else { it++; } } } void WebSocketServer::start() { ConfigParser* pConfig = ConfigParser::getInstance(); if (!pConfig->init()) { LOG_ERROR("get config failed!"); return; } ServerInfo info = pConfig->getServerCfg(); sip_server.Init(&info, this); try { // Set logging settings ws_server.set_access_channels(websocketpp::log::alevel::all); ws_server.clear_access_channels(websocketpp::log::alevel::frame_payload); // Initialize Asio ws_server.init_asio(); // Register our message handler ws_server.set_message_handler(bind(&WebSocketServer::on_message,this,::_1,::_2)); ws_server.set_open_handler(bind(&WebSocketServer::on_open, this, _1)); ws_server.set_fail_handler(bind(&WebSocketServer::on_fail, this, _1)); ws_server.set_close_handler(bind(&WebSocketServer::on_close, this, _1)); // Listen on port 9002 int port = info.getWsPort(); LOG_INFO("websocket server listen:{}", port); ws_server.listen(port); // Start the server accept loop ws_server.start_accept(); // Start the ASIO io_service run loop ws_server.run(); } catch (websocketpp::exception const & e) { LOG_ERROR("websocket start failed:{}", e.what()); } catch (...) { LOG_ERROR("websocket start failed: other exception"); } }