Notion Blog
技术分享1 分钟阅读

locust 脚本自动生成

json请求数据动态生成locust 脚本

import os

def write_locustfile(data):
    print(os.getcwd().replace('\\','/')  + '/file/test.py')
    path = os.getcwd().replace('\\','/')  + f'/file/{data[0]["name"]}.py'
    file = open(path,'w',encoding='utf-8')

    str_tmp = f"""
import json
from locust import HttpUser, task, between

# function  包装task
class MyHttpUser(HttpUser):
    wait_time = between(1, 2)  # 执行任务 等待时长   检查点 思考时间
    host = 'http://127.0.0.1'

"""
    for itme in data:
        task_tmp = f"""
    @task
    def {itme["name"]}(self):
        url = '{itme["url"]}'
        payload = json.dumps({itme["payload"]})
        headers = {itme["headers"]}
        self.client.post(url,data=payload,headers=headers)
        """

        str_tmp = str_tmp + task_tmp

    file.write(str_tmp)
    file.close()

list_data = [{"name":"login","url":"https://www.zaojingyoutu.top:8000/api/login/","payload":{"username": "zjy","password": "123456"},"headers":{'Content-Type': 'application/json'}},{"name":"baidu","url":"https://www.baidu.com/","payload":None,"headers":None}]

write_locustfile(list_data)根据不同的系统杀locust

curl转locust脚本

import json
import uncurl

def curl_to_locust(curl):
    # 解析curl命令为requests代码
    parsed = uncurl.parse_context(curl)
    # 获取请求方法、URL、头部和数据
    method = parsed.method.lower()
    url = parsed.url
    headers = dict(parsed.headers)
    data = json.loads(parsed.data)
    # 生成locust脚本的字符串
    locust = f"""
from locust import HttpUser, task, between

class MyUser(HttpUser):
    wait_time = between(1, 5) # 设置用户操作的等待时间

    @task # 装饰器,说明下面是一个任务
    def my_task(self): # 定义一个任务方法,方法名可以自己改
        url = '{url}' # 接口请求的URL地址
        headers = {headers} # 定义请求头
        data = {data} # 定义请求体
        with self.client.{method}(url, headers=headers, data=data, catch_response=True) as response: # 使用self.client发起请求,catch_response=True允许自定义断言
            if response.status_code == 200: # 根据实际情况编写断言条件
                response.success() # 断言成功,标记成功
            else:
                response.failure('my_task 接口失败!') # 断言失败,标记失败
"""
    return locust

locust的HTML测试报告生成

from locust.html import get_html_report #方法的位置

res=get_html_report(env) with open(‘report.html’, ‘w’) as f: f.write(res)

根据不同的系统杀locust

杀掉所有在运行的locust进程:ps -ef |grep locust|grep -v grep | awk ‘{print $2}’ | xargs kill -9

win: taskkill /f /IM locust.exe

判断系统

if 'win32' == sys.platform:
    cmd = 'taskkill /f /IM locust.exe'
elif 'linux' == sys.platform:
    cmd = 'ps -ef |grep locust|grep -v grep | awk '{print $2}' | xargs kill -9'
os

有关使用上的问题,欢迎您在底部评论区留言,一起交流~

读者评论

评论会同步写入该文在 Notion 中的页面底部(与正文同页,便于管理)。

0/1500

暂无评论,欢迎抢沙发。