マイクロサービスを構築していると、サービス間の連携設計で悩むことがありますよね。REST APIによる同期通信だと、受信側がダウンしたときに送信側まで影響を受けてしまいます。そういう場面でKafkaを使った非同期メッセージングが効いてきます。

この記事では spring-kafka を使って、KafkaのProducerとConsumerを Spring Boot 3.x アプリにゼロから実装する手順を解説します。KafkaTemplate@KafkaListener の基本から、エラーハンドリング・リトライ・Dead Letter Topic(DLT)の設定まで一本道で押さえます。なお、@Async(プロセス内スレッド非同期)との使い分けは非同期処理ガイドを参照してください。

Kafkaほど大規模ではないJVMプロセス内のイベント駆動設計を検討する場合は、ApplicationEventによる疎結合化ガイドSpring Modulithでモジュラーモノリスを実現する方法もあわせて検討すると、ユースケースに応じた選択肢を整理できます。

ローカル環境:Docker ComposeでKafkaを起動する

ZookeeperなしのKRaft構成で最短に書けます。

# docker-compose.yml
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_QUORUM_VOTERS: [email protected]:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"

docker compose up -d で起動したら準備完了です。Dockerfileの書き方やマルチステージビルドについてはDockerコンテナ化ガイドも参照してください。

依存追加とapplication.ymlの設定

// build.gradle
dependencies {
    implementation 'org.springframework.kafka:spring-kafka'
}

Spring Boot管理のBOMに含まれているのでバージョン指定は不要です。

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-app-group
      # earliest はConsumerグループ未登録時にパーティション先頭から読む。本番では latest(最新から開始)が一般的
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

まずは StringSerializer / StringDeserializer で始めるのが無難です。JSONオブジェクトを扱いたいときは JsonSerializer に切り替えましょう。

KafkaTemplateでメッセージを送信する

POJOをJsonSerializerで送る

文字列だけでなくPOJOを送りたいケースは多いはずです。JsonSerializer / JsonDeserializer に切り替えれば、ドメインオブジェクトをそのまま送受信できます。

spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.example.order"
        spring.json.value.default.type: "com.example.order.OrderEvent"
public record OrderEvent(String orderId, String item, int quantity) {}

@Service
@RequiredArgsConstructor
public class OrderEventProducer {
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public void send(OrderEvent event) {
        kafkaTemplate.send("orders", event.orderId(), event);
    }
}

spring.json.trusted.packages必ず明示 してください。* を指定すると任意クラスへのデシリアライズが許可され、untrusted ペイロードからのRCEリスクがあります。

@Service
@Slf4j
@RequiredArgsConstructor
public class OrderProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendOrder(String orderId, String payload) {
        kafkaTemplate.send("orders", orderId, payload)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("送信失敗 orderId={}", orderId, ex);
                } else {
                    log.info("送信成功 offset={}",
                        result.getRecordMetadata().offset());
                }
            });
    }
}

send()CompletableFuture を返します。whenComplete でコールバックを登録して、送信成功・失敗をログに残しておきましょう。

@KafkaListenerでメッセージを受信する

concurrencyで並列消費する

デフォルトの @KafkaListener は1スレッドで動きます。パーティションが複数あるトピックではコンシューマー側を並列化してスループットを上げられます。

@KafkaListener(topics = "orders", groupId = "my-app-group", concurrency = "3")
public void consume(String message) {
    log.info("受信: {}", message);
}

concurrency の値は パーティション数を上限 に設定するのが基本です。パーティション数を超えると余剰スレッドはアイドル状態になります。

手動コミット(AckMode)で確実に処理する

デフォルトはオフセットの自動コミットですが、「DBへの書き込みが成功したときだけコミットしたい」というシナリオでは手動コミットに切り替えます。

spring:
  kafka:
    consumer:
      enable-auto-commit: false
    listener:
      ack-mode: MANUAL_IMMEDIATE
@KafkaListener(topics = "orders", groupId = "my-app-group")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
    try {
        orderService.save(record.value());
        ack.acknowledge();
    } catch (Exception e) {
        log.error("処理失敗 - コミットせず再配信を待つ", e);
        throw e;
    }
}

MANUAL_IMMEDIATE は呼び出し時に即コミット、MANUAL はポーリングサイクル終端でまとめてコミットします。At-Least-Once を保証したい場合は手動コミット + 冪等な処理設計がセオリーです。

@Component
@Slf4j
public class OrderConsumer {

    // ペイロードだけ必要な場合
    @KafkaListener(topics = "orders", groupId = "my-app-group")
    public void consume(String message) {
        log.info("受信: {}", message);
    }

    // メタデータ(offset・partition)が必要な場合
    @KafkaListener(topics = "orders-detail", groupId = "my-app-group")
    public void consumeWithMeta(ConsumerRecord<String, String> record) {
        log.info("key={}, partition={}, offset={}",
            record.key(), record.partition(), record.offset());
    }
}

ペイロードだけ処理するなら引数直接受け取りが読みやすいです。オフセット・パーティション・タイムスタンプなどのメタデータが必要なら ConsumerRecord を使いましょう。

動作確認

REST APIエンドポイント経由でProducerを呼び出して確認します。

curl -X POST http://localhost:8080/orders \
  -H "Content-Type: application/json" \
  -d '{"orderId":"001","item":"coffee"}'

Consumerのログにメッセージが出力されれば成功です。自動テストには @EmbeddedKafka が便利です。@TestPropertySourcespring.embedded.kafka.brokers を指定するのが必須で、これがないと application.ymllocalhost:9092 を参照して接続エラーになります。

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"orders"})
@TestPropertySource(properties = {"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"})
class OrderConsumerTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @SpyBean
    private OrderConsumer orderConsumer;

    @Test
    void メッセージを受信できる() {
        kafkaTemplate.send("orders", "key1", "test-message");
        verify(orderConsumer, timeout(10000)).consume(any());
    }
}

@SpyBeanverify() を使うことで、CountDownLatch のような状態管理をConsumerクラス自体に持ち込まずに受信確認できます。

DefaultErrorHandlerでエラーハンドリングとリトライを設定する

デフォルト設定のままだと処理例外が発生したときに無限リトライが走り、コンシューマーが詰まります。DefaultErrorHandler を明示的に設定しましょう。

import org.springframework.kafka.support.serializer.DeserializationException;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
            kafkaListenerContainerFactory(
                ConsumerFactory<String, String> consumerFactory,
                KafkaTemplate<String, String> kafkaTemplate) {

        // 1秒間隔・最大3回リトライ(計4回試行)
        var backOff = new FixedBackOff(1000L, 3L);
        var recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
        var errorHandler = new DefaultErrorHandler(recoverer, backOff);

        // Jacksonの同名クラスではなくspring-kafka由来のDeserializationExceptionを使うこと
        errorHandler.addNotRetryableExceptions(DeserializationException.class);

        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory);
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}

指数バックオフを使いたい場合は FixedBackOff の代わりに ExponentialBackOff を渡します。

var backOff = new ExponentialBackOff(1000L, 2.0); // 1秒→2秒→4秒...
backOff.setMaxAttempts(4);

REST APIでの例外設計との比較は例外ハンドリングガイドも参考にしてみてください。

Dead Letter Topic(DLT)の監視

リトライ上限を超えたメッセージは自動的に {元のトピック名}.DLT へ転送されます。orders トピックなら orders.DLT です。

@KafkaListener(topics = "orders.DLT", groupId = "my-app-dlt-group")
public void consumeDlt(ConsumerRecord<String, String> record) {
    log.error("DLT受信 - 手動対応が必要 key={}, value={}",
        record.key(), record.value());
    // アラート送信・管理DBへの記録など
}

DLTに蓄積されたメッセージは、障害原因を修正してオリジナルトピックへ再キューするか、手動処理するかを設計段階で決めておきましょう。

本番適用時に意識しておきたいこと

SSL/SASL認証の設定例

マネージドKafka(Confluent Cloud / MSK等)への接続には認証設定が必須です。

spring:
  kafka:
    bootstrap-servers: pkc-xxxx.confluent.cloud:9092
    security:
      protocol: SASL_SSL
    properties:
      sasl.mechanism: PLAIN
      sasl.jaas.config: |
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="${KAFKA_API_KEY}"
        password="${KAFKA_API_SECRET}";

資格情報は環境変数またはJasyptによる暗号化で管理し、application.yml への平文記載は避けてください。

パーティション数とコンシューマー数の関係

グループ内のコンシューマーインスタンス数がパーティション数を超えると、余剰インスタンスは何も処理しません。スループット設計では「パーティション数 ≧ コンシューマー数」を守った上で、concurrency 設定とあわせてスループット見積りを行いましょう。

コンシューマーラグの監視

手元での確認は kafka-consumer-groups.sh --describe が基本です。

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-app-group

本番では Prometheus + kafka_exporter、または Spring Boot Actuator のMicrometerメトリクス(kafka.consumer.records.lag.max)を Grafana で可視化し、ラグが閾値を超えたらアラートを発火させる構成が定番です。

まとめ

spring-kafkaを使えばSpring BootとKafkaの統合はスムーズに進みます。

  • KafkaTemplate.send() でProducer実装
  • @KafkaListener でConsumer実装
  • DefaultErrorHandler + DeadLetterPublishingRecoverer でリトライとDLT転送

この3点を押さえれば、サービス間の非同期連携の基盤が整います。KafkaとSpring Batchを組み合わせた大量データ処理についてはSpring Batchガイドも参考にしてみてください。

関連記事