状态!状态!

在 Flink 的官网中,我们可以看到 Flink 社区对于 Flink 的定义是 Apache Flink® — Stateful Computations over Data Streams, 一个在数据流之上的有状态计算引擎。在正式开工之前,我们需要先对于状态有一个很初步很初步的了解。 Flink 官网上对于 State 有非常多专业的解释,我今天先初步在本文里面先谈谈简单的一些理解,感兴趣的朋友呢可以去读读 Flink 官方的解释。

在 Flink 中,任务是使用有向无环图进行描述的,每个图的节点实际上是一个操作函数或算子(Operator),而这些函数和算子在 Flink 处理单一的元素和事件时会存储数据(官网使用的是 remember 记住),因此我们可以说 Flink 的算子和函数是有状态的(Stateful)。状态在Flink中非常重要,我会在后面单独谈谈 Flink 的状态。

何为流

我们在学习程序设计的过程中,会接触到各种各样的流,文件流,网络流,Java 的流 API 等等等等。在 Flink 的世界里,数据可以形成事件流,日志记录,传感器数据,用户的行为等等都可以形成数据流。Flink 官网 中把流分为两类:有界流和无界流。

Flink 官网的解释图

简单来说,无界流有流的起点但是没有终点,数据源源不断地到达,因此我们也需要持续地处理数据。而有界流是有流的结束点的,且数据可以在全部到达后进行处理,因此我们可以把有界流理解为批处理(Batch)。与真正的批处理有所不同的是,Flink是有状态的,每个状态中的数据可能会随着数据的不断到达而发生变化。值得注意的是,由于无界流会源源不断地到达,一般我们会使用时间的特征作为无界流的处理顺序。

搭建 Maven 项目

首先创建一个 Maven 项目,并添加以下关键的依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.10.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.15</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>

编写 Word Count Job

完成 Maven 后我们需要编写 Word Count 的 任务代码。和普通的 Java 程序一样,Flink 会找到并执行 main 方法中所定义的处理逻辑。首先我们需要在 main 方法的第一句就使用 getExecutionEnvironment() 方法获得程序运行所需要的流环境

1
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

然后我们需要向流处理的环境注册一个数据源,以源源不断地读入字符串:

1
DataStream<String> source = env.addSource(new WordCountSource()).name("SteveJobs-Word");

在获取到数据源后,我们需要对数据进行处理。首先是需要使用 flatMap 配合分词工具对输入的字符串进行切分,得到多个二元组。这些二元组的第一个元素是所得的单词,而第二个元素是 1 。为什么是 1 呢,因为我们会根据单词进行分组,每个单词都会拥有一个独立的 状态 ,每个状态都会存储当前处理过的数据信息。在这个样例中,flink 会对每个单词的状态中所有元组的第二个元素进行求和操作。由于第二个元素都是1,可知其求和的结果等于单词的个数。

1
2
3
4
DataStream<Tuple2<String,Integer>> proc = source.flatMap(new TextTokenizer())
.keyBy(0)
.sum(1)
.name("Process-Word-Count");

在 Flink 中,一切的数据都需要输出到 Sink 中。Sink 可以输出数据到数据库,到 Elasticsearch 中,到 HDFS 中,或是到 Kafka MQ 中。因此,我们需要指定 Flink 的计算结果输出位置。如在本样例中,我们会把数据输出到

1
2
proc.addSink(new WordCountSink())
.name("Word-Count-Sink");

Flink 的任务需要在本地打包,并上传到服务器加载后才可以在 Flink 集群上运行。因此我们需要在 main 方法的末尾,指定任务的名称,并调用环境的 execute 方法启动任务。

1
env.execute("Word Count");

最终,我们可以得到 WordCountJob 的写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class WordCountJob  {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> source = env.addSource(new WordCountSource())
.name("SteveJobs-Word");

DataStream<Tuple2<String,Integer>> proc = source.flatMap(new TextTokenizer())
.keyBy(0)
.sum(1)
.name("Process-Word-Count");

proc.addSink(new WordCountSink())
.name("Word-Count-Sink");

env.execute("Word Count");
}

}

数据的 Source 和 Sink

在这个项目中,我们先做一个有界流的试验。在 Flink 中,Source 和 Sink 是数据输入和数据输出的算子,即数据通过 Source 算子输入, 通过 Sink 算子输出。根据刚刚所介绍的,有界流实质上是指批处理,所处理的数据是有限的。因此我们的输入数据源部分我们实现了简单的遍历器数据源 (JavaDoc),直接上代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class WordCountSource extends FromIteratorFunction<String> implements Serializable {

private static final long serialVersionUID = 0L;

public WordCountSource() {
super(new WordIterator());
}

private static class WordIterator implements Iterator<String>,Serializable{

private static final long serialVersionUID = 4L;

private int index = 0;
private int length = -1;

private WordIterator(){
length = StaticWordData.STEVE_JOBS_WORD.length;
}

@Override
public boolean hasNext() {
return (index < length);
}

@Override
public String next() {
return StaticWordData.STEVE_JOBS_WORD[index ++];
}
}

}

Sink 作为输出,我们参照了 Flink 官方 Code Walkthroughs 的写法,直接输出到日志中。 具体写法可以参考 Sink 的 JavaDoc ,以下是我的写法:

1
2
3
4
5
6
7
8
9
10
public class WordCountSink implements SinkFunction<Tuple2<String,Integer>> {

private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(WordCountSink.class);

@Override
public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
logger.info("{ Word: \""+ value.f0 + "\", Cnt:" + value.f1 +"}");
}
}

测试

在编写好代码以后,使用 Maven 的指令进行打包:

1
$ mvn clean package

我在之前的更新中介绍了如何搭建单机版 Flink , 传送门 。 我们把单机版 Flink 启动起来,然后选择 Submit New Job ,点击 Add new 通过网页版上传刚刚打包好的包。

上传好的效果图

我们可以点击 Show Plan 查看 我们在程序中定义的执行图。

执行图

点击 Submit,即可提交任务到 Flink 并执行。我们可以在 Task Manager 中,选择并查看当前执行任务的 Task Manager 的 Log。即可看到我们在 Sink 中输出的日志:

Log

可以看到,单词的统计会随着流数据的输入而不断增长。

每次更新啰里啰嗦的话

我提供了本实验的源代码,请在我的 Github 获取:ousheobin/flink-word-count

欢迎大家在上面折腾各种玩法,比如实现无界的数据源,或者把你的 word count 结果输出到 Kafka MQ。