UpstashServerless KafkaにSpring Bootからアクセスしてみます。

Upstashアカウントの作成

https://upstash.com/kafka から"Free"の"Start Now"をクリック

image

Google(等)でログイン

image

これだけでアカウント作成完了。

Kafkaクラスタの作成

ダッシュボードから"Kafka"を選択

image

”Create Cluster"ボタンをクリックし、名前などを入力

image

Topic名を入力して、"Create Topic"ボタンをクリック

image

これでKafkaの準備ができました。

Producerアプリの作成

Spring Initializrでアプリの雛形を作成します。

curl https://start.spring.io/starter.tgz \
       -d artifactId=demo-kafka-producer \
       -d baseDir=demo-kafka-producer \
       -d packageName=com.example \
       -d dependencies=kafka,testcontainers,web,actuator \
       -d type=maven-project \
       -d applicationName=DemoKafkaProducerApplication | tar -xzvf -
cd demo-kafka-producer

Producer用のサンプルコードを作成します。

cat <<'EOF' > src/main/java/com/example/ProducerController.java
package com.example;

import java.util.concurrent.CompletableFuture;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {
    private final KafkaTemplate<String, String> kafkaTemplate;

    private final String topic;

    public ProducerController(KafkaTemplate<String, String> kafkaTemplate, @Value("${demo.topic}") String topic) {
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
    }

    @PostMapping(path = "/")
    public CompletableFuture<String> hello(@RequestBody String message) {
        CompletableFuture<SendResult<String, String>> result = this.kafkaTemplate.send(this.topic, message);
        return result.thenApply(r -> r.getProducerRecord().toString());
    }
}
EOF

Kafka環境に依らないプロパティをapplication.propertiesに定義します。

cat <<'EOF' > src/main/resources/application.properties
demo.topic=demo
server.shutdown=graceful
spring.application.name=demo-kafka-producer
EOF

Upstash環境に依存するプロパティをapplication-upstash.propertiesに定義します。

接続するための情報はダッシュボードから確認できます。

image

以下の

  • spring.kafka.bootstrap-servers
  • spring.kafka.jaas.options.password
  • spring.kafka.jaas.options.username

を変更してください。

cat <<'EOF' > src/main/resources/application-upstash.properties
spring.kafka.admin.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.admin.security.protocol=SASL_SSL
spring.kafka.bootstrap-servers=XXXX-us1-kafka.upstash.io:9092
spring.kafka.jaas.control-flag=required
spring.kafka.jaas.enabled=true
spring.kafka.jaas.login-module=org.apache.kafka.common.security.scram.ScramLoginModule
spring.kafka.jaas.options.password=XXXX
spring.kafka.jaas.options.username=XXXX
spring.kafka.producer.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.producer.security.protocol=SASL_SSL
EOF

アプリをビルドし、実行します。profileをupstashに設定して、Upstashに接続できるようにします。

./mvnw clean package -DskipTests
java -jar target/demo-kafka-producer-0.0.1-SNAPSHOT.jar --spring.profiles.active=upstash

Consumerアプリの作成

Spring Initializrでアプリの雛形を作成します。

curl https://start.spring.io/starter.tgz \
       -d artifactId=demo-kafka-consumer \
       -d baseDir=demo-kafka-consumer \
       -d packageName=com.example \
       -d dependencies=kafka,testcontainers,web,actuator \
       -d type=maven-project \
       -d applicationName=DemoKafkaConsumerApplication | tar -xzvf -
cd demo-kafka-consumer

Consumer用のサンプルコードを作成します。

cat <<'EOF' > src/main/java/com/example/ConsumerController.java
package com.example;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ConsumerController {
    private final List<String> messages = new CopyOnWriteArrayList<>();

    private final Logger log = LoggerFactory.getLogger(ConsumerController.class);

    @GetMapping(path = "")
    public List<String> getMessages() {
        return this.messages;
    }

    @KafkaListener(topics = "${demo.topic}")
    public void onMessage(String message) {
        log.info("onMessage({})", message);
        this.messages.add(message);
    }
}
EOF

Kafka環境に依らないプロパティをapplication.propertiesに定義します。

cat <<'EOF' > src/main/resources/application.properties
demo.topic=demo
server.port=8082
server.shutdown=graceful
spring.application.name=demo-kafka-consumer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${spring.application.name}
EOF

Upstash環境に依存するプロパティをapplication-upstash.propertiesに定義します。

以下の

  • spring.kafka.bootstrap-servers
  • spring.kafka.jaas.options.password
  • spring.kafka.jaas.options.username

を変更してください。

cat <<'EOF' > src/main/resources/application-upstash.properties
spring.kafka.admin.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.admin.security.protocol=SASL_SSL
spring.kafka.bootstrap-servers=XXXX-us1-kafka.upstash.io:9092
spring.kafka.consumer.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.consumer.security.protocol=SASL_SSL
spring.kafka.jaas.control-flag=required
spring.kafka.jaas.enabled=true
spring.kafka.jaas.login-module=org.apache.kafka.common.security.scram.ScramLoginModule
spring.kafka.jaas.options.password=XXXX
spring.kafka.jaas.options.username=XXXX
EOF

アプリをビルドし、実行します。profileをupstashに設定して、Upstashに接続できるようにします。

./mvnw clean package -DskipTests
java -jar target/demo-kafka-consumer-0.0.1-SNAPSHOT.jar --spring.profiles.active=upstash

動作確認

Producerアプリにリクエストを送ります。

curl localhost:8080 -H "Content-Type: text/plain" -d "Hello World"
curl localhost:8080 -H "Content-Type: text/plain" -d "Test"   

Consumer側に次のようなログが出力されればOKです。

2024-06-30T02:50:22.952+09:00  INFO 75165 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController           : onMessage(Hello World)
2024-06-30T02:50:29.202+09:00  INFO 75165 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController           : onMessage(Test)

また、送ったメッセージはConsumerアプリののAPIから取得できます。

$ curl localhost:8082
["Hello World","Test"]
Found a mistake? Update the entry.
Share this article: