Warning

This article was automatically translated by OpenAI (gpt-4.1).It may be edited eventually, but please be aware that it may contain incorrect information at this time.

kafka-native is a Native Image of Apache Kafka compiled with GraalVM.
By using this, Kafka can start up extremely quickly.

By using this from Testcontainers, you can set up a Spring Boot development environment that allows instant access to Kafka.

Incidentally, Testcontainers can be used not only for testing but also for local development.

Note

If you want to know how it works, please refer to this article.

Also, when you create a project skeleton with Spring Initializr, the configuration for using kafka-native is automatically generated,
so in reality, there is zero effort required to set up the environment described in the article title (only Docker is needed).

Creating a Consumer App

Create an app skeleton using Spring Initializr.

curl https://start.spring.io/starter.tgz \
       -d artifactId=demo-kafka-consumer \
       -d baseDir=demo-kafka-consumer \
       -d packageName=com.example \
       -d dependencies=kafka,testcontainers,web,actuator \
       -d type=maven-project \
       -d applicationName=DemoKafkaConsumerApplication | tar -xzvf -
cd demo-kafka-consumer

Under src/test/java, the Testcontainers configuration is set up. The apache/kafka-native:latest image is used.

$ cat src/test/java/com/example/TestDemoKafkaConsumerApplication.java 
package com.example;

import org.springframework.boot.SpringApplication;

public class TestDemoKafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.from(DemoKafkaConsumerApplication::main).with(TestcontainersConfiguration.class).run(args);
    }

}

$ cat src/test/java/com/example/TestcontainersConfiguration.java     
package com.example;

import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@TestConfiguration(proxyBeanMethods = false)
class TestcontainersConfiguration {

    @Bean
    @ServiceConnection
    KafkaContainer kafkaContainer() {
        return new KafkaContainer(DockerImageName.parse("apache/kafka-native:latest"));
    }

}

Next, create sample code for the Consumer. Reuse the code created in this article.

cat <<'EOF' > src/main/java/com/example/ConsumerController.java
package com.example;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ConsumerController {
    private final List<String> messages = new CopyOnWriteArrayList<>();

    private final Logger log = LoggerFactory.getLogger(ConsumerController.class);

    @GetMapping(path = "")
    public List<String> getMessages() {
        return this.messages;
    }

    @KafkaListener(topics = "${demo.topic}")
    public void onMessage(String message) {
        log.info("onMessage({})", message);
        this.messages.add(message);
    }
}
EOF
cat <<'EOF' > src/main/resources/application.properties
demo.topic=demo
server.port=8082
spring.application.name=demo-kafka-consumer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${spring.application.name}
EOF

Verifying Operation

Let's try out the app we've created so far using Testcontainers.
If you're using an IDE, just run the main method of TestDemoKafkaConsumerApplication under src/test/java.
If you want to use the Maven Plugin, you can run it with the following command:

./mvnw spring-boot:test-run

The app will start up immediately along with the following startup logs.

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/

 :: Spring Boot ::                (v3.4.5)

2025-05-16T13:04:24.718+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] c.example.DemoKafkaConsumerApplication   : Starting DemoKafkaConsumerApplication using Java 21.0.6 with PID 36472 (/Users/toshiaki/git/demo-kafka-consumer/target/classes started by toshiaki in /Users/toshiaki/git/demo-kafka-consumer)
2025-05-16T13:04:24.718+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] c.example.DemoKafkaConsumerApplication   : No active profile set, falling back to 1 default profile: "default"
2025-05-16T13:04:25.037+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port 8082 (http)
2025-05-16T13:04:25.041+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2025-05-16T13:04:25.041+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.apache.catalina.core.StandardEngine    : Starting Servlet engine: [Apache Tomcat/10.1.40]
2025-05-16T13:04:25.057+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2025-05-16T13:04:25.057+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 326 ms
2025-05-16T13:04:25.102+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] org.testcontainers.images.PullPolicy     : Image pull policy will be performed by: DefaultPullPolicy()
2025-05-16T13:04:25.103+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.t.utility.ImageNameSubstitutor         : Image name substitution will be performed by: DefaultImageNameSubstitutor (composite of 'ConfigurationFileImageNameSubstitutor' and 'PrefixingImageNameSubstitutor')
2025-05-16T13:04:25.108+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] org.testcontainers.DockerClientFactory   : Testcontainers version: 1.20.6
2025-05-16T13:04:25.170+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.t.d.DockerClientProviderStrategy       : Loaded org.testcontainers.dockerclient.UnixSocketClientProviderStrategy from ~/.testcontainers.properties, will try it first
2025-05-16T13:04:25.255+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.t.d.DockerClientProviderStrategy       : Found Docker environment with local Unix socket (unix:///var/run/docker.sock)
2025-05-16T13:04:25.255+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] org.testcontainers.DockerClientFactory   : Docker host IP address is localhost
2025-05-16T13:04:25.267+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] org.testcontainers.DockerClientFactory   : Connected to docker: 
  Server Version: 27.5.1
  API Version: 1.47
  Operating System: OrbStack
  Total Memory: 96439 MB
2025-05-16T13:04:25.300+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] tc.testcontainers/ryuk:0.11.0            : Creating container for image: testcontainers/ryuk:0.11.0
2025-05-16T13:04:25.319+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.t.utility.RegistryAuthLocator          : Credential helper/store (docker-credential-osxkeychain) does not have credentials for https://index.docker.io/v1/
2025-05-16T13:04:25.401+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] tc.testcontainers/ryuk:0.11.0            : Container testcontainers/ryuk:0.11.0 is starting: 4db5d8b10e4204e5654150e6a653fe59c74def47ddca213ab76b48ef4f5a977b
2025-05-16T13:04:25.578+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] tc.testcontainers/ryuk:0.11.0            : Container testcontainers/ryuk:0.11.0 started in PT0.278707S
2025-05-16T13:04:25.580+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.t.utility.RyukResourceReaper           : Ryuk started - will monitor and terminate Testcontainers containers on JVM exit
2025-05-16T13:04:25.581+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] org.testcontainers.DockerClientFactory   : Checking the system...
2025-05-16T13:04:25.581+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] org.testcontainers.DockerClientFactory   : ✔︎ Docker server version should be at least 1.6.0
2025-05-16T13:04:25.581+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] tc.apache/kafka-native:latest            : Creating container for image: apache/kafka-native:latest
2025-05-16T13:04:25.609+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] tc.apache/kafka-native:latest            : Container apache/kafka-native:latest is starting: 90fdfa0b9b144fa2905fca8b0ee467ca84cec471320a2bdfa61852f68bc79990
2025-05-16T13:04:25.949+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] tc.apache/kafka-native:latest            : Container apache/kafka-native:latest started in PT0.367594S
2025-05-16T13:04:26.105+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 1 endpoint beneath base path '/actuator'
2025-05-16T13:04:26.130+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port 8082 (http) with context path '/'
2025-05-16T13:04:26.141+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.include.jmx.reporter = true
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:32769]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-demo-kafka-consumer-1
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    enable.metrics.push = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = demo-kafka-consumer
    group.instance.id = null
    group.protocol = classic
    group.remote.assignor = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metadata.recovery.strategy = none
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.max.ms = 1000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = GSSAPI
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 45000
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2025-05-16T13:04:26.153+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.k.c.t.i.KafkaMetricsCollector        : initializing Kafka metrics collector
2025-05-16T13:04:26.194+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.8.1
2025-05-16T13:04:26.194+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 70d6ff42debf7e17
2025-05-16T13:04:26.194+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1747368266194
2025-05-16T13:04:26.201+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Subscribed to topic(s): demo
2025-05-16T13:04:26.208+09:00  INFO 36472 --- [demo-kafka-consumer] [           main] c.example.DemoKafkaConsumerApplication   : Started DemoKafkaConsumerApplication in 1.598 seconds (process running for 1.703)
2025-05-16T13:04:26.295+09:00  WARN 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Error while fetching metadata with correlation id 2 : {demo=UNKNOWN_TOPIC_OR_PARTITION}
2025-05-16T13:04:26.295+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Cluster ID: 4L6g3nShT-eMCtK--X86sw
2025-05-16T13:04:26.411+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Discovered group coordinator localhost:32769 (id: 2147483646 rack: null)
2025-05-16T13:04:26.412+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] (Re-)joining group
2025-05-16T13:04:26.420+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Request joining group due to: need to re-join with the given member-id: consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee
2025-05-16T13:04:26.421+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] (Re-)joining group
2025-05-16T13:04:26.423+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Successfully joined group with generation Generation{generationId=1, memberId='consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee', protocol='range'}
2025-05-16T13:04:26.425+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Finished assignment for group at generation 1: {consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee=Assignment(partitions=[demo-0])}
2025-05-16T13:04:26.435+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Successfully synced group in generation Generation{generationId=1, memberId='consumer-demo-kafka-consumer-1-aeed8fa9-f2b8-4062-9bb0-18b98b7061ee', protocol='range'}
2025-05-16T13:04:26.435+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Notifying assignor about the new Assignment(partitions=[demo-0])
2025-05-16T13:04:26.436+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] k.c.c.i.ConsumerRebalanceListenerInvoker : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Adding newly assigned partitions: demo-0
2025-05-16T13:04:26.440+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Found no committed offset for partition demo-0
2025-05-16T13:04:26.442+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-demo-kafka-consumer-1, groupId=demo-kafka-consumer] Resetting offset for partition demo-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:32769 (id: 1 rack: null)], epoch=0}}.
2025-05-16T13:04:26.443+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : demo-kafka-consumer: partitions assigned: [demo-0]

Kafka is started using the kafka-native image, and you can see the accessible port from the following log:

    bootstrap.servers = [localhost:32769]

Alternatively, you can also check with the following command:

$ docker ps | grep kafka-native                                                              
90fdfa0b9b14   apache/kafka-native:latest   "sh -c 'while [ ! -f…"   About a minute ago   Up About a minute   0.0.0.0:32769->9092/tcp, [::]:32769->9092/tcp   funny_pascal

Send messages using the Kafka CLI.

You can install the Kafka CLI with brew.

brew install kafka

Send messages with the following command:

cat <<EOF | kafka-console-producer --bootstrap-server localhost:32769 --topic demo
Hello World 1
Hello World 2
Hello World 3
EOF

If you check the app's logs, you can see that the messages have been received.

2025-05-16T13:08:02.225+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController           : onMessage(Hello World 1)
2025-05-16T13:08:02.225+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController           : onMessage(Hello World 2)
2025-05-16T13:08:02.225+09:00  INFO 36472 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController           : onMessage(Hello World 3)

You can also retrieve the messages via HTTP with the following command:

$ curl -s http://localhost:8082 | jq . 
[
  "Hello World 1",
  "Hello World 2",
  "Hello World 3"
]

By using kafka-native, we were able to create a Spring Boot development environment that allows instant access to Kafka.
With Testcontainers, you can easily spin up Kafka even in your local development environment, which is extremely convenient.
Testcontainers can be used not only for testing but also for local development environments, so be sure to give it a try.

Found a mistake? Update the entry.
Share this article: