--- title: Echo Serverを実装して学ぶReactor Nettyによるストリーム処理 tags: ["Reactor", "Reactor Netty", "Netty", "Java"] categories: ["Programming", "Java", "reactor", "ipc", "netty"] date: 2017-10-10T15:34:25Z updated: 2018-01-05T16:02:52Z --- Reactor 3.1, Reactor Netty 0.7が正式に[リリースされた](https://spring.io/blog/2017/09/28/reactor-bismuth-is-out)ので、TCP Server版Hello WorldであるEcho ServerをReactor Nettyで実装してみます。 > ℹ️ 2018-01-04 Bismuth-SR4版 (Reactor 3.1.2, Reactor Netty 0.7.2)にアップデート **目次** ### 依存ライブラリの追加 Reactor 3.1.x系のバージョン管理は`reactor-bom`の`Bismuth`でメンテナンスされています。これを``に設定しておけば``内でのバージョン明示は不要です。 ```xml io.projectreactor.ipc reactor-netty io.projectreactor reactor-test test io.projectreactor reactor-bom Bismuth-SR4 pom import ``` ### Echo Serverを実装 Reactor Nettyの`TcpServer`の使い方のテンプレートは次の通りです。 ``` java import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.ipc.netty.NettyInbound; import reactor.ipc.netty.NettyOutbound; import reactor.ipc.netty.tcp.TcpServer; import java.util.Optional; import java.util.function.BiFunction; public class EchoServer { public static void main(String[] args) { TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777); tcpServer.startAndAwait(new BiFunction>() { @Override public Publisher apply(NettyInbound inbound, NettyOutbound outbound) { // ここに入力 => 出力の処理を書く return Flux.never(); } }); } } ``` サーバーサイドプログラミングに慣れていれば、"ああメッセージの処理ハンドラをラムダで書けばいいんだな"と思うかもしれません。 これはある意味合っているのですが、通常の"処理ハンドラ"と大きく違うのは、このハンドラ(`BiFunction>`)が扱うのは1件のメッセージではなく、無限ストリームであるという点です。 引数の`NettyInbound`/`NettyOutbound`はServletの`HttpServletRequest`/`HttpServletResponse`のように1リクエスト/レスポンスではなく、このサーバーへの入出力全体を表します。なので、この処理ハンドラの返り値が出力ストリームになるわけでもありません。処理が終わらないように`Flux.never()`(データは流れないけど終わらないストリーム)を返します。 入力として扱うデータを文字列に絞りましょう。この場合、入力となる文字列の無限ストリームを`Flux`で次のように取得できます。 ``` java tcpServer.startAndAwait(new BiFunction>() { @Override public Publisher apply(NettyInbound inbound, NettyOutbound outbound) { Flux input = inbound.receive().asString(); return Flux.never(); } }); ``` この入力ストリームを加工して出力ストリームに送ります。 ここで通常のサーバープログラミングのように、1件ずつ処理したいと考えます。`Flux`クラスにはデータ1件ごとのコールバックメソッドとして`doOnNext(Consumer)`が用意されています。これを使うとサーバーサイドのコードは次のようになります。(わかりやすいようにラムダ式を使っていません) ```java tcpServer.startAndAwait(new BiFunction>() { @Override public Publisher apply(NettyInbound inbound, NettyOutbound outbound) { Flux input = inbound.receive().asString(); input.doOnNext(new Consumer() { @Override public void accept(String message) { // 文字列メッセージ一件ずつの処理 } }).subscribe(); return Flux.never(); } }); ``` 注意点として、入力ストリームの`Flux`は`subscribe`メソッドを呼ばないとストリームにデータが流れません。 出力ストリームにこのメッセージを送信するコードは次の通りです。 ```java tcpServer.startAndAwait(new BiFunction>() { @Override public Publisher apply(NettyInbound inbound, NettyOutbound outbound) { Flux input = inbound.receive().asString(); input.doOnNext(new Consumer() { @Override public void accept(String message) { outbound.sendString(Mono.just(message)) .then() .subscribe(); } }).subscribe(); return Flux.never(); } }); ``` `NettyOutbound.sendString`は引数が`Publisher`なので、`String`をメッセージを`Mono.just`で包んでいます。この返り値が出力ストリームになります。この型も`NettyOutbound`です。これは`Publisher`を継承しています。 入力ストリーム同様に出力ストリームも`subscribe`しないとデータが流れません。 ここでは`then`メソッドで`Mono`に変換して`subscribe`します。 ここまででEcho Serverができました。`main`メソッドを実行してサーバーを起動時、`telnet`でTCPサーバーにアクセスします。 ``` $ telnet localhost 7777 Trying ::1... Connected to localhost. Escape character is '^]'. hoge <== 入力 hoge ==> 出力 foo <== 入力 foo ==> 出力 bar <== 入力 bar ==> 出力 ^] telnet> quit Connection closed. ``` `nc`コマンドでも動作確認できます。 ``` $ echo -n 'Hello World!' | nc localhost 7777 Hello World! ``` 上記のプログラムはラムダ式を使うと次のように書けます。 ``` java public static void main(String[] args) { TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777); tcpServer.startAndAwait((inbound, outbound) -> { Flux input = inbound.receive().asString(); input.doOnNext(message -> outbound.sendString(Mono.just(message)) .then() .subscribe()).subscribe(); return Flux.never(); }); } ``` 実際にメッセージが出力ストリームに流れているかどうかを確認したい場合は、次のように出力メッセージのストリームに`log`メソッドを追加します。 ``` java public static void main(String[] args) { TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777); tcpServer.startAndAwait((inbound, outbound) -> { Flux input = inbound.receive().asString(); input.doOnNext(message -> outbound.sendString(Mono.just(message).log() /*追加*/) .then() .subscribe()).subscribe(); return Flux.never(); }); } ``` これで再起動し、再度メッセージを送ると、次のようなログを確認できます。`onNext`で出力メッセージが流れていることがわかります。 ``` [ INFO] (reactor-tcp-nio-2) | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription) [ INFO] (reactor-tcp-nio-2) | request(32) [ INFO] (reactor-tcp-nio-2) | onNext(Hello World!) [TRACE] (reactor-tcp-nio-2) [id: 0x8984720e, L:/0:0:0:0:0:0:0:1:7777 - R:/0:0:0:0:0:0:0:1:52269] Pending write size = 12 [ INFO] (reactor-tcp-nio-2) | request(1) [ INFO] (reactor-tcp-nio-2) | onComplete() ``` 出力側の`subscribe`を削除すると`onNext`ログがでないことを確認できるでしょう。 少し、リファクタしてみます。 上記のコードは入力ストリームの`doOnNext`コールバック内で出力ストリームを作成したため、両方の`subscribe`が必要でした。 ここで考え方を変えて、入力ストリームを変換して出力ストリームを作成するようにすると、作成されたストリームを`subscribe`するだけで入出力両方にデータが流れます。変換したストリームと元のストリームを合流させるには`flatMap`を使います。`flatMap`を使えばEchoServerは次のように書き換えられます。 ``` java public static void main(String[] args) { TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777); tcpServer.startAndAwait((inbound, outbound) -> { Flux input = inbound.receive().asString(); input.flatMap(message -> outbound.sendString(Mono.just(message))) .subscribe(); return Flux.never(); }); } ``` 見た目がすっきりしました。 > ちなみに、このコードで`flatMap`を`map`に変えてもコンパイルは通りますが、データは流れなくなります。この違い、とても重要なので理解してください。 ### Echo Serverのバッファリング処理 ただのEcho Serverだとつまらないので、もう少しストリーム処理っぽい内容に変えてみます。 `NettyOutbound.sendString`の引数は`Publisher`でした。つまりこのメソッドはメッセージを1件送るためのメソッドではなく、メッセージのストリームを送るメソッドです。 ここでは、EchoServerを3件ごとバッファリングして一気に出力するように書き換えます。`Flux`のストリームを3件ずつの塊ストリームである`Flux>`に変換するには`window(int)`メソッドを使用します。 ``` java public static void main(String[] args) { TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777); tcpServer.startAndAwait((inbound, outbound) -> { Flux input = inbound.receive().asString(); input.window(3) .flatMap(messages -> outbound.sendString(messages)) .subscribe(); return Flux.never(); }); } ``` これで3件ずつまとめてデータを扱うEchoServerになりました。 `telnet`で動作確認してみます。 ``` $ telnet localhost 7777 Trying ::1... Connected to localhost. Escape character is '^]'. hoge <== 入力 foo <== 入力 bar <== 入力 hoge ==> 出力 foo ==> 出力 bar ==> 出力 ^] telnet> quit Connection closed. ``` 前の例と異なり、3件単位でデータが流れていることがわかります。 --- Echo Serverを実装を通じてReactor Nettyを使ったTCP Serverの作り方及び、ストリームを扱うプログラミングの考え方を簡単に学びました。 Reactorを使うことでこれまで使ってこなかったストリーム脳で考えないといけない場面が増えてくるので、この記事がとっかかりになればと思います。