异构数据迁移工具:DataX、DataX-Web

异构数据迁移工具:DataX、DataX-Web

异构数据迁移工具:DataX、DataX-Web


一、DataX + DataX-Web 简介:

1. DataX 核心特性

DataX 是阿里开源的 基础数据迁移引擎(纯命令行工具,无界面),核心功能是跨数据源同步数据。

  • 架构:通过 “Reader(读数据插件)+ Writer(写数据插件)” 实现跨数据源(MySQL、Oracle、HDFS 等)数据搬运;
  • 局限性:本身不自带分表规则逻辑,需配合脚本预处理或自定义插件实现按分表规则拆分数据;
  • 优势:轻量、开源免费、跨数据源兼容性强,适合中小规模数据迁移。

2. DataX-Web 核心作用

DataX 是阿里开源的 基础数据迁移引擎(纯命令行工具,无界面),核心功能是跨数据源同步数据。

  • 核心功能:可视化配置迁移任务、定时调度(如每日增量同步)、迁移进度监控、日志查询与异常告警;
  • 依赖关系:必须与 DataX 引擎配合使用(部署时需关联 DataX 安装路径,无法独立工作);
  • 优势:降低操作门槛,支持多任务管理,适合非技术人员或批量任务场景。

二、DataX

1. 下载 (两种方式)

方式二:gitcode 下载:https://gitcode.com/gh_mirrors/da/DataX
下滑动到 Quick Start 章节

在这里插入图片描述

方式一:github 下载:https://github.com/alibaba/DataX
下滑动到 Quick Start 章节

在这里插入图片描述

2. 设置 支持 python3(可选)

原因:从github上下载的版本只支持python2.x版本

如果你的python环境是python3.0以上的话,
请到 https://github.com/WeiYe-Jing/datax-web/tree/master/doc/datax-web/datax-python3
下载对应的三个.py文件代替datax文件夹中bin目录下的三个.py文件即可。

在这里插入图片描述

3. 测试安装是否成功

cd的安装位置的bin目录下执行

//查看模板 python datax.py -r streamreader -w streamwriter 
在这里插入图片描述

2、进入windows中cmd命令环境中:
解压后,可直接运行样例,查看是否安装成功:

在这里插入图片描述

1、将datax压缩包解压在安装目录:

在这里插入图片描述

4. 简单的入门测试(mysql案例)

点击一下这个可以找到相关数据库的json脚本格式 进行参考

在这里插入图片描述


我这边就 把同一个库下面的 user表 里面的数据 写到 user_0 表里面,相关 json 如下:(我命名为job001.json)

{"job":{"setting":{"speed":{"channel":2}, "errorLimit":{"record":5, "percentage":0.03}}, "content":[{"reader":{"name":"mysqlreader", "parameter":{"username":"root", "password":"root", "column":["id", "username", "password_hash", "role_id", "real_name", "email", "phone", "deleted_at", "created_at", "updated_at"], "connection":[{"table":["users"], "jdbcUrl":["jdbc:mysql://localhost:3306/student_management?characterEncoding=utf8&useSSL=false"]}]}}, "writer":{"name":"mysqlwriter", "parameter":{"username":"root", "password":"root", "column":["id", "username", "password_hash", "role_id", "real_name", "email", "phone", "deleted_at", "created_at", "updated_at"], "connection":[{"table":["users_0"], "jdbcUrl":"jdbc:mysql://localhost:3306/student_management?characterEncoding=utf8&useSSL=false"}]}}}]}}

放到 datax\job 目录下,然后 到 datax\bin 下 打开 cmd 窗口,执行

python datax.py ../job/job001.json 
在这里插入图片描述


成功 复制过去了

在这里插入图片描述

三、DataX-Web

https://github.com/WeiYe-Jing/datax-web/releases
https://github.com/alibaba/DataX/releases

1. 下载源码

git clone https://github.com/WeiYe-Jing/datax-web.git 

2. 创建数据库

执行bin/db下面的datax_web.sql文件
先自己创建一个datax_web的数据库,再把这个脚本执行一下

在这里插入图片描述

3. 修改项目配置

1.修改datax_admin下resources/application.yml文件(按需调整)

