社内se × プログラマ × ビッグデータ

プログラミングなどITに興味があります。

TextFileStream でリアルタイム word count

Spark Streaming ではディレクトリを監視して、その中に入ったテキストファイルを取りこめるようなので試してみました。
ソースコードはこちら

1.準備(コーディング)
ディレクトリを監視し、テキストファイルを取り込むためには、textFileStream の引数に監視するディレクトリパスを指定します。
ディレクトリパスに、ワイルドカードは使えないようです。
監視するディレクトリ階層を深く設定できてしまうと、パフォーマンス的に影響があるためでしょうか。

// create DStream from text file
String logDir = "/tmp/logs";
JavaDStream<String> logData = jssc.textFileStream(logDir);

今回はワードカウントを実装してみたいと思います。
取り込んだ DStream を flatMap で取り出し、空白(スペース)区切りでリストに格納します。

// Split into words
JavaDStream<String> words = logData.flatMap(new FlatMapFunction<String, String>() {
  public Iterable<String> call(String line) throws Exception {
    return Arrays.asList(line.split(" "));
  }
});

flatMap を使った場合の関数(call)は、1つの要素を返すのではなくて、イテレータを返します。
返される DStream の内容はイテレータではなくて、すべてのイテレータから返された要素になります。
それぞれの要素は区切られることなく、フラットな状態です。
そのため、入力文字列を単語に分割する用途としてよく使われているようです。

ペアの DStream を生成し、reduce 処理でカウントしていきます。
このあたりは、RDD でワードカウントするのと同じ方法です。

// Transform into word and count
JavaPairDStream<String, Integer> counts = words.mapToPair(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String word) throws Exception {
	return new Tuple2(word, 1);
    }
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer original, Integer additional) throws Exception {
	return original + additional;
    }
});

出来上がった counts を表示させます。
ペアの DStream でも print が使えるようになっています。
ただ、デフォルトでは10個までしか出力されないので、引数50を与えて50個まで出力できるようにします。

// output
counts.print(50);

2.実行
今回は apache のログのようなデータを用意し、それを取り込ませてみました。

(x86_64),4)
((KHTML,,4)
(222,1)
(66.249.69.97,1)
(/favicon.ico,1)
(505,1)
(Gecko),4)
(514,1)
(Chrome/37.0.2062.94,4)
(404,2)

今回は単に空白で単語を区切っただけなので、さすがに意味の分からない単位でワードカウントされてしまっていますが、きちんとカウントされていることが確認できました。

3.感想
ディレクトリを指定しておくだけで、その中に生成されたテキストを自動で取り込んでくれるのは非常に便利だと思います。
ただ、同じファイル名で追記されたデータは取り込んでくれませんでしたので、常に新しいファイルに出力する必要があるかもしれません。
また、新規作成ファイルであったとしても、ファイル作成中に中途半端な状態で取り込まれてしまう可能性があると思います。
これは hdfs など(原子性が保証?)のデータソースを使えれば解決できるかもしれません。
fluentd などと連携させることで非常に便利なリアルタイム集計ツールになってくれそうな気がします。