Warning

This article was automatically translated by OpenAI (gpt-4o).It may be edited eventually, but please be aware that it may contain incorrect information at this time.

Let's try accessing Upstash's Serverless Kafka from Spring Boot.

Creating an Upstash Account

Go to https://upstash.com/kafka and click "Start Now" under "Free".

image

Log in with Google (or other options).

image

That's it for account creation.

Creating a Kafka Cluster

Select "Kafka" from the dashboard.

image

Click the "Create Cluster" button and enter the necessary details.

image

Enter the topic name and click the "Create Topic" button.

image

Now Kafka is ready.

Creating a Producer Application

Create a skeleton application using Spring Initializr.

curl https://start.spring.io/starter.tgz \
       -d artifactId=demo-kafka-producer \
       -d baseDir=demo-kafka-producer \
       -d packageName=com.example \
       -d dependencies=kafka,testcontainers,web,actuator \
       -d type=maven-project \
       -d applicationName=DemoKafkaProducerApplication | tar -xzvf -
cd demo-kafka-producer

Create sample code for the producer.

cat <<'EOF' > src/main/java/com/example/ProducerController.java
package com.example;

import java.util.concurrent.CompletableFuture;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {
    private final KafkaTemplate<String, String> kafkaTemplate;

    private final String topic;

    public ProducerController(KafkaTemplate<String, String> kafkaTemplate, @Value("${demo.topic}") String topic) {
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
    }

    @PostMapping(path = "/")
    public CompletableFuture<String> hello(@RequestBody String message) {
        CompletableFuture<SendResult<String, String>> result = this.kafkaTemplate.send(this.topic, message);
        return result.thenApply(r -> r.getProducerRecord().toString());
    }
}
EOF

Define properties that are independent of the Kafka environment in application.properties.

cat <<'EOF' > src/main/resources/application.properties
demo.topic=demo
server.shutdown=graceful
spring.application.name=demo-kafka-producer
EOF

Define properties that depend on the Upstash environment in application-upstash.properties.

You can check the connection information from the dashboard.

image

Change the following:

  • spring.kafka.bootstrap-servers
  • spring.kafka.jaas.options.password
  • spring.kafka.jaas.options.username
cat <<'EOF' > src/main/resources/application-upstash.properties
spring.kafka.admin.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.admin.security.protocol=SASL_SSL
spring.kafka.bootstrap-servers=XXXX-us1-kafka.upstash.io:9092
spring.kafka.jaas.control-flag=required
spring.kafka.jaas.enabled=true
spring.kafka.jaas.login-module=org.apache.kafka.common.security.scram.ScramLoginModule
spring.kafka.jaas.options.password=XXXX
spring.kafka.jaas.options.username=XXXX
spring.kafka.producer.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.producer.security.protocol=SASL_SSL
EOF

Build and run the application. Set the profile to upstash to connect to Upstash.

./mvnw clean package -DskipTests
java -jar target/demo-kafka-producer-0.0.1-SNAPSHOT.jar --spring.profiles.active=upstash

Creating a Consumer Application

Create a skeleton application using Spring Initializr.

curl https://start.spring.io/starter.tgz \
       -d artifactId=demo-kafka-consumer \
       -d baseDir=demo-kafka-consumer \
       -d packageName=com.example \
       -d dependencies=kafka,testcontainers,web,actuator \
       -d type=maven-project \
       -d applicationName=DemoKafkaConsumerApplication | tar -xzvf -
cd demo-kafka-consumer

Create sample code for the consumer.

cat <<'EOF' > src/main/java/com/example/ConsumerController.java
package com.example;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ConsumerController {
    private final List<String> messages = new CopyOnWriteArrayList<>();

    private final Logger log = LoggerFactory.getLogger(ConsumerController.class);

    @GetMapping(path = "")
    public List<String> getMessages() {
        return this.messages;
    }

    @KafkaListener(topics = "${demo.topic}")
    public void onMessage(String message) {
        log.info("onMessage({})", message);
        this.messages.add(message);
    }
}
EOF

Define properties that are independent of the Kafka environment in application.properties.

cat <<'EOF' > src/main/resources/application.properties
demo.topic=demo
server.port=8082
server.shutdown=graceful
spring.application.name=demo-kafka-consumer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${spring.application.name}
EOF

Define properties that depend on the Upstash environment in application-upstash.properties.

Change the following:

  • spring.kafka.bootstrap-servers
  • spring.kafka.jaas.options.password
  • spring.kafka.jaas.options.username
cat <<'EOF' > src/main/resources/application-upstash.properties
spring.kafka.admin.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.admin.security.protocol=SASL_SSL
spring.kafka.bootstrap-servers=XXXX-us1-kafka.upstash.io:9092
spring.kafka.consumer.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.consumer.security.protocol=SASL_SSL
spring.kafka.jaas.control-flag=required
spring.kafka.jaas.enabled=true
spring.kafka.jaas.login-module=org.apache.kafka.common.security.scram.ScramLoginModule
spring.kafka.jaas.options.password=XXXX
spring.kafka.jaas.options.username=XXXX
EOF

Build and run the application. Set the profile to upstash to connect to Upstash.

./mvnw clean package -DskipTests
java -jar target/demo-kafka-consumer-0.0.1-SNAPSHOT.jar --spring.profiles.active=upstash

Testing time

Send requests to the producer application.

curl localhost:8080 -H "Content-Type: text/plain" -d "Hello World"
curl localhost:8080 -H "Content-Type: text/plain" -d "Test"   

If the following logs are output on the consumer side, it is working correctly.

2024-06-30T02:50:22.952+09:00  INFO 75165 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController           : onMessage(Hello World)
2024-06-30T02:50:29.202+09:00  INFO 75165 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController           : onMessage(Test)

You can also retrieve the sent messages from the consumer application's API.

$ curl localhost:8082
["Hello World","Test"]
Found a mistake? Update the entry.
Share this article: