社内se × プログラマ × ビッグデータ

プログラミングなどITに興味があります。

Linux でディスク使用量を調べる

頻繁に使用するコマンドなのでメモ。

ディスク全体での使用量を調べる

df -h
$ df -h
Filesystem      Size  Used Avail Use% Mounted on
/dev/sda5        26G   22G  3.2G  88% /
tmpfs           395M  1.2M  394M   1% /run

※ h オプションが無かったら、単位がつきません

指定ディレクトリでの使用量を調べる

du -sh [ディレクトリのパス]
$ du -sh ~/
12G	/home/xx/

※ s オプションが無かったら、統計ではなく階下層ディレクトリ単位で表示されます

指定ディレクトリ配下でディスク使用量の内訳を調べる

du -sh [ディレクトリのパス/*]
$ du -sh ~/*
31M	/home/xx/backup
4.9G	/home/xx/bin

調べるディレクトリの深さ(階層)を指定する

du -h --max-depth 2 [ディレクトリのパス/*]
$ du -h --max-depth 2 ~/*
16M	/home/xx/backup/201705
15M	/home/xx/backup/201706
4.9G	/home/xx/bin/script

JMockit でメソッド内から呼び出しているメソッドのみをモックする

表記のとおりです。
例えば、以下のような methodA と methodB があった場合、junitテストで methodA を呼び出した時の methodB の振る舞いを変更したいということです。
※戻り値を 5 以外にしたい

public class PartialMock {
  public int methodA(int a) {
    int b = methodB();
    return a + b;
  }

  public int methodB() {
    return 5;
  }
}

JMockit ではパーシャルモックという機能が使えるようです。
(Partial Mock: 部分的なモック)

下記のように、Expectations の引数にインスタンスを渡して、振る舞いを変えたいメソッドを定義してあげます。
こうすることで、methodB の戻り値が 5 -> 10 に変わります。

@Test
public void testMethodA_WithPartialMock() {
  final PartialMock partialMock = new PartialMock();
	
  // changed behavior of methodB
  new Expectations(partialMock) {{
    partialMock.methodB(); result = 10;
  }};
	
  assertThat(partialMock.methodA(5), is(15));
}

これも簡単に使えるし、強力な機能だと思います。
テスト対象クラス
テストクラス

初 JMockit の感想

JUnit で使えるモックのライブラリには、mockito, jmockit, easymock などがあります。
個人的には mockito しか使ったことがなかったのですが、jmockit がよく使われているらしいので、試してみました。

具体的な使い方などは既に多くの記事が存在しますので、ここには書かないです。
※個人的な感想

1.テスト対象のクラス
Hoge.class -> テストの対象とする主クラス。
Foo.class -> Hoge.class から呼び出されるクラス。

どちらも、文字列を返すだけのシンプルなメソッドしか用意していません。
そして実際のテストクラスです。

テストコード1

テストコード2


2.クラス変数として mock を定義すると、全テストメソッドに適用される

// To specify as class value, every test can get the mock.
@Mocked private Hoge hogeMock;
	
@Test
public void testLocalValueBecomeMockFromClassValue() {
	Hoge hoge = new Hoge();
	assertThat(hoge.methodHoge(), nullValue());
}

@Test
public void testLocalValueBecomeMockFromClassValue2() {
	assertThat(hogeMock.methodHoge(), nullValue());
}

シンプルなテストクラスに対しては便利かもしれないけど、意図しない mock が適用されてしまう可能性がある。
次の例のように、各テストクラス毎に mock を渡してあげる方がよいと思います。

@Test
public void testLocalValueBecomeMockFromArg(@Mocked Hoge hogeMock) {
	Hoge hoge = new Hoge();
	assertThat(hoge.methodHoge(), nullValue());
}

3.特定のインプットに対する戻り値をテストするなら、Expectations。不特定のインプットに対しては NonStrictExpectations。
Expectations

@Test
public void testExpectationsWithReturnValue() {
	new Expectations() {{
		hogeMock.methodHoge(); result = "called without arguments.";
		hogeMock.methodHoge("hoge"); result = "called with arguments";
	}};
	
	assertThat(hogeMock.methodHoge(), is("called without arguments."));
	assertThat(hogeMock.methodHoge("hoge"), is("called with arguments"));
}

Expectations を使う場合は、Expectations 内で定義した順番と、assert の順番を揃える必要がある点に注意ですね。

NonStrictExpectations

@Test
public void testNonStrictExpectations() {
	new NonStrictExpectations() {{
		hogeMock.methodHoge(anyString); result = "stub returns value";
	}};
	
	// To specify "anyString", always returns same value.
	assertThat(hogeMock.methodHoge("hoge"), is("stub returns value"));
	assertThat(hogeMock.methodHoge("mock"), is("stub returns value"));
	assertThat(hogeMock.methodHoge("always"), is("stub returns value"));
}

どんな文字列でもの意味である anyString とか便利ですね。
スタブとして使う部品が入力値に依らず、常に同じ値を返して良いのであれば、これは使えますね。

4.Injectable を使えば、テスト対象クラスのクラス変数も簡単に mock にできる

@Tested private Hoge hogeTested;
@Injectable private Foo foo;

@Test
public void testHogeMethodHasIntValue() {
	final String abcString = "abc";
	new Expectations() {{
		foo.methodString(abcString); result = "return from methodString";
	}};
	
	assertThat(hogeTested.methodHoge(abcString), is("return from methodString"));
}

Injectable で定義されたクラス変数は Tested で定義されたクラス内で mock になる。
戻り値も自由自在に定義できる。すごく便利ですね。

5.感想
初めて JMockit 使ってみましたが、簡単に mock が作れて戻り値の定義も出来るので非常に便利だと思います。
慣れてくれば楽に書けるようになりそうです。
ただあまり乱用すると、可読性が落ちそうですし、mock 全てに対して言えることですが、 mock 化すべき対象は API や外部ストレージからの応答など、可変性があるものに限定したいですね。

Spark Streaming で テキストファイルへのセーブ

ソースコード

RDD には saveAsTextFile というメソッドがあり、引数に指定したディレクトリに簡単に出力することができます。
Spark Streaming における DStream にも saveAsTextFiles というメソッドがありました。

ただし、Java での JavaDStream から使う場合は、dstream() を使って JavaDStream -> DStream に変換しなければなりません。

JavaDStream javaDStream;
DStream dstream = javaDStream.dstream();
dstream.saveAsTextFiles("prefix", "suffix");

1. "prefix" と "suffix" ?
ただ、この "prefix" と "suffix" が分かりにくい。。
実際に以下のような指定をして動かしてみると、

dstream.saveAsTextFiles("/tmp/output", "txt");

期待していたのは、/tmp/output ディレクトリ配下にファイルが生成されることでしたが
/tmp ディレクトリ配下に以下のようなディレクトリが生成されてしまいました。
ファイルではなくて以下のディレクトリです。

ls /tmp
output-1498060642000.txt
output-1498060643000.txt
output-1498060644000.txt
output-1498060645000.txt
output-1498060646000.txt

そもそもディレクトリなのに .txt という拡張子がついているのが違和感。

2. "prefix" を "/tmp/output/" にして、ディレクトリであることを明示してみる
最後に "/" を入れただけです。

dstream.saveAsTextFiles("/tmp/output/", "txt");

同じく期待していたのは、/tmp/output ディレクトリ配下にファイルが生成されることでしたが
/tmp/output ディレクトリ配下に、やはりディレクトリが生成されてしまいました。
しかも、ディレクトリ名の先頭に "-" が付いている始末。

/tmp/output$ ls
-1498061778000.txt  -1498061780000.txt  -1498061782000.txt  -1498061784000.txt
-1498061779000.txt  -1498061781000.txt  -1498061783000.txt  -1498061785000.txt

完全にこの "prefix" の指定の仕方は間違っている気がします。

3. 空っぽのディレクトリが作られまくる
この Streaming は Durations を 1秒間隔で設定していますが、これによって毎秒ディレクトリが生成される動きになりました。
たとえ、出力する内容が何も無いにしてもです。
空っぽのディレクトリ、正確には _SUCCESS ファイルのみ入っているディレクトリ。
もちろん、要らないディレクトリです。

4. データが空の時は何も出力したくない
DStream は RDD の並びである。
という訳で、foreachRDD なんてメソッドが用意されていました。
RDD を一つずつ取り出して、各 RDD に対して処理することができます。
RDD には isEmpty を使えば空っぽかどうか確認できるので、結局・・・

JavaDStream javaDStream;
javaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
	public Void call(JavaRDD<String> rdd) throws Exception {
		if(!rdd.partitions().isEmpty()) {
			rdd.saveAsTextFile("/tmp/output");
		}
	        return null;
	}
});

こんな形すれば、実際に新しいデータの入力があった時だけ、出力処理が行われるようになりました。
return null; というのが気になりますが。

Spark Streaming の textFileStream で複数のディレクトリを対象にしてみる

ソースコード

単に2つの DStream を作成してあげるだけです。

// create DStream from text file
String logDir = "/tmp/logs";
String logDir2 = "/tmp/logs2";
JavaDStream<String> logData = jssc.textFileStream(logDir);
JavaDStream<String> logData2 = jssc.textFileStream(logDir2);

// output
logData.print();
logData2.print();

// start streaming
jssc.start();

// wait for end of job
jssc.awaitTermination();

出力結果
print() の出力結果ですが、以下のように1つの間隔(今回は一秒)に対して
2つの結果が出力される場所が出来ていました。

-------------------------------------------
Time: 1498053561000 ms
-------------------------------------------

-------------------------------------------
Time: 1498053561000 ms
-------------------------------------------

-------------------------------------------
Time: 1498053562000 ms
-------------------------------------------
2017-06-21T22:58:19+09:00	test.access1	{"message":"66.249.69.97 - - [24/Sep/2014:22:25:44 +0000] \"GET /071300/242153 HTTP/1.1\" 404 514 \"-\" \"Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)\""}
2017-06-21T22:58:19+09:00	test.access1	{"message":"71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] \"GET /error HTTP/1.1\" 404 505 \"-\" \"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36\""}
2017-06-21T22:58:19+09:00	test.access1	{"message":"71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] \"GET /favicon.ico HTTP/1.1\" 200 1713 \"-\" \"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36\""}
2017-06-21T22:58:19+09:00	test.access1	{"message":"71.19.157.174 - - [24/Sep/2014:22:26:37 +0000] \"GET / HTTP/1.1\" 200 18785 \"-\" \"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36\""}
2017-06-21T22:58:19+09:00	test.access1	{"message":"71.19.157.174 - - [24/Sep/2014:22:26:37 +0000] \"GET /jobmineimg.php?q=m HTTP/1.1\" 200 222 \"http://www.holdenkarau.com/\" \"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36\""}

-------------------------------------------
Time: 1498053562000 ms
-------------------------------------------

-------------------------------------------
Time: 1498053571000 ms
-------------------------------------------

-------------------------------------------
Time: 1498053571000 ms
-------------------------------------------

-------------------------------------------
Time: 1498053572000 ms
-------------------------------------------
2017-06-21T22:58:29+09:00	test.access2	{"message":"71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] \"GET /error78978 HTTP/1.1\" 404 505 \"-\" \"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36\""}

-------------------------------------------
Time: 1498053572000 ms
-------------------------------------------

fluentd で収集したファイルを Spark Streaming で GET

Spark Streaming ではファイルが更新されたとしても、その差分のみの取得はされない。
Spark Streaming ではあるディレクトリに生成された新規ファイルは自動的に取り込んでくれるよう。
そこで差分のみを fluentd で取得し、それを新規ファイルとして出力してみました。

◆ fluentd の準備
1. ruby のバージョンアップ
fluentd をインストールしようとしたら、ruby のバージョン 2.1.0 以上を要求されたので、まず先にインストールしました。

sudo add-apt-repository -y ppa:brightbox/ruby-ng
sudo apt-get update
sudo apt-get -y install ruby2.1

$ ruby -v
ruby 2.1.9p490 (2016-03-30 revision 54437) [x86_64-linux-gnu]

2. fluentd のインストー

sudo gem install fluentd

3. fluentd のセットアップ

fluentd --setup .
-> fluent.conf ファイルが生成される

4. fluent.conf ファイルの編集
すでにファイルには色々と書き込まれていますが、すべて削除して以下の内容にしました。

## File input
## read apache logs with tag=apache.access
<source>
  type tail
  format none
  path /tmp/input/test.log
  tag apache.access
</source>

# match tag=apache.access and write to file
<match apache.access>
  type file
  path /tmp/logs/result.log
  time_slice_format %Y%m%d%H%M%S
  time_slice_wait 1m
</match>

5. fluentd の起動

fluentd -c fluent.conf -vv

◆ ログファイルの準備
fluentd の source 内で指定したログファイルを適当に更新するだけです。
こんな感じのログです。

apache.access   {"message":"66.249.69.97 - - [24/Sep/2014:22:25:44 +0000] \"GET /071300/242153 HTTP/1.1\" 404 514 \"-\" \"Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)\""}
2017-06-17T10:30:31+09:00       apache.access   {"message":"71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] \"GET /error HTTP/1.1\" 404 505 \"-\" \"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36\""}
2017-06-17T10:30:31+09:00       apache.access   {"message":"71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] \"GET /favicon.ico HTTP/1.1\" 200 1713 \"-\" \"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36\""}

すると、更新された差分情報のみが fluentd によって回収され、以下のようなファイルが /tmp/logs 内に出来上がります。
result.log.20170617103031_0.log

◆ Spark streaming の起動
そして、Spark streaming の StreamingTextFileStream で /tmp/logs ディレクトリを監視していると
(単にスペース区切りで文字列カウントしただけなので、アウトプット自体は美しくないですが)
差分のみを取得することが出来ました。

-------------------------------------------
Time: 1497967776000 ms
-------------------------------------------
(\"Mozilla/5.0,5)
((KHTML,,4)
(x86_64),4)
(2017-06-20T23:08:33+09:00	apache.access	{"message":"71.19.157.174,4)
(222,1)
(/favicon.ico,1)
(505,1)
(Gecko),4)
(514,1)

TextFileStream でリアルタイム word count

Spark Streaming ではディレクトリを監視して、その中に入ったテキストファイルを取りこめるようなので試してみました。
ソースコードはこちら

1.準備(コーディング)
ディレクトリを監視し、テキストファイルを取り込むためには、textFileStream の引数に監視するディレクトリパスを指定します。
ディレクトリパスに、ワイルドカードは使えないようです。
監視するディレクトリ階層を深く設定できてしまうと、パフォーマンス的に影響があるためでしょうか。

// create DStream from text file
String logDir = "/tmp/logs";
JavaDStream<String> logData = jssc.textFileStream(logDir);

今回はワードカウントを実装してみたいと思います。
取り込んだ DStream を flatMap で取り出し、空白(スペース)区切りでリストに格納します。

// Split into words
JavaDStream<String> words = logData.flatMap(new FlatMapFunction<String, String>() {
  public Iterable<String> call(String line) throws Exception {
    return Arrays.asList(line.split(" "));
  }
});

flatMap を使った場合の関数(call)は、1つの要素を返すのではなくて、イテレータを返します。
返される DStream の内容はイテレータではなくて、すべてのイテレータから返された要素になります。
それぞれの要素は区切られることなく、フラットな状態です。
そのため、入力文字列を単語に分割する用途としてよく使われているようです。

ペアの DStream を生成し、reduce 処理でカウントしていきます。
このあたりは、RDD でワードカウントするのと同じ方法です。

// Transform into word and count
JavaPairDStream<String, Integer> counts = words.mapToPair(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String word) throws Exception {
	return new Tuple2(word, 1);
    }
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer original, Integer additional) throws Exception {
	return original + additional;
    }
});

出来上がった counts を表示させます。
ペアの DStream でも print が使えるようになっています。
ただ、デフォルトでは10個までしか出力されないので、引数50を与えて50個まで出力できるようにします。

// output
counts.print(50);

2.実行
今回は apache のログのようなデータを用意し、それを取り込ませてみました。

(x86_64),4)
((KHTML,,4)
(222,1)
(66.249.69.97,1)
(/favicon.ico,1)
(505,1)
(Gecko),4)
(514,1)
(Chrome/37.0.2062.94,4)
(404,2)

今回は単に空白で単語を区切っただけなので、さすがに意味の分からない単位でワードカウントされてしまっていますが、きちんとカウントされていることが確認できました。

3.感想
ディレクトリを指定しておくだけで、その中に生成されたテキストを自動で取り込んでくれるのは非常に便利だと思います。
ただ、同じファイル名で追記されたデータは取り込んでくれませんでしたので、常に新しいファイルに出力する必要があるかもしれません。
また、新規作成ファイルであったとしても、ファイル作成中に中途半端な状態で取り込まれてしまう可能性があると思います。
これは hdfs など(原子性が保証?)のデータソースを使えれば解決できるかもしれません。
fluentd などと連携させることで非常に便利なリアルタイム集計ツールになってくれそうな気がします。