Others

Author: zhangyuncong Date: 2015-09-23 16:18:06 Last Modified by: zhangyuncong Last Modified time: 2015-12-24 16:18:06

class bigflow.future.fields.FieldsDictSerde(fields_to_types)

基类:bigflow.serde.CppSerde

Use for the dict with know fields

deserialize(buf)
serialize(obj)
bigflow.future.fields.agg(p, io_description, fn, *args, **kargs)

选择一些字段去做一些聚合操作。

参数:
  • p (pcollection) -- 输入数据集,需要是一个每个元素都是dict的pcollection
  • io_description (str) -- 格式为: a,b=>c,d,e 即,输入字段=>输出字段
  • fn (callable) -- 函数原型为 (*input_pcollections) => (*output_pcollection_or_pobjects) 即,该函数的输入参数为多个pcollection, 每个pcollection表示数据的一个字段的全部行所拼成的一个pcollection。 该函数的返回值是一些pobject或 pcollection所组成的tuple(如果只有一个元素可以不必返回tuple)。
返回:

返回一个每个元素是一个dict的pcollection。 这个pcollection中所有元素输出的几个pcollection进行笛卡尔积并添加字段名后的结果。

例如::

>>> x = _pipeline.parallelize([{'a' : 1, 'b': 2.0}, {'a': 2, 'b': 3.0}])
>>> print x.apply(fields.agg,
>>>      'a, b => c, d, e',
>>>      lambda a, b: (
>>>         a.count(),
>>>         b.sum(),
>>>         a.flat_map(lambda x: xrange(x))
>>>     )
>>> ).get()

[{'c': 2, 'd': 5.0, 'e': 0}, {'c': 2, 'd': 5.0, 'e': 0}, {'c': 2, 'd': 5.0, 'e': 1}]
bigflow.future.fields.flatten(p)

打平PTable为一个PCollection。如果K,V中有同样的字段,则以value中为准。

参数:p (pcollection) -- 输入数据集,需要是一个PTable,key,value中都必须为字典。
返回:返回一个每个元素是一个dict的pcollection,表示PTable打平后的结果, 如果key,value中有同样字段,以value为准。

例如::

>>> x = _pipeline.parallelize([{'a': 1, 'b': 2.0, 'c': 1},
>>>                            {'a': 1, 'b': 2.0, 'c': 2},
>>>                            {'a': 2, 'b': 1.0, 'c': 3},
>>>                            ])

>>> print (x.apply(fields.group_by, 'a, b')
>>>         .apply_values(fields.agg,
>>>                       'a,b,c=>a,b,c',
>>>                       lambda a, b, c: (a.sum(), b.sum(), c.sum()))
>>>         .apply(fields.flatten)
>>>         .get()
>>> )

[{'a': 2, 'c': 3, 'b': 1.0}, {'a': 2, 'c': 3, 'b': 4.0}]
bigflow.future.fields.get_out_fields_serde(tpserde, out_fields)

内部函数

bigflow.future.fields.get_serde_of_field(sd, field, default=None)

get serde of field

bigflow.future.fields.get_serde_of_fields(sd, fields, dft=None)

内部函数

bigflow.future.fields.group_by(p, fields)

按fields分组。

参数:
  • p (pcollection) -- 输入数据集,需要是一个每个元素都是dict的pcollection
  • fields (str/[str]) -- 如果fields为一个str,则会按“,”进行切割,然后按切割出的字段进行分组。 如果fields为一个list,则直接按list中的多个字段进行分组。
返回:

返回一个key为一个包含指定字段的dict,value为原数据集的PTable。

需要注意的是,由于python原生的dict的key不能为dict,所以,这个返回的PTable上不能调用get操作, 如果需要get结果,可以先调用flatten。

例如::

>>> x = _pipeline.parallelize([{'a': 1, 'b': 2.0, 'c': 1},
>>>                            {'a': 1, 'b': 2.0, 'c': 2},
>>>                            {'a': 2, 'b': 1.0, 'c': 3},
>>>                            ])

>>> print (x.apply(fields.group_by, 'a, b')
>>>         .apply_values(fields.agg,
>>>                       'a,b,c=>a,b,c',
>>>                       lambda a, b, c: (a.sum(), b.sum(), c.sum()))
>>>         .apply(fields.flatten)
>>>         .get()
>>> )

[{'a': 2, 'c': 3, 'b': 1.0}, {'a': 2, 'c': 3, 'b': 4.0}]
bigflow.future.fields.of(fields_dict)

创建FieldsDictSerde,用来序列化、反序列化有指定字段的字典。 因为字段已知,则key无需序列化,序列化出来的数据会小于marshal序列化后的结果。

参数:fields_to_types -- 可以传入一个字段名组成的列表, 也可以传入一个key是字段名,value是相应字段所用的serde的dict。 如果只传入字段名,则表示所有类型都是可被marshal序列化的类型。
返回:相应的序列化器。
bigflow.future.fields.select(p, io_description, fn, *args, **kargs)

对每条数据选择一些字段进行变换。

参数:
  • p (pcollection) -- 输入数据集,需要是一个每个元素都是dict的pcollection
  • io_description (str) -- 格式为: a,b=>c,d,e 即,输入字段=>输出字段
  • fn (callable) -- 函数原型为 (*input_pobjects) => (*output_pcollection_or_pobjects) 即,该函数的输入参数为多个pobject,每个pobject表示数据的一个字段, 对这个pobject上进行的操作会执行在每行数据上;该函数的返回值是一些pobject或 pcollection所组成的tuple(如果只有一个元素可以不必返回tuple)。
返回:

返回一个每个元素是一个dict的pcollection。 这个pcollection中所有元素相当于对原数据每条数据进行一次fn处理, 处理后返回的tuple中的所有数据集进行笛卡尔积, 最终再把所有输入数据处理后得出的结果拼成一个数据集。

例如::

>>> x = _pipeline.parallelize([{'a' : 1, 'b': 2.0}, {'a': 2, 'b': 3.0}])
>>> print x.apply(fields.select,
>>>      'a, b => c, d, e',
>>>      lambda a, b: (
>>>         a.map(lambda x: x + 1),
>>>         b.map(lambda x, y: x / y, a),
>>>         a.flat_map(lambda x: xrange(x))
>>>     )
>>> ).get()

[{'c': 2, 'd': 2.0, 'e': 0}, {'c': 3, 'd': 1.5, 'e': 0}, {'c': 3, 'd': 1.5, 'e': 1}]
bigflow.future.fields.select_cols(val, select_fields)

内部函数