Publishing an event to Kafka after a DB update is a common pattern. But are you simply calling kafkaTemplate.send() inside a @Transactional method? That implementation is more dangerous than it looks.
In this article, we'll walk through how to solve the dual-write problem between the DB and Kafka with the Transactional Outbox pattern, using concrete code.
What Is the Dual Write Problem?
Let’s start by looking at problematic code.
```java
@Transactional
public void placeOrder(OrderCommand cmd) {
    Order order = orderRepository.save(new Order(cmd));
    kafkaTemplate.send("order.created", order.getId().toString(), toJson(order));
}
```
At first glance it looks fine, but a write that spans two systems, the DB and Kafka, is not atomic. Consistency breaks in two scenarios:
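- kafkaTemplate.send() succeeds, but the subsequent DB commit rolls back. Kafka ends up with an event for an order that doesn't exist.
- The DB commit succeeds, but the send fails. kafkaTemplate.send() is asynchronous, so many delivery failures are reported only through the returned future; if nothing checks it, the transaction commits anyway and downstream services are never notified.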
In theory, a distributed transaction via JTA/XA could solve this, but Kafka is not suited to 2PC and the operational overhead is heavy, so it isn't a realistic option.
The Outbox Pattern in a Nutshell
The idea is simple: “Stop sending to Kafka directly, and instead write to an Outbox table inside the same DB.”
App ──▶ DB (business + outbox in the same transaction)
│
▼
Relay (Poller / CDC)
│
▼
Kafka ──▶ Consumer
If you commit the business data and the Outbox record in the same transaction, they either both succeed or both fail. A separate process then reads from the Outbox and forwards the events to Kafka, giving a two-stage approach.
Note that this approach provides at-least-once delivery, so design your consumers to tolerate duplicates. Unlike Spring's ApplicationEvent, which delivers events within a single JVM, this is a mechanism for reliably propagating events across processes and services.
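To make both guarantees concrete, the atomic write and the at-least-once relay, here is a deliberately tiny in-memory model. It is only a sketch: OutboxModel and its methods are hypothetical, lists and a map stand in for the tables and the Kafka topic, and the "transaction" is modeled simply by writing nothing when the business step fails.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of the Outbox flow. All names are hypothetical: lists and
// a map stand in for the DB tables and the Kafka topic.
class OutboxModel {
    final List<String> orders = new ArrayList<>();            // business table
    final Map<String, String> outbox = new LinkedHashMap<>(); // event id -> status
    final List<String> topic = new ArrayList<>();             // stands in for Kafka

    // One "transaction": the order and its outbox row are written together,
    // and only if the business step succeeds.
    void placeOrder(String orderId, boolean businessFails) {
        if (businessFails) {
            throw new IllegalStateException("rolled back: " + orderId);
        }
        orders.add(orderId);
        outbox.put("evt-" + orderId, "PENDING");
    }

    // Relay: send each PENDING row, then mark it PUBLISHED. crashAfterSend
    // simulates a crash between the send and the status update.
    void relay(boolean crashAfterSend) {
        for (Map.Entry<String, String> row : outbox.entrySet()) {
            if (!"PENDING".equals(row.getValue())) {
                continue;
            }
            topic.add(row.getKey());
            if (crashAfterSend) {
                return; // status stays PENDING, as if the relay's transaction rolled back
            }
            row.setValue("PUBLISHED");
        }
    }
}
```

Calling relay(true) and then relay(false) puts the same event on the topic twice, which is exactly the kind of duplicate consumers have to tolerate.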
Designing the Outbox Table
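You can start with something minimal like this. retry_count and next_attempt_at drive retries with backoff, and the processed_event table is used on the consumer side to record the IDs of events that have already been handled, so redelivered events can be skipped.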
```sql
CREATE TABLE outbox_event (
    id              UUID PRIMARY KEY,
    aggregate_type  VARCHAR(64) NOT NULL,
    aggregate_id    VARCHAR(64) NOT NULL,
    event_type      VARCHAR(64) NOT NULL,
    payload         JSONB NOT NULL,
    status          VARCHAR(16) NOT NULL DEFAULT 'PENDING',
    retry_count     INT NOT NULL DEFAULT 0,
    next_attempt_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    published_at    TIMESTAMPTZ
);

CREATE INDEX idx_outbox_pending
    ON outbox_event (next_attempt_at) WHERE status = 'PENDING';

CREATE TABLE processed_event (
    event_id     UUID PRIMARY KEY,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```
The partial index limited to status = 'PENDING' keeps the Poller's query light, and sending id as a Kafka message header lets consumers dedupe. Storing the payload as JSONB makes it easier to evolve the event schema later.
Writing to the Outbox from the Business Logic
A plain JPA entity is fine. In Hibernate 6, annotate the payload field with @JdbcTypeCode(SqlTypes.JSON) so it is bound as JSON for the JSONB column; the field itself can stay a String holding JSON that you serialize on the application side. If you'd rather avoid that, declaring the column as TEXT instead of JSONB also works.
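```java
import jakarta.persistence.*;
import java.time.OffsetDateTime;
import java.util.UUID;
import lombok.Getter;
import lombok.Setter;
import org.hibernate.annotations.JdbcTypeCode;
import org.hibernate.type.SqlTypes;

@Entity
@Table(name = "outbox_event")
@Getter @Setter
public class OutboxEvent {
    @Id private UUID id;
    private String aggregateType;
    private String aggregateId;
    private String eventType;
    @JdbcTypeCode(SqlTypes.JSON) // bind the JSON string to the JSONB column
    private String payload;
    private String status;
    private int retryCount;
    // Excluded from JPA's INSERT so the DDL's DEFAULT now() fills them in
    @Column(insertable = false, updatable = false)
    private OffsetDateTime nextAttemptAt;
    @Column(insertable = false, updatable = false)
    private OffsetDateTime createdAt;
    private OffsetDateTime publishedAt;
}
```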
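On the service side, save to the Outbox inside the same transaction as the business data. Standardizing on letting the DDL's DEFAULT now() fill columns like created_at, rather than setting them in the application, helps newcomers avoid pitfalls. One caveat: JPA includes every mapped column in its INSERT, so a DEFAULT only applies to columns mapped with @Column(insertable = false); otherwise Hibernate inserts an explicit NULL and the NOT NULL constraint fails.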
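```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@RequiredArgsConstructor
public class OrderService {
    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    // By default @Transactional rolls back only on unchecked exceptions, so without
    // rollbackFor a JsonProcessingException would commit the order without its event
    @Transactional(rollbackFor = Exception.class)
    public void placeOrder(OrderCommand cmd) throws JsonProcessingException {
        Order order = orderRepository.save(new Order(cmd));

        OutboxEvent ev = new OutboxEvent();
        ev.setId(UUID.randomUUID());
        ev.setAggregateType("Order");
        ev.setAggregateId(order.getId().toString());
        ev.setEventType("OrderCreated");
        ev.setPayload(objectMapper.writeValueAsString(order));
        ev.setStatus("PENDING");
        outboxRepository.save(ev); // committed atomically with the order
    }
}
```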
Relaying with an @Scheduled Poller
This is the core of the article. The Poller periodically picks up PENDING records and sends them to Kafka. The key point is to wait synchronously for the send to succeed, then update the status inside the same transaction. If you instead do the DB update in an async whenComplete callback, the callback runs on a different thread (typically the producer's I/O thread), outside the Poller's transaction and possibly after the FOR UPDATE locks have been released, so the update effectively runs in auto-commit mode. Actual behavior then diverges from your design intent, so avoid it.
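```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
@RequiredArgsConstructor
public class OutboxPoller {
    private final JdbcTemplate jdbc;
    private final KafkaTemplate<String, String> kafkaTemplate;

    private static final RowMapper<OutboxEvent> MAPPER = (rs, i) -> {
        OutboxEvent e = new OutboxEvent();
        e.setId(UUID.fromString(rs.getString("id")));
        e.setAggregateId(rs.getString("aggregate_id"));
        e.setEventType(rs.getString("event_type"));
        e.setPayload(rs.getString("payload"));
        e.setRetryCount(rs.getInt("retry_count"));
        return e;
    };

    // Next run starts 1s after the previous one finishes; each batch is one DB transaction
    @Scheduled(fixedDelay = 1000)
    @Transactional
    public void relay() {
        // Lock up to 100 due rows; rows locked by another instance are skipped
        List<OutboxEvent> rows = jdbc.query("""
                SELECT id, aggregate_id, event_type, payload, retry_count
                FROM outbox_event
                WHERE status = 'PENDING' AND next_attempt_at <= now()
                ORDER BY created_at
                LIMIT 100
                FOR UPDATE SKIP LOCKED
                """, MAPPER);

        for (OutboxEvent ev : rows) {
            // Keyed by aggregate ID, so events for the same order go to the same partition
            ProducerRecord<String, String> record = new ProducerRecord<>(
                    "order.events", ev.getAggregateId(), ev.getPayload());
            // Consumers dedupe on this header
            record.headers().add("event-id",
                    ev.getId().toString().getBytes(StandardCharsets.UTF_8));
            try {
                // Block until the broker acknowledges, so the status update below
                // runs on this thread, inside this transaction
                kafkaTemplate.send(record).get(5, TimeUnit.SECONDS);
                jdbc.update(
                        "UPDATE outbox_event SET status = 'PUBLISHED', published_at = now() WHERE id = ?",
                        ev.getId());
            } catch (Exception e) {
                // Exponential backoff: 2, 4, 8, ... seconds, capped at 300; FAILED after 10 attempts
                int next = ev.getRetryCount() + 1;
                long backoffSec = (long) Math.min(300, Math.pow(2, next));
                String status = next >= 10 ? "FAILED" : "PENDING";
                jdbc.update("""
                        UPDATE outbox_event
                        SET retry_count = ?, next_attempt_at = now() + ? * interval '1 second', status = ?
                        WHERE id = ?
                        """, next, backoffSec, status, ev.getId());
            }
        }
    }
}
```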
FOR UPDATE SKIP LOCKED is available in PostgreSQL 9.5+ and MySQL 8.0+. It lets multiple application instances run the Poller at the same time without colliding: rows already locked by one instance are skipped, so each instance grabs a different set (see the PostgreSQL SELECT documentation). Note that the interval expression in the UPDATE is PostgreSQL syntax. The backoff doubles with each failure (2, 4, 8, ... seconds) up to a cap of 300 seconds, and after the 10th failed attempt the record is moved to FAILED, which serves as a dead-letter status.
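To see what that policy means in wall-clock time, the sketch below pulls the Poller's backoff arithmetic into a pure function that can be unit-tested on its own. OutboxRetryPolicy and its method names are hypothetical; the constants are the ones used in the Poller.

```java
// Pure-function version of the Poller's retry policy (hypothetical names).
final class OutboxRetryPolicy {
    static final int MAX_ATTEMPTS = 10;
    static final long MAX_BACKOFF_SEC = 300;

    private OutboxRetryPolicy() {}

    /** Delay before the next attempt, given the retry count after this failure (1-based). */
    static long backoffSeconds(int retryCount) {
        return (long) Math.min(MAX_BACKOFF_SEC, Math.pow(2, retryCount));
    }

    /** Status to store after this failure: FAILED once the attempt limit is reached. */
    static String statusAfterFailure(int retryCount) {
        return retryCount >= MAX_ATTEMPTS ? "FAILED" : "PENDING";
    }
}
```

With these constants, the waits before retries 1 through 9 are 2, 4, 8, 16, 32, 64, 128, 256 and 300 seconds: about 810 seconds (13.5 minutes) in total, plus polling delay and send timeouts, before the 10th failure parks the record as FAILED. If broker outages in your environment tend to last longer than that, raise the cap or the attempt limit.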
At-Least-Once and Idempotent Consumers
In the scenario "the Kafka send succeeds, then the process crashes before the status update," the same event is sent again. This is inherent to at-least-once delivery. The standard approach on the consumer side is to dedupe by event ID. Since the producer always attaches the event-id header, the consumer reads it from there:
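```java
@KafkaListener(topics = "order.events")
@Transactional
public void onEvent(ConsumerRecord<String, String> rec) {
    UUID eventId = UUID.fromString(new String(
            rec.headers().lastHeader("event-id").value(), StandardCharsets.UTF_8));
    if (processedEventRepository.existsById(eventId)) {
        return; // duplicate delivery: already handled
    }
    // ... business logic ...

    // Saved in the same DB transaction as the business changes. If two deliveries
    // race past the check above, the primary key on processed_event makes the second
    // transaction fail at commit and roll back its changes (if they live in the same DB).
    processedEventRepository.save(new ProcessedEvent(eventId));
}
```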
If your business logic is naturally idempotent, for example via UPSERT or state-transition checks, you may not need a dedup table at all. Either way, don't mistake this for exactly-once: the Outbox pattern only guarantees that the DB and Kafka don't get out of sync. Consumers can still see duplicates, so keep that in mind.
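A variant of the dedup check is to record the event ID first and treat "already recorded" as a duplicate. In SQL that corresponds to INSERT ... ON CONFLICT DO NOTHING affecting zero rows, which avoids the gap between an existence check and a later insert. The in-memory sketch below shows only the logic: IdempotentOrderHandler is hypothetical and a HashSet stands in for the processed_event table. In a real consumer, the insert and the business change must share one DB transaction.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// In-memory stand-in for an idempotent consumer (hypothetical names).
// processedIds plays the role of the processed_event table.
class IdempotentOrderHandler {
    private final Set<UUID> processedIds = new HashSet<>();
    int ordersHandled = 0; // the business effect that must be applied only once

    /** Returns true if the event was applied, false if it was a duplicate. */
    boolean handle(UUID eventId) {
        // Set.add returns false when the ID is already present, the in-memory
        // analogue of ON CONFLICT DO NOTHING inserting zero rows
        if (!processedIds.add(eventId)) {
            return false;
        }
        ordersHandled++;
        return true;
    }
}
```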
Comparison with the Debezium Approach
Another option is the CDC approach: have Debezium read the DB’s WAL/binlog and forward to Kafka.
- The Poller’s strength is simplicity of architecture. It’s self-contained within a Spring Boot app, with no need to separately operate a Kafka Connect cluster
- Debezium's strength is low latency and not having to maintain a scheduler in your application. Its Outbox Event Router even routes each outbox row to a Kafka topic for you, based on a column such as the aggregate type
If your team already runs Kafka Connect, leaning toward Debezium makes life easier. Otherwise, the Poller approach is more than practical to start with.
Verifying Behavior with Testcontainers
For integration tests, the easiest path is to spin up PostgreSQL and Kafka with Testcontainers. Just the key points:
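```java
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertThrows;

@SpringBootTest
@Testcontainers
class OutboxIntegrationTest {
    @Container static PostgreSQLContainer<?> pg = new PostgreSQLContainer<>("postgres:16");
    @Container static KafkaContainer kafka = new KafkaContainer(
            DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));

    @DynamicPropertySource
    static void props(DynamicPropertyRegistry r) {
        r.add("spring.datasource.url", pg::getJdbcUrl);
        // The container's credentials must be passed too, not just the URL
        r.add("spring.datasource.username", pg::getUsername);
        r.add("spring.datasource.password", pg::getPassword);
        r.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

    @Autowired OrderService orderService;
    @Autowired KafkaTestConsumer consumer; // test helper (not shown) collecting records from order.events

    @BeforeEach
    void reset() {
        consumer.clear(); // assumed helper method: drop records left over from the other test
    }

    @Test
    void publishesAfterCommit() throws Exception {
        orderService.placeOrder(new OrderCommand("item-1"));
        await().atMost(10, SECONDS).untilAsserted(() ->
                assertThat(consumer.received()).hasSize(1));
    }

    @Test
    void noPublishOnRollback() {
        // placeOrderAndFail: test-only method (not shown), assumed to throw after writing the outbox row
        assertThrows(RuntimeException.class,
                () -> orderService.placeOrderAndFail(new OrderCommand("item-2")));
        await().during(3, SECONDS).untilAsserted(() ->
                assertThat(consumer.received()).isEmpty());
    }
}
```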
When the business logic throws and the transaction rolls back, the Outbox row is rolled back with it, so nothing reaches Kafka. The second test confirms exactly that, which shows the Outbox pattern is working correctly.
Operational Considerations
The Outbox table grows unbounded if you leave it alone. Prepare a job that deletes PUBLISHED records after a few days to a few weeks, or moves them to a separate archive table.
For monitoring, keep an eye on at least these three metrics:
- Number of unsent (PENDING) records
- Age of the oldest PENDING record
- Distribution of retry counts
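To pin down what each metric means, the sketch below computes all three from an in-memory snapshot of outbox rows (Java 16+ records; OutboxRow and OutboxMetrics are hypothetical). In production you would compute the same numbers with SQL, for example count(*) and min(created_at) over rows WHERE status = 'PENDING', plus a GROUP BY retry_count, and export them as gauges.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical snapshot of one outbox_event row
record OutboxRow(String status, Instant createdAt, int retryCount) {}

// The three monitoring metrics, computed over a snapshot of rows
record OutboxMetrics(long pendingCount, Duration oldestPendingAge, Map<Integer, Long> retryHistogram) {
    static OutboxMetrics of(List<OutboxRow> rows, Instant now) {
        long pending = 0;
        Instant oldest = null;
        Map<Integer, Long> histogram = new TreeMap<>(); // retry_count -> rows (all statuses)
        for (OutboxRow r : rows) {
            if ("PENDING".equals(r.status())) {
                pending++;
                if (oldest == null || r.createdAt().isBefore(oldest)) {
                    oldest = r.createdAt();
                }
            }
            histogram.merge(r.retryCount(), 1L, Long::sum);
        }
        // No pending rows means there is no backlog, so the age is zero
        Duration age = oldest == null ? Duration.ZERO : Duration.between(oldest, now);
        return new OutboxMetrics(pending, age, histogram);
    }
}
```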
When running the Poller across multiple instances, SKIP LOCKED is generally enough on its own, but if there’s processing that you want to run on only one instance, ShedLock is also an option.
Summary
The Transactional Outbox pattern is an unglamorous but solid way to solve the pitfall of hitting Kafka directly inside @Transactional, by adding a single buffer in the form of the Outbox table. The Poller can be written in just a few dozen lines, and as long as the consumer is idempotent it can withstand real-world operations.
For related topics, see also the Saga pattern explainer covering compensating transactions, Kafka Producer/Consumer fundamentals, Spring Boot transaction management, and the ApplicationEvent intro for in-JVM events.