PObject

bigflow.pobject.PObject 定义

class bigflow.pobject.PObject(node, pipeline)

基类:bigflow.ptype.PType

用于表示单个元素的 bigflow.ptype.PType,通常为聚合类变换的结果,例如 bigflow.pcollection.PCollection.combine(), bigflow.pcollection.PCollection.aggregate()

注解

用户不应当直接使用其构造方法

注解

Python不允许重载这3个运算符(and, or, not),用户不应该在PObject上使用这三个运算符。Bigflow重载的是按位运算(&, |, ^)

PObject类上重载了以下操作符:

双目操作符:

__add__, __sub__, __mul__, __div__, __floordiv__, __mod__, __pow__, __lshift__, __rshift__, __and__, __xor__, __or__, __lt__, __le__, __eq__, __ge__, __gt__, __ne__ __radd__, __rsub__, __rmul__, __rdiv__, __rfloordiv__, __rmod__, __rpow__, __rlshift__, __rrshift__, __rand__, __rxor__, __ror__

单目操作符:

__neg__, __pos__, __abs__, __invert__

这些操作都将返回一个把相应数据进行相应变换后的pobject。

例如:p.sum() / p.count()等价于p.sum().map(lambda s, c: s / c, p.count())

同时, PObject 禁止了 bool 操作, 调用 bool(PObject) 或者 if PObject 将会抛出异常. :param node: LogicalPlan.Node :type node: Node

as_pcollection()

将PObject转为PCollection

返回:变换结果
返回类型:PCollection
cartesian(*pvalues, **options)

求当前算子与pvalues的笛卡尔积。 等价于 bigflow.transforms.cartesian(self, *pvalues, **options)

Args: *pvalues (PObject/PCollection) :returns: 此PObject与所有参数的笛卡尔积。结果PCollection中的每条记录是一个tuple。

每个tuple的第n个元素是第n个输入ptype对象的记录。
返回类型:PCollection
>>> _p1 = _pipeline.parallelize(1)
>>> _p2 = _pipeline.parallelize(2)
>>> _p1.cartesian(_p2).get()
[(1, 2)]
>>> _p3 = _pipeline.parallelize([3, 4])
>>> _p1.cartesian(_p3).get()
[(1, 3), (1, 4)]
>>> _p1.cartesian(_p2, _p3).get()
[(1, 2, 3), (1, 2, 4)]
ceil()

PObject ceil(类似于math.ceil)算子

返回:PObject
>>> _p1 = _pipeline.parallelize(1.2)
>>> print _p1.ceil().get()
2
flat_map(fn, *side_inputs, **option)

对包含的元素进行一对多变换,等同于 transforms.map(self, fn, *side_inputs, **options)

参数:
  • fn (function) -- 变换函数
  • *side_inputs -- 参与运算的SideInputs
  • **options -- 可配置选项
返回:

变换结果

返回类型:

PCollection

>>> _pipeline.parallelize(1).flat_map(lambda x: [x, x * 2]).get()
[1, 2]
floor()

PObject floor(类似于math.floor)算子

返回:PObject
>>> _p1 = _pipeline.parallelize(1.2)
>>> print _p1.floor().get()
1
map(fn, *side_inputs, **options)

对包含的元素进行变换,等同于 transforms.map(self, fn, *side_inputs, **options)

参数:
  • fn (function) -- 变换函数
  • *side_inputs -- 参与运算的SideInputs
  • **options -- 可配置选项
返回:

变换结果

返回类型:

PObject

>>> p.parallelize(1).map(lambda x: x + 1).get()
2
not_()

PObject not算子

返回:PObject
>>> _p1 = _pipeline.parallelize(1)
>>> print _p1.not_().get()
False
round(n=0)

PObject round(类似于math.round)算子 :param n: 小数点后面保留的位数

返回:PObject
>>> _p1 = _pipeline.parallelize(1.2)
>>> print _p1.round().get()
1
union(other, *others, **option)

将元素与其他PCollection/PObject中的所有元素共同构成PCollection 等同于 transforms.union(self, other, *others)

参数:
  • other (PCollection or PObject) -- 其他PCollection/PObject
  • *others -- 其他PCollection/PObject
返回:

表示结果的PCollection

返回类型:

PCollection

>>> _p1 = _pipeline.parallelize(1)
>>> _p2 = _pipeline.parallelize([2, 3])
>>> _p1.union(_p2).get()
[1, 2, 3]