社内se × プログラマ × ビッグデータ

プログラミングなどITに興味があります。

Spark Streaming 試してみました

初めての Spark を参照しながら、Apache Spark Streaming を試してみました。
ソースコード

まずは概念、概要から・・・


1. Spark は RDD を元に構築されていますが、Spark Streaming においては DStream と呼ばれる概念の元に構築されるらしい。
2. DStream は RDD の並びである。

  • Streaming 処理なので、小さな RDD を順番に処理していくイメージ?

3. DStream は Flume、Kafka、HDFS など、色々なデータソースから生成可能。

  • RDD も色んなデータソースから生成可能ですね。

4. DStream では、変換(transformation)と出力(output)の操作ができる。

  • RDD における変換とアクションとよく似ていますね。

5. Spark のバッチプログラムと異なり、Spark Streaming では、アプリケーションを常時稼働させる。

続いて、お試し実装。
Spark Streaming では StreamingContext を生成します。
SparkConf は SparkContext 生成時と同じもの。
Durations.seconds では、そのバッチのインターバルを設定します。
Streaming 処理なので、1秒など短い間隔で設定されることが多いのかなと思います。

// create StreamingContext
SparkConf conf = new SparkConf().setAppName("StreamingApp");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

DStream を生成します。
StreamingContext の socketTextStream でデータを受信するホストとポート番号を指定します。

// create DStream from specified port
JavaDStream<String> lines = jssc.socketTextStream("localhost", 7777);

今回は、DStream に filter() をかけてみます。error という文字を含む場合だけ取り出すようにします。

// filter for DStream
JavaDStream<String> errorLines = lines.filter(new Function<String, Boolean>() {
    public Boolean call(String line) throws Exception {
        return line.contains("error");
    }    	
});

フィルター後の DStream を出力します。
面白いところは、streaming をスタートさせる命令は、ソースコード上では最後に書かれているところです。
Spark の遅延評価の仕組みに通じるところがある気がします。
また、これは独立したスレッドの中で動くらしく、アプリケーションが終了しないように、awaitTermination を呼び出しています。

// output
errorLines.print();
        
// start streaming
jssc.start();
        
// wait for end of job
jssc.awaitTermination();

それでは、いざ実行!
起動には spark-submit を使います。

$ ./spark-submit --class com.blueskyarea.StreamingApp --conf spark.yarn.jar=${ASSEMBLY_JAR} ./spark-streaming/target/spark-streaming-1.0-SNAPSHOT.jar
WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error connecting to localhost:7777
java.net.ConnectException: 接続を拒否されました
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:579)
	at java.net.Socket.connect(Socket.java:528)
	at java.net.Socket.<init>(Socket.java:425)
	at java.net.Socket.<init>(Socket.java:208)
	at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73)
	at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59)

コネクションエラーが発生しましたが、これは 7777 ポートが LISTEN 状態になっていないためのようです。
なので、nc コマンドで LISTEN 状態にします。

nc -l 7777

そして、nc コマンドに続いて入力してみます。

abc
error
efror
there are some error
error2

すると、Spark Streaming の出力ウィンドウには、error が含まれる行のみが出力されました。

error
there are some error
error2

お試しレベルですが、比較的簡単に実現することが出来ました。
リアルタイム(今回は1秒単位)に処理がされていく様子を見るのは楽しいです。
試した入力サイズが非常に小さいですが、1秒単位での処理においてレスポンスも速いです。
より複雑な変換処理を入れた場合、1秒以上要する変換処理の後に出力させた場合、どのような動きになるのか試してみたいです。