PCollection

bigflow.pcollection.PCollection 定义

class bigflow.pcollection.PCollection(node, pipeline)

基类:bigflow.ptype.PType

用于表示分布式数据集的 bigflow.ptype.PType

注解

用户不应当直接使用其构造方法

参数:node (Node) -- LogicalPlan.Node
accumulate(zero, accumulate_fn, *side_inputs, **options)

等同于 bigflow.transforms.accumulate(self, zero, accumulate_fn, *side_inputs, **options)

参数:
  • zero (value or function) -- 初始值,或是一个返回初始值的方法
  • accumulate_fn (function) -- 聚合方法
  • *side_inputs -- 参与运算的SideInputs
  • **options -- 可配置选项
返回:

聚合结果

返回类型:

PObject

>>> from operator import add
>>> p.parallelize([1, 2, 3]).accumulate(0, add).get()
6
agg(io_description, fn, *args, **kargs)

选择一些字段去做一些聚合操作。

参数:
  • p (pcollection) -- 输入数据集,需要是一个每个元素都是dict的pcollection
  • io_description (str) -- 格式为: a,b=>c,d,e 即,输入字段=>输出字段
  • fn (callable) -- 函数原型为 (*input_pcollections) => (*output_pcollection_or_pobjects) 即,该函数的输入参数为多个pcollection, 每个pcollection表示数据的一个字段的全部行所拼成的一个pcollection。 该函数的返回值是一些pobject或 pcollection所组成的tuple(如果只有一个元素可以不必返回tuple)。
返回:

返回一个每个元素是一个dict的pcollection。 这个pcollection中所有元素输出的几个pcollection进行笛卡尔积并添加字段名后的结果。

例如::

>>> x = _pipeline.parallelize([{'a' : 1, 'b': 2.0}, {'a': 2, 'b': 3.0}])
>>> print x.apply(fields.agg,
>>>      'a, b => c, d, e',
>>>      lambda a, b: (
>>>         a.count(),
>>>         b.sum(),
>>>         a.flat_map(lambda x: xrange(x))
>>>     )
>>> ).get()

[{'c': 2, 'd': 5.0, 'e': 0}, {'c': 2, 'd': 5.0, 'e': 0}, {'c': 2, 'd': 5.0, 'e': 1}]
aggregate(zero, aggregate_fn, combine_fn, *side_inputs, **options)

等同于 bigflow.transforms.aggregate(self, aggregate_fn, combine_fn, *side_inputs, **options)

参数:
  • pcollection (PCollection) -- 输入PCollection
  • zero (value or function) -- 初始值,或是一个返回初始值的方法
  • accumulate_fn (function) -- 聚合方法
  • *side_inputs -- 参与运算的SideInputs
  • **options -- 可配置选项
返回:

聚合结果

返回类型:

PObject

as_pobject()

等同于 to self.first()

返回:转换结果
返回类型:PObject
>>> _pipeline.parallelize(["A"]).as_pobject().get()
"A"
as_schema(fields)

根据字段,返回一个SchemaPCollection

参数:fields --

类型可以是,tuple,list,dict; 当fields是tuple或list时, 会判断每个元素的类型:

fields中的每个元素是python基本类型或一个serde; 接口将构造TupleSerde设置到PCollection每个元素

fields中的每个元素是python string,抛出异常

当fields是dict时:
fields的key标识字段类型,value标识该字段的类型,如 {"name": str, "age": int} 当前PCollection中的每个元素必须是dict,dict内的key必须相同。 fields内的key要和PCollection内的key必须相同
返回:表示转化后的PCollection
返回类型:PCollection

Examples

>>> data = self._pipeline.parallelize([("xiaoming", "PKU", 20)])
>>> d1 = data.as_schema((str, str, int))
>>> d2 = data.as_schema([str, str, int])
>>> print d1.get()
[('xiaoming', 'PKU', 20)]
>>>
>>> print d2.get()
[('xiaoming', 'PKU', 20)]
>>>
>>> data = self._pipeline.parallelize([{"name": "xiaoming", "school": "PKU", "age": 20}])
>>> d5 = data.as_schema({"name": str, "school": str, "age": int})
>>> print d5.get()
[{'age': 20, 'name': 'xiaoming', 'school': 'PKU'}]
>>>
cartesian(other, *others, **options)

与其他的PCollection做笛卡尔积

参数:
  • other (PCollection) -- 其他的PCollection
  • *others -- 更多的PCollection
返回:

表示结果的PCollection

返回类型:

PCollection

>>> _p1 = _pipeline.parallelize([1, 2, 3])
>>> _p2 = _pipeline.parallelize([4, 5])
>>> _p1.cartesian(_p2).get()
[(1, 4), (1, 5), (2, 4), (2, 5), (3, 4), (3, 5)]
cogroup(other, *others)

等同于 bigflow.transforms.cogroup(self, other, *others),

参数:
  • other (PCollection) -- 用于协同分组的PCollection
  • *others -- 更多的PCollection
返回:

分组结果

返回类型:

PTable

combine(fn, **options)

等同于 bigflow.transforms.combine(self, fn)

参数:
  • fn (callable) -- 合并函数
  • **options -- 可配置选项
返回:

合并结果

返回类型:

PObject

>>> _pipeline.parallelize([2, 4, 6, 10]).combine(sum).get()
22
count(**options)

返回元素的数量,等同于 bigflow.transforms.count(self)

返回:元素数量
返回类型:PObject
>>> p.parallelize([1, 2, 3, 4]).count().get()
4
diff(other)

返回与另一个PCollection中不相同的元素

参数:other (PCollection) -- 另一个PCollection
返回:表示结果的PCollection
返回类型:PCollection
>>> a = _pipeline.parallelize([1, 1, 2, 3])
>>> b = _pipeline.parallelize([1, 1, 2, 2])
>>> a.diff(b).get()
[(2, (1, 2)), (3, (1, 0))]
distinct()

元素去重,等同于 bigflow.transforms.distinct(self)

参数:**options -- 可配置选项
返回:不重复元素,以PCollection给出
返回类型:PCollection
>>> p.parallelize([2, 2, 1, 9, 3, 3]).distinct().get()
[2, 3, 1, 9]
filter(fn, *side_inputs, **options)

过滤元素,等同于 bigflow.transforms.filter(self, fn, *side_inputs, **options),

参数:
  • fn (function) -- 断言函数
  • *side_inputs -- 参与运算的SideInputs
  • **options -- 可配置选项
返回:

过滤结果

返回类型:

PCollection

>>> p.parallelize([3, 7, 1, 3, 2, 8]).filter(lambda x: x % 2 == 0).get()
[2, 8]
first(**options)

取第一个元素

返回:表示结果的PObject
返回类型:PObject
>>> p.parallelize([3, 7, 1, 3, 2, 8]).first.get()
3
flat_map(fn, *side_inputs, **options)

对所有元素进行一对多映射,等同于 bigflow.transforms.flat_map(self, fn, *side_inputs, **options)

参数:
  • fn (function) -- 变换函数
  • *side_inputs -- 参与运算的SideInputs
  • **options -- 可配置选项
返回:

变换结果

返回类型:

PCollection

>>> _pipeline.parallelize([1, 3, 5, 7]).flat_map(lambda x: [x, x * 2]).get()
[1, 2, 3, 5, 6, 7, 10, 14]
foreach(fn, *side_inputs, **options)

等同于 bigflow.transforms.foreach(self, fn, *side_inputs, **options)

参数:
  • fn (function) -- 变换函数
  • *side_inputs -- 参与运算的SideInputs
返回:

None

full_join(other, *others, **options)

与其他PCollection做全连接操作,等同于 bigflow.transforms.full_join(self, other, *others)

参数:
  • other (PCollection) -- 做连接操作的PCollection
  • *others -- 更多的PCollection
返回:

全连接结果

返回类型:

PCollection

>>> x = _pipeline.parallelize([("a", 1)])
>>> y = _pipeline.parallelize([("b", 2)])
>>> x.full_join(y).get()
[("a", (1, None)), ("b", (None, 2))]
group_by(key_extractor, value_extractor=None, **options)

对元素分组,等同于 bigflow.transforms.group_by(self, key_extractor, value_extractor)

参数:
  • key_extractor (function) -- 用于提取key的函数
  • value_extractor (function, optional) -- 用于提取value的函数
  • **options -- 可配置选项
返回:

分组结果

返回类型:

PTable

>>> _pcollection = _pipeline.parallelize([("A", 4), ("A", 3), ("B", 2), ("A", 1)])
>>> _grouped = _pcollection.group_by(lambda x: x[0], lambda x: x[1]).get()
>>> _grouped.get()
{"A": [4, 3, 1], "B": [2]}
group_by_key(**options)

group_by 变换类似,但使用默认的key/value提取函数对元素分组

参数:**options -- 可配置参数
返回:分组结果
返回类型:PTable
>>> _pipeline.parallelize([("A", 4), ("A", 3), ("B", 2), ("A", 1)]).group_by_key().get()
{"A": [4, 3, 1], "B": [2]}
intersection(other, output_duplicated=False)

返回与另一个PCollection的交集

参数:other (PCollection) -- 另一个PCollection
返回:表示交集的PCollection
返回类型:PCollection
>>> a = _pipeline.parallelize([1, 1, 2, 3])
>>> b = _pipeline.parallelize([1, 1, 2, 2, 5])
>>> a.intersection(b).get()
[1, 2]
>>> a.intersection(b, output_duplicated = True).get()
[1, 1, 2]
is_empty()

判断此PCollection是否为空

返回:表示结果的PObject
返回类型:PObject
>>> a = _pipeline.parallelize([1, 2, 3, 4])
>>> transforms.is_empty(a).get()
False
>>> b = _pipeline.parallelize([])
>>> transforms.is_empty(b).get()
True
join(other, *others, **options)

与其他PCollection做连接操作,等同于 bigflow.transforms.join(self, other, *others)

参数:
  • other (PCollection) -- 做连接操作的PCollection
  • *others -- 更多的PCollection
返回:

连接结果

返回类型:

PCollection

>>> x = _pipeline.parallelize([("a", 1), ("b", 4)])
>>> y = _pipeline.parallelize([("a", 2), ("a", 3)])
>>> x.join(y).get()
[("a", (1, 2)), ("a", (1, 3))]
left_join(other, *others, **options)

与其他PCollection做左连接操作,等同于 bigflow.transforms.left_join(self, other, *others)

参数:
  • other (PCollection) -- 做连接操作的PCollection
  • *others -- 更多的PCollection
返回:

左连接结果

返回类型:

PCollection

>>> x = _pipeline.parallelize([("a", 1), ("b", 4)])
>>> y = _pipeline.parallelize([("a", 2)])
>>> x.left_join(y).get()
[("a", (1, 2)), ("b", (4, None))]
map(fn, *side_inputs, **options)

对所有元素进行一对一映射变换,等同于 bigflow.transforms.map(self, fn, *side_inputs, **options)

参数:
  • fn (function) -- 变换函数
  • *side_inputs -- 参与运算的SideInputs
  • **options -- 可配置选项
返回:

变换结果

返回类型:

PCollection

>>> p.parallelize([1, 3, 5, 7]).map(lambda x: x + 1).get()
[2, 4, 6, 8]
max(key=None, **options)

取最大元素,等同于 bigflow.transforms.max(self, key)

参数:
  • key (function, optional) -- 用于提取key的函数,与Python内置``max()``中的 key 参数相同
  • **options -- 可配置选项
返回:

包含最大元素的PObject

返回类型:

PObject

>>> p.parallelize([3, 7, 1, 3, 2, 8]).max().get()
8
max_elements(n, key=None, **options)

取前n大元素,等同于 bigflow.transforms.max_elements(self, n, key)

参数:
  • n (int) -- 必须大于0
  • key (function, optional) -- 用于提取key的函数,与Python内置``max()``中的 key 参数相同
  • **options -- 可配置选项
返回:

包含前n大元素的PCollection,注意对于n=1,这里仍然返回PCollection 而非PObject

返回类型:

PCollection

>>> p.parallelize([3, 7, 1, 3, 2, 8]).max_elements(2).get()
[8, 7]
min(key=None, **options)

取最小元素, 等同于 bigflow.transforms.min(self, key)

参数:
  • key (function, optional) -- 用于提取key的函数,与Python内置``min()``中的 key 参数相同
  • **options -- 可配置选项
返回:

最小元素

返回类型:

PObject

>>> p.parallelize([3, 7, 1, 3, 2, 8]).min().get()
1
min_elements(n, key=None, **options)

取前n小元素,等同于 bigflow.transforms.min_elements(self, key)

参数:
  • n (int) -- 必须大于0
  • key (function, optional) -- 用于提取key的函数,与Python内置``min()``中的 key 参数相同
  • **options -- 可配置选项
返回:

包含前n小元素的PCollection,注意对于n=1,这里仍然返回PCollection 而非PObject

返回类型:

PCollection

>>> p.parallelize([3, 7, 1, 3, 2, 8]).min_elements(2).get()
[1, 2]
reduce(fn, *side_inputs, **options)

使用给定的fn将所有元素规约为单个元素, 等同于 bigflow.transforms.reduce(self, fn, *side_inputs, **options),

参数:
  • fn (function) -- 规约函数
  • *side_inputs -- 参与运算的SideInputs
  • **options -- 可配置参数
返回:

规约结果

返回类型:

PObject

>>> p.parallelize([1, 2, 3, 4]).reduce(lambda x, y: x + y).get()
10
right_join(other, *others, **options)

与其他PCollection做右连接操作,等同于 bigflow.transforms.right_join(self, other, *others)

参数:
  • other (PCollection) -- 做连接操作的PCollection
  • *others -- 更多的PCollection
返回:

右连接结果

返回类型:

PCollection

