locust_3

目标:使用locust 对websocket接口做压测

大佬文档:https://www.cnblogs.com/shenh/p/18321028

需要的库:

1
2
3
pip install websockets  locust   websocket-client
#更新最新版本locust
pip install --upgrade locust

服务端demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import websockets
import json
async def handle_connection(websocket, path):
print(f"New connection from {path}")
try:
async for message in websocket:
print(f"Received: {message}")
response = f"Echo: {message}"

response_data = {
"data": ["Example", "Data", "For", "hello_world"]
}

# 返回 JSON 响应
await websocket.send(json.dumps(response_data))
# await websocket.send(response)
except websockets.ConnectionClosed as e:
print(f"Connection closed: {e}")

async def main():
server = await websockets.serve(handle_connection, "localhost", 8765)
print("WebSocket server started on ws://localhost:8765")
await server.wait_closed()

if __name__ == "__main__":
asyncio.run(main())

客户端demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from locust import task, between, User, events
import os, json, time, websocket, socket, sys


class WebSocketClient(object):
def __init__(self, host):
self.host = host
self.ws = websocket.WebSocket()

def connect(self, url):
self.ws.connect(url=self.host + url)

return self.ws

def send(self, msg):
self.ws.send(msg)

def recv(self):
return self.ws.recv()

def close(self):
self.ws.close()


class WebsocketLocust(User):
# 定义抽象类
abstract = True

# 每个虚拟用户执行完一次任务后随机1~5秒等待,再执行下一个任务
# wait_time = between(1, 5)
def __init__(self, *args, **kwargs):
super(WebsocketLocust, self).__init__(*args, **kwargs)
# 输入真实的ws接口ip和端口
self.client = WebSocketClient('ws://localhost:8765')


# 定义用户行为
class UserTask(WebsocketLocust):
def on_start(self):
'''初始化数据,每个虚拟用户只执行一次'''
self.url = "/"
self.ws = self.client.connect(self.url)
self.params = {'name':'hello world'}

# 多个接口用@task(1)、@task(2)分配权限,按1:2流量分配
@task
def test(self):
start_time = time.time()
try:
self.ws.send(json.dumps(self.params)) # 将参数转换为 JSON 格式并通过 WebSocket 发送
r = self.ws.recv() # 接收从 WebSocket 服务器返回的响应
total_time = time.time() - start_time # 计算请求的总时间
result = json.loads(r)['data'] # 将响应数据解析为 JSON,并提取 'data' 部分
print(result) # 打印结果
print(type(result)) # 打印结果的类型
assert result[0] == 'Example' # 验证结果的第一个元素不等于 'Example'
events.request.fire(request_type="websockt", name=self.url, response_time=total_time * 1000,
response_length=len(r), response=result) # 记录请求事件

except AssertionError:
# 如果断言失败(即 result[0] == 'Example'),记录断言错误
events.request.fire(request_type="websockt", name=self.url, response_time=total_time * 1000,
response_length=0, exception=f"断言错误,response:{result}")

except socket.timeout:
# 如果发生超时错误,记录用户错误
events.user_error.fire(user_instance=UserTask, exception='Timeout', tb=sys.exc_info()[2])

except Exception as e:
# 处理其他异常,并记录用户错误
events.user_error.fire(user_instance=UserTask, exception=e, tb=sys.exc_info()[2])

def on_stop(self):
'''销毁数据,每个虚拟用户只执行一次'''
self.ws.close()


if __name__ == "__main__":
os.system('locust -f locust_test.py')

效果:

locust_3