测试开发进阶(八)

测试开发进阶(八)

并发和并行

多任务:操作系统可以同时运行多个任务。

  • 并发:任务数>cpu核数,每个cpu快速切换任务

  • 并行:任务数<cpu核数,每个cpu执行一个任务

多线程模块:threading

1
import threading 

异步处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import time
from threading import Thread


def use_time(func):
def wrapper(*args, **kwargs):
begin = time.time()
func(*args, **kwargs)
end = time.time()
print(f'耗时:{end - begin}')

return wrapper


def work1():
for i in range(6):
time.sleep(1)
print('执行浇花任务中...')


def work2():
for i in range(5):
time.sleep(1)
print('执行打墙任务中...')


@use_time
def main():
work1()
work2()


@use_time
def main1():
t1 = Thread(target=work2)
t1.start()
print('打墙的任务交给线程异步去处理')
work1()


if __name__ == '__main__':
main() # 耗时:11.038895845413208
main1() # 耗时:6.021285057067871

threading模块

Thread类:

  • run():用以表示线程活动的方法
  • start():启动线程活动
  • join([time]):设置主线程会等待time秒后再往下执行,time默认为子线程结束,多个子线程之间设置的值会增加
  • isAlive():返回线程是否活动的
  • getName():返回线程名
  • setName():设置线程名

定义自己的Thread类

重写run方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def run(self):
"""Method representing the thread's activity.

You may override this method in a subclass. The standard run() method
invokes the callable object passed to the object's constructor as the
target argument, if any, with sequential and keyword arguments taken
from the args and kwargs arguments, respectively.

"""
try:
if self._target:
self._target(*self._args, **self._kwargs)
finally:
# Avoid a refcycle if the thread is running a function with
# an argument that has a member that points to the thread.
del self._target, self._args, self._kwargs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# -*- coding:utf-8 -*-
"""
@Describe: 0821_2
@Author: zhongxin
@Time: 2019-08-21 20:40
@Email: 490336534@qq.com
"""
import time
from threading import Thread
import requests


def use_time(func):
def wrapper(*args, **kwargs):
begin = time.time()
func(*args, **kwargs)
end = time.time()
print(f'耗时:{end - begin}')

return wrapper


# 定义线程类
class MyThread(Thread):

def run(self):
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36"
}
for i in range(10):
requests.get(url='https://www.baidu.com', headers=headers)
print(f'第{i + 1}次发送请求')


@use_time
def main0():
t = MyThread()
t.start()
t.join()


@use_time
def main():
t_list = []
for i in range(10):
t = MyThread() # 创建线程对象
t.start() # 开启该线程
t_list.append(t)
# 遍历所有线程对象,设置主线程等待子线程执行完
for t in t_list:
t.join()


if __name__ == '__main__':
main0() # 耗时:1.2900490760803223
main() # 耗时:2.2023119926452637

传参

在run方法中:

self._target(*self._args, **self._kwargs)

从Thread的init方法中可以看到

self._args = args
self._kwargs = kwargs

所以传递方式为修改args和kwargs的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
"""This constructor should always be called with keyword arguments. Arguments are:

*group* should be None; reserved for future extension when a ThreadGroup
class is implemented.

*target* is the callable object to be invoked by the run()
method. Defaults to None, meaning nothing is called.

*name* is the thread name. By default, a unique name is constructed of
the form "Thread-N" where N is a small decimal number.

*args* is the argument tuple for the target invocation. Defaults to ().

*kwargs* is a dictionary of keyword arguments for the target
invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke
the base class constructor (Thread.__init__()) before doing anything
else to the thread.

"""
assert group is None, "group argument must be None for now"
if kwargs is None:
kwargs = {}
self._target = target
self._name = str(name or _newname())
self._args = args
self._kwargs = kwargs
if daemon is not None:
self._daemonic = daemon
else:
self._daemonic = current_thread().daemon
self._ident = None
self._tstate_lock = None
self._started = Event()
self._is_stopped = False
self._initialized = True
# sys.stderr is not stored in the class like
# sys.exc_info since it can be changed between instances
self._stderr = _sys.stderr
# For debugging and _after_fork()
_dangling.add(self)