>>> x = _pipeline.parallelize([("a", 1), ("b", 4)])
>>> y = _pipeline.parallelize([("a", 2), ("a", 3)])
>>> x.right_join(y).get()
[("a", (1, 2)), ("a", (1, 3))]
select(io_description, fn, *args, **kargs)

对每条数据选择一些字段进行变换。

参数:
  • p (pcollection) -- 输入数据集,需要是一个每个元素都是dict的pcollection
  • io_description (str) -- 格式为: a,b=>c,d,e 即,输入字段=>输出字段
  • fn (callable) -- 函数原型为 (*input_pobjects) => (*output_pcollection_or_pobjects) 即,该函数的输入参数为多个pobject,每个pobject表示数据的一个字段, 对这个pobject上进行的操作会执行在每行数据上;该函数的返回值是一些pobject或 pcollection所组成的tuple(如果只有一个元素可以不必返回tuple)。
返回:

返回一个每个元素是一个dict的pcollection。 这个pcollection中所有元素相当于对原数据每条数据进行一次fn处理, 处理后返回的tuple中的所有数据集进行笛卡尔积, 最终再把所有输入数据处理后得出的结果拼成一个数据集。

例如::

>>> x = _pipeline.parallelize([{'a' : 1, 'b': 2.0}, {'a': 2, 'b': 3.0}])
>>> print x.apply(fields.select,
>>>      'a, b => c, d, e',
>>>      lambda a, b: (
>>>         a.map(lambda x: x + 1),
>>>         b.map(lambda x, y: x / y, a),
>>>         a.flat_map(lambda x: xrange(x))
>>>     )
>>> ).get()

[{'c': 2, 'd': 2.0, 'e': 0}, {'c': 3, 'd': 1.5, 'e': 0}, {'c': 3, 'd': 1.5, 'e': 1}]
sort(reverse=False)

对元素排序,等同于 bigflow.transforms.sort(self, reverse)

参数:reverse (bool) -- 若True则降序排列,否则为升序排列
返回:排序结果
返回类型:PCollection
>>> _pipeline.parallelize([3, 1, 2, 8]).sort().get()
[1, 2, 3, 8]
sort_by(key, reverse=False)

使用给定的key对元素排序,等同于 bigflow.transforms.sort_by(self, fn, reverse)

参数:
  • key (function, optional) -- 用于提取key的函数,与Python内置``sort()``中的 key 参数相同
  • reverse (bool) -- 若True则降序排列,否则为升序排列
返回:

排序结果

返回类型:

PCollection

>>> _pipeline.parallelize([3, 1, 2, 8]).sort_by().get()
[1, 2, 3, 8]
substract(other)

已废弃,请使用subtract.

subtract(other)

返回不存在另一个PCollection中的元素,相当于做容器减法

参数:other (PCollection) -- 作为减数的PCollection
返回:表示减法结果的PCollection
返回类型:PCollection
>>> a = _pipeline.parallelize([1, 2, 3, 3, 4])
>>> b = _pipeline.parallelize([1, 2, 5])
>>> a.subtract(b).get()
[3, 3, 4]
sum()

将所有元素相加,等同于 bigflow.transforms.sum(self)

返回:相加结果
返回类型:PObject
>>> _pipeline.parallelize([3, 1, 2, 8]).sum().get()
14
take(n, **options)

给定PCollection中的任意n个元素,等同于 bigflow.transforms.take()

参数:
  • n (int or PObject) -- 元素数量
  • **options -- 可配置参数
返回:

表示结果的PCollection

返回类型:

PCollection

>>> _pipeline.parallelize([1, 2, 3, 4]).take(3).get()
[1, 2, 3]
>>> _n = _pipeline.parallelize(2)
>>> _pipeline.parallelize([1, 2, 3, 4]).take(_n).get()
[1, 2]
transform(*args, **options)

等同于 bigflow.transforms.transform(self, *args, **options)

union(other, *others, **options)

将元素与其他PCollection/PObject中的所有元素共同构成新的PCollection 等同于 bigflow.transforms.union(self, other, *others)

参数:
  • other (PCollection or PObject) -- 其他PCollection/PObject
  • *others -- 其他PCollection/PObject
返回:

表示结果的PCollection

返回类型:

PCollection

>>> _p1 = _pipeline.parallelize([1, 2, 3, 4])
>>> _p2 = _pipeline.parallelize([5, 6, 7, 8])
>>> _p1.union(_p2).get()
[1, 2, 3, 4, 5, 6, 7, 8]
window_into(window, **options)

对元素根据Window分组

参数:
  • window (Window) -- 用于分组的Window
  • **options -- 可配置选项
返回:

分组结果

返回类型:

PTable