---
title: How to use the TCP client provided by Spring Framework (Part 1/2)
tags: ["Java", "Spring", "TCP"]
categories: ["Programming", "Java", "org", "springframework", "messaging", "tcp"]
date: 2015-10-04T02:01:18Z
updated: 2015-10-04T02:01:18Z
---

When you need a TCP client rather than an HTTP client, Spring Framework provides the [org.springframework.messaging.tcp.TcpOperations](http://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/messaging/tcp/TcpOperations.html) interface.
[org.springframework.messaging.tcp.reactor.Reactor2TcpClient](http://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.html) is an implementation of `TcpOperations` built on [Reactor](http://projectreactor.io/). `Reactor2TcpClient` was introduced in Spring 4.2. ([Reactor11TcpClient](http://docs.spring.io/spring/docs/4.1.7.RELEASE/javadoc-api/org/springframework/messaging/tcp/reactor/Reactor11TcpClient.html) had been available since Spring 4.0, but it was removed in 4.2!)


However, how to use it is not documented. (There isn't even a unit test!) [This issue](https://github.com/reactor/reactor/issues/510) offers some hints.

Here is my attempt at using `Reactor2TcpClient`:

``` java
package demo;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
import org.springframework.util.SocketUtils;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class DemoTcpApplication {

    @Slf4j
    @Configuration
    public static class AppConfig {
        int port = SocketUtils.findAvailableTcpPort();

        @Bean(destroyMethod = "shutdown")
        TcpOperations<String> tcpOperations() {
            return new Reactor2TcpClient<>("localhost", port,
                    new Codec<Buffer, Message<String>, Message<String>>() {
                        @Override
                        public Function<Buffer, Message<String>> decoder(Consumer<Message<String>> next) {
                            return bytes -> MessageBuilder.withPayload(bytes.asString()).build();
                        }

                        @Override
                        public Buffer apply(Message<String> message) {
                            return Buffer.wrap(message.getPayload());
                        }
                    });
        }

        @Bean(initMethod = "start", destroyMethod = "stop")
        HelloServer testTcpServer() throws IOException {
            return new HelloServer(port);
        }

        @Bean
        InitializingBean sampleRunner(TcpOperations<String> tcpOperations) {
            return () -> {
                log.info("start");
                IntStream.range(0, 10).forEach(i -> {
                    tcpOperations.connect(new TcpConnectionHandler<String>() {
                        @Override
                        public void afterConnected(TcpConnection<String> connection) {
                            connection.send(MessageBuilder.withPayload("Hello!" + i + "\n").build());
                        }

                        @Override
                        public void afterConnectFailure(Throwable ex) {
                            // NO-OP
                        }

                        @Override
                        public void handleMessage(Message<String> message) {
                            log.info(message.getPayload());
                        }

                        @Override
                        public void handleFailure(Throwable ex) {
                            // NO-OP
                        }

                        @Override
                        public void afterConnectionClosed() {
                            // NO-OP
                        }
                    });
                });
                log.info("end");
            };
        }
    }

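    public static void main(String[] args) throws InterruptedException {
        // System.setProperty("reactor.tcp.ioThreadCount", "10");
        new AnnotationConfigApplicationContext(AppConfig.class)
                .registerShutdownHook();
        // connect() is asynchronous, so wait a moment for the replies before exiting.
        TimeUnit.SECONDS.sleep(1);
        // Exiting triggers the shutdown hook, which closes the context.
        System.exit(0);
    }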
}
```

`HelloServer` is an echo-style TCP server (as the output below shows, it prefixes each reply with `Hi `). In this program, each TCP message is handled as a `String`. If another type is more appropriate (e.g. `byte[]`), pass a matching `Codec` (e.g. `Codec<Buffer, Message<byte[]>, Message<byte[]>>`) to the constructor of `Reactor2TcpClient`.
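
`HelloServer` itself is not listed in this entry (it lives in the sample repository linked at the end). To give a rough idea of what such a server involves, here is a minimal stand-in that uses only the JDK. The class name `HelloServerSketch`, the `getPort()` accessor, and the line-based `Hi ` reply format are my assumptions, inferred from the `start`/`stop` bean methods and the log output; the real class may well differ.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for HelloServer: replies "Hi <line>" to every received line.
final class HelloServerSketch {
    private final ServerSocket serverSocket;

    HelloServerSketch(int port) throws IOException {
        // Passing port 0 asks the OS for any free port.
        this.serverSocket = new ServerSocket(port);
    }

    int getPort() {
        return serverSocket.getLocalPort();
    }

    // Accepts connections on a daemon thread, one worker thread per client.
    void start() {
        Thread acceptor = new Thread(() -> {
            while (!serverSocket.isClosed()) {
                try {
                    Socket client = serverSocket.accept();
                    Thread worker = new Thread(() -> handle(client));
                    worker.setDaemon(true);
                    worker.start();
                } catch (IOException e) {
                    // accept() throws once stop() closes the socket; leave the loop.
                    return;
                }
            }
        }, "hello-server-acceptor");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    void stop() throws IOException {
        serverSocket.close();
    }

    private void handle(Socket client) {
        try (Socket socket = client) {
            BufferedReader in = new BufferedReader(new InputStreamReader(
                    socket.getInputStream(), StandardCharsets.UTF_8));
            // autoFlush = true, so println() sends each reply immediately.
            PrintWriter out = new PrintWriter(new OutputStreamWriter(
                    socket.getOutputStream(), StandardCharsets.UTF_8), true);
            String line;
            while ((line = in.readLine()) != null) {
                out.println("Hi " + line);
            }
        } catch (IOException e) {
            // The client went away; the socket is closed by try-with-resources.
        }
    }
}
```

Because the sketch frames messages by newline, it matches the client code above, which appends `"\n"` to every payload it sends.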

Running this program produces the following output:

``` console
[main] INFO org.springframework.context.annotation.AnnotationConfigApplicationContext - Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@446cdf90: startup date [Sun Oct 04 11:28:14 JST 2015]; root of context hierarchy
[main] INFO demo.HelloServer - listen 22192
[main] INFO demo.DemoTcpApplication$AppConfig - start
[main] INFO demo.DemoTcpApplication$AppConfig - end
[reactor-tcp-io-2] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!1
[reactor-tcp-io-4] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!3
[reactor-tcp-io-1] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!0
[reactor-tcp-io-3] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!2
[reactor-tcp-io-1] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!4
[reactor-tcp-io-3] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!6
[reactor-tcp-io-1] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!8
[reactor-tcp-io-4] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!7
[reactor-tcp-io-2] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!5
[reactor-tcp-io-2] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!9
[Thread-4] INFO org.springframework.context.annotation.AnnotationConfigApplicationContext - Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@446cdf90: startup date [Sun Oct 04 11:28:14 JST 2015]; root of context hierarchy
[Thread-4] INFO demo.HelloServer - stop...
```

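As you can see, handling a connection is verbose: `TcpConnectionHandler` has five callbacks to implement, even though this example uses only two of them.

So I created a helper class, `TcpConnectionHandlerBuilder`, for building a `TcpConnectionHandler`.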

``` java
package demo;

import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;

import java.util.function.Consumer;

@Slf4j
public class TcpConnectionHandlerBuilder<T> {
    private Consumer<TcpConnection<T>> onConnect;
    private Consumer<Message<T>> onMessage;
    private Consumer<Throwable> onConnectFailure;
    private Consumer<Throwable> onFailure;
    private Runnable onClosed;

    public TcpConnectionHandlerBuilder<T> onConnect(
            Consumer<TcpConnection<T>> onConnected) {
        this.onConnect = onConnected;
        return this;
    }

    public TcpConnectionHandlerBuilder<T> onMessage(Consumer<Message<T>> onMessage) {
        this.onMessage = onMessage;
        return this;
    }

    public TcpConnectionHandlerBuilder<T> onConnectFailure(Consumer<Throwable> onConnectFailure) {
        this.onConnectFailure = onConnectFailure;
        return this;
    }

    public TcpConnectionHandlerBuilder<T> onFailure(Consumer<Throwable> onFailure) {
        this.onFailure = onFailure;
        return this;
    }

    public TcpConnectionHandlerBuilder<T> onClosed(Runnable onClosed) {
        this.onClosed = onClosed;
        return this;
    }

    public TcpConnectionHandler<T> build() {
        return new TcpConnectionHandler<T>() {
            @Override
            public void afterConnected(TcpConnection<T> connection) {
                log.trace("afterConnected {}", connection);
                if (TcpConnectionHandlerBuilder.this.onConnect != null) {
                    TcpConnectionHandlerBuilder.this.onConnect.accept(connection);
                }
            }

            @Override
            public void afterConnectFailure(Throwable ex) {
                log.trace("afterConnectFailure", ex);
                if (TcpConnectionHandlerBuilder.this.onConnectFailure != null) {
                    TcpConnectionHandlerBuilder.this.onConnectFailure.accept(ex);
                }
            }

            @Override
            public void handleMessage(Message<T> message) {
                log.trace("handleMessage {}", message);
                if (TcpConnectionHandlerBuilder.this.onMessage != null) {
                    TcpConnectionHandlerBuilder.this.onMessage.accept(message);
                }
            }

            @Override
            public void handleFailure(Throwable ex) {
                log.trace("handleFailure", ex);
                if (TcpConnectionHandlerBuilder.this.onFailure != null) {
                    TcpConnectionHandlerBuilder.this.onFailure.accept(ex);
                }
            }

            @Override
            public void afterConnectionClosed() {
                log.trace("afterConnectionClosed");
                if (TcpConnectionHandlerBuilder.this.onClosed != null) {
                    TcpConnectionHandlerBuilder.this.onClosed.run();
                }
            }
        };
    }
}
```

With `TcpConnectionHandlerBuilder`, the connection code can be simplified as follows:

``` java
tcpOperations.connect(new TcpConnectionHandlerBuilder<String>()
        .onConnect(c -> c.send(MessageBuilder.withPayload("Hello!" + i + "\n").build()))
        .onMessage(m -> log.info(m.getPayload()))
        .build());
```
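
One possible variation on the builder, sketched here without Spring so that it can run on its own: default every callback to a no-op instead of `null`, so the built handler needs no null checks. `SimpleHandler` is a hypothetical two-method stand-in for `TcpConnectionHandler`, and `NoOpDefaultBuilder` is likewise an illustrative name, not part of the sample code.

```java
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical, trimmed-down stand-in for TcpConnectionHandler.
interface SimpleHandler<T> {
    void handleMessage(T message);

    void handleFailure(Throwable ex);
}

// Builder whose callbacks default to no-ops rather than null.
final class NoOpDefaultBuilder<T> {
    private Consumer<T> onMessage = message -> { };
    private Consumer<Throwable> onFailure = ex -> { };

    NoOpDefaultBuilder<T> onMessage(Consumer<T> onMessage) {
        this.onMessage = Objects.requireNonNull(onMessage, "onMessage");
        return this;
    }

    NoOpDefaultBuilder<T> onFailure(Consumer<Throwable> onFailure) {
        this.onFailure = Objects.requireNonNull(onFailure, "onFailure");
        return this;
    }

    SimpleHandler<T> build() {
        // Capture the callbacks now so later builder calls cannot change a built handler.
        final Consumer<T> message = this.onMessage;
        final Consumer<Throwable> failure = this.onFailure;
        return new SimpleHandler<T>() {
            @Override
            public void handleMessage(T m) {
                message.accept(m);
            }

            @Override
            public void handleFailure(Throwable ex) {
                failure.accept(ex);
            }
        };
    }
}
```

The trade-off is small: the null checks disappear, and passing `null` to a setter fails fast instead of being silently ignored. Whether that is preferable is a matter of taste.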

The builder-based code looks good for a high-level TCP client.
However, even for such simple TCP programming, the following dependencies are required (Netty! GS Collections!).

![image](https://qiita-image-store.s3.amazonaws.com/0/1852/f544b271-d129-9c3f-955c-778bcfa1d947.png)

I think this is overkill.
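
For a sense of scale, the same send-a-line, read-a-line exchange can be written against the JDK alone. This is only a blocking sketch, not a replacement for `Reactor2TcpClient`: it has no callbacks, no codec, and no non-blocking I/O. `PlainTcpClientSketch` and `sendLine` are names I made up for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical JDK-only client: one blocking request/response per connection.
final class PlainTcpClientSketch {
    private final String host;
    private final int port;

    PlainTcpClientSketch(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Sends one newline-terminated line and blocks until one line comes back.
    // Returns null if the server closes the connection without replying.
    String sendLine(String payload) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            Writer out = new OutputStreamWriter(
                    socket.getOutputStream(), StandardCharsets.UTF_8);
            BufferedReader in = new BufferedReader(new InputStreamReader(
                    socket.getInputStream(), StandardCharsets.UTF_8));
            out.write(payload + "\n");
            out.flush();
            return in.readLine();
        }
    }
}
```

Against a line-based server like the one in this entry, `new PlainTcpClientSketch("localhost", port).sendLine("Hello!0")` would be expected to return the server's one-line reply, at the cost of tying up the calling thread while it waits.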

In the next entry, I'll introduce another way to write a TCP client, using Spring Integration.

The sample code shown in this entry is available [here](https://github.com/making/demo-tcp).
