社内se × プログラマ × ビッグデータ

プログラミングなどITに興味があります。

リファクタリングと追加実装はコミットを分けてほしい

主にソースコードをレビューする立場である場合の視点になります。
チームで開発しているとき、リファクタリングと追加実装を同時にレビュー提出されることがあります。

ただ、レビュー依頼のコメントには、リファクタリングのことは触れられていないので、レビューする側としては、全てが追加機能の実装に伴う変更点であると最初は解釈します。

しかしながら、レビューを進めていくとどうも追加実装の要件とは関係の無いところまで編集されている様子。
よくよく確認してみると、気になった箇所のリファクタリングをついでにやったとのこと。

気になったソースコードをそのまま放置せずにリファクタリングする試みはとても良いことだと思うのですが、個人的には出来ればコミットは分けておきたいところです。
コミットを分けておくことで、リファクタリング箇所と追加実装箇所を明確に分けて見ることができるため、レビューもしやすくなります。

あと、リファクタリングを行った段階で単体テストを通しておけば、そのリファクタリングによって不具合が作られなかったことの確認も容易にできます。
以前あったのが、リファクタリングと追加実装を同時に行った後、単体テストが通らなくなり、ずっと追加実装したコードを疑っていたが、実はリファクタリングした箇所が不具合を作ってしまっていました。

チームで開発作業していると、他人が実装したプログラムを引き継いだり、逆に自分が実装したプログラムを引き継いてもらったりなど日常茶飯事なので、少しずつ小さなステップで確認しながら実装していかないと、簡単に不具合が混入されてしまいます。

1.コード規約が定まっていない(個人的にはあまり細かく縛られたくないですが)
2.ソースコード内のコメントが不親切
3.詳細設計書がない

こういった場合、良くも悪くもプログラマーの個性がソースコードに表れるため、理解が難しくなる場合があります。
※数ヵ月後には自分自身が書いたものですら、理解出来なくなってしまうことも

切羽詰っているプロジェクトほど、こういったステップを踏む余裕はないかもしれませんが、後戻りを防ぐためにも確実に進めるようにしていきたいものです。

Java 無名(匿名)クラスを意識して使ってみる

Javaでプログラムの基本は、クラスのインスタンスを生成し、そのインスタンスから目的のメソッドを呼ぶところにあると思いますが、結果的に一度しかインスタンス化されないクラス(オブジェクト)があったり、ある特定のクラスからしか必要とされないインスタンスがあったりします。

自分はプログラミングする際、1つのクラスに多くの役割を持たせたくない(役割分担させる)ため、クラスを細かく定義することを基本にしていますが、中には一度しかインスタンスが生成されない、メソッドが一つしかないクラスを作ってしまうことがあります。

特に大きな問題になる訳ではないですが、単純な処理しか行っていないのであれば、そのメソッドを定義する無理にクラスを定義せずに、その処理を必要としているクラス内に含めてしまっても良いかもしれません。

無名クラスという仕組みを使えば、インターフェースを実装したクラス(あるいはあるクラスを継承したサブクラス)の宣言を省略することが出来るので、その辺りを上手く使えないかと試みたのですが(今までは意識して使ったことなし)。

クラスを定義しインスタンスを明示的に生成する場合

interface Dao {
    String getData();
}
	
class MySqlDao implements Dao {
    @Override
    public String getData() {
	return "MySql data from MySqlDao.class";
    }
}
MySqlDao mySqlDao = new MySqlDao();
System.out.println(mySqlDao.getData());

Dao インターフェースを実装した MySqlDao クラスを定義しています。
呼び出し側では、そのインスタンスを生成しています。

無名クラスを使う場合

interface Dao {
    String getData();
}
Dao mySqlDao2 = new Dao() {
    @Override
    public String getData() {
	return "MySql data from anonymous class";
    }
};
System.out.println(mySqlDao2.getData());

まるで Dao のインスタンスを生成しているように見えますが、ここでは Dao インターフェースを実装した無名クラスのインスタンス(mySqlDao2 )が生成されていることになります。
※そもそもインターフェースのインスタンスを生成することはできない

結局、クラスの宣言と利用を同時に行っているだけではあるのですが、これによって MySqlDao.class の定義を省略することが可能になりました。
管理するクラスの数が減るという面でメリットがあると思います。

一方で”無名”のクラスになってしまうので、そのクラス名からどのような役割を持っているのかを推測することが出来なくなります。
例が悪かったですが、Dao などは外部から呼び出される前提のクラスになると思いますので、広く使われるようなクラスは無名クラスにするべきではなさそうです。
本当にそこでしか使わない(使い捨てに出来る)クラス限定で適用するものかと思います。
さらに無名クラスになれるのは、インターフェースを実装したクラスあるいはサブクラスに限定されるので、今のところ個人的には活躍の場はそれ程ない見込みです。

Java8 日時APIがわからない(現在時刻の取得編)

もうすぐ、JDK8(java8)の商用サポート期限が2019年1月で終了するという中、未だに Java8 日時API に混乱させられています。。
※この手の情報は色んなところに既にまとめられていますし、自分は5年くらい遅れている気がしますが、自分の備忘録のために覚えたことを残しておきたいと思います

実行マシンの timezone は JST です。

$ date
2018年  9月 28日 金曜日 00:46:31 JST

どんなクラスがあるか

クラス 内容
Instant 日時(エポック秒
LocalDateTime 日付時刻(タイムゾーンなし)
ZonedDateTime タイムゾーンつきの日付時刻
OffsetDateTime オフセット付きの日付時刻

now() で現在時刻を取得してみる

Instant instantNow = Instant.now();
System.out.println("instantNow:" + instantNow);

LocalDateTime ldtNow = LocalDateTime.now();
System.out.println("ldtNow:" + ldtNow);

ZonedDateTime zdtNow = ZonedDateTime.now();
System.out.println("zdtNow:" + zdtNow);

OffsetDateTime odtNow = OffsetDateTime.now();
System.out.println("odtNow:" + odtNow);

結果

Instant.now(): 2018-09-27T16:05:38.683Z
LocalDateTime.now(): 2018-09-28T01:05:39.077
ZonedDateTime.now(): 2018-09-28T01:05:39.078+09:00[Asia/Tokyo]
OffsetDateTime.now(): 2018-09-28T01:05:39.079+09:00

Instant.now() で取得した時刻は、マイナス9時間されている、つまりUTCで取得されていることが解ります。
ZonedDateTime と OffsetDateTime に関しても、9時間の差分が UTC に対してあることが明示されています。

タイムゾーンIDを指定してみる

LocalDateTime ldtjst = LocalDateTime.now(ZoneId.of("Asia/Tokyo"));
System.out.println("LocalDateTime.now(): " + ldtjst);

ZonedDateTime zdtjst = ZonedDateTime.now(ZoneId.of("Asia/Tokyo"));
System.out.println("ZonedDateTime.now(): " + zdtjst);

OffsetDateTime odtjst = OffsetDateTime.now(ZoneId.of("Asia/Tokyo"));
System.out.println("OffsetDateTime.now(): " + odtjst);

結果

LocalDateTime.now(): 2018-09-28T01:13:47.681
ZonedDateTime.now(): 2018-09-28T01:13:47.681+09:00[Asia/Tokyo]
OffsetDateTime.now(): 2018-09-28T01:13:47.681+09:00

※Instant.now() には、ZoneId を指定できません

結果、見た目は特に指定しなかった場合と変わらず。これは指定しなかった場合も、JST として解釈されているためと思われます。

タイムゾーンを変換してみる(JST -> UTC)

LocalDateTime ldtjst = LocalDateTime.now(ZoneId.of("Asia/Tokyo"));
LocalDateTime ldtutc = LocalDateTime.ofInstant(ldtjst.toInstant(ZoneOffset.UTC), ZoneId.of("UTC"));
System.out.println("LocalDateTime.now(): " + ldtutc);

ZonedDateTime zdtjst = ZonedDateTime.now(ZoneId.of("Asia/Tokyo"));
ZonedDateTime zdtutc = ZonedDateTime.ofInstant(zdtjst.toInstant(), ZoneId.of("UTC"));
System.out.println("ZonedDateTime.now(): " + zdtutc);

OffsetDateTime odtjst = OffsetDateTime.now(ZoneId.of("Asia/Tokyo"));
OffsetDateTime odtutc = OffsetDateTime.ofInstant(odtjst.toInstant(), ZoneId.of("UTC"));
System.out.println("OffsetDateTime.now(): " + odtutc);

結果

LocalDateTime.now(): 2018-09-28T01:25:50.328
ZonedDateTime.now(): 2018-09-27T16:25:50.329Z[UTC]
OffsetDateTime.now(): 2018-09-27T16:25:50.329Z

LocalDateTime.now() の見た目には変化なし、これはそもそも LocalDateTime がタイムゾーンを持たないから。
ZonedDateTime と OffsetDateTime は、マイナス9時間された UTCで取得されていることが解ります。
時刻の最後に "Z" が付いていますが、これは Zero timezone、つまり UTC からのオフセットが 0 であるという意味になります。

所感
現在時刻を取得しただけですが、各クラスがどのような意味を持つのかや、タイムゾーンを指定、変換したときの振る舞いを確認することが出来ました。
ZonedDateTime と OffsetDateTime の違いが解りにくいことと、各クラス間での変換については調べられていませんが、これだけでも、自分の中では少し整理できたように思います。

Apache Spark2.3 ブロードキャスト変数のパフォーマンス

ブロードキャスト変数は、リードオンリーの変数を効率的に各 Executor に送信する仕組みです。

Apache Spark2 にて、ブロードキャスト変数のパフォーマンスをローカル環境で確認してみました。
スレッド数は 3 を指定しています。
※ sparkConf.setMaster("local[3]");

ブロードキャスト変数なしの場合
1. 100 万件の数値リストを生成
2. 3件の文字列リストから RDD を生成
3. RDD の各文字列要素に対して、100 万件の数値リストをループし付加する処理(100万回ループ)を行う。
※ 即ち、3 の処理が (spark)Executor で行われる
計測結果(sec):
00:00:07.572
00:00:09.765
00:00:10.879

ブロードキャスト変数ありの場合
1. 100 万件の数値リストを生成し、それをブロードキャスト変数にする
2. 3件の文字列リストから RDD を生成
3. RDD の各文字列要素に対して、100 万件の数値リストをブロードキャスト変数で渡し、ループし付加する処理(100万回ループ)を行う。
※ 即ち、3 の処理が (spark)Executor で行われる
計測結果(sec):
00:00:01.593
00:00:01.802
00:00:01.668

ブロードキャスト変数を用いて、100万件の数値リストを Executor に渡した方が、圧倒的に速いという結果になりました。
いくら何でも違いが大きい印象を受けたので、これは Spark2 の恩恵かも?しれないと思い、Spark1.6でも試してみたところ
Spark1.6 ブロードキャスト変数なし(sec):
0:00:09.843
0:00:08.355
0:00:08.403

Spark1.6 ブロードキャスト変数あり(sec):
0:00:01.594
0:00:00.890
0:00:00.899

結果は誤差の範囲でした。むしろ1.6の方がほんの少し速いという結果に。
テストで使用したコード。
github.com

Couchbase 同期・非同期 insert(upsert)のパフォーマンス比較

Couchbase Java SDK を使用して、10万件のデータをインサートする場合のパフォーマンスを比較しました。
通常は同期処理にてインサートが行われますが、非同期用のAPIも用意されていましたので、それの比較となります。

Couchbase server は Docker で生成したもので、ノード数は1つだけです。

1.10万ループ内で1件ずつ insert(upsert) する場合(同期)

    protected void bigPut() {   	
    	// data preparation
    	stopWatch.start();
    	IntStream.range(0, 100000).forEach(i -> {
    		JsonObject user = JsonObject.empty()
        		    .put("firstname", "Walter")
        		    .put("lastname", "White")
        		    .put("job", "chemistry teacher")
        		    .put("age", i);
        	JsonDocument doc = JsonDocument.create("walter:" + i, user);
        	bucket.upsert(doc);
    	});
    	stopWatch.stop();
    	System.out.println(stopWatch);
    	stopWatch.reset();
    	
    	// disconnection
    	cluster.disconnect();
    }

2. 10万件のリストを作成し、そのリストを非同期で insert(upsert)した場合

protected void bulkPut(){
    	stopWatch.start();
    	List<JsonDocument> jsons = new ArrayList<>();
    	IntStream.range(0, 100000).forEach(i -> {
    		JsonObject user = JsonObject.empty()
        		    .put("firstname", "Walter")
        		    .put("lastname", "White")
        		    .put("job", "chemistry teacher")
        		    .put("age", i);
        	JsonDocument doc = JsonDocument.create("walter:" + i, user);
        	jsons.add(doc);
    	});
    	
    	Observable
        .from(jsons)
        .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
            @Override
            public Observable<JsonDocument> call(final JsonDocument docToInsert) {
                return bucket.async().upsert(docToInsert);
            }
        })
        .last()
        .toBlocking()
        .single();
    	
    	stopWatch.stop();
    	System.out.println(stopWatch);
    	stopWatch.reset();
    	
    	// disconnection
    	cluster.disconnect();
    }

一般的に非同期処理の方が速く処理されるのは想像しやすいところです。
ただ、2に関しては、10万件のリストを作成してから、更にそのリストからドキュメントを取り出すような処理を行うため、1周多めにループ処理を行うことになります。
しかし、その時間込みで計測を行いました。

結果(3回計測)
1.10万ループ内で1件ずつ insert(upsert) する場合(同期)
00:00:58.620
00:01:00.178
00:00:53.739

2. 10万件のリストを作成し、そのリストを非同期で insert(upsert)した場合
00:00:36.464
00:00:35.964
00:00:48.657

それでもやはり、非同期処理の方がトータルで速いという結果になりました。
10万件を同時に処理したとは言え、正直なところ、想像していたよりも差が大きい印象です。

Docker Couchbase Server の起動スクリプト

Docker で Couchbase Server のコンテナを作成し、サービスを起動するスクリプトです。

sudo /etc/init.d/couchbase-server start

# waiting for finishing to start couchbase-server
sleep 30s

# Setup Administrator username and password
curl -v -X POST http://localhost:8091/settings/web -d 'password=password&username=admin&port=8091'

sleep 5s

# Setup Bucket
curl -u admin:password -v -X POST http://localhost:8091/pools/default/buckets \
-d 'flushEnabled=1&threadsNumber=3&replicaIndex=0&replicaNumber=0&evictionPolicy=valueOnly&ramQuotaMB=597&bucketType=membase&name=default&authType=sasl&saslPassword='

sleep 5s

# Setup Index RAM Quota
curl -u admin:password -X POST http://localhost:8091/pools/default \
-d 'memoryQuota=5000' -d 'indexMemoryQuota=269'

Couchbase Server を起動し、REST API にて admin 用のユーザーとパスワードを設定。
その後、Bucket も作るようにしているのですが、いくつかの sleep が散見されます。

最初はこの sleep 置いてなかったのですが、Couchbase Server 起動コマンドを叩いた直後は、REST API が使えないので、Bucket のセットアップ以降が失敗していました。

