# Word Count Experiment on Flink Native Kubernetes
# 1 Introduction
# 1.1 Experimental Environment
This experiment uses Ubuntu 20.04 64-bit as the operating system. Three 4-core, 8 GB cloud servers form the Kubernetes cluster, and one 4-core, 8 GB cloud server hosts the cluster management tool Kuboard Spray and also acts as the NFS server. The software used is listed below:
Name | Version |
---|---|
kuboard spray | v1.2.3-amd64 |
kubernetes | v1.25.4 |
calico | v3.23.3 |
etcd | v3.5.5 |
crictl | v1.25.0 |
crun | 1.4.5 |
krew | v0.4.3 |
runc | v1.1.4 |
cni | v1.1.1 |
nerdctl | 1.0.0 |
coredns | v1.8.6 |
dnsautoscaler | 1.8.5 |
pod_infra | 3.7 |
flink | 1.16.0 |
hadoop | 3.2.3 |
# 1.2 Cluster Planning
- Kuboard Spray
Hostname | IP |
---|---|
kuboard | 192.168.0.15 |
- NFS Server
Hostname | IP |
---|---|
NFS-server | 192.168.0.15 |
- Kubernetes cluster plan
Hostname | IP | Control Plane | etcd Node | Worker Node |
---|---|---|---|---|
node1 | 192.168.0.6 | Yes | Yes | Yes |
node2 | 192.168.0.7 | Yes | Yes | Yes |
node3 | 192.168.0.14 | Yes | Yes | Yes |
# 2 Kubernetes Cluster Deployment
- Detailed steps for this part were already given in the Spark on K8s experiment and are not repeated here.
# 3 Flink Native Kubernetes Deployment
# 3.1 Configure Permissions for the flink Service Account
- Create a service account named `flink` and bind it to the `edit` ClusterRole:

```bash
kubectl create serviceaccount flink -n bigdata
kubectl create clusterrolebinding flink-role-binding-flink \
  --clusterrole=edit \
  --serviceaccount=bigdata:flink
```
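These commands assume the `bigdata` namespace already exists. If it does not, a minimal sketch to create it first and then verify the account and binding:

```bash
# Create the target namespace (skip if it was created earlier in the lab)
kubectl create namespace bigdata

# Verify that the service account and the cluster role binding are in place
kubectl get serviceaccount flink -n bigdata
kubectl describe clusterrolebinding flink-role-binding-flink
```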
# 3.2 Create a Session Cluster
On a node with Flink installed, change into the Flink root directory and run the following command, specifying the resources:
```bash
./bin/kubernetes-session.sh \
  -Dkubernetes.namespace=bigdata \
  -Dkubernetes.jobmanager.service-account=flink \
  -Dkubernetes.rest-service.exposed.type=NodePort \
  -Dkubernetes.cluster-id=flink-session-cluster \
  -Dtaskmanager.memory.process.size=2048m \
  -Dkubernetes.taskmanager.cpu=1 \
  -Dkubernetes.jobmanager.replicas=1 \
  -Dtaskmanager.numberOfTaskSlots=3 \
  -Dresourcemanager.taskmanager-timeout=3600000
```
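To confirm that the session cluster actually came up, the JobManager deployment and the REST service can be inspected with kubectl (a quick sketch; Flink names the REST service after the cluster id):

```bash
# JobManager deployment and pods created for the session cluster
kubectl get deployments,pods -n bigdata

# The NodePort assigned to the REST/Web UI service (32077 in the run below)
kubectl get svc flink-session-cluster-rest -n bigdata
```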
The console reports that the cluster was created successfully and prints the Flink Web UI address, in this case http://192.168.0.6:32077. The Web UI looks as follows:
Still in the Flink root directory, run the following command to submit the bundled WindowJoin example job to the session cluster and verify that the deployment works:
```bash
./bin/flink run -d \
  --target kubernetes-session \
  -Dkubernetes.namespace=bigdata \
  -Dkubernetes.cluster-id=flink-session-cluster \
  -Dkubernetes.service-account=flink \
  -Dkubernetes.taskmanager.cpu=1 \
  examples/streaming/WindowJoin.jar
```
As shown, `WindowJoin.jar` has been submitted to the session cluster and occupies 1 slot out of 4 in total.
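The running jobs can also be listed from the command line instead of the Web UI (a sketch reusing the same session options as above):

```bash
./bin/flink list \
  --target kubernetes-session \
  -Dkubernetes.namespace=bigdata \
  -Dkubernetes.cluster-id=flink-session-cluster
```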
# 4 Writing the WordCount Program
- Configure the POM file. The Flink dependencies use the `provided` scope because the session cluster supplies them at runtime, and the shade plugin packages the job as a fat jar with `com.cuterwrite.WordCount` as the entry point:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cuterwrite</groupId>
    <artifactId>FlinkApp</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.16.0</flink.version>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Replace this with the main class of your job -->
                                    <mainClass>com.cuterwrite.WordCount</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
- Write `WordCount.java`:
```java
package com.cuterwrite;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCount {
    private static final Logger log = LoggerFactory.getLogger(WordCount.class);

    public WordCount() {}

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // Listen to socket input on port 9999
        DataStreamSource<String> text = env.socketTextStream("192.168.0.6", 9999);

        // Split each line into (word, 1) pairs
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    String[] tokens = value.toLowerCase().split(" ");
                    for (String token : tokens) {
                        collector.collect(new Tuple2<>(token, 1));
                    }
                }
            })
            // Merge the counts of identical words within each 5-second window
            .keyBy(item -> item.f0)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .sum(1)
            .addSink(new SinkFunction<Tuple2<String, Integer>>() {
                @Override
                public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
                    log.info("word: " + value.f0 + ", count: " + value.f1);
                }
            });

        env.execute("Word Count");
    }
}
```
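Build the fat jar with Maven; the shade plugin bound to the `package` phase writes it to `target/FlinkApp-1.0-SNAPSHOT.jar`. The `scp` step below is an assumption about the setup, chosen to match the `/root/` path used at submission time:

```bash
# Build the job jar (the shade plugin runs during `package`)
mvn clean package

# Copy it to the submitting node (assumption: the jar is built off-cluster
# and submitted from node1)
scp target/FlinkApp-1.0-SNAPSHOT.jar root@192.168.0.6:/root/
```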
# 5 Experimental Results
Submit the WordCount jar:

```bash
./bin/flink run -d \
  --target kubernetes-session \
  -Dkubernetes.namespace=bigdata \
  -Dkubernetes.cluster-id=flink-session-cluster \
  -Dkubernetes.service-account=flink \
  /root/FlinkApp-1.0-SNAPSHOT.jar
```
Check the Flink Web UI:
Test by sending words over a socket. Because the job connects to 192.168.0.6:9999 as a client, a listener must be running there first; start one on node1 and type words into it:

```bash
nc -lk 9999
```
Experimental results:
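Because the sink emits the counts through slf4j, the results appear in the TaskManager logs rather than on the client's stdout. A sketch for tailing them (the exact pod name is generated by Flink; substitute the one your cluster lists):

```bash
# Find the TaskManager pod spawned for the session cluster
kubectl get pods -n bigdata

# Tail its log to see lines such as "word: hello, count: 3"
# (replace the pod name with the one listed above)
kubectl logs -f flink-session-cluster-taskmanager-1-1 -n bigdata
```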