示例

示例1 – 单Stage('map-only')作业

from bigflow import base
def foo(line):
    return "".join(set(line))
p = base.Pipeline.create('spark', spark_conf=configs, runtime="JNI")
ret = p \
    .read(input.TextFile('/test.in')) \
    .map(foo)
p.write(ret, output.TextFile('/test.out'))
p.run()

生成的计划示意图如下:

../_images/plan1.png

示例2 – WordCount

words = p \
    .read(input.TextFile("/home/wangcong09/modif.small")) \
    .flat_map(lambda _: _.split())
cnt1 = words \
    .apply(transforms.count)
cnt2 = words \
    .group_by(lambda _: _) \
    .apply_values(transforms.count) \
    .flatten()
p.write(cnt1, output.TextFile('/test.out1'))
p.write(cnt2, output.TextFile('/test.out2'))

生成的计划示意图如下:

../_images/plan2.png