Featured image of post 基于 Spark on k8s 的词频统计实验

基于 Spark on k8s 的词频统计实验

spark on k8s 词频统计

# 基于 Spark on k8s 的词频统计实验

# 1 简介

# 1.1 实验环境

本实验主要使用 Ubuntu 20.04 64 位作为系统环境,采用 6 台 4 核 8GB 云服务器作为 Kubernetes 集群部署机器,1 台 2 核 4GB 云服务器作为集群管理工具 Kuboard Spary 部署机器,1 台 2 核 4GB 云服务器作为 NFS Server(使用 Centos 7.6 系统)部署机器。

使用的软件如下:

名称版本
kuboard sparyv1.2.3-amd64
kubernetesv1.25.4
calicov3.23.3
etcdv3.5.5
crictlv1.25.0
crun1.4.5
krewv0.4.3
runcv1.1.4
cniv1.1.1
nerdctl1.0.0
corednsv1.8.6
dnsautoscaler1.8.5
pod_infra3.7
spark3.3.1
hadoop3.2.3

# 1.2 集群规划

  • Kuborad Spary
主机名IP
kuborad192.168.0.115
  • NFS Server
主机名IP
NFS-server192.168.0.132
  • Kubernetes 集群规划
主机名IP控制节点etcd 节点工作节点
node1192.168.0.76
node2192.168.0.213
node3192.168.0.2
node4192.168.0.41
node5192.168.0.73
node6192.168.0.12

# 2 部署 Kubernetes 集群

# 2.1 安装 Kuboard-Spray

Kuboard-Spray 是一款可以在图形界面引导下完成 Kubernetes 高可用集群离线安装的工具,开源仓库的地址为 Kuboard-Spray

  • 在 kuborad 节点上安装 docker-ce

    # 1. 安装必备的系统工具
    sudo apt-get remove docker docker-engine docker.io containerd runc;
    sudo apt-get install apt-transport-https ca-certificates curl gnupg2 software-properties-common;
    
    # 2. 安装 GPG 证书
    curl -fsSL https://mirrors.aliyun.com/docker-ce/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/docker.gpg;
    # 3. 写入软件源信息
    echo \
      "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/docker.gpg] https://mirrors.tuna.tsinghua.edu.cn/docker-ce/linux/ubuntu \
      $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
    
    # 4. 更新并安装 Docker-CE
    
    sudo apt-get update;
    sudo apt-get install docker-ce;
    
    # 5. 配置 docker 镜像加速器(可以在阿里云获取地址)
    
    sudo mkdir -p /etc/docker;
    sudo tee /etc/docker/daemon.json <<-'EOF'
    {
     "registry-mirrors": [
        "https://docker.mirrors.ustc.edu.cn",
        "https://cr.console.aliyun.com/" ]
    }
    EOF
    sudo systemctl daemon-reload;
    sudo systemctl restart docker;
    
  • 在 kuboard 节点上执行以下命令:

    docker run -d \
      --privileged \
      --restart=unless-stopped \
      --name=kuboard-spray \
      -e TZ=Asia/Shanghai \
      -p 80:80/tcp \
      -v /var/run/docker.sock:/var/run/docker.sock \
      -v ~/kuboard-spray-data:/data \
      eipwork/kuboard-spray:v1.2.3-amd64
    
  • 在浏览器打开地址 http://这台机器的 IP,输入用户名 admin,默认密码 Kuboard123,即可登录 Kuboard-Spray 界面。

# 2.2 加载离线资源包

  • 在 Kuboard-Spray 界面中,导航到 系统设置 –> 资源包管理 界面,可以看到已经等候您多时的 Kuboard-Spray 离线资源包,如下图所示

  • 点击 导入 按钮,在界面的引导下完成资源包的加载。

# 2.3 安装 Kubernetes 集群

在 Kuboard-Spray 界面中,导航到 集群管理 界面,点击界面中的 添加集群安装计划 按钮,填写表单如下:

  • 集群名称: 自定义名称,本文中填写为 kuboard,此名称不可以修改;

  • 资源包:选择前面步骤中导入的离线资源包。

  • 点击 确定 按钮后,将进入集群规划页面,在该界面中添加每个集群节点的连接参数并设置节点的角色,如下图所示:

    重要: kuboard-spray 所在机器不能当做 K8S 集群的一个节点,因为安装过程中会重启集群节点的容器引擎,这会导致 kuboard-spray 被重启掉。

  • 注意:

    • 最少的节点数量是 1 个;
    • ETCD 节点、控制节点的总数量必须为奇数;
  • 点击上图的 保存 按钮,再点击 执行 按钮,可以启动集群的离线安装过程,安装结果如下:

# 3 部署 Spark on k8s

