Output

定义所有的数据输出抽象(Target),用于Pipeline.write()方法

class bigflow.output.FileBase(path, **options)

基类:object

用于Pipeline.write()方法读取文件的基类

参数:path (str) -- 写文件的path,必须为str类型
>>> # 对数据按照tab分割的第一个元素进行hash,将结果分割成2000个文件,并且保证每个文件内有序。
>>> # 代码示例:
>>> pipeline.write(
>>>         data.map(lambda x: x),
>>>         output.TextFile("your output dir")
>>>             .sort()
>>>             .partition(n=2000, partition_fn=lambda x, n: hash(x.split("\t", 1)[0]) % n)
>>> )
>>>
partition(n=None, partition_fn=None)

对输出结果进行分组

参数:
  • n (int) -- 输出结果的分组个数,具体表现为产生n个输出文件,文件内容为各组数据
  • partition_fn (callable) -- 用于指定分组方式的函数
返回:

返回self

返回类型:

FileBase

sort(reverse=False)

根据数据实际值对数据进行排序(默认为升序)

参数:reverse (bool) -- 是否降序排序
返回:返回self
返回类型:FileBase
sort_by(key_read_fn=None, reverse=False)

通过key_read_fn获取key,并根据key对数据进行排序(默认为升序)

参数:
  • key_reader_fn (callable) -- 用户获取key的函数
  • reverse (bool) -- 是否降序排序
返回:

返回self

返回类型:

FileBase

class bigflow.output.SchemaTextFile(path, **options)

基类:bigflow.output.TextFile

读取文本文件生成支持字段操作的SchemaPCollection

参数:
  • path (str) -- 写文件的path,必须为str类型
  • **options -- Arbitrary keyword arguments, 其中关键参数, 若SchemaPCollection的元素是dict, 必须指定columns(list)表示输出的字段名, 若SchemaPCollection的元素是tuple, 可以直接输出所有数据 separator(str)表示每行数据字段分隔符,默认分隔符是Tab(" ")

Example

>>> from bigflow import schema
>>> tps = _pipeline.parallelize([('XiaoA', 20), ('XiaoB', 21)])
>>> dicts = tps.map(lambda (name, age): {'name': name, 'age': age})
>>> dicts = dicts.map(lambda _: _, serde=schema.of(['name', 'age'])) # 下一个版本中这行可以省去
>>> _pipeline.write(dicts, output.SchemaTextFile('./output', columns = ['name', 'age']))
>>> _pipeline.run()
>>> print open('./output/part-00000').read()
XiaoA   20
XiaoB   21
transform_to_node(ptype)

内部接口

class bigflow.output.SequenceFile(path, **options)

基类:bigflow.output.FileBase

输出到SequenceFile文件的Target,SequenceFile的(Key, Value)将被写为BytesWritable,用户使用as_type()函数自行将数据序列化

参数:
  • path (str) -- 写文件的path,必须为str类型
  • **options -- 其中关键参数有: overwrite: 如果目标位置已经存在,是否进行覆盖写操作。默认为True。 async_mode: 是否使用异步写。默认为True。 key_serde: key如何被序列化为字符串。 value_serde: value如果被序列化为字符串。 需要注意, key_serde/value_serde如果设置,则数据必须是一个两个元素的tuple。 如果不设置,则认为全部的数据使用默认序列化器写到sequence file的value中,key为空。
as_type(kv_serializer)

通过kv_serializer将数据序列化为(Key, Value)

参数:kv_serializer (callable) -- 序列化函数
返回:返回self
返回类型:SequenceFile

注解

kv_deserializer的期望签名为:

kv_deserializer(object) => (str, str)

transform_to_node(ptype)
class bigflow.output.TextFile(path, **options)

基类:bigflow.output.FileBase

输出到文本文件的Target

Args:

path (str): 写文件的path,必须为str类型 **options: 其中关键参数有:

overwrite: 如果目标位置已经存在,是否进行覆盖写操作。默认为True。 async_mode: 是否使用异步写。默认为True。

record_delimiter: 输出文本的分隔符,默认'

';
若指定为None,则将所有数据按字节流连续输出
compression_types = {'gzip': 1}
transform_to_node(ptype)

内部接口

with_compression(compression_type)

对输出文件进行压缩

参数:compression_type (str) -- 压缩格式,目前仅支持"gzip"
返回:返回self
返回类型:TextFile
class bigflow.output.UserOutputBase

基类:object

用户Output基类

close()

用户可以重写该方法。

get_commiter()

用户可以重写该方法。 返回一个commiter, 默认表示不需要commit阶段 commiter应该是一个无参函数。

open(partition)

用户可以重写该方法。 传入参数partition表示这是第几个partition

partition_fn()

用户可以重写该方法。 返回一个partition fn。 partition_fn原型应为:(data, total_partition) => partition 返回None则表示不太关心如何partition。

如果partition_number

partition_number()

用户可以重写该方法。 返回一个int型的数,表示总共要把数据partition成多少份。

pre_process(pval)

用户可以重写该方法。 进行前处理,默认不处理

sink(data)

用户可以重写该方法。

该方法对每条数据调用一次

bigflow.output.user_define_format(user_output_base)

内部函数