A Tip to Run Testcases Efficiently

In some situations, test cases cannot run concurrently because a test case changes the state of a shared user (or file, or other resource). The state has to be reset in teardown so that the next test case can use the same user.

This becomes a problem as the number of cases keeps growing, because the total runtime is proportional to the number of cases. If you want to run the cases concurrently, every case needs its own user.

So I designed XPool. It is useful in the following situations:

  • 1st: All cases in a test group share one account (or file), and the test cases change the state of that account.
  • 2nd: The group contains a lot of cases, but they cannot run concurrently because of the first reason.
  • 3rd: You really want to run them concurrently.

What's XPool

The Flow Chart of XPool

Before using XPool, you need to prepare several users (or files) and write a reset function (or API).
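Preparing the users can be as small as loading their ids into the pool once. The sketch below assumes the pool is a Redis set stored under the key `xpool:valid` (the key used in the demo code) and that `client` is a redis-py client. The helper name `seed_pool` is hypothetical and not part of XPool.

```python
VALID_POOL = 'xpool:valid'  # assumed key, same one the demo service pops from


def seed_pool(client, xids):
    """Add prepared user ids to the valid pool.

    `client` is anything with a redis-py style sadd(name, *values) method.
    Returns the number of ids that were not already in the pool.
    """
    xids = list(xids)
    if not xids:
        # sadd needs at least one value, so skip the call for an empty list
        return 0
    return client.sadd(VALID_POOL, *xids)
```

With a real server this would be called as `seed_pool(redis.Redis(host='127.0.0.1', port=6379), ['user_a', 'user_b'])`, where the user ids are placeholders for your own accounts.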

  1. A case gets a fresh user through the XPool API.
  2. XPool then moves that user to the invalid pool.
  3. A worker recycles the user automatically after a set number of seconds (or the case returns it through a release API when it finishes).
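The three steps can be modelled in a few lines without Redis or Flask. This is only a toy, single-process sketch to make the flow concrete. The class `InMemoryXPool` and its method names are made up for illustration, and the `release` method is one possible reading of the release API mentioned in step 3.

```python
import time


class InMemoryXPool:
    """Toy model of the XPool flow (not the real service)."""

    def __init__(self, xids, reset_fn, clock=time.monotonic):
        self.valid = set(xids)    # users ready to be handed out
        self.invalid = {}         # xid -> time at which its lock expires
        self.reset_fn = reset_fn  # user-supplied reset function
        self.clock = clock        # injectable so the flow can be tested

    def get(self, duration=30):
        """Steps 1 and 2: hand out a fresh user and move it to the invalid pool."""
        if not self.valid:
            return None
        xid = self.valid.pop()
        self.invalid[xid] = self.clock() + duration
        return xid

    def release(self, xid):
        """Step 3, explicit path: the case is done, so expire the lock now."""
        if xid in self.invalid:
            self.invalid[xid] = self.clock()

    def recycle(self):
        """Step 3, worker path: reset every user whose lock has expired."""
        now = self.clock()
        for xid, expires_at in list(self.invalid.items()):
            if expires_at > now:
                continue  # still locked by a running case
            self.reset_fn(xid)
            del self.invalid[xid]
            self.valid.add(xid)
```

A user only returns to the valid pool after `reset_fn` has run, which is what lets the next case assume a clean state.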

Demo Code

"""DO NOT COPY DIRECTLY !
You need to make the code more robust before using it.
"""
import flask
from flask import Flask
import redis
from flask_apscheduler import APScheduler


ValidPool = 'xpool:valid'
InvalidPool = 'xpool:invalid'
LockKey = 'xpool:lock_{xid}'


class Config(object):
    SCHEDULER_API_ENABLED = True


app = Flask(__name__)
app.config.from_object(Config())
scheduler = APScheduler()
scheduler.init_app(app)

# decode_responses=True makes redis return str instead of bytes, so xid can be
# formatted into the lock key and serialized to JSON.
# The client is not named `redis`, to avoid shadowing the module.
redis_client = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)


@app.route('/api/get', methods=['GET'])
def get():
    args = flask.request.args.to_dict()
    duration = int(args.get('duration', 30))
    # get xid from valid pool
    xid = redis_client.spop(ValidPool)
    if not xid:
        return {'status': 1, 'msg': 'get nothing in pool'}, 200

    lock = LockKey.format(xid=xid)
    # lock the xid for `duration` seconds
    redis_client.setex(name=lock, time=duration, value='1')
    # push xid to invalid pool
    redis_client.sadd(InvalidPool, xid)

    return {'status': 0, 'xid': xid}, 200


def reset(xid):
    # reset function: put your own user (or file) reset logic here
    print('reset xid: {}'.format(xid))
    return


def reset_worker():
    xid_list = redis_client.smembers(InvalidPool)
    if not xid_list:
        return
    for xid in xid_list:
        lock = LockKey.format(xid=xid)
        # skip xid while its lock has not expired
        if redis_client.get(lock):
            continue
        # reset xid
        reset(xid)
        # remove xid from invalid pool
        redis_client.srem(InvalidPool, xid)
        # push xid to valid pool
        redis_client.sadd(ValidPool, xid)
    return


# Run reset_worker every 5 seconds. The job is registered here, after
# reset_worker is defined, instead of through a 'module:function' string.
scheduler.add_job(id='job', func=reset_worker, trigger='interval', seconds=5)
scheduler.start()

if __name__ == '__main__':
    app.run(port=8001)
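On the test side, a case would call `/api/get` in its setup and retry when the pool is empty. The following is a minimal client sketch, assuming the demo service is running on port 8001 and answers with JSON containing `status` and `xid` as strings. The helper names `fetch_once` and `acquire`, and the retry policy, are illustrative and not part of XPool.

```python
import json
import time
import urllib.request

XPOOL_URL = 'http://127.0.0.1:8001/api/get'  # port taken from the demo's app.run


def fetch_once(duration):
    """Call the demo /api/get endpoint and return the decoded JSON body."""
    url = '{}?duration={}'.format(XPOOL_URL, int(duration))
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.loads(resp.read().decode('utf-8'))


def acquire(duration=30, retries=5, delay=1.0, fetch=fetch_once, sleep=time.sleep):
    """Poll until the pool hands out an xid; raise if it stays empty.

    `fetch` and `sleep` are injectable so the retry logic can be exercised
    without a running service.
    """
    for attempt in range(retries):
        body = fetch(duration)
        if body.get('status') == 0:
            return body['xid']
        if attempt < retries - 1:
            sleep(delay)  # wait for the worker to recycle a user
    raise RuntimeError('no free user in XPool after {} attempts'.format(retries))
```

Pick `duration` a little longer than the slowest case that uses the user. Otherwise the worker may reset the user while the case is still running.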