修改datax-admin项目中的配置文件application.yml,其配置文件中不少配置项是通过配置环境变量获取的,我这里直接定义在配置文件中了。需要重点调整的就是data.path目录,数据库地址、账号密码、服务端口等
server: port: 8080# port: ${server.port} spring: #数据源 datasource: username: root password: root url: jdbc:mysql://localhost:3306/datax_web?serverTimezone=Asia/Shanghai&useLegacyDatetimeCode=false&useSSL=false&nullNamePatternMatchesAll=true&useUnicode=true&characterEncoding=UTF-8 # password: ${DB_PASSWORD:password}# username: ${DB_USERNAME:username}# url: jdbc:mysql://${DB_HOST:127.0.0.1}:${DB_PORT:3306}/${DB_DATABASE:dataxweb}?serverTimezone=Asia/Shanghai&useLegacyDatetimeCode=false&useSSL=false&nullNamePatternMatchesAll=true&useUnicode=true&characterEncoding=UTF-8 driver-class-name: com.mysql.jdbc.Driver hikari: ## 最小空闲连接数量 minimum-idle: 5## 空闲连接存活最大时间,默认600000(10分钟) idle-timeout: 180000## 连接池最大连接数,默认是10 maximum-pool-size: 10## 数据库连接超时时间,默认30秒,即30000 connection-timeout: 30000 connection-test-query: SELECT 1##此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟 max-lifetime: 1800000# datax-web email mail: host: smtp.qq.com port: 25 username: [email protected] password: xxx # username: ${mail.username}# password: ${mail.password} properties: mail: smtp: auth: true starttls: enable: true required: true socketFactory: class: javax.net.ssl.SSLSocketFactory management: health: mail: enabled: false server: servlet: context-path: /actuator mybatis-plus: # mapper.xml文件扫描 mapper-locations: classpath*:/mybatis-mapper/*Mapper.xml # 实体扫描,多个package用逗号或者分号分隔#typeAliasesPackage: com.yibo.essyncclient.*.entity global-config: # 数据库相关配置 db-config: # 主键类型 AUTO:"数据库ID自增", INPUT:"用户输入ID", ID_WORKER:"全局唯一ID (数字类型唯一ID)", UUID:"全局唯一ID UUID"; id-type: AUTO # 字段策略 IGNORED:"忽略判断",NOT_NULL:"非 NULL 判断"),NOT_EMPTY:"非空判断" field-strategy: NOT_NULL # 驼峰下划线转换 column-underline: true# 逻辑删除 logic-delete-value: 0 logic-not-delete-value: 1# 数据库类型 db-type: mysql banner: false# mybatis原生配置 configuration: map-underscore-to-camel-case: true cache-enabled: false call-setters-on-nulls: true jdbc-type-for-null: 'null' type-handlers-package: com.wugui.datax.admin.core.handler # 配置mybatis-plus打印sql日志 logging: level: com.wugui.datax.admin.mapper: info path: ./data/applogs/admin # level:# com.wugui.datax.admin.mapper: error# path: ${data.path}/applogs/admin#datax-job, access token datax: job: accessToken: #i18n (default empty as chinese, "en" as english) i18n: ## triggerpool max size triggerpool: fast: max: 200 slow: max: 100### log retention days logretentiondays: 30 datasource: aes: key: AD42F6697B035B75 

2.修改datax_executor下resources/application.yml文件(按需调整)

# web port server: # port: ${server.port} port: 8081# log config logging: config: classpath:logback.xml # path: ${data.path}/applogs/executor/jobhandler path: ./data/applogs/executor/jobhandler datax: job: admin: ### datax admin address list, such as "http://address" or "http://address01,http://address02" addresses: http://127.0.0.1:8080 # addresses: http://127.0.0.1:${datax.admin.port} executor: appname: datax-executor ip: #port: 9999 port: ${executor.port:9999}### job log path logpath: ./data/applogs/executor/jobhandler # logpath: ${data.path}/applogs/executor/jobhandler### job log retention days logretentiondays: 30### job, access token accessToken: executor: jsonpath: D:\\temp\\executor\\json\\# jsonpath: ${json.path} pypath: D:\Develop\MigraTools\datax\bin\datax.py # pypath: ${python.path}
  • admin.addresses datax_admin部署地址,如调度中心集群部署存在多个地址则用逗号分隔,执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";
  • executor.appname 执行器AppName,每个执行器机器集群的唯一标示,执行器心跳注册分组依据;
  • executor.ip 默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 “执行器注册” 和 “调度中心请求并触发任务”;
  • executor.port 执行器Server端口号,默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
  • executor.logpath 执行器运行日志文件存储磁盘路径,需要对该路径拥有读写权限;
  • executor.logretentiondays 执行器日志文件保存天数,过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
  • executor.jsonpath datax json临时文件保存路径
  • pypath DataX启动脚本地址,例如:xxx/datax/bin/datax.py(这个路径是上面搭建 dataX 已经创建好的启动脚本)
    如果系统配置DataX环境变量(DATAX_HOME),logpath、jsonpath、pypath可不配,log文件和临时json存放在环境变量路径下。

4. 启动项目

  • 1.运行datax_admin下 DataXAdminApplication

2.运行datax_executor下 DataXExecutorApplication

在这里插入图片描述


admin启动成功后日志会输出三个地址,两个接口文档地址,一个前端页面地址

在这里插入图片描述

5. 启动成功

启动成功后打开页面(默认管理员用户名:admin 密码:123456)
http://127.0.0.1:8080/index.html

在这里插入图片描述

6. 实战

1. 项目管理-添加项目

2. 配置数据源

注意:驱动和数据库版本一定要一致!!!

