社内se × プログラマ × ビッグデータ

プログラミングなどITに興味があります。

zkCli.sh にコマンドを渡して実行

zkCli.sh は zookeeper に標準で用意されているクライアントツール。
起動することで、対話式にコマンドを実行することが可能。

また、zkCli.sh にパイプでコマンドを渡してあげることで、1コマンドであれば実行させることも可能。
例えば、/test という znode を作成するならば

echo "create /test" | ./bin/zkCli.sh

こんな感じで作成することが出来る。

install されている python のバージョンチェック

コマンドラインで確認できるわけですが、スクリプト内でチェックして違う動作をさせたい時もあります。
version.py

#!/usr/bin/env python

import sys

IS_PYTHON2 = sys.version_info[0] == 2
if IS_PYTHON2:
  print 'Hello, version 2'
else:
  print 'Hello, version 3'

実行させてみると、確認ができてますね。

xx@xx-Ubuntu:~/bin/python$ python --version
Python 2.7.6

xx@xx-Ubuntu:~/bin/python$ ./version.py 
Hello, version 2

bash で変数にパイプを入れる

例えば、以下のような変数を用意して、
GREPS="grep start | grep end"

こんな感じで実行できないかなと。
less test.log | ${GREPS}

$ less test-log | ${GREPS}
grep: |: そのようなファイルやディレクトリはありません
grep: grep: そのようなファイルやディレクトリはありません
grep: end: そのようなファイルやディレクトリはありません

だめだった。単なる文字列として解釈されてしまっている。
コマンドとして解釈してほしいので、eval を指定してみると、

$ less test-log | eval ${GREPS}
test,start,end

動いてくれました。

ps コマンドの grep で grep 自身のプロセスを除外するとき

v オプションを使って

ps aux | grep java | grep -v grep

こんな感じでよく書いていたけど、

ps aux | grep [j]ava

これでも除外することが可能。

どうもこの場合、grep 自身のプロセスは "grep [j]ava" という文字列として判断されるので
grep java にはマッチしない即ち grep 自身のプロセスは表示対象にならないという動きになるみたいです。

Apache Spark ブロードキャスト変数について調べてみる

Apache Spark プログラミングの機能に、ブロードキャスト変数というものがあるらしい。

どういうものか、調べてみる。

出来ればどういったケースで有用であるかを理解したい。

 

概要

・ブロードキャスト変数は、ドライバで定義した定数を各エグゼキュータに転送するための変数

・アキュムレータと同じく、ドライバとエグゼキュータ間での共有変数

・すべてのエグゼキュータに送信される

 

用途

・大規模なリードオンリーのルックアップ用のテーブルを全ノードに送信

機械学習アルゴリズムの大規模な特徴ベクトルを全ノードに送信

 

前提

・Sparkでは、クロージャの中で参照されたすべての変数を自動的にエグゼキュータに送信する

→便利ではあるが、Sparkが操作の度にその変数を送信してしまう

→非効率になる場合がある

→ブロードキャスト変数を使えば、送信は一度で済む

→ただ小さいサイズの定数であれば、ブロードキャストする方がオーバヘッドが大きい可能性

 

使い方

1. Broadcast[T] を生成する。

2. 値にアクセスするには value プロパティを使う。

3. この変数は各ノードに1回だけ送信される。リードオンリーとして扱う。

→値を更新しても、他のノードには伝達されない

 

注意点

・ブロードキャスト変数の値は変更しないこと

→後から追加されたノードにブロードキャスト変数が送信されるため

→値が変更されていると、他のノードに送信済みの値と差が生じてしまう

 

実践

・ブロードキャスト変数を使う場合、使わない場合

https://github.com/blueskyarea/scala/blob/master/spark/broadcast/sample1.scala

 

 注意点(応用)

・大きなデータをブロードキャストする場合、値のシリアライズに時間がかかったり、ネットワーク経由での送信に時間がかかる可能性がある

→大きなボトルネック

→JavaAPIデフォルトのシリアライゼーションライブラリ(Java Serialization)は、配列やプリミティブ型以外を扱う場合、非常に効率が悪くなる

→別のライブラリを使用するか

→独自のシリアライゼーションルーチンを実装する

 

まとめ

・ブロードキャスト変数を使えば、各ノード上にキャッシュされたリードオンリーの変数を保持することができる

・大きな変数を共有したい場合、ブロードキャスト変数を使うことで通信回数が減り、通信コストを下げることが出来る

・ブロードキャストされたデータはシリアライズされた形式でキャッシュされ、各タスクを実行する前にデシリアライズされる

→明示的なブロードキャスト変数の生成は複数のステージを横断したタスクが同じデータを必要とするか、デシリアライズ形式のデータのキャッシュが重要な場合にのみ有用なことを意味する

 

ひとり言

・結局、どの程度の大きさのデータであれば、ブロードキャスト使った方がいいんだろうか?

・仮にデータサイズが小さくとも、各エグゼキュータと共有するべき情報(例えば認証情報など)は、ブロードキャスト変数として共有されているのだろうか?

Apache Spark アキュムレータについて調べてみる

Apache Spark プログラミングの機能に、アキュムレータというものがあるらしい。

どういうものか、調べてみる。

出来ればどういったケースで有用であるかを理解したい。

 

概要

・アキュムレータは主に情報を集計するためのものらしい。

・アキュムレータは書き込みだけが許された変数。

 

前提

・map() の関数などを Spark に渡す場合、外部のドライバプログラム内で定義された変数を使うことが出来る。

・ただし、エグゼキュータは各変数のコピーを受け取ることになるため、エグゼキュータ内での変更はドライバ側には反映されない。

RDDメソッドの実行はクロージャとして扱われるから

RDDメソッド内から、メソッドのスコープ外にあるドライバの変数を直接書き換えはできない

RDDメソッド内から書き換える必要がある場合に、アキュムレータを使う

→でも JavaScript で言うクロージャでは、普通に外部の変数書き換えられているよね?

・アキュムレータは、ドライバとエグゼキュータで共有可能な変数である。

 

用途

・もっとも一般的な用途の一つは、ジョブの実行中に発生したイベント数をカウントすること。

・同じ値を並列プログラムの複数の場所から加算処理を行う場合に便利?

 

実践

・アキュームレータを用いて、テキストファイル内の空行をカウント

https://github.com/blueskyarea/scala/blob/master/spark/accumulator/sample1.scala

 

注意点

・アキュムレータでのカウント結果が分かるのは、アクションを実行した後。(変換処理は遅延評価されるから)

・エグゼキュータからは、アキュムレータの value プロパティにアクセス出来ない。(ドライバからのみ)

 

まとめ

・アキュムレータは、ドライバプログラム内で sc.accumulator() メソッドを使って生成する。

・エグゼキュータは、アキュムレータの += メソッドを使って加算する。

・ドライバプログラムから、アキュムレータの値にアクセスするには、value メソッドを使う。

 

注意点(応用)

・Sparkは障害が発生したマシンや、低速なマシンがあった場合、その処理を他のノードで実行するように調整することがある

→同じ変換処理(関数)が何度も実行されるかもしれない

→アキュムレータの処理が重複されてしまうかもしれない

→変換処理でアキュムレータを使うのは、デバッグ用途に限定すべき

・アクション内で使われたアキュムレータは、必ず1回のみ実行される

→絶対的な値のカウンタが必要な場合は、変換処理ではなく、アクション内にアキュムレータを置く必要がある

 

実践2

・アキュムレータをアクション内に置く

https://github.com/blueskyarea/scala/blob/master/spark/accumulator/sample2.scala

 

実践3

・カスタムアキュムレータ

https://github.com/blueskyarea/scala/blob/master/spark/accumulator/sample3.scala

→思ったとおりの動きではない(なんでリストに0も追加されるのだろう?)

 

ひとり言

クロージャについても調べたけど、まだ理解が出来ていない気がする。

→外側の関数のスコープにある変数が更新出来ないというのは、Scalaでのクロージャであって、JavaScriptクロージャとはまた別なのでは?と思う

→外側の関数のスコープにある変数を使用出来るという意味ではScalaでもJavaScriptでも共通していると思う。

 

参考リンク

API

 https://spark.apache.org/docs/latest/api/java/org/apache/spark/Accumulator.html

http://spark.apache.org/docs/latest/programming-guide.html#accumulators-a-nameaccumlinka

 

・accumulator

http://imranrashid.com/posts/Spark-Accumulators/

http://bigdatafindings.blogspot.jp/2015/08/learning-spark-notes-advanced-spark.html

https://groups.google.com/forum/#!topic/spark-users/ydFmOx8RSrw

 

・Sample

https://github.com/blueskyarea/scala/blob/master/spark/accumulator/sample3.scala

 

クロージャ

http://amkt922.hatenablog.com/entry/2014/02/20/181059

http://qiita.com/mochizukikotaro/items/7403835a0dbb00ea71ae

http://dqn.sakusakutto.jp/2009/01/javascript_2.html