并发编程

  首先感谢况老师的指导和潇哥的指点。以下内容来自初学的我,谨防入坑!

  促使我对并发/并行操作的关注是我对python同步多线程和异步多线程的理解空白,导致我2个月前的一次失败的同步多线程操作直到昨天改keras的imge处理底层代码时发现还有异步多线程实现的并发操作。在此记录下并发操作相关概念和理解。

  理解python的并发操作需要掌握python的全局解释器锁(GIL)以及多进程、多线程操作和协程操作。相关概念的区别如阻塞和非阻塞、并发和并行、同步和异步、计算操作密集和I/O密集的概念和区分。本文重点区分上述概念。

python为什么比C++/Java慢

  首先C++和Java是编译型语言,而Python则是一种解释型语言。编译型语言在程序执行前需要一个专门编译额过程,将代码编译成机器语言;而python是解释型语言不需要编译,在程序执行时将代码一步步翻译成机器语言。python的变量类型是动态的,解释器会根据程序将变量和所有的变量类型存放在内存中;而静态类型直接将变量与其类型绑定。形象的理解是python解释器制定规则,python执行代码时去匹配这个规则,然后再执行。

全局解释器锁(GIL)

  为了利用多核系统,Python必须支持多线程运行。但作为解释型语言,Python的解释器需要做到既安全又高效。解释器要注意避免在不同的线程操作内部共享的数据,同时还要保证在管理用户线程时保证总是有最大化的计算资源。为了保证不同线程同时访问数据时的安全性,Python使用了全局解释器锁(GIL)的机制。从名字上我们很容易明白,它是一个加在解释器上的全局(从解释器的角度看)锁(从互斥或者类似角度看)。这种方式当然很安全,但它也意味着:对于任何Python程序,不管有多少的处理器,任何时候都总是只有一个线程在执行。即:只有获得了全局解释器锁的线程才能操作Python对象或者调用Python/C API函数。

Python的GIL

  • CPython的线程是操作系统的原生线程。在Linux上为pthread,在Windows上为Win thread,完全由操作系统调度线程的执行。一个Python解释器进程内有一个主线程,以及多个用户程序的执行线程。即便使用多核心CPU平台,由于GIL的存在,也将禁止多线程的并行执行。
  • Python解释器进程内的多线程是以协作多任务方式执行。当一个线程遇到I/O任务时,将释放GIL。计算密集型(CPU-bound)的线程在执行大约100次解释器的计步(ticks)时,将释放GIL。计步(ticks)可粗略看作Python虚拟机的指令。计步实际上与时间片长度无关。可以通过sys.setcheckinterval()设置计步长度。
  • 在单核CPU上,数百次的间隔检查才会导致一次线程切换。在多核CPU上,存在严重的线程抖动(thrashing)。
  • Python 3.2开始使用新的GIL。新的GIL实现中用一个固定的超时时间来指示当前的线程放弃全局锁。在当前线程保持这个锁,且其他线程请求这个锁时,当前线程就会在5毫秒后被强制释放该锁。
  • 可以创建独立的进程来实现并行化。Python 2.6引进了多进程包multiprocessing。或者将关键组件用C/C++编写为Python扩展,通过ctypes使Python程序直接调用C语言编译动态链接库的导出函数。【来自维基百科】

小结

  1. 由于GIL的存在,一个python解释器进程执行多线程时实际上也只是在保护一个主线程的运行。
  2. I/O操作主要是读写任务,CPU操作主要是计算任务,当python执行I/O密集型任务,将释放GIL开多线程仍然可以通过让其他线程加快程序运行效率。如果是cpu密集型任务,线程切换导致cache missing造成不必要的开销切换,反而影响程序效率。在keras图像预处理接口keras.preprocessing.image中实现多线程读取文件夹名和文件名操作然后分批,在读取分批数据以达到减小内存的作用的

