流畅的Python_注释笔记

Python 数据模型

特殊方法

内置函数会调用特殊方法,一般无需直接调用特殊方法,除了__init__方法。

__len__:调用len(obj),解释器会自动调用 obj.__len__()

对于Python内置的类型(list,str等),CPython还会抄近路提升计算速度,直接返回PyVarObject(内存中长度可变的内置对象的C语言结构体)里的ob_size属性

__repr__:被repr()函数调用,或者直接在控制台输入,把一个对象用字符串的形式表达出来以便辨认。面向开发者使用

__str__:被str()函数调用,或者用print函数打印时调用,返回的字符串对中断用户更友好。面向用户的

区别:如果一个对象没有__str__函数,Python又需要调用时,会用__repr__代替

__add__:执行+运算

__mul__:执行*运算

PS:中缀运算符的原则是,不改变操作对象,而是产生出一个新的值

__rmul__:解决交换律问题

__bool__:被bool()函数调用,应该返回bool类型

1
2
def __bool__(self):
return bool(self.a or self.b)

序列构成的数组

内置序列

按元素类型分类:

容器序列:list、tuple、collections.deque,可以存放不同类型的数据

扁平序列:str、bytes、bytearray、memoryview、array.array,只能容纳一种类型

按是否能被修改分类:

可变序列:list、bytearray、array.array、collections.deque、memoryview

不可变序列:tuple、str、bytes

列表推导式、生成器表达式

列表推导式

生成新的列表

str = '测试字符串'
arr = [ord(x) for x in str if ord(x) > 30000]

与使用filter加map的比较

list(filter(lambda x: x > 30000, map(ord, str)))

生成器表达式:生成列表以外的序列类型

逐个的产出元素,而不会在内存中留下一个列表,节省内存,提高效率。

str = '测试字符串'*1000000
arr = (ord(x) for x in str if ord(x) > 30000)

上述操作花费0.2s,如果换成列表推到式,将大约花费2s。

如果声称其表达式是函数调用中的唯一参数,那么不再需要括号围起来,如 tuple(x for x in arr)

元组

可以作为“不可变列表”,还可以用于没有字段名的纪录。

具名元组

namedtuple,有名称和字段名的元组。

定义:x = ('Beijing', 2018) 或省略括号 x = 'Beijing', 2018

访问元素:x[0]

元组拆包

city, year, pop = ('Tokyo', 2003, 32450)

等式左边是元组的省略括号的写法,等同于(city,year,pop) = ...

元组拆包可以应用到任何可迭代对象上,唯一的要求是,接受元素的元组的空档数必须要和被迭代对象中的元素数量一致。

对于不关心的元素,可以用_占位符代替。对于多余的元素,可以用*来忽略。

str = '测试字符串'

a,b,_,*args = str

*前缀只能用在一个变量名前面,但是这个变量可以出现在赋值表达式的任意位置。

拆包时带的变量类型被定义为list,如果加在方法的形参上面,则变量被定义为tuple。

a, *b = 'Beijing', 2018, 1
print(type(b)) #list

1
2
def my(a,*b):
print(type(b)) #tuple

不使用中间变量交换两个变量的值

b,a=a,b

嵌套元组拆包

name, cc, pop, (lat, long) = ('Tokyo', 'JP', 36.933, (35.689722, 139.691667))
print('{:^15}|{:^9.4f}|{:^9.4f}'.format(name, lat, long)) # :9个单位的空间,保留4位小数,^居中

具名元组

1
2
3
4
5
import collections

City = namedtuple('City', 'name country population coordinates')
City = namedtuple('City', ['name','country','population','coordinates'])
tokyo = City('Tokyo', 'JP', 36.933, (35.689722, 139.691667))

元素访问:

tokyo[0],tokyo.name

属性和方法:

City._fields:返回包含这个类所有字段名称的元组
City._make():通过接受一个可迭代对象来生成一个实例,和调用City()是一样的
City._asdict():以 collections.OrderedDict 形式返回,方便遍历元素

1
2
for k, v in tokyo._asdict().items():
print(k, v)

元组与列表的方法区别:元组没有列表的增删改/清空等方法,包含查询和统计的方法。

切片

slice

切片和区间操作都不包含范围的最后一个元素,(左闭右开)。

优点:快速计算长度,后一个数减前一个数。快速分割元素不重叠,mylist[:10]mylist[10:]

s = 'bicycle'
print(s[::3]) # start:stop:step

相当于s[slice(None,None,3)],即s.__getitem__(slice(None,None,3))

多维切片:二维的numpy.ndarray就可以用a[m:n, k:l]的方式来得到二维切片

省略:ellipsis,Ellipsis对象是ellipsis类的单一实例

给切片复制:切片可以放在复制语句的左边或作为del操作的对象,可以对序列进行嫁接,切除或就地修改

1
2
3
4
l[2:5] = [20,30]
del l[5:7]
l[3::2] = [11,22]
l[2:5] = 100 #报错,如果把切片放在赋值左侧·1,右侧则必须是个可迭代对象

对序列使用+和*

浅表复制,序列中包含引用的话,仅仅是复制引用。

创建一个3*3的列表

arr = [['_']*3]*3

实际上创建了一个[‘‘,’‘,’_’]列表,然后复制了3次引用。改变其中一个列表的元素 arr[1][2] = '0',将影响三个列表。

等同于:

1
2
3
row = ['_'] * 3
for i in range(3):
arr.append(row)

正确的做法:

arr = [['_'] * 3 for i in range(3)]

序列的增量赋值

a += b

增量赋值运算符+=和*=的表现取决于他们第一个操作对象。背后的特殊方法时 __iadd____imul__ ,如果a实现了 __iadd__ ,则会调用这个方法。

对于可变序列(list,bytearray…)来说,a就会就地改动(in-place),好像调用了 a.extend(b) 一样。

如果a没有实现 __iadd__,这个表达式就变得跟 a = a+b 一样了。会计算 a+b,得出一个新对象,然后赋值给a。

1
2
3
4
5
6
7
8
9
l=[1,2,3]
print(id(l))
l*=2
print(id(l)) # 两次id相等,in-place,效率高

t=(1,2,3)
print(id(t))
t*=2
print(id(t)) # 两次id不等,新的对象,效率低

问题:

1
2
3
4
5
6
t = (1, 2, [30, 40])
try:
t[2] += [50, 60]
except Exception as e:
pass
print(t)

结果:t变成(1,2,[30,40,50,60]),但仍会抛出TypeError异常

查看类似过程的字节码:

1
2
import dis
dis.dis('s[a]+=b')

步骤说明:

  1. 将s[a]的值存入TOS(Top Of Stack)
  2. TOS += b,TOS是一个list,可以执行
  3. s[a] = TOS,s是不可变的,所以执行失败

结论:

  1. 不要把可变对象放在元组里
  2. 增量赋值不是一个原子操作

list.sort()和sorted()

list.sort(),就地排序,返回None。

sorted,新建一个列表并返回。可接受任何形式的可迭代对象,如,字符串,不可变序列或生成器,最后都会返回一个列表。

list.sort()和sorted()的参数:

reverse:为True则降序排列

key:一个只有一个参数的函数,key=len,key=myfunc,默认值是恒等函数

包含key参数的内置函数还有,min()和max()等。

bisect管理已排序的序列

bisect模块包含两个主要函数,bisect和insort,都利用二分查找来在有序序列中查找或插入元素。

bisect.bisect:bisect(haystack, needle),在haystack(干草垛,必须升序)中找needle(针,待插入的元素)的位置

可选参数,lo和hi,控制搜索的范围。

bisect其实是bisect_right函数的别名,如果碰到相等的元素,bisect_right返回的插入位置是原序列中被插入元素相等的元素的之后的位置。

相应的还有bisect_left,对于这种情况则返回相等元素的原位置,即相等元素之前的位置。

1
2
3
4
5
6
7
8
9
import bisect

def grade(score):
breakpoints = [60, 70, 80, 90]
grades = 'FDCBA'
i = bisect.bisect(breakpoints, score)
return grades[i]

print([grade(x) for x in [59, 66, 99]])

bisect.insort:insort(seq,item)把变量item插入到序列seq中,并能保持seq的升序顺序

1
2
3
4
5
6
import bisect
import random

l = []
for i in range(10):
bisect.insort(l, random.randrange(100))

列表不是首选时

如果要存放1000万个浮点数的话,数组(array)的效率要高得多。因为数组背后存的不是float对象,而是数字的机器翻译,也就是字节表述。就跟C语言中的数组一样。

如果要频繁的进行FIFO的操作,deque速度会更快。如果检查元素是否存在的频率很高,用set更合适。因为set专为检查元素是否存在做过优化。

数组:数组支持所有跟可变序列有关的操作,pop,insert,extend。还有从文件读和存入文件的更快的方法,frombytes,tofile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from array import array
from random import random

# 用生成器表达式建立一个1000万个浮点数的数组。'd'是类型码,代表数组只能存放双精度浮点数
fs = array('d', (random() for i in range(10**7)))
print(fs[-1])
fp = open('floats.bin', 'wb')
fs.tofile(fp) #将1000万个浮点数以二进制格式写入一个二进制文件
fp.close()
fs2 = array('d')
fp = open('floats.bin','rb')
fs2.fromfile(fp, 10**7)
fp.close()
print(fs2[-1])

print(fs == fs2) #两个数组的内容一模一样

从Python3.4开始,数组类型不在支持诸如list.sort()这种就地排序,排序要用sorted函数新建一个数组。

内存视图:能让用户在不复制内容的情况下操作同一个数组的不同切片。memoryview.cast会把同一块内存里的内容打包成一个全新的memoryview对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import array

nums = array.array('i', [2, 3, 4, 5])

memv = memoryview(nums)
print(memv[0]) #2
memv[0] = 1
print(nums[0]) #1

memv_n = memv.cast('B')
memv_n[0] = 9
print(memv[0]) #9
print(memv_n[0]) #9
print(nums[0]) #9

双向队列和其他形式的队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from collections import deque

dq = deque(range(10), maxlen=10) # maxlen可选参数,指定队列的最大长度,超出的元素将被舍弃

dq.rotate(3) # 旋转:接受一个参数n,当n>0时,从队列最右边的n个元素移动到队列左边。当n<0时,反之。
dq.rotate(-4)

dq.append(10) # 从队列尾部(右边)添加一个元素
dq.append([10, 20]) # 从队列尾部添加一个列表元素
dq.appendleft(10) # 与append相反
dq.appendleft([10, 20])

dq.extend([10, 20]) # 接受可迭代对象,将所有元素一个一个添加到队列右边
dq.extendleft([10, 20]) # 与extend相反

dq.pop() # 删除队列最右边的一个元素,并返回
dq.popleft()

append和popleft都是原子操作,deque可以在多线程程序中安全地当做FIFO的栈使用。

字典和集合

泛映射类型

collections.abc模块中有Mapping和MutableMapping两个基类。

ABC:Abstract Base Class

sequence

非抽象映射类型一般不直接继承这些abc,他们的作用是作为形式化文档,为dict何其他映射类型定义形式接口。还可以通过isinstance来判断某个数据是否是映射类型。

my_dict = {} # 或其他类型的Map

isinstance(my_dict, abc.Mapping) # True

可散列类型:

如果一个对象是可散列的,那么在这个对象的声明周期中,它的散列值是不变的,而且这个对象要实现hash()方法。

另外可散列对象还有有qe()方法,这样才能和其他键值作比较。如果两个可散列对象是相等的,那么它们的散列值一定是一样的。

原子不可变数据类型(str、butes和数值),frozenset,tuple(必须包含的所有元素都是可散列类型),都是可散列的。

可散列tuple:

1
2
3
4
t1 = (1, 2)
hash(t1)
t2 = (1, [])
hash(t2) # TypeError: unhashable type: 'list'

字典的构造方法:

1
2
3
4
5
6
7
a = {'one': 1}
b = dict({'one': 1})
c = dict(one=1)
d = dict(zip(['one'], [1]))
e = dict([('one', 1)])

a == b == c == d == e # True

字典推导

1
2
3
4
5
6
7
8
9
10
11
CODES = [
(86, 'China'),
(91, 'India'),
(1, 'USA'),
(55, 'Brazil'),
(7, 'Russia'),
(81, 'Japan')
]

codes = {contry: code for code, contry in CODES if code > 10}
print(codes)

映射的常用方法

setdefault:

my_dict.setdefault(key, []).append(new_value)

等价于

1
2
3
if key not in my_dict:
my_dict[key] = []
my_dict[key].append(new_value)

等价于

1
2
3
v = my_dict.get(key, [])
v.append(new_value)
my_dict[key] = v

defaultdict:

1
2
3
from collections import defaultdict
d = defaultdict(list)
d[key].append(new_value)

把list的构造方法作为default_factory来创建一个defaultdict,只会在getitem里被调用,其他比如get方法,在找不到键时只会返回None。

__missing__

只被__getitem__调用,如defaultdict,在__missing__中实现了调用default_factory方法。

字典的变种

collections.OrderedDict:添加键的时候会保持顺序

collections.ChainMap:可以容纳数个不同的映射对象,然后进行键查找时会把这些对象当成一个整体逐个查找。这个功能再给有嵌套作用域的语言做解释器的时候很有用,如:

1
2
3
4
5
6
7
8
9
10
11
import builtins

print(locals()) #本地变量
print(globals()) #全局变量
print(vars(builtins))

collections.Counter:键-整数计数器,每次更新一个键的时候都会增加这个计数器。这个类型可以用来给可散列对象技术。

ct = collections.Counter('absdfxasdasdf')
ct.update('aaaadd')
print(ct.most_common(2)) # 最常见的2个键和它们的计数

UserDict:子类化,以UserDict为基类创造自定义映射类型,它并不是dict的子类,继承的是MutableMapping,有一个data属性,是一个dict实例。最终存储数据的地方。

不可变映射类型:返回一个映射的只读视图,但是时动态的,如果原映射有改动,可以通过视图观察到。

1
2
3
4
5
6
7
8
9

from types import MappingProxyType

d = {1: 'A'}
d_proxy = MappingProxyType(d)
print(d_proxy[1])
d_proxy[1] = 'B' # 报错
d[1] = 'B'
print(d_proxy[1])

set

保证元素的唯一性,集合中的元素必须是可散列的,set本身是不可散列的,但是frozenset可以

中缀运算:a|b,返回合集。a&b返回交集,a-b返回差集

计算重叠元素的个数

found = len(needles & haystack) #两侧都需要是集合类型

found = len(set(needles).intersection(haystack)) #如果一个对象还不是集合类型

直接使用set的交集计算比for遍历计算求和快的多

字面量:s = {1}s = set([1])

空集:s = set(),不能写{}(空字典)

直接写{1}的方式更快,因为Python会利用一个专门的BUILD_SET的字节码来创建集合。查看区别:

1
2
3
4
from dis import dis

dis('{1}')
dis('set([1])')

fronzenset只能通过构造方法创建。

集合推导:

1
2
3
from unicodedata import name

print({chr(c) for c in range(32, 256) if 'SIGN' in name(chr(c), '')})

集合的的继承关系:

set

基本操作:

  • +/-/&/|
  • e in s,元素e是否属于s
  • s <= z,s是否为z的子集
  • s < z,s是否为z的真子集
  • s >=z/s>z

dict和set的背后原理

散列表其实是一个稀疏数组(总有空白元素的数组),散列表的单元通常叫做表元(bucket),在dict的散列表当中,每个键值对都占用一个表元,每个表元都有两个部分,一个是对键的引用,另一个是对值的引用。因为所有表元的大小一致,所以可以通过偏移量来读取某个表元。

因为Python会设法保证大概还有三分之一的表元是空的,所以快要达到这个阈值时,原有的散列表会被复制到更大的空间里面进行扩容。

散列表算法

为了获取dict[search_key]背后的值,Python首先会调用hash(search_key)来计算search_key的散列值,取这个值最低的几位数字(具体几位看当前散列表的大小)当做偏移量,在散列表里查找表元。若找到的表元是空的,则抛出KeyError异常。若不是空的,则表元里会有一堆found_key:found_value,这时候进行校验search_key == found_key是否为真,如果相等,则返回found_value。

如果search_key和found_key不匹配的话,这种情况称为散列冲突。发生这种情况原因是,把随机元素映射到几位的数字上,而索引又依赖于这几位数字的一部分而已。为了解决散列冲突,算法会在散列值中另外再取几位,然后用特殊的方法处理一下,把新得到的数字再当成索引寻找表元,重复以上给欧成。

添加和更新操作几乎跟上面一样。

散列表

dict的实现及其限制

  • 键必须是可散列的

支持hash()函数,并且通过 __hash__() 方法得到的散列值是不变的。

支持通过 __eq__() 方法来检测相对性。

若a == b为真,则hash(a) == hash(b) 也为真。

所有由用户自定义的对象默认都是可散列的,散列值由id()来计算获取。

  • 字典内存开销巨大

为了保持散列表是稀疏的,将降低在空间上的效率。

如果要存放数量巨大的纪录,元组或具名元组构成的列表是比较好的选择。

  • 键查询很快

dict是典型的空间换时间,内存开销大,但是查询速度快。

  • 键的次序取决于添加顺序

添加新键又发生散列冲突时,新键可能会被安排存放到另一个位置。这将导致键的顺序乱掉。

  • 往字典里添加新键可能会改变已有键的顺序

无论何时添加新键,Python解释器都可能做出字典寇蓉的决定。扩容导致把字典中已有的元素添加到更大的新表中,过程中如果发生散列冲突,将导致新标中键的次序变化。
因此不要同时对字典进行迭代和修改,最好分开两步进行。

文本和字节序列

字符问题

Unicode字符标识和字节表述:

字符的标识,即码位。是0~1114111的数字,以4~6个十六进制数字表示。

字节表述取决于所用的编码,是在码位和字节序列之间转换时使用的算法。如UTF-8,字符在ASCII范围内用的编码成1个字节,一个汉字占3个字节。

字节概要

调用各自的构造方法,构建bytes或bytearray实例。

一个str对象和一个encoding关键字参数。

一个可迭代对象,提供0~255之间的数值。

一个实现了缓冲协议的对象(bytes,bytearray,memoryview,array.array),把源对象中的字节序列复制到新建的二进制序列中。

1
2
3
4
5
6
7
8
9
10
cafe = bytes('café', encoding='utf8')

print(cafe) # b'caf\xc3\xa9'
print(type(cafe)) # bytes
print(cafe[0]) # 99 0~255的整数
print(cafe[:1]) # b'c' 一个字节的切片仍是bytes类型

cafe_arr = bytearray(cafe)
print(cafe_arr) # bytearray(b'caf\xc3\xa9')
print(cafe_arr[-1:]) # bytearray(b'\xa9')

各个字节的值可能使用下列三种不同的方式显示,

  1. 可打印的ASCII范围内的字节(空格到~),使用ASCII字符本身。

  2. 制表符、换行符、回车符和\,使用转义序列\t、\n、\t和\。

  3. 其他字节,使用十六进制转义序列(\x00)

编码异常

UnicodeEncodeError:编码时,编解码器没有定义某个字符时,就会抛出

1
2
3
4
5
6
city = 'São Paulo'
city.encode('utf_8')
city.encode('cp437') #抛出异常
city.encode('cp437', errors='ignore') #忽略并删除无法编码的字符
city.encode('cp437', errors='replace') #把无法编码的字符替换成'?'
city.encode('cp437', errors='xmlcharrefreplace') #把无法编码的字符替换成XML实体

UnicodeDecodeError:解码时碰到无法转换的字节序列时会抛出

1
2
3
4
octets = b'Montr\xe9al'
octets.decode('cp1252')
octets.decode('utf_8') #抛出异常
octets.decode('utf_8', errors='replace') #\xe9替换成“黑色菱形问号”(码位是U+FFFD),这是官方指定的 REPLACEMENT CHARACTER(替换字符),表示未知字符。

处理文本文件

1
2
3
4
5
6
7
8
9
import os

open('cafe.txt', 'w', encoding='utf_8').write('café') # 返回写入Unicode字符数,4
# 报告文件中有5个字节:caf是ASKII范围内的字符,占一个字节,最后一个特殊字符占两个字节
os.stat('cafe.txt').st_size

print(open('cafe.txt', encoding='utf8').read())
fp4 = open('cafe.txt', 'rb') # 以二进制模式打开文件。常规代码只应该使用二进制模式打开二进制文件
fp4.read() # b'caf\xc3\xa9'

如果读取时不指定编码格式,那么Python会使用系统默认的编码,很有可能出现乱码。所以需要在多台设备或场合下运行的代码,打开读取文件时应始终明确传入encoding参数。

编码默认值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import sys
import locale

expressions = """
locale.getpreferredencoding()
type(my_file)
my_file.encoding
sys.stdout.isatty()
sys.stdout.encoding
sys.stdin.isatty()
sys.stdin.encoding
sys.stderr.isatty()
sys.stderr.encoding
sys.getdefaultencoding()
sys.getfilesystemencoding()
"""


my_file = open('dummy', 'w')
for expression in expressions.split():
value = eval(expression)
print(expression.rjust(30), '->', repr(value))

输出:

1
2
3
4
5
6
7
8
9
10
11
locale.getpreferredencoding() -> 'cp936'
type(my_file) -> <class '_io.TextIOWrapper'>
my_file.encoding -> 'cp936'
sys.stdout.isatty() -> True
sys.stdout.encoding -> 'utf-8'
sys.stdin.isatty() -> True
sys.stdin.encoding -> 'utf-8'
sys.stderr.isatty() -> True
sys.stderr.encoding -> 'utf-8'
sys.getdefaultencoding() -> 'utf-8'
sys.getfilesystemencoding() -> 'utf-8'

一等函数

把函数视作对象

“一等对象”的定义为满足下述条件的程序实体:

  • 函数是一等对象。
  • 在运行时创建。
  • 可以赋值给变量,通过变量调用。
  • 能作为参数传给函数。
  • 能作为函数的返回结果。
1
2
3
4
5
6
7
8
9
10
11
12
def factorial(n):
'''return n!'''
return 1 if n < 2 else n * factorial(n-1)

print(factorial(42))
print(factorial.__doc__)
print(type(factorial))

fact = factorial
print(fact(5))
print(map(fact, range(10)))
print(list(map(fact, range(10))))

高阶函数

接受函数为参数或返回函数为结果的是函数是高阶函数,如map,sorted

  • map,filter返回生成器,因此现在它们的直接替代品是生成器表达式。
  • functools.reduce,最常用于求和。
  • all(iterable),如果iterable的每个元素都是真值,就返回True。
  • any(iterable),如果iterable中有元素是真值,就返回True。

匿名函数

因为句法限制,lambda函数的定义体不能赋值和使用while等语句,在Python中很少使用。

1
2
words = ['strawberry', 'fig', 'apple', 'cherry', 'raspberry', 'banana']
print(sorted(words, key=lambda word: word[::-1]))

可调用对象

可被调用运算符(即())应用的,叫做可调用对象,可以使用callable()函数判断。

包括:

  • 用户定义的函数(def或lambda创建)
  • 内置函数(C语言实现的函数,如len)
  • 内置方法(C语言实现的方法,如dict.get)
  • 方法(在类的定义体中定义的函数)
  • 类(调用顺序为 __new____init__ 初始化实例,然后返回给调用方)
  • 类的实例(定义了 __call__ 方法的类)
  • 生成器函数(使用yield关键字的函数或方法,返回的是生成器对象)

用户定义的可调用类型

实现了 __call__ 的对象:

1
2
3
4
5
6
7
8
9
10
11
12
class Bingo():
def __init__(self, items):
self._items = list(items)

def pick(self):
return self._items.pop()

def __call__(self):
return self.pick()

b = Bingo(range(3))
print(b())

函数内省

dir函数可以探知对象的属性,计算出函数专有而一般对象没有的属性:

1
2
3
4
5
class C(): pass
c = C()
def func(): pass

print(set(dir(func))-set(dir(c)))

从定位参数到仅限关键字参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def tag(name='p',  *content, cls=None, **attrs):
if cls:
attrs['class'] = cls
if attrs:
attrs_str = ''.join(' {}="{}"'.format(attr, value)
for attr, value in attrs.items())
else:
attrs_str = ''

if content:
return '\n'.join('<{}{}>{}</{}>'.format(name, attrs_str, c, name) for c in content)
return '<{}{} />'.format(name, attrs_str)


print(tag('br'))
print(tag('p', 'hello'))
print(tag('p', 'hello', 'world', id='33', cls='side'))
print(tag(content='内容', name='img'))
d = {'name': 'img', 'title': 'Sunset Boulevard',
'src': 'sunset.jpg', 'cls': 'framed'}
print(tag(**d))

cls即仅限关键字的参数,因为未指定关键字的参数会被cls前面的*content捕获,并且cls有默认值可不指定,所以想要指定的话就必须指定关键字。

获取关于参数的信息

1
2
3
4
5
6
7
8
9
10
11
from inspect import signature

def tag(name, *content, cls=None, **attrs):
pass

print(tag.__defaults__)
# None,如果形参是name='xx',则是('xx',)。返回一个元组,保存的定位参数和关键字参数的默认值
print(tag.__code__)
# <code object tag at ...>,一个 code 对象引用, 自身也有很多属性
print(tag.__code__.co_varnames) # ('name', 'cls', 'content', 'attrs')
print(tag.__code__.co_argcount) # 1

更好更直观的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
sig = signature(tag)

print(sig) # (name='p', *content, cls=None, **attrs)

for name, param in sig.parameters.items():
print(param.kind, ':', name, '=', param.default)
# POSITIONAL_OR_KEYWORD : name = <class 'inspect._empty'>。通过定位参数和关键字参数传入的形参
# VAR_POSITIONAL : content = <class 'inspect._empty'>。定位参数元组
# KEYWORD_ONLY : cls = None。仅限关键字参数
# VAR_KEYWORD : attrs = <class 'inspect._empty'>。关键字参数字典。

my_tag = {'name': 'img', 'title': 'Sunset Boulevard',
'src': 'sunset.jpg', 'cls': 'framed'}
bound_args = sig.bind(**my_tag) # 尝试绑定,验证参数,规则同调用
# <BoundArguments (name={'name': 'img', 'title': 'Sunset Boulevard', 'src': 'sunset.jpg', 'cls': 'framed'})>
print(bound_args)

for name, value in bound_args.arguments.items():
print(name, '=', value)
# name = img
# cls = framed
# attrs = {'title': 'Sunset Boulevard', 'src': 'sunset.jpg'}

del my_tag['name']
bound_args = sig.bind(**my_tag) #抛出TypeError,缺少name参数。或提供name的默认值

函数注解

1
2
3
4
5
6
7
8
9
10
11
12
from inspect import signature

def clip(text: str, max_len: 'int > 0123'=80) -> str:
pass

sig = signature(clip)
print(sig.return_annotation) # <class 'str'>
for param in sig.parameters.values():
note = repr(param.annotation).ljust(30)
print(note, ':', param.name, '=', param.default)
# <class 'str'> : text = <class 'inspect._empty'>
# 'int > 0123' : max_len = 80

函数式编程的包

operator模块:多个算术运算符提供了对应的函数, 从而避免编写lambda a, b: a*b这种平凡的匿名函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from functools import reduce
from operator import mul

def fact(n):
# return reduce(lambda curr, item: curr*item, range(1, n+1))
return reduce(mul, range(1, n+1))

itemgetter,itemgetter(1) 即 lambda fields: fields[1]
attrgetter

from operator import itemgetter, attrgetter
from collections import namedtuple

metro_data = [
('New York-Newark', 'US', 20.104, (40.808611, -74.020386)),
('Sao Paulo', 'BR', 19.649, (-23.547778, -46.635833)),
]

for city in sorted(metro_data, key=itemgetter(1)): # 按某个字段排序
print(city)

for city in metro_data:
print(itemgetter(0, 1)(city))

LatLong = namedtuple('LatLong', 'lat long')
Metropolis = namedtuple('Metropolis', 'name cc pop coord')
metro_areas = [Metropolis(name, cc, pop, LatLong(lat, long))
for name, cc, pop, (lat, long) in metro_data]
print(metro_areas[0])
print(metro_areas[0].coord.lat)

name_lat = attrgetter('name', 'coord.lat')
for city in sorted(metro_areas, key=attrgetter('coord.lat')):
print(name_lat(city))

methodcaller:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from operator import methodcaller

s = 'The time has come'

upcase = methodcaller('upper')
hiphenate = methodcaller('replace', ' ', '-')

print(upcase(s))
print(hiphenate(s))

使用functools.partial 冻结参数:
即柯里化,基于一个函数创建一个新的可调用对象,固定指定某些参数

from operator import mul
from functools import partial

triple = partial(mul, 3)
print(list(triple(i) for i in range(10)))

picture = partial(tag, 'img', cls='pic-frame')
picture(src='some.jpg')

使用一等函数实现设计模式

策略模式

经典的策略模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from abc import ABC, abstractmethod
from collections import namedtuple

Customer = namedtuple('Customer', 'name fidelity')

class LineItem():
def __init__(self, product, quantity, price):
self.product = product
self.quantity = quantity
self.price = price

def total(self):
return self.price * self.quantity


class Order():
def __init__(self, customer, cart, promotion=None):
self.customer = customer
self.cart = list(cart)
self.promotion = promotion

def total(self):
if not hasattr(self, '__total'):
self.__total = sum(c.total() for c in self.cart)
return self.__total

def due(self):
if self.promotion:
discount = self.promotion.discount(self)
else:
discount = 0
return self.total()-discount

def __repr__(self):
fmt = '<Order total:{:.2f} due:{:.2f}'
return fmt.format(self.total(), self.due())


class Promotion(ABC):

@abstractmethod
def discount(self, order):
pass


class FidelityPromo(Promotion):
def discount(self, order):
return order.total() * .05 if order.customer.fidelity > 1000 else 0


class BulkItemPromo(Promotion):
def discount(self, order):
return sum(item.total()*.1 for item in order.cart if item.quantity >= 20)


class LargeOrderPromo(Promotion):
def discount(self, order):
distinct_items = {item.product for item in order.cart}
return order.total() * .07 if len(distinct_items) >= 10 else 0


ann = Customer('Ann Smith', 1100)
cart = [LineItem('banana', 4, .5),
LineItem('apple', 10, 1.5),
LineItem('watermellon', 5, 5.0)]
print(Order(ann, cart, FidelityPromo()))

函数实现“策略”模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

def fidelity_promo(order):
return order.total() * .05 if order.customer.fidelity > 1000 else 0


def bulk_item_promo(order):
return sum(item.total()*.1 for item in order.cart if item.quantity >= 20)


def large_order_promo(order):
distinct_items = {item.product for item in order.cart}
return order.total() * .07 if len(distinct_items) >= 10 else 0


promos = [fidelity_promo, bulk_item_promo, large_order_promo]


def best_promo(order):
return max(p(order) for p in promos)


print(Order(ann, cart, best_promo))

找出模块中的全部策略

若想添加新的促销策略,要定义函数且加到promos列表中,下面尝试更灵活的方式。

globals:返回一个字典,表示当前的全局符号表

把策略方法封装到promotions模块中,使用inspect内省,取出

内省模块的全局命名空间, 构建 promos 列表:

1
2
3

promos = [globals()[name] for name in globals()
if name.endswith('_promo') and name != 'best_promo']

内省单独的 promotions 模块, 构建 promos 列表:

1
promos = [func for name, func in inspect.getmembers(promotions, inspect.isfunction)]

命令模式

命令模式是回调机制的面向对象替代品,目的是解耦调用者和接收者。做法是在调用者和接收者之间放一个Command对象,让它实现execute接口,调用接受者自己的方法执行所需的操作,这样调用者无需了解接收者的接口,不同的接收者也可以适应不同的Command子类。

在 Python 中使用函数或可调用对象实现回调更自然,可以不用为调用者提供一个Command实例,而是给它一个函数,调用者不用调用command.execute(),直接调用command()即可。

1
2
3
4
5
6
7
8
class MacroCommand():

def __init__(self, commands):
self.commands = commands

def __call__(self):
for command in self.commands:
command()

函数装饰器和闭包

装饰器基础

装饰器是可调用对象,参数是被装饰的函数。可以把它替换掉也可以原样返回。

1
2
3
4
5
6
7
8
9
10
def decorate(func):
def inner():
print('running inner')
return inner

@decorate
def target():
print('running target')

print(target) # <function decorate.<locals>.inner at ...>

装饰器只是语法糖:

1
2
3
@decorate
def target():
pass

可以转换为:

1
2
3
def target():
pass
target = decorate(target()

装饰器的执行时机

装饰器在被装饰的函数定义之后立即执行,这通常是在导入时(即Python加载模块时)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def reister(func):
print('running reister {}'.format(func))
return func

@reister
def f1():
print('running f1')

def main():
print('running main')
f1()

if __name__ == '__main__':
main()

打印如下:

1
2
3
running reister <function f1 at ...>
running main
running f1

装饰器在真实代码中的常用方式:

  • 装饰器通常定义在一个模块中,然后应用到其他模块中的函数上。
  • 大多数装饰器会在内部定义一个函数,然后将其返回。也可能原样返回,比如很多Python Web框架使用这样的装饰器把函数添加到某种中央注册处。比如上面的例子中,使用促销装饰器把所有促销注册起来,供best_promo使用。

变量作用域

1
2
3
4
5
6
7
b = 6
def func(a):
print(a)
print(b)
b = 9

func(1)

会报错local variable ‘b’ referenced before assignment,Python判断b是局部变量,运行时从本地环境中获取,但是还未定义,所以报错。通过dis查看操作b的字节码为LOAD_GLOBAL。

改为:

1
2
3
4
5
6
7
8
9
b = 6
def func(a):
global b #让解释器把b当成全局变量
print(a)
print(b)
b = 9

func(1)
print(b) # 9

不再报错,读取和修改的都是全局变量。通过dis查看操作b的字节码为LOAD_FAST。
CPython VM是栈机器,LOAD和POP操作引用的是栈。

闭包

闭包指延伸了作用域的函数, 它能访问定义体之外定义的非全局变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def make_averager():
series = []

def averager(new_value):
series.append(new_value)
total = sum(series)
return total/len(series)

return averager

avg = make_averager()

print(avg(10))
print(avg(22))

在 averager 中, series 是自由变量(free variable, 指未在本地作用域中绑定的变量)。

闭包

1
2
3
4
print(avg.__code__.co_varnames)   # 局部变量
print(avg.__code__.co_freevars) # 自由变量
print(avg.__closure__) # 一个元组 (<cell at 0x0000028489CDA4C8: list object at ...>,)
print(avg.__closure__[0].cell_contents #

nonlocal 声明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def make_averager():
count = 0
total = 0

def averager(new_value):
nonlocal count, total # python3的关键字,python2中通过赋值给可变对象(如字典)的属性来实现。不加这一句下面将会报错,因为相当于声明了局部变量的count 。count = count + 1。在读取count时报错
count += 1
total += new_value
return total / count

return averager

avg = make_averager()
print(avg(1))
print(avg(4))

实现一个简单的装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import time

def clock(func):
def clocked(*args):
t0 = time.perf_counter()
result = func(*args)
elapsed = time.perf_counter() - t0
name = func.__name__
arg_str = ','.join(repr(arg) for arg in args)
print('[{:.8f}s] {}({}) -> {}'.format(elapsed, name, arg_str, result))
return result

return clocked


@clock
def snooze(seconds):
time.sleep(seconds)


@clock
def factorial(n):
return n if n == 1 else n * factorial(n-1)


if __name__ == '__main__':
print(factorial.__name__) # clocked。factorial已经被替换为clocked函数,__name__属性被覆盖
print('*'*40, 'Calling snooze(.123)')
snooze(.123)
print('*'*40, 'Calling factorial(6)')
factorial(6)

使用functools.wraps装饰器解决name覆盖的问题:

functools.wraps 装饰器会把相关的属性从 func 复制到 clocked 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def clock(func):

@functools.wraps(func)
def clocked(*args, **kwargs): # 支持关键字参数
t0 = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - t0
name = func.__name__
arg_lst = []
if args:
arg_lst.append(', '.join(repr(arg) for arg in args))
if kwargs:
pairs = ['{}={}'.format(k, v) for k, v in sorted(kwargs.items())]
arg_lst.append(', '.join(pairs))

arg_str = ', '.join(arg_lst)
print('[{:.8f}s] {}({}) -> {}'.format(elapsed, name, arg_str, result))
return result

return clocked

标准库中的装饰器

functools.lru_cache:

lru,即“Least Recently Used”。

它把耗时的函数的结果保存起来, 避免相同的参数重复计算。缓存不会无限制增长, 一段时间不用的缓存会被扔掉。

1
2
3
4
5
6
7
8
9
10
11
import functools

@functools.lru_cache() # ()调用,可传入配置参数
@clock
def fibonacci(n):
if n < 2:
return n
return fibonacci(n-2) + fibonacci(n-1)

if __name__ == '__main__':
print(fibonacci(20))

极大的减少调用次数。

functools.lru_cache(maxsize=128, typed=False)

参数:

maxsize,指定存储多少个调用的结果,应该设为2的幂。

typed,True会把不同参数类型得到的结果区分开,比如1和1.0。

因为lru_cache使用字典存储结果,所以被修饰的函数,所有参数都必须是可散列的。

functools.singledispatch:

Python 不支持重载方法或函数, 所以我们不能使用不同的签名定义函数的变体, 也无法使用不同的方式处理不同的数据类型。 在Python 中, 一种常见的做法是把函数变成一个分派函数。singledispatch装饰器可以把整体方案拆分成多个模块。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from functools import singledispatch
from collections import abc
import numbers
import html

@singledispatch
def htmlize(obj):
content = html.escape(repr(obj))
return '<pre>{}</pre>'.format(content)

@htmlize.register(numbers.Integral)
def _(n): # 函数名称无所谓
return '<pre>{0} (0x{0:x})</pre>'.format(n)

@htmlize.register(tuple)
@htmlize.register(abc.MutableSequence) #可以叠放多个 register 装饰器
def _(seq):
inner = '</li>\n<li>'.join(htmlize(item) for item in seq)
return '<ul>\n<li>' + inner + '</li>\n</ul>'

叠放装饰器

1
2
3
4
@d1
@d2
def f():
print('f')

相当于:

1
2
3
def f():
print('f')
f = d1(d2(f))

参数化装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
registry = set()


def register(active=True):
def decorate(func):
print('running register(active={})->decorate({})'.format(active, func))
registry.add(func) if active else registry.discard(func)
return func
return decorate


@register(active=False)
def f1():
print('running f1()')


@register()
def f2():
print('running f2()')


print(registry) # 只有f2

装饰器函数金字塔:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import time


def clock(): # 装饰器工厂
def decorate(func): # 装饰器
def clocked(*_args): # 包装被装饰的函数
t0 = time.time()
_result = func(*_args) # 被包装的函数返回的真正结果
elapsed = time.time() - t0
# pass
print('1')
return _result # clocked会取代原函数,因此应该返回被装饰函数的返回值
return clocked # 装饰器返回包装函数
return decorate # 工厂返回一个装饰器


@clock()
def funcname():
pass

对象引用、可变性和垃圾回收

标识、相等性和别名

id():

在 CPython 中, id() 返回对象的内存地址, 但是在其他 Python 解释器中可能是别的值。 关键是, ID 一定是唯一的数值标注, 而且在对象的生命周期中绝不会变。

==和is:

is运算符比==速度快,==是语法糖,等同于a.__eq__(b)

默认浅复制

列表的内置构造函数或list[:]语句,做的都是浅复制(只复制最外层的容器,容器中的元素只复制引用)。

1
2
3
4
5
6
7
8
9
10
11
12
l1 = [3, [66, 55, 44], (7, 8, 9)]
l2 = list(l1)

l1.append(100) # l1的容器添加一个元素
l1[1].remove(55)
print('l1:', l1)
print('l2:', l2)

l2[1] += [33, 22]
l2[2] += (10, 11) # l2 创建了新的元组
print('l1:', l1)
print('l2:', l2)

深复制和浅复制:

1
2
3
4
from copy import deepcopy

copy.copy(list) # 浅复制
copy.deepcopy(list) # 深复制

函数的参数作为引用时

Python 唯一支持的参数传递模式是共享传参,指函数的各个形式参数获得实参中各个引用的副本。 也就是说, 函数内部的形参是实参的别名。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def f(a, b):
a += b
return a

x = 1
y = 2
f(x, y)
print(x, y) # x未改变

a = [1, 2]
b = [3, 4]
f(a, b)
print(a, b) # a改变

t = (10, 20)
u = (30, 40)
f(t, u)
print(t, u) # t未改变

不要使用可变类型作为参数的默认值,如:

1
2
def __init__(self, passengers=[]):
self.passengers = passengers

默认值在定义函数时计算(通常在加载模块时),因此默认值变成了函数对象的属性。因此,如果默认值是可变对象, 而且修改了它的值, 那么后续的函数调用都会受到影响。

默认值会储存在class.__init__.__defaults__的属性里(一个tuple)。

防御可变对象参数:

当参数是可变对象时,如果没有约定的情况下,最好创造一个可变参数的副本,如:

1
2
3
4
5
def __init__(self, passengers=None):
if passengers is None:
self.passengers = []
else:
self.passengers = list(passengers)

这样就不会影响外部的对象了。

del 和 垃圾回收

del语句删除名称,而不是对象。仅当删除的变量保存的是对象的最后一个引用, 或者无法得到对象时,del 命令才可能导致对象被当作垃圾回收。

在 CPython 中, 垃圾回收使用的主要算法是引用计数。 实际上, 每个对象都会统计有多少引用指向自己。 当引用计数归零时, 对象立即就被销毁: CPython 会在对象上调用 __del__ 方法(如果定义了),然后释放分配给对象的内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import weakref

def bye():
print('Gone with the wind...')

s1 = {1, 2, 3}
s2 = s1

ender = weakref.finalize(s1, bye)
print(ender.alive) # True
del s1
print(ender.alive) # True
s2 = 'spam' # Gone..
print(ender.alive) # False

弱引用

弱引用不会增加对象的引用数量,不会妨碍所指对象被当作垃圾回收。弱引用在缓存应用中很有用,因为我们不想仅因为被缓存引用而始终缓存对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
>>> import weakref
>>> a_set = {0, 1}
>>> wref = weakref.ref(a_set) # 创建弱引用
>>> wref
<weakref at 0x0000021194B76598; to 'set' at 0x00000211969AF588>
>>> wref() # 返回被引用的对象,但因为是控制台会话,所以{0,1}会绑定给_变量
{0, 1}
>>> a_set = {0, 1, 2} # a_set 不再指代{0,1}集合,但是变量_仍然指代它
>>> wref()
{0, 1}
>>> _
{0, 1}
>>> wref() is None # 返回值False绑定给变量_,{0,1}没有强引用了
False
>>> wref() is None # {0,1}对象不存在了
True

weakref.ref类其实是低层接口,最好不要手动处理weakref.ref实例,而是使用weakref集合。

符合Python风格的对象

向量类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from array import array
import math


class Vector2d():
typecode = 'd'

def __init__(self, x, y):
self.x = float(x)
self.y = float(y)

def __iter__(self):
yield self.x
yield self.y
# return (i for i in (self.x, self.y))

def __repr__(self):
class_name = type(self).__name__
return '{}({!r},{!r})'.format(class_name, *self)

def __str__(self):
return str(tuple(str))

def __bytes__(self):
return (bytes([ord(self.typecode)]) +
bytes(array(self.typecode, self)))

def __eq__(self, other):
return tuple(self) == tuple(other)

def __abs__(self):
return math.hypot(self.x, self.y)

def __bool__(self):
return bool(abs(self))

@classmethod
def frombytes(cls, octets):
typecode = chr(octets[0])
memv = memoryview(octets[1:]).cast(typecode)
return cls(*memv)

classmethod与staticmethod

classmethod:

定义操作类的方法,第一个参数是类本身,最常见的用途是定义备选构造函数

staticmethod:

静态方法就是普通函数,只是碰巧在类的定义体中,而不是在模块层定义的。完全可以不用使用它。

1
2
3
4
5
6
7
8
9
10
11
12
class Demo():
@classmethod
def klassmeth(*args):
return args

@staticmethod
def statmeth(*args):
return args


print(Demo.klassmeth(1)) # (<class '__main__.Demo'>, 1)
print(Demo.statmeth(1)) # (1,)

格式化显示

委托给相应的 __format__(format_spec) 方法

1
2
3
4
brl = 1/2.43
print(brl) # 0.4115226337448559
print(format(brl, '.4f')) # 0.4115
print('1 BRL = {rate:.2f} USD'.format(rate=brl)) # 1 BRL = 0.41 USD

rate是字段名,.4f是格式规范微语言。

整数使用的代码有 ‘bcdoxXn’, 浮点数使用的代码有’eEfFgGn%’, 字符串使用的代码有 ‘s’。

格式规范微语言文档(https://docs.python.org/3/library/string.html#formatspec)

1
2
print(format(42, 'b'))  # b二进制 o八进制 x十六进制
print(format(2/3, '.1%')) # 66.7%

如果没有定义__format__方法,从object继承的方法会返回str(obj)。

print(format(v)) # (2.0, 3.0)

但是如果传入格式说明符,object.format方法会抛出TypeError。

print(format(v,'.2f)) # TypeError

给Vector2d定义__format__方法:

1
2
3
4
5
def __format__(self, fmt_spec=''):
compoments = (format(c, fmt_spec) for c in self)
return '({},{})'.format(*compoments)

print(format(v, '.2f')) # (2.00,3.00)

自定义格式代码:

假设我们自定义的代码为p,用来显示极坐标中的向量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def angle(self):
return math.atan2(self.y, self.x)

def __format__(self, fmt_spec=''):
if fmt_spec.endswith('p'):
fmt_spec = fmt_spec[:-1]
coords = (abs(self), self.angle())
outer_fmt = '<{},{}>'
else:
coords = self
outer_fmt = '({},{})'
compoments = (format(c, fmt_spec) for c in coords)
return outer_fmt.format(*compoments)


print(format(v, 'p')) # <3.605551275463989,0.982793723247329>

print(format(v, '.5fp')) # <3.60555,0.98279>

可散列的Vector2d

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# other code
def __init__(self, x, y):
self.__x = float(x) # 使用两个__把属性"标记"为私有的
self.__y = float(y)

@property # 把读值方法标记为特性
def x(self): # 方法名即公开属性名
return self.__x

@property
def y(self):
return self.__y

def __hash__(self):
return hash(self.x) ^ hash(self.y) # 最好使用位运算符异或(^)混合各分量的散列值

让这些向量不可变是有原因的, 因为这样才能实现hash 方法。

私有属性和“受保护”的属性

1
2
def __init__(self, x, y):
self.__x = float(x)

使用两个_作为前缀命名实例属性,Python会把属性名存入实例的dict属性中,而且会进行“名称改写”,如prop会被改写成_ClassNameprop。

1
2
3
v = Vector2d(3, 4)
print(v.__dict__) # {'_Vector2d__x': 3.0, '_Vector2d__y': 4.0},__dict__为实例的可读属性的键值字典
print(v.__dir__()) # __dir__()方法返回实例和类的所有属性名和方法名的list

也有人不喜欢这种不对称的名称,他们约定使用一个_前缀编写“受保护”的属性,如self._x。Python不会对这种属性名做特殊处理,这仅仅是程序员之间遵守的约定,他们不会在类外部访问这种属性。

使用slots类属性节省空间

创建一个类属性,使用 __slots__ 这个名字, 并把它的值设为一个字符串构成的可迭代对象。

1
2
class Vector2d:
__slots__ = ('__x', '__y')

作用是告诉解释器,这个类中的所有实例属性都在这儿了!

Python会在各个实例中使用类似元组的结构存储实例变量,从而避免使用消耗内存的dict属性。当同时有数百万个实例活动时,能节省大量内存。

副作用是定义了slots属性之后,实例不能再有slots 中所列名称之外的其他属性。 但故意这样禁止用户新增属性是不对的。

如果定义了slots属性,且想让对象支持弱引用,则必须把weakref添加到slots中。

解释器会忽略继承的slots属性,所以每个子类都要定义slots属性。

覆盖类属性

1
2
3
4
5
6
7
class Vector2d():
typecode = 'd'

v = Vector2d(3, 4)
print(v.typecode) # d
v.typecode = 'f'
print(v.typecode) # f

类属性为实例属性提供默认值,但是并不代表实例拥有此实例,如果为类属性赋值,类属性不会受影响,实例会创建同名的属性覆盖掉类属性。想要修改类属性,需要直接在类上修改,Vector2d.typecode = 'f'

序列的修改、散列和切片

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from array import array
import reprlib
import math


class Vector():

typecode = 'd'

def __init__(self, components):
self._components = array(self.typecode, components)

def __iter__(self):
return iter(self._components)

def __repr__(self):
components = reprlib.repr(self._components)
components = components[components.find('['):-1]
return 'Vector({})'.format(components)

def __str__(self):
return str(tuple(self))

def __bytes__(self):
return (bytes([ord(self.typecode)]) +
bytes(self._components))

def __eq__(self, other):
return tuple(self) == tuple(other)

def __abs__(self):
return math.sqrt(sum(x ** 2 for x in self))

def __bool__(self):
return bool(abs(self))

@classmethod
def frombytes(cls, octets):
typecode = chr(octets[0])
memv = memoryview(octets[1:]).cast(typecode)
return cls(memv)

reprlib.repr() 函数获取 self._components 的有限长度表示形式(如 array(‘d’, [0.0, 1.0, 2.0, 3.0, 4.0, …])) 。

协议和鸭子类型

1
2
3
4
5
def __len__(self):
return len(self._components)

def __getitem__(self, index):
return self._components[index]

协议是非正式的接口, 只在文档中定义, 在代码中不定义。

例如, Python 的序列协议只需要 __len____getitem__ 两个方法。 任何类(如 Spam)只要实现了这两个方法, 就能用在任何期待序列的地方。

1
2
v = Vector([1, 2, 3, 4, 5])
print(v[1:2]) # array('d', [2.0])

可切片的序列

切片原理

1
2
3
4
5
6
7
8
9
10
class MySeq():
def __getitem__(self, index):
return index

s = MySeq()
print(s[1]) # 1
print(s[1:2]) # slice(1, 2, None)
print(s[1:2:3]) # slice(1, 2, 3)
print(s[1:2:3, 4]) # (slice(1, 2, 3), 4)。多维切片,__getitem__收到的是元组
print(s[1:2:3, 4:5]) # (slice(1, 2, 3), slice(4, 5, None))

slice.indices方法:

1
2
3
print(slice(None, 10, 2))  # slice(None, 10, 2)
print(slice(None, 10, 2).indices(5)) # (0, 5, 2)
print(slice(-3, None, None).indices(5)) # (2, 5, 1)

indices方法提供了内置序列实现的复杂“整顿”逻辑,用于优雅地处理索引缺失、负数、长度过长等情况。如果自定义的getitem最终不是依靠的底层序列,那么可以使用这个方法节省大量时间。

自定义切片的getitem

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def __getitem__(self, index):
cls = type(self)
if isinstance(index, slice):
return cls(self._components[index]) # 返回当前类型的切片,而不是委托给数组返回一个数组的切片
elif isinstance(index, numbers.Integral):
return self._components[index]
else:
msg = '{cls.__name__} indices must be integers'
raise TypeError(msg.format(cls=cls))

s = Vector(range(100))
print(type(s[1])) # Vector
print(type(s[1:10])) # Vector
print(s[1:10, 2]) # TypeError

动态存取属性

使用x、y、z、t来取代v[0],v[1],v[2],v[3]。

1
2
3
4
5
6
7
8
9
def __getattr__(self, name):
cls = type(self)
if len(name) == 1:
pos = cls.shortcut_names.find(name)
if len(self._components) > pos >= 0:
return self._components[pos]
# .__name__会从cls中取值,!r以文本形式''
msg = '{.__name__!r} objects has no attribute {!r}'
raise AttributeError(msg.format(cls, name))

为了防止直接向v.x赋值而创建一个新的属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def __setattr__(self, name, value):
cls = type(self)
if len(name)=1:
if name in cls.shortcut_names:
error = 'readonly attribute {attr_name!r}'
elif name.islower():
error = "can't set attribute 'a' to 'z' in {cls_name!r}"
else:
error = ''

if error:
msg = error.format(cls_name=cls.__name__, attr_name=name)
raise AttributeError(msg)
super().__setattr__(name, value) # 在超类上调用 __setattr__ 方法, 提供标准行为

散列和快速等值测试

规约函数reduce,sum,any,all把序列或有限可迭代对象编程一个聚合结果。reduce函数的参数,第一个函数是接受两个参数的函数,第二个参数是一个可迭代的对象,第三个参数是初始值。

reduce(fn,lst):

fn(lst[0],lst[1]) -> r1
fn(r1,lst[2]) -> r2
fn(r2,lst[3]) -> r3
...

实现阶乘的三种方式:

1.

1
2
3
n = 0
for i in range(1, 6):
n ^= 1

2.

functools.reduce(lambda a, b: a ^ b, range(6))

3.

functools.reduce(operator.xor, range(6))

把Vector变成可散列的对象:

1
2
3
def __hash__(self):
hashes = (hash(x) for x in self._components)
return functools.reduce(operator.xor, hashes, 0)

修改 __eq__ 方法:

  • 使用zip函数

    1
    2
    3
    4
    5
    6
    7
    def __eq__(self, other):
    if len(self) != len(other):
    return False
    for a, b in zip(self, other):
    if a != b:
    return False
    return True
  • 使用all函数

    1
    2
    def __eq__(self, other):
    return len(self) == len(other) and all(a == b for a, b in zip(self, other))

出色的zip函数:

zip函数能并行迭代两个或多个可迭代对象,它返回的元组可以拆包成变量,分别对应各个并行输入中的一个元素。

1
2
3
4
5
6
from itertools import zip_longest

print(zip(range(3), 'ABC')) # <zip object at ...>
print(list(zip(range(3), 'ABC'))) # [(0, 'A'), (1, 'B'), (2, 'C')]
# [(0, 'A', 1), (1, 'B', 2), (2, 'C', 3), (-1, -1, 4)]
print(list(zip_longest(range(3), 'ABC', [1, 2, 3, 4], fillvalue=-1)))

itertools.chain函数:合并两个序列

1
2
3
4
5
6
import itertools

l1 = [1,2,3]
t1 = (4,5,6)
r1 = itertools.chain(l1,t1)
print(r1) #<itertools.chain object at ...>

接口:从协议到抽象基类

接口和协议

协议:非正式的接口,不能像正式接口那样施加限制,是Python实现多态的方式。一个类可依只实现部分接口,这是允许的。

1
2
3
4
5
6
7
class Foo:
def __getitem__(self, pos):
return range(0, 30, 10)[pos]

f = Foo()
f[1]
for i in f: print(i)

Foo类没有继承abc.Sequence,只实现了序列协议的一个方法:__getitem__,没有__iter__方法,但是仍然会后备使用__getitem__来进行迭代。没有实现__contains__方法,但是也能使用in运算。

使用猴子补丁在运行时实现协议

1
2
3
4
5
6
import random
from french_deck import FrenchDeck

f = FrenchDeck()
# 就地打乱, TypeError: 'FrenchDeck' object does not support item assignment
random.shuffle(f)

我们可以动态修正这个问题:

1
2
3
4
5
def set_card(self, position, card):
self._cards[position] = card

FrenchDeck.__setitem__ = set_card
random.shuffle(f)

猴子补丁: 在运行时修改类或模块, 而不改动源码。 猴子补丁很强大, 但是打补丁的代码与要打补丁的程序耦合十分紧密, 而且往往要处理隐藏和没有文档的部分

标准库中的抽象基类

collections.abc模块中的抽象基类

collections.abc:

collections.abc

Iterable、 Container 和 Sized:

各个集合应该继承这三个抽象基类, 或者至少实现兼容的协议。 Iterable 通过 __iter__ 方法支持迭代, Container 通过__contains__ 方法支持 in 运算符, Sized 通过 __len__ 方法支持len() 函数。

Sequence、 Mapping 和 Set:

这三个是主要的不可变集合类型, 而且各自都有可变的子类。

MappingView:

映射方法 .items()、 .keys() 和 .values() 返回的对象分别是 ItemsView、 KeysView 和 ValuesView 的实例。 前两个类还继承了 Set 类

Callable 和 Hashable:

没有Callable 或 Hashable 的子类。 这两个抽象基类的主要作用是为内置函数 isinstance 提供支持, 以一种安全的方式判断对象能不能调用或散列。若想检查是否能调用, 可以使用内置的 callable() 函数; 但是没有类似的 hashable() 函数, 因此测试对象是否可散列, 最好使用 isinstance(my_obj, Hashable)。

抽象基类的数字塔

numbers包定义的抽象基类是线性的层次结构,依次往下是,Number、Complex、Real、Rational、Integral。

检查是否为整数,isinstance(x,numbers.Integral)。

定义使用一个抽象基类

定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import abc

class Tombola(abc.ABC): #自己定义的抽象基类要继承 abc.ABC。
@abc.abstractmethod
def load(self, iterable):
"""从可迭代对象中添加元素。 """ #抽象方法使用 @abstractmethod 装饰器标记, 而且定义体中通常只有文档字符串。

@abc.abstractmethod
def pick(self):
"""随机删除元素, 然后将其返回。
如果实例为空, 这个方法应该抛出`LookupError`。
"""

def loaded(self): #抽象基类可以包含具体方法
"""如果至少有一个元素, 返回`True`, 否则返回`False`。 """
return bool(self.inspect())

def inspect(self):
"""返回一个有序元组, 由当前元素构成。 """
items = []
while True:
try:
items.append(self.pick())
except LookupError:
break
self.load(items) #抽象基类中的具体方法可以依赖抽象基类中的其他具体方法、 抽象方法或特性
return tuple(sorted(items))

错误的实现:

1
2
3
4
5
6
from tombola import Tombola

class Fake(Tombola):
pass

f = Fake() # TypeError: Can't instantiate abstract class Fake with abstract methods load, pick

正确的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import  random
from tombola import Tombola

class BingoCage(Tombola):
def __init__(self, items):
self._randomizer = random.SystemRandom()
self._items = []
self.load(items)

def load(self, items):
self._items.extend(items)
self._randomizer.shuffle(self._items)

def pick(self):
try:
return self._items.pop()
except IndexError as e:
raise LookupError('pick from empty BingoCage')

def __call__(self):
self.pick()

虚拟子类:

在抽象基类上调用register方法,issubclass和isinstance等函数都能识别,但是注册的类不会从抽象基类中继承任何方法或属性。Python也不会做检查。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from random import randrange
from tombola import Tombola

@Tombola.register
class TomboList(list):
def pick(self):
if self:
position = randrange(len(self))
return self.pop(position)
else:
raise LookupError('pop from empty TomboList')

load = list.extend

def loaded(self):
return bool(self)

def inspect(self):
return tuple(sorted(self))

虚拟子类检查:

1
2
3
4
5
6
7
from tombola import Tombola
from tombolist import TomboList

print(issubclass(TomboList, Tombola)) # True

t = TomboList(range(100))
print(isinstance(t, Tombola)) # True

__mro__属性:方法解析顺序,按顺序列出类及其超类。

1
2
3
# (<class 'tombolist.TomboList'>, <class 'list'>, <class 'object'>)
print(TomboList.__mro__)
print(Tombola) # <class 'tombola.Tombola'>

Tombolist.__mro__ 中没有 Tombola, 因此 Tombolist 没有从Tombola 中继承任何方法。

继承内省属性:

1
2
3
4
5
# __subclasses__(),这个方法返回类的直接子类列表, 不含虚拟子类。
print(Tombola.__subclasses__()) # [<class 'bingo.BingoCage'>]

# _abc_registry,只有抽象基类有这个数据属性, 其值是一个 WeakSet 对象, 即抽象类注册的虚拟子类的弱引用。
print([x for x in Tombola._abc_registry]) # [<class 'tombolist.TomboList'>]

使用register

Tombola.register可以当做类装饰器使用,也可以当做函数调用Tombola.register(TomboList)。
这种做法更常见,可用于注册其他地方定义的类,例如,在collections.abc模块的源码中,
Sequence.register(tuple)
Sequence.register(str)
Sequence.register(range)
Sequence.register(memoryview)

__subclasshook__

1
2
3
4
5
6
7
8
from collections import abc

class Struggle():
def __len__(self):
return 23

print(issubclass(Struggle, abc.Sized)) # True
print(isinstance(Struggle(), abc.Sized)) # True

没有注册和继承,抽象基类也能把一个类识别为虚拟子类,因为abc.Sized实现了一个特殊的类方法,名为__subclasshook__。源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
class Sized(metaclass=ABCMeta):
__slots__ = ()

@abstractmethod
def __len__(self):
return 0

@classmethod
def __subclasshook__(cls, C):
if cls is Sized:
if any("__len__" in B.__dict__ for B in C.__mro__): # C及其超类__dict__属性里是否有__len__属性
return True
return NotImplemented

继承的优缺点

子类化内置类型很麻烦

直接子类化内置类型(如 dict、 list 或 str) 容易出错,因为内置类型的方法通常会忽略用户覆盖的方法(这种问题只发生在C语言实现的内置类型内部的方法委托上,而且只影响直接继承内置类型的用户自定义类)。

不要子类化内置类型, 用户自己定义的类应该继承 collections 模块(http://docs.python.org/3/library/collections.html) 中的类, 例如UserDict、 UserList 和 UserString, 这些类做了特殊设计, 因此易于扩展。

多重继承和方法解析顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class A:
def ping(self):
print('ping:', self)

class B(A):
def pong(self):
print('pong:', self)

class C(A):
def pong(self):
print('PONG:', self)

class D(B, C): # 声明中超类的顺序影响方法解析顺序
def ping(self):
super().ping()print('post-ping:', self)

def pingpong(self):
self.ping()
super().ping()
self.pong()
super().pong()
C.pong(self)

Python能区分子类调用的是哪个父类的方法,是因为Python会按照特定的顺序遍历继承图。这个顺序叫方法解析顺序(Method Resolution Order, MRO)。类都有一个名为__mro__的属性,它的值是一个元组,按照方法解析顺序列出各个超类,从当前类一直向上,直到object类。

1
D.__mro__   # (<class '__main__.D'>, <class '__main__.B'>, <class '__main__.C'>, <class '__main__.A'>, <class 'object'>)

也可以在子类中绕过方法解析顺序,直接调用某个超类的方法,比如,A.ping(self),而不是super().ping()。

正确重载运算符

可迭代的对象、迭代器和生成器

单词序列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import re
import reprlib

RE_WORD = re.compile('\w+')

class Sentence():
def __init__(self, text):
self.text = text
self.words = RE_WORD.findall(text)

def __getitem__(self, index):
return self.words[index]

def __len__(self):
return len(self.words)

def __repr__(self):
return 'Sentence({})'.format(reprlib.repr(self.text))



from sentence import Sentence

s = Sentence('"The time has come," the Walrus said,')

print(s) # Sentence('"The time ha... Walrus said,')

for word in s:
print(word) # 可以迭代

print(list(s))

可以迭代的原因:

iter

从 Python 3.4 开始, 检查对象 x 能否迭代, 最准确的方法是:调用 iter(x) 函数(或直接进行迭代), 如果不可迭代, 再处理 TypeError 异常。 这比使用 isinstance(x, abc.Iterable) 更准确, 因为 iter(x)函数会考虑到遗留的 __getitem__ 方法, 而 abc.Iterable 类则不考虑。

可迭代的对象与迭代器的对比

使用 iter 内置函数可以获取迭代器的对象。 如果对象实现了能返回迭代器的 __iter__ 方法, 那么对象就是可迭代的。

可迭代的对象和迭代器之间的关系: Python 从可迭代的对象中获取迭代器。

1
2
3
s = 'ABC'
for c in s:
print(c)

字符串’ABC’是可迭代的对象,背后运行的是迭代器。

不使用for语句的话,就需要使用while和iter组合迭代。

1
2
3
4
5
6
7
8
9
10
s = 'ABC'

it = iter(s)
while True:
try:
print(next(it))
#或 print(it.__next__())
except StopIteration as e: # 如果没有下一个元素,抛出StopIteration
del it # 释放引用,废弃迭代器对象
break

标准的迭代器接口有两个方法:

__next__,返回下一个可用元素或无元素时抛出StopIteration异常。

__iter__,返回self,以便在应该使用可迭代对象的地方使用迭代器,例如在for循环中。

collections.abc.Iterator抽象基类中定义了__next__抽象方法。且这个抽象基类继承自collections.abc.Iterable类,这个类中定义了__iter__抽象方法。

iterable

1
2
3
i1 = id(s.__iter__())   # 返回字符串的迭代器
i2 = id(s.__iter__().__iter__()) # 迭代器返回自身
print(i1 == i2) # True

Python中的迭代器是一种协议,而不是某种特定的类型。所以判断一个对象x(或类)是否为迭代器最好的方式是调用isinstance(x, abc.Iterator)(或issubclass),得益于Iterator.__subclasshook__ 方法, 即使对象 x 所属的类不是Iterator 类的真实子类或虚拟子类, 也能这样检查。

1
2
3
4
5
6
7
8
9
class X():
def __iter__(self):
pass

def __next__(self):
pass

print(issubclass(X, collections.abc.Iterator)) #True,因为实现了__iter__和__next__协议
print(isinstance(X(), collections.abc.Iterable)) #True,因为事先了__iter__协议

迭代器的定义:

迭代器是这样的对象: 实现了无参数的 __next__ 方法, 返回序列中的下一个元素; 如果没有元素了, 那么抛出 StopIteration 异常。Python 中的迭代器还实现了 __iter__ 方法, 因此迭代器也可以迭代。

典型的迭代器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Sentence():
# ...
def __iter__(self):
return SentenceIterator(self.words)


class SentenceIterator():
def __init__(self, words):
self.words = words
self.index = 0

def __next__(self):
try:
word = self.words[self.index]
except IndexError:
raise StopIteration()
self.index += 1
return word

def __iter__(self):
return self

为了“支持多种遍历”, 必须能从同一个可迭代的实例中获取多个独立的迭代器, 而且各个迭代器要能维护自身的内部状态, 因此这一模式正确的实现方式是, 每次调用 iter(my_iterable) 都新建一个独立的迭代器。 这就是为什么这个示例需要定义SentenceIterator 类。

可迭代的对象一定不能是自身的迭代器。 也就是说, 可迭代的对象必须实现 __iter__ 方法, 但不能实现 __next__ 方法。另一方面, 迭代器应该一直可以迭。 迭代器的 __iter__ 方法应该返回自身。

生成器函数

实现相同功能, 但却符合 Python 习惯的方式是, 用生成器函数代替SentenceIterator 类。

1
2
3
4
5
6
7
class Sentence():
# ...

def __iter__(self):
for word in self.words:
yield word # 产出当前的word
return # 可以不写,会触发生成器对象抛出 StopIteration 异常

生成器函数:只要 Python 函数的定义体中有 yield 关键字, 该函数就是生成器函数。 调用生成器函数时, 会返回一个生成器对象。 也就是说, 生成器函数是生成器工厂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def gen_123():
yield 1
yield 2
yield 3

g = gen_123() # 通过生成器函数创建一个新的生成器
print(g) # <generator object gen_123 at ...>
for i in g: # 迭代生成器时,执行函数到定义体中的下一个yield语句暂停住,返回产出的值
print(i)

def gen_AB():
print('start')
yield 'A'
print('continue')
yield 'B'
print('end')

for i in gen_AB():
print('for:',i)

打印结果:

1
2
3
4
5
start
for: A
continue
for: B # 执行了两次for循环中的语句
end # 生成器前进到了生成器函数的末尾,但是没有产出返回值(yield关键字),生成器对象抛出 StopIteration 异常。 for机制会捕获异常, 因此循环终止时没有报错。

惰性实现

1
2
3
4
5
6
7
class Sentence():
# ...

def __iter__(self):
# finditer函数构建一个迭代器,包含self.text中匹配RE_WORD的单词,产出MatchObject实例。
for match in RE_WORD.finditer(self.text):
yield match.group() # match.group()方法从MatchObject实例中提取匹配正则表达式的具体文本。

生成器表达式

生成器表达式可以理解为列表推导的惰性版本: 不会迫切地构建列表,而是返回一个生成器, 按需惰性生成元素。 如果说列表推导是制造列表的工厂,那么生成器表达式就是制造生成器的工厂。

1
2
def __iter__(self):
return (match.group() for match in RE_WORD.finditer(self.text))

这里不再是生成器函数了(没有yield),而是使用生成器表达式构建生成器,然后将其返回。

生成器表达式是语法糖: 完全可以替换成生成器函数。

何时使用生成器表达式

如果生成器表达式要分成多行写,就使用生成器函数,更灵活,提高可读性,可重用,且可以作为协程使用。

如果函数或构造函数只有一个参数,传入的生成器表达式不用自带一对括号了,只要有一对函数的括号就行了。

等差数列生成器

1
2
3
4
5
6
7
8
9
10
11
12
13
def aritprog_gen(begin, step, end=None):
result = type(begin + step)(begin) # 转换类型
forever = end is None # 无穷
index = 0
while forever or result < end:
yield result
index += 1
result = begin + step * index # 没有不断的累加step的值,为了降低处理浮点数时累积效应致错的风险

i = aritprog_gen(0, 1.5)
print(next(i))
print(next(i))
... # 可以无穷迭代

itertools模块生成等差数列:

1
2
3
gen = itertools.count(0, 1.5)
print(next(gen))
print(next(gen))

itertools.count函数生成无穷等差数列,作用与上面的函数相同。

itertools.takewhile函数会生成一个使用另一个生成器的生成器, 在指定的条件计算结果为 False 时停止。 因此, 可以把这两个函数结合在一起使用。

1
2
gen = itertools.takewhile(lambda n: n < 5, itertools.count(0, 1.5))   # n是每次返回的产出值
[print(g) for g in gen]

重写aritprog_gen函数:

1
2
3
4
5
6
def aritprog_gen(begin, step, end=None):
result = type(begin + step)(begin)
gen = itertools.count(begin, step)
while end is not None:
gen = itertools.takewhile(lambda n: n < end, gen)
return gen

标准库中的生成器函数

用于“过滤”的生成器函数:

过滤生成器

代码演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import itertools

def vowel(c):
return c.lower() in 'aeiou'

s = 'Aardvark'
print(list(filter(vowel, s))) # ['A', 'a', 'a']
print(list(itertools.filterfalse(vowel, s))) # ['r', 'd', 'v', 'r', 'k']

print(list(itertools.dropwhile(vowel, s))) # ['r', 'd', 'v', 'a', 'r', 'k']
print(list(itertools.takewhile(vowel, s))) # ['A', 'a']

print(list(itertools.compress(s, (1, 0, 1, 1, 0, 1)))) # ['A', 'r', 'd', 'a']
print(list(itertools.islice(s, 4))) # ['A', 'a', 'r', 'd']
print(list(itertools.islice(s, 4, 7))) # ['v', 'a', 'r']
print(list(itertools.islice(s, 1, 7, 2))) # ['a', 'd', 'a']

takewhile和filter的区别:

1
2
3
4
5
def condition(e):
return e < 5

i = itertools.takewhile(condition, itertools.count(0, 2)) # [0, 2, 4] 达成条件则停止迭代
i2 = filter(condition, itertools.count(0, 1.5)) # 会卡死,应用到可迭代对象的所有元素,决定是否返回

用于”映射”的生成器函数:

映射生成器

代码演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import itertools
import operator

sample = [5, 4, 2, 7, 0, 3]

print(list(itertools.accumulate(sample))) # [5, 9, 11, 18, 18, 21],求和
print(list(itertools.accumulate(sample, min))) # [5, 4, 2, 2, 0, 0],求最小值
print(list(itertools.accumulate(sample, max))) # [5, 5, 5, 7, 7, 7],求最大值
# [1, 2, 6, 24, 120],计算1-5的阶乘
print(list(itertools.accumulate(range(1, 6), operator.mul)))

# [(1, 'a'), (2, 'b'), (3, 'c')],从start(1)开始给每个元素编号
print(list(enumerate('abc', 1)))
# [0, 1, 4, 9, 16, 25] 映射求积,到短的迭代对象耗尽位置
print(list(map(operator.mul, range(6), range(7))))
# ['a', 'bb', 'ccc'] 对一对元素求积
print(list(itertools.starmap(operator.mul, zip((1, 2, 3), ('a', 'b', 'c')))))
# ['a', 'bb', 'ccc'] 同上
print(list(itertools.starmap(operator.mul, enumerate('abc', 1))))
# [5.0, 4.5, 3.6666666666666665, 4.5, 3.6, 3.5],求阶段平均值
print(list(itertools.starmap(lambda a, b: b/a,
enumerate(itertools.accumulate(sample), 1))))

用于“合并”的生成器函数:

合并生成器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
print(list(itertools.chain('ABC', range(2))))  # ['A', 'B', 'C', 0, 1]
# [0, 'A', 1, 'B', 2, 'C']
print(list(itertools.chain.from_iterable(enumerate('ABC'))))
print(list(zip('ABC', range(5)))) # [('A', 0), ('B', 1), ('C', 2)]
# [('A', 0), ('B', 1), ('C', 2), ('?', 3), ('?', 4)]
print(list(itertools.zip_longest('ABC', range(5), fillvalue='?')))

itertools.product生成器是计算笛卡尔积的惰性方式

print(list(itertools.product('AB'))) # [('A',), ('B',)]
# [('A', 0), ('A', 1), ('B', 0), ('B', 1)]
print(list(itertools.product('AB', range(2))))
# [('A', 'A'), ('A', 'B'), ('B', 'A'), ('B', 'B')]
print(list(itertools.product('AB', repeat=2)))
print(list(itertools.product('AB', range(2), repeat=2)))

把输入的各个元素扩展成多个输出元素的生成器函数:

扩展生成器

1
2
3
4
5
6
7
8
9
10
11
ct = itertools.count(5)
ns = next(ct), next(ct), next(ct)
print(ns) # (5, 6, 7)。不能使用ct构建列表,因为ct是无穷的
print(list(itertools.islice(itertools.count(), 4))) # [0, 1, 2, 3],指定大于4的数将会抛出异常
cy = itertools.cycle('ABC')
print(next(cy)) # A
print(list(itertools.islice(cy, 5))) # ['B', 'C', 'A', 'B', 'C']
rp = itertools.repeat(7, 3)
print(next(rp), next(rp), next(rp)) # 7 7 7
# [0.0, 1.5, 3.0, 4.5, 6.0]
print(list(map(operator.mul, range(5), itertools.repeat(1.5))))

扩展生成器中的“组合学”生成器函数:

1
2
3
4
5
6
# [('A', 'B'), ('A', 'C'), ('B', 'C')]
print(list(itertools.combinations('ABC', 2)))
# [('A', 'A'), ('A', 'B'), ('A', 'C'), ('B', 'B'), ('B', 'C'), ('C', 'C')]
print(list(itertools.combinations_with_replacement('ABC', 2)))
# [('A', 'B'), ('A', 'C'), ('B', 'A'), ('B', 'C'), ('C', 'A'), ('C', 'B')]
print(list(itertools.permutations('ABC', 2)))

用于重新排列元素的生成器函数:

排列生成器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
print(list(itertools.groupby('AAABB')))
# [('A', <itertools._grouper object at ...>), ('B', <itertools._grouper object at ...>)]
for key, group in itertools.groupby('AAABB'):
print(key, '->', list(group))
# A -> ['A', 'A', 'A']
# B -> ['B', 'B']
animals = 'rat duck bat eagle shark bear'.split()
animals.sort(key=len) # 为了使用groupby函数,输入要有序
for length, group in itertools.groupby(reversed(animals), len):
print(length, '->', list(group))
# 5 -> ['shark', 'eagle']
# 4 -> ['bear', 'duck']
# 3 -> ['bat', 'rat']

g1, g2 = itertools.tee('ABC')
print(next(g1)) # A
print(list(g2)) # ['A', 'B', 'C']

yield from

用于在生成器函数中方便的产出另一个生成器生成的值。

1
2
3
4
5
6
7
8
9
10
11
12
def chain(*iterables):
for it in iterables:
for i in it:
yield i

print(list(chain('ABC',range(3))))

def chain2(*iterables):
for it in iterables:
yield from it

print(list(chain2('ABC',range(3))))

可迭代的归约函数

归约函数

这里的每个内置函数都可以用functools.reduce实现,内置是为了方便使用。此外,对all和any函数来说,有一项重要的优化措施是 reduce 函数做不到的: 这两个函数会短路。

1
2
3
4
5
6
7
8
9
10
11
print(all([1, 2, 3]))  # True
print(all([1, 0, 3])) # False
print(all([])) # True

print(any([1, 2, 3])) # True
print(any([1, 0, 3])) # True
print(any([])) # False

g = (n for n in [0, 0.0, 7, 8])
print(any(g)) # True,一旦确定结果就立即停止使用迭代器
print(next(g)) # 8,剩余的元素

深入分析iter函数

1
2
3
4
5
6
7
def d6():
return randint(1, 6)

d6_iter = iter(d6, 1) # 不断调用第一个参数(没有参数的可调用对象),产出各个值
print(d6_iter) # <callable_iterator object at ...>
for roll in d6_iter:
print(roll) # 当可调用的对象返回这个值时,触发迭代器抛出StopIteration异常,而不产出(打印)哨符

上下文管理器和else块

if之外的else块

先做这个,再做那个。

1
2
3
4
5
for i in range(10):
if i == 15:
break
else:
print('else') # "如果中断了就不做了"。仅当for循环运行完毕时(即for循环没有被break语句中止)才运行else块。

while:

仅当while循环因为条件为假值而退出时(即while循环没有被break语句中止)才运行else块。

try:

仅当 try 块中没有异常抛出时才运行else块。官方文档还指出:“else 子句抛出的异常不会由前面的 except 子句处理。”

EAFP风格:

EAFP和LBYL

上下文管理器和with块

with 语句的目的是简化 try/finally 模式。 这种模式用于保证一段代码运行完毕后执行某项操作, 即便那段代码由于异常、 return 语句或sys.exit() 调用而中止, 也会执行指定的操作。

上下文管理器协议包含 __enter____exit__ 两个方法。 with 语句开始运行时, 会在上下文管理器对象上调用 __enter__ 方法。 with 语句运行结束后, 会在上下文管理器对象上调用 __exit__ 方法, 以此扮演 finally 子句的角色。

上下文管理器举例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class LookingGlass():

def __enter__(self):
import sys
self.origin_write = sys.stdout.write
sys.stdout.write = self.reverse_write # 猴子补丁
return 'JABBERWOCKY' # __enter__方法一般返回上下文管理器自己,也可返回别的对象

def reverse_write(self, text):
self.origin_write(text[::-1])

def __exit__(self, exc_type, exc_value, traceback):
"""
exc_type:异常类(例如ZeroDivisionError)
exc_value:异常实例。有时会有参数传给异常构造方法,例如错误消息,这些参数可以使用exc_value.args获取
"""
import sys # 重复导入模块不会消耗很多资源,因为Python会缓存导入的模块
sys.stdout.write = self.origin_write
if exc_type is ZeroDivisionError:
print('Please DO NOT divide by zero')
return True # 如果__exit__方法返回None,或者True之外的值,with块中的任何异常都会向上冒泡
1
2
3
4
5
6
7
from mirror import LookingGlass

with LookingGlass() as g:
print('Hallo') # ollaH,打印出的内容是反向的
print(g) # YKCOWREBBAJ,g就是

print(g) # JABBERWOCKY,已经调用__exit__方法恢复打印方法

使用@contextmanager

contextlib.contextmanager 装饰器会把函数包装成实现 __enter____exit__ 方法的类:yield 语句前面的所有代码在 with 块开始时(即解释器调用 __enter__ 方法时) 执行, yield 语句后面的代码在with 块结束时(即调用 __exit__ 方法时) 执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import contextlib

@contextlib.contextmanager
def looking_glass():
import sys
original_write = sys.stdout.write

def reverse_write(text):
original_write(text[::-1]) # 在闭包中可以访问original_write

sys.stdout = reverse_write
yield 'JABBERWOCKY'
sys.stdout.write = original_write


from mirror import looking_glass

with looking_glass() as g:
print('abc') # cba
print(g) # YKCOWREBBAJ

print('abc') # abc
print(g) # JABBERWOCKY

上述示例有一个严重的错误:如果在with块中抛出了异常,Python 解释器会将其捕获,然后在 looking_glass函数的yield表达式里再次抛出。但是,那里没有处理错误的代码,因此looking_glass函数会中止,永远无法恢复成原来的sys.stdout.write方法。修复代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@contextlib.contextmanager
def looking_glass():
import sys
original_write = sys.stdout.write

def reverse_write(text):
original_write(text[::-1])

sys.stdout.write = reverse_write
try:
yield 'JABBERWOCKY' #要把yield语句放在try/finally语句中(或者放在with语句中),因为我们永远不知道上下文管理器的用户会在with块中做什么
except ZeroDivisionError: #假设的异常
msg = 'Please DO NOT divide by zero!'
finally:
sys.stdout.write = original_write
if msg:
print(msg)

协程

用作协程的生成器的基本行为

1
2
3
4
5
6
7
8
9
def simple_coroutine():
print('->coroutine started')
x = yield # 右边没有表达式,产出None
print('->coroutine received:', x)

c = simple_coroutine()
print(c) # <generator object simple_coroutine at ...>
print(next(c)) # ->coroutine started None
c.send('12') # ->coroutine received: 12 抛出StopIteration

协程的四种状态,可以用inspect.getgeneratorstate(…)函数确定:

  1. ‘GEN_CREATED’,等待开始执行
  2. ‘GEN_RUNNING’,解释器正在执行
  3. ‘GEN_SUSPENDED’,在yield表达式处暂停
  4. ‘GEN_CLOSED’,执行结束

仅当协程处于暂停时才能调用send方法,因此刚创建的协程必须先用next(coro)激活,也可用coro.send(None),这一步成为“预激”(即让协程向前执行到第一个yield表达式)。

使用协程计算移动平均值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield average
total += term
count += 1
average = total/count

a = averager()
next(a)
print(a.send(10))
print(a.send(12))

预激协程的装饰器

自定义一个装饰器:

1
2
3
4
5
6
7
8
9
from functools import wraps

def coroutine(func):
@wraps(func)
def primer(*args, **kwargs):
gen = func(*args, **kwargs)
next(gen)
return gen
return primer

使用:

1
2
3
4
5
from coroutil import coroutine

@coroutine
def averager():
...

终止协程和异常处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class DemoException(Exception):
pass

def demo_exc_handing():
print('coroutine started')
try:
while True:
try:
x = yield
except DemoException: # 特别处理此异常
print('DemoException handled,Continuning...')
else: # 如果没有异常,执行打印
print('coroutine received:{!r}'.format(x))
finally: # 未处理的其他异常,协程结束,执行清理等
print('coroutine ending')


d = demo_exc_handing()
next(d) # coroutine started
d.send('abc')
d.send(2.00)
d.throw(DemoException) # DemoException handled,Continuning...
d.throw(ZeroDivisionError) # 先coroutine ending,后抛出ZeroDivisionError

generator.throw(exc_type[, exc_value[, traceback]]) :

致使生成器在暂停的 yield 表达式处抛出指定的异常。

如果生成器处理了抛出的异常,代码会继续执行到下一个yield表达式,而产出的值会成为调用generator.throw方法得到的返回值。

如果生成器没有处理抛出的异常,协程会停止,即状态变成’GEN_CLOSED’,异常会向上冒泡,传到调用方的上下文中。

generator.close()

致使生成器在暂停的yield表达式处抛出GeneratorExit异常。

如果生成器没有处理这个异常,或者抛出了StopIteration异常(通常是指运行到结尾),调用方不会报错。

如果收到 GeneratorExit 异常,生成器一定不能产出值,否则解释器会抛出 RuntimeError 异常。

生成器抛出的其他异常会向上冒泡, 传给调用方。

让协程返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from collections import namedtuple

Result = namedtuple('Result', 'count average')


def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None: # 为了返回值,协程必须停止,因此加一个条件
break
total += term
count += 1
average = total/count

return Result(count, average)

a = averager()
next(a)
print(a.send(10)) # None,没有产出值
print(a.send(13))
print(a.send(None)) # StopIteration: Result(count=2, average=11.5)

异常对象的 value 属性保存着返回的值。

注意,return表达式的值会偷偷传给调用方,赋值给StopIteration异常的一个属性。 这样做有点不合常理。

更合理的获取:

1
2
3
4
try:
a.send(None)
except StopIteration as e:
print(e.value) # Result(count=2, average=11.5)

使用yield from

yield from x 表达式对 x 对象所做的第一件事是, 调用 iter(x), 从中获取迭代器。 因此, x 可以是任何可迭代的对象。

yield from 的主要功能是打开双向通道, 把最外层的调用方与最内层的子生成器连接起来, 这样二者可以直接发送和产出值, 还可以直接传入异常, 而不用在位于中间的协程中添加大量处理异常的样板代码。 有了这个结构, 协程可以把职责委托给子生成器。

1
2
3
4
5
6
7
8
9
10
11
12
def gen():
for c in 'AB':
yield c
for i in range(1, 3):
yield i

def gen_corountine():
yield from 'AB'
yield from range(1, 3)

print(list(gen()))
print(list(gen_corountine()))

术语说明:

  • 委派生成器:包含 yield from 表达式的生成器函数
  • 子生成器:从 yield from 表达式中 部分获取的生成器
  • 调用方:指代调用委派生成器的客户端代码。 即“客户端”

结构示意图:

协程

代码演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from collections import namedtuple

Result = namedtuple('Result', 'count average')

def averager():
"""子生成器"""
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None: # 子生成器的退出条件
break
total += term
count += 1
average = total/count
return Result(count, average) # 结束时抛出StopIteration异常,把返回值赋到value属性上


def grouper(result, key):
"""委派生成器"""
while True: # 继续迭代,避免group实例执行迭代结束时抛出StopIteration异常
# grouper会在yield from表达式处暂停,发送的每个值都会经由yield from处理,通过管道传给averager实例,等待averager实例执行完毕。
# 抑制子生成器抛出的StopIteration异常,接收averager实例的返回值(从StopIteration异常的value属性中获取)
result[key] = yield from averager()


def main(data):
results = {}
for key, values in data.items():
group = grouper(results, key)
next(group) # 预激委派生成器,使委派生成器自身在yield from处暂停,接着委派生成器预激子生成器执行到第一个yeild语句处
for value in values:
# 传入的值最终到达averager函数中term = yield那一行,grouper并不知道传入的值是什么
group.send(value)
# 重要!结束averager函数执行,将返回值付给grouper的result变量。
# 如果没有这一句,在下一次循环时,新创建的grouper实例将会覆盖到group变量上,前一个实例(以及它创建的尚未终止的averager子生成器实例)被垃圾回收程序回收
group.send(None)
print(results)


data = {
'girls;kg': [40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
'girls;m': [1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
'boys;kg': [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys;m': [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}

if __name__ == '__main__':
main(data)

yield from链:

一个委派生成器使用yield from调用一个子生成器,而那个子生成器本身也是委派生成器,使用yield from调用另一个子生成器,以此类推。最终,这个链条要以一个只使用 yield表达式的简单生成器(或任何可迭代的对象)结束;任何yield from链条都必须由客户驱动,在最外层委派生成器上调用next(…)函数或.send(…)方法。可以隐式调用,例如使用for循环。

yield from 的意义

yield from的特性:

  1. 子生成器产出的值都直接传给委派生成器的调用方(即客户端代码)
  2. 使用 send() 方法发给委派生成器的值都直接传给子生成器。 如果发送的值是 None, 那么会调用子生成器的 __next__() 方法。 如果发送的值不是 None, 那么会调用子生成器的 send() 方法。 如果调用的方法抛出 StopIteration 异常, 那么委派生成器恢复运行。 任何其他异常都会向上冒泡, 传给委派生成器。
  3. 生成器退出时, 生成器(或子生成器) 中的 return expr 表达式会触发 StopIteration(expr) 异常抛出。
  4. yield from 表达式的值是子生成器终止时传给 StopIteration异常的第一个参数。
  5. 传入委派生成器的异常, 除了 GeneratorExit 之外都传给子生成器的 throw() 方法。 如果调用 throw() 方法时抛出StopIteration 异常, 委派生成器恢复运行。 StopIteration 之外的异常会向上冒泡, 传给委派生成器
  6. 如果把 GeneratorExit 异常传入委派生成器, 或者在委派生成器上调用 close() 方法, 那么在子生成器上调用 close() 方法, 如果它有的话。 如果调用 close() 方法导致异常抛出, 那么异常会向上冒泡, 传给委派生成器; 否则, 委派生成器抛出GeneratorExit 异常

yeidl from结构的伪代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
EXPR = 'AB'
import sys

def yield_demo():
_i = iter(EXPR) # EXPR可以是任何可迭代对象

try:
_y = next(_i) # 预激,保存第一次产出的结果
except StopIteration as e:
_r = e.value # 最简单情况下的返回值(RESULT)
else:
while 1: # 运行这个循环时,委派生成器会阻塞,只作为调用方和子生成器之间的通道
try:
_s = yield _y # 产出上次子生成器的产出值,等待调用方发送值,保存在_s中
except GeneratorExit as _e: # 用于关闭子生成器和委派生成器自己
try:
_m = _i.close
except AttributeError: # 子生成器可能没有close方法(比如字符串),静默失败
pass
else:
_m()
raise _e
except BaseException as _e: # 处理调用方通过.throw(...)方法传入的异常
_x = sys.exc_info()
try:
_m = _i.throw()
except AttributeError: # 子生成器可能没有throw方法,委派生成器抛出异常
raise _e
else: # 有throw方法,调用
try:
# 子生成器可能会处理掉出入的异常(然后继续循环),也可能抛出StopIteration异常,也可能抛出其他类型的异常
_y = _m(*_x)
except StopIteration as _e: # 处理StopIteration异常,获取返回值。如果是其他异常直接冒泡
_r = _e.value
break
else:
try:
if _s is None:
_y = next(_i)
else:
_y = _i.send(_s)
except StopIteration as _e:
_r = _e.value
break

return _r

离散事件仿真

离散事件仿真(DES),定义:

DES

出租车队运营仿真,代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from collections import namedtuple
from queue import PriorityQueue

Event = namedtuple('Event', 'time proc action')
# time 是事件发生的仿真时间
# proc 是出租车进程实例的编号
# action 描述活动的字符串


def taxi_process(ident, trips, start_time=0):
"""
每辆出租车调用一次taxi_process函数,创建一个生成器对象,表示各辆出租车的运营过程。
ident是出租车的编号,trips是出租车回家之前的行程数量,start_time是出租车离开车库的时间。
每次改变状态时传入一个time,把控制权让给仿真器,每次结束时返回一个Event对象
"""
time = yield Event(start_time, ident, 'leave garage')
for i in range(trips): # 每次行程运行一次
time = yield Event(time, ident, 'pick up passenger')
time = yield Event(time, ident, 'drop off passenger')
yield Event(time, ident, 'going home')
# 一个出租车进程结束


class Simulator:
def __init__(self, procs_map):
self.events = PriorityQueue() # 优先队列,可以按照指定优先顺序(item[0],即time)取值
self.procs = dict(procs_map) # 副本

def run(self, end_time):
# 预激、排定各辆出租车的第一个事件
for _, proc in sorted(self.procs.items()):
first_event = next(proc)
self.events.put(first_event)

sim_time = 0 # sim_time(仿真钟)归零
while sim_time < end_time:
if self.events.empty():
print('*** end of events ***')
break

current_event = self.events.get() # 获取优先队列中time属性最小的Event对象
# 每次循环时仿真钟不会以固定的量推进,是根据各个事件持续的时间推进
sim_time, proc_id, previous_action = current_event
print('taxi', proc_id, proc_id*' ', current_event)
active_proc = self.procs[proc_id]
next_time = sim_time + 2 # compute_duration(previous_action)
try:
next_event = active_proc.send(next_time)
except StopIteration:
del self.procs[proc_id]
else:
self.events.put(next_event)
else:
# 循环由于仿真事件到而退出,显示待完成的事件数量
msg = '*** end of simulation time: {} events pending ***'
print(msg.format(self.events.qsize()))


taxis = {i: taxi_process(i, (i+1)*2, i*5) for i in range(3)}
sim = Simulator(taxis)
sim.run(40)

事件:

驱动型框架(如 Tornado 和 asyncio) 的运作方式: 在单个线程中使用一个主循环驱动协程执行并发活动。 使用协程做面向事件编程时, 协程会不断把控制权让步给主循环, 激活并向前运行其他协程, 从而执行各个并发活动。

使用期物处理并发

网络下载的三种风格

依序下载的脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import os
import time
import sys

import requests # 不在标准库,一个空行隔开

POP20_CC = ('CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'


def save_flag(img, filename):
path = os.path.join(DEST_DIR, filename)
if not os.path.exists(DEST_DIR):
os.makedirs(DEST_DIR)
with open(path, 'wb') as fp:
fp.write(img)


def get_flag(cc):
url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
resp = requests.get(url) # 阻塞性I/O操作
return resp.content


def show(text):
print(text, end=' ')
sys.stdout.flush() # 刷新sys.stdout,这样能在一行消息中看到进度。在Python在正常情况下,遇到换行才会刷新stdout缓冲


def download_many(cc_list):
for cc in sorted(cc_list):
image = get_flag(cc)
show(cc)
save_flag(image, cc.lower()+'.gif')
return len(cc_list)


def main(download_many):
t0 = time.time()
count = download_many(POP20_CC)
elapsed = time.time()-t0
msg = '\n{} flags downloaded in {:.2f}s'
print(msg.format(count, elapsed))


if __name__ == '__main__':
main(download_many)

使用concurrent.futures模块下载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from concurrent import futures

from flags import save_flag, get_flag, show, main

MAX_WORKERS = 20

def download_one(cc):
image = get_flag(cc)
show(cc)
save_flag(image, cc.lower()+'.gif')
return cc

def download_many(cc_list):
wokers = min(MAX_WORKERS, len(cc_list))
with futures.ThreadPoolExecutor(wokers) as executor:
# download_one函数会在多个线程中并发调用;map方法返回一个生成器,因此可以迭代,获取各个函数返回的值
res = executor.map(download_one, sorted(cc_list))
return len(list(res))

if __name__ == '__main__':
main(download_many)

期物在哪里

标准库中有两个名为 Future 的类: concurrent.futures.Future 和 asyncio.Future。 这两个类的作用相同: 两个 Future 类的实例都表示可能已经完成或者尚未完成的延迟计算。 这与 Twisted 引擎中的 Deferred 类、 Tornado 框架中的Future 类, 以及多个 JavaScript 库中的 Promise 对象类似。

使用as_completed函数改写download_many函数,来理解期物:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def download_many(cc_list):
cc_list = cc_list[:5]
with futures.ThreadPoolExecutor(3) as executor:
to_do = []
for cc in cc_list:
# submit方法排定可调用对象的执行时间,返回一个期物,表示这个待执行(或正执行)的操作
future = executor.submit(download_one, cc)
to_do.append(future)
msg = 'Scheduled for {}:{}'
print(msg.format(cc, future))

results = []

# as_completed 函数在期物运行结束后产出期物
for future in futures.as_completed(to_do):
res = future.result() # 获取该期物(已结束,不会阻塞)的结果
msg = '{} result:{!r}'
print(msg.format(future, res))
results.append(res)

return len(results)

# 打印:
# Scheduled for CN:<Future at 0x232955c9860 state=running>
# Scheduled for IN:<Future at 0x232955c9f28 state=running>
# Scheduled for US:<Future at 0x232955d9550 state=running> 有三个工作线程,前三个期物的状态是running
# Scheduled for ID:<Future at 0x232955d9a90 state=pending>
# Scheduled for BR:<Future at 0x232955d9b38 state=pending> 后两个是pending,等待有线程可用
# IN <Future at 0x232955c9f28 state=finished returned str> result:'IN' 第一个IN是download_one的子线程打印的,第二个'IN'是主线程获得返回后打印的
# CN <Future at 0x232955c9860 state=finished returned str> result:'CN'
# ID <Future at 0x232955d9a90 state=finished returned str> result:'ID'
# BR US <Future at 0x232955d9b38 state=finished returned str> result:'BR' 子线程都先把download_one的show执行了,然后返回结果。主线程遍历打印了结果
# <Future at 0x232955d9550 state=finished returned str> result:'US'

# 5 flags downloaded in 0.88s

严格来说, 我们目前测试的并发脚本都不能并行下载。 使用concurrent.futures 库实现的两个示例受 GIL(Global InterpreterLock, 全局解释器锁) 的限制, 而 flags_asyncio.py 脚本在单个线程中运行。(GIL几乎对 I/O 密集型处理无害)。

阻塞性I/O和GIL

Python 线程受 GIL的限制, 任何时候都只允许运行一个线程,但 flags_threadpool.py 脚本的下载速度仍比 flags.py 脚本快 5倍。flags_asyncio.py 脚本和 flags.py 脚本都在单个线程中运行, 前者仍比后者快 5 倍。

Python 标准库中的所有阻塞型 I/O 函数都会释放 GIL, 允许其他线程运行。 time.sleep() 函数也会释放 GIL。 因此, 尽管有GIL, Python 线程还是能在 I/O 密集型应用中发挥作用。

CPython 解释器本身就不是线程安全的, 因此有全局解释器锁(GIL) ,一次只允许使用一个线程执行 Python 字节码。 因此, 一个 Python 进程通常不能同时使用多个 CPU 核心。这是 CPython 解释器的局限, 与 Python 语言本身无关。 Jython 和 IronPython 没有这种限制。不过, 目前最快的 Python 解释器 PyPy 也有 GIL。

使用concurrent.futures模块启动进程

在CPU密集型作业中使用concurrent.futures模块轻松绕开GIL,ProcessPoolExecutor 和 ThreadPoolExecutor 类都实现了通用的Executor 接口, 因此使用ProcessPoolExecutor能特别轻松地把基于线程的方案转成基于进程的方案。而在下载国旗的示例或其他I/O密集型作业使用ProcessPoolExecutor类得不到任何好处。

实验Executor.map方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from time import sleep, strftime
from concurrent import futures

def display(*args):
print(strftime('[%H:%M:%S]'), end=' ')
print(*args)

def loiter(n):
msg = '{}loiter({}):start'
display(msg.format('\t'*n, n, n))
sleep(n)
msg = '{}loiter({}):done'
display(msg.format('\t'*n, n))
return n*10

def main():
display('Starting')
with futures.ThreadPoolExecutor(3) as exec:
results = exec.map(loiter, range(5))
display('results', results)
display('Waiting for individual results:')
for i, result in enumerate(results):
display('{}result {}:{}'.format('\t'*i, i, result))

main()

# 打印:
# [18:55:54] Starting
# [18:55:54] loiter(0):doint nothing for 0s
# [18:55:54] loiter(0):done #sleep 函数总会释放 GIL,这一句可能在loiter(1)之前,也可能在后
# [18:55:54] loiter(1):doint nothing for 1s
# [18:55:54] loiter(2):doint nothing for 2s #因为线程池中有三个职程,可以并发运行三个函数
# [18:55:54] results <generator object Executor.map.<locals>.result_iterator at ...> #目前不会阻塞
# [18:55:54] Waiting for individual results:
# [18:55:54] result 0:0
# [18:55:54] loiter(3):doint nothing for 3s #第一个线程可以启动第四个线程,运行loiter(3)。
# [18:55:55] loiter(1):done
# [18:55:55] loiter(4):doint nothing for 4s
# [18:55:55] result 1:10
# [18:55:56] loiter(2):done
# [18:55:56] result 2:20
# [18:55:57] loiter(3):done
# [18:55:57] result 3:30
# [18:55:59] loiter(4):done
# [18:55:59] result 4:40

map的特性:这个函数返回结果的顺序与调用开始的顺序一致,如果后调用的函数早完成则会处于阻塞状态。只能处理参数不同的同一个可调用对象。

executor.submit和futures.as_completed组合更灵活,不管提交顺序,只要有结果就获取。

futures.as_completed能处理的期物集合可以来自多个Executor实例。

线程和多进程的替代方案

如果futures.ThreadPoolExecutor 类对某个作业来说不够灵活, 可能要使用 threading 模块中的组件(如 Thread、 Lock、 Semaphore 等)自行制定方案, 比如说使用 queue 模块创建线程安全的队列, 在线程之间传递数据。 futures.ThreadPoolExecutor 类已经封装了这些组件。

对 CPU 密集型工作来说, 要启动多个进程, 规避 GIL。 创建多个进程最简单的方式是, 使用 futures.ProcessPoolExecutor 类。 不过和前面一样, 如果使用场景较复杂, 需要更高级的工具。 multiprocessing 模块的 API 与threading 模块相仿, 不过作业交给多个进程处理。

使用asyncio包处理并发

线程与协程的对比

spinner_thread.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import threading
import itertools
import time
import sys

class Signal():
go = True # 从外部控制线程关闭

def spin(msg, signal):
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle('|/-\\'):
status = char+' '+msg
write(status)
flush()
write('\x08'*len(status)) # 使用退格符(\x08)把光标移回来
time.sleep(.1)
if not signal.go:
break
write(' '*len(status)+'\x08'*len(status))

def slow_function():
time.sleep(3) # 假装I/O。调用sleep函数会阻塞主线程,一定要这么做,以便释放GIL,创建子线程
return 42

def supervisor():
signal = Signal()
spinner = threading.Thread(target=spin,
args=('thinking!', signal))
print('spinner object:', spinner)
spinner.start()
result = slow_function()
signal.go = False
spinner.join()
return result

def main():
result = supervisor()
print('Answer:', result)

if __name__ == '__main__':
main()

spinner_asyncio.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import asyncio
import itertools
import sys
import time


@asyncio.coroutine # 强烈建议打算交给asyncio处理的协程要使用@asyncio.coroutine装饰
def spin(msg):
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle('|/-\\'):
status = char + ' '+msg
write(status)
flush()
write('\x08'*len(status))
try:
yield from asyncio.sleep(.1) # 这样的休眠不会阻塞事件循环
# time.sleep(.1) # 将会无限循环执行,调用方也不能中止子协程的执行
except asyncio.CancelledError:
break
write(' '*len(status)+'\x08'*len(status))


@asyncio.coroutine
def slow_function():
yield from asyncio.sleep(3) # 表达式把控制权交给主事件循环(loop),在休眠结束后恢复这个协程。
return 42


@asyncio.coroutine
def supervisor():
# 排定spin协程的运行时间(顺序),使用一个Task对象包装spin协程,并立即返回
spinner = asyncio.ensure_future(spin('thinking!'))
# spinner object: <Task pending coro=<spin() running at .\index.py:7>>
print('spinner object:', spinner)
result = yield from slow_function() # 主循环在此处等待3秒,先去处理其他排定的协程
spinner.cancel()
return result


def main():
loop = asyncio.get_event_loop() # 获取事件循环的引用
result = loop.run_until_complete(supervisor()) # 驱动supervisor协程,让它运行完毕;这个协程的返回值是这次调用的返回值。
loop.close()
print('Answer:', result)

if __name__ == '__main__':
main()

以上两种supervisor实现之间的主要区别:

  1. asyncio.Task对象差不多与threading.Thread对象等效。
  2. Task对象用于驱动协程,Thread对象用于调用可调用对象。
  3. Task对象不由自己动手实例化,而是通过把协程传给asyncio.ensure_future(..)函数或loop.create_task(…)方法获取。
  4. 获取的Task对象已经排定了运行时间。Thread实例则必须调用start方法。
  5. 没有API能从外部终止线程。可以使用Task.cancel(),在协程内部抛出CancelledError异常,协程可以在暂停的yield处捕获这个异常,处理终止请求。
  6. supervisor协程必须在main函数中由loop.run_until_complete方法执行。

协程和线程的同步区别:

对协程来说, 无需保留锁, 在多个线程之间同步操作, 协程自身就会同步, 因为在任意时刻只有一个协程运行。 想交出控制权时, 可以使用 yield 或 yield from 把控制权交还调度程序。

asyncio.Future和concurrent.futures.Future区别

期物只是调度执行某物的结果。 在asyncio 包中,BaseEventLoop.create_task(…)方法接收一个协程, 排定它的运行时间,然后返回一个asyncio.Task实例——也是asyncio.Future类的实例,因为Task是Future的子类,用于包装协程。这与调用Executor.submit(…)方法创建concurrent.futures.Future实例是一个道理。

与concurrent.futures.Future类似,asyncio.Future类也提供了.done()、.add_done_callback(…)和.result() 等方法。

因为asyncio.Future类的目的是与yield from一起使用,所以通常不需要使用以下方法:

  1. 无需调用my_future.add_done_callback(…),因为可以直接把想在期物运行结束后执行的操作放在协程中 yield from my_future 表达式的后面。 这是协程的一大优势: 协程是可以暂停和恢复的函数。
  2. 无需调用my_future.result(),因为 yield from 从期物中产出的值就是结果(例如,result = yield from my_future)。

使用asyncio和aiohttp包下载

从Python3.4起,asyncio包只支持TCP和UDP。如果想使用HTTP,可以使用aiohttp包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio

import aiohttp

from flags import BASE_URL, save_flag, show, main

@asyncio.coroutine
def get_flag(cc):
url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
resp = yield from aiohttp.ClientSession().get(url) # 非阻塞I/O操作
image = yield from resp.read()
return image

@asyncio.coroutine
def download_one(cc):
image = yield from get_flag(cc)
show(cc)
save_flag(image, cc.lower()+'.gif')
return cc

def download_many(cc_list):
loop = asyncio.get_event_loop()
to_do = [download_one(cc) for cc in sorted(cc_list)]
wait_coro = asyncio.wait(to_do) # wait 是一个协程,等传给它的所有协程运行完毕后结束
res, _ = loop.run_until_complete(wait_coro)
loop.close()
return len(res)

if __name__ == '__main__':
main(download_many)

asyncio.wait 会分别把各个协程包装进一个 Task 对象。 最终的结果是, wait 处理的所有对象都通过某种方式变成 Future 类的实例。 wait 是协程函数, 因此返回的是一个协程或生成器对象。

在asyncio包的API中使用yield from时:

asyncio的yield from

避免阻塞型调用

有两种方法能避免阻塞型调用中止整个应用程序的进程:

  1. 在单独的线程中运行各个阻塞型操作
  2. 把每个阻塞型操作转换成非阻塞的异步调用使用

多个线程是可以的, 但是各个操作系统线程(Python 使用的是这种线程) 消耗的内存达兆字节(具体的量取决于操作系统种类) 。 如果要处理几千个连接, 而每个连接都使用一个线程的话, 我们负担不起。

asyncio 的基础设施获得第一个响应后, 事件循环把响应发给等待结果的 get_flag 协程。 得到响应后, getflag 向前执行到下一个 yieldfrom 表达式处, 调用 resp.read() 方法, 然后把控制权还给主循环。其他响应会陆续返回(因为请求几乎同时发出) 。 所有 get flag 协程都获得结果后, 委派生成器 download_one 恢复, 保存图像文件。

改进asyncio下载脚本

把一个协程列表传给 asyncio.wait 函数, 经由loop.run_until_complete 方法驱动, 全部协程运行完毕后, 这个函数会返回所有下载结果。 可是, 为了更新进度条, 各个协程运行结束后就要立即获取结果。 为了集成进度条, 我们使用 as_completed 生成器函数;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import asyncio
import collections

import aiohttp
from aiohttp import web
import tqdm

from flags2_common import main, HTTPStatus, Result, save_flag

DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000


class FetchError(Exception):
def __init__(self, country_code):
self.country_code = country_code


@asyncio.coroutine
def get_flag(base_url, cc):
url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
resp = yield from aiohttp.ClientSession().get(url)
if resp.status == 200:
image = yield from resp.read()
return image
elif resp.status == 404:
raise web.HTTPNotFound()
else:
raise aiohttp.HttpProcessingError(
code=resp.status, message=resp.reason,
headers=resp.headers)


@asyncio.coroutine
# semaphore 参数是 asyncio.Semaphore 类的实例。 Semaphore 类是同步装置, 用于限制并发请求数量
def download_one(cc, base_url, semaphore, verbose):
try:
# 使用 with 表达式或手动在 semaphore 对象上调用 release 方法(递增)或 acquire 方法(递减)来操作内部计数器
with (yield from semaphore):
# 退出这个 with 语句后, semaphore 计数器的值会递减, 解除阻塞可能在等待同一个 semaphore 对象的其他协程实例
image = yield from get_flag(base_url, cc)
except web.HTTPNotFound:
status = HTTPStatus.not_found
msg = 'not found'
except Exception as exc:
raise FetchError(cc) from exc # raise X from Y 句法链接原来的异常
else:
# 1, save_flag(image, cc.lower() + '.gif') # 直接调用 save_flag 访问本地文件系统会阻塞客户代码与 asyncio 事件循环共用的唯一线程
# 2, asyncio 的事件循环在背后维护着一个 ThreadPoolExecutor 对象,我们可以调用 run_in_executor 方法, 把可调用的对象发给它执行
# 第一个参数是 Executor 实例; 如果设为None, 使用事件循环的默认 ThreadPoolExecutor 实例
loop = asyncio.get_event_loop()
loop.run_in_executur(None, save_flag, image, cc.lower()+'.gif')

status = HTTPStatus.ok
msg = 'OK'

if verbose and msg:
print(cc, msg)
return Result(status, cc)


@asyncio.coroutine
def downloader_coro(cc_list, base_url, verbose, concur_req): # 不直接调用,使用事件循环驱动
counter = collections.Counter()
# 最多允许激活 concur_req 个使用这个计数器的协程,锁
semaphore = asyncio.Semaphore(concur_req)
to_do = [download_one(cc, base_url, semaphore, verbose)
for cc in sorted(cc_list)] # 循环调用 download_one 协程, 创建一个协程对象列表。
to_do_iter = asyncio.as_completed(to_do) # 获取一个迭代器, 这个迭代器会在期物运行结束后返回期物
if not verbose:
to_do_iter = tqdm.tqdm(to_do_iter, total=len(
cc_list)) # 传给 tqdm 函数, 显示进度

for future in to_do_iter: # 迭代运行结束并返回结果的期物
try:
res = yield from future # 获取该期物的结果(此时已结束,不会阻塞)的最简单的方式,而不是使用future.result()
except FetchError as exc:
country_code = exc.country_code
try:
# 尝试从原来的异常(__cause__) 中获取错误消息
error_msg = exc.__cause__.args[0]
except IndexError:
error_msg = exc.__cause__.__class__.__name__
if verbose and error_msg:
msg = '*** Error for {}: {}'
print(msg.format(country_code, error_msg))
status = HTTPStatus.error
else:
status = res.status
counter[status] += 1
return counter


def download_many(cc_list, base_url, verbose, concur_req):
loop = asyncio.get_event_loop()
coro = downloader_coro(cc_list, base_url, verbose, concur_req)
# download_many 函数只是实例化 downloader_coro 协程, 然后通过 run_until_complete 方法把它传给事件循环。
counts = loop.run_until_complete(coro)
loop.close()
return counts


if __name__ == '__main__':
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

从回调到期物和协程

Python中的回调地狱:链式回调

1
2
3
4
5
6
7
8
9
10
11
12
def stage1(response1):
request2 = step1(response1)
api_call2(request2, stage2)

def stage2(response2):
request3 = step2(response2)
api_call3(request3, stage3)

def stage3(response3):
step3(response3)

api_call1(request1, stage1)

使用协程和yield from 结构做异步编程,无需回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
@asyncio.coroutine
def three_stages(request1):
response1 = yield from api_call1(request1)
# 第一步
request2 = step1(response1)
response2 = yield from api_call2(request2)
# 第二步
request3 = step2(response2)
response3 = yield from api_call3(request3)
# 第三步
step3(response3)

loop.create_task(three_stages(request1)) # 必须显式调度执行

使用yield from 异步编程,每次下载发起多次请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@asyncio.coroutine
def http_get(url):
resp = yield from aiohttp.ClientSession().get(url)
if resp.status == 200:
ctype = resp.headers.get('Content-Type', '').lower()
if 'json' in ctype or url.endswith('json'):
data = yield from resp.json() # 如果内容类型包含'json',解析为一个字典
else:
data = yield from resp.read() # 读取原始字节
return data

elif resp.status == 404:
raise web.HTTPNotFound()
else:
raise aiohttp.HttpProcessingError(
code=resp.status, message=resp.reason,
headers=resp.headers)


@asyncio.coroutine
def get_contry(base_url, cc):
url = '{}/{cc}/metadata.json'.format(base_url, cc=cc.lower())
metadata = yield from http_get(url)
return metadata['country'] # 获取字典中的值


@asyncio.coroutine
def get_flag(base_url, cc):
url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
return (yield from http_get(url)) # 这里必须在外层加上括号


@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
try:
with (yield from semaphore):
image = yield from get_flag(base_url, cc)
with (yield from semaphore):
country = yield from get_contry(base_url, cc)

except web.HTTPNotFound:
status = HTTPStatus.not_found
msg = 'not found'
except Exception as exc:
raise FetchError(cc) from exc # raise X from Y 句法链接原来的异常
else:
country = country.replace(' ', '_')
filename = '{}-{}.gif'.format(country, cc)
loop = asyncio.get_event_loop()
loop.run_in_executur(None, save_flag, image, filename)

status = HTTPStatus.ok
msg = 'OK'

if verbose and msg:
print(cc, msg)
return Result(status, cc)

使用asyncio包编写服务器

使用asyncio包编写TCP服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import sys
import asyncio

from charfinder import UnicodeNameIndex

CRLF = b'\r\n'
PROMPT = b'?> '

# 实例化 UnicodeNameIndex 类时, 它会使用 charfinder_index.pickle文件(如果没有则构建)
index = UnicodeNameIndex()


@asyncio.coroutine
# 这个协程要传给start_server,接收的两个参数是,asyncio.StreamReader和asyncio.StreamWriter
def handle_queries(reader, writer):
while True: # 循环处理会话, 直到从客户端收到控制字符
# 发送提示符,StreamWriter.write 方法不是协程,只是普通的函数,不能使用yield from
writer.write(PROMPT)
yield from writer.drain() # 刷新 writer 缓冲,因为它是协程,必须使用 yield from
data = yield from reader.readline() # 协程,返回bytes对象
try:
query = data.decode().strip()
except UnicodeDecodeError:
query = '\x00' # Telnet 客户端发送控制字符时,可能会抛出 UnicodeDecodeError异常;简单起见,假装发送的是空字符。
client = writer.get_extra_info('peername') # 返回与套接字连接的远程地址
print('Received from {}:{!r}!'.format(client, query))
if query:
if ord(query[:1]) < 32:
break # 收到控制字符或者空字符, 退出循环
# 产出包含 Unicode 码位、真正的字符和字符名称的字符串
lines = list(index.find_description_strs(query))
if lines:
# 把 lines 转换成 bytes 对象,并在每一行末尾添加回车符和换行符
# writer.writelines(line.encode()+CRLF for line in lines)
writer.writelines(line.encode('utf8')+CRLF for line in '卧室的')
writer.write(index.status(query, len(lines)).encode()+CRLF)
yield from writer.drain() # 刷新输出缓冲
print('Sent {} results'.format(len(lines)))
print('Close the client socket')
writer.close() # 关闭StreamWriter流


def main(address='127.0.0.1', port=2323):
port = int(port)
loop = asyncio.get_event_loop()
# asyncio.start_server 协程运行结束后, 返回的协程对象返回一个 asyncio.Server 实例, 即一个 TCP 套接字服务器
server_coro = asyncio.start_server(
handle_queries, address, port, loop=loop)
# 驱动 server_coro 协程, 启动服务器(server)
server = loop.run_until_complete(server_coro)
host = server.sockets[0].getsockname() # 获取这个服务器的第一个套接字的地址和端口
print('Serving on {}. Hit CTRL-C to stop.'.format(host))
try:
loop.run_forever() # main 函数在这里阻塞,直到在服务器的控制台中按CTRL-C 键才会关闭
except KeyboardInterrupt: # 按CTRL-C
pass

print('Server shutting down.')
server.close()
# server.wait_closed() 方法返回一个期物,调用loop.run_until_complete运行期物
loop.run_until_complete(server.wait_closed())
loop.close()


if __name__ == '__main__':
# 处理可选的命令行参数的简便方式:展开 sys.argv[1:],传给main函数
main(*sys.argv[1:])

handle_queries 协程的名称是复数, 因为它启动交互式会话后能处理各个客户端发来的多次请求。
代码中所有的 I/O 操作都使用 bytes 格式。 因此, 我们要解码从网络中收到的字符串, 还要编码发出的字符串。Python 3 默认使用的编码是 UTF-8, 这里就隐式使用了这个编码。
run_until_complete 方法的参数是一个协程(start_server方法返回的结果)或一个 Future 对象(server.wait_closed 方法返回的结果),如果传给 run_until_complete 方法的参数是协程, 会把协程包装在 Task 对象中。
调用 loop.run_forever() 时阻塞。控制权流动到事件循环中, 而且一直待在那里。不过偶尔会回到 handle_queries 协程, 这个协程需要等待网络发送或接收数据时, 控制权又交还事件循环。
在事件循环运行期间, 只要有新客户端连接服务器就会启动一个handle_queries 协程实例。

使用aiohttp包编写Web服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import sys
import asyncio
from aiohttp import web

import aiohttp

from charfinder import UnicodeNameIndex


TEMPLATE_NAME = 'http_charfinder.html'
SAMPLE_WORDS = ('bismillah chess cat circled Malayalam digit'
' Roman face Ethiopic black mark symbol dot'
' operator Braille hexagram').split()

ROW_TPL = '<tr><td>{code_str}</td><th>{char}</th><td>{name}</td></tr>'
LINK_TPL = '<a href="/?query={0}" title="find &quot;{0}&quot;">{0}</a>'
LINKS_HTML = ', '.join(LINK_TPL.format(word) for word in
sorted(SAMPLE_WORDS, key=str.upper))


index = UnicodeNameIndex()

template = ''
with open(TEMPLATE_NAME) as tpl:
template = tpl.read()
template = template.replace('{links}', LINKS_HTML)


def home(request): # 每次请求执行一遍
query = request.query.get('query').strip()
print('Query:{!r}'.format(query))
if query:
descriptions = list(index.find_descriptions(query))
res = '\n'.join(ROW_TPL.format(**descr._asdict())
for descr in descriptions)
msg = index.status(query, len(descriptions))
else:
descriptions = []
res = ''
msg = 'Enter words describing characters'

html = template.format(query=query, result=res, message=msg)
print('Sending {} results'.format(len(descriptions)))

return web.Response(content_type='text/html', charset='utf-8', text=html)


@asyncio.coroutine
def init(loop, address, port): # init 协程产出一个服务器, 交给事件循环驱动
app = web.Application(loop=loop)
app.router.add_route('GET', '/', home)
# 返回一个 aiohttp.web.RequestHandler实例,在 app 对象上设置路由处理 HTTP 请求
handler = app.make_handler()
# 创建服务器, 以 handler 为协议处理程序, 并把服务器绑定在指定的地址和端口上
server = yield from loop.create_server(handler, address, port)
return server.sockets[0].getsockname()


def main(address='127.0.0.1', port=8111):
port = int(port)
loop = asyncio.get_event_loop()
host = loop.run_until_complete(
init(loop, address, port)) # 执行init方法,启动服务器,获取服务器的地址和端口
print('Serving on {}. Hit CTRL-C to stop.'.format(host))
try:
loop.run_forever() # 控制权在事件循环手上时, main 函数会在这里阻塞
except KeyboardInterrupt:
pass
print('Server shutting down.')
loop.close()


if __name__ == '__main__':
main(*sys.argv[1:])

只有驱动协程,协程才能做事,而驱动 asyncio.coroutine 装饰的协程有两种方法:

  1. yield from

  2. 传给 asyncio 包中某个参数为协程或期物的函数,例如 run_until_complete。

动态属性和特性

使用动态属性转换数据

使用动态属性访问JSON类数据

下载和加载,osconfeed.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from urllib.request import urlopen
import warnings
import os
import json

URL = 'http://www.oreilly.com/pub/sc/osconfeed'
JSON = 'data/osconfeed.json'

def load():
if not os.path.exists(JSON):
msg = 'downloading {} to {}'.format(URL, JSON)
warnings.warn(msg)
with urlopen(URL) as remote, open(JSON, 'wb') as local: # 同时进行下载和保存
local.write(remote.read())

with open(JSON, encoding='utf8') as fp:
return json.load(fp)

动态访问属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from collections import abc
import keyword


class FrozenJSON():
"""一个只读接口,使用属性表示法访问JSON类对象
"""

def __init__(self, mapping):
self.__data = {}
for key, value in mapping.items():
if keyword.iskeyword(key): # 判断键是否为Python关键词,特殊处理
key += '_'
self.__data[key] = value

def __getattr__(self, name):
if hasattr(self.__data, name):
# 如果 name 是实例属性 __data 的属性,返回那个属性,如 keys
return getattr(self.__data, name)
else:
# 否则从 self.__data 中获取 name 键对应的元素, 返回调用FrozenJSON.build() 方法得到的结果。
return FrozenJSON.build(self.__data[name])

@classmethod
def build(cls, obj): # 备选构造函数
if isinstance(obj, abc.Mapping):
return cls(obj)
elif isinstance(obj, abc.MutableSequence):
return [cls.build(item) for item in obj]
else:
return obj

动态访问:

1
2
3
4
5
6
7
8
9
10
from osconfeed import load
from fronzen_json import FrozenJSON

raw_feed = load()

print(raw_feed['Schedule']['events'][40]['name'])

feed = FrozenJSON(raw_feed)

print(feed.Schedule.events[40].name) #动态访问属性

使用new方法灵活创建对象

__new__: 这是个类方法(使用特殊方式处理, 因此不必使用 @classmethod 装饰器) , 必须返回一个实例(自身类或其他类的实例,返回其他类的实例解释器不会调用 __init__ 方法)。 返回的实例会作为第一个参数(即 self) 传给 __init__ 方法。

所以 __init__ 方法其实是“初始化方法”。 真正的构造方法是 __new__。我们几乎不需要自己编写 __new__ 方法, 因为从 object 类继承的实现已经足够了。

Python构件对象的过程的伪代码:

1
2
3
4
5
6
7
8
9
10
# 构建对象的伪代码
def object_maker(the_class, some_arg):
new_object = the_class.__new__(some_arg)
if isinstance(new_object, the_class):
the_class.__init__(new_object, some_arg)
return new_object

# 下述两个语句的作用基本等效
x = Foo('bar')
x = object_maker(Foo, 'bar')

使用 __new__ 方法取代build方法,构建可能是也可能不是FrozenJSON实例的新对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def __init__(self, mapping):    #__new__构建的“原始”对象,在__init__里进行初始化
self.__data = {}
for key, value in mapping.items():
if keyword.iskeyword(key):
key += '_'
self.__data[key] = value

def __getattr__(self, name):
if hasattr(self.__data, name):
return getattr(self.__data, name)
else:
return FrozenJSON(self.__data[name])

def __new__(cls, arg): # cls是类本身
if isinstance(arg, abc.Mapping):
# 默认的行为是委托给超类(这里是object)的 __new__ 方法来构建“原始”对象。背后真正的构建操作由解释器调用 C 语言实现的 object.__new__ 方法执行
return super().__new__(cls)
elif isinstance(arg, abc.MutableSequence):
return [cls(item) for item in arg]
else:
return arg

shelve模块

使用特性验证属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class LineItem():
def __init__(self, desc, weight, price):
self.desc = desc
self.weight = weight # 这里赋值已经使用了特性的设值方法
self.price = price

def subtotal(self):
return self.weight*self.price

@property
def weight(self): # 读值方法
return self.__weight

@weight.setter # 这个装饰器把读值方法和设值方法绑定在一起
def weight(self, value): # 设值方法
if value > 0:
self.__weight = value
else:
raise ValueError('value must be > 0')

特性全解析

特性会覆盖实例属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class Class():
data = 'class data' # 类属性

@property
def prop(self):
'''prop doc'''
return 'prop value'


obj = Class()
print(dir(obj)) # 查询所有属性名列表,包括从父类继承的,返回__dir__()的结果
print(vars(obj)) # {},实例的属性名值对,返回__dict__属性的值
print(obj.data) # class data,获取的是Class.data的值
obj.data = 'bar' # 创建一个实例属性,覆盖了类属性
print(vars(obj)) # {'data': 'bar'}
print(Class.data) # class data,类属性没有改变

print(Class.prop) # <property object at ...>,特性相当于类属性,获取的是特性对象本身,不会运行特性的读值方法
print(obj.prop) # prop value,实例调用,执行了特性的读值方法
# obj.prop = 'foo' # AttributeError: can't set attribute,只读特性,报错
obj.__dict__['prop'] = 'foo' # 通过__dict__成功设置实例属性
print(vars(obj)) # {'data': 'bar', 'prop': 'foo'}
print(obj.prop) # prop value,仍会运行特性的读值方法,特性覆盖了实例属性
Class.prop = 'baz' # 相当于销毁了特性对象
print(obj.prop) # foo,现在读取的是实例属性


Class.data = property(lambda self: 'class data prop', doc='data doc')
print(obj.data) # class data prop,实例的data属性被类的data特性覆盖了
del Class.data
print(obj.data) # bar,恢复原样

obj.attr 这样的表达式不会从 obj 开始寻找attr,而是从 obj.__class__ 开始。而且, 仅当类中没有名为 attr的特性时, Python 才会在 obj 实例中寻找。

特性工厂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def quantity(storage_name):
def qty_getter(instance):
return instance.__dict__[storage_name]

def qty_setter(instance, value):
if value > 0:
instance.__dict__[storage_name] = value
else:
raise ValueError('value must be > 0')

return property(qty_getter, qty_setter)


class LineItem():
weight = quantity('weight') # 把自定义特性定义为类属性
price = quantity('price')

def __init__(self, desc, weight, price):
self.desc = desc
self.weight = weight # 特性已经激活
self.price = price

def subtotal(self):
return self.weight*self.price


i = LineItem('desc', 10, 20)
print(LineItem.weight)
# {'desc': 'desc', 'weight': 10, 'price': 20},实例也创建了属性的,真正用于存储值
print(vars(i))
print(i.weight) # 10,调用了特性的读值方法

处理属性删除操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class BlackKnight():
def __init__(self):
self.members = ['an arm', 'another arm', 'a leg', 'another leg']
self.phrases = ["'Tis but a scratch.",
"It's just a flesh wound.",
"I'm invincible!",
"All right, we'll call it a draw."]

@property
def member(self):
print('next member is:')
return self.members[0]

@member.deleter
def member(self):
text = 'BLACK KNIGHT (loses {})\n-- {}'
print(text.format(self.members.pop(0), self.phrases.pop(0)))


knight = BlackKnight()
print(knight.member)

del knight.member
del knight.member
del knight.member
del knight.member

处理属性的重要属性和函数

特殊属性

1
2
3
4
5
6
7
b = BlackKnight()

print(type(BlackKnight)) # <class 'type'>
print(BlackKnight.__class__) # <class 'type'>

print(type(b)) # <class '__main__.BlackKnight'>
print(b.__class__) # <class '__main__.BlackKnight'>

__class__:对象所属类的引用(即 obj.__class__ 与 type(obj) 的作用相同) 。Python 的某些特殊方法, 例如 __getattr__, 只在对象的类中寻找, 而不在实例中寻找。

__dict__:一个映射, 存储对象或类的可写属性(键值对)。 有 __dict__ 属性的对象,任何时候都能随意设置新属性。 如果类有 __slots__ 属性, 它的实例可能没有 __dict__ 属性。 参见下面对 __slots__ 属性的说明。

__slots__:类可以定义这个这属性, 限制实例能有哪些属性。 __slots__ 属性的值是一个字符串组成的元组, 指明允许有的属性。 如果 __slots__ 中没有 __dict__, 那么该类的实例没有 __dict__ 属性, 实例只允许有指定名称的属性。

内置函数

dir([object]):列出对象的大多数属性。dir 函数的目的是交互式使用, 因此没有提供完整的属性列表, 只列出一组“重要的”属性名。 dir 函数能审查有或没有 __dict__ 属性的对象。 dir 函数不会列出 __dict__ 属性本身, 但会列出其中的键。 dir 函数也不会列出类的几个特殊属性, 例如 __mro____bases____name__。 如果没有指定可选的 object 参数, dir 函数会列出当前作用域中的名称。

getattr(object, name[, default]):从 object 对象中获取 name 字符串对应的属性。 获取的属性可能来自对象所属的类或超类。 如果没有指定的属性, getattr 函数抛出AttributeError 异常, 或者返回 default 参数的值(如果设定了这个参数的话) 。

hasattr(object, name):如果 object 对象中存在指定的属性, 或者能以某种方式(例如继承) 通过 object 对象获取指定的属性, 返回 True。 文档说: “这个函数的实现方法是调用 getattr(object, name) 函数, 看看是否抛出AttributeError 异常。 ”

setattr(object, name, value):把 object 对象指定属性的值设为 value, 前提是 object 对象能接受那个值。 这个函数可能会创建一个新属性, 或者覆盖现有的属性。

vars([object]):返回 object 对象的 __dict__ 属性; 如果实例所属的类定义了 __slots__ 属性, 实例没有 __dict__ 属性, 那么 vars 函数不能处理那个实例(相反, dir 函数能处理这样的实例) 。 如果没有指定参数,那么 vars() 函数的作用与 locals() 函数一样: 返回表示本地作用域的字典。

特殊方法

使用点号或内置的 getattr、 hasattr 和 setattr 函数存取属性都会触发下述列表中相应的特殊方法。 但是, 直接通过实例的 __dict__ 属性读写属性不会触发这些特殊方法——如果需要, 通常会使用这种方式跳过特殊方法。
要假定特殊方法从类上获取, 即便操作目标是实例也是如此。 因此, 特殊方法不会被同名实例属性遮盖。例如, obj.attr 和 getattr(obj, ‘attr’, 42) 都会触发 Class.__getattribute__(obj, 'attr') 方法。

__delattr__(self, name):只要使用 del 语句删除属性, 就会调用这个方法。 例如, del obj.attr 语句触发 Class.__delattr__(obj, 'attr') 方法。

__dir__(self):把对象传给 dir 函数时调用, 列出属性。 例如, dir(obj) 触发 Class.__dir__(obj) 方法。

__getattr__(self, name):仅当获取指定的属性失败, 搜索过 obj、 Class 和超类之后调用。表达式 obj.no_such_attr、 getattr(obj, ‘no_such_attr’) 和hasattr(obj, ‘no_such_attr’) 可能会触发 Class.__getattr__(obj, 'no_such_attr') 方法, 但是, 仅当在obj、 Class 和超类中找不到指定的属性时才会触发。

__setattr__(self, name, value):尝试设置指定的属性时总会调用这个方法。 点号和 setattr 内置函数会触发这个方法。 例如, obj.attr = 42 和 setattr(obj,’attr’, 42) 都会触发 Class.__setattr__(obj, ‘attr’, 42) 方法。

__getattribute__(self, name):尝试获取指定的属性时总会调用这个方法, 不过, 寻找的属性是特殊属性或特殊方法时除外。 点号与 getattr 和 hasattr 内置函数会触发这个方法。 调用 __getattribute__ 方法且抛出 AttributeError 异常时, 才会调用 __getattr__ 方法。 为了在获取 obj 实例的属性时不导致无限递归, __getattribute__ 方法的实现要使用 super().__getattribute__(obj, name)

属性描述符

描述符是对多个属性运用相同存取逻辑的一种方式。描述符是实现了特定协议的类, 这个协议包括 __get____set____delete__ 方法。 property 类实现了完整的描述符协议。 通常, 可以只实现部分协议。我们在真实的代码中见到的大多数描述符只实现了 __get____set__ 方法, 还有很多只实现了其中的一个。Django模型的字段就是描述符。

描述符示例:验证属性

把19章的把 quantity 特性工厂函数重构成 Quantity 描述符类。

术语说明:

  • 描述符类:实现描述符协议的类
  • 托管类:把描述符实例声明为类属性的类
  • 描述符实例:描述符类的各个实例, 声明为托管类的类属性
  • 托管实例:托管类的实例
  • 储存属性:托管实例中存储自身托管属性的属性
  • 托管属性:托管类中由描述符实例处理的公开属性, 值存储在储存属性中。 也就是说, 描述符实例和储存属性为托管属性建立了基础
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class Quantity():  # 基于协议实现,不用继承
def __init__(self, storage_name):
self.storage_name = storage_name # 储存属性的名称

def __set__(self, instance, value): # 为托管属性赋值时调用,设置托管实例的储存属性
if value > 0:
# 如果使用内置的setattr 函数, 会再次触发 __set__ 方法, 导致无限递归
instance.__dict__[self.storage_name] = value
else:
raise ValueError('value must be > 0')


class LineItem():
weight = Quantity('weight') # 把描述符实例绑定到类属性 weight
price = Quantity('price')

def __init__(self, desc, weight, price):
self.desc = desc
self.weight = weight
self.price = price

def subtotal(self):
return self.weight*self.price


truffle = LineItem('White truffe', 100, 10)
print(vars(truffle))

自动获取储存属性的名称

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Quantity():
__counter = 0 # Quantity的私有类属性

def __init__(self):
cls = self.__class__
prefix = cls.__name__
index = cls.__counter
self.storage_name = '_{}#{}'.format(prefix, index) # 为每个储存属性生成一个唯一的名称
cls.__counter += 1


def __get__(self, instance, owner): # instance是托管实例的引用, owner是托管类(如 LineItem) 的引用
if instance is None:
return self
else:
return getattr(instance, self.storage_name)

def __set__(self, instance, value):
if value > 0:
# 因为托管属性和储存属性的名称不同, 所以把储存属性传给 getattr 函数不会触发描述符
setattr(instance, self.storage_name, value)
else:
raise ValueError('value must be > 0')


class LineItem():
weight = Quantity()
price = Quantity()

pass

重构描述符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import abc


class AutoStorage(): # AutoStorage 类提供了之前 Quantity 描述符的大部分功能
__counter = 0

def __init__(self):
cls = self.__class__
prefix = cls.__name__
index = cls.__counter
self.storage_name = '_{}#{}'.format(prefix, index)
cls.__counter += 1

def __get__(self, instance, owner):
if instance is None:
return self
else:
return getattr(instance, self.storage_name)

def __set__(self, instance, value):
setattr(instance, self.storage_name, value)


class Validated(abc.ABC, AutoStorage):
def __set__(self, instance, value): # 把验证操作委托给 validate 方法
value = self.validate(instance, value)
super().__set__(instance, value)

@abc.abstractmethod
def validate(self, instance, value):
"""return validated value or raise ValueError"""


class Quantity(Validated):
"""a number greater than zero"""

def validate(self, instance, value):
if value <= 0:
raise ValueError('value must be > 0')
return value


class NonBlank(Validated):
"""a string with at least one non-space character"""

def validate(self, instance, value):
value = value.strip()
if len(value) == 0:
raise ValueError('value cannot be empty or blank')
return value


class LineItem():
desc = NonBlank()
weight = Quantity()
price = Quantity()

pass
1
2
3
4
5
import lineitem

print(lineitem.Quantity.__mro__) # (<class 'lineitem.Quantity'>, <class 'lineitem.Validated'>, <class 'abc.ABC'>, <class 'lineitem.AutoStorage'>, <class 'object'>)

print(LineItem5.Quantity.__mro__)

覆盖型和非覆盖型描述符对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def cls_name(obj_or_cls):
cls = type(obj_or_cls)
if cls is type:
cls = obj_or_cls
return cls.__name__.split('.')[-1]


def display(obj):
cls = type(obj)
if cls is type:
return '<class {}>'.format(obj.__name__)
elif cls in [type(None), int]:
return repr(obj)
else:
return '<{} object>'.format(cls_name(obj))


def print_args(name, *args):
pseudo_args = ', '.join(display(x) for x in args)
print('-> {}.__{}__({})'.format(cls_name(args[0]), name, pseudo_args))


class Overriding(): # 有 __get__ 和 __set__ 方法的典型覆盖型描述符
def __get__(self, instance, owner):
print_args('get', self, instance, owner)

def __set__(self, instance, value):
print_args('set', self, instance, value)


class OverridingNoGet: # 没有 __get__ 方法的覆盖型描述符
def __set__(self, instance, value):
print_args('set', self, instance, value)


class NonOverriding: # 没有 __set__ 方法, 所以这是非覆盖型描述符
def __get__(self, instance, owner):
print_args('get', self, instance, owner)


class Managed:
over = Overriding()
over_no_get = OverridingNoGet()
non_over = NonOverriding()

def spam(self): # 为了对比, 因为方法也是描述符
print('-> Managed.spam({})'.format(display(self)))


obj = Managed()

覆盖型描述符:也叫数据描述符或强制描述符,实现 __set__ 方法的描述符属于覆盖型描述符, 因为虽然描述符是类属性, 但是实现 __set__ 方法的话, 会覆盖对实例属性的赋值操作

1
2
3
4
5
6
7
8
obj.over  # (<Overriding object>, <Managed object>, <class Managed>)
Managed.over # (<Overriding object>, None, <class Managed>)
obj.over = 7 # (<Overriding object>, <Managed object>, 7)
obj.over # (<Overriding object>, <Managed object>, <class Managed>)
obj.__dict__['over'] = 8
print(vars(obj)) # {'over': 8}
obj.over # 即使有了over的实例属性,Managed.over仍会覆盖读取obj.over操作
# (<Overriding object>, <Managed object>, <class Managed>)

没有 __get__ 方法的覆盖性描述符:

1
2
3
4
5
6
7
8
9
10
11
12
13
# 没有实现__get__方法,因此从类中获取描述符实例 <__main__.OverridingNoGet object at 0x000002A88A67B8D0>
print(obj.over_no_get)
# <__main__.OverridingNoGet object at 0x000002A88A67B8D0>
print(Managed.over_no_get)
# 触发__set__方法,(<OverridingNoGet object>, <Managed object>, 7)
obj.over_no_get = 7
# 仍是托管类中的描述符实例,<__main__.OverridingNoGet object at 0x000001A913D8C908>
print(obj.over_no_get)
obj.__dict__['over_no_get'] = 9
print(obj.over_no_get) # 9,over_no_get 实例属性会遮盖描述符, 但是只有读操作是如此
# 仍然经过描述符的 __set__ 方法处理。(<OverridingNoGet object>, <Managed object>, 7)
obj.over_no_get = 7
print(obj.over_no_get) # 9,只要有同名的实例属性存在,描述符就会被遮盖

非覆盖型描述符:也叫非数据描述符或遮盖型描述符,没有实现 __set__ 方法的描述符是非覆盖型描述符。

1
2
3
4
5
6
7
obj.non_over  # (<NonOverriding object>, <Managed object>, <class Managed>)
obj.non_over = 7 # 非覆盖型描述符,没有干涉赋值操作的__set__方法
print(obj.non_over) # 7,名为 non_over 的实例属性,把类的同名描述符属性遮盖掉了
Managed.non_over # (<NonOverriding object>, None, <class Managed>)
del obj.non_over
# 删掉实例属性后,仍然触发了描述符。(<NonOverriding object>, <Managed object>, <class Managed>)
obj.non_over

在类中覆盖描述符

不管描述符是不是覆盖型, 为类属性赋值都能覆盖描述符。读类属性的操作可以由依附在托管类上定义有 __get__ 方法的描述符处理,但是写类属性的操作不会由依附在托管类上定义有 __set__ 方法的描述符处理

1
2
3
4
Managed.over = 1
Managed.over_no_get = 2
Managed.non_over = 3
print(obj.over, obj.over_no_get, obj.non_over) # 1 2 3,描述符真的不见了

方法是描述符

在类中定义的函数属于绑定方法(bound method) , 因为用户定义的函数都有 __get__ 方法, 所以依附到类上时, 就相当于描述符。

方法是非覆盖型描述符。

1
2
3
4
5
6
7
8
9
obj = Managed()

# obj.spam 获取的是绑定方法对象。<bound method Managed.spam of <__main__.Managed object at ...>>
print(obj.spam)
# Managed.spam 获取的是函数。<function Managed.spam at 0x0000013D5D3DD400>
print(Managed.spam)
obj.spam = 7 # 为 obj.spam 赋值,创建了同名的实例属性,将会遮盖类属性,导致无法通过 obj 实例访问 spam 方法
print(obj.spam) # 7
print(Managed.spam) # <function Managed.spam at 0x0000013D5D3DD400>

与描述符一样, 通过托管类访问时, 函数的 __get__ 方法会返回自身的引用。 但是, 通过实例访问时, 函数的 __get__ 方法返回的是绑定方法对象: 一种可调用的对象, 里面包装着函数, 并把托管实例(例如 obj) 绑定给函数的第一个参数(即 self) , 这与 functools.partial 函数的行为一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import collections


class Text(collections.UserString):
def __repr__(self):
return 'Text({!r})'.format(self.data)

def reverse(self):
return self[::-1]


word = Text('forward')
print(word) # forward
print(word.reverse()) # drawrof
print(Text.reverse(word)) # drawrof,在类上调用方法相当于调用函数
# <class 'function'> <class 'method'>
print(type(Text.reverse), type(word.reverse))
# 函数都是非覆盖型描述符。 在函数上调用 __get__ 方法时传入实例, 得到的是绑定到那个实例上的方法。
# <bound method Text.reverse of Text('forward')>
print(Text.reverse.__get__(word))
# 如果 instance 参数的值是 None, 那么得到的是函数本身
# <function Text.reverse at 0x000001F3A93DF0D0>
print(Text.reverse.__get__(None, Text))
# word.reverse 表达式其实会调用Text.reverse.__get__(word), 返回对应的绑定方法
# <bound method Text.reverse of Text('forward')>
print(word.reverse)
print(word.reverse.__self__) # forward,调用这个方法的实例引用。
print(word.reverse.__func__ is Text.reverse) # True,依附在托管类上那个原始函数的引用

绑定方法对象还有个 __call__ 方法, 用于处理真正的调用过程。 这个方法会调用 __func__ 属性引用的原始函数, 把函数的第一个参数设为绑定方法的 __self__ 属性。

描述符用法建议

  • 使用特性以保持简单

内置的 property 类创建的其实是覆盖型描述符, __set__ 方法和 __get__ 方法都实现了, 即便不定义设值方法也是如此。 特性的 __set__ 方法默认抛出 AttributeError: can’t set attribute,因此创建只读属性最简单的方式是使用特性, 这能避免下一条所述的问题。

  • 只读描述符必须有 __set__ 方法

如果使用描述符类实现只读属性, 要记住, __get____set__ 两个方法必须都定义, 否则, 实例的同名属性会遮盖描述符。 只读属性的 __set__ 方法只需抛出 AttributeError 异常, 并提供合适的错误消息。

  • 用于验证的描述符可以只有 __set__ 方法

对仅用于验证的描述符来说, __set__ 方法应该检查 value 参数获得的值, 如果有效, 使用描述符实例的名称为键, 直接在实例的 __dict__ 属性中设置。 这样, 从实例中读取同名属性的速度很快, 因为不用经过 __get__ 方法处理。

  • 仅有 __get__ 方法的描述符可以实现高效缓存

如果只编写了 __get__ 方法, 那么创建的是非覆盖型描述符。 这种描述符可用于执行某些耗费资源的计算, 然后为实例设置同名属性,缓存结果。 同名实例属性会遮盖描述符, 因此后续访问会直接从实例的 __dict__ 属性中获取值, 而不会再触发描述符的 __get__ 方法。

  • 非特殊的方法可以被实例属性遮盖

由于函数和方法只实现了 __get__ 方法, 它们不会处理同名实例属性的赋值操作。 因此, 像 my_obj.the_method = 7 这样简单赋值之后, 后续通过该实例访问 the_method 得到的是数字 7——但是不影响类或其他实例。 然而, 特殊方法不受这个问题的影响。 解释器只会在类中寻找特殊的方法, 也就是说, repr(x) 执行的其实是 x.__class__.__repr__(x), 因此 x 的 __repr__ 属性对 repr(x) 方法调用没有影响。 出于同样的原因, 实例的 __getattr__ 属性不会破坏常规的属性访问规则。

类元编程

类元编程是指在运行时创建或定制类的技艺。元类是类元编程最高级的工具: 使用元类可以创建具有某种特质的全新类种, 例如我们见过的抽象基类。

类工厂函数

1
2
3
4
5
class Dog:
def __init__(self, name, weight, owner):
self.name = name
self.weight = weight
self.owner = owner

避免编写上述样板代码,我们下面创建一个类工厂函数,即可变对象版本的collections.namedtuple。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def record_factory(cls_name, field_names):
try:
field_names = field_names.replace(',', ' ').split()
except AttributeError: # 不能调用.replace或.split方法
pass # 假定field_names本就是标识符组成的序列

# 构建元组,这将成为新建类的 __slots__ 属性;这么做还设定了拆包和字符串表示形式中各字段的顺序
field_names = tuple(field_names)

def __init__(self, *args, **kwargs):
attrs = dict(zip(self.__slots__, args))
attrs.update(kwargs)
for name, value in attrs.items():
setattr(self, name, value)

def __iter__(self):
for name in self.__slots__:
yield getattr(self, name)

def __repr__(self):
values = ', '.join('{}={!r}'.format(*i)
for i in zip(self.__slots__, self))
return '{}({})'.format(self.__class__.__name__, values)

cls_attrs = dict(__slots__=field_names,
__init__=__init__,
__iter__=__iter__,
__repr__=__repr__)

return type(cls_name, (object,), cls_attrs) # 调用 type 构造方法,构建新类,然后返回


Dog = record_factory('Dog', 'name weight owner')
rex = Dog('Rex', 30, 'Bob')
print(rex) # Dog(name='Rex', weight=30, owner='Bob')

type 的实例是类。

1
2
3
4
ty = type('MyClass', (object,), {'x': 1})
print(type(ty)) # <class 'type'>
obj = ty()
print(obj) # <__main__.MyClass object at ...>

相当于

1
2
3
4
class MyClass():
x = 1

print(type(MyClass)) # <class 'type'>

定制描述符的类装饰器

类装饰器与函数装饰器非常类似, 是参数为类对象的函数, 返回原来的类或修改后的类。

1
2
3
4
5
6
7
8
9
10
11
12
def entity(cls):  # 类装饰器的参数是一个类
for key, attr in cls.__dict__.items():
if isinstance(attr, Validated):
type_name = type(attr).__name__
attr.storage_name = '_{}#{}'.format(type_name, key)
return cls

@entity
class LineItem():
desc = NonBlank()
weight = Quantity()
price = Quantity()

类装饰器能以较简单的方式做到以前需要使用元类去做的事情——创建类时定制类。

类装饰器有个重大缺点: 只对直接依附的类有效。 这意味着, 被装饰的类的子类可能继承也可能不继承装饰器所做的改动,这个缺点在下面解决。

导入时和运行时比较

导入时和运行时比较

evaltime.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from evalsupport import deco_alpha

print('<[1]> evaltime module start')

class ClassOne():
print('<[2]> ClassOne body')

def __init__(self):
print('<[3]> ClassOne.__init__')

def __del__(self):
print('<[4]> ClassOne.__del__')

def method_x(self):
print('<[5]> ClassOne.method_x')

class ClassTwo(object):
print('<[6]> ClassTwo body')

@deco_alpha
class ClassThree():
print('<[7]> ClassThree body')

def method_y(self):
print('<[8]> ClassThree.method_y')

class ClassFour(ClassThree):
print('<[9]> ClassFour body')

def method_y(self):
print('<[10]> ClassFour.method_y')

if __name__ == '__main__':
print('<[11]> ClassOne tests', 30 * '.')
one = ClassOne()
one.method_x()
print('<[12]> ClassThree tests', 30 * '.')
three = ClassThree()
three.method_y()
print('<[13]> ClassFour tests', 30 * '.')
four = ClassFour()
four.method_y()

print('<[14]> evaltime module end')

evalsupport.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
print('<[100]> evalsupport module start')

def deco_alpha(cls):
print('<[200]> deco_alpha')

def inner_1(self):
print('<[300]> deco_alpha:inner_1')

cls.method_y = inner_1
return cls

class MetaAleph(type):
print('<[400]> MetaAleph body')

def __init__(cls, name, bases, dic): # cls(self):这是要初始化的类对象(例如 ClassFive)
print('<[500]> MetaAleph.__init__')

def inner_2(self): # self,是初始化类的实例
print('<[600]> MetaAleph.__init__:inner_2')

cls.method_z = inner_2

print('<[700]> evalsupport module end')

场景1:import evaltime

1
2
3
4
5
6
7
8
9
10
11
12
13
>>> import evaltime

# evalsupport 模块中的所有顶层代码在导入模块时运行; 解释器会编译 deco_alpha 函数, 但是不会执行定义体。
<[100] > evalsupport module start
<[400] > MetaAleph body # MetaAleph 类的定义体运行了
<[700] > evalsupport module end
<[1] > evaltime module start
<[2] > ClassOne body # 每个类的定义体都执行了
<[6] > ClassTwo body
<[7] > ClassThree body
<[200] > deco_alpha # 先计算被装饰的类 ClassThree 的定义体, 然后运行装饰器函数。
<[9] > ClassFour body
<[14] > evaltime module end # evaltime 模块是导入的, 因此不会运行 if __name__ == '__main__': 块

场景2:python3 evaltime.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
>>> py evaltime.py

<[100] > evalsupport module start
<[400] > MetaAleph body
<[700] > evalsupport module end
<[1] > evaltime module start
<[2] > ClassOne body
<[6] > ClassTwo body
<[7] > ClassThree body
<[200] > deco_alpha
<[9] > ClassFour body
<[11] > ClassOne tests ..............................
<[3] > ClassOne.__init__
<[5] > ClassOne.method_x
<[12] > ClassThree tests ..............................
<[300] > deco_alpha: inner_1
<[13] > ClassFour tests ..............................
<[10] > ClassFour.method_y # 类装饰器可能对子类没有影响
<[14] > evaltime module end
<[4] > ClassOne.__del__ # 程序结束时, 绑定在全局变量 one 上的 ClassOne 实例才会被垃圾回收程序回收。

元类基础知识

元类是制造类的工厂, 不过不是函数, 而是类。元类是用于构建类的类。

根据 Python 对象模型, 类是对象, 因此类肯定是另外某个类的实例。 默认情况下, Python 中的类是 type 类的实例。 也就是说, type 是大多数内置的类和用户定义的类的元类。

1
2
3
print('spam'.__class__)  # <class 'str'>
print(str.__class__) # <class 'type'>
print(type.__class__) # 为了避免无限回溯, type 是其自身的实例。

type和object的关系

type和object

ABCMeta和type的关系

ABCMeta和type

所有类都是 type 的实例, 但是元类还是 type 的子类, 因此可以作为制造类的工厂。 具体来说, 元类可以通过实现 __init__ 方法定制实例。 元类的 __init__ 方法可以做到类装饰器能做的任何事情, 但是作用更大。

元类计算时间的练习:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from evalsupport import deco_alpha
from evalsupport import MetaAleph

print('<[1]> evaltime module start')


@deco_alpha
class ClassThree():
print('<[2]> ClassThree body')

def method_y(self):
print('<[3]> ClassThree.method_y')


class ClassFour(ClassThree):
print('<[4]> ClassFour body')

def method_y(self):
print('<[5]> ClassFour.method_y')


class ClassFive(metaclass=MetaAleph):
print('<[6]> ClassFive body')

def __init__(self):
print('<[7]> ClassFive.__init__')

def method_z(self):
print('<[8]> ClassFive.method_z')


class ClassSix(ClassFive):
print('<[9]> ClassSix body')

def method_z(self):
print('<[10]> ClassSix.method_z')


if __name__ == '__main__':
print('<[11]> ClassThree tests', 30 * '.')
three = ClassThree()
three.method_y()
print('<[12]> ClassFour tests', 30 * '.')
four = ClassFour()
four.method_y()
print('<[13]> ClassFive tests', 30 * '.')
five = ClassFive()
five.method_z()
print('<[14]> ClassSix tests', 30 * '.')
six = ClassSix()
six.method_z()

print('<[15]> evaltime_meta module end')

场景3:import evaltime_meta

1
2
3
4
5
6
7
8
9
10
11
12
13
14
>>> import evaltime_meta

<[100] > evalsupport module start
<[400] > MetaAleph body
<[700] > evalsupport module end
<[1] > evaltime module start
<[2] > ClassThree body
<[200] > deco_alpha
<[4] > ClassFour body
<[6] > ClassFive body
<[500] > MetaAleph.__init__ #创建 ClassFive 时调用了 MetaAleph.__init__ 方法
<[9] > ClassSix body
<[500] > MetaAleph.__init__ #创建 ClassFive 的子类 ClassSix 时也调用了 MetaAleph.__init__ 方法
<[15] > evaltime_meta module end

场景4:py evaltime_meta.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
>>> py evaltime_meta.py

<[100] > evalsupport module start
<[400] > MetaAleph body
<[700] > evalsupport module end
<[1] > evaltime module start
<[2] > ClassThree body
<[200] > deco_alpha
<[4] > ClassFour body
<[6] > ClassFive body
<[500] > MetaAleph.__init__
<[9] > ClassSix body
<[500] > MetaAleph.__init__
<[11] > ClassThree tests ..............................
<[300] > deco_alpha: inner_1
<[12] > ClassFour tests ..............................
<[5] > ClassFour.method_y #没有直接依附装饰器的 ClassFour 类不受影响
<[13] > ClassFive tests ..............................
<[7] > ClassFive.__init__
<[600] > MetaAleph.__init__: inner_2
<[14] > ClassSix tests ..............................
<[7] > ClassFive.__init__
<[600] > MetaAleph.__init__: inner_2 #ClassSix 类没有直接引用 MetaAleph 类, 但是却受到了影响, 因为它是 ClassFive 的子类, 进而也是 MetaAleph 类的实例, 所以由 MetaAleph.__init__ 方法初始化。
<[15] > evaltime_meta module end

定制描述符的元类

替代 @entity 装饰器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class EntityMeta(type):
"""元类, 用于创建带有验证字段的业务实体"""

def __init__(cls, name, bases, attr_dict): # 在超类(在这里是 type) 上调用 __init__ 方法
super().__init__(name, bases, attr_dict)
for name, attr in attr_dict.items(): # 与 @entity 装饰器的逻辑一样
if isinstance(attr, Validated):
type_name = type(attr).__name__
attr.storage_name = '_{}#{}'.format(type_name, name)


# 这个类的存在只是为了用起来便利: 这个模块的用户直接继承Entity 类即可, 无需关心 EntityMeta 元类
class Entity(metaclass=EntityMeta):
"""带有验证字段的业务实体"""

import line_item as model

class LineItem(model.Entity):
desc = model.NonBlank()
weight = model.Quantity()
price = model.Quantity()

pass

元类的特殊方法 prepare

在某些应用中, 可能需要知道类的属性定义的顺序。
type 构造方法及元类的 __new____init__ 方法都会收到要计算的类的定义体, 形式是名称到属性的映射。 然而在默认情况下, 那个映射是字典; 也就是说,元类或类装饰器获得映射时, 属性在类定义体中的顺序已经丢失了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class EntityMeta(type):
"""元类, 用于创建带有验证字段的业务实体"""

@classmethod
def __prepare__(cls, name, bases):
return collections.OrderedDict()

def __init__(cls, name, bases, attr_dict):
super().__init__(name, bases, attr_dict)
cls._field_names = [] # 非私有属性
# 这里的 attr_dict 是那个OrderedDict 对象, 由解释器在调用 __init__ 方法之前调用 __prepare__ 方法时获得(prepare->new->init)
# 因此, 这个 for 循环会按照添加属性的顺序迭代属性
for key, attr in attr_dict.items():
if isinstance(attr, Validated):
type_name = type(attr).__name__
attr.storage_name = '_{}#{}'.format(type_name, key)
cls._field_names.append(key)


# 这个类的存在只是为了用起来便利: 这个模块的用户直接继承Entity 类即可, 无需关心 EntityMeta 元类
class Entity(metaclass=EntityMeta):
"""带有验证字段的业务实体"""
@classmethod
def field_names(cls):
for name in cls._field_names:
yield name

调用:

1
2
for name in LineItem.field_names():
print(name)

__prepare__:这个特殊方法只在元类中有用, 而且必须声明为类方法(即使用@classmethod 装饰器定义) 。 解释器调用元类的 __new__ 方法之前会先调用 __prepare__ 方法, 使用类定义体中的属性创建映射。 __prepare__ 方法的第一个参数是元类, 随后两个参数分别是要构建的类的名称和基类组成的元组, 返回值必须是映射。 元类构建新类时,__prepare__ 方法返回的映射会传给 __new__ 方法的最后一个参数, 然后再传给 __init__ 方法。

框架和库会使用元类协助程序员执行很多任务, 例如:

  • 验证属性
  • 一次把装饰器依附到多个方法上
  • 序列化对象或转换数据
  • 对象关系映射
  • 基于对象的持久存储
  • 动态转换使用其他语言编写的类结构

类作为对象

Python 数据模型为每个类定义了很多属性,除了 __mro____class____name__ 属性之外,还有:

cls.__bases__:由类的基类组成的元组

cls.__qualname__:其值是类或函数的限定名称, 即从模块的全局作用域到类的点分路径。例如:内部类ClassTwo 的 __qualname__ 属性, 其值是字符串 ‘ClassOne.ClassTwo’, 而 __name__ 属性的值是 ‘ClassTwo’

cls.__subclasses__():包含类的直接子类。 这个方法的实现使用弱引用, 防止在超类和子类(子类在 __bases__ 属性中储存指向超类的强引用) 之间出现循环引用。 这个方法返回的列表中是内存里现存的子类

cls.mro():超类元组

dir(…) 函数不会列出本节提到的任何一个属性。

欢迎打赏