Output¶
定义所有的数据输出抽象(Target),用于Pipeline.write()方法
-
class
bigflow.output.
FileBase
(path, **options)¶ 基类:
object
用于Pipeline.write()方法读取文件的基类
参数: path (str) -- 写文件的path,必须为str类型 >>> # 对数据按照tab分割的第一个元素进行hash,将结果分割成2000个文件,并且保证每个文件内有序。 >>> # 代码示例: >>> pipeline.write( >>> data.map(lambda x: x), >>> output.TextFile("your output dir") >>> .sort() >>> .partition(n=2000, partition_fn=lambda x, n: hash(x.split("\t", 1)[0]) % n) >>> ) >>>
-
class
bigflow.output.
SchemaTextFile
(path, **options)¶ -
读取文本文件生成支持字段操作的SchemaPCollection
参数: - path (str) -- 写文件的path,必须为str类型
- **options -- Arbitrary keyword arguments, 其中关键参数, 若SchemaPCollection的元素是dict, 必须指定columns(list)表示输出的字段名, 若SchemaPCollection的元素是tuple, 可以直接输出所有数据 separator(str)表示每行数据字段分隔符,默认分隔符是Tab(" ")
Example
>>> from bigflow import schema >>> tps = _pipeline.parallelize([('XiaoA', 20), ('XiaoB', 21)]) >>> dicts = tps.map(lambda (name, age): {'name': name, 'age': age}) >>> dicts = dicts.map(lambda _: _, serde=schema.of(['name', 'age'])) # 下一个版本中这行可以省去 >>> _pipeline.write(dicts, output.SchemaTextFile('./output', columns = ['name', 'age'])) >>> _pipeline.run() >>> print open('./output/part-00000').read() XiaoA 20 XiaoB 21
-
transform_to_node
(ptype)¶ 内部接口
-
class
bigflow.output.
SequenceFile
(path, **options)¶ -
输出到SequenceFile文件的Target,SequenceFile的(Key, Value)将被写为BytesWritable,用户使用as_type()函数自行将数据序列化
参数: - path (str) -- 写文件的path,必须为str类型
- **options -- 其中关键参数有: overwrite: 如果目标位置已经存在,是否进行覆盖写操作。默认为True。 async_mode: 是否使用异步写。默认为True。 key_serde: key如何被序列化为字符串。 value_serde: value如果被序列化为字符串。 需要注意, key_serde/value_serde如果设置,则数据必须是一个两个元素的tuple。 如果不设置,则认为全部的数据使用默认序列化器写到sequence file的value中,key为空。
-
as_type
(kv_serializer)¶ 通过kv_serializer将数据序列化为(Key, Value)
参数: kv_serializer (callable) -- 序列化函数 返回: 返回self 返回类型: SequenceFile 注解
kv_deserializer的期望签名为:
kv_deserializer(object) => (str, str)
-
transform_to_node
(ptype)¶
-
class
bigflow.output.
TextFile
(path, **options)¶ -
输出到文本文件的Target
- Args:
path (str): 写文件的path,必须为str类型 **options: 其中关键参数有:
overwrite: 如果目标位置已经存在,是否进行覆盖写操作。默认为True。 async_mode: 是否使用异步写。默认为True。
record_delimiter: 输出文本的分隔符,默认'
- ';
- 若指定为None,则将所有数据按字节流连续输出
-
compression_types
= {'gzip': 1}¶
-
transform_to_node
(ptype)¶ 内部接口
-
class
bigflow.output.
UserOutputBase
¶ 基类:
object
用户Output基类
-
close
()¶ 用户可以重写该方法。
-
get_commiter
()¶ 用户可以重写该方法。 返回一个commiter, 默认表示不需要commit阶段 commiter应该是一个无参函数。
-
open
(partition)¶ 用户可以重写该方法。 传入参数partition表示这是第几个partition
-
partition_fn
()¶ 用户可以重写该方法。 返回一个partition fn。 partition_fn原型应为:(data, total_partition) => partition 返回None则表示不太关心如何partition。
如果partition_number
-
partition_number
()¶ 用户可以重写该方法。 返回一个int型的数,表示总共要把数据partition成多少份。
-
pre_process
(pval)¶ 用户可以重写该方法。 进行前处理,默认不处理
-
sink
(data)¶ 用户可以重写该方法。
该方法对每条数据调用一次
-
-
bigflow.output.
user_define_format
(user_output_base)¶ 内部函数