温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Python学习—python中的线程

发布时间:2020-06-13 16:07:30 来源:网络 阅读:616 作者:长安223 栏目:编程语言

1.线程定义

线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。一个进程至少有一个线程,一个进程必定有一个主线程。

2.创建线程

创建线程的两个模块:
(1)thread(在python3中改名为_thread)
(2)threding
_thread提供了低级别的、原始的线程以及一个简单的锁。threading基于Java的线程模型设计。thread和threading模块都可以用来创建和管理线程,而thread模块提供了基本的线程和锁支持。threading提供的是更高级的完全的线程管理。低级别的thread模块是推荐给高手用,一般应用程序推荐使用更高级的threading模块:
1.它更先进,有完善的线程管理支持,此外,在thread模块的一些属性会和threading模块的这些属性冲突。
2.thread模块有很少的(实际上是一个)同步原语,而threading却有很多。
3.thread模块没有很好的控制,特别当你的进程退出时,比如:当主线程执行完退出时,其他的线程都会无警告,无保存的死亡,而threading会允许默认,重要的子线程完成后再退出,它可以特别指定daemon类型的线程。

1.通过模块直接创建线程

_thread模块创建进程

import _thread

def job(name):
    print("%s正在做工作........" %name)
    print("%s工作完成..........." %name)

if __name__ == "__main__":
    try:
        #_thread模块 创建2个线程,再加上主线程,这个程序运行就一共有三个线程
        _thread.start_new_thread(job,('ddd',))
        _thread.start_new_thread(job,('eee',))
    except Exception as e:
        print("创建线程失败。",e)
    else:
        print("创建线程成功。")
    print('主线程结束')

每次运行程序可以看到不同的结果:

(1)
创建线程成功。eee正在做工作........
eee工作完成...........
ddd正在做工作........
ddd工作完成...........

(2)
创建线程成功。ddd正在做工作........eee正在做工作........
eee工作完成...........
ddd工作完成...........

(3)
创建线程成功。
ddd正在做工作........

(4)
创建线程成功。eee正在做工作........

这些结果不同,是因为线程并发执行,三个线程来回切换在cpu工作,且当主线程结束后,不管其它线程是否完成工作都被迫结束。
通过threading模块创建线程

def job(name):
    print("%s正在做第一部分工作........" %name)
    print("%s正在做第二部分工作........" %name)
    print("%s正在做第三部分工作........" %name)
    print("%s工作完成..........." %name)

if __name__ == "__main__":
    try:
        #threading模块 创建新的线程 返回一个线程对象
        #target 为线程需要做的任务,args为任务传递所需要参数(参数用元组组织起来),name为创建的线程命名(可以不取名)
        t1 = threading.Thread(target=job,args=('aaa',),name='job1_name')
        # start方法使线程开始执行
        t1.start()
        t2 = threading.Thread(target=job,args=('bbb',),name='job2_name')
        t2.start()
    except Exception as e:
        print("创建线程失败\n",e)
    print('主线程结束.....')

每次运行程序的结果:

(1)
aaa正在做第一部分工作........
aaa正在做第二部分工作........
bbb正在做第一部分工作........
主线程结束.....
aaa正在做第三部分工作........
aaa工作完成...........
bbb正在做第二部分工作........
bbb正在做第三部分工作........
bbb工作完成...........

(2)
aaa正在做第一部分工作........
bbb正在做第一部分工作........
bbb正在做第二部分工作........
bbb正在做第三部分工作........主线程结束.....
bbb工作完成...........
aaa正在做第二部分工作........
aaa正在做第三部分工作........
aaa工作完成...........

(3)
aaa正在做第一部分工作........
aaa正在做第二部分工作........
aaa正在做第三部分工作........bbb正在做第一部分工作........

aaa工作完成...........bbb正在做第二部分工作........
主线程结束.....
bbb正在做第三部分工作........
bbb工作完成...........

可以看到,不同的多个线程是相互交叉着在cpu执行的,和_thread不同的是它创建了一个线程类对象,也不会因为主线程的结束而结束所有的线程。

使用join方法
在A线程中调用了B线程的join法时,表示只有当B线程执行完毕时,A线程才能继续执行。多个线程使用了join方法,剩下的其它线程只有在这些线程执行完后才能继续执行。
这里调用的join方法是没有传参的,join方法其实也可以传递一个参数给它的。
join方法中如果传入参数,则表示这样的意思:如果A线程中掉用B线程的join(10),则表示A线程会等待B线程执行10毫秒,10毫秒过后,A、B线程并行执行。
需要注意的是,jdk规定,join(0)的意思不是A线程等待B线程0秒,而是A线程等待B线程无限时间,直到B线程执行完毕,即join(0)等价于join()。

def job(name):
    print("%s正在做第一部分工作........" %name)
    print("%s正在做第二部分工作........" %name)
    print("%s正在做第三部分工作........" %name)
    print("%s工作完成..........." %name)

if __name__ == "__main__":
    try:
        #threading模块 创建新的线程 返回一个线程对象
        #target 为线程需要做的任务,args为任务传递所需要参数(参数用元组组织起来),name为创建的线程命名(可以不取名)
        t1 = threading.Thread(target=job,args=('aaa',),name='job1_name')
        # start方法使线程开始执行
        t1.start()
        t2 = threading.Thread(target=job,args=('bbb',),name='job2_name')
        t2.start()
        t1.join()
        t2.join()
    except Exception as e:
        print("创建线程失败\n",e)
    print('主线程结束.....')

每次运行程序的结果:

(1)
aaa正在做第一部分工作........
aaa正在做第二部分工作........bbb正在做第一部分工作........
aaa正在做第三部分工作........
aaa工作完成...........

bbb正在做第二部分工作........
bbb正在做第三部分工作........
bbb工作完成...........
主线程结束.....

(2)
aaa正在做第一部分工作........
bbb正在做第一部分工作........
bbb正在做第二部分工作........
bbb正在做第三部分工作........
bbb工作完成...........
aaa正在做第二部分工作........
aaa正在做第三部分工作........
aaa工作完成...........
主线程结束.....

(3)
aaa正在做第一部分工作........bbb正在做第一部分工作........

bbb正在做第二部分工作........
bbb正在做第三部分工作........aaa正在做第二部分工作........
aaa正在做第三部分工作........

bbb工作完成...........
aaa工作完成...........
主线程结束.....
2.通过继承Thread类创建线程

当通过继承Thread类来创建线程时,需要传入参数,可以在构造方法增加相应的属性,以此来传入所需要的参数。
Thread类有一个run方法,当创建一个线程后,使用start方法时,实际上就是在调用类里面的run方法,因此可以在继承Thread类的时候,重写run方法来完成自己的任务。

import threading

class Jobthread(threading.Thread):
    def __init__(self,name):
        super(Jobthread, self).__init__()
        self.name = name
    #重写Thread类的run方法
    def run(self):
        print('%s线程待完成第一部分工作' %self.name)
        print("%s正在做第二部分工作........" %self.name)
        print("%s正在做第二部分工作........" %self.name)

if __name__ == "__main__":
    #实例化类创建第一个线程对象
    t1 = Jobthread('aaa')
    t1.start()
    #实例化类创建第二个线程对象
    t2 = Jobthread('bbb')
    t2.start()
    t1.join()
    t2.join()
    print('主线程结束.....')

每次运行程序的结果:

(1)
wwww正在做第一部分工作
wwww正在做第二部分工作........
eeee正在做第一部分工作
eeee正在做第二部分工作........
eeee正在做第二部分工作........
wwww正在做第二部分工作........
主线程结束.....

(2)
wwww正在做第一部分工作
eeee正在做第一部分工作
wwww正在做第二部分工作........
eeee正在做第二部分工作........
wwww正在做第二部分工作........
eeee正在做第二部分工作........
主线程结束.....

(3)
wwww正在做第一部分工作eeee正在做第一部分工作
eeee正在做第二部分工作........

wwww正在做第二部分工作........
wwww正在做第二部分工作........
eeee正在做第二部分工作........
主线程结束.....

可以看到,通过继承线程类,然后重写run方法,实例化这个类,这样也可以新创建线程,在某些情况下,这样还更加方便。

3.守护线程-daemon

线程的Daemon属性:当主线程执行结束, 让没有执行完成的线程强制结束的一个属性:daemon
setDaemon方法是改变线程类的一个属性:daemon,也可以在创建线程的时候指定这个属性的值,他的值默认为None

import threading
import time

# 任务1:
def music(name):
    for i in range(2):
        print("正在听音乐%s" %(name))
        time.sleep(2)
        print('听音乐结束')
# 任务2:
def code(name):
    for i in range(2):
        print("正在编写代码%s" %(name))
        time.sleep(2)
        print('写代码结束')

if __name__ == '__main__':
    t1 = threading.Thread(target=music, args=("中国梦",))
    t2 = threading.Thread(target=code, args=("爬虫", ))

    # 将t1线程声明为守护线程, 如果设置为True, 子线程启动, 当主线程执行结束, 子线程也结束
    # 设置setDaemon必须在启动线程之前进行设置;
    t1.setDaemon(True)
    t2.setDaemon(True)
    t1.start()
    t2.start()
    print('完成任务......')

运行结果:

(1)
正在听音乐中国梦正在编写代码爬虫
完成任务......

(2)
正在听音乐中国梦
正在编写代码爬虫完成任务......

当设置daemon属性为True,就和_thread模块的线程一样主线程结束,其它线程也被迫结束

4.线程中的锁

1.全局解释锁

什么是全局解释器锁(GIL)
Python代码的执行由Python 虚拟机(也叫解释器主循环,CPython版本)来控制,Python 在设计之初就考虑到要在解释器的主循环中,同时只有一个线程在执行,即在任意时刻,只有一个线程在解释器中运行。对Python 虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。
即全局解释器锁,使得在同一时间内,python解释器只能运行一个线程的代码,这大大影响了python多线程的性能。
需要明确的一点是GIL并不是Python的特性
GIL是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。
python GIL 会影响多线程等性能的原因:
因为在多线程的情况下,只有当线程获得了一个全局锁的时候,那么该线程的代码才能运行,而全局锁只有一个,所以使用python多线程,在同一时刻也只有一个线程在运行,因此在即使在多核的情况下也只能发挥出单核的性能。
经过GIL这一道关卡处理,会增加执行的开销。这意味着,如果你想提高代码的运行速度,使用threading包并不是一个很好的方法。
在多线程环境中,Python 虚拟机按以下方式执行:

  1. 设置GIL
  2. 切换到一个线程去运行
  3. 运行:
    a. 指定数量的字节码指令,或者
    b. 线程主动让出控制(可以调用time.sleep(0))
  4. 把线程设置为睡眠状态
  5. 解锁GIL
  6. 再次重复以上所有步骤
    既然python在同一时刻下只能运行一个线程的代码,那线程之间是如何调度的呢?
    对于有io操作的线程,当一个线程在做io操作的时候,因为io操作不需要cpu,所以,这个时候,python会释放python全局锁,这样其他需要运行的线程就会使用该锁。
    对于cpu密集型的线程,比如一个线程可能一直需要使用cpu做计算,那么python中会有一个执行指令的计数器,当一个线程执行了一定数量的指令时,该线程就会停止执行并让出当前的锁,这样其他的线程就可以执行代码了。
    由上面可知,至少有两种情况python会做线程切换,一是一但有IO操作时,会有线程切换,二是当一个线程连续执行了一定数量的指令时,会出现线程切换。当然此处的线程切换不一定就一定会切换到其他线程执行,因为如果当前线程优先级比较高的话,可能在让出锁以后,又继续获得锁,并优先执行。

这里就可以将操作分两种:
i/o密集型
cpu密集型(计算密集型)
对于前者我们尽可能的采用多线程方式,后者尽可能采用多进程方式

2.线程锁

为什么会需要线程锁?
多个线程对同一个数据进行修改时, 会出现不可预料的情况。
例如:

def add():
    global money
    for i in range(1000000):
        money += 1

def reduce():
    global money
    for  i in range(1000000):
        money -= 1

if __name__ =="__main__":
    money = 0
    t1 = threading.Thread(target=add)
    t2 = threading.Thread(target=reduce)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(money)

因为没有对变量money做访问限制,在某一个线程对其进行操作时,另一个线程仍可以对它进行访问、操作,致使最终结果出错,且不可预料,不是期待值。

(1)
55651
(2)
-133447
(3)
-236364

当我们使用线程锁的时候:

import threading

def add(lock):
    global money
    lock.acquire()
    for i in range(100000):
        money += 1
    lock.release()

def reduce(lock):
    global money
    lock.acquire()
    for  i in range(1000000):
        money -= 1
    lock.release()

if __name__ =="__main__":
    money = 0
    lock = threading.Lock()
    t1 = threading.Thread(target=add,args=(lock,))
    t2 = threading.Thread(target=reduce,args=(lock,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(money)

运行结果正确,始终为0

5.多线程

使用多线程来查ip的地理位置

import json
from urllib.request import urlopen

class Job(threading.Thread):
    def __init__(self,ip):
        super(Job,self).__init__()
        self.ip = ip
    def check_ip(self):
        url = 'http://ip.taobao.com/service/getIpInfo.php?ip=%s' % self.ip
        text = urlopen(url).read().decode('utf-8')
        d = json.loads(text)['data']
        country = d['country']
        city = d['city']
        print(self.ip+':\t'+country+'\t'+city)
    def run(self):
        self.check_ip()

if __name__ == "__main__":
    tt = []
    ips = ['172.25.254.23', '111.213.215.66', '152.158.32.54', '164.52.196.89','214.63.145.189']
    for ip in ips:
        t = Job(ip)
        t.start()
        tt.append(t)
    [i.join() for i in tt]

结果:

172.25.254.23:  XX  内网IP
214.63.145.189: 美国  XX
164.52.196.89:  印度  XX
111.213.215.66: 中国  上海
152.158.32.54:  欧洲  XX

6.生产者消费者模型

1.模型引入

1). 理论上多线程执行任务, 会产生一些数据, 为其他程序执行作铺垫;
2). 多线程是不能返回任务执行结果的, 因此需要一个容器来存储多线程产生的数据
3). 这个容器如何选择? list(栈, 队列), tuple(x), set(x), dict(x), 此处选择队列来实现
队列与多线程

import threading
from queue import Queue

def job(l,queue):
    # 将任务的结果存储到队列中
    queue.put(sum(l))

def use_thread():
    # 实例化一个队列, 用来存储每个线程执行的结果
    q = Queue()
    li = [[1,2,3,4,5,6],[2,3,4,5,6,7],[3,4,5,6,7,8],[4,5,6,7,8,9]]
    threads = []
    for i in li:
        t = threading.Thread(target=job,args=(i,q))
        threads.append(t)
        t.start()
    # join方法等待所有子线程执行结束
    [i.join() for i in threads]
    # 从队列里面拿出所有的运行结果
    result = [q.get() for i in li]
    print(result)

if __name__ == "__main__":
    use_thread()

运行结果:

[21, 27, 33, 39]
2.生产者消费者模型

在软件开发的过程中,经常碰到这样的场景:
某些模块负责生产数据,这些数据由其他模块来负责处理(此处的模块可能是:函数、线程、进程等)。产生数据的模块称为生产者,而处理数据的模块称为消费者。在生产者与消费者之间的缓冲区称之为仓库。生产者负责往仓库运输商品,而消费者负责从仓库里取出商品,这就构成了生产者消费者模式。
为了容易理解,我们举一个寄信的例子。假设你要寄一封信,大致过程如下:
1、你把信写好——相当于生产者生产数据
2、你把信放入邮箱——相当于生产者把数据放入缓冲区
3、邮递员把信从邮箱取出,做相应处理——相当于消费者把数据取出缓冲区,处理数据
生产者消费者模式的优点
1.解耦
假设生产者和消费者分别是两个线程。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。如果未来消费者的代码发生变化,可能会影响到生产者的代码。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
举个例子:我们去邮局投递信件,如果不使用邮箱(也就是缓冲区),你必须得把信直接交给邮递员。有同学会说,直接给邮递员不是挺简单的嘛?其实不简单,你必须 得认识谁是邮递员,才能把信给他。这就产生了你和邮递员之间的依赖(相当于生产者和消费者的强耦合)。万一哪天邮递员 换人了,你还要重新认识一下(相当于消费者变化导致修改生产者代码)。而邮箱相对来说比较固定,你依赖它的成本就比较低(相当于和缓冲区之间的弱耦合)。
2.并发
由于生产者与消费者是两个独立的并发体,他们之间是用缓冲区通信的,生产者只需要往缓冲区里丢数据,就可以继续生产下一个数据,而消费者只需要从缓冲区拿数据即可,这样就不会因为彼此的处理速度而发生阻塞。
继续上面的例子:如果我们不使用邮箱,就得在邮局等邮递员,直到他回来,把信件交给他,这期间我们啥事儿都不能干(也就是生产者阻塞)。或者邮递员得挨家挨户问,谁要寄信(相当于消费者轮询)。
3.支持忙闲不均
当生产者制造数据快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中,慢慢处理掉。而不至于因为消费者的性能造成数据丢失或影响生产者生产。
我们再拿寄信的例子:假设邮递员一次只能带走1000封信,万一碰上情人节(或是圣诞节)送贺卡,需要寄出去的信超过了1000封,这时候邮箱这个缓冲区就派上用场了。邮递员把来不及带走的信暂存在邮箱中,等下次过来时再拿走。

实例:
1.文件ipfile.txt中有大量的ip地址,要求将ip地址取出来再与端口号组合,放入队列中
2.从队列中取出地址,依次访问并返回访问结果

import  threading
from queue import Queue
from urllib.request import urlopen
import time

class Procuder(threading.Thread):
    def __init__(self, q):
        super(Procuder, self).__init__()
        self.q = q
    def run(self):
        portlist = [80, 443, 7001, 8000, 8080]
        with open('ipfile.txt') as f:
            for ip in f:
                for port in portlist:
                    url = 'http://%s:%s' % (ip.strip(), port)
                    self.q.put(url)

class Consumer(threading.Thread):
    def __init__(self, q):
        super(Consumer, self).__init__()
        self.q = q
    def run(self):
        # 阻塞0.001妙使生产者先运行再队列中放入数据
        time.sleep(0.001)
        #只要队列不为空就一直“消费”数据
        while not self.q.empty():
            try:
                url = self.q.get()
                urlObj = urlopen(url)
            except Exception as e:
                print("%s unknown url" %(url))
            else:
                print("%s is ok" %(url))

if __name__ == '__main__':
    q = Queue(20)
    p1 = Procuder(q)
    p1.start()
    c = Consumer(q)
    c.start()
    # 阻塞调用线程,直到队列中的所有任务被处理掉,再继续向下执行。
    q.join()

运行结果就不截图了。

7.线程池

传统多线程方案会使用“即时创建, 即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。

一个线程的运行时间可以分为3部分:线程的启动时间、线程体的运行时间和线程的销毁时间。在多线程处理的情景中,如果线程不能被重用,就意味着每次创建都需要经过启动、销毁和运行3个过程。这必然会增加系统相应的时间,降低了效率。

使用线程池:
由于线程预先被创建并放入线程池中,同时处理完当前任务之后并不销毁而是被安排处理下一个任务,因此能够避免多次创建线程,从而节省线程创建和销毁的开销,能带来更好的性能和系统稳定性。

#导入模块 注意: python3.2版本以后才可以使用;
from concurrent.futures import ThreadPoolExecutor
import time

#需要完成的任务
def job(n):
    sum = 0
    for i in range(1,n+1):
        sum += i
    return sum

if __name__ =="__main__":
    #实例化线程池对象,设置线程池有10个线程
    pool = ThreadPoolExecutor(max_workers=10)

    #向线程池提交任务submit方法返回一个_base.Future对象,这个对象含有许多方法。便于我们对线程操作
    f1 = pool.submit(job,20)
    f2 = pool.submit(job,16)

    #查看线程是否完成任务(线程是否被销毁,完成任务的线程会被释放)
    #这里休眠1妙是因为线程在完成工作后会被释放,如果立即查看线程状态,可能线程正在释放中,会返False,这里等待1妙让线程完成释放之后在查看线程状态。
    time.sleep(1)
    print(f1.done())
    print(f2.done())

    #直接获取任务执行结果
    print(f1.result())
    print(f2.result())

运行结果:

True
True
210
136
实现线程池的三种方法(实际可以看成2种)

concurrent.futures.ThreadPoolExecutor,在提交任务的时候,有两种方式,一种是submit()函数,另一种是map()函数,两者的主要区别在于:
(1)map可以保证输出的顺序, submit输出的顺序是乱的
(2)如果你要提交的任务的函数是一样的,就可以简化成map。但是假如提交的任务函数是不一样的,或者执行的过程之可能出现异常(使用map执行过程中发现问题会直接抛出错误)就要用到submit()
(3)submit和map的参数是不同的,submit每次都需要提交一个目标函数和对应的参数,map只需要提交一次目标函数,目标函数的参数放在一个迭代器(列表,字典)里就可以。

from concurrent.futures import as_completed
from concurrent.futures.thread import ThreadPoolExecutor
from urllib.request import urlopen

urls = ['http://a.cn','http://1688.cn','http://jd.cn','http://qq.cn','http://qq.com','http://111.231.215.66']*10

def get_page(url):
    try:
        content = urlopen(url).read()
    except:
        return {'url:'+url+'   page_len:'+str(0)}
    else:
        return {'url:'+url+'   page_len:'+str(len(content))}

# 1.通过for循环打印结果
print('第一种方法:')
pool = ThreadPoolExecutor(max_workers=10)
resultlist = [pool.submit(get_page,url) for url in urls]
for i in resultlist:
    print(i.result())

# 2.通过方法as_completed
print('第二种方法:')
pool = ThreadPoolExecutor(max_workers=10)
resultlist = [pool.submit(get_page,url) for url in urls]
for i in as_completed(resultlist):
    print(i.result())

# 3.通过map方法
print('第三种方法:')
pool = ThreadPoolExecutor(max_workers=10)
for res in pool.map(get_page,urls):
    print(res)
向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI