LargeCsvImportJob - Spring Batch 大型 CSV 檔案匯入作業說明

概述

這是一個基於 Spring Batch 框架的批次處理作業,專門用於處理大型 CSV 檔案的匯入任務。採用串流式讀取和批次寫入的方式,確保記憶體使用量固定在 chunk size 大小,有效避免因大檔案導致的 OOM (Out of Memory) 問題。

Spring Batch 核心架構

1. Job(作業)

  • 名稱: csvImportJob
  • 功能: 批次處理的最上層單位,代表一個完整的批次作業流程
  • 組成: 由一個或多個 Step 組成
  • 本案例: 包含一個 Step (csvImportStep)

2. Step(步驟)

  • 名稱: csvImportStep
  • 類型: Chunk-oriented Step(區塊導向步驟)
  • 功能: 實際執行資料處理的單位
  • 流程: Reader → Processor → Writer

三大核心元件

1. ItemReader(讀取器)- csvReader

功能說明

負責從 CSV 檔案中**串流式讀取**資料,一次只讀取一筆記錄到記憶體中。

實作細節

@Bean
@StepScope
public FlatFileItemReader<CsvRecord> csvReader(
        @Value("#{jobParameters['inputFile']}") String inputFile)

關鍵特性

  • @StepScope: 使 Bean 在每次 Step 執行時才建立,允許使用 Job Parameters
  • 動態參數: 透過 #{jobParameters['inputFile']} 在執行時動態傳入檔案路徑
  • 串流讀取: 使用 FlatFileItemReader,逐行讀取,不將整個檔案載入記憶體
  • 欄位對應: 透過 .names("column1", "column2", "column3") 定義 CSV 欄位
  • 跳過標題: .linesToSkip(1) 自動跳過第一行標題
  • 自動轉換: 使用 BeanWrapperFieldSetMapper 將 CSV 欄位自動對應到 CsvRecord 物件

運作流程

CSV 檔案 → 逐行讀取 → 解析欄位 → 轉換為 CsvRecord 物件 → 傳給 Processor

支援的資料來源類型

Spring Batch 的 Reader 支援多種資料來源,不限於本地檔案系統:

1. FileSystemResource - 本地檔案系統(預設)
@Bean
@StepScope
public FlatFileItemReader<CsvRecord> csvReader(
        @Value("#{jobParameters['inputFile']}") String inputFile) {
    return new FlatFileItemReaderBuilder<CsvRecord>()
            .name("csvReader")
            .resource(new FileSystemResource(inputFile))  // 本地檔案
            .delimited()
            .names("column1", "column2", "column3")
            .linesToSkip(1)
            .fieldSetMapper(new BeanWrapperFieldSetMapper<CsvRecord>() {{
                setTargetType(CsvRecord.class);
            }})
            .build();
}

適用情境: - ✅ 檔案已經在本地磁碟 - ✅ 小型檔案(幾百 MB 以內) - ✅ 無需擔心網路問題

限制: - ❌ 需要先下載大檔案到本地 - ❌ 佔用本地磁碟空間 - ❌ 無法直接讀取雲端儲存


2. UrlResource - 支援 HTTP/HTTPS
@Bean
@StepScope
public FlatFileItemReader<CsvRecord> csvReader(
        @Value("#{jobParameters['inputUrl']}") String inputUrl) {
    return new FlatFileItemReaderBuilder<CsvRecord>()
            .name("csvReader")
            .resource(new UrlResource(inputUrl))  // HTTP/HTTPS URL
            .delimited()
            .names("column1", "column2", "column3")
            .linesToSkip(1)
            .fieldSetMapper(new BeanWrapperFieldSetMapper<CsvRecord>() {{
                setTargetType(CsvRecord.class);
            }})
            .build();
}

適用情境: - ✅ 檔案託管在 Web Server - ✅ 臨時的公開連結

限制: - ❌ 不支援斷點續傳 - ❌ 網路中斷會導致整個 Chunk 失敗 - ❌ 需要持續的網路連線


3. Azure Blob Storage(推薦用於雲端場景)

需要加入 Azure Storage 依賴:

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-storage-blob</artifactId>
    <version>12.25.0</version>
</dependency>
<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-storage-blob</artifactId>
    <version>5.8.0</version>
</dependency>

方式 1: 使用 Azure Spring Cloud 的 Resource Loader(推薦)

@Configuration
@RequiredArgsConstructor
public class LargeCsvImportJob {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;
    private final ResourceLoader resourceLoader;  // Spring 自動注入

    @Bean
    @StepScope
    public FlatFileItemReader<CsvRecord> csvReader(
            @Value("#{jobParameters['inputFile']}") String blobUrl) {

        // blobUrl 格式: azure-blob://container-name/path/to/file.csv
        Resource resource = resourceLoader.getResource(blobUrl);

        return new FlatFileItemReaderBuilder<CsvRecord>()
                .name("csvReader")
                .resource(resource)  // 支援 Azure Blob
                .delimited()
                .names("column1", "column2", "column3")
                .linesToSkip(1)
                .fieldSetMapper(new BeanWrapperFieldSetMapper<CsvRecord>() {{
                    setTargetType(CsvRecord.class);
                }})
                .build();
    }
}

application.properties 配置:

# Azure Storage 連線設定
spring.cloud.azure.storage.blob.account-name=your-storage-account
spring.cloud.azure.storage.blob.account-key=your-account-key
# 或使用 Managed Identity
spring.cloud.azure.storage.blob.credential.managed-identity-enabled=true

執行時傳入參數:

JobParameters params = new JobParametersBuilder()
    .addString("inputFile", "azure-blob://my-container/data/large-file.csv")
    .addDate("date", new Date())
    .toJobParameters();

方式 2: 自訂 Azure Blob Resource

@Component
public class AzureBlobResource extends AbstractResource {

    private final BlobClient blobClient;
    private final String blobUrl;

    public AzureBlobResource(String connectionString, String containerName, String blobName) {
        BlobServiceClient blobServiceClient = new BlobServiceClientBuilder()
                .connectionString(connectionString)
                .buildClient();

        this.blobClient = blobServiceClient
                .getBlobContainerClient(containerName)
                .getBlobClient(blobName);

        this.blobUrl = blobClient.getBlobUrl();
    }

    @Override
    public InputStream getInputStream() throws IOException {
        return blobClient.openInputStream();
    }

    @Override
    public String getDescription() {
        return "Azure Blob Storage resource [" + blobUrl + "]";
    }

    @Override
    public long contentLength() throws IOException {
        return blobClient.getProperties().getBlobSize();
    }

    @Override
    public boolean exists() {
        return blobClient.exists();
    }
}

使用方式:

@Bean
@StepScope
public FlatFileItemReader<CsvRecord> csvReader(
        @Value("#{jobParameters['containerName']}") String containerName,
        @Value("#{jobParameters['blobName']}") String blobName,
        @Value("${azure.storage.connection-string}") String connectionString) {

    Resource resource = new AzureBlobResource(connectionString, containerName, blobName);

    return new FlatFileItemReaderBuilder<CsvRecord>()
            .name("csvReader")
            .resource(resource)
            .delimited()
            .names("column1", "column2", "column3")
            .linesToSkip(1)
            .fieldSetMapper(new BeanWrapperFieldSetMapper<CsvRecord>() {{
                setTargetType(CsvRecord.class);
            }})
            .build();
}

4. AWS S3(適用於 AWS 環境)
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-s3</artifactId>
    <version>1.12.565</version>
</dependency>
@Bean
@StepScope
public FlatFileItemReader<CsvRecord> csvReader(
        @Value("#{jobParameters['s3Bucket']}") String bucket,
        @Value("#{jobParameters['s3Key']}") String key,
        AmazonS3 s3Client) {

    S3Object s3Object = s3Client.getObject(bucket, key);
    InputStreamResource resource = new InputStreamResource(s3Object.getObjectContent());

    return new FlatFileItemReaderBuilder<CsvRecord>()
            .name("csvReader")
            .resource(resource)
            .delimited()
            .names("column1", "column2", "column3")
            .linesToSkip(1)
            .fieldSetMapper(new BeanWrapperFieldSetMapper<CsvRecord>() {{
                setTargetType(CsvRecord.class);
            }})
            .build();
}

網路異常處理機制

當使用雲端儲存(Blob、S3、URL)時,網路中途異常的處理策略:

問題分析
情境:正在讀取 Azure Blob 上的 100 萬筆 CSV

Chunk 1 (0-999):     ✅ 讀取成功,寫入資料庫
Chunk 2 (1000-1999): 讀取中... ❌ 網路中斷!

問題:
1. Reader 無法繼續讀取資料
2. 當前 Chunk 會失敗並回滾
3. Job 執行狀態變為 FAILED
解決方案 1: Reader 層級重試(推薦)

在 Reader 建立時啟用重試機制:

