社内se × プログラマ × ビッグデータ

プログラミングなどITに興味があります。

Java8 日時APIがわからない(現在時刻の取得編)

もうすぐ、JDK8(java8)の商用サポート期限が2019年1月で終了するという中、未だに Java8 日時API に混乱させられています。。
※この手の情報は色んなところに既にまとめられていますし、自分は5年くらい遅れている気がしますが、自分の備忘録のために覚えたことを残しておきたいと思います

実行マシンの timezone は JST です。

$ date
2018年  9月 28日 金曜日 00:46:31 JST

どんなクラスがあるか

クラス 内容
Instant 日時(エポック秒
LocalDateTime 日付時刻(タイムゾーンなし)
ZonedDateTime タイムゾーンつきの日付時刻
OffsetDateTime オフセット付きの日付時刻

now() で現在時刻を取得してみる

Instant instantNow = Instant.now();
System.out.println("instantNow:" + instantNow);

LocalDateTime ldtNow = LocalDateTime.now();
System.out.println("ldtNow:" + ldtNow);

ZonedDateTime zdtNow = ZonedDateTime.now();
System.out.println("zdtNow:" + zdtNow);

OffsetDateTime odtNow = OffsetDateTime.now();
System.out.println("odtNow:" + odtNow);

結果

Instant.now(): 2018-09-27T16:05:38.683Z
LocalDateTime.now(): 2018-09-28T01:05:39.077
ZonedDateTime.now(): 2018-09-28T01:05:39.078+09:00[Asia/Tokyo]
OffsetDateTime.now(): 2018-09-28T01:05:39.079+09:00

Instant.now() で取得した時刻は、マイナス9時間されている、つまりUTCで取得されていることが解ります。
ZonedDateTime と OffsetDateTime に関しても、9時間の差分が UTC に対してあることが明示されています。

タイムゾーンIDを指定してみる

LocalDateTime ldtjst = LocalDateTime.now(ZoneId.of("Asia/Tokyo"));
System.out.println("LocalDateTime.now(): " + ldtjst);

ZonedDateTime zdtjst = ZonedDateTime.now(ZoneId.of("Asia/Tokyo"));
System.out.println("ZonedDateTime.now(): " + zdtjst);

OffsetDateTime odtjst = OffsetDateTime.now(ZoneId.of("Asia/Tokyo"));
System.out.println("OffsetDateTime.now(): " + odtjst);

結果

LocalDateTime.now(): 2018-09-28T01:13:47.681
ZonedDateTime.now(): 2018-09-28T01:13:47.681+09:00[Asia/Tokyo]
OffsetDateTime.now(): 2018-09-28T01:13:47.681+09:00

※Instant.now() には、ZoneId を指定できません

結果、見た目は特に指定しなかった場合と変わらず。これは指定しなかった場合も、JST として解釈されているためと思われます。

タイムゾーンを変換してみる(JST -> UTC)

LocalDateTime ldtjst = LocalDateTime.now(ZoneId.of("Asia/Tokyo"));
LocalDateTime ldtutc = LocalDateTime.ofInstant(ldtjst.toInstant(ZoneOffset.UTC), ZoneId.of("UTC"));
System.out.println("LocalDateTime.now(): " + ldtutc);

ZonedDateTime zdtjst = ZonedDateTime.now(ZoneId.of("Asia/Tokyo"));
ZonedDateTime zdtutc = ZonedDateTime.ofInstant(zdtjst.toInstant(), ZoneId.of("UTC"));
System.out.println("ZonedDateTime.now(): " + zdtutc);

OffsetDateTime odtjst = OffsetDateTime.now(ZoneId.of("Asia/Tokyo"));
OffsetDateTime odtutc = OffsetDateTime.ofInstant(odtjst.toInstant(), ZoneId.of("UTC"));
System.out.println("OffsetDateTime.now(): " + odtutc);

結果

LocalDateTime.now(): 2018-09-28T01:25:50.328
ZonedDateTime.now(): 2018-09-27T16:25:50.329Z[UTC]
OffsetDateTime.now(): 2018-09-27T16:25:50.329Z

LocalDateTime.now() の見た目には変化なし、これはそもそも LocalDateTime がタイムゾーンを持たないから。
ZonedDateTime と OffsetDateTime は、マイナス9時間された UTCで取得されていることが解ります。
時刻の最後に "Z" が付いていますが、これは Zero timezone、つまり UTC からのオフセットが 0 であるという意味になります。

所感
現在時刻を取得しただけですが、各クラスがどのような意味を持つのかや、タイムゾーンを指定、変換したときの振る舞いを確認することが出来ました。
ZonedDateTime と OffsetDateTime の違いが解りにくいことと、各クラス間での変換については調べられていませんが、これだけでも、自分の中では少し整理できたように思います。

追記
"yyyyMMddHHmmss" は西暦を元にフォーマットしてくれる仕様。
"YYYYMMddHHmmss" はその年の最初の木曜日が含まれる週はたとえ昨年の日付(12/31)であっても今年としてフォーマットする仕様。

String ORG_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
String orgDate = "2017-12-31T13:00:01.247Z";
LocalDateTime ldt = LocalDateTime.parse(orgDate, DateTimeFormatter.ofPattern(ORG_FORMAT));
		
// yyyyMMddHHmmss
String NEW_FORMAT = "yyyyMMddHHmmss";
String result = ldt.format(DateTimeFormatter.ofPattern(NEW_FORMAT));
System.out.println(result); // 20171231130001

// YYYYMMddHHmmss
String NEW_FORMAT2 = "YYYYMMddHHmmss";
String result2 = ldt.format(DateTimeFormatter.ofPattern(NEW_FORMAT2));
System.out.println(result2); // 20181231130001

Apache Spark2.3 ブロードキャスト変数のパフォーマンス

ブロードキャスト変数は、リードオンリーの変数を効率的に各 Executor に送信する仕組みです。

Apache Spark2 にて、ブロードキャスト変数のパフォーマンスをローカル環境で確認してみました。
スレッド数は 3 を指定しています。
※ sparkConf.setMaster("local[3]");

ブロードキャスト変数なしの場合
1. 100 万件の数値リストを生成
2. 3件の文字列リストから RDD を生成
3. RDD の各文字列要素に対して、100 万件の数値リストをループし付加する処理(100万回ループ)を行う。
※ 即ち、3 の処理が (spark)Executor で行われる
計測結果(sec):
00:00:07.572
00:00:09.765
00:00:10.879

ブロードキャスト変数ありの場合
1. 100 万件の数値リストを生成し、それをブロードキャスト変数にする
2. 3件の文字列リストから RDD を生成
3. RDD の各文字列要素に対して、100 万件の数値リストをブロードキャスト変数で渡し、ループし付加する処理(100万回ループ)を行う。
※ 即ち、3 の処理が (spark)Executor で行われる
計測結果(sec):
00:00:01.593
00:00:01.802
00:00:01.668

ブロードキャスト変数を用いて、100万件の数値リストを Executor に渡した方が、圧倒的に速いという結果になりました。
いくら何でも違いが大きい印象を受けたので、これは Spark2 の恩恵かも?しれないと思い、Spark1.6でも試してみたところ
Spark1.6 ブロードキャスト変数なし(sec):
0:00:09.843
0:00:08.355
0:00:08.403

Spark1.6 ブロードキャスト変数あり(sec):
0:00:01.594
0:00:00.890
0:00:00.899

結果は誤差の範囲でした。むしろ1.6の方がほんの少し速いという結果に。
テストで使用したコード。
github.com

Couchbase 同期・非同期 insert(upsert)のパフォーマンス比較

Couchbase Java SDK を使用して、10万件のデータをインサートする場合のパフォーマンスを比較しました。
通常は同期処理にてインサートが行われますが、非同期用のAPIも用意されていましたので、それの比較となります。

Couchbase server は Docker で生成したもので、ノード数は1つだけです。

1.10万ループ内で1件ずつ insert(upsert) する場合(同期)

    protected void bigPut() {   	
    	// data preparation
    	stopWatch.start();
    	IntStream.range(0, 100000).forEach(i -> {
    		JsonObject user = JsonObject.empty()
        		    .put("firstname", "Walter")
        		    .put("lastname", "White")
        		    .put("job", "chemistry teacher")
        		    .put("age", i);
        	JsonDocument doc = JsonDocument.create("walter:" + i, user);
        	bucket.upsert(doc);
    	});
    	stopWatch.stop();
    	System.out.println(stopWatch);
    	stopWatch.reset();
    	
    	// disconnection
    	cluster.disconnect();
    }

2. 10万件のリストを作成し、そのリストを非同期で insert(upsert)した場合

protected void bulkPut(){
    	stopWatch.start();
    	List<JsonDocument> jsons = new ArrayList<>();
    	IntStream.range(0, 100000).forEach(i -> {
    		JsonObject user = JsonObject.empty()
        		    .put("firstname", "Walter")
        		    .put("lastname", "White")
        		    .put("job", "chemistry teacher")
        		    .put("age", i);
        	JsonDocument doc = JsonDocument.create("walter:" + i, user);
        	jsons.add(doc);
    	});
    	
    	Observable
        .from(jsons)
        .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
            @Override
            public Observable<JsonDocument> call(final JsonDocument docToInsert) {
                return bucket.async().upsert(docToInsert);
            }
        })
        .last()
        .toBlocking()
        .single();
    	
    	stopWatch.stop();
    	System.out.println(stopWatch);
    	stopWatch.reset();
    	
    	// disconnection
    	cluster.disconnect();
    }

一般的に非同期処理の方が速く処理されるのは想像しやすいところです。
ただ、2に関しては、10万件のリストを作成してから、更にそのリストからドキュメントを取り出すような処理を行うため、1周多めにループ処理を行うことになります。
しかし、その時間込みで計測を行いました。

結果(3回計測)
1.10万ループ内で1件ずつ insert(upsert) する場合(同期)
00:00:58.620
00:01:00.178
00:00:53.739

2. 10万件のリストを作成し、そのリストを非同期で insert(upsert)した場合
00:00:36.464
00:00:35.964
00:00:48.657

それでもやはり、非同期処理の方がトータルで速いという結果になりました。
10万件を同時に処理したとは言え、正直なところ、想像していたよりも差が大きい印象です。

Docker Couchbase Server の起動スクリプト

Docker で Couchbase Server のコンテナを作成し、サービスを起動するスクリプトです。

sudo /etc/init.d/couchbase-server start

# waiting for finishing to start couchbase-server
sleep 30s

# Setup Administrator username and password
curl -v -X POST http://localhost:8091/settings/web -d 'password=password&username=admin&port=8091'

sleep 5s

# Setup Bucket
curl -u admin:password -v -X POST http://localhost:8091/pools/default/buckets \
-d 'flushEnabled=1&threadsNumber=3&replicaIndex=0&replicaNumber=0&evictionPolicy=valueOnly&ramQuotaMB=597&bucketType=membase&name=default&authType=sasl&saslPassword='

sleep 5s

# Setup Index RAM Quota
curl -u admin:password -X POST http://localhost:8091/pools/default \
-d 'memoryQuota=5000' -d 'indexMemoryQuota=269'

Couchbase Server を起動し、REST API にて admin 用のユーザーとパスワードを設定。
その後、Bucket も作るようにしているのですが、いくつかの sleep が散見されます。

最初はこの sleep 置いてなかったのですが、Couchbase Server 起動コマンドを叩いた直後は、REST API が使えないので、Bucket のセットアップ以降が失敗していました。

とはいえ、適当に固定した時間を sleep させるのは不安定なので、retry function を設けました。

sudo /etc/init.d/couchbase-server start

function retryRequest () {
  count=0
  retry_upper_limit=5
  request=$1
  while :
  do
    command="curl -L ${request} -o /dev/null -w %{http_code}\n -s"
    res=$(${command})
    if [ $res = 200 ] || [ $res = 202 ] ; then
      break
    fi
    count=$(expr $count + 1)
    if [ ${count} -gt ${retry_upper_limit} ] ; then
      exit -1
    fi
    sleep 5s
  done
}

# waiting for finishing to start couchbase-server
retryRequest "http://localhost:8091"

# Setup Administrator username and password
retryRequest "-X POST http://localhost:8091/settings/web -d password=password&username=admin&port=8091"

# Setup Bucket
retryRequest "-u admin:password -v -X POST http://localhost:8091/pools/default/buckets -d flushEnabled=1&threadsNumber=3&replicaIndex=0&replicaNumber=0&evictionPolicy=valueOnly&ramQuotaMB=597&bucketType=membase&name=default&authType=sasl&saslPassword="

# Setup Index RAM Quota
retryRequest "-u admin:password -v -X POST http://localhost:8091/pools/default -d memoryQuota=2000&indexMemoryQuota=269"

REST API を取り合えず叩いてみて、成功コードが返ってこなかったら、5秒待ってリトライするようにしました。
環境に依存すると思いますが、REST API が使えるようになるまで4回ほどリトライしているので、20秒ほど要しているようです。

Spark2 AccumulatorV2

Spark2 で Accumulator を使おうと思ったら、deprecated になっていました。
代わりに AccumulatorV2 を使うようにとのこと。
https://spark.apache.org/docs/2.3.0/api/java/

旧 Accumulator と同じように使えるのかと思っていたら、AccumulatorV2 を継承して独自の Accumulator を定義する必要があるようです。
旧 Accumulator は単純に数値の加算処理しか対応していませんでしたが、独自に Accumulator を定義できるようになったということですね。
List を保持する Accumulator を作って、任意のタイミングで要素追加とかも出来そうです。

今回は、単純な加算処理だけ対応した Accumulator を以下のように定義しました。

class MyAccumulator extends AccumulatorV2<Integer, Integer> {
	
	private int count = 0;
	public MyAccumulator(int initialValue) {
		this.count = initialValue;
	}
	
	@Override
	public void add(Integer arg0) {
		this.count += arg0;
	}
	
	@Override
	public AccumulatorV2<Integer, Integer> copy() {
		return new MyAccumulator(value());
	}
	
	@Override
	public boolean isZero() {
		if (this.count == 0) {
			return true;
		}
		return false;
	}
	
	@Override
	public void merge(AccumulatorV2<Integer, Integer> arg0) {
		add(arg0.value());
	}
	
	@Override
	public void reset() {
		this.count = 0;
	}
	
	@Override
	public Integer value() {
		return this.count;
	}
}

気をつける必要があるのは、それぞれのメソッドの処理はきちんと書くこと。
何も書かなかったり、適当に null を返したりしてはいけないという意味です。
例えば、merge() メソッドは呼ばれていないと思って何も処理を書かないでいましたが、その場合、いくら加算しても結果は 0 になってしまいました。
それぞれのメソッドが内部的には呼ばれているようです。

使い方は以下のように、AccumulatorV2 を作り、(加算が)必要なタイミングで add を呼び出してあげます。

final AccumulatorV2<Integer, Integer> blankLines = new MyAccumulator(0);
jsc.sc().register(blankLines);
rdd.foreach(line -> {
	if (line.equals("")) {
		blankLines.add(1);
	}
});

サンプルソース
spark/AccumulatorV2Try.java at master · blueskyarea/spark · GitHub

文字コードの歴史を少し知りたい

python のドキュメントですが、文字コードの歴史について書かれている部分がありました。
https://docs.python.org/2.7/howto/unicode.html

文字コードと言えば、何も考えずに "UTF-8" でみたいなところがあって、正直よく分かっていません。

歴史
ドキュメントによると、1968年に ASCII コードが標準化されたとあります。

アメリカの標準だったということで、普通のアルファベットのみが定義されていて、アクセント付きの文字( ‘é’ or ‘Í’)については、定義されていなかったとのこと。

1980年代になっても、ほとんどのパーソナルコンピューターは 8bit だったため、0 から 255 までの値しか扱えない。
その中でも、ASCII コードは 127 までしか使っていなかったので、残りの 128 から 255 までを使って、アクセント付きの文字などを補おうと頑張っていた様子。
とはいえ、世界各国の文字を補うにはあまりにも少なすぎるし、独自仕様が出来すぎてしまったので、Unicode 標準化という動きが始まったらしい。

Unicode では、16bit で文字を取り扱うことを始めたので、2^16 = 65,536 の値が使用できることになる。
当初はこれで、全ての言語のアルファベットを扱うことを目標にしていたみたいですが、後になって足りないことが発覚。
そして Unicode では、0 から 1,114,111 の値(どのように算出しているかは不明ですが)が使用できるようにしたらしい。

Unicodeとは
世界中の文字の集合。一応、それぞれの文字に対して番号は振られているみたいですが、それ自体はコンピュータが解釈する数値ではないようです。
平仮名の ”あ” だったら、U+3042。
符号化文字集合とも言われているようですが、あまり覚えていません。
これらの文字をパソコンで扱うためには、「符号化形式」にしたがって、文字を数値化する必要がある。
その「符号化形式」と呼ばれるものが Shift-JIS や UTF-8 になる。
Unicode = UTF-16 みたいなことを情報処理試験対策で学んだ気もしますが、この解釈だと違うようです

UTF-8とは
ASCIIコードの文字に、世界中の文字を加えた文字コード
ASCIIで定義している文字を、Unicodeでそのまま使用することを目的としているので、互換性が高いようです。
ASCIIと同じ部分は1バイトで表現し、そのほかの部分を 2~6バイトで表現する可変長の符号化方式なので、漢字や仮名文字(3〜4バイト)などはデータサイズが少し大きくなるようです。

互換性が高く世界中の色んなアプリケーションが採用しているので、とりあえず UTF-8 を基準に考えるのは当然のことかなと思います。

Java インターフェース 実装してみる

前回の記事で、インターフェースのメリットが解った?ところで、実装をしてみます。
blueskyarea.hatenablog.com

今回は、データベースからデータを取り出すところに、Dao インターフェースを実装します。
インターフェースの定義
Dao をインターフェースを定義します。
データベースから全てのデータを取得するための、getAll() メソッドを定義しています。

public interface Dao {
	Map<Integer, String> getAll();
}
MySqlDao の作成

例としてMaySql からデータを取得するとして、MySqlDao を作成します。
Dao インターフェースを実装するため、getAll() の実装が強制されます。
もちろん戻り値の型も インターフェース側で定義したものと同じ Map でなければなりません。
これは MySqlDao の実装担当者が、意図しない型でデータを返却することを防いでくれます。

public class MySqlDao implements Dao {
	@Override
	public Map<Integer, String> getAll() {
		Map<Integer, String> map = new HashMap<>();
		map.put(1, "mysql1");
		map.put(2, "mysql2");
		map.put(3, "mysql3");
		return map;
	}
}

※今回は例なので、単に固定値を返しているだけです

クライアント の作成

MySqlDao を実際に使うクライアントを作ります。

public class Client {
	public static void main(String[] args) {
		Dao dao = new MySqlDao();
		new Client().getData(dao);
	}
	
	private void getData(Dao dao) {
		dao.getAll().forEach((key, value) -> {
			System.out.println(key + ":" + value);
		});
	}
}
実行1
1:mysql1
2:mysql2
3:mysql3

このように期待した値が得られています。

MySql -> Postgres への変更

さてここで、データソースを MySql から Postgres に変更することになりました。
もちろんクライアント側は、同じ出力結果を期待しています。
まず、Postgres からデータを取得するための PostgresDao を作成します。
同じく Dao を実装しているので、getAll() の実装が強制されます。
クライアント側は、MySql の時と同じ出力結果を期待しているので、これは好都合です。

public class PostgresDao implements Dao {
	@Override
	public Map<Integer, String> getAll() {
		Map<Integer, String> map = new HashMap<>();
		map.put(1, "postgres1");
		map.put(2, "postgres2");
		map.put(3, "postgres3");
		return map;
	}
}
クライアント側で使う Dao を変更

あとは、クライアント側で使う Dao を MySql から Postgres に変更してあげます。
以下のように、変更したのは、生成する dao インスタンスだけです。

public class Client {
	public static void main(String[] args) {
		//Dao dao = new MySqlDao();
		Dao dao = new PostgresDao();  // 変更したのはここだけ
		new Client().getData(dao);
	}
	
	private void getData(Dao dao) {
		dao.getAll().forEach((key, value) -> {
			System.out.println(key + ":" + value);
		});
	}
}
実行2
1:postgres1
2:postgres2
3:postgres3

このようにクライアント側では、使用する Dao を変更しただけで、期待する結果を得ることができました。
これは利用者側にとって好都合です。
利用者側は、Dao がどのようにデータをデータベースから取得しているかなどは意識する必要がありません。
使いたいデータベースの Dao を使うだけで、期待する値が得られます。

感想

個人的に同じシステムにおいて、データソースを途中で変更するというケースに遭遇したことがないので、この恩恵を実際に得られたことはないです。
ただ、インターフェースを使うことによって、疎結合を実現できれば、変更に強いシステムになり得ることは理解できました。