Python stress test tool

I want to write a tool that can generate charts based on the results. I can customize the development on the basis of this project. If you like or help you, I will give you a start

Complete code github address

Technologies and dependencies used

  1. multithreading

  2. The lock

  3. requests

  4. pyecharts

Two ways to initiate requests are configured (you can optimize the multithreaded loop if there are too many)

  1. Fast request, send all requests in the request pool
  2. A slow request is sent within a specified time
def fast_run(begin, end) :
   log.logger.warning("Start index{} End index{}".format(begin, end))
   run_start_time = time.time()
   log.logger.info('Start making fast asynchronous requests.... ')
   for request in syncPool[int(begin):int(end)]:
       request.start()
   log.logger.warning("Request takes {:.2f} how-seconds".format((time.time() - run_start_time) * 1000))
   
def slow_run(begin, end, slowTime) :
   log.logger.warning("Start index{} End index{}".format(begin, end))
   run_start_time = time.time()
   log.logger.info('Start making slow asynchronous requests.... ')
   if slowTime == 0:
       fast_run(begin, end)
   else:
       waitTime = round(((slowTime * 1000) / len(syncPool)) / 1000.3)
       log.logger.info("Delay time is {} milliseconds".format(waitTime * 1000))
       for request in syncPool[int(begin):int(end)]:
           request.start()
           time.sleep(waitTime)
   log.logger.warning("Request takes {} how-seconds".format((time.time() - run_start_time) * 1000))
Copy the code

Data statistics

def out(length) :
   log.logger.warning(F "Total number of requests{int(length)}Time. "")
   log.logger.warning(F "Successful request{len(sync.success)}Time. "")
   log.logger.warning(F "Failed{len(sync.fail)}Time. "")
   log.logger.warning(F "Traffic limiting request{sync.limit}Time. "")
   log.logger.warning("Success rate {:.2f} %".format((len(sync.success) / int(length)) * 100))
   log.logger.warning("Failure rate {:.2f} %".format((len(sync.fail) / int(length)) * 100))
   log.logger.warning("Rate limiting % {:.2f} %".format(sync.limit / int(length) * 100))
   log.logger.warning("Maximum request time {:.2f} milliseconds".format(max(sync.response_time)))
   log.logger.warning("Minimum request time {:.2f} milliseconds".format(min(sync.response_time)))
   log.logger.warning("Average request time {:.2f} milliseconds".format(sum(sync.response_time) / len(sync.response_time)))
   Print the information in the successful request
   for success in sync.success:
       log.logger.debug(success.content)
   # Print the information in the failed request
   for fail in sync.fail:
       if fail is not None:
           log.logger.debug(fail.content)
Copy the code

Multithreaded request object

class SyncRequestTask(threading.Thread) :

   def __init__(self, threadId, url, method, params, header, timeout=10) :
       threading.Thread.__init__(self)
       self.setName(f"sync-{threadId}")
       self.url = url
       self.method = method
       self.params = params
       self.timeout = timeout
       self.header = header

   # Send request
   def request(self) :
       req = None
       try:
           if self.method == 'GET':
               req = self.doGet()
               self.add(req)
           else:
               req = self.doPost()
               self.add(req)
       except Exception as e:
           print(e)
           fail.append(req)

   def doGet(self) :
       startTime = time.time()

       s = sessions.session()
       req = s.get(self.url, headers=self.header, timeout=self.timeout)
       data_build(self.getName(), time.time() - startTime)
       req.close()
       return req

   def doPost(self) :
       # request_body = json.dumps(self.params)
       s = sessions.session()
       startTime = time.time()
       req = s.post(self.url, data=self.params, headers=self.header, timeout=self.timeout)
       data_build(self.getName(), time.time() - startTime)
       req.close()
       return req

   @staticmethod
   def add(request) :
       global lock
       global limit
       lock.acquire()
       if request.status_code == 200:
           success.append(request)
       elif request.status_code == 429:
           limit += 1
       else:
           fail.append(request)
       lock.release()

   def run(self) :
       # Start sending requests
       self.request()
Copy the code