Skip to content

Commit 62a8f44

Browse files
fmbenhassinemminella
authored andcommitted
Improve the performance of step partitioning
This commit improves the performance of splitting a step execution. It moves the logic of finding the last step execution of a job instance to the database (instead of doing it in memory). Resolves BATCH-2716
1 parent 161801e commit 62a8f44

File tree

5 files changed

+112
-31
lines changed

5 files changed

+112
-31
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcStepExecutionDao.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.springframework.batch.core.BatchStatus;
3434
import org.springframework.batch.core.ExitStatus;
3535
import org.springframework.batch.core.JobExecution;
36+
import org.springframework.batch.core.JobInstance;
3637
import org.springframework.batch.core.StepExecution;
3738
import org.springframework.beans.factory.InitializingBean;
3839
import org.springframework.dao.OptimisticLockingFailureException;
@@ -83,6 +84,16 @@ public class JdbcStepExecutionDao extends AbstractJdbcBatchMetadataDao implement
8384

8485
private static final String GET_STEP_EXECUTION = GET_RAW_STEP_EXECUTIONS + " and STEP_EXECUTION_ID = ?";
8586

87+
private static final String GET_LAST_STEP_EXECUTION = "SELECT " +
88+
" SE.STEP_EXECUTION_ID, SE.STEP_NAME, SE.START_TIME, SE.END_TIME, SE.STATUS, SE.COMMIT_COUNT, SE.READ_COUNT, SE.FILTER_COUNT, SE.WRITE_COUNT, SE.EXIT_CODE, SE.EXIT_MESSAGE, SE.READ_SKIP_COUNT, SE.WRITE_SKIP_COUNT, SE.PROCESS_SKIP_COUNT, SE.ROLLBACK_COUNT, SE.LAST_UPDATED, SE.VERSION," +
89+
" JE.JOB_EXECUTION_ID, JE.START_TIME, JE.END_TIME, JE.STATUS, JE.EXIT_CODE, JE.EXIT_MESSAGE, JE.CREATE_TIME, JE.LAST_UPDATED, JE.VERSION" +
90+
" from %PREFIX%JOB_EXECUTION JE, %PREFIX%STEP_EXECUTION SE" +
91+
" where " +
92+
" SE.JOB_EXECUTION_ID in (SELECT JOB_EXECUTION_ID from %PREFIX%JOB_EXECUTION where JE.JOB_INSTANCE_ID = ?)" +
93+
" and SE.JOB_EXECUTION_ID = JE.JOB_EXECUTION_ID " +
94+
" and SE.STEP_NAME = ?" +
95+
" order by SE.START_TIME desc, SE.STEP_EXECUTION_ID desc";
96+
8697
private static final String CURRENT_VERSION_STEP_EXECUTION = "SELECT VERSION FROM %PREFIX%STEP_EXECUTION WHERE STEP_EXECUTION_ID=?";
8798

8899
private int exitMessageLength = DEFAULT_EXIT_MESSAGE_LENGTH;
@@ -297,6 +308,30 @@ public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecut
297308
}
298309
}
299310