@Bean
@StepScope
public FlatFileItemReader<CsvRecord> csvReader(
        @Value("#{jobParameters['inputFile']}") String blobUrl,
        ResourceLoader resourceLoader) {

    // 使用重試模板包裝 Resource 讀取
    RetryTemplate retryTemplate = RetryTemplate.builder()
            .maxAttempts(5)                          // 最多重試 5 次
            .exponentialBackoff(1000, 2, 30000)      // 1秒起始,每次×2,最長30秒
            .retryOn(IOException.class)              // 網路異常時重試
            .retryOn(BlobStorageException.class)     // Blob 存取異常時重試
            .build();

    Resource resource = retryTemplate.execute(context -> 
        resourceLoader.getResource(blobUrl)
    );

    return new FlatFileItemReaderBuilder<CsvRecord>()
            .name("csvReader")
            .resource(resource)
            .delimited()
            .names("column1", "column2", "column3")
            .linesToSkip(1)
            .fieldSetMapper(new BeanWrapperFieldSetMapper<CsvRecord>() {{
                setTargetType(CsvRecord.class);
            }})
            .build();
}

解決方案 2: Step 層級重試 + Reader 異常處理
@Bean
public Step csvImportStep(ItemReader<CsvRecord> csvReader,
                          ItemProcessor<CsvRecord, CsvRecord> csvProcessor,
                          JdbcBatchItemWriter<CsvRecord> csvWriter) {
    return stepBuilderFactory.get("csvImportStep")
            .<CsvRecord, CsvRecord>chunk(1000)
            .reader(csvReader)
            .processor(csvProcessor)
            .writer(csvWriter)

            // 容錯設定
            .faultTolerant()

            // Reader 讀取失敗時重試
            .retryLimit(5)
            .retry(IOException.class)                    // 網路 I/O 異常
            .retry(BlobStorageException.class)           // Azure Blob 異常
            .retry(AmazonS3Exception.class)              // AWS S3 異常
            .retry(SocketTimeoutException.class)         // Socket 逾時
            .backOffPolicy(exponentialBackOffPolicy())   // 指數退避

            // 重試仍失敗後的處理
            .skipLimit(10)                               // 最多跳過 10 筆
            .skip(FlatFileParseException.class)          // 解析錯誤可跳過

            // 致命錯誤不重試,直接失敗
            .noRetry(OutOfMemoryError.class)
            .noRetry(NonTransientDataAccessException.class)

            .listener(retryListener())
            .build();
}

@Bean
public BackOffPolicy exponentialBackOffPolicy() {
    ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
    policy.setInitialInterval(2000);      // 網路問題初始等待 2 秒
    policy.setMaxInterval(60000);         // 最長等待 60 秒
    policy.setMultiplier(3.0);            // 每次等待時間 × 3
    return policy;
}

@Component
public static class NetworkRetryListener implements RetryListener {

    @Override
    public <T, E extends Throwable> void onError(
            RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {

        int retryCount = context.getRetryCount();
        log.warn("網路讀取失敗,第 {} 次重試: {}", retryCount, throwable.getMessage());

        // 重試次數過多時發送告警
        if (retryCount >= 3) {
            alertService.sendAlert(
                String.format("CSV Reader retry count: %d, error: %s", 
                              retryCount, throwable.getMessage())
            );
        }
    }
}

解決方案 3: 使用 Azure Blob 的斷點續傳功能

Azure Blob Storage SDK 支援斷點續傳,可以從中斷點繼續讀取:

@Component
public class ResumableAzureBlobResource extends AbstractResource {

    private final BlobClient blobClient;
    private final String blobUrl;
    private long currentPosition = 0;  // 記錄當前讀取位置

    public ResumableAzureBlobResource(BlobClient blobClient) {
        this.blobClient = blobClient;
        this.blobUrl = blobClient.getBlobUrl();
    }

    @Override
    public InputStream getInputStream() throws IOException {
        try {
            // 支援斷點續傳的 InputStream
            BlobInputStream blobInputStream = blobClient.openInputStream();

            // 如果有記錄位置,跳到該位置
            if (currentPosition > 0) {
                blobInputStream.skip(currentPosition);
            }

            return new MonitoredInputStream(blobInputStream, this);

        } catch (BlobStorageException e) {
            // 網路異常時重試
            log.warn("Blob 讀取失敗,準備重試: {}", e.getMessage());
            throw new IOException("Failed to open blob stream", e);
        }
    }

    public void updatePosition(long position) {
        this.currentPosition = position;
    }

    // 包裝 InputStream 以記錄讀取位置
    private static class MonitoredInputStream extends FilterInputStream {
        private final ResumableAzureBlobResource resource;
        private long bytesRead = 0;

        public MonitoredInputStream(InputStream in, ResumableAzureBlobResource resource) {
            super(in);
            this.resource = resource;
        }

        @Override
        public int read() throws IOException {
            int data = super.read();
            if (data != -1) {
                bytesRead++;
                resource.updatePosition(bytesRead);
            }
            return data;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            int count = super.read(b, off, len);
            if (count > 0) {
                bytesRead += count;
                resource.updatePosition(bytesRead);
            }
            return count;
        }
    }
}

解決方案 4: Job 層級的 Restart 機制

利用 Spring Batch 內建的 Restart 功能:

@Bean
public Job csvImportJob(Step csvImportStep) {
    return jobBuilderFactory.get("csvImportJob")
            .start(csvImportStep)
            .incrementer(new RunIdIncrementer())  // 允許同樣參數重新執行
            .build();
}

@Bean
public Step csvImportStep(...) {
    return stepBuilderFactory.get("csvImportStep")
            .<CsvRecord, CsvRecord>chunk(1000)
            .reader(csvReader)
            .processor(csvProcessor)
            .writer(csvWriter)
            .allowStartIfComplete(true)  // 允許已完成的 Step 重新執行
            .build();
}

重新執行失敗的 Job:

@Service
public class BatchJobService {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job csvImportJob;

    @Autowired
    private JobExplorer jobExplorer;

    public void restartFailedJob(Long jobExecutionId) throws Exception {
        // 取得失敗的 JobExecution
        JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);

        if (failedExecution != null && failedExecution.getStatus() == BatchStatus.FAILED) {
            // 使用原始參數重新執行
            JobParameters originalParams = failedExecution.getJobParameters();

            log.info("重新執行失敗的 Job: {}", jobExecutionId);
            jobLauncher.run(csvImportJob, originalParams);

            // Spring Batch 會自動從失敗的 Chunk 繼續執行
        }
    }
}

運作機制:

第一次執行(網路中斷):
Chunk 1 (0-999):     ✅ 成功,Spring Batch 記錄進度
Chunk 2 (1000-1999): ✅ 成功,Spring Batch 記錄進度
Chunk 3 (2000-2999): ❌ 網路中斷失敗
Job 狀態: FAILED

重新執行:
Chunk 1 (0-999):     ⏭️ 偵測到已完成,跳過
Chunk 2 (1000-1999): ⏭️ 偵測到已完成,跳過
Chunk 3 (2000-2999): ✅ 從失敗點重新執行
Chunk 4 (3000-3999): ✅ 繼續執行
...
Job 狀態: COMPLETED

不同資料來源的比較

資料來源 網路依賴 斷點續傳 記憶體用量 適用場景
FileSystemResource N/A 本地檔案、小檔案
UrlResource 臨時下載、小檔案
Azure Blob (Spring Cloud) 生產環境、大檔案(推薦)
Azure Blob (自訂) 需要客製化功能
AWS S3 AWS 環境

網路異常處理最佳實踐

@Configuration
@RequiredArgsConstructor
public class RobustCsvImportJob {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;
    private final ResourceLoader resourceLoader;

    @Bean
    @StepScope
    public FlatFileItemReader<CsvRecord> csvReader(
            @Value("#{jobParameters['inputFile']}") String blobUrl) {

        Resource resource = resourceLoader.getResource(blobUrl);

        return new FlatFileItemReaderBuilder<CsvRecord>()
                .name("csvReader")
                .resource(resource)
                .delimited()
                .names("column1", "column2", "column3")
                .linesToSkip(1)
                .fieldSetMapper(new BeanWrapperFieldSetMapper<CsvRecord>() {{
                    setTargetType(CsvRecord.class);
                }})
                .saveState(true)  // ✅ 重要!儲存讀取進度
                .build();
    }

    @Bean
    public Step csvImportStep(ItemReader<CsvRecord> csvReader,
                              ItemProcessor<CsvRecord, CsvRecord> csvProcessor,
                              JdbcBatchItemWriter<CsvRecord> csvWriter) {
        return stepBuilderFactory.get("csvImportStep")
                .<CsvRecord, CsvRecord>chunk(1000)
                .reader(csvReader)
                .processor(csvProcessor)
                .writer(csvWriter)

                // ✅ 網路異常處理
                .faultTolerant()
                .retryLimit(5)
                .retry(IOException.class)
                .retry(BlobStorageException.class)
                .retry(SocketTimeoutException.class)
                .backOffPolicy(networkBackOffPolicy())

                // ✅ 監控與告警
                .listener(networkRetryListener())

                .build();
    }