普通方式传参

1
2
3
4
5
6
7
8
9
10
def work1(name):
for i in range(6):
time.sleep(1)
print(f'{name}执行浇花任务中...')

@use_time
def main():
t1 = Thread(target=work1, args=('zx',))
t1.start()
t1.join()

自定义类方式传参

重写init方法:

1
2
3
def __init__(self, url):
super().__init__()
self.url = url
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class MyThread(Thread):

def __init__(self, url):
super().__init__()
self.url = url

def run(self):
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36"
}
for i in range(10):
requests.get(url=self.url, headers=headers)
print(f'第{i + 1}次发送请求')


@use_time
def main():
t_list = []
for i in range(10):
t = MyThread(url='https://www.baidu.com') # 创建线程对象
t.start() # 开启该线程
t_list.append(t)
# 遍历所有线程对象,设置主线程等待子线程执行完
for t in t_list:
t.join()


if __name__ == '__main__':
main() # 耗时:2.2023119926452637

全局变量修改

多线程可以共用全局变量:使用的是同一块内存

会出现资源竞争

python中的线程是没办法进行并行执行,只能并发

线程之间什么时候会进行切换

  • IO耗时操作:网络,文件,输入等耗时的IO操作,会自动切换
  • 线程的执行时间到达一定的阈值,会自动切换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# -*- coding:utf-8 -*-
"""
@Describe: 0821_3
@Author: zhongxin
@Time: 2019-08-21 21:20
@Email: 490336534@qq.com
"""
from threading import Thread

num = 0


def work1():
global num
for i in range(1000000):
num += 1


def work2():
global num
for i in range(1000000):
num += 1


def main():
t1 = Thread(target=work1)
t2 = Thread(target=work2)
t1.start()
t2.start()
t1.join()
t2.join()
print(num)


if __name__ == '__main__':
main() # 1380941

解决方案:

  • 通过加锁来处理
  • 通过队列来处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from threading import Thread, Lock

# 创建一把锁
meta = Lock()
num = 0


def work1():
global num
for i in range(1000000):
meta.acquire() # 上锁
num += 1
meta.release() # 解锁


def work2():
global num
for i in range(1000000):
meta.acquire()
num += 1
meta.release()


def main():
t1 = Thread(target=work1)
t2 = Thread(target=work2)
t1.start()
t2.start()
t1.join()
t2.join()
print(num)


if __name__ == '__main__':
main()

GIL

  1. python语言和GIL没有关系,仅仅由于历史原因在于Cpython虚拟机(解释器),难以移出GIL
  2. GIL:全局解释器锁,每个线程在执行的过程中都需要获取GIL,保证同一时刻只有一个线程可以执行代码
  3. 线程释放GIL的情况,在IO操作等会引起阻塞的system call之前,可以暂时释放GIL,但在执行完毕后,必须重新获取GIL,python3+使用计时器(执行时间达到阈值后,当前线程释放GIL),python2+,tickets计数达到100
  4. python使用多进程可以利用多核CPU资源

死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from threading import Thread, Lock

metaA = Lock()
metaB = Lock()
num = 0


def work1():
global num
for i in range(100000):
metaA.acquire()
metaB.acquire()
num += 1
metaB.release()
metaA.release()


def work2():
global num
for i in range(100000):
metaB.acquire()
metaA.acquire()
num += 1
metaA.release()
metaB.release()


def main():
t1 = Thread(target=work1)
t2 = Thread(target=work2)
t1.start()
t2.start()
t1.join()
t2.join()
print(num)


if __name__ == '__main__':
main()
 wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!
您的支持将鼓励我继续创作!