社内se × プログラマ × ビッグデータ

プログラミングなどITに興味があります。

Spark Streaming で テキストファイルへのセーブ

ソースコード

RDD には saveAsTextFile というメソッドがあり、引数に指定したディレクトリに簡単に出力することができます。
Spark Streaming における DStream にも saveAsTextFiles というメソッドがありました。

ただし、Java での JavaDStream から使う場合は、dstream() を使って JavaDStream -> DStream に変換しなければなりません。

JavaDStream javaDStream;
DStream dstream = javaDStream.dstream();
dstream.saveAsTextFiles("prefix", "suffix");

1. "prefix" と "suffix" ?
ただ、この "prefix" と "suffix" が分かりにくい。。
実際に以下のような指定をして動かしてみると、

dstream.saveAsTextFiles("/tmp/output", "txt");

期待していたのは、/tmp/output ディレクトリ配下にファイルが生成されることでしたが
/tmp ディレクトリ配下に以下のようなディレクトリが生成されてしまいました。
ファイルではなくて以下のディレクトリです。

ls /tmp
output-1498060642000.txt
output-1498060643000.txt
output-1498060644000.txt
output-1498060645000.txt
output-1498060646000.txt

そもそもディレクトリなのに .txt という拡張子がついているのが違和感。

2. "prefix" を "/tmp/output/" にして、ディレクトリであることを明示してみる
最後に "/" を入れただけです。

dstream.saveAsTextFiles("/tmp/output/", "txt");

同じく期待していたのは、/tmp/output ディレクトリ配下にファイルが生成されることでしたが
/tmp/output ディレクトリ配下に、やはりディレクトリが生成されてしまいました。
しかも、ディレクトリ名の先頭に "-" が付いている始末。

/tmp/output$ ls
-1498061778000.txt  -1498061780000.txt  -1498061782000.txt  -1498061784000.txt
-1498061779000.txt  -1498061781000.txt  -1498061783000.txt  -1498061785000.txt

完全にこの "prefix" の指定の仕方は間違っている気がします。

3. 空っぽのディレクトリが作られまくる
この Streaming は Durations を 1秒間隔で設定していますが、これによって毎秒ディレクトリが生成される動きになりました。
たとえ、出力する内容が何も無いにしてもです。
空っぽのディレクトリ、正確には _SUCCESS ファイルのみ入っているディレクトリ。
もちろん、要らないディレクトリです。

4. データが空の時は何も出力したくない
DStream は RDD の並びである。
という訳で、foreachRDD なんてメソッドが用意されていました。
RDD を一つずつ取り出して、各 RDD に対して処理することができます。
RDD には isEmpty を使えば空っぽかどうか確認できるので、結局・・・

JavaDStream javaDStream;
javaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
	public Void call(JavaRDD<String> rdd) throws Exception {
		if(!rdd.partitions().isEmpty()) {
			rdd.saveAsTextFile("/tmp/output");
		}
	        return null;
	}
});

こんな形すれば、実際に新しいデータの入力があった時だけ、出力処理が行われるようになりました。
return null; というのが気になりますが。