Featured image of post 在 HPC 上运行 Apache Spark

在 HPC 上运行 Apache Spark

Apache Spark 是一个多语言引擎,用于在单节点机器或集群上执行数据工程、数据科学和机器学习任务。本文将为您提供在高性能计算(HPC)集群系统上运行多节点 Spark 集群的指南,并展示一个使用 PySpark 的作业示例。

在 HPC 上运行 Apache Spark

一、概述

Apache Spark 是一个多语言引擎,用于在单节点机器或集群上执行数据工程、数据科学和机器学习任务。本文将为您提供在高性能计算(HPC)集群系统上运行多节点 Spark 集群的指南,并展示一个使用 PySpark 的作业示例。

二、开始

1. 下载 OpenJDK-11.0.2

OpenJDK 官方网站 下载 OpenJDK-11.0.2。选择 Linux 的对应版本并下载。解压下载的文件并将其放置在 ${HOME}/software/openjdk 中并重命名为 11.0.2

2. 下载 Spark-3.4.2

Apache Spark 下载页面 下载 Spark 。本文使用的是 Spark-3.4.2,但本指南应该也适用于更新的版本。解压下载的文件并将目录重命名为 3.4.2,放置在 ${HOME}/software/spark 文件夹中。

3. 配置 modulefile

在自定义目录中安装软件后,需要将软件的可执行文件路径等添加到相应的环境变量中才能使用。module 是一款环境变量管理工具,通过 module 实现软件环境变量的管理,快速加载和切换软件环境。集群中安装了一些常用的软件和库,可以通过 module 进行加载使用。

在这里,我们需要编写 modulefile 来管理自己的 JDK 和 Spark 软件环境,以便快速加载 Java 和 Spark 环境。

  • ${HOME}/modulefiles/openjdk 中创建名为 11.0.2 的文本文件,内容为:
#%Module1.0
##
## openjdk modulefile
##

proc ModulesHelp { } {
    puts stderr "This module sets up the environment for OpenJdk 11.0.2 \n"
}

module-whatis "For more information, \$ module help openjdk/11.0.2\n"

conflict openjdk

# 注意!这里需要进行修改
set root <PATH/WHERE/OPENJDK/DIRECTORY/IS>

prepend-path PATH ${root}/bin
  • ${HOME}/modulefiles/spark 中创建名为 3.4.2 的文本文件, 内容为:
#%Module1.0
##
## spark modulefile
##

proc ModulesHelp { } {
    global version

    puts stderr "This module loads Apache Spark environment variables and updates the PATH."
    puts stderr " "
    puts stderr "Version: $version"
}


module-whatis "Loads Apache Spark environment variables and updates the PATH. \n For more information, \$ module help spark/3.4.2 .\n"

conflict spark

# Set the version and installation path
set version 3.4.2

# 注意!这里需要进行修改
set root <PATH/WHERE/SPARK/DIRECTORY/IS>

# Set the environment variables
setenv SPARK_HOME ${root}
setenv SPARK_CONF_DIR ${root}/conf
setenv PYSPARK_PYTHON python3

# Update the PATH
prepend-path PATH ${root}/bin
prepend-path PATH ${root}/sbin


# Update the CLASSPATH
prepend-path CLASSPATH ${root}/jars/*

4. 使用 pip 安装 pyspark 库

  • 创建虚拟 Conda 环境 pyspark
conda create -n pyspark python=3.10
  • 安装 pyspark
conda activate pyspark
pip install pyspark

5. 编写环境加载脚本 set-spark-env.sh

  • ${HOME}/scripts 目录下编写 set-spark-env.sh 脚本文件:
#!/bin/bash

source /etc/profile

# 注意!这里需要修改为你的 Conda 的安装路径
export CONDA_PATH=<PATH/WHERE/CONDA/DIRECTORY/IS>
export PATH=$CONDA_PATH/bin:$PATH

export MODULEPATH=${HOME}/modulefiles:$MODULEPATH

source activate
conda activate pyspark

module load openjdk
module load spark

6. 编写 sbatch 脚本

  • 为了启动 Spark 集群,我们使用以下 Slurm 脚本来请求计算节点。Slurm 脚本请求四个节点,并生成一个 master 节点和三个 worker 节点的 Spark 集群。可以通过更改 Slurm 脚本中的 -N 选项的值来增加或减少工作节点的数量。
#!/bin/bash
#SBATCH --export=ALL
#SBATCH --mem=0
#SBATCH -p C28M250G
#SBATCH -t 1:00:00
#SBATCH -N 4
#SBATCH -J spark_test
#SBATCH -o o.spark_test
#SBATCH -e e.spark_test

source ~/scripts/set-spark-env.sh
workdir=`pwd`
nodes=($(scontrol show hostnames ${SLURM_JOB_NODELIST} | sort | uniq ))
numnodes=${#nodes[@]}
last=$(( $numnodes - 1 ))

export SCRATCH=${workdir}/scratch

master=${nodes[0]}
masterurl="spark://${master}:7077"

ssh ${nodes[0]} "source ~/scripts/set-spark-env.sh; start-master.sh"
for i in $( seq 1 $last )
do
    ssh ${nodes[$i]} "source ~/scripts/set-spark-env.sh; start-worker.sh ${masterurl}"
done

ssh ${nodes[0]} "cd ${workdir}; source ~/scripts/set-spark-env.sh; /usr/bin/time -v spark-submit --deploy-mode client --executor-cores 28 --executor-memory 240G --conf spark.standalone.submit.waitAppCompletion=true --master $masterurl spark_test.py"
wait
echo 'end'
exit

  • 该 Slurm 脚本会提交一个用于测试的 python 脚本( spark_test.py ),内容如下。此脚本运行 PySpark 代码来测试 Spark 集群。复制下面的内容,并将其保存在 sbatch 脚本所在目录中的 spark_test.py 文件。你也可以更改 spark_test.py 文件的路径,但必须适当地更新 Slurm 脚本。
#spark_test.py
import random
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


spark = SparkSession.builder.appName('Test-app').getOrCreate()

#Generate sample dataset
cola_list = ['2022-01-01', '2022-01-02', '2022-01-03' ]
colb_list = ['CSC', 'PHY', 'MAT', 'ENG', 'CHE', 'ENV', 'BIO', 'PHRM']
colc_list = [100, 200, 300, 400, 500, 600, 700, 800, 900]


# declaring a random.seed value to generate same data in every run
random.seed(1)
sample_data = []
for idx in range(1000):
    sample_data.append([random.choice(cola_list), random.choice(colb_list), random.choice(colc_list)])

columns= ["date", "org", "value"]
#creating a Spark dataframe
df = spark.createDataFrame(data = sample_data, schema = columns)

res = (df.groupBy('date','org')
       .agg(F.count('value').alias('count_value')))
res.show()
  • 如果启动了 Spark 集群并且 spark-test.py 成功执行,那么日志文件 o.spark_test 中的输出应该如下:
starting org.apache.spark.deploy.master.Master, logging to ...
starting org.apache.spark.deploy.worker.Worker, logging to ...
starting org.apache.spark.deploy.worker.Worker, logging to ...
starting org.apache.spark.deploy.worker.Worker, logging to ...
+----------+----+-----------+
|      date| org|count_value|
+----------+----+-----------+
|2022-01-03| BIO|         37|
|2022-01-02| ENV|         53|
|2022-01-03| CHE|         39|
|2022-01-03| PHY|         46|
|2022-01-01| CSC|         45|
|2022-01-03| CSC|         48|
|2022-01-01| BIO|         39|
|2022-01-01| MAT|         42|
|2022-01-02| CHE|         44|
|2022-01-03| ENV|         33|
|2022-01-01| ENG|         33|
|2022-01-02| ENG|         28|
|2022-01-01| ENV|         33|
|2022-01-02| CSC|         45|
|2022-01-02| MAT|         51|
|2022-01-01| PHY|         38|
|2022-01-01|PHRM|         40|
|2022-01-03|PHRM|         42|
|2022-01-02|PHRM|         43|
|2022-01-03| ENG|         56|
+----------+----+-----------+
only showing top 20 rows

end
  • Spark 还提供了一个 web UI 来监控集群,您可以通过将 master 节点端口转发到本地机器来在本地机器上访问它。
    • 例如,如果 master 节点在 cpu1 上运行,则可以在本地计算机终端上运行以下代码。
      ssh -t -t  <USERNAME>@<LOGIN_NODE_IP> -L 8080:localhost:8080 \
      -i <PRIVATE_KEY_LOCATION> ssh cpu1  -L 8080:127.0.0.1:8080
    

三、总结

在本文中,我们介绍了如何在 HPC 集群上部署和运行 Apache Spark 集群。通过遵循本指南中的步骤,你应该能够成功地在 HPC 环境中运行 Spark 作业。请注意,根据你的具体 HPC 环境和配置,可能需要进行一些调整。

Spark 官方文档 是一个非常有用的工具,通过它可以帮助你找到 Spark 的具体说明并解决问题。所以实际遇到问题时要多使用它。

本博客已稳定运行
发表了70篇文章 · 总计295.51k字
Welcome to cuterwrite's blog!
使用 Hugo 构建
主题 StackJimmy 设计
基于 v3.25.0 分支版本修改