前言 Apache Arrow是一个用于内存分析的跨语言开发平台。它定义了一种标准的、语言无关的列式内存数据格式。 这种格式支持平整的和嵌套的数据结构。它还提供了一些计算库,零拷贝流式消息和内部进程通信。 Arrow的主要用处可以是大数据的快速移动和处理。由于是开发平台,Arrow包含了许多组件:
Arrow列式内存格式:一个标准和高效的内存表示。可用于平的和嵌套的数据结构,做到了语言无关。
Arrow IPC格式:一种高效的序列化格式,并且带有元信息,可用于进程和异构环境间的通信
Arrow Flight RPC协议:基于Arrow IPC格式,用于远程服务交换arrow数据与应用定义的语义数据
C++, C, C#, Go, Python, Matlib, Java等等库
Grandiva: 一个LLVM编译器
Plasma对象存储:一个共享内存blob存储
本文主要展示一些python的实践案例和源码解读, 希望能够总结以期有进一步了解Arrow.
Dive into 下面让我们去看一看pyarrow的源代码(1.0.0)
pyarrow项目目录 ![](/images/screenshots/Screen Shot 2020-08-28 at 3.53.07 PM.png)
pip安装的pyarrow少了一些cython编写的pyx代码,这些文件被编译成pxd或so后可以被py代码import, 比如from pyarrow.lib import (ChunkedArray, RecordBatch, Table)
是从lib.so中导入的。
pyarrow._init_ .py源码解读 首先导入版本号,如果不是通过包安装,那么版本通过解析git describe
确定版本。 接着导入cython的pyarrow.lib库,由于Cython有个bug(https://github.com/cython/cython/issues/3603 ), 这里暂时关掉gc。 然后有一个show_versions
的函数可以查看c++版本信息:
1 2 3 4 5 6 7 8 9 >>> pa.show_versions() pyarrow version info -------------------- Package kind: manylinux2010 Arrow C++ library version: 1.0.0 Arrow C++ compiler: GNU 8.3.1 Arrow C++ compiler flags: -fdiagnostics-color=always -O3 -DNDEBUG Arrow C++ git revision: b0d623957db820de4f1ff0a5ebd3e888194a48f0 Arrow C++ git description: apache-arrow-0.16.0-1340-gb0d623957
然后导入Cython定义的各种类型, 导入buffer和IO相关。关于Pyarrow的memory和IO, 下面会介绍。 导入异常, 导入序列化相关,到这lib模块导入完毕。 然后从hdfs.py,ipc.py, filesystem.py, serialization.py,types.py导入相关模块, 定义启动plasma server入口函数和一些其他的包工具函数。
pyarrow的内存和IO管理 本节主要总结pyarrow的内存管理和IO管理,涉及buffer, memory pool和file-like/stream-like对象
访问和分配内存 在pyarrow.__init__.py
可以看到代码的引入:
1 2 3 4 5 6 7 8 9 from pyarrow.lib import (Buffer, ResizableBuffer, foreign_buffer, py_buffer, Codec, compress, decompress, allocate_buffer) from pyarrow.lib import (MemoryPool, LoggingMemoryPool, ProxyMemoryPool, total_allocated_bytes, set_memory_pool, default_memory_pool, logging_memory_pool, proxy_memory_pool, log_memory_allocations, jemalloc_set_decay_ms)
pyarrow.Buffer Buffer
对象是C++代码arrow::Buffer
的封装,作为基础工具管理C++中的arrow内存。一个buffer代表一段连续的内存空间。 大部分buffer拥有他们各自的内存,但是也有例外。Buffer对象可以允许高级array类安全地和属于或不属于他们的内存交互。arrow::Buffer
允许一个buffer访问另一个buffer通过zero-copy, 同时保持内存的生命周期和清晰的父子关系。arrow::Buffer
有很多种实现,但是对外接口是一致的:一个数据指针和长度。有点类似python自带的buffer和memoryview
对象
1 2 3 4 5 6 7 8 9 10 11 >>> import pyarrow as pa>>> data=b'aaaaaaaaaaaaaaaaaaaaaa' >>> buf = pa.py_buffer(data) >>> buf<pyarrow.lib.Buffer object at 0x7fabc0e05d30 > >>> buf.size22 >>> buf.to_pybytes() b'aaaaaaaaaaaaaaaaaaaaaa'
外部的内存,只要有指针和size,保持接口一致,也可以通过foreign_buffer()
来访问。 在创建buffer之后,可以通过memoryview或python buffer装换,这种转化是zero-copy.
Memory Pools 所有内存分配和释放(malloc/free)都可通过arrow::MemoryPool
来追踪。代码在memory.pxi
1 2 3 4 5 6 7 8 9 10 11 12 >>> import pyarrow as pa>>> pa.total_allocated_bytes()0 >>> buf = pa.allocate_buffer(1024 ,resizable=True )>>> pa.total_allocated_bytes()1024 >>> buf.resize(2048 )>>> pa.total_allocated_bytes()2048 >>> buf=None >>> pa.total_allocated_bytes()0
输入输出 Arrow C++库有几个抽象接口用于不同IO类型:
只读流
随机可访问只读文件
只写流
随机可访问只写文件
可读可写可随机访问文件
在pyarrow.__init__.py
可以看到代码的引入:
1 2 3 4 5 6 7 8 9 10 11 12 from pyarrow.lib import (HdfsFile, NativeFile, PythonFile, BufferedInputStream, BufferedOutputStream, CompressedInputStream, CompressedOutputStream, TransformInputStream, transcoding_input_stream, FixedSizeBufferWriter, BufferReader, BufferOutputStream, OSFile, MemoryMappedFile, memory_map, create_memory_map, have_libhdfs, MockOutputStream, input_stream, output_stream) from pyarrow.lib import (ChunkedArray, RecordBatch, Table, table, concat_arrays, concat_tables)
为了能够和python自带file
对象行为一致,arrow定义了NativeFile
(其实是个stream).代码在io.pxi NativeFile
是所有arrow流的基类,arrow流可以是可读,可写,也可以支持seek
.NativeFile
暴露的方法用于读写python的数据对象,然后把他们变成stream传递给其他arrow工具,比如Arrow IPC. cython代码中定义了好几种NativeFile子类:
OSFile, 使用操作系统的描述符
MemoryMappedFile, 使用memory maps做zero-copy读和写。
BufferReader, 把对象转成arrow buffer使用Zero-copy reader
BufferOutputStream, 内存中写数据,最后生成buffer
FixedSizeBufferWriter, 再一句生成的buffer中写数据
HdfsFile, hadoop生态读写数据
PythonFile, 在C++中交互python文件对象,可以对python文件对象使用c++的方法,但是可能有GIL的限制。
CompressedInputStream and CompressedOutputStream, 从流中压缩和解压数据。
高级API input streams input_streams()
函数可以从各种输入创建可读NativeFile
1 2 3 4 5 6 7 8 >>> buf = memoryview (b'some data' )>>> stream = pa.input_stream(buf)>>> stream.read(4 )b'some' >>> stream.read()b' data' >>> stream.read()b''
output streams 同理,output_stream
把stream写成文件。
1 2 3 4 5 6 7 >>> with pa.output_stream('example1.dat' ) as stream:... stream.write(b'some data' )... 9 >>> f = open ('example1.dat' , 'rb' )>>> f.read()b'some data'
OSFile和Memory Mapped Files 对于在磁盘上的文件读写,pyarrow提供标准系统级别的文件api和memory-mapped文件。memory-mapped是在用户态创建虚拟空间来映射磁盘上的内容。 通过对這段虚拟内存的讀取和修改, 实现对文件的讀取和修改。使用虚拟内存映射进行文件读写有几个好处:
可以不用读取整个文件进入物理内存,文件已经在虚拟内存中
可以用对内存的操作命令来操作文件,更加高效
由于实际上这个mapped文件还是文件,与进程无关,所以这段虚拟内存可以共享给多个进程。
1 2 3 4 5 6 7 8 >>> mmap = pa.memory_map('example1.dat' )>>> mmap.read()b'some data' >>> mmap.seek(5 )5 >>> buf=mmap.read_buffer(4 ) >>> buf.to_pybytes()b'data'
内存中buffer读写 1 2 3 4 5 6 7 8 >>> writer = pa.BufferOutputStream()>>> writer.write(b'hello, friends' )>>> buf = writer.getvalue()>>> reader = pa.BufferReader(buf)>>> reader.seek(7 )>>> reader.read(7 )b'friends'
plasma plasma是arrow的一个共享对象存储,plasma只能用在单机上,客户端和服务端使用unix domain socket通信。plasma中的对象是不可变的。
pyarrow.plasma源码 这个文件一上来要导入TensorFlow相关库,暂时跳过。 主要功能函数式,用来启动plasma server.
1 2 3 4 5 6 7 8 9 10 def start_plasma_store(plasma_store_memory, use_valgrind=False, use_profiler=False, plasma_directory=None, use_hugepages=False, external_store=None): # plasma_store_memory定义存储大小 # use_valgrind定义是否使用valgrind和use_profiler互斥 # use_profiler定义是否测试性能 # plasma_directory定义mmap文件位置 # use_hugepages是否使用大文件存储,需要划分文件格式 # external_store溢出的对象存储到外部位置,由于plasma超过预设空间时候会溢出对象。
这个函数会默认创建/tmp/test_plasma-plasma.sock
用于客户端sock连接, 然后就是普通的shell命令行选取参数启动server.
使用plasma共享pandas dataframe 由于arrow支持平整或嵌套的数据结构,尤其适合pandas dataframe或者numpy(使用tensor), 然后我们可以把arrow格式的数据持久化到plasma用于共享对象,实现数据的高效读取。 核心代码就三步,1.创建plasma对象,2.存入转为recordbatch的dataframe, 3.读取plasma的dataframe对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 class PlasmaClient : def __init__ (self, location="plasma" , *args, **kwargs ): self.client = plasma.connect(location) def get_object_id (self, name ): name = self.pad_str_with_20char(name) id = plasma.ObjectID(name) return id def pad_str_with_20char (self, name ): if len (name) < 20 : name = f"{name:<20 } " else : name = name[:20 ] return name.encode() def decode_hex_bytes (self, hex ): return bytes .fromhex(hex ).decode() def get_object (self, object_id ): obj = self.client.get(object_id) return obj def set_object (self, obj ): self.client.put(obj) def create_buffer (self, obj, object_id ): object_size = len (obj) buf = memoryview (self.client.create(object_id, object_size)) for i in range (object_size): buf[i] = i % 128 def seal_buffer (self, object_id ): self.client.seal(object_id) def get_buffer (self, object_id ): [buffer] = self.client.get_buffers([object_id]) return buffer def list_objects (self ): return self.client.list ()
然后写一个子类能够使用plasma存储对象来读取dataframe:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 class PlasmaPandas (PlasmaClient ): def __init__ (self, name, location="plasma" ): super ().__init__(location) self.object_id = self.get_object_id(name) def dataFrame2recordBatch (self, df ): return pa.RecordBatch.from_pandas(df) def create_plasma_obj_from_record_batch (self, record_batch ): mock_sink = pa.MockOutputStream() stream_writer = pa.RecordBatchStreamWriter(mock_sink, record_batch.schema) stream_writer.write_batch(record_batch) stream_writer.close() data_size = mock_sink.size() buf = self.client.create(self.object_id, data_size) stream = pa.FixedSizeBufferWriter(buf) stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema) stream_writer.write_batch(record_batch) stream_writer.close() self.seal_buffer(self.object_id) def get_df_by_name (self ): [data] = self.client.get_buffers([self.object_id]) arrow_buffer = pa.BufferReader(data) reader = pa.RecordBatchStreamReader(arrow_buffer) record_batch = reader.read_next_batch() result = record_batch.to_pandas() return result def get_df_column (self, column ): [data] = self.client.get_buffers([self.object_id]) arrow_buffer = pa.BufferReader(data) reader = pa.RecordBatchStreamReader(arrow_buffer) table = reader.read_all() try : result = table.select(["index" , column]).to_pandas() except KeyError: return pd.DataFrame() return result def store_df_in_store (self, df ): rb = self.dataFrame2recordBatch(df) self.create_plasma_obj_from_record_batch(rb)
在应用中使用:
1 2 3 4 pp = PlasmaPandas('obj_key' ) df = pd.read_csv('example.csv' ) pp.store_df_in_store(df) df = pp.get_df_column('col_name' )