搞懂Python线程创建、同步的用法
使用线程可以把执行较慢、占据时间较长的任务放到后台处理。
主任务执行,可以通过异步线程可以监控处理的进度。
程多线程相当于把一个大任务拆分成多个子任务执行,完成任务的时间更短。
线程可以被抢占(中断)。
在其他线程正在运行时,线程可以暂时搁置(也称为睡眠) -- 这就是线程的退让。
内核线程:由操作系统内核创建和撤销。
用户线程:不需要内核支持而在用户程序中实现的线程。
_thread
threading(推荐使用)
_thread.start_new_thread ( function, args[, kwargs] )
function - 线程函数。
args - 传递给线程函数的参数,他必须是个tuple类型。
kwargs - 可选参数。
import _thread
import time
# 为线程定义一个函数
def print_time( threadName, delay):
count = 0
while count < 5:
time.sleep(delay)
count += 1
print ("%s: %s" % ( threadName, time.ctime(time.time()) ))
# 创建两个线程
try:
_thread.start_new_thread( print_time, ("Thread-1", 1, ) )
_thread.start_new_thread( print_time, ("Thread-2", 3, ) )
except:
print ("Error: 无法启动线程")
while 1:
pass
Thread-1: Sun Jul 19 17:39:39 2020
Thread-1: Sun Jul 19 17:39:40 2020
Thread-2: Sun Jul 19 17:39:41 2020
Thread-1: Sun Jul 19 17:39:41 2020
Thread-1: Sun Jul 19 17:39:42 2020
Thread-1: Sun Jul 19 17:39:43 2020
Thread-2: Sun Jul 19 17:39:44 2020
Thread-2: Sun Jul 19 17:39:47 2020
Thread-2: Sun Jul 19 17:39:50 2020
Thread-2: Sun Jul 19 17:39:53 2020
threading.currentThread(): 返回当前的线程变量。
threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
run(): 用以表示线程活动的方法。
start():启动线程活动。
join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
isAlive(): 返回线程是否活动的。
getName(): 返回线程名。
setName(): 设置线程名。
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print ("开始线程:" + self.name)
print_time(self.name, self.counter, 5)
print ("退出线程:" + self.name)
def print_time(threadName, delay, counter):
while counter:
if exitFlag:
threadName.exit()
time.sleep(delay)
print ("%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1
# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 开启新线程
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print ("退出主线程")
开始线程:Thread-1
开始线程:Thread-2
Thread-1: Sun Jul 19 17:46:04 2020
Thread-2: Sun Jul 19 17:46:05 2020
Thread-1: Sun Jul 19 17:46:05 2020
Thread-1: Sun Jul 19 17:46:06 2020
Thread-2: Sun Jul 19 17:46:07 2020
Thread-1: Sun Jul 19 17:46:07 2020
Thread-1: Sun Jul 19 17:46:08 2020
退出线程:Thread-1
Thread-2: Sun Jul 19 17:46:09 2020
Thread-2: Sun Jul 19 17:46:11 2020
Thread-2: Sun Jul 19 17:46:13 2020
退出线程:Thread-2
退出主线程
import threading
import time
class myThread (threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print ("开启线程: " + self.name)
# 获取锁,用于线程同步
threadLock.acquire()
print_time(self.name, self.counter, 3)
# 释放锁,开启下一个线程
threadLock.release()
def print_time(threadName, delay, counter):
while counter:
time.sleep(delay)
print ("%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1
threadLock = threading.Lock()
threads = []
# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 开启新线程
thread1.start()
thread2.start()
# 添加线程到线程列表
threads.append(thread1)
threads.append(thread2)
# 等待所有线程完成
for t in threads:
t.join()
print ("退出主线程")
开启线程: Thread-1
开启线程: Thread-2
Thread-1: Sun Jul 19 17:52:03 2020
Thread-1: Sun Jul 19 17:52:04 2020
Thread-1: Sun Jul 19 17:52:05 2020
Thread-2: Sun Jul 19 17:52:07 2020
Thread-2: Sun Jul 19 17:52:09 2020
Thread-2: Sun Jul 19 17:52:11 2020
退出主线程
Queue.qsize() 返回队列的大小。
Queue.empty() 如果队列为空,返回True,反之False。
Queue.full() 如果队列满了,返回True,反之False。
Queue.full 与 maxsize 大小对应。
Queue.get([block[, timeout]]) 获取队列,timeout等待时间。
Queue.get_nowait() 相当Queue.get(False)。
Queue.put(item) 写入队列,timeout等待时间。
Queue.put_nowait(item) 相当Queue.put(item, False)。
Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号。
Queue.join() 实际上意味着等到队列为空,再执行别的操作。
import threading
import time
class myThread (threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print ("开启线程: " + self.name)
# 获取锁,用于线程同步
threadLock.acquire()
print_time(self.name, self.counter, 3)
# 释放锁,开启下一个线程
threadLock.release()
def print_time(threadName, delay, counter):
while counter:
time.sleep(delay)
print ("%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1
threadLock = threading.Lock()
threads = []
# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 开启新线程
thread1.start()
thread2.start()
# 添加线程到线程列表
threads.append(thread1)
threads.append(thread2)
# 等待所有线程完成
for t in threads:
t.join()
print ("退出主线程")
开启线程:Thread-1
开启线程:Thread-2
开启线程:Thread-3
Thread-3 processing One
Thread-2 processing Two
Thread-1 processing Three
Thread-3 processing Four
Thread-1 processing Five
退出线程:Thread-3
退出线程:Thread-2
退出线程:Thread-1
退出主线程
from time import time
def main():
total = 0
number_list = [x for x in range(1, 100000001)]
start = time()
for number in number_list:
total += number
print(total)
end = time()
print('Execution time: %.3fs' % (end - start))
if __name__ == '__main__':
main()
5000000050000000
Execution time: 12.869s
from multiprocessing import Process, Queue
from random import randint
from time import time
def task_handler(curr_list, result_queue):
total = 0
for number in curr_list:
total += number
result_queue.put(total)
def main():
processes = []
number_list = [x for x in range(1, 100000001)]
result_queue = Queue()
index = 0
# 启动8个进程将数据切片后进行运算
for _ in range(8):
p = Process(target=task_handler,
args=(number_list[index:index + 12500000], result_queue))
index += 12500000
processes.append(p)
p.start()
# 开始记录所有进程执行完成花费的时间
start = time()
for p in processes:
p.join()
# 合并执行结果
total = 0
while not result_queue.empty():
total += result_queue.get()
print(total)
end = time()
print('Execution time: ', (end - start), 's', sep='')
if __name__ == '__main__':
main()
5000000050000000
Execution time: 1.1408379077911377s