網站首頁 編程語言 正文
一.功能介紹
??基于boost asio實現server端通信,采用one by one的同步處理方式,并且設置連接等待超時。下面給出了string和byte兩種數據類型的通信方式,可覆蓋基本通信場景需求。
二.string類型數據交互
??規定server與client雙方交互的數據格式是string,并且server采用read_until的方式接收來自client的消息,通過delimiter(分隔符)來判斷一幀數據接收完成,當未收到來自client的delimiter,那么server會一直等待,直到收到delimiter或超時。此處設置了本次回話的連接超時時間SESSION_TIMEOUT,程序中定義為2min,每次收到數據后重新計時,若是連續2min中內有收到來自client的任何消息,那么server會自動斷開本次連接,并且析構本次的session。
??通過string發送和接收的數據采用ASCII碼的編碼方式,因此不能直接發送byte數據,不然會產生亂碼(第三部分為byte數據交互);采用string數據傳輸方式可以方便的進行序列化與反序列化,例如采用json對象的傳輸的方式,可以方便的組織交互協議。
??下面是功能實現的完整程序,程序編譯的前提是已經安裝了boost庫,boost庫的安裝及使用方法在我的前述博客已有提到:boost庫安裝及使用
2.1 程序源碼
mian.cpp
#include "software.hpp"
int main(int argc, char** argv)
{
if(2 != argc){
std::cout<<"Usage: "<<argv[0]<< " port"<<std::endl;
return -1;
}
try {
boost::asio::io_context io;
int port = atoi(argv[1]); // get server port
software::server(io, port); // 開啟一個server,ip地址為server主機地址,port為mian函數傳入
}
catch (std::exception& e) {
std::cout<<"main exception: " << e.what()<<std::endl;
}
return 0;
}
software.hpp
#ifndef __SOFTWARE_HPP__
#define __SOFTWARE_HPP__
#include <string>
#include <iostream>
#include <boost/asio.hpp>
namespace software {
//! Session deadline duration
constexpr auto SESSION_TIMEOUT = std::chrono::minutes(2);
//! Protocol delimiter to software client;分隔符:接收來自client的string數據必須以"}\n"結尾
static constexpr char const* delimiter = "}";
namespace asio = boost::asio;
using tcp = asio::ip::tcp;
/**
* @brief Session for software
* Inherit @class enable_shared_from_this<>
* in order to give the lifecycle to io context,
* it'll causes the lifecycle automatically end when connection break
* (async operation will return when connection break)
* @code
* asio::io_context io;
* session sess(io, std::move(socket));
* io.run();
* @endcode
*/
class session
{
public:
/* session constructor function */
session(asio::io_context& io, tcp::socket socket);
/* session destructor function */
~session();
private:
/*! Async read session socket */
void do_read();
/*! Async wait deadline */
void async_deadline_wait();
/*! software on message handler */
void on_message(std::string&& message);
private:
tcp::socket socket_; // tcp socket
std::string recv_data_; // recv buffer[string]
asio::steady_timer deadline_; // wait deadline time,expire it will disconnect auto
};
/**
* @brief Start server to software(同步方式accept)
* Will serve client one by one(同步方式)
* @param[in] io The asio io context
* @param[in] port The listen port
*/
inline void server(asio::io_context& io, unsigned short port)
{
std::cout<<"sync server start, listen port: " << port << std::endl;
tcp::acceptor acceptor(io, tcp::endpoint(tcp::v4(), port));
// 一次處理一個連接[one by one]
while (true) {
using namespace std;
// client請求放在隊列中,循環逐個處理,處理完繼續阻塞
tcp::socket sock(io);
acceptor.accept(sock); // 一開始會阻塞在這,等待software client連接
io.restart();
session sess(io, std::move(sock)); // io socket
io.run(); // run until session async operations done,調用run()函數進入io事件循環
}
}
} // namespace software
#endif
software.cpp
#include "software.hpp"
using namespace std;
namespace software {
/**
* @brief Session construct function
* @param[in] io The io context
* @param[in] socket The connected session socket
*/
session::session(asio::io_context& io, tcp::socket socket)
: socket_(std::move(socket))
, deadline_(io)
{
std::cout<<"session created: " << socket_.remote_endpoint() <<std::endl;
do_read(); //在構造函數中調用do_read()函數完成對software數據的讀取
async_deadline_wait(); //set on-request-deadline
}
session::~session()
{
std::cout<<"session destruct!" << std::endl;
}
/**
* @brief 從software異步讀取數據并存放在recv_data_中
*/
void session::do_read()
{
auto handler = [this](std::error_code ec, std::size_t length) {
// recv data success, dispose the received data in [on_message] func
if (!ec && socket_.is_open() && length != 0) {
on_message(recv_data_.substr(0, length));
recv_data_.erase(0, length); // 將recv_data_擦除為0
do_read(); // Register async read operation again,重新執行讀取操作
}
// error occured, shutdown the session
else if (socket_.is_open()) {
std::cout<<"client offline, close session" << std::endl;
socket_.shutdown(asio::socket_base::shutdown_both); // 關閉socket
socket_.close(); // 關閉socket
deadline_.cancel(); // deadline wait計時取消
}
};
std::cout<<"server waiting message..." << std::endl;
// block here until received the delimiter
asio::async_read_until(socket_, asio::dynamic_buffer(recv_data_),
delimiter, // 讀取終止條件(分隔符號)
handler); // 消息處理句柄函數
deadline_.expires_after(SESSION_TIMEOUT); // close session if no request,超時2min自動關閉session
}
/**
* @brief Async wait for the deadline,計時等待函數
* @pre @a deadline_.expires_xxx() must called
*/
void session::async_deadline_wait()
{
using namespace std::chrono;
deadline_.async_wait(
//! lambda function
[this](std::error_code) {
if (!socket_.is_open())
return;
if (deadline_.expiry() <= asio::steady_timer::clock_type::now()) {
std::cout<< "client no data more than <"
<< duration_cast<milliseconds>(SESSION_TIMEOUT).count()
<< "> ms, shutdown" << std::endl;
socket_.shutdown(asio::socket_base::shutdown_both);
socket_.close();
return;
}
async_deadline_wait();
}
);
}
/**
* @brief SOFTWARE on message handler
* @param[in] message The received message
* &&表示右值引用,可以將字面常量、臨時對象等右值綁定到右值引用上(也可以綁定到const 左值引用上,但是左值不能綁定到右值引用上)
* 右值引用也可以看作起名,只是它起名的對象是一個將亡值。然后延續這個將亡值的生命,直到這個引用銷毀的右值的生命也結束了。
*/
void session::on_message(std::string&& message)
{
using namespace std;
try {
// print receive data
std::cout<<"recv from client is: "<<message<<std::endl;
// response to client
string send_buf = "hello client, you send data is: " + message;
asio::write(socket_, asio::buffer(send_buf));
}
catch (exception& ex) {
std::cout<<"some exception occured: "<< ex.what() << std::endl;
}
}
} // namespace software
分析一下系統執行流程:
- 在main函數中傳入io和port,調用 software.hpp中的server(asio::io_context& io, unsigned short port)函數。
- 在server()函數中while(True)循環體中accept來自client的連接,每次接收到一個client的連接會創建一個session對象,在session對象中處理本次的連接socket。注意,此處采用的是one by one的同步處理方式,只有上一個session處理完成才能處理下一個session的請求,但是同步發送的請求消息不會丟失,只是暫時不會處理和返回;總的來說,server會按照請求的順序進行one by one處理。
- session對象創建時會調用構造函數,其構造函數主要做了兩件事情:一是調用do_read()函數進行等待讀取來自client的數據并處理;二是通過async_deadline_wait()設置本次session連接的超時處理方法,超時時間默認設置為SESSION_TIMEOUT:deadline_.expires_after(SESSION_TIMEOUT)。
- 在do_read()函數中采用async_read_until()函數讀取來自client的數據,async_read_until()函數會將傳入的delimiter分隔符作為本次接收的結束標識。
- 當判斷本次接收數據完成后,會調用handler句柄對消息進行處理,在handler句柄中主要做了兩件事情:一是將收到的string信息傳入到on_message()消息處理函數中進行處理,只有當本條消息處理完成后才能接收下一條消息并處理,消息會阻塞等待,但是不會丟失;二是在消息處理完成后再次調用do_read()函數,進入read_until()等待消息,如此循環…
- 當發生錯誤或異常,在hander中會關閉本次socket連接,并且不會再調用其他循環體,表示本次session通信結束,之后調用析構函數析構session對象。
??socket_.shutdown(asio::socket_base::shutdown_both); // 關閉socket
??socket_.close(); // 關閉socket
??deadline_.cancel(); // deadline wait計時取消
- 在on_message()消息處理函數中會對收到的string數據進行處理(上述程序中以打印代替),然后調用asio::write(socket_, asio::buffer(send_buf))將response發送給client。
2.2 編譯&&執行
編譯:g++ main.cpp software.cpp -o iotest -lpthread -lboost_system -std=c++17
執行:./iotest 11112 (監聽端口為11112)
2.3 程序執行結果
可以看出,client發送的每條消息都要以"}"結束,這是設定的delimter分隔符。
可以看出,當超過2min沒有收到來自clinet的消息,server會自動斷開連接。
??tips:client1和clinet2可同時與server建立連接并發送數據,但是server會按照連接建立的先后順序對client發送的請求進行one by one處理,比如clinet1先與server建立了連接,那么只有等到clinet1的所有請求執行完成才會處理client2發送的請求;在等待期間client2發送的請求不會處理,但不會丟失。
三.byte類型數據交互
??上述給出了string類型數據的交互,但是string類型的數據只能采用ASCII碼的方式傳輸,在某些場景中,例如傳感器,需要交互byte類型的數據。因此下面給出了byte[hex]類型數據的交互。與上述的string數據交互流程基本一致,幾點區別在下面闡述:
- 將session類中的string recv_data_;替換成u_int8_t recv_data_[MAX_RECV_LEN];
- 數據讀取方式由read_until()改為:socket_.async_receive(asio::buffer(recv_data_,MAX_RECV_LEN),handler);
- on_message()數據處理函數變為:void on_message(const u_int8_t* recv_buf,std::size_t recv_len);
- 數據發送方式變為:socket_.async_send(asio::buffer(recv_buf,recv_len),[](error_code ec, size_t size){});
3.1 程序源碼
mian.cpp
??同上
software.hpp
#ifndef __SOFTWARE_HPP__
#define __SOFTWARE_HPP__
#include <string>
#include <iostream>
#include <boost/asio.hpp>
#define MAX_RECV_LEN 2048
namespace software {
//! Session deadline duration
constexpr auto SESSION_TIMEOUT = std::chrono::minutes(2);
//! Protocol delimiter to software client;分隔符:接收來自client的string數據必須以"}\n"結尾
static constexpr char const* delimiter = "}";
namespace asio = boost::asio;
using tcp = asio::ip::tcp;
/**
* @brief Session for software
* Inherit @class enable_shared_from_this<>
* in order to give the lifecycle to io context,
* it'll causes the lifecycle automatically end when connection break
* (async operation will return when connection break)
* @code
* asio::io_context io;
* session sess(io, std::move(socket));
* io.run();
* @endcode
*/
class session
{
public:
/* session constructor function */
session(asio::io_context& io, tcp::socket socket);
/* session destructor function */
~session();
private:
/*! Async read session socket */
void do_read();
/*! Async wait deadline */
void async_deadline_wait();
/*! software on message handler */
void on_message(const u_int8_t* recv_buf,std::size_t recv_len);
private:
tcp::socket socket_; // tcp socket
u_int8_t recv_data_[MAX_RECV_LEN]; // recv buffer[byte]
asio::steady_timer deadline_; // wait deadline time,expire it will disconnect auto
};
/**
* @brief Start server to software(同步方式accept)
* Will serve client one by one(同步方式)
* @param[in] io The asio io context
* @param[in] port The listen port
*/
inline void server(asio::io_context& io, unsigned short port)
{
std::cout<<"sync server start, listen port: " << port << std::endl;
tcp::acceptor acceptor(io, tcp::endpoint(tcp::v4(), port));
// 一次處理一個連接[one by one]
while (true) {
using namespace std;
// client請求放在隊列中,循環逐個處理,處理完繼續阻塞
tcp::socket sock(io);
acceptor.accept(sock); // 一開始會阻塞在這,等待software client連接
io.restart();
session sess(io, std::move(sock)); // io socket
io.run(); // run until session async operations done,調用run()函數進入io事件循環
}
}
} // namespace software
#endif
software.cpp
#include "software.hpp"
using namespace std;
namespace software {
/**
* @brief Session construct function
* @param[in] io The io context
* @param[in] socket The connected session socket
*/
session::session(asio::io_context& io, tcp::socket socket)
: socket_(std::move(socket))
, deadline_(io)
{
std::cout<<"session created: " << socket_.remote_endpoint() <<std::endl;
do_read(); //在構造函數中調用do_read()函數完成對software數據的讀取
async_deadline_wait(); //set on-request-deadline
}
session::~session()
{
std::cout<<"session destruct!" << std::endl;
}
/**
* @brief 從software異步讀取數據并存放在recv_data_中
*/
void session::do_read()
{
auto handler = [this](std::error_code ec, std::size_t length) {
// recv data success, dispose the received data in [on_message] func
if (!ec && socket_.is_open() && length != 0) {
on_message(recv_data_, length);
memset(recv_data_,0,sizeof(recv_data_));// 將recv_data_擦除為0
do_read(); // Register async read operation again,重新執行讀取操作
}
// error occured, shutdown the session
else if (socket_.is_open()) {
std::cout<<"client offline, close session" << std::endl;
socket_.shutdown(asio::socket_base::shutdown_both); // 關閉socket
socket_.close(); // 關閉socket
deadline_.cancel(); // deadline wait計時取消
}
};
std::cout<<"server waiting message..." << std::endl;
//block here to receive some byte from client
socket_.async_receive(asio::buffer(recv_data_,MAX_RECV_LEN),handler);
deadline_.expires_after(SESSION_TIMEOUT); // close session if no request,超時2min自動關閉session
}
/**
* @brief Async wait for the deadline,計時等待函數
* @pre @a deadline_.expires_xxx() must called
*/
void session::async_deadline_wait()
{
using namespace std::chrono;
deadline_.async_wait(
//! lambda function
[this](std::error_code) {
if (!socket_.is_open())
return;
if (deadline_.expiry() <= asio::steady_timer::clock_type::now()) {
std::cout<< "client no data more than <"
<< duration_cast<milliseconds>(SESSION_TIMEOUT).count()
<< "> ms, shutdown" << std::endl;
socket_.shutdown(asio::socket_base::shutdown_both);
socket_.close();
return;
}
async_deadline_wait();
}
);
}
/**
* @brief SOFTWARE on message handler
* @param[in] recv_buf The received byte array address
* @param[in] recv_len The received byte length
*/
void session::on_message(const u_int8_t* recv_buf,std::size_t recv_len)
{
using namespace std;
try {
// print receive data
std::cout<<"recv data length is: "<<recv_len<<" data is: ";
for(int i = 0; i<recv_len; i++)
printf("%x ",recv_buf[i]);
std::cout<<std::endl;
// response to client
socket_.async_send(asio::buffer(recv_buf,recv_len),[](error_code ec, size_t size){});
}
catch (exception& ex) {
std::cout<<"some exception occured: "<< ex.what() << std::endl;
}
}
} // namespace software
3.2 編譯&&執行
編譯:g++ main.cpp software.cpp -o iotest -lpthread -lboost_system -std=c++17
執行:./iotest 11112 (監聽端口為11112)
3.3 程序執行結果
原文鏈接:https://blog.csdn.net/weixin_42700740/article/details/125873199
相關推薦
- 2022-09-02 Qt為exe添加ico圖片的簡單實現步驟_C 語言
- 2022-09-17 python生成requirements.txt文件的推薦方法_python
- 2022-07-24 示例剖析golang中的CSP并發模型_Golang
- 2022-05-04 詳解Python使用apscheduler定時執行任務_python
- 2022-02-01 微信小程序批量獲取input的輸入值,監聽輸入框,數據同步
- 2022-12-03 C++可擴展性與多線程超詳細精講_C 語言
- 2022-05-07 Python真題案例之蛇形數組詳解_python
- 2022-02-27 select組件選中后獲取當前值對應的對象信息
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支