进程

  进程之间不共享任何状态,进程的调度由操作系统完成,每个进程都有自己独立的内存空间,进程间通讯主要是通过信号传递的方式来实现的,实现方式有多种,信号量、管道、事件等,任何一种方式的通讯效率都需要过内核,导致通讯效率比较低。由于是独立的内存空间,上下文切换的时候需要保存先调用栈的信息、cpu各寄存器的信息、虚拟内存、以及打开的相关句柄等信息,所以导致上下文进程间切换开销很大,通讯麻烦。

  简单来讲,每一个应用程序都有一个自己的进程。 操作系统会为这些进程分配一些执行资源,例如内存空间等。 在进程中,又可以创建一些线程,他们共享这些内存空间,并由操作系统调用, 以便并行计算。

创建进程

  Python2.6以后提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。multiprocessing给每个进程赋予单独的Python解释器,这样就规避了全局解释锁所带来的问题。借助这个包,可以轻松完成从单进程到并发执行的转换。

  multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。具体的细节以后再说吧,实践很少就是全记下来了也记不久留个坑;以后有需求了再补充:进程通信,锁,列队,进程通信管道等

用multiprocessing开启进程,其代码块必须放在if __name__ == '__main__':

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import time
import random
from multiprocessing import Process

def start_process(name):
print('%s cur ele' %name)
time.sleep(3)
print('%s cur ele end' %name)
# 进程开启必须放在main()下,multiprocessing为每个进程开一个解释器
if __name__ == '__main__':
for i in ('ele1', 'ele2', 'ele3'):
p = Process(target=start_process, args=(i,))
p.start()
print('主进程')
"""OUTPUT
主进程
ele1 cur ele
ele2 cur ele
ele3 cur ele
ele1 cur ele end
ele2 cur ele end
ele3 cur ele end
"""
#没有time.sleep
import time
import random
from multiprocessing import Process

def start_process(name):
print('%s cur ele' %name)
# time.sleep(3)
print('%s cur ele end' %name)

if __name__ == '__main__': # 进程开启必须放在main()下
for i in ('ele1', 'ele2', 'ele3'):
p = Process(target=start_process, args=(i,))
p.start()
print('主进程')
"""
主进程
ele1 cur ele
ele1 cur ele end
ele2 cur ele
ele2 cur ele end
ele3 cur ele
ele3 cur ele end
"""
class start_process(Process):
def __init__(self,name):
super().__init__()
self.name = name

def run(self):
print('%s cur ele' %self.name)
time.sleep(3)
print('%s cur ele' %self.name)

if __name__ == '__main__': # 进程开启必须放在main()下
print("CPU个数:",cpu_count())
for i in ('ele1', 'ele2', 'ele3','ele4','ele5'):
p = start_process(i,)
p.start()
p.join()#加入进程同步
print('主进程')
"""
CPU个数: 4
ele1 cur ele
ele1 cur ele
ele2 cur ele
ele2 cur ele
ele3 cur ele
ele3 cur ele
ele4 cur ele
ele4 cur ele
ele5 cur ele
ele5 cur ele
主进程
"""

进程池

  进程池就是预先创建进程,需要的时候就从进程池拿,进程的创建和销毁统一由进程池管理。Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。python内置的multiprocessing.pool实现进程池管理。在python官方文档里显示该功能在交互式解释器中并不能完好运行,而且实践发现spyder不打印子进程输出 。

线程

  操作系统为进程分配执行的资源比如内存空间,而线程就在进程下面共享进程的资源。多线程可以处理多进程访问同一资源麻烦的问题。

创建线程

  Python提供两个模块进行多线程的操作,分别是threadthreading,前者是比较低级的模块,用于更底层的操作,一般应用级别的开发不常用。

第一种方法是创建threading.Thread的子类,重写run方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import threading
import time
class MyThread(threading.Thread):
def __init__(self,name):
super().__init__()
self.name = name
def run(self):
for i in range(5):
print ('thread {}, @number: {}'.format(self.name, i))
time.sleep(2)

def main():
print ("Start main threading")
# 创建三个线程
for i in range(3):
threads = MyThread("thread-%i"%i)
# 启动三个线程
threads.start()
threads.join()
print ("End Main threading")

if __name__ == '__main__':
print("=======创建同步三个线程=======")
main()
print("=======创建异步多线程=======")
threads = [MyThread("thread-%i"%i) for i in range(3)]
for t in threads:
t.start()
# 一次让新创建的线程执行 join
for t in threads:
t.join()
print ("End Async Main threading")
"""
受GIL控制同步时由一个线程控制,多进程在等待上个进程时受系统调度已经开始执行了
异步多线程在等待时执行了下一个线程
=======创建同步三个线程=======
Start main threading
thread thread-2, @number: 0
thread thread-2, @number: 1
thread thread-2, @number: 2
thread thread-2, @number: 3
thread thread-2, @number: 4
End Main threading
=======创建异步多线程=======
thread thread-0, @number: 0
thread thread-1, @number: 0
thread thread-2, @number: 0
thread thread-0, @number: 1
thread thread-1, @number: 1
thread thread-2, @number: 1
thread thread-0, @number: 2
thread thread-1, @number: 2
thread thread-2, @number: 2
thread thread-0, @number: 3
thread thread-1, @number: 3
thread thread-2, @number: 3
thread thread-0, @number: 4
thread thread-2, @number: 4
thread thread-1, @number: 4
End Async Main threading
"""

线程池

  线程池的出发点和进程池类似,线程为了控制和管理线程即创建和销毁。具体的将线程放进一个池子,一方面我们可以控制同时工作的线程数量,一方面也避免了创建和销毁产生的开销。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import time
import threading
from random import random
import queue

def double(n):
return n * 2
class Worker(threading.Thread):
def __init__(self, queue):
super(Worker, self).__init__()
self._q = queue
self.daemon = True
self.start()

def run(self):
while 1:
f, args, kwargs = self._q.get()
try:
print('USE:{}'.format(self.name))
print(f(*args, **kwargs))
except Exception as e:
print(e)
self._q.task_done()


class ThreadPool(object):
def __init__(self, max_num=5):
self._q = queue.Queue(max_num)
for _ in range(max_num):
Worker(self._q) # create worker thread

def add_task(self, f, *args, **kwargs):
self._q.put((f, args, kwargs))

def wait_compelete(self):
self._q.join()

pool = ThreadPool()
for _ in range(8):
wt = random()
pool.add_task(double, wt)
time.sleep(wt)

pool.wait_compelete()

多进程和多线程的计算开销

code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
from multiprocessing import Process , Queue , cpu_count
import time
from threading import Thread

# 定义全局变量Queue
g_queue = Queue()
# 定义一个队列,并定义初始化队列的函数
def init_queue():
print("init g_queue start")
while not g_queue.empty():
g_queue.get()
for _index in range(10):
g_queue.put(_index)
print("init g_queue end")
return

# 定义IO密集型任务和计算密集型任务,分别从队列中获取任务数据
# 定义一个IO密集型任务:利用time.sleep()
def task_io(task_id):
print("IOTask[%s] start" % task_id)
while not g_queue.empty():
time.sleep(1)
try:
data = g_queue.get(block=True, timeout=1)
print("IOTask[%s] get data: %s" % (task_id, data))
except Exception as excep:
print("IOTask[%s] error: %s" % (task_id, str(excep)))
print("IOTask[%s] end" % task_id)
return

g_search_list = list(range(10000))
# 定义一个计算密集型任务:利用一些复杂加减乘除、列表查找等
def task_cpu(task_id):
print("CPUTask[%s] start" % task_id)
while not g_queue.empty():
count = 0
for i in range(10000):
count += pow(3*2, 3*2) if i in g_search_list else 0
try:
data = g_queue.get(block=True, timeout=1)
print("CPUTask[%s] get data: %s" % (task_id, data))
except Exception as excep:
print("CPUTask[%s] error: %s" % (task_id, str(excep)))
print("CPUTask[%s] end" % task_id)
return task_id

if __name__ == '__main__':
print("cpu count:", cpu_count(), "\n")

print("========== 直接执行IO密集型任务 ==========")
init_queue()
time_0 = time.time()
task_io(0)
print("结束:", time.time() - time_0, "\n")

print("========== 多线程执行IO密集型任务 ==========")
init_queue()
time_0 = time.time()
thread_list = [Thread(target=task_io, args=(i,)) for i in range(5)]
for t in thread_list:
t.start()
for t in thread_list:
if t.is_alive():
t.join()
print("结束:", time.time() - time_0, "\n")

print("========== 多进程执行IO密集型任务 ==========")
init_queue()
time_0 = time.time()
process_list = [Process(target=task_io, args=(i,)) for i in range(cpu_count())]
for p in process_list:
p.start()
for p in process_list:
if p.is_alive():
p.join()
print("结束:", time.time() - time_0, "\n")

print("========== 直接执行CPU密集型任务 ==========")
init_queue()
time_0 = time.time()
task_cpu(0)
print("结束:", time.time() - time_0, "\n")

print("========== 多线程执行CPU密集型任务 ==========")
init_queue()
time_0 = time.time()
thread_list = [Thread(target=task_cpu, args=(i,)) for i in range(5)]
for t in thread_list:
t.start()
for t in thread_list:
if t.is_alive():
t.join()
print("结束:", time.time() - time_0, "\n")

print("========== 多进程执行cpu密集型任务 ==========")
init_queue()
time_0 = time.time()
process_list = [Process(target=task_cpu, args=(i,)) for i in range(cpu_count())]
for p in process_list:
p.start()
for p in process_list:
if p.is_alive():
p.join()
print("结束:", time.time() - time_0, "\n")
"""
cpu count: 4

========== 直接执行IO密集型任务 ==========
init g_queue start
init g_queue end
IOTask[0] start
IOTask[0] get data: 0
IOTask[0] get data: 1
IOTask[0] get data: 2
IOTask[0] get data: 3
IOTask[0] get data: 4
IOTask[0] get data: 5
IOTask[0] get data: 6
IOTask[0] get data: 7
IOTask[0] get data: 8
IOTask[0] get data: 9
IOTask[0] end
结束: 10.0093834400177

========== 多线程执行IO密集型任务 ==========
init g_queue start
init g_queue end
IOTask[0] start
IOTask[1] start
IOTask[2] start
IOTask[3] start
IOTask[4] start
IOTask[0] get data: 0
IOTask[1] get data: 1
IOTask[2] get data: 2
IOTask[3] get data: 3
IOTask[4] get data: 4
IOTask[0] get data: 5
IOTask[1] get data: 6
IOTask[3] get data: 7
IOTask[2] get data: 8
IOTask[4] get data: 9
IOTask[2] end
IOTask[4] end
IOTask[0] error:
IOTask[0] end
IOTask[1] error:
IOTask[1] end
IOTask[3] error:
IOTask[3] end
结束: 4.019008159637451

========== 多进程执行IO密集型任务 ==========
init g_queue start
init g_queue end
IOTask[3] start
IOTask[3] end
IOTask[0] start
IOTask[0] end
IOTask[1] start
IOTask[1] end
IOTask[2] start
IOTask[2] end
结束: 0.2742750644683838

========== 直接执行CPU密集型任务 ==========
init g_queue start
init g_queue end
CPUTask[0] start
CPUTask[0] get data: 0
CPUTask[0] get data: 1
CPUTask[0] get data: 2
CPUTask[0] get data: 3
CPUTask[0] get data: 4
CPUTask[0] get data: 5
CPUTask[0] get data: 6
CPUTask[0] get data: 7
CPUTask[0] get data: 8
CPUTask[0] get data: 9
CPUTask[0] end
结束: 8.974508285522461

========== 多线程执行CPU密集型任务 ==========
init g_queue start
init g_queue end
CPUTask[0] start
CPUTask[1] start
CPUTask[2] start
CPUTask[3] start
CPUTask[4] start
CPUTask[1] get data: 0
CPUTask[3] get data: 1
CPUTask[0] get data: 2
CPUTask[2] get data: 3
CPUTask[4] get data: 4
CPUTask[1] get data: 5
CPUTask[3] get data: 6
CPUTask[2] get data: 7
CPUTask[0] get data: 8
CPUTask[4] get data: 9
CPUTask[4] end
CPUTask[1] error:
CPUTask[1] end
CPUTask[2] error:
CPUTask[2] end
CPUTask[3] error:
CPUTask[3] end
CPUTask[0] error:
CPUTask[0] end
结束: 13.33148455619812

========== 多进程执行cpu密集型任务 ==========
init g_queue start
init g_queue end
CPUTask[1] start
CPUTask[1] end
CPUTask[2] start
CPUTask[2] end
CPUTask[3] start
CPUTask[3] end
CPUTask[0] start
CPUTask[0] end
结束: 0.2652151584625244
"""

协程

  协程,又称微线程,纤程。英文名Coroutine。协程的概念很早就提出来了,但直到最近几年才在某些语言(如Lua)中得到广泛应用。协程描述部分来自博客园

  子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。

  协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。

  线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员

  协程的优点:

  • 无需线程上下文切换的开销
  • 无需原子操作锁定及同步的开销
  • 方便切换控制流,简化编程模型
  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

  缺点:

  • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
  • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

  Python中的协程和生成器很相似但又稍有不同。主要区别在于:

  • 生成器是数据的生产者
  • 协程则是数据的消费者

  在前面的博客中讲过生成器用yeild来实现动态的返回程序的结果,即实现了一个协程。协程会消费掉(拿去用)发送给它的值。

生产者消费者模式

  生产者负责生产数据,存放在缓存区,而消费者负责从缓存区获得数据并处理数据。生产者消费者模式即将一件事分成流水线模型,生产者产生的输出交付给消费者处理,可以实现同一时刻,前后同时运行。

生产者消费者模式
生产者消费者模式

生产者消费者模式的优点:

  • 解耦 假设生产者和消费者分别是两个线程。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。如果未来消费者的代码发生变化,可能会影响到生产者的代码。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。

  • 并发 由于生产者与消费者是两个独立的并发体,他们之间是用缓冲区通信的,生产者只需要往缓冲区里丢数据,就可以继续生产下一个数据,而消费者只需要从缓冲区拿数据即可,这样就不会因为彼此的处理速度而发生阻塞。

  • 支持忙闲不均 当生产者制造数据快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中,慢慢处理掉。而不至于因为消费者的性能造成数据丢失或影响生产者生产。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from queue import Queue
import random,threading,time

#生产者类
class Producer(threading.Thread):
def __init__(self, name,queue):
threading.Thread.__init__(self, name=name)
self.data=queue

def run(self):
for i in range(5):
print("%s is producing %d to the queue!" % (self.getName(), i))
self.data.put(i)#将数据放入队列
time.sleep(random.randrange(10)/5)
print("%s finished!" % self.getName())

#消费者类
class Consumer(threading.Thread):
def __init__(self,name,queue):
threading.Thread.__init__(self,name=name)
self.data=queue
def run(self):
for i in range(5):
val = self.data.get()#从队列中获取数据执行下面的操作
print("%s is consuming. %d in the queue is consumed!" % (self.getName(),val))
time.sleep(random.randrange(10))
print("%s finished!" % self.getName())

def main():
queue = Queue()
producer = Producer('Producer',queue)
consumer = Consumer('Consumer',queue)

producer.start()
consumer.start()

producer.join()
consumer.join()
print ('All threads finished!')

if __name__ == '__main__':
main()
"""
Producer is producing 0 to the queue!
Consumer is consuming. 0 in the queue is consumed!
Producer is producing 1 to the queue!
Producer is producing 2 to the queue!
Consumer is consuming. 1 in the queue is consumed!
Producer is producing 3 to the queue!
Producer is producing 4 to the queue!
Producer finished!
Consumer is consuming. 2 in the queue is consumed!
Consumer is consuming. 3 in the queue is consumed!
Consumer is consuming. 4 in the queue is consumed!
Consumer finished!All threads finished!
"""

阻塞与非阻塞

  阻塞是指调用线程或者进程被操作系统挂起。 非阻塞是指调用线程或者进程不会被操作系统挂起。

同步与异步

同步是指代码调用IO操作时,必须等待IO操作完成才返回的调用方式。 异步是指代码调用IO操作时,不必等IO操作完成就返回的调用方式。

同步是最原始的调用方式。 异步则需要多线程,多CPU或者非阻塞IO的支持。

  在multiprocessing中通常用apply/map和apply_async/map_async函数完成同步异步(阻塞和非阻塞)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
