Input

定义所有的数据源(Source),用于Pipeline.read()方法

实现一个Source需要实现四个接口:

  1. 有一个input_format属性,是一个flume::Loader
  2. 有一个objector属性,是一个Objector
  3. 有一个uris属性,返回一个uri列表
  4. 有一个transform_from_node方法,把一个Node变换成一个PType
  5. 有一个get_size方法,计算本文件读出数据量有多少。可以返回-1(?)表示未知大小。
class bigflow.input.FileBase(*path, **options)

基类:object

用于Pipeline.read()方法读取文件的基类

参数:*path -- 读取文件的path,必须均为str或unicode类型
get_size(pipeline)

获得所有读取文件在文件系统中的大小

返回:文件大小,以字节为单位
返回类型:int
class bigflow.input.SchemaTextFile(*path, **options)

基类:bigflow.input.TextFile

读取文本文件生成支持字段操作的SchemaPCollection

参数:
  • *path -- 读取文件的path, 必须均为str类型
  • **options --

    Arbitrary keyword arguments, 其中关键参数, (1). 若columns(list), 每一项为字段名,则生成SchemaPCollection的元素是dict,dict中的value类型都是str;

    (2). 若columns(list), 每一项为(字段名,类型),则生成SchemaPCollection的元素是dict, dict中的值类型是字段对应的类型;

    (3). 若columns(int),表示分割的列数,则生成SchemaPCollection的元素是tuple,tuple中的每个元素的类型都是str, separator(str)表示每行数据字段分隔符,默认分隔符是Tab(" ");

    (4). 若columns(list), 每一项为python基本类型(int, str, float),则生成SchemaPcollection的元素是tuple, 每个tuple中的元素的类型和columns中的类型一一对应;separator(str)表示每行数据字段分隔符,默认分隔符是Tab(" ");

    ignore_overflow(bool)表示如果文件有多余的列,是否可以忽略掉。默认为False,即出现多余的列时即会报错。

    ignore_illegal_line(bool): 表示当文件某一行的列数少于提供的字段数时,是否可以忽略该文件行。若不设置,则抛出异常

Example

>>> open("input-data", "w").write("XiaoA\t20\nXiaoB\t21\n")
>>> persons = _pipeline.read(input.SchemaTextFile("input-data", columns = ['name', 'age']))
>>> persons.get()
[{'age': '20', 'name': 'XiaoA'}, {'age': '21', 'name': 'XiaoB'}]
>>> open("input-data", "w").write("XiaoA\t20\nXiaoB\t21\n")
>>> persons = _pipeline.read(input.SchemaTextFile("input-data", columns = [('name', str), ('age', int)]))
>>> persons.get()
[{'age': 20, 'name': 'XiaoA'}, {'age': 21, 'name': 'XiaoB'}]
>>> open("temp_data.txt", "w").write("1\t2.0\tbiflow\n10\t20.10\tinf")
>>> data = p.read(input.SchemaTextFile("./temp_data.txt", columns=3))
>>> data.get()
[('1', '2.0', 'biflow'), ('10', '20.1', 'inf')]
>>> open("temp_data.txt", "w").write("1\t2.0\tbiflow\n10\t20.10\tinf")
>>> data = p.read(input.SchemaTextFile("./temp_data.txt", columns=[int, float, str]))
>>> data.get()
[(1, 2.0, 'biflow'), (10, 20.1, 'inf')]
transform_from_node(load_node, pipeline)

内部接口

class bigflow.input.SequenceFile(*path, **options)

基类:bigflow.input.FileBase

表示读取SequenceFile的数据源,SequenceFile的(Key, Value)必须均为BytesWritable,并由用户自行解析

参数:
  • *path -- 读取文件的path,必须均为str类型
  • **options --

    其中关键参数: combine_multi_file: 是否可以合并多个文件到一个mapper中处理。默认为True。 partitioned: 默认为False,如果置为True,则返回的数据集为一个ptable,

    ptable的key是split_info,value这个split上的全部数据所组成的pcollection。

    key_serde: key如何反序列化 value_serde: value如何反序列化 如果未设定key_serde/value_serde,则会忽略掉key,只把value用默认序列化器反序列化并返回。

Example

>>> from bigflow import serde
>>> StrSerde = serde.StrSerde
>>> lines = _pipeline.read(
        input.SequenceFile('path', key_serde=StrSerde(), value_serde=StrSerde()))
>>> lines.get()
[("key1", "value1"), ("key2", "value2")]
>>> import mytest_proto_pb2
>>> msg_type = mytest_proto_pb2.MyTestPbType
>>> _pipeline.add_file("mytest_proto_pb2.py", "mytest_proto_pb2.py")
>>> pbs = _pipeline.read(input.SequenceFile('path2', serde=serde.ProtobufSerde(msg_type)))
>>> pbs.get()  # 如果未设置key_serde/value_serde,则key会被丢弃。
>>> [<mytest_proto_pb2.MyTestPbType at 0x7fa9e262a870>,
     <mytest_proto_pb2.MyTestPbType at 0x7fa9e262a870>]

有时,Pb包在本地没有,例如,py文件在hdfs,则可以使用下边的方法:

>>> _pipeline.add_archive("hdfs:///proto.tar.gz", "proto") #add_archive暂时不支持本地模式
>>> def get_pb_msg_creator(module_name, class_name):
...     import importlib
...     return lambda: importlib.import_module(module_name).__dict__[class_name]()
>>> pbs = _pipeline.read(input.SequenceFile('path2', serde=serde.ProtobufSerde(get_pb_msg_creator("proto.mytest_proto_pb2", "MyTestPbType"))))
>>> pbs.get()
>>> [<mytest_proto_pb2.MyTestPbType at 0x7fa9e262a870>,
     <mytest_proto_pb2.MyTestPbType at 0x7fa9e262a870>]

如果需要自定义Serde,参见:bigflow.serde.Serde

as_type(kv_deserializer)

通过kv_deserializer反序列化读取的(Key, Value)

kv_deserializer的期望签名为:

kv_deserializer(key: str, value: str) => object

transform_from_node(load_node, pipeline)

内部接口

class bigflow.input.SequenceFileStream(*path, **options)

基类:bigflow.input.FileBase

表示读取SequenceFile的无穷数据源,SequenceFile的(Key, Value)必须均为BytesWritable,并由用户自行解析

参数:
  • *path -- 读取文件的path,必须均为str类型
  • **options --

    可选的参数。

    [Hint] max_record_num_per_round: 用于指定每轮订阅的日志条数,默认值为1000

    [Hint] timeout_per_round: 用于指定每轮订阅的超时时间(单位为s),默认为10s

    key_serde: key如何反序列化

    value_serde: value如何反序列化

    如果未设定key_serde/value_serde,则会忽略掉key,只把value用默认序列化器反序列化并返回。

注解

  1. 如果path中含有子目录,则以子目录作为数据源;如果path中没有子目录,则以path作为数据源
  2. 目录中有效的文件名为从0开始的正整数;如果文件不存在,会一直等待该文件
  3. 目录中的文件不允许被修改,添加需要保证原子(可以先写成其他文件名,然后进行mv)
as_type(kv_deserializer)

通过kv_deserializer反序列化读取的(Key, Value)

kv_deserializer的期望签名为:

kv_deserializer(key: str, value: str) => object

transform_from_node(load_node, pipeline)

内部方法

class bigflow.input.TextFile(*path, **options)

基类:bigflow.input.FileBase

表示读取的文本文件的数据源

参数:*path -- 读取文件的path,必须均为str类型

读取文件数据示例::

