您好,欢迎访问代理记账网站
  • 价格透明
  • 信息保密
  • 进度掌控
  • 售后无忧

python可以做接口压测???看完这篇文章教你学会用Thread模块进行接口多线程压测!!!

开源系统源码学习学习:http://github.crmeb.net/u/lin
压测过程图片:
在这里插入图片描述
下面是代码模板直接加入接口请求,修改线程数相关参数直接运行,下面有实例介绍:


"""
	Thread 线程模块进行接口压测
"""
 
import threading,time
 
class My_Thread:
 
	def __init__(self):
 
		# 添加总启动的线程列表
		self.all_thread = []
		# 这里用 requests 中的elapsed.total_seconds()方法统计接口请求时间
		self.requests_time = []
		# 添加成功的测试请求
		self.Pass_requests = []
		# 添加失败的请求接口
		self.Fail_requests = []
 
	def test_script(self,event,threadName,runTime):
		"""
		:param event: 		Thread类中的event方法
		:param threadName: 	线程数
		:param runTime: 	持续时间,分钟单位
		"""
 
		print("线程 {} 初始化完毕,随时可以启动...\n".format(threadName))
		# 线程等待
		event.wait()
		print("线程 {} 开始执行...\n".format(threadName))
		# 获取当前时间秒
		now = time.time()
		while True:
			if now + (runTime * 60) > time.time():
				"""
				这里可以添加请求的接口/要测试的系统接口流程
				"""
				print("请求接口中...")
			else:
				break
 
	def run_thread(self,threadNum,runTime,startSeconds,endSeconds):
 
		"""
		:param	threadNum		启动的线程数量
		:param	runTime			持续时间分钟为单位
		:param	startSeconds	每个多少秒启动一个
		:param	endSeconds		每个多少秒结束一个
		"""
		# 实例化Event线程
		event = threading.Event()
		# 所有添加线程列表中
		for i in range(1,threadNum+1):
			self.all_thread.append(threading.Thread(target=self.test_script,args=(event,str(i),runTime)))
 
		event.clear()
		# 启动线程
		for thread in self.all_thread:
			time.sleep(startSeconds)
			thread.start()
 
		event.set()
		# 结束子线程
		for thread in self.all_thread:
			time.sleep(endSeconds)
			thread.join()
 
	def test_result(self):
		"""统计测试结果信息"""
 
		print("==================== 测试结果数据 ====================")
		print("总启动线程数:%s"%len(self.all_thread))
		print("总耗时:%s"%sum(self.requests_time))
		print("平均请求耗时:%s"%str(sum(self.requests_time)/len(self.requests_time)))
		print("请求成功:%s"%len(self.Pass_requests))
		print("请求失败:%s"%len(self.Fail_requests))
		print("==================== 测试结果数据 ====================")
 
if __name__ == '__main__':
 
	# 启动线程熟练 10 个
	threadNum = 10
	# 持续时间
	runTime = 1
	# 多长时间加载一个
	startSeconds = 1
	# 多长时间结束一个
	endSeconds = 1
	test = My_Thread().run_thread(threadNum,runTime,startSeconds,endSeconds)

基于LR自带订票系统,登录接口进行压测:


"""
    利用threading模块进行接口压测
"""
 
import requests, threading, re, time, datetime
 
event = threading.Event()
 
 
def get_session():
    headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        "Cookie": "MSO=SID&1579350070",
        "Referer": "keep-alive",
        "Connection": "keep-alive",
    }
    try:
        r = requests.get(url="http://127.0.0.1:1080/WebTours/nav.pl?in=home", headers=headers)
        regex = re.compile(r"name=userSession value=(.*?)>")
        return regex.findall(r.text)[0]
    except Exception as e:
        print(e)
 
 
request_tm = []
Pass = []
Fail = []
 
 
def login(running_time,event,i):
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Cookie": "MSO=SID&1579351869",
        "Content-Length": "128"
    }
    body = {
        "userSession": get_session(),
        "username": "13720673941",
        "password": "111111",
        "login.x": "61",
        "login.y": "7",
        "JSFormSubmit": "off"
    }
 
    print("线程 {} 初始化完毕,随时可以启动...\n".format(i))
    # 线程等待
    event.wait()
    print("线程 {} 开始执行...\n".format(i))
    # 获取当前时间秒
    now = time.time()
    while True:
        if now + (running_time * 60) > time.time():
            try:
                global r
                r = requests.post(url="http://127.0.0.1:1080/WebTours/login.pl", headers=headers, data=body, timeout=5)
                request_tm.append(r.elapsed.total_seconds())
 
                if r.status_code == 200:
                    Pass.append(r.status_code)
                    print("第 {thread_num} 线程请求成功!协议状态码:{status_code}\n".format(thread_num=i, status_code=r.status_code))
            except Exception as e:
                Fail.append(r.status_code)
                print("第 {thread_num} 线程请求异常!协议状态码:{status_code}\n".format(thread_num=i, status_code=r.status_code))
                print(e)
        else:
            break
 
 
 
thread_list = []
 
 
def run(running_time,thread_num,start_time, end_time):
    """
    running_time: 持续时间 分钟为单位
    thread_num: 线程数
    start_time:几秒启动一个线程
    end_time:  几秒结束一个线程
    """
    for i in range(1, thread_num + 1):
        thread_list.append(threading.Thread(target=login, args=(running_time,event,i)))
 
    event.clear()
    for thread in thread_list:
        time.sleep(start_time)
        thread.start()
 
    event.set()
    for thread in thread_list:
        time.sleep(end_time)
        thread.join()
 
 
# --------------------------------------------------------------------------------
 
# 持续时间分钟
running_time = 1
# 线程数量
thread_num = 10
# 2秒增加一个线程
start_time = 1
# 1秒结束一个线程
end_time = 1
run(running_time, thread_num, start_time, end_time)
 
print("=================== 测试结果 ====================")
 
print("启动总线程:%s" % len(thread_list))
print("总耗时:%s" % sum(request_tm))
print("平均请求耗时:%s" % str(sum(request_tm) / len(request_tm)))
print("请求成功:%s" % len(Pass))
print("请求失败:%s" % len(Fail))
 
print("=================== 测试结束 ====================")

开源系统源码学习学习:http://github.crmeb.net/u/lin


分享:

低价透明

统一报价,无隐形消费

金牌服务

一对一专属顾问7*24小时金牌服务

信息保密

个人信息安全有保障

售后无忧

服务出问题客服经理全程跟进