DB更新したあとにKafkaへイベントを流す。よくある処理ですが、@Transactionalの中でkafkaTemplate.send()を素直に呼んでいませんか。これ、けっこう危ない実装なんですよね。
この記事では、DBとKafkaの二重書き込み問題(Dual Write)をTransactional Outboxパターンで解決する方法を、実コードと一緒に整理していきます。
Dual Write問題とは何か
問題のあるコードを先に見てしまいましょう。
@Transactional
public void placeOrder(OrderCommand cmd) {
Order order = orderRepository.save(new Order(cmd));
kafkaTemplate.send("order.created", order.getId().toString(), toJson(order));
}
一見問題なさそうですが、DBとKafkaという2つの外部システムにまたがる書き込みには原子性がありません。次の2パターンで整合性が壊れます。
kafkaTemplate.send()は成功したが、その後のDBコミットがロールバック。Kafka側にだけ「存在しない注文」のイベントが流れる- DBコミットは成功したが、
kafkaTemplate.send()が直前で失敗。後続サービスに通知が届かない
JTA/XAで分散トランザクションを張れば理屈上は解決しますが、Kafkaは2PCに向きませんし、運用コストも重いので現実的ではありません。
Outboxパターンの全体像
発想はシンプルです。「Kafkaへ直接送るのをやめて、同じDBの中にあるOutboxテーブルに書き込む」だけ。
App ──▶ DB(business + outbox を同一トランザクション)
│
▼
Relay(Poller / CDC)
│
▼
Kafka ──▶ Consumer
ビジネスデータとOutboxレコードを同一トランザクションでコミットすれば、両方一緒に成功するか、両方一緒に失敗します。そのあと別プロセスがOutboxを読んでKafkaへ流す、という二段構えです。
なお、この方式は at-least-once配信 が前提になります。コンシューマ側で重複を許容できる設計にしておきましょう。JVM内のイベント配信であるSpring ApplicationEventとは異なり、こちらはプロセス間・サービス間でイベントを確実に伝播させる仕組みです。
Outboxテーブルの設計
最低限こんな形でスタートできます。retry_countはリトライ制御用、PUBLISHED済みdedup用のprocessed_eventテーブルも一緒に載せておきます。
CREATE TABLE outbox_event (
id UUID PRIMARY KEY,
aggregate_type VARCHAR(64) NOT NULL,
aggregate_id VARCHAR(64) NOT NULL,
event_type VARCHAR(64) NOT NULL,
payload JSONB NOT NULL,
status VARCHAR(16) NOT NULL DEFAULT 'PENDING',
retry_count INT NOT NULL DEFAULT 0,
next_attempt_at TIMESTAMPTZ NOT NULL DEFAULT now(),
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
published_at TIMESTAMPTZ
);
CREATE INDEX idx_outbox_pending
ON outbox_event (next_attempt_at) WHERE status = 'PENDING';
CREATE TABLE processed_event (
event_id UUID PRIMARY KEY,
processed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
status = 'PENDING'に絞った部分インデックスでPollerのクエリを軽くしつつ、idをそのままKafkaメッセージのヘッダに載せてコンシューマがdedupできるようにします。payloadはJSONBにしておくとイベントスキーマの進化に追従しやすいです。
ビジネス処理側でOutboxへ書き込む
エンティティは普通のJPAでOKです。Hibernate 6でJSONBを扱うなら@JdbcTypeCode(SqlTypes.JSON)が推奨ですが、シンプルにTEXT扱いでStringに入れてもアプリ側のJSON直列化で十分動きます。
@Entity
@Table(name = "outbox_event")
@Getter @Setter
public class OutboxEvent {
@Id private UUID id;
private String aggregateType;
private String aggregateId;
private String eventType;
@JdbcTypeCode(SqlTypes.JSON)
private String payload;
private String status;
private int retryCount;
private OffsetDateTime nextAttemptAt;
private OffsetDateTime createdAt;
private OffsetDateTime publishedAt;
}
サービス側では、ビジネスデータの保存と同じトランザクション内でOutboxへsaveします。createdAtなどはDDL側のDEFAULT now()に任せてアプリでは詰めない方針で統一すると、初心者がハマりにくいです。
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxRepository outboxRepository;
private final ObjectMapper objectMapper;
@Transactional
public void placeOrder(OrderCommand cmd) throws JsonProcessingException {
Order order = orderRepository.save(new Order(cmd));
OutboxEvent ev = new OutboxEvent();
ev.setId(UUID.randomUUID());
ev.setAggregateType("Order");
ev.setAggregateId(order.getId().toString());
ev.setEventType("OrderCreated");
ev.setPayload(objectMapper.writeValueAsString(order));
ev.setStatus("PENDING");
outboxRepository.save(ev);
}
}
@ScheduledなPollerでリレーする
ここが本記事の肝です。PENDINGレコードを定期的に拾ってKafkaへ送ります。重要なのは「送信成功を同期的に待ってから同一トランザクション内でstatusを更新する」点です。whenCompleteの非同期コールバックでDB更新を書くと、コールバックは別スレッドかつFOR UPDATEのロックが解放された後のオートコミット扱いになり、設計意図と挙動がズレるので避けます。
@Component
@RequiredArgsConstructor
public class OutboxPoller {
private final JdbcTemplate jdbc;
private final KafkaTemplate<String, String> kafkaTemplate;
private static final RowMapper<OutboxEvent> MAPPER = (rs, i) -> {
OutboxEvent e = new OutboxEvent();
e.setId(UUID.fromString(rs.getString("id")));
e.setAggregateId(rs.getString("aggregate_id"));
e.setEventType(rs.getString("event_type"));
e.setPayload(rs.getString("payload"));
e.setRetryCount(rs.getInt("retry_count"));
return e;
};
@Scheduled(fixedDelay = 1000)
@Transactional
public void relay() {
List<OutboxEvent> rows = jdbc.query("""
SELECT id, aggregate_id, event_type, payload, retry_count
FROM outbox_event
WHERE status = 'PENDING' AND next_attempt_at <= now()
ORDER BY created_at
LIMIT 100
FOR UPDATE SKIP LOCKED
""", MAPPER);
for (OutboxEvent ev : rows) {
ProducerRecord<String, String> record = new ProducerRecord<>(
"order.events", ev.getAggregateId(), ev.getPayload());
record.headers().add("event-id",
ev.getId().toString().getBytes(StandardCharsets.UTF_8));
try {
kafkaTemplate.send(record).get(5, TimeUnit.SECONDS);
jdbc.update(
"UPDATE outbox_event SET status='PUBLISHED', published_at=now() WHERE id=?",
ev.getId());
} catch (Exception e) {
int next = ev.getRetryCount() + 1;
long backoffSec = (long) Math.min(300, Math.pow(2, next));
String status = next >= 10 ? "FAILED" : "PENDING";
jdbc.update("""
UPDATE outbox_event
SET retry_count=?, next_attempt_at=now() + (? || ' seconds')::interval, status=?
WHERE id=?
""", next, backoffSec, status, ev.getId());
}
}
}
}
FOR UPDATE SKIP LOCKEDはPostgreSQL 9.5以降とMySQL 8で使える構文で、複数のアプリインスタンスが同時にPollerを動かしてもお互いに別レコードを掴むため衝突しません(PostgreSQLのSELECTドキュメント参照)。指数バックオフは2の冪乗で増やし、最大300秒・10回でデッドレター扱いのFAILEDに落としています。
at-least-onceと冪等コンシューマ
「Kafka送信成功 → status更新前にクラッシュ」というケースだと、同じイベントがもう一度送られます。これがat-least-onceの宿命です。コンシューマ側はevent_idでdedupするのが定石です。Producer側でevent-idヘッダを必ず付けているので、それを取り出します。
@KafkaListener(topics = "order.events")
@Transactional
public void onEvent(ConsumerRecord<String, String> rec) {
UUID eventId = UUID.fromString(new String(
rec.headers().lastHeader("event-id").value(), StandardCharsets.UTF_8));
if (processedEventRepository.existsById(eventId)) {
return;
}
// ... 業務処理 ...
processedEventRepository.save(new ProcessedEvent(eventId));
}
業務処理がUPSERTや状態遷移チェックで自然に冪等になっているなら、dedupテーブルは不要なこともあります。exactly-onceを実現したと誤解しないように。Outboxパターンはあくまで「DBとKafkaが食い違わない」ことの保証で、コンシューマ視点では重複が来うる、というのは押さえておきましょう。
Debezium方式との比較
もうひとつの選択肢が、DebeziumでDBのWAL/binlogを読んでKafkaへ流すCDC方式です。
- Pollerの良さ は構成のシンプルさ。Spring Bootアプリ単体で完結し、Kafka Connectクラスタを別途運用しなくていい
- Debeziumの良さ は低遅延と、アプリ側にスケジューラを持たなくていいこと。Outbox Event Routerでルーティングまで面倒を見てくれる
チームがすでにKafka Connectを運用しているならDebeziumに寄せると楽です。そうでないなら、まずはPoller方式で十分実用になります。
Testcontainersで動作確認
統合テストはTestcontainersでPostgreSQLとKafkaを立ち上げるのが手軽です。要点だけ示します。
@SpringBootTest
@Testcontainers
class OutboxIntegrationTest {
@Container static PostgreSQLContainer<?> pg = new PostgreSQLContainer<>("postgres:16");
@Container static KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));
@DynamicPropertySource
static void props(DynamicPropertyRegistry r) {
r.add("spring.datasource.url", pg::getJdbcUrl);
r.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
}
@Autowired OrderService orderService;
@Autowired KafkaTestConsumer consumer;
@Test
void publishesAfterCommit() {
orderService.placeOrder(new OrderCommand("item-1"));
await().atMost(10, SECONDS).untilAsserted(() ->
assertThat(consumer.received()).hasSize(1));
}
@Test
void noPublishOnRollback() {
assertThrows(RuntimeException.class,
() -> orderService.placeOrderAndFail(new OrderCommand("item-2")));
await().during(3, SECONDS).untilAsserted(() ->
assertThat(consumer.received()).isEmpty());
}
}
ビジネス処理が例外でロールバックしたとき、Outboxにもレコードが残らず、当然Kafkaにも何も流れない。これがOutboxパターンが正しく動いていることの確認です。
運用で気をつけたいこと
Outboxテーブルはほっとくと肥大化します。PUBLISHED済みレコードは数日〜数週間で削除、または別アーカイブテーブルへ退避するジョブを用意しておきましょう。
監視は次の3つを見ておくと安心です。
- 未送信(PENDING)件数
- 最古PENDINGレコードの経過時間
- リトライ回数の分布
マルチインスタンスでPollerを動かす場合、SKIP LOCKEDがあれば基本足りますが、「1インスタンスだけ動かしたい」処理があるならShedLockも選択肢に入ります。
まとめ
Transactional Outboxパターンは、@Transactional内でKafkaを直接叩く落とし穴を、Outboxテーブルというワンクッションで解決する地味だが堅いやり方です。Pollerは数十行で書けますし、コンシューマ側を冪等にしておけば実運用にも耐えます。
関連トピックとして、補償トランザクションを扱うSagaパターンの解説、Kafka Producer/Consumerの基礎、Spring Bootのトランザクション管理、JVM内イベントのApplicationEvent入門もあわせてどうぞ。