🌺The Begin🌺点点关注,收藏不迷路🌺

01 引言:为什么需要集成Hadoop生态?

在数据湖与数据仓库边界日益模糊的今天,企业的大数据平台往往同时存在多种技术组件。Hadoop生态凭借HDFS的海量存储能力和Hive/Hadoop的数据处理能力,成为数据湖建设的基石;而Apache Doris则以其极速的MPP查询引擎,在实时分析领域占据优势。

将两者打通,实现"湖仓一体"的架构,已经成为大数据平台建设的必然趋势。通过Doris与Hadoop生态的无缝集成,企业可以实现:

  • 冷热数据分层:热数据存储在Doris本地,冷数据存储在HDFS,降低存储成本
  • 查询加速:利用Doris的分布式计算引擎,直接加速Hive数据湖查询
  • 数据联邦:通过Multi-Catalog统一访问Hive、Iceberg、Hudi等多种数据源

本文将系统讲解Doris与Hadoop生态中HDFS、Hive等核心组件的集成方式,涵盖存储扩展、数据湖查询、数据迁移等关键场景,并提供完整的实践指南。

02 集成全景架构

外部存储

Hadoop生态

Doris集群

冷数据存储

冷数据存储

元数据访问

Hive查询

数据写入

Broker Load

FE 前端节点

BE1 数据节点

BE2 数据节点

HDFS
分布式文件系统

Hive Metastore
元数据服务

Hive Server
数据仓库

Spark
计算引擎

S3/OSS/OSS
对象存储

集成层次说明

集成方式 核心功能 适用场景
存储层集成 冷热分离、远程存储 降低存储成本,海量历史数据归档
元数据层集成 Hive Catalog、外部表 统一元数据访问,查询加速
计算层集成 Spark on Doris、数据导入导出 数据加工、ETL流程
数据迁移 Broker Load、Insert Into 历史数据迁移,增量同步

03 HDFS集成:存储扩展与冷热分离

Doris支持将HDFS作为远程存储,实现数据冷热分离。热数据存储在Doris本地(SSD),冷数据自动迁移到HDFS(HDD),从而降低存储成本。

03.1 创建HDFS资源

-- 创建HDFS RESOURCE
CREATE RESOURCE "remote_hdfs"
PROPERTIES (
    "type" = "hdfs",
    "fs.defaultFS" = "hdfs://namenode_host:8020",
    "hadoop.username" = "doris_user",
    "hadoop.password" = "password",
    "root_path" = "/doris/cold_data"
);

HDFS HA模式配置

CREATE RESOURCE "remote_hdfs_ha"
PROPERTIES (
    "type" = "hdfs",
    "fs.defaultFS" = "hdfs://my_ha",
    "dfs.nameservices" = "my_ha",
    "dfs.ha.namenodes.my_ha" = "nn1,nn2",
    "dfs.namenode.rpc-address.my_ha.nn1" = "nn1_host:8020",
    "dfs.namenode.rpc-address.my_ha.nn2" = "nn2_host:8020",
    "dfs.client.failover.proxy.provider.my_ha" = 
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
    "hadoop.username" = "doris_user"
);

03.2 创建存储策略

-- 创建存储策略:数据在本地保留1天后迁移到HDFS
CREATE STORAGE POLICY hdfs_cold_policy
PROPERTIES (
    "storage_resource" = "remote_hdfs",
    "cooldown_ttl" = "1d"
);

参数说明

  • storage_resource:关联的HDFS资源名称
  • cooldown_ttl:数据在本地保留的时间,超过后自动迁移到HDFS

03.3 建表时指定存储策略

-- 新建表时指定存储策略
CREATE TABLE orders
(
    order_id BIGINT,
    order_date DATE,
    amount DECIMAL(18,2)
)
DUPLICATE KEY(order_id)
DISTRIBUTED BY HASH(order_id) BUCKETS 32
PROPERTIES (
    "storage_policy" = "hdfs_cold_policy"
);

03.4 存量表迁移

-- 为已存在的表设置远程存储
ALTER TABLE existing_table SET ("storage_policy" = "hdfs_cold_policy");

-- 为特定分区设置远程存储
ALTER TABLE partitioned_table 
MODIFY PARTITION (p20240101) SET ("storage_policy" = "hdfs_cold_policy");

03.5 配置与优化

BE配置参数

参数 默认值 说明
cold_data_compaction_thread_num 2 远程存储Compaction并发线程数
cold_data_compaction_interval_sec 1800 Compaction执行间隔(秒)
remove_unused_remote_files_interval_sec 21600 垃圾回收间隔(秒)

