使用Spring Batch Integration为AWS S3中的每个新文件启动JobLaunchRequest

吉尔赫姆·贝纳迪(Guilherme Bernardi)

我正在关注文档:Spring Batch IntegrationIntegration AWS相结合,用于合并AWS S3。

但是在某些情况下,每个文件的批处理执行不起作用。

AWS S3 Pooling正常运行,因此当我放置新文件或启动应用程序且存储桶中有文件时,应用程序将与本地目录同步:

    @Bean
    public S3SessionFactory s3SessionFactory(AmazonS3 pAmazonS3) {
        return new S3SessionFactory(pAmazonS3);
    }

    @Bean
    public S3InboundFileSynchronizer s3InboundFileSynchronizer(S3SessionFactory pS3SessionFactory) {
        S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(pS3SessionFactory);
        synchronizer.setPreserveTimestamp(true);
        synchronizer.setDeleteRemoteFiles(false);
        synchronizer.setRemoteDirectory("remote-bucket");
        //synchronizer.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "simpleMetadataStore"));
        return synchronizer;
    }

    @Bean
    @InboundChannelAdapter(value = IN_CHANNEL_NAME, poller = @Poller(fixedDelay = "30"))
    public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
            S3InboundFileSynchronizer pS3InboundFileSynchronizer) {
        S3InboundFileSynchronizingMessageSource messageSource = new S3InboundFileSynchronizingMessageSource(pS3InboundFileSynchronizer);
        messageSource.setAutoCreateLocalDirectory(true);
        messageSource.setLocalDirectory(new FileSystemResource("files").getFile());
        //messageSource.setLocalFilter(new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "fsSimpleMetadataStore"));
        return messageSource;
    }

    @Bean("s3filesChannel")
    public PollableChannel s3FilesChannel() {
        return new QueueChannel();
    }

我按照教程进行操作,因此创建了代码,因为它与文档相同,所以我FileMessageToJobRequest 不会在此处放置代码

所以我创建了bean IntegrationFlow和FileMessageToJobRequest:

    @Bean
    public IntegrationFlow integrationFlow(
            S3InboundFileSynchronizingMessageSource pS3InboundFileSynchronizingMessageSource) {
        return IntegrationFlows.from(pS3InboundFileSynchronizingMessageSource, 
                         c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
                .transform(fileMessageToJobRequest())
                .handle(jobLaunchingGateway())
                .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
                .get();
    }

    @Bean
    public FileMessageToJobRequest fileMessageToJobRequest() {
        FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
        fileMessageToJobRequest.setFileParameterName("input.file.name");
        fileMessageToJobRequest.setJob(delimitedFileJob);
        return fileMessageToJobRequest;
    }

因此,在JobLaunchingGateway中,我认为是问题所在:

如果我这样创建:

    @Bean
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

        return jobLaunchingGateway;
    }

情况1(应用程序启动时存储桶为空):

  • 我在AWS S3中上传了一个新文件;
  • 池工作,文件出现在本地目录中;
  • 但是转换/作业没有被解雇。

情况2(应用程序启动时,存储桶中已经有一个文件):

  • 作业启动:
2021-01-12 13:32:34.451  INFO 1955 --- [ask-scheduler-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=arquivoDelimitadoJob]] launched with the following parameters: [{input.file.name=files/FILE1.csv}]
2021-01-12 13:32:34.524  INFO 1955 --- [ask-scheduler-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [delimitedFileJob]
  • 如果我在S3中添加第二个文件,则该工作不会像情况1那样启动。

情况3(存储桶有多个文件):

  • 文件在本地目录中正确同步
  • 但是作业仅对最后一个文件执行一次。

因此,按照文档,我将网关更改为:

    @Bean
    @ServiceActivator(inputChannel = IN_CHANNEL_NAME, poller = @Poller(fixedRate="1000"))
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());

        //JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
        //jobLaunchingGateway.setOutputChannel(replyChannel());
        jobLaunchingGateway.setOutputChannel(s3FilesChannel());
        return jobLaunchingGateway;
    }

有了这个新的网关实现,如果我在S3中放了一个新文件,则应用程序会做出反应,但不会进行转换,从而给出错误:

Caused by: java.lang.IllegalArgumentException: The payload must be of type JobLaunchRequest. Object of class [java.io.File] must be an instance of class org.springframework.batch.integration.launch.JobLaunchRequest

并且,如果存储桶中有两个文件(应用程序启动时)为FILE1.csv和FILE2.csv,则该作业会正确运行FILE1.csv,但会为FILE2.csv提供上述错误。

实现这样的东西的正确方法是什么?

为了清楚起见,我想在这个存储桶中接收数千个csv文件,使用Spring Batch进行读取和处理,但是我还需要尽快从S3获取每个新文件。

提前致谢。

阿尔特姆·比兰(Artem Bilan)

JobLaunchingGateway确实是从我们预计仅JobLaunchRequest作为有效载荷。

既然您@InboundChannelAdapter(value = IN_CHANNEL_NAME, poller = @Poller(fixedDelay = "30"))S3InboundFileSynchronizingMessageSourcebean定义有了它,那么@ServiceActivator(inputChannel = IN_CHANNEL_NAMEJobLaunchingGateway没有FileMessageToJobRequest变压器的情况下拥有它确实是错误的

integrationFlow看起来对我来说还可以,但是您确实需要@InboundChannelAdapterS3InboundFileSynchronizingMessageSourceBean中删除它并完全依赖于c.poller()配置。

另一种方法是留下@InboundChannelAdapter,但随后开始IntegrationFlowIN_CHANNEL_NAME不是一个MessageSource

由于您具有针对同一S3源的多个轮询器,而且这两个轮询器均基于同一本地目录,因此看到如此多的意外情况就不足为奇了。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

Spring Cloud DataFlow在AWS S3存储桶源中的新文件之后使用启动任务

如何使用AWS PowerShell在S3中创建新文件夹

使用AWS CLI在S3存储桶中下载最新文件?

AWS使用JAVA从s3对象创建新文件出现错误

带有Spring Cloud AWS S3的Spring Batch为欧盟存储桶返回301

如何使用AWS CLI更新文件夹中所有S3对象的ACL?

使用perticular目录中的jenkins为每个EAR文件构建创建新文件夹

如何使用 Spring Batch Integration 使用多个实例同时处理多个大文件?

使用Spring Integration JAR时Spring Batch没有退出

使用注解的 Spring 集成和 Spring Batch [Spring-Batch-Integration]

使用Spring Integration AWS轮询S3存储桶以进行文件和处理

从Spring Batch中的文件目录为每个输入文件创建输出文件

使用Rest API的Spring Batch启动/停止

使用Spring Batch从HDFS读取文件

Spring Batch:使用quoteCharacter解析CSV文件

使用Spring Batch处理大文件

使用Spring Batch sftp发送文件

Spring Batch FileHeaderFieldSetMapper中工厂模式的使用

如何使用Spring Batch从任何数据库表中读取记录并导出为TextFile

rclone - 如何列出 AWS S3 存储桶中具有最新文件的目录?

使用 powershell 在 s3 存儲桶中創建一個新文件夾

Spring Batch中的JobParameters

从文件系统为Spring Batch阅读器设置资源

新文件到达AWS S3后触发AWS Lambda

如何使用spring-integration-aws获取S3存储桶中当前日期的对象计数?

在更新文件时从AWS S3下载

如何使用Spring Cloud AWS从S3删除文件?

尝试使用spring-integration-aws S3StreamingMessageSource轮询S3时使用NPE

如何使用 Laravel 队列拦截 S3 上的新文件?