Others¶
Author: zhangyuncong Date: 2015-09-23 16:18:06 Last Modified by: zhangyuncong Last Modified time: 2015-12-24 16:18:06
-
class
bigflow.future.fields.
FieldsDictSerde
(fields_to_types)¶ -
Use for the dict with know fields
-
deserialize
(buf)¶
-
serialize
(obj)¶
-
-
bigflow.future.fields.
agg
(p, io_description, fn, *args, **kargs)¶ 选择一些字段去做一些聚合操作。
参数: - p (pcollection) -- 输入数据集,需要是一个每个元素都是dict的pcollection
- io_description (str) -- 格式为: a,b=>c,d,e 即,输入字段=>输出字段
- fn (callable) -- 函数原型为 (*input_pcollections) => (*output_pcollection_or_pobjects) 即,该函数的输入参数为多个pcollection, 每个pcollection表示数据的一个字段的全部行所拼成的一个pcollection。 该函数的返回值是一些pobject或 pcollection所组成的tuple(如果只有一个元素可以不必返回tuple)。
返回: 返回一个每个元素是一个dict的pcollection。 这个pcollection中所有元素输出的几个pcollection进行笛卡尔积并添加字段名后的结果。
例如::
>>> x = _pipeline.parallelize([{'a' : 1, 'b': 2.0}, {'a': 2, 'b': 3.0}]) >>> print x.apply(fields.agg, >>> 'a, b => c, d, e', >>> lambda a, b: ( >>> a.count(), >>> b.sum(), >>> a.flat_map(lambda x: xrange(x)) >>> ) >>> ).get() [{'c': 2, 'd': 5.0, 'e': 0}, {'c': 2, 'd': 5.0, 'e': 0}, {'c': 2, 'd': 5.0, 'e': 1}]
-
bigflow.future.fields.
flatten
(p)¶ 打平PTable为一个PCollection。如果K,V中有同样的字段,则以value中为准。
参数: p (pcollection) -- 输入数据集,需要是一个PTable,key,value中都必须为字典。 返回: 返回一个每个元素是一个dict的pcollection,表示PTable打平后的结果, 如果key,value中有同样字段,以value为准。 例如::
>>> x = _pipeline.parallelize([{'a': 1, 'b': 2.0, 'c': 1}, >>> {'a': 1, 'b': 2.0, 'c': 2}, >>> {'a': 2, 'b': 1.0, 'c': 3}, >>> ]) >>> print (x.apply(fields.group_by, 'a, b') >>> .apply_values(fields.agg, >>> 'a,b,c=>a,b,c', >>> lambda a, b, c: (a.sum(), b.sum(), c.sum())) >>> .apply(fields.flatten) >>> .get() >>> ) [{'a': 2, 'c': 3, 'b': 1.0}, {'a': 2, 'c': 3, 'b': 4.0}]
-
bigflow.future.fields.
get_out_fields_serde
(tpserde, out_fields)¶ 内部函数
-
bigflow.future.fields.
get_serde_of_field
(sd, field, default=None)¶ get serde of field
-
bigflow.future.fields.
get_serde_of_fields
(sd, fields, dft=None)¶ 内部函数
-
bigflow.future.fields.
group_by
(p, fields)¶ 按fields分组。
参数: - p (pcollection) -- 输入数据集,需要是一个每个元素都是dict的pcollection
- fields (str/[str]) -- 如果fields为一个str,则会按“,”进行切割,然后按切割出的字段进行分组。 如果fields为一个list,则直接按list中的多个字段进行分组。
返回: 返回一个key为一个包含指定字段的dict,value为原数据集的PTable。
需要注意的是,由于python原生的dict的key不能为dict,所以,这个返回的PTable上不能调用get操作, 如果需要get结果,可以先调用flatten。
例如::
>>> x = _pipeline.parallelize([{'a': 1, 'b': 2.0, 'c': 1}, >>> {'a': 1, 'b': 2.0, 'c': 2}, >>> {'a': 2, 'b': 1.0, 'c': 3}, >>> ]) >>> print (x.apply(fields.group_by, 'a, b') >>> .apply_values(fields.agg, >>> 'a,b,c=>a,b,c', >>> lambda a, b, c: (a.sum(), b.sum(), c.sum())) >>> .apply(fields.flatten) >>> .get() >>> ) [{'a': 2, 'c': 3, 'b': 1.0}, {'a': 2, 'c': 3, 'b': 4.0}]
-
bigflow.future.fields.
of
(fields_dict)¶ 创建FieldsDictSerde,用来序列化、反序列化有指定字段的字典。 因为字段已知,则key无需序列化,序列化出来的数据会小于marshal序列化后的结果。
参数: fields_to_types -- 可以传入一个字段名组成的列表, 也可以传入一个key是字段名,value是相应字段所用的serde的dict。 如果只传入字段名,则表示所有类型都是可被marshal序列化的类型。 返回: 相应的序列化器。
-
bigflow.future.fields.
select
(p, io_description, fn, *args, **kargs)¶ 对每条数据选择一些字段进行变换。
参数: - p (pcollection) -- 输入数据集,需要是一个每个元素都是dict的pcollection
- io_description (str) -- 格式为: a,b=>c,d,e 即,输入字段=>输出字段
- fn (callable) -- 函数原型为 (*input_pobjects) => (*output_pcollection_or_pobjects) 即,该函数的输入参数为多个pobject,每个pobject表示数据的一个字段, 对这个pobject上进行的操作会执行在每行数据上;该函数的返回值是一些pobject或 pcollection所组成的tuple(如果只有一个元素可以不必返回tuple)。
返回: 返回一个每个元素是一个dict的pcollection。 这个pcollection中所有元素相当于对原数据每条数据进行一次fn处理, 处理后返回的tuple中的所有数据集进行笛卡尔积, 最终再把所有输入数据处理后得出的结果拼成一个数据集。
例如::
>>> x = _pipeline.parallelize([{'a' : 1, 'b': 2.0}, {'a': 2, 'b': 3.0}]) >>> print x.apply(fields.select, >>> 'a, b => c, d, e', >>> lambda a, b: ( >>> a.map(lambda x: x + 1), >>> b.map(lambda x, y: x / y, a), >>> a.flat_map(lambda x: xrange(x)) >>> ) >>> ).get() [{'c': 2, 'd': 2.0, 'e': 0}, {'c': 3, 'd': 1.5, 'e': 0}, {'c': 3, 'd': 1.5, 'e': 1}]
-
bigflow.future.fields.
select_cols
(val, select_fields)¶ 内部函数