仙桃网站制作网站设计昆明网络推广
系列文章:
Flyte工作流平台调研(一)——整体架构
Flyte工作流平台调研(二)——核心概念说明
Flyte工作流平台调研(三)——核心组件原理
Flyte工作流平台调研(四)——服务部署
Flyte工作流平台调研(五)——扩展集成
Flyte工作流平台调研(六)——跟Ray框架对比
Flyte工作流平台调研(七)——核心源码走读
正文:
Flyte的设计具有高度的可扩展性,可以通过多种方式进行定制。这是官方提供的贡献开发一个集成的样例文档。
以下是Flyte主要的几种集成方式:
Flytekit plugins
简介
Flytekit是Flyte提供的Python SDK,帮助用户使用Python编程设计新的Workflow。它可以解析Python代码,将其编译为有效的工作流DAG,并将其提交给Flyte执行。
Flytekit plugins就是指用户通过 Python 编写自定义代码,来增强 Flytekit(Flyte 的 Python SDK)的功能。通过这种方式,能在 Flyte 工作流(Workflow)中添加自定义任务(Task)、数据处理步骤或其他功能。
插件运行时,基本上直接依赖Python代码或者基础镜像,而不依赖其他服务。
1. Flytekit 插件的作用
Flytekit 插件允许用户在 Flyte 中集成外部工具、服务或库,或在 Flyte 的任务和工作流中引入自定义的执行逻辑。这些插件可以帮助 Flyte 执行各种类型的作业,例如运行特定的机器学习平台、数据库操作、数据验证等。
例如:
- 通过 Comet 插件,Flyte 工作流可以与 Comet.ml 进行集成,自动追踪机器学习实验。
- 通过 MLFlow 插件,Flyte 可以追踪和记录机器学习模型的训练过程。
2. Flytekit 插件的特点
- 纯 Python 实现:Flytekit 插件完全使用 Python 编写,这使得用户可以轻松地开发、测试和调试插件功能。
- 单元测试:用户可以在本地进行单元测试,确保插件功能的正确性,而无需依赖 Flyte 的集群或其他外部服务。
- 扩展 Flytekit:插件主要用于扩展 Flytekit 的功能。比如,用户可以编写插件来处理特定的任务,或将 Flyte与特定的服务(如 Comet、MLFlow)集成。
3. 插件的类型
Flytekit 插件可以分为几种类型,根据它们的功能和目标服务进行分类:
- 机器学习平台插件:如 Comet、MLFlow,这些插件允许在 Flyte 中直接集成和使用机器学习平台,用于模型追踪、实验管理等。
- 数据处理插件:如 Great Expectations、Pandera,用于验证数据质量、进行数据验证等操作。
- 数据库和查询插件:如 Dolt、DuckDB、SQL 插件,提供数据存储、数据库查询等功能。
- 服务集成插件:如 AWS SageMaker、Airflow,这些插件允许将 Flyte 与 AWS 服务或其他外部系统(如 Airflow、BigQuery)集成,进行模型推理、任务执行等。
实现方式
本文档以Flyte的SQLAlchemy这个插件为例进行说明。具体实现一个自定义插件的方式是:
1. 下载Flytekit的代码:
Flytekit的代码仓库地址:GitHub - flyteorg/flytekit: Extensible Python SDK for developing Flyte tasks and workflows. Simple to get started and learn and highly extensible.
代码主要路径:
说明:flytekit是核心源码的实现,plugins是插件的实现。
2. 在代码中实现插件具体的代码
1)创建在plugins路径中创建对应的插件包,比如:
2)在代码中实现自定义的task或者workflow(这个插件只有task)
3)定义自定义task的自定义配置
4)定义运行当前Task的工作流Pod(也就是一个Node)默认依赖的镜像
3. 构建插件依赖的镜像
如果第1步中默认的镜像不是镜像仓库中已经存在的镜像,还需要自己构建当前自定义task的镜像,并上传到镜像仓库。
4. 构建并发布插件
使用项目根目录下创建 setup.py
,用于打包插件:
from setuptools import setup, find_packagessetup(name="my_flytekit_plugin",version="0.1.0",description="A custom Flytekit plugin",author="Your Name",packages=find_packages(),install_requires=["flytekit", "requests"],
)
构建并安装插件:
pip install .
将插件发布到 PyPI,便于其他 Flyte 用户使用:
python setup.py sdist bdist_wheel
twine upload dist/*
使用方式
还是以Flyte的SQLAlchemy这个插件为例进行说明。
1. 安装 Flyte 的 SQLAlchemy 插件
pip install flytekitplugins-sqlalchemy
导入所需的库:
from flytekit import kwtypes, task, workflow
from flytekit.types.schema import FlyteSchema
from flytekitplugins.sqlalchemy import SQLAlchemyConfig, SQLAlchemyTask
2. 定义 SQLAlchemy Task
首先,定义一个 SQLAlchemyTask
,它将从 RNA 中央数据库的 rna
表返回前 n 条记录。由于该数据库是公共的,可以将数据库 URI 硬编码在字符串中,包括用户名和密码。
注意:
SQLAlchemyTask
的输出默认是FlyteSchema
。
警告:
切勿存储用于私有或敏感数据库的密码!
DATABASE_URI = "postgresql://reader:NWDMCE5xdipIjRrp@hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs"# 定义查询的输出 schema,我们稍后在 `get_mean_length` 任务中复用
DataSchema = FlyteSchema[kwtypes(sequence_length=int)]sql_task = SQLAlchemyTask("rna",query_template=""" select len as sequence_length from rna where len >= {{ .inputs.min_length }} and len <= {{ .inputs.max_length }} limit {{ .inputs.limit }}""",inputs=kwtypes(min_length=int, max_length=int, limit=int),output_schema_type=DataSchema,task_config=SQLAlchemyConfig(uri=DATABASE_URI),
)
3. 定义计算平均长度的任务
接下来,定义一个任务,用于计算我们查询返回的 RNA 序列子集的平均长度。注意,如果你在 Flyte 后端中运行此任务(通过 pyflyte run
),如果没有指定镜像,pyflyte run
会使用默认的 flytekit
镜像。默认的 flytekit
镜像没有安装 sqlalchemy
插件。为了正确启动此任务的执行,需要使用以下命令。
bash
复制代码
pyflyte --config ~/.flyte/your-config.yaml run --destination-dir /app --remote --image ghcr.io/flyteorg/flytekit:py3.8-sqlalchemy-latest integrations/flytekit_plugins/sql/sql_alchemy.py my_wf --min_length 3 --max_length 100 --limit 50
另外,添加了destination-dir
参数,因为pyflyte run
默认会将代码复制到/root
目录,而该镜像的工作目录被设置为/app
。
@task
def get_mean_length(data: DataSchema) -> float:dataframe = data.open().all()return dataframe["sequence_length"].mean().item()
4. 将任务组合成一个工作流
最后,将所有任务组合成一个工作流:
@workflow
def my_wf(min_length: int, max_length: int, limit: int) -> float:return get_mean_length(data=sql_task(min_length=min_length, max_length=max_length, limit=limit))
运行代码
if __name__ == "__main__":print(f"Running {__file__} main...")print(my_wf(min_length=50, max_length=200, limit=5))
这样就完成了一个基于 SQLAlchemy 的 Flyte 工作流,能够从公共 PostgreSQL 数据库中查询 RNA 序列的长度,并计算这些序列的平均长度。
Native backend plugins
简介
原生后端插件是指直接依赖 Flyte 的原生计算和编排能力(通常是 Kubernetes 集群中的工作负载),无需额外的外部服务支持。这种插件由 Flyte 的内部组件(如 FlytePropeller 和 Kubernetes)协同管理,用于运行特定类型的工作负载或任务。
比如Native backend的Spark 插件,支持执行Spark Task,使用这种插件时,执行Spark Task时,会先直接在Flyte的k8s集群中启动一个Spark集群,然后再启动Task Pod去访问这个Spark集群执行 Task的具体内容。Spark Task Pods和Spark Cluster Pods都是FlytePropeller启动的,而FlytePropeller具有启动Spark Cluster Pods的能力就是这个插件扩展的,这个插件集成在FlytePropeller中,调用Spark的k8s operator启动一个Spark集群。
主要插件类型
- 分布式训练:
- Kubeflow PyTorch:运行 PyTorch 分布式训练任务。
- Kubeflow TensorFlow:运行 TensorFlow 分布式训练任务。
- MPI Operator:运行基于 Horovod 的深度学习训练任务。
- 大数据处理:
- Kubernetes Cluster Spark:运行 Spark 作业。
- Kubernetes Cluster Dask:运行 Dask 作业。
- 灵活任务:
- Kubernetes Pods:运行任意 Kubernetes Pod 工作负载。
- Ray:运行 Ray 分布式作业。
主要作用
Native Backend Plugins 的主要作用是提供一种无缝的方式,使用 Kubernetes 原生能力执行特定类型的工作负载,同时保持对 Flyte 工作流的高度集成。以下是其具体作用:
- 高效资源管理
Flyte 利用 Kubernetes 的资源调度能力,根据任务的资源需求动态分配和管理资源(如 CPU、内存、GPU)。 - 分布式任务支持
原生插件支持常见的分布式计算框架(如 PyTorch、TensorFlow、Spark、Ray),简化了大规模分布式任务的开发和运行。 - 统一工作流编排
无需外部服务支持,用户可以将各种任务类型统一集成到 Flyte 的工作流中,实现端到端的数据处理和模型训练。 - 简化操作和维护
通过 Kubernetes 的原生能力,Native Backend Plugins 避免了对外部服务的依赖,简化了部署和运维。
使用方式
这里以一个Ray插件为例,通过Flyte的Ray插件提交一个Ray任务。
1. flytekitplugins-ray基本说明
KubeRay 是一个开源工具包,旨在简化在 Kubernetes 上运行 Ray 应用程序的过程。它提供了一系列工具,增强了在 Kubernetes 上运行和管理 Ray 的操作能力。
插件的关键组件:
- Ray Operator
- 用于集群资源创建和删除的后台服务
- 管理 CRD(Custom Resource Definition)对象的
kubectl
插件/命令行工具 - 与集群功能无缝集成的作业(Jobs)和服务(Serving)功能
2. 安装插件
pip install flytekitplugins-ray
3. 如果直接给已存在的Ray集群提交任务
@ray.remote
def f(x):return x * x@task(task_config=RayJobConfig(address=<RAY_CLUSTER_ADDRESS>runtime_env={"pip": ["numpy", "pandas"]})
)
def ray_task() -> typing.List[int]:futures = [f.remote(i) for i in range(5)]return ray.get(futures)
4. 使用插件通过Flyte,提交一个Ray任务,并且Ray集群也有Flyte管理
@task(task_config=RayJobConfig(worker_node_config=[WorkerNodeConfig(group_name="test-group", replicas=10)]))
def ray_task() -> typing.List[int]:futures = [f.remote(i) for i in range(5)]return ray.get(futures)
5. 在 Flyte 集群上运行示例
要在 Flyte 集群上运行上述示例,使用以下命令:
pyflyte run --remote ray_example.py \ray_workflow --n 10
此命令将远程运行 ray_example.py
脚本中的 ray_workflow
,并传递参数 --n 10
。Flyte会先通过KubeRay在自己的K8S集群中创建一个Ray Cluster,然后再运行一个Pod把Ray任务提交到这个Ray Cluster中。
Flyte Agent
简介
Flyte Agent 是一种 长时间运行的无状态服务,通过 gRPC 接收执行请求,并与适当的外部或内部服务交互以启动任务。它的功能定位在支持特定类型的任务,并通过插件化扩展为不同的服务场景。
简单来说,就是在Flyte的K8S集群中启动一个无状态的Agent Pod,执行这种类型的任务时,这个Agent Pod给真正的服务转发请求。
1. 基本的工作流程:
- 任务触发:当用户触发特定类型的任务时,FlytePropeller 将通过 gRPC 向相应的 Agent 发送请求。
- 任务处理:Agent 接收到请求后,会与指定的服务交互(例如 BigQuery、AWS SageMaker 等),并启动对应的任务。
- 运行环境:每个 Agent 是一个 Kubernetes 部署,运行于 Flyte 的集群中。
2. 核心特点
- gRPC 通信:
- Flyte Agent 是通过 gRPC 接收来自 FlytePropeller 的任务请求的。
- 这些任务通常是特定类型(如 BigQuery、Databricks)的工作负载。
- Kubernetes 部署:
- 每个 Agent 是一个 Kubernetes 部署,运行在集群中,专注于某类任务的管理。
- Agent 本身无状态,任务的状态由后端服务(如 BigQuery 或 SageMaker)提供。
- 任务初始化:
- Agent 服务在接收任务后,会与适配的服务进行交互,启动作业或任务。
- 插件化扩展:
- Flyte 提供了许多预构建的 Agent(如 Airflow Agent、Snowflake Agent)。
- 用户可以根据需求开发自定义 Agent,用以支持其他任务类型。
3. 常见的 Flyte Agents
以下是一些内置的 Flyte Agents 及其用途:
- AWS SageMaker Inference Agent:部署模型、创建推理端点并触发推理任务。
- Airflow Agent:在 Flyte 的工作流中运行 Airflow 作业。
- BigQuery Agent:在 Flyte 中运行 BigQuery 查询任务。
- ChatGPT Agent:在工作流中运行 ChatGPT 相关任务。
- Databricks Agent:在 Flyte 中运行 Databricks 作业。
- Memory Machine Cloud Agent:使用 MemVerge Memory Machine Cloud 执行任务。
- OpenAI Batch Agent:提交 OpenAI 异步批处理请求。
- PERIAN Job Platform Agent:在 PERIAN 平台上执行任务。
- Sensor Agent:用于工作流中的传感器任务。
- Snowflake Agent:在 Snowflake 中运行作业。
使用方式
以一个ChatGPT agent为例,ChatGPT 可用于多种场景,例如情感分析、语言翻译、SQL 查询生成以及文本摘要。以下示例展示了如何在 Flyte 中运行 ChatGPT 任务:
1. 安装
pip install flytekitplugins-openai
2. 样例代码
from typing import List
import flytekit
from flytekit import ImageSpec, Secret, dynamic, task, workflow
from flytekitplugins.openai import ChatGPTTask# 需要指定 name、openai_organization 和 chatgpt_config
# name:用于 Flyte,必须唯一。
# openai_organization:OpenAI API 的组织 ID,可以在 [此处](https://platform.openai.com/account/org-settings) 找到。
# chatgpt_config:用于 OpenAI Chat Completion 的配置,可参考 [API 文档](https://platform.openai.com/docs/api-reference/completions)。chatgpt_small_job = ChatGPTTask(name="3.5-turbo",openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7",chatgpt_config={"model": "gpt-3.5-turbo","temperature": 0.7,},
)chatgpt_big_job = ChatGPTTask(name="gpt-4",openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7",chatgpt_config={"model": "gpt-4","temperature": 0.7,},
)@workflow
def my_chatgpt_job(message: str) -> str:message = chatgpt_small_job(message=message)message = chatgpt_big_job(message=message)return message# 本地运行工作流
if __name__ == "__main__":print(f"Running {__file__} main...")print(f"Running my_chatgpt_job(message='hi') {my_chatgpt_job(message='hi')}")
Agent的部署配置
如果使用的是 Flyte 的托管部署,需要联系部署管理员以在部署中配置代理。比如要在 Flyte 部署中启用 ChatGPT 代理,需要在代理服务器中设置 OpenAI API 密钥才能运行 ChatGPT 任务。详细的方式如下:
1. 指定代理配置
1) 通过Flyte binary
编辑相关的 YAML 文件以指定代理配置:
kubectl edit configmap flyte-sandbox-config -n flyte
在 YAML 文件中添加以下配置:
tasks:task-plugins:enabled-plugins:- container- sidecar- k8s-array- agent-servicedefault-for-task-types:- container: container- container_array: k8s-array- chatgpt: agent-serviceplugins:agent-service:# 配置超时是可选的。# 像使用大模型的 ChatGPT 任务可能需要更长的时间,# 因此可以在这里调整超时设置。defaultAgent:timeouts:ExecuteTaskSync: 10s
2) 通过Flyte core
创建一个名为values-override.yaml的文件,并向其中添加以下配置:
configmap:enabled_plugins:# -- Tasks specific configuration [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#GetConfig)tasks:# -- Plugins configuration, [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#TaskPluginConfig)task-plugins:# -- [Enabled Plugins](https://pkg.go.dev/github.com/flyteorg/flyteplugins/go/tasks/config#Config). Enable sagemaker*, athena if you install the backendenabled-plugins:- container- sidecar- k8s-array- agent-servicedefault-for-task-types:container: containersidecar: sidecarcontainer_array: k8s-arraychatgpt: agent-serviceplugins:agent-service:# Configuring the timeout is optional.# Tasks like using ChatGPT with a large model might require a longer time,# so we have the option to adjust the timeout setting here.defaultAgent:timeouts:ExecuteTaskSync: 10s
2. 添加 OpenAI API 密钥
1) 使用 Helm 安装 flyteagent
pod:
helm repo add flyteorg https://flyteorg.github.io/flyte
helm install flyteagent flyteorg/flyteagent --namespace flyte
2)将 OpenAI API 密钥设置为秘密(Base64 编码):
SECRET_VALUE=$(echo -n "<OPENAI_API_TOKEN>" | base64) && \
kubectl patch secret flyteagent -n flyte --patch "{\"data\":{\"flyte_openai_api_key\":\"$SECRET_VALUE\"}}"
3)重启开发环境
kubectl rollout restart deployment flyteagent -n flyte
3. 升级 Flyte 的 Helm 发行版
1)使用 Flyte binary
helm upgrade <RELEASE_NAME> flyteorg/flyte-binary -n <YOUR_NAMESPACE> --values <YOUR_YAML_FILE>
将 <RELEASE_NAME>
替换为您的发行版名称(例如 flyte-backend
),<YOUR_NAMESPACE>
替换为您的命名空间名称(例如 flyte
),<YOUR_YAML_FILE>
替换为您的 YAML 文件名称。
2)使用Flyte core
helm upgrade <RELEASE_NAME> flyte/flyte-core -n <YOUR_NAMESPACE> --values values-override.yaml
将 <RELEASE_NAME>
替换为您的发行版名称(例如 flyte
),
将 <YOUR_NAMESPACE>
替换为您的命名空间名称(例如 flyte
)。
其他集成方式
除了前面三种主要的集成方式,Flyte还有几种集成方式,分布如下:
External service backend plugins
外部服务后端插件,正如其名称所示,这些插件依赖于外部服务来处理使用这些插件定义的 Flyte 任务中的工作负载。
特点:
- 依赖外部服务:这些插件不是通过 Flyte 自身直接处理任务,而是将任务交由外部服务完成,例如 AWS Athena、AWS Batch、Hive 等。
- 支持大规模计算任务:借助这些插件,Flyte 可以处理需要大规模资源或专用计算平台的工作负载,例如批量作业、SQL 查询和分布式计算。
- 灵活性和扩展性:Flyte 的任务定义通过插件扩展了支持范围,使其能够与多种外部服务无缝集成,满足不同工作流的需求。
- 异步执行能力:外部服务插件通常支持异步工作流,Flyte 会等待外部服务完成任务并返回结果。
原理:
- 任务提交:Flyte 的任务在定义时通过插件标记为特定类型(例如,Athena 查询、Batch 任务)。
- 任务调度:Flyte 将任务的执行请求发送到相应的外部服务(如 AWS Batch 或 Hive)。
- 任务处理:外部服务根据 Flyte 提供的配置处理任务,例如执行 SQL 查询或运行容器化的批处理任务。
- 结果返回:一旦外部服务完成任务,Flyte 接收返回结果并继续后续的工作流步骤。
这种模式允许 Flyte 保持轻量化,同时利用强大的外部计算服务来满足复杂工作流的需求。
主要类型:
- AWS Athena:使用 AWS Athena 执行查询。
- AWS Batch:在 AWS Batch 服务上运行任务和工作流。
- Flyte Interactive:使用 Flyte Interactive 执行任务以进行调试。
- Hive:在工作流中运行 Hive 作业。
SDKs for writing tasks and workflows
用于编写任务和工作流的 SDK,社区非常乐意帮助您构建新的 SDK。目前可用的 SDK 包括:
- flytekit:Flyte 的 Python SDK。
- flytekit-java:Flyte 的 Java/Scala SDK。
通过这些 SDK,开发者可以更好地利用 Flyte 的功能来构建分布式任务和复杂工作流,同时保持开发过程的高效性和灵活性。
Flyte operators
Flyte 提供Operator,可以与其他编排工具集成,帮助用户在这些工具中原生利用 Flyte 的构建功能。
目前集成了Airflow,从 Airflow 中触发 Flyte 的执行。