# 在 HPC 上运行 Apache Spark
# 一、概述
Apache Spark 是一个多语言引擎,用于在单节点机器或集群上执行数据工程、数据科学和机器学习任务。本文将为您提供在高性能计算(HPC)集群系统上运行多节点 Spark 集群的指南,并展示一个使用 PySpark 的作业示例。
# 二、开始
# 1. 下载 OpenJDK-11.0.2
从 OpenJDK 官方网站
下载 OpenJDK-11.0.2。选择 Linux 的对应版本并下载。解压下载的文件并将其放置在 ${HOME}/software/openjdk
中并重命名为 11.0.2
。
# 2. 下载 Spark-3.4.2
从 Apache Spark 下载页面 下载 Spark 。本文使用的是 Spark-3.4.2,但本指南应该也适用于更新的版本。解压下载的文件并将目录重命名为 3.4.2,放置在 ${HOME}/software/spark 文件夹中。
# 3. 配置 modulefile
在自定义目录中安装软件后,需要将软件的可执行文件路径等添加到相应的环境变量中才能使用。module
是一款环境变量管理工具,通过 module
实现软件环境变量的管理,快速加载和切换软件环境。集群中安装了一些常用的软件和库,可以通过 module
进行加载使用。
在这里,我们需要编写 modulefile
来管理自己的 JDK 和 Spark 软件环境,以便快速加载 Java 和 Spark 环境。
- 在
${HOME}/modulefiles/openjdk
中创建名为11.0.2
的文本文件,内容为:
#%Module1.0
##
## openjdk modulefile
##
proc ModulesHelp { } {
puts stderr "This module sets up the environment for OpenJdk 11.0.2 \n"
}
module-whatis "For more information, \$ module help openjdk/11.0.2\n"
conflict openjdk
# 注意!这里需要进行修改
set root <PATH/WHERE/OPENJDK/DIRECTORY/IS>
prepend-path PATH ${root}/bin
- 在
${HOME}/modulefiles/spark
中创建名为3.4.2
的文本文件, 内容为:
#%Module1.0
##
## spark modulefile
##
proc ModulesHelp { } {
global version
puts stderr "This module loads Apache Spark environment variables and updates the PATH."
puts stderr " "
puts stderr "Version: $version"
}
module-whatis "Loads Apache Spark environment variables and updates the PATH. \n For more information, \$ module help spark/3.4.2 .\n"
conflict spark
# Set the version and installation path
set version 3.4.2
# 注意!这里需要进行修改
set root <PATH/WHERE/SPARK/DIRECTORY/IS>
# Set the environment variables
setenv SPARK_HOME ${root}
setenv SPARK_CONF_DIR ${root}/conf
setenv PYSPARK_PYTHON python3
# Update the PATH
prepend-path PATH ${root}/bin
prepend-path PATH ${root}/sbin
# Update the CLASSPATH
prepend-path CLASSPATH ${root}/jars/*
# 4. 使用 pip 安装 pyspark 库
- 创建虚拟 Conda 环境 pyspark
conda create -n pyspark python=3.10
- 安装 pyspark
conda activate pyspark
pip install pyspark
# 5. 编写环境加载脚本 set-spark-env.sh
- 在
${HOME}/scripts
目录下编写set-spark-env.sh
脚本文件:
#!/bin/bash
source /etc/profile
# 注意!这里需要修改为你的 Conda 的安装路径
export CONDA_PATH=<PATH/WHERE/CONDA/DIRECTORY/IS>
export PATH=$CONDA_PATH/bin:$PATH
export MODULEPATH=${HOME}/modulefiles:$MODULEPATH
source activate
conda activate pyspark
module load openjdk
module load spark
# 6. 编写 sbatch 脚本
- 为了启动 Spark 集群,我们使用以下 Slurm 脚本来请求计算节点。Slurm 脚本请求四个节点,并生成一个 master 节点和三个 worker 节点的 Spark 集群。可以通过更改 Slurm 脚本中的
-N
选项的值来增加或减少工作节点的数量。
#!/bin/bash
#SBATCH --export=ALL
#SBATCH --mem=0
#SBATCH -p C28M250G
#SBATCH -t 1:00:00
#SBATCH -N 4
#SBATCH -J spark_test
#SBATCH -o o.spark_test
#SBATCH -e e.spark_test
source ~/scripts/set-spark-env.sh
workdir=`pwd`
nodes=($(scontrol show hostnames ${SLURM_JOB_NODELIST} | sort | uniq ))
numnodes=${#nodes[@]}
last=$(( $numnodes - 1 ))
export SCRATCH=${workdir}/scratch
master=${nodes[0]}
masterurl="spark://${master}:7077"
ssh ${nodes[0]} "source ~/scripts/set-spark-env.sh; start-master.sh"
for i in $( seq 1 $last )
do
ssh ${nodes[$i]} "source ~/scripts/set-spark-env.sh; start-worker.sh ${masterurl}"
done
ssh ${nodes[0]} "cd ${workdir}; source ~/scripts/set-spark-env.sh; /usr/bin/time -v spark-submit --deploy-mode client --executor-cores 28 --executor-memory 240G --conf spark.standalone.submit.waitAppCompletion=true --master $masterurl spark_test.py"
wait
echo 'end'
exit
- 该 Slurm 脚本会提交一个用于测试的 python 脚本(
spark_test.py
),内容如下。此脚本运行 PySpark 代码来测试 Spark 集群。复制下面的内容,并将其保存在 sbatch 脚本所在目录中的spark_test.py
文件。你也可以更改spark_test.py
文件的路径,但必须适当地更新 Slurm 脚本。
#spark_test.py
import random
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.appName('Test-app').getOrCreate()
#Generate sample dataset
cola_list = ['2022-01-01', '2022-01-02', '2022-01-03' ]
colb_list = ['CSC', 'PHY', 'MAT', 'ENG', 'CHE', 'ENV', 'BIO', 'PHRM']
colc_list = [100, 200, 300, 400, 500, 600, 700, 800, 900]
# declaring a random.seed value to generate same data in every run
random.seed(1)
sample_data = []
for idx in range(1000):
sample_data.append([random.choice(cola_list), random.choice(colb_list), random.choice(colc_list)])
columns= ["date", "org", "value"]
#creating a Spark dataframe
df = spark.createDataFrame(data = sample_data, schema = columns)
res = (df.groupBy('date','org')
.agg(F.count('value').alias('count_value')))
res.show()
- 如果启动了 Spark 集群并且
spark-test.py
成功执行,那么日志文件o.spark_test
中的输出应该如下:
starting org.apache.spark.deploy.master.Master, logging to ...
starting org.apache.spark.deploy.worker.Worker, logging to ...
starting org.apache.spark.deploy.worker.Worker, logging to ...
starting org.apache.spark.deploy.worker.Worker, logging to ...
+----------+----+-----------+
| date| org|count_value|
+----------+----+-----------+
|2022-01-03| BIO| 37|
|2022-01-02| ENV| 53|
|2022-01-03| CHE| 39|
|2022-01-03| PHY| 46|
|2022-01-01| CSC| 45|
|2022-01-03| CSC| 48|
|2022-01-01| BIO| 39|
|2022-01-01| MAT| 42|
|2022-01-02| CHE| 44|
|2022-01-03| ENV| 33|
|2022-01-01| ENG| 33|
|2022-01-02| ENG| 28|
|2022-01-01| ENV| 33|
|2022-01-02| CSC| 45|
|2022-01-02| MAT| 51|
|2022-01-01| PHY| 38|
|2022-01-01|PHRM| 40|
|2022-01-03|PHRM| 42|
|2022-01-02|PHRM| 43|
|2022-01-03| ENG| 56|
+----------+----+-----------+
only showing top 20 rows
end
- Spark 还提供了一个 web UI 来监控集群,您可以通过将 master 节点端口转发到本地机器来在本地机器上访问它。
- 例如,如果 master 节点在
cpu1
上运行,则可以在本地计算机终端上运行以下代码。
ssh -t -t <USERNAME>@<LOGIN_NODE_IP> -L 8080:localhost:8080 \ -i <PRIVATE_KEY_LOCATION> ssh cpu1 -L 8080:127.0.0.1:8080
- 然后就可以在本地机器上的 Web 浏览器上使用地址 http://localhost:8080/ 访问 Spark Web UI。
- 例如,如果 master 节点在
# 三、总结
在本文中,我们介绍了如何在 HPC 集群上部署和运行 Apache Spark 集群。通过遵循本指南中的步骤,你应该能够成功地在 HPC 环境中运行 Spark 作业。请注意,根据你的具体 HPC 环境和配置,可能需要进行一些调整。
注释
Spark 官方文档 是一个非常有用的工具,通过它可以帮助你找到 Spark 的具体说明并解决问题。所以实际遇到问题时要多使用它。