社内se × プログラマ × ビッグデータ

プログラミングなどITに興味があります。

Apache Spark2.3 ブロードキャスト変数のパフォーマンス

ブロードキャスト変数は、リードオンリーの変数を効率的に各 Executor に送信する仕組みです。

Apache Spark2 にて、ブロードキャスト変数のパフォーマンスをローカル環境で確認してみました。
スレッド数は 3 を指定しています。
※ sparkConf.setMaster("local[3]");

ブロードキャスト変数なしの場合
1. 100 万件の数値リストを生成
2. 3件の文字列リストから RDD を生成
3. RDD の各文字列要素に対して、100 万件の数値リストをループし付加する処理(100万回ループ)を行う。
※ 即ち、3 の処理が (spark)Executor で行われる
計測結果(sec):
00:00:07.572
00:00:09.765
00:00:10.879

ブロードキャスト変数ありの場合
1. 100 万件の数値リストを生成し、それをブロードキャスト変数にする
2. 3件の文字列リストから RDD を生成
3. RDD の各文字列要素に対して、100 万件の数値リストをブロードキャスト変数で渡し、ループし付加する処理(100万回ループ)を行う。
※ 即ち、3 の処理が (spark)Executor で行われる
計測結果(sec):
00:00:01.593
00:00:01.802
00:00:01.668

ブロードキャスト変数を用いて、100万件の数値リストを Executor に渡した方が、圧倒的に速いという結果になりました。
いくら何でも違いが大きい印象を受けたので、これは Spark2 の恩恵かも?しれないと思い、Spark1.6でも試してみたところ
Spark1.6 ブロードキャスト変数なし(sec):
0:00:09.843
0:00:08.355
0:00:08.403

Spark1.6 ブロードキャスト変数あり(sec):
0:00:01.594
0:00:00.890
0:00:00.899

結果は誤差の範囲でした。むしろ1.6の方がほんの少し速いという結果に。
テストで使用したコード。
github.com

Couchbase 同期・非同期 insert(upsert)のパフォーマンス比較

Couchbase Java SDK を使用して、10万件のデータをインサートする場合のパフォーマンスを比較しました。
通常は同期処理にてインサートが行われますが、非同期用のAPIも用意されていましたので、それの比較となります。

Couchbase server は Docker で生成したもので、ノード数は1つだけです。

1.10万ループ内で1件ずつ insert(upsert) する場合(同期)

    protected void bigPut() {   	
    	// data preparation
    	stopWatch.start();
    	IntStream.range(0, 100000).forEach(i -> {
    		JsonObject user = JsonObject.empty()
        		    .put("firstname", "Walter")
        		    .put("lastname", "White")
        		    .put("job", "chemistry teacher")
        		    .put("age", i);
        	JsonDocument doc = JsonDocument.create("walter:" + i, user);
        	bucket.upsert(doc);
    	});
    	stopWatch.stop();
    	System.out.println(stopWatch);
    	stopWatch.reset();
    	
    	// disconnection
    	cluster.disconnect();
    }

2. 10万件のリストを作成し、そのリストを非同期で insert(upsert)した場合

protected void bulkPut(){
    	stopWatch.start();
    	List<JsonDocument> jsons = new ArrayList<>();
    	IntStream.range(0, 100000).forEach(i -> {
    		JsonObject user = JsonObject.empty()
        		    .put("firstname", "Walter")
        		    .put("lastname", "White")
        		    .put("job", "chemistry teacher")
        		    .put("age", i);
        	JsonDocument doc = JsonDocument.create("walter:" + i, user);
        	jsons.add(doc);
    	});
    	
    	Observable
        .from(jsons)
        .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
            @Override
            public Observable<JsonDocument> call(final JsonDocument docToInsert) {
                return bucket.async().upsert(docToInsert);
            }
        })
        .last()
        .toBlocking()
        .single();
    	
    	stopWatch.stop();
    	System.out.println(stopWatch);
    	stopWatch.reset();
    	
    	// disconnection
    	cluster.disconnect();
    }

一般的に非同期処理の方が速く処理されるのは想像しやすいところです。
ただ、2に関しては、10万件のリストを作成してから、更にそのリストからドキュメントを取り出すような処理を行うため、1周多めにループ処理を行うことになります。
しかし、その時間込みで計測を行いました。

結果(3回計測)
1.10万ループ内で1件ずつ insert(upsert) する場合(同期)
00:00:58.620
00:01:00.178
00:00:53.739

2. 10万件のリストを作成し、そのリストを非同期で insert(upsert)した場合
00:00:36.464
00:00:35.964
00:00:48.657

それでもやはり、非同期処理の方がトータルで速いという結果になりました。
10万件を同時に処理したとは言え、正直なところ、想像していたよりも差が大きい印象です。

Docker Couchbase Server の起動スクリプト

Docker で Couchbase Server のコンテナを作成し、サービスを起動するスクリプトです。

sudo /etc/init.d/couchbase-server start

# waiting for finishing to start couchbase-server
sleep 30s

# Setup Administrator username and password
curl -v -X POST http://localhost:8091/settings/web -d 'password=password&username=admin&port=8091'

sleep 5s

# Setup Bucket
curl -u admin:password -v -X POST http://localhost:8091/pools/default/buckets \
-d 'flushEnabled=1&threadsNumber=3&replicaIndex=0&replicaNumber=0&evictionPolicy=valueOnly&ramQuotaMB=597&bucketType=membase&name=default&authType=sasl&saslPassword='

sleep 5s

# Setup Index RAM Quota
curl -u admin:password -X POST http://localhost:8091/pools/default \
-d 'memoryQuota=5000' -d 'indexMemoryQuota=269'

Couchbase Server を起動し、REST API にて admin 用のユーザーとパスワードを設定。
その後、Bucket も作るようにしているのですが、いくつかの sleep が散見されます。

最初はこの sleep 置いてなかったのですが、Couchbase Server 起動コマンドを叩いた直後は、REST API が使えないので、Bucket のセットアップ以降が失敗していました。

とはいえ、適当に固定した時間を sleep させるのは不安定なので、retry function を設けました。

sudo /etc/init.d/couchbase-server start

function retryRequest () {
  count=0
  retry_upper_limit=5
  request=$1
  while :
  do
    command="curl -L ${request} -o /dev/null -w %{http_code}\n -s"
    res=$(${command})
    if [ $res = 200 ] || [ $res = 202 ] ; then
      break
    fi
    count=$(expr $count + 1)
    if [ ${count} -gt ${retry_upper_limit} ] ; then
      exit -1
    fi
    sleep 5s
  done
}

# waiting for finishing to start couchbase-server
retryRequest "http://localhost:8091"

# Setup Administrator username and password
retryRequest "-X POST http://localhost:8091/settings/web -d password=password&username=admin&port=8091"

# Setup Bucket
retryRequest "-u admin:password -v -X POST http://localhost:8091/pools/default/buckets -d flushEnabled=1&threadsNumber=3&replicaIndex=0&replicaNumber=0&evictionPolicy=valueOnly&ramQuotaMB=597&bucketType=membase&name=default&authType=sasl&saslPassword="

# Setup Index RAM Quota
retryRequest "-u admin:password -v -X POST http://localhost:8091/pools/default -d memoryQuota=2000&indexMemoryQuota=269"

REST API を取り合えず叩いてみて、成功コードが返ってこなかったら、5秒待ってリトライするようにしました。
環境に依存すると思いますが、REST API が使えるようになるまで4回ほどリトライしているので、20秒ほど要しているようです。

Spark2 AccumulatorV2

Spark2 で Accumulator を使おうと思ったら、deprecated になっていました。
代わりに AccumulatorV2 を使うようにとのこと。
https://spark.apache.org/docs/2.3.0/api/java/

旧 Accumulator と同じように使えるのかと思っていたら、AccumulatorV2 を継承して独自の Accumulator を定義する必要があるようです。
旧 Accumulator は単純に数値の加算処理しか対応していませんでしたが、独自に Accumulator を定義できるようになったということですね。
List を保持する Accumulator を作って、任意のタイミングで要素追加とかも出来そうです。

今回は、単純な加算処理だけ対応した Accumulator を以下のように定義しました。

class MyAccumulator extends AccumulatorV2<Integer, Integer> {
	
	private int count = 0;
	public MyAccumulator(int initialValue) {
		this.count = initialValue;
	}
	
	@Override
	public void add(Integer arg0) {
		this.count += arg0;
	}
	
	@Override
	public AccumulatorV2<Integer, Integer> copy() {
		return new MyAccumulator(value());
	}
	
	@Override
	public boolean isZero() {
		if (this.count == 0) {
			return true;
		}
		return false;
	}
	
	@Override
	public void merge(AccumulatorV2<Integer, Integer> arg0) {
		add(arg0.value());
	}
	
	@Override
	public void reset() {
		this.count = 0;
	}
	
	@Override
	public Integer value() {
		return this.count;
	}
}

気をつける必要があるのは、それぞれのメソッドの処理はきちんと書くこと。
何も書かなかったり、適当に null を返したりしてはいけないという意味です。
例えば、merge() メソッドは呼ばれていないと思って何も処理を書かないでいましたが、その場合、いくら加算しても結果は 0 になってしまいました。
それぞれのメソッドが内部的には呼ばれているようです。

使い方は以下のように、AccumulatorV2 を作り、(加算が)必要なタイミングで add を呼び出してあげます。

final AccumulatorV2<Integer, Integer> blankLines = new MyAccumulator(0);
jsc.sc().register(blankLines);
rdd.foreach(line -> {
	if (line.equals("")) {
		blankLines.add(1);
	}
});

サンプルソース
spark/AccumulatorV2Try.java at master · blueskyarea/spark · GitHub

文字コードの歴史を少し知りたい

python のドキュメントですが、文字コードの歴史について書かれている部分がありました。
https://docs.python.org/2.7/howto/unicode.html

文字コードと言えば、何も考えずに "UTF-8" でみたいなところがあって、正直よく分かっていません。

歴史
ドキュメントによると、1968年に ASCII コードが標準化されたとあります。

アメリカの標準だったということで、普通のアルファベットのみが定義されていて、アクセント付きの文字( ‘é’ or ‘Í’)については、定義されていなかったとのこと。

1980年代になっても、ほとんどのパーソナルコンピューターは 8bit だったため、0 から 255 までの値しか扱えない。
その中でも、ASCII コードは 127 までしか使っていなかったので、残りの 128 から 255 までを使って、アクセント付きの文字などを補おうと頑張っていた様子。
とはいえ、世界各国の文字を補うにはあまりにも少なすぎるし、独自仕様が出来すぎてしまったので、Unicode 標準化という動きが始まったらしい。

Unicode では、16bit で文字を取り扱うことを始めたので、2^16 = 65,536 の値が使用できることになる。
当初はこれで、全ての言語のアルファベットを扱うことを目標にしていたみたいですが、後になって足りないことが発覚。
そして Unicode では、0 から 1,114,111 の値(どのように算出しているかは不明ですが)が使用できるようにしたらしい。

Unicodeとは
世界中の文字の集合。一応、それぞれの文字に対して番号は振られているみたいですが、それ自体はコンピュータが解釈する数値ではないようです。
平仮名の ”あ” だったら、U+3042。
符号化文字集合とも言われているようですが、あまり覚えていません。
これらの文字をパソコンで扱うためには、「符号化形式」にしたがって、文字を数値化する必要がある。
その「符号化形式」と呼ばれるものが Shift-JIS や UTF-8 になる。
Unicode = UTF-16 みたいなことを情報処理試験対策で学んだ気もしますが、この解釈だと違うようです

UTF-8とは
ASCIIコードの文字に、世界中の文字を加えた文字コード
ASCIIで定義している文字を、Unicodeでそのまま使用することを目的としているので、互換性が高いようです。
ASCIIと同じ部分は1バイトで表現し、そのほかの部分を 2~6バイトで表現する可変長の符号化方式なので、漢字や仮名文字(3〜4バイト)などはデータサイズが少し大きくなるようです。

互換性が高く世界中の色んなアプリケーションが採用しているので、とりあえず UTF-8 を基準に考えるのは当然のことかなと思います。

Java インターフェース 実装してみる

前回の記事で、インターフェースのメリットが解った?ところで、実装をしてみます。
blueskyarea.hatenablog.com

今回は、データベースからデータを取り出すところに、Dao インターフェースを実装します。
インターフェースの定義
Dao をインターフェースを定義します。
データベースから全てのデータを取得するための、getAll() メソッドを定義しています。

public interface Dao {
	Map<Integer, String> getAll();
}
MySqlDao の作成

例としてMaySql からデータを取得するとして、MySqlDao を作成します。
Dao インターフェースを実装するため、getAll() の実装が強制されます。
もちろん戻り値の型も インターフェース側で定義したものと同じ Map でなければなりません。
これは MySqlDao の実装担当者が、意図しない型でデータを返却することを防いでくれます。

public class MySqlDao implements Dao {
	@Override
	public Map<Integer, String> getAll() {
		Map<Integer, String> map = new HashMap<>();
		map.put(1, "mysql1");
		map.put(2, "mysql2");
		map.put(3, "mysql3");
		return map;
	}
}

※今回は例なので、単に固定値を返しているだけです

クライアント の作成

MySqlDao を実際に使うクライアントを作ります。

public class Client {
	public static void main(String[] args) {
		Dao dao = new MySqlDao();
		new Client().getData(dao);
	}
	
	private void getData(Dao dao) {
		dao.getAll().forEach((key, value) -> {
			System.out.println(key + ":" + value);
		});
	}
}
実行1
1:mysql1
2:mysql2
3:mysql3

このように期待した値が得られています。

MySql -> Postgres への変更

さてここで、データソースを MySql から Postgres に変更することになりました。
もちろんクライアント側は、同じ出力結果を期待しています。
まず、Postgres からデータを取得するための PostgresDao を作成します。
同じく Dao を実装しているので、getAll() の実装が強制されます。
クライアント側は、MySql の時と同じ出力結果を期待しているので、これは好都合です。

public class PostgresDao implements Dao {
	@Override
	public Map<Integer, String> getAll() {
		Map<Integer, String> map = new HashMap<>();
		map.put(1, "postgres1");
		map.put(2, "postgres2");
		map.put(3, "postgres3");
		return map;
	}
}
クライアント側で使う Dao を変更

あとは、クライアント側で使う Dao を MySql から Postgres に変更してあげます。
以下のように、変更したのは、生成する dao インスタンスだけです。

public class Client {
	public static void main(String[] args) {
		//Dao dao = new MySqlDao();
		Dao dao = new PostgresDao();  // 変更したのはここだけ
		new Client().getData(dao);
	}
	
	private void getData(Dao dao) {
		dao.getAll().forEach((key, value) -> {
			System.out.println(key + ":" + value);
		});
	}
}
実行2
1:postgres1
2:postgres2
3:postgres3

このようにクライアント側では、使用する Dao を変更しただけで、期待する結果を得ることができました。
これは利用者側にとって好都合です。
利用者側は、Dao がどのようにデータをデータベースから取得しているかなどは意識する必要がありません。
使いたいデータベースの Dao を使うだけで、期待する値が得られます。

感想

個人的に同じシステムにおいて、データソースを途中で変更するというケースに遭遇したことがないので、この恩恵を実際に得られたことはないです。
ただ、インターフェースを使うことによって、疎結合を実現できれば、変更に強いシステムになり得ることは理解できました。

Java インターフェース メリット わからない

もう何度も実装したことがあるにも関わらず、そのメリットがいまいちピンときていなかったりします。
ネットで検索すると、メリットについて語っている色々な情報が出てきます。
その時は何となく理解できた気になるのですが、しばらくするとまたアレなんだっけ?という疑問がでてくるループに入っています。

1. インターフェースとは、処理は具体的に書かれていない、メソッドの型だけを宣言したもの

インターフェースがどういうものかは分かりますが、これだけではメリットについては何も分からないです。
処理を具体的に書かないと、何の役にも立たないのに、何でそんなことをするのか?という疑問が残ります。。

2. 処理内容を具体的に書いていないから、後で変える事ができる

なぜ、処理内容を後で変える事ができるとうれしいのでしょうか。
結局実装するのだから、それを後回しにしているだけのようにも聞こえますが、それがメリットなのでしょうか。。

3. インターフェースを実装したクラスではメソッドの実装が必須だ

そのメソッドを実装することを強制することができるのは、実装漏れを防ぐというメリットがあるのでしょうか。。
何れにしても、欲しいメソッドを実装していなければ処理は動かないのだから、実装漏れすることは無いように思います。

4. 処理は別で記述する方が、後々の変更などが予想される場合には便利

後々の変更というよりは、新しいクラスを追加し、それに実装するという意味かなと思います。
それが便利なのでしょうか。。

5. インターフェースは多重継承のように複数のインタフェースを実装可能

複数のインターフェースを実装するということは、それだけ実装しなければいけないメソッドが増えることになります。
実際に複数のインターフェースを実装するとややこしいし、これをメリットと捉えることができるのでしょうか。。

6. 利用者側へこちらが指定したインターフェースの使用を要求する

内容的には、”インターフェースを実装したクラスではメソッドの実装が必須” と同じことを言っています。
ただ、こちらの表現の方が利用者側に対して何らかの実装を要求する(強制させる)という意味合いが理解しやすいです。

7. Runnable インターフェースを例に考えてみる

Java でスレッド処理を書くときに出てくる Runnable インターフェースです。

private static class MyThread implements Runnable {
    @Override
    public void run() {
        // 何かの処理
    }
}

Runnable インターフェースを実装したクラスは、run() メソッドを実装することを利用者に強制することができます。
run() メソッドに書く処理内容は利用者側に一任されており、好きな処理を書くことができます。
利用者側は以下のように Thread のインスタンスを生成することで、簡単にスレッドを作ることができます。

Thread t1 = new Thread(new MyThread());
t1.start();

もし、MyThread とは違う処理をするスレッド(MyThread2 )を作りたくなったとします。

private static class MyThread2 implements Runnable {
    @Override
    public void run() {
        // 何かの処理
    }
}

この場合も同じく利用者側に要求されるのは、run() メソッドの実装です。
逆に言うと、それ以外には利用者側に要求されていません。
同じく run() メソッドに処理を実装することで、異なる処理を持つスレッドを作ることができます。

Thread t2 = new Thread(new MyThread2());
t2.start();

要はインターフェースが同じなので、利用者側に実装を要求されるメソッドも同じものになります。

この観点からすると、インターフェースは利用者側にとってメリットがあるように思えます。
利用者側がその機能を利用するためには、何を実装する必要があるのかが、明示されているからです。

8. インターフェースで疎結合を実現することができる

疎結合、すなわち各クラス間での依存性を低くすることで、変更に強いアプリケーションを作ることができるという意味かと思います。
以下の例が分かりやすかったです。
悪い例:

public class PC {
    public void setKeyboard(Keyboard keyboard) {};
    public void setMouse(Mouse mouse) {};
}
public class Keyboard {}
public class Mouse {}

良い例:

public class PC {
    public void setUsbDevice1(USB usb) {};
    public void setUsbDevice2(USB usb) {};
    public void setUsbDevice3(USB usb) {};
}
public class Keyboard implements USB {}
public class Mouse implements USB {}

悪い例の方は、PC クラスが Keyboard クラスと Mouse クラスに依存してしまっていて、仮に Keyboard クラスが削除された場合、PC クラスも修正しなければならないし、もし Printer クラスが追加された場合、PC クラスも修正しなければならない。
良い例の方は、USB というインターフェースを介しているので、仮に Keyboard クラスが削除された場合でも、PCクラスに修正は発生しないし、もし Printer クラスが追加された場合、USB インターフェースを実装すれば、PC クラスを修正することなく、既存のメソッド setUsbDevice[1-3] を利用することができます。

なるほど、具体的な例を見てみると、より納得感があるように思います。
今のところ、以下の2点がメリットと思えてきました。
1.利用者側に必要な実装を明示できること
2.クラス間の疎結合を実現できること

基本的に利用者側にメリットがありそうです。
あとは、実際にこれらを意識して、設計・プログラミングができるかどうか。1はまだしも、2がやや難しそうです。