PCollection¶
bigflow.pcollection.PCollection
定义
-
class
bigflow.pcollection.
PCollection
(node, pipeline)¶ -
用于表示分布式数据集的
bigflow.ptype.PType
注解
用户不应当直接使用其构造方法
参数: node (Node) -- LogicalPlan.Node -
accumulate
(zero, accumulate_fn, *side_inputs, **options)¶ 等同于
bigflow.transforms.accumulate(self, zero, accumulate_fn, *side_inputs, **options)
参数: - zero (value or function) -- 初始值,或是一个返回初始值的方法
- accumulate_fn (function) -- 聚合方法
- *side_inputs -- 参与运算的SideInputs
- **options -- 可配置选项
返回: 聚合结果
返回类型: >>> from operator import add >>> p.parallelize([1, 2, 3]).accumulate(0, add).get() 6
-
agg
(io_description, fn, *args, **kargs)¶ 选择一些字段去做一些聚合操作。
参数: - p (pcollection) -- 输入数据集,需要是一个每个元素都是dict的pcollection
- io_description (str) -- 格式为: a,b=>c,d,e 即,输入字段=>输出字段
- fn (callable) -- 函数原型为 (*input_pcollections) => (*output_pcollection_or_pobjects) 即,该函数的输入参数为多个pcollection, 每个pcollection表示数据的一个字段的全部行所拼成的一个pcollection。 该函数的返回值是一些pobject或 pcollection所组成的tuple(如果只有一个元素可以不必返回tuple)。
返回: 返回一个每个元素是一个dict的pcollection。 这个pcollection中所有元素输出的几个pcollection进行笛卡尔积并添加字段名后的结果。
例如::
>>> x = _pipeline.parallelize([{'a' : 1, 'b': 2.0}, {'a': 2, 'b': 3.0}]) >>> print x.apply(fields.agg, >>> 'a, b => c, d, e', >>> lambda a, b: ( >>> a.count(), >>> b.sum(), >>> a.flat_map(lambda x: xrange(x)) >>> ) >>> ).get() [{'c': 2, 'd': 5.0, 'e': 0}, {'c': 2, 'd': 5.0, 'e': 0}, {'c': 2, 'd': 5.0, 'e': 1}]
-
aggregate
(zero, aggregate_fn, combine_fn, *side_inputs, **options)¶ 等同于
bigflow.transforms.aggregate(self, aggregate_fn, combine_fn, *side_inputs, **options)
参数: - pcollection (PCollection) -- 输入PCollection
- zero (value or function) -- 初始值,或是一个返回初始值的方法
- accumulate_fn (function) -- 聚合方法
- *side_inputs -- 参与运算的SideInputs
- **options -- 可配置选项
返回: 聚合结果
返回类型:
-
as_pobject
()¶ 等同于 to
self.first()
返回: 转换结果 返回类型: PObject >>> _pipeline.parallelize(["A"]).as_pobject().get() "A"
-
as_schema
(fields)¶ 根据字段,返回一个SchemaPCollection
参数: fields -- 类型可以是,tuple,list,dict; 当fields是tuple或list时, 会判断每个元素的类型:
fields中的每个元素是python基本类型或一个serde; 接口将构造TupleSerde设置到PCollection每个元素fields中的每个元素是python string,抛出异常
- 当fields是dict时:
- fields的key标识字段类型,value标识该字段的类型,如 {"name": str, "age": int} 当前PCollection中的每个元素必须是dict,dict内的key必须相同。 fields内的key要和PCollection内的key必须相同
返回: 表示转化后的PCollection 返回类型: PCollection Examples
>>> data = self._pipeline.parallelize([("xiaoming", "PKU", 20)]) >>> d1 = data.as_schema((str, str, int)) >>> d2 = data.as_schema([str, str, int]) >>> print d1.get() [('xiaoming', 'PKU', 20)] >>> >>> print d2.get() [('xiaoming', 'PKU', 20)] >>> >>> data = self._pipeline.parallelize([{"name": "xiaoming", "school": "PKU", "age": 20}]) >>> d5 = data.as_schema({"name": str, "school": str, "age": int}) >>> print d5.get() [{'age': 20, 'name': 'xiaoming', 'school': 'PKU'}] >>>
-
cartesian
(other, *others, **options)¶ 与其他的PCollection做笛卡尔积
参数: - other (PCollection) -- 其他的PCollection
- *others -- 更多的PCollection
返回: 表示结果的PCollection
返回类型: >>> _p1 = _pipeline.parallelize([1, 2, 3]) >>> _p2 = _pipeline.parallelize([4, 5]) >>> _p1.cartesian(_p2).get() [(1, 4), (1, 5), (2, 4), (2, 5), (3, 4), (3, 5)]
-
cogroup
(other, *others)¶ 等同于
bigflow.transforms.cogroup(self, other, *others)
,参数: - other (PCollection) -- 用于协同分组的PCollection
- *others -- 更多的PCollection
返回: 分组结果
返回类型:
-
combine
(fn, **options)¶ 等同于
bigflow.transforms.combine(self, fn)
参数: - fn (callable) -- 合并函数
- **options -- 可配置选项
返回: 合并结果
返回类型: >>> _pipeline.parallelize([2, 4, 6, 10]).combine(sum).get() 22
-
count
(**options)¶ 返回元素的数量,等同于
bigflow.transforms.count(self)
返回: 元素数量 返回类型: PObject >>> p.parallelize([1, 2, 3, 4]).count().get() 4
-
diff
(other)¶ 返回与另一个PCollection中不相同的元素
参数: other (PCollection) -- 另一个PCollection 返回: 表示结果的PCollection 返回类型: PCollection >>> a = _pipeline.parallelize([1, 1, 2, 3]) >>> b = _pipeline.parallelize([1, 1, 2, 2]) >>> a.diff(b).get() [(2, (1, 2)), (3, (1, 0))]
-
distinct
()¶ 元素去重,等同于
bigflow.transforms.distinct(self)
参数: **options -- 可配置选项 返回: 不重复元素,以PCollection给出 返回类型: PCollection >>> p.parallelize([2, 2, 1, 9, 3, 3]).distinct().get() [2, 3, 1, 9]
-
filter
(fn, *side_inputs, **options)¶ 过滤元素,等同于
bigflow.transforms.filter(self, fn, *side_inputs, **options)
,参数: - fn (function) -- 断言函数
- *side_inputs -- 参与运算的SideInputs
- **options -- 可配置选项
返回: 过滤结果
返回类型: >>> p.parallelize([3, 7, 1, 3, 2, 8]).filter(lambda x: x % 2 == 0).get() [2, 8]
-
first
(**options)¶ 取第一个元素
返回: 表示结果的PObject 返回类型: PObject >>> p.parallelize([3, 7, 1, 3, 2, 8]).first.get() 3
-
flat_map
(fn, *side_inputs, **options)¶ 对所有元素进行一对多映射,等同于
bigflow.transforms.flat_map(self, fn, *side_inputs, **options)
参数: - fn (function) -- 变换函数
- *side_inputs -- 参与运算的SideInputs
- **options -- 可配置选项
返回: 变换结果
返回类型: >>> _pipeline.parallelize([1, 3, 5, 7]).flat_map(lambda x: [x, x * 2]).get() [1, 2, 3, 5, 6, 7, 10, 14]
-
foreach
(fn, *side_inputs, **options)¶ 等同于
bigflow.transforms.foreach(self, fn, *side_inputs, **options)
参数: - fn (function) -- 变换函数
- *side_inputs -- 参与运算的SideInputs
返回: None
-
full_join
(other, *others, **options)¶ 与其他PCollection做全连接操作,等同于
bigflow.transforms.full_join(self, other, *others)
参数: - other (PCollection) -- 做连接操作的PCollection
- *others -- 更多的PCollection
返回: 全连接结果
返回类型: >>> x = _pipeline.parallelize([("a", 1)]) >>> y = _pipeline.parallelize([("b", 2)]) >>> x.full_join(y).get() [("a", (1, None)), ("b", (None, 2))]
-
group_by
(key_extractor, value_extractor=None, **options)¶ 对元素分组,等同于
bigflow.transforms.group_by(self, key_extractor, value_extractor)
参数: - key_extractor (function) -- 用于提取key的函数
- value_extractor (function, optional) -- 用于提取value的函数
- **options -- 可配置选项
返回: 分组结果
返回类型: >>> _pcollection = _pipeline.parallelize([("A", 4), ("A", 3), ("B", 2), ("A", 1)]) >>> _grouped = _pcollection.group_by(lambda x: x[0], lambda x: x[1]).get() >>> _grouped.get() {"A": [4, 3, 1], "B": [2]}
-
group_by_key
(**options)¶ 与
group_by
变换类似,但使用默认的key/value提取函数对元素分组参数: **options -- 可配置参数 返回: 分组结果 返回类型: PTable >>> _pipeline.parallelize([("A", 4), ("A", 3), ("B", 2), ("A", 1)]).group_by_key().get() {"A": [4, 3, 1], "B": [2]}
-
intersection
(other, output_duplicated=False)¶ 返回与另一个PCollection的交集
参数: other (PCollection) -- 另一个PCollection 返回: 表示交集的PCollection 返回类型: PCollection >>> a = _pipeline.parallelize([1, 1, 2, 3]) >>> b = _pipeline.parallelize([1, 1, 2, 2, 5]) >>> a.intersection(b).get() [1, 2] >>> a.intersection(b, output_duplicated = True).get() [1, 1, 2]
-
is_empty
()¶ 判断此PCollection是否为空
返回: 表示结果的PObject 返回类型: PObject >>> a = _pipeline.parallelize([1, 2, 3, 4]) >>> transforms.is_empty(a).get() False >>> b = _pipeline.parallelize([]) >>> transforms.is_empty(b).get() True
-
join
(other, *others, **options)¶ 与其他PCollection做连接操作,等同于
bigflow.transforms.join(self, other, *others)
参数: - other (PCollection) -- 做连接操作的PCollection
- *others -- 更多的PCollection
返回: 连接结果
返回类型: >>> x = _pipeline.parallelize([("a", 1), ("b", 4)]) >>> y = _pipeline.parallelize([("a", 2), ("a", 3)]) >>> x.join(y).get() [("a", (1, 2)), ("a", (1, 3))]
-
left_join
(other, *others, **options)¶ 与其他PCollection做左连接操作,等同于
bigflow.transforms.left_join(self, other, *others)
参数: - other (PCollection) -- 做连接操作的PCollection
- *others -- 更多的PCollection
返回: 左连接结果
返回类型: >>> x = _pipeline.parallelize([("a", 1), ("b", 4)]) >>> y = _pipeline.parallelize([("a", 2)]) >>> x.left_join(y).get() [("a", (1, 2)), ("b", (4, None))]
-
map
(fn, *side_inputs, **options)¶ 对所有元素进行一对一映射变换,等同于
bigflow.transforms.map(self, fn, *side_inputs, **options)
参数: - fn (function) -- 变换函数
- *side_inputs -- 参与运算的SideInputs
- **options -- 可配置选项
返回: 变换结果
返回类型: >>> p.parallelize([1, 3, 5, 7]).map(lambda x: x + 1).get() [2, 4, 6, 8]
-
max
(key=None, **options)¶ 取最大元素,等同于
bigflow.transforms.max(self, key)
参数: - key (function, optional) -- 用于提取key的函数,与Python内置``max()``中的
key
参数相同 - **options -- 可配置选项
返回: 包含最大元素的PObject
返回类型: >>> p.parallelize([3, 7, 1, 3, 2, 8]).max().get() 8
- key (function, optional) -- 用于提取key的函数,与Python内置``max()``中的
-
max_elements
(n, key=None, **options)¶ 取前n大元素,等同于
bigflow.transforms.max_elements(self, n, key)
参数: - n (int) -- 必须大于0
- key (function, optional) -- 用于提取key的函数,与Python内置``max()``中的
key
参数相同 - **options -- 可配置选项
返回: 包含前n大元素的PCollection,注意对于n=1,这里仍然返回PCollection 而非PObject
返回类型: >>> p.parallelize([3, 7, 1, 3, 2, 8]).max_elements(2).get() [8, 7]
-
min
(key=None, **options)¶ 取最小元素, 等同于
bigflow.transforms.min(self, key)
参数: - key (function, optional) -- 用于提取key的函数,与Python内置``min()``中的
key
参数相同 - **options -- 可配置选项
返回: 最小元素
返回类型: >>> p.parallelize([3, 7, 1, 3, 2, 8]).min().get() 1
- key (function, optional) -- 用于提取key的函数,与Python内置``min()``中的
-
min_elements
(n, key=None, **options)¶ 取前n小元素,等同于
bigflow.transforms.min_elements(self, key)
参数: - n (int) -- 必须大于0
- key (function, optional) -- 用于提取key的函数,与Python内置``min()``中的
key
参数相同 - **options -- 可配置选项
返回: 包含前n小元素的PCollection,注意对于n=1,这里仍然返回PCollection 而非PObject
返回类型: >>> p.parallelize([3, 7, 1, 3, 2, 8]).min_elements(2).get() [1, 2]
-
reduce
(fn, *side_inputs, **options)¶ 使用给定的fn将所有元素规约为单个元素, 等同于
bigflow.transforms.reduce(self, fn, *side_inputs, **options)
,参数: - fn (function) -- 规约函数
- *side_inputs -- 参与运算的SideInputs
- **options -- 可配置参数
返回: 规约结果
返回类型: >>> p.parallelize([1, 2, 3, 4]).reduce(lambda x, y: x + y).get() 10
-
right_join
(other, *others, **options)¶ 与其他PCollection做右连接操作,等同于
bigflow.transforms.right_join(self, other, *others)
参数: - other (PCollection) -- 做连接操作的PCollection
- *others -- 更多的PCollection
返回: 右连接结果
返回类型: >>> x = _pipeline.parallelize([("a", 1), ("b", 4)]) >>> y = _pipeline.parallelize([("a", 2), ("a", 3)]) >>> x.right_join(y).get() [("a", (1, 2)), ("a", (1, 3))]
-
select
(io_description, fn, *args, **kargs)¶ 对每条数据选择一些字段进行变换。
参数: - p (pcollection) -- 输入数据集,需要是一个每个元素都是dict的pcollection
- io_description (str) -- 格式为: a,b=>c,d,e 即,输入字段=>输出字段
- fn (callable) -- 函数原型为 (*input_pobjects) => (*output_pcollection_or_pobjects) 即,该函数的输入参数为多个pobject,每个pobject表示数据的一个字段, 对这个pobject上进行的操作会执行在每行数据上;该函数的返回值是一些pobject或 pcollection所组成的tuple(如果只有一个元素可以不必返回tuple)。
返回: 返回一个每个元素是一个dict的pcollection。 这个pcollection中所有元素相当于对原数据每条数据进行一次fn处理, 处理后返回的tuple中的所有数据集进行笛卡尔积, 最终再把所有输入数据处理后得出的结果拼成一个数据集。
例如::
>>> x = _pipeline.parallelize([{'a' : 1, 'b': 2.0}, {'a': 2, 'b': 3.0}]) >>> print x.apply(fields.select, >>> 'a, b => c, d, e', >>> lambda a, b: ( >>> a.map(lambda x: x + 1), >>> b.map(lambda x, y: x / y, a), >>> a.flat_map(lambda x: xrange(x)) >>> ) >>> ).get() [{'c': 2, 'd': 2.0, 'e': 0}, {'c': 3, 'd': 1.5, 'e': 0}, {'c': 3, 'd': 1.5, 'e': 1}]
-
sort
(reverse=False)¶ 对元素排序,等同于
bigflow.transforms.sort(self, reverse)
参数: reverse (bool) -- 若True则降序排列,否则为升序排列 返回: 排序结果 返回类型: PCollection >>> _pipeline.parallelize([3, 1, 2, 8]).sort().get() [1, 2, 3, 8]
-
sort_by
(key, reverse=False)¶ 使用给定的key对元素排序,等同于
bigflow.transforms.sort_by(self, fn, reverse)
参数: - key (function, optional) -- 用于提取key的函数,与Python内置``sort()``中的
key
参数相同 - reverse (bool) -- 若True则降序排列,否则为升序排列
返回: 排序结果
返回类型: >>> _pipeline.parallelize([3, 1, 2, 8]).sort_by().get() [1, 2, 3, 8]
- key (function, optional) -- 用于提取key的函数,与Python内置``sort()``中的
-
substract
(other)¶ 已废弃,请使用subtract.
-
subtract
(other)¶ 返回不存在另一个PCollection中的元素,相当于做容器减法
参数: other (PCollection) -- 作为减数的PCollection 返回: 表示减法结果的PCollection 返回类型: PCollection >>> a = _pipeline.parallelize([1, 2, 3, 3, 4]) >>> b = _pipeline.parallelize([1, 2, 5]) >>> a.subtract(b).get() [3, 3, 4]
-
sum
()¶ 将所有元素相加,等同于
bigflow.transforms.sum(self)
返回: 相加结果 返回类型: PObject >>> _pipeline.parallelize([3, 1, 2, 8]).sum().get() 14
-
take
(n, **options)¶ 给定PCollection中的任意n个元素,等同于
bigflow.transforms.take()
参数: 返回: 表示结果的PCollection
返回类型: >>> _pipeline.parallelize([1, 2, 3, 4]).take(3).get() [1, 2, 3]
>>> _n = _pipeline.parallelize(2) >>> _pipeline.parallelize([1, 2, 3, 4]).take(_n).get() [1, 2]
-
transform
(*args, **options)¶
-
union
(other, *others, **options)¶ 将元素与其他PCollection/PObject中的所有元素共同构成新的PCollection 等同于
bigflow.transforms.union(self, other, *others)
参数: - other (PCollection or PObject) -- 其他PCollection/PObject
- *others -- 其他PCollection/PObject
返回: 表示结果的PCollection
返回类型: >>> _p1 = _pipeline.parallelize([1, 2, 3, 4]) >>> _p2 = _pipeline.parallelize([5, 6, 7, 8]) >>> _p1.union(_p2).get() [1, 2, 3, 4, 5, 6, 7, 8]
-