Spark Streaming 試してみました
初めての Spark を参照しながら、Apache Spark Streaming を試してみました。
ソースコード
まずは概念、概要から・・・
1. Spark は RDD を元に構築されていますが、Spark Streaming においては DStream と呼ばれる概念の元に構築されるらしい。
2. DStream は RDD の並びである。
- Streaming 処理なので、小さな RDD を順番に処理していくイメージ?
3. DStream は Flume、Kafka、HDFS など、色々なデータソースから生成可能。
- RDD も色んなデータソースから生成可能ですね。
4. DStream では、変換(transformation)と出力(output)の操作ができる。
- RDD における変換とアクションとよく似ていますね。
5. Spark のバッチプログラムと異なり、Spark Streaming では、アプリケーションを常時稼働させる。
続いて、お試し実装。
Spark Streaming では StreamingContext を生成します。
SparkConf は SparkContext 生成時と同じもの。
Durations.seconds では、そのバッチのインターバルを設定します。
Streaming 処理なので、1秒など短い間隔で設定されることが多いのかなと思います。
// create StreamingContext SparkConf conf = new SparkConf().setAppName("StreamingApp"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
DStream を生成します。
StreamingContext の socketTextStream でデータを受信するホストとポート番号を指定します。
// create DStream from specified port JavaDStream<String> lines = jssc.socketTextStream("localhost", 7777);
今回は、DStream に filter() をかけてみます。error という文字を含む場合だけ取り出すようにします。
// filter for DStream JavaDStream<String> errorLines = lines.filter(new Function<String, Boolean>() { public Boolean call(String line) throws Exception { return line.contains("error"); } });
フィルター後の DStream を出力します。
面白いところは、streaming をスタートさせる命令は、ソースコード上では最後に書かれているところです。
Spark の遅延評価の仕組みに通じるところがある気がします。
また、これは独立したスレッドの中で動くらしく、アプリケーションが終了しないように、awaitTermination を呼び出しています。
// output errorLines.print(); // start streaming jssc.start(); // wait for end of job jssc.awaitTermination();
それでは、いざ実行!
起動には spark-submit を使います。
$ ./spark-submit --class com.blueskyarea.StreamingApp --conf spark.yarn.jar=${ASSEMBLY_JAR} ./spark-streaming/target/spark-streaming-1.0-SNAPSHOT.jar
WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error connecting to localhost:7777 java.net.ConnectException: 接続を拒否されました at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at java.net.Socket.connect(Socket.java:528) at java.net.Socket.<init>(Socket.java:425) at java.net.Socket.<init>(Socket.java:208) at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73) at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59)
コネクションエラーが発生しましたが、これは 7777 ポートが LISTEN 状態になっていないためのようです。
なので、nc コマンドで LISTEN 状態にします。
nc -l 7777
そして、nc コマンドに続いて入力してみます。
abc error efror there are some error error2
すると、Spark Streaming の出力ウィンドウには、error が含まれる行のみが出力されました。
error there are some error error2
お試しレベルですが、比較的簡単に実現することが出来ました。
リアルタイム(今回は1秒単位)に処理がされていく様子を見るのは楽しいです。
試した入力サイズが非常に小さいですが、1秒単位での処理においてレスポンスも速いです。
より複雑な変換処理を入れた場合、1秒以上要する変換処理の後に出力させた場合、どのような動きになるのか試してみたいです。