Skip to content

数据采集工具

大数据架构中,数据采集层负责将各类数据源的数据同步到大数据平台。不同工具适用于不同的数据传输场景:

工具定位数据方向状态
Flume日志采集,实时流式传输日志文件 / Kafka → HDFS / HBase活跃维护
Sqoop关系型数据库 ↔ Hadoop 批量同步MySQL / Oracle ↔ HDFS / Hive已停止维护(2021)
DataX异构数据源离线同步,支持多种读写插件任意数据源 → 任意数据源活跃维护(阿里开源)
Kettle(PDI)可视化 ETL 工具,拖拽式设计流程多数据源互转活跃维护

Sqoop 停止维护

Apache Sqoop 已于 2021 年 6 月停止维护,移入 Apache Attic。新项目建议使用 DataXSeaTunnel 替代。


一、Flume

简介

Flume 是 Apache 开源的分布式日志采集系统,专为海量日志数据的收集、聚合和传输设计,支持实时将数据可靠地写入 HDFS、HBase、Kafka 等。

核心架构

KafkaSource(从 Kafka 拉取数据)

FileChannel(数据暂存到磁盘,保障可靠性)

HDFSSink(写入 HDFS)

零点漂移问题与拦截器

什么是零点漂移?

日志文件原始时间戳经过多个组件(Kafka → Flume)传输后,Kafka Source header 中的时间戳与业务数据时间戳不一致,导致 HDFS 按时间分区存储时出现数据"漂移"到错误分区。

解决方案:通过 Flume 拦截器(Interceptor)将 header 中的 timestamp 替换为日志中的业务时间戳。

拦截器 Maven 依赖:

xml
<dependency>
  <groupId>org.apache.flume</groupId>
  <artifactId>flume-ng-core</artifactId>
  <version>1.10.1</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.62</version>
</dependency>

拦截器实现:

java
package com.xupengboo.gmall.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.*;

public class TimestampInterceptor implements Interceptor {

    @Override
    public void initialize() {}

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
        try {
            JSONObject jsonObject = JSONObject.parseObject(log);
            // 将 header 时间戳替换为日志中的业务时间戳,解决零点漂移
            String ts = jsonObject.getString("ts");
            headers.put("timestamp", ts);
            return event;
        } catch (Exception e) {
            // 非 JSON 格式数据返回 null,后续统一过滤
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        // 过滤掉非 JSON 格式的无效数据
        Iterator<Event> iterator = list.iterator();
        while (iterator.hasNext()) {
            if (intercept(iterator.next()) == null) {
                iterator.remove();
            }
        }
        return list;
    }

    @Override
    public void close() {}

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() { return new TimestampInterceptor(); }
        @Override
        public void configure(Context context) {}
    }
}

常见问题

打包后运行时若报 ClassNotFoundException: fastjson,需手动将 fastjson-x.x.x.jar 复制到 /opt/module/flume/lib/ 目录下。

打包后将 Jar 放入 Flume lib 目录:

shell
cp target/flume-interceptor.jar /opt/module/flume/lib/

二、Sqoop

已停止维护

Sqoop 于 2021 年停止维护,仅用于维护老项目,新项目请使用 DataX。

简介

Sqoop(SQL-to-Hadoop)专门用于在关系型数据库(MySQL、Oracle)和 Hadoop 生态(HDFS、Hive、HBase)之间批量传输数据

部署

shell
# 下载 Sqoop 1.4.7(对应 Hadoop 3.2.4)
curl -O -L https://archive.apache.org/dist/sqoop/1.4.7/sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz
tar -zxvf sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz -C /opt/
mv /opt/sqoop-1.4.7.bin__hadoop-2.6.0 /opt/sqoop

# 配置环境变量
vi /etc/profile
export SQOOP_HOME=/opt/sqoop
export PATH=$PATH:$SQOOP_HOME/bin
shell
# 配置 Hadoop 路径
cp $SQOOP_HOME/conf/sqoop-env-template.sh $SQOOP_HOME/conf/sqoop-env.sh
vi $SQOOP_HOME/conf/sqoop-env.sh

export HADOOP_COMMON_HOME=/usr/local/hadoop
export HADOOP_MAPRED_HOME=/usr/local/hadoop
shell
# 添加数据库 JDBC 驱动
cd $SQOOP_HOME/lib/
wget https://jdbc.postgresql.org/download/postgresql-42.7.3.jar

常用命令

shell
# 测试连接并查询
sqoop eval \
  --driver org.postgresql.Driver \
  --connect jdbc:postgresql://192.168.10.66:5432/postgres \
  --username root \
  --password "${DB_PASSWORD}" \
  --query "SELECT * FROM table_name LIMIT 1"

# 将 PostgreSQL 表导入 HDFS
export HADOOP_USER_NAME=hadoop   # 指定 HDFS 操作用户
sqoop import \
  --driver org.postgresql.Driver \
  --connect jdbc:postgresql://192.168.10.66:5432/postgres \
  --username root \
  --password "${DB_PASSWORD}" \
  --table your_table \
  --target-dir /user/sqoop/output \
  --delete-target-dir \
  -m 1   # 使用 1 个 Mapper,适合小表

常见报错:StringUtils 找不到

shell
# 报错:java.lang.NoClassDefFoundError: org/apache/commons/lang/StringUtils
# 解决:添加 commons-lang 2.x 版本(3.x 不兼容)
cd $SQOOP_HOME/lib
wget https://repo1.maven.org/maven2/commons-lang/commons-lang/2.6/commons-lang-2.6.jar

工作原理

① Sqoop 根据表结构生成 Java 序列化类
② 编译并打包为可执行 JAR 文件
③ 将 JAR 作为 MapReduce 作业提交给 YARN
④ 每个 Mapper 通过 JDBC 读取数据库数据,写入 HDFS

三、DataX

简介

DataX 是阿里巴巴开源的离线数据同步框架,支持几乎所有主流数据源之间的数据传输,通过插件化架构(Reader + Writer)灵活扩展。

开源地址:https://github.com/alibaba/DataX

安装

shell
# 环境要求
java -version   # JDK 1.8
python --version # Python 2.7(注意:DataX 使用 Python 2)
mvn --version   # Maven 3.x

# 下载并解压
curl -O -L https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz
tar -zxvf datax.tar.gz

使用

shell
# 自检(验证安装是否正常)
python bin/datax.py job/job.json

# 执行同步作业
python bin/datax.py /path/to/your_job.json

生成配置模板

DataX 提供命令快速生成 Reader + Writer 的 JSON 配置模板:

shell
# 查看所有可用插件
ls plugin/reader   # 查看所有 Reader 插件
ls plugin/writer   # 查看所有 Writer 插件

# 生成从 MySQL 读取写入 HDFS 的配置模板
python bin/datax.py -r mysqlreader -w hdfswriter

生成的 JSON 模板作为作业配置的起点,填入实际的数据源参数后即可运行。


四、Kettle(PDI)

简介

Kettle 已正式更名为 Pentaho Data Integration(PDI),是一款成熟的开源 ETL 工具,以可视化拖拽式设计为核心,适合数据集成、数据清洗和数据仓库建设。

GitHub:https://github.com/pentaho/pentaho-kettle

Kettle vs DataX

Kettle(PDI)DataX
操作方式可视化 GUI 拖拽JSON 配置文件
学习曲线低(无需编码)中(需要了解 JSON 配置)
扩展性插件丰富,支持复杂 ETL 逻辑插件化 Reader/Writer
适用场景数据清洗、复杂转换高效批量同步

Docker Compose 部署

yaml
# docker-compose.yml 示例(参考官方文档获取最新配置)
version: '3'
services:
  pdi:
    image: hiromuhota/webspoon:latest
    ports:
      - "8080:8080"
    volumes:
      - ./data:/data
shell
docker compose up -d
# 访问 http://localhost:8080