BasePipeline

Pipeline基类定义

class bigflow.pipeline.pipeline_base.PipelineBase(**options)

基类:object

Pipeline基类

add_archive(file_path, resource_path)

向Pipeline添加一个压缩文件,使得该文件在运行期自动被解包

参数:
  • file_path (str) -- 文件路径,目前仅支持HDFS
  • resource_path (str) -- 运行期访问该文件解压后的路径
add_cache_id(cache_id)

save the ptype cache node id for use

add_directory(dir_path, resource_path='', is_only_python_files_accepted=False)

向Pipeline添加一个目录,使得该目录下的文件/子目录能够在运行期被访问

参数:
  • dir_path (str) -- 需要添加的本地目录路径
  • resource_path (str) -- 计算引擎运行时访问该目录的路径, 应是相对路径. 如未提供, 则 dir_path 下的所有 文件和子目录将会在远端的当前目录.
  • is_only_python_files_accepted (bool) -- 是否仅添加目录下的.py/.pyc/.egg文件,默认为False

注解

如 resource_path 未提供, 且调用了多次 add_directory, 各个目录下的文件或者子目录如果存在重 名, 则为未定义行为. 有可能在添加时就出错, 也有可能在远端启动出错. 为避免该情况, 可以为每个要添 加的 dir_path 设置惟一的 resource_path.

add_egg_file(file_path)

向Pipeline添加一个egg文件,使得该文件会在运行期自动被添加到PYTHONPATH中

参数:file_path (str) -- egg文件路径
add_file(source, resource_path=None, executable=False, from_bytes=False)

向Pipeline添加单个文件,使得该文件能够在运行期被访问

参数:
  • source (str) -- 需要添加的文件路径或者比特数据流
  • resource_path (str) -- 计算引擎运行时访问该文件的本地路径, 应是相对路径. 也即在远端, source 将会被映射 成该 resource_path 路径, 用户程序可以直接用该路径访问.
  • executable (bool) -- 若为True,则该文件在运行期会被添加可执行属性
  • from_bytes (bool) -- 若为True, 则表示远端文件的内容即为 source 的比特数据流, 默认为 False, source 应为一个文件路径
>> pipeline.add_file('/path/to/local/or/hdfs/file', './target_path_a')
>> def remote_filter(self):
..     # This is a dumb example, don't do this in production,
..     import os
..     return os.path.isfile('./target_path_a')
>> pc = pipeline.parallelize([1,2,3])
>> filtered_pc = pc.filter(remote_filter)
>> print pc.diff(filtered_pc).count().get()
.. 0
async_run()

立刻运行Pipeline并等待作业提交完成

Raises:BigflowRuntimeException -- 若运行期出错抛出此异常
config()

获得job config

返回:用户不应当修改此Pipeline job config
返回类型:JobConfig
default_objector()

返回该Pipeline的默认序列化/反序列化器

返回:序列化/反序列化器,用户不应当修改此objector
返回类型:Objector
get(pvalue)

将一个P类型表示的数据汇聚为内存变量,相当于调用pvalue.get()。改方法隐式调用pvalue.cache() 并立即触发Pipeline.run()

参数:pvalue (PType) -- P类型实例
返回:内存变量
返回类型:object
id()

得到表示该Pipeline的唯一ID

返回:Pipeline ID
返回类型:int
parallelize(dataset, **options)

将一段内存变量映射为一个P类型实例

参数:
  • dataset (object) -- 任意类型的内存变量
  • options -- serde: 设置dataset的serde对象
返回:

表示该内存变量的P类型

返回类型:

PType

plan(cache_id)
read(source)

将外部存储的数据映射为一个PCollection,并在运行时读取数据

参数:source (Source) -- 表示外部存储的Source实例
返回:读取结果
返回类型:PCollection
reset_all_counters()

将所有counter清零

Raises:error.BigflowRuntimeException -- 此方法不允许在 Bigflow变换 的用户自定义方法(UDF)中调用,否则抛出此异常
reset_counter(name)

将一个counter清零, 若 name 中不包含 group 部分, 则默认将 Flume group 下面对应的 counter 清零

参数:name (str) -- counter名称,其说明请参考 counter模块
Raises:error.BigflowRuntimeException -- 此方法不允许在 Bigflow变换 的用户自定义方法(UDF)中调用,否则抛出此异常
run()

立刻运行Pipeline并等待结束

Raises:BigflowRuntimeException -- 若运行期出错抛出此异常
set_fini_hook(name, fn)

向Pipeline设置一个结束钩子,使得Pipeline能够在进程结束前依次执行 :param name: 钩子名称 :type name: str :param fn: 方法名称 :type fn: callable

set_init_hook(name, fn)

向Pipeline设置一个初始化钩子,使得Pipeline能够在进程启动时依次执行(按钩子名字排序) :param name: 钩子名称, 用户应使用 str 作为钩子名字, 同时钩子应使用 ascii 码中字符. :type name: str :param fn: 方法名称 :type fn: callable

write(pcollection, target)

将一个PCollection映射为外部存储数据,并在运行期写到该外部存储

参数:
  • pcollection (PCollection) -- 要写出的PCollection
  • target (Target) -- 表示外部存储的Target实例