社内se × プログラマ × ビッグデータ

プログラミングなどITに興味があります。

(自分用) Apache Spark QUIZ 1

Q1. In spark-shell, How to specify multiple dependencies using --packages for spark-submit?
(spark-shell において、複数の依存パッケージをロードするには、--packages でどのように指定すれば良いか ?)

Q2. What is this warning meaning in spark streaming?
(spark-streaming においてこの warning は何を意味するか ?)

replicated to only 0 peer(s) instead of 1 peers

Q3. What is this warning meaning in spark ?
(spark においてこの warning は何を意味するか ?)

Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

Q4. What is "window length" and "slide interval" in spark-streaming?
(spark-streaming におけるウインドウ集計で、「ウインドウ幅」と「スライド幅」それぞれの意味は ?)

Q5. How to create pair-RDD(key, value) ?
(ペア RDD をどのように作成するか?)

val pairRdd = rdd.??? { r => {
(r, 1)
}}

Q6. What is the final response ?
(以下のコマンドの最終レスポンスは ?)

scala> rdd.collect
res1: Array[String] = Array(a, b, c, b)

scala> rdd.map { r => {
     | if (r != "b")(r,1)
     | }}.collect
res2: Array[Any] = ???

Q7. How to save as text from RDD ?
RDDのデータをテキストとして出力するには?)

Q8. What is the final response ?
(以下のコマンドの最終レスポンスは ?)

scala> rdd.collect
res1: Array[String] = Array(a, b, c, b)

scala> rdd.map { r => {
     | if (r != "b")(r,1)
     | else null
     | }}.collect
res2: Array[(String, Int)] = ???

Q9. What is the final response ?
(以下のコマンドの最終レスポンスは ?)

scala> pairRdd.collect
res1: Array[(String, Int)] = Array((a,1), (b,1), (c,1), (b,1))

scala> pairRdd.reduceByKey((x,y) => x+y).collect
res1: Array[(String, Int)] = ???

Q10. What are these commands meaning in spark-streaming ?
(spark-streaming におけるこれらのコマンドの意味は ?)

ssc.start()
ssc.awaitTermination()

(解答例)
=========================================-
A1. A list of packages should be separated using commas without whitespaces
(空白なしでコンマ区切りで指定すること)

--packages org.apache.bahir:spark-streaming-twitter_2.11:2.3.2,com.atilika.kuromoji:kuromoji-ipadic:0.9.0

A2. The warning in this case means that incoming data from stream are not replicated at all. The reason for that may be that you run the app with just one instance of Spark worker or running in local mode.
(このケースではデータのレプリカが作成されていない警告であると考えられる。Spark の Worker のインスタンスが1つだけしか起動していない場合に発生しうる)

A3. The error indicates that you cluster has insufficient resources for current job.Since you have not started the slaves i.e worker.
(Worker のインスタンスが存在していないため、そのジョブが開始できないことを意味していると考えられる)

A4. window length means "Scope of one window aggregation", slide interval means "Time difference until the next window tabulation starts".
(一回のウインドウ集計の集計範囲を「ウインドウ幅」、次のウインドウ集計が開始するまでの時間差を「スライド幅」)

A5.

val pairRdd = rdd.map { r => {
(r, 1)
}}

A6.

res2: Array[Any] = Array((a,1), (), (c,1), ())

A7.

rdd.saveAsTextFile("file:///share/outout/result")

※part-xxxx というファイル名で保存される

A8.

res2: Array[(String, Int)] = Array((a,1), null, (c,1), null)

A9.

res1: Array[(String, Int)] = Array((b,2), (a,1), (c,1))

A10.
ssc.start()
>Starts streaming calculation.
(計算処理を開始)

ssc.awaitTermination()
>Waits till closed context or thrown exception.
(stopメソッドによるcontextの終了や例外が発生するまで待つ)

(自分用) Elasticsearch Quiz 1

Q1. Elasticsearch 用のメイン設定ファイル名は ?

Q2. データやログのディレクトリを Elasticsearch がインストールされているディレクトリの外部に配置することが推奨される理由は ?

Q3. jvm.options ではヒープサイズの設定が重要だが、最低でも ? GB以上の設定が要求されるか ? (バージョン 7.6.3)

Q4. Elasticsearchはデータ量をどのようにスケーリングしますか?

Q5. Shard とは何か?

Q6. Elasticsearch バージョン7以上では、デフォルトでいくつの Shard が1インデックスに追加されるか?

Q7. Elasticsearch はどのようにして高可用性を保証しているか?

Q8. Primary shard とは何か?

Q9. replica shard とは何か?

Q10. replication group とは何か?

Q11. 1つの shard 毎ににデフォルトの replica 数はいくつか?

