#include "WebsocketClient.h" #include "../common_header.h" #include using namespace std; using websocketpp::lib::placeholders::_1; using websocketpp::lib::placeholders::_2; void WebsocketClient::on_open(websocketpp::connection_hdl hdl) { LOG_INFO( "连接sip服务器成功!"); mbClosed = false; } void WebsocketClient::on_fail(websocketpp::connection_hdl h){ LOG_INFO( "连接sip服务器失败!"); mbClosed = true; } void WebsocketClient::on_close(websocketpp::connection_hdl h){ LOG_INFO( "on_close"); mbClosed = true; } void WebsocketClient::on_message(websocketpp::connection_hdl hdl, message_ptr msg) { try { int ret = msg_parser(hdl,msg->get_payload()); // 返回执行结果 // ws_server.send(hdl, to_string(ret), websocketpp::frame::opcode::text); } catch (websocketpp::exception const & e) { LOG_ERROR( "Echo failed because:({})", e.what()); } } int WebsocketClient::msg_parser(websocketpp::connection_hdl hdl, string msg) { vector vec_msg = StringTools::split(msg, "|"); if (vec_msg.size() <= 0) { return -1; } int ret = -100; if ( StringTools::to_lower(vec_msg[1]) == "invite") { string rKey = vec_msg[0]; ret = atoi(vec_msg[3].c_str()); if (ret <= 0) { LOG_ERROR("invite failed:{}", msg); response_invite_failed(rKey); } } else if ( StringTools::to_lower(vec_msg[0]) == "bye") { if (StringTools::to_lower(vec_msg[1]) == "invite") { std::cout << "bye invite: " << vec_msg[2] << std::endl; } } else { LOG_ERROR( "on_message called with hdl:{} and message:{}",hdl.lock().get(), msg); } return ret; } bool WebsocketClient::readCfg() { std::ifstream cfgFile("./gb28181_cfg.xml"); if(cfgFile.is_open()) { string strConfig; string str; while(cfgFile >> str) { strConfig += str; } CCatalogParser catPaser; mInfo = catPaser.DecodeServerConfig(strConfig.c_str()); } else { LOG_ERROR("read sip server config file failed!"); return false; } cfgFile.close(); return true; } int WebsocketClient::init() { if (!readCfg()) { return -1; } uri = "ws://" + mInfo.getWsIp() + ":" + to_string(mInfo.getWsPort()); LOG_INFO("{}", uri); m_client.set_access_channels(websocketpp::log::alevel::all); m_client.clear_access_channels(websocketpp::log::alevel::frame_payload); m_client.clear_access_channels(websocketpp::log::alevel::frame_header); m_client.init_asio(); m_client.set_message_handler(websocketpp::lib::bind(&WebsocketClient::on_message, this,::_1, ::_2)); m_client.set_open_handler(websocketpp::lib::bind(&WebsocketClient::on_open, this, _1)); m_client.set_fail_handler(websocketpp::lib::bind(&WebsocketClient::on_fail, this, _1)); m_client.set_close_handler(websocketpp::lib::bind(&WebsocketClient::on_close, this, _1)); m_client.start_perpetual(); thread_=websocketpp::lib::make_shared(&client::run,&m_client); if (connect() != 0) { return -2; } if (mbClosed) { return -2; } return 0; } int WebsocketClient::connect() { websocketpp::lib::error_code ec; client::connection_ptr con = m_client.get_connection(uri, ec); if(ec) { std::cout << "could not create connection because: " << ec.message() << std::endl; return -1; } m_hdl = con->get_handle(); auto ret = m_client.connect(con); auto status = con->get_state();; if (status < 2) { return 0; } return -1; } int WebsocketClient::reconnect() { websocketpp::lib::error_code ec; client::connection_ptr con = m_client.get_connection(uri, ec); if(ec) { std::cout << "could not create connection because: " << ec.message() << std::endl; return -1; } m_hdl = con->get_handle(); auto ret = m_client.connect(con); auto status = con->get_state();; if (status == 1) { return 0; } return -1; } int WebsocketClient::check_connect(){ if (mbClosed) { if (reconnect() == 0) { return 0; } } else { return 0; } return -1; } void WebsocketClient::close() { m_client.close(m_hdl, websocketpp::close::status::normal, ""); } void WebsocketClient::terminate() { m_client.stop_perpetual(); thread_->join(); } void WebsocketClient::send_text(std::string msg) { if (!m_hdl.expired()) { m_client.send(m_hdl, msg, websocketpp::frame::opcode::text); } } int WebsocketClient::GetMinRtpPort(){ return mInfo.getMinRtpPort(); } int WebsocketClient::GetMaxRtpPort(){ return mInfo.getMaxRtpPort(); } int WebsocketClient::InviteUdp(std::string sip_channel_id, int rtp_port, RTPReceiver* r) { if (check_connect() < 0) { return -1; } string str_invite = "invite|udp|" + sip_channel_id + "|" + to_string(rtp_port); send_text(str_invite); cache_receiver(sip_channel_id, rtp_port, r); return 0; } int WebsocketClient::InviteTcp(std::string sip_channel_id, int rtp_port, RTPReceiver* r) { if (check_connect() < 0) { return -1; } string str_invite = "invite|tcp|" + sip_channel_id + "|" + to_string(rtp_port); send_text(str_invite); cache_receiver(sip_channel_id, rtp_port, r); return 0; } int WebsocketClient::ByeInvite(std::string sip_channel_id, int rtp_port) { if (check_connect() < 0) { return -1; } string str_invite = "bye|invite|" + sip_channel_id + "|" + to_string(rtp_port); send_text(str_invite); return 0; } void WebsocketClient::cache_receiver(std::string sip_channel_id, int rtp_port, RTPReceiver* r) { std::lock_guard l(m_receiver_map_mtx); string rKey = sip_channel_id + "_" + to_string(rtp_port); m_receiver_map[rKey] = r; } int WebsocketClient::DeleteReceiverPair(std::string sip_channel_id, int rtp_port) { std::lock_guard l(m_receiver_map_mtx); string rKey = sip_channel_id + "_" + to_string(rtp_port); auto it = m_receiver_map.find(rKey); if (it != m_receiver_map.end()) { m_receiver_map.erase(it); } } void WebsocketClient::response_invite_failed(std::string rKey) { std::lock_guard l(m_receiver_map_mtx); auto it = m_receiver_map.find(rKey); if (it != m_receiver_map.end() && it->second) { it->second->RequestStreamFailed(); } }