Warning
This article was automatically translated by OpenAI (gpt-4o).It may be edited eventually, but please be aware that it may contain incorrect information at this time.
Let's try accessing Upstash's Serverless Kafka from Spring Boot.
Creating an Upstash Account
Go to https://upstash.com/kafka and click "Start Now" under "Free".
Log in with Google (or other options).
That's it for account creation.
Creating a Kafka Cluster
Select "Kafka" from the dashboard.
Click the "Create Cluster" button and enter the necessary details.
Enter the topic name and click the "Create Topic" button.
Now Kafka is ready.
Creating a Producer Application
Create a skeleton application using Spring Initializr.
curl https://start.spring.io/starter.tgz \
-d artifactId=demo-kafka-producer \
-d baseDir=demo-kafka-producer \
-d packageName=com.example \
-d dependencies=kafka,testcontainers,web,actuator \
-d type=maven-project \
-d applicationName=DemoKafkaProducerApplication | tar -xzvf -
cd demo-kafka-producer
Create sample code for the producer.
cat <<'EOF' > src/main/java/com/example/ProducerController.java
package com.example;
import java.util.concurrent.CompletableFuture;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
private final KafkaTemplate<String, String> kafkaTemplate;
private final String topic;
public ProducerController(KafkaTemplate<String, String> kafkaTemplate, @Value("${demo.topic}") String topic) {
this.kafkaTemplate = kafkaTemplate;
this.topic = topic;
}
@PostMapping(path = "/")
public CompletableFuture<String> hello(@RequestBody String message) {
CompletableFuture<SendResult<String, String>> result = this.kafkaTemplate.send(this.topic, message);
return result.thenApply(r -> r.getProducerRecord().toString());
}
}
EOF
Define properties that are independent of the Kafka environment in application.properties.
cat <<'EOF' > src/main/resources/application.properties
demo.topic=demo
server.shutdown=graceful
spring.application.name=demo-kafka-producer
EOF
Define properties that depend on the Upstash environment in application-upstash.properties.
You can check the connection information from the dashboard.
Change the following:
spring.kafka.bootstrap-serversspring.kafka.jaas.options.passwordspring.kafka.jaas.options.username
cat <<'EOF' > src/main/resources/application-upstash.properties
spring.kafka.admin.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.admin.security.protocol=SASL_SSL
spring.kafka.bootstrap-servers=XXXX-us1-kafka.upstash.io:9092
spring.kafka.jaas.control-flag=required
spring.kafka.jaas.enabled=true
spring.kafka.jaas.login-module=org.apache.kafka.common.security.scram.ScramLoginModule
spring.kafka.jaas.options.password=XXXX
spring.kafka.jaas.options.username=XXXX
spring.kafka.producer.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.producer.security.protocol=SASL_SSL
EOF
Build and run the application. Set the profile to upstash to connect to Upstash.
./mvnw clean package -DskipTests
java -jar target/demo-kafka-producer-0.0.1-SNAPSHOT.jar --spring.profiles.active=upstash
Creating a Consumer Application
Create a skeleton application using Spring Initializr.
curl https://start.spring.io/starter.tgz \
-d artifactId=demo-kafka-consumer \
-d baseDir=demo-kafka-consumer \
-d packageName=com.example \
-d dependencies=kafka,testcontainers,web,actuator \
-d type=maven-project \
-d applicationName=DemoKafkaConsumerApplication | tar -xzvf -
cd demo-kafka-consumer
Create sample code for the consumer.
cat <<'EOF' > src/main/java/com/example/ConsumerController.java
package com.example;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ConsumerController {
private final List<String> messages = new CopyOnWriteArrayList<>();
private final Logger log = LoggerFactory.getLogger(ConsumerController.class);
@GetMapping(path = "")
public List<String> getMessages() {
return this.messages;
}
@KafkaListener(topics = "${demo.topic}")
public void onMessage(String message) {
log.info("onMessage({})", message);
this.messages.add(message);
}
}
EOF
Define properties that are independent of the Kafka environment in application.properties.
cat <<'EOF' > src/main/resources/application.properties
demo.topic=demo
server.port=8082
server.shutdown=graceful
spring.application.name=demo-kafka-consumer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${spring.application.name}
EOF
Define properties that depend on the Upstash environment in application-upstash.properties.
Change the following:
spring.kafka.bootstrap-serversspring.kafka.jaas.options.passwordspring.kafka.jaas.options.username
cat <<'EOF' > src/main/resources/application-upstash.properties
spring.kafka.admin.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.admin.security.protocol=SASL_SSL
spring.kafka.bootstrap-servers=XXXX-us1-kafka.upstash.io:9092
spring.kafka.consumer.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.consumer.security.protocol=SASL_SSL
spring.kafka.jaas.control-flag=required
spring.kafka.jaas.enabled=true
spring.kafka.jaas.login-module=org.apache.kafka.common.security.scram.ScramLoginModule
spring.kafka.jaas.options.password=XXXX
spring.kafka.jaas.options.username=XXXX
EOF
Build and run the application. Set the profile to upstash to connect to Upstash.
./mvnw clean package -DskipTests
java -jar target/demo-kafka-consumer-0.0.1-SNAPSHOT.jar --spring.profiles.active=upstash
Testing time
Send requests to the producer application.
curl localhost:8080 -H "Content-Type: text/plain" -d "Hello World"
curl localhost:8080 -H "Content-Type: text/plain" -d "Test"
If the following logs are output on the consumer side, it is working correctly.
2024-06-30T02:50:22.952+09:00 INFO 75165 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController : onMessage(Hello World)
2024-06-30T02:50:29.202+09:00 INFO 75165 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController : onMessage(Test)
You can also retrieve the sent messages from the consumer application's API.
$ curl localhost:8082
["Hello World","Test"]