数据采集工具
大数据架构中,数据采集层负责将各类数据源的数据同步到大数据平台。不同工具适用于不同的数据传输场景:
| 工具 | 定位 | 数据方向 | 状态 |
|---|---|---|---|
| Flume | 日志采集,实时流式传输 | 日志文件 / Kafka → HDFS / HBase | 活跃维护 |
| Sqoop | 关系型数据库 ↔ Hadoop 批量同步 | MySQL / Oracle ↔ HDFS / Hive | 已停止维护(2021) |
| DataX | 异构数据源离线同步,支持多种读写插件 | 任意数据源 → 任意数据源 | 活跃维护(阿里开源) |
| Kettle(PDI) | 可视化 ETL 工具,拖拽式设计流程 | 多数据源互转 | 活跃维护 |
Sqoop 停止维护
Apache Sqoop 已于 2021 年 6 月停止维护,移入 Apache Attic。新项目建议使用 DataX 或 SeaTunnel 替代。
一、Flume
简介
Flume 是 Apache 开源的分布式日志采集系统,专为海量日志数据的收集、聚合和传输设计,支持实时将数据可靠地写入 HDFS、HBase、Kafka 等。
核心架构
KafkaSource(从 Kafka 拉取数据)
↓
FileChannel(数据暂存到磁盘,保障可靠性)
↓
HDFSSink(写入 HDFS)零点漂移问题与拦截器
什么是零点漂移?
日志文件原始时间戳经过多个组件(Kafka → Flume)传输后,Kafka Source header 中的时间戳与业务数据时间戳不一致,导致 HDFS 按时间分区存储时出现数据"漂移"到错误分区。
解决方案:通过 Flume 拦截器(Interceptor)将 header 中的 timestamp 替换为日志中的业务时间戳。
拦截器 Maven 依赖:
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.10.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>拦截器实现:
package com.xupengboo.gmall.flume.interceptor;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.*;
public class TimestampInterceptor implements Interceptor {
@Override
public void initialize() {}
@Override
public Event intercept(Event event) {
Map<String, String> headers = event.getHeaders();
String log = new String(event.getBody(), StandardCharsets.UTF_8);
try {
JSONObject jsonObject = JSONObject.parseObject(log);
// 将 header 时间戳替换为日志中的业务时间戳,解决零点漂移
String ts = jsonObject.getString("ts");
headers.put("timestamp", ts);
return event;
} catch (Exception e) {
// 非 JSON 格式数据返回 null,后续统一过滤
return null;
}
}
@Override
public List<Event> intercept(List<Event> list) {
// 过滤掉非 JSON 格式的无效数据
Iterator<Event> iterator = list.iterator();
while (iterator.hasNext()) {
if (intercept(iterator.next()) == null) {
iterator.remove();
}
}
return list;
}
@Override
public void close() {}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() { return new TimestampInterceptor(); }
@Override
public void configure(Context context) {}
}
}常见问题
打包后运行时若报 ClassNotFoundException: fastjson,需手动将 fastjson-x.x.x.jar 复制到 /opt/module/flume/lib/ 目录下。
打包后将 Jar 放入 Flume lib 目录:
cp target/flume-interceptor.jar /opt/module/flume/lib/二、Sqoop
已停止维护
Sqoop 于 2021 年停止维护,仅用于维护老项目,新项目请使用 DataX。
简介
Sqoop(SQL-to-Hadoop)专门用于在关系型数据库(MySQL、Oracle)和 Hadoop 生态(HDFS、Hive、HBase)之间批量传输数据。
部署
# 下载 Sqoop 1.4.7(对应 Hadoop 3.2.4)
curl -O -L https://archive.apache.org/dist/sqoop/1.4.7/sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz
tar -zxvf sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz -C /opt/
mv /opt/sqoop-1.4.7.bin__hadoop-2.6.0 /opt/sqoop
# 配置环境变量
vi /etc/profile
export SQOOP_HOME=/opt/sqoop
export PATH=$PATH:$SQOOP_HOME/bin# 配置 Hadoop 路径
cp $SQOOP_HOME/conf/sqoop-env-template.sh $SQOOP_HOME/conf/sqoop-env.sh
vi $SQOOP_HOME/conf/sqoop-env.sh
export HADOOP_COMMON_HOME=/usr/local/hadoop
export HADOOP_MAPRED_HOME=/usr/local/hadoop# 添加数据库 JDBC 驱动
cd $SQOOP_HOME/lib/
wget https://jdbc.postgresql.org/download/postgresql-42.7.3.jar常用命令
# 测试连接并查询
sqoop eval \
--driver org.postgresql.Driver \
--connect jdbc:postgresql://192.168.10.66:5432/postgres \
--username root \
--password "${DB_PASSWORD}" \
--query "SELECT * FROM table_name LIMIT 1"
# 将 PostgreSQL 表导入 HDFS
export HADOOP_USER_NAME=hadoop # 指定 HDFS 操作用户
sqoop import \
--driver org.postgresql.Driver \
--connect jdbc:postgresql://192.168.10.66:5432/postgres \
--username root \
--password "${DB_PASSWORD}" \
--table your_table \
--target-dir /user/sqoop/output \
--delete-target-dir \
-m 1 # 使用 1 个 Mapper,适合小表常见报错:StringUtils 找不到
# 报错:java.lang.NoClassDefFoundError: org/apache/commons/lang/StringUtils
# 解决:添加 commons-lang 2.x 版本(3.x 不兼容)
cd $SQOOP_HOME/lib
wget https://repo1.maven.org/maven2/commons-lang/commons-lang/2.6/commons-lang-2.6.jar工作原理
① Sqoop 根据表结构生成 Java 序列化类
② 编译并打包为可执行 JAR 文件
③ 将 JAR 作为 MapReduce 作业提交给 YARN
④ 每个 Mapper 通过 JDBC 读取数据库数据,写入 HDFS三、DataX
简介
DataX 是阿里巴巴开源的离线数据同步框架,支持几乎所有主流数据源之间的数据传输,通过插件化架构(Reader + Writer)灵活扩展。
开源地址:https://github.com/alibaba/DataX
安装
# 环境要求
java -version # JDK 1.8
python --version # Python 2.7(注意:DataX 使用 Python 2)
mvn --version # Maven 3.x
# 下载并解压
curl -O -L https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz
tar -zxvf datax.tar.gz使用
# 自检(验证安装是否正常)
python bin/datax.py job/job.json
# 执行同步作业
python bin/datax.py /path/to/your_job.json生成配置模板
DataX 提供命令快速生成 Reader + Writer 的 JSON 配置模板:
# 查看所有可用插件
ls plugin/reader # 查看所有 Reader 插件
ls plugin/writer # 查看所有 Writer 插件
# 生成从 MySQL 读取写入 HDFS 的配置模板
python bin/datax.py -r mysqlreader -w hdfswriter生成的 JSON 模板作为作业配置的起点,填入实际的数据源参数后即可运行。
四、Kettle(PDI)
简介
Kettle 已正式更名为 Pentaho Data Integration(PDI),是一款成熟的开源 ETL 工具,以可视化拖拽式设计为核心,适合数据集成、数据清洗和数据仓库建设。
GitHub:https://github.com/pentaho/pentaho-kettle
Kettle vs DataX
| Kettle(PDI) | DataX | |
|---|---|---|
| 操作方式 | 可视化 GUI 拖拽 | JSON 配置文件 |
| 学习曲线 | 低(无需编码) | 中(需要了解 JSON 配置) |
| 扩展性 | 插件丰富,支持复杂 ETL 逻辑 | 插件化 Reader/Writer |
| 适用场景 | 数据清洗、复杂转换 | 高效批量同步 |
Docker Compose 部署
# docker-compose.yml 示例(参考官方文档获取最新配置)
version: '3'
services:
pdi:
image: hiromuhota/webspoon:latest
ports:
- "8080:8080"
volumes:
- ./data:/datadocker compose up -d
# 访问 http://localhost:8080