Transforms¶
定义Bigflow Python中所有的变换
Author: Wang, Cong(bigflow-opensource@baidu.com), panyunhong(bigflow-opensource@baidu.com)
注意:除特殊说明外,所有变换的用户自定义方法(UDF)输入参数都不允许修改
-
bigflow.transforms.
accumulate
(pcollection, zero, accumulate_fn, *side_inputs, **options)¶ 将给定的PCollection按照一个初始值和方法聚合为PObject
假设输入类型为I,输出类型为O,则zero、accumulate_fn的期望签名为:
zero: O或zero() => O
accumulate_fn: accumulate_fn(O, I) => O (accumulate_fn的第一个参数允许被修改)
参数: - pcollection (PCollection) -- 输入PCollection
- zero (value or function) -- 初始值,或是一个返回初始值的方法
- accumulate_fn (function) -- 聚合方法
- *side_inputs -- 参与运算的SideInputs
- **options -- 可配置选项
返回: 聚合结果
返回类型: 由于该函数的语义是数据必然按顺序一条条的流过,限制了该函数可以进行的优化工作, 所以如果可以使用aggregate或reduce替换时,尽量使用aggregate/reduce替换该函数。
>>> from bigflow import transforms >>> _p = _pipeline.parallelize([1, 2, 3]) >>> transforms.accumulate(_p, 0, lambda x, y: x + y).get() 6
TODO: Another example
-
bigflow.transforms.
aggregate
(pcollection, zero, aggregate_fn, combine_fn, *side_inputs, **options)¶ 将给定的PCollection按照初始值、初段聚合方法和汇总方法聚合为PObject
假设输入类型I,输出类型为O,则zero、aggregate_fn、combine_fn的期望签名为:
zero: O或zero() => O
aggregate_fn: aggregate_fn(O, I) => O (aggregate_fn的第一个参数允许被修改)
combine_fn: combine_fn(O, O) => O (combine_fn的第一个参数允许被修改)
在执行时aggregate会把输入pcollection先切分成许多个分片,然后对每个分片使用zero生成一个O类型的初始值。
随后,在每个分片上,持续调用aggregate_fn,将分片上全部的数据聚合为一个O类型的值。
最后,会再将所有分片上生成的那些O类型的值汇聚到一起,使用combine_fn最终聚合到一起。
分片的规则用户不应作任何假设。
参数: - pcollection (PCollection) -- 输入PCollection
- zero (value or callable) -- 初始值,或是一个返回初始值的方法。
- aggregate_fn (callable) -- 初段聚合方法。该方法需要两个参数。
- combine_fn (callable) -- 汇总方法
- *side_inputs -- 参与运算的SideInputs
- **options -- 可配置选项
返回: 聚合结果
返回类型: >>> from bigflow import transforms >>> _p = _pipeline.parallelize(["viva", "la", "vida"]) >>> transforms.aggregate(_p, 0, lambda x, y: x + len(y), lambda x, y: x + y).get() # sum words length 10
-
bigflow.transforms.
cartesian
(*pcollections, **options)¶ 对多个输入PCollection求笛卡尔积,返回一个PCollection
参数: - *pcollections -- 输入PCollection
- **options -- 可配置选项
返回: 笛卡尔积
返回类型: >>> from bigflow import transforms >>> p1 = _pipeline.parallelize([1, 2, 3]) >>> p2 = _pipeline.parallelize(['a', 'b', 'c']) >>> transforms.cartesian(p1, p2) [(1, 'a'), (1, 'b'), (1, 'c'), (2, 'a'), (2, 'b'), (2, 'c'), (3, 'a'), (3, 'b'), (3, 'c')]
>>> p3 = _pipeline.parallelize(1) >>> p4 = _pipeline.parallelize(2) >>> p3.cartesian(p4).get() [(1, 2)] >>> p5 = _pipeline.parallelize([3, 4]) >>> p3.cartesian(p5).get() [(1, 3), (1, 4)] >>> p3.cartesian(p4, p5).get() [(1, 2, 3), (1, 2, 4)]
-
bigflow.transforms.
cogroup
(*pcollections, **options)¶ 对传入的所有pcollection进行协同分组。
cogroup要求所有传入的PCollection的每个元素都是一个(k, v)对, cogroup会用k来作为分组的key,对多个输入PCollection进行协同分组, 返回一个PTable表示分组结果。
这个返回的PTable的每个value为一个tuple,tuple的每个元素是一个PCollection, 其中第n个PCollection表示输入的第n个PCollection在当前key下的全部数据。
如果某个输入PCollection在某个key下无数据,则对应的PCollection为一个空PCollection。
目前不能像group_by指定key_extractor。
group_by_key可以理解成是cogroup只传有一个参数的特殊情况。
参数: - *pcollections -- 输入PCollection
- **options -- 可配置选项
返回: 分组结果
返回类型: >>> from bigflow import transforms >>> _p1 = _pipeline.parallelize([("A", 1), ("A", 2), ("B", 3)]) >>> _p2 = _pipeline.parallelize([("A", 4)]) >>> _p = transforms.cogroup(_p1, _p2) # _p的值为{"A": ([1, 2], [4]), "B": ([3], [])} ,但由于实现难度较大,PTable的value为tuple of PCollection时的get操作暂不支持。 >>> _p.apply_values(lambda x, y: transforms.union(x, y)).get() {"A": [1, 2, 4], "B": [3]}
>>> def distinct_and_join(p, q): # 去重,并join ... return p.cogroup(q) \ ... .apply_values(lambda a, b: (a.distinct(), b.distinct())) \ ... .apply_values(transforms.cartesian) \ ... .flatten() >>> _p1 = _pipeline.parallelize([("A", 1), ("A", 2), ("A", 1), ("C", 1)]) >>> _p2 = _pipeline.parallelize([("A", 3), ("A", 3), ("B", 2)]) >>> print distinct_and_join(_p1, _p2).get() [("A", (1, 3)), ("A", (2, 3))] >>> # 未来bigflow会自动将p.distinct().join(q.distinct())优化成上边的样子(正在进行中)
>>> def semi_join(p, q): # 同key的join结果只输出一条 ... return p.cogroup(q) \ ... .apply_values(lambda a, b: (a.take(1), b.take(1))) \ ... .apply_values(transforms.cartesian) \ ... .flatten() >>> print semi_join(_p1, _p2).get() [("A", (1, 3))]
-
bigflow.transforms.
combine
(pcollection, fn, **options)¶ 给定一个合并函数,聚合输入PCollection中所有元素,这些元素以迭代器的形式给出
默认情况下,输入类型与输出类型需要一致,假设为O类型,fn的期望签名为 fn([O...]) => O,[]表示输入可遍历
在执行时会把输入pcollection先切分成许多个分片,然后对每个分片的数据组成一个列表,然后调用fn, 将每个分片中的数据合并成一个O类型的变量。 然后,会再将所有分片上生成的那些O类型的值汇聚到一起,组成一个列表,再使用fn最终聚合到一起。
分片的规则用户不应作任何假设。
用户可以显式的指定pre_combine=False,关掉预聚合。如果关掉预聚合,则会直接将全部的数据组成一个列表交给fn, 聚合成一个值。则这种情况下,fn需要的输入类型与fn返回的类型可以是不同类型的。
参数: - pcollection (PCollection) -- 输入PCollection
- fn (function) -- 合并函数
- **options -- 可配置选项 其中重要配置项为: pre_combine(bool): 是否进行预聚合。默认为True。
返回: 合并结果
返回类型: >>> _p = _pipeline.parallelize([2, 4, 6, 10]) >>> transforms.combine(_p, sum).get() 22 >>> number_list = self._pipeline.parallelize(['2', '3', '4']) >>> to_lookup = {'2': 2, '3': 3, '4': 4} >>> def lookup_dict_and_sum(numbers): ... return sum(map(lambda s: to_lookup[s], numbers)) >>> number_list.combine(lookup_dict_and_sum, pre_combine=False).get() 9 >>> number_list.combine(lookup_dict_and_sum).get() Error may occur (because of pre_combine, 1)
-
bigflow.transforms.
count
(pcollection, **options)¶ 返回给定PCollection中元素的数量
参数: - pcollection (PCollection) -- 输入PCollection
- **options -- 可配置选项
返回: 元素数量
返回类型: >>> from bigflow import transforms >>> _p = _pipeline.parallelize([1, 2, 3, 4]) >>> transforms.count(_p).get() 4
-
bigflow.transforms.
diff
(pcollection1, pcollection2)¶ 对于给定的PCollection1和PCollection2,返回两者不相同的元素
参数: - pcollection1 (PCollection) -- 输入1
- pcollection2 (PCollection) -- 输入2
返回: 表示差异的PCollection
返回类型: >>> from bigflow import transforms >>> a = _pipeline.parallelize([1, 1, 2, 3]) >>> b = _pipeline.parallelize([1, 1, 2, 2]) >>> transforms.diff(a, b).get() [(2, (1, 2)), (3, (1, 0))]
-
bigflow.transforms.
distinct
(pcollection, **options)¶ 返回给定PCollection中所有不重复元素
参数: - pcollection (PCollection) -- 输入PCollection
- **options -- 可配置选项
返回: 不重复元素,以PCollection给出
返回类型: >>> from bigflow import transforms >>> _p = _pipeline.parallelize([2, 2, 1, 9, 3, 3]) >>> transforms.distinct(_p).get() [2, 3, 1, 9]
-
bigflow.transforms.
extract_keys
(ptable, **options)¶ 提取给定PTable中所有的key
参数: - ptable (PTable) -- 输入PTable
- **options -- 可配置选项
返回: 所有的key,以PCollection给出
返回类型: >>> from bigflow import transforms >>> _p = _pipeline.parallelize({"A": [2, 3], "B": [4, 5]}) >>> transforms.extract_keys(_p).get() ["A", "B"]
-
bigflow.transforms.
extract_values
(ptable, **options)¶ 提取给定PTable中所有的value
参数: - ptable (PTable) -- 输入PTable
- **options -- 可配置选项
返回: 所有的value,以PCollection给出
返回类型: >>> from bigflow import transforms >>> _p = _pipeline.parallelize({"A": [2, 3], "B": [4, 5]}) >>> transforms.extract_values(_p).get() [2, 3, 4, 5]
无论PTable为多少层嵌套,都会抽取出最内层的value
>>> _p = _pipeline.parallelize({"A": {"a": [2, 3], "b": [1, 4]}, "B": {"c": [6, 7], "d": [9, 9]}} >>> print _p >>> {k0: {k1: [...]}} # 嵌套PTable >>> transforms.extract_values(_p).get() >>> [2, 3, 1, 4, 6, 7, 9, 9]
-
bigflow.transforms.
filter
(pcollection, fn, *side_inputs, **options)¶ 对于给定的PCollection和一个断言函数,返回只满足断言函数元素的PCollection
假设输入类型为I,fn的期望签名为 fn(I) => bool
参数: - pcollection (PCollection) -- 输入PCollection
- fn (function) -- 断言函数
- **options -- 可配置选项
返回: 过滤结果
返回类型: >>> from bigflow import transforms >>> _p = _pipeline.parallelize([3, 7, 1, 3, 2, 8]) >>> transforms.filter(_p, lambda x: x % 2 == 0).get() [2, 8]
-
bigflow.transforms.
first
(pcollection, **options)¶ 取出PCollection中的第一个元素
参数: - pcollection (PCollection) -- 输入PCollection
- **options -- 可配置选项
返回: 取出的单个元素,以PObject给出
返回类型: >>> from bigflow import transforms >>> _p = _pipeline.parallelize([3, 7, 1, 3, 2, 8]) >>> transforms.first(_p).get() 3
-
bigflow.transforms.
flat_map
(pvalue, fn, *side_inputs, **options)¶ 对PCollection中的每个元素做一对N映射
对变换函数必须返回一个可遍历变量(即实现了__iter__()方法),将迭代器中的所有元素 构造PCollection
假设输入类型为I,fn的期望签名为 fn(I) => [O...],[]表示返回结果可遍历
参数: - pvalue (PCollection or PObject) -- 输入P类型
- fn (function) -- 变换函数
- *side_inputs -- 参与运算的SideInputs
- **options -- 可配置选项
- Results:
- PCollection: 变换后的PCollection
>>> from bigflow import transforms >>> _p = _pipeline.parallelize([1, 3, 5, 7]) >>> transforms.flat_map(_p, lambda x: [x, x * 2]).get() [1, 2, 3, 5, 6, 7, 10, 14] >>> transforms.flat_map(_p, lambda x: [[x, x * 2]]).get() [[1, 2], [3, 6], [5, 10], [7, 14]] >>> transforms.flat_map(_p, lambda x: (x, x * 2)).get() [1, 2, 3, 6, 5, 10, 7, 14] >>> transforms.flat_map(_p, lambda x: [(x, x * 2)]).get() [(1, 2), (3, 6), (5, 10), (7, 14)]
注意返回结果可以为空:
>>> transforms.flat_map(_p, lambda x: []) []
如果返回的对象不能被遍历,则运行时会报错。 典型的错误用法包括None或返回一个单个元素。
返回对象只要是可迭代类型即可,不必一定是list。 特别是,需要输出较多数据时, 使用list可能会导致内存占用过大, 用户可以直接利用python的yield语法生成一个generator, 达到不需要占用大量内存的目的。
>>> _p = _pipeline.parallelize([3, 5]) >>> def make_partial_sum(x): ... sum = 0 ... for i in xrange(1, x + 1): ... sum += i ... yield sum
>>> transforms.flat_map(_p, make_partial_sum) [1, 3, 6, 1, 3, 6, 10, 15]
这种用法可以避免产生大list,从而避免内存占用过大的问题。
-
bigflow.transforms.
flatten
(ptable, **options)¶ 对于给定PTable中的key和value中每一个元素,构造(key, value)对,结果保存在PCollection中
参数: - ptable (PTable) -- 输入PTable
- **options -- 可配置选项
返回: (key, value)对,结果以PCollection表示
返回类型: >>> from bigflow import transforms >>> _p = _pipeline.parallelize({"A": [2, 3], "B": [4, 5]}) >>> transforms.flatten(_p).get() [("A", 2), ("A", 3), ("B", 4), ("B", 5)]
-
bigflow.transforms.
flatten_values
(ptable, **options)¶ 等价于
extract_values(ptable)
参数: - ptable (PTable) -- 输入PTable
- **options -- 可配置选项
返回: 所有的value,以PCollection给出
返回类型: >>> from bigflow import transforms >>> _p = _pipeline.parallelize({"A": [2, 3], "B": [4, 5]}) >>> transforms.flatten_values(_p).get() [2, 3, 4, 5]
-
bigflow.transforms.
foreach
(pvalue, fn, *side_inputs, **options)¶ 对给定的PCollection/PObject中的每个元素应用一个函数,函数并不期望有任何的 返回,而是利用其副作用产生效果。 该函数一般用于产出数据到外部存储,同一条数据可能会被多次调用, 用户需要注意在下游去重,或想办法保证foreach操作具有幂等性质。
假设输入类型为I,fn的期望签名为 fn(I) => object,即返回值类型任意(并被忽略)
参数: - pvalue (PCollection or PObject) -- 输入P类型
- fn (function) -- 变换函数
- *side_inputs -- 参与运算的SideInputs
- **options -- 可配置选项
- Results:
- None
>>> from bigflow import lazy_var >>> r = lazy_var.declare(lambda: redis.Redis(host='x.x.x.x', port=x, db=0)) >>> x = _pipeline.parallelize([("a", "1"), ("b", "4")]) >>> x.foreach(lambda (k, v): r.get().set(k, v)) >>> _pipeline.run() # all the data will be written into redis
-
bigflow.transforms.
full_join
(*pcollections, **options)¶ 对于多个输入PCollection,根据key对PCollection做全连接操作 ,连接结果为(key, (value 1, value 2, ..., value n)),若第m个PCollection没有元素, 则value m为None
参数: - *pcollections -- 输入PCollection
- **options -- 可配置选项
返回: 连接结果
返回类型: >>> x = _pipeline.parallelize([("a", 1)]) >>> y = _pipeline.parallelize([("b", 2)]) >>> transforms.full_join(x, y).get() [("a", (1, None)), ("b", (None, 2))]
-
bigflow.transforms.
group_by
(pcollection, key_extractor, value_extractor=None, **options)¶ 利用给定的key_extractor和value_extractor对输入PCollection分组,返回一个表示分组结果的PTable
参数: - pcollection (PCollection) -- 输入PCollection
- key_extractor (function) -- 用于提取key的函数
- value_extractor (function, optional) -- 用于提取value的函数
- **options -- 可配置选项
返回: 分组结果
返回类型: >>> _p = _pipeline.parallelize([("A", 4), ("A", 3), ("B", 2), ("A", 1)]) >>> transforms.group_by(_p, lambda x: x[0], lambda x: x[1]).get() {"A": [4, 3, 1], "B": [2]}
-
bigflow.transforms.
group_by_key
(pcollection, **options)¶ 利用给定的PCollection,使用一个默认的key/value提取函数对输入的PCollection分组 ,返回一个表示分组的PTable
参数: - pcollection (PCollection) -- 输入PCollection
- **options -- 可配置选项
返回: 分组结果
返回类型: >>> _p = _pipeline.parallelize([("A", 4), ("A", 3), ("B", 2), ("A", 1)]) >>> transforms.group_by_key(_p).get() {"A": [4, 3, 1], "B": [2]}
-
bigflow.transforms.
idl_to_str
(pcollection, **options)¶ 对于给定的PCollection,对每条数据执行idl解包。并过滤掉idl packet类型为Heartbeat和EOF的数据。
参数: - pcollection (PCollection) -- 输入
- **options --
可配置选项
log_type: idl数据类型,目前支持log_text和log_bin,默认为log_text
返回: 处理后的PCollection
返回类型:
-
bigflow.transforms.
intersection
(pcollection1, pcollection2, output_duplicated=False)¶ 对于给定的PCollection1和PCollection2,返回所有同时存在于PCollection1和PCollection2 中的元素,即取两者交集
参数: - pcollection1 (PCollection) -- 输入1
- pcollection2 (PCollection) -- 输入2
返回: 相交结果
返回类型: >>> from bigflow import transforms >>> a = _pipeline.parallelize([1, 1, 2, 3]) >>> b = _pipeline.parallelize([1, 1, 2, 2, 5]) >>> transforms.intersection(a, b).get() [1, 2] >>> transforms.intersection(a, b, output_duplicated = True).get() [1, 1, 2]
-
bigflow.transforms.
is_empty
(pcollection)¶ 对于输入PCollection,返回其是否为空
参数: pcollection (PCollection) -- 输入PCollection 返回: 表示返回结果的PObject 返回类型: PObject >>> from bigflow import transforms >>> a = _pipeline.parallelize([1, 2, 3, 4]) >>> transforms.is_empty(a).get() False >>> b = _pipeline.parallelize([]) >>> transforms.is_empty(b).get() True
-
bigflow.transforms.
join
(*pcollections, **options)¶ 对于多个输入PCollection,根据key对PCollection做内连接操作 ,连接结果为(key, (value1, value2, ..., valuen))
参数: - *pcollections -- 输入PCollection
- **options -- 可配置选项
返回: 连接结果
返回类型: >>> x = _pipeline.parallelize([("a", 1), ("b", 4)]) >>> y = _pipeline.parallelize([("a", 2), ("a", 3)]) >>> transforms.join(x, y).get() [("a", (1, 2)), ("a", (1, 3))]
-
bigflow.transforms.
left_join
(*pcollections, **options)¶ 对于多个输入PCollection,根据key对PCollection做左连接操作 ,连接结果为(key, (value 1, value 2, ..., value n)),若第m个PCollection没有元素, 则value m为None
参数: - *pcollections -- 输入PCollection
- **options -- 可配置选项
返回: 连接结果
返回类型: >>> x = _pipeline.parallelize([("a", 1), ("b", 4)]) >>> y = _pipeline.parallelize([("a", 2)]) >>> transforms.left_join(x, y).get() [("a", (1, 2)), ("b", (4, None))]
-
bigflow.transforms.
make_tuple
(*pobjects, **options)¶ 将所有输入的PObject合并成一个PObject(tuple)。
除返回值类型及输入类型都全是PObject外, 结果与
bigflow.transforms.cartesian(self, *pvalues, **options)
相同。Args: *pobjects (PObject) 待操作PObjects。所有输入都必须是PObject. :returns: 返回一个PObject(tuple), tuple中的第n个元素是第n个输入PObject对应的值。 :rtype: PObject
>>> p1 = _pipeline.parallelize(1) >>> p2 = _pipeline.parallelize(2) >>> transforms.make_tuple(p1, p2).get() (1, 2) >>> p3 = _pipeline.parallelize([3, 4]) >>> transforms.make_tuple(p1, p3).get() !!! AssertionError >>> transforms.make_tuple(p1, p1, p2).get() (1, 1, 2)
-
bigflow.transforms.
map
(pvalue, fn, *side_inputs, **options)¶ 对PCollection中的每个元素做一对一映射
对给定的PCollection/PObject中的每个元素应用一个变换函数,以函数的返回结果 构造PCollection/PObject
假设输入类型为I,fn的期望签名为 fn(I) => O
参数: - pvalue (PCollection or PObject) -- 输入P类型
- fn (function) -- 变换函数
- *side_inputs -- 参与运算的SideInputs
- **options -- 可配置选项
- Results:
- PType: 变换后的PCollection/PObject,与输入类型一致
>>> from bigflow import transforms >>> _p = _pipeline.parallelize([1, 3, 5, 7]) >>> transforms.map(_p, lambda x: x + 1).get() [2, 4, 6, 8] >>> transforms.map(_p, lambda x: [x, x * 2]).get() [[1, 2], [3, 6], [5, 10], [7, 14]]
-
bigflow.transforms.
max
(pcollection, key=None, **options)¶ 得到输入PCollection中最大的元素
参数: - pcollection (PCollection) -- 输入PCollection
- key (function, optional) -- 用于提取key的函数,与Python内置``max()``中的
key
参数相同 - **options -- 可配置选项
返回: 包含最大元素的PObject
返回类型: >>> from bigflow import transforms >>> _p = _pipeline.parallelize([3, 7, 1, 3, 2, 8]) >>> transforms.max(_p).get() 8 >>> transforms.max(_p, lambda val: -val).get() 1
-
bigflow.transforms.
max_elements
(pcollection, n, key=None, **options)¶ 得到输入PCollection中前n大的元素
参数: - pcollection (PCollection) -- 输入PCollection
- n (int/PObject) -- 必须大于0
- key (function, optional) -- 用于提取key的函数,与Python内置``max()``中的
key
参数相同 - **options -- 可配置选项
返回: 包含前n大元素的PCollection,注意对于n=1,这里仍然返回PCollection 而非PObject
返回类型: >>> from bigflow import transforms >>> _p = _pipeline.parallelize([3, 7, 1, 3, 2, 8]) >>> transforms.max_elements(_p, 2).get() [8, 7] >>> n = _p.count().map(lambda x: x - 1) # n is PObject(5) >>> transforms.max_elements(_p, n, lambda val: -val).get() # key is -val >>> [1, 3, 3, 2, 7]
-
bigflow.transforms.
min
(pcollection, key=None, **options)¶ 得到输入PCollection中最小的元素
参数: - pcollection (PCollection) -- 输入PCollection
- key (function, optional) -- 用于提取key的函数,与Python内置``min()``中的
key
参数相同 - **options -- 可配置选项
返回: 包含最小元素的PObject
返回类型: >>> from bigflow import transforms >>> _p = _pipeline.parallelize([3, 7, 1, 3, 2, 8]) >>> transforms.min(_p).get() 1 >>> transforms.min(_p, lambda val: -val).get() 8
-
bigflow.transforms.
min_elements
(pcollection, n, key=None, **options)¶ 得到输入PCollection中前n小的元素
参数: - pcollection (PCollection) -- 输入PCollection
- n (int/PObject) -- 必须大于0
- key (function, optional) -- 用于提取key的函数,与Python内置``max()``中的
key
参数相同 - **options -- 可配置选项
返回: 包含前n小元素的PCollection,注意对于n=1,这里仍然返回PCollection 而非PObject
返回类型: >>> from bigflow import transforms >>> _p = _pipeline.parallelize([3, 7, 1, 3, 2, 8]) >>> transforms.min_elements(_p, 2).get() [1, 2] >>> n = _p.count().map(lambda x: x - 1) # n is PObject(5) >>> transforms.min_elements(_p, n, lambda val: -val).get() # key is -val >>> [8, 3, 3, 2, 7]
-
bigflow.transforms.
pipe
(pvalue, command, **options)¶ 对于给定的PCollection/PTable,返回通过command处理后的PCollection
参数: - pvalue (PCollection/PTable) -- 输入
- command -- 命令行
- **options --
可配置选项
type: pipe类型,目前支持streaming和bistreaming,默认为streaming
buffer_size: 缓存大小(单条数据),默认64MB
input_fields_num: 输入command的一条数据有几个field,默认为1。 PTable上调用pipe不需要指定;如果PCollection上调用pipe需要输入多个field,则要指定改配置,并且PCollection的元素类型需为tuple
output_fields_num: command输出的数据有几个field,默认为1
field_delimiter: streaming模式下field的分割符,默认为tab(制表符)
返回: 处理后的PCollection
返回类型: 注解
- pipe作用于PCollection上,pipe会将数据直接发送到管道中,框架对数据如何划分不做任何保证;
2. pipe作用于PTable上,pipe会将PTable的Key和数据一起发送到管道中(支持嵌套), 并保证相同Key的数据会顺序发送到管道中,例如下列代码:
>>> from bigflow import transforms >>> p = _pipeline.parallelize({ >>> 'key1_a': { >>> 'key2_a': ['value1', 'value2'], >>> 'key2_b': ['value3', 'value4'] >>> }, >>> 'key2_b': { >>> 'key2_c': ['value5', 'value6'] >>> } >>> }) >>> transforms.pipe(p, 'cat').get()
用户程序(cat)接收到的数据为,column间默认使用制表符(tab)作为分割符:
key1_a key2_a value1
key1_a key2_a value2
key1_a key2_b value3
key1_a key2_b value4
key1_b key2_c value5
key1_b key2_c value6
3. 尽量不要在PTable上通过apply_values中使用pipe(应该使用apply), 不仅性能极差而且发送给管道的数据不包含Key;
>>> from bigflow import transforms >>> p = _pipeline.parallelize([1, 1, 2, 3]) >>> transforms.pipe(p, 'cat').get() ['1', '1', '2', '3']
>>> from bigflow import transforms >>> p = _pipeline.parallelize({'A': [1, 2], 'B': [2, 3]}) >>> transforms.pipe(p, 'cat').get() ['A 1', 'A 2', 'B 2', 'B 3']
>>> from bigflow import transforms >>> p = _pipeline.parallelize([(1, 'a'), (1, 'a'), (2, 'b'), (3, 'c')]) >>> transforms.pipe(p, 'cat', type='bistreaming', input_fields_num=2, output_fields_num=2).get() [('1', 'a'), ('1', 'a'), ('2', 'b'), ('3', 'c')]
-
bigflow.transforms.
reduce
(pcollection, fn, *side_inputs, **options)¶ 对于属于PCollection,使用给定的fn将所有元素规约为单个元素
假设输入类型为I,fn的期望签名为 fn(I1, I2) => I,即输出的类型必须与输入相同 (fn的第一个参数允许被修改)
参数: - pcollection (PCollection) -- 输入PCollection
- fn (function) -- 规约函数
- *side_inputs -- 参与运算的SideInputs
- **options -- 可配置参数
返回: 规约结果
返回类型: >>> from bigflow import transforms >>> _p = _pipeline.parallelize([1, 2, 3, 4]) >>> transforms.reduce(_p, lambda x, y: x + y).get() 10
-
bigflow.transforms.
right_join
(*pcollections, **options)¶ 对于多个输入PCollection,根据key对PCollection做右连接操作 ,连接结果为(key, (value 1, value 2, ..., value n)),若第m个PCollection没有元素, 则value m为None
参数: - *pcollections -- 输入PCollection
- **options -- 可配置选项
返回: 连接结果
返回类型: >>> x = _pipeline.parallelize([("a", 1), ("b", 4)]) >>> y = _pipeline.parallelize([("a", 2), ("a", 3)]) >>> transforms.right_join(x, y).get() [("a", (1, 2)), ("a", (1, 3))]
-
bigflow.transforms.
sort
(pcollection, reverse=False)¶ 对于输入PCollection,将其进行排序
参数: - pcollection (PCollection) -- 输入PCollection
- reverse (bool) -- 若True则降序排列,否则为升序排列
返回: 排序结果
返回类型: >>> from bigflow import transforms >>> _p = _pipeline.parallelize([3, 1, 2, 8]) >>> transforms.sort(_p).get() [1, 2, 3, 8]
-
bigflow.transforms.
sort_by
(pcollection, key, reverse=False)¶ 对于输入PCollection,使用给定的key将其进行排序
参数: - pcollection (PCollection) -- 输入PCollection
- key (function, optional) -- 用于提取key的函数,与Python内置``sort()``中的
key
参数相同。提取的key不能为None。可以返回一个key的列表,每个key都可以分别按照升序或者降序排 - reverse (bool) -- 若True则降序排列,否则为升序排列
返回: 排序结果
返回类型: >>> from bigflow import transforms >>> from bigflow.transform_impls import sort >>> _p = _pipeline.parallelize([3, 1, 2, 8]) >>> p2 = _pipeline.parallelize([(1, 2), (3, 4), (3, 5), (2, 6), (2, 4)] >>> transforms.sort_by(_p).get() >>> transforms.sort_by(p2, lambda rec:[sort.ASC(rec[0]), sort.DESC(rec[1])]).get() [1, 2, 3, 8] [(1, 2), (2, 6), (2, 4), (3, 5), (3, 4)]
注解
sort时所有元素类型必须相同,否则可能出现结果不正确。例如,元素1与元素2.0可能排序结果不正确。 但作为排序的key的多列可以类型不同。
sort/sort_by后的数据集只有以下操作可以保证顺序:
- accumulate,transform操作
- aggregate的第一个聚合函数(即签名为O+I=>O的那个函数)。
- first 和 take (暂不保证语义,未来会支持)
由于操作性质,其它操作保序也无意义(或语义不明),故不保证顺序。
3. sort后调用write并不保证顺序。如果想保证输出有序,可以参考此文档: http://bigflow.cloud/zh/rst/bigflow.output.html
-
bigflow.transforms.
str_to_idl
(pcollection, **options)¶ 对于给定的PCollection,对每条数据执行idl打包。要求输入的数据类型为str。
参数: - pcollection (PCollection) -- 输入
- **options --
可配置选项
log_type: idl数据类型,目前支持log_text和log_bin,默认为log_text
返回: 处理后的PCollection
返回类型:
-
bigflow.transforms.
substract
(pcollection1, pcollection2)¶ 此接口已废弃。请使用subtract。
-
bigflow.transforms.
subtract
(pcollection1, pcollection2)¶ 对于给定的PCollection1和PCollection2,返回所有存在于PCollection1但不在PCollection2 中的元素,相当于做容器减法
参数: - pcollection1 (PCollection) -- 作为被减数的PCollection
- pcollection2 (PCollection) -- 作为减数的PCollection
返回: 表示结果的PCollection
返回类型: >>> from bigflow import transforms >>> a = _pipeline.parallelize([1, 2, 3, 4]) >>> b = _pipeline.parallelize([1, 2, 5]) >>> transforms.subtract(a, b).get() [3, 4]
-
bigflow.transforms.
sum
(pcollection, **options)¶ 对于输入PCollection,求其所有包含元素相加的结果
参数: - pcollection (PCollection) -- 输入PCollection
- **options -- 可配置参数
返回: 表示结果的PObject
返回类型: >>> from bigflow import transforms >>> _p = _pipeline.parallelize([1, 2, 3, 4]) >>> transforms.sum(_p).get() 10
-
bigflow.transforms.
take
(pcollection, n, **options)¶ 取给定PCollection中的任意n个元素。 (如果总元素数量不足n,则返回输入pcollection)
参数: - pcollection (PCollection) -- 输入PCollection
- n (int or PObject) -- 元素数量
- **options -- 可配置参数
返回: 表示结果的PCollection
返回类型: >>> from bigflow import transforms >>> _p = _pipeline.parallelize([1, 2, 3, 4]) >>> transforms.take(_p, 3).get() [1, 2, 3] >>> _n = _pipeline.parallelize(2) >>> transforms.take(_p, _n).get() [1, 2]
>>> _n = _pipeline.parallelize([1, 2, 3, 4]) >>> transforms.take(_p, 10).get() [1, 2, 3, 4]
-
bigflow.transforms.
to_list_pobject
(pvalue, **options)¶ 对于给定的PCollection,聚合为PObject,PObject的内容为list
参数: - pvalue (PCollection) -- 输入
- **options -- 可配置选项
返回: 聚合后的list
返回类型: >>> from bigflow import transforms >>> a = _pipeline.parallelize([1, 1, 2, 3]) >>> transforms.to_list_pobject(a).get() [1, 1, 2, 3]
>>> type(a) bigflow.pcollection.PCollection
>>> a.map(lambda x: str(type(x))).get() ["<type 'int'>", "<type 'int'>", "<type 'int'>", "<type 'int'>"]
>>> b = transforms.to_list_pobject(a) >>> type(b) bigflow.pobject.PObject
>>> b.map(lambda x: str(type(x))).get() "<type 'list'>"
注解
这个是最易被滥用的一个transform。 它可以使一个PCollection转化为一个元素为list的PObject, 用户可以在后续的map操作中拿到这个list进行任意的单机操作, 在一些场景下,如复用现有单机代码时,比较有用。 但是,该变换将使得Bigflow许多优化无法执行,导致运行效率下降, 另外,在apply_values中使用时, 由于每个分组的数据必须转化为一个list, 则导致在一个分组内数据过多时,会占用大量内存资源, 甚至可能引起作业因内存占用过多而Out-Of-Memory失败。
故,使用该变换前,请三思,尽量使用以下其它算子替换掉此方法, 一些较为通用的替代方案如下(下列替换方案,按顺序前边的比后边的效率高):
-
bigflow.transforms.
to_pobject
(pvalue, **options)¶ 对于给定的PCollection/PTable,聚合为PObject,PObject的内容为list/dict
参数: - pvalue (PCollection/PTable) -- 输入
- **options -- 可配置选项
返回: 聚合后的list/dict
返回类型: >>> from bigflow import transforms >>> a = _pipeline.parallelize([1, 1, 2, 3]) >>> transforms.to_pobject(a).get() [1, 1, 2, 3]
>>> from bigflow import transforms >>> b = _pipeline.parallelize({'e': 'f', 'g': 'h'}) >>> transforms.to_pobject(b).get() {'e': 'f', 'g': 'h'}
-
bigflow.transforms.
transform
(pcollection, first_arg, *other_args, **options)¶ 对给定PCollection进行任意的变换,结果为另一个PCollection
transform有两种形式,形式一:
基本原型为`transform(pcollection, initializer, transformer, finalizer, *side_inputs, **options)`
transform将PCollection的处理分为3个阶段: 初始化,遍历及结束,分别对应于 initializer, transformer和finalizer三个处理函数。三个函数之间有一个状态 status(也可以理解为上下文context),同时有一个emitter参数可以向输出PCollection发送数据
假定输入数据类型为I,输出数据类型为O,initializer, transformer, finalizer各自的期望签名为:
initializer(emitter, *side_inputs) => status(object)
transformer(status, emitter, I, *side_inputs) => status(object) (transformer的第一个参数允许被修改)
finalizer(status, emitter, *side_inputs) => None (finalizer的第一个参数允许被修改)
emitter.emit(O)
参数: - pcollection (PCollection) -- 输入PCollection
- initializer (callable) -- 初始化函数
- transformer (callable) -- 变换函数
- finalizer (callable) -- 结束函数
- *side_inputs -- 参与计算的SideInputs
- **options -- 可配置选项
返回: 表示返回结果的PCollection
返回类型: >>> from bigflow import transforms >>> import copy >>> def initializer(emitter): >>> return [] >>> >>> def transformer(status, emitter, inp): >>> status.append(copy.deepcopy(inp)) #如果要缓存一个数据,最好复制一份。 >>> return status >>> >>> def finalizer(status, emitter): >>> emitter.emit(status) >>> >>> _p = _pipeline.parallelize([1, 2, 3]) >>> _plist = transforms.transform(_p, initializer, transformer, finalizer) >>> print _plist.count().get() # 只有一个元素,元素的内容是[1, 2, 3]这样一个列表。 1 >>> print _plist.get() [[1, 2, 3]]
形式二:
基本原型为`transform(pcollection, transformer, *side_inputs, **options)` 其中transformer应为
bigflow.base.Transformer
类的子类, Transformer.begin_process在数据开始处理前会被调用。 Transformer.process在数据开始处理时,每条数据调用一次,传入需要的数据。 Transformer.end_process在数据处理完成后被调用。 用户需要输出的数据以列表或其它可迭代对象的形式返回,其中所有元素都会被作为输出PCollection中的一个元素。 (注意,如果不需要输出请返回一个空的[],而不要返回None)>>> class SumTransformer(base.Transformer): ... ... def begin_process(self): ... self._sum = 0 ... return [] ... ... def process(self, record): ... self._sum += record ... return [] ... ... def end_process(self): ... yield self._sum ... >>> p1 = _pipeline.parallelize([1, 2, 3]) >>> transforms.transform(p1, SumTransformer).get() 6
>>> class PartialSumTransformer(base.Transformer): ... ... def begin_process(self): ... self._sum = 0 ... return [] ... ... def process(self, record): ... self._sum += record ... yield self._sum ... >>> transforms.transform(p1, PartialSumTransformer()),get() [1, 3, 6]
>>> class ZipTransformer(base.Transformer): ... ... def begin_process(self, *si): ... self.index = 0 ... lens = map(len, si) ... self.min_len = min(lens) ... return [] ... ... def process(self, inp, *si): ... if self.index < self.min_len: ... yield (inp, ) + tuple(map(lambda x: x[self.index], si)) ... self.index += 1 ... >>> p2 = _pipeline.parallelize([4, 5]).sort() >>> transforms.transform(p1, ZipTransformer(), p2).get() [(1, 4), (2, 5)]
本方法为Bigflow所提供的最底层和最复杂的变换方法,它可以表达对PCollection 的任意变换。
在有其它函数(如aggregate)能完成同样功能时,尽量不要使用该函数,框架无法了解该函数内部实现, 无法进行许多深层次的优化工作。
-
bigflow.transforms.
union
(*pvalues, **options)¶ 对于多个输入PCollection/PObject,返回包含它们所有元素的PCollection
输入PCollection必须为同类型
参数: *pvalues -- 输入PCollection/PObject 返回: 表示结果的PCollection 返回类型: PCollection >>> from bigflow import transforms >>> _p1 = _pipeline.parallelize([1, 2, 3, 4]) >>> _p2 = _pipeline.parallelize([5, 6, 7, 8]) >>> transforms.union(_p1, _p2).get() [1, 2, 3, 4, 5, 6, 7, 8]
-
bigflow.transforms.
window_into
(pcollection, window, **options)¶ 利用给定的window对输入PCollection分组,返回一个表示分组结果的PTable
参数: - pcollection (PCollection) -- 输入PCollection
- window (Window) -- 输入Window
返回: 分组结果
返回类型: