When you want to use a TCP client rather than an HTTP client, the org.springframework.messaging.tcp.TcpOperations interface is available.
org.springframework.messaging.tcp.reactor.Reactor2TcpClient is provided as an implementation of TcpOperations built on Reactor. Reactor2TcpClient was released in Spring 4.2. (Reactor11TcpClient had been provided since Spring 4.0; it disappeared in 4.2!)
However, how to use it is not documented. (There isn't even a unit test!) This issue can serve as a hint.
I tried using Reactor2TcpClient.

package demo;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
import org.springframework.util.SocketUtils;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class DemoTcpApplication {

    @Slf4j
    @Configuration
    public static class AppConfig {
        int port = SocketUtils.findAvailableTcpPort();

        @Bean(destroyMethod = "shutdown")
        TcpOperations<String> tcpOperations() {
            return new Reactor2TcpClient<>("localhost", port,
                    new Codec<Buffer, Message<String>, Message<String>>() {
                        @Override
                        public Function<Buffer, Message<String>> decoder(Consumer<Message<String>> next) {
                            return bytes -> MessageBuilder.withPayload(bytes.asString()).build();
                        }

                        @Override
                        public Buffer apply(Message<String> message) {
                            return Buffer.wrap(message.getPayload());
                        }
                    });
        }

        @Bean(initMethod = "start", destroyMethod = "stop")
        HelloServer testTcpServer() throws IOException {
            return new HelloServer(port);
        }

        @Bean
        InitializingBean sampleRunner(TcpOperations<String> tcpOperations) {
            return () -> {
                log.info("start");
                IntStream.range(0, 10).forEach(i -> {
                    tcpOperations.connect(new TcpConnectionHandler<String>() {
                        @Override
                        public void afterConnected(TcpConnection<String> connection) {
                            connection.send(MessageBuilder.withPayload("Hello!" + i + "\n").build());
                        }

                        @Override
                        public void afterConnectFailure(Throwable ex) {
                            // NO-OP
                        }

                        @Override
                        public void handleMessage(Message<String> message) {
                            log.info(message.getPayload());
                        }

                        @Override
                        public void handleFailure(Throwable ex) {
                            // NO-OP
                        }

                        @Override
                        public void afterConnectionClosed() {
                            // NO-OP
                        }
                    });
                });
                log.info("end");
            };
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // System.setProperty("reactor.tcp.ioThreadCount", "10");
        new AnnotationConfigApplicationContext(AppConfig.class)
                .registerShutdownHook();
        TimeUnit.SECONDS.sleep(1);
        System.exit(0);
    }
}

HelloServer is an echo-style TCP server: as the output below shows, it replies to each line with "Hi " followed by the text it received. In this program, a TCP message is handled as a String. If another type is more appropriate (e.g. byte[]), pass a matching Codec (e.g. Codec<Buffer, Message<byte[]>, Message<byte[]>>) to the constructor of Reactor2TcpClient.
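
HelloServer itself is not listed in this entry, so here is a rough sketch of what such a server could look like using only the JDK. Everything in it is an assumption: the class name HelloServerSketch, the getPort() helper, and the line-based "Hi " reply are inferred from the start/stop bean methods and the replies seen in the program output (Hi Hello!0 and so on), not taken from the real class.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for HelloServer; not the class from the sample repository.
public class HelloServerSketch {
    private final ServerSocket serverSocket;

    public HelloServerSketch(int port) throws IOException {
        // Port 0 asks the OS for any free port.
        this.serverSocket = new ServerSocket(port);
    }

    public int getPort() {
        return serverSocket.getLocalPort();
    }

    // Accepts connections on a daemon thread until stop() closes the socket.
    public void start() {
        Thread acceptor = new Thread(() -> {
            while (!serverSocket.isClosed()) {
                try {
                    Socket socket = serverSocket.accept();
                    Thread worker = new Thread(() -> handle(socket), "hello-worker");
                    worker.setDaemon(true);
                    worker.start();
                } catch (IOException e) {
                    break; // accept() fails once the server socket is closed
                }
            }
        }, "hello-acceptor");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    // Replies to every received line with "Hi " + line.
    private void handle(Socket socket) {
        try (Socket s = socket;
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
             Writer out = new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8)) {
            String line;
            while ((line = in.readLine()) != null) {
                out.write("Hi " + line + "\n");
                out.flush();
            }
        } catch (IOException e) {
            // Client went away; nothing to clean up beyond try-with-resources.
        }
    }

    public void stop() throws IOException {
        serverSocket.close();
    }

    public static void main(String[] args) throws IOException {
        HelloServerSketch server = new HelloServerSketch(0);
        server.start();
        try (Socket client = new Socket("localhost", server.getPort())) {
            client.getOutputStream().write("Hello!0\n".getBytes(StandardCharsets.UTF_8));
            client.getOutputStream().flush();
            BufferedReader reply = new BufferedReader(
                    new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8));
            System.out.println(reply.readLine()); // prints "Hi Hello!0"
        } finally {
            server.stop();
        }
    }
}
```

The sketch spawns one thread per connection, which is fine for a test fixture like this but not something to copy into a real server.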
Running this program produces the following output:
[main] INFO org.springframework.context.annotation.AnnotationConfigApplicationContext - Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@446cdf90: startup date [Sun Oct 04 11:28:14 JST 2015]; root of context hierarchy
[main] INFO demo.HelloServer - listen 22192
[main] INFO demo.DemoTcpApplication$AppConfig - start
[main] INFO demo.DemoTcpApplication$AppConfig - end
[reactor-tcp-io-2] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!1
[reactor-tcp-io-4] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!3
[reactor-tcp-io-1] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!0
[reactor-tcp-io-3] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!2
[reactor-tcp-io-1] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!4
[reactor-tcp-io-3] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!6
[reactor-tcp-io-1] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!8
[reactor-tcp-io-4] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!7
[reactor-tcp-io-2] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!5
[reactor-tcp-io-2] INFO demo.DemoTcpApplication$AppConfig - Hi Hello!9
[Thread-4] INFO org.springframework.context.annotation.AnnotationConfigApplicationContext - Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@446cdf90: startup date [Sun Oct 04 11:28:14 JST 2015]; root of context hierarchy
[Thread-4] INFO demo.HelloServer - stop...
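
Note that "start" and "end" are both logged on the main thread before any reply arrives, and the replies come back out of order on the reactor-tcp-io threads: connect() does not block. The demo simply sleeps for one second before exiting. If you would rather wait until all replies have actually arrived, a CountDownLatch is one option. The following is a minimal JDK-only sketch of that idea; the thread pool merely stands in for Reactor's I/O threads, and the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; the pool below only simulates asynchronous replies.
public class AwaitRepliesSketch {

    // Returns true if all `count` simulated replies arrived within the timeout.
    static boolean awaitReplies(int count, long timeoutMillis) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(count);
        ExecutorService ioThreads = Executors.newFixedThreadPool(4);
        try {
            for (int i = 0; i < count; i++) {
                final int n = i;
                ioThreads.submit(() -> {
                    // In the real program this would be the handleMessage callback.
                    System.out.println(Thread.currentThread().getName() + " Hi Hello!" + n);
                    latch.countDown();
                });
            }
            // Blocks the caller until every reply was counted or the timeout elapsed.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            ioThreads.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("all replies received: " + awaitReplies(10, 1000));
    }
}
```

In the demo above, the equivalent change would be to call countDown() inside handleMessage and to replace the sleep in main with an await on a shared latch.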
As you can see, handling a connection is verbose.
I've created a helper class called TcpConnectionHandlerBuilder to create a TcpConnectionHandler.

package demo;

import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;

import java.util.function.Consumer;

@Slf4j
public class TcpConnectionHandlerBuilder<T> {
    private Consumer<TcpConnection<T>> onConnect;
    private Consumer<Message<T>> onMessage;
    private Consumer<Throwable> onConnectFailure;
    private Consumer<Throwable> onFailure;
    private Runnable onClosed;

    public TcpConnectionHandlerBuilder<T> onConnect(
            Consumer<TcpConnection<T>> onConnected) {
        this.onConnect = onConnected;
        return this;
    }

    public TcpConnectionHandlerBuilder<T> onMessage(Consumer<Message<T>> onMessage) {
        this.onMessage = onMessage;
        return this;
    }

    public TcpConnectionHandlerBuilder<T> onConnectFailure(Consumer<Throwable> onConnectFailure) {
        this.onConnectFailure = onConnectFailure;
        return this;
    }

    public TcpConnectionHandlerBuilder<T> onFailure(Consumer<Throwable> onFailure) {
        this.onFailure = onFailure;
        return this;
    }

    public TcpConnectionHandlerBuilder<T> onClosed(Runnable onClosed) {
        this.onClosed = onClosed;
        return this;
    }

    public TcpConnectionHandler<T> build() {
        return new TcpConnectionHandler<T>() {
            @Override
            public void afterConnected(TcpConnection<T> connection) {
                log.trace("afterConnected {}", connection);
                if (TcpConnectionHandlerBuilder.this.onConnect != null) {
                    TcpConnectionHandlerBuilder.this.onConnect.accept(connection);
                }
            }

            @Override
            public void afterConnectFailure(Throwable ex) {
                log.trace("afterConnectFailure", ex);
                if (TcpConnectionHandlerBuilder.this.onConnectFailure != null) {
                    TcpConnectionHandlerBuilder.this.onConnectFailure.accept(ex);
                }
            }

            @Override
            public void handleMessage(Message<T> message) {
                log.trace("handleMessage {}", message);
                if (TcpConnectionHandlerBuilder.this.onMessage != null) {
                    TcpConnectionHandlerBuilder.this.onMessage.accept(message);
                }
            }

            @Override
            public void handleFailure(Throwable ex) {
                log.trace("handleFailure", ex);
                if (TcpConnectionHandlerBuilder.this.onFailure != null) {
                    TcpConnectionHandlerBuilder.this.onFailure.accept(ex);
                }
            }

            @Override
            public void afterConnectionClosed() {
                log.trace("afterConnectionClosed");
                if (TcpConnectionHandlerBuilder.this.onClosed != null) {
                    TcpConnectionHandlerBuilder.this.onClosed.run();
                }
            }
        };
    }
}

Using this class, the connection code can be simplified as follows:

tcpOperations.connect(new TcpConnectionHandlerBuilder<String>()
        .onConnect(c -> c.send(MessageBuilder.withPayload("Hello!" + i + "\n").build()))
        .onMessage(m -> log.info(m.getPayload()))
        .build());

This looks good for a high-level TCP client. However, even though this is simple TCP programming, it pulls in heavyweight dependencies (Netty! GS Collections!).
I think this is somewhat excessive.
In the next entry, I'll introduce another way to use a TCP client, with Spring Integration.
The sample code shown in this entry is available here.