Flink on Zeppelin (2) - Batch 篇

在  Flink on Zeppelin 入门篇   中我们讲述了如何配置 Zeppelin + Flink 来运行一个最简单的 WordCount 例子。 本文将讲述如何使用 Flink SQL + UDF 来做 Batch ETL 和 BI 数据分析的任务。

Flink Interpreter 类型

首先介绍下 Zeppelin 中的 Flink Interpreter 类型。 Zeppelin 的 Flink Interpreter 支持 Flink 的所有 API (DataSet, DataStream, Table API )。 语言方面支持 Scala,Python,SQL。 下图是 Zeppelin 中支持的不同场景下的 Flink Interpreter。

Name

Class

Description

%flink

FlinkInterpreter

Creates ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironment and provides a Scala environment

%flink.pyflink

PyFlinkInterpreter

Provides a python environment

%flink.ipyflink

IPyFlinkInterpreter

Provides an ipython environment

%flink.ssql

FlinkStreamSqlInterpreter

Provides a stream sql environment

%flink.bsql

FlinkBatchSqlInterpreter

Provides a batch sql environment

配置 Flink Interpreter

下图例举了所有重要的 Flink 配置信息,除此之外你还可以配置任意 Flink 的Configuration。(https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html)

Property

Default

Description

FLINK_HOME

Flink 的安装目录

HADOOP_CONF_DIR

Hadoop 配置信息目录

HIVE_CONF_DIR

Hive 配置信息目录

flink.execution.mode

local

Execution mode of flink, e.g. local | yarn | remote

flink.execution.remote.host

jobmanager hostname if it is remote mode

flink.execution.remote.port

jobmanager port if it is remote mode

flink.jm.memory

1024

Total number of memory(mb) of JobManager

flink.tm.memory

1024

Total number of memory(mb) of TaskManager

flink.yarn.appName

Zeppelin Flink Session

Yarn app name‍

flink.yarn.queue

queue name of yarn app

flink.execution.jars

additional user jars (comma separated)‍‍‍

flink.execution.packages

additional user packages (comma separated), e.g. org.apache.flink:flink-connector-kafka_2.11:1.10,org.apache.flink:flink-connector-kafka-base_2.11:1.10,org.apache.flink:flink-json:1.10

zeppelin.pyflink.python

python

python binary executable for PyFlink

table.exec.resource.default-parallelism

1

Default parallelism for flink sql job

zeppelin.flink.scala.color

true

whether display scala shell output in colorful format

zeppelin.flink.enableHive

false

whether enable hive

zeppelin.flink.printREPLOutput

true

Print REPL output

zeppelin.flink.maxResult

1

max number of rows returned by sql interpreter

‍‍‍

StreamExecutionEnvironment, ExecutionEnvironment, StreamTableEnvironment, BatchTableEnvironment

Flink Interpreter (%flink) 为用户自动创建了下面 6 个变量作为 Flink Scala 程序的入口。

  • senv  (StreamExecutionEnvironment),

  • benv  (ExecutionEnvironment)

  • stenv  (StreamTableEnvironment for blink planner)

  • btenv  (BatchTableEnvironment for blink planner)

  • stenv_2  (StreamTableEnvironment for flink planner)

  • btenv_2  (BatchTableEnvironment for flink planner)

PyFlinkInterpreter (%flink.pyflink, %flink.ipyflink) 为用户自动创建了 6 个 Python 变量作为 PyFlink 程序的入口

  • s_env (StreamExecutionEnvironment),

  • b_env (ExecutionEnvironment)

  • st_env  (StreamTableEnvironment for blink planner)

  • bt_env  (BatchTableEnvironment for blink planner)

  • st_env_2  (StreamTableEnvironment for flink planner)

  • bt_env_2  (BatchTableEnvironment for flink planner)

Blink/Flink Planner

Flink 1.10 中有 2 种 table api 的 planner: flink & blink。

  • 如果你用 DataSet API 以及需要把 DataSet 转换成 Table,那么就需要使用 Flink planner 的 TableEnvironment (btenv_2 and stenv_2)。

  • 其他场景下, 我们都会建议用户使用 blink planner。这也是 Flink SQL 使用的 planner(%flink.bsql & %flink.ssql)。

使用 Flink Batch SQL

%flink.bsql 是用来执行 Flink 的 batch sql. 运行 help 命令可以得到所有可用的命令。

总的来说,Flink Batch SQL 可以用来做 2 大任务:

  • 使用 insert into 语句来做 Batch ETL

  • 使用 select 语句来做 BI 数据分析

基于 Bank 数据的 Batch ETL

下面我们基于 Bank (https://archive.ics.uci.edu/ml/datasets/bank+marketing)数据来做 Batch ETL 任务。 首先用 Flink SQL 创建一个 raw 数据的 source table,以及清洗干净后的 sink table。

然后再定义 Table Function 来 parse raw data。

接下来就可以用 insert into 语句来进行数据转换(source table --> sink table)。

用 select 语句来 Preview 最终数据,验证 insert into 语句的正确性。

基于 Bank 数据的 BI 数据分析

经过上面的数据清洗工作,接下来就可以对数据进行分析了。 用户不仅可以使用标准的 SQL Select 语句进行分析,也可以使用 Zeppelin 的 dynamic forms 来增加交互性(TextBox,Select,Checkbox)。

  • 使用 Flink UDF

SQL 虽然强大,但表达能力毕竟有限。 有时候就要借助于 UDF 来表达更复杂的逻辑。 Flink Interpreter 支持 2 种 UDF (Scala + Python)。 下面是 2 个简单的例子。

  • Scala UDF

%flink


class ScalaUpper extends ScalarFunction {

def eval(str: String) = str.toUpperCase

}


btenv.registerFunction("scala_upper", new ScalaUpper()

  • Python UDF

%flink.pyflink


class PythonUpper(ScalarFunction):

def eval(self, s):

return s.upper()


bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))

创建完 UDF 之后,你就可以在 SQL 里使用了。

对 Hive 数据的数据分析

除了可以分析 Flink SQL 创建的 table 之外,Flink 也可以分析 Hive 上已有的 table。 如果要让 Flink Interpreter 使用 Hive,那么需要做以下配置:

  • 设置 zeppelin.flink.enableHive 为 true

  • Copy 下面这些 dependencies 到 Flink 的 lib 目录

    • flink-connector-hive_{scala_version}-{flink.version}.jar

    • flink-hadoop-compatibility_{scala_version}-{flink.version}.jar

    • flink-shaded-hadoop-2-uber-{hadoop.version}-{flink-shaded.version}.jar

    • hive-exec-2.x.jar (for Hive 1.x, you need to copy hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar)

  • 在 Flink interpreter setting 里或者 zeppelin-env.sh 里指定 HIVE_CONF_DIR

  • 在 Flink interpreter setting 指定 zeppelin.flink.hive.version 为你使用的 Hive 版本

下面就用一个简单的例子展示如何在 Zeppelin 中用 Flink 查询 Hive table。

1. 用 Zeppelin 的 jdbc interpreter 查询 Hive tables

2. 用 Flink SQL 查询 Hive table 的 schema

3. 用 Flink SQL 查询 Hive table

更多 Flink SQL 资料

本文只是简单介绍如何在 Zeppelin 中使用 Flink SQL + UDF,关于更多 Flink SQL 和 UDF 请参考 Flink 官方文档:

  • https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/

  • https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html

  • https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/systemFunctions.html

  • https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html

如果有碰到任何问题,请加入下面这个钉钉群讨论。 后续我们会有更多 Tutorial 的文章,敬请期待。

关注 Flink 中文社区,获取更多技术干货

你也「 在看 」吗?:point_down:

我来评几句
登录后评论

已发表评论数()

相关站点

热门文章