社内se × プログラマ × ビッグデータ

プログラミングなどITに興味があります。

初 JMockit の感想

JUnit で使えるモックのライブラリには、mockito, jmockit, easymock などがあります。
個人的には mockito しか使ったことがなかったのですが、jmockit がよく使われているらしいので、試してみました。

具体的な使い方などは既に多くの記事が存在しますので、ここには書かないです。
※個人的な感想

1.テスト対象のクラス
Hoge.class -> テストの対象とする主クラス。
Foo.class -> Hoge.class から呼び出されるクラス。

どちらも、文字列を返すだけのシンプルなメソッドしか用意していません。
そして実際のテストクラスです。

テストコード1

テストコード2


2.クラス変数として mock を定義すると、全テストメソッドに適用される

// To specify as class value, every test can get the mock.
@Mocked private Hoge hogeMock;
	
@Test
public void testLocalValueBecomeMockFromClassValue() {
	Hoge hoge = new Hoge();
	assertThat(hoge.methodHoge(), nullValue());
}

@Test
public void testLocalValueBecomeMockFromClassValue2() {
	assertThat(hogeMock.methodHoge(), nullValue());
}

シンプルなテストクラスに対しては便利かもしれないけど、意図しない mock が適用されてしまう可能性がある。
次の例のように、各テストクラス毎に mock を渡してあげる方がよいと思います。

@Test
public void testLocalValueBecomeMockFromArg(@Mocked Hoge hogeMock) {
	Hoge hoge = new Hoge();
	assertThat(hoge.methodHoge(), nullValue());
}

3.特定のインプットに対する戻り値をテストするなら、Expectations。不特定のインプットに対しては NonStrictExpectations。
Expectations

@Test
public void testExpectationsWithReturnValue() {
	new Expectations() {{
		hogeMock.methodHoge(); result = "called without arguments.";
		hogeMock.methodHoge("hoge"); result = "called with arguments";
	}};
	
	assertThat(hogeMock.methodHoge(), is("called without arguments."));
	assertThat(hogeMock.methodHoge("hoge"), is("called with arguments"));
}

Expectations を使う場合は、Expectations 内で定義した順番と、assert の順番を揃える必要がある点に注意ですね。

NonStrictExpectations

@Test
public void testNonStrictExpectations() {
	new NonStrictExpectations() {{
		hogeMock.methodHoge(anyString); result = "stub returns value";
	}};
	
	// To specify "anyString", always returns same value.
	assertThat(hogeMock.methodHoge("hoge"), is("stub returns value"));
	assertThat(hogeMock.methodHoge("mock"), is("stub returns value"));
	assertThat(hogeMock.methodHoge("always"), is("stub returns value"));
}

どんな文字列でもの意味である anyString とか便利ですね。
スタブとして使う部品が入力値に依らず、常に同じ値を返して良いのであれば、これは使えますね。

4.Injectable を使えば、テスト対象クラスのクラス変数も簡単に mock にできる

@Tested private Hoge hogeTested;
@Injectable private Foo foo;

@Test
public void testHogeMethodHasIntValue() {
	final String abcString = "abc";
	new Expectations() {{
		foo.methodString(abcString); result = "return from methodString";
	}};
	
	assertThat(hogeTested.methodHoge(abcString), is("return from methodString"));
}

Injectable で定義されたクラス変数は Tested で定義されたクラス内で mock になる。
戻り値も自由自在に定義できる。すごく便利ですね。

5.感想
初めて JMockit 使ってみましたが、簡単に mock が作れて戻り値の定義も出来るので非常に便利だと思います。
慣れてくれば楽に書けるようになりそうです。
ただあまり乱用すると、可読性が落ちそうですし、mock 全てに対して言えることですが、 mock 化すべき対象は API や外部ストレージからの応答など、可変性があるものに限定したいですね。

Spark Streaming で テキストファイルへのセーブ

ソースコード

RDD には saveAsTextFile というメソッドがあり、引数に指定したディレクトリに簡単に出力することができます。
Spark Streaming における DStream にも saveAsTextFiles というメソッドがありました。

ただし、Java での JavaDStream から使う場合は、dstream() を使って JavaDStream -> DStream に変換しなければなりません。

JavaDStream javaDStream;
DStream dstream = javaDStream.dstream();
dstream.saveAsTextFiles("prefix", "suffix");

1. "prefix" と "suffix" ?
ただ、この "prefix" と "suffix" が分かりにくい。。
実際に以下のような指定をして動かしてみると、

dstream.saveAsTextFiles("/tmp/output", "txt");

期待していたのは、/tmp/output ディレクトリ配下にファイルが生成されることでしたが
/tmp ディレクトリ配下に以下のようなディレクトリが生成されてしまいました。
ファイルではなくて以下のディレクトリです。

