입력 채널에서 이벤트를 소비하는 스트림 리스너가 있는 스프링 클라우드 스트림 애플리케이션이 있습니다. 처리 방법의 실행을 기록하기 위해 AOP 조언을 추가할 때까지 모든 것이 원활하게 실행됩니다(응용 프로그램의 다른 사람 간에). 그 후 다음 오류와 함께 테스트가 실패하기 시작했습니다.
org.springframework.messaging.MessagingException: com.acme.fx.exchangerate.store.infrastructure.entrypoint.messaging.ExchangeRateStoreStreamListener$$EnhancerBySpringCGLIB$$9795881e#handle[1 args]를 호출하는 동안 예외가 발생했습니다. 중첩 예외는 java.lang.IllegalStateException입니다. 매핑된 처리기 메서드 클래스 'com.acme.fx.exchangerate.store.infrastructure.entrypoint.messaging.ExchangeRateStoreStreamListener$$EnhancerBySpringCGLIB$$9795881e$MockitoMock$1733324661'이 실제 끝점의 인스턴스가 아닙니다. 빈 클래스 'com.acme.fx.exchangerate.store.infrastructure.entrypoint.messaging.ExchangeRateStoreStreamListener$$EnhancerBySpringCGLIB$$9795881e$$EnhancerBySpringCGLIB$$2a2d55ce'. 엔드포인트에 프록시가 필요한 경우(예: @Transactional로 인해) 클래스 기반 프록시를 사용하세요. HandlerMethod 세부 정보: ...
싱크 정의:
애플리케이션 코드는 다음과 같습니다.
public interface ExchangeRateStoreStreamSink {
String NEWEXCHANGERATE="new-exchange-rate";
@Input(NEWEXCHANGERATE)
SubscribableChannel newExchangeRate();
}
주석이 달린 메서드가 있는 스트림 리스너:
@EnableBinding(ExchangeRateStoreStreamSink.class)
public class ExchangeRateStoreStreamListener {
private CommandBus commandBus;
@Autowired
public ExchangeRateStoreStreamListener(CommandBus commandBus) {
this.commandBus = commandBus;
}
@Loggable(operationName="ExchangeRateConsumption")
@StreamListener(ExchangeRateStoreStreamSink.NEWEXCHANGERATE)
public void handle(NewExchangeRateMessage newExchangeRateMessage) {
AddExchangeRateCommand addExchangeRateCommand = new AddExchangeRateCommand(newExchangeRateMessage.from,
newExchangeRateMessage.to, newExchangeRateMessage.amount, newExchangeRateMessage.date);
commandBus.dispatch(addExchangeRateCommand);
}
}
테스트:
@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class ExchangeRateStoreStreamListenerTest {
@Autowired
private ExchangeRateStoreStreamSink streamSink;
@SpyBean
private ExchangeRateStoreStreamListener streamListener;
@Test
public void test() {
SubscribableChannel input = streamSink.newExchangeRate();
NewExchangeRateMessage exchangeRateMessage = NewExchangeRateMessageFactory.aNewExchangeRateMessage();
input.send(new GenericMessage<>(exchangeRateMessage));
verify(streamListener).handle(any(NewExchangeRateMessage.class));
}
}
AOP 측면:
@Aspect
@Component
public class LoggingAspect {
private static final String API_DOMAIN = "fx";
@Pointcut(value = "@annotation(loggable) && execution(* *(..))", argNames = "loggable")
public void loggableMethod(Loggable loggable) { }
@Around(value = "loggableMethod(loggable)", argNames = "pjp,loggable")
public Object logAccess(ProceedingJoinPoint pjp, Loggable loggable) throws Throwable {
final Signature signature = pjp.getSignature();
final Logger logger = LogManager.getLogger(signature.getDeclaringType());
logger.info( "api_domain={} _operation={} _message=\"Start operation\"",
API_DOMAIN, loggable.operationName());
try {
return pjp.proceed();
} catch (DomainError domainError) {
// Some logic here
}
}
}
어떤 도움이든 환영합니다. 미리 감사드립니다!
StreamListener가 이미 프록시이기 때문입니다. 설명할 것이 너무 많고 블로그를 하기에 좋은 주제일 것입니다. . . 어쨌든, 훨씬 더 간단한 접근 방식으로 해결할 수 있는 해결하려는 실제 문제를 설명했지만, ChannelInterceptor
그것은 본질적으로 메시지 처리기 호출에 대한 주변 조언 역할을 합니다. 기본적으로 다음은 예입니다.
@Bean
@GlobalChannelInterceptor
public ChannelInterceptor channelInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> msg, MessageChannel mc) {
System.out.println("Before send to channel: " + mc);
return msg;
}
@Override
public void afterSendCompletion(Message<?> msg, MessageChannel mc, boolean bln, Exception excptn) {
System.out.println("After send completion to channel: " + mc);
}
@Override
public void postSend(Message<?> msg, MessageChannel mc, boolean bln) {
System.out.println("After send to channel: " + mc);
}
};
}
. . . 다음은 생성할 출력입니다.
Before send to channel: input
Before send to channel: integrationFlowCreator.channel#0
===> SOME LOG MESSAGE INSIDE YOUR CODE
After send to channel: integrationFlowCreator.channel#0
After send completion to channel: integrationFlowCreator.channel#0
After send to channel: input
After send completion to channel: input
구성에서 선언하기만 하면 됩니다. 모든 채널에 적용되므로 원하는 한 번만 (논리 포함) 모니터링할 수 있습니다. 자세한 내용은 Javadoc GlobalChannelInterceptor
을 참조하십시오. 도움이 되기를 바랍니다.
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다