[Docker] Building a Hadoop + Spark Execution Environment in a Container
I will likely be working with Hadoop on an upcoming project, so as a learning exercise I built a Hadoop + Spark environment in a Docker container. It is intended to run in pseudo-distributed mode.
The source code is available on GitHub.
Installing Hadoop
You need to download the files from the official site.
The available Hadoop versions can be checked at the URL below.
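For reference, a minimal sketch of the manual download, pulling the same 2.10.2 tarball from the mirror that the Dockerfile below uses (the mirror URL is simply carried over from that Dockerfile; any Apache mirror works):

# Download and unpack Hadoop 2.10.2 (same mirror as in the Dockerfile below)
wget -q -O - http://ftp.yz.yamagata-u.ac.jp/pub/network/apache/hadoop/common/hadoop-2.10.2/hadoop-2.10.2.tar.gz | tar zxf -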
Installing Spark
Spark is also downloaded from its official site.
In the "Choose a package type" select box, pick the package that matches the Hadoop version you downloaded.
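Downloaded by hand, this is roughly what it looks like; the package pre-built for Hadoop 2.x and the URL are the ones used later in the Dockerfile:

# Spark 3.3.0 pre-built for Hadoop 2.x
wget https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop2.tgz
tar zxvf spark-3.3.0-bin-hadoop2.tgz -C /usr/local/lib/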
Dockerfile
The official site provides setup instructions; following those, we install each tool and put the configuration files in place.
The download URLs for Hadoop and Spark can be found on the official sites mentioned above.
FROM openjdk:8

ENV HADOOP_VERSION 2.10.2
ENV HADOOP_HOME=/hadoop-${HADOOP_VERSION}
ENV HADOOP_OPTS "-Djava.library.path=${HADOOP_HOME}/lib/native"
ENV HADOOP_PREFIX ${HADOOP_HOME}
ENV HADOOP_COMMON_HOME ${HADOOP_HOME}
ENV HADOOP_HDFS_HOME ${HADOOP_HOME}
ENV HADOOP_MAPRED_HOME ${HADOOP_HOME}
ENV HADOOP_YARN_HOME ${HADOOP_HOME}
ENV HADOOP_CONF_DIR ${HADOOP_HOME}/etc/hadoop
ENV YARN_CONF_DIR $HADOOP_PREFIX/etc/hadoop
ENV SPARK_HOME=/usr/local/lib/spark
ENV PATH=${HADOOP_HOME}/bin:${SPARK_HOME}/bin:$PATH

RUN apt-get update \
    && apt-get install -y --no-install-recommends ssh \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/* \
    && wget -q -O - http://ftp.yz.yamagata-u.ac.jp/pub/network/apache/hadoop/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz | tar zxf - \
    && wget https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop2.tgz \
    && tar zxvf spark-3.3.0-bin-hadoop2.tgz -C /usr/local/lib/ \
    && ln -s /usr/local/lib/spark-3.3.0-bin-hadoop2 /usr/local/lib/spark \
    && rm spark-3.3.0-bin-hadoop2.tgz

USER ${USERNAME}
WORKDIR /hadoop-${HADOOP_VERSION}

COPY config ./etc/hadoop/
COPY start ./
COPY main.py ./

RUN mkdir -p /run/sshd /data \
    && ssh-keygen -t rsa -P '' -f /root/.ssh/id_rsa \
    && cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys \
    && chmod 0600 /root/.ssh/authorized_keys \
    && hdfs namenode -format

CMD ["./start"]
start
After starting Hadoop, the script runs a sleep loop so that the container does not exit.
#!/bin/bash

/usr/sbin/sshd
/hadoop-${HADOOP_VERSION}/sbin/start-dfs.sh
/hadoop-${HADOOP_VERSION}/sbin/start-yarn.sh

# daemonize
while true; do
  sleep 1000
done
main.py
A Python file for trying out spark-submit, adapted from an example I picked up online.
# -*- coding: utf-8 -*-
from pyspark import SparkContext, RDD
from typing import Dict, List, Tuple


def main():
    # Input data (exam scores per subject:
    # 国語 = Japanese, 算数 = math, 理科 = science, 社会 = social studies)
    input_data: List[Dict[str, int]] = [
        {'国語': 86, '算数': 57, '理科': 45, '社会': 100},
        {'国語': 67, '算数': 12, '理科': 43, '社会': 54},
        {'国語': 98, '算数': 98, '理科': 78, '社会': 69},
    ]

    # Create the SparkContext and the RDD
    sc: SparkContext = SparkContext(appName='spark_sample')
    rdd: RDD = sc.parallelize(input_data)

    # Compute the average score for each subject and for the total (合計)
    output: List[Tuple[str, float]] = rdd\
        .map(lambda x: x.update(合計=sum(x.values())) or x)\
        .flatMap(lambda x: x.items())\
        .groupByKey()\
        .map(lambda x: (x[0], round(sum(x[1]) / len(x[1]), 2)))\
        .collect()

    print(output)


if __name__ == '__main__':
    main()
Configuration files
These are the config files for running in pseudo-distributed mode and for enabling YARN and Spark.
config/core-site.xml
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>
config/hdfs-site.xml
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>/data/namenode</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/data/datanode</value>
    </property>
</configuration>
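Once the container is up, these values can be sanity-checked from inside it; a small sketch using hdfs getconf:

# Confirm the settings are picked up (run inside the container)
hdfs getconf -confKey fs.defaultFS     # -> hdfs://localhost:9000
hdfs getconf -confKey dfs.replication  # -> 1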
config/hadoop-env.sh
For details on these settings, the following article is a useful reference.
export JAVA_HOME=/usr/local/openjdk-8
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export HADOOP_SSH_OPTS="-o StrictHostKeyChecking=no -o BatchMode=yes"
export HADOOP_OPTS="${HADOOP_OPTS} -XX:-PrintWarnings -Djava.net.preferIPv4Stack=true -Djava.library.path=${HADOOP_HOME}/lib/native"
config/mapred-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>mapreduce.application.classpath</name>
        <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
    </property>
</configuration>
config/yarn-site.xml
<configuration>
    <property>
        <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
        <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.env-whitelist</name>
        <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_HOME,PATH,LANG,TZ,HADOOP_MAPRED_HOME</value>
    </property>
</configuration>
Startup
Build the image and start the container.
$ docker build -t test-hadoop .
$ docker run -itd --name test-hadoop --rm -p 50070:50070 -p 8088:8088 test-hadoop
$ docker exec -it test-hadoop bash
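Once inside the container, the jps command that ships with the JDK is a quick way to check whether the HDFS and YARN daemons actually came up; a rough sketch of what to look for (process IDs will vary):

jps
# In pseudo-distributed mode the expected daemons are:
#   NameNode, DataNode, SecondaryNameNode   (HDFS)
#   ResourceManager, NodeManager            (YARN)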
After a short wait, the web UIs also become accessible: the NameNode UI on port 50070 and the YARN ResourceManager UI on port 8088 (the ports published by the docker run command above).
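For example, checked from inside the container (wget is already available, since the Dockerfile relies on it):

# NameNode web UI (port 50070 in Hadoop 2.x) and YARN ResourceManager UI (port 8088)
wget -q -O - http://localhost:50070/ | head
wget -q -O - http://localhost:8088/cluster | head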
Running jobs
Let's run the MapReduce example "wordcount".
# Create the input directory
root@2b017152af10:/hadoop-2.10.2# hdfs dfs -mkdir -p /user/root/input

# Put the wordcount input files (the Hadoop config files) into input
root@2b017152af10:/hadoop-2.10.2# hdfs dfs -put etc/hadoop/* /user/root/input

# Run wordcount
root@2b017152af10:/hadoop-2.10.2# hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.10.2.jar wordcount input output
22/10/15 08:28:58 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
22/10/15 08:28:59 INFO input.FileInputFormat: Total input files to process : 30
22/10/15 08:28:59 INFO mapreduce.JobSubmitter: number of splits:30
22/10/15 08:28:59 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1665818375854_0002
22/10/15 08:28:59 INFO conf.Configuration: resource-types.xml not found
22/10/15 08:28:59 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
22/10/15 08:28:59 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
22/10/15 08:28:59 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
22/10/15 08:28:59 INFO impl.YarnClientImpl: Submitted application application_1665818375854_0002
22/10/15 08:28:59 INFO mapreduce.Job: The url to track the job: http://2b017152af10:8088/proxy/application_1665818375854_0002/
22/10/15 08:28:59 INFO mapreduce.Job: Running job: job_1665818375854_0002
22/10/15 08:29:06 INFO mapreduce.Job: Job job_1665818375854_0002 running in uber mode : false
22/10/15 08:29:06 INFO mapreduce.Job: map 0% reduce 0%
22/10/15 08:29:22 INFO mapreduce.Job: map 20% reduce 0%
22/10/15 08:29:33 INFO mapreduce.Job: map 40% reduce 0%
22/10/15 08:29:41 INFO mapreduce.Job: map 43% reduce 0%
22/10/15 08:29:42 INFO mapreduce.Job: map 57% reduce 0%
22/10/15 08:29:47 INFO mapreduce.Job: map 60% reduce 0%
22/10/15 08:29:48 INFO mapreduce.Job: map 67% reduce 0%
22/10/15 08:29:49 INFO mapreduce.Job: map 73% reduce 0%
22/10/15 08:29:53 INFO mapreduce.Job: map 77% reduce 24%
22/10/15 08:29:54 INFO mapreduce.Job: map 80% reduce 24%
22/10/15 08:29:55 INFO mapreduce.Job: map 83% reduce 24%
22/10/15 08:29:56 INFO mapreduce.Job: map 90% reduce 24%
22/10/15 08:29:58 INFO mapreduce.Job: map 93% reduce 24%
22/10/15 08:29:59 INFO mapreduce.Job: map 97% reduce 30%
22/10/15 08:30:00 INFO mapreduce.Job: map 100% reduce 30%
22/10/15 08:30:01 INFO mapreduce.Job: map 100% reduce 100%
22/10/15 08:30:02 INFO mapreduce.Job: Job job_1665818375854_0002 completed successfully
22/10/15 08:30:02 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=79549
                FILE: Number of bytes written=6694130
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=89584
                HDFS: Number of bytes written=42991
                HDFS: Number of read operations=93
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        ....
        File Input Format Counters
                Bytes Read=85990
        File Output Format Counters
                Bytes Written=42991

# The job output is written to output/part-r-00000
root@2b017152af10:/hadoop-2.10.2# hdfs dfs -ls output
Found 2 items
-rw-r--r--   1 root supergroup          0 2022-10-15 08:29 output/_SUCCESS
-rw-r--r--   1 root supergroup      42991 2022-10-15 08:29 output/part-r-00000

# cat the output
root@2b017152af10:/hadoop-2.10.2# hdfs dfs -cat output/part-r-00000 | head
!=      3
""      7
"".     4
"$HADOOP_JOB_HISTORYSERVER_HEAPSIZE"    1
"$JAVA_HOME"    2
"$YARN_HEAPSIZE"        1
"$YARN_LOGFILE" 1
"$YARN_LOG_DIR" 1
"$YARN_POLICYFILE"      1
"*"     19
cat: Unable to write to output stream.
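One note if you want to rerun the job: the output directory has to be removed first, because the job aborts when it already exists:

# Remove the previous job output before rerunning wordcount
hdfs dfs -rm -r output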
Let's take a look at the output with spark-shell.
root@2b017152af10:/hadoop-2.10.2# spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/10/15 08:43:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://2b017152af10:4040
Spark context available as 'sc' (master = local[*], app id = local-1665823413103).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.0
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_342)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val txtFile = sc.textFile("output/part-r-00000")
txtFile: org.apache.spark.rdd.RDD[String] = output/part-r-00000 MapPartitionsRDD[7] at textFile at <console>:23

scala> txtFile.filter(line => line.contains("apache")).foreach(println)
#dfs.class=org.apache.hadoop.metrics.file.FileContext  1        (0 + 2) / 2]
#jvm.class=org.apache.hadoop.metrics.file.FileContext  1
#jvm.class=org.apache.hadoop.metrics.spi.NullContext   1
#log4j.additivity.org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger=false        1
#log4j.additivity.org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.statedump=false   1
#log4j.appender.FSSTATEDUMP.layout=org.apache.log4j.PatternLayout      1
#log4j.appender.FSSTATEDUMP=org.apache.log4j.RollingFileAppender       1
#log4j.appender.HSAUDIT.layout=org.apache.log4j.PatternLayout  1
#log4j.appender.HSAUDIT=org.apache.log4j.DailyRollingFileAppender      1
#log4j.appender.HTTPDRFA.layout=org.apache.log4j.PatternLayout 1
#log4j.appender.HTTPDRFA=org.apache.log4j.DailyRollingFileAppender     1
....
rpc.class=org.apache.hadoop.metrics.ganglia.GangliaContext     1
rpc.class=org.apache.hadoop.metrics.ganglia.GangliaContext31   1
rpc.class=org.apache.hadoop.metrics.spi.NullContext    1
ugi.class=org.apache.hadoop.metrics.ganglia.GangliaContext     1
ugi.class=org.apache.hadoop.metrics.ganglia.GangliaContext31   1
ugi.class=org.apache.hadoop.metrics.spi.NullContext    1

scala> :quit
root@2b017152af10:/hadoop-2.10.2#
Next, let's run main.py with spark-submit.
root@2b017152af10:/hadoop-2.10.2# spark-submit main.py --master yarn
22/10/15 08:35:30 INFO SparkContext: Running Spark version 3.3.0
22/10/15 08:35:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/10/15 08:35:30 INFO ResourceUtils: ==============================================================
22/10/15 08:35:30 INFO ResourceUtils: No custom resources configured for spark.driver.
22/10/15 08:35:30 INFO ResourceUtils: ==============================================================
22/10/15 08:35:30 INFO SparkContext: Submitted application: spark_sample
22/10/15 08:35:30 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
22/10/15 08:35:30 INFO ResourceProfile: Limiting resource is cpu
....
22/10/15 08:35:37 INFO DAGScheduler: Job 0 finished: collect at /hadoop-2.10.2/main.py:17, took 3.034884 s
[('合計', 269.0), ('国語', 83.67), ('算数', 55.67), ('社会', 74.33), ('理科', 55.33)]
22/10/15 08:35:37 INFO SparkContext: Invoking stop() from shutdown hook
22/10/15 08:35:37 INFO SparkUI: Stopped Spark web UI at http://2b017152af10:4040
22/10/15 08:35:37 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/10/15 08:35:37 INFO MemoryStore: MemoryStore cleared
22/10/15 08:35:37 INFO BlockManager: BlockManager stopped
22/10/15 08:35:37 INFO BlockManagerMaster: BlockManagerMaster stopped
22/10/15 08:35:37 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/10/15 08:35:37 INFO SparkContext: Successfully stopped SparkContext
22/10/15 08:35:37 INFO ShutdownHookManager: Shutdown hook called
22/10/15 08:35:37 INFO ShutdownHookManager: Deleting directory /tmp/spark-266fe4d6-ea39-4e2e-b6f3-9c67dfed570c
22/10/15 08:35:37 INFO ShutdownHookManager: Deleting directory /tmp/spark-266fe4d6-ea39-4e2e-b6f3-9c67dfed570c/pyspark-05f99759-3773-41f5-b10f-f9aa1f82ef79
22/10/15 08:35:37 INFO ShutdownHookManager: Deleting directory /tmp/spark-74ca53c4-6b39-4ebc-b238-b971849ebaae
root@2b017152af10:/hadoop-2.10.2#
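One caveat about the command above: spark-submit treats everything after the application file as arguments to the application itself, so a --master given after main.py is not picked up as a Spark option. To be sure the job is actually submitted to YARN, the option goes before the script:

spark-submit --master yarn main.py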
Summary
I got Hadoop and Spark working together in a Docker container.
The user is left as root, but this setup seems handy when you just want to spin something up and experiment.