Spring Batch Java configuration error using ClassifierCompositeItemWriter

Simon Whelan

I am using Spring Batch with Java configuration (I am new to this), and I am getting an error when trying to use a ClassifierCompositeItemWriter to generate separate files based on a classifier.

The error I receive is: org.springframework.batch.item.WriterNotOpenException: Writer must be open before it can be written to

My configuration looks like this:

package com.infonova.btcompute.batch.geneva.job;

import com.infonova.btcompute.batch.billruntransfer.BillRunTranStatusFinishedJobAssignment;
import com.infonova.btcompute.batch.billruntransfer.BillRunTranStatusInprogressJobAssignment;
import com.infonova.btcompute.batch.billruntransfer.BillRunTransferStatus;
import com.infonova.btcompute.batch.geneva.camel.GenevaJobLauncher;
import com.infonova.btcompute.batch.geneva.dto.GenevaDetailsResultsDto;
import com.infonova.btcompute.batch.geneva.dto.GenveaDetailsTransactionDto;
import com.infonova.btcompute.batch.geneva.properties.GenevaDetailsExportJobProperties;
import com.infonova.btcompute.batch.geneva.rowmapper.GenevaDetailsTransactionsRowMapper;
import com.infonova.btcompute.batch.geneva.steps.*;
import com.infonova.btcompute.batch.repository.BillrunTransferStatusMapper;
import com.infonova.btcompute.batch.utils.FileNameGeneration;
import com.infonova.product.batch.camel.CamelEnabledJob;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.classify.BackToBackPatternClassifier;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;
import java.io.File;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;


public abstract class AbstractGenevaDetailsExportJob extends CamelEnabledJob {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractGenevaDetailsExportJob.class);

    @Autowired
    protected JobBuilderFactory jobBuilders;

    @Autowired
    protected StepBuilderFactory stepBuilders;

    @Autowired
    protected DataSource datasource;

    @Autowired
    private BillrunTransferStatusMapper billrunTransferStatusMapper;

    @Autowired
    protected JdbcTemplate jdbcTemplate;


    public abstract GenevaDetailsExportJobProperties jobProperties();

    @Bean
    public RouteBuilder routeBuilder(final GenevaDetailsExportJobProperties jobProperties, final Job job) {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from(jobProperties.getConsumer())
                        .transacted("PROPAGATION_REQUIRED")
                        .routeId(jobProperties.getInputRouteName())
                        .process(genevaJobLauncher(job));
                        //.to("ftp://[email protected]?password=secret");
            }
        };
    }

    @Bean
    public Processor genevaJobLauncher(Job job) {
        return new GenevaJobLauncher(job);
    }

    @Bean
    @StepScope
    public GenevaDetailsReader reader() {
        GenevaDetailsReader reader = new GenevaDetailsReader(jobProperties().getMandatorKey(),
                jobProperties().getInvoiceType(), jobProperties().getSqlResourcePath());
        reader.setSql("");
        reader.setDataSource(datasource);
        reader.setRowMapper(new GenevaDetailsTransactionsRowMapper());
        reader.setFetchSize(jobProperties().getFetchSize());
        return reader;
    }

    @Bean
    @StepScope
    public GenevaDetailsItemProcessor processor() {
        return new GenevaDetailsItemProcessor();
    }

    @Bean
    @StepScope
    public ClassifierCompositeItemWriter writer() {

        List<String> serviceCodes = new ArrayList<>();//billrunTransferStatusMapper.getServiceCodes(jobProperties().getMandatorKey());
        Long billingTaskId = billrunTransferStatusMapper.getCurrentTaskId(jobProperties().getMandatorKey());
        String countryKey = billrunTransferStatusMapper.getCountryKey(billingTaskId);
        serviceCodes.add("BTCC");
        serviceCodes.add("CCMS");

        BackToBackPatternClassifier classifier = new BackToBackPatternClassifier();
        classifier.setRouterDelegate(new GenveaDetailsRouterClassifier());

        HashMap<String, Object> map = new HashMap<>();

        for (String serviceCode : serviceCodes) {
            map.put(serviceCode, genevaDetailsWriter(serviceCode, countryKey));
        }

        classifier.setMatcherMap(map);
        ClassifierCompositeItemWriter<GenveaDetailsTransactionDto> writer = new ClassifierCompositeItemWriter<>();
        writer.setClassifier(classifier);
        return writer;

    }


    @Bean
    @StepScope
    public GenevaDetailsFlatFileItemWriter genevaDetailsWriter(String serviceCode, String countryKey) {
        GenevaDetailsFlatFileItemWriter writer = new GenevaDetailsFlatFileItemWriter(jobProperties().getDelimiter());

        FileNameGeneration fileNameGeneration = new FileNameGeneration();

        try {
            FileSystemResource fileSystemResource = new FileSystemResource(new File(jobProperties().getExportDir(), fileNameGeneration.generateFileName(jdbcTemplate,
                    serviceCode, countryKey)));
            writer.setResource(fileSystemResource);
        } catch (SQLException e) {
            LOGGER.error("Error creating FileSystemResource : " + e.getMessage());
        }
        return writer;
    }

    @Bean
    public Job job() {
        return jobBuilders.get(jobProperties().getJobName())
                .start(setBillRunTransferStatusDetailInprogressStep())
                .next(processGenevaDetailsStep())
                .next(setBillRunTransferStatusProcessedStep())
                .build();
    }

    @Bean
    public Step setBillRunTransferStatusDetailInprogressStep() {
        return stepBuilders.get("setBillRunTransferStatusDetailInprogressStep")
                .tasklet(setBillRunTransferStatusDetailInprogress())
                .build();
    }

    @Bean
    public Tasklet setBillRunTransferStatusDetailInprogress() {
        return new BillRunTranStatusInprogressJobAssignment(BillRunTransferStatus.SUMMARY.toString(), BillRunTransferStatus.DETAILS_INPROGRESS.toString(),
                jobProperties().getMandatorKey(), jobProperties().getInvoiceTypeNum(), jobProperties().getReportTypeNum());
    }

    @Bean
    public Step setBillRunTransferStatusProcessedStep() {
        return stepBuilders.get("setBillRunTransferStatusProcessedStep")
                .tasklet(setBillRunTransferStatusProcessed())
                .build();
    }

    @Bean
    public Tasklet setBillRunTransferStatusProcessed() {
        return new BillRunTranStatusFinishedJobAssignment(BillRunTransferStatus.PROCESSED.toString());
    }

    @Bean
    public Step processGenevaDetailsStep() {
        return stepBuilders.get("processGenevaDetailsStep")
                .<GenveaDetailsTransactionDto, GenevaDetailsResultsDto>chunk(jobProperties().getChunkSize())
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

}

My writer looks like this:

package com.infonova.btcompute.batch.geneva.steps;

import com.infonova.btcompute.batch.geneva.dto.GenevaDetailsResultsDto;
import com.infonova.btcompute.batch.repository.BillrunTransferStatusMapper;
import com.infonova.btcompute.batch.utils.FileNameGeneration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.*;
import org.springframework.batch.item.file.FlatFileHeaderCallback;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;

@Component
public class GenevaDetailsFlatFileItemWriter extends FlatFileItemWriter<GenevaDetailsResultsDto> {

    private static final Logger LOGGER = LoggerFactory.getLogger(GenevaDetailsFlatFileItemWriter.class);

    @Autowired
    protected JdbcTemplate jdbcTemplate;

    @Autowired
    private BillrunTransferStatusMapper billrunTransferStatusMapper;


    private String delimiter;


    public GenevaDetailsFlatFileItemWriter(String delimiter) {
        this.delimiter = delimiter;
        this.setLineAggregator(getLineAggregator());
        this.setHeaderCallback(getHeaderCallback());
    }

    private DelimitedLineAggregator<GenevaDetailsResultsDto> getLineAggregator() {
        DelimitedLineAggregator<GenevaDetailsResultsDto> delLineAgg = new DelimitedLineAggregator<>();
        delLineAgg.setDelimiter(delimiter);

        BeanWrapperFieldExtractor<GenevaDetailsResultsDto> fieldExtractor = new BeanWrapperFieldExtractor<>();
        fieldExtractor.setNames(getNames());
        delLineAgg.setFieldExtractor(fieldExtractor);

        return delLineAgg;
    }

    private String[] getHeaderNames() {
        return new String[] {"Record ID", "Service Identifier", "Billing Account Reference", "Cost Description", "Event Cost",
                "Event Date and Time", "Currency Code", "Charge Category", "Order Identifier", "Net Usage", "UOM",
                "Quantity", "Service Start Date", "Service End Date"};
    }

    private String[] getNames() {
        return new String[] {"RECORD_ID", "SERVICE_CODE", "BILLING_ACCOUNT_REFERENCE", "COST_DESCRIPTION", "EVENT_COST",
                "EVENT_DATE_AND_TIME", "CURRENCY_CODE", "CHARGE_CATEGORY", "ORDER_IDENTIFIER", "NET_USAGE", "UOM",
                "QUANTITY", "SERVICE_START_DATE", "SERVICE_END_DATE"};
    }



    private FlatFileHeaderCallback getHeaderCallback()
    {
        return new FlatFileHeaderCallback() {
            @Override
            public void writeHeader(Writer writer) throws IOException {
                writer.write(String.join(delimiter, getHeaderNames()));
            }
        };
    }

//    @BeforeStep
//    public void beforeStep(StepExecution stepExecution) {
//        billingTaskId = (Long) stepExecution.getJobExecution().getExecutionContext().get("billingTaskId");
//        FileNameGeneration fileNameGeneration = new FileNameGeneration();
//
//        try {
//            FileSystemResource fileSystemResource = new FileSystemResource(new File(exportDir, fileNameGeneration.generateFileName(jdbcTemplate,
//                    serviceCode, billrunTransferStatusMapper.getCountryKey(billingTaskId))));
//            setResource(fileSystemResource);
//        } catch (SQLException e) {
//            LOGGER.error("Error creating FileSystemResource : " + e.getMessage());
//        }
//    }
}

I have searched online and cannot find a solution to this problem.

Hansjoerg Wingeier

ClassifierCompositeItemWriter does not implement the ItemStream interface, so the open method of your FlatFileItemWriter delegates is never called.

The easiest way is to call the open method yourself when building the classifier map:

for (String serviceCode : serviceCodes) {
    FlatFileItemWriter writer = genevaDetailsWriter(serviceCode, countryKey);
    writer.open(new ExecutionContext());
    map.put(serviceCode, writer);
}
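Note that with this quick fix the framework never calls close on the delegates, so output may not be flushed until the JVM exits. An alternative is to register each delegate writer as a stream on the step, so that Spring Batch manages open/update/close for all of them. A sketch against the step definition from the question, assuming the two service codes from the writer() bean ("BTCC", "CCMS") and that countryKey is resolved the same way it is there:

```java
@Bean
public Step processGenevaDetailsStep() {
    // Registering each delegate FlatFileItemWriter via stream(...) makes the
    // step open/update/close it, even though the ClassifierCompositeItemWriter
    // wrapping them is not an ItemStream itself.
    return stepBuilders.get("processGenevaDetailsStep")
            .<GenveaDetailsTransactionDto, GenevaDetailsResultsDto>chunk(jobProperties().getChunkSize())
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .stream(genevaDetailsWriter("BTCC", countryKey))
            .stream(genevaDetailsWriter("CCMS", countryKey))
            .build();
}
```

With this approach the manual open() calls in the classifier map are no longer needed, and restart state for each delegate is also handled by the step.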
