我有一个自定义的 WebSocket 处理程序(用于自定义子协议),以通常的方式注册:
public class WSConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
webSocketHandlerRegistry
.addHandler(new PerConnectionWebSocketHandler(CustomProtocolHandler.class), endpointUrl);
}
}
在CustomProtocolHandler
所有继承自WebSocketHandler
(例如afterConnectionEstablished
,handleMessage
等)的方法中,接收WebSocketSession
不是线程安全的。官方教程说ConcurrentWebSocketSessionDecorator
可以用来防止并发写入,但是当我的自定义处理程序被命中时,包装会话已经太晚了。
public class CustomProtocolHandler implements WebSocketHandler, SubProtocolCapable {
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
//This is not safe
session.sendMessage(...);
//And this this doesn't make sense
new ConcurrentWebSocketSessionDecorator(session).sendMessage(...);
}
}
现在,我看到 Spring 自己的SubProtocolWebSocketHandler
(它和我的一样,实现了WebSocketHandler
和SubProtocolCapable
)确实在内部装饰了会话,我希望我能够将它用作基础,但是要创建SubProtocolWebSocketHandler
我需要的MessageChannel
来自 spring 消息传递。我既不使用 Spring Messaging,也不知道如何获得MessageChannel
. 我应该实施一个自定义的吗?
那么,ConcurrentWebSocketSessionDecorator
打算如何使用呢?我希望有一种方法可以用 注册会话包装逻辑webSocketHandlerRegistry
,但不是。我唯一的想法是维护另一个ConcurrentHashMap
将原始会话(或其 ID)映射到包装器的想法,但这太可怕了,我认为我不需要更多状态来管理和清理。
我最终制作了自己的版本PerConnectionWebSocketHandler
,就像原始版本一样,维护一个会话到处理程序的映射,但我的将处理程序包装到一个持有者中,该持有者还包含一个装饰会话供处理程序使用。
原来的PerConnectionWebSocketHandler
样子是这样的:
public class PerConnectionWebSocketHandler implements WebSocketHandler, BeanFactoryAware {
private final Map<WebSocketSession, WebSocketHandler> handlers = new ConcurrentHashMap<>();
...
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
WebSocketHandler handler = this.provider.getHandler();
this.handlers.put(session, handler);
handler.afterConnectionEstablished(session);
}
}
我的现在看起来像这样:
public class CustomPerConnectionWebSocketHandler implements WebSocketHandler {
private final Map<WebSocketSession, HandlerWrapper> handlers = new ConcurrentHashMap<>();
...
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
WebSocketHandler handler = ...;
//Here's a chance to decorate the session as needed
WebSocketSession decoratedSession = new ConcurrentWebSocketSessionDecorator(session, sendTimeLimit, sendBufferSizeLimit);
HandlerWrapper wrapper = new HandlerWrapper(handler, decoratedSession);
this.handlers.put(session, wrapper);
wrapper.afterConnectionEstablished();
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
//Delegate all the calls in this fashion
handlers.get(session).handleMessage(message);
}
...
private static class HandlerWrapper {
private final WebSocketHandler handler;
private final WebSocketSession session;
HandlerWrapper(WebSocketHandler handler, WebSocketSession session) {
this.handler = handler;
this.session = session;
}
void afterConnectionEstablished() throws Exception {
handler.afterConnectionEstablished(session);
}
void handleMessage(WebSocketMessage<?> message) throws Exception {
handler.handleMessage(session, message);
}
... //Other delegating methods
}
}
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句