>>> lines1 = _pipeline.read(input.TextFile('hdfs:///my_hdfs_dir/'))
>>> lines2 = _pipeline.read(input.TextFile('hdfs://host:port/my_hdfs_file'))
>>> lines3 = _pipeline.read(input.TextFile('hdfs:///multi_path1', 'hdfs:///multi_path2'))
>>> lines4 = _pipeline.read(input.TextFile('./local_file_by_rel_path/'))
>>> lines5 = _pipeline.read(input.TextFile('/home/work/local_file_by_abs_path/'))
>>> lines6 = _pipeline.read(input.TextFile(*['hdfs:///multi_path1', 'hdfs:///multi_path2']))
**options: 其中关键参数:

combine_multi_file: 是否可以合并多个文件到一个mapper中处理。默认为True。

partitioned: 默认为False,如果置为True,则返回的数据集为一个ptable,
ptable的key是split_info,value这个split上的全部数据所组成的pcollection。:
>>> f1 = open('data1.txt', 'w')
>>> f1.write('1 2 1')
>>> f1.close()
>>> f2 = open('data2.txt', 'w')
>>> f2.write('1 2 2')
>>> f2.close()
>>> table = _pipeline.read(input.TextFile('./data1.txt', './data2.txt', partitioned = True))
>>> def wordcount(p):
        return p.flat_map(lambda line: line.split()) \
                .group_by(lambda word: word) \
                .apply_values(transforms.count)
>>> table.apply_values(wordcount).get()
{'/home/data1.txt': {'1': 2, '2': 1}, '/home/data2.txt': {'1': 1, '2', 2}}
# 需要注意,MR模式下,key可能不是这个格式的,切分方法也不一定是按照文件来的。
transform_from_node(load_node, pipeline)

内部接口

class bigflow.input.TextFileStream(*path, **options)

基类:bigflow.input.FileBase

表示读取的文本文件的无穷数据源。

参数:*path -- 读取文件目录的path,必须均为str类型

读取文件数据示例::

  >>> lines1 = _pipeline.read(input.TextFileStream('hdfs:///my_hdfs_dir/'))
  >>> lines2 = _pipeline.read(input.TextFileStream('hdfs://host:port/my_hdfs_dir/'))
  >>> lines3 = _pipeline.read(input.TextFileStream('hdfs:///multi_path1', 'hdfs:///multi_path2'))
  >>> lines4 = _pipeline.read(input.TextFileStream('./local_file_by_rel_path/'))
  >>> lines5 = _pipeline.read(input.TextFileStream('/home/work/local_file_by_abs_path/'))
  >>> lines6 = _pipeline.read(input.TextFileStream(*['hdfs:///multi_path1', 'hdfs:///multi_path2']))

**options: 可选的参数。

    [Hint] max_record_num_per_round: 用于指定每轮订阅的日志条数,默认值为1000

    [Hint] timeout_per_round: 用于指定每轮订阅的超时时间(单位为s),默认为10s

注解

  1. 如果path中含有子目录,则以子目录作为数据源;如果path中没有子目录,则以path作为数据源
  2. 目录中有效的文件名为从0开始的正整数;如果文件不存在,会一直等待该文件
  3. 目录中的文件不允许被修改,添加需要保证原子(可以先写成其他文件名,然后进行mv)
transform_from_node(load_node, pipeline)

内部接口

class bigflow.input.UserInputBase

基类:object

用户输入抽象基类

用户需要按以下方法重写split/load函数

Eg.

class LocalFileInput(UserInputBase):
    def __init__(self, dir):
        self._dir = dir

    def split(self):
        return [os.path.join(self._dir, filename) for filename in os.listdir(self._dir)]

    def load(split):
        with open(split) as f:
        for line in f.readline():
            yield line.strip()

用户可以重写post_process以实现一些后处理, post_process有一个传入参数,是一个PTable,这个PTable的key是split string, value是这个split上的数据。 默认post_process方法是`bigflow.transforms.flatten_values`.

get_serde()

User can override this method to set the serde

get_size()

user can override this method to calculate the size of the input data

load(split)

Load data from a split. The return value will be flattened into a PCollection.

post_process(ptable)

User can override post_process method to do some post_process.

split()

splits urls as some splits. User should override this method.

bigflow.input.user_define_format(user_input_base)

return a FileBase object from a UserInputBase