Skip to content

Commit

Permalink
Polishing
Browse files Browse the repository at this point in the history
  • Loading branch information
ghillert committed Aug 19, 2013
1 parent 38af6f5 commit 0ff3dfe
Show file tree
Hide file tree
Showing 19 changed files with 147 additions and 104 deletions.
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ The sample is based on the sample create for the book [Spring Integration in Act

## Objective

This sample uses **Spring Batch Integration** to more easily use *Spring Batch* and *Spring Integration* together. The application will poll a directory for a file that contains 27 payment records. *Spring Batch* will subsequently process those payments.
This sample uses **Spring Batch Integration** to more easily use *Spring Batch* and *Spring Integration* together. The application will poll a directory for a file that contains 27 payment records. *Spring Batch* will subsequently process those payments. If an error occurs the Job is resubmitted.

## Running the Sample

Expand All @@ -30,5 +30,8 @@ Or via one line:
As a result the final console output should be:

DONE!!
exitStatus: COMPLETED; imported # of payments: 27

exitStatus: COMPLETED; # of payments imported: 27
Sent '2' notifications:
#1 Subject: 'Execution has STARTED', Message: 'Execution has STARTED'.
#2 Subject: 'Execution has COMPLETED', Message: 'Execution has COMPLETED'.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@

/**
* @author Marius Bogoevici
* @author Gunnar Hillert
*/
public class ExecutionsToMailTransformer {

@Transformer
public Message<String> transformExecutionsToMail(JobExecution jobExecution) {
String result = "Execution has ended " + jobExecution.getStatus().toString();
String result = "Execution has " + jobExecution.getStatus().toString();
return MessageBuilder.withPayload(result)
.setHeader(MailHeaders.TO, "[email protected]")
.setHeader(MailHeaders.FROM, "[email protected]").build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,36 @@
*/
package org.springframework.batch.integration.samples.payments;

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.integration.annotation.Router;

/**
* @author Marius Bogoevici
* @author Gunnar Hillert
*/
public class JobExecutionsRouter {

@Router
public String routeJobExecution(JobExecution jobExecution)
{
public List<String> routeJobExecution(JobExecution jobExecution) {

final List<String> routeToChannels = new ArrayList<String>();

if (jobExecution.getStatus().equals(BatchStatus.FAILED)) {
return "jobRestarts";
routeToChannels.add("jobRestarts");
}
else {
return "notifiableExecutions";

if (jobExecution.getStatus().equals(BatchStatus.COMPLETED)) {
routeToChannels.add("completeApplication");
}

routeToChannels.add("notifiableExecutions");
}

return routeToChannels;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package org.springframework.batch.integration.samples.payments;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersInvalidException;
Expand All @@ -27,8 +29,12 @@

/**
* @author Marius Bogoevici
* @author Gunnar Hillert
*/
public class JobRestart {

private static final Log logger = LogFactory.getLog(JobRestart.class);

@Autowired
JobLauncher jobLauncher;

Expand All @@ -37,6 +43,7 @@ public class JobRestart {

@ServiceActivator
public void restartIfPossible(JobExecution execution) throws JobInstanceAlreadyCompleteException, JobParametersInvalidException, JobRestartException, JobExecutionAlreadyRunningException {
jobLauncher.run(job, execution.getJobParameters());
logger.info("Restarting job...");
jobLauncher.run(job, execution.getJobParameters());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,17 @@
*/
package org.springframework.batch.integration.samples.payments;

import java.util.Scanner;
import java.util.List;

import org.apache.log4j.Logger;
import org.junit.Assert;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.integration.samples.payments.util.SpringIntegrationUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.Message;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.jdbc.core.JdbcTemplate;

import org.springframework.mail.SimpleMailMessage;

/**
* Starts the Spring Context and will initialize the Spring Integration routes.
Expand All @@ -40,16 +35,15 @@
*/
public final class Main {

private static final Logger LOGGER = Logger.getLogger(Main.class);

private Main() { }

/**
* Load the Spring Integration Application Context
*
* @param args - command line arguments
* @throws InterruptedException
*/
public static void main(final String... args) {
public static void main(final String... args) throws InterruptedException {

System.out.println("\n========================================================="
+ "\n Welcome to the Spring Batch Integration "
Expand All @@ -67,28 +61,40 @@ public static void main(final String... args) {
context.registerShutdownHook();

final JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
final QueueChannel statusesChannel = context.getBean("statuses", QueueChannel.class);
final JobRepository jobRepository = context.getBean(JobRepository.class);

SpringIntegrationUtils.displayDirectories(context);


final Scanner scanner = new Scanner(System.in);

System.out.println("\n========================================================="
+ "\n "
+ "\n Waiting for Job execution to finish. "
+ "\n "
+ "\n=========================================================" );

JobExecution jobExecution = ((Message<JobExecution>) statusesChannel.receive(120000)).getPayload();
ExitStatus exitStatus = jobExecution.getExitStatus();
Assert.assertEquals(ExitStatus.COMPLETED, exitStatus);
int count = jdbcTemplate.queryForInt("select count(*) from payments");
final QueueChannel completeApplicationChannel =
context.getBean("completeApplication", QueueChannel.class);

System.out.println(String.format("\nDONE!!\nexitStatus: %s; imported # of payments: %s",
@SuppressWarnings("unchecked")
final Message<JobExecution> jobExecutionMessage = (Message<JobExecution>) completeApplicationChannel.receive();
final JobExecution jobExecution = jobExecutionMessage.getPayload();
final ExitStatus exitStatus = jobExecution.getExitStatus();
final int count = jdbcTemplate.queryForObject("select count(*) from payments", Integer.class);

System.out.println(String.format("\nDONE!!\nexitStatus: %s; # of payments imported: %s",
exitStatus.getExitCode(), count));

final StubJavaMailSender mailSender = context.getBean(StubJavaMailSender.class);
final List<SimpleMailMessage> emails = mailSender.getSentSimpleMailMessages();
final int numberOfSentNotifications = emails.size();

System.out.println(String.format("Sent '%s' notifications:", numberOfSentNotifications));

int counter = 1;
for (SimpleMailMessage mailMessage : emails) {
System.out.println(String.format("#%s Subject: '%s', Message: '%s'.",
counter, mailMessage.getText(), mailMessage.getText()));
counter++;
}

System.exit(0);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,35 @@

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.listener.ItemListenerSupport;
import org.springframework.batch.integration.samples.payments.model.Notification;
import org.springframework.batch.integration.samples.payments.model.Payment;
import org.springframework.batch.item.file.FlatFileParseException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.support.MessageBuilder;

/**
* @author Marius Bogoevici
* @author Gunnar Hillert
*/
public class PaymentChunkListener extends ItemListenerSupport<Payment, Payment> {

private static final Log logger = LogFactory.getLog(PaymentChunkListener.class);

@Autowired
@Qualifier("chunkExecutions")
MessageChannel chunkNotificationsChannel;

@Override
public void onReadError(Exception ex) {
if (ex instanceof FlatFileParseException) {
FlatFileParseException ffpe = (FlatFileParseException) ex;
logger.error(String.format("Error reading data on line '%s' - data: '%s'", ffpe.getLineNumber(), ffpe.getInput()));
}
chunkNotificationsChannel.send(MessageBuilder.withPayload(new Notification(ex.getMessage(),true)).build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,21 @@
*/
package org.springframework.batch.integration.samples.payments;

import org.springframework.batch.integration.samples.payments.model.Payment;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

/**
* @author Marius Bogoevici
* @author Gunnar Hillert
*/
public class PaymentFieldSetMapper implements FieldSetMapper<Payment> {

@Override
public Payment mapFieldSet(FieldSet fieldSet) throws BindException {
Payment payment = new Payment();

final Payment payment = new Payment();

payment.setSourceAccountNo(fieldSet.readString("source"));
payment.setDestinationAccountNo(fieldSet.readString("destination"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import javax.sql.DataSource;

import org.apache.log4j.Logger;
import org.springframework.batch.integration.samples.payments.model.Payment;
import org.springframework.batch.item.ItemWriter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class StubJavaMailSender implements JavaMailSender {

private final List<SimpleMailMessage> sentSimpleMailMessages = new ArrayList<SimpleMailMessage>();


public StubJavaMailSender(MimeMessage uniqueMessage) {
this.uniqueMessage = uniqueMessage;
}
Expand Down Expand Up @@ -72,7 +71,7 @@ public void send(MimeMessagePreparator mimeMessagePreparator) throws MailExcepti
}

public void send(MimeMessagePreparator[] mimeMessagePreparators) throws MailException {
throw new UnsupportedOperationException("MimeMessagePreparator not supported");
throw new UnsupportedOperationException("MimeMessagePreparator not supported");
}

public void send(SimpleMailMessage simpleMessage) throws MailException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import javax.sql.DataSource;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.integration.samples.payments.PaymentWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
Expand All @@ -40,7 +39,7 @@ public JdbcTemplate jdbcTemplate() {

@Bean
public DataSource dataSource() {
EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();
final EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();
builder.setType(EmbeddedDatabaseType.HSQL)
.addScript("classpath:/org/springframework/batch/core/schema-drop-hsqldb.sql")
.addScript("classpath:/org/springframework/batch/core/schema-hsqldb.sql")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.integration.samples.payments;
package org.springframework.batch.integration.samples.payments.model;

/**
* @author Marius Bogoevici
* @author Gunnar Hillert
*/
public class Notification {

private String message;

private boolean failure;
Expand All @@ -43,4 +45,11 @@ public boolean isFailure() {
public void setFailure(boolean failure) {
this.failure = failure;
}

@Override
public String toString() {
return "Notification [message=" + message + ", failure=" + failure
+ "]";
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.integration.samples.payments;
package org.springframework.batch.integration.samples.payments.model;

import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
* @author Marius Bogoevici
* @author Gunnar Hillert
*/
public class Payment {

Expand All @@ -43,6 +45,12 @@ public Date getDate() {
return date;
}

public String getDateFormatted() {
final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
return simpleDateFormat.format(date);
}


public void setDate(Date date) {
this.date = date;
}
Expand All @@ -65,9 +73,8 @@ public void setSourceAccountNo(String sourceAccountNo) {

@Override
public String toString() {
return "Payment [sourceAccountNo=" + sourceAccountNo
+ ", destinationAccountNo=" + destinationAccountNo
+ ", amount=" + amount + ", date=" + date + "]";
return "Payment [sourceAcct#=" + sourceAccountNo
+ ", destAcct#=" + destinationAccountNo
+ ", amount=" + amount + ", date=" + getDateFormatted() + "]";
}

}
Loading

0 comments on commit 0ff3dfe

Please sign in to comment.