无法使用 RabbitMQ 源 + 日志接收器流转换消息异常

马克·埃伯斯

我正在尝试 Spring Cloud Data Flow,今天我更新到了最新版本,因为我无法创建这个简单的例子,它应该简单地记录 AMQP 消息......

rabbit | log

当我部署这个流并简单地在消耗的队列上发布一个 String 消息时,这可以正常工作。但是当它是一个序列化的 PoJo 时,它不会。旧版本的数据流服务器 + 基于 spring boot 1.5.x 的启动应用程序就是这样做的。

Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={"absolute_path":"/+~JF4472914347363856925.tmp","filename":"+~JF4472914347363856925.tmp","timestamp":1536315010932,"sshd_server":"localhost","sshd_port":22}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=sftp.uploaded, amqp_receivedExchange=exchange, amqp_deliveryTag=1, amqp_consumerQueue=sftp_uploaded, amqp_redelivered=false, id=d3d84d90-53ca-4c39-cdef-8665d35ddcf1, amqp_consumerTag=amq.ctag-8kP4KDjn13oae1Qutmw4IA, contentType=text/json, timestamp=1536315013950}]' to outbound message.
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:163) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:475) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:60) ~[spring-integration-amqp-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:240) ~[spring-integration-amqp-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:207) ~[spring-integration-amqp-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    ... 22 common frames omitted
Caused by: java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={"absolute_path":"/+~JF4472914347363856925.tmp","filename":"+~JF4472914347363856925.tmp","timestamp":1536315010932,"sshd_server":"localhost","sshd_port":22}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=sftp.uploaded, amqp_receivedExchange=exchange, amqp_deliveryTag=1, amqp_consumerQueue=sftp_uploaded, amqp_redelivered=false, id=d3d84d90-53ca-4c39-cdef-8665d35ddcf1, amqp_consumerTag=amq.ctag-8kP4KDjn13oae1Qutmw4IA, contentType=text/json, timestamp=1536315013950}]' to outbound message.
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:324) ~[spring-cloud-stream-2.0.1.RELEASE.jar!/:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:351) ~[spring-cloud-stream-2.0.1.RELEASE.jar!/:2.0.1.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:589) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:435) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    ... 32 common frames omitted

版本

  • spring-cloud-dataflow-server-local:1.6.2.RELEASE
  • Darwin-SR1-stream-applications-kafka-maven
奥列格·朱拉库斯基

Spring Cloud Stream 2.* 中的内容类型协商有重大变化和增强。你可以在这里阅读它https://docs.spring.io/spring-cloud-stream/docs/Fishtown.BUILD-SNAPSHOT/reference/htmlsingle/#content-type-management基本上我看到的是您在堆栈中没有合适的 MessageConverter 。此外,据我所知,您的内容类型是text/json我们不提供转换器的。考虑将其更改为application/json.

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

Spring Cloud Data Flow - rabbitmq 源接收器示例

从Apache Flink RabbitMQ源连接器访问消息属性

无法使用Go从RabbitMQ发送消息

无法收到消息RabbitMQ

使用RabbitMQ作为Flink DataStream源而不自动创建RabbitMQ队列

Flume-即使接收器无法运行,源也可以接受事件吗?

Flume HDFS接收器仅使用netcat源存储一行数据源

WeldProxy无法识别异常源

在不同服务器上使用avro源和接收器时发生错误

批量使用消息-RabbitMQ

使用RabbitMQ安排的消息

Flink 请求/响应模式可能与源/接收器结合使用吗?

Flume-使用Avro源和接收器分层数据流

siddhi-无法使用siddhi从Rabbitmq检索事件消息

无法使用 Swift 连接到远程 RabbitMQ 服务器

无法在RabbitMQ接收器中将content_type从application / octet-stream更改为application / json

使用本地托管的 rabbitmq 消息

“接收器输出,源输出,接收器卸载,源卸载”对GPU意味着什么?

根据上下文源将Serilog日志过滤到不同的接收器?

无法通过此代码接收消息(广播接收器)

如何使用封装的源和接收器测试akka流闭合形状可运行图

如何使用复制活动中的预复制脚本基于源中的更改跟踪表删除接收器中的记录?

是否可以使用SQL Server中的JDBC接收器来跳过源中未提供的列?

兔子侦听器引发异常时,无法在spring-amqp中修改Rabbitmq消息

无法在多个RabbitMQ交换之间发送消息

RabbitMQ无法序列化消息,错误转换

无法连接远程RabbitMQ服务器

无法销毁RabbitMQ服务器

在接收器源图中查找最小加权匹配