本节是第五讲的第十五小节,本节主要介绍二进制文件处理数据以及自定义二进制文件等。
文件处理(File Handling)
航空器事故记录
Name Data Type Notes
report_id str Minimum length 8 and no whitespace
date datetime.date
airport str Nonempty and no newlines
aircraft_id str Nonempty and no newlines
aircraft_type str Nonempty and no newlines
pilot_percent_hours_on_type float Range 0.0 to 100.0
pilot_total_hours int Positive and nonzero
midair bool
narrative str Multiline
通过将同样的航空器事故数据集用于二进制、文本格式与XML等格式,可以比较不同数据格式的处理过程以及处理这些数据所需的代码。如图展示了用于读、写每种代码格式所需的代码行数以及总数。文件大小基于596个航空器事故记录中的一个特定样本,是个近似值*。同样的数据使用不同文件名并以压缩的二进制格式保存时,大小会有一些字节的差异,因为文件名也包含在压缩后的数据中,而文件名的长度是不同的。类似地,XML文件的大小也会有变化,因为有些XML写入器为文本数据内的引号使用实体("用于表示“,用于表示)而有些并不这样做。
Format Reader/Writer Reader + Writer Lines of Code Total Lines of Code Output File Size(-KB)
Binary Pickle (gzip compressed) 20 + 16= 36 160
Binary Pickle 20 + 16= 36 416
Binary Manual (gzip compressed) 60 + 34= 94 132
Binary Manual 60 + 34= 94 356
Plain text Regex reader/manual writer 39 + 28= 67 436
Plain text Manual 53 + 28= 81 436
XML Element tree 37 + 27= 64 460
XML DOM 44 + 36= 80 460
XML SAX reader/manual writer 55 + 37= 92 464
代码均取自同一个程序convert-incidents.py。该程序用于以某种格式读取航空器事故数据,之后以另一种格式写入。下面给出的是该程序的控制台帮助文本。
Usage: convert-incidents.py [options] infite outfile
Reads aircraft incident data from infile and writes the data to
outfile. The data formats used depend on the file extensions:
.aix is XML, .ait is text (UTF-8 encoding), .aib is binary,
.aip is pickle, and .html is HTML (only allowed for the outfile).
All formats are platform-independent.
Options:
-h, --help show this help message and exit
-f, --force write the outfile even if it exists [default: off]
-v, --verbose report results [default: off]
-r READER, --reader=READER
reader (XML): 'dom’, 'd', 'etree‘, 'sax', 's'
reader (text): 'manual‘, 'm’,'regex','r'
[default: etree for XML, manual for text]
-w WRITER, --writer=WRITER
writer (XML): 'dom', 'd', 'etree', 'e',
'manual', 'm [default: manual]
-z, --compress compress .aib/.aip outfile [default: off]
-t, --test execute doctests and exit (use with -v for verbose)
程序给出的选项要比通常情况下的需求复杂得多,因为终端用户一般并不会关心对哪种特定格式使用的是什么读取器或写入器。在一个更贴近实际的程序版本中,读取器或写入器选项将不会存在,并且我们对每种数据格式只会实现唯一的一种读取器或写入器。类似地,测试选项在这里的作用是为了便于对程序进行测试,而在实际的程序中将不复存在。
该程序定义了一个自定义异常:
class IncidentError(Exception): pass
航空器事故存放在Incident对象中,下面给出的是其class行以及初始化程序:
class Incident:
def __init__(self, report_id, date, airport, aircraft_id, aircraft_type, pilot_percent_hours_on_type, pilot_total_hours, midair, narrative=""):
assert len(report_id) >= 8 and len(report_id.split()) == 1, "invalid report ID"
self.__report_id = report_id
self.date = date
self.airport = airport
self.aircraft_id = aircraft_id
self.aircraft_type = aircraft_type
self.pilot_percent_hours_on_type = pilot_percent_hours_on_type
self.pilot_total_hours = pilot_total_hours
self.midair = midair
self.narrative = narrative
报告ID是在创建Incident类时进行验证的,并且作为只读的report_id特性形式存在。所有其他的数据属性都是读/写属性。比如,下面给出的是data这一特性的代码:
©property
def date(self):
return self.__date
@date.setter
def date(self, date):
assert isinstance(date, datetime.date),"invalid date"
self.__date = date
所有其他特性遵循相同的模式,不同之处仅在于使用不同的断言,因此我们在这里不一一赘述。由于我们使用了断言,因此,如果尝试使用无效数据创建Incident,或者试图将现存的某个事故记录的读/写特性设置为无效值,那么程序就会失败。我们选择使用这种强硬的方法,是因为我们需要确保保存或加载的数据总是有效的,如果无效,我们希望程序终止并给出相关信息,而不是带着错误继续运行。
事故集存放在Incidentcollection中,该类是dict的一个子类,因此,借助于继承机制,我们可以获取大量的功能,比如对项存取操作符[]的支持,并使用该操作符获取、 设置或删除其中的某个事故记录。下面给出的是该类的class行以及其中的一些方法。
class IncidentCollection(dict):
def values(self):
for report_id in self.keys():
yield self[report_id]
def items(self):
for report_id in self.keys():
yield (report_id, self[report_id])
def __iter__(self):
for report_id in sorted(super().keys()):
yield report_id
keys = __iter__
我们不需要重新实现初始化程序,因为dict.__init__()已足够。字典键为报告的ID, 字典值则为Incidents.我们重新实现了 values()、items()与keys()等方法,以便其迭代子以报告ID的顺序进行处理,之所以有这种效果,是因为values()方法与items()方法 以IncidentCollection.keys()返回的键进行迭代这一方法(实际上就是IncidentCollection.__iter__()的另外一个名称)本身则以基类的dict.keys()方法返回的有序键进行迭代。
此外,Incidentcollection类还包含export()方法与import_()方法。(我们在方法名的结尾使用下划线,以便与内置的import语句区分开。)export()方法可以接受的参数包括一个文件名,以及可选的写入器与压缩标志,该方法以文件名与写入器为基础, 并将所需要的文件处理工作传递给更专业化的方法,比如export_xml_dom()或 export_xml_etree()。 import_()方法可以接受的参数包括文件名以及一个可选的读取器, 其工作方式与export()方法类似。读取二进制格式数据的导入方法并不会被告知文件是 否进行了压缩——而是需要自行判断并根据不同情况进行正确处理。
二进制数据的读写(Writing and Reading Binary Data)
即便在没有进行压缩处理的情况下,二进制格式通常也是占据磁盘空间最小、保存与加载速度最快的数据格式。最简单的方法是使用pickles,尽管对二进制数据进行手动处理应该会生成最小的文件。
带可选压缩的Pickle(Pickles and Optional Compression)
Pickle提供了从Python程序中保存数据(或向Python程序加载数据)的最简单方法,但在上一节中曾经讲过,pickle没有安全机制(没有加密,也没有数字签名),因此,加载来自不可信源的pickle可能是危险的。之所以会产生安全问题, 是因为pickle可以导入任意模块并调用任意函数,因此,来自不可信源的pickle中的数据可能会被恶意操纵,比如,在加载pickle时使得解释器执行一些有害的行为。 尽管如此,pickle通常仍然是处理ad hoc数据的理想选择,针对个人用途的程序更是如此。
在创建文件格式时,将保存代码写在加载代码之前通常更容易,因此,我们首先看如何将事故数据保存到pickle中。
def export_pickle(self, filename, compress=False):
fh = None
try:
if compress:
fh = gzip.open(filename, "wb")
else:
fh = open(filename, "wb")
pickle.dump(self, fh, pickle.HIGHEST_PROTOCOL)
return True
except (EnvironmentError, pickle.PicklingError) as err:
print("{0}: export error: {1}".format(os.path.basename(sys.argv[0]), err))
return False
finally:
if fh is not None:
fh.close()
如果要求进行压缩,我们可以使用gzip模块的gzip.open()函数来打开文件,否则就使用内置的open()函数。在以二进制模式pickling数据时,我们必须使用“二进制 写"模式(“wb”)。(在 Python 3.0 中,pickle.HIGHEST_PROTOCOL 表示 protocol 3, 一种紧凑的二进制pickle格式。)
对于错误处理,我们选择的方式是只要发生错误就立即向用户报告,并向调用者返回一个布尔型值,以表明是成功还是失败。我们还使用了 finally语句块,以确保文件被关闭,而不管是否有错误发生。这段代码与前一章中看到的代码非常类似,但还是有比较微妙的一点需要注意。 pickle数据是self, —个dict,但字典的值是Incident对象,也就是说,属于自定义类的一个对象。pickle模块具有足够的自适应能力保存大多数自定义类的对象,而不需要人工干预。
通常,布尔型、数值型以及字符串都可以pickled,类(包括自定义类)的实例也可以pickled,前提是其私有的__dict__是picklable。此外,还有任意内置的组合类型 (元组、列表、递归结构等)。并且,对自定义类中通常不能被pickled的其他类型的对象或实例(比如,因为其包含一个nonpicklable属性)进行pickle也是可能的,这或者通过给pickle模块一些帮助,或者通过实现自定义的pickle与unpickle函数来完成。 所有相关的详细资料在pickle模块的在线文档中都有提供。
要读回pickled数据,我们需要区分开压缩的与未压缩的pickle。使用gzip压缩的任意文件都以一个特定的魔数引导,魔数(magic number)是一个或多个字节组成的序列,位于文件的起始处,用于指明文件的类型。对gzip文件,其魔数为两个字节的0xlF 0x8B,并存放在一个bytes变量中:
CZIP_MACIC = b“\x1F\x8B"
下面给出的是用于读入事故pickles文件的代码:
def import_pickle(self, filename):
fh = None
try:
fh = open(filename, "rb")
magic = fh.read(len(GZIP_MAGIC))
if magic == GZIP_MAGIC:
fh.close()
fh = gzip.open(filename, "rb")
else:
fh.seek(0)
self.clear()
self.update(pickle.load(fh))
return True
except (EnvironmentError, pickle.UnpicklingError) as err:
print("{0}: import error: {1}".format(os.path.basename(sys.argv[0]), err))
return False
finally:
if fh is not None:
fh.close()
我们并不知道给定的文件是否进行了压缩,但无论压缩与否,都以“二进制读”的模式打开文件。之后读入其头两个字节,如果这两个字节与gzip魔数相同,就关闭该文件,并使用gzip.open()函数重新创建一个文件对象。如果该文件没有进行压缩,就使用open()返回的文件对象,并调用其seek()方法将文件指针重置到文件起始处,以便下一步对文件数据的读取操作(在pickle.load()函数内部进行)可以从文件起始处开始。
我们不能对self赋值,因为这会擦除使用中的IncidentCollection对象,因此,我们的方法是清除所有事故,使得字典变空,之后借助dict.update()方法,并使用从pickle 加载的IncidentCollection字典中的所有事故来生成字典。
要注意的是,处理器类型是big-endian还是little-endian没有实际影响,因为对魔数,我们读入的只是单独的字节,而对数据,pickle模块会自动处理字节序。
带可选压缩的原始二进制数据(Raw Binary Data With Optional Compression)
如果编写自己的代码来处理原始二进制数据,就可以对文件格式施加完全的控制, 这应该比使用pickle更具安全性,因为恶意的无效数据将由我们自己的代码控制,而不是由解释器执行。创建自定义的二进制文件时,创建一个用于标识文件类型的魔数以及用于标识文件格式版本的版本号是有意义的,下面给出的是convert-incidents.py程序中使用的定义:
MAGIC = b"AIB\x00"
FORMAT_VERSION = b"\x00\x01"s
我们使用4个字节表示魔数,使用两个字节表示版本号。字节序不是问题,因为数据是以单独的字节形式写入的,而不是以整数的字节表示形式写入的,因此,在任何处理器体系结构上都是一致的。
要读写原始二进制数据,我们必须有一些方法,以便实现Python对象与适当的二进制表示形式之间的转换。我们所需要的大部分功能都是由struct模块以及bytes与 bytearray这两种数据类型提供的。
遗憾的是,struct模块只能处理指定长度的字符串,而我们需要可变长度的字符串, 以便表示报告与航空器ID,以及机场、航空器类型、描述性的文本信息等对象。为满足这些需求,我们创建了一个名为pack_string()的函数,该函数以一个字符串为参数, 并返回一个bytes对象,其中包含两个部分:第1部分是一个整数型的长度计数,第2 部分则是长度计数UTF-8编码字节(表示字符串的文本)序列。
由于只有在export_binary()函数内部才需要使用pack_string()函数,我们将 pack_string()函数定义在export_binary()函数之内,这意味着,在export_binary()函数之外,pack_string()函数是不可见的,也即该函数仅仅是一个本地的帮助者函数。下面给出的是export_binary()函数的起点,以及完全嵌套在其中的pack_string()函数:
def export_binary(self, filename, compress=False):
def pack_string(string):
data = string.encode("utf8")
format = "<H{0}s".format(len(data))
return struct.pack(format, len(data), data)
bytes与bytearray数据类型
Python提供了两种数据类型用于处理原始字节:固定的数据类型bytes,可变的数据类型bytearray。这两种数据类型都用于存放0个或多个8位的无符号整数(字节),每个字节所代表的值范围在0到255之间。这两种数据类型与字符串都非常类似,并提供了很多同样的方法,包括对数据分片的支持等。此外,bytearrays还提供了一些变异的、类似于列表的方法。尽管bytes或bytearray数据类型的分片返回的是同样类型的对象,但使用项存取操作符[]存取单独的字节时返回的却是整数——指定的字节所代表的值。比如:
word = b"Animal"
x = b"A"
word[0] ==x # returns: False # word[0]== 65; x == b”A”
word[:1] ==x # returns: True # word[:1]== b"A"; x == b”A"
word[0]==x[0] # returns: True # word[0]== 65; x[0]==65
#下面给出其他一些bytes与bytearray数据类型的实例:
data = b"5 Hills \x35\x20\x48\x69\x6C\x6C\x73"
data.upper() # returns: b‘5 HILLS 5 HILLS'
data.replace(b”ill“,b”at") # returns: b'5 Hats 5 Hats'
bytes.fromhex(“35 20 48 69 6C 6C 73") # returns: b'5 Hills'
bytes.fromhex("352048696C6C73") # returns: b'5 Hills'
data = bytearray(data) # data is now a bytearray
data.pop(10) # returns: 72 (ord("H"))
data.insert(10, ord("B")) # data == b'5 Hills 5 Bills'
只对字符串有意义的方法,比如bytes.upper(),假定字节是使用ASCII进行编码的。bytes.fromhex()类方法忽略空格,并将每一个包含两个数字的子字符串解释为十六进制数,因此,“35”被认为是一个字节,其值为0x35,其他实例依此类推。
str.encode()方法返回一个bytes对象,并根据制定的编码格式对字符串进行编码。UTF-8是一种非常便利的编码格式,因为这种编码可以表示任意的Unicode字符,并且在表示ASCII字符时尤其紧凑(每个字节表示一个)。变量format被设置为存放一个struct格式(基于字符串的长度),比如,给定字符串"en.wikipedia.org”,则格式应该为"<H16s" (little-endian字节顺序,2字节的无符号整数,16字节的byte字符串),返回的bytes对象则应该为b'\x10\x00en.wikipedia.org'。便利的是,Python会尽可能地以紧凑格式来表示bytes对象,主要是使用可打印的ASCII字符,否则就使用十六进制转义字符(以及一些特殊的转义字符,比如\t与\n)。
语法 bytes 与 bytearray 方法
ba.append(i) 将整数i(取值范围0到255)附加到bytearray ba中
b.capitalize() 返回bytes/bytearray b的副本,并且第一个字符变为大写(如果是一个ASCII字符)
b.center(width, byte) 返回b的副本,b在长度为width的区域中间,并使用空格或给定的byte (可选的) 进行填充
b.count(x, start, end) 返回 bytes/bytearray x 在 bytes/bytearray b 中(或 b 的 start:end 分片中)出现的次数
b.decode(encoding, error) 返回一个str对象,代表使用UTF-8编码表示的(或使用指定的encoding表示并根据可选的error参数进行错误处理)字节
b.endswith(x, start, end) 如果b (或b的start:end分片)以bytes/bytearray x或使用元组x中的任意 bytes/bytearrays结尾,就返回True,否则返回False
b.expandtabs(size) 返回bytes/bytearray b的副本,并且其中的制表符使用空格(个数为8的倍数,或指定的size)替代
ba.extend(seq) 使用序列seq中的所有ints对bytearray ba进行扩展,所有ints必须在0到255之间
b.find(x, start, end) 返回bytes/bytearray x在b (或b的startxnd分片)中最左边的位置,如果没有找到, 就返回-1。使用rfind()方法可以找到最右边的位置
b.fromhex(h) 返回一个bytes对象,其字节对应的是str h中的十六进制整数
b.index(x, start, end) 返回x在b (或b的start:end分片)中最左边的位置,如果没找到,就产生valueError 异常。使用rindex()方法可以找到最右边的位置
ba.insert(p, i) 将整数i (取值范围0到255)插入到ba中的位置p处
b.isalnum() 如果bytes/bytearray b非空,并且b中的每个字符都是ASCII字母数字字符,就返回 True
b.isalpha() 如果bytes/bytearray b非空,并且b中的每个字符都是ASCH字母字符,就返回True
b.isdigit() 如果bytes/bytearray b非空,并且b中的每个字符都是ASCII数字,就返回True
b.islower() 如果bytes/bytearray b包含至少一个可小写的ASCII字符,并且其所有可小写的字符都是小写的,就返回True
b.isspace() 如果bytes/bytearray b非空,并且b中的每个字符都是ASCII空格字符,就返回True
b.istitle() 如果b是非空并且首字母大写的,就返回True
b.isupper() 如果bytes/bytearray b包含至少一个可大写的ASCII字符,并且其所有可大写的字符都是小写的,就返回True
b.join(seq) 返回序列seq中每个bytes/bytearray进行连接后所得的结果,并在每两个之间添加一 个b (可以为空)
b.ljust(width, byte) 返回bytes/bytearray b的副本,并且要求左对齐,长度为width,使用空格或给定的byte(可选的)进行填充。使用rjust()方法可以右对齐
b.lower() 返回bytes/bytearray b的副本,其中的ASCII字符都为小写
b.partition(sep) 返回一个元组,其中包含3个bytes对象一包括b的最左边bytes/bytearray sep之前 的那部分、sep本身以及b中sep之后的那部分;如果b中不包含sep,就返回b以及 两个为空的bytes对象。使用rpartitionO方法可以在sep的最右边出现处进行分割
ba.pop(p) 移除并返回ba中索引位置p处的整数
ba.remove(i) 从bytearray ba中移除整数i的首次出现
b.replace(x,y, n) 返回b的一个副本,其中bytes/bytearray x的每个(或最多n个,如果给定)出现都 用y进行替代
ba.reverse() 反转bytearray ba的字节
b.split(x, n) 返回一个字节列表,在x处进行分割(至多n次),如果没有给定n,就在可能的地方都进行分割;如果没有给定x,就在空白字符处进行分割。使用rsplit()可以从右边开始进行分割
b.splitlines(f) 返回对b进行分割(在行终结符处)后产生的行列表,如果f不为True,就剥离掉行终结符
b.startswith(x, start,end) 如果 bytes/bytearray b (或 b 的 start:end 分片)以 bytes/bytearray x (或元组 x 中的任意bytes/bytearrays)引导,就返回True,否则返回False
b.strip(x) 返回b的副本,并剥离掉开始与结尾处的空白字符(或bytes/bytearray x中的字节), Istrip()只剥离起始处的,rstrip()只剥离结尾处的
b.swapcase() 返回b的副本,并使其中的大写字符变为小写,小写字符变为大写
b.title() 返回b的副本,其中每个字的第一个ASCII字符都是大写的,其他所有ASCII字符 则都是小写的
b.translate(bt,d) 返回b的一个副本,其中不包括来自d的字节,并且每个字节都被bytes bt的相应字节替换
b.upper() 返回bytes/bytearray b的副本,其中ASCII字符都变为大写
b.zfill(w) 返回b的副本,如果长度小于w,就使用引导字符(0x30)进行填充,使其长度为w
struct模块
提供了 struct.pack()、struct.unpack()以及其他一些函数,还提供了 struct.Struct()类。struct.pack()函数以一个struct格式化字符串以及一个或多个值为参数,并返回一个bytes对象,其中存放的是按照该格式规范表示的所有这些参数值。 struct.unpack()函数以一个格式规范以及一个bytes或bytearray对象为参数,并返回一个元组,其中的值原本使用该格式规范进行了打包,比如:
data = struct.pack("<2h", 11,-9) # data == b'\x0b\x00\xf7\xff‘
items = struct.unpack("<2h", data) # items ==(11, -9)
格式化字符串包含一个或多个字符,大多数字符都表示某种特定类型的值。如果对某种类型需要不止一个值,我们或者将该字符多写几次(次数与需要该类型值的位置数相同),比如"hh”,或者使用一个计数值来引导该字符,比如我们这里所做的("2h")。
很多格式化字符都在struct模块的在线文档中进行了描述,包括“b” ( 8位的有符号整数)、“B”(8位的无符号整数)、“h”(16位的有符号整数——这里的实例中使用了这一格式化字符)、“H”(16位的无符号整数)、“i”(32位的有符号整数)、 “I”(32位的无符号整数)、“q”(64位的有符号整数)、“Q”(64位的无符号整数)、 “f (32位浮点数)、“d” ( 64位浮点数,等价于Python的float类型)、"?”(布尔型)、 “s”(bytes或bytearray对象一字节字符串),还有很多其他的格式化字符。
对有些数据类型,比如多字节整数,处理器的字节序会影响字节顺序,我们可以强制使用某种特定的字节顺序,而不管处理器本身的体系结构,这是通过使用 endianness字符引导格式化字符串来实现的。在本书中,我们总是使用“<”,表示的是little-endian字节顺序,这也是广泛使用的Intel、AMD等处理器采用的本原字节序。big-endian (也称为网络字节顺序)则使用“>”(或"!”)进行标识。如果没 有显式地指定字节序,就使用该机器本身的字节序。我们建议总是使用字节序,即便与机器本身的字节序相同,因为这样做可以保证数据是可移植的。
struct.calcsize() 函数以一个格式规范为参数,并返回使用该格式规范的struct所占据的字节数。格式规范也可以通过创建一个struct.Struct()对象存储(将该格式规范作为其参数),而struct.Struct()对象的大小则由其size属性指定。比如:
TWO_SHORTS = struct.Struct("<2h")
data =TWO_SHORTS.pack(11, -9) # data == b'\x0b\x00\xf7\xff'
items = TWO_SHORTS.unpack(data) # items ==(11, -9)
在两个实例中,11都是0x000b,但被转换为字节0x0b 0x00,因为我们使用的是little-endian字节顺序。
pack_string()函数可以处理至多包含65 535个UTF-8字符的字符串。我们可以很容易地使用不同类型的整数来表示字节计数,比如,4字节的有符号整数(格式为“i”), 表示字符串至多可以包含231-1 (多于20亿)个字符。
struct模块提供了一种类似的内置格式,即“p”,该格式将单个字节作为字符存放, 其后跟随至多255个字符。出于打包的需要,使用格式“p”的代码会比完全手动完成要简单一些,但“p”同时也有一个至多使用255个UTF-8字符的限制,并且在拆分时几乎没有提供任何优势。(出于比较的需要,使用“p”的pack_string()函数与 unpack_string()函数在 convert-incidents.py 源文件中提供。)现在,.我们将注意力转移到export_binary()方法的余下代码:
我们没有给出except代码块与finally代码块,因为使用的代码与上一小节中给出的基本上是相同的,不同之处就是except代码块捕获的特定异常有所差别。
我们从以“二进制写”模式打开文件开始,文件或者是通常的文件,或者是gzip压缩后的文件,依赖于compress标记。之后写入4字节的魔数(期望该数值对本程序是独 一无二的)与2字节的版本号*。使用版本号的好处是将来改变格式时更加容易——读入版本号时,我们使用该值确定使用哪些代码来进行读取操作。
fh = None
try:
if compress:
fh = gzip.open(filename, "wb")
else:
fh = open(filename, "wb")
fh.write(MAGIC)
fh.write(FORMAT_VERSION)
for incident in self.values():
data = bytearray()
data.extend(pack_string(incident.report_id))
data.extend(pack_string(incident.airport))
data.extend(pack_string(incident.aircraft_id))
data.extend(pack_string(incident.aircraft_type))
data.extend(pack_string(incident.narrative.strip()))
data.extend(NumbersStruct.pack(
incident.date.toordinal(),
incident.pilot_percent_hours_on_type,
incident.pilot_total_hours,
incident.midair))
fh.write(data)
return True
接下来,我们在所有事故记录上进行迭代,对每一条事故记录,我们都创建一个 bytearray。我们将数据的每一项都添加到字节数组,从可变长度的字符串开始。 date.toordinal()方法会返回一个单一的整数,表示的是存储的日期,通过将这一整数传递给datetime.date.fromordinal()方法,可以恢复日期数据。NumbersStruct则是在程序前面使用如下语句定义的:
NumbersStruct = struct.Struct("<ldi?")
这一格式指定了 little-endian字节顺序、一个无符号32位整数(用于表示日期序数)、一个64位float (用于表示该类型飞行时间所占百分比)、一个32位整数(用于表示总飞行时间)以及一个布尔型值(用于表示该事故是否是空中发生)。图7-3中展示了整个航空器事故记录的结构。
bytearray包含了某条事故记录的所有数据后,我们将其写入到磁盘中,所有事故都写入到磁盘后,返回True (假定没有错误发生)。finally语句块可以确保文件恰好在返回之前关闭。数据的读回不像写入那么直接——首先,我们需要更多的错误检査操作。并且, 读回可变长度的字符串也是棘手的。下面给出的是import_binary()方法的起点,以及完整的unpack_string()函数,该函数用于读回可变长度的字符串:
def import_binary(self, filename):
def unpack_string(fh, eof_is_error=True):
uint16 = struct.Struct("<H")
length_data = fh.read(uint16.size)
if not length_data:
if eof_is_error:
raise ValueError("missing or corrupt string size")
return None
length = uint16.unpack(length_data)[0]
if length == 0:
return ""
data = fh.read(length)
if not data or len(data) != length:
raise ValueError("missing or corrupt string")
format = "<{0}s".format(length)
return struct.unpack(format, data)[0].decode("utf8")
每条事故记录都以报告ID这一字符串开始,尝试读取该字符串并成功读取时, 文件指针将处在新记录的起始处,读取失败,文件指针则在文件末尾处,并可以结束。 在尝试读取报告ID时,我们将eof_is_error标记设置为False,因为如果没有多余的数据,那么这种设置仅仅意味着工作巳经完成。对所有其他字符串,其默认值则为True, 因为如果任意其他字符串不再包含数据,就说明是一个错误。(即便一个空字符串也是 由一个16位的无符号整数长度引导的。我们从尝试读取字符串的长度开始,如果失败,就返回None,以便表示已经到了文件末尾(如果我们读取的是一条新事故记录),或者产生ValueError异常,以便表示数据损坏或丢失。struct.unpack()函数与struct.Struct.unpack()方法总是返回一个元组, 即便其中只包含一个单一的值。我们拆分出长度数据,并将其代表的数字存放在变量 length中。现在,我们已经知道,需要读取多少个字节才可以完整地读回该字符串。
th = None
try:
fh = open(filename, "rb")
magic = fh.read(len(GZIP_MAGIC))
if magic == GZIP_MAGIC:
fh.close()
fh = gzip.open(filename, "rb")
else:
fh.seek(0)
magic = fh.read(len(MAGIC))
if magic != MAGIC:
raise ValueError("invalid .aib file format")
version = fh.read(len(FORMAT_VERSION))
if version > FORMAT_VERSION:
raise ValueError("unrecognized .aib file version")
self.clear()
如果长度为0,就只是简单地返回一个空字符串。如果长度不为0,就尝试读取指定数 量的字节数。如果没有获取任何数据,或数据长度不是我们所期望的长度值(也就是说太少),就产生ValueError异常。
如果读取了正确数量的字节,就为struct.unpack()函数创建一个适当的格式化字符串,并返回一个字符串——此字符串从拆分数据并将字节解码为UTF-8得来。(理论上,对上面给出的代码,我们可以使用语句return data.decode("utf8”)来替代其最后两行,但我们更愿意经历拆分过程,因为“s”格式对我们的数据(在读回时必须反转) 执行一些转换是可能的——尽管不太可能。)
现在我们来查看import_binary()方法的其余部分,为便于说明,将其分为两部分讲解。
文件可以是压缩的,也可以是未压缩的,因此,我们使用了与读取pickle时同样的技术,也即使用gzip.open()函数或内置的open()函数来打开文件。
打开文件并且文件指针位于文件起始处时,我们首先读入头4个字节(len(MAGIC))。 如果与我们的魔数值不匹配,就可以判断不是一个二进制航空器事故数据文件,并产生一 个ValueError异常。接下来要读入的是2字节的版本号,并根据不同的版本号使用不同的读入代码,不过这里只是检查版本号是不是本程序不能读取的后续版本。
如果魔数是对的,版本号也是我们可以进行处理的,就可以开始读入数据,因此, 我们从清除所有现存事故记录开始,以便将字典清空。
while True:
report_id = unpack_string(fh, False)
if report_id is None:
break
data = {}
data["report_id"] = report_id
for name in ("airport", "aircraft_id", "aircraft_type", "narrative"):
data[name] = unpack_string(fh)
other_data = fh.read(NumbersStruct.size)
numbers = NumbersStruct.unpack(other_data)
data["date“] = datetime.date.fromordinal(numbers[0])
data["pilot_percent_hours_on_type"] = numbers[l]
data["pilot_total_hours"] = numbers[2]
data["midair"] = numbers[3]
incident = lncident(**data)
self[incident.report_id] = incident
return True
while语句块一直运行,直至已超出数据范围。我们从尝试获取报告ID开始,如 果返回的是None,就说明已经到达文件末尾,此时可以跳出循环。如果尚未到达文件 末尾,就创建一个名为data的字典来存放某个事故记录的数据,并尝试获取该事故记 录的其余数据。对字符串,我们使用unpack_string()方法,对其他数据,则使用 NumbersStruct结构一次读入。由于我们将日期存储为顺序的,因此,如果要读回日期数据,就必须对其进行反向转换。但对于其他数据项,我们可以只是使用拆分后的数据——而不需要进行验证或转换,因为首先我们存储时就使用正确的数据类型,在读回同样的数据时也只需要使用NumbersStruct结构中存放的格式。
如果有任何错误发生,比如没有成功地拆分所有数字,就产生异常,并由except 语句块进行处理。(我们没有展示except语句块与finally语句块,因为这两个语句块 在结构上与上一小节中import_pickle()方法的相应语句块是相同的。)
最后,我们使用方便的映射拆分语法来创建一个Incident对象,之后将其存放在 incidents 字典中。
除了对可变长度字符串的处理功能外,struct模块也为以二进制格式保存与加载数据提供了很多便利。对可变长度字符串,这里展示的pack_string()方法与unpack_string() 方法可以满足大多数需求。
以上内容部分摘自视频课程05后端编程Python15文件处理(上),更多实操示例请参照视频讲解。跟着张员外讲编程,学习更轻松,不花钱还能学习真本领。