HP-Socket
工作流程
- 创建监听器
- 创建通信组件(同时绑定监听器)
- 启动通信组件
- 连接到目标主机(Agent组件)
- 处理通信事件(OnConnect/OnReceive/OnClose等)
- 停止通信组件(可选:在第7步销毁通信组件时会自动停止组件)
- 销毁通信组件
- 销毁监听器
传输速度测试
客户端发送信息给服务端
- 分次发送多个文件,2G大小在20秒左右
- 单次发送已打包的大文件,2G大小在7秒左右
之前速度异常原因:
每次OnReceive和OnSend都执行了cout,拉低了整体速度
发送失败
发送端
1. 等待接收端处理数据时间超过最长心跳时间,自动断连,已解决(检测发送过程是否断连,并重连)
2. 【不稳定复现】发送大文件时,可能在发送端输出缓冲区中囤积过量数据,无法发送到接收端【暂时没有复现】
猜测原因:1. 接收端输入缓冲区空间不足,且数据因接收端问题(数据处理失败)无法继续取出。
2. 接收端因某种原因(未知)停止接收数据(可以检查输入缓冲区数据量来确认缓存区是否已满,有没有在接收数据)。
图像数据打包和还原
每次发送数据先发送一段包头,包头数据包括当前图像id,行数,列数,图像编码类型,当前发送数据大小,图像总数量。
用于接收端检测数据传输是否完整(pack模式下每个小包必然是完整包,因此只需要在当检测到下一个包头的时候,检验当前收到的数据大小是否与图像大小一致就可以判断是否产生丢包)
目前选用的pack模式流程
Mermaid Loading...
Mermaid Loading...
代码
客户端
#include<HPSocket/HPSocket.h>
#include<vector>
#include<opencv2/opencv.hpp>
#include"pack.h"
#pragma pack 1
using namespace std;
bool sending = false;
class CListenerImpl : public ITcpClientListener
{
public:
virtual EnHandleResult OnHandShake(ITcpClient* pSender, CONNID dwConnID)
{
return HR_OK;
}
virtual EnHandleResult OnPrepareConnect(ITcpClient* pSender, CONNID dwConnID, SOCKET socket)
{
cout << "PrepareConnect"<<endl;
return HR_OK;
}
virtual EnHandleResult OnConnect(ITcpClient* pSender, CONNID dwConnID)
{
cout << "Connected" << endl;
return HR_OK;
}
virtual EnHandleResult OnReceive(ITcpClient* pSender, CONNID dwConnID,const BYTE* pData, int iLength)
{
//cout << "Recevied from " << dwConnID << " With Data and Length = " << iLength << endl;
return HR_OK;
}
virtual EnHandleResult OnReceive(ITcpClient* pSender, CONNID dwConnID, int iLength)
{
//cout << "Recevied from " << dwConnID << " Length = " << iLength << endl;
return HR_OK;
}
virtual EnHandleResult OnSend(ITcpClient* pSender, CONNID dwConnID, const BYTE* pData, int iLength)
{
//cout << "Send data from " << dwConnID << " Length = "<< iLength << endl;
return HR_OK;
}
virtual EnHandleResult OnClose(ITcpClient* pSender, CONNID dwConnID, EnSocketOperation enOperation, int iErrorCode)
{
//cout << "Closed\n";
//cout << enOperation << endl << iErrorCode << endl;
//cout << "Closed";
return HR_OK;
}
};
struct imgHeader {
int id;
int rows;
int cols;
int type = CV_8UC3;
int size;
int sum = 3000;
int sumSize;
};
uchar* pack(cv::Mat* imgInput, int imgLength, int zoom, int &nowLength)
{
int sumSize = 0;
vector<cv::Mat> imgs(imgLength);
for (int i = 0; i < imgLength; i++)
{
cv::resize(imgInput[i], imgs[i], cv::Size(imgInput[i].rows / zoom, imgInput[i].cols / zoom));
sumSize += imgs[i].total() * imgs[i].elemSize();
}
sumSize += imgs.size() * sizeof(imgHeader);
uchar* buffer = packImages(imgs.data(), imgs.size(), sumSize); // 已知所有图片数据
nowLength = sumSize;
return buffer;
}
void send(cv::Mat* imgInput, int imgLength,int zoom,
char* _ip, int _port, int _socketBufferSize)
{
clock_t startTime, endTime;
startTime = clock();
int nowLength = 0;
uchar* buffer = pack(imgInput, imgLength, zoom, nowLength);
endTime = clock();
cout << "[INFO] Pack Time(ms): " << endTime - startTime << endl;
CListenerImpl listen;
CTcpPackClientPtr client(&listen);
if (!client->Start(_ip, _port))
{
cout << "[ERROR] FAIL TO START...ERROR CODE : " << client->GetLastError() << endl;
exit(1);
}
printf("[SUCCESS] 启动客户端服务成功\n");
while (!client->IsConnected());
startTime = clock();
int st = 0;
while (nowLength)
{
if (client->GetState() == SS_STOPPED)
{
cout << "[WARNING] Stopped" << endl;
if (!client->Start(_ip, _port))
{
cout << "[ERROR] FAIL TO START...ERROR CODE : " << client->GetLastError() << endl;
exit(1);
}
}
int size = min(nowLength, _socketBufferSize);
if (!client->Send(buffer + st, size))
{
printf("[ERROR] Wrong Reason = %d\n", client->GetLastError());
}
else {
nowLength -= size;
st += size;
}
}
int len = 0;
// 等待缓存区内未发送数据发送完毕
while (1)
{
client->GetPendingDataLength(len);
if (len == 0)break;
}
endTime = clock();
cout << "[INFO] send time(ms): " << endTime - startTime << endl;
client->Stop();
delete[] buffer;
}
int main()
{
cv::String path = "imgs";
vector<cv::String> fn;
cv::glob(path, fn, false);
int count = fn.size();
vector<cv::Mat> tmpimgs(count);
for (int i = 0; i < count; i++)
{
tmpimgs[i] = cv::imread(fn[i]);
}
char _ip[20];
memcpy(_ip, "127.0.0.1", sizeof("127.0.0.1"));
send(tmpimgs.data(), count,4, _ip,5000,4096);
return 0;
}
服务端
#include <hpsocket/HPSocket.h>
#include <iostream>
#include <fstream>
#include<vector>
#include<opencv2/opencv.hpp>
#include<ctime>
#include"unpack.h"
#pragma pack 1
using namespace std;
struct imgHeader {
int id;
int rows;
int cols;
int type = CV_8UC3;
int size;
int sum;
int sumSize;
};
int preferLength = 0;
char* buffer = new char[1024];
int bufferP = 0;
bool jumpout = 0;
clock_t startTime, endTime;
unsigned long long length = 0;
class CListenerImpl : public ITcpServerListener
{
public:
virtual EnHandleResult OnPrepareListen(ITcpServer* pSender, SOCKET soListen)
{
return HR_OK;
}
virtual EnHandleResult OnAccept(ITcpServer* pSender, CONNID dwConnID, UINT_PTR soClient)
{
return HR_OK;
}
virtual EnHandleResult OnHandShake(ITcpServer* pSender, CONNID dwConnID)
{
//cout << "HandShaked" << endl;
return HR_OK;
}
virtual EnHandleResult OnReceive(ITcpServer* pSender, CONNID dwConnID, const BYTE* pData, int iLength)
{
if (length == 0)
{
startTime = clock();
char* tmp = new char[sizeof(imgHeader)];
memcpy(tmp, const_cast<BYTE*>(pData), sizeof(imgHeader));
imgHeader* nowHeader = reinterpret_cast<imgHeader*>(tmp);
//printf("id = %d \nrows = %d \ncols = %d \ntype = %d \nsize = %d \nsum = %d \nsumSize = %d\n "
// , nowHeader->id, nowHeader->rows, nowHeader->cols, nowHeader->type, nowHeader->size, nowHeader->sum, nowHeader->sumSize);
buffer = new char[nowHeader->sumSize];
preferLength = nowHeader->sumSize;
delete nowHeader;
}
memcpy(buffer+bufferP, const_cast<BYTE*>(pData), iLength);
bufferP += iLength;
length += iLength;
return HR_OK;
}
virtual EnHandleResult OnReceive(ITcpServer* pSender, CONNID dwConnID, int iLength)
{
//cout << "Receved From " << dwConnID << " Length = " << iLength << endl;
return HR_OK;
}
virtual EnHandleResult OnSend(ITcpServer* pSender, CONNID dwConnID, const BYTE* pData, int iLength)
{
return HR_OK;
}
virtual EnHandleResult OnClose(ITcpServer* pSender, CONNID dwConnID, EnSocketOperation enOperation, int iErrorCode)
{
if (length == preferLength)
{
endTime = clock();
cout << "receive time(ms): " << endTime - startTime << endl;
//cout << length << endl;
cout << "Closed" << endl;
cout << enOperation << endl << iErrorCode << endl;
jumpout = 1;
}
else {
cout << "[ERROR] ERROR CODE : " << iErrorCode << endl;
return HR_ERROR;
}
return HR_OK;
}
virtual EnHandleResult OnShutdown(ITcpServer* pSender)
{
return HR_OK;
}
};
void serverListen(char* _ip, int _port, int _socketBufferSize, vector<cv::Mat>& imgs,bool &stop)
{
CListenerImpl s_listener;
CTcpPackServerPtr s_pserver(&s_listener);
if (!s_pserver->Start(_ip, _port))
exit(1);
while (!stop)
{
if (!s_pserver->GetConnectionCount())continue;
if (jumpout) {
startTime = clock();
vector<cv::Mat> _imgs = unpackImages(buffer, length);
imgs = _imgs;
endTime = clock();
cout << "unpack time(ms): " << endTime - startTime << endl;
delete[] buffer;
buffer = new char[1024];
preferLength = 0;
bufferP = 0;
jumpout = 0;
length = 0;
_imgs.clear();
}
}
delete[] buffer;
s_pserver->Stop();
}
int main(int argc, char* const argv[])
{
CListenerImpl s_listener;
CTcpPackServerPtr s_pserver(&s_listener);
//s_pserver->SetSocketBufferSize(SOCKETBUFFERSIZE);
//s_pserver->SetMaxPackSize(0x3FFFFF);
//s_pserver->SetSocketBufferSize(0x3fffff);
char _ip[20];
memcpy(_ip, "127.0.0.1", sizeof("127.0.0.1"));
vector<cv::Mat> imgs;
bool stop = false;
thread th(serverListen, _ip, 5000, 4096, ref(imgs), ref(stop));
cout << "Press Enter to stop..." << endl;
cin.get();
stop = true;
th.join();
return 0;
}
ToDo List
1.当内存中准备好对应的图像数据后,计算打包+发送的时间。
当内存中已准备好数据时,100个图像总计2G大小打包只需要1.5~2s,完全发送需要7s左右。加上服务端接收+解包,总计需要9s左右
2.当内存中准备好对应的图像数据后,计算压缩图像(长宽各自变为之前的1/2)+打包+发送的时间;接收端计算接收+解包+恢复原始长宽的时间。
压缩+打包0.2s左右,发送1.5s左右,接收1.5s左右,解包+恢复0.8s左右,总计时间2.7s左右
- 目前选择方案 当内存中准备好对应的图像数据后,计算压缩图像(长宽各自变为之前的1/4)(压缩大小至1/16)+打包+发送的时间;接收端计算接收+解包+恢复原始长宽的时间。
压缩+打包0.2s左右,发送1.5s左右,接收1.5s左右,解包+恢复1.7s左右,总计时间3.4s左右
目前通信速度可以使用,如果网络波动/断线,存在丢包问题,服务端和客户端会发出错误提示。
实际环境下只有在网线断开的情况才会产生丢包,目前的解决方案为:在发出错误提示后修复网络连接重新执行发包。
接口设计
void serverListen(char* _ip, int _port, int _socketBufferSize, vector<cv::Mat>& imgs,bool &stop);
// 服务端监听,传入vector<cv::Mat> imgs和bool stop用于获取数据和控制监听停止
// 没有加锁,不对多个该函数进行多线程并行操作,只能作为子线程依附主线程,通过主线程控制。
void send(cv::Mat* imgInput, int imgLength,int zoom,
char* _ip, int _port, int _socketBufferSize);
/**
* 根据输入的图像列表和监听器发送数据到服务端
* @param cv::Mat* imgInput 输入的图像数组
* @param int imgLength 输入图像个数(数组大小)
* @param int zoom 图像缩放倍率,大于1为缩小,例如zoom = 4 尺寸缩小4倍
**/
// 发送完成后自动关闭销毁。