311+
@Override
312+
public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
313+
List<StepExecution> executions = getJdbcTemplate().query(
314+
getQuery(GET_LAST_STEP_EXECUTION),
315+
(rs, rowNum) -> {
316+
Long jobExecutionId = rs.getLong(18);
317+
JobExecution jobExecution = new JobExecution(jobExecutionId);
318+
jobExecution.setStartTime(rs.getTimestamp(19));
319+
jobExecution.setEndTime(rs.getTimestamp(20));
320+
jobExecution.setStatus(BatchStatus.valueOf(rs.getString(21)));
321+
jobExecution.setExitStatus(new ExitStatus(rs.getString(22), rs.getString(23)));
322+
jobExecution.setCreateTime(rs.getTimestamp(24));
323+
jobExecution.setLastUpdated(rs.getTimestamp(25));
324+
jobExecution.setVersion(rs.getInt(26));
325+
return new StepExecutionRowMapper(jobExecution).mapRow(rs, rowNum);
326+
},
327+
jobInstance.getInstanceId(), stepName);
328+
if (executions.isEmpty()) {
329+
return null;
330+
} else {
331+
return executions.get(0);
332+
}
333+
}
334+
300335
@Override
301336
public void addStepExecutions(JobExecution jobExecution) {
302337
getJdbcTemplate().query(getQuery(GET_STEP_EXECUTIONS), new StepExecutionRowMapper(jobExecution),

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MapStepExecutionDao.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2018 the original author or authors.
2+
* Copyright 2006-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
2727

2828
import org.springframework.batch.core.Entity;
2929
import org.springframework.batch.core.JobExecution;
30+
import org.springframework.batch.core.JobInstance;
3031
import org.springframework.batch.core.StepExecution;
3132
import org.springframework.dao.OptimisticLockingFailureException;
3233
import org.springframework.lang.Nullable;
@@ -119,6 +120,29 @@ public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecut
119120
return executionsByStepExecutionId.get(stepExecutionId);
120121
}
121122

123+
@Override
124+
public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
125+
StepExecution latest = null;
126+
for (StepExecution stepExecution : executionsByStepExecutionId.values()) {
127+
if (!stepExecution.getStepName().equals(stepName)
128+
|| stepExecution.getJobExecution().getJobInstance().getInstanceId() != jobInstance.getInstanceId()) {
129+
continue;
130+
}
131+
if (latest == null) {
132+
latest = stepExecution;
133+
}
134+
if (latest.getStartTime().getTime() < stepExecution.getStartTime().getTime()) {
135+
latest = stepExecution;
136+
}
137+
// Use step execution ID as the tie breaker if start time is identical
138+
if (latest.getStartTime().getTime() == stepExecution.getStartTime().getTime() &&
139+
latest.getId() < stepExecution.getId()) {
140+
latest = stepExecution;
141+
}
142+
}
143+
return latest;
144+
}
145+
122146
@Override
123147
public void addStepExecutions(JobExecution jobExecution) {
124148
Map<Long, StepExecution> executions = executionsByJobExecutionId.get(jobExecution.getId());

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/StepExecutionDao.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2018 the original author or authors.
2+
* Copyright 2006-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919
import java.util.Collection;
2020

2121
import org.springframework.batch.core.JobExecution;
22+
import org.springframework.batch.core.JobInstance;
2223
import org.springframework.batch.core.StepExecution;
2324
import org.springframework.lang.Nullable;
2425

@@ -65,6 +66,19 @@ public interface StepExecutionDao {
6566
@Nullable
6667
StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId);
6768

69+
/**
70+
* Retrieve the last {@link StepExecution} for a given {@link JobInstance}
71+
* ordered by starting time and then id.
72+
*
73+
* @param jobInstance the parent {@link JobInstance}
74+
* @param stepName the name of the step
75+
* @return a {@link StepExecution}
76+
*/
77+
@Nullable
78+
default StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
79+
throw new UnsupportedOperationException();
80+
}
81+
6882
/**
6983
* Retrieve all the {@link StepExecution} for the parent {@link JobExecution}.
7084
*

spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2018 the original author or authors.
2+
* Copyright 2006-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,7 +35,6 @@
3535
import org.springframework.lang.Nullable;
3636
import org.springframework.util.Assert;
3737

38-
import java.util.ArrayList;
3938
import java.util.Collection;
4039
import java.util.Date;
4140
import java.util.List;
@@ -219,32 +218,7 @@ public void updateExecutionContext(JobExecution jobExecution) {
219218
@Override
220219
@Nullable
221220
public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
222-
List<JobExecution> jobExecutions = jobExecutionDao.findJobExecutions(jobInstance);
223-
List<StepExecution> stepExecutions = new ArrayList<>(jobExecutions.size());
224-
225-
for (JobExecution jobExecution : jobExecutions) {
226-
stepExecutionDao.addStepExecutions(jobExecution);
227-
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
228-
if (stepName.equals(stepExecution.getStepName())) {
229-
stepExecutions.add(stepExecution);
230-
}
231-
}
232-
}
233-
234-
StepExecution latest = null;
235-
for (StepExecution stepExecution : stepExecutions) {
236-
if (latest == null) {
237-
latest = stepExecution;
238-
}
239-
if (latest.getStartTime().getTime() < stepExecution.getStartTime().getTime()) {
240-
latest = stepExecution;
241-
}
242-
// Use step execution ID as the tie breaker if start time is identical
243-
if (latest.getStartTime().getTime() == stepExecution.getStartTime().getTime() &&
244-
latest.getId() < stepExecution.getId()) {
245-
latest = stepExecution;
246-
}
247-
}
221+
StepExecution latest = stepExecutionDao.getLastStepExecution(jobInstance, stepName);
248222

249223
if (latest != null) {
250224
ExecutionContext stepExecutionContext = ecDao.getExecutionContext(latest);

spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/AbstractStepExecutionDaoTests.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2007 the original author or authors.
2+
* Copyright 2006-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,7 +21,9 @@
2121
import static org.junit.Assert.assertNull;
2222
import static org.junit.Assert.fail;
2323

24+
import java.time.Instant;
2425
import java.util.ArrayList;
26+
import java.util.Arrays;
2527
import java.util.Collection;
2628
import java.util.Date;
2729
import java.util.List;
@@ -152,6 +154,38 @@ public void testSaveAndGetExecutions() {
152154
}
153155
}
154156

157+
@Transactional
158+
@Test
159+
public void testSaveAndGetLastExecution() {
160+
Instant now = Instant.now();
161+
StepExecution stepExecution1 = new StepExecution("step1", jobExecution);
162+
stepExecution1.setStartTime(Date.from(now));
163+
StepExecution stepExecution2 = new StepExecution("step1", jobExecution);
164+
stepExecution2.setStartTime(Date.from(now.plusMillis(500)));
165+
166+
dao.saveStepExecutions(Arrays.asList(stepExecution1, stepExecution2));
167+
168+
StepExecution lastStepExecution = dao.getLastStepExecution(jobInstance, "step1");
169+
assertNotNull(lastStepExecution);
170+
assertEquals(stepExecution2.getId(), lastStepExecution.getId());
171+
}
172+
173+
@Transactional
174+
@Test
175+
public void testSaveAndGetLastExecutionWhenSameStartTime() {
176+
Instant now = Instant.now();
177+
StepExecution stepExecution1 = new StepExecution("step1", jobExecution);
178+
stepExecution1.setStartTime(Date.from(now));
179+
StepExecution stepExecution2 = new StepExecution("step1", jobExecution);
180+
stepExecution2.setStartTime(Date.from(now));
181+
182+
dao.saveStepExecutions(Arrays.asList(stepExecution1, stepExecution2));
183+
StepExecution lastStepExecution = stepExecution1.getId() > stepExecution2.getId() ? stepExecution1 : stepExecution2;
184+
StepExecution retrieved = dao.getLastStepExecution(jobInstance, "step1");
185+
assertNotNull(retrieved);
186+
assertEquals(lastStepExecution.getId(), retrieved.getId());
187+
}
188+
155189
@Transactional
156190
@Test(expected = IllegalArgumentException.class)
157191
public void testSaveNullCollectionThrowsException() {

0 commit comments

Comments
 (0)