--- title: Upstashの Serverless KafkaにSpring Bootでアクセスするメモ tags: ["Spring Boot", "Kafka", "Upstash", "Spring for Apache Kafka"] categories: ["Programming", "Java", "org", "springframework", "kafka"] date: 2024-07-01T02:58:12Z updated: 2024-07-01T03:02:43Z --- [Upstash](https://upstash.com/)の[Serverless Kafka](https://upstash.com/docs/kafka/overall/getstarted)にSpring Bootからアクセスしてみます。 ### Upstashアカウントの作成 https://upstash.com/kafka から"Free"の"Start Now"をクリック image Google(等)でログイン image これだけでアカウント作成完了。 ### Kafkaクラスタの作成 ダッシュボードから"Kafka"を選択 image ”Create Cluster"ボタンをクリックし、名前などを入力 image Topic名を入力して、"Create Topic"ボタンをクリック image これでKafkaの準備ができました。 ### Producerアプリの作成 Spring Initializrでアプリの雛形を作成します。 ``` curl https://start.spring.io/starter.tgz \ -d artifactId=demo-kafka-producer \ -d baseDir=demo-kafka-producer \ -d packageName=com.example \ -d dependencies=kafka,testcontainers,web,actuator \ -d type=maven-project \ -d applicationName=DemoKafkaProducerApplication | tar -xzvf - cd demo-kafka-producer ``` Producer用のサンプルコードを作成します。 ```java cat <<'EOF' > src/main/java/com/example/ProducerController.java package com.example; import java.util.concurrent.CompletableFuture; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; @RestController public class ProducerController { private final KafkaTemplate kafkaTemplate; private final String topic; public ProducerController(KafkaTemplate kafkaTemplate, @Value("${demo.topic}") String topic) { this.kafkaTemplate = kafkaTemplate; this.topic = topic; } @PostMapping(path = "/") public CompletableFuture hello(@RequestBody String message) { CompletableFuture> result = this.kafkaTemplate.send(this.topic, message); return result.thenApply(r -> r.getProducerRecord().toString()); } } EOF ``` Kafka環境に依らないプロパティを`application.properties`に定義します。 ```properties cat <<'EOF' > src/main/resources/application.properties demo.topic=demo server.shutdown=graceful spring.application.name=demo-kafka-producer EOF ``` Upstash環境に依存するプロパティを`application-upstash.properties`に定義します。 接続するための情報はダッシュボードから確認できます。 image 以下の * `spring.kafka.bootstrap-servers` * `spring.kafka.jaas.options.password` * `spring.kafka.jaas.options.username` を変更してください。 ```properties cat <<'EOF' > src/main/resources/application-upstash.properties spring.kafka.admin.properties.sasl.mechanism=SCRAM-SHA-256 spring.kafka.admin.security.protocol=SASL_SSL spring.kafka.bootstrap-servers=XXXX-us1-kafka.upstash.io:9092 spring.kafka.jaas.control-flag=required spring.kafka.jaas.enabled=true spring.kafka.jaas.login-module=org.apache.kafka.common.security.scram.ScramLoginModule spring.kafka.jaas.options.password=XXXX spring.kafka.jaas.options.username=XXXX spring.kafka.producer.properties.sasl.mechanism=SCRAM-SHA-256 spring.kafka.producer.security.protocol=SASL_SSL EOF ``` アプリをビルドし、実行します。profileを`upstash`に設定して、Upstashに接続できるようにします。 ``` ./mvnw clean package -DskipTests java -jar target/demo-kafka-producer-0.0.1-SNAPSHOT.jar --spring.profiles.active=upstash ``` ### Consumerアプリの作成 Spring Initializrでアプリの雛形を作成します。 ``` curl https://start.spring.io/starter.tgz \ -d artifactId=demo-kafka-consumer \ -d baseDir=demo-kafka-consumer \ -d packageName=com.example \ -d dependencies=kafka,testcontainers,web,actuator \ -d type=maven-project \ -d applicationName=DemoKafkaConsumerApplication | tar -xzvf - cd demo-kafka-consumer ``` Consumer用のサンプルコードを作成します。 ```java cat <<'EOF' > src/main/java/com/example/ConsumerController.java package com.example; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class ConsumerController { private final List messages = new CopyOnWriteArrayList<>(); private final Logger log = LoggerFactory.getLogger(ConsumerController.class); @GetMapping(path = "") public List getMessages() { return this.messages; } @KafkaListener(topics = "${demo.topic}") public void onMessage(String message) { log.info("onMessage({})", message); this.messages.add(message); } } EOF ``` Kafka環境に依らないプロパティを`application.properties`に定義します。 ```properties cat <<'EOF' > src/main/resources/application.properties demo.topic=demo server.port=8082 server.shutdown=graceful spring.application.name=demo-kafka-consumer spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.group-id=${spring.application.name} EOF ``` Upstash環境に依存するプロパティを`application-upstash.properties`に定義します。 以下の * `spring.kafka.bootstrap-servers` * `spring.kafka.jaas.options.password` * `spring.kafka.jaas.options.username` を変更してください。 ```properties cat <<'EOF' > src/main/resources/application-upstash.properties spring.kafka.admin.properties.sasl.mechanism=SCRAM-SHA-256 spring.kafka.admin.security.protocol=SASL_SSL spring.kafka.bootstrap-servers=XXXX-us1-kafka.upstash.io:9092 spring.kafka.consumer.properties.sasl.mechanism=SCRAM-SHA-256 spring.kafka.consumer.security.protocol=SASL_SSL spring.kafka.jaas.control-flag=required spring.kafka.jaas.enabled=true spring.kafka.jaas.login-module=org.apache.kafka.common.security.scram.ScramLoginModule spring.kafka.jaas.options.password=XXXX spring.kafka.jaas.options.username=XXXX EOF ``` アプリをビルドし、実行します。profileを`upstash`に設定して、Upstashに接続できるようにします。 ``` ./mvnw clean package -DskipTests java -jar target/demo-kafka-consumer-0.0.1-SNAPSHOT.jar --spring.profiles.active=upstash ``` ### 動作確認 Producerアプリにリクエストを送ります。 ``` curl localhost:8080 -H "Content-Type: text/plain" -d "Hello World" curl localhost:8080 -H "Content-Type: text/plain" -d "Test" ``` Consumer側に次のようなログが出力されればOKです。 ``` 2024-06-30T02:50:22.952+09:00 INFO 75165 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController : onMessage(Hello World) 2024-06-30T02:50:29.202+09:00 INFO 75165 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController : onMessage(Test) ``` また、送ったメッセージはConsumerアプリののAPIから取得できます。 ``` $ curl localhost:8082 ["Hello World","Test"] ```