【Java】javax.websocket

chatgpt/2023/9/27 17:38:41

javax.websocket

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configuration
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndPointExporter() {return new ServerEndpointExporter();}
}
public enum WebSocketType {ON_OPEN,ON_MESSAGE,ON_ERROR,ON_CLOSE;
}
@Getter
@ToString
public class WebSocketEvent extends ApplicationEvent {private final WebSocketType state;private final String bizId;private final Object data;private final LocalDateTime dateTime;private final String sessionId;public WebSocketEvent(Object source, WebSocketType state, String sessionId, String bizId, Object data, LocalDateTime dateTime) {super(source);this.state = state;this.sessionId = sessionId;this.bizId = bizId;this.data = data;this.dateTime = dateTime;}public WebSocketEvent(Object source, WebSocketType state, String sessionId, String bizId, Object data) {this(source, state, sessionId, bizId, data, LocalDateTime.now());}public WebSocketEvent(Object source, WebSocketType state, String sessionId, String bizId) {this(source, state, sessionId, bizId, null, LocalDateTime.now());}public WebSocketEvent(Object source, WebSocketType state) {this(source, state, null, null);}}
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.web.util.UriComponentsBuilder;import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;@Slf4j
@Component
@ServerEndpoint(value = "/websocket")
public class WebSocketServer {private static final long MAX_IDLE_TIMEOUT = TimeUnit.SECONDS.toMillis(45);private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);private static final ConcurrentHashMap<String, SessionCache> SESSION_MAP = new ConcurrentHashMap<>();private static ApplicationContext context;@Autowiredpublic void setApplicationContext(ApplicationContext context) {WebSocketServer.context = context;}/*** 连接建立成功调用的方法*/@OnOpenpublic void onOpen(Session session) {final String id = session.getId();final List<String> allow = getQueryParams(session).get("allow");if (CollectionUtils.isNotEmpty(allow)) {final String bizId = allow.get(0);if (StringUtils.isNotBlank(bizId)) {final SessionCache put = SESSION_MAP.put(id, new SessionCache(session, bizId));int cnt = isNull(put) ? ONLINE_COUNT.incrementAndGet() : ONLINE_COUNT.get();log.info("连接 [{} ({})] 已加入,当前连接数 : {}", id, bizId, cnt);session.setMaxIdleTimeout(MAX_IDLE_TIMEOUT);context.publishEvent(new WebSocketEvent(this, WebSocketType.ON_OPEN, id, bizId));return;}}closeQuietly(session, CloseReason.CloseCodes.CANNOT_ACCEPT);log.warn("连接 [{}] 被拒绝,没有设置业务标识", id);}/*** 连接关闭调用的方法*/@OnClosepublic void onClose(Session session) {final String id = session.getId();SessionCache remove = SESSION_MAP.remove(id);final boolean nonNull = nonNull(remove);final String bizId = nonNull ? remove.bizId : null;final int cnt = nonNull ? ONLINE_COUNT.decrementAndGet() : ONLINE_COUNT.get();log.info("连接 [{} ({})] 已断开,当前连接数 : {}", id, bizId, cnt);context.publishEvent(new WebSocketEvent(this, WebSocketType.ON_CLOSE, id, bizId));}/*** 出现错误** @param session* @param error*/@OnErrorpublic void onError(Session session, Throwable error) {final String id = session.getId();final SessionCache sessionCache = SESSION_MAP.get(id);final String bizId = nonNull(sessionCache) ? sessionCache.bizId : null;log.warn("连接 [{} ({})] 有错误 : {}\n{}", id, bizId, error.getMessage(), error.getStackTrace());}/*** 收到客户端消息后调用的方法** @param message* @param session*/@OnMessagepublic void onMessage(String message, Session session) {final String id = session.getId();final SessionCache sessionCache = SESSION_MAP.get(id);final String bizId = nonNull(sessionCache) ? sessionCache.bizId : null;log.info("连接 [{} ({})] 有消息 : {}", id, bizId, message);if (StringUtils.isNotBlank(message)) {context.publishEvent(new WebSocketEvent(this, WebSocketType.ON_MESSAGE, id, bizId, message));}}public static boolean isOnline(SessionCache cache, long timestamp) {return nonNull(cache) && cache.session.isOpen() && !cache.isTimeout(timestamp);}public static boolean isOnline(String sessionId, long timestamp) {if (StringUtils.isNotBlank(sessionId)) {return isOnline(SESSION_MAP.get(sessionId), timestamp);}return false;}public static void removeTimeout() {final long now = System.currentTimeMillis();SESSION_MAP.forEach((key, value) -> {if (value.isTimeout(now) || !value.session.isOpen()) {closeQuietly(value.session, CloseReason.CloseCodes.GOING_AWAY);SESSION_MAP.remove(key);log.warn("主动断开 Timeout 连接 [{} ({})]", key, value.bizId);}});}public static void refreshTimestamp(String sessionId) {if (StringUtils.isNotBlank(sessionId)) {final SessionCache cache = SESSION_MAP.get(sessionId);if (nonNull(cache)) {cache.refreshTimestamp();}}}public static void removeUnAuthorization(String sessionId) {final SessionCache cache = SESSION_MAP.get(sessionId);if (nonNull(cache)) {closeQuietly(cache.session, CloseReason.CloseCodes.CANNOT_ACCEPT);SESSION_MAP.remove(sessionId);log.warn("主动断开 UnAuthorization 连接 [{} ({})]", sessionId, cache.bizId);}}/*** 发送消息,实践表明,每次浏览器刷新,session会发生变化。** @param sessionId* @param message*/public static void send(String sessionId, String message) {send(sessionId, message, null);}public static void send(String sessionId, String message, Consumer<SendResult> callback) {final long now = System.currentTimeMillis();if (StringUtils.isNotBlank(sessionId)) {final SessionCache cache = SESSION_MAP.get(sessionId);if (isOnline(cache, now)) {cache.session.getAsyncRemote().sendText(message, sendResult -> {if (!sendResult.isOK()) {Throwable ex = sendResult.getException();final String bizId = cache.bizId;log.error("向 连接 [{} ({})] 发送数据 出错 : {}\n{}", sessionId, bizId, ex.getMessage(), ex.getStackTrace());}Optional.ofNullable(callback).ifPresent(x -> x.accept(sendResult));});}}}public static void broadcast(String message) {SESSION_MAP.forEach((key, value) -> send(key, message));}public static void broadcast(String message, Consumer<SendResult> callback) {SESSION_MAP.forEach((key, value) -> send(key, message, callback));}public static void broadcast(String bizId, String message) {broadcast(bizId, message, null);}public static void broadcast(String bizId, String message, Consumer<SendResult> callback) {if (StringUtils.isNotBlank(bizId)) {final long now = System.currentTimeMillis();SESSION_MAP.forEach((key, value) -> {if (bizId.equals(value.bizId) && isOnline(value, now)) {value.session.getAsyncRemote().sendText(message, sendResult -> {if (!sendResult.isOK()) {Throwable ex = sendResult.getException();final String sessionId = value.session.getId();log.error("向 连接 [{} ({})] 发送数据 出错 : {}\n{}", sessionId, bizId, ex.getMessage(), ex.getStackTrace());}Optional.ofNullable(callback).ifPresent(x -> x.accept(sendResult));});}});}}private static Map<String, List<String>> getQueryParams(Session session) {
//        return session.getRequestParameterMap();return UriComponentsBuilder.fromUri(session.getRequestURI()).build().getQueryParams();}private static void closeQuietly(Session session) {closeQuietly(session, CloseReason.CloseCodes.NO_STATUS_CODE);}private static void closeQuietly(Session session, CloseReason.CloseCodes closeCode) {try {if (session.isOpen()) {session.close(new CloseReason(closeCode, ""));}} catch (Exception ignored) {}}private static final class SessionCache {private Session session;private String bizId;private long timestamp;public SessionCache(Session session) {this(session, "");}public SessionCache(Session session, String bizId) {this.session = session;this.bizId = bizId;this.timestamp = System.currentTimeMillis();}public Session getSession() {return session;}public void setSession(Session session) {this.session = session;}public String getBizId() {return bizId;}public void setBizId(String bizId) {this.bizId = bizId;}public long getTimestamp() {return timestamp;}public void setTimestamp(long timestamp) {this.timestamp = timestamp;}public void refreshTimestamp() {setTimestamp(System.currentTimeMillis());}public boolean isTimeout(long now) {return now - getTimestamp() > MAX_IDLE_TIMEOUT;}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.exyb.cn/news/show-5312798.html

如若内容造成侵权/违法违规/事实不符,请联系郑州代理记账网进行投诉反馈,一经查实,立即删除!

相关文章

Qt 3. QSerialPortInfo显示串口信息在QTextEdit显示

//ex2.cpp #include "ex2.h" #include "ui_ex2.h" #include <QtSerialPort/QtSerialPort>int static cnt 0;Ex2::Ex2(QWidget *parent): QDialog(parent), ui(new Ui::Ex2) {ui->setupUi(this); }Ex2::~Ex2() {delete ui; }void Ex2::on_pushBu

QT基于TCP协议实现数据传输以及波形绘制——安卓APP及Windows程序双版本

文章代码有非常非常之详细的解析&#xff01;&#xff01;&#xff01;诸位可放心食用 这个玩意我做了两个&#xff0c;一个是安卓app&#xff0c;一个是Windows程序。代码并非全部都是由我从无到有实现&#xff0c;只是实现了我想要的功能。多亏了巨人的肩膀&#xff0c;开源…

天工开物 #7 Rust 与 Java 程序的异步接口互操作

许多语言的高性能程序库都是建立在 C/C 的核心实现上的。 例如&#xff0c;著名 Python 科学计算库 Pandas 和 Numpy 的核心是 C 实现的&#xff0c;RocksDB 的 Java 接口是对底层 C 接口的封装。 Rust 语言的基本目标之一就是替代 C 在这些领域的位置&#xff0c;为开发者提供…

1400*B. Swaps(排序)

Example input 3 2 3 1 4 2 3 5 3 1 2 4 6 5 7 5 9 1 3 2 4 6 10 8 output 0 2 3 题意&#xff1a; 每次交换相邻的两个数&#xff0c;问两个数列共同交换多少次&#xff0c;可以使得第一个数列的首个数字小于第二个数列的首个数字&#xff0c;求最少的交换次数。 解析&am…

AIGC“弄脏”互联网 大模型“课本”遭污染

“AI制造”充斥互联网&#xff0c;连“真人小姐姐”也可以批量生成。随着生成式人工智能的爆发&#xff0c;一个可怕的现象出现&#xff1a;AI正在污染整个互联网。 知乎成为生成无脑答案的重灾区&#xff0c;这些内容描述简短、概括性十足&#xff0c;细看逻辑混乱、错误百出…

高忆管理:股票投资策略是什么?有哪些?

在进行股票买卖过程中&#xff0c;出资者需求有自己的方案和出资战略&#xff0c;并且主张严格遵从出资战略买卖&#xff0c;不要跟风操作。那么股票出资战略是什么&#xff1f;有哪些&#xff1f;下面就由高忆管理为我们剖析&#xff1a; 股票出资战略简略来说便是能够协助出资…

02mysql查询语句之运算符总结

# 1.选择工资不在5000到12000的员工的姓名和工资 SELECT last_name,salary FROM employees # where salary not between 5000 and 12000; WHERE salary < 5000 OR salary > 12000; # 2.选择在20或50号部门工作的员工姓名和部门号 SELECT last_name,department_id FROM…

Clion开发Stm32之温湿度传感器(DHT11)驱动编写

前言 涵盖之前文章: Clion开发STM32之HAL库GPIO宏定义封装(最新版)Clion开发stm32之微妙延迟(采用nop指令实现)Clion开发STM32之日志模块(参考RT-Thread) DHT11驱动文件 头文件 /*******************************************************************************Copyrig…
推荐文章