【Ubuntu datasophon1.2.1 二开之九:验证离线数据入湖】

【Ubuntu datasophon1.2.1 二开之九:验证离线数据入湖】

Ubuntu datasophon1.2.1 二开之九:验证离线数据入湖

背景

前面一篇已经验证在线数据入湖了,最后一步就是验证离线数据入湖。虽然已经做好坑坑洼洼的准备。但是困难比预想要多得多。花时间要多得多。为了最后一哆嗦,坚持坚持坚持,终于胜利!

环境准备

1. 在datasophon安装好dolphinscheduler 3.1.8

配置租户

在这里插入图片描述

hdfs就是linux用户

创建环境

在这里插入图片描述


告诉ds spark3位置,hdfs位置

修改配置文件

修改bin\env\dolphinscheduler_env.sh

在这里插入图片描述


在这里插入图片描述


总结一下:
1.修改数据库类型为mysql,指定数据库url,用户名及密码
2.修改zk地址
3.修改hadoop位置及它配置目录
4.jdk位置
5.hive及flink位置
修改 work-server\bin\start.sh
就修改一行:
原来:

$JAVA_HOME/bin/java $JAVA_OPTS \ -cp "$DOLPHINSCHEDULER_HOME/conf":"$DOLPHINSCHEDULER_HOME/libs/*" \ org.apache.dolphinscheduler.server.worker.WorkerServer 

改成:

$JAVA_HOME/bin/java $JAVA_OPTS \ -cp "$DOLPHINSCHEDULER_HOME/conf":"$DOLPHINSCHEDULER_HOME/libs/*":${HADOOP_CONF_DIR}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/hdfs/* \ org.apache.dolphinscheduler.server.worker.WorkerServer 

启动时传入hdfs配置目录及home目录

2. 升级spark3版本

datasophon 自带spark3.1.3 ,不支持paimon,换句话paimon,支持spark 从3.2版本开始。必须升级,否则报类找不到,这个问题耽搁我好长时间,paimon从0.7-0.9都试,还试了kafka-connect方式。

数据加工流向图

在这里插入图片描述

遇到坑及填平方法

1.现象: 经典的 NoClassDefFoundError,例如 org/apache/spark/kafka010/KafkaConfigUpdater 和 org/apache/spark/sql/connector/write/Write。

填平方案:

依赖分析: 通过错误栈和GitHub issue确认,KafkaConfigUpdater 类属于 Spark 而非 Paimon,且需要额外的 spark-token-provider-kafka 等JAR包。

版本匹配: 最终放弃在Spark 3.1.3上挣扎,将Spark升级到与Paimon 0.9.0兼容的3.2.4版本,从根本上解决了 Write 类找不到的API不兼容问题。

依赖管理: 明确需要通过 --jars 参数或DataSophon资源中心,将 paimon-spark-3.2-0.9.0.jar 等所有依赖JAR包完整地提供给Spark任务。

2. Spark与Paimon版本不兼容

现象: 使用Paimon 0.8.2或0.9.0时,均报告 Write 类找不到。

填平方案:

放弃低版本组合: 确认Paimon从某个版本开始强依赖Spark 3.2的DataSource V2 API,与你的Spark 3.1.3环境不兼容。

升级Spark: 决定将Spark从 3.1.3 升级到 3.2.4。这一步是解决问题的关键转折点,虽然需要升级组件,但一劳永逸地解决了兼容性问题。

3. HDFS权限问题

现象: 建表或写入时报 Permission denied,用户 root123 无法在 /user/paimon/warehouse 或 /user/hive/warehouse 下创建目录。

填平方案:

使用 sudo -u hdfs: 切换到HDFS超级用户执行权限修复命令。

递归授权: 执行 hdfs dfs -chown -R root123:supergroup /user/paimon 和 hdfs dfs -chmod -R 755 /user/paimon,将目录所有者改为当前用户并赋予写权限。

调整Hive仓库权限: 类似地,对 /user/hive/warehouse 目录也进行了权限调整,确保了元数据操作顺畅。

4. 元数据存储方式选择

现象: 使用Paimon默认的文件系统Catalog时,元数据直接存在HDFS上,导致并发问题和权限困扰。

填平方案:

切换为Hive Metastore: 最终决定使用 Hive Metastore 作为Paimon的Catalog。配置如下:

text
–conf spark.sql.catalog.paimon.metastore=hive
–conf spark.sql.catalog.paimon.uri=thrift://ddp1:9083
明确数据存储路径: 同时指定 spark.sql.catalog.paimon.warehouse,实现元数据(在MySQL/Hive)与数据文件(在HDFS)的分离,使整个架构更清晰、更稳定。

5. 环境与组件升级

现象: 旧版Spark 3.1.3成为兼容性瓶颈,且DataSophon管理着多个组件,升级有顾虑。

填平方案:

确认兼容性: 检查并确认Hadoop 3.3.3、Hive 3.1.3、Kafka 2.4.1等核心组件与Spark 3.2.4和JDK 11兼容。

分步升级: 选择保持JDK 8不变,仅升级Spark到3.2.4,将影响面降到最低,成功绕过JDK升级的风险。

6.Spark 找不到 Kafka 数据源

错误信息:

text
Failed to find data source: kafka
原因: Spark 默认不包含 Kafka 集成包。

解决方案:

bash

下载匹配版本的 Kafka 依赖

spark-sql-kafka-0-10_2.12-3.2.0.jar
kafka-clients-2.8.0.jar
commons-pool2-2.11.1.jar

在 DS Spark 任务的"选项参数"中添加 --jars

–jars /path/to/spark-sql-kafka-0-10_2.12-3.2.0.jar,/path/to/kafka-clients-2.8.0.jar,/path/to/commons-pool2-2.11.1.jar
关键点:

版本必须与 Spark 版本匹配

commons-pool2 是 Kafka 消费者的传递依赖,容易遗漏

7.Paimon 表创建失败(Derby 权限问题)

错误信息:

text
ERROR XBM0H: Directory /…/metastore_db cannot be created.
java.io.FileNotFoundException: derby.log (Permission denied)
原因: Spark 默认使用嵌入式 Derby 作为 Hive Metastore,没有配置 Hive Metastore 时会尝试在临时目录创建 Derby 数据库。

解决方案: 配置使用外部的 Hive Metastore

bash
–conf spark.sql.catalog.paimon.metastore=hive
–conf spark.sql.catalog.paimon.uri=thrift://hive-metastore-host:9083
–conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

8.Kafka Topic 不存在

错误信息:

text
UNKNOWN_TOPIC_OR_PARTITION
原因: 脚本中配置的 topic 名称与实际不符。

解决方案:

bash

查看所有 topic

kafka-topics.sh --bootstrap-server kafka-host:9092 --list

使用正确的 topic 名称

kafka_topic = “user_log_topic” # 而不是 “user_log”

9.Spark 版本不匹配导致类找不到

错误信息:

text
java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
java.lang.ClassNotFoundException: org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig
原因: Spark 版本与 Kafka 依赖版本不匹配。

解决方案:

Spark 3.2.0 → 使用 spark-sql-kafka-0-10_2.12-3.2.0.jar 和 kafka-clients-2.8.0.jar

不能混用 3.2.4 的包

10.DolphinScheduler 参数换行问题

错误信息:

text
–conf: command not found
原因: DS 的"选项参数"中换行导致命令被分割。

解决方案: 所有参数写在一行,不要换行

properties
–jars /path/to/jar1.jar,/path/to/jar2.jar --conf key1=value1 --conf key2=value2

11.Shell 脚本换行符问题

错误信息:

text
$‘\r’: command not found
原因: 在 Windows 上编辑的脚本上传后带有 \r\n 换行符。

解决方案:

bash

转换换行符

sed -i ‘s/\r$//’ script.sh

或直接在 DS 的脚本框中编写,不上传文件

12.Shell 脚本中 Spark 命令缺少配置

错误信息:

text
ERROR XBM0H: Directory /…/metastore_db cannot be created.
原因: Shell 脚本中的 spark-sql 命令没有配置 Hive Metastore。

解决方案: 在 Shell 脚本中的每个 spark-sql 命令都要加上完整配置:

bash
${SPARK_HOME}/bin/spark-sql
–conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog
–conf spark.sql.catalog.paimon.metastore=hive
–conf spark.sql.catalog.paimon.uri=thrift://ddp1:9083
–conf spark.sql.catalog.paimon.warehouse=hdfs://…/warehouse
–conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

13.CSV 导入 ClickHouse 格式错误

错误信息:

text
Code: 117. DB::Exception: Expected end of line: … (INCORRECT_DATA)
原因:

CSV 中的 JSON 字段包含逗号和引号,干扰了 CSV 解析

时间格式 2026-03-29T10:00:00.000Z 不是 ClickHouse 默认格式

解决方案: 使用 TSV 格式替代 CSV

bash

将 CSV 转换为 TSV(逗号 → 制表符)

cat data.csv | sed ‘s/,/\t/g’ | clickhouse client --query “INSERT INTO table FORMAT TSV”

14.Shell 脚本 while 循环只执行一次

错误信息: 循环内的命令导致循环提前退出。

原因: clickhouse-client 命令失败或 echo 干扰了循环变量。

解决方案: 放弃 while 循环,使用管道直接导入

bash

不推荐:while 循环逐行插入

while read line; do
clickhouse-client --query “INSERT …”
done < file.csv

推荐:管道批量导入

cat file.csv | clickhouse-client --query “INSERT INTO table FORMAT TSV”

15. Paimon 表写入失败(表不存在)

错误信息:

text
Schema file not found in location paimon.default.ods_user_log. Please create table first.
解决方案: 先创建表,再写入数据

sql
CREATE TABLE IF NOT EXISTS paimon.default.ods_user_log (…) USING paimon;
或在 Python 脚本中自动创建:

python
if not spark.catalog.tableExists(“paimon.default.ods_user_log”):
spark.sql(“CREATE TABLE …”)

最后

成果截图:

在这里插入图片描述
在这里插入图片描述

各个节点对应脚本
1.kafka_to_paimon

from pyspark.sql import SparkSession from pyspark.sql.functions import col, lit, from_json, to_timestamp from pyspark.sql.types import StructType, StructField, StringType, TimestampType import sys import os bizdate = sys.argv[1]iflen(sys.argv)>1else os.environ.get("bizdate","2026-03-29") spark = SparkSession.builder \ .appName(f"KafkaToPaimon_Batch_{bizdate}") \ .config("spark.sql.catalog.paimon","org.apache.paimon.spark.SparkCatalog") \ .config("spark.sql.catalog.paimon.warehouse","hdfs://nameservice1/user/paimon/warehouse") \ .config("spark.sql.catalog.paimon.metastore","hive") \ .config("spark.sql.catalog.paimon.uri","thrift://ddp1:9083") \ .config("spark.sql.extensions","org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions") \ .getOrCreate()print(f"开始处理业务日期: {bizdate}") # 定义 schema json_schema =StructType([StructField("user_id",StringType(), True),StructField("event_time",StringType(), True),StructField("event_type",StringType(), True),StructField("data",StringType(), True)]) # Kafka 配置 kafka_bootstrap_servers ="ddp4:9092,ddp3:9092" kafka_topic ="user_log_topic" # 检查表是否存在,如果不存在则创建 print("检查表是否存在...")try: # 尝试查询表 spark.sql("SELECT 1 FROM paimon.default.ods_user_log LIMIT 1")print("表已存在") except Exception:print("表不存在,正在创建...") create_sql =""" CREATETABLE paimon.default.ods_user_log( user_id STRING, event_time TIMESTAMP, event_type STRING, data STRING, dt STRING)USING paimon PARTITIONEDBY(dt)TBLPROPERTIES('bucket'='4','file.format'='parquet')""" spark.sql(create_sql)print("表创建成功") # 从 Kafka 读取数据 df = spark.read \ .format("kafka") \ .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \ .option("subscribe", kafka_topic) \ .option("startingOffsets","earliest") \ .option("endingOffsets","latest") \ .load() \ .selectExpr("CAST(value AS STRING) as json_str") # 解析 JSON parsed_df = df.select(from_json(col("json_str"), json_schema).alias("data")).select(col("data.user_id"),to_timestamp(col("data.event_time")).alias("event_time"),col("data.event_type"),col("data.data"),lit(bizdate).alias("dt")).filter(col("user_id").isNotNull()) count = parsed_df.count()print(f"从 Kafka 读取到 {count} 条有效数据")if count >0:print("\n数据样例(前5条):") parsed_df.show(5, truncate=False) # 写入数据 parsed_df.write \ .format("paimon") \ .mode("append") \ .insertInto("paimon.default.ods_user_log")print(f"✅ 写入完成,共 {count} 条记录")else:print("没有数据需要写入") spark.stop()

2.paimon写入ck(实际是写入hdfs,忘记改名称)

from pyspark.sql import SparkSession bizdate ="2026-03-29" spark = SparkSession.builder \ .appName(f"PaimonToClickHouse_{bizdate}") \ .config("spark.sql.catalog.paimon","org.apache.paimon.spark.SparkCatalog") \ .config("spark.sql.catalog.paimon.metastore","hive") \ .config("spark.sql.catalog.paimon.uri","thrift://ddp1:9083") \ .config("spark.sql.catalog.paimon.warehouse","hdfs://nameservice1/user/paimon/warehouse") \ .config("spark.sql.extensions","org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions") \ .getOrCreate()print("="*50)print("Paimon → ClickHouse 导入")print(f"业务日期: {bizdate}")print("="*50) # 读取数据 df = spark.sql(f""" SELECT user_id, event_time, event_type, data FROM paimon.default.ods_user_log WHERE dt ='{bizdate}'""") count = df.count()print(f"[1/2] Paimon 表中数据量: {count}")if count >0:print("[2/2] 写入 ClickHouse...") # 保存为 CSV 到 HDFS output_path = f"/tmp/paimon_export_{bizdate}" df.coalesce(1).write.mode("overwrite").option("header","false").csv(output_path)print(f"✅ 数据已导出到 HDFS: {output_path}")print(f" 请使用 clickhouse-client 导入数据")else:print("⚠️ 没有数据需要导入") spark.stop()

3.hdfs导入ck

#!/bin/bash # 环境变量 CH_CLIENT=/opt/datasophon/clickhouse/bin/clickhouse HOST=ddp3 PORT=9000USER=defaultBIZDATE="2026-03-29" echo "==========================================" echo "Paimon → ClickHouse 数据导入" echo "业务日期: ${BIZDATE}" echo "==========================================" # 1. 下载 CSV 文件 echo "[1/5] 下载 CSV 文件..." hdfs dfs -get -f /tmp/paimon_export_${BIZDATE}/part-*.csv /tmp/paimon_data_${BIZDATE}.csv 2>/dev/nullif[!-f /tmp/paimon_data_${BIZDATE}.csv ]; then echo "❌ CSV 文件不存在" exit 1 fi TOTAL_LINES=$(wc -l </tmp/paimon_data_${BIZDATE}.csv) echo "CSV 文件行数: ${TOTAL_LINES}" # 2. 创建 ClickHouse 表(如果不存在) echo "[2/5] 创建 ClickHouse 表(如果不存在)..." ${CH_CLIENT} client --host ${HOST}--port ${PORT}--user ${USER}--query " CREATETABLEIFNOTEXISTSdefault.user_log( user_id String, event_time String, event_type String, data String )ENGINE=MergeTree()ORDERBY user_id " 2>/dev/null # 3. 删除当前日期的旧数据(避免重复) echo "[3/5] 删除当前日期的旧数据..." ${CH_CLIENT} client --host ${HOST}--port ${PORT}--user ${USER}--query " ALTERTABLEdefault.user_log DELETEWHERE event_time LIKE'${BIZDATE}%' " 2>/dev/null # 4. 导入数据(使用 TSV 格式) echo "[4/5] 导入数据到 ClickHouse..." # 将 CSV 转换为 TSV(用制表符分隔)后导入 cat /tmp/paimon_data_${BIZDATE}.csv | sed 's/,/\t/g'| ${CH_CLIENT} client --host ${HOST}--port ${PORT}--user ${USER}--query "INSERT INTO default.user_log FORMAT TSV" # 5. 验证导入结果 echo "[5/5] 验证导入结果..."COUNT=$(${CH_CLIENT} client --host ${HOST}--port ${PORT}--user ${USER}--query "SELECT COUNT(*) FROM default.user_log WHERE event_time LIKE '${BIZDATE}%'"2>/dev/null)TOTAL=$(${CH_CLIENT} client --host ${HOST}--port ${PORT}--user ${USER}--query "SELECT COUNT(*) FROM default.user_log"2>/dev/null) echo "==========================================" echo "✅ 导入完成!" echo " 本次导入行数: ${TOTAL_LINES}" echo " 当前日期数据量: ${COUNT}" echo " ClickHouse 总数据量: ${TOTAL}" echo "==========================================" # 显示数据样例 if["${TOTAL_LINES}"-gt 0]; then echo "" echo "新增数据样例:" ${CH_CLIENT} client --host ${HOST}--port ${PORT}--user ${USER}--query "SELECT * FROM default.user_log WHERE event_time LIKE '${BIZDATE}%' LIMIT 3"--format PrettyCompact fi # 清理临时文件 rm -f /tmp/paimon_data_${BIZDATE}.csv echo "" echo "==========================================" echo "导入任务完成" echo "=========================================="

如需沟通:lita2lz

Read more

最新更新版本,OpenClaw v2026.4.2 深度解读剖析:Task Flow 重磅回归与安全架构的全面硬化

最新更新版本,OpenClaw v2026.4.2 深度解读剖析:Task Flow 重磅回归与安全架构的全面硬化

文档版本:v1.0 分析基准日期:2026年4月3日 字数统计:约20,000字 分析维度:架构演进、功能解析、安全机制、生态影响、升级指南、未来展望 第一章:版本总览——一次功能与安全并重的里程碑式更新 1.1 发布背景与战略定位 2026年4月3日,OpenClaw 正式发布 v2026.4.2 版本。这并非一次常规的迭代更新,而是在经历了2026年3月一系列架构大手术(v2026.3.7 的 ContextEngine 插件化、v2026.3.31 的核心架构重塑)之后,项目进入**"能力回归与安全硬化"**阶段的关键里程碑。 从版本号演进来看,v2026.4.2

Python + Selenium + AI 智能爬虫:自动识别反爬与数据提取

Python + Selenium + AI 智能爬虫:自动识别反爬与数据提取

结合 Selenium 浏览器自动化与 AI 大模型能力,构建能够自动识别反爬机制、智能解析页面的新一代爬虫系统。 1. 系统架构 验证码 登录墙 正常页面 种子 URL 队列 调度器 Selenium WebDriver 反检测模块 页面渲染 AI 反爬识别 AI 验证码破解 自动登录 AI 数据提取 数据清洗管道 存储 MongoDB / CSV 数据看板 2. 反爬机制分布 35%25%20%10%7%3%常见反爬机制占比(Top 500 网站统计)JS 动态渲染请求频率限制验证码(图形/滑块)User-Agent 检测IP

014、文本到图像生成:CLIP引导与潜在对齐

一、从一次深夜调试说起 上周在复现一个文本到图像的生成实验时,遇到了一个典型问题:模型生成的图像看起来“还行”,但总感觉和输入文本差了那么点意思。比如输入“一只戴着墨镜的柴犬在沙滩上晒太阳”,出来的图像柴犬倒是像,但墨镜时有时无,沙滩背景也经常混入奇怪的植被。损失函数在下降,指标看着也正常,但就是不对劲。 这种“不对劲”往往不是模型结构的问题,而是文本和图像两个模态的“对齐”没做好。今天要聊的CLIP引导和潜在对齐,就是解决这个问题的关键思路。 二、CLIP为什么能成为“翻译官” CLIP(Contrastive Language-Image Pre-training)本身是一个多模态模型,它的训练方式很巧妙:让模型学会判断哪些文本和哪些图像是配对的。它不生成任何东西,只做“匹配判断”。这个特性让它成了文本和图像之间的一个高质量“翻译官”。 在扩散模型中引入CLIP,核心目的是用CLIP的跨模态理解能力,来约束图像生成过程,让生成的图像在语义上更贴近文本描述。这里常见的做法是在扩散过程的采样阶段,用CLIP的文本编码和图像编码计算相似度,作为额外的引导信号。 三、CLI