数万件、数百万件といった大量のデータを処理するバッチ処理を実装する場面は、業務システムでよくあります。@Scheduledによる定期実行は知っているけど、大量データを効率的に処理する方法がわからない、そんな悩みを持つ方も多いでしょう。

Spring Batchは、こうした大量データ処理に特化したフレームワークです。この記事では、Spring Batchの基本構成から実際の実装方法まで、実例を交えながら解説します。

Spring Batchとは

Spring Batchを使うべき場面と使わない場面

Spring Batchはあらゆる定期処理に向いているわけではありません。次のような場面で真価を発揮します。

  • 大量データ (数万〜数百万件) を1回の処理で扱う
  • 失敗時に途中から再実行したい
  • chunk単位でトランザクションを区切りたい
  • skip / retry など堅牢なエラーハンドリングが必要

一方、件数が少ない処理や即時応答が必要な処理では、@Scheduled + 通常のサービスメソッドで十分です。リアルタイム性が必要なストリーミング処理は Kafka などのメッセージング基盤が適しています。

他のスケジューラ・ETLツールとの比較

ツール適した用途Spring Batchとの違い
@Scheduled軽量な定期処理大量データ処理・再実行制御がない
Quartz複雑なスケジューリングデータ処理機能は持たない
Apache Airflow / EmbulkETL / ワークフローJVM外、運用基盤が別途必要
Spring BatchJVM内の大量データバッチSpring Bootと親和性が高くJobRepositoryで状態管理

Spring アプリ内で完結する大量データ処理であれば、Spring Batch が最も少ない運用コストで導入できます。

Spring Batchは、大量データの読み込み・加工・書き込みに最適化されたフレームワークです。@Scheduledが「いつ実行するか」を制御するのに対し、Spring Batchは「どうやって大量データを処理するか」を提供します。

Spring Batchには次の特徴があります。

  • chunk処理 によるメモリ効率の良いデータ処理
  • chunk単位での トランザクション管理
  • リトライ・スキップ などのエラーハンドリング機能が標準搭載
  • 処理の再実行制御やメタデータ管理

メモリに全データを載せることなく、数百万件のデータを安全に処理できます。

Spring Batchの4つの主要コンポーネント

Spring Batchは以下のコンポーネントで構成されます。

  • Job - バッチ処理全体を表す最上位の概念
  • Step - Jobを構成する処理単位(1つのJobに複数のStepを定義可能)
  • ItemReader - データソースから1件ずつデータを読み込む
  • ItemProcessor - 読み込んだデータを加工・変換する(省略可能)
  • ItemWriter - 処理済みデータをまとめて書き込む

基本的な流れは「ItemReaderで読む → ItemProcessorで加工 → ItemWriterで書く」です。このサイクルをchunk単位で繰り返します。

chunk処理の仕組み

chunk処理は、Spring Batchの中核となる仕組みです。指定したchunkサイズ分のデータを読み込んでから、一括で書き込みます。

例えばchunkサイズを100に設定した場合は次のように動作します。

  1. ItemReaderで100件読み込む
  2. ItemProcessorで100件を加工
  3. ItemWriterで100件を一括書き込み
  4. トランザクションをコミット

1chunk = 1トランザクション という関係になっているため、chunk単位でコミット・ロールバックが行われます。

chunkサイズの目安は100〜1000件程度です。大きすぎるとメモリ不足、小さすぎると性能が落ちるため、データ特性に応じて調整しましょう。

依存関係の追加

まずはSpring Batchの依存関係を追加します。

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    runtimeOnly 'com.h2database:h2'
}

Spring BatchはJobRepositoryという仕組みでバッチ処理のメタデータを管理するため、データソースが必要です。開発時はH2、本番ではPostgreSQLやMySQLなどを使います。

Spring Boot 3.x以降の注意点 として、@EnableBatchProcessingは基本的に 不要 です。Spring Boot 3.x以降は自動設定でBatch機能が有効化されるため、デフォルト設定で問題ない場合は@EnableBatchProcessingを付ける必要はありません。カスタム設定が必要な場合のみ使用します。

なお、Spring Batch 5.0以降、JobBuilderFactory/StepBuilderFactoryは非推奨となり、JobBuilder/StepBuilderを直接使う形に変更されました。この記事では新しい記法を使用します。

CSVファイルからDBへのデータ投入

まずは基本的な例として、CSVファイルを読み込んでDBに投入する処理を実装してみましょう。

対象となるエンティティクラスは次のとおりです。

public class User {
    private Long id;
    private String name;
    private String email;
    // getter/setter省略
}

FlatFileItemReaderでCSVを読み込む

@Bean
public FlatFileItemReader<User> csvReader() {
    return new FlatFileItemReaderBuilder<User>()
        .name("csvReader")
        .resource(new ClassPathResource("users.csv"))
        .delimited()
        .names("id", "name", "email")
        .targetType(User.class)
        .build();
}

FlatFileItemReaderはCSVやTSVを読み込むためのItemReaderです。names()で列名を指定し、targetType()でマッピング先のクラスを指定すると、フィールド名が一致する場合は自動的にオブジェクトに変換されます。

より柔軟なマッピングが必要な場合は、FieldSetMapperを使ってマッピングロジックを自分で定義できます。

JdbcBatchItemWriterでDBに書き込む

@Bean
public JdbcBatchItemWriter<User> dbWriter(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<User>()
        .dataSource(dataSource)
        .sql("INSERT INTO users (id, name, email) VALUES (:id, :name, :email)")
        .beanMapped()
        .build();
}

JdbcBatchItemWriterは、JDBCのバッチ更新機能を使って複数件を一括でINSERTします。beanMapped()を指定すると、エンティティのフィールド名とSQL名前付きパラメータが一致する必要があります

JobとStepの定義

@Configuration
public class CsvImportJobConfig {

    @Bean
    public Job csvImportJob(JobRepository jobRepository, Step csvImportStep) {
        return new JobBuilder("csvImportJob", jobRepository)
            .start(csvImportStep)
            .build();
    }

    @Bean
    public Step csvImportStep(JobRepository jobRepository,
                              PlatformTransactionManager transactionManager,
                              FlatFileItemReader<User> csvReader,
                              JdbcBatchItemWriter<User> dbWriter) {
        return new StepBuilder("csvImportStep", jobRepository)
            .<User, User>chunk(100, transactionManager)
            .reader(csvReader)
            .writer(dbWriter)
            .build();
    }
}

JobとStepを組み立てます。Spring Boot 3.x以降、JobRepositoryとPlatformTransactionManagerはSpringが自動設定したBeanを引数として受け取りますchunk(100, transactionManager)で、100件ごとにchunk処理を行い、トランザクション管理を有効化しています。

DBからDBへの大量データ変換処理

次は、あるテーブルから読み込んでビジネスロジックで加工し、別のテーブルに書き込む例です。

public class OrderEntity {
    private Long id;
    private Long customerId;
    private BigDecimal amount;
    private String status;
    // getter/setter省略
}

public class ProcessedOrder {
    private Long orderId;
    private BigDecimal finalAmount;
    private String status;
    // getter/setter省略
}

JdbcCursorItemReaderでDBから読み込む

@Bean
public JdbcCursorItemReader<OrderEntity> orderReader(DataSource dataSource) {
    return new JdbcCursorItemReaderBuilder<OrderEntity>()
        .name("orderReader")
        .dataSource(dataSource)
        .sql("SELECT id, customer_id, amount, status FROM orders WHERE status = 'PENDING'")
        .rowMapper(new BeanPropertyRowMapper<>(OrderEntity.class))
        .build();
}

JdbcCursorItemReaderは、SQLのカーソルを使って1件ずつデータを読み込みます。メモリに全件載せることなく、大量データを順次処理できます。

ItemProcessorでデータを加工

@Component
public class OrderProcessor implements ItemProcessor<OrderEntity, ProcessedOrder> {

    @Override
    public ProcessedOrder process(OrderEntity order) throws Exception {
        ProcessedOrder processed = new ProcessedOrder();
        processed.setOrderId(order.getId());
        processed.setFinalAmount(order.getAmount().multiply(new BigDecimal("0.9")));
        processed.setStatus("PROCESSED");
        return processed;
    }
}

ItemProcessorで1件ずつビジネスロジックを適用します。ここでnullを返すと、その件はItemWriterに渡されずフィルタリングされます。

エラーハンドリング - skipとretry

JobExecutionListenerでJob実行の前後処理を行う

Jobの開始前と終了後に共通処理を挟みたい場合は JobExecutionListener を使います。実行時間の計測、Slack通知、メトリクス送信などに便利です。

@Component
public class LoggingJobListener implements JobExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(LoggingJobListener.class);

    @Override
    public void beforeJob(JobExecution jobExecution) {
        log.info("Job開始: {} params={}", jobExecution.getJobInstance().getJobName(), jobExecution.getJobParameters());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        BatchStatus status = jobExecution.getStatus();
        Duration duration = Duration.between(jobExecution.getStartTime(), jobExecution.getEndTime());
        log.info("Job終了: status={} duration={}ms", status, duration.toMillis());
        if (status == BatchStatus.FAILED) {
            // 失敗時はアラート送信など
        }
    }
}

リスナーは JobBuilder.listener(listener) で登録します。Step単位で前後処理を挟みたい場合は StepExecutionListener を使います。

並列処理 - Multi-threaded StepとPartitioning

単一スレッドのchunk処理では処理時間が間に合わない場合、Spring Batchは大きく2つの並列化手段を提供します。

Multi-threaded Step(複数スレッドで同一Stepを処理)

@Bean
public Step multiThreadedStep(JobRepository jobRepository,
                              PlatformTransactionManager transactionManager,
                              ItemReader<User> reader,
                              ItemWriter<User> writer) {
    return new StepBuilder("multiThreadedStep", jobRepository)
        .<User, User>chunk(100, transactionManager)
        .reader(reader)
        .writer(writer)
        .taskExecutor(new SimpleAsyncTaskExecutor("batch-thread-"))
        .build();
}

taskExecutor を設定するだけでchunk処理が並列化されます。ただし ItemReader はスレッドセーフである必要がありますJdbcCursorItemReader などはスレッドセーフではないため、SynchronizedItemStreamReader でラップするか、JdbcPagingItemReader を使います。

Partitioning(データを分割して並列実行)

Partitioningは入力データを範囲分割し、複数のWorker Stepで並列実行する方式です。例えば「ID 1-10000はWorker1、10001-20000はWorker2」のように分けます。スループットが最も上がる反面、Partitioner の実装が必要です。大量データを定期処理する本番バッチではこちらが選ばれます。

リスタート(失敗したJobの途中再開)

Spring BatchはJobRepositoryに各Stepの進捗を記録するため、失敗したJobを 失敗箇所から再開 できます。

同一の JobParameters で再実行すると、Spring Batchは前回の JobExecution を参照し、完了済みStepはスキップ、失敗したStepの途中から処理を再開します。chunk処理の場合は最後にコミットされたchunkの次から再開されます。

ただし ItemReader が再開位置を保持するには、ExecutionContext に状態を保存する必要があります。FlatFileItemReaderJdbcCursorItemReader は標準でこの仕組みを備えています。自作の Reader を使う場合は ItemStream を実装してください。

Spring Batch Adminと運用監視

Spring Batch Admin はかつてJob実行状況をWeb UIで管理する公式プロジェクトでしたが、現在は非推奨 となっています。現代的な運用監視は次の方法が主流です。

  • Spring Boot Actuator + Micrometer でJob実行メトリクスをPrometheusに送信
  • JobRepository テーブル (BATCH_JOB_EXECUTION など) を直接SQLで監視
  • Spring Cloud Data Flow をJob基盤として導入

本番では BATCH_JOB_EXECUTIONSTATUS カラムを定期チェックし、FAILED を検知したらアラートを発火させるのが基本パターンです。

データ不正などで一部のレコードが処理できない場合でも、全体を止めずに処理を継続したいことがあります。skipを使うと、特定の例外が発生した件をスキップして次に進めます。

@Bean
public Step resilientStep(JobRepository jobRepository,
                          PlatformTransactionManager transactionManager,
                          ItemReader<User> reader,
                          ItemWriter<User> writer) {
    return new StepBuilder("resilientStep", jobRepository)
        .<User, User>chunk(100, transactionManager)
        .reader(reader)
        .writer(writer)
        .faultTolerant()
        .skip(ValidationException.class)
        .skipLimit(10)
        .retry(TransientDataAccessException.class)
        .retryLimit(3)
        .build();
}

faultTolerant()でエラーハンドリング機能を有効化します。skip()で無視する例外を指定し、skipLimit(10)で最大10件までスキップを許容します。

retry()はネットワークエラーなど一時的な障害に対するリトライ設定です。retryLimit(3)で最大3回までリトライします。skipとretryは組み合わせ可能です。

JobParametersによる実行制御

JobParametersを使うと、実行時にパラメータを渡して処理を柔軟に制御できます。

@Bean
@StepScope
public FlatFileItemReader<User> parameterizedReader(
        @Value("#{jobParameters['inputFile']}") String inputFile) {
    return new FlatFileItemReaderBuilder<User>()
        .name("parameterizedReader")
        .resource(new FileSystemResource(inputFile))
        .delimited()
        .names("id", "name", "email")
        .targetType(User.class)
        .build();
}

@StepScopeを付けることで、Step実行時にBeanが生成される遅延評価により、JobParametersの値を受け取れます。これにより、実行ごとに異なるファイルを処理できます。

JobParametersは実行の識別にも使われます。同じJobParametersでは同じJobインスタンスとして扱われるため、正常終了したJobは再実行できません。再実行したい場合は、パラメータを変えるか、RunIdIncrementerを使って自動的にパラメータを変える必要があります。

バッチの実行方法

実装したバッチを実行する方法はいくつかあります。

@Scheduledと組み合わせて定期実行

@Component
public class BatchScheduler {

    private final JobLauncher jobLauncher;
    private final Job csvImportJob;

    public BatchScheduler(JobLauncher jobLauncher, Job csvImportJob) {
        this.jobLauncher = jobLauncher;
        this.csvImportJob = csvImportJob;
    }

    @Scheduled(cron = "0 0 2 * * *")
    public void runBatch() throws Exception {
        JobParameters params = new JobParametersBuilder()
            .addLong("time", System.currentTimeMillis())
            .toJobParameters();
        
        jobLauncher.run(csvImportJob, params);
    }
}

@ScheduledJobLauncherを組み合わせると、定期実行が簡単に実現できます。毎回異なるパラメータ(現在時刻など)を渡すことで、再実行可能にしています。

実装時の注意点

DBコネクションプールの設定を見直す - 大量データ処理ではchunkごとにコネクションを取得します。デフォルト設定のままだとプール枯渇を招くため、HikariCPのチューニングを併せて確認しましょう。

長時間バッチではグレースフルシャットダウンに注意 - Kubernetesのローリングアップデート中にバッチが強制終了されると中途半端な状態が残ります。グレースフルシャットダウン設定で安全に止められるようにしておくと安心です。

Job失敗時の通知は例外ハンドラと組み合わせる - 失敗をログだけに頼ると気づきが遅れます。GlobalExceptionHandlerの本番運用パターンを参考に、JobExecutionListenerからアラートを飛ばす設計にしておくと運用が楽になります。

リアルタイム連携が必要であれば、バッチではなくKafkaのProducer/Consumerによる非同期メッセージングも検討してみてください。

実務で使う際の注意点をいくつか挙げておきます。

chunkサイズは100件から試す - データ特性に応じて調整が必要ですが、まずは100件から始めるのが無難です。

大量データ処理ではログ出力量に注意 - 全件ログを出すとログファイルが肥大化します。1000件ごとなど間引いてログ出力しましょう。

本番ではJobRepositoryに永続化DBを使う - H2は開発用と割り切り、本番ではPostgreSQLなどを使いましょう。初回実行時にはSpring Batchのメタデータテーブルが自動作成されます。

まとめ

Spring Batchを使えば、大量データを安全かつ効率的に処理できます。Job/Step/ItemReader/ItemWriterという4つのコンポーネントを理解し、chunk処理の仕組みを押さえれば、実用的なバッチ処理が実装できます。

トランザクション管理やエラーハンドリングも標準で備わっているため、安心して業務システムに組み込めます。まずは小さなバッチから始めて、徐々に複雑な処理に挑戦してみてください。