fix: 修复ws连接心跳的问题

This commit is contained in:
lifangliang 2025-08-29 11:01:13 +08:00
parent ae2f6366c4
commit d00cf7a5c2

View File

@ -21,6 +21,7 @@ import java.net.URI;
import java.util.*; import java.util.*;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -32,9 +33,12 @@ public class PrinterClient implements ApplicationRunner {
private final PrintServerConfig config; private final PrintServerConfig config;
private Session session; private Session session;
private final ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(); private final ScheduledExecutorService heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
private boolean isConnecting = false; private final ScheduledExecutorService connectionMonitor = Executors.newSingleThreadScheduledExecutor();
private volatile ScheduledFuture<?> heartbeatTask;
private volatile boolean isConnecting = false;
private int reconnectAttempts = 0;
private static final int MAX_RECONNECT_ATTEMPTS = 10;
// 构造函数注入PrintQueueService和PrintServerConfig // 构造函数注入PrintQueueService和PrintServerConfig
public PrinterClient(@Lazy PrintQueueService printQueueService, PrintServerConfig config) { public PrinterClient(@Lazy PrintQueueService printQueueService, PrintServerConfig config) {
@ -48,6 +52,8 @@ public class PrinterClient implements ApplicationRunner {
this.session = session; this.session = session;
log.info("WebSocket连接已建立"); log.info("WebSocket连接已建立");
isConnecting = false; isConnecting = false;
reconnectAttempts = 0; // 重置重连计数
startHeartbeat();
} }
@OnMessage @OnMessage
@ -125,25 +131,22 @@ public class PrinterClient implements ApplicationRunner {
public void onClose(Session session, CloseReason closeReason) { public void onClose(Session session, CloseReason closeReason) {
log.warn("WebSocket连接关闭: {}", closeReason.getReasonPhrase()); log.warn("WebSocket连接关闭: {}", closeReason.getReasonPhrase());
this.session = null; this.session = null;
// 安排重连任务 // 连接监控器会自动处理重连
scheduleReconnect();
} }
@OnError @OnError
public void onError(Session session, Throwable throwable) { public void onError(Session session, Throwable throwable) {
log.error("WebSocket连接发生错误", throwable); log.error("WebSocket连接发生错误", throwable);
this.session = null; this.session = null;
// 连接监控器会自动处理重连
// 安排重连任务
scheduleReconnect();
} }
/** /**
* 连接到WebSocket服务器 * 连接到WebSocket服务器
*/ */
private void connect() { private void connect() {
if (isConnecting || (session != null && session.isOpen())) { if (isConnecting) {
return; // 已经连接或正在连接中 return; // 正在连接中
} }
// 从配置对象中获取最新的连接参数 // 从配置对象中获取最新的连接参数
@ -151,6 +154,11 @@ public class PrinterClient implements ApplicationRunner {
String printerId = config.getPrinterId(); String printerId = config.getPrinterId();
String apiKey = config.getApiKey(); String apiKey = config.getApiKey();
if (serverUri == null || serverUri.trim().isEmpty()) {
log.warn("WebSocket URL未配置跳过连接");
return;
}
// 添加调试日志 // 添加调试日志
log.info("当前配置 - WebSocket URL: {}, PrinterId: {}, ApiKey: {}", serverUri, printerId, apiKey); log.info("当前配置 - WebSocket URL: {}, PrinterId: {}, ApiKey: {}", serverUri, printerId, apiKey);
@ -165,9 +173,8 @@ public class PrinterClient implements ApplicationRunner {
container.connectToServer(this, new URI(tempUrl)); container.connectToServer(this, new URI(tempUrl));
} catch (Exception e) { } catch (Exception e) {
log.error("连接到WebSocket服务器失败", e); log.error("连接到WebSocket服务器失败", e);
} finally {
isConnecting = false; isConnecting = false;
// 连接失败安排重连
scheduleReconnect();
} }
} }
@ -175,27 +182,63 @@ public class PrinterClient implements ApplicationRunner {
* 启动心跳机制 * 启动心跳机制
*/ */
private void startHeartbeat() { private void startHeartbeat() {
heartbeatExecutor.scheduleAtFixedRate(() -> { // 取消之前的心跳任务
if (heartbeatTask != null && !heartbeatTask.isCancelled()) {
heartbeatTask.cancel(false);
}
heartbeatTask = heartbeatExecutor.scheduleAtFixedRate(() -> {
if (session != null && session.isOpen()) { if (session != null && session.isOpen()) {
try { try {
session.getBasicRemote().sendText("{\"type\":\"heartbeat\"}"); session.getBasicRemote().sendText("{\"type\":\"heartbeat\"}");
log.debug("发送心跳"); log.debug("发送心跳");
} catch (IOException e) { } catch (IOException e) {
log.error("发送心跳失败", e); log.debug("发送心跳失败", e);
} }
} }
}, 30, 30, TimeUnit.SECONDS); }, 30, 20, TimeUnit.SECONDS);
log.info("心跳机制已启动"); log.info("心跳机制已启动");
} }
/** /**
* 安排重连任务 * 启动连接监控
*/ */
private void scheduleReconnect() { private void startConnectionMonitor() {
reconnectExecutor.schedule(() -> { connectionMonitor.scheduleWithFixedDelay(() -> {
log.info("尝试重新连接到WebSocket服务器..."); try {
connect(); checkAndReconnect();
}, 5, TimeUnit.SECONDS); } catch (Exception e) {
log.error("连接监控任务执行失败", e);
}
}, 10, 20, TimeUnit.SECONDS); // 每10秒检查一次
log.info("连接监控已启动每10秒检查一次连接状态");
}
/**
* 检查连接状态并在需要时重连
*/
private void checkAndReconnect() {
if (isConnected()) {
// 连接正常重置重连计数
reconnectAttempts = 0;
log.debug("WebSocket连接状态正常");
return;
}
if (isConnecting) {
log.debug("正在连接中,跳过此次检查");
return;
}
if (reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) {
log.error("重连次数已达上限({}次),停止重连", MAX_RECONNECT_ATTEMPTS);
return;
}
log.info("检测到连接断开,开始重连...(第{}次尝试)", reconnectAttempts + 1);
reconnectAttempts++;
connect();
} }
/** /**
@ -204,6 +247,16 @@ public class PrinterClient implements ApplicationRunner {
@PreDestroy @PreDestroy
public void shutdown() { public void shutdown() {
log.info("正在关闭WebSocket客户端..."); log.info("正在关闭WebSocket客户端...");
// 取消心跳任务
if (heartbeatTask != null && !heartbeatTask.isCancelled()) {
heartbeatTask.cancel(false);
}
// 停止连接监控和心跳
connectionMonitor.shutdownNow();
heartbeatExecutor.shutdownNow();
if (session != null) { if (session != null) {
try { try {
session.close(); session.close();
@ -211,8 +264,6 @@ public class PrinterClient implements ApplicationRunner {
log.error("关闭WebSocket连接失败", e); log.error("关闭WebSocket连接失败", e);
} }
} }
reconnectExecutor.shutdownNow();
heartbeatExecutor.shutdownNow();
log.info("WebSocket客户端已关闭"); log.info("WebSocket客户端已关闭");
} }
@ -221,6 +272,12 @@ public class PrinterClient implements ApplicationRunner {
*/ */
public void reconnect() { public void reconnect() {
log.info("配置已更改重新连接WebSocket服务器..."); log.info("配置已更改重新连接WebSocket服务器...");
// 停止旧的心跳任务
if (heartbeatTask != null && !heartbeatTask.isCancelled()) {
heartbeatTask.cancel(false);
}
if (session != null && session.isOpen()) { if (session != null && session.isOpen()) {
try { try {
session.close(); session.close();
@ -230,6 +287,10 @@ public class PrinterClient implements ApplicationRunner {
} }
session = null; session = null;
isConnecting = false; isConnecting = false;
// 重置重连计数
reconnectAttempts = 0;
connect(); connect();
} }
@ -270,6 +331,8 @@ public class PrinterClient implements ApplicationRunner {
@Override @Override
public void run(ApplicationArguments args) { public void run(ApplicationArguments args) {
// 启动连接监控
startConnectionMonitor();
// 应用启动后连接到WebSocket服务器 // 应用启动后连接到WebSocket服务器
connect(); connect();
} }