kafka-native is a Native Image of Apache Kafka compiled with GraalVM.
Using it makes Kafka start very quickly.

By running it from Testcontainers, you can set up a Spring Boot development environment in which Kafka is available almost instantly.

Incidentally, Testcontainers can be used for local development as well as for tests.

Note

If you want to know how it works, see this article.

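Also, when you generate a project skeleton with Spring Initializr, the configuration for using kafka-native is generated automatically,
so setting up the environment in this article's title actually takes no effort (only Docker is required).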

Creating the Consumer app

Create the app skeleton with Spring Initializr.

curl https://start.spring.io/starter.tgz \
       -d artifactId=demo-kafka-consumer \
       -d baseDir=demo-kafka-consumer \
       -d packageName=com.example \
       -d dependencies=kafka,testcontainers,web,actuator \
       -d type=maven-project \
       -d applicationName=DemoKafkaConsumerApplication | tar -xzvf -
cd demo-kafka-consumer

The Testcontainers configuration is generated under src/test/java. It uses the apache/kafka-native:latest image.

$ cat src/test/java/com/example/TestDemoKafkaConsumerApplication.java 
package com.example;

import org.springframework.boot.SpringApplication;

public class TestDemoKafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.from(DemoKafkaConsumerApplication::main).with(TestcontainersConfiguration.class).run(args);
    }

}

$ cat src/test/java/com/example/TestcontainersConfiguration.java     
package com.example;

import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@TestConfiguration(proxyBeanMethods = false)
class TestcontainersConfiguration {

    @Bean
    @ServiceConnection
    KafkaContainer kafkaContainer() {
        return new KafkaContainer(DockerImageName.parse("apache/kafka-native:latest"));
    }

}

Next, create the sample code for the Consumer. It reuses the code written in this article.

cat <<'EOF' > src/main/java/com/example/ConsumerController.java
package com.example;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ConsumerController {
    private final List<String> messages = new CopyOnWriteArrayList<>();

    private final Logger log = LoggerFactory.getLogger(ConsumerController.class);

    @GetMapping(path = "")
    public List<String> getMessages() {
        return this.messages;
    }

    @KafkaListener(topics = "${demo.topic}")
    public void onMessage(String message) {
        log.info("onMessage({})", message);
        this.messages.add(message);
    }
}
EOF
cat <<'EOF' > src/main/resources/application.properties
demo.topic=demo
server.port=8082
spring.application.name=demo-kafka-consumer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${spring.application.name}
EOF

Trying it out

Now try the app built so far using Testcontainers.
In an IDE, run the main method of TestDemoKafkaConsumerApplication under src/test/java.
With the Maven plugin, run the following command.

./mvnw spring-boot:test-run

The app starts right away, with a startup log like the following.

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/

 :: Spring Boot ::                (v3.4.5)

2025-05-16T13:04:24.718+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] c.example.DemoKafkaConsumerApplication   : Starting DemoKafkaConsumerApplication using Java 21.0.6 with PID 36472 (/Users/toshiaki/git/demo-kafka-consumer/target/classes started by toshiaki in /Users/toshiaki/git/demo-kafka-consumer)
2025-05-16T13:04:24.718+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] c.example.DemoKafkaConsumerApplication   : No active profile set, falling back to 1 default profile: "default"
2025-05-16T13:04:25.037+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port 8082 (http)
2025-05-16T13:04:25.041+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2025-05-16T13:04:25.041+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.apache.catalina.core.StandardEngine    : Starting Servlet engine: [Apache Tomcat/10.1.40]
2025-05-16T13:04:25.057+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2025-05-16T13:04:25.057+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 326 ms
2025-05-16T13:04:25.102+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] org.testcontainers.images.PullPolicy     : Image pull policy will be performed by: DefaultPullPolicy()
2025-05-16T13:04:25.103+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.t.utility.ImageNameSubstitutor         : Image name substitution will be performed by: DefaultImageNameSubstitutor (composite of 'ConfigurationFileImageNameSubstitutor' and 'PrefixingImageNameSubstitutor')
2025-05-16T13:04:25.108+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] org.testcontainers.DockerClientFactory   : Testcontainers version: 1.20.6
2025-05-16T13:04:25.170+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.t.d.DockerClientProviderStrategy       : Loaded org.testcontainers.dockerclient.UnixSocketClientProviderStrategy from ~/.testcontainers.properties, will try it first
2025-05-16T13:04:25.255+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.t.d.DockerClientProviderStrategy       : Found Docker environment with local Unix socket (unix:///var/run/docker.sock)
2025-05-16T13:04:25.255+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] org.testcontainers.DockerClientFactory   : Docker host IP address is localhost
2025-05-16T13:04:25.267+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] org.testcontainers.DockerClientFactory   : Connected to docker: 
  Server Version: 27.5.1
  API Version: 1.47
  Operating System: OrbStack
  Total Memory: 96439 MB
2025-05-16T13:04:25.300+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] tc.testcontainers/ryuk:0.11.0            : Creating container for image: testcontainers/ryuk:0.11.0
2025-05-16T13:04:25.319+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.t.utility.RegistryAuthLocator          : Credential helper/store (docker-credential-osxkeychain) does not have credentials for https://index.docker.io/v1/
2025-05-16T13:04:25.401+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] tc.testcontainers/ryuk:0.11.0            : Container testcontainers/ryuk:0.11.0 is starting: 4db5d8b10e4204e5654150e6a653fe59c74def47ddca213ab76b48ef4f5a977b
2025-05-16T13:04:25.578+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] tc.testcontainers/ryuk:0.11.0            : Container testcontainers/ryuk:0.11.0 started in PT0.278707S
2025-05-16T13:04:25.580+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.t.utility.RyukResourceReaper           : Ryuk started - will monitor and terminate Testcontainers containers on JVM exit
2025-05-16T13:04:25.581+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] org.testcontainers.DockerClientFactory   : Checking the system...
2025-05-16T13:04:25.581+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] org.testcontainers.DockerClientFactory   : ✔︎ Docker server version should be at least 1.6.0
2025-05-16T13:04:25.581+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] tc.apache/kafka-native:latest            : Creating container for image: apache/kafka-native:latest
2025-05-16T13:04:25.609+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] tc.apache/kafka-native:latest            : Container apache/kafka-native:latest is starting: 90fdfa0b9b144fa2905fca8b0ee467ca84cec471320a2bdfa61852f68bc79990
2025-05-16T13:04:25.949+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] tc.apache/kafka-native:latest            : Container apache/kafka-native:latest started in PT0.367594S
2025-05-16T13:04:26.105+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 1 endpoint beneath base path '/actuator'
2025-05-16T13:04:26.130+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port 8082 (http) with context path '/'
2025-05-16T13:04:26.141+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.include.jmx.reporter = true
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:32769]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-demo-kafka-consumer-1
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    enable.metrics.push = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = demo-kafka-consumer
    group.instance.id = null
    group.protocol = classic
    group.remote.assignor = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metadata.recovery.strategy = none
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.max.ms = 1000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = GSSAPI
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 45000
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2025-05-16T13:04:26.153+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.k.c.t.i.KafkaMetricsCollector        : initializing Kafka metrics collector
2025-05-16T13:04:26.194+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.8.1
2025-05-16T13:04:26.194+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 70d6ff42debf7e17
2025-05-16T13:04:26.194+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1747368266194
2025-05-16T13:04:26.201+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Subscribed to topic(s): demo
2025-05-16T13:04:26.208+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] c.example.DemoKafkaConsumerApplication   : Started DemoKafkaConsumerApplication in 1.598 seconds (process running for 1.703)
2025-05-16T13:04:26.295+09:00  WARN 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Error while fetching metadata with correlation id 2 : {demo=UNKNOWN_TOPIC_OR_PARTITION}
2025-05-16T13:04:26.295+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Cluster ID: 4L6g3nShT-eMCtK--X86sw
2025-05-16T13:04:26.411+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Discovered group coordinator localhost:32769 (id: 2147483646 rack: null)
2025-05-16T13:04:26.412+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] (Re-)joining group
2025-05-16T13:04:26.420+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Request joining group due to: need to re-join with the given member-id: consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee
2025-05-16T13:04:26.421+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] (Re-)joining group
2025-05-16T13:04:26.423+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Successfully joined group with generation Generation{generationId=1, memberId='consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee', protocol='range'}
2025-05-16T13:04:26.425+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Finished assignment for group at generation 1: {consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee=Assignment(partitions=[demo-0])}
2025-05-16T13:04:26.435+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Successfully synced group in generation Generation{generationId=1, memberId='consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee', protocol='range'}
2025-05-16T13:04:26.435+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Notifying assignor about the new Assignment(partitions=[demo-0])
2025-05-16T13:04:26.436+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] k.c.c.i.ConsumerRebalanceListenerInvoker : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Adding newly assigned partitions: demo-0
2025-05-16T13:04:26.440+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Found no committed offset for partition demo-0
2025-05-16T13:04:26.442+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Resetting offset for partition demo-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:32769 (id: 1 rack: null)], epoch=0}}.
2025-05-16T13:04:26.443+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : demo-kafka-consumer: partitions assigned: [demo-0]

Kafka is started from the kafka-native image, and the port it is reachable on can be read from the following log line.

    bootstrap.servers = [localhost:32769]

Alternatively, it can be checked with the following command.

$ docker ps | grep kafka-native                                                              
90fdfa0b9b14   apache/kafka-native:latest   "sh -c 'while [ ! -f…"   About a minute ago   Up About a minute   0.0.0.0:32769->9092/tcp, [::]:32769->9092/tcp   funny_pascal
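
Testcontainers maps the broker to a random host port, so the value 32769 will generally differ on each start. As a minimal sketch (the function name kafka_host_port is hypothetical, not part of Docker or Kafka), the host port can be pulled out of the Ports column shown above with sed:

```shell
# Hypothetical helper: print the host port mapped to container port 9092,
# given a `docker ps` Ports value such as
# "0.0.0.0:32769->9092/tcp, [::]:32769->9092/tcp".
kafka_host_port() {
  printf '%s\n' "$1" | sed -n 's/.*:\([0-9][0-9]*\)->9092\/tcp.*/\1/p' | head -n 1
}

# Sample value copied from the output above; prints 32769.
kafka_host_port '0.0.0.0:32769->9092/tcp, [::]:32769->9092/tcp'

# Against a live container (assumes Docker is running and exactly one
# kafka-native container exists):
# kafka_host_port "$(docker ps --filter ancestor=apache/kafka-native:latest --format '{{.Ports}}')"
```

The result can then be used in place of the hardcoded port wherever a bootstrap server address is needed, for example "localhost:$(kafka_host_port ...)".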

Send messages using the Kafka CLI.

The Kafka CLI can be installed with brew.

brew install kafka

Send the messages with the following command.

cat <<EOF | kafka-console-producer --bootstrap-server localhost:32769 --topic demo
Hello World 1
Hello World 2
Hello World 3
EOF
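
For more than a handful of messages the heredoc gets unwieldy. kafka-console-producer treats each line on standard input as one message, so a small generator can be piped into it instead. The following is only a sketch: demo_messages is a hypothetical helper name, and the port is the one taken from the startup log above.

```shell
# Hypothetical helper: print N numbered test messages, one per line.
demo_messages() {
  i=1
  while [ "$i" -le "$1" ]; do
    printf 'Hello World %s\n' "$i"
    i=$((i + 1))
  done
}

# Prints "Hello World 1" through "Hello World 3", one per line.
demo_messages 3

# To send a larger batch, pipe the output into the same CLI as above:
# demo_messages 100 | kafka-console-producer --bootstrap-server localhost:32769 --topic demo
```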

The app's log shows that the messages were received.

2025-05-16T13:08:02.225+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController           : onMessage(Hello World 1)
2025-05-16T13:08:02.225+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController           : onMessage(Hello World 2)
2025-05-16T13:08:02.225+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController           : onMessage(Hello World 3)

The messages can also be retrieved over HTTP with the following command.

$ curl -s http://localhost:8082 | jq . 
[
  "Hello World 1",
  "Hello World 2",
  "Hello World 3"
]

With kafka-native, we were able to create a Spring Boot development environment in which Kafka is available almost instantly.
Testcontainers makes it easy to bring up Kafka in a local development environment, not just in tests, so please give it a try.
