1. 先记住一条总原则:混用 DataStream + Table 时,用 DataStream API 配依赖
文档强调了一句非常关键的话:
如果一个 Job 里混用了 Python DataStream API 和 Python Table API,建议通过 DataStream API 去指定依赖,这样两边都能生效。
PyFlink 混用 DataStream 与 Table API 时依赖配置复杂。介绍如何通过 StreamExecutionEnvironment 统一指定 JAR、Python 包及 requirements。涵盖 pipeline.jars 上传策略、离线 pip 缓存安装、虚拟环境打包归档以及解释器路径设置。针对有网/无网集群场景提供工程化组合方案,并列出常见坑点如版本不一致或路径不可达问题,确保作业在远程集群稳定运行。
文档强调了一句非常关键的话:
如果一个 Job 里混用了 Python DataStream API 和 Python Table API,建议通过 DataStream API 去指定依赖,这样两边都能生效。
也就是:
table_env.get_config() / table_env.add_python_*StreamExecutionEnvironment 的 add_jars / add_python_file / set_python_requirements / add_python_archive / set_python_executablepipeline.jars:上传到集群(最常用)file:// 本地路径table_env.get_config().set("pipeline.jars","file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")
Windows 示例(注意还是 file:///):
table_env.get_config().set("pipeline.jars","file:///E:/my/jar/path/connector.jar;file:///E:/my/jar/path/json.jar")
pipeline.classpaths:不上传,只加到 classpath(要求集群也能访问同路径)table_env.get_config().set("pipeline.classpaths","file:///opt/flink/jars/connector.jar;file:///opt/flink/jars/json.jar")
一句话:
pipeline.jarspipeline.classpathsadd_jars(...):上传到集群env.add_jars("file:///my/jar/path/connector1.jar","file:///my/jar/path/connector2.jar")
add_classpaths(...):加到 client + cluster classpath(同样要求可达)env.add_classpaths("file:///opt/flink/jars/connector1.jar","file:///opt/flink/jars/connector2.jar")
--jarfile 的限制适合:
my_udf.pymy_pkg/*.whl / *.egg(zip 本质)Table API:
table_env.add_python_file("/path/to/my_udf.py")
table_env.add_python_file("/path/to/my_pkg/") # 目录也可以
DataStream API:
env.add_python_file("/path/to/my_udf.py")
env.add_python_file("/path/to/my_pkg/")
等价的还有:
python.files-pyfs / --pyFiles关键点:这些会被加到 Python UDF worker 的 PYTHONPATH。
适合:
Table API:
table_env.set_python_requirements(
requirements_file_path="/path/to/requirements.txt",
requirements_cache_dir="cached_dir" # 可选
)
DataStream API:
env.set_python_requirements(
requirements_file_path="/path/to/requirements.txt",
requirements_cache_dir="cached_dir"
)
文档给了关键命令:
pip download -d cached_dir -r requirements.txt --no-binary :all:
然后把这个 cached_dir 作为 requirements_cache_dir 传进去,Flink 会上传它用于离线安装。
硬要求(很容易忽视):
等价的还有:
python.requirements-pyreq / --pyRequirements适合:
Table API:
table_env.add_python_archive("/path/to/py_env.zip","myenv")
DataStream API:
env.add_python_archive("/path/to/py_env.zip","myenv")
在 UDF 里访问(相对路径):
def my_udf():
with open("myenv/py_env/data/data.txt") as f:
...
如果没写 target_dir:
table_env.add_python_archive("/path/to/py_env.zip") # UDF 内访问:open("py_env.zip/py_env/data/data.txt")
支持格式:
注意:如果 archive 里是虚拟环境,一定要和集群平台一致。
这是很多人线上翻车的根本原因: client 侧需要 Python 来解析/编译 UDF;cluster 侧 worker 需要 Python 来执行 UDF。
Table API:
table_env.get_config().set_python_executable("/path/to/python")
DataStream API:
env.set_python_executable("/path/to/python")
env.add_python_archive("/path/to/py_env.zip","venv")
env.set_python_executable("venv/py_env/bin/python")
注意:如果指向 archive 内路径,用 相对路径,别写绝对路径。
等价方式:
python.executable-pyexec / --pyExecutableclient 端用于'编译阶段解析 Python UDF'。你可以:
source my_env/bin/activatepython.client.executable / -pyclientexec / PYFLINK_CLIENT_EXECUTABLE你可以在 Java Table API 或纯 SQL 里注册 Python UDF,例如:
tEnv.executeSql("create temporary system function add_one as 'add_one.add_one' language python");
但 Python 依赖依然要通过这些配置/参数提供:
python.filespython.archivespython.requirementspython.executablepython.client.executablepipeline.jars 或 env.add_jarsset_python_requirements(requirements.txt)add_python_file(my_udf.py / my_pkg/)add_python_archive(model.zip, "model")pip download -d cached_dir -r requirements.txtadd_python_file(...)add_python_archive(...)set_python_executable("venv/.../python")add_python_archive(venv.zip, "venv")set_python_executable("venv/.../python")StreamExecutionEnvironment 配pipeline.classpaths 指向本地路径,集群节点根本访问不到
→ 不确定就用 pipeline.jars / add_jars 上传
微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online