"""
从keras image剥离的文件夹处理的源码,用阻塞和非阻塞方式实现并发操作
"""
from __future__ import absolute_import
from __future__ import print_function
import numpy as np
import re
from scipy import linalg
import scipy.ndimage as ndi
from six.moves import range
import os
import threading
import warnings
import multiprocessing.pool
from functools import partial
import time

try:
from PIL import Image as pil_image
except ImportError:
pil_image = None

def _count_valid_files_in_directory(directory, white_list_formats, follow_links):
def _recursive_list(subpath):
return sorted(os.walk(subpath, followlinks=follow_links), key=lambda tpl: tpl[0])

samples = 0
for root, _, files in _recursive_list(directory):
for fname in files:
is_valid = False
for extension in white_list_formats:
if fname.lower().endswith('.' + extension):
is_valid = True
break
if is_valid:
samples += 1
return samples


def _list_valid_filenames_in_directory(directory, white_list_formats,
class_indices, follow_links):
def _recursive_list(subpath):
return sorted(os.walk(subpath, followlinks=follow_links), key=lambda tpl: tpl[0])

classes = []
filenames = []
subdir = os.path.basename(directory)
basedir = os.path.dirname(directory)
for root, _, files in _recursive_list(directory):
for fname in files:
is_valid = False
for extension in white_list_formats:
if fname.lower().endswith('.' + extension):
is_valid = True
break
if is_valid:
classes.append(class_indices[subdir])
# add filename relative to directory
absolute_path = os.path.join(root, fname)
filenames.append(os.path.relpath(absolute_path, basedir))
return classes, filenames
def main(pool,apply):
for dirpath in (os.path.join(directory, subdir) for subdir in classes):
results.append(apply(_list_valid_filenames_in_directory,
(dirpath, white_list_formats,class_indices, False)))
pool.close()
pool.join()


if __name__ =='__main__':
results = []
directory = 'E:\\hxd\\零样本学习\\data\\Atrain\\train\\'
white_list_formats = {'png', 'jpg', 'jpeg', 'bmp', 'ppm'}
classes = []
for subdir in sorted(os.listdir(directory)):
if os.path.isdir(os.path.join(directory, subdir)):
classes.append(subdir)
class_indices = dict(zip(classes, range(len(classes))))
print('==========IO密集型非阻塞多线程==========')
start=time.time()
pool = multiprocessing.pool.ThreadPool()
main(pool,pool.apply_async)
pool.close()
pool.join()
print('ThreadPool Time:%f'%(time.time()-start))
print('==========IO密集型非阻塞多进程==========')
results = []
start=time.time()
pool = multiprocessing.Pool()
main(pool,pool.apply_async)
pool.close()
pool.join()
print('ProcessPool Time:%f'%(time.time()-start))
print('==========IO密集型阻塞多线程==========')
start=time.time()
pool = multiprocessing.pool.ThreadPool()
main(pool,pool.apply)
pool.close()
pool.join()
print('ThreadPool Time:%f'%(time.time()-start))
print('==========IO密集型阻塞多进程==========')
results = []
start=time.time()
pool = multiprocessing.Pool()
main(pool,pool.apply)
pool.close()
pool.join()
print('ProcessPool Time:%f'%(time.time()-start))
"""
==========IO密集型非阻塞多线程==========
ThreadPool Time:1.554570
==========IO密集型非阻塞多进程==========
ProcessPool Time:1.008292
==========IO密集型阻塞多线程==========
ThreadPool Time:1.458078
==========IO密集型阻塞多进程==========
ProcessPool Time:2.155637
"""

总结

  在实际应用中合理的应用并发多进程和多线程可以提高程序运行效率,针对不同任务类型如IO密集型任务的网络爬虫,CPU密集型的XGBoost的c++实现上均使用了多线程。本文从概念和简单实践出发分别阐述了多进程,多线程,协程等具体操作,以及生产者与消费者,同步与异步和阻塞与非阻塞等相关概念。期望能在将来遇到实际问题时能够有法可循。

Reference

  1. GIL-维基百科
  2. https://www.cnblogs.com/vipchenwei/p/7809967.html

  3. https://blog.csdn.net/SecondLieutenant/article/details/79396984

  4. https://eastlakeside.gitbooks.io/interpy-zh/content/Coroutines/
  5. https://segmentfault.com/a/1190000008909344