社内se × プログラマ × ビッグデータ

プログラミングなどITに興味があります。

Spark2 AccumulatorV2

Spark2 で Accumulator を使おうと思ったら、deprecated になっていました。
代わりに AccumulatorV2 を使うようにとのこと。
https://spark.apache.org/docs/2.3.0/api/java/

旧 Accumulator と同じように使えるのかと思っていたら、AccumulatorV2 を継承して独自の Accumulator を定義する必要があるようです。
旧 Accumulator は単純に数値の加算処理しか対応していませんでしたが、独自に Accumulator を定義できるようになったということですね。
List を保持する Accumulator を作って、任意のタイミングで要素追加とかも出来そうです。

今回は、単純な加算処理だけ対応した Accumulator を以下のように定義しました。

class MyAccumulator extends AccumulatorV2<Integer, Integer> {
	
	private int count = 0;
	public MyAccumulator(int initialValue) {
		this.count = initialValue;
	}
	
	@Override
	public void add(Integer arg0) {
		this.count += arg0;
	}
	
	@Override
	public AccumulatorV2<Integer, Integer> copy() {
		return new MyAccumulator(value());
	}
	
	@Override
	public boolean isZero() {
		if (this.count == 0) {
			return true;
		}
		return false;
	}
	
	@Override
	public void merge(AccumulatorV2<Integer, Integer> arg0) {
		add(arg0.value());
	}
	
	@Override
	public void reset() {
		this.count = 0;
	}
	
	@Override
	public Integer value() {
		return this.count;
	}
}

気をつける必要があるのは、それぞれのメソッドの処理はきちんと書くこと。
何も書かなかったり、適当に null を返したりしてはいけないという意味です。
例えば、merge() メソッドは呼ばれていないと思って何も処理を書かないでいましたが、その場合、いくら加算しても結果は 0 になってしまいました。
それぞれのメソッドが内部的には呼ばれているようです。

使い方は以下のように、AccumulatorV2 を作り、(加算が)必要なタイミングで add を呼び出してあげます。

final AccumulatorV2<Integer, Integer> blankLines = new MyAccumulator(0);
jsc.sc().register(blankLines);
rdd.foreach(line -> {
	if (line.equals("")) {
		blankLines.add(1);
	}
});

サンプルソース
spark/AccumulatorV2Try.java at master · blueskyarea/spark · GitHub