使用Apache Kudu和Impala实现存储分层 原

当为应用程序的数据选择一个存储系统时,我们通常会选择一个最适合我们业务场景的存储系统。对于快速更新和实时分析工作较多的场景,我们可能希望使用 Apache Kudu ,但是对于低成本的大规模可伸缩性场景,我们可能希望使用 HDFS 。因此,需要一种解决方案使我们能够利用多个存储系统的最佳特性。本文介绍了如何使用 Apache Impala 的滑动窗口模式,操作存储在 Apache KuduApache HDFS 中的数据,使用此模式,我们可以以对用户透明的方式获得多个存储层的所有优点。

Apache Kudu 旨在快速分析、快速变化的数据。 Kudu 提供快速插入/更新和高效列扫描的组合,以在单个存储层上实现多个实时分析工作负载。因此, Kudu 非常适合作为存储需要实时查询的数据的仓库。此外, Kudu 支持实时更新和删除行,以支持延迟到达的数据和数据更正。

Apache HDFS 旨在以低成本实现无限的可扩展性。它针对数据不可变的面向批处理的场景进行了优化,与 Apache Parquet 文件格式配合使用时,可以以极高的吞吐量和效率访问结构化数据。

对于数据小且不断变化的情况,如维度表,通常将所有数据保存在 Kudu 中。当数据符合 Kudu扩展限制 并且可以从 Kudu 的特性中受益时,在 Kudu 中保留大表是很常见的。如果数据量大,面向批处理且不太可能发生变化,则首选使用 Parquet 格式将数据存储在 HDFS 中。当我们需要利用两个存储层的优点时,滑动窗口模式是一个有用的解决方案。

滑动窗口模式

在此模式中,我们使用 Impala 创建匹配的 Kudu 表和 Parquet 格式的 HDFS 表。根据 KuduHDFS 表之间数据移动的频率,这些表按时间单位分区,通常使用每日、每月或每年分区。然后创建一个统一视图,并使用 WHERE 子句定义边界,该边界分隔从 Kudu 表中读取的数据以及从 HDFS 表中读取的数据。定义的边界很重要,这样我们就可以在 KuduHDFS 之间移动数据,而不会将重复的记录暴露给视图。移动数据后,可以使用原子的 ALTER VIEW 语句向前移动边界。

注意:此模式最适用于组织到范围分区( range partitions )中的某些顺序数据,因为在此情况下,按时间滑动窗口和删除分区操作会非常有效。

该模式实现滑动时间窗口,其中可变数据存储在 Kudu 中,不可变数据以 HDFS 上的 Parquet 格式存储。通过 Impala 操作 KuduHDFS 来利用两种存储系统的优势:

  • 流数据可立即查询 (Streaming data is immediately queryable)
  • 可以对更晚到达的数据或手动更正进行更新 (Updates for late arriving data or manual corrections can be made)
  • 存储在 HDFS 中的数据具有最佳大小,可提高性能并防止出现小文件 (Data stored in HDFS is optimally sized increasing performance and preventing small files)
  • 降低成本 (Reduced cost)

Impala 还支持 S3ADLS 等云存储方式。此功能允许方便地访问远程管理的存储系统,可从任何位置访问,并与各种基于云的服务集成。由于这些数据是远程的,因此针对 S3 数据的查询性能较差,使得 S3 适合于保存仅偶尔查询的“冷”数据。通过创建第三个匹配表并向统一视图添加另一个边界,可以扩展此模式以将冷数据保存在云存储系统中。

注意:为简单起见,下面的示例中仅说明了 KuduHDFS

将数据从 Kudu 移动到 HDFS 的过程分为两个阶段。第一阶段是数据移动,第二阶段是元数据更改,最后定义一些定期自动运行的数据任务来辅助我们维护滑动窗口。

在第一阶段,将当前不可变数据从 Kudu 复制到 HDFS 。即使数据从 Kudu 复制到 HDFS ,视图中定义的边界也会阻止向用户显示重复数据。此步骤可以包括根据需要进行的任何验证和重试,以确保数据卸载 (data offload) 成功。

在第二阶段,现在数据被安全地复制到 HDFS ,需要更改元数据以对分区进行调整。这包括向前移动边界,为下一个时段添加新的 Kudu 分区,以及删除旧的 Kudu 分区。

实现步骤

为了实现滑动窗口模式,需要一些 Impala 基础,下面介绍实现滑动窗口模式的基本步骤。

移动数据

只要我们使用每种存储格式定义匹配表,就可以通过 Impala 在存储系统之间移动数据。为简洁起见,未描述创建 Impala 表时可用的所有选项,可以参考 ImpalaCREATE TABLE文档 来查找创建 KuduHDFS 和云存储表的正确语法。下面列出了一些示例,其中包括滑动窗口模式。

创建表后,移动数据就像 INSERT ... SELECT 语句一样简单:

INSERT INTO table_foo SELECT * FROM table_bar;

SELECT 语句的所有功能都可用于选择要移动的特定数据。

注意:如果将数据移动到 Kudu ,可以使用 UPSERT INTO 语句来处理重复键。

统一查询

Impala 中查询来自多个表和数据源的数据也很简单。为简洁起见,未描述创建 Impala 视图时可用的所有选项,可以参考 ImpalaCREATE VIEW文档

创建统一查询的视图就像使用两个 SELECT 子句和 UNION ALLCREATE VIEW 语句一样简单:

CREATE VIEW foo_view AS
SELECT col1, col2, col3 FROM foo_parquet
UNION ALL
SELECT col1, col2, col3 FROM foo_kudu;

警告:确保使用 UNION ALL 而不是 UNIONUNION 关键字本身与 UNION DISTINCT 相同,可能会对性能产生重大影响,可以在 Impala UNION文档 中找到更多信息。

SELECT 语句的所有功能都可用于公开每个基础表中的正确数据和列,使用 WHERE 子句传递和下推任何需要特殊处理或转换的谓词非常重要。下面将在滑动窗口模式的讨论中进行更多示例。

此外,可以通过 ALTER VIEW 语句更改视图,当与 SELECT 语句结合使用时,这很有用,因为它可以用于原子地更新视图正在访问的数据。

示例

下面是使用滑动窗口模式来操作具有三个月活动可变的月度周期数据的实现示例,超过三个月的数据将使用 Parquet 格式卸载到 HDFS

创建Kudu表

首先,创建一个 Kudu 表,该表将保存三个月的活动可变数据。该表由时间列分区,每个范围包含一个数据周期。拥有与时间周期匹配的分区很重要,因为删除 Kudu 分区比通过 DELETE 子句删除数据更有效。该表还由另一个键列进行散列分区,以确保所有数据都不会写入单个分区。

注意:模式设计 (schema design) 应根据我们的数据和读/写性能考虑因素而有所不同。此示例模式仅用于演示目的,而不是“最佳”模式。有关选择模式的更多指导,请参考 Kudu模式设计文档(schema design documentation) 。例如,如果数据输入速率较低,则可能不需要任何散列分区,如果数据输入速率非常高,则可能需要更多散列桶。

CREATE TABLE my_table_kudu
(  
  name STRING,
  time TIMESTAMP,
  message STRING,
  PRIMARY KEY(name, time)
)
PARTITION BY
   HASH(name) PARTITIONS 4,
   RANGE(time) (
      PARTITION '2018-01-01' <= VALUES < '2018-02-01', --January
      PARTITION '2018-02-01' <= VALUES < '2018-03-01', --February
      PARTITION '2018-03-01' <= VALUES < '2018-04-01', --March
      PARTITION '2018-04-01' <= VALUES < '2018-05-01'  --April
)
STORED AS KUDU;

注意:有一个额外的月分区( 2018-04-01至2018-05-01 )可以为数据提供一个时间缓冲区,以便将数据移动到不可变表中。

创建HDFS表

创建 Parquet 格式的 HDFS 表,该表将保存较旧的不可变数据。此表按年、月和日进行分区,以便进行有效访问,即使我们无法按时间列本身进行分区,这将在下面的视图步骤中进一步讨论。有关更多详细信息,请参考 Impala的分区文档

CREATE TABLE my_table_parquet
(  
  name STRING,
  time TIMESTAMP,
  message STRING
)
PARTITIONED BY (year int, month int, day int)
STORED AS PARQUET;

创建统一视图

现在创建统一视图,用于无缝地查询所有数据:

CREATE VIEW my_table_view AS
SELECT name, time, message
FROM my_table_kudu
WHERE time >= "2018-01-01"
UNION ALL
SELECT name, time, message
FROM my_table_parquet
WHERE time < "2018-01-01"
AND year = year(time)
AND month = month(time)
AND day = day(time);

每个 SELECT 子句都明确列出要公开的所有列,这可确保不会公开 Parquet 表所特有的年、月和日列。如果需要,它还允许处理任何必要的列或类型映射。

应用于 my_table_kudumy_table_parquet 的初始 WHERE 子句定义了 KuduHDFS 之间的边界,以确保在卸载数据的过程中不会读取重复数据。

应用于 my_table_parquet 的附加 AND 子句用于确保单个年、月和日列的良好谓词下推 (good predicate pushdown)

警告:如前所述,请务必使用 UNION ALL 而不是 UNIONUNION 关键字本身与 UNION DISTINCT 相同,可能会对性能产生重大影响,可以在 Impala UNION文档 中找到更多信息。

创建定时任务

现在已创建基表和视图,接着创建定时任务以维护滑动窗口,下面定时任务中使用的 SQL 文件可以接收从脚本和调度工具传递的变量。

创建 window_data_move.sql 文件以将数据从最旧的分区移动到 HDFS

INSERT INTO ${var:hdfs_table} PARTITION (year, month, day)
SELECT *, year(time), month(time), day(time)
FROM ${var:kudu_table}
WHERE time >= add_months("${var:new_boundary_time}", -1)
AND time < "${var:new_boundary_time}";
COMPUTE INCREMENTAL STATS ${var:hdfs_table};

注意: COMPUTE INCREMENTAL STATS 子句不是必需的,但可帮助我们对 Impala 查询进行优化。

要运行 SQL 语句,请使用 Impala shell 并传递所需的变量,示例如下:

impala-shell -i <impalad:port> -f window_data_move.sql
--var=kudu_table=my_table_kudu
--var=hdfs_table=my_table_parquet
--var=new_boundary_time="2018-02-01"

注意:可以调整 WHERE 子句以匹配给定的数据周期和卸载的粒度,这里, add_months 函数的参数为 -1 ,用于从新的边界时间移动过去一个月的数据。

创建 window_view_alter.sql 文件以通过更改统一视图来调整时间边界:

ALTER VIEW ${var:view_name} AS
SELECT name, time, message
FROM ${var:kudu_table}
WHERE time >= "${var:new_boundary_time}"
UNION ALL
SELECT name, time, message
FROM ${var:hdfs_table}
WHERE time < "${var:new_boundary_time}"
AND year = year(time)
AND month = month(time)
AND day = day(time);

要运行 SQL 语句,请使用 Impala shell 并传递所需的变量,示例如下:

impala-shell -i <impalad:port> -f window_view_alter.sql
--var=view_name=my_table_view
--var=kudu_table=my_table_kudu
--var=hdfs_table=my_table_parquet
--var=new_boundary_time="2018-02-01"

创建 window_partition_shift.sql 文件以调整 Kudu 分区:

ALTER TABLE ${var:kudu_table}

ADD RANGE PARTITION add_months("${var:new_boundary_time}", 
${var:window_length}) <= VALUES < add_months("${var:new_boundary_time}", 
${var:window_length} + 1);

ALTER TABLE ${var:kudu_table}  

DROP RANGE PARTITION add_months("${var:new_boundary_time}", -1) 
<= VALUES < "${var:new_boundary_time}";

要运行 SQL 语句,请使用 Impala shell 并传递所需的变量,示例如下:

impala-shell -i <impalad:port> -f window_partition_shift.sql
--var=kudu_table=my_table_kudu
--var=new_boundary_time="2018-02-01"
--var=window_length=3

注意:应该定期在 Kudu 表上运行 COMPUTE STATS ,以确保 Impala 的查询性能最佳。

试验

现在我们已经创建了表、视图和脚本来使用滑动窗口模式,可以通过插入不同时间范围的数据并运行脚本来向前移动窗口来进行试验。

将一些示例值插入 Kudu 表:

INSERT INTO my_table_kudu VALUES
('joey', '2018-01-01', 'hello'),
('ross', '2018-02-01', 'goodbye'),
('rachel', '2018-03-01', 'hi');

在每个表/视图中显示数据:

SELECT * FROM my_table_kudu;
SELECT * FROM my_table_parquet;
SELECT * FROM my_table_view;

将1月数据移动到 HDFS

impala-shell -i <impalad:port> -f window_data_move.sql
--var=kudu_table=my_table_kudu
--var=hdfs_table=my_table_parquet
--var=new_boundary_time="2018-02-01"

确认数据在两个位置,但在视图中不重复:

SELECT * FROM my_table_kudu;
SELECT * FROM my_table_parquet;
SELECT * FROM my_table_view;

改变视图将时间边界向前移至2月:

impala-shell -i <impalad:port> -f window_view_alter.sql
--var=view_name=my_table_view
--var=kudu_table=my_table_kudu
--var=hdfs_table=my_table_parquet
--var=new_boundary_time="2018-02-01"

确认数据仍在两个位置,但在视图中不重复:

SELECT * FROM my_table_kudu;
SELECT * FROM my_table_parquet;
SELECT * FROM my_table_view;

调整 Kudu 分区:

impala-shell -i <impalad:port> -f window_partition_shift.sql
--var=kudu_table=my_table_kudu
--var=new_boundary_time="2018-02-01"
--var=window_length=3

确认1月数据现在仅在 HDFS 中:

SELECT * FROM my_table_kudu;
SELECT * FROM my_table_parquet;
SELECT * FROM my_table_view;

使用 ImpalaEXPLAIN 语句确认谓词下推:

EXPLAIN SELECT * FROM my_table_view;
EXPLAIN SELECT * FROM my_table_view WHERE time < "2018-02-01";
EXPLAIN SELECT * FROM my_table_view WHERE time > "2018-02-01";

explain 输出中,我们应该看到“ kudu 谓词”,其中包括“ SCAN KUDU ”部分中的时间列过滤器和“谓词”,其中包括“ SCAN HDFS ”部分中的时间、日、月和年列。

编译自: Transparent Hierarchical Storage Management with Apache Kudu and Impala

我来评几句
登录后评论

已发表评论数()

相关站点

热门文章