javax.websocket
import org. springframework. context. annotation. Bean ;
import org. springframework. context. annotation. Configuration ;
import org. springframework. web. socket. server. standard. ServerEndpointExporter ; @Configuration
public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndPointExporter ( ) { return new ServerEndpointExporter ( ) ; }
}
public enum WebSocketType { ON_OPEN , ON_MESSAGE , ON_ERROR , ON_CLOSE ;
}
@Getter
@ToString
public class WebSocketEvent extends ApplicationEvent { private final WebSocketType state; private final String bizId; private final Object data; private final LocalDateTime dateTime; private final String sessionId; public WebSocketEvent ( Object source, WebSocketType state, String sessionId, String bizId, Object data, LocalDateTime dateTime) { super ( source) ; this . state = state; this . sessionId = sessionId; this . bizId = bizId; this . data = data; this . dateTime = dateTime; } public WebSocketEvent ( Object source, WebSocketType state, String sessionId, String bizId, Object data) { this ( source, state, sessionId, bizId, data, LocalDateTime . now ( ) ) ; } public WebSocketEvent ( Object source, WebSocketType state, String sessionId, String bizId) { this ( source, state, sessionId, bizId, null , LocalDateTime . now ( ) ) ; } public WebSocketEvent ( Object source, WebSocketType state) { this ( source, state, null , null ) ; } }
import lombok. extern. slf4j. Slf4j ;
import org. apache. commons. collections4. CollectionUtils ;
import org. apache. commons. lang3. StringUtils ;
import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. context. ApplicationContext ;
import org. springframework. stereotype. Component ;
import org. springframework. web. util. UriComponentsBuilder ; import javax. websocket. * ;
import javax. websocket. server. ServerEndpoint ;
import java. util. List ;
import java. util. Map ;
import java. util. Optional ;
import java. util. concurrent. ConcurrentHashMap ;
import java. util. concurrent. TimeUnit ;
import java. util. concurrent. atomic. AtomicInteger ;
import java. util. function. Consumer ; import static java. util. Objects . isNull ;
import static java. util. Objects . nonNull ; @Slf4j
@Component
@ServerEndpoint ( value = "/websocket" )
public class WebSocketServer { private static final long MAX_IDLE_TIMEOUT = TimeUnit . SECONDS . toMillis ( 45 ) ; private static final AtomicInteger ONLINE_COUNT = new AtomicInteger ( 0 ) ; private static final ConcurrentHashMap < String , SessionCache > SESSION_MAP = new ConcurrentHashMap < > ( ) ; private static ApplicationContext context; @Autowired public void setApplicationContext ( ApplicationContext context) { WebSocketServer . context = context; } @OnOpen public void onOpen ( Session session) { final String id = session. getId ( ) ; final List < String > allow = getQueryParams ( session) . get ( "allow" ) ; if ( CollectionUtils . isNotEmpty ( allow) ) { final String bizId = allow. get ( 0 ) ; if ( StringUtils . isNotBlank ( bizId) ) { final SessionCache put = SESSION_MAP . put ( id, new SessionCache ( session, bizId) ) ; int cnt = isNull ( put) ? ONLINE_COUNT . incrementAndGet ( ) : ONLINE_COUNT . get ( ) ; log. info ( "连接 [{} ({})] 已加入,当前连接数 : {}" , id, bizId, cnt) ; session. setMaxIdleTimeout ( MAX_IDLE_TIMEOUT ) ; context. publishEvent ( new WebSocketEvent ( this , WebSocketType . ON_OPEN , id, bizId) ) ; return ; } } closeQuietly ( session, CloseReason. CloseCodes . CANNOT_ACCEPT ) ; log. warn ( "连接 [{}] 被拒绝,没有设置业务标识" , id) ; } @OnClose public void onClose ( Session session) { final String id = session. getId ( ) ; SessionCache remove = SESSION_MAP . remove ( id) ; final boolean nonNull = nonNull ( remove) ; final String bizId = nonNull ? remove. bizId : null ; final int cnt = nonNull ? ONLINE_COUNT . decrementAndGet ( ) : ONLINE_COUNT . get ( ) ; log. info ( "连接 [{} ({})] 已断开,当前连接数 : {}" , id, bizId, cnt) ; context. publishEvent ( new WebSocketEvent ( this , WebSocketType . ON_CLOSE , id, bizId) ) ; } @OnError public void onError ( Session session, Throwable error) { final String id = session. getId ( ) ; final SessionCache sessionCache = SESSION_MAP . get ( id) ; final String bizId = nonNull ( sessionCache) ? sessionCache. bizId : null ; log. warn ( "连接 [{} ({})] 有错误 : {}\n{}" , id, bizId, error. getMessage ( ) , error. getStackTrace ( ) ) ; } @OnMessage public void onMessage ( String message, Session session) { final String id = session. getId ( ) ; final SessionCache sessionCache = SESSION_MAP . get ( id) ; final String bizId = nonNull ( sessionCache) ? sessionCache. bizId : null ; log. info ( "连接 [{} ({})] 有消息 : {}" , id, bizId, message) ; if ( StringUtils . isNotBlank ( message) ) { context. publishEvent ( new WebSocketEvent ( this , WebSocketType . ON_MESSAGE , id, bizId, message) ) ; } } public static boolean isOnline ( SessionCache cache, long timestamp) { return nonNull ( cache) && cache. session. isOpen ( ) && ! cache. isTimeout ( timestamp) ; } public static boolean isOnline ( String sessionId, long timestamp) { if ( StringUtils . isNotBlank ( sessionId) ) { return isOnline ( SESSION_MAP . get ( sessionId) , timestamp) ; } return false ; } public static void removeTimeout ( ) { final long now = System . currentTimeMillis ( ) ; SESSION_MAP . forEach ( ( key, value) -> { if ( value. isTimeout ( now) || ! value. session. isOpen ( ) ) { closeQuietly ( value. session, CloseReason. CloseCodes . GOING_AWAY ) ; SESSION_MAP . remove ( key) ; log. warn ( "主动断开 Timeout 连接 [{} ({})]" , key, value. bizId) ; } } ) ; } public static void refreshTimestamp ( String sessionId) { if ( StringUtils . isNotBlank ( sessionId) ) { final SessionCache cache = SESSION_MAP . get ( sessionId) ; if ( nonNull ( cache) ) { cache. refreshTimestamp ( ) ; } } } public static void removeUnAuthorization ( String sessionId) { final SessionCache cache = SESSION_MAP . get ( sessionId) ; if ( nonNull ( cache) ) { closeQuietly ( cache. session, CloseReason. CloseCodes . CANNOT_ACCEPT ) ; SESSION_MAP . remove ( sessionId) ; log. warn ( "主动断开 UnAuthorization 连接 [{} ({})]" , sessionId, cache. bizId) ; } } public static void send ( String sessionId, String message) { send ( sessionId, message, null ) ; } public static void send ( String sessionId, String message, Consumer < SendResult > callback) { final long now = System . currentTimeMillis ( ) ; if ( StringUtils . isNotBlank ( sessionId) ) { final SessionCache cache = SESSION_MAP . get ( sessionId) ; if ( isOnline ( cache, now) ) { cache. session. getAsyncRemote ( ) . sendText ( message, sendResult -> { if ( ! sendResult. isOK ( ) ) { Throwable ex = sendResult. getException ( ) ; final String bizId = cache. bizId; log. error ( "向 连接 [{} ({})] 发送数据 出错 : {}\n{}" , sessionId, bizId, ex. getMessage ( ) , ex. getStackTrace ( ) ) ; } Optional . ofNullable ( callback) . ifPresent ( x -> x. accept ( sendResult) ) ; } ) ; } } } public static void broadcast ( String message) { SESSION_MAP . forEach ( ( key, value) -> send ( key, message) ) ; } public static void broadcast ( String message, Consumer < SendResult > callback) { SESSION_MAP . forEach ( ( key, value) -> send ( key, message, callback) ) ; } public static void broadcast ( String bizId, String message) { broadcast ( bizId, message, null ) ; } public static void broadcast ( String bizId, String message, Consumer < SendResult > callback) { if ( StringUtils . isNotBlank ( bizId) ) { final long now = System . currentTimeMillis ( ) ; SESSION_MAP . forEach ( ( key, value) -> { if ( bizId. equals ( value. bizId) && isOnline ( value, now) ) { value. session. getAsyncRemote ( ) . sendText ( message, sendResult -> { if ( ! sendResult. isOK ( ) ) { Throwable ex = sendResult. getException ( ) ; final String sessionId = value. session. getId ( ) ; log. error ( "向 连接 [{} ({})] 发送数据 出错 : {}\n{}" , sessionId, bizId, ex. getMessage ( ) , ex. getStackTrace ( ) ) ; } Optional . ofNullable ( callback) . ifPresent ( x -> x. accept ( sendResult) ) ; } ) ; } } ) ; } } private static Map < String , List < String > > getQueryParams ( Session session) {
return UriComponentsBuilder . fromUri ( session. getRequestURI ( ) ) . build ( ) . getQueryParams ( ) ; } private static void closeQuietly ( Session session) { closeQuietly ( session, CloseReason. CloseCodes . NO_STATUS_CODE ) ; } private static void closeQuietly ( Session session, CloseReason. CloseCodes closeCode) { try { if ( session. isOpen ( ) ) { session. close ( new CloseReason ( closeCode, "" ) ) ; } } catch ( Exception ignored) { } } private static final class SessionCache { private Session session; private String bizId; private long timestamp; public SessionCache ( Session session) { this ( session, "" ) ; } public SessionCache ( Session session, String bizId) { this . session = session; this . bizId = bizId; this . timestamp = System . currentTimeMillis ( ) ; } public Session getSession ( ) { return session; } public void setSession ( Session session) { this . session = session; } public String getBizId ( ) { return bizId; } public void setBizId ( String bizId) { this . bizId = bizId; } public long getTimestamp ( ) { return timestamp; } public void setTimestamp ( long timestamp) { this . timestamp = timestamp; } public void refreshTimestamp ( ) { setTimestamp ( System . currentTimeMillis ( ) ) ; } public boolean isTimeout ( long now) { return now - getTimestamp ( ) > MAX_IDLE_TIMEOUT ; } } }