本篇文章主要介绍如何使用locust对MQTT协议进行压测,jmeter也支持对mqtt协议进行压测,但是没有locust这么灵活,locust可以实现模拟大批量用户,并且跟踪每个用户的请求数量、失败数、平均响应时间,jmeter就没有这个效果,只是一个汇总的测试结果,并且locust适合mqtt这种千、万级请求,jmeter性能就没有这么强
一、安装locust
1.首先你本机需要安装python,因为locust是基于python环境,本次python的版本是3.9.2
2.使用pip3 命令安装locust,提示了Successfully 就代表安装成功
pip3 install locust
# 查看版本
locust --version
二、编写python代码
1.本次压测是使用本机安装的apache-activemq-5.15.9当作MQTT服务器
from locust import User, TaskSet, events, task, between
import paho.mqtt.client as mqtt
import time
import random
COUNTClient = 0
#broker_address="broker.mqttdashboard.com"
broker_address="127.0.0.1" #服务器ip地址,实际压测要根据mqtt的实际ip进行修改
REQUEST_TYPE = 'MQTT'
PUBLISH_TIMEOUT = 10000 #超时时间
def fire_locust_success(**kwargs):
events.request_success.fire(**kwargs)
def increment():
global COUNTClient
COUNTClient = COUNTClient+1
def time_delta(t1, t2):
return int((t2 - t1)*1000)
class Message(object):
def __init__(self, type, qos, topic, payload, start_time, timeout, name):
self.type = type,
self.qos = qos,
self.topic = topic
self.payload = payload
self.start_time = start_time
self.timeout = timeout
self.name = name
class PublishTask(TaskSet):
def on_start(self):
self.client.connect(host=broker_address, port=1883, keepalive=60)
self.client.disconnect()
@task(1)
def task_pub(self):
self.client.reconnect()
self.client.loop_start()
self.start_time = time.time()
topic = "jmeter相关技术交流" #主题名称,可根据需要进行修改
payload = "Device - " + str(self.client._client_id) #定义设备名称,可以按需更改
count = random.randint(0,9)
print('----执行随机数---{}'.format(count))
if count % 2 == 0:
payload ='{"ctrl":{"mac":"60A423FFFE5E0589","endpoint":1,"OnOff":1, "ack":"You are Done"}}'
else:
payload ='{"ctrl":{"mac":"60A423FFFE5E0589","endpoint":1,"OnOff":0, "ack":"You are Done"}}'
MQTTMessageInfo = self.client.publish(topic,payload,qos=0, retain=False)
pub_mid = MQTTMessageInfo.mid
print("Mid = " + str(pub_mid)) #用来在pycharm输出模拟的设备数
self.client.pubmessage[pub_mid] = Message(
REQUEST_TYPE, 0, topic, payload, self.start_time, PUBLISH_TIMEOUT, str(self.client._client_id)
)
MQTTMessageInfo.wait_for_publish()
self.client.disconnect()
self.client.loop_stop()
time.sleep(5)
wait_time = between(0.5, 10)
class MQTTLocust(User):
tasks = {PublishTask}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
increment()
client_name = "Device - " + str(COUNTClient)
self.client = mqtt.Client(client_name)
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_publish = self.on_publish
self.client.pubmessage = {}
def on_connect(client, userdata, flags, rc, props=None):
fire_locust_success(
request_type=REQUEST_TYPE,
name='connect',
response_time=0,
response_length=0
)
def on_disconnect(client, userdata,rc,props=None):
print("Disconnected result code "+str(rc))
def on_publish(self, client, userdata, mid):
end_time = time.time()
message = client.pubmessage.pop(mid, None)
total_time = time_delta(message.start_time, end_time)
fire_locust_success(
request_type=REQUEST_TYPE,
name=str(self.client._client_id),
response_time=total_time,
response_length=len(message.payload)
)
三、启动locust
1.在你的python脚本目录执行如下命令:
locust -f program_backups.py --host=127.0.0.1
2.命令解释 program_backups.py是你编写的python脚本文件名称,host是对应你的mqtt服务器ip,本次是在pycharm的命令行执行该命令,
3.你也可以直接在cmd下执行
4.命令行提示 Starting Locust,代表执行成功
四、执行压测
1.在浏览器输入http://localhost:8089/ ,进入locust压测页面 Number of users代表要压测的用户数,Spawn rate 代表每秒加载的用户数,Host是你的mqtt服务器ip 点击Start swarming开始压测
2.Statistics显示每个用户的请求数、响应时间等指标,Charts通过图表展示每秒请求数(RPS),响应时间,用户数 Failures显示报错数,Download Data压测报告
3.Locust界面不支持压测场景的设置,所以需要自己进行手动更改,比如压测完20用户,需要模拟100用户,就需要点击New test进入用户设置界面,修改完成,点击start swarming即可
4.最后贴一张实际工作过程中的压测结果,将并发数递度增至10、20、30时,当并发达20时,网关长时间无响应,再进一步增至30,网关完全无响应,得出网关的性能瓶颈在设备数为300、并发20达到拐点(本次是对网关进行压测)。
如果文章对你有帮助,欢迎关注本人公众号,公众号与本平台文章同步,方便大家查阅,本人会持续推出与测试有关的文章,与大家分享测试技术,每一篇原创文章都是用心编写,杜绝抄袭复制
QQ技术交流群:加群请输入验证信息 51cto
微信二维码关注公众号:
关注之后,回复资源下载,即可获取本人共享的各种资源下载地:
原创文章,作者:kepupublish,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/168303.html