Build a federated query solution with Apache Doris, Apache Flink, and Apache Hudi
Apache Doris
Posted on January 3, 2024
The Multi-Catalog feature of Apache Doris is designed to facilitate integration with external data catalogs, enhancing Doris' capabilities in data lake analytics and federated data queries.
In older versions of Doris, user data was organized in a two-tiered structure: database and table. Connections to external catalogs could therefore only be made at the database or table level. For example, users could create a mapping to a table in an external catalog via create external table, or to a database via create external database. If the external catalog contained a large number of databases or tables, users had to create the mappings one by one, which could be a heavy workload.
With the advent of Multi-Catalog, Doris now has a new three-tiered metadata hierarchy (catalog -> database -> table), which means users can connect to external data at the catalog level. The currently supported external catalogs include:
- Apache Hive
- Apache Iceberg
- Apache Hudi
- Elasticsearch
- JDBC
- Apache Paimon (Incubating)
Multi-Catalog works as an additional, enhanced method of connecting to external tables, and it helps users run federated queries across multiple catalogs quickly.
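For example, once a catalog is connected, any table under it can be referenced with a three-part name, catalog.database.table, without creating per-table mappings. A minimal sketch (the catalog, database, and table names here are hypothetical):

-- Map an entire external Hive Metastore with one statement (hypothetical connection info)
CREATE CATALOG hive_catalog PROPERTIES (
    'type' = 'hms',
    'hive.metastore.uris' = 'thrift://127.0.0.1:9083'
);
-- Query any table it exposes directly, no per-table mapping required
SELECT * FROM hive_catalog.sales_db.orders LIMIT 10;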
This tutorial demonstrates how to build an integrated real-time data lake and warehouse with federated query analysis using Flink + Hudi + Doris. It focuses primarily on how to use Doris with Hudi, and the entire tutorial environment is set up on a pseudo-distributed cluster.
Environment
The demonstration environment for this tutorial is as follows:
- CentOS 7
- Apache Doris 2.0.2
- Hadoop 3.3.3
- Hive 3.1.3
- Flink 1.17.1
- Apache Hudi 0.14
- JDK 1.8.0_311
Installation
1. Download Flink 1.17.1:
Decompress and install: tar zxf flink-1.17.1-bin-scala_2.12.tgz
2. Download dependencies for Flink and Hudi:
wget https://repo1.maven.org/maven2/org/apache/flink/flink-table-planner_2.12/1.17.1/flink-table-planner_2.12-1.17.1.jar
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-hive-sync-bundle/0.14.0/hudi-hive-sync-bundle-0.14.0.jar
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink1.17-bundle/0.14.0/hudi-flink1.17-bundle-0.14.0.jar
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-hadoop-mr-bundle/0.14.0/hudi-hadoop-mr-bundle-0.14.0.jar
Move these dependencies to the flink-1.17.1/lib directory and remove the existing flink-table-planner-loader-1.17.1.jar, which is replaced by the flink-table-planner_2.12 jar downloaded above.
Create and Write Data to Hudi Table
Start Flink:
bin/start-cluster.sh
Start the Flink SQL client:
./bin/sql-client.sh embedded shell
Set the result mode to tableau so that results are displayed directly:
set sql-client.execution.result-mode=tableau;
Start the Hive MetaStore and HiveServer2:
nohup ./bin/hive --service hiveserver2 >/dev/null 2>&1 &
nohup ./bin/hive --service metastore >/dev/null 2>&1 &
In the Flink SQL client, create a Hudi table, using the Hive MetaStore service to store the Hudi table's metadata:
CREATE TABLE table1(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector'='hudi',
'path' = 'hdfs://localhost:9000/user/hive/warehouse/demo.db',
'table.type'='COPY_ON_WRITE',
'hive_sync.enable'='true',
'hive_sync.table'='hudi_hive',
'hive_sync.db'='demo',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://192.168.31.54:9083'
);
- 'table.type'='COPY_ON_WRITE', -- Table type; with MERGE_ON_READ, data is not visible to Hive until compaction generates the Parquet files.
- 'hive_sync.enable'='true', -- Required: Enable Hive synchronization.
- 'hive_sync.table'='${hive_table}', -- Required: Name of the Hive table to be created.
- 'hive_sync.db'='${hive_db}', -- Required: Name of the Hive database to be created.
- 'hive_sync.mode'='hms', -- Required: Set the Hive sync mode to HMS; the default is JDBC.
- 'hive_sync.metastore.uris'='thrift://ip:9083' -- Required: URI of the Hive Metastore used for synchronization.
Write data to the Hudi table:
INSERT INTO table1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
Query the Hudi table using Flink:
SELECT * FROM table1;
You can verify that the data file is present on HDFS, and in the Hive client, you can see the table:
hive> use demo;
OK
Time taken: 0.027 seconds
hive> show tables;
OK
hudi_hive
Doris on Hudi
Doris interacts with Hudi data in a straightforward way: you only need to create a catalog, with no need to write full table creation statements as was required before. In addition, when tables or fields are added to or removed from the Hudi data source, Doris can pick up the changes automatically via catalog configuration or a manual catalog refresh (a refresh example is shown below).
Now, let's create a catalog in Doris to access data from external Hudi tables:
CREATE CATALOG hudi PROPERTIES (
'type'='hms',
'hive.metastore.uris' = 'thrift://192.168.31.54:9083'
);
Here, the metadata of Hudi is stored using HMS (Hive MetaStore). During creation, you only need to specify the two pieces of information above. If your HDFS is highly available, you should add NameNode HA information:
'hadoop.username' = 'hive',
'dfs.nameservices'='your-nameservice',
'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007',
'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007',
'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
Refer to the Doris documentation for more details.
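As noted above, when the schema changes on the Hudi side, the cached metadata can also be refreshed manually. A minimal sketch using Doris refresh statements, with the names matching the catalog created above:

-- Refresh all metadata cached for the catalog
REFRESH CATALOG hudi;
-- Or refresh only a single database or table
REFRESH DATABASE hudi.demo;
REFRESH TABLE hudi.demo.hudi_hive;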
After the catalog is created successfully, you can browse the Hudi tables it exposes and execute queries on them directly from Doris.
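A minimal sketch of browsing and querying from the Doris MySQL client, assuming the catalog, database, and table created above (output omitted):

SHOW CATALOGS;
SWITCH hudi;
SHOW DATABASES;
USE demo;
SHOW TABLES;
SELECT * FROM hudi_hive;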
Move data from the Hudi table to Doris:
First, create a Doris table:
CREATE TABLE doris_hudi(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts DATETIME(3),
`partition` VARCHAR(20)
)
UNIQUE KEY(`uuid`)
DISTRIBUTED BY HASH(`uuid`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"enable_unique_key_merge_on_write" = "true"
);
Then, use the INSERT SELECT statement to migrate data from Hudi to Doris:
insert into doris_hudi select uuid,name,age,ts,`partition` from hudi.demo.hudi_hive;
Query the Doris table:
mysql> select * from doris_hudi;
+------+---------+------+-------------------------+-----------+
| uuid | name | age | ts | partition |
+------+---------+------+-------------------------+-----------+
| id1 | Danny | 23 | 1970-01-01 08:00:01.000 | par1 |
| id2 | Stephen | 33 | 1970-01-01 08:00:02.000 | par1 |
| id3 | Julian | 53 | 1970-01-01 08:00:03.000 | par2 |
| id4 | Fabian | 31 | 1970-01-01 08:00:04.000 | par2 |
| id5 | Sophia | 18 | 1970-01-01 08:00:05.000 | par3 |
| id6 | Emma | 20 | 1970-01-01 08:00:06.000 | par3 |
| id7 | Bob | 44 | 1970-01-01 08:00:07.000 | par4 |
| id8 | Han | 56 | 1970-01-01 08:00:08.000 | par4 |
+------+---------+------+-------------------------+-----------+
8 rows in set (0.02 sec)
You can also use CTAS (CREATE TABLE AS SELECT) to migrate Hudi data to Doris, letting Doris create the table automatically:
create table doris_hudi_01
PROPERTIES("replication_num" = "1") as
select uuid,name,age,ts,`partition` from hudi.demo.hudi_hive;
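To verify what was created, you can inspect the inferred schema and row count afterwards (standard statements; the output depends on your environment):

SHOW CREATE TABLE doris_hudi_01;
SELECT COUNT(*) FROM doris_hudi_01;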
In conclusion, using Doris to unify your data lake and data warehouse through its federated query capabilities is a simple and efficient experience. Boost your data analysis performance now!