    @Bean
    public BackOffPolicy networkBackOffPolicy() {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        policy.setInitialInterval(2000);   // 網路問題初始等待長一點
        policy.setMaxInterval(60000);      // 最長等待 1 分鐘
        policy.setMultiplier(3.0);         // 快速增加等待時間
        return policy;
    }

    @Bean
    public Job csvImportJob(Step csvImportStep) {
        return jobBuilderFactory.get("csvImportJob")
                .start(csvImportStep)
                .incrementer(new RunIdIncrementer())  // ✅ 允許 Restart
                .build();
    }
}

application.properties 設定:

# Azure Blob 連線設定
spring.cloud.azure.storage.blob.account-name=${AZURE_STORAGE_ACCOUNT}
spring.cloud.azure.storage.blob.credential.managed-identity-enabled=true

# 連線逾時設定
spring.cloud.azure.storage.blob.retry.max-retries=5
spring.cloud.azure.storage.blob.retry.retry-delay=2s
spring.cloud.azure.storage.blob.retry.max-retry-delay=60s

# Spring Batch 設定
spring.batch.job.enabled=false  # 不自動執行
spring.batch.jdbc.initialize-schema=always  # 確保 Metadata Tables 存在

總結:雲端檔案讀取建議

生產環境推薦配置:

  1. 使用 Azure Blob Storage + Spring Cloud Azure
  2. 自動處理驗證和重試
  3. 支援 Managed Identity
  4. 與 Spring Batch 無縫整合

  5. 啟用 Step 層級的 Retry

  6. 處理暫時性網路異常
  7. 指數退避策略
  8. 最多重試 5 次

  9. 啟用 Reader 的 saveState

  10. 記錄讀取進度
  11. 支援 Job Restart

  12. 設定合理的 Chunk Size

  13. 網路不穩時建議 500-1000
  14. 避免單一 Chunk 執行時間過長

  15. 監控與告警

  16. 記錄重試次數
  17. 重試超過閾值時發送告警
  18. 追蹤網路異常頻率

2. ItemProcessor(處理器)- csvProcessor

功能說明

對讀取到的每一筆資料進行**驗證和轉換**。

實作細節

@Bean
public ItemProcessor<CsvRecord, CsvRecord> csvProcessor() {
    return record -> {
        // 資料驗證
        if (record.getColumn1() == null || record.getColumn1().trim().isEmpty()) {
            log.warn("跳過無效記錄: {}", record);
            return null; // 返回 null 會跳過此筆資料
        }

        // 資料轉換
        record.setColumn1(record.getColumn1().trim());

        return record;
    };
}

關鍵特性

  • 資料驗證: 檢查必填欄位是否為空
  • 過濾機制: 返回 null 可跳過不符合條件的資料
  • 資料清理: 去除前後空白字元
  • 彈性處理: 可加入任何業務邏輯(資料轉換、格式化、計算等)

運作流程

CsvRecord → 驗證 → 轉換 → 返回處理後的 CsvRecord(或 null 跳過)

3. ItemWriter(寫入器)- csvWriter

功能說明

將處理後的資料**批次寫入資料庫**。

實作細節

@Bean
public JdbcBatchItemWriter<CsvRecord> csvWriter() {
    String sql = "INSERT INTO your_table (column1, column2, column3) " +
                 "VALUES (:column1, :column2, :column3)";

    return new JdbcBatchItemWriterBuilder<CsvRecord>()
            .dataSource(dataSource)
            .sql(sql)
            .itemSqlParameterSourceProvider(record -> {
                MapSqlParameterSource params = new MapSqlParameterSource();
                params.addValue("column1", record.getColumn1());
                params.addValue("column2", record.getColumn2());
                params.addValue("column3", record.getColumn3());
                return params;
            })
            .build();
}

關鍵特性

  • 批次寫入: 不是一筆一筆寫入,而是累積到 chunk size 後批次寫入
  • JDBC Batch: 使用 JDBC 的 batch 功能,大幅提升寫入效能
  • 參數化查詢: 使用 Named Parameter 防止 SQL Injection
  • 自動對應: 透過 itemSqlParameterSourceProvider 將物件欄位對應到 SQL 參數

Chunk 處理機制(核心重點)

Chunk 定義

.<CsvRecord, CsvRecord>chunk(1000)

運作原理

記憶體控制

  • chunk(1000): 每次在記憶體中最多保留 1000 筆資料
  • 固定記憶體: 無論檔案多大,記憶體使用量固定

執行流程

1. Reader 讀取 1 筆資料 → Processor 處理 → 暫存在記憶體
2. Reader 讀取第 2 筆 → Processor 處理 → 暫存在記憶體
3. ...
4. Reader 讀取第 1000 筆 → Processor 處理 → 暫存在記憶體
5. Writer 批次寫入這 1000 筆到資料庫
6. 提交交易(Commit)
7. 清空記憶體
8. 重複步驟 1-7,直到檔案讀取完畢

Chunk Size 調校建議

Chunk Size 適用情境 優點 缺點
100-500 資料結構複雜、驗證邏輯複雜 記憶體用量小、錯誤定位容易 提交次數多、效能較慢
1000-2000 一般情況(推薦) 效能與記憶體平衡 適中
5000+ 資料結構簡單、高效能需求 寫入效能高 記憶體用量大、錯誤回滾範圍大

完整執行流程圖

啟動 Job: csvImportJob
    ↓
執行 Step: csvImportStep
    ↓
    ┌─────────────────────────────────────────────┐
    │  Chunk 1 (0-999 筆)                         │
    │  ┌──────────────────────────────────────┐   │
    │  │ Reader: 讀取第 1 筆 CSV              │   │
    │  │    ↓                                  │   │
    │  │ Processor: 驗證 + 轉換                │   │
    │  │    ↓                                  │   │
    │  │ 暫存到 Chunk 記憶體                   │   │
    │  └──────────────────────────────────────┘   │
    │                  ...                         │
    │  ┌──────────────────────────────────────┐   │
    │  │ Reader: 讀取第 1000 筆 CSV           │   │
    │  │    ↓                                  │   │
    │  │ Processor: 驗證 + 轉換                │   │
    │  │    ↓                                  │   │
    │  │ 暫存到 Chunk 記憶體                   │   │
    │  └──────────────────────────────────────┘   │
    │                  ↓                           │
    │  Writer: 批次寫入 1000 筆到資料庫            │
    │                  ↓                           │
    │  Transaction Commit                          │
    │                  ↓                           │
    │  清空 Chunk 記憶體                           │
    └─────────────────────────────────────────────┘
                      ↓
    ┌─────────────────────────────────────────────┐
    │  Chunk 2 (1000-1999 筆)                     │
    │                  ...                         │
    └─────────────────────────────────────────────┘
                      ↓
                  重複直到檔案結束
                      ↓
                 Job 完成

交易管理

交易邊界

  • 每個 Chunk 是一個交易單位
  • Chunk 成功 → 提交(Commit)
  • Chunk 失敗 → 回滾(Rollback)該 Chunk 的所有資料

範例

假設檔案有 2500 筆資料,chunk size = 1000

Chunk 1 (0-999):     成功 → Commit → 資料庫有 1000 筆
Chunk 2 (1000-1999): 成功 → Commit → 資料庫有 2000 筆
Chunk 3 (2000-2499): 失敗 → Rollback → 資料庫仍有 2000 筆

最終結果: 資料庫有 2000 筆資料,第 3 個 Chunk 需要修正後重新執行

記憶體優化原理

傳統做法(不佳)

// ❌ 將整個檔案讀到記憶體
List<CsvRecord> allRecords = readAllFromCsv(); // 假設 100 萬筆
// 記憶體用量: 100 萬筆 × 每筆大小 = 可能數 GB

for (CsvRecord record : allRecords) {
    process(record);
    saveToDatabase(record);
}

Spring Batch 做法(優化)

// ✅ 串流式讀取 + Chunk 處理
// 記憶體用量: 1000 筆 × 每筆大小 = 可能數 MB
chunk(1000)
    .reader(csvReader)    // 一次讀 1 筆
    .processor(processor)  // 處理 1 筆
    .writer(writer)        // 累積 1000 筆才批次寫入

記憶體用量比較

資料量 傳統做法記憶體 Spring Batch 記憶體 節省比例
1 萬筆 10 MB ~1 MB 90%
10 萬筆 100 MB ~1 MB 99%
100 萬筆 1 GB ~1 MB 99.9%
1000 萬筆 10 GB ~1 MB 99.99%

Job Parameters(作業參數)

用途

在執行 Job 時動態傳入參數,例如檔案路徑。

使用方式

@Value("#{jobParameters['inputFile']}")

執行範例

JobParameters jobParameters = new JobParametersBuilder()
    .addString("inputFile", "/path/to/large-file.csv")
    .addDate("startDate", new Date())
    .toJobParameters();

jobLauncher.run(csvImportJob, jobParameters);

或透過 Command Line:

java -jar app.jar --job.name=csvImportJob inputFile=/path/to/large-file.csv

Chunk Exception 處理機制

預設行為:整個 Chunk 回滾

當一個 Chunk 在處理過程中發生異常,Spring Batch 預設會回滾整個 Chunk 的交易

範例情境

假設 chunk size = 1000

Chunk 1 (0-999 筆):
  - Reader 讀取第 1 筆 → 成功
  - Processor 處理第 1 筆 → 成功
  - ...
  - Reader 讀取第 500 筆 → 成功
  - Processor 處理第 500 筆 → ❌ 拋出 Exception

結果:
  ✅ Transaction Rollback
  ✅ 這 500 筆資料都不會寫入資料庫
  ❌ Job 執行失敗,狀態變為 FAILED

問題分析

1. 一筆資料錯誤導致整批失敗

1000 筆資料中,只有第 500 筆有問題
→ 整批 1000 筆都無法寫入
→ 需要修正資料後重新執行

2. 記憶體中的資料無法恢復

異常發生時:
  - 前 499 筆已經處理完畢(在記憶體中)
  - 第 500 筆處理失敗
  → 整個 Chunk 回滾
  → 前 499 筆的處理結果也會消失

3. 重新執行成本高

修正第 500 筆資料後重新執行:
  → 前 499 筆會重新讀取、處理、寫入
  → 浪費計算資源

三種異常處理策略

策略 1: Restart(重啟)- 預設行為

適用情境

  • 資料錯誤可以被修正(例如:格式錯誤、值超出範圍)
  • 可以重新執行整個 Job
  • 資料量不大,重新執行成本可接受

運作流程

第一次執行:
Chunk 1 (0-999):   ✅ 成功,資料庫有 1000 筆
Chunk 2 (1000-1999): ❌ 第 1500 筆失敗,回滾
Job 狀態: FAILED

修正資料後,重新執行:
Chunk 1 (0-999):   ⚠️ 已存在,Spring Batch 會偵測並跳過
Chunk 2 (1000-1999): ✅ 成功,資料庫有 2000 筆
Chunk 3 (2000-2999): ✅ 成功,資料庫有 3000 筆
Job 狀態: COMPLETED

設定方式

@Bean
public Step csvImportStep(...) {
    return stepBuilderFactory.get("csvImportStep")
            .<CsvRecord, CsvRecord>chunk(1000)
            .reader(csvReader)
            .processor(csvProcessor)
            .writer(csvWriter)
            // 預設就是 Restart 模式,不需額外設定
            .build();
}

優點

  • ✅ 資料一致性最高
  • ✅ 實作簡單
  • ✅ Spring Batch 會記錄進度,重啟時可以從失敗的 Chunk 繼續

缺點

  • ❌ 需要人工介入修正資料
  • ❌ Job 會中斷,需要重新啟動
  • ❌ 一筆錯誤影響整批資料

策略 2: Skip(跳過)- 容錯處理

適用情境

  • 少數資料錯誤可以接受(例如:日誌匯入、非關鍵資料)
  • 資料錯誤無法即時修正
  • 希望 Job 不中斷完整執行

運作流程

Chunk 1 (0-999 筆):
  - 第 1-499 筆 → ✅ 處理成功
  - 第 500 筆 → ❌ Processor 拋出 ValidationException
  - ⚠️ Spring Batch 跳過第 500 筆
  - 第 501-999 筆 → ✅ 繼續處理

結果:
  ✅ 999 筆寫入資料庫(跳過第 500 筆)
  ✅ Job 繼續執行,不會失敗
  ⚠️ 跳過的資料會被記錄在 Metadata Tables

設定方式

@Bean
public Step csvImportStep(ItemReader<CsvRecord> csvReader,
                          ItemProcessor<CsvRecord, CsvRecord> csvProcessor,
                          JdbcBatchItemWriter<CsvRecord> csvWriter) {
    return stepBuilderFactory.get("csvImportStep")
            .<CsvRecord, CsvRecord>chunk(1000)
            .reader(csvReader)
            .processor(csvProcessor)
            .writer(csvWriter)
            .faultTolerant()                        // 啟用容錯機制
            .skipLimit(100)                         // 最多允許跳過 100 筆
            .skip(ValidationException.class)        // 指定可跳過的異常類型
            .skip(DataFormatException.class)        // 可指定多個
            .noSkip(FatalException.class)           // 致命錯誤不可跳過
            .build();
}

進階配置:自訂 Skip 策略

@Bean
public Step csvImportStep(...) {
    return stepBuilderFactory.get("csvImportStep")
            .<CsvRecord, CsvRecord>chunk(1000)
            .reader(csvReader)
            .processor(csvProcessor)
            .writer(csvWriter)
            .faultTolerant()
            .skipPolicy(customSkipPolicy())  // 使用自訂策略
            .build();
}

@Bean
public SkipPolicy customSkipPolicy() {
    return (throwable, skipCount) -> {
        // 自訂邏輯:根據異常類型和已跳過次數決定是否跳過
        if (throwable instanceof ValidationException) {
            return skipCount < 100;  // ValidationException 最多跳過 100 次
        }
        if (throwable instanceof SQLException) {
            return false;  // SQL 異常不可跳過
        }
        return skipCount < 50;  // 其他異常最多跳過 50 次
    };
}

Skip Listener(監聽跳過的資料)

@Bean
public Step csvImportStep(...) {
    return stepBuilderFactory.get("csvImportStep")
            .<CsvRecord, CsvRecord>chunk(1000)
            .reader(csvReader)
            .processor(csvProcessor)
            .writer(csvWriter)
            .faultTolerant()
            .skipLimit(100)
            .skip(ValidationException.class)
            .listener(skipListener())  // 註冊監聽器
            .build();
}

@Component
public class CustomSkipListener implements SkipListener<CsvRecord, CsvRecord> {

    @Override
    public void onSkipInRead(Throwable t) {
        log.error("跳過讀取資料時發生錯誤: {}", t.getMessage());
    }

    @Override
    public void onSkipInProcess(CsvRecord item, Throwable t) {
        log.error("跳過處理資料: {}, 原因: {}", item, t.getMessage());
        // 可以將跳過的資料寫入錯誤檔案或錯誤表
        saveToErrorTable(item, t.getMessage());
    }

    @Override
    public void onSkipInWrite(CsvRecord item, Throwable t) {
        log.error("跳過寫入資料: {}, 原因: {}", item, t.getMessage());
    }

    private void saveToErrorTable(CsvRecord item, String errorMessage) {
        // 將錯誤資料寫入專門的錯誤表,供後續人工處理
        jdbcTemplate.update(
            "INSERT INTO error_records (data, error_msg, create_time) VALUES (?, ?, ?)",
            item.toString(), errorMessage, LocalDateTime.now()
        );
    }
}

優點

  • ✅ Job 不會因少數錯誤而中斷
  • ✅ 提高批次作業的完整性
  • ✅ 可以記錄跳過的資料供後續處理

缺點

  • ❌ 部分資料未匯入,需要後續處理
  • ❌ 如果跳過次數過多,可能資料品質有問題
  • ❌ 需要額外的錯誤資料追蹤機制

最佳實踐

// 1. 建立錯誤資料表
CREATE TABLE error_records (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    job_execution_id BIGINT,
    step_name VARCHAR(100),
    item_data TEXT,
    error_message TEXT,
    error_type VARCHAR(100),
    create_time TIMESTAMP
);

// 2. 在 SkipListener 中詳細記錄
@Override
public void onSkipInProcess(CsvRecord item, Throwable t) {
    ErrorRecord error = new ErrorRecord();
    error.setJobExecutionId(stepExecution.getJobExecutionId());
    error.setStepName(stepExecution.getStepName());
    error.setItemData(objectMapper.writeValueAsString(item));
    error.setErrorMessage(t.getMessage());
    error.setErrorType(t.getClass().getSimpleName());
    error.setCreateTime(LocalDateTime.now());

    errorRecordRepository.save(error);

    // 發送告警(如果跳過次數過多)
    if (skipCount > 50) {
        alertService.sendAlert("Skip count exceeded 50 in job: " + jobName);
    }
}

策略 3: Retry(重試)- 暫時性錯誤

適用情境

  • 暫時性錯誤(例如:資料庫連線逾時、網路暫時中斷、Deadlock)
  • 錯誤可能自行恢復
  • 重試可以解決問題

運作流程

Chunk 1 (0-999 筆):
  - 第 1-499 筆 → ✅ 處理成功
  - 第 500 筆 → ❌ Writer 拋出 DeadlockLoserDataAccessException
  - ⚠️ Spring Batch 重試第 500 筆(第 1 次)
  - 第 500 筆 → ❌ 仍然失敗
  - ⚠️ Spring Batch 重試第 500 筆(第 2 次)
  - 第 500 筆 → ✅ 成功!
  - 第 501-999 筆 → ✅ 繼續處理

結果:
  ✅ 1000 筆全部寫入資料庫
  ✅ Job 繼續執行
  ⚠️ 第 500 筆實際處理了 3 次(1 次原始 + 2 次重試)

設定方式

@Bean
public Step csvImportStep(ItemReader<CsvRecord> csvReader,
                          ItemProcessor<CsvRecord, CsvRecord> csvProcessor,
                          JdbcBatchItemWriter<CsvRecord> csvWriter) {
    return stepBuilderFactory.get("csvImportStep")
            .<CsvRecord, CsvRecord>chunk(1000)
            .reader(csvReader)
            .processor(csvProcessor)
            .writer(csvWriter)
            .faultTolerant()                                    // 啟用容錯機制
            .retryLimit(3)                                      // 最多重試 3 次
            .retry(DeadlockLoserDataAccessException.class)      // 死鎖時重試
            .retry(OptimisticLockingFailureException.class)     // 樂觀鎖失敗時重試
            .retry(TransientDataAccessException.class)          // 暫時性資料庫異常
            .noRetry(SQLException.class)                        // SQL 異常不重試
            .build();
}

進階配置:設定重試間隔

@Bean
public Step csvImportStep(...) {
    return stepBuilderFactory.get("csvImportStep")
            .<CsvRecord, CsvRecord>chunk(1000)
            .reader(csvReader)
            .processor(csvProcessor)
            .writer(csvWriter)
            .faultTolerant()
            .retryPolicy(customRetryPolicy())  // 使用自訂重試策略
            .build();
}

@Bean
public RetryPolicy customRetryPolicy() {
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(3);

    // 可以針對不同異常設定不同的重試次數
    Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
    retryableExceptions.put(DeadlockLoserDataAccessException.class, true);
    retryableExceptions.put(OptimisticLockingFailureException.class, true);

    return retryPolicy;
}

@Bean
public BackOffPolicy backOffPolicy() {
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(1000);   // 初始等待 1 秒
    backOffPolicy.setMaxInterval(10000);      // 最長等待 10 秒
    backOffPolicy.setMultiplier(2.0);         // 每次等待時間翻倍
    return backOffPolicy;
}

Retry 流程範例

第 500 筆資料處理失敗(Deadlock):

嘗試 1: ❌ 失敗 → 等待 1 秒
嘗試 2: ❌ 失敗 → 等待 2 秒
嘗試 3: ✅ 成功!

如果 3 次都失敗:
  → 根據設定決定是 Skip 或 Fail

優點

  • ✅ 自動處理暫時性錯誤
  • ✅ 無需人工介入
  • ✅ 提高 Job 穩定性

缺點

  • ❌ 增加執行時間(重試需要時間)
  • ❌ 如果不是暫時性錯誤,重試無效
  • ❌ 需要確保 Processor 和 Writer 是冪等的(多次執行結果相同)

策略組合:Skip + Retry

在實務上,通常會**組合使用 Skip 和 Retry**:

@Bean
public Step csvImportStep(ItemReader<CsvRecord> csvReader,
                          ItemProcessor<CsvRecord, CsvRecord> csvProcessor,
                          JdbcBatchItemWriter<CsvRecord> csvWriter) {
    return stepBuilderFactory.get("csvImportStep")
            .<CsvRecord, CsvRecord>chunk(1000)
            .reader(csvReader)
            .processor(csvProcessor)
            .writer(csvWriter)
            .faultTolerant()

            // Retry 設定:處理暫時性錯誤
            .retryLimit(3)
            .retry(DeadlockLoserDataAccessException.class)
            .retry(OptimisticLockingFailureException.class)
            .backOffPolicy(exponentialBackOffPolicy())

            // Skip 設定:重試失敗後跳過
            .skipLimit(100)
            .skip(ValidationException.class)         // 驗證錯誤直接跳過
            .skip(DataFormatException.class)         // 格式錯誤直接跳過
            .skipPolicy((throwable, skipCount) -> {
                // 重試 3 次後仍失敗的 Deadlock,也可以跳過
                if (throwable instanceof DeadlockLoserDataAccessException) {
                    return skipCount < 10;  // 最多跳過 10 個 Deadlock
                }
                return false;
            })

            // 監聽器:記錄所有跳過和重試的資料
            .listener(skipListener())
            .listener(retryListener())

            .build();
}

執行邏輯

第 500 筆資料處理流程:

1. 第一次處理 → ❌ DeadlockLoserDataAccessException
2. 符合 Retry 條件 → 等待 1 秒後重試
3. 第二次處理 → ❌ 仍然 Deadlock
4. 繼續重試 → 等待 2 秒後重試
5. 第三次處理 → ❌ 仍然 Deadlock
6. Retry 次數用盡 → 檢查 Skip 條件
7. 符合 Skip 條件 → ⚠️ 跳過此筆資料
8. SkipListener 記錄錯誤 → 寫入 error_records 表
9. 繼續處理第 501 筆

不同場景的建議策略

場景 推薦策略 設定重點
關鍵業務資料(如訂單) Restart 不容許遺漏,錯誤必須修正
日誌匯入、統計資料 Skip 少量錯誤可接受,記錄供後續檢視
大量資料且網路不穩 Retry 設定合理的重試次數和間隔
一般資料匯入 Retry + Skip 先重試暫時性錯誤,重試失敗後跳過並記錄
ETL 資料遷移 Retry + Skip + 人工審核 組合策略 + 錯誤資料匯出供人工處理

完整範例:生產環境配置

@Slf4j
@Configuration
@RequiredArgsConstructor
public class LargeCsvImportJob {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    @Bean
    public Step csvImportStep(ItemReader<CsvRecord> csvReader,
                              ItemProcessor<CsvRecord, CsvRecord> csvProcessor,
                              JdbcBatchItemWriter<CsvRecord> csvWriter,
                              SkipListener<CsvRecord, CsvRecord> skipListener) {
        return stepBuilderFactory.get("csvImportStep")
                .<CsvRecord, CsvRecord>chunk(1000)
                .reader(csvReader)
                .processor(csvProcessor)
                .writer(csvWriter)

                // 容錯配置
                .faultTolerant()

                // Retry:暫時性錯誤重試 3 次
                .retryLimit(3)
                .retry(DeadlockLoserDataAccessException.class)
                .retry(OptimisticLockingFailureException.class)
                .retry(QueryTimeoutException.class)
                .backOffPolicy(exponentialBackOffPolicy())

                // Skip:資料錯誤最多跳過 100 筆
                .skipLimit(100)
                .skip(ValidationException.class)
                .skip(DataFormatException.class)
                .skip(ConstraintViolationException.class)

                // 致命錯誤不跳過,直接失敗
                .noSkip(OutOfMemoryError.class)
                .noSkip(SQLException.class)

                // 監聽器
                .listener(skipListener)

                .build();
    }

    @Bean
    public BackOffPolicy exponentialBackOffPolicy() {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        policy.setInitialInterval(1000);    // 第一次重試等待 1 秒
        policy.setMaxInterval(10000);       // 最長等待 10 秒
        policy.setMultiplier(2.0);          // 每次等待時間 × 2
        return policy;
    }

    @Component
    public static class CustomSkipListener implements SkipListener<CsvRecord, CsvRecord> {

        @Autowired
        private JdbcTemplate jdbcTemplate;

        @Autowired
        private AlertService alertService;

        private final AtomicInteger skipCount = new AtomicInteger(0);

        @Override
        public void onSkipInProcess(CsvRecord item, Throwable t) {
            int count = skipCount.incrementAndGet();

            log.error("跳過資料 [{}]: {}, 錯誤: {}", count, item, t.getMessage());

            // 記錄到錯誤表
            jdbcTemplate.update(
                "INSERT INTO error_records (item_data, error_msg, error_type, create_time) VALUES (?, ?, ?, ?)",
                item.toString(), 
                t.getMessage(), 
                t.getClass().getSimpleName(), 
                LocalDateTime.now()
            );

            // 跳過次數過多時發送告警
            if (count > 50) {
                alertService.sendAlert("CSV Import: Skip count exceeded 50!");
            }
        }

        @Override
        public void onSkipInWrite(CsvRecord item, Throwable t) {
            log.error("跳過寫入資料: {}, 錯誤: {}", item, t.getMessage());
        }

        @Override
        public void onSkipInRead(Throwable t) {
            log.error("跳過讀取資料,錯誤: {}", t.getMessage());
        }
    }

    @Bean
    public Job csvImportJob(Step csvImportStep) {
        return jobBuilderFactory.get("csvImportJob")
                .start(csvImportStep)
                .build();
    }
}

監控與告警

1. 查詢執行狀態

-- 查詢 Job 執行狀態
SELECT 
    je.JOB_EXECUTION_ID,
    je.STATUS,
    je.START_TIME,
    je.END_TIME,
    se.STEP_NAME,
    se.READ_COUNT,
    se.WRITE_COUNT,
    se.READ_SKIP_COUNT,      -- 讀取時跳過的筆數
    se.PROCESS_SKIP_COUNT,   -- 處理時跳過的筆數
    se.WRITE_SKIP_COUNT,     -- 寫入時跳過的筆數
    se.ROLLBACK_COUNT        -- 回滾次數
FROM BATCH_JOB_EXECUTION je
JOIN BATCH_STEP_EXECUTION se ON je.JOB_EXECUTION_ID = se.JOB_EXECUTION_ID
WHERE je.JOB_NAME = 'csvImportJob'
ORDER BY je.START_TIME DESC;

2. 告警機制

@Component
public class JobExecutionListener implements org.springframework.batch.core.JobExecutionListener {

    @Autowired
    private AlertService alertService;

    @Override
    public void afterJob(JobExecution jobExecution) {
        Collection<StepExecution> stepExecutions = jobExecution.getStepExecutions();

        for (StepExecution step : stepExecutions) {
            int totalSkips = step.getReadSkipCount() 
                           + step.getProcessSkipCount() 
                           + step.getWriteSkipCount();

            // 跳過筆數過多時告警
            if (totalSkips > 100) {
                alertService.sendAlert(
                    String.format("Job %s: Total skips = %d", 
                                  jobExecution.getJobInstance().getJobName(), 
                                  totalSkips)
                );
            }

            // 回滾次數過多時告警
            if (step.getRollbackCount() > 5) {
                alertService.sendAlert(
                    String.format("Job %s: High rollback count = %d", 
                                  jobExecution.getJobInstance().getJobName(), 
                                  step.getRollbackCount())
                );
            }
        }
    }
}

總結:Exception 處理決策樹

Chunk 發生 Exception
        ↓
是否為暫時性錯誤?
    ├─ 是(Deadlock、Timeout)
    │   → 使用 Retry 策略
    │   → 重試 3 次
    │        ↓
    │   重試成功?
    │       ├─ 是 → ✅ 繼續執行
    │       └─ 否 → 進入 Skip 判斷
    │
    └─ 否(資料格式錯誤、驗證失敗)
         ↓
    是否可接受遺漏此筆資料?
        ├─ 是(非關鍵資料)
        │   → 使用 Skip 策略
        │   → 記錄錯誤資料
        │   → ✅ 繼續執行
        │
        └─ 否(關鍵業務資料)
            → 使用 Restart 策略
            → ❌ Job 失敗
            → 修正資料後重新執行

效能優化建議

1. Chunk Size 調整

  • 預設 1000 是經驗值
  • 可根據資料大小和資料庫效能調整
  • 建議範圍: 500-5000

2. 資料庫連線池

spring.datasource.hikari.maximum-pool-size=10
spring.datasource.hikari.minimum-idle=5

3. 批次插入優化

spring.jpa.properties.hibernate.jdbc.batch_size=1000
spring.jpa.properties.hibernate.order_inserts=true

4. Processor 複雜度

  • 避免在 Processor 中進行耗時操作(如外部 API 呼叫)
  • 複雜運算考慮使用多執行緒

多執行緒處理(進階)

如需提升效能,可啟用多執行緒:

@Bean
public Step csvImportStep(ItemReader<CsvRecord> csvReader,
                          ItemProcessor<CsvRecord, CsvRecord> csvProcessor,
                          JdbcBatchItemWriter<CsvRecord> csvWriter,
                          TaskExecutor taskExecutor) {
    return stepBuilderFactory.get("csvImportStep")
            .<CsvRecord, CsvRecord>chunk(1000)
            .reader(csvReader)
            .processor(csvProcessor)
            .writer(csvWriter)
            .taskExecutor(taskExecutor)  // 啟用多執行緒
            .throttleLimit(10)            // 最多 10 個執行緒
            .build();
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("batch-");
    executor.initialize();
    return executor;
}

⚠️ 注意: 多執行緒模式下,Reader 必須是 Thread-safe 的。

監控與日誌

Spring Batch 內建監控

  • JobRepository: 自動記錄 Job 執行狀態
  • MetaData Tables:
  • BATCH_JOB_INSTANCE
  • BATCH_JOB_EXECUTION
  • BATCH_STEP_EXECUTION
  • BATCH_STEP_EXECUTION_CONTEXT
  • BATCH_JOB_EXECUTION_CONTEXT
  • BATCH_JOB_EXECUTION_PARAMS

MetaData Tables 建立方式

Spring Batch 的 Metadata Tables 不需要手動建立,有多種自動化方式:

方式 1: 自動建立(開發環境推薦)

application.properties 設定:

# 自動建立 Spring Batch Metadata Tables
spring.batch.jdbc.initialize-schema=always

# 其他可選值:
# - always: 每次啟動都執行建表 SQL(會先 DROP TABLE,適合開發環境)
# - embedded: 只在嵌入式資料庫(H2、HSQL)時自動建立(預設值)
# - never: 不自動建立,需要手動建立

運作原理: - Spring Boot 會自動偵測使用的資料庫類型(MySQL、PostgreSQL、Oracle 等) - 自動執行對應的建表 SQL 腳本 - SQL 腳本位於 Spring Batch 的 JAR 檔中

適用情境: - ✅ 開發環境、測試環境 - ✅ 使用嵌入式資料庫(H2) - ❌ 生產環境(不建議使用 always,避免誤刪資料)


方式 2: 手動執行 SQL(生產環境推薦)

步驟 1: 取得官方 SQL 腳本

Spring Batch 為各種資料庫提供了官方的建表腳本,位於:

spring-batch-core-{version}.jar
└── org/springframework/batch/core/
    ├── schema-mysql.sql          # MySQL / MariaDB
    ├── schema-postgresql.sql     # PostgreSQL
    ├── schema-oracle.sql         # Oracle
    ├── schema-sqlserver.sql      # SQL Server
    ├── schema-db2.sql            # DB2
    ├── schema-h2.sql             # H2
    └── ...

從 Maven 依賴中取得腳本:

方法 1: 從本地 Maven Repository 複製

# Windows
cd %USERPROFILE%\.m2\repository\org\springframework\batch\spring-batch-core\4.3.10
jar -xf spring-batch-core-4.3.10.jar org/springframework/batch/core/schema-mysql.sql

# Linux / macOS
cd ~/.m2/repository/org/springframework/batch/spring-batch-core/4.3.10
jar -xf spring-batch-core-4.3.10.jar org/springframework/batch/core/schema-mysql.sql

方法 2: 從 GitHub 下載

# Spring Batch GitHub Repository
https://github.com/spring-projects/spring-batch/tree/main/spring-batch-core/src/main/resources/org/springframework/batch/core

# 直接下載 MySQL 腳本範例
curl -O https://raw.githubusercontent.com/spring-projects/spring-batch/main/spring-batch-core/src/main/resources/org/springframework/batch/core/schema-mysql.sql

步驟 2: 執行 SQL 腳本

# MySQL
mysql -u root -p your_database < schema-mysql.sql

# PostgreSQL
psql -U postgres -d your_database -f schema-postgresql.sql

# SQL Server
sqlcmd -S localhost -d your_database -i schema-sqlserver.sql -U sa -P password

步驟 3: 設定 application.properties

# 告訴 Spring Boot 不要自動建表,因為已經手動建立
spring.batch.jdbc.initialize-schema=never

方式 3: 使用 Flyway 或 Liquibase(企業環境推薦)

使用 Flyway 管理資料庫版本:

pom.xml 加入依賴:

<dependency>
    <groupId>org.flywaydb</groupId>
    <artifactId>flyway-core</artifactId>
</dependency>

建立 Migration 檔案:

src/main/resources/
└── db/
    └── migration/
        ├── V1__create_spring_batch_tables.sql     # Spring Batch Metadata Tables
        └── V2__create_business_tables.sql         # 業務表(如 your_table)

V1__create_spring_batch_tables.sql(從官方腳本複製):

-- MySQL 範例
CREATE TABLE BATCH_JOB_INSTANCE  (
    JOB_INSTANCE_ID BIGINT  NOT NULL PRIMARY KEY ,
    VERSION BIGINT ,
    JOB_NAME VARCHAR(100) NOT NULL,
    JOB_KEY VARCHAR(32) NOT NULL,
    constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;

CREATE TABLE BATCH_JOB_EXECUTION  (
    JOB_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
    VERSION BIGINT  ,
    JOB_INSTANCE_ID BIGINT NOT NULL,
    CREATE_TIME DATETIME(6) NOT NULL,
    START_TIME DATETIME(6) DEFAULT NULL ,
    END_TIME DATETIME(6) DEFAULT NULL ,
    STATUS VARCHAR(10) ,
    EXIT_CODE VARCHAR(2500) ,
    EXIT_MESSAGE VARCHAR(2500) ,
    LAST_UPDATED DATETIME(6),
    constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
    references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (
    JOB_EXECUTION_ID BIGINT NOT NULL ,
    PARAMETER_NAME VARCHAR(100) NOT NULL ,
    PARAMETER_TYPE VARCHAR(100) NOT NULL ,
    PARAMETER_VALUE VARCHAR(2500) ,
    IDENTIFYING CHAR(1) NOT NULL ,
    constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
    references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_STEP_EXECUTION  (
    STEP_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
    VERSION BIGINT NOT NULL,
    STEP_NAME VARCHAR(100) NOT NULL,
    JOB_EXECUTION_ID BIGINT NOT NULL,
    CREATE_TIME DATETIME(6) NOT NULL,
    START_TIME DATETIME(6) DEFAULT NULL ,
    END_TIME DATETIME(6) DEFAULT NULL ,
    STATUS VARCHAR(10) ,
    COMMIT_COUNT BIGINT ,
    READ_COUNT BIGINT ,
    FILTER_COUNT BIGINT ,
    WRITE_COUNT BIGINT ,
    READ_SKIP_COUNT BIGINT ,
    WRITE_SKIP_COUNT BIGINT ,
    PROCESS_SKIP_COUNT BIGINT ,
    ROLLBACK_COUNT BIGINT ,
    EXIT_CODE VARCHAR(2500) ,
    EXIT_MESSAGE VARCHAR(2500) ,
    LAST_UPDATED DATETIME(6),
    constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
    references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT  (
    STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT ,
    constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
    references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (
    JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT ,
    constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
    references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
    ID BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);

CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
    ID BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);

CREATE TABLE BATCH_JOB_SEQ (
    ID BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);

application.properties 設定:

# Flyway 設定
spring.flyway.enabled=true
spring.flyway.baseline-on-migrate=true

# Spring Batch 不自動建表(由 Flyway 管理)
spring.batch.jdbc.initialize-schema=never

優點: - ✅ 版本控制:所有資料庫變更都有版本記錄 - ✅ 可追蹤:知道何時、由誰執行了哪些變更 - ✅ 可回滾:可以回到特定版本 - ✅ 多環境一致:開發、測試、生產環境使用相同腳本 - ✅ CI/CD 整合:可自動化部署資料庫變更


方式 4: Spring Boot 自動執行自訂 SQL(簡化版)

如果不想使用 Flyway,可以讓 Spring Boot 自動執行自訂 SQL:

步驟 1: 放置 SQL 檔案

src/main/resources/
├── schema.sql                    # 建表 SQL(啟動時自動執行)
└── data.sql                      # 初始資料 SQL(可選)

schema.sql(複製官方腳本內容):

-- 從官方 schema-mysql.sql 複製完整內容
CREATE TABLE BATCH_JOB_INSTANCE ( ... );
CREATE TABLE BATCH_JOB_EXECUTION ( ... );
-- ... 其他表

application.properties 設定:

# 啟動時執行 schema.sql 和 data.sql
spring.sql.init.mode=always
# 或 spring.sql.init.mode=embedded (只在嵌入式資料庫執行)

# Spring Batch 不再自動建表
spring.batch.jdbc.initialize-schema=never

Metadata Tables 說明

1. BATCH_JOB_INSTANCE

用途: 記錄 Job 的唯一實例(由 Job Name + Job Parameters 識別)

CREATE TABLE BATCH_JOB_INSTANCE  (
    JOB_INSTANCE_ID BIGINT  NOT NULL PRIMARY KEY ,
    VERSION BIGINT ,
    JOB_NAME VARCHAR(100) NOT NULL,           -- Job 名稱
    JOB_KEY VARCHAR(32) NOT NULL,             -- Parameters 的 Hash 值
    constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
);

範例查詢:

-- 查詢所有 Job Instance
SELECT 
    JOB_INSTANCE_ID,
    JOB_NAME,
    JOB_KEY
FROM BATCH_JOB_INSTANCE
ORDER BY JOB_INSTANCE_ID DESC;

2. BATCH_JOB_EXECUTION

用途: 記錄每次 Job 執行的詳細資訊

CREATE TABLE BATCH_JOB_EXECUTION  (
    JOB_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
    VERSION BIGINT  ,
    JOB_INSTANCE_ID BIGINT NOT NULL,          -- 關聯的 Job Instance
    CREATE_TIME DATETIME(6) NOT NULL,         -- 建立時間
    START_TIME DATETIME(6) DEFAULT NULL ,     -- 開始時間
    END_TIME DATETIME(6) DEFAULT NULL ,       -- 結束時間
    STATUS VARCHAR(10) ,                      -- STARTED, COMPLETED, FAILED 等
    EXIT_CODE VARCHAR(2500) ,
    EXIT_MESSAGE VARCHAR(2500) ,
    LAST_UPDATED DATETIME(6)
);

範例查詢:

-- 查詢最近執行的 Job
SELECT 
    je.JOB_EXECUTION_ID,
    ji.JOB_NAME,
    je.STATUS,
    je.START_TIME,
    je.END_TIME,
    TIMESTAMPDIFF(SECOND, je.START_TIME, je.END_TIME) as DURATION_SECONDS,
    je.EXIT_CODE,
    je.EXIT_MESSAGE
FROM BATCH_JOB_EXECUTION je
JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID
WHERE ji.JOB_NAME = 'csvImportJob'
ORDER BY je.START_TIME DESC
LIMIT 10;

3. BATCH_STEP_EXECUTION

用途: 記錄每個 Step 執行的詳細統計資訊(最重要的監控表)

CREATE TABLE BATCH_STEP_EXECUTION  (
    STEP_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
    VERSION BIGINT NOT NULL,
    STEP_NAME VARCHAR(100) NOT NULL,          -- Step 名稱
    JOB_EXECUTION_ID BIGINT NOT NULL,         -- 關聯的 Job Execution
    START_TIME DATETIME(6) DEFAULT NULL ,
    END_TIME DATETIME(6) DEFAULT NULL ,
    STATUS VARCHAR(10) ,
    COMMIT_COUNT BIGINT ,                     -- 提交次數(= Chunk 數量)
    READ_COUNT BIGINT ,                       -- 讀取筆數
    FILTER_COUNT BIGINT ,                     -- 過濾筆數(Processor 返回 null)
    WRITE_COUNT BIGINT ,                      -- 寫入筆數
    READ_SKIP_COUNT BIGINT ,                  -- 讀取時跳過筆數
    WRITE_SKIP_COUNT BIGINT ,                 -- 寫入時跳過筆數
    PROCESS_SKIP_COUNT BIGINT ,               -- 處理時跳過筆數
    ROLLBACK_COUNT BIGINT ,                   -- 回滾次數
    EXIT_CODE VARCHAR(2500) ,
    EXIT_MESSAGE VARCHAR(2500) ,
    LAST_UPDATED DATETIME(6)
);

範例查詢:

-- 查詢 Job 的詳細執行統計
SELECT 
    ji.JOB_NAME,
    je.JOB_EXECUTION_ID,
    je.STATUS as JOB_STATUS,
    se.STEP_NAME,
    se.STATUS as STEP_STATUS,
    se.READ_COUNT,                            -- 讀取總數
    se.WRITE_COUNT,                           -- 寫入總數
    se.FILTER_COUNT,                          -- Processor 過濾(返回 null)
    se.READ_SKIP_COUNT,                       -- 讀取跳過
    se.PROCESS_SKIP_COUNT,                    -- 處理跳過
    se.WRITE_SKIP_COUNT,                      -- 寫入跳過
    (se.READ_SKIP_COUNT + se.PROCESS_SKIP_COUNT + se.WRITE_SKIP_COUNT) as TOTAL_SKIP,
    se.ROLLBACK_COUNT,                        -- 回滾次數
    se.COMMIT_COUNT,                          -- Chunk 提交次數
    ROUND(se.READ_COUNT / se.COMMIT_COUNT, 0) as AVG_CHUNK_SIZE,
    TIMESTAMPDIFF(SECOND, se.START_TIME, se.END_TIME) as DURATION_SECONDS,
    ROUND(se.READ_COUNT / TIMESTAMPDIFF(SECOND, se.START_TIME, se.END_TIME), 2) as RECORDS_PER_SECOND
FROM BATCH_STEP_EXECUTION se
JOIN BATCH_JOB_EXECUTION je ON se.JOB_EXECUTION_ID = je.JOB_EXECUTION_ID
JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID
WHERE ji.JOB_NAME = 'csvImportJob'
ORDER BY se.START_TIME DESC;

4. BATCH_JOB_EXECUTION_PARAMS

用途: 記錄每次執行時傳入的 Job Parameters

CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (
    JOB_EXECUTION_ID BIGINT NOT NULL ,
    PARAMETER_NAME VARCHAR(100) NOT NULL ,    -- 參數名稱(如 inputFile)
    PARAMETER_TYPE VARCHAR(100) NOT NULL ,    -- 參數類型(STRING, DATE, LONG, DOUBLE)
    PARAMETER_VALUE VARCHAR(2500) ,           -- 參數值
    IDENTIFYING CHAR(1) NOT NULL              -- 是否用於識別 Job Instance
);

範例查詢:

-- 查詢 Job 執行時使用的參數
SELECT 
    ji.JOB_NAME,
    je.JOB_EXECUTION_ID,
    jep.PARAMETER_NAME,
    jep.PARAMETER_VALUE,
    je.STATUS,
    je.START_TIME
FROM BATCH_JOB_EXECUTION_PARAMS jep
JOIN BATCH_JOB_EXECUTION je ON jep.JOB_EXECUTION_ID = je.JOB_EXECUTION_ID
JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID
WHERE ji.JOB_NAME = 'csvImportJob'
ORDER BY je.START_TIME DESC;

5. BATCH_STEP_EXECUTION_CONTEXT

用途: 儲存 Step 執行時的上下文資訊(如 Reader 讀取進度,用於 Restart)

CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT  (
    STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT VARCHAR(2500) NOT NULL,     -- JSON 格式的簡短上下文
    SERIALIZED_CONTEXT TEXT                    -- 完整序列化的上下文
);

範例查詢:

-- 查詢 Step 的執行上下文(可看到 Reader 讀到第幾行)
SELECT 
    se.STEP_EXECUTION_ID,
    se.STEP_NAME,
    se.STATUS,
    sec.SHORT_CONTEXT
FROM BATCH_STEP_EXECUTION_CONTEXT sec
JOIN BATCH_STEP_EXECUTION se ON sec.STEP_EXECUTION_ID = se.STEP_EXECUTION_ID
WHERE se.STEP_EXECUTION_ID = 1;

-- 輸出範例:
-- {"FlatFileItemReader.read.count":1500} 
-- 表示 Reader 已讀取 1500 筆,Restart 時會從這裡繼續

6. BATCH_JOB_EXECUTION_CONTEXT

用途: 儲存 Job 執行時的上下文資訊

CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (
    JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT
);

常用監控查詢

1. 查詢失敗的 Job

SELECT 
    ji.JOB_NAME,
    je.JOB_EXECUTION_ID,
    je.STATUS,
    je.START_TIME,
    je.EXIT_MESSAGE,
    jep.PARAMETER_VALUE as INPUT_FILE
FROM BATCH_JOB_EXECUTION je
JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID
LEFT JOIN BATCH_JOB_EXECUTION_PARAMS jep 
    ON je.JOB_EXECUTION_ID = jep.JOB_EXECUTION_ID 
    AND jep.PARAMETER_NAME = 'inputFile'
WHERE je.STATUS = 'FAILED'
ORDER BY je.START_TIME DESC;

2. 查詢有跳過資料的執行

SELECT 
    ji.JOB_NAME,
    se.STEP_NAME,
    se.READ_SKIP_COUNT + se.PROCESS_SKIP_COUNT + se.WRITE_SKIP_COUNT as TOTAL_SKIP,
    se.READ_COUNT,
    se.WRITE_COUNT,
    je.START_TIME
FROM BATCH_STEP_EXECUTION se
JOIN BATCH_JOB_EXECUTION je ON se.JOB_EXECUTION_ID = je.JOB_EXECUTION_ID
JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID
WHERE (se.READ_SKIP_COUNT + se.PROCESS_SKIP_COUNT + se.WRITE_SKIP_COUNT) > 0
ORDER BY je.START_TIME DESC;

3. 查詢執行效能

SELECT 
    ji.JOB_NAME,
    DATE(je.START_TIME) as EXECUTION_DATE,
    COUNT(*) as EXECUTION_COUNT,
    AVG(TIMESTAMPDIFF(SECOND, je.START_TIME, je.END_TIME)) as AVG_DURATION_SECONDS,
    MIN(TIMESTAMPDIFF(SECOND, je.START_TIME, je.END_TIME)) as MIN_DURATION,
    MAX(TIMESTAMPDIFF(SECOND, je.START_TIME, je.END_TIME)) as MAX_DURATION
FROM BATCH_JOB_EXECUTION je
JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID
WHERE je.STATUS = 'COMPLETED'
  AND je.START_TIME >= DATE_SUB(NOW(), INTERVAL 30 DAY)
GROUP BY ji.JOB_NAME, DATE(je.START_TIME)
ORDER BY EXECUTION_DATE DESC;

清理歷史資料

Metadata Tables 會持續累積資料,建議定期清理:

-- 刪除 30 天前的執行記錄
DELETE FROM BATCH_STEP_EXECUTION_CONTEXT 
WHERE STEP_EXECUTION_ID IN (
    SELECT STEP_EXECUTION_ID FROM BATCH_STEP_EXECUTION se
    JOIN BATCH_JOB_EXECUTION je ON se.JOB_EXECUTION_ID = je.JOB_EXECUTION_ID
    WHERE je.CREATE_TIME < DATE_SUB(NOW(), INTERVAL 30 DAY)
);

DELETE FROM BATCH_JOB_EXECUTION_CONTEXT 
WHERE JOB_EXECUTION_ID IN (
    SELECT JOB_EXECUTION_ID FROM BATCH_JOB_EXECUTION 
    WHERE CREATE_TIME < DATE_SUB(NOW(), INTERVAL 30 DAY)
);

DELETE FROM BATCH_STEP_EXECUTION 
WHERE JOB_EXECUTION_ID IN (
    SELECT JOB_EXECUTION_ID FROM BATCH_JOB_EXECUTION 
    WHERE CREATE_TIME < DATE_SUB(NOW(), INTERVAL 30 DAY)
);

DELETE FROM BATCH_JOB_EXECUTION_PARAMS 
WHERE JOB_EXECUTION_ID IN (
    SELECT JOB_EXECUTION_ID FROM BATCH_JOB_EXECUTION 
    WHERE CREATE_TIME < DATE_SUB(NOW(), INTERVAL 30 DAY)
);

DELETE FROM BATCH_JOB_EXECUTION 
WHERE CREATE_TIME < DATE_SUB(NOW(), INTERVAL 30 DAY);

-- 注意:BATCH_JOB_INSTANCE 通常不刪除,因為會影響 Restart 判斷

總結:Metadata Tables 建立建議

環境 推薦方式 理由
開發環境 spring.batch.jdbc.initialize-schema=always 快速開發,自動建立
測試環境 Flyway / Liquibase 與生產環境一致,可測試 Migration
生產環境 Flyway / Liquibase + 手動審核 版本控制、可追蹤、可回滾
POC / Demo spring.batch.jdbc.initialize-schema=embedded 使用 H2 嵌入式資料庫

最佳實踐: 1. ✅ 生產環境使用 Flyway 管理所有資料庫變更 2. ✅ 將官方 SQL 腳本納入版本控制 3. ✅ 定期備份 Metadata Tables(用於分析和審計) 4. ✅ 定期清理歷史資料(建議保留 30-90 天) 5. ✅ 建立監控儀表板,視覺化 Job 執行狀態

查詢執行狀態

SELECT * FROM BATCH_JOB_EXECUTION 
WHERE JOB_NAME = 'csvImportJob' 
ORDER BY START_TIME DESC;

日誌輸出

程式碼中使用 @Slf4j 和 Lombok,可記錄: - 跳過的無效記錄 - 處理進度 - 錯誤訊息

總結

核心優勢

  1. 記憶體安全: 固定記憶體用量,不受檔案大小影響
  2. 高效能: 批次寫入 + JDBC Batch
  3. 容錯性: Chunk 交易管理,失敗可重試
  4. 可維護: 清晰的 Reader-Processor-Writer 架構
  5. 可擴展: 支援多執行緒、重試、跳過等機制

適用場景

  • 大型 CSV 檔案匯入(百萬筆以上)
  • ETL(Extract-Transform-Load)資料處理
  • 資料遷移
  • 定期批次作業

不適用場景

  • 即時性要求高的作業(建議使用訊息佇列)
  • 小檔案(直接用一般 I/O 即可)
  • 複雜的多表關聯寫入(建議分多個 Step)

使用範例

1. 準備 CSV 檔案

column1,column2,column3
value1,value2,value3
value4,value5,value6
...

2. 準備資料庫表

CREATE TABLE your_table (
    column1 VARCHAR(255),
    column2 VARCHAR(255),
    column3 VARCHAR(255)
);

3. 執行 Job

@Autowired
private JobLauncher jobLauncher;

@Autowired
private Job csvImportJob;

public void importCsv(String filePath) {
    JobParameters params = new JobParametersBuilder()
        .addString("inputFile", filePath)
        .addDate("date", new Date())
        .toJobParameters();

    jobLauncher.run(csvImportJob, params);
}

客製化建議

1. 修改資料模型

根據實際 CSV 結構調整 CsvRecord 類別:

@Data
public static class CsvRecord {
    private String id;
    private String name;
    private BigDecimal amount;
    private LocalDate date;
    // 根據實際需求增加欄位
}

2. 修改 SQL

根據目標資料表調整:

String sql = "INSERT INTO orders (order_id, customer_name, total_amount, order_date) " +
             "VALUES (:id, :name, :amount, :date)";

3. 增強驗證邏輯

return record -> {
    // 欄位驗證
    if (record.getAmount().compareTo(BigDecimal.ZERO) < 0) {
        log.warn("金額不得為負數: {}", record);
        return null;
    }

    // 資料轉換
    record.setName(record.getName().toUpperCase());

    return record;
};

最後更新: 2026-01-12