Q12. 1つの index 毎ににデフォルトの shard 数(primary shard と replica shard を合わせて)はいくつか ?

(解答例)
=================================
A1.
elasticsearch.yml

A2.
Elasticsearch のバージョンアップ時がしやすい(ディレクトリが独立しているため)

A3.
1 GB 以上

A4.
Sharding
ノードを追加することも助けにはなるが、ある程度以上になると Sharding が無いと対処できない。

A5.
Index データの一部
Sharding によって、Index が複数の Shard に分割されます。

A6.
1つ
バージョン7未満では5つがデフォルトであった。

A7.
Replication (レプリケーション)
Replication によって、仮に1台のノードが壊れたとしても、他のノードからデータを取得し続けることが可能。
※当然ながらそのクラスタは2台以上で構成されていなければならない

A8.
documentはprimary shardに保存される。
documentをインデックスするとき、まずprimary shardに保存され、その後replica shardに反映される。

A9.
primary shardのコピー。primary shardは0個以上のreplicaを持てる。
primaryが死んだ時にreplicaがprimaryに昇格できるため、フェイルオーバーできる。

A10.
primary shardの replica(replica shard) とそのprimary shard自身の集合体。

A11.
1つ

A12.
2つ (1 primary shard と 1 replica shard)

Elasticsearch のディレクトリ構成を見てみる

Elasticsearch のディレクトリ構成

バージョン 7.6.2 をモデルにしています。
デフォルトではインストール直後は以下のようなディレクトリ構成になっています。

/usr/share/elasticsearch # ls
LICENSE.txt  NOTICE.txt  README.asciidoc  bin  config  data  jdk  lib  logs  modules  plugins
bin
# ls bin
elasticsearch           elasticsearch-env            elasticsearch-plugin           elasticsearch-sql-cli-7.6.2.jar  x-pack-watcher-env
elasticsearch-certgen   elasticsearch-env-from-file  elasticsearch-saml-metadata    elasticsearch-syskeygen
elasticsearch-certutil  elasticsearch-keystore       elasticsearch-setup-passwords  elasticsearch-users
elasticsearch-cli       elasticsearch-migrate        elasticsearch-shard            x-pack-env
elasticsearch-croneval  elasticsearch-node           elasticsearch-sql-cli          x-pack-security-env
config
# ls config/
elasticsearch.keystore  elasticsearch.yml  jvm.options  log4j2.properties  role_mapping.yml  roles.yml  users  users_roles
elasticsearch.yml

Elasticsearch 用のメイン設定ファイル。
1. cluster.name は設定することが推奨されている(明示するため)

# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
#cluster.name: my-application
#

2. node.name は設定することが推奨されている(明示するため)

# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
#node.name: node-1
#
# Add custom attributes to the node:
#
#node.attr.rack: r1

3. データやログは Elasticsearch のディレクトリの外部に設定することが推奨される。
※elasticsearch バージョンアップ時に便利

# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
${path.data}
#
# Path to log files:
#
${path.logs}

4. Elasticsearch を別のIPアドレスにバインドが可能

# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
#network.host: 192.168.0.1
#
# Set a custom port for HTTP:
#
#http.port: 9200
#
# For more information, consult the network module documentation.

5. どれだけの数の Elasticsearch インスタンスが接続されるか設定可能

# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
#discovery.seed_hosts: ["host1", "host2"]
#
# Bootstrap the cluster using an initial set of master-eligible nodes:
#
#cluster.initial_master_nodes: ["node-1", "node-2"]
#
# For more information, consult the discovery and cluster formation module documentation.
jvm.options

Elasticsearch は Java でビルドされており、JVM 内で動作するため、重要な設定。
特にヒープサイズの設定が重要だが、最低でも1GB以上の設定が要求される。

# cat config/jvm.options 
## JVM configuration

################################################################
## IMPORTANT: JVM heap size
################################################################
##
## You should always set the min and max JVM heap
## size to the same value. For example, to set
## the heap to 4 GB, set:
##
## -Xms4g
## -Xmx4g
##
## See https://www.elastic.co/guide/en/elasticsearch/reference/current/heap-size.html
## for more information
##
################################################################

# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space

-Xms1g
-Xmx1g
(省略)
log4j2.properties

Elasticsearch は log4j2 のロギングフレームワークを使用している。

role_mapping.yml roles.yml users users_roles

これらの設定は Kibana 内で設定することが推奨されている。

jdk

jdkディレクトリには、Elasticsearchに同梱されているOpenJDKが含まれています。

lib

「lib」ディレクトリには、Elasticsearchに必要な多数の依存関係が含まれています。
"log4j" logging フレームワークや、Apache Lucene など。

module

「module」ディレクトリには、Elasticsearchにいくつかの追加機能を提供する多数の組み込みモジュールが含まれています。
これらはデフォルトで有効になっているようです。
たとえば、X-Pack 機能が配置されているのが分かります。

# ls modules/
aggs-matrix-stats  ingest-user-agent  percolator             transform         x-pack-deprecation  x-pack-monitoring
analysis-common    lang-expression    rank-eval              transport-netty4  x-pack-enrich       x-pack-rollup
flattened          lang-mustache      reindex                vectors           x-pack-graph        x-pack-security
frozen-indices     lang-painless      repository-url         x-pack-analytics  x-pack-ilm          x-pack-sql
ingest-common      mapper-extras      search-business-rules  x-pack-ccr        x-pack-logstash     x-pack-voting-only-node
ingest-geoip       parent-join        spatial                x-pack-core       x-pack-ml           x-pack-watcher
plugins

「plugins」ディレクトリは最初は空です。
何らかのプラグインを追加したい場合に、ここに追加可能です。
module との違いは、module は Elasticsearch に同梱されているものであるのに対して、plugins はカスタム機能の追加を提供してくれます。
そのため、サードパーティ製や自分たちで作成したものを追加できます。
※plugins は取り外し可能ですが、module は取り外しません(できません)

Docker でコンテナ技術を学ぶ

Docker を通して、学んだことを書いていきます。

Docker基礎

docker-compose

用途

(自分用)docker-compose Quiz 1

Q1. docker-compose とは何をするためのツール?

Q2. docker-compose でコンテナの作成と開始を行うコマンドは?

Q3. docker-compose でコンテナを開始前にイメージを構築すオプションは?

Q4. docker-compose でバックグラウンドでコンテナを実行するオプションは?

Q5. docker-compose でコンテナをスケールさせるオプションは?

(回答例)
===============================

A1.

複数のコンテナで構成されるアプリケーションについて、Dockerイメージのビルドや各コンテナの起動・停止などをより簡単に行えるようにするツール。

A2.

up
Ex) docker-compose up

A3.

--build
Ex) docker-compose up --build

A4.

-d
Ex) docker-compose up --build -d

A5.

--scale
Ex) docker-compose up -d --scale {service_name}=2

Docker で centos 7 をベースに Hadoop(version3.1.3) コンテナ作成したときの問題

Docker で centos 7 をベースに Hadoop(version3.1.3) コンテナ作成してみました。
ただ作成してみたものの、使いづらいと感じている面があります。

理由1

systemctlを使ったサービスの自動起動を行うには、コンテナ起動時に/sbin/initを指定しなければならない

また、その指定のほかに管理者権限モードのオプション(--privileged、または、--cap-add=SYS_ADMIN)が必要になる。
しかしながら、管理者権限モードで実行されたDockerコンテナは、ホストOSのハードウェア資源(デバイスなど)へのアクセスが無制限に許可されるため、セキュリティ面において注意が必要。
自分の場合は、docker-compose を使っているため、以下のように指定する必要があります。

privileged: true
command: /sbin/init

Hadoop においては、hdfs(Namenode, Datanode) を起動するための ssh サービスを起動するため、この設定が必須でした。
個人のローカル環境においてのみであれば、セキュリティ面に関しては気にする必要はないかもしれません。

理由2

command: /sbin/init を指定するため、本来使いたいスタートアップ用のスクリプトが起動できなかった

コンテナの起動時に実行させたいスクリプト(start.sh) がある場合、本来であれば command や endpoint に指定することで起動させることができますが、command: /sbin/init が必須であるため、start.sh を指定してあげることができませんでした。
以下のように指定したり、start.sh 内に /sbin/init を記述するなど試行錯誤しましたが結果動作させることが出来ず。

   command: /sbin/init && /opt/hadoop/current/start.sh
   command: bash -c "/sbin/init && /opt/hadoop/current/start.sh"
   command: bash -c "exec /sbin/init && /opt/hadoop/current/start.sh"

今のところ、一旦コンテナを起動させた後に、start.sh を実行してあげる必要があります。
docker-compose を実行した後に start.sh を実行させるラッパーシェルを用意してあげれば手間にはならないかもしれません。

とは言え、これらの問題はベースとなるDocker Image(今回は centos7 など systemctl を採用)に依存するものであり、他のDocker Image を使用することで問題が回避できる可能性があるので、別の Docker Imageでも試行したいと考えています。

jps: command not found

Docker コンテナで Java をインストールしたが、jps コマンドが使用できなかった。
Dockerfile においてインストールするパッケージに (-devel) を指定すればよい。
※Docker自体はこの問題に関係ない

(変更前: jps つかえない)

RUN yum install -y java-1.8.0-openjdk

(変更前: jps つかえる)

RUN yum install -y java-1.8.0-openjdk-devel