とはいえ、適当に固定した時間を sleep させるのは不安定なので、retry function を設けました。

sudo /etc/init.d/couchbase-server start

function retryRequest () {
  count=0
  retry_upper_limit=5
  request=$1
  while :
  do
    command="curl -L ${request} -o /dev/null -w %{http_code}\n -s"
    res=$(${command})
    if [ $res = 200 ] || [ $res = 202 ] ; then
      break
    fi
    count=$(expr $count + 1)
    if [ ${count} -gt ${retry_upper_limit} ] ; then
      exit -1
    fi
    sleep 5s
  done
}

# waiting for finishing to start couchbase-server
retryRequest "http://localhost:8091"

# Setup Administrator username and password
retryRequest "-X POST http://localhost:8091/settings/web -d password=password&username=admin&port=8091"

# Setup Bucket
retryRequest "-u admin:password -v -X POST http://localhost:8091/pools/default/buckets -d flushEnabled=1&threadsNumber=3&replicaIndex=0&replicaNumber=0&evictionPolicy=valueOnly&ramQuotaMB=597&bucketType=membase&name=default&authType=sasl&saslPassword="

# Setup Index RAM Quota
retryRequest "-u admin:password -v -X POST http://localhost:8091/pools/default -d memoryQuota=2000&indexMemoryQuota=269"

REST API を取り合えず叩いてみて、成功コードが返ってこなかったら、5秒待ってリトライするようにしました。
環境に依存すると思いますが、REST API が使えるようになるまで4回ほどリトライしているので、20秒ほど要しているようです。

Spark2 AccumulatorV2

Spark2 で Accumulator を使おうと思ったら、deprecated になっていました。
代わりに AccumulatorV2 を使うようにとのこと。
https://spark.apache.org/docs/2.3.0/api/java/

旧 Accumulator と同じように使えるのかと思っていたら、AccumulatorV2 を継承して独自の Accumulator を定義する必要があるようです。
旧 Accumulator は単純に数値の加算処理しか対応していませんでしたが、独自に Accumulator を定義できるようになったということですね。
List を保持する Accumulator を作って、任意のタイミングで要素追加とかも出来そうです。

今回は、単純な加算処理だけ対応した Accumulator を以下のように定義しました。

class MyAccumulator extends AccumulatorV2<Integer, Integer> {
	
	private int count = 0;
	public MyAccumulator(int initialValue) {
		this.count = initialValue;
	}
	
	@Override
	public void add(Integer arg0) {
		this.count += arg0;
	}
	
	@Override
	public AccumulatorV2<Integer, Integer> copy() {
		return new MyAccumulator(value());
	}
	
	@Override
	public boolean isZero() {
		if (this.count == 0) {
			return true;
		}
		return false;
	}
	
	@Override
	public void merge(AccumulatorV2<Integer, Integer> arg0) {
		add(arg0.value());
	}
	
	@Override
	public void reset() {
		this.count = 0;
	}
	
	@Override
	public Integer value() {
		return this.count;
	}
}

気をつける必要があるのは、それぞれのメソッドの処理はきちんと書くこと。
何も書かなかったり、適当に null を返したりしてはいけないという意味です。
例えば、merge() メソッドは呼ばれていないと思って何も処理を書かないでいましたが、その場合、いくら加算しても結果は 0 になってしまいました。
それぞれのメソッドが内部的には呼ばれているようです。

使い方は以下のように、AccumulatorV2 を作り、(加算が)必要なタイミングで add を呼び出してあげます。

final AccumulatorV2<Integer, Integer> blankLines = new MyAccumulator(0);
jsc.sc().register(blankLines);
rdd.foreach(line -> {
	if (line.equals("")) {
		blankLines.add(1);
	}
});

サンプルソース
spark/AccumulatorV2Try.java at master · blueskyarea/spark · GitHub