LargeCsvImportJob - Spring Batch 大型 CSV 檔案匯入作業說明
概述
這是一個基於 Spring Batch 框架的批次處理作業,專門用於處理大型 CSV 檔案的匯入任務。採用串流式讀取和批次寫入的方式,確保記憶體使用量固定在 chunk size 大小,有效避免因大檔案導致的 OOM (Out of Memory) 問題。
Spring Batch 核心架構
1. Job(作業)
- 名稱:
csvImportJob - 功能: 批次處理的最上層單位,代表一個完整的批次作業流程
- 組成: 由一個或多個 Step 組成
- 本案例: 包含一個 Step (
csvImportStep)
2. Step(步驟)
- 名稱:
csvImportStep - 類型: Chunk-oriented Step(區塊導向步驟)
- 功能: 實際執行資料處理的單位
- 流程: Reader → Processor → Writer
三大核心元件
1. ItemReader(讀取器)- csvReader
功能說明
負責從 CSV 檔案中**串流式讀取**資料,一次只讀取一筆記錄到記憶體中。
實作細節
@Bean
@StepScope
public FlatFileItemReader<CsvRecord> csvReader(
@Value("#{jobParameters['inputFile']}") String inputFile)
關鍵特性
- @StepScope: 使 Bean 在每次 Step 執行時才建立,允許使用 Job Parameters
- 動態參數: 透過
#{jobParameters['inputFile']}在執行時動態傳入檔案路徑 - 串流讀取: 使用
FlatFileItemReader,逐行讀取,不將整個檔案載入記憶體 - 欄位對應: 透過
.names("column1", "column2", "column3")定義 CSV 欄位 - 跳過標題:
.linesToSkip(1)自動跳過第一行標題 - 自動轉換: 使用
BeanWrapperFieldSetMapper將 CSV 欄位自動對應到CsvRecord物件
運作流程
CSV 檔案 → 逐行讀取 → 解析欄位 → 轉換為 CsvRecord 物件 → 傳給 Processor
支援的資料來源類型
Spring Batch 的 Reader 支援多種資料來源,不限於本地檔案系統:
1. FileSystemResource - 本地檔案系統(預設)
@Bean
@StepScope
public FlatFileItemReader<CsvRecord> csvReader(
@Value("#{jobParameters['inputFile']}") String inputFile) {
return new FlatFileItemReaderBuilder<CsvRecord>()
.name("csvReader")
.resource(new FileSystemResource(inputFile)) // 本地檔案
.delimited()
.names("column1", "column2", "column3")
.linesToSkip(1)
.fieldSetMapper(new BeanWrapperFieldSetMapper<CsvRecord>() {{
setTargetType(CsvRecord.class);
}})
.build();
}
適用情境: - ✅ 檔案已經在本地磁碟 - ✅ 小型檔案(幾百 MB 以內) - ✅ 無需擔心網路問題
限制: - ❌ 需要先下載大檔案到本地 - ❌ 佔用本地磁碟空間 - ❌ 無法直接讀取雲端儲存
2. UrlResource - 支援 HTTP/HTTPS
@Bean
@StepScope
public FlatFileItemReader<CsvRecord> csvReader(
@Value("#{jobParameters['inputUrl']}") String inputUrl) {
return new FlatFileItemReaderBuilder<CsvRecord>()
.name("csvReader")
.resource(new UrlResource(inputUrl)) // HTTP/HTTPS URL
.delimited()
.names("column1", "column2", "column3")
.linesToSkip(1)
.fieldSetMapper(new BeanWrapperFieldSetMapper<CsvRecord>() {{
setTargetType(CsvRecord.class);
}})
.build();
}
適用情境: - ✅ 檔案託管在 Web Server - ✅ 臨時的公開連結
限制: - ❌ 不支援斷點續傳 - ❌ 網路中斷會導致整個 Chunk 失敗 - ❌ 需要持續的網路連線
3. Azure Blob Storage(推薦用於雲端場景)
需要加入 Azure Storage 依賴:
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
<version>12.25.0</version>
</dependency>
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-storage-blob</artifactId>
<version>5.8.0</version>
</dependency>
方式 1: 使用 Azure Spring Cloud 的 Resource Loader(推薦)
@Configuration
@RequiredArgsConstructor
public class LargeCsvImportJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
private final ResourceLoader resourceLoader; // Spring 自動注入
@Bean
@StepScope
public FlatFileItemReader<CsvRecord> csvReader(
@Value("#{jobParameters['inputFile']}") String blobUrl) {
// blobUrl 格式: azure-blob://container-name/path/to/file.csv
Resource resource = resourceLoader.getResource(blobUrl);
return new FlatFileItemReaderBuilder<CsvRecord>()
.name("csvReader")
.resource(resource) // 支援 Azure Blob
.delimited()
.names("column1", "column2", "column3")
.linesToSkip(1)
.fieldSetMapper(new BeanWrapperFieldSetMapper<CsvRecord>() {{
setTargetType(CsvRecord.class);
}})
.build();
}
}
application.properties 配置:
# Azure Storage 連線設定
spring.cloud.azure.storage.blob.account-name=your-storage-account
spring.cloud.azure.storage.blob.account-key=your-account-key
# 或使用 Managed Identity
spring.cloud.azure.storage.blob.credential.managed-identity-enabled=true
執行時傳入參數:
JobParameters params = new JobParametersBuilder()
.addString("inputFile", "azure-blob://my-container/data/large-file.csv")
.addDate("date", new Date())
.toJobParameters();
方式 2: 自訂 Azure Blob Resource
@Component
public class AzureBlobResource extends AbstractResource {
private final BlobClient blobClient;
private final String blobUrl;
public AzureBlobResource(String connectionString, String containerName, String blobName) {
BlobServiceClient blobServiceClient = new BlobServiceClientBuilder()
.connectionString(connectionString)
.buildClient();
this.blobClient = blobServiceClient
.getBlobContainerClient(containerName)
.getBlobClient(blobName);
this.blobUrl = blobClient.getBlobUrl();
}
@Override
public InputStream getInputStream() throws IOException {
return blobClient.openInputStream();
}
@Override
public String getDescription() {
return "Azure Blob Storage resource [" + blobUrl + "]";
}
@Override
public long contentLength() throws IOException {
return blobClient.getProperties().getBlobSize();
}
@Override
public boolean exists() {
return blobClient.exists();
}
}
使用方式:
@Bean
@StepScope
public FlatFileItemReader<CsvRecord> csvReader(
@Value("#{jobParameters['containerName']}") String containerName,
@Value("#{jobParameters['blobName']}") String blobName,
@Value("${azure.storage.connection-string}") String connectionString) {
Resource resource = new AzureBlobResource(connectionString, containerName, blobName);
return new FlatFileItemReaderBuilder<CsvRecord>()
.name("csvReader")
.resource(resource)
.delimited()
.names("column1", "column2", "column3")
.linesToSkip(1)
.fieldSetMapper(new BeanWrapperFieldSetMapper<CsvRecord>() {{
setTargetType(CsvRecord.class);
}})
.build();
}
4. AWS S3(適用於 AWS 環境)
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.12.565</version>
</dependency>
@Bean
@StepScope
public FlatFileItemReader<CsvRecord> csvReader(
@Value("#{jobParameters['s3Bucket']}") String bucket,
@Value("#{jobParameters['s3Key']}") String key,
AmazonS3 s3Client) {
S3Object s3Object = s3Client.getObject(bucket, key);
InputStreamResource resource = new InputStreamResource(s3Object.getObjectContent());
return new FlatFileItemReaderBuilder<CsvRecord>()
.name("csvReader")
.resource(resource)
.delimited()
.names("column1", "column2", "column3")
.linesToSkip(1)
.fieldSetMapper(new BeanWrapperFieldSetMapper<CsvRecord>() {{
setTargetType(CsvRecord.class);
}})
.build();
}
網路異常處理機制
當使用雲端儲存(Blob、S3、URL)時,網路中途異常的處理策略:
問題分析
情境:正在讀取 Azure Blob 上的 100 萬筆 CSV
Chunk 1 (0-999): ✅ 讀取成功,寫入資料庫
Chunk 2 (1000-1999): 讀取中... ❌ 網路中斷!
問題:
1. Reader 無法繼續讀取資料
2. 當前 Chunk 會失敗並回滾
3. Job 執行狀態變為 FAILED
解決方案 1: Reader 層級重試(推薦)
在 Reader 建立時啟用重試機制:
@Bean
@StepScope
public FlatFileItemReader<CsvRecord> csvReader(
@Value("#{jobParameters['inputFile']}") String blobUrl,
ResourceLoader resourceLoader) {
// 使用重試模板包裝 Resource 讀取
RetryTemplate retryTemplate = RetryTemplate.builder()
.maxAttempts(5) // 最多重試 5 次
.exponentialBackoff(1000, 2, 30000) // 1秒起始,每次×2,最長30秒
.retryOn(IOException.class) // 網路異常時重試
.retryOn(BlobStorageException.class) // Blob 存取異常時重試
.build();
Resource resource = retryTemplate.execute(context ->
resourceLoader.getResource(blobUrl)
);
return new FlatFileItemReaderBuilder<CsvRecord>()
.name("csvReader")
.resource(resource)
.delimited()
.names("column1", "column2", "column3")
.linesToSkip(1)
.fieldSetMapper(new BeanWrapperFieldSetMapper<CsvRecord>() {{
setTargetType(CsvRecord.class);
}})
.build();
}
解決方案 2: Step 層級重試 + Reader 異常處理
@Bean
public Step csvImportStep(ItemReader<CsvRecord> csvReader,
ItemProcessor<CsvRecord, CsvRecord> csvProcessor,
JdbcBatchItemWriter<CsvRecord> csvWriter) {
return stepBuilderFactory.get("csvImportStep")
.<CsvRecord, CsvRecord>chunk(1000)
.reader(csvReader)
.processor(csvProcessor)
.writer(csvWriter)
// 容錯設定
.faultTolerant()
// Reader 讀取失敗時重試
.retryLimit(5)
.retry(IOException.class) // 網路 I/O 異常
.retry(BlobStorageException.class) // Azure Blob 異常
.retry(AmazonS3Exception.class) // AWS S3 異常
.retry(SocketTimeoutException.class) // Socket 逾時
.backOffPolicy(exponentialBackOffPolicy()) // 指數退避
// 重試仍失敗後的處理
.skipLimit(10) // 最多跳過 10 筆
.skip(FlatFileParseException.class) // 解析錯誤可跳過
// 致命錯誤不重試,直接失敗
.noRetry(OutOfMemoryError.class)
.noRetry(NonTransientDataAccessException.class)
.listener(retryListener())
.build();
}
@Bean
public BackOffPolicy exponentialBackOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(2000); // 網路問題初始等待 2 秒
policy.setMaxInterval(60000); // 最長等待 60 秒
policy.setMultiplier(3.0); // 每次等待時間 × 3
return policy;
}
@Component
public static class NetworkRetryListener implements RetryListener {
@Override
public <T, E extends Throwable> void onError(
RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
int retryCount = context.getRetryCount();
log.warn("網路讀取失敗,第 {} 次重試: {}", retryCount, throwable.getMessage());
// 重試次數過多時發送告警
if (retryCount >= 3) {
alertService.sendAlert(
String.format("CSV Reader retry count: %d, error: %s",
retryCount, throwable.getMessage())
);
}
}
}
解決方案 3: 使用 Azure Blob 的斷點續傳功能
Azure Blob Storage SDK 支援斷點續傳,可以從中斷點繼續讀取:
@Component
public class ResumableAzureBlobResource extends AbstractResource {
private final BlobClient blobClient;
private final String blobUrl;
private long currentPosition = 0; // 記錄當前讀取位置
public ResumableAzureBlobResource(BlobClient blobClient) {
this.blobClient = blobClient;
this.blobUrl = blobClient.getBlobUrl();
}
@Override
public InputStream getInputStream() throws IOException {
try {
// 支援斷點續傳的 InputStream
BlobInputStream blobInputStream = blobClient.openInputStream();
// 如果有記錄位置,跳到該位置
if (currentPosition > 0) {
blobInputStream.skip(currentPosition);
}
return new MonitoredInputStream(blobInputStream, this);
} catch (BlobStorageException e) {
// 網路異常時重試
log.warn("Blob 讀取失敗,準備重試: {}", e.getMessage());
throw new IOException("Failed to open blob stream", e);
}
}
public void updatePosition(long position) {
this.currentPosition = position;
}
// 包裝 InputStream 以記錄讀取位置
private static class MonitoredInputStream extends FilterInputStream {
private final ResumableAzureBlobResource resource;
private long bytesRead = 0;
public MonitoredInputStream(InputStream in, ResumableAzureBlobResource resource) {
super(in);
this.resource = resource;
}
@Override
public int read() throws IOException {
int data = super.read();
if (data != -1) {
bytesRead++;
resource.updatePosition(bytesRead);
}
return data;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
int count = super.read(b, off, len);
if (count > 0) {
bytesRead += count;
resource.updatePosition(bytesRead);
}
return count;
}
}
}
解決方案 4: Job 層級的 Restart 機制
利用 Spring Batch 內建的 Restart 功能:
@Bean
public Job csvImportJob(Step csvImportStep) {
return jobBuilderFactory.get("csvImportJob")
.start(csvImportStep)
.incrementer(new RunIdIncrementer()) // 允許同樣參數重新執行
.build();
}
@Bean
public Step csvImportStep(...) {
return stepBuilderFactory.get("csvImportStep")
.<CsvRecord, CsvRecord>chunk(1000)
.reader(csvReader)
.processor(csvProcessor)
.writer(csvWriter)
.allowStartIfComplete(true) // 允許已完成的 Step 重新執行
.build();
}
重新執行失敗的 Job:
@Service
public class BatchJobService {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job csvImportJob;
@Autowired
private JobExplorer jobExplorer;
public void restartFailedJob(Long jobExecutionId) throws Exception {
// 取得失敗的 JobExecution
JobExecution failedExecution = jobExplorer.getJobExecution(jobExecutionId);
if (failedExecution != null && failedExecution.getStatus() == BatchStatus.FAILED) {
// 使用原始參數重新執行
JobParameters originalParams = failedExecution.getJobParameters();
log.info("重新執行失敗的 Job: {}", jobExecutionId);
jobLauncher.run(csvImportJob, originalParams);
// Spring Batch 會自動從失敗的 Chunk 繼續執行
}
}
}
運作機制:
第一次執行(網路中斷):
Chunk 1 (0-999): ✅ 成功,Spring Batch 記錄進度
Chunk 2 (1000-1999): ✅ 成功,Spring Batch 記錄進度
Chunk 3 (2000-2999): ❌ 網路中斷失敗
Job 狀態: FAILED
重新執行:
Chunk 1 (0-999): ⏭️ 偵測到已完成,跳過
Chunk 2 (1000-1999): ⏭️ 偵測到已完成,跳過
Chunk 3 (2000-2999): ✅ 從失敗點重新執行
Chunk 4 (3000-3999): ✅ 繼續執行
...
Job 狀態: COMPLETED
不同資料來源的比較
| 資料來源 | 網路依賴 | 斷點續傳 | 記憶體用量 | 適用場景 |
|---|---|---|---|---|
| FileSystemResource | 無 | N/A | 低 | 本地檔案、小檔案 |
| UrlResource | 高 | ❌ | 低 | 臨時下載、小檔案 |
| Azure Blob (Spring Cloud) | 中 | ✅ | 低 | 生產環境、大檔案(推薦) |
| Azure Blob (自訂) | 中 | ✅ | 低 | 需要客製化功能 |
| AWS S3 | 中 | ✅ | 低 | AWS 環境 |
網路異常處理最佳實踐
@Configuration
@RequiredArgsConstructor
public class RobustCsvImportJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
private final ResourceLoader resourceLoader;
@Bean
@StepScope
public FlatFileItemReader<CsvRecord> csvReader(
@Value("#{jobParameters['inputFile']}") String blobUrl) {
Resource resource = resourceLoader.getResource(blobUrl);
return new FlatFileItemReaderBuilder<CsvRecord>()
.name("csvReader")
.resource(resource)
.delimited()
.names("column1", "column2", "column3")
.linesToSkip(1)
.fieldSetMapper(new BeanWrapperFieldSetMapper<CsvRecord>() {{
setTargetType(CsvRecord.class);
}})
.saveState(true) // ✅ 重要!儲存讀取進度
.build();
}
@Bean
public Step csvImportStep(ItemReader<CsvRecord> csvReader,
ItemProcessor<CsvRecord, CsvRecord> csvProcessor,
JdbcBatchItemWriter<CsvRecord> csvWriter) {
return stepBuilderFactory.get("csvImportStep")
.<CsvRecord, CsvRecord>chunk(1000)
.reader(csvReader)
.processor(csvProcessor)
.writer(csvWriter)
// ✅ 網路異常處理
.faultTolerant()
.retryLimit(5)
.retry(IOException.class)
.retry(BlobStorageException.class)
.retry(SocketTimeoutException.class)
.backOffPolicy(networkBackOffPolicy())
// ✅ 監控與告警
.listener(networkRetryListener())
.build();
}
@Bean
public BackOffPolicy networkBackOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(2000); // 網路問題初始等待長一點
policy.setMaxInterval(60000); // 最長等待 1 分鐘
policy.setMultiplier(3.0); // 快速增加等待時間
return policy;
}
@Bean
public Job csvImportJob(Step csvImportStep) {
return jobBuilderFactory.get("csvImportJob")
.start(csvImportStep)
.incrementer(new RunIdIncrementer()) // ✅ 允許 Restart
.build();
}
}
application.properties 設定:
# Azure Blob 連線設定
spring.cloud.azure.storage.blob.account-name=${AZURE_STORAGE_ACCOUNT}
spring.cloud.azure.storage.blob.credential.managed-identity-enabled=true
# 連線逾時設定
spring.cloud.azure.storage.blob.retry.max-retries=5
spring.cloud.azure.storage.blob.retry.retry-delay=2s
spring.cloud.azure.storage.blob.retry.max-retry-delay=60s
# Spring Batch 設定
spring.batch.job.enabled=false # 不自動執行
spring.batch.jdbc.initialize-schema=always # 確保 Metadata Tables 存在
總結:雲端檔案讀取建議
生產環境推薦配置:
- ✅ 使用 Azure Blob Storage + Spring Cloud Azure
- 自動處理驗證和重試
- 支援 Managed Identity
-
與 Spring Batch 無縫整合
-
✅ 啟用 Step 層級的 Retry
- 處理暫時性網路異常
- 指數退避策略
-
最多重試 5 次
-
✅ 啟用 Reader 的 saveState
- 記錄讀取進度
-
支援 Job Restart
-
✅ 設定合理的 Chunk Size
- 網路不穩時建議 500-1000
-
避免單一 Chunk 執行時間過長
-
✅ 監控與告警
- 記錄重試次數
- 重試超過閾值時發送告警
- 追蹤網路異常頻率
2. ItemProcessor(處理器)- csvProcessor
功能說明
對讀取到的每一筆資料進行**驗證和轉換**。
實作細節
@Bean
public ItemProcessor<CsvRecord, CsvRecord> csvProcessor() {
return record -> {
// 資料驗證
if (record.getColumn1() == null || record.getColumn1().trim().isEmpty()) {
log.warn("跳過無效記錄: {}", record);
return null; // 返回 null 會跳過此筆資料
}
// 資料轉換
record.setColumn1(record.getColumn1().trim());
return record;
};
}
關鍵特性
- 資料驗證: 檢查必填欄位是否為空
- 過濾機制: 返回
null可跳過不符合條件的資料 - 資料清理: 去除前後空白字元
- 彈性處理: 可加入任何業務邏輯(資料轉換、格式化、計算等)
運作流程
CsvRecord → 驗證 → 轉換 → 返回處理後的 CsvRecord(或 null 跳過)
3. ItemWriter(寫入器)- csvWriter
功能說明
將處理後的資料**批次寫入資料庫**。
實作細節
@Bean
public JdbcBatchItemWriter<CsvRecord> csvWriter() {
String sql = "INSERT INTO your_table (column1, column2, column3) " +
"VALUES (:column1, :column2, :column3)";
return new JdbcBatchItemWriterBuilder<CsvRecord>()
.dataSource(dataSource)
.sql(sql)
.itemSqlParameterSourceProvider(record -> {
MapSqlParameterSource params = new MapSqlParameterSource();
params.addValue("column1", record.getColumn1());
params.addValue("column2", record.getColumn2());
params.addValue("column3", record.getColumn3());
return params;
})
.build();
}
關鍵特性
- 批次寫入: 不是一筆一筆寫入,而是累積到 chunk size 後批次寫入
- JDBC Batch: 使用 JDBC 的 batch 功能,大幅提升寫入效能
- 參數化查詢: 使用 Named Parameter 防止 SQL Injection
- 自動對應: 透過
itemSqlParameterSourceProvider將物件欄位對應到 SQL 參數
Chunk 處理機制(核心重點)
Chunk 定義
.<CsvRecord, CsvRecord>chunk(1000)
運作原理
記憶體控制
- chunk(1000): 每次在記憶體中最多保留 1000 筆資料
- 固定記憶體: 無論檔案多大,記憶體使用量固定
執行流程
1. Reader 讀取 1 筆資料 → Processor 處理 → 暫存在記憶體
2. Reader 讀取第 2 筆 → Processor 處理 → 暫存在記憶體
3. ...
4. Reader 讀取第 1000 筆 → Processor 處理 → 暫存在記憶體
5. Writer 批次寫入這 1000 筆到資料庫
6. 提交交易(Commit)
7. 清空記憶體
8. 重複步驟 1-7,直到檔案讀取完畢
Chunk Size 調校建議
| Chunk Size | 適用情境 | 優點 | 缺點 |
|---|---|---|---|
| 100-500 | 資料結構複雜、驗證邏輯複雜 | 記憶體用量小、錯誤定位容易 | 提交次數多、效能較慢 |
| 1000-2000 | 一般情況(推薦) | 效能與記憶體平衡 | 適中 |
| 5000+ | 資料結構簡單、高效能需求 | 寫入效能高 | 記憶體用量大、錯誤回滾範圍大 |
完整執行流程圖
啟動 Job: csvImportJob
↓
執行 Step: csvImportStep
↓
┌─────────────────────────────────────────────┐
│ Chunk 1 (0-999 筆) │
│ ┌──────────────────────────────────────┐ │
│ │ Reader: 讀取第 1 筆 CSV │ │
│ │ ↓ │ │
│ │ Processor: 驗證 + 轉換 │ │
│ │ ↓ │ │
│ │ 暫存到 Chunk 記憶體 │ │
│ └──────────────────────────────────────┘ │
│ ... │
│ ┌──────────────────────────────────────┐ │
│ │ Reader: 讀取第 1000 筆 CSV │ │
│ │ ↓ │ │
│ │ Processor: 驗證 + 轉換 │ │
│ │ ↓ │ │
│ │ 暫存到 Chunk 記憶體 │ │
│ └──────────────────────────────────────┘ │
│ ↓ │
│ Writer: 批次寫入 1000 筆到資料庫 │
│ ↓ │
│ Transaction Commit │
│ ↓ │
│ 清空 Chunk 記憶體 │
└─────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────┐
│ Chunk 2 (1000-1999 筆) │
│ ... │
└─────────────────────────────────────────────┘
↓
重複直到檔案結束
↓
Job 完成
交易管理
交易邊界
- 每個 Chunk 是一個交易單位
- Chunk 成功 → 提交(Commit)
- Chunk 失敗 → 回滾(Rollback)該 Chunk 的所有資料
範例
假設檔案有 2500 筆資料,chunk size = 1000
Chunk 1 (0-999): 成功 → Commit → 資料庫有 1000 筆
Chunk 2 (1000-1999): 成功 → Commit → 資料庫有 2000 筆
Chunk 3 (2000-2499): 失敗 → Rollback → 資料庫仍有 2000 筆
最終結果: 資料庫有 2000 筆資料,第 3 個 Chunk 需要修正後重新執行
記憶體優化原理
傳統做法(不佳)
// ❌ 將整個檔案讀到記憶體
List<CsvRecord> allRecords = readAllFromCsv(); // 假設 100 萬筆
// 記憶體用量: 100 萬筆 × 每筆大小 = 可能數 GB
for (CsvRecord record : allRecords) {
process(record);
saveToDatabase(record);
}
Spring Batch 做法(優化)
// ✅ 串流式讀取 + Chunk 處理
// 記憶體用量: 1000 筆 × 每筆大小 = 可能數 MB
chunk(1000)
.reader(csvReader) // 一次讀 1 筆
.processor(processor) // 處理 1 筆
.writer(writer) // 累積 1000 筆才批次寫入
記憶體用量比較
| 資料量 | 傳統做法記憶體 | Spring Batch 記憶體 | 節省比例 |
|---|---|---|---|
| 1 萬筆 | 10 MB | ~1 MB | 90% |
| 10 萬筆 | 100 MB | ~1 MB | 99% |
| 100 萬筆 | 1 GB | ~1 MB | 99.9% |
| 1000 萬筆 | 10 GB | ~1 MB | 99.99% |
Job Parameters(作業參數)
用途
在執行 Job 時動態傳入參數,例如檔案路徑。
使用方式
@Value("#{jobParameters['inputFile']}")
執行範例
JobParameters jobParameters = new JobParametersBuilder()
.addString("inputFile", "/path/to/large-file.csv")
.addDate("startDate", new Date())
.toJobParameters();
jobLauncher.run(csvImportJob, jobParameters);
或透過 Command Line:
java -jar app.jar --job.name=csvImportJob inputFile=/path/to/large-file.csv
Chunk Exception 處理機制
預設行為:整個 Chunk 回滾
當一個 Chunk 在處理過程中發生異常,Spring Batch 預設會回滾整個 Chunk 的交易。
範例情境
假設 chunk size = 1000
Chunk 1 (0-999 筆):
- Reader 讀取第 1 筆 → 成功
- Processor 處理第 1 筆 → 成功
- ...
- Reader 讀取第 500 筆 → 成功
- Processor 處理第 500 筆 → ❌ 拋出 Exception
結果:
✅ Transaction Rollback
✅ 這 500 筆資料都不會寫入資料庫
❌ Job 執行失敗,狀態變為 FAILED
問題分析
1. 一筆資料錯誤導致整批失敗
1000 筆資料中,只有第 500 筆有問題
→ 整批 1000 筆都無法寫入
→ 需要修正資料後重新執行
2. 記憶體中的資料無法恢復
異常發生時:
- 前 499 筆已經處理完畢(在記憶體中)
- 第 500 筆處理失敗
→ 整個 Chunk 回滾
→ 前 499 筆的處理結果也會消失
3. 重新執行成本高
修正第 500 筆資料後重新執行:
→ 前 499 筆會重新讀取、處理、寫入
→ 浪費計算資源
三種異常處理策略
策略 1: Restart(重啟)- 預設行為
適用情境
- 資料錯誤可以被修正(例如:格式錯誤、值超出範圍)
- 可以重新執行整個 Job
- 資料量不大,重新執行成本可接受
運作流程
第一次執行:
Chunk 1 (0-999): ✅ 成功,資料庫有 1000 筆
Chunk 2 (1000-1999): ❌ 第 1500 筆失敗,回滾
Job 狀態: FAILED
修正資料後,重新執行:
Chunk 1 (0-999): ⚠️ 已存在,Spring Batch 會偵測並跳過
Chunk 2 (1000-1999): ✅ 成功,資料庫有 2000 筆
Chunk 3 (2000-2999): ✅ 成功,資料庫有 3000 筆
Job 狀態: COMPLETED
設定方式
@Bean
public Step csvImportStep(...) {
return stepBuilderFactory.get("csvImportStep")
.<CsvRecord, CsvRecord>chunk(1000)
.reader(csvReader)
.processor(csvProcessor)
.writer(csvWriter)
// 預設就是 Restart 模式,不需額外設定
.build();
}
優點
- ✅ 資料一致性最高
- ✅ 實作簡單
- ✅ Spring Batch 會記錄進度,重啟時可以從失敗的 Chunk 繼續
缺點
- ❌ 需要人工介入修正資料
- ❌ Job 會中斷,需要重新啟動
- ❌ 一筆錯誤影響整批資料
策略 2: Skip(跳過)- 容錯處理
適用情境
- 少數資料錯誤可以接受(例如:日誌匯入、非關鍵資料)
- 資料錯誤無法即時修正
- 希望 Job 不中斷完整執行
運作流程
Chunk 1 (0-999 筆):
- 第 1-499 筆 → ✅ 處理成功
- 第 500 筆 → ❌ Processor 拋出 ValidationException
- ⚠️ Spring Batch 跳過第 500 筆
- 第 501-999 筆 → ✅ 繼續處理
結果:
✅ 999 筆寫入資料庫(跳過第 500 筆)
✅ Job 繼續執行,不會失敗
⚠️ 跳過的資料會被記錄在 Metadata Tables
設定方式
@Bean
public Step csvImportStep(ItemReader<CsvRecord> csvReader,
ItemProcessor<CsvRecord, CsvRecord> csvProcessor,
JdbcBatchItemWriter<CsvRecord> csvWriter) {
return stepBuilderFactory.get("csvImportStep")
.<CsvRecord, CsvRecord>chunk(1000)
.reader(csvReader)
.processor(csvProcessor)
.writer(csvWriter)
.faultTolerant() // 啟用容錯機制
.skipLimit(100) // 最多允許跳過 100 筆
.skip(ValidationException.class) // 指定可跳過的異常類型
.skip(DataFormatException.class) // 可指定多個
.noSkip(FatalException.class) // 致命錯誤不可跳過
.build();
}
進階配置:自訂 Skip 策略
@Bean
public Step csvImportStep(...) {
return stepBuilderFactory.get("csvImportStep")
.<CsvRecord, CsvRecord>chunk(1000)
.reader(csvReader)
.processor(csvProcessor)
.writer(csvWriter)
.faultTolerant()
.skipPolicy(customSkipPolicy()) // 使用自訂策略
.build();
}
@Bean
public SkipPolicy customSkipPolicy() {
return (throwable, skipCount) -> {
// 自訂邏輯:根據異常類型和已跳過次數決定是否跳過
if (throwable instanceof ValidationException) {
return skipCount < 100; // ValidationException 最多跳過 100 次
}
if (throwable instanceof SQLException) {
return false; // SQL 異常不可跳過
}
return skipCount < 50; // 其他異常最多跳過 50 次
};
}
Skip Listener(監聽跳過的資料)
@Bean
public Step csvImportStep(...) {
return stepBuilderFactory.get("csvImportStep")
.<CsvRecord, CsvRecord>chunk(1000)
.reader(csvReader)
.processor(csvProcessor)
.writer(csvWriter)
.faultTolerant()
.skipLimit(100)
.skip(ValidationException.class)
.listener(skipListener()) // 註冊監聽器
.build();
}
@Component
public class CustomSkipListener implements SkipListener<CsvRecord, CsvRecord> {
@Override
public void onSkipInRead(Throwable t) {
log.error("跳過讀取資料時發生錯誤: {}", t.getMessage());
}
@Override
public void onSkipInProcess(CsvRecord item, Throwable t) {
log.error("跳過處理資料: {}, 原因: {}", item, t.getMessage());
// 可以將跳過的資料寫入錯誤檔案或錯誤表
saveToErrorTable(item, t.getMessage());
}
@Override
public void onSkipInWrite(CsvRecord item, Throwable t) {
log.error("跳過寫入資料: {}, 原因: {}", item, t.getMessage());
}
private void saveToErrorTable(CsvRecord item, String errorMessage) {
// 將錯誤資料寫入專門的錯誤表,供後續人工處理
jdbcTemplate.update(
"INSERT INTO error_records (data, error_msg, create_time) VALUES (?, ?, ?)",
item.toString(), errorMessage, LocalDateTime.now()
);
}
}
優點
- ✅ Job 不會因少數錯誤而中斷
- ✅ 提高批次作業的完整性
- ✅ 可以記錄跳過的資料供後續處理
缺點
- ❌ 部分資料未匯入,需要後續處理
- ❌ 如果跳過次數過多,可能資料品質有問題
- ❌ 需要額外的錯誤資料追蹤機制
最佳實踐
// 1. 建立錯誤資料表
CREATE TABLE error_records (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
job_execution_id BIGINT,
step_name VARCHAR(100),
item_data TEXT,
error_message TEXT,
error_type VARCHAR(100),
create_time TIMESTAMP
);
// 2. 在 SkipListener 中詳細記錄
@Override
public void onSkipInProcess(CsvRecord item, Throwable t) {
ErrorRecord error = new ErrorRecord();
error.setJobExecutionId(stepExecution.getJobExecutionId());
error.setStepName(stepExecution.getStepName());
error.setItemData(objectMapper.writeValueAsString(item));
error.setErrorMessage(t.getMessage());
error.setErrorType(t.getClass().getSimpleName());
error.setCreateTime(LocalDateTime.now());
errorRecordRepository.save(error);
// 發送告警(如果跳過次數過多)
if (skipCount > 50) {
alertService.sendAlert("Skip count exceeded 50 in job: " + jobName);
}
}
策略 3: Retry(重試)- 暫時性錯誤
適用情境
- 暫時性錯誤(例如:資料庫連線逾時、網路暫時中斷、Deadlock)
- 錯誤可能自行恢復
- 重試可以解決問題
運作流程
Chunk 1 (0-999 筆):
- 第 1-499 筆 → ✅ 處理成功
- 第 500 筆 → ❌ Writer 拋出 DeadlockLoserDataAccessException
- ⚠️ Spring Batch 重試第 500 筆(第 1 次)
- 第 500 筆 → ❌ 仍然失敗
- ⚠️ Spring Batch 重試第 500 筆(第 2 次)
- 第 500 筆 → ✅ 成功!
- 第 501-999 筆 → ✅ 繼續處理
結果:
✅ 1000 筆全部寫入資料庫
✅ Job 繼續執行
⚠️ 第 500 筆實際處理了 3 次(1 次原始 + 2 次重試)
設定方式
@Bean
public Step csvImportStep(ItemReader<CsvRecord> csvReader,
ItemProcessor<CsvRecord, CsvRecord> csvProcessor,
JdbcBatchItemWriter<CsvRecord> csvWriter) {
return stepBuilderFactory.get("csvImportStep")
.<CsvRecord, CsvRecord>chunk(1000)
.reader(csvReader)
.processor(csvProcessor)
.writer(csvWriter)
.faultTolerant() // 啟用容錯機制
.retryLimit(3) // 最多重試 3 次
.retry(DeadlockLoserDataAccessException.class) // 死鎖時重試
.retry(OptimisticLockingFailureException.class) // 樂觀鎖失敗時重試
.retry(TransientDataAccessException.class) // 暫時性資料庫異常
.noRetry(SQLException.class) // SQL 異常不重試
.build();
}
進階配置:設定重試間隔
@Bean
public Step csvImportStep(...) {
return stepBuilderFactory.get("csvImportStep")
.<CsvRecord, CsvRecord>chunk(1000)
.reader(csvReader)
.processor(csvProcessor)
.writer(csvWriter)
.faultTolerant()
.retryPolicy(customRetryPolicy()) // 使用自訂重試策略
.build();
}
@Bean
public RetryPolicy customRetryPolicy() {
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
// 可以針對不同異常設定不同的重試次數
Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
retryableExceptions.put(DeadlockLoserDataAccessException.class, true);
retryableExceptions.put(OptimisticLockingFailureException.class, true);
return retryPolicy;
}
@Bean
public BackOffPolicy backOffPolicy() {
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000); // 初始等待 1 秒
backOffPolicy.setMaxInterval(10000); // 最長等待 10 秒
backOffPolicy.setMultiplier(2.0); // 每次等待時間翻倍
return backOffPolicy;
}
Retry 流程範例
第 500 筆資料處理失敗(Deadlock):
嘗試 1: ❌ 失敗 → 等待 1 秒
嘗試 2: ❌ 失敗 → 等待 2 秒
嘗試 3: ✅ 成功!
如果 3 次都失敗:
→ 根據設定決定是 Skip 或 Fail
優點
- ✅ 自動處理暫時性錯誤
- ✅ 無需人工介入
- ✅ 提高 Job 穩定性
缺點
- ❌ 增加執行時間(重試需要時間)
- ❌ 如果不是暫時性錯誤,重試無效
- ❌ 需要確保 Processor 和 Writer 是冪等的(多次執行結果相同)
策略組合:Skip + Retry
在實務上,通常會**組合使用 Skip 和 Retry**:
@Bean
public Step csvImportStep(ItemReader<CsvRecord> csvReader,
ItemProcessor<CsvRecord, CsvRecord> csvProcessor,
JdbcBatchItemWriter<CsvRecord> csvWriter) {
return stepBuilderFactory.get("csvImportStep")
.<CsvRecord, CsvRecord>chunk(1000)
.reader(csvReader)
.processor(csvProcessor)
.writer(csvWriter)
.faultTolerant()
// Retry 設定:處理暫時性錯誤
.retryLimit(3)
.retry(DeadlockLoserDataAccessException.class)
.retry(OptimisticLockingFailureException.class)
.backOffPolicy(exponentialBackOffPolicy())
// Skip 設定:重試失敗後跳過
.skipLimit(100)
.skip(ValidationException.class) // 驗證錯誤直接跳過
.skip(DataFormatException.class) // 格式錯誤直接跳過
.skipPolicy((throwable, skipCount) -> {
// 重試 3 次後仍失敗的 Deadlock,也可以跳過
if (throwable instanceof DeadlockLoserDataAccessException) {
return skipCount < 10; // 最多跳過 10 個 Deadlock
}
return false;
})
// 監聽器:記錄所有跳過和重試的資料
.listener(skipListener())
.listener(retryListener())
.build();
}
執行邏輯
第 500 筆資料處理流程:
1. 第一次處理 → ❌ DeadlockLoserDataAccessException
2. 符合 Retry 條件 → 等待 1 秒後重試
3. 第二次處理 → ❌ 仍然 Deadlock
4. 繼續重試 → 等待 2 秒後重試
5. 第三次處理 → ❌ 仍然 Deadlock
6. Retry 次數用盡 → 檢查 Skip 條件
7. 符合 Skip 條件 → ⚠️ 跳過此筆資料
8. SkipListener 記錄錯誤 → 寫入 error_records 表
9. 繼續處理第 501 筆
不同場景的建議策略
| 場景 | 推薦策略 | 設定重點 |
|---|---|---|
| 關鍵業務資料(如訂單) | Restart | 不容許遺漏,錯誤必須修正 |
| 日誌匯入、統計資料 | Skip | 少量錯誤可接受,記錄供後續檢視 |
| 大量資料且網路不穩 | Retry | 設定合理的重試次數和間隔 |
| 一般資料匯入 | Retry + Skip | 先重試暫時性錯誤,重試失敗後跳過並記錄 |
| ETL 資料遷移 | Retry + Skip + 人工審核 | 組合策略 + 錯誤資料匯出供人工處理 |
完整範例:生產環境配置
@Slf4j
@Configuration
@RequiredArgsConstructor
public class LargeCsvImportJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
@Bean
public Step csvImportStep(ItemReader<CsvRecord> csvReader,
ItemProcessor<CsvRecord, CsvRecord> csvProcessor,
JdbcBatchItemWriter<CsvRecord> csvWriter,
SkipListener<CsvRecord, CsvRecord> skipListener) {
return stepBuilderFactory.get("csvImportStep")
.<CsvRecord, CsvRecord>chunk(1000)
.reader(csvReader)
.processor(csvProcessor)
.writer(csvWriter)
// 容錯配置
.faultTolerant()
// Retry:暫時性錯誤重試 3 次
.retryLimit(3)
.retry(DeadlockLoserDataAccessException.class)
.retry(OptimisticLockingFailureException.class)
.retry(QueryTimeoutException.class)
.backOffPolicy(exponentialBackOffPolicy())
// Skip:資料錯誤最多跳過 100 筆
.skipLimit(100)
.skip(ValidationException.class)
.skip(DataFormatException.class)
.skip(ConstraintViolationException.class)
// 致命錯誤不跳過,直接失敗
.noSkip(OutOfMemoryError.class)
.noSkip(SQLException.class)
// 監聽器
.listener(skipListener)
.build();
}
@Bean
public BackOffPolicy exponentialBackOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(1000); // 第一次重試等待 1 秒
policy.setMaxInterval(10000); // 最長等待 10 秒
policy.setMultiplier(2.0); // 每次等待時間 × 2
return policy;
}
@Component
public static class CustomSkipListener implements SkipListener<CsvRecord, CsvRecord> {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private AlertService alertService;
private final AtomicInteger skipCount = new AtomicInteger(0);
@Override
public void onSkipInProcess(CsvRecord item, Throwable t) {
int count = skipCount.incrementAndGet();
log.error("跳過資料 [{}]: {}, 錯誤: {}", count, item, t.getMessage());
// 記錄到錯誤表
jdbcTemplate.update(
"INSERT INTO error_records (item_data, error_msg, error_type, create_time) VALUES (?, ?, ?, ?)",
item.toString(),
t.getMessage(),
t.getClass().getSimpleName(),
LocalDateTime.now()
);
// 跳過次數過多時發送告警
if (count > 50) {
alertService.sendAlert("CSV Import: Skip count exceeded 50!");
}
}
@Override
public void onSkipInWrite(CsvRecord item, Throwable t) {
log.error("跳過寫入資料: {}, 錯誤: {}", item, t.getMessage());
}
@Override
public void onSkipInRead(Throwable t) {
log.error("跳過讀取資料,錯誤: {}", t.getMessage());
}
}
@Bean
public Job csvImportJob(Step csvImportStep) {
return jobBuilderFactory.get("csvImportJob")
.start(csvImportStep)
.build();
}
}
監控與告警
1. 查詢執行狀態
-- 查詢 Job 執行狀態
SELECT
je.JOB_EXECUTION_ID,
je.STATUS,
je.START_TIME,
je.END_TIME,
se.STEP_NAME,
se.READ_COUNT,
se.WRITE_COUNT,
se.READ_SKIP_COUNT, -- 讀取時跳過的筆數
se.PROCESS_SKIP_COUNT, -- 處理時跳過的筆數
se.WRITE_SKIP_COUNT, -- 寫入時跳過的筆數
se.ROLLBACK_COUNT -- 回滾次數
FROM BATCH_JOB_EXECUTION je
JOIN BATCH_STEP_EXECUTION se ON je.JOB_EXECUTION_ID = se.JOB_EXECUTION_ID
WHERE je.JOB_NAME = 'csvImportJob'
ORDER BY je.START_TIME DESC;
2. 告警機制
@Component
public class JobExecutionListener implements org.springframework.batch.core.JobExecutionListener {
@Autowired
private AlertService alertService;
@Override
public void afterJob(JobExecution jobExecution) {
Collection<StepExecution> stepExecutions = jobExecution.getStepExecutions();
for (StepExecution step : stepExecutions) {
int totalSkips = step.getReadSkipCount()
+ step.getProcessSkipCount()
+ step.getWriteSkipCount();
// 跳過筆數過多時告警
if (totalSkips > 100) {
alertService.sendAlert(
String.format("Job %s: Total skips = %d",
jobExecution.getJobInstance().getJobName(),
totalSkips)
);
}
// 回滾次數過多時告警
if (step.getRollbackCount() > 5) {
alertService.sendAlert(
String.format("Job %s: High rollback count = %d",
jobExecution.getJobInstance().getJobName(),
step.getRollbackCount())
);
}
}
}
}
總結:Exception 處理決策樹
Chunk 發生 Exception
↓
是否為暫時性錯誤?
├─ 是(Deadlock、Timeout)
│ → 使用 Retry 策略
│ → 重試 3 次
│ ↓
│ 重試成功?
│ ├─ 是 → ✅ 繼續執行
│ └─ 否 → 進入 Skip 判斷
│
└─ 否(資料格式錯誤、驗證失敗)
↓
是否可接受遺漏此筆資料?
├─ 是(非關鍵資料)
│ → 使用 Skip 策略
│ → 記錄錯誤資料
│ → ✅ 繼續執行
│
└─ 否(關鍵業務資料)
→ 使用 Restart 策略
→ ❌ Job 失敗
→ 修正資料後重新執行
效能優化建議
1. Chunk Size 調整
- 預設 1000 是經驗值
- 可根據資料大小和資料庫效能調整
- 建議範圍: 500-5000
2. 資料庫連線池
spring.datasource.hikari.maximum-pool-size=10
spring.datasource.hikari.minimum-idle=5
3. 批次插入優化
spring.jpa.properties.hibernate.jdbc.batch_size=1000
spring.jpa.properties.hibernate.order_inserts=true
4. Processor 複雜度
- 避免在 Processor 中進行耗時操作(如外部 API 呼叫)
- 複雜運算考慮使用多執行緒
多執行緒處理(進階)
如需提升效能,可啟用多執行緒:
@Bean
public Step csvImportStep(ItemReader<CsvRecord> csvReader,
ItemProcessor<CsvRecord, CsvRecord> csvProcessor,
JdbcBatchItemWriter<CsvRecord> csvWriter,
TaskExecutor taskExecutor) {
return stepBuilderFactory.get("csvImportStep")
.<CsvRecord, CsvRecord>chunk(1000)
.reader(csvReader)
.processor(csvProcessor)
.writer(csvWriter)
.taskExecutor(taskExecutor) // 啟用多執行緒
.throttleLimit(10) // 最多 10 個執行緒
.build();
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("batch-");
executor.initialize();
return executor;
}
⚠️ 注意: 多執行緒模式下,Reader 必須是 Thread-safe 的。
監控與日誌
Spring Batch 內建監控
- JobRepository: 自動記錄 Job 執行狀態
- MetaData Tables:
BATCH_JOB_INSTANCEBATCH_JOB_EXECUTIONBATCH_STEP_EXECUTIONBATCH_STEP_EXECUTION_CONTEXTBATCH_JOB_EXECUTION_CONTEXTBATCH_JOB_EXECUTION_PARAMS
MetaData Tables 建立方式
Spring Batch 的 Metadata Tables 不需要手動建立,有多種自動化方式:
方式 1: 自動建立(開發環境推薦)
application.properties 設定:
# 自動建立 Spring Batch Metadata Tables
spring.batch.jdbc.initialize-schema=always
# 其他可選值:
# - always: 每次啟動都執行建表 SQL(會先 DROP TABLE,適合開發環境)
# - embedded: 只在嵌入式資料庫(H2、HSQL)時自動建立(預設值)
# - never: 不自動建立,需要手動建立
運作原理: - Spring Boot 會自動偵測使用的資料庫類型(MySQL、PostgreSQL、Oracle 等) - 自動執行對應的建表 SQL 腳本 - SQL 腳本位於 Spring Batch 的 JAR 檔中
適用情境:
- ✅ 開發環境、測試環境
- ✅ 使用嵌入式資料庫(H2)
- ❌ 生產環境(不建議使用 always,避免誤刪資料)
方式 2: 手動執行 SQL(生產環境推薦)
步驟 1: 取得官方 SQL 腳本
Spring Batch 為各種資料庫提供了官方的建表腳本,位於:
spring-batch-core-{version}.jar
└── org/springframework/batch/core/
├── schema-mysql.sql # MySQL / MariaDB
├── schema-postgresql.sql # PostgreSQL
├── schema-oracle.sql # Oracle
├── schema-sqlserver.sql # SQL Server
├── schema-db2.sql # DB2
├── schema-h2.sql # H2
└── ...
從 Maven 依賴中取得腳本:
方法 1: 從本地 Maven Repository 複製
# Windows
cd %USERPROFILE%\.m2\repository\org\springframework\batch\spring-batch-core\4.3.10
jar -xf spring-batch-core-4.3.10.jar org/springframework/batch/core/schema-mysql.sql
# Linux / macOS
cd ~/.m2/repository/org/springframework/batch/spring-batch-core/4.3.10
jar -xf spring-batch-core-4.3.10.jar org/springframework/batch/core/schema-mysql.sql
方法 2: 從 GitHub 下載
# Spring Batch GitHub Repository
https://github.com/spring-projects/spring-batch/tree/main/spring-batch-core/src/main/resources/org/springframework/batch/core
# 直接下載 MySQL 腳本範例
curl -O https://raw.githubusercontent.com/spring-projects/spring-batch/main/spring-batch-core/src/main/resources/org/springframework/batch/core/schema-mysql.sql
步驟 2: 執行 SQL 腳本
# MySQL
mysql -u root -p your_database < schema-mysql.sql
# PostgreSQL
psql -U postgres -d your_database -f schema-postgresql.sql
# SQL Server
sqlcmd -S localhost -d your_database -i schema-sqlserver.sql -U sa -P password
步驟 3: 設定 application.properties
# 告訴 Spring Boot 不要自動建表,因為已經手動建立
spring.batch.jdbc.initialize-schema=never
方式 3: 使用 Flyway 或 Liquibase(企業環境推薦)
使用 Flyway 管理資料庫版本:
pom.xml 加入依賴:
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-core</artifactId>
</dependency>
建立 Migration 檔案:
src/main/resources/
└── db/
└── migration/
├── V1__create_spring_batch_tables.sql # Spring Batch Metadata Tables
└── V2__create_business_tables.sql # 業務表(如 your_table)
V1__create_spring_batch_tables.sql(從官方腳本複製):
-- MySQL 範例
CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME DATETIME(6) NOT NULL,
START_TIME DATETIME(6) DEFAULT NULL ,
END_TIME DATETIME(6) DEFAULT NULL ,
STATUS VARCHAR(10) ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME(6),
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
PARAMETER_NAME VARCHAR(100) NOT NULL ,
PARAMETER_TYPE VARCHAR(100) NOT NULL ,
PARAMETER_VALUE VARCHAR(2500) ,
IDENTIFYING CHAR(1) NOT NULL ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
CREATE_TIME DATETIME(6) NOT NULL,
START_TIME DATETIME(6) DEFAULT NULL ,
END_TIME DATETIME(6) DEFAULT NULL ,
STATUS VARCHAR(10) ,
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME(6),
constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);
CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);
CREATE TABLE BATCH_JOB_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);
application.properties 設定:
# Flyway 設定
spring.flyway.enabled=true
spring.flyway.baseline-on-migrate=true
# Spring Batch 不自動建表(由 Flyway 管理)
spring.batch.jdbc.initialize-schema=never
優點: - ✅ 版本控制:所有資料庫變更都有版本記錄 - ✅ 可追蹤:知道何時、由誰執行了哪些變更 - ✅ 可回滾:可以回到特定版本 - ✅ 多環境一致:開發、測試、生產環境使用相同腳本 - ✅ CI/CD 整合:可自動化部署資料庫變更
方式 4: Spring Boot 自動執行自訂 SQL(簡化版)
如果不想使用 Flyway,可以讓 Spring Boot 自動執行自訂 SQL:
步驟 1: 放置 SQL 檔案
src/main/resources/
├── schema.sql # 建表 SQL(啟動時自動執行)
└── data.sql # 初始資料 SQL(可選)
schema.sql(複製官方腳本內容):
-- 從官方 schema-mysql.sql 複製完整內容
CREATE TABLE BATCH_JOB_INSTANCE ( ... );
CREATE TABLE BATCH_JOB_EXECUTION ( ... );
-- ... 其他表
application.properties 設定:
# 啟動時執行 schema.sql 和 data.sql
spring.sql.init.mode=always
# 或 spring.sql.init.mode=embedded (只在嵌入式資料庫執行)
# Spring Batch 不再自動建表
spring.batch.jdbc.initialize-schema=never
Metadata Tables 說明
1. BATCH_JOB_INSTANCE
用途: 記錄 Job 的唯一實例(由 Job Name + Job Parameters 識別)
CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL, -- Job 名稱
JOB_KEY VARCHAR(32) NOT NULL, -- Parameters 的 Hash 值
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
);
範例查詢:
-- 查詢所有 Job Instance
SELECT
JOB_INSTANCE_ID,
JOB_NAME,
JOB_KEY
FROM BATCH_JOB_INSTANCE
ORDER BY JOB_INSTANCE_ID DESC;
2. BATCH_JOB_EXECUTION
用途: 記錄每次 Job 執行的詳細資訊
CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL, -- 關聯的 Job Instance
CREATE_TIME DATETIME(6) NOT NULL, -- 建立時間
START_TIME DATETIME(6) DEFAULT NULL , -- 開始時間
END_TIME DATETIME(6) DEFAULT NULL , -- 結束時間
STATUS VARCHAR(10) , -- STARTED, COMPLETED, FAILED 等
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME(6)
);
範例查詢:
-- 查詢最近執行的 Job
SELECT
je.JOB_EXECUTION_ID,
ji.JOB_NAME,
je.STATUS,
je.START_TIME,
je.END_TIME,
TIMESTAMPDIFF(SECOND, je.START_TIME, je.END_TIME) as DURATION_SECONDS,
je.EXIT_CODE,
je.EXIT_MESSAGE
FROM BATCH_JOB_EXECUTION je
JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID
WHERE ji.JOB_NAME = 'csvImportJob'
ORDER BY je.START_TIME DESC
LIMIT 10;
3. BATCH_STEP_EXECUTION
用途: 記錄每個 Step 執行的詳細統計資訊(最重要的監控表)
CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL, -- Step 名稱
JOB_EXECUTION_ID BIGINT NOT NULL, -- 關聯的 Job Execution
START_TIME DATETIME(6) DEFAULT NULL ,
END_TIME DATETIME(6) DEFAULT NULL ,
STATUS VARCHAR(10) ,
COMMIT_COUNT BIGINT , -- 提交次數(= Chunk 數量)
READ_COUNT BIGINT , -- 讀取筆數
FILTER_COUNT BIGINT , -- 過濾筆數(Processor 返回 null)
WRITE_COUNT BIGINT , -- 寫入筆數
READ_SKIP_COUNT BIGINT , -- 讀取時跳過筆數
WRITE_SKIP_COUNT BIGINT , -- 寫入時跳過筆數
PROCESS_SKIP_COUNT BIGINT , -- 處理時跳過筆數
ROLLBACK_COUNT BIGINT , -- 回滾次數
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME(6)
);
範例查詢:
-- 查詢 Job 的詳細執行統計
SELECT
ji.JOB_NAME,
je.JOB_EXECUTION_ID,
je.STATUS as JOB_STATUS,
se.STEP_NAME,
se.STATUS as STEP_STATUS,
se.READ_COUNT, -- 讀取總數
se.WRITE_COUNT, -- 寫入總數
se.FILTER_COUNT, -- Processor 過濾(返回 null)
se.READ_SKIP_COUNT, -- 讀取跳過
se.PROCESS_SKIP_COUNT, -- 處理跳過
se.WRITE_SKIP_COUNT, -- 寫入跳過
(se.READ_SKIP_COUNT + se.PROCESS_SKIP_COUNT + se.WRITE_SKIP_COUNT) as TOTAL_SKIP,
se.ROLLBACK_COUNT, -- 回滾次數
se.COMMIT_COUNT, -- Chunk 提交次數
ROUND(se.READ_COUNT / se.COMMIT_COUNT, 0) as AVG_CHUNK_SIZE,
TIMESTAMPDIFF(SECOND, se.START_TIME, se.END_TIME) as DURATION_SECONDS,
ROUND(se.READ_COUNT / TIMESTAMPDIFF(SECOND, se.START_TIME, se.END_TIME), 2) as RECORDS_PER_SECOND
FROM BATCH_STEP_EXECUTION se
JOIN BATCH_JOB_EXECUTION je ON se.JOB_EXECUTION_ID = je.JOB_EXECUTION_ID
JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID
WHERE ji.JOB_NAME = 'csvImportJob'
ORDER BY se.START_TIME DESC;
4. BATCH_JOB_EXECUTION_PARAMS
用途: 記錄每次執行時傳入的 Job Parameters
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
PARAMETER_NAME VARCHAR(100) NOT NULL , -- 參數名稱(如 inputFile)
PARAMETER_TYPE VARCHAR(100) NOT NULL , -- 參數類型(STRING, DATE, LONG, DOUBLE)
PARAMETER_VALUE VARCHAR(2500) , -- 參數值
IDENTIFYING CHAR(1) NOT NULL -- 是否用於識別 Job Instance
);
範例查詢:
-- 查詢 Job 執行時使用的參數
SELECT
ji.JOB_NAME,
je.JOB_EXECUTION_ID,
jep.PARAMETER_NAME,
jep.PARAMETER_VALUE,
je.STATUS,
je.START_TIME
FROM BATCH_JOB_EXECUTION_PARAMS jep
JOIN BATCH_JOB_EXECUTION je ON jep.JOB_EXECUTION_ID = je.JOB_EXECUTION_ID
JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID
WHERE ji.JOB_NAME = 'csvImportJob'
ORDER BY je.START_TIME DESC;
5. BATCH_STEP_EXECUTION_CONTEXT
用途: 儲存 Step 執行時的上下文資訊(如 Reader 讀取進度,用於 Restart)
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL, -- JSON 格式的簡短上下文
SERIALIZED_CONTEXT TEXT -- 完整序列化的上下文
);
範例查詢:
-- 查詢 Step 的執行上下文(可看到 Reader 讀到第幾行)
SELECT
se.STEP_EXECUTION_ID,
se.STEP_NAME,
se.STATUS,
sec.SHORT_CONTEXT
FROM BATCH_STEP_EXECUTION_CONTEXT sec
JOIN BATCH_STEP_EXECUTION se ON sec.STEP_EXECUTION_ID = se.STEP_EXECUTION_ID
WHERE se.STEP_EXECUTION_ID = 1;
-- 輸出範例:
-- {"FlatFileItemReader.read.count":1500}
-- 表示 Reader 已讀取 1500 筆,Restart 時會從這裡繼續
6. BATCH_JOB_EXECUTION_CONTEXT
用途: 儲存 Job 執行時的上下文資訊
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT
);
常用監控查詢
1. 查詢失敗的 Job
SELECT
ji.JOB_NAME,
je.JOB_EXECUTION_ID,
je.STATUS,
je.START_TIME,
je.EXIT_MESSAGE,
jep.PARAMETER_VALUE as INPUT_FILE
FROM BATCH_JOB_EXECUTION je
JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID
LEFT JOIN BATCH_JOB_EXECUTION_PARAMS jep
ON je.JOB_EXECUTION_ID = jep.JOB_EXECUTION_ID
AND jep.PARAMETER_NAME = 'inputFile'
WHERE je.STATUS = 'FAILED'
ORDER BY je.START_TIME DESC;
2. 查詢有跳過資料的執行
SELECT
ji.JOB_NAME,
se.STEP_NAME,
se.READ_SKIP_COUNT + se.PROCESS_SKIP_COUNT + se.WRITE_SKIP_COUNT as TOTAL_SKIP,
se.READ_COUNT,
se.WRITE_COUNT,
je.START_TIME
FROM BATCH_STEP_EXECUTION se
JOIN BATCH_JOB_EXECUTION je ON se.JOB_EXECUTION_ID = je.JOB_EXECUTION_ID
JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID
WHERE (se.READ_SKIP_COUNT + se.PROCESS_SKIP_COUNT + se.WRITE_SKIP_COUNT) > 0
ORDER BY je.START_TIME DESC;
3. 查詢執行效能
SELECT
ji.JOB_NAME,
DATE(je.START_TIME) as EXECUTION_DATE,
COUNT(*) as EXECUTION_COUNT,
AVG(TIMESTAMPDIFF(SECOND, je.START_TIME, je.END_TIME)) as AVG_DURATION_SECONDS,
MIN(TIMESTAMPDIFF(SECOND, je.START_TIME, je.END_TIME)) as MIN_DURATION,
MAX(TIMESTAMPDIFF(SECOND, je.START_TIME, je.END_TIME)) as MAX_DURATION
FROM BATCH_JOB_EXECUTION je
JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID
WHERE je.STATUS = 'COMPLETED'
AND je.START_TIME >= DATE_SUB(NOW(), INTERVAL 30 DAY)
GROUP BY ji.JOB_NAME, DATE(je.START_TIME)
ORDER BY EXECUTION_DATE DESC;
清理歷史資料
Metadata Tables 會持續累積資料,建議定期清理:
-- 刪除 30 天前的執行記錄
DELETE FROM BATCH_STEP_EXECUTION_CONTEXT
WHERE STEP_EXECUTION_ID IN (
SELECT STEP_EXECUTION_ID FROM BATCH_STEP_EXECUTION se
JOIN BATCH_JOB_EXECUTION je ON se.JOB_EXECUTION_ID = je.JOB_EXECUTION_ID
WHERE je.CREATE_TIME < DATE_SUB(NOW(), INTERVAL 30 DAY)
);
DELETE FROM BATCH_JOB_EXECUTION_CONTEXT
WHERE JOB_EXECUTION_ID IN (
SELECT JOB_EXECUTION_ID FROM BATCH_JOB_EXECUTION
WHERE CREATE_TIME < DATE_SUB(NOW(), INTERVAL 30 DAY)
);
DELETE FROM BATCH_STEP_EXECUTION
WHERE JOB_EXECUTION_ID IN (
SELECT JOB_EXECUTION_ID FROM BATCH_JOB_EXECUTION
WHERE CREATE_TIME < DATE_SUB(NOW(), INTERVAL 30 DAY)
);
DELETE FROM BATCH_JOB_EXECUTION_PARAMS
WHERE JOB_EXECUTION_ID IN (
SELECT JOB_EXECUTION_ID FROM BATCH_JOB_EXECUTION
WHERE CREATE_TIME < DATE_SUB(NOW(), INTERVAL 30 DAY)
);
DELETE FROM BATCH_JOB_EXECUTION
WHERE CREATE_TIME < DATE_SUB(NOW(), INTERVAL 30 DAY);
-- 注意:BATCH_JOB_INSTANCE 通常不刪除,因為會影響 Restart 判斷
總結:Metadata Tables 建立建議
| 環境 | 推薦方式 | 理由 |
|---|---|---|
| 開發環境 | spring.batch.jdbc.initialize-schema=always |
快速開發,自動建立 |
| 測試環境 | Flyway / Liquibase | 與生產環境一致,可測試 Migration |
| 生產環境 | Flyway / Liquibase + 手動審核 | 版本控制、可追蹤、可回滾 |
| POC / Demo | spring.batch.jdbc.initialize-schema=embedded |
使用 H2 嵌入式資料庫 |
最佳實踐: 1. ✅ 生產環境使用 Flyway 管理所有資料庫變更 2. ✅ 將官方 SQL 腳本納入版本控制 3. ✅ 定期備份 Metadata Tables(用於分析和審計) 4. ✅ 定期清理歷史資料(建議保留 30-90 天) 5. ✅ 建立監控儀表板,視覺化 Job 執行狀態
查詢執行狀態
SELECT * FROM BATCH_JOB_EXECUTION
WHERE JOB_NAME = 'csvImportJob'
ORDER BY START_TIME DESC;
日誌輸出
程式碼中使用 @Slf4j 和 Lombok,可記錄:
- 跳過的無效記錄
- 處理進度
- 錯誤訊息
總結
核心優勢
- ✅ 記憶體安全: 固定記憶體用量,不受檔案大小影響
- ✅ 高效能: 批次寫入 + JDBC Batch
- ✅ 容錯性: Chunk 交易管理,失敗可重試
- ✅ 可維護: 清晰的 Reader-Processor-Writer 架構
- ✅ 可擴展: 支援多執行緒、重試、跳過等機制
適用場景
- 大型 CSV 檔案匯入(百萬筆以上)
- ETL(Extract-Transform-Load)資料處理
- 資料遷移
- 定期批次作業
不適用場景
- 即時性要求高的作業(建議使用訊息佇列)
- 小檔案(直接用一般 I/O 即可)
- 複雜的多表關聯寫入(建議分多個 Step)
使用範例
1. 準備 CSV 檔案
column1,column2,column3
value1,value2,value3
value4,value5,value6
...
2. 準備資料庫表
CREATE TABLE your_table (
column1 VARCHAR(255),
column2 VARCHAR(255),
column3 VARCHAR(255)
);
3. 執行 Job
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job csvImportJob;
public void importCsv(String filePath) {
JobParameters params = new JobParametersBuilder()
.addString("inputFile", filePath)
.addDate("date", new Date())
.toJobParameters();
jobLauncher.run(csvImportJob, params);
}
客製化建議
1. 修改資料模型
根據實際 CSV 結構調整 CsvRecord 類別:
@Data
public static class CsvRecord {
private String id;
private String name;
private BigDecimal amount;
private LocalDate date;
// 根據實際需求增加欄位
}
2. 修改 SQL
根據目標資料表調整:
String sql = "INSERT INTO orders (order_id, customer_name, total_amount, order_date) " +
"VALUES (:id, :name, :amount, :date)";
3. 增強驗證邏輯
return record -> {
// 欄位驗證
if (record.getAmount().compareTo(BigDecimal.ZERO) < 0) {
log.warn("金額不得為負數: {}", record);
return null;
}
// 資料轉換
record.setName(record.getName().toUpperCase());
return record;
};
最後更新: 2026-01-12