# A WordCount Experiment on Flink Native Kubernetes

# 1 Introduction

# 1.1 Experimental Environment

This experiment uses Ubuntu 20.04 (64-bit) as the operating system. Three 4-core, 8 GB cloud servers form the Kubernetes cluster, and one additional 4-core, 8 GB cloud server hosts the cluster management tool Kuboard Spray and doubles as the NFS server. The software used is listed below:

| Name | Version |
| --- | --- |
| kuboard spray | v1.2.3-amd64 |
| kubernetes | v1.25.4 |
| calico | v3.23.3 |
| etcd | v3.5.5 |
| crictl | v1.25.0 |
| crun | 1.4.5 |
| krew | v0.4.3 |
| runc | v1.1.4 |
| cni | v1.1.1 |
| nerdctl | 1.0.0 |
| coredns | v1.8.6 |
| dnsautoscaler | 1.8.5 |
| pod_infra | 3.7 |
| flink | 1.16.0 |
| hadoop | 3.2.3 |

# 1.2 Cluster Planning

  • Kuboard Spray

| Hostname | IP |
| --- | --- |
| kuboard | 192.168.0.15 |

  • NFS Server

| Hostname | IP |
| --- | --- |
| NFS-server | 192.168.0.15 |

  • Kubernetes cluster

| Hostname | IP | Control Node | etcd Node | Worker Node |
| --- | --- | --- | --- | --- |
| node1 | 192.168.0.6 | | | |
| node2 | 192.168.0.7 | | | |
| node3 | 192.168.0.14 | | | |

# 2 Kubernetes Cluster Deployment

  • The deployment steps were already covered in detail in the Spark on K8s experiment, so they are not repeated here.
  • Create the flink service account and grant it permissions:
kubectl create serviceaccount flink -n bigdata

kubectl create clusterrolebinding flink-role-binding-flink \
  --clusterrole=edit \
  --serviceaccount=bigdata:flink
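The two kubectl commands above can equivalently be captured in a declarative manifest and applied with `kubectl apply -f flink-rbac.yaml` (a sketch; the file name is arbitrary, and the bigdata namespace is assumed to already exist):

```yaml
# Service account used by the Flink JobManager to create TaskManager pods
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink
  namespace: bigdata
---
# Bind the built-in "edit" ClusterRole to the flink service account
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: flink-role-binding-flink
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: edit
subjects:
  - kind: ServiceAccount
    name: flink
    namespace: bigdata
```

Keeping the RBAC setup in a manifest makes it easy to recreate the permissions when rebuilding the cluster.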

# 3.2 Creating the Session Cluster

  • On a node where Flink is installed, change into the Flink root directory and run the following command, specifying the desired resources:

    ./bin/kubernetes-session.sh \
        -Dkubernetes.namespace=bigdata \
        -Dkubernetes.jobmanager.service-account=flink \
        -Dkubernetes.rest-service.exposed.type=NodePort \
        -Dkubernetes.cluster-id=flink-session-cluster \
        -Dtaskmanager.memory.process.size=2048m \
        -Dkubernetes.taskmanager.cpu=1 \
        -Dkubernetes.jobmanager.replicas=1 \
        -Dtaskmanager.numberOfTaskSlots=3 \
        -Dresourcemanager.taskmanager-timeout=3600000
    

    The console reports that the cluster was created successfully and prints the Flink Web UI address, http://192.168.0.6:32077. The Web UI looks like this:

  • Still in the Flink root directory, submit the bundled WindowJoin example to the session cluster to verify that the deployment works:

    ./bin/flink run -d \
      --target kubernetes-session \
      -Dkubernetes.namespace=bigdata \
      -Dkubernetes.cluster-id=flink-session-cluster \
      -Dkubernetes.service-account=flink \
      -Dkubernetes.taskmanager.cpu=1 \
      examples/streaming/WindowJoin.jar
    

    The Web UI shows that WindowJoin.jar has been submitted to the session cluster, occupying 1 of the 4 available slots.
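The -D options used above can also be persisted in conf/flink-conf.yaml so that later kubernetes-session.sh and flink run invocations pick them up without repeating the flags. A fragment mirroring this experiment's settings might look like:

```yaml
# Equivalent of the -D flags passed to kubernetes-session.sh above
kubernetes.namespace: bigdata
kubernetes.jobmanager.service-account: flink
kubernetes.rest-service.exposed.type: NodePort
kubernetes.cluster-id: flink-session-cluster
taskmanager.memory.process.size: 2048m
kubernetes.taskmanager.cpu: 1
kubernetes.jobmanager.replicas: 1
taskmanager.numberOfTaskSlots: 3
resourcemanager.taskmanager-timeout: 3600000
```

Command-line -D options still take precedence over flink-conf.yaml, so one-off overrides remain possible.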

# 4 Writing the WordCount Program

  • Configure the POM file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cuterwrite</groupId>
    <artifactId>FlinkApp</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.16.0</flink.version>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <!-- Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Replace this with the main class of your job -->
                                    <mainClass>com.cuterwrite.WordCount</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  • Write WordCount.java, then build the shaded JAR with mvn clean package (the shade plugin is bound to the package phase):
package com.cuterwrite;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;

public class WordCount {
    private static final Logger log = LoggerFactory.getLogger(WordCount.class);
    public WordCount() {}

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        // Read text from the socket source listening on port 9999 of 192.168.0.6
        DataStreamSource<String> text = env.socketTextStream("192.168.0.6", 9999);

        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new Tuple2<>(token, 1));
                }
            }
        })
        // Merge the counts of the same word within each window
        .keyBy(item -> item.f0)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .sum(1)
        .addSink(new SinkFunction<Tuple2<String, Integer>>() {
            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
                log.info("word: " + value.f0 + ", count: " + value.f1);
            }
        });

        env.execute("Word Count");
    }
}
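The windowed keyBy(...).sum(1) aggregation above can be sanity-checked locally without a cluster. The sketch below (WordCountSketch is a hypothetical helper, not part of the job) reproduces the job's tokenize-and-count logic for one batch of lines, with a plain map standing in for the Flink operators:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {
    // Mirrors flatMap (tokenize) + keyBy/sum (aggregate) for a single batch of
    // lines, roughly what one 5-second window of the streaming job computes.
    static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            // Same tokenization as the job: lowercase, split on single spaces
            for (String token : line.toLowerCase().split(" ")) {
                counts.merge(token, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(new String[] {"Hello Flink", "hello world"});
        counts.forEach((word, n) -> System.out.println("word: " + word + ", count: " + n));
        // Prints:
        // word: hello, count: 2
        // word: flink, count: 1
        // word: world, count: 1
    }
}
```

Unlike the streaming job, this sketch sees all input at once; in the real pipeline, each tumbling 5-second window produces its own independent set of counts.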

# 5 Experimental Results

  • Submit the WordCount JAR:

    ./bin/flink run -d \
      --target kubernetes-session \
      -Dkubernetes.namespace=bigdata \
      -Dkubernetes.cluster-id=flink-session-cluster \
      -Dkubernetes.service-account=flink \
      /root/FlinkApp-1.0-SNAPSHOT.jar
    
  • Check the Flink Web UI:

  • Feed words to the job over the socket. Since socketTextStream connects to 192.168.0.6:9999 as a client, start a listener on that node first and type input into it:

    nc -lk 9999
    
  • Experimental results:
