Input¶
定义所有的数据源(Source),用于Pipeline.read()方法
实现一个Source需要实现四个接口:
- 有一个input_format属性,是一个flume::Loader
- 有一个objector属性,是一个Objector
- 有一个uris属性,返回一个uri列表
- 有一个transform_from_node方法,把一个Node变换成一个PType
- 有一个get_size方法,计算本文件读出数据量有多少。可以返回-1(?)表示未知大小。
-
class
bigflow.input.
FileBase
(*path, **options)¶ 基类:
object
用于Pipeline.read()方法读取文件的基类
参数: *path -- 读取文件的path,必须均为str或unicode类型
-
class
bigflow.input.
SchemaTextFile
(*path, **options)¶ -
读取文本文件生成支持字段操作的SchemaPCollection
参数: - *path -- 读取文件的path, 必须均为str类型
- **options --
Arbitrary keyword arguments, 其中关键参数, (1). 若columns(list), 每一项为字段名,则生成SchemaPCollection的元素是dict,dict中的value类型都是str;
(2). 若columns(list), 每一项为(字段名,类型),则生成SchemaPCollection的元素是dict, dict中的值类型是字段对应的类型;
(3). 若columns(int),表示分割的列数,则生成SchemaPCollection的元素是tuple,tuple中的每个元素的类型都是str, separator(str)表示每行数据字段分隔符,默认分隔符是Tab(" ");
(4). 若columns(list), 每一项为python基本类型(int, str, float),则生成SchemaPcollection的元素是tuple, 每个tuple中的元素的类型和columns中的类型一一对应;separator(str)表示每行数据字段分隔符,默认分隔符是Tab(" ");
ignore_overflow(bool)表示如果文件有多余的列,是否可以忽略掉。默认为False,即出现多余的列时即会报错。
ignore_illegal_line(bool): 表示当文件某一行的列数少于提供的字段数时,是否可以忽略该文件行。若不设置,则抛出异常
Example
>>> open("input-data", "w").write("XiaoA\t20\nXiaoB\t21\n") >>> persons = _pipeline.read(input.SchemaTextFile("input-data", columns = ['name', 'age'])) >>> persons.get() [{'age': '20', 'name': 'XiaoA'}, {'age': '21', 'name': 'XiaoB'}]
>>> open("input-data", "w").write("XiaoA\t20\nXiaoB\t21\n") >>> persons = _pipeline.read(input.SchemaTextFile("input-data", columns = [('name', str), ('age', int)])) >>> persons.get() [{'age': 20, 'name': 'XiaoA'}, {'age': 21, 'name': 'XiaoB'}]
>>> open("temp_data.txt", "w").write("1\t2.0\tbiflow\n10\t20.10\tinf") >>> data = p.read(input.SchemaTextFile("./temp_data.txt", columns=3)) >>> data.get() [('1', '2.0', 'biflow'), ('10', '20.1', 'inf')]
>>> open("temp_data.txt", "w").write("1\t2.0\tbiflow\n10\t20.10\tinf") >>> data = p.read(input.SchemaTextFile("./temp_data.txt", columns=[int, float, str])) >>> data.get() [(1, 2.0, 'biflow'), (10, 20.1, 'inf')]
-
transform_from_node
(load_node, pipeline)¶ 内部接口
-
class
bigflow.input.
SequenceFile
(*path, **options)¶ -
表示读取SequenceFile的数据源,SequenceFile的(Key, Value)必须均为BytesWritable,并由用户自行解析
参数: - *path -- 读取文件的path,必须均为str类型
- **options --
其中关键参数: combine_multi_file: 是否可以合并多个文件到一个mapper中处理。默认为True。 partitioned: 默认为False,如果置为True,则返回的数据集为一个ptable,
ptable的key是split_info,value这个split上的全部数据所组成的pcollection。key_serde: key如何反序列化 value_serde: value如何反序列化 如果未设定key_serde/value_serde,则会忽略掉key,只把value用默认序列化器反序列化并返回。
Example
>>> from bigflow import serde >>> StrSerde = serde.StrSerde >>> lines = _pipeline.read( input.SequenceFile('path', key_serde=StrSerde(), value_serde=StrSerde())) >>> lines.get() [("key1", "value1"), ("key2", "value2")]
>>> import mytest_proto_pb2 >>> msg_type = mytest_proto_pb2.MyTestPbType >>> _pipeline.add_file("mytest_proto_pb2.py", "mytest_proto_pb2.py") >>> pbs = _pipeline.read(input.SequenceFile('path2', serde=serde.ProtobufSerde(msg_type))) >>> pbs.get() # 如果未设置key_serde/value_serde,则key会被丢弃。 >>> [<mytest_proto_pb2.MyTestPbType at 0x7fa9e262a870>, <mytest_proto_pb2.MyTestPbType at 0x7fa9e262a870>]
有时,Pb包在本地没有,例如,py文件在hdfs,则可以使用下边的方法:
>>> _pipeline.add_archive("hdfs:///proto.tar.gz", "proto") #add_archive暂时不支持本地模式 >>> def get_pb_msg_creator(module_name, class_name): ... import importlib ... return lambda: importlib.import_module(module_name).__dict__[class_name]() >>> pbs = _pipeline.read(input.SequenceFile('path2', serde=serde.ProtobufSerde(get_pb_msg_creator("proto.mytest_proto_pb2", "MyTestPbType")))) >>> pbs.get() >>> [<mytest_proto_pb2.MyTestPbType at 0x7fa9e262a870>, <mytest_proto_pb2.MyTestPbType at 0x7fa9e262a870>]
如果需要自定义Serde,参见:
bigflow.serde.Serde
。-
as_type
(kv_deserializer)¶ 通过kv_deserializer反序列化读取的(Key, Value)
kv_deserializer的期望签名为:
kv_deserializer(key: str, value: str) => object
-
transform_from_node
(load_node, pipeline)¶ 内部接口
-
class
bigflow.input.
SequenceFileStream
(*path, **options)¶ -
表示读取SequenceFile的无穷数据源,SequenceFile的(Key, Value)必须均为BytesWritable,并由用户自行解析
参数: - *path -- 读取文件的path,必须均为str类型
- **options --
可选的参数。
[Hint] max_record_num_per_round: 用于指定每轮订阅的日志条数,默认值为1000
[Hint] timeout_per_round: 用于指定每轮订阅的超时时间(单位为s),默认为10s
key_serde: key如何反序列化
value_serde: value如何反序列化
如果未设定key_serde/value_serde,则会忽略掉key,只把value用默认序列化器反序列化并返回。
注解
- 如果path中含有子目录,则以子目录作为数据源;如果path中没有子目录,则以path作为数据源
- 目录中有效的文件名为从0开始的正整数;如果文件不存在,会一直等待该文件
- 目录中的文件不允许被修改,添加需要保证原子(可以先写成其他文件名,然后进行mv)
-
as_type
(kv_deserializer)¶ 通过kv_deserializer反序列化读取的(Key, Value)
kv_deserializer的期望签名为:
kv_deserializer(key: str, value: str) => object
-
transform_from_node
(load_node, pipeline)¶ 内部方法
-
class
bigflow.input.
TextFile
(*path, **options)¶ -
表示读取的文本文件的数据源
参数: *path -- 读取文件的path,必须均为str类型 读取文件数据示例::
>>> lines1 = _pipeline.read(input.TextFile('hdfs:///my_hdfs_dir/')) >>> lines2 = _pipeline.read(input.TextFile('hdfs://host:port/my_hdfs_file')) >>> lines3 = _pipeline.read(input.TextFile('hdfs:///multi_path1', 'hdfs:///multi_path2')) >>> lines4 = _pipeline.read(input.TextFile('./local_file_by_rel_path/')) >>> lines5 = _pipeline.read(input.TextFile('/home/work/local_file_by_abs_path/')) >>> lines6 = _pipeline.read(input.TextFile(*['hdfs:///multi_path1', 'hdfs:///multi_path2']))
- **options: 其中关键参数:
combine_multi_file: 是否可以合并多个文件到一个mapper中处理。默认为True。
- partitioned: 默认为False,如果置为True,则返回的数据集为一个ptable,
- ptable的key是split_info,value这个split上的全部数据所组成的pcollection。:
>>> f1 = open('data1.txt', 'w') >>> f1.write('1 2 1') >>> f1.close() >>> f2 = open('data2.txt', 'w') >>> f2.write('1 2 2') >>> f2.close() >>> table = _pipeline.read(input.TextFile('./data1.txt', './data2.txt', partitioned = True)) >>> def wordcount(p): return p.flat_map(lambda line: line.split()) \ .group_by(lambda word: word) \ .apply_values(transforms.count)
>>> table.apply_values(wordcount).get() {'/home/data1.txt': {'1': 2, '2': 1}, '/home/data2.txt': {'1': 1, '2', 2}} # 需要注意,MR模式下,key可能不是这个格式的,切分方法也不一定是按照文件来的。
-
transform_from_node
(load_node, pipeline)¶ 内部接口
-
class
bigflow.input.
TextFileStream
(*path, **options)¶ -
表示读取的文本文件的无穷数据源。
参数: *path -- 读取文件目录的path,必须均为str类型 读取文件数据示例::
>>> lines1 = _pipeline.read(input.TextFileStream('hdfs:///my_hdfs_dir/')) >>> lines2 = _pipeline.read(input.TextFileStream('hdfs://host:port/my_hdfs_dir/')) >>> lines3 = _pipeline.read(input.TextFileStream('hdfs:///multi_path1', 'hdfs:///multi_path2')) >>> lines4 = _pipeline.read(input.TextFileStream('./local_file_by_rel_path/')) >>> lines5 = _pipeline.read(input.TextFileStream('/home/work/local_file_by_abs_path/')) >>> lines6 = _pipeline.read(input.TextFileStream(*['hdfs:///multi_path1', 'hdfs:///multi_path2'])) **options: 可选的参数。 [Hint] max_record_num_per_round: 用于指定每轮订阅的日志条数,默认值为1000 [Hint] timeout_per_round: 用于指定每轮订阅的超时时间(单位为s),默认为10s
注解
- 如果path中含有子目录,则以子目录作为数据源;如果path中没有子目录,则以path作为数据源
- 目录中有效的文件名为从0开始的正整数;如果文件不存在,会一直等待该文件
- 目录中的文件不允许被修改,添加需要保证原子(可以先写成其他文件名,然后进行mv)
-
transform_from_node
(load_node, pipeline)¶ 内部接口
-
class
bigflow.input.
UserInputBase
¶ 基类:
object
用户输入抽象基类
用户需要按以下方法重写split/load函数
Eg.
class LocalFileInput(UserInputBase): def __init__(self, dir): self._dir = dir def split(self): return [os.path.join(self._dir, filename) for filename in os.listdir(self._dir)] def load(split): with open(split) as f: for line in f.readline(): yield line.strip()
用户可以重写post_process以实现一些后处理, post_process有一个传入参数,是一个PTable,这个PTable的key是split string, value是这个split上的数据。 默认post_process方法是`bigflow.transforms.flatten_values`.
-
get_serde
()¶ User can override this method to set the serde
-
get_size
()¶ user can override this method to calculate the size of the input data
-
load
(split)¶ Load data from a split. The return value will be flattened into a PCollection.
-
post_process
(ptable)¶ User can override post_process method to do some post_process.
-
split
()¶ splits urls as some splits. User should override this method.
-
-
bigflow.input.
user_define_format
(user_input_base)¶ return a FileBase object from a UserInputBase