1.socket请求代码from tool import tcpSendMsg from locust import HttpUser, task, between, TaskSet,events import time,os class UserBehavior(TaskSet): host 10.7.0.66 port 8100 def on_start(self): 读取txt文件数据并为每个用户分配唯一行数据 self.user_lines [] # 读取txt文件中的数据行 if os.path.exists(hn_station_terminal_code.txt): with open(hn_station_terminal_code.txt, r, encodingutf-8) as f: self.user_lines [line.strip() for line in f.readlines() if line.strip()] # 为每个用户分配唯一的数据行基于用户ID if self.user_lines: user_index hash(id(self)) % len(self.user_lines) self.my_line_data self.user_lines[user_index] # 解析数据行假设格式为 username:password:email parts self.my_line_data.split(,) self.station_code parts[0] if len(parts) 0 else self.terminal_code parts[1] if len(parts) 1 else task(2) # 权重为2执行频率更高 def task1(self): trade SN登录 start_time time.time() bet_msg xxx;%s;1;%s;%s;%(self.station_code,self.station_code,self.terminal_code) re tcpSendMsg(self.host, self.port, trade, bet_msg) #print(self.station_code,self.terminal_code) print(re) content_length len(re) events.request.fire(request_typePOST, nametrade, response_time(time.time() - start_time) * 1000, response_lengthcontent_length ) class WebsiteUser(HttpUser): tasks [UserBehavior] wait_time between(0, 0) # 每次任务间的等待时间2.http请求代码from locust import HttpUser, task, between, TaskSet,events import requests import time, json class UserBehavior(TaskSet): def on_start(self): pass task(2) # 权重为2执行频率更高 def task1(self): trade APP签到请求 start_time time.time() data {lat:22.53533571881133,lng:113.95242007973332,stationId:2020800724734316544} # 发送http请求 header { Content-Type: application/json, Authorization: xxx } url xxx re requests.post(url, datajson.dumps(data), headersheader).text print(re) content_length len(re) events.request.fire(request_typePOST, nametrade, response_time(time.time() - start_time) * 1000, response_lengthcontent_length ) class WebsiteUser(HttpUser): tasks [UserBehavior] wait_time between(0, 0) # 每次任务间的等待时间3.运行命令locust -f locust_sn_login.py --web-hostlocalhost --hosthttp://localhost:8089