盒子
盒子
文章目录
  1. 多进程
    1. 进程池阻塞与非阻塞
      1. 异步进程池(非阻塞)
      2. 同步进程池(阻塞)
      3. REF
    2. 多进程pool
    3. 多进程pool queue
    4. 多进程pool.map
  2. 多线程
    1. 多线程queue
    2. 多线程下载

python 多线程与多进程

多进程

进程池阻塞与非阻塞

异步进程池(非阻塞)

from multiprocessing import Pool
def test(i):
print (i)
if __name__=="__main__":
pool = Pool(processes=10)
for i in xrange(500):
'''
For循环中执行步骤:
(1)循环遍历,将500个子进程添加到进程池(相对父进程会阻塞)
(2)每次执行10个子进程,等一个子进程执行完后,立马启动新的子进程。(相对父进程不阻塞)

apply_async为异步进程池写法。
异步指的是启动子进程的过程,与父进程本身的执行(print)是异步的,而For循环中往进程池添加子进程的过程,与父进程本身的执行却是同步的。
'''
pool.apply_async(test, args=(i,)) #维持执行的进程总数为10,当一个进程执行完后启动一个新进程.
print ('test')
pool.close()
pool.join()

执行顺序:For循环内执行了2个步骤,第一步:将500个对象放入进程池(阻塞)。第二步:同时执行10个子进程(非阻塞),有结束的就立即添加,维持10个子进程运行。(apply_async方法的会在执行完for循环的添加步骤后,直接执行后面的print语句,而apply方法会等所有进程池中的子进程运行完以后再执行后面的print语句)

注意:调用join之前,先调用close或者terminate方法,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束。

同步进程池(阻塞)

from multiprocessing import Pool
def test(p):
print (p)
time.sleep(3)
if __name__=="__main__":
pool = Pool(processes=10)
for i in xrange(500):
'''
实际测试发现,for循环内部执行步骤:
(1)遍历500个可迭代对象,往进程池放一个子进程
(2)执行这个子进程,等子进程执行完毕,再往进程池放一个子进程,再执行。(同时只执行一个子进程)
for循环执行完毕,再执行print函数。
'''
pool.apply(test, args=(i,)) #维持执行的进程总数为10,当一个进程执行完后启动一个新进程.
print ('test')
pool.close()
pool.join()

说明:for循环内执行的步骤顺序,往进程池中添加一个子进程,执行子进程,等待执行完毕再添加一个子进程…..等500个子进程都执行完了,再执行print (‘test’)。(从结果来看,并没有多进程并发)

REF

https://thief.one/2016/11/24/Multiprocessing-Pool/

多进程pool

多进程.进程池阻塞与非阻塞

多进程pool queue

import os
from multiprocessing import Manager, Pool
import sys

def get_hash_queue(i, n, image_path, q_put):
# print the doing process
sys.stdout.write("\rLoading process: {}/{}".format(str(n-i), str(n)))
sys.stdout.flush()
'''Dealwith image_path here.'''
q_put.put_nowait([image_path])
return

if __name__ == "__main__":
# init pool and queue
q_get = Manager().Queue()
q_put = Manager().Queue()
pool = Pool(processes = 4)

# put pic path into get queue: q_get
pics = os.listdir(folder)
for pic in pics:
path = os.path.join(folder, pic)
q_get.put(path)

# use multi process to get hash result and put the result into q_put
n = q_get.qsize()
while True:
try:
image_path = q_get.get_nowait()
i = q_get.qsize()
except:
break
else:
pool.apply_async(get_hash_queue, (i, n, image_path, q_put, ))
pool.close()
pool.join()
'''Dealwith q_put here.'''

多进程pool.map

import os
import cv2
import shutil
from multiprocessing import Pool
import argparse

pic_suffix = set()
pic_suffix.update(['.BMP', '.DIB', '.JPEG', '.JPG', '.JPE', '.PNG', '.PBM', '.PGM', '.PPM', '.SR', '.RAS', '.TIFF', '.TIF', '.EXR', '.JP2'])

def image_judge(image_path):
try:
new_path = os.path.join('./trash', os.path.split(image_path)[-1])
# not image
if os.path.splitext(image_path)[-1].upper() not in pic_suffix:
print ('Not image %s' % (new_path))
shutil.move(image_path, './trash')
return

# image is none
image = cv2.imread(image_path)
if image is None:
print ('Image none: %s' % (new_path))
shutil.move(image_path, './trash')
return
except:
# catch except
print ('Image error: %s' % (new_path))
shutil.move(image_path, './trash')
return

def entry_function(folder):
os.makedirs('./trash', exist_ok=True)
pic_list = []
pics = os.listdir(folder)
for pic in pics:
pic_list.append(os.path.join(folder, pic))
pool = Pool()
pool.map(image_judge, pic_list)
pool.close()
pool.join()

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--folder', type = str, required = True, help = 'path to folder which contains the images')
args = parser.parse_args()

entry_function(args.folder)

多线程

多线程queue

import os
import threading
import queue

def fetch_img_func(q):
while True:
try:
# unblock read from queue
url = q.get_nowait() # https://example.com/example.jpg
except:
break
else:
try:
'''do something with url here'''

if __name__ == "__main__":
q = queue.Queue()

'''
Put something into queue here.
eg:
url = https://example.com/example.jpg
q.put(url)
'''

threads = []
for i in range(args.thread):
thread = threading.Thread(target=fetch_img_func, args=(q, ))
threads.append(thread)
thread.start()
for i in threads:
i.join()

多线程下载

https://github.com/Holmeyoung/image-process/blob/master/image_download.py

支持一下
万一真的就有人扫了呢
  • 微信扫一扫
  • 支付宝扫一扫