我试图使用一个写入器写入成功的记录,而在另一个写入器中写入失败的记录。
我已经编写了实现ItemWriter的BillerOrderWriter类。我放了一些日志语句,可以看到它写成功billerOrderId或失败的billerOrderId。但是,似乎它没有调用DatabaseToCsvFileJobConfig或SuccessOrdersToCsvFileJobConfig。
public class BillerOrderWriter implements ItemWriter<BillerOrder>{
private static Logger log = LoggerFactory.getLogger("BillerOrderWriter.class");
@Autowired
SuccessfulOrdersToCsvFileJobConfig successfulOrdersToCsvFileJobConfig;
@Autowired
DatabaseToCsvFileJobConfig databaseToCsvFileJobConfig;
@Override
public void write(List<? extends BillerOrder> items) throws Exception {
for (BillerOrder item : items) {
log.info("item = " + item.toString());
if (item.getResult().equals("SUCCESS")) {
log.info(" Success billerOrderId = " + item.getBillerOrderId());
successfulOrdersToCsvFileJobConfig.successfulDatabaseCsvItemWriter();
} else {
log.info("Failed billerOrderId = " + item.getBillerOrderId());
databaseToCsvFileJobConfig.databaseCsvItemWriter();
}
}
}
}
这是BatchConfig类。
@Bean
public BillerOrderWriter billerOrderWriter() {
return new BillerOrderWriter();
}
@Bean
public Job importJobOrder(JobCompletionNotificationListner listener, Step step1) {
return jobBuilderFactory.get("importJobOrder")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean(name="step1")
public Step step1(BillerOrderWriter billerOrderWriter) {
return stepBuilderFactory.get("step1")
.<BillerOrder, BillerOrder> chunk(10)
.reader((ItemReader<? extends BillerOrder>) reader())
.processor(processor())
.writer(billerOrderWriter)
.build();
}
这是我的成功作家和失败作家课堂。
@Configuration
public class SuccessfulOrdersToCsvFileJobConfig {
private static Logger log = LoggerFactory.getLogger("SuccessfulOrdersToCsvFileJobConfig.class");
@Bean
public ItemWriter<BillerOrder> successfulDatabaseCsvItemWriter() {
log.info("Entering SuccessfulOrdersToCsvFileJobConfig...");
FlatFileItemWriter<BillerOrder> csvFileWriter = new FlatFileItemWriter<>();
String exportFileHeader = "BillerOrderId;SuccessMessage";
OrderWriter headerWriter = new OrderWriter(exportFileHeader);
csvFileWriter.setHeaderCallback(headerWriter);
String exportFilePath = "/tmp/SuccessBillerOrderIdForRetry.csv";
csvFileWriter.setResource(new FileSystemResource(exportFilePath));
LineAggregator<BillerOrder> lineAggregator = createOrderLineAggregator();
csvFileWriter.setLineAggregator(lineAggregator);
return csvFileWriter;
}
private LineAggregator<BillerOrder> createOrderLineAggregator() {
log.info("Entering createOrderLineAggregator...");
DelimitedLineAggregator<BillerOrder> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(";");
FieldExtractor<BillerOrder> fieldExtractor = createOrderFieldExtractor();
lineAggregator.setFieldExtractor(fieldExtractor);
return lineAggregator;
}
private FieldExtractor<BillerOrder> createOrderFieldExtractor() {
log.info("Entering createOrderFieldExtractor...");
BeanWrapperFieldExtractor<BillerOrder> extractor = new BeanWrapperFieldExtractor<>();
extractor.setNames(new String[] {"billerOrderId","successMessage"});
return extractor;
}
}
@Configuration
public class DatabaseToCsvFileJobConfig {
private static Logger log = LoggerFactory.getLogger("DatabaseToCsvFileJobConfig.class");
@Bean
public ItemWriter<BillerOrder> databaseCsvItemWriter() {
log.info("Entering databaseCsvItemWriter...");
FlatFileItemWriter<BillerOrder> csvFileWriter = new FlatFileItemWriter<>();
String exportFileHeader = "BillerOrderId;ErrorMessage";
OrderWriter headerWriter = new OrderWriter(exportFileHeader);
csvFileWriter.setHeaderCallback(headerWriter);
String exportFilePath = "/tmp/FailedBillerOrderIdForRetry.csv";
csvFileWriter.setResource(new FileSystemResource(exportFilePath));
LineAggregator<BillerOrder> lineAggregator = createOrderLineAggregator();
csvFileWriter.setLineAggregator(lineAggregator);
return csvFileWriter;
}
private LineAggregator<BillerOrder> createOrderLineAggregator() {
log.info("Entering createOrderLineAggregator...");
DelimitedLineAggregator<BillerOrder> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(";");
FieldExtractor<BillerOrder> fieldExtractor = createOrderFieldExtractor();
lineAggregator.setFieldExtractor(fieldExtractor);
return lineAggregator;
}
private FieldExtractor<BillerOrder> createOrderFieldExtractor() {
log.info("Entering createOrderFieldExtractor...");
BeanWrapperFieldExtractor<BillerOrder> extractor = new BeanWrapperFieldExtractor<>();
extractor.setNames(new String[] {"billerOrderId","errorMessage"});
return extractor;
}
}
这是我的工作完成侦听器类。
@Component
public class JobCompletionNotificationListner extends JobExecutionListenerSupport {
private static final org.slf4j.Logger log = LoggerFactory.getLogger(JobCompletionNotificationListner.class);
@Override
public void afterJob(JobExecution jobExecution) {
log.info("In afterJob ...");
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
DatabaseToCsvFileJobConfig databaseToCsvFileJobConfig = new DatabaseToCsvFileJobConfig();
SuccessfulOrdersToCsvFileJobConfig successfulOrdersToCsvFileJobConfig = new SuccessfulOrdersToCsvFileJobConfig();
}
}
}
在您的BillerOrderWriter#write
方法中,应该编写对项进行实际写入操作的代码到数据接收器。但是在您的情况下,您正在调用successfulOrdersToCsvFileJobConfig.successfulDatabaseCsvItemWriter();
并databaseToCsvFileJobConfig.databaseCsvItemWriter();
创建了项目编写器bean。您应该注入那些委托编写者,并write
在需要时调用其方法,例如:
public class BillerOrderWriter implements ItemWriter<BillerOrder>{
private ItemWriter<BillerOrder> successfulDatabaseCsvItemWriter;
private ItemWriter<BillerOrder> databaseCsvItemWriter;
// constructor with successfulDatabaseCsvItemWriter + databaseCsvItemWriter
@Override
public void write(List<? extends BillerOrder> items) throws Exception {
for (BillerOrder item : items) {
if (item.getResult().equals("SUCCESS")) {
successfulDatabaseCsvItemWriter.write(Collections.singletonList(item));
} else {
databaseCsvItemWriter.write(Collections.singletonList(item));
}
}
}
}
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句