From d00cf7a5c2c88fb6c88b529e2d1025dade15e5ad Mon Sep 17 00:00:00 2001 From: lifangliang Date: Fri, 29 Aug 2025 11:01:13 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8Dws=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E5=BF=83=E8=B7=B3=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../printserver/main/ws/PrinterClient.java | 107 ++++++++++++++---- 1 file changed, 85 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/goeing/printserver/main/ws/PrinterClient.java b/src/main/java/com/goeing/printserver/main/ws/PrinterClient.java index 9795734..52260d0 100644 --- a/src/main/java/com/goeing/printserver/main/ws/PrinterClient.java +++ b/src/main/java/com/goeing/printserver/main/ws/PrinterClient.java @@ -21,6 +21,7 @@ import java.net.URI; import java.util.*; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -32,9 +33,12 @@ public class PrinterClient implements ApplicationRunner { private final PrintServerConfig config; private Session session; - private final ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor(); private final ScheduledExecutorService heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(); - private boolean isConnecting = false; + private final ScheduledExecutorService connectionMonitor = Executors.newSingleThreadScheduledExecutor(); + private volatile ScheduledFuture heartbeatTask; + private volatile boolean isConnecting = false; + private int reconnectAttempts = 0; + private static final int MAX_RECONNECT_ATTEMPTS = 10; // 构造函数,注入PrintQueueService和PrintServerConfig public PrinterClient(@Lazy PrintQueueService printQueueService, PrintServerConfig config) { @@ -48,6 +52,8 @@ public class PrinterClient implements ApplicationRunner { this.session = session; log.info("WebSocket连接已建立"); isConnecting = false; + reconnectAttempts = 0; // 重置重连计数 + startHeartbeat(); } @OnMessage @@ -125,25 +131,22 @@ public class PrinterClient implements ApplicationRunner { public void onClose(Session session, CloseReason closeReason) { log.warn("WebSocket连接关闭: {}", closeReason.getReasonPhrase()); this.session = null; - // 安排重连任务 - scheduleReconnect(); + // 连接监控器会自动处理重连 } @OnError public void onError(Session session, Throwable throwable) { log.error("WebSocket连接发生错误", throwable); this.session = null; - - // 安排重连任务 - scheduleReconnect(); + // 连接监控器会自动处理重连 } /** * 连接到WebSocket服务器 */ private void connect() { - if (isConnecting || (session != null && session.isOpen())) { - return; // 已经连接或正在连接中 + if (isConnecting) { + return; // 正在连接中 } // 从配置对象中获取最新的连接参数 @@ -151,6 +154,11 @@ public class PrinterClient implements ApplicationRunner { String printerId = config.getPrinterId(); String apiKey = config.getApiKey(); + if (serverUri == null || serverUri.trim().isEmpty()) { + log.warn("WebSocket URL未配置,跳过连接"); + return; + } + // 添加调试日志 log.info("当前配置 - WebSocket URL: {}, PrinterId: {}, ApiKey: {}", serverUri, printerId, apiKey); @@ -165,9 +173,8 @@ public class PrinterClient implements ApplicationRunner { container.connectToServer(this, new URI(tempUrl)); } catch (Exception e) { log.error("连接到WebSocket服务器失败", e); + } finally { isConnecting = false; - // 连接失败,安排重连 - scheduleReconnect(); } } @@ -175,27 +182,63 @@ public class PrinterClient implements ApplicationRunner { * 启动心跳机制 */ private void startHeartbeat() { - heartbeatExecutor.scheduleAtFixedRate(() -> { + // 取消之前的心跳任务 + if (heartbeatTask != null && !heartbeatTask.isCancelled()) { + heartbeatTask.cancel(false); + } + + heartbeatTask = heartbeatExecutor.scheduleAtFixedRate(() -> { if (session != null && session.isOpen()) { try { session.getBasicRemote().sendText("{\"type\":\"heartbeat\"}"); log.debug("发送心跳"); } catch (IOException e) { - log.error("发送心跳失败", e); + log.debug("发送心跳失败", e); } } - }, 30, 30, TimeUnit.SECONDS); + }, 30, 20, TimeUnit.SECONDS); log.info("心跳机制已启动"); } /** - * 安排重连任务 + * 启动连接监控 */ - private void scheduleReconnect() { - reconnectExecutor.schedule(() -> { - log.info("尝试重新连接到WebSocket服务器..."); - connect(); - }, 5, TimeUnit.SECONDS); + private void startConnectionMonitor() { + connectionMonitor.scheduleWithFixedDelay(() -> { + try { + checkAndReconnect(); + } catch (Exception e) { + log.error("连接监控任务执行失败", e); + } + }, 10, 20, TimeUnit.SECONDS); // 每10秒检查一次 + + log.info("连接监控已启动,每10秒检查一次连接状态"); + } + + /** + * 检查连接状态并在需要时重连 + */ + private void checkAndReconnect() { + if (isConnected()) { + // 连接正常,重置重连计数 + reconnectAttempts = 0; + log.debug("WebSocket连接状态正常"); + return; + } + + if (isConnecting) { + log.debug("正在连接中,跳过此次检查"); + return; + } + + if (reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) { + log.error("重连次数已达上限({}次),停止重连", MAX_RECONNECT_ATTEMPTS); + return; + } + + log.info("检测到连接断开,开始重连...(第{}次尝试)", reconnectAttempts + 1); + reconnectAttempts++; + connect(); } /** @@ -204,6 +247,16 @@ public class PrinterClient implements ApplicationRunner { @PreDestroy public void shutdown() { log.info("正在关闭WebSocket客户端..."); + + // 取消心跳任务 + if (heartbeatTask != null && !heartbeatTask.isCancelled()) { + heartbeatTask.cancel(false); + } + + // 停止连接监控和心跳 + connectionMonitor.shutdownNow(); + heartbeatExecutor.shutdownNow(); + if (session != null) { try { session.close(); @@ -211,8 +264,6 @@ public class PrinterClient implements ApplicationRunner { log.error("关闭WebSocket连接失败", e); } } - reconnectExecutor.shutdownNow(); - heartbeatExecutor.shutdownNow(); log.info("WebSocket客户端已关闭"); } @@ -221,6 +272,12 @@ public class PrinterClient implements ApplicationRunner { */ public void reconnect() { log.info("配置已更改,重新连接WebSocket服务器..."); + + // 停止旧的心跳任务 + if (heartbeatTask != null && !heartbeatTask.isCancelled()) { + heartbeatTask.cancel(false); + } + if (session != null && session.isOpen()) { try { session.close(); @@ -230,6 +287,10 @@ public class PrinterClient implements ApplicationRunner { } session = null; isConnecting = false; + + // 重置重连计数 + reconnectAttempts = 0; + connect(); } @@ -270,6 +331,8 @@ public class PrinterClient implements ApplicationRunner { @Override public void run(ApplicationArguments args) { + // 启动连接监控 + startConnectionMonitor(); // 应用启动后连接到WebSocket服务器 connect(); }