Spring Batch local partitioning restart problems

Question:

I am having issues restarting a locally partitioned batch job. I throw a RuntimeException on the 101st processed item. The job fails as expected, but something goes wrong: on restart, the job continues from the 150th item instead of from the 100th item, as it should.

Here is the XML configuration:

    <bean id="taskExecutor" class="org.springframework.scheduling.commonj.WorkManagerTaskExecutor">
        <property name="workManagerName" value="springWorkManagers" />
    </bean>

    <bean id="transactionManager" class="org.springframework.transaction.jta.WebSphereUowTransactionManager" />

    <batch:job id="LocalPartitioningJob">
        <batch:step id="masterStep">
            <batch:partition step="slaveStep" partitioner="splitPartitioner">
                <batch:handler grid-size="5" task-executor="taskExecutor" />
            </batch:partition>
        </batch:step>
    </batch:job>

    <batch:step id="slaveStep">
        <batch:tasklet transaction-manager="transactionManager">
            <batch:chunk reader="partitionReader" processor="compositeItemProcessor"
                         writer="sqlWriter" commit-interval="50" />
            <batch:transaction-attributes isolation="SERIALIZABLE" propagation="REQUIRED" timeout="600" />
            <batch:listeners>
                <batch:listener ref="Processor1" />
                <batch:listener ref="Processor2" />
                <batch:listener ref="Processor3" />
            </batch:listeners>
        </batch:tasklet>
    </batch:step>

    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager" />
        <property name="tablePrefix" value="${sb.db.tableprefix}" />
        <property name="dataSource" ref="ds" />
        <property name="maxVarCharLength" value="1000" />
    </bean>

    <jee:jndi-lookup id="ds" jndi-name="${sb.db.jndi}" cache="true" expected-type="javax.sql.DataSource" />

The splitPartitioner implements Partitioner; it splits the initial data and saves each part to its ExecutionContext as a list. The processors call remote EJBs to fetch additional data, and the sqlWriter is just an org.spring...JdbcBatchItemWriter. The PartitionReader code is below:
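A partitioner of this kind could look roughly like the following sketch. It is hypothetical, since the real splitPartitioner is not shown: a plain Map<String, Object> stands in for Spring Batch's ExecutionContext so the example runs without Spring on the classpath, and the class name SplitPartitionerSketch and the "partitionN" keys are illustrative. Only the "partitionItems" key matches the reader's code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of a list-splitting partitioner.
 * A Map<String, Object> stands in for Spring Batch's ExecutionContext.
 */
public class SplitPartitionerSketch {

    /** Splits items into at most gridSize contiguous slices (assumes gridSize > 0). */
    public static <T> Map<String, Map<String, Object>> partition(List<T> items, int gridSize) {
        Map<String, Map<String, Object>> contexts = new LinkedHashMap<>();
        int sliceSize = (items.size() + gridSize - 1) / gridSize; // ceiling division
        int partitionNumber = 0;
        for (int start = 0; start < items.size(); start += sliceSize) {
            int end = Math.min(start + sliceSize, items.size());
            Map<String, Object> context = new LinkedHashMap<>();
            // Copy the slice so each partition owns an independent list
            context.put("partitionItems", new ArrayList<T>(items.subList(start, end)));
            contexts.put("partition" + partitionNumber++, context);
        }
        return contexts;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 1; i <= 12; i++) {
            items.add(i);
        }
        partition(items, 5).forEach((name, ctx) ->
                System.out.println(name + " -> " + ctx.get("partitionItems")));
    }
}
```

In a real Spring Batch application the same logic would sit inside Partitioner.partition(int gridSize), which returns a Map<String, ExecutionContext>. Copying each slice keeps the partitions independent of each other and of the source list.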

    import java.util.List;

    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.batch.item.ItemStreamException;
    import org.springframework.batch.item.ItemStreamReader;

    public class PartitionReader implements ItemStreamReader<TransferObjectTO> {

        private List<TransferObjectTO> partitionItems;

        public PartitionReader() {
        }

        // Destructive read: removes the item from the same list object that open() took from the context
        @Override
        public synchronized TransferObjectTO read() {
            if (partitionItems.size() > 0) {
                return partitionItems.remove(0);
            } else {
                return null;
            }
        }

        @SuppressWarnings("unchecked")
        @Override
        public void open(ExecutionContext executionContext) throws ItemStreamException {
            partitionItems = (List<TransferObjectTO>) executionContext.get("partitionItems");
        }

        @Override
        public void update(ExecutionContext executionContext) throws ItemStreamException {
            executionContext.put("partitionItems", partitionItems);
        }

        @Override
        public void close() throws ItemStreamException {
        }
    }

Answer1:

It seems I had a few misunderstandings about Spring Batch, plus some buggy code of my own. The first misunderstanding was that I thought the readCount would be rolled back on a RuntimeException. Now I see that this is not the case: Spring Batch increments this value as items are read, and when the step fails, the incremented value is committed.

Related to the above, I thought that the update method of ItemStreamReader would always be called, and that writing the executionContext to the database would simply be committed or rolled back. Instead, it seems that update is called only when no errors occur, while the executionContext is always committed, even when the step fails.
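The effect described above can be reproduced with a simplified model of the question's reader. This is a sketch, not Spring Batch code: a HashMap stands in for the ExecutionContext (which, like a map, stores a reference to the list rather than a copy), and it assumes the lifecycle described in this answer, i.e. a whole chunk of commit-interval items is read before processing, update() runs only after a successful chunk, and the context is saved as-is when the step fails. The class name and the copyOnUpdate flag are hypothetical; the flag exists only for contrast and is not the fix used in this answer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model (not Spring Batch code) of the question's destructive PartitionReader. */
public class DestructiveReaderRestart {

    static final int COMMIT_INTERVAL = 50;
    static final int FAILING_ITEM = 101;

    /**
     * Runs until the failing chunk and returns the first item a restarted reader would see,
     * or -1 if nothing is left. If copyOnUpdate is true, update() stores a snapshot copy.
     */
    @SuppressWarnings("unchecked")
    public static int firstItemOnRestart(int totalItems, boolean copyOnUpdate) {
        Map<String, Object> context = new HashMap<>();
        List<Integer> initial = new ArrayList<>();
        for (int i = 1; i <= totalItems; i++) {
            initial.add(i);
        }
        context.put("partitionItems", initial); // what the partitioner stored

        // open(): the reader holds the very list object stored in the context
        List<Integer> partitionItems = (List<Integer>) context.get("partitionItems");

        while (!partitionItems.isEmpty()) {
            // Read phase: a whole chunk is read before any item in it is processed
            List<Integer> chunk = new ArrayList<>();
            while (chunk.size() < COMMIT_INTERVAL && !partitionItems.isEmpty()) {
                chunk.add(partitionItems.remove(0)); // read() removes the item
            }
            // Process phase: the processor throws on item 101, so update() is skipped
            if (chunk.contains(FAILING_ITEM)) {
                break;
            }
            // update() after a successful chunk
            Object saved = copyOnUpdate ? new ArrayList<Integer>(partitionItems) : partitionItems;
            context.put("partitionItems", saved);
        }

        // Assumed: the context is saved as-is when the step fails; restart reads it back
        List<Integer> restored = (List<Integer>) context.get("partitionItems");
        return restored.isEmpty() ? -1 : restored.get(0);
    }

    public static void main(String[] args) {
        System.out.println("live list:     restart at item " + firstItemOnRestart(200, false));
        System.out.println("snapshot copy: restart at item " + firstItemOnRestart(200, true));
    }
}
```

With 200 items, the live-list run restarts at item 151, so items 101 to 150 are skipped, while the snapshot variant restarts at item 101. Because read() removed items from the same list object that sits in the context, the failed chunk's reads leak into the saved state even though update() was never called for that chunk.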

The third misunderstanding was that the partitioning "master step" would not be re-executed on restart and that only the slave steps would be. Actually, the master step is re-executed whenever one of its slave steps fails. So I guess the master and slave steps are, in some sense, handled as a single step.

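And then there was the buggy code in my PartitionReader, which was supposed to save disk space on the database server by removing already-read items from the persisted list. Maybe partitionItems should not be modified in read() at all? (This is related to the statements above.) Anyhow, here is the code for the working PartitionReader, which keeps the list intact and saves only the current position in the execution context: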

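    import java.util.List;

    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.batch.item.ItemStreamException;
    import org.springframework.batch.item.ItemStreamReader;

    public class PartitionReader implements ItemStreamReader<TransferObjectTO> {

        private List<TransferObjectTO> partitionItems;
        // Position of the next item to read; saved to the ExecutionContext in update()
        private int index;

        public PartitionReader() {
        }

        // Non-destructive read: the list is left intact and only the index advances
        @Override
        public synchronized TransferObjectTO read() {
            if (partitionItems.size() > index) {
                return partitionItems.get(index++);
            } else {
                return null;
            }
        }

        @SuppressWarnings("unchecked")
        @Override
        public void open(ExecutionContext executionContext) throws ItemStreamException {
            partitionItems = (List<TransferObjectTO>) executionContext.get("partitionItems");
            // Resume from the last saved position, or from the start on the first run
            index = executionContext.getInt("partitionIndex", 0);
        }

        @Override
        public void update(ExecutionContext executionContext) throws ItemStreamException {
            executionContext.putInt("partitionIndex", index);
        }

        @Override
        public void close() throws ItemStreamException {
        }
    }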
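The restart behaviour of the index-based reader can be checked with a similar simplified model. Again this is not Spring Batch code: a HashMap stands in for the ExecutionContext, the read-whole-chunk-then-process order and the skipped update() on failure are assumptions taken from this answer, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model (not Spring Batch code) of the index-based PartitionReader on restart. */
public class IndexReaderRestart {

    static final int COMMIT_INTERVAL = 50;

    /** Returns the first item a restarted reader would see, or -1 if all items were committed. */
    @SuppressWarnings("unchecked")
    public static int firstItemOnRestart(int totalItems, int failingItem) {
        Map<String, Object> context = new HashMap<>();
        List<Integer> initial = new ArrayList<>();
        for (int i = 1; i <= totalItems; i++) {
            initial.add(i);
        }
        context.put("partitionItems", initial);

        // First run, open(): shared list, index from the saved value or 0
        List<Integer> partitionItems = (List<Integer>) context.get("partitionItems");
        int index = (Integer) context.getOrDefault("partitionIndex", 0);

        while (index < partitionItems.size()) {
            // Read phase: get(index++) never modifies the list
            List<Integer> chunk = new ArrayList<>();
            while (chunk.size() < COMMIT_INTERVAL && index < partitionItems.size()) {
                chunk.add(partitionItems.get(index++));
            }
            // Process phase: the failing item aborts the chunk before update()
            if (chunk.contains(failingItem)) {
                break;
            }
            context.put("partitionIndex", index); // update()
        }

        // Restart, open(): only the last saved index decides where reading resumes
        List<Integer> restoredItems = (List<Integer>) context.get("partitionItems");
        int restoredIndex = (Integer) context.getOrDefault("partitionIndex", 0);
        return restoredIndex < restoredItems.size() ? restoredItems.get(restoredIndex) : -1;
    }

    public static void main(String[] args) {
        System.out.println("failure on item 101: restart at item " + firstItemOnRestart(200, 101));
        System.out.println("failure on item 175: restart at item " + firstItemOnRestart(200, 175));
    }
}
```

Because read() leaves the list untouched, sharing it with the context is harmless. The in-memory index has already moved past the failed chunk, but the context still holds the value written by the last successful update(), so a failure on item 101 restarts at item 101.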
