社内se × プログラマ × ビッグデータ

プログラミングなどITに興味があります。

SparkSql で5教科の成績に評価を付けてみる

SparkSqlはSpark上でSQLを利用するためのコンポーネントです。
Sparkの分散環境上で大量データに対して高速なSQLを実行できます。

SparkSqlでソートする練習のために、5教科の成績に評価を付けるプログラムを書いてみました。
例えば、今5人(A君~E君)がいるとして、英語の成績が以下であるとします。
A君:100点
B君:80点
C君:60点
D君:40点
E君:20点

その順位に応じた評価点をつけます。
上記の場合、A君:5点、B君:4点、C君:3点、D君:2点、E君:1点という感じです。
これを5教科(国語、数学、英語、理科、社会)に対して行い、合計の評価点を計算します。

元の点数そのまま使えばいいやんという感じでもありますが、ソートして連番を付ける練習がしたかったので、こんな計算をするようにしています。

ソースコードはこちらgithub.com

SparkSql で扱うエンティティには getter を定義しなければならない!

今回、成績を格納するエンティティとして、RecordData というクラスを定義しましたが、必ず各フィールドに対して getter を定義する必要がありました。
最初は定義していなかったのですが、DataFrame に対して、printSchema() を実行しても、select 文を発行しても何のデータも得られませんでした。
どうやら、getter 経由でデータを取得してくる仕様のようです。

SparkSql でも Window関数が使える!

select 文の結果に対して連番を付与する場合、SQLではWindow関数のrow_numberという機能がありますが、SparkSqlでもそれが使えるようです。
DataFrame には zipWithIndex 関数もありませんし、これは便利ですね。

df.select(row_number().over(Window.partitionBy().orderBy(orderElement1))
Window関数を使うためには HiveContext が必要!

HiveContext なしで使おうとすると、以下のエラーメッセージが表示されてしまいました。

Exception in thread "main" org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;

なので、SQLContext は HiveContext から作りました。

SQLContext sqlCtx = new HiveContext(jsc);

ただ、コンテキストの生成処理に体感でも数秒間時間がかかっている気がします。

実行確認

printSchema() の実行結果。

root
 |-- english: integer (nullable = true)
 |-- japanese: integer (nullable = true)
 |-- mathematics: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- science: integer (nullable = true)
 |-- social: integer (nullable = true)

日本語の成績。このように点数の多い人から、高い score が付けられています。

+-----+-------+--------+
|score|   name|japanese|
+-----+-------+--------+
|    1| Edison|       3|
|    2|Clinton|      13|
|    3|    Abe|      51|
|    4| Donald|      63|
|    5|   Bush|      66|
+-----+-------+--------+

数学の成績。みんな低すぎでしょ。

+-----+-------+-----------+
|score|   name|mathematics|
+-----+-------+-----------+
|    1| Donald|          2|
|    2| Edison|          2|
|    3|   Bush|          6|
|    4|    Abe|          9|
|    5|Clinton|         40|
+-----+-------+-----------+

英語の成績。単なる乱数とは言え、0点が出てしまった。

+-----+-------+-------+
|score|   name|english|
+-----+-------+-------+
|    1| Donald|      0|
|    2|    Abe|     29|
|    3|Clinton|     40|
|    4|   Bush|     49|
|    5| Edison|     51|
+-----+-------+-------+

社会の成績。

+-----+-------+------+
|score|   name|social|
+-----+-------+------+
|    1| Donald|    12|
|    2|Clinton|    53|
|    3| Edison|    72|
|    4|    Abe|    84|
|    5|   Bush|    98|
+-----+-------+------+

最後は、理科の成績。

+-----+-------+-------+
|score|   name|science|
+-----+-------+-------+
|    1| Edison|      6|
|    2| Donald|      7|
|    3|Clinton|     61|
|    4|   Bush|     69|
|    5|    Abe|     90|
+-----+-------+-------+

そして、総合成績。totalScore は綺麗に3の倍数になりました。

+-------+----------+--------+-----------+-------+------+-------+
|   name|totalScore|japanese|mathematics|english|social|science|
+-------+----------+--------+-----------+-------+------+-------+
|   Bush|        21|      66|          6|     49|    98|     69|
|    Abe|        18|      51|          9|     29|    84|     90|
|Clinton|        15|      13|         40|     40|    53|     61|
| Edison|        12|       3|          2|     51|    72|      6|
| Donald|         9|      63|          2|      0|    12|      7|
+-------+----------+--------+-----------+-------+------+-------+
まとめ

今までは、DataFrame で出来ないことを RDD に変換してから処理して、また DataFrame に戻すみたいなことをしていたけど、SparkSql(DataFrame) のままでも出来ることは思っていたより多い。
変換処理のオーバーヘッドも減らせるだろうし、使っていくのが楽しみになってきました。

Apache ZooKeeper の基本について調べてみる

概要

・分散アプリケーションを構築する上では、同期、設定管理、グルーピング、名前管理などの機能が必要となる。

→これらの実装は複雑で面倒なもの

→Zookeeper はこれらの機能を提供してくれる

・ただし、Zookeeper が提供するのは、基本機能要素そのものではなく、基本機能要素を独自に実装できるようにするための、ファイルシステム的なAPIを提供する。

・基本機能要素を実装したものをレシピと呼ぶ。

→レシピは znode と呼ばれる小さいデータノードの操作で構成される

・ ZooKeeper はJavaで書かれており、 jvm 上で動作可能。

 

API

・ znode を生成、削除したり、znode にデータを格納するようなものが提供されている

・ ZooKeeper クライアントは、ZooKeeper サービスに接続し、セッションを確立する。

→セッションを通してAPIを呼び出す

 

注意点

・ znode のデータは部分的に書いたり、読み出したりすることはできない

→完全置き換え、完全読み出し

→原子性の確保とも言えるかも

 

znode のモード

・永続(persistent) znode

→意図的に delete しないと削除されない

→これを作成したクライアントセッションが無くなったとしても、維持しなければならない場合に役立つ

 

・短命(ephemeral) znode

→作成したクライアントがクラッシュしたり、接続がクローズされると自動的に削除される

→これを作成したクライアントセッションが有効な間だけ存在するべき情報を保持する場合に役立つ

→クライアントから意図的に削除することも可能。作成したクライアントでなくても削除可能。

→基本的に消えてしまう性質から、短命 znode には子ノードは作成できない。(今も?)

 

・シーケンシャル znode

永続 znode、短命 znode 何れもシーケンシャルにすることが可能。

→znode にユニークな番号を付与するのに便利

Apache Spark の SparkConf について調べてみる

Spark のチューニングにおいて重要な要素の一つとなるであろう SparkConf について調べてみる。

 

概要

・SparkConf クラスは、Sparkにおける主要な設定の仕組みである。

・SparkConf のインスタンスは新しい SparkContext を生成するときに必要になる。

・SparkConf のインスタンスには、ユーザがオーバライドしたい設定オプションが、キー/値ペアとして含まれている。

 

SparkConf の構築例 (Scala)

val conf = new SparkConf()
conf.set("spark.app.name", "App name")
conf.set("spark.master", "local[*]")

val sc = new SparkContext(conf)

 

・set メソッドは自分自身のインスタンスを返すので、メソッドチェーンで連続して set が可能。

spark-submit 実行時に設定する場合
・spark-submit で設定された値は自動的に検出され、新しい SparkConf の構築時に設定される。
→ アプリケーション側では、空の SparkConf を構築したとしても、spark-submit での設定値が反映される
→ spark-submit でSpark の設定値を受け付ける汎用の --conf フラグがある

設定ファイルからのロード
・spark-submit はファイルから設定値をロード可能
→ デフォルトでは、conf/spark-defaults.conf ファイルを読み取ろうとする
→ ファイルの場所は spark-submit の --properties-file フラグでカスタマイズ可能

注意点
・SparkConfは、一旦 SparkContext のコンストラクタに渡された後は変更できない
→ Spark の設定は途中で変更出来ないということ
・複数の場所で同じプロパティに対する設定が行われた場合、優先順位がある。
→優先順位の高いものから
アプリケーションコード内でset -> spark-submit のフラグ ->
プロパティファイル値 -> デフォルト値
→ Web UI を使えば、有効になっている設定のリストが見られる

応用
For unit tests, you can also call new SparkConf(false) to skip loading external settings and get the same configuration no matter what the system properties are.
→ つまりは "VM引数を読み込まない" ということらしい

・spark-shell において、SparkConf を設定した場合、 sc.stop で sc を停止した後、改めて定義する必要があるらしい。

http://stackoverflow.com/questions/31397731/customize-sparkcontext-using-sparkconf-set-when-using-spark-shell/31402667#31402667

・使用できるプロパティ

https://spark.apache.org/docs/latest/configuration.html#available-properties

Apache Curator Framework のリトライ処理について調べてみる

zookeeper サーバと接続出来なかった場合、そのリトライ処理はどのように実装されるのか?

 

概要

・ Curator とは、Zookeeper 上に構築された一連の高レベルライブラリ

→複雑な接続処理を容易にするAPIを提供してくれるもの

 

前提

・ローカル環境で Zookeeper サーバを起動していない状態において、Curator を使用したアプリを起動してみる。

→当然、接続はされないはず

→設定されたタイムアウト時間、リトライ回数後に終了してくれるはず?

 

実践

・新しい curator クライアントを生成する。

https://github.com/blueskyarea/zookeeper/blob/master/curator/sample-newClient.java

→新しい curator クライアント生成時において、セッションタイムアウト時間、コネクションタイムアウト時間、リトライ回数などを定義出来るようだ。

 

まとめ

・セッションタイムアウト時間は、一度はコネクションはされたものの、サーバと通信出来なくなった場合のタイムアウト時間っぽい。

・コネクションタイムアウト時間は、サーバに初めて接続を試みたが、サーバと通信出来ない場合のタイムアウト時間と思う。

・セッションタイムアウト時間、コネクションタイムアウト時間はそれぞれ1試行内の猶予時間。タイムアウトしたとしても、リトライ回数が残って入れば改めてタイムアウト時間まで接続を試みる。

 

応用

・curator を生成するだけでは、zookeeper への接続は必ずしも発生しない?

→curator 生成するだけなら、 zookeeper 停止していてもエラーにならないかも

→znode を作成するタイミングで zookeeper に接続しにいくから、そこでエラーになる

・デフォルトのセッションタイムアウト時間、コネクションタイムアウト時間は?

https://github.com/apache/curator/blob/master/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java

→バージョンによって異なるとは思うが、セッションタイムアウトは60秒で、コネクションタイムアウトは15秒。

・リトライが終了したら、 例えば ConnectionLossException が発生する(はず)

 

参考リンク

http://curator.apache.org/curator-client/

http://curator.apache.org/getting-started.html

https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.html

https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html

 

eclipse ローカルヒストリーをほんの少しだけ試してみました

ローカルヒストリーは、eclipse 上で編集した履歴を残しておける機能ですね。
普段、変更履歴は git で確認しているので、個人的には全く使ったことがなかったのですが、便利そうです。

ローカルヒストリーの使い方

eclipse 上でファイルを右クリック、比較、ローカルヒストリーの順で選択していくと表示されます。
f:id:blueskyarea:20170706220031p:plain

こんな感じで、左側に最新ファイルの内容、右側に選択中の過去ファイルの内容が表示され、その差分が確認できます。

過去のファイル同士でも確認できる

ctrlキーを押しながら、2つのファイルを選択して、右クリック→互いのファイルを比較を選択すると、その2つのファイルの差分が確認できます。
f:id:blueskyarea:20170706221146p:plain

活躍しそうな場面

1. プログラムを色々といじっていたら、いつの間にか動かなくなった。どこが悪いか分からない。前の動く状態の時まで戻したい。
> 小さい変更の度に動作確認する習慣がある人にはこういうケースは少ないかもしれませんが、それでも確実に前の状態に戻せるので便利だと思います。

2. 新しい機能の実装が終わった。とりあえずどこを変更したのか、その差分をまとめて確認したい。
> git などのバージョン管理システムが導入されているのであれば、そちらでも十分な気がします。

3. とりあえず差分をみたい
> 難しいことは抜きで、私にとってはこれが一番の動機になるかもしれません。

まとめ
  • ヒストリーファイル同士でも確認できることは知らなかった。
  • IDEは多機能なのに、まだまだ単純なエディターとしか使えてないのが勿体ないなぁ。

Java 例外処理における 11 個の誤り

Java の例外処理における tips を見つけたんですが、自分にはとても分かりやすかったので、感想と共に書き残しておこうと思います。

11 Mistakes Java Developers make when Using Exceptions
https://www.linkedin.com/pulse/11-mistakes-java-developers-make-when-using-rafael-chinelato-del-nero

1. Exception クラスしか使わないこと

どんな例外をキャッチする場合も、Exception クラスで定義してしまっているということですね。
まぁ、例外が発生したらその種類に関わらず、アプリケーションを止めてしまうようなケースであれば、楽と言えば楽と思いますが。
別の見方をすれば、どんな例外が発生するのか開発者が想定出来ていないということなのかもしれません。

基本的には Exception クラスを継承した固有の例外クラスを定義してあげて、スロー・キャッチしてあげるようにするのが良いのかもしれません。
Exception クラスでキャッチするということは、本当に想定外の例外が発生した場合になるのかもしれません。

2. 固有の例外クラスを作り過ぎてしまうこと

確かに例外が発生する箇所ごとに、例外クラスを作っていたら例外クラスだらけになってしまいますね。
自分は基本的にキャッチ例外と非キャッチ例外の2つしか作らないのですが、この記事に書かれているように、ビジネス要件のレベルで例外を作成するのも良いなと思いました。
その例外の名前を見るだけで、大体の例外内容やビジネスインパクトなどが分かるくらいが良いですね。

例)
ApplicationFailException よりも ApiNotAvailableException の方が何が問題なのか分かります。

3. すべての例外キャッチ時にログを残すこと

ログを残すことは良いことに思えますが、例外をキャッチする度にログを残していると、最終的に同じエラー内容が何度も残されてしまうかもしれません。
ログを残すにしても、スロー元ですでにロギングされている内容でないか?また、スロー先でまとめてロギング出来ないかを考慮して残すようにしたいですね。

4. キャッチ例外(Exceptionクラスを継承)と、非キャッチ例外(RuntimeExceptionを継承)の違いを知らないこと

自分が初めて固有の例外クラスを作成したときは、とにかく非キャッチ例外を使うようにしていました。
理由は単に楽だからです。そこにそれぞれの例外の違いなどは特に意識していません。
非キャッチ例外でスローしておけば、呼び出し元でわざわざキャッチ処理を書く必要がありませんから。

それぞれの役割を知るようになってからは、使い分けるように(一応)意識するようになりました。
この使い方については、さまざまな意見があるようですが自分の中では以下の使い分けで落ち着いています。

リカバリ可能な例外であれば、キャッチ例外。
リカバリ不可能(あるいは不要)な例外であれば、非キャッチ例外。

キャッチ例外でスローしておけば、呼び出し元で改めて try - catch しないといけないので。
意図的にその例外をハンドリングしていることを明示していることにもなります。

5. 無音の例外があること

Silencing を無音と直訳してしまってますが、つまりは例外が発生しないところでキャッチしようとしていることですね。
以下は極端な例ですが、全く意味のない処理になります。

try {
  // Nothing to do
} catch (Exception e) {
  // Nothing to do
}

こんなことするわけない!と思いがちですが、「どんな例外が発生するか分からないから、とりあえず try - catch しとこう」みたいなコーディングは、同じことをしているのかもしれません。
自分も覚えがあります。

6. "throw early, catch late" の原則に従っていないこと

この原則は有名らしいのですが、聞いたことがなかったです。
自分なりに調べたところ、
1) throw early -> 問題の原因を見つけやすくするために、出来るだけ早く例外を投げること
2) catch late -> その例外の対処は、高い階層であればあるほど対処しやすい(どこからリトライさせるのか)

というように理解しています。

7. 例外に対して、明確なメッセージを使っていない

単に「エラーが発生しました」「処理に失敗しました」では何が悪かったのか分からない。
より具体的に「◯◯のデータベースの××テーブルが見つかりません」のように書いてあった方が原因がすぐに分かる。

8. 例外を処理した後に、クローズ系の後処理をしていない

せっかく try - catch - finally があるのだから、finally においてデータベースコネクションのクローズなんかもしておくと、パフォーマンス的には良いかもしれません。

try(Dao mysqlDao = new Dao()) {}

try-with-resources で書いておければ、毎回 close 書かなくて済みます。

9. javadoc に例外についての文書が残されていない

個人的に一番気をつけたいと思った項目。
何故、その例外を作ったのか?って全く書いていないので、意識して書くようにしたいです。

10. Stacktrace(トレース)を失うこと

英語の訳は Never lose the Stacktrace -> Stacktrace を決して失わないこと のように思いますが、文書からするとトレースを失うことが問題のように思えます。
個人的にはよく「例外の情報源を握りつぶす」と呼んでいます。

11. 固有例外が階層で整理されていないこと

Java の例外クラスが階層で定義されているのと同じように、固有で作成する例外についても階層で整理するようにしなさいってことですね。
自分は2つ以上の固有例外クラスを作ったことがないので、あまり実感はないのですが、例外の発生箇所(階層)によって例外を定義するのであれば、やはり階層を意識した作りにした方が管理しやすいだろうなと思います。

まとめ
  • 例外処理は面倒だけど、意味のある例外処理を書けるということは、そのプログラムを理解出来ているということだと思う
  • 意味のある例外処理を書くには、何故その例外が発生するのかを理解する必要がある
  • 理由をコメント javadoc で残すようにしよう

maven Dependency trees が便利

Maven の便利な機能の一つとして、ライブラリの依存性を確認できるものがあります。

あるプロジェクトの中で色んなライブラリを活用していると、そのライブラリ間の依存性が問題になり、プログラムが正常に動作しない場合があったりします。

ライブラリAとライブラリBが同じライブラリCに依存している、けどそのバージョンが違うみたいなこともあります。

そういった時にどのライブラリに問題があるのか? 調査するのに本当に時間がかかったりします。

ただ、次のコマンドを使えば簡単に依存ライブラリを取得することが出来ます。

mvn dependency:tree

このコマンドは、直接依存しているライブラリや間接的に依存しているライブラリを表示してくれます。
こんな感じです。
hbase-client は色んなライブラリを抱えていますね。

[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ hbase-mini-monitor ---
[INFO] com.blueskyarea:hbase-mini-monitor:jar:1.0-SNAPSHOT
[INFO] +- org.apache.hbase:hbase-client:jar:1.1.2:compile
[INFO] |  +- org.apache.hbase:hbase-annotations:jar:1.1.2:compile
[INFO] |  |  +- jdk.tools:jdk.tools:jar:1.7:system
[INFO] |  |  \- log4j:log4j:jar:1.2.17:compile
[INFO] |  +- org.apache.hbase:hbase-common:jar:1.1.2:compile
[INFO] |  |  +- commons-collections:commons-collections:jar:3.2.1:compile
[INFO] |  |  \- org.mortbay.jetty:jetty-util:jar:6.1.26:compile
[INFO] |  +- org.apache.hbase:hbase-protocol:jar:1.1.2:compile
[INFO] |  +- commons-codec:commons-codec:jar:1.9:compile
[INFO] |  +- commons-io:commons-io:jar:2.4:compile
[INFO] |  +- commons-lang:commons-lang:jar:2.6:compile
[INFO] |  +- commons-logging:commons-logging:jar:1.2:compile
[INFO] |  +- com.google.guava:guava:jar:12.0.1:compile
[INFO] |  |  \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
[INFO] |  +- com.google.protobuf:protobuf-java:jar:2.5.0:compile
[INFO] |  +- io.netty:netty-all:jar:4.0.23.Final:compile
[INFO] |  +- org.apache.zookeeper:zookeeper:jar:3.4.6:compile
[INFO] |  |  +- org.slf4j:slf4j-api:jar:1.6.1:compile
[INFO] |  |  \- org.slf4j:slf4j-log4j12:jar:1.6.1:compile
[INFO] |  +- org.apache.htrace:htrace-core:jar:3.1.0-incubating:compile
[INFO] |  +- org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile
[INFO] |  |  \- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile
[INFO] |  +- org.jruby.jcodings:jcodings:jar:1.0.8:compile
[INFO] |  +- org.jruby.joni:joni:jar:2.1.2:compile
[INFO] |  +- org.apache.hadoop:hadoop-auth:jar:2.5.1:compile
[INFO] |  |  +- org.apache.httpcomponents:httpclient:jar:4.2.5:compile
[INFO] |  |  |  \- org.apache.httpcomponents:httpcore:jar:4.2.4:compile
[INFO] |  |  \- org.apache.directory.server:apacheds-kerberos-codec:jar:2.0.0-M15:compile
[INFO] |  |     +- org.apache.directory.server:apacheds-i18n:jar:2.0.0-M15:compile
[INFO] |  |     +- org.apache.directory.api:api-asn1-api:jar:1.0.0-M20:compile
[INFO] |  |     \- org.apache.directory.api:api-util:jar:1.0.0-M20:compile
[INFO] |  +- org.apache.hadoop:hadoop-common:jar:2.5.1:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-annotations:jar:2.5.1:compile
[INFO] |  |  +- commons-cli:commons-cli:jar:1.2:compile
[INFO] |  |  +- org.apache.commons:commons-math3:jar:3.1.1:compile
[INFO] |  |  +- xmlenc:xmlenc:jar:0.52:compile
[INFO] |  |  +- commons-httpclient:commons-httpclient:jar:3.1:compile
[INFO] |  |  +- commons-net:commons-net:jar:3.1:compile
[INFO] |  |  +- commons-el:commons-el:jar:1.0:runtime
[INFO] |  |  +- commons-configuration:commons-configuration:jar:1.6:compile
[INFO] |  |  |  +- commons-digester:commons-digester:jar:1.8:compile
[INFO] |  |  |  |  \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
[INFO] |  |  |  \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile
[INFO] |  |  +- org.apache.avro:avro:jar:1.7.4:compile
[INFO] |  |  |  +- com.thoughtworks.paranamer:paranamer:jar:2.3:compile
[INFO] |  |  |  \- org.xerial.snappy:snappy-java:jar:1.0.4.1:compile
[INFO] |  |  +- com.jcraft:jsch:jar:0.1.42:compile
[INFO] |  |  \- org.apache.commons:commons-compress:jar:1.4.1:compile
[INFO] |  |     \- org.tukaani:xz:jar:1.0:compile
[INFO] |  +- org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.5.1:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-yarn-common:jar:2.5.1:compile
[INFO] |  |  |  +- org.apache.hadoop:hadoop-yarn-api:jar:2.5.1:compile
[INFO] |  |  |  \- javax.xml.bind:jaxb-api:jar:2.2.2:compile
[INFO] |  |  |     +- javax.xml.stream:stax-api:jar:1.0-2:compile
[INFO] |  |  |     \- javax.activation:activation:jar:1.1:compile
[INFO] |  |  \- io.netty:netty:jar:3.6.2.Final:compile
[INFO] |  \- com.github.stephenc.findbugs:findbugs-annotations:jar:1.3.9-1:compile
[INFO] +- com.google.code.gson:gson:jar:2.2.4:compile
[INFO] +- org.eclipse.jetty:jetty-server:jar:9.1.3.v20140225:compile
[INFO] |  +- javax.servlet:javax.servlet-api:jar:3.1.0:compile
[INFO] |  +- org.eclipse.jetty:jetty-http:jar:9.1.3.v20140225:compile
[INFO] |  |  \- org.eclipse.jetty:jetty-util:jar:9.1.3.v20140225:compile
[INFO] |  \- org.eclipse.jetty:jetty-io:jar:9.1.3.v20140225:compile
[INFO] \- junit:junit:jar:3.8.1:test

これで調査にかける時間が削減できるといいなと思います。