ls /tmp
output-1498060642000.txt
output-1498060643000.txt
output-1498060644000.txt
output-1498060645000.txt
output-1498060646000.txt

そもそもディレクトリなのに .txt という拡張子がついているのが違和感。

2. "prefix" を "/tmp/output/" にして、ディレクトリであることを明示してみる
最後に "/" を入れただけです。

dstream.saveAsTextFiles("/tmp/output/", "txt");

同じく期待していたのは、/tmp/output ディレクトリ配下にファイルが生成されることでしたが
/tmp/output ディレクトリ配下に、やはりディレクトリが生成されてしまいました。
しかも、ディレクトリ名の先頭に "-" が付いている始末。

/tmp/output$ ls
-1498061778000.txt  -1498061780000.txt  -1498061782000.txt  -1498061784000.txt
-1498061779000.txt  -1498061781000.txt  -1498061783000.txt  -1498061785000.txt

完全にこの "prefix" の指定の仕方は間違っている気がします。

3. 空っぽのディレクトリが作られまくる
この Streaming は Durations を 1秒間隔で設定していますが、これによって毎秒ディレクトリが生成される動きになりました。
たとえ、出力する内容が何も無いにしてもです。
空っぽのディレクトリ、正確には _SUCCESS ファイルのみ入っているディレクトリ。
もちろん、要らないディレクトリです。

4. データが空の時は何も出力したくない
DStream は RDD の並びである。
という訳で、foreachRDD なんてメソッドが用意されていました。
RDD を一つずつ取り出して、各 RDD に対して処理することができます。
RDD には isEmpty を使えば空っぽかどうか確認できるので、結局・・・

JavaDStream javaDStream;
javaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
	public Void call(JavaRDD<String> rdd) throws Exception {
		if(!rdd.partitions().isEmpty()) {
			rdd.saveAsTextFile("/tmp/output");
		}
	        return null;
	}
});

こんな形すれば、実際に新しいデータの入力があった時だけ、出力処理が行われるようになりました。
return null; というのが気になりますが。

Spark Streaming の textFileStream で複数のディレクトリを対象にしてみる

ソースコード

単に2つの DStream を作成してあげるだけです。

// create DStream from text file
String logDir = "/tmp/logs";
String logDir2 = "/tmp/logs2";
JavaDStream<String> logData = jssc.textFileStream(logDir);
JavaDStream<String> logData2 = jssc.textFileStream(logDir2);

// output
logData.print();
logData2.print();

// start streaming
jssc.start();

// wait for end of job
jssc.awaitTermination();

出力結果
print() の出力結果ですが、以下のように1つの間隔(今回は一秒)に対して
2つの結果が出力される場所が出来ていました。

-------------------------------------------
Time: 1498053561000 ms
-------------------------------------------

-------------------------------------------
Time: 1498053561000 ms
-------------------------------------------

-------------------------------------------
Time: 1498053562000 ms
-------------------------------------------
2017-06-21T22:58:19+09:00	test.access1	{"message":"66.249.69.97 - - [24/Sep/2014:22:25:44 +0000] \"GET /071300/242153 HTTP/1.1\" 404 514 \"-\" \"Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)\""}
2017-06-21T22:58:19+09:00	test.access1	{"message":"71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] \"GET /error HTTP/1.1\" 404 505 \"-\" \"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36\""}
2017-06-21T22:58:19+09:00	test.access1	{"message":"71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] \"GET /favicon.ico HTTP/1.1\" 200 1713 \"-\" \"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36\""}
2017-06-21T22:58:19+09:00	test.access1	{"message":"71.19.157.174 - - [24/Sep/2014:22:26:37 +0000] \"GET / HTTP/1.1\" 200 18785 \"-\" \"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36\""}
2017-06-21T22:58:19+09:00	test.access1	{"message":"71.19.157.174 - - [24/Sep/2014:22:26:37 +0000] \"GET /jobmineimg.php?q=m HTTP/1.1\" 200 222 \"http://www.holdenkarau.com/\" \"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36\""}

-------------------------------------------
Time: 1498053562000 ms
-------------------------------------------

-------------------------------------------
Time: 1498053571000 ms
-------------------------------------------

-------------------------------------------
Time: 1498053571000 ms
-------------------------------------------

-------------------------------------------
Time: 1498053572000 ms
-------------------------------------------
2017-06-21T22:58:29+09:00	test.access2	{"message":"71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] \"GET /error78978 HTTP/1.1\" 404 505 \"-\" \"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36\""}

-------------------------------------------
Time: 1498053572000 ms
-------------------------------------------

fluentd で収集したファイルを Spark Streaming で GET

Spark Streaming ではファイルが更新されたとしても、その差分のみの取得はされない。
Spark Streaming ではあるディレクトリに生成された新規ファイルは自動的に取り込んでくれるよう。
そこで差分のみを fluentd で取得し、それを新規ファイルとして出力してみました。

◆ fluentd の準備
1. ruby のバージョンアップ
fluentd をインストールしようとしたら、ruby のバージョン 2.1.0 以上を要求されたので、まず先にインストールしました。

sudo add-apt-repository -y ppa:brightbox/ruby-ng
sudo apt-get update
sudo apt-get -y install ruby2.1

$ ruby -v
ruby 2.1.9p490 (2016-03-30 revision 54437) [x86_64-linux-gnu]

2. fluentd のインストー

sudo gem install fluentd

3. fluentd のセットアップ

fluentd --setup .
-> fluent.conf ファイルが生成される

4. fluent.conf ファイルの編集
すでにファイルには色々と書き込まれていますが、すべて削除して以下の内容にしました。

## File input
## read apache logs with tag=apache.access
<source>
  type tail
  format none
  path /tmp/input/test.log
  tag apache.access
</source>

# match tag=apache.access and write to file
<match apache.access>
  type file
  path /tmp/logs/result.log
  time_slice_format %Y%m%d%H%M%S
  time_slice_wait 1m
</match>

5. fluentd の起動

fluentd -c fluent.conf -vv

◆ ログファイルの準備
fluentd の source 内で指定したログファイルを適当に更新するだけです。
こんな感じのログです。

apache.access   {"message":"66.249.69.97 - - [24/Sep/2014:22:25:44 +0000] \"GET /071300/242153 HTTP/1.1\" 404 514 \"-\" \"Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)\""}
2017-06-17T10:30:31+09:00       apache.access   {"message":"71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] \"GET /error HTTP/1.1\" 404 505 \"-\" \"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36\""}
2017-06-17T10:30:31+09:00       apache.access   {"message":"71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] \"GET /favicon.ico HTTP/1.1\" 200 1713 \"-\" \"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36\""}

すると、更新された差分情報のみが fluentd によって回収され、以下のようなファイルが /tmp/logs 内に出来上がります。
result.log.20170617103031_0.log

◆ Spark streaming の起動
そして、Spark streaming の StreamingTextFileStream で /tmp/logs ディレクトリを監視していると
(単にスペース区切りで文字列カウントしただけなので、アウトプット自体は美しくないですが)
差分のみを取得することが出来ました。

-------------------------------------------
Time: 1497967776000 ms
-------------------------------------------
(\"Mozilla/5.0,5)
((KHTML,,4)
(x86_64),4)
(2017-06-20T23:08:33+09:00	apache.access	{"message":"71.19.157.174,4)
(222,1)
(/favicon.ico,1)
(505,1)
(Gecko),4)
(514,1)

TextFileStream でリアルタイム word count

Spark Streaming ではディレクトリを監視して、その中に入ったテキストファイルを取りこめるようなので試してみました。
ソースコードはこちら

1.準備(コーディング)
ディレクトリを監視し、テキストファイルを取り込むためには、textFileStream の引数に監視するディレクトリパスを指定します。
ディレクトリパスに、ワイルドカードは使えないようです。
監視するディレクトリ階層を深く設定できてしまうと、パフォーマンス的に影響があるためでしょうか。

// create DStream from text file
String logDir = "/tmp/logs";
JavaDStream<String> logData = jssc.textFileStream(logDir);

今回はワードカウントを実装してみたいと思います。
取り込んだ DStream を flatMap で取り出し、空白(スペース)区切りでリストに格納します。

// Split into words
JavaDStream<String> words = logData.flatMap(new FlatMapFunction<String, String>() {
  public Iterable<String> call(String line) throws Exception {
    return Arrays.asList(line.split(" "));
  }
});

flatMap を使った場合の関数(call)は、1つの要素を返すのではなくて、イテレータを返します。
返される DStream の内容はイテレータではなくて、すべてのイテレータから返された要素になります。
それぞれの要素は区切られることなく、フラットな状態です。
そのため、入力文字列を単語に分割する用途としてよく使われているようです。

ペアの DStream を生成し、reduce 処理でカウントしていきます。
このあたりは、RDD でワードカウントするのと同じ方法です。

// Transform into word and count
JavaPairDStream<String, Integer> counts = words.mapToPair(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String word) throws Exception {
	return new Tuple2(word, 1);
    }
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer original, Integer additional) throws Exception {
	return original + additional;
    }
});

出来上がった counts を表示させます。
ペアの DStream でも print が使えるようになっています。
ただ、デフォルトでは10個までしか出力されないので、引数50を与えて50個まで出力できるようにします。

// output
counts.print(50);

2.実行
今回は apache のログのようなデータを用意し、それを取り込ませてみました。

(x86_64),4)
((KHTML,,4)
(222,1)
(66.249.69.97,1)
(/favicon.ico,1)
(505,1)
(Gecko),4)
(514,1)
(Chrome/37.0.2062.94,4)
(404,2)

今回は単に空白で単語を区切っただけなので、さすがに意味の分からない単位でワードカウントされてしまっていますが、きちんとカウントされていることが確認できました。

3.感想
ディレクトリを指定しておくだけで、その中に生成されたテキストを自動で取り込んでくれるのは非常に便利だと思います。
ただ、同じファイル名で追記されたデータは取り込んでくれませんでしたので、常に新しいファイルに出力する必要があるかもしれません。
また、新規作成ファイルであったとしても、ファイル作成中に中途半端な状態で取り込まれてしまう可能性があると思います。
これは hdfs など(原子性が保証?)のデータソースを使えれば解決できるかもしれません。
fluentd などと連携させることで非常に便利なリアルタイム集計ツールになってくれそうな気がします。

Spark Streaming 試してみました

初めての Spark を参照しながら、Apache Spark Streaming を試してみました。
ソースコード

まずは概念、概要から・・・


1. Spark は RDD を元に構築されていますが、Spark Streaming においては DStream と呼ばれる概念の元に構築されるらしい。
2. DStream は RDD の並びである。

  • Streaming 処理なので、小さな RDD を順番に処理していくイメージ?

3. DStream は Flume、Kafka、HDFS など、色々なデータソースから生成可能。

  • RDD も色んなデータソースから生成可能ですね。

4. DStream では、変換(transformation)と出力(output)の操作ができる。

  • RDD における変換とアクションとよく似ていますね。

5. Spark のバッチプログラムと異なり、Spark Streaming では、アプリケーションを常時稼働させる。

続いて、お試し実装。
Spark Streaming では StreamingContext を生成します。
SparkConf は SparkContext 生成時と同じもの。
Durations.seconds では、そのバッチのインターバルを設定します。
Streaming 処理なので、1秒など短い間隔で設定されることが多いのかなと思います。

// create StreamingContext
SparkConf conf = new SparkConf().setAppName("StreamingApp");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

DStream を生成します。
StreamingContext の socketTextStream でデータを受信するホストとポート番号を指定します。

// create DStream from specified port
JavaDStream<String> lines = jssc.socketTextStream("localhost", 7777);

今回は、DStream に filter() をかけてみます。error という文字を含む場合だけ取り出すようにします。

// filter for DStream
JavaDStream<String> errorLines = lines.filter(new Function<String, Boolean>() {
    public Boolean call(String line) throws Exception {
        return line.contains("error");
    }    	
});

フィルター後の DStream を出力します。
面白いところは、streaming をスタートさせる命令は、ソースコード上では最後に書かれているところです。
Spark の遅延評価の仕組みに通じるところがある気がします。
また、これは独立したスレッドの中で動くらしく、アプリケーションが終了しないように、awaitTermination を呼び出しています。

// output
errorLines.print();
        
// start streaming
jssc.start();
        
// wait for end of job
jssc.awaitTermination();

それでは、いざ実行!
起動には spark-submit を使います。

$ ./spark-submit --class com.blueskyarea.StreamingApp --conf spark.yarn.jar=${ASSEMBLY_JAR} ./spark-streaming/target/spark-streaming-1.0-SNAPSHOT.jar
WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error connecting to localhost:7777
java.net.ConnectException: 接続を拒否されました
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:579)
	at java.net.Socket.connect(Socket.java:528)
	at java.net.Socket.<init>(Socket.java:425)
	at java.net.Socket.<init>(Socket.java:208)
	at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73)
	at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59)

コネクションエラーが発生しましたが、これは 7777 ポートが LISTEN 状態になっていないためのようです。
なので、nc コマンドで LISTEN 状態にします。

nc -l 7777

そして、nc コマンドに続いて入力してみます。

abc
error
efror
there are some error
error2

すると、Spark Streaming の出力ウィンドウには、error が含まれる行のみが出力されました。

error
there are some error
error2

お試しレベルですが、比較的簡単に実現することが出来ました。
リアルタイム(今回は1秒単位)に処理がされていく様子を見るのは楽しいです。
試した入力サイズが非常に小さいですが、1秒単位での処理においてレスポンスも速いです。
より複雑な変換処理を入れた場合、1秒以上要する変換処理の後に出力させた場合、どのような動きになるのか試してみたいです。

Javaの演習問題やりました

Javaのプログラミングの練習したいと思って、「Java練習問題」でググって出てきたページ。
アルゴリズム編にチャレンジ。
https://eng-entrance.com/java-question-algorithm

全部で4問。最初の3問はすぐに書けた(とりあえず動くレベルでも)けど、4問目は苦戦しました。
入力した数式を括弧も含めてひとつひとつパースするなんて、、練習問題だからこそある設定ですね。

こういうのを書いてみると、普段は色んな便利ライブラリにお世話になっているなぁと感じます。