DataX-Admin 的 MySQL 驱动默认是 用的 5.1.49 , DataX 的 mysqlreader 和 mysqlwriter 的 mysql 驱动插件 默认也是 5.1.49
而我的mysql数据库是8.0.44版本的,所以导致定时任务一直是在连接中,没有往下跑
所以我将 DataX-Admin 的 MySQL 驱动 和 DataX 的 mysqlreader 和 mysqlwriter 的 mysql 驱动插件 都换成了 8.0.33版本的
DataX-Admin 具体操作如下

DataX 的 mysqlreader 和 mysqlwriter 的 操作如下
旧的mysql驱动版本我删除掉了
mysql8.0.33这个jar的下载地址为https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.0.33

在这里插入图片描述

DataX-Admin 具体操作如下

在这里插入图片描述


在这里插入图片描述
3. 创建执行器
在这里插入图片描述
4. 创建DataX任务模板
在这里插入图片描述
5. 构建任务生成同步 json
在这里插入图片描述


在这里插入图片描述


在这里插入图片描述
  • 再点击下一步

点击选择模板

在这里插入图片描述


以下情况才是表示选中成功

在这里插入图片描述

点击构建

在这里插入图片描述
6. 任务管理
在这里插入图片描述

数据库的情况

在这里插入图片描述

点击执行一次看看效果

在这里插入图片描述


在这里插入图片描述

四、报错

1.在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数

原来的如下:

"setting":{"speed":{"byte":1048576,"channel":3}}

删除 speed.byte 配置,仅保留 channel 数(由 DataX 自动适配):

"setting":{"speed":{"channel":3}}

2.windows下乱码修复

cmd窗口下,输入指令:chcp 65001

3.报winutils缺失请下载

在这里插入图片描述

下载后的路径

在这里插入图片描述
在这里插入图片描述

五、参考文章

【1】window10本地运行datax与datax-web

Read more

深入解析 Spark 数据读取与 Hive 数据来源:构建高效数据处理链路

深入解析 Spark 数据读取与 Hive 数据来源:构建高效数据处理链路

在大数据技术生态中,Spark 作为核心计算引擎,Hive 作为数据仓库工具,二者协同支撑着海量数据的处理与存储工作。其中,Spark 的数据读取能力直接决定了计算效率的起点,Hive 的数据来源则影响着数据仓库的完整性与可用性。本文将系统梳理 Spark Core、Spark SQL 的数据读取方式,以及 Hive 中数据的主要来源,为大数据从业者构建高效数据处理链路提供参考。 一、Spark Core:底层数据读取的多元化实现         Spark Core 作为 Spark 生态的基础组件,依托SparkContext提供的 API,实现了对多种数据源的读取支持,其设计注重底层灵活性与兼容性,能够适配不同格式、不同存储位置的数据读取需求。 (一)本地集合:内存级数据快速加载         当数据规模较小时,可直接将本地集合转换为弹性分布式数据集(RDD),实现内存级别的快速读取与计算。Spark Core 提供了parallelize()和makeRDD()两种核心方法: * parallelize()方法支持将数组、

By Ne0inhk
一文说清楚Hive中常用的聚合函数[collect_list]

一文说清楚Hive中常用的聚合函数[collect_list]

collect_list(col)是Hive中常用的聚合函数,用于将分组内的某列值(col)收集到一个数组中。它的核心作用是将多行数据合并为单行的数组结构,常用于数据重组或复杂分析场景。以下是详细说明和示例: 一、函数特点 1. 分组聚合:需配合GROUP BY使用,将每个分组内的col值收集为数组。 2. 保留重复值:与collect_set(col)不同,collect_list不会去重,保留所有原始值(包括重复值)。 3. 顺序不确定:默认不保证数组内元素的顺序(除非配合窗口函数ORDER BY)。 二、典型应用场景 1. 用户行为序列分析:将用户的多次操作按时间串联为行为路径。 2. 数据结构转换:将行式存储的数据转为列式(如将多行商品标签转为单个商品的标签数组)。 3. 复杂统计:计算每个分组内的所有值的列表(如收集每个班级的所有学生成绩)。 三、示例演示 场景1:用户订单列表收集 需求:

By Ne0inhk
【DBeaver】连接带kerberos的hive[Apache|HDP]

【DBeaver】连接带kerberos的hive[Apache|HDP]

目录 一、安装配置Kerberos客户端环境 1.1 安装Kerberos客户端 1.2 环境配置 二、基于Cloudera驱动创建连接 三、基于Hive原生驱动创建连接 一、安装配置Kerberos客户端环境 1.1 安装Kerberos客户端 在Kerberos官网下载,地址如下:https://web.mit.edu/kerberos/dist/index.html 安装过程就是下一步 ,下一步那种。 1.2 环境配置 配置C:\ProgramData\MIT\Kerberos5\krb5.ini文件,将KDC Server服务器上/etc/krb5.conf文件中的部分内容,拷贝到krb5.ini中,如果直接将krb5.conf文件更名为krb5.ini并替换krb5.ini,

By Ne0inhk