DB更新したあとにKafkaへイベントを流す。よくある処理ですが、@Transactionalの中でkafkaTemplate.send()を素直に呼んでいませんか。これ、けっこう危ない実装なんですよね。

この記事では、DBとKafkaの二重書き込み問題(Dual Write)をTransactional Outboxパターンで解決する方法を、実コードと一緒に整理していきます。

Dual Write問題とは何か

問題のあるコードを先に見てしまいましょう。

@Transactional
public void placeOrder(OrderCommand cmd) {
    Order order = orderRepository.save(new Order(cmd));
    kafkaTemplate.send("order.created", order.getId().toString(), toJson(order));
}

一見問題なさそうですが、DBとKafkaという2つの外部システムにまたがる書き込みには原子性がありません。次の2パターンで整合性が壊れます。

  • kafkaTemplate.send()は成功したが、その後のDBコミットがロールバック。Kafka側にだけ「存在しない注文」のイベントが流れる
  • DBコミットは成功したが、kafkaTemplate.send()が直前で失敗。後続サービスに通知が届かない

JTA/XAで分散トランザクションを張れば理屈上は解決しますが、Kafkaは2PCに向きませんし、運用コストも重いので現実的ではありません。

Outboxパターンの全体像

発想はシンプルです。「Kafkaへ直接送るのをやめて、同じDBの中にあるOutboxテーブルに書き込む」だけ。

App ──▶ DB(business + outbox を同一トランザクション)


              Relay(Poller / CDC)


                   Kafka ──▶ Consumer

ビジネスデータとOutboxレコードを同一トランザクションでコミットすれば、両方一緒に成功するか、両方一緒に失敗します。そのあと別プロセスがOutboxを読んでKafkaへ流す、という二段構えです。

なお、この方式は at-least-once配信 が前提になります。コンシューマ側で重複を許容できる設計にしておきましょう。JVM内のイベント配信であるSpring ApplicationEventとは異なり、こちらはプロセス間・サービス間でイベントを確実に伝播させる仕組みです。

Outboxテーブルの設計

最低限こんな形でスタートできます。retry_countはリトライ制御用、PUBLISHED済みdedup用のprocessed_eventテーブルも一緒に載せておきます。

CREATE TABLE outbox_event (
  id             UUID PRIMARY KEY,
  aggregate_type VARCHAR(64)  NOT NULL,
  aggregate_id   VARCHAR(64)  NOT NULL,
  event_type     VARCHAR(64)  NOT NULL,
  payload        JSONB        NOT NULL,
  status         VARCHAR(16)  NOT NULL DEFAULT 'PENDING',
  retry_count    INT          NOT NULL DEFAULT 0,
  next_attempt_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  created_at     TIMESTAMPTZ  NOT NULL DEFAULT now(),
  published_at   TIMESTAMPTZ
);
CREATE INDEX idx_outbox_pending
  ON outbox_event (next_attempt_at) WHERE status = 'PENDING';

CREATE TABLE processed_event (
  event_id     UUID PRIMARY KEY,
  processed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

status = 'PENDING'に絞った部分インデックスでPollerのクエリを軽くしつつ、idをそのままKafkaメッセージのヘッダに載せてコンシューマがdedupできるようにします。payloadはJSONBにしておくとイベントスキーマの進化に追従しやすいです。

ビジネス処理側でOutboxへ書き込む

エンティティは普通のJPAでOKです。Hibernate 6でJSONBを扱うなら@JdbcTypeCode(SqlTypes.JSON)が推奨ですが、シンプルにTEXT扱いでStringに入れてもアプリ側のJSON直列化で十分動きます。

@Entity
@Table(name = "outbox_event")
@Getter @Setter
public class OutboxEvent {
    @Id private UUID id;
    private String aggregateType;
    private String aggregateId;
    private String eventType;
    @JdbcTypeCode(SqlTypes.JSON)
    private String payload;
    private String status;
    private int retryCount;
    private OffsetDateTime nextAttemptAt;
    private OffsetDateTime createdAt;
    private OffsetDateTime publishedAt;
}

サービス側では、ビジネスデータの保存と同じトランザクション内でOutboxへsaveします。createdAtなどはDDL側のDEFAULT now()に任せてアプリでは詰めない方針で統一すると、初心者がハマりにくいです。

@Service
@RequiredArgsConstructor
public class OrderService {
    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    @Transactional
    public void placeOrder(OrderCommand cmd) throws JsonProcessingException {
        Order order = orderRepository.save(new Order(cmd));
        OutboxEvent ev = new OutboxEvent();
        ev.setId(UUID.randomUUID());
        ev.setAggregateType("Order");
        ev.setAggregateId(order.getId().toString());
        ev.setEventType("OrderCreated");
        ev.setPayload(objectMapper.writeValueAsString(order));
        ev.setStatus("PENDING");
        outboxRepository.save(ev);
    }
}

@ScheduledなPollerでリレーする

ここが本記事の肝です。PENDINGレコードを定期的に拾ってKafkaへ送ります。重要なのは「送信成功を同期的に待ってから同一トランザクション内でstatusを更新する」点です。whenCompleteの非同期コールバックでDB更新を書くと、コールバックは別スレッドかつFOR UPDATEのロックが解放された後のオートコミット扱いになり、設計意図と挙動がズレるので避けます。

@Component
@RequiredArgsConstructor
public class OutboxPoller {
    private final JdbcTemplate jdbc;
    private final KafkaTemplate<String, String> kafkaTemplate;

    private static final RowMapper<OutboxEvent> MAPPER = (rs, i) -> {
        OutboxEvent e = new OutboxEvent();
        e.setId(UUID.fromString(rs.getString("id")));
        e.setAggregateId(rs.getString("aggregate_id"));
        e.setEventType(rs.getString("event_type"));
        e.setPayload(rs.getString("payload"));
        e.setRetryCount(rs.getInt("retry_count"));
        return e;
    };

    @Scheduled(fixedDelay = 1000)
    @Transactional
    public void relay() {
        List<OutboxEvent> rows = jdbc.query("""
            SELECT id, aggregate_id, event_type, payload, retry_count
              FROM outbox_event
             WHERE status = 'PENDING' AND next_attempt_at <= now()
             ORDER BY created_at
             LIMIT 100
             FOR UPDATE SKIP LOCKED
            """, MAPPER);

        for (OutboxEvent ev : rows) {
            ProducerRecord<String, String> record = new ProducerRecord<>(
                "order.events", ev.getAggregateId(), ev.getPayload());
            record.headers().add("event-id",
                ev.getId().toString().getBytes(StandardCharsets.UTF_8));
            try {
                kafkaTemplate.send(record).get(5, TimeUnit.SECONDS);
                jdbc.update(
                    "UPDATE outbox_event SET status='PUBLISHED', published_at=now() WHERE id=?",
                    ev.getId());
            } catch (Exception e) {
                int next = ev.getRetryCount() + 1;
                long backoffSec = (long) Math.min(300, Math.pow(2, next));
                String status = next >= 10 ? "FAILED" : "PENDING";
                jdbc.update("""
                    UPDATE outbox_event
                       SET retry_count=?, next_attempt_at=now() + (? || ' seconds')::interval, status=?
                     WHERE id=?
                    """, next, backoffSec, status, ev.getId());
            }
        }
    }
}

FOR UPDATE SKIP LOCKEDはPostgreSQL 9.5以降とMySQL 8で使える構文で、複数のアプリインスタンスが同時にPollerを動かしてもお互いに別レコードを掴むため衝突しません(PostgreSQLのSELECTドキュメント参照)。指数バックオフは2の冪乗で増やし、最大300秒・10回でデッドレター扱いのFAILEDに落としています。

at-least-onceと冪等コンシューマ

「Kafka送信成功 → status更新前にクラッシュ」というケースだと、同じイベントがもう一度送られます。これがat-least-onceの宿命です。コンシューマ側はevent_idでdedupするのが定石です。Producer側でevent-idヘッダを必ず付けているので、それを取り出します。

@KafkaListener(topics = "order.events")
@Transactional
public void onEvent(ConsumerRecord<String, String> rec) {
    UUID eventId = UUID.fromString(new String(
        rec.headers().lastHeader("event-id").value(), StandardCharsets.UTF_8));
    if (processedEventRepository.existsById(eventId)) {
        return;
    }
    // ... 業務処理 ...
    processedEventRepository.save(new ProcessedEvent(eventId));
}

業務処理がUPSERTや状態遷移チェックで自然に冪等になっているなら、dedupテーブルは不要なこともあります。exactly-onceを実現したと誤解しないように。Outboxパターンはあくまで「DBとKafkaが食い違わない」ことの保証で、コンシューマ視点では重複が来うる、というのは押さえておきましょう。

Debezium方式との比較

もうひとつの選択肢が、DebeziumでDBのWAL/binlogを読んでKafkaへ流すCDC方式です。

  • Pollerの良さ は構成のシンプルさ。Spring Bootアプリ単体で完結し、Kafka Connectクラスタを別途運用しなくていい
  • Debeziumの良さ は低遅延と、アプリ側にスケジューラを持たなくていいこと。Outbox Event Routerでルーティングまで面倒を見てくれる

チームがすでにKafka Connectを運用しているならDebeziumに寄せると楽です。そうでないなら、まずはPoller方式で十分実用になります。

Testcontainersで動作確認

統合テストはTestcontainersでPostgreSQLとKafkaを立ち上げるのが手軽です。要点だけ示します。

@SpringBootTest
@Testcontainers
class OutboxIntegrationTest {
    @Container static PostgreSQLContainer<?> pg = new PostgreSQLContainer<>("postgres:16");
    @Container static KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));

    @DynamicPropertySource
    static void props(DynamicPropertyRegistry r) {
        r.add("spring.datasource.url", pg::getJdbcUrl);
        r.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

    @Autowired OrderService orderService;
    @Autowired KafkaTestConsumer consumer;

    @Test
    void publishesAfterCommit() {
        orderService.placeOrder(new OrderCommand("item-1"));
        await().atMost(10, SECONDS).untilAsserted(() ->
            assertThat(consumer.received()).hasSize(1));
    }

    @Test
    void noPublishOnRollback() {
        assertThrows(RuntimeException.class,
            () -> orderService.placeOrderAndFail(new OrderCommand("item-2")));
        await().during(3, SECONDS).untilAsserted(() ->
            assertThat(consumer.received()).isEmpty());
    }
}

ビジネス処理が例外でロールバックしたとき、Outboxにもレコードが残らず、当然Kafkaにも何も流れない。これがOutboxパターンが正しく動いていることの確認です。

運用で気をつけたいこと

Outboxテーブルはほっとくと肥大化します。PUBLISHED済みレコードは数日〜数週間で削除、または別アーカイブテーブルへ退避するジョブを用意しておきましょう。

監視は次の3つを見ておくと安心です。

  • 未送信(PENDING)件数
  • 最古PENDINGレコードの経過時間
  • リトライ回数の分布

マルチインスタンスでPollerを動かす場合、SKIP LOCKEDがあれば基本足りますが、「1インスタンスだけ動かしたい」処理があるならShedLockも選択肢に入ります。

まとめ

Transactional Outboxパターンは、@Transactional内でKafkaを直接叩く落とし穴を、Outboxテーブルというワンクッションで解決する地味だが堅いやり方です。Pollerは数十行で書けますし、コンシューマ側を冪等にしておけば実運用にも耐えます。

関連トピックとして、補償トランザクションを扱うSagaパターンの解説Kafka Producer/Consumerの基礎Spring Bootのトランザクション管理、JVM内イベントのApplicationEvent入門もあわせてどうぞ。