# 3.1 制作 spark 容器镜像

  • 下载 spark-3.3.1-bin-hadoop3

    wget https://mirrors.pku.edu.cn/apache/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz;
    tar -xzf spark-3.3.1-bin-hadoop.tgz;
    mv spark-3.3.1-bin-hadoop spark;
    
  • 修改 Dockerfile 默认 apt 源加速

    cd spark/kubernetes/dockerfiles/spark;
    // 修改 Dockerfile 内容
    // 修改前:
    sed -i 's/http:\/\/deb.\(.*\)/https:\/\/deb.\1/g' /etc/apt/sources.list
    // 修改后:
    sed -i 's#http://deb.debian.org#https://mirrors.ustc.edu.cn#g' /etc/apt/source.list
    sed -i 's|security.debian.org/debian-security|mirrors.ustc.edu.cn/debian-security|g' /etc/apt/source.list
    
  • 构建 docker 镜像

    cd spark/bin;
    // -r <repo> -t <tag>
    ./docker-image-tool.sh -r cuterwrite -t 0.1 build;
    
  • 推送镜像到阿里云仓库(参考容器镜像服务->实例列表->镜像仓库)

    docker login --username=[阿里云账号] registry.cn-hangzhou.aliyuncs.com;
    docker tag [ImageId] registry.cn-hangzhou.aliyuncs.com/[repository]:[镜像版本号];
    docker push registry.cn-hangzhou.aliyuncs.com/[repository]:[镜像版本号];
    

# 3.2 创建命名空间

  • 访问 Kuboard,通常默认用户名为 admin,默认密码为 Kuboard123,访问地址为第一个控制节点的 80 端口(取决于安装时的参数),如下图所示:

  • 点击进入 default 集群,在下图所示的页面点击创建spark 命名空间:

# 3.3 配置 spark 用户权限

  • 创建用户spark 并配置权限

    kubectl create serviceaccount spark
    kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark:spark --namesparce=spark
    

# 3.4 配置 spark 历史服务器

  • 创建一个名为spark-history-server 的 deployment,配置如下:

    • 容器信息:

      • 名称:spark-history-server

      • 容器镜像:registry.cn-hangzhou.aliyuncs.com/[用户名]/spark:0.1(需配置仓库仓库名和密码)

      • 环境变量:SPARK_HISTORY_OPTS=-Dspark.history.fs.logDirectory=hdfs://192.168.0.238:8020/sparkhistory(需提前部署 HDFS)

      • 容器端口:18080,端口名称 http

      • 参数:["/opt/spark/bin/spark-class", “org.apache.spark.deploy.history.HistoryServer”]

    • 服务信息:

      • 端口:18080

      • 协议:TCP

      • 目标端口:18080

      • NodePort:30080

      • 类型:NodePort

  • 测试配置是否成功:

    ./spark-submit \
        --master k8s://https://127.0.0.1:6443 \
        --deploy-mode cluster \
        --name spark-pi \
        --class org.apache.spark.examples.SparkPi \
        --conf spark.kubernetes.executor.request.cores=1 \
        --conf spark.kubernetes.executor.limit.cores=1 \
        --conf spark.kubernetes.driver.limit.cores=1 \
        --conf spark.kubernetes.driver.request.cores=1 \
        --conf spark.eventLog.enabled=true \
        --conf spark.eventLog.dir=hdfs://192.168.0.238:8020/sparkhistory \
        --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
        --conf spark.kubernetes.namespace=bigdata \
        --conf spark.executor.instances=2 \
        --conf spark.kubernetes.file.upload.path=/tmp \
        --conf spark.kubernetes.container.pullSecrets=aliyun-repository \
        --conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/cuterwrite/spark:0.1 \
    hdfs://192.168.0.238:8020/user/root/jars/spark-examples_2.12-3.3.1.jar
    

    提交任务成功后可以在 Kuboard 管理界面看到一个新启动的容器组:

    访问 spark 历史服务器,可以看到以下记录:

# 4 编写 WordCount 程序

WordCount.java

package com.cuterwrite;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.Iterator;

public class WordCount {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().appName("WordCount").getOrCreate();
        Dataset<String> lines = spark.read().textFile("hdfs://192.168.0.238:8020/input/news.txt");
        Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        }, Encoders.STRING());
        Dataset<Row> wordCounts = words.groupBy("value").count();
        wordCounts.write().format("csv").save("hdfs://192.168.0.238:8020/output/word_count_result");
    }
}

# 5 实验结果

  • 提交词频统计任务到Kubernetes
./spark-submit \
    --master k8s://https://127.0.0.1:6443 \
    --deploy-mode cluster \
    --name wordcount \
    --class com.cuterwrite.WordCount \
    --conf spark.kubernetes.executor.request.cores=2 \
    --conf spark.kubernetes.executor.limit.cores=2 \
    --conf spark.kubernetes.driver.limit.cores=1 \
    --conf spark.kubernetes.driver.request.cores=1 \
    --conf spark.eventLog.enabled=true \
    --conf spark.eventLog.dir=hdfs://192.168.0.238:8020/sparkhistory \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf spark.kubernetes.namespace=bigdata \
    --conf spark.executor.instances=3 \
    --conf spark.kubernetes.file.upload.path=/tmp \
    --conf spark.kubernetes.container.pullSecrets=aliyun-repository \
    --conf spark.kubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/cuterwrite/spark:0.1 \
hdfs://192.168.0.238:8020/user/root/jars/SparkApp-1.0.jar
  • 执行结果:
hdfs dfs -cat output/wordCount/_temporary/0/task_202212221534101760903765384745539_0002_m_000000/*
本博客已稳定运行
总访客数: Loading
总访问量: Loading
发表了 73 篇文章 · 总计 323.73k

使用 Hugo 构建
主题 StackJimmy 设计
基于 v3.27.0 分支版本修改