【架构实战】ETL架构演进:从批处理到实时流处理

【架构实战】ETL架构演进:从批处理到实时流处理

一、ETL概述

ETL(Extract-Extract-Transform-Load)是数据仓库的核心环节:

数据源 → Extract → Transform → Load → 数据仓库 ↓ 数据清洗 数据转换 数据聚合 

传统ETL的问题:

  • 批量处理,延迟高(T+1甚至更久)
  • 处理时间长,资源占用峰值高
  • 难以处理实时需求
  • 错误难以追溯

二、批处理ETL

1. Sqoop(数据库↔HDFS)

# 全量导入 sqoop import\--connect jdbc:mysql://mysql:3306/order_db \--username root \--password password \--table orders \ --target-dir /data/orders \ --delete-target-dir \ --num-mappers 4\ --fields-terminated-by ','# 增量导入 sqoop import\--connect jdbc:mysql://mysql:3306/order_db \--username root \--password password \--table orders \ --target-dir /data/orders \--incremental append \ --check-column order_id \ --last-value 10000\ --num-mappers 2

2. Spark Batch ETL

from pyspark.sql import SparkSession from pyspark.sql.functions import col, avg,sum, count spark = SparkSession.builder \ .appName("DailyOrderETL") \ .getOrCreate()# 读取数据 orders_df = spark.read \ .format("jdbc") \ .option("url","jdbc:mysql://mysql:3306/order_db") \ .option("dbtable","orders") \ .option("user","root") \ .option("password","password").load() order_items_df = spark.read \ .format("jdbc") \ .option("url","jdbc:mysql://mysql:3306/order_db") \ .option("dbtable","order_items") \ .option("user","root") \ .option("password","password").load()# 数据转换 daily_stats = orders_df \ .filter(col("order_date")=="2024-01-15") \ .join(order_items_df, orders_df.order_id == order_items_df.order_id) \ .groupBy("order_date","shop_id") \ .agg( count("orders.order_id").alias("order_count"),sum("order_items.amount").alias("total_amount"), avg("order_items.amount").alias("avg_amount"))# 写入数据仓库 daily_stats.write \ .format("parquet") \ .mode("overwrite") \ .partitionBy("order_date") \ .saveAsTable("warehouse.daily_shop_stats")

3. 数据清洗

# 数据清洗处理defclean_data(df):# 去除重复 df = df.dropDuplicates()# 处理空值 df = df.fillna({"phone":"UNKNOWN","address":"UNKNOWN"})# 数据标准化 df = df.withColumn("phone", regexp_replace(col("phone"),"[^0-9]","")) df = df.withColumn("order_time", to_timestamp(col("order_time"),"yyyy-MM-dd HH:mm:ss"))# 异常值处理 df = df.filter((col("amount")>0)&(col("amount")<1000000))return df 

三、实时流处理ETL

1. Flink实时ETL

流处理优势:

  • 延迟低(秒级甚至毫秒级)
  • 资源使用更均匀
  • 支持实时监控和告警
  • 错误影响范围小

WordCount示例:

publicclassFlinkWordCount{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2);// 读取数据源DataStream<String> text = env.socketTextStream("localhost",9999);// 转换和聚合DataStream<WordCount> counts = text .flatMap((line, out)->{for(String word : line.split("\\s")){ out.collect(newWordCount(word,1));}}).keyBy("word").window(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum("count");// 输出 counts.print(); env.execute("WordCount");}}

2. 实时数据同步

publicclassKafkaToHiveETL{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000);// 每分钟检查点 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);KafkaSource<String> source =KafkaSource.<String>builder().setBootstrapServers("kafka:9092").setGroupId("order-etl-consumer").setTopics("orders").setValueOnlyDeserializer(newSimpleStringSchema()).setStartingOffsets(OffsetsInitializer.committedOffsets()).build();DataStream<String> stream = env.fromSource( source,WatermarkStrategy.noWatermarks(),"Kafka Source");// 解析JSONDataStream<Order> orders = stream .map(json ->JSON.parseObject(json,Order.class)).filter(order -> order.getAmount()>0);// 写入HiveFlinkHiveConnector.writeToHive(orders,"warehouse.orders"); env.execute("KafkaToHiveETL");}}

3. 实时聚合

publicclassRealTimeShopStats{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 读取KafkaDataStream<Order> orders = env.addSource(newFlinkKafkaConsumer<>("orders",newOrderDeserializationSchema(),KafkaConfig.getProperties()));// 实时统计DataStream<ShopStats> stats = orders .keyBy(Order::getShopId).window(SlidingEventTimeWindows.of(Time.minutes(5),Time.minutes(1))).aggregate(newShopStatsAggregator());// 输出到Kafka(供下游消费) stats.addSink(newFlinkKafkaProducer<>("shop-realtime-stats",newShopStatsSerializationSchema(),KafkaConfig.getProperties()));// 输出到Redis(供仪表盘查询) stats.addSink(newRedisSink<>(redisConfig)); env.execute("RealTimeShopStats");}}// 聚合器publicclassShopStatsAggregatorimplementsAggregateFunction<Order,ShopStatsAccumulator,ShopStats>{@OverridepublicShopStatsAccumulatoradd(Order value,ShopStatsAccumulator acc){ acc.orderCount++; acc.totalAmount += value.getAmount(); acc.maxAmount =Math.max(acc.maxAmount, value.getAmount());return acc;}@OverridepublicShopStatsgetResult(ShopStatsAccumulator acc){returnnewShopStats( acc.getShopId(), acc.orderCount, acc.totalAmount, acc.totalAmount / acc.orderCount, acc.maxAmount );}}

四、Lambda架构

1. 架构设计

实时层(Speed Layer) │ └──► 流处理(Flink/Spark Streaming) │ └──► 实时结果(Redis) 批处理层(Batch Layer) 服务层(Serving Layer) │ │ └──► 批量计算(Hive/Spark)──┼──► 合并结果 │ ┌─────────────┴─────────────┐ │ │ 查询时合并 全量历史数据 

2. 实现示例

# 批处理层 - 计算全量指标defbatch_layer():# 计算每日商家统计 daily_stats = spark.sql(""" SELECT shop_id, COUNT(*) as order_count, SUM(amount) as total_amount, MAX(amount) as max_amount FROM orders WHERE order_date >= '2023-01-01' GROUP BY shop_id, order_date """) daily_stats.write \ .format("delta") \ .mode("overwrite") \ .partitionBy("order_date") \ .saveAsTable("warehouse.shop_daily_stats_batch")# 实时层 - 计算增量指标defspeed_layer():# 计算最近5分钟的统计 stats = orders \ .keyBy("shop_id") \ .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1))) \ .aggregate(ShopStatsAggregator()) stats.write \ .format("delta") \ .mode("append") \ .saveAsTable("warehouse.shop_realtime_stats")# 服务层 - 合并查询defserving_layer_query(shop_id):# 合并批处理和实时结果 result = spark.sql(f""" SELECT a.shop_id, a.total_amount + COALESCE(b.realtime_amount, 0) as total_amount, a.order_count + COALESCE(b.realtime_count, 0) as order_count FROM ( SELECT shop_id, total_amount, order_count FROM warehouse.shop_daily_stats_batch WHERE shop_id = {shop_id} AND order_date = CURRENT_DATE - 1 ) a LEFT JOIN ( SELECT shop_id, SUM(total_amount) as realtime_amount, SUM(order_count) as realtime_count FROM warehouse.shop_realtime_stats WHERE shop_id = {shop_id} GROUP BY shop_id ) b ON a.shop_id = b.shop_id """)return result.collect()[0]

五、Kappa架构

1. 架构设计

只使用流处理,抛弃批处理层 数据源 → Kafka → Flink流处理 → 结果存储 │ ├──► 实时计算 │ └──► 全量历史重计算(Replay) 

2. 全量历史重计算

publicclassKappaETL{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 1. 从Kafka读取历史数据(全量重放)DataStream<Order> historicalOrders = env .readFile(newParquetInputFormat<>(Path.fromPathString("/data/orders/parquet")),"/data/orders/parquet").map(line ->JSON.parseObject(line,Order.class));// 2. 从Kafka读取实时数据DataStream<Order> realtimeOrders = env.addSource(newFlinkKafkaConsumer<>("orders",newOrderDeserializationSchema(),KafkaConfig.getProperties()));// 3. 合并数据源DataStream<Order> allOrders = historicalOrders .union(realtimeOrders);// 4. 流处理计算DataStream<ShopStats> stats = allOrders .keyBy(Order::getShopId).window(SlidingEventTimeWindows.of(Time.minutes(5),Time.minutes(1))).aggregate(newShopStatsAggregator());// 5. 输出结果 stats.addSink(newRedisSink<>(redisConfig)); env.execute("KappaETL");}}

六、ETL工具对比

工具类型优点缺点适用场景
Sqoop批处理简单易用功能单一数据库↔HDFS
DataX批处理跨数据源无流处理离线同步
Flink流处理功能强大复杂度高实时ETL
Spark Streaming流处理与Spark集成延迟较高近实时
Kafka Streams流处理轻量级功能有限简单流处理

七、总结

ETL架构经历了从批处理到实时流处理的演进:

  • 批处理:T+1延迟,适合历史分析
  • 实时处理:秒级延迟,适合实时监控
  • Lambda:结合两者,但复杂度高
  • Kappa:简化架构,统一使用流处理

选型建议:

  1. 离线分析:Spark批处理
  2. 实时监控:Flink流处理
  3. 复杂场景:Lambda或Kappa架构

个人观点,仅供参考

Read more

iterm2-snazzy主题自定义教程:如何根据个人喜好调整终端色彩

iterm2-snazzy主题自定义教程:如何根据个人喜好调整终端色彩 【免费下载链接】iterm2-snazzyElegant iTerm2 theme with bright colors 项目地址: https://gitcode.com/gh_mirrors/it/iterm2-snazzy iterm2-snazzy是一款拥有明亮色彩的优雅iTerm2主题,能让你的终端界面更加美观舒适。本教程将带你了解如何安装该主题并根据个人喜好调整终端色彩,打造专属于你的个性化终端体验。 一、快速安装iterm2-snazzy主题 1.1 克隆项目仓库 首先,打开终端,执行以下命令克隆项目仓库: git clone https://gitcode.com/gh_mirrors/it/iterm2-snazzy 1.2 导入主题文件 进入克隆好的项目目录,找到Snazzy.itermcolors文件。打开iTerm2,依次点击iTerm2->Preferences->Profiles-&

AI 直接生成前端代码:我的软件原型设计流,从此告别重复画图

AI 直接生成前端代码:我的软件原型设计流,从此告别重复画图

近年来,AI 辅助开发越来越成熟,尤其是在快速原型设计方面。今天分享一下我如何借助 Cursor、Trace solo、ChatGPT、Qoder 等 AI 工具,高效完成软件原型的自动绘制与代码生成。 📌 核心流程三步走 1️⃣ 用 AI 输出需求文档(非技术描述) 首先,我会让 AI 根据产品思路或功能描述,生成一份清晰、无技术细节的需求文档。这一步不写代码,只聚焦逻辑与用户流程。 2️⃣ AI 生成 HTML 原型代码 基于上一步的需求文档,直接让 AI 生成对应的 HTML 代码,快速搭建出可交互的前端原型。支持实时预览,直观看到界面效果。 3️⃣ 反复微调,直至满意 生成的原型往往需要多次调整。通过自然语言描述修改方向,AI 可快速迭代代码,直至达到想要的交互与视觉效果。

前端可访问性:别让你的网站对某些人关闭大门

前端可访问性:别让你的网站对某些人关闭大门 毒舌时刻 这网站做的跟迷宫似的,正常人都找不到路,更别说有障碍的人了。 各位前端同行,咱们今天聊聊前端可访问性。别告诉我你还在忽略可访问性,那感觉就像在公共建筑里不建无障碍通道——能进,但不是所有人都能进。 为什么你需要关注可访问性 最近看到一个项目,按钮没有焦点状态,表单没有标签,屏幕阅读器根本无法正常工作。我就想问:你是在做网站还是在做密室逃脱? 反面教材 // 反面教材:忽略可访问性 function App() { return ( <div> <h1>我的网站</h1> <div> <input type="text" placeholder="用户名" /> <

飞书 lark-cli 深度解读:当办公软件遇上 AI Agent

飞书 lark-cli 深度解读:当办公软件遇上 AI Agent

飞书 lark-cli 深度解读:当办公软件遇上 AI Agent 2026年3月,飞书开源了官方命令行工具 lark-cli。这不是一个普通的 CLI,而是面向 AI Agent 时代的企业级基础设施。本文将从架构、设计理念、实战应用三个维度,全面解读这个项目的创新之处。 一、为什么2026年大家都在做CLI? 过去四十年,软件界面的进化方向一直是 CLI → GUI:从黑底白字的命令行,到图形化界面,让普通人也能用上电脑。 但2026年,方向反过来了。飞书、Google、Stripe、ElevenLabs、网易云音乐,一众看起来毫不相关的公司,不约而同在做同一件事:发布CLI工具。 新的用户来了 这个新用户叫 Agent。 Agent的本质是"文字进、文字出"的智能体。GUI是给眼睛看的,Agent没有眼睛;CLI是纯文字的,