[Docker] Building a Hadoop + Spark environment in a container


Since it looks like I will be working with Hadoop on a project, I built a Hadoop + Spark environment in a Docker container as a learning exercise. It runs in pseudo-distributed mode.
The source code is available on GitHub.

github.com

Installing Hadoop

The files need to be downloaded from the official site. The Hadoop versions available for download can be checked at the following mirror.

ftp.kddilabs.jp
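For reference, a manual download would look something like the following. This is just a sketch; the example pulls from the Apache archive, and the exact path should be checked against a release listing such as the one above.

$ HADOOP_VERSION=2.10.2
$ wget https://archive.apache.org/dist/hadoop/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz
$ tar zxf hadoop-${HADOOP_VERSION}.tar.gz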

Installing Spark

Spark is also downloaded from its official site. In the "Choose a package type" dropdown, select the package built for the same Hadoop version as the one you downloaded above.

spark.apache.org
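Again only as a sketch: since Hadoop 2.x is used here, the Hadoop 2 package type (labelled something like "Pre-built for Apache Hadoop 2.7") is the matching choice. The URL below is the same one used in the Dockerfile; note that once a Spark release rotates off the download CDN, it moves to archive.apache.org/dist/spark/.

$ wget https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop2.tgz
$ tar zxf spark-3.3.0-bin-hadoop2.tgz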

Dockerfile

The official sites provide setup instructions; following them, the Dockerfile installs each tool and puts the configuration files in place. The download URLs for Hadoop and Spark can be found on the official sites mentioned above.

FROM openjdk:8

ENV HADOOP_VERSION 2.10.2
ENV HADOOP_HOME=/hadoop-${HADOOP_VERSION}
ENV HADOOP_OPTS "-Djava.library.path=${HADOOP_HOME}/lib/native"
ENV HADOOP_PREFIX ${HADOOP_HOME}
ENV HADOOP_COMMON_HOME ${HADOOP_HOME}
ENV HADOOP_HDFS_HOME ${HADOOP_HOME}
ENV HADOOP_MAPRED_HOME ${HADOOP_HOME}
ENV HADOOP_YARN_HOME ${HADOOP_HOME}
ENV HADOOP_CONF_DIR ${HADOOP_HOME}/etc/hadoop
ENV YARN_CONF_DIR $HADOOP_PREFIX/etc/hadoop
ENV SPARK_HOME=/usr/local/lib/spark
ENV PATH=${HADOOP_HOME}/bin:${SPARK_HOME}/bin:$PATH


RUN apt-get update \
  && apt-get install -y --no-install-recommends ssh \
  && apt-get clean \
  && rm -rf /var/lib/apt/lists/* \
  && wget -q -O - http://ftp.yz.yamagata-u.ac.jp/pub/network/apache/hadoop/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz | tar zxf - \
  && wget https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop2.tgz \
  && tar zxvf spark-3.3.0-bin-hadoop2.tgz -C /usr/local/lib/ \
  && ln -s /usr/local/lib/spark-3.3.0-bin-hadoop2 /usr/local/lib/spark \
  && rm spark-3.3.0-bin-hadoop2.tgz

# NOTE: USERNAME is never defined, so everything in this image runs as root.

WORKDIR /hadoop-${HADOOP_VERSION}
COPY config ./etc/hadoop/
COPY start ./
COPY main.py ./

RUN mkdir -p /run/sshd /data \
    && ssh-keygen -t rsa -P '' -f /root/.ssh/id_rsa \
    && cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys \
    && chmod 0600 /root/.ssh/authorized_keys \
    && hdfs namenode -format

CMD ["./start"]
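Once the image is built (the build command is in the Startup section below), a quick sanity check that both tools ended up on the PATH might look like this, using the test-hadoop tag from that section:

$ docker run --rm test-hadoop hadoop version
$ docker run --rm test-hadoop spark-submit --version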

start

A sleep loop keeps the container from exiting after Hadoop has started.

#!/bin/bash

/usr/sbin/sshd
/hadoop-${HADOOP_VERSION}/sbin/start-dfs.sh
/hadoop-${HADOOP_VERSION}/sbin/start-yarn.sh

# keep the shell running so the container does not exit
while true; do
  sleep 1000
done
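As an aside, the same effect can be achieved with a single blocking command instead of a loop, for example:

# alternative: block forever so the container keeps running
exec tail -f /dev/null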

main.py

A Python file for running via spark-submit, adapted from an example found online.

# -*- coding: utf-8 -*-
from pyspark import SparkContext, RDD
from typing import Dict, List, Tuple


def main():
    # Input data (exam results); the keys are subject names
    input_data: List[Dict[str, int]] = [
        {'国語': 86, '算数': 57, '理科': 45, '社会': 100},
        {'国語': 67, '算数': 12, '理科': 43, '社会': 54},
        {'国語': 98, '算数': 98, '理科': 78, '社会': 69},
    ]
    # Create the SparkContext and an RDD from the input
    sc: SparkContext = SparkContext(appName='spark_sample')
    rdd: RDD = sc.parallelize(input_data)
    # Average of each subject and of the total (合計); dict.update() returns None, so 'or x' yields the updated dict
    output: List[Tuple[str, float]] = rdd\
        .map(lambda x: x.update(合計=sum(x.values())) or x)\
        .flatMap(lambda x: x.items())\
        .groupByKey()\
        .map(lambda x: (x[0], round(sum(x[1]) / len(x[1]), 2)))\
        .collect()

    print(output)


if __name__ == '__main__':
    main()
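Before involving the cluster, the script can be smoke-tested with a local master; this is just a convenience check:

$ spark-submit --master "local[*]" main.py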

Configuration files

These are the configuration files for pseudo-distributed mode and for enabling YARN and Spark.

config/core-site.xml
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>
config/hdfs-site.xml
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>/data/namenode</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>/data/datanode</value>
  </property>
</configuration>
config/hadoop-env.sh

The following article is a useful reference for the details of these settings.

software.fujitsu.com

export JAVA_HOME=/usr/local/openjdk-8
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export HADOOP_SSH_OPTS="-o StrictHostKeyChecking=no -o BatchMode=yes"
export HADOOP_OPTS="${HADOOP_OPTS} -XX:-PrintWarnings -Djava.net.preferIPv4Stack=true -Djava.library.path=${HADOOP_HOME}/lib/native"
config/mapred-site.xml
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
  <property>
    <name>mapreduce.application.classpath</name>
    <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
  </property>
</configuration>
config/yarn-site.xml
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.env-whitelist</name>
    <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_HOME,PATH,LANG,TZ,HADOOP_MAPRED_HOME</value>
  </property>
</configuration>
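Once the daemons are up (next section), these settings can be sanity-checked from inside the container: hdfs dfsadmin -report lists the live DataNodes and their storage, and yarn node -list shows the registered NodeManagers.

$ hdfs dfsadmin -report
$ yarn node -list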

Startup

Build the image and start the container.

$ docker build -t test-hadoop .

$ docker run -itd --name test-hadoop --rm -p 50070:50070 -p 8088:8088 test-hadoop

$ docker exec -it test-hadoop bash
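Before opening the Web UIs, it is worth checking that all daemons came up. Inside the container, jps (bundled with the JDK) should list NameNode, DataNode, SecondaryNameNode, ResourceManager, and NodeManager:

$ jps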


After a short wait, the Web UIs become accessible.

Port 50070 serves the HDFS NameNode UI.

Port 8088 serves the YARN ResourceManager UI.

Running jobs

Let's run "Wordcount", one of the bundled MapReduce examples.

# Create the input directory
root@2b017152af10:/hadoop-2.10.2# hdfs dfs -mkdir -p /user/root/input

# Put the input files for Wordcount (here, the Hadoop config files) into input
root@2b017152af10:/hadoop-2.10.2# hdfs dfs -put etc/hadoop/* /user/root/input

# Run Wordcount
root@2b017152af10:/hadoop-2.10.2# hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.10.2.jar wordcount input output
22/10/15 08:28:58 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
22/10/15 08:28:59 INFO input.FileInputFormat: Total input files to process : 30
22/10/15 08:28:59 INFO mapreduce.JobSubmitter: number of splits:30
22/10/15 08:28:59 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1665818375854_0002
22/10/15 08:28:59 INFO conf.Configuration: resource-types.xml not found
22/10/15 08:28:59 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
22/10/15 08:28:59 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
22/10/15 08:28:59 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
22/10/15 08:28:59 INFO impl.YarnClientImpl: Submitted application application_1665818375854_0002
22/10/15 08:28:59 INFO mapreduce.Job: The url to track the job: http://2b017152af10:8088/proxy/application_1665818375854_0002/
22/10/15 08:28:59 INFO mapreduce.Job: Running job: job_1665818375854_0002
22/10/15 08:29:06 INFO mapreduce.Job: Job job_1665818375854_0002 running in uber mode : false
22/10/15 08:29:06 INFO mapreduce.Job:  map 0% reduce 0%
22/10/15 08:29:22 INFO mapreduce.Job:  map 20% reduce 0%
22/10/15 08:29:33 INFO mapreduce.Job:  map 40% reduce 0%
22/10/15 08:29:41 INFO mapreduce.Job:  map 43% reduce 0%
22/10/15 08:29:42 INFO mapreduce.Job:  map 57% reduce 0%
22/10/15 08:29:47 INFO mapreduce.Job:  map 60% reduce 0%
22/10/15 08:29:48 INFO mapreduce.Job:  map 67% reduce 0%
22/10/15 08:29:49 INFO mapreduce.Job:  map 73% reduce 0%
22/10/15 08:29:53 INFO mapreduce.Job:  map 77% reduce 24%
22/10/15 08:29:54 INFO mapreduce.Job:  map 80% reduce 24%
22/10/15 08:29:55 INFO mapreduce.Job:  map 83% reduce 24%
22/10/15 08:29:56 INFO mapreduce.Job:  map 90% reduce 24%
22/10/15 08:29:58 INFO mapreduce.Job:  map 93% reduce 24%
22/10/15 08:29:59 INFO mapreduce.Job:  map 97% reduce 30%
22/10/15 08:30:00 INFO mapreduce.Job:  map 100% reduce 30%
22/10/15 08:30:01 INFO mapreduce.Job:  map 100% reduce 100%
22/10/15 08:30:02 INFO mapreduce.Job: Job job_1665818375854_0002 completed successfully
22/10/15 08:30:02 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=79549
                FILE: Number of bytes written=6694130
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=89584
                HDFS: Number of bytes written=42991
                HDFS: Number of read operations=93
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
....
        File Input Format Counters 
                Bytes Read=85990
        File Output Format Counters 
                Bytes Written=42991

# The job output is written to output/part-r-00000
root@2b017152af10:/hadoop-2.10.2# hdfs dfs -ls output
Found 2 items
-rw-r--r--   1 root supergroup          0 2022-10-15 08:29 output/_SUCCESS
-rw-r--r--   1 root supergroup      42991 2022-10-15 08:29 output/part-r-00000

# cat the output (the "Unable to write to output stream" message below just means head closed the pipe early)
root@2b017152af10:/hadoop-2.10.2# hdfs dfs -cat output/part-r-00000 | head
!=      3
""      7
"".     4
"$HADOOP_JOB_HISTORYSERVER_HEAPSIZE"    1
"$JAVA_HOME"    2
"$YARN_HEAPSIZE"        1
"$YARN_LOGFILE" 1
"$YARN_LOG_DIR" 1
"$YARN_POLICYFILE"      1
"*"     19
cat: Unable to write to output stream.


Let's take a look at the output in spark-shell.

root@2b017152af10:/hadoop-2.10.2# spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/10/15 08:43:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://2b017152af10:4040
Spark context available as 'sc' (master = local[*], app id = local-1665823413103).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.0
      /_/
         
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_342)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val txtFile = sc.textFile("output/part-r-00000")
txtFile: org.apache.spark.rdd.RDD[String] = output/part-r-00000 MapPartitionsRDD[7] at textFile at <console>:23

scala> txtFile.filter(line => line.contains("apache")).foreach(println)
#dfs.class=org.apache.hadoop.metrics.file.FileContext   1           (0 + 2) / 2]
#jvm.class=org.apache.hadoop.metrics.file.FileContext   1
#jvm.class=org.apache.hadoop.metrics.spi.NullContext    1
#log4j.additivity.org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger=false 1
#log4j.additivity.org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.statedump=false    1
#log4j.appender.FSSTATEDUMP.layout=org.apache.log4j.PatternLayout       1
#log4j.appender.FSSTATEDUMP=org.apache.log4j.RollingFileAppender        1
#log4j.appender.HSAUDIT.layout=org.apache.log4j.PatternLayout   1
#log4j.appender.HSAUDIT=org.apache.log4j.DailyRollingFileAppender       1
#log4j.appender.HTTPDRFA.layout=org.apache.log4j.PatternLayout  1
#log4j.appender.HTTPDRFA=org.apache.log4j.DailyRollingFileAppender      1
....
rpc.class=org.apache.hadoop.metrics.ganglia.GangliaContext      1
rpc.class=org.apache.hadoop.metrics.ganglia.GangliaContext31    1
rpc.class=org.apache.hadoop.metrics.spi.NullContext     1
ugi.class=org.apache.hadoop.metrics.ganglia.GangliaContext      1
ugi.class=org.apache.hadoop.metrics.ganglia.GangliaContext31    1
ugi.class=org.apache.hadoop.metrics.spi.NullContext     1

scala> :quit
root@2b017152af10:/hadoop-2.10.2# 


Next, let's run main.py with spark-submit.

root@2b017152af10:/hadoop-2.10.2# spark-submit main.py --master yarn
22/10/15 08:35:30 INFO SparkContext: Running Spark version 3.3.0
22/10/15 08:35:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/10/15 08:35:30 INFO ResourceUtils: ==============================================================
22/10/15 08:35:30 INFO ResourceUtils: No custom resources configured for spark.driver.
22/10/15 08:35:30 INFO ResourceUtils: ==============================================================
22/10/15 08:35:30 INFO SparkContext: Submitted application: spark_sample
22/10/15 08:35:30 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
22/10/15 08:35:30 INFO ResourceProfile: Limiting resource is cpu
....

22/10/15 08:35:37 INFO DAGScheduler: Job 0 finished: collect at /hadoop-2.10.2/main.py:17, took 3.034884 s
[('合計', 269.0), ('国語', 83.67), ('算数', 55.67), ('社会', 74.33), ('理科', 55.33)]
22/10/15 08:35:37 INFO SparkContext: Invoking stop() from shutdown hook
22/10/15 08:35:37 INFO SparkUI: Stopped Spark web UI at http://2b017152af10:4040
22/10/15 08:35:37 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/10/15 08:35:37 INFO MemoryStore: MemoryStore cleared
22/10/15 08:35:37 INFO BlockManager: BlockManager stopped
22/10/15 08:35:37 INFO BlockManagerMaster: BlockManagerMaster stopped
22/10/15 08:35:37 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/10/15 08:35:37 INFO SparkContext: Successfully stopped SparkContext
22/10/15 08:35:37 INFO ShutdownHookManager: Shutdown hook called
22/10/15 08:35:37 INFO ShutdownHookManager: Deleting directory /tmp/spark-266fe4d6-ea39-4e2e-b6f3-9c67dfed570c
22/10/15 08:35:37 INFO ShutdownHookManager: Deleting directory /tmp/spark-266fe4d6-ea39-4e2e-b6f3-9c67dfed570c/pyspark-05f99759-3773-41f5-b10f-f9aa1f82ef79
22/10/15 08:35:37 INFO ShutdownHookManager: Deleting directory /tmp/spark-74ca53c4-6b39-4ebc-b238-b971849ebaae
root@2b017152af10:/hadoop-2.10.2# 
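One caveat about the command above: spark-submit treats everything after the application file as arguments to the application, so --master yarn is passed to main.py and the job actually runs with the default (local) master. To really submit to YARN, the options have to come before the script; a corrected invocation (not shown in the log above) would be:

$ spark-submit --master yarn --deploy-mode client main.py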

Summary

I got Hadoop and Spark working together in a Docker container.
Everything still runs as root, but this setup seems handy when you just want to try things out.
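To tear the environment down, stopping the container is enough, since it was started with --rm:

$ docker stop test-hadoop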
