社内se × プログラマ × ビッグデータ

プログラミングなどITに興味があります。

Apache Spark の SparkConf について調べてみる

Spark のチューニングにおいて重要な要素の一つとなるであろう SparkConf について調べてみる。

 

概要

・SparkConf クラスは、Sparkにおける主要な設定の仕組みである。

・SparkConf のインスタンスは新しい SparkContext を生成するときに必要になる。

・SparkConf のインスタンスには、ユーザがオーバライドしたい設定オプションが、キー/値ペアとして含まれている。

 

SparkConf の構築例 (Scala)

val conf = new SparkConf()
conf.set("spark.app.name", "App name")
conf.set("spark.master", "local[*]")

val sc = new SparkContext(conf)

 

・set メソッドは自分自身のインスタンスを返すので、メソッドチェーンで連続して set が可能。

spark-submit 実行時に設定する場合
・spark-submit で設定された値は自動的に検出され、新しい SparkConf の構築時に設定される。
→ アプリケーション側では、空の SparkConf を構築したとしても、spark-submit での設定値が反映される
→ spark-submit でSpark の設定値を受け付ける汎用の --conf フラグがある

設定ファイルからのロード
・spark-submit はファイルから設定値をロード可能
→ デフォルトでは、conf/spark-defaults.conf ファイルを読み取ろうとする
→ ファイルの場所は spark-submit の --properties-file フラグでカスタマイズ可能

注意点
・SparkConfは、一旦 SparkContext のコンストラクタに渡された後は変更できない
→ Spark の設定は途中で変更出来ないということ
・複数の場所で同じプロパティに対する設定が行われた場合、優先順位がある。
→優先順位の高いものから
アプリケーションコード内でset -> spark-submit のフラグ ->
プロパティファイル値 -> デフォルト値
→ Web UI を使えば、有効になっている設定のリストが見られる

応用
For unit tests, you can also call new SparkConf(false) to skip loading external settings and get the same configuration no matter what the system properties are.
→ つまりは "VM引数を読み込まない" ということらしい

・spark-shell において、SparkConf を設定した場合、 sc.stop で sc を停止した後、改めて定義する必要があるらしい。

http://stackoverflow.com/questions/31397731/customize-sparkcontext-using-sparkconf-set-when-using-spark-shell/31402667#31402667

・使用できるプロパティ

https://spark.apache.org/docs/latest/configuration.html#available-properties

Apache Curator Framework のリトライ処理について調べてみる

zookeeper サーバと接続出来なかった場合、そのリトライ処理はどのように実装されるのか?

 

概要

・ Curator とは、Zookeeper 上に構築された一連の高レベルライブラリ

→複雑な接続処理を容易にするAPIを提供してくれるもの

 

前提

・ローカル環境で Zookeeper サーバを起動していない状態において、Curator を使用したアプリを起動してみる。

→当然、接続はされないはず

→設定されたタイムアウト時間、リトライ回数後に終了してくれるはず?

 

実践

・新しい curator クライアントを生成する。

https://github.com/blueskyarea/zookeeper/blob/master/curator/sample-newClient.java

→新しい curator クライアント生成時において、セッションタイムアウト時間、コネクションタイムアウト時間、リトライ回数などを定義出来るようだ。

 

まとめ

・セッションタイムアウト時間は、一度はコネクションはされたものの、サーバと通信出来なくなった場合のタイムアウト時間っぽい。

・コネクションタイムアウト時間は、サーバに初めて接続を試みたが、サーバと通信出来ない場合のタイムアウト時間と思う。

・セッションタイムアウト時間、コネクションタイムアウト時間はそれぞれ1試行内の猶予時間。タイムアウトしたとしても、リトライ回数が残って入れば改めてタイムアウト時間まで接続を試みる。

 

応用

・curator を生成するだけでは、zookeeper への接続は必ずしも発生しない?

→curator 生成するだけなら、 zookeeper 停止していてもエラーにならないかも

→znode を作成するタイミングで zookeeper に接続しにいくから、そこでエラーになる

・デフォルトのセッションタイムアウト時間、コネクションタイムアウト時間は?

https://github.com/apache/curator/blob/master/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java

→バージョンによって異なるとは思うが、セッションタイムアウトは60秒で、コネクションタイムアウトは15秒。

・リトライが終了したら、 例えば ConnectionLossException が発生する(はず)

 

参考リンク

http://curator.apache.org/curator-client/

http://curator.apache.org/getting-started.html

https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.html

https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html

 

eclipse ローカルヒストリーをほんの少しだけ試してみました

ローカルヒストリーは、eclipse 上で編集した履歴を残しておける機能ですね。
普段、変更履歴は git で確認しているので、個人的には全く使ったことがなかったのですが、便利そうです。

ローカルヒストリーの使い方

eclipse 上でファイルを右クリック、比較、ローカルヒストリーの順で選択していくと表示されます。
f:id:blueskyarea:20170706220031p:plain

こんな感じで、左側に最新ファイルの内容、右側に選択中の過去ファイルの内容が表示され、その差分が確認できます。

過去のファイル同士でも確認できる

ctrlキーを押しながら、2つのファイルを選択して、右クリック→互いのファイルを比較を選択すると、その2つのファイルの差分が確認できます。
f:id:blueskyarea:20170706221146p:plain

活躍しそうな場面

1. プログラムを色々といじっていたら、いつの間にか動かなくなった。どこが悪いか分からない。前の動く状態の時まで戻したい。
> 小さい変更の度に動作確認する習慣がある人にはこういうケースは少ないかもしれませんが、それでも確実に前の状態に戻せるので便利だと思います。

2. 新しい機能の実装が終わった。とりあえずどこを変更したのか、その差分をまとめて確認したい。
> git などのバージョン管理システムが導入されているのであれば、そちらでも十分な気がします。

3. とりあえず差分をみたい
> 難しいことは抜きで、私にとってはこれが一番の動機になるかもしれません。

まとめ
  • ヒストリーファイル同士でも確認できることは知らなかった。
  • IDEは多機能なのに、まだまだ単純なエディターとしか使えてないのが勿体ないなぁ。

Java 例外処理における 11 個の誤り

Java の例外処理における tips を見つけたんですが、自分にはとても分かりやすかったので、感想と共に書き残しておこうと思います。

11 Mistakes Java Developers make when Using Exceptions
https://www.linkedin.com/pulse/11-mistakes-java-developers-make-when-using-rafael-chinelato-del-nero

1. Exception クラスしか使わないこと

どんな例外をキャッチする場合も、Exception クラスで定義してしまっているということですね。
まぁ、例外が発生したらその種類に関わらず、アプリケーションを止めてしまうようなケースであれば、楽と言えば楽と思いますが。
別の見方をすれば、どんな例外が発生するのか開発者が想定出来ていないということなのかもしれません。

基本的には Exception クラスを継承した固有の例外クラスを定義してあげて、スロー・キャッチしてあげるようにするのが良いのかもしれません。
Exception クラスでキャッチするということは、本当に想定外の例外が発生した場合になるのかもしれません。

2. 固有の例外クラスを作り過ぎてしまうこと

確かに例外が発生する箇所ごとに、例外クラスを作っていたら例外クラスだらけになってしまいますね。
自分は基本的にキャッチ例外と非キャッチ例外の2つしか作らないのですが、この記事に書かれているように、ビジネス要件のレベルで例外を作成するのも良いなと思いました。
その例外の名前を見るだけで、大体の例外内容やビジネスインパクトなどが分かるくらいが良いですね。

例)
ApplicationFailException よりも ApiNotAvailableException の方が何が問題なのか分かります。

3. すべての例外キャッチ時にログを残すこと

ログを残すことは良いことに思えますが、例外をキャッチする度にログを残していると、最終的に同じエラー内容が何度も残されてしまうかもしれません。
ログを残すにしても、スロー元ですでにロギングされている内容でないか?また、スロー先でまとめてロギング出来ないかを考慮して残すようにしたいですね。

4. キャッチ例外(Exceptionクラスを継承)と、非キャッチ例外(RuntimeExceptionを継承)の違いを知らないこと

自分が初めて固有の例外クラスを作成したときは、とにかく非キャッチ例外を使うようにしていました。
理由は単に楽だからです。そこにそれぞれの例外の違いなどは特に意識していません。
非キャッチ例外でスローしておけば、呼び出し元でわざわざキャッチ処理を書く必要がありませんから。

それぞれの役割を知るようになってからは、使い分けるように(一応)意識するようになりました。
この使い方については、さまざまな意見があるようですが自分の中では以下の使い分けで落ち着いています。

リカバリ可能な例外であれば、キャッチ例外。
リカバリ不可能(あるいは不要)な例外であれば、非キャッチ例外。

キャッチ例外でスローしておけば、呼び出し元で改めて try - catch しないといけないので。
意図的にその例外をハンドリングしていることを明示していることにもなります。

5. 無音の例外があること

Silencing を無音と直訳してしまってますが、つまりは例外が発生しないところでキャッチしようとしていることですね。
以下は極端な例ですが、全く意味のない処理になります。

try {
  // Nothing to do
} catch (Exception e) {
  // Nothing to do
}

こんなことするわけない!と思いがちですが、「どんな例外が発生するか分からないから、とりあえず try - catch しとこう」みたいなコーディングは、同じことをしているのかもしれません。
自分も覚えがあります。

6. "throw early, catch late" の原則に従っていないこと

この原則は有名らしいのですが、聞いたことがなかったです。
自分なりに調べたところ、
1) throw early -> 問題の原因を見つけやすくするために、出来るだけ早く例外を投げること
2) catch late -> その例外の対処は、高い階層であればあるほど対処しやすい(どこからリトライさせるのか)

というように理解しています。

7. 例外に対して、明確なメッセージを使っていない

単に「エラーが発生しました」「処理に失敗しました」では何が悪かったのか分からない。
より具体的に「◯◯のデータベースの××テーブルが見つかりません」のように書いてあった方が原因がすぐに分かる。

8. 例外を処理した後に、クローズ系の後処理をしていない

せっかく try - catch - finally があるのだから、finally においてデータベースコネクションのクローズなんかもしておくと、パフォーマンス的には良いかもしれません。

try(Dao mysqlDao = new Dao()) {}

try-with-resources で書いておければ、毎回 close 書かなくて済みます。

9. javadoc に例外についての文書が残されていない

個人的に一番気をつけたいと思った項目。
何故、その例外を作ったのか?って全く書いていないので、意識して書くようにしたいです。

10. Stacktrace(トレース)を失うこと

英語の訳は Never lose the Stacktrace -> Stacktrace を決して失わないこと のように思いますが、文書からするとトレースを失うことが問題のように思えます。
個人的にはよく「例外の情報源を握りつぶす」と呼んでいます。

11. 固有例外が階層で整理されていないこと

Java の例外クラスが階層で定義されているのと同じように、固有で作成する例外についても階層で整理するようにしなさいってことですね。
自分は2つ以上の固有例外クラスを作ったことがないので、あまり実感はないのですが、例外の発生箇所(階層)によって例外を定義するのであれば、やはり階層を意識した作りにした方が管理しやすいだろうなと思います。

まとめ
  • 例外処理は面倒だけど、意味のある例外処理を書けるということは、そのプログラムを理解出来ているということだと思う
  • 意味のある例外処理を書くには、何故その例外が発生するのかを理解する必要がある
  • 理由をコメント javadoc で残すようにしよう

maven Dependency trees が便利

Maven の便利な機能の一つとして、ライブラリの依存性を確認できるものがあります。

あるプロジェクトの中で色んなライブラリを活用していると、そのライブラリ間の依存性が問題になり、プログラムが正常に動作しない場合があったりします。

ライブラリAとライブラリBが同じライブラリCに依存している、けどそのバージョンが違うみたいなこともあります。

そういった時にどのライブラリに問題があるのか? 調査するのに本当に時間がかかったりします。

ただ、次のコマンドを使えば簡単に依存ライブラリを取得することが出来ます。

mvn dependency:tree

このコマンドは、直接依存しているライブラリや間接的に依存しているライブラリを表示してくれます。
こんな感じです。
hbase-client は色んなライブラリを抱えていますね。

[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ hbase-mini-monitor ---
[INFO] com.blueskyarea:hbase-mini-monitor:jar:1.0-SNAPSHOT
[INFO] +- org.apache.hbase:hbase-client:jar:1.1.2:compile
[INFO] |  +- org.apache.hbase:hbase-annotations:jar:1.1.2:compile
[INFO] |  |  +- jdk.tools:jdk.tools:jar:1.7:system
[INFO] |  |  \- log4j:log4j:jar:1.2.17:compile
[INFO] |  +- org.apache.hbase:hbase-common:jar:1.1.2:compile
[INFO] |  |  +- commons-collections:commons-collections:jar:3.2.1:compile
[INFO] |  |  \- org.mortbay.jetty:jetty-util:jar:6.1.26:compile
[INFO] |  +- org.apache.hbase:hbase-protocol:jar:1.1.2:compile
[INFO] |  +- commons-codec:commons-codec:jar:1.9:compile
[INFO] |  +- commons-io:commons-io:jar:2.4:compile
[INFO] |  +- commons-lang:commons-lang:jar:2.6:compile
[INFO] |  +- commons-logging:commons-logging:jar:1.2:compile
[INFO] |  +- com.google.guava:guava:jar:12.0.1:compile
[INFO] |  |  \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
[INFO] |  +- com.google.protobuf:protobuf-java:jar:2.5.0:compile
[INFO] |  +- io.netty:netty-all:jar:4.0.23.Final:compile
[INFO] |  +- org.apache.zookeeper:zookeeper:jar:3.4.6:compile
[INFO] |  |  +- org.slf4j:slf4j-api:jar:1.6.1:compile
[INFO] |  |  \- org.slf4j:slf4j-log4j12:jar:1.6.1:compile
[INFO] |  +- org.apache.htrace:htrace-core:jar:3.1.0-incubating:compile
[INFO] |  +- org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile
[INFO] |  |  \- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile
[INFO] |  +- org.jruby.jcodings:jcodings:jar:1.0.8:compile
[INFO] |  +- org.jruby.joni:joni:jar:2.1.2:compile
[INFO] |  +- org.apache.hadoop:hadoop-auth:jar:2.5.1:compile
[INFO] |  |  +- org.apache.httpcomponents:httpclient:jar:4.2.5:compile
[INFO] |  |  |  \- org.apache.httpcomponents:httpcore:jar:4.2.4:compile
[INFO] |  |  \- org.apache.directory.server:apacheds-kerberos-codec:jar:2.0.0-M15:compile
[INFO] |  |     +- org.apache.directory.server:apacheds-i18n:jar:2.0.0-M15:compile
[INFO] |  |     +- org.apache.directory.api:api-asn1-api:jar:1.0.0-M20:compile
[INFO] |  |     \- org.apache.directory.api:api-util:jar:1.0.0-M20:compile
[INFO] |  +- org.apache.hadoop:hadoop-common:jar:2.5.1:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-annotations:jar:2.5.1:compile
[INFO] |  |  +- commons-cli:commons-cli:jar:1.2:compile
[INFO] |  |  +- org.apache.commons:commons-math3:jar:3.1.1:compile
[INFO] |  |  +- xmlenc:xmlenc:jar:0.52:compile
[INFO] |  |  +- commons-httpclient:commons-httpclient:jar:3.1:compile
[INFO] |  |  +- commons-net:commons-net:jar:3.1:compile
[INFO] |  |  +- commons-el:commons-el:jar:1.0:runtime
[INFO] |  |  +- commons-configuration:commons-configuration:jar:1.6:compile
[INFO] |  |  |  +- commons-digester:commons-digester:jar:1.8:compile
[INFO] |  |  |  |  \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
[INFO] |  |  |  \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile
[INFO] |  |  +- org.apache.avro:avro:jar:1.7.4:compile
[INFO] |  |  |  +- com.thoughtworks.paranamer:paranamer:jar:2.3:compile
[INFO] |  |  |  \- org.xerial.snappy:snappy-java:jar:1.0.4.1:compile
[INFO] |  |  +- com.jcraft:jsch:jar:0.1.42:compile
[INFO] |  |  \- org.apache.commons:commons-compress:jar:1.4.1:compile
[INFO] |  |     \- org.tukaani:xz:jar:1.0:compile
[INFO] |  +- org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.5.1:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-yarn-common:jar:2.5.1:compile
[INFO] |  |  |  +- org.apache.hadoop:hadoop-yarn-api:jar:2.5.1:compile
[INFO] |  |  |  \- javax.xml.bind:jaxb-api:jar:2.2.2:compile
[INFO] |  |  |     +- javax.xml.stream:stax-api:jar:1.0-2:compile
[INFO] |  |  |     \- javax.activation:activation:jar:1.1:compile
[INFO] |  |  \- io.netty:netty:jar:3.6.2.Final:compile
[INFO] |  \- com.github.stephenc.findbugs:findbugs-annotations:jar:1.3.9-1:compile
[INFO] +- com.google.code.gson:gson:jar:2.2.4:compile
[INFO] +- org.eclipse.jetty:jetty-server:jar:9.1.3.v20140225:compile
[INFO] |  +- javax.servlet:javax.servlet-api:jar:3.1.0:compile
[INFO] |  +- org.eclipse.jetty:jetty-http:jar:9.1.3.v20140225:compile
[INFO] |  |  \- org.eclipse.jetty:jetty-util:jar:9.1.3.v20140225:compile
[INFO] |  \- org.eclipse.jetty:jetty-io:jar:9.1.3.v20140225:compile
[INFO] \- junit:junit:jar:3.8.1:test

これで調査にかける時間が削減できるといいなと思います。

Java で何故 4.0 - 3.10 は 0.90 ではない ?

twitter で流れてきたツイートで以下のようなものがありました。

Why 4.0 - 3.10 not equal to 0.90 ?

public static void main(String args[]) {
  double x = 4.0 - 3.10;
  System.out.println(x == 0.90);
}

結果: false

ツイートに対する返信で、「double や float を比較するのに == を使ってはならない。」というものがありましたが・・・。
実際にこの値を取得してみると [0.8999999999999999] という値が得られます。

なぜ誤差が生じるのか?
  • float型とdouble型は、2進浮動小数点算術を行う。
  • 2進浮動小数点算術は、科学計算と工学計算のために設計されており、広範囲の大きさの数値に対して近似を取得する。

つまりは、2進浮動小数点 で演算しているので、広い範囲の桁数を扱うことになった場合は、近似値を取得しようとするので、その時点で正確な値ではなくなってしまうということだと思っています。

誤差が生じないケース?

0.5(10進数) -> 0.1(2進数)
0.5 のように、2進数に有効桁で変換出来る数値であれば問題ない。

double y = 0.5 - 0.25;
System.out.println(y);  // 0.25
誤差が生じるケース?

0.1(10進数) -> 0.00011001100110011....(2進数)
0.1 のように、2進数では表現ができない(循環小数無限小数)数値だと、どこかで桁の丸めが発生するので、その時点で誤差(丸め誤差)が発生することになる。

double z = 0.30 - 0.20;
System.out.println(z);	// 0.09999999999999998

※ただし、答えが 0.1 になる全ての演算で誤差になる訳ではなさそうです

double p = 0.20 - 0.10;
System.out.println(p);	// 0.1

どこの桁で丸めが発生するかに依存するように思います。

   0.00110011001100 (0.2)
-) 0.00011001100110 (0.1)
   0.00011001100110 (0.1)
結論
  • Java で float型とdouble型の計算結果は正確な値を保証しない

BigDecimalなどのBCDを使うか、許容可能であれば四捨五入で対応する

Scala プログラミング練習(バブルソート)

色んな便利なライブラリのお陰で、アルゴリズムの勉強をせずとも複雑な処理が実現できています。
ただ、やはりアルゴリズムってプログラミングをする上では知っておきたいところです。
正直なところ、アルゴリズムの勉強ってきちんとやったことがないので
プログラミングを通じて学んでみようかなと思ってます。

バブルソートって?
まずはバブルソートですが、手書きでどんな流れになるか手元で書いてみたところ。
f:id:blueskyarea:20170701012329p:plain

ひどい図になりましたが、赤が数字の大小を比較しているところ、青がその数字の居場所が確定したところになります。
文字で流れを書くと以下のような感じです。
1. リストの最初の要素と次の要素を比較。
2. 左の要素の方が大きかったら、右の要素と交換。
3. 一番左の要素以外を新たなリストとし、1 -> 2 -> 3 を繰り返す。
4. 一番右の要素までチェックしたら、一番右の要素以外を新たなリストとし、1 -> 2 -> 3 -> 4 を繰り返す。
5. 4 で作り新たなリストのサイズが 1 になったら終了。

単純なことなはずなのに、言葉で表現するのって難しい。

最初に書いてみたコード

def bubbleSort1(list: List[Int]): List[Int] = {
  def switchElement(list2: List[Int]): List[Int] = {
    val headValue = list2.head
    val tailList = list2.tail
    var switchedList = headValue :: tailList
      
    if (headValue > tailList.head) {
      switchedList = tailList.head :: headValue :: tailList.tail
    }
      
    if (switchedList.tail.length > 1) {
      switchedList = switchedList.head :: switchElement(switchedList.tail)
    }
    switchedList
  }
    
  if (list.length > 1) {
    var latestSwitchedList = switchElement(list)
    if (latestSwitchedList.init.length > 1) {
      latestSwitchedList = bubbleSort1(latestSwitchedList.init) :+ latestSwitchedList.last
    }
    latestSwitchedList
  } else {
    list  
  }
}

動くには動きますが、var を使ってしまっているのが気持ち悪かったので、そこだけ書き直しました。

def bubbleSort2(list: List[Int]): List[Int] = {
  def switchElement(list2: List[Int]): List[Int] = {
    val headValue = list2.head
    val tailList = list2.tail
     
    val switchedList = 
      if (headValue > tailList.head) tailList.head :: headValue :: tailList.tail
      else headValue :: tailList
      
    if (switchedList.tail.length > 1) switchedList.head :: switchElement(switchedList.tail)
    else switchedList
  }
    
  if (list.length > 1) {
    val latestSwitchedList = switchElement(list)
    if (latestSwitchedList.init.length > 1) bubbleSort2(latestSwitchedList.init) :+ latestSwitchedList.last
    else latestSwitchedList
  } else {
    list  
  }
}

感想
久しぶりに scala で書いたので楽しかったし、再帰処理で書けたのが良かった。
case match とか使えばもっとシンプルに書けるのかも。
思っていたよりも難しいが、他のアルゴリズムにもチャレンジしてみたい。

ソースコード