查询性能优化:Doris为远程存储提供了本地缓存机制,首次查询时从HDFS拉取数据并缓存到BE本地磁盘,后续查询直接命中缓存。

# be.conf配置缓存
# 缓存路径(多个路径用分号分隔)
cache_path = /data/doris/cache
# 缓存最大容量
cache_size = 100GB

03.6 限制说明

  • 使用了远程存储的表不支持备份
  • Unique模型开启Merge-on-Write时,不支持设置远程存储
  • 存储策略设置后无法取消
  • 冷却时间修改只对未迁移数据生效,已迁移数据不会回迁

04 Hive集成:联邦查询与数据湖加速

通过Hive Catalog,Doris可以自动获取Hive Metastore中的元数据,实现对Hive表、Iceberg表、Hudi表的直接查询,无需数据搬迁。

04.1 Hive Catalog架构

Hive生态

Doris

读取元数据

读取数据文件

读取数据文件

Doris FE

Doris BE

Hive Metastore

HDFS

S3

04.2 创建Hive Catalog

基础配置(Hive Metastore)

CREATE CATALOG hive_catalog PROPERTIES (
    'type' = 'hms',
    'hive.metastore.uris' = 'thrift://metastore_host:9083'
);

带HDFS认证的配置

CREATE CATALOG hive_catalog PROPERTIES (
    'type' = 'hms',
    'hive.metastore.uris' = 'thrift://metastore_host:9083',
    'hadoop.username' = 'hdfs_user',
    'dfs.nameservices' = 'my_ha',
    'dfs.ha.namenodes.my_ha' = 'nn1,nn2',
    'dfs.namenode.rpc-address.my_ha.nn1' = 'nn1_host:8020',
    'dfs.namenode.rpc-address.my_ha.nn2' = 'nn2_host:8020',
    'dfs.client.failover.proxy.provider.my_ha' = 
        'org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
);

04.3 支持的Hive版本与格式

Hive版本:支持Hive 1.x、2.x、3.x、4.x

支持的数据格式:Parquet、ORC、TextFile、Avro、SequenceFile

支持的事务表:Full-ACID Transactional Table(Hive 3.x+)、Insert-Only Transactional Table

04.4 查询Hive数据

-- 切换到Hive Catalog
SWITCH hive_catalog;

-- 查看数据库
SHOW DATABASES;

-- 查询Hive表
SELECT * FROM hive_db.sales_table 
WHERE dt = '2024-01-01' 
LIMIT 10;

-- 使用全限定名查询
SELECT COUNT(*) FROM hive_catalog.hive_db.user_table;

04.5 查询Hive分区信息

-- 方式1:SHOW PARTITIONS
SHOW PARTITIONS FROM hive_catalog.hive_db.sales_table;

-- 方式2:使用元数据表(2.1.7+)
SELECT * FROM hive_catalog.hive_db.sales_table$partitions;

04.6 数据写回:将Doris数据写入Hive

Doris支持将查询结果写回Hive表,实现数据加工和ETL闭环。

-- 将Doris内表数据写入Hive
INSERT INTO hive_catalog.hive_db.result_table
SELECT order_date, SUM(amount) as daily_amount
FROM doris_db.orders
GROUP BY order_date;

写回配置要求:需要在Catalog属性中配置fs.defaultFS参数:

CREATE CATALOG hive_catalog PROPERTIES (
    'type' = 'hms',
    'hive.metastore.uris' = 'thrift://metastore_host:9083',
    'fs.defaultFS' = 'hdfs://namenode:8020'  -- 写回时必须配置
);

04.7 类型映射

Hive类型 Doris类型 说明
boolean boolean
tinyint/smallint/int/bigint tinyint/smallint/int/bigint
date date
timestamp datetime(6) 固定映射到微秒精度
float/double float/double
decimal(P,S) decimal(P,S) 未指定精度时默认为decimal(9,0)
char(N)/varchar(N) char(N)/varchar(N)
string string
array/map/struct array/map/struct

05 数据导入:从HDFS批量加载数据

Broker Load是Doris从HDFS批量导入大规模数据的核心方式。

05.1 Broker Load原理

HDFS BE FE 客户端 HDFS BE FE 客户端 提交LOAD语句 解析并生成计划 分发导入任务 拉取数据分片 解析并写入 汇报完成状态 返回结果

05.2 从HDFS导入数据

LOAD LABEL demo_db.order_load
(
    DATA INFILE("hdfs://namenode:8020/data/orders/*.parquet")
    INTO TABLE orders
    FORMAT AS "parquet"
    (order_id, user_id, amount, order_date)
)
WITH BROKER
(
    "username" = "hdfs_user",
    "password" = ""
)
PROPERTIES
(
    "timeout" = "3600",
    "max_filter_ratio" = "0.1"
);

Kerberos认证配置

LOAD LABEL demo_db.kerberos_load
(
    DATA INFILE("hdfs://namenode:8020/data/orders/*.csv")
    INTO TABLE orders
    COLUMNS TERMINATED BY ","
)
WITH BROKER
(
    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "doris/hadoop@HADOOP.COM",
    "kerberos_keytab" = "/path/to/doris.keytab"
);

06 Spark集成:计算引擎协同

06.1 Spark on Doris

Doris可以作为Spark SQL的数据源,让Spark直接对Doris中的数据进行复杂的批处理分析。

// Spark读取Doris数据
val df = spark.read.format("doris")
  .option("doris.table.identifier", "db.table")
  .option("doris.fenodes", "fe_host:8030")
  .option("user", "root")
  .option("password", "")
  .load()

df.createOrReplaceTempView("doris_table")
spark.sql("SELECT city, COUNT(*) FROM doris_table GROUP BY city").show()

06.2 Spark写入Doris

// Spark写入Doris
df.write.format("doris")
  .option("doris.table.identifier", "db.target_table")
  .option("doris.fenodes", "fe_host:8030")
  .option("user", "root")
  .option("password", "")
  .mode("append")
  .save()

06.3 数据迁移场景

从Hive/Hadoop迁移数据到Doris的推荐路径:

Spark读取

Doris Connector写入

Broker Load

Catalog查询

INSERT INTO SELECT

Hive表

Spark DataFrame

Doris表

HDFS文件

Hive表

Doris

07 实践案例:统一数据服务平台

某科技公司基于Doris与Hadoop生态构建了统一数据服务平台,实现以下收益:

07.1 架构演进

原架构痛点

  • 系统复杂:Hive + ClickHouse + StarRocks + TiDB等多组件
  • 数据冗余:多组件间数据同步,存储成本高
  • 维护困难:需要多种技能栈

新架构:以Doris为核心,集存储、加工、服务为一体的统一架构

应用层

Doris统一平台

数据源

HDFS冷数据

Kafka实时数据

MySQL业务数据

Broker Load
导入HDFS

Routine Load
导入Kafka

Catalog
联邦查询

内部表
存储与计算

数据API

BI报表

07.2 数据分层设计

分层 模型 说明
ODS层 Duplicate 明细模型,保留完整变更链路
DWD/DM/APP层 Unique 主键模型,支持数据更新

07.3 集成收益

  • 存储成本下降60%:从130+TB降至60TB
  • 计算效率提升10倍:Doris较Hive数仓计算耗时大幅降低
  • 架构简化:从多组件缩减到以Doris为核心

08 总结

Doris与Hadoop生态的集成提供了多层次、多维度的能力:

集成方式 核心功能 适用场景
HDFS冷热分离 远程存储、冷数据迁移 海量历史数据归档,降低存储成本
Hive Catalog 元数据统一访问、数据湖查询 联邦查询、查询加速、数据写回
Broker Load 批量数据导入 历史数据迁移、离线ETL
Spark集成 计算引擎协同 复杂ETL、机器学习预处理

核心要点回顾

  1. 冷热分离降成本:通过HDFS远程存储策略,热数据本地、冷数据HDFS,存储成本下降60%以上
  2. Hive Catalog统一元数据:无需搬迁数据,Doris可直接查询Hive/Iceberg/Hudi表,实现湖仓一体
  3. Broker Load高效导入:支持Parquet/ORC/CSV等多种格式,可并行拉取HDFS数据
  4. 计算引擎协同:Spark可读写Doris,形成完整的数据加工链路
  5. 类型映射需注意:Hive的timestamp映射为datetime(6),特殊类型需测试验证

通过合理的集成架构设计,企业可以充分发挥Doris的实时分析能力与Hadoop生态的海量存储能力,构建高性价比、高性能的统一数据服务平台。

在这里插入图片描述


🌺The End🌺点点关注,收藏不迷路🌺
Logo

脑启社区是一个专注类脑智能领域的开发者社区。欢迎加入社区,共建类脑智能生态。社区为开发者提供了丰富的开源类脑工具软件、类脑算法模型及数据集、类脑知识库、类脑技术培训课程以及类脑应用案例等资源。

更多推荐