When building microservices, designing inter-service communication can be tricky. With synchronous REST API communication, if the receiving side goes down, the sender is affected as well. This is where asynchronous messaging with Kafka comes in.
This article explains how to implement Kafka Producers and Consumers from scratch in a Spring Boot 3.x application using spring-kafka. We’ll cover the basics of KafkaTemplate and @KafkaListener, then error handling, retries, and Dead Letter Topic (DLT) configuration, in one pass from start to finish. For how this differs from @Async (in-process, thread-based asynchronous processing), see the async processing guide.
If you’re considering event-driven design within a JVM process at a smaller scale than Kafka, also check out the ApplicationEvent loose coupling guide and how to achieve a modular monolith with Spring Modulith to organize your options based on use case.
Local environment: Starting Kafka with Docker Compose
A single-node KRaft configuration, which needs no ZooKeeper, keeps the Compose file as short as possible.
# docker-compose.yml
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
Once you start it with docker compose up -d, you’re ready to go. For how to write Dockerfiles and multi-stage builds, see the Docker containerization guide.
Adding dependencies and configuring application.yml
// build.gradle
dependencies {
    implementation 'org.springframework.kafka:spring-kafka'
}
Since it’s included in the Spring Boot managed BOM, no version specification is needed.
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-app-group
      # Applies only when the group has no committed offset yet:
      # earliest starts at the beginning of each partition,
      # latest (a common production choice) starts at new messages only
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Starting with StringSerializer / StringDeserializer is the safe bet. When you want to handle JSON objects, switch to JsonSerializer.
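The effect of auto-offset-reset is easiest to see as a decision rule. The following plain-Java sketch is an illustration only: StartOffsetDemo, Reset, and startOffset are hypothetical names, not Kafka client APIs, and it leaves out the out-of-range case, where the reset policy also applies.

```java
import java.util.OptionalLong;

// Illustration only: models where a consumer starts reading a partition.
final class StartOffsetDemo {

    enum Reset { EARLIEST, LATEST }

    // committed: the group's committed offset for the partition, if any.
    // logStart:  first offset still available in the partition.
    // logEnd:    offset the next produced record will get.
    static long startOffset(OptionalLong committed, Reset reset, long logStart, long logEnd) {
        if (committed.isPresent()) {
            // A committed offset always wins; the reset policy is not consulted.
            return committed.getAsLong();
        }
        return reset == Reset.EARLIEST ? logStart : logEnd;
    }

    public static void main(String[] args) {
        System.out.println(startOffset(OptionalLong.of(57), Reset.LATEST, 0, 100));
        System.out.println(startOffset(OptionalLong.empty(), Reset.EARLIEST, 0, 100));
        System.out.println(startOffset(OptionalLong.empty(), Reset.LATEST, 0, 100));
    }
}
```

In other words, changing auto-offset-reset has no effect on a group that has already committed offsets for the topic.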
Sending messages with KafkaTemplate
Sending POJOs with JsonSerializer
You’ll often want to send POJOs, not just strings. Switching to JsonSerializer / JsonDeserializer lets you send and receive domain objects directly.
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.example.order"
        spring.json.value.default.type: "com.example.order.OrderEvent"
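public record OrderEvent(String orderId, String item, int quantity) {}

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class OrderEventProducer {
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public void send(OrderEvent event) {
        // The order ID is the record key, so events for one order stay in one partition
        kafkaTemplate.send("orders", event.orderId(), event);
    }
}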
Always explicitly specify spring.json.trusted.packages. Specifying * allows deserialization to any class, posing an RCE risk from untrusted payloads.
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RequiredArgsConstructor
public class OrderProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendOrder(String orderId, String payload) {
        kafkaTemplate.send("orders", orderId, payload)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Send failed orderId={}", orderId, ex);
                } else {
                    log.info("Send succeeded offset={}",
                        result.getRecordMetadata().offset());
                }
            });
    }
}
send() returns a CompletableFuture. Register a callback with whenComplete and log send successes and failures.
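To see the callback contract without a broker, here is a minimal plain-JDK sketch. SendCallbackDemo and describe are hypothetical names, and the Long result is an assumed stand-in for the offset that the real SendResult carries. The futures are already completed, so whenComplete runs the callback synchronously. With a real send it runs later, when the broker responds.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the callback registered on kafkaTemplate.send(...).
final class SendCallbackDemo {

    // Returns the log line the callback produces for an already-completed future.
    static String describe(CompletableFuture<Long> sendFuture, String orderId) {
        StringBuilder line = new StringBuilder();
        sendFuture.whenComplete((offset, ex) -> {
            if (ex != null) {
                // Failure path: the result argument is null, ex carries the cause.
                line.append("Send failed orderId=").append(orderId)
                    .append(" cause=").append(ex.getMessage());
            } else {
                // Success path: ex is null and the result is available.
                line.append("Send succeeded offset=").append(offset);
            }
        });
        return line.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(CompletableFuture.completedFuture(42L), "001"));
        System.out.println(describe(
            CompletableFuture.failedFuture(new IllegalStateException("broker down")), "002"));
    }
}
```

Exactly one of the two callback arguments is non-null, which is why the ex != null check has to come before any access to the result.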
Receiving messages with @KafkaListener
Parallel consumption with concurrency
By default, @KafkaListener runs on a single thread. For topics with multiple partitions, you can parallelize the consumer side to increase throughput.
@KafkaListener(topics = "orders", groupId = "my-app-group", concurrency = "3")
public void consume(String message) {
    log.info("Received: {}", message);
}
The basic rule is to set concurrency with the number of partitions as the upper bound. If it exceeds the partition count, the surplus threads go idle.
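The rule can be checked with a few lines of plain Java. This sketch spreads partitions over listener threads round-robin, which is an assumption made for illustration, since the real distribution depends on the configured partition assignor. ConcurrencyDemo, assign, and idleThreads are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: how many listener threads end up with no partition.
final class ConcurrencyDemo {

    // Spreads partitions 0..partitions-1 over `concurrency` threads round-robin.
    static List<List<Integer>> assign(int partitions, int concurrency) {
        if (concurrency < 1) {
            throw new IllegalArgumentException("concurrency must be at least 1");
        }
        List<List<Integer>> threads = new ArrayList<>();
        for (int t = 0; t < concurrency; t++) {
            threads.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            threads.get(p % concurrency).add(p);
        }
        return threads;
    }

    // Counts the threads that received no partition at all.
    static long idleThreads(int partitions, int concurrency) {
        return assign(partitions, concurrency).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 3)); // every thread owns two partitions
        System.out.println(assign(2, 3)); // the third thread owns nothing
    }
}
```

With 2 partitions and concurrency = "3", one thread has nothing to read, so raising concurrency further adds threads without adding throughput.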
Reliable processing with manual commit (AckMode)
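By default, spring-kafka disables the Kafka client’s auto-commit, and the listener container commits offsets for you once the records returned by a poll have been processed (AckMode BATCH). For the “commit only when the DB write succeeds” scenario, switch to manual commit so that your code decides when the offset is committed.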
spring:
  kafka:
    consumer:
      enable-auto-commit: false
    listener:
      ack-mode: MANUAL_IMMEDIATE
@KafkaListener(topics = "orders", groupId = "my-app-group")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
    try {
        orderService.save(record.value());
        ack.acknowledge();
    } catch (Exception e) {
        log.error("Processing failed - not committing, waiting for redelivery", e);
        throw e;
    }
}
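MANUAL_IMMEDIATE commits as soon as acknowledge() is called, while MANUAL queues the acknowledgment and commits once all records from the current poll have been processed. For at-least-once delivery, the standard approach is manual commit combined with idempotent processing, because a record can be delivered again after a failure or a rebalance.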
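Idempotent processing can be as simple as refusing to apply the same order twice. The sketch below is a plain-Java illustration under stated assumptions: IdempotentOrderHandler is a hypothetical class, and the in-memory set stands in for what would normally be a unique constraint or a processed-ID table in the database.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustration only: a handler that tolerates redelivery of the same order.
final class IdempotentOrderHandler {

    // Assumption: stands in for a unique key or processed-ID table in the DB.
    private final Set<String> processedIds = new HashSet<>();
    private final List<String> saved = new ArrayList<>();

    // Returns true if the order was saved, false if it was a redelivered duplicate.
    boolean handle(String orderId, String payload) {
        if (!processedIds.add(orderId)) {
            // Already processed: skip the side effect. Acknowledging is still safe.
            return false;
        }
        saved.add(payload);
        return true;
    }

    int savedCount() {
        return saved.size();
    }

    public static void main(String[] args) {
        IdempotentOrderHandler handler = new IdempotentOrderHandler();
        System.out.println(handler.handle("001", "coffee")); // first delivery
        System.out.println(handler.handle("001", "coffee")); // redelivery
        System.out.println(handler.savedCount());
    }
}
```

In a listener, acknowledge() would be called in both cases, since the duplicate has already taken effect once.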
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class OrderConsumer {

    // When only the payload is needed
    @KafkaListener(topics = "orders", groupId = "my-app-group")
    public void consume(String message) {
        log.info("Received: {}", message);
    }

    // When metadata (offset, partition) is needed
    @KafkaListener(topics = "orders-detail", groupId = "my-app-group")
    public void consumeWithMeta(ConsumerRecord<String, String> record) {
        log.info("key={}, partition={}, offset={}",
            record.key(), record.partition(), record.offset());
    }
}
If you only need to process the payload, receiving it as a direct argument is more readable. If you need metadata like offset, partition, or timestamp, use ConsumerRecord.
Verifying behavior
Verify by calling the Producer through a REST API endpoint.
curl -X POST http://localhost:8080/orders \
  -H "Content-Type: application/json" \
  -d '{"orderId":"001","item":"coffee"}'
Success is when the message appears in the Consumer log. For automated tests, @EmbeddedKafka is handy. Pointing spring.kafka.bootstrap-servers at ${spring.embedded.kafka.brokers} with @TestPropertySource is essential. Without it, the test falls back to localhost:9092 from application.yml and fails to connect.
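import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"orders"})
@TestPropertySource(properties = {"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"})
class OrderConsumerTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @SpyBean
    private OrderConsumer orderConsumer;

    @Test
    void canReceiveMessage() {
        kafkaTemplate.send("orders", "key1", "test-message");
        // Wait up to 10 seconds for the listener to be invoked
        verify(orderConsumer, timeout(10000)).consume(any());
    }
}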
Using @SpyBean and verify() lets you confirm receipt without dragging state management like CountDownLatch into the Consumer class itself. On Spring Boot 3.4 and later, @SpyBean is deprecated in favor of @MockitoSpyBean.
Configuring error handling and retries with DefaultErrorHandler
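With default settings, spring-kafka 3.x retries a failed record up to 9 more times with no delay (10 delivery attempts in total), then logs the error and skips the record. Configure a DefaultErrorHandler explicitly so that the back-off is under your control and failed records are kept instead of dropped.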
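import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
            kafkaListenerContainerFactory(
                ConsumerFactory<String, String> consumerFactory,
                KafkaTemplate<String, String> kafkaTemplate) {
        // 1-second interval, max 3 retries (4 attempts total)
        var backOff = new FixedBackOff(1000L, 3L);
        // After the last retry, publish the failed record to the dead letter topic
        var recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
        var errorHandler = new DefaultErrorHandler(recoverer, backOff);
        // Retrying cannot fix an unreadable payload, so send it to recovery right away
        // (this is spring-kafka's DeserializationException, imported above)
        errorHandler.addNotRetryableExceptions(DeserializationException.class);

        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory);
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}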
If you want exponential backoff, pass ExponentialBackOff instead of FixedBackOff.
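var backOff = new ExponentialBackOff(1000L, 2.0); // 1s → 2s → 4s...
// setMaxAttempts requires Spring Framework 6.1+ (Spring Boot 3.2+);
// on older versions, spring-kafka's ExponentialBackOffWithMaxRetries is the alternative
backOff.setMaxAttempts(4);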
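To compare the two strategies before picking values, it helps to print the delay schedule each one produces. This is a plain-Java sketch, not Spring’s BackOff implementation: BackOffScheduleDemo and its methods are hypothetical names, and the cap on the interval is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: delays (in ms) waited before each retry.
final class BackOffScheduleDemo {

    // Same delay before every retry.
    static List<Long> fixed(long intervalMs, int retries) {
        List<Long> delays = new ArrayList<>();
        for (int i = 0; i < retries; i++) {
            delays.add(intervalMs);
        }
        return delays;
    }

    // Delay grows by `multiplier` each retry and is capped at maxIntervalMs.
    static List<Long> exponential(long initialMs, double multiplier, int retries, long maxIntervalMs) {
        List<Long> delays = new ArrayList<>();
        double next = initialMs;
        for (int i = 0; i < retries; i++) {
            delays.add((long) Math.min(next, maxIntervalMs));
            next *= multiplier;
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(fixed(1000L, 3));
        System.out.println(exponential(1000L, 2.0, 3, 30_000L));
    }
}
```

With three retries, the fixed schedule blocks the partition for about 3 seconds in total and the exponential one for about 7, so the exponential option trades longer stalls for more breathing room for a struggling downstream system.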
For a comparison with exception design in REST APIs, also check out the exception handling guide.
Monitoring the Dead Letter Topic (DLT)
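Once retries are exhausted, DeadLetterPublishingRecoverer publishes the record to {original-topic-name}.DLT. For the orders topic, that’s orders.DLT. By default the record goes to the same partition number it had in the original topic, so give the DLT at least as many partitions as the source topic.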
@KafkaListener(topics = "orders.DLT", groupId = "my-app-dlt-group")
public void consumeDlt(ConsumerRecord<String, String> record) {
    log.error("DLT received - manual intervention required key={}, value={}",
        record.key(), record.value());
    // Send alert, record to admin DB, etc.
}
Decide at the design stage whether messages accumulated in the DLT will be requeued to the original topic after fixing the root cause, or handled manually.
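The whole retry-then-DLT flow can be traced in a small simulation. This plain-Java sketch only imitates the behavior described in this section and does not use spring-kafka. RetryThenDltDemo, Outcome, and deliver are hypothetical names, and the back-off wait is left out.

```java
import java.util.function.Consumer;

// Illustration only: imitates "retry N times, then route to <topic>.DLT".
final class RetryThenDltDemo {

    // deadLetterTopic is null when the listener eventually succeeded.
    record Outcome(int attempts, String deadLetterTopic) {}

    static Outcome deliver(String topic, String value, int maxRetries, Consumer<String> listener) {
        int attempts = 0;
        while (attempts <= maxRetries) {
            attempts++;
            try {
                listener.accept(value);
                return new Outcome(attempts, null);
            } catch (RuntimeException e) {
                // A real error handler would wait for the back-off interval here.
            }
        }
        // Retries exhausted: the record is routed to the dead letter topic.
        return new Outcome(attempts, topic + ".DLT");
    }

    public static void main(String[] args) {
        Outcome poisoned = deliver("orders", "bad-payload", 3, v -> {
            throw new IllegalStateException("cannot process " + v);
        });
        System.out.println(poisoned);
    }
}
```

With maxRetries = 3, an always-failing record is attempted 4 times before it lands in orders.DLT, matching the FixedBackOff(1000L, 3L) configuration shown earlier.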
Things to keep in mind for production
SSL/SASL authentication configuration example
Authentication configuration is required to connect to managed Kafka (Confluent Cloud, MSK, etc.).
spring:
  kafka:
    bootstrap-servers: pkc-xxxx.confluent.cloud:9092
    security:
      protocol: SASL_SSL
    properties:
      sasl.mechanism: PLAIN
      sasl.jaas.config: |
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="${KAFKA_API_KEY}"
        password="${KAFKA_API_SECRET}";
Manage credentials via environment variables or encryption with Jasypt, and avoid plaintext in application.yml.
Relationship between partition count and consumer count
If the number of consumer instances in a group exceeds the partition count, the surplus instances process nothing. For throughput design, maintain “partition count ≥ consumer count” and estimate throughput together with the concurrency setting.
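A back-of-the-envelope calculation makes this sizing rule concrete. The sketch below is plain Java with hypothetical names (ConsumerSizingDemo and its methods), and the rates in main are made-up inputs. Measure the per-thread rate of your own listener before relying on such numbers.

```java
// Illustration only: sizing partitions against consumer threads.
final class ConsumerSizingDemo {

    // Threads that actually receive partitions: capped by the partition count.
    static int activeThreads(int partitions, int instances, int concurrency) {
        return Math.min(partitions, instances * concurrency);
    }

    // Threads that are started but never receive a partition.
    static int idleThreads(int partitions, int instances, int concurrency) {
        return instances * concurrency - activeThreads(partitions, instances, concurrency);
    }

    // Partitions needed so that enough threads can run in parallel (rounded up).
    static int partitionsNeeded(long targetPerSecond, long perThreadPerSecond) {
        return (int) ((targetPerSecond + perThreadPerSecond - 1) / perThreadPerSecond);
    }

    public static void main(String[] args) {
        // 4 instances with concurrency 3 against a 6-partition topic
        System.out.println(activeThreads(6, 4, 3));
        System.out.println(idleThreads(6, 4, 3));
        // Target of 5000 msg/s when one thread handles 800 msg/s
        System.out.println(partitionsNeeded(5000, 800));
    }
}
```

Here half of the 12 threads would sit idle, and reaching the target rate needs at least 7 partitions, so the partition count is the figure to fix first.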
Monitoring consumer lag
For local checks, kafka-consumer-groups.sh --describe is the basic tool.
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-app-group
In production, the standard pattern is to collect lag either with kafka_exporter or from the Micrometer metrics that Spring Boot Actuator exposes (kafka.consumer.fetch.manager.records.lag.max), scrape it with Prometheus, visualize it in Grafana, and fire alerts when lag exceeds a threshold.
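Lag itself is simple arithmetic: for each partition, the log end offset minus the group’s committed offset. The sketch below shows that calculation and a threshold check in plain Java. ConsumerLagDemo, PartitionOffsets, and the sample offsets are hypothetical, and a real alert would be defined in the monitoring system rather than in application code.

```java
import java.util.List;

// Illustration only: per-partition lag and a threshold check.
final class ConsumerLagDemo {

    record PartitionOffsets(int partition, long logEndOffset, long committedOffset) {
        // Records written to the partition but not yet committed by the group.
        long lag() {
            return Math.max(0L, logEndOffset - committedOffset);
        }
    }

    // The worst lag across all partitions, or 0 when the list is empty.
    static long maxLag(List<PartitionOffsets> partitions) {
        return partitions.stream().mapToLong(PartitionOffsets::lag).max().orElse(0L);
    }

    static boolean shouldAlert(List<PartitionOffsets> partitions, long threshold) {
        return maxLag(partitions) > threshold;
    }

    public static void main(String[] args) {
        List<PartitionOffsets> orders = List.of(
            new PartitionOffsets(0, 1500, 1480),
            new PartitionOffsets(1, 900, 900),
            new PartitionOffsets(2, 2000, 1200));
        System.out.println(maxLag(orders));
        System.out.println(shouldAlert(orders, 500));
    }
}
```

Alerting on the maximum rather than the sum catches a single stuck partition even when the others are keeping up.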
Summary
With spring-kafka, integrating Spring Boot with Kafka goes smoothly.
- Implement the Producer with KafkaTemplate.send()
- Implement the Consumer with @KafkaListener
- Retry and DLT forwarding with DefaultErrorHandler + DeadLetterPublishingRecoverer
Cover these three points, and you’ll have a solid foundation for asynchronous inter-service communication. For large-scale data processing combining Kafka with Spring Batch, also check out the Spring Batch guide.
Related Articles
- How to loosely couple modules with Spring Boot’s ApplicationEvent - An introduction to in-process event-driven design
- How to achieve a modular monolith with Spring Modulith - As a step before introducing Kafka
- Implementing Spring Boot’s GlobalExceptionHandler for production - Exception design on the REST API side
- How to encrypt confidential information in config files with Jasypt in Spring Boot - Managing Kafka connection credentials