Commit 6eb36174 by guoxuejian Committed by gdj

feat: enhance MQTT configuration and add dynamic subscription recovery for online devices

parent e73893f6
...@@ -53,6 +53,7 @@ public class MqttConfiguration { ...@@ -53,6 +53,7 @@ public class MqttConfiguration {
adapter.setConverter(converter); adapter.setConverter(converter);
adapter.setQos(1); adapter.setQos(1);
adapter.setOutputChannel(inboundChannel); adapter.setOutputChannel(inboundChannel);
adapter.setRecoveryInterval(5000);
return adapter; return adapter;
} }
......
...@@ -3,16 +3,21 @@ package com.dji.sample.component; ...@@ -3,16 +3,21 @@ package com.dji.sample.component;
import com.dji.sample.component.redis.RedisConst; import com.dji.sample.component.redis.RedisConst;
import com.dji.sample.component.redis.RedisOpsUtils; import com.dji.sample.component.redis.RedisOpsUtils;
import com.dji.sample.manage.model.dto.DeviceDTO; import com.dji.sample.manage.model.dto.DeviceDTO;
import com.dji.sample.manage.service.IDeviceRedisService;
import com.dji.sample.manage.service.IDeviceService; import com.dji.sample.manage.service.IDeviceService;
import com.dji.sdk.common.SDKManager;
import com.dji.sdk.cloudapi.device.DeviceDomainEnum; import com.dji.sdk.cloudapi.device.DeviceDomainEnum;
import com.dji.sdk.mqtt.IMqttTopicService; import com.dji.sdk.mqtt.IMqttTopicService;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.Arrays; import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
...@@ -31,6 +36,12 @@ public class GlobalScheduleService { ...@@ -31,6 +36,12 @@ public class GlobalScheduleService {
private IMqttTopicService topicService; private IMqttTopicService topicService;
@Autowired @Autowired
private IDeviceRedisService deviceRedisService;
@Autowired
private MqttPahoMessageDrivenChannelAdapter mqttInbound;
@Autowired
private ObjectMapper mapper; private ObjectMapper mapper;
/** /**
* Check the status of the devices every 30 seconds. It is recommended to use cache. * Check the status of the devices every 30 seconds. It is recommended to use cache.
...@@ -58,4 +69,86 @@ public class GlobalScheduleService { ...@@ -58,4 +69,86 @@ public class GlobalScheduleService {
log.info("Subscriptions: {}", Arrays.toString(topicService.getSubscribedTopic())); log.info("Subscriptions: {}", Arrays.toString(topicService.getSubscribedTopic()));
} }
/**
* Check MQTT connection/subscriptions every 30 seconds and recover missing dynamic subscriptions.
*/
@Scheduled(initialDelay = 30, fixedRate = 30, timeUnit = TimeUnit.SECONDS)
private void mqttConnectionCheck() {
if (!mqttInbound.isRunning()) {
log.warn("MQTT adapter is not running, attempting to restart and re-subscribe...");
try {
mqttInbound.start();
resubscribeOnlineDevices();
} catch (Exception e) {
log.error("Failed to restart MQTT adapter", e);
}
return;
}
String[] currentTopics = topicService.getSubscribedTopic();
int start = RedisConst.DEVICE_ONLINE_PREFIX.length();
boolean missingSubscription = false;
for (String key : RedisOpsUtils.getAllKeys(RedisConst.DEVICE_ONLINE_PREFIX + "*")) {
String sn = key.substring(start);
String osdTopic = "thing/product/" + sn + "/osd";
if (!containsTopic(currentTopics, osdTopic) && !containsTopic(currentTopics, "thing/product/+/osd")) {
missingSubscription = true;
break;
}
}
if (missingSubscription) {
log.warn("Detected missing MQTT subscriptions after reconnect, re-subscribing online devices...");
resubscribeOnlineDevices();
}
}
private boolean containsTopic(String[] topics, String expected) {
if (topics == null || topics.length == 0 || !StringUtils.hasText(expected)) {
return false;
}
for (String topic : topics) {
if (expected.equals(topic)) {
return true;
}
}
return false;
}
/**
* Re-subscribe all currently online non-drone devices.
*/
private void resubscribeOnlineDevices() {
int start = RedisConst.DEVICE_ONLINE_PREFIX.length();
RedisOpsUtils.getAllKeys(RedisConst.DEVICE_ONLINE_PREFIX + "*")
.stream()
.map(key -> key.substring(start))
.map(deviceRedisService::getDeviceOnline)
.filter(Optional::isPresent)
.map(Optional::get)
.forEach(device -> {
try {
if (DeviceDomainEnum.DRONE == device.getDomain()) {
deviceService.subDroneOnlineSubscribeTopic(device.getDeviceSn());
log.info("Re-subscribed drone device: {}", device.getDeviceSn());
return;
}
String childSn = device.getChildDeviceSn();
String childThingVersion = StringUtils.hasText(childSn)
? deviceRedisService.getDeviceOnline(childSn).map(DeviceDTO::getThingVersion).orElse(null)
: null;
deviceService.subDeviceOnlineSubscribeTopic(
SDKManager.registerDevice(device.getDeviceSn(), childSn,
device.getDomain(), device.getType(), device.getSubType(),
device.getThingVersion(), childThingVersion));
log.info("Re-subscribed device: {}", device.getDeviceSn());
} catch (Exception e) {
log.error("Failed to re-subscribe device: {}", device.getDeviceSn(), e);
}
});
}
} }
\ No newline at end of file
...@@ -106,7 +106,7 @@ public class MqttPropertyConfiguration { ...@@ -106,7 +106,7 @@ public class MqttPropertyConfiguration {
mqttConnectOptions.setPassword(StringUtils.hasText(customizeOptions.getPassword()) ? mqttConnectOptions.setPassword(StringUtils.hasText(customizeOptions.getPassword()) ?
customizeOptions.getPassword().toCharArray() : new char[0]); customizeOptions.getPassword().toCharArray() : new char[0]);
mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setKeepAliveInterval(10); mqttConnectOptions.setKeepAliveInterval(60);
return mqttConnectOptions; return mqttConnectOptions;
} }
......
# 草莓机场 OSD 数据不转发到前端问题分析
# 草莓机场 OSD 数据不转发到前端问题分析
## 问题描述
草莓机场(S24M350S,type=88101)持续通过 MQTT 上报 OSD 数据,但前端显示设备一直离线,数据未能成功转发到 Web 浏览器端。
## 上报的 OSD 数据示例
```json
{
"tid": "dc856cbc-439d-42a9-bda4-60fec750f3c1",
"bid": "e4f6db87-30c7-423b-9e3d-5f6bdcde5550",
"timestamp": 1770624907673,
"gateway": "962A0996E2D65E01BF9DB05F75D7D622",
"data": {
"network_state": {"type": 2, "quality": 4, "rate": 10},
"drone_charge_state": {"state": 0, "capacity_percent": 0},
"drone_in_dock": 1,
...
}
}
```
## 问题根因分析
### 1. OSD 数据处理流程
```
MQTT Topic → OsdRouter.osdRouterFlow() → SDKManager.getDeviceSDK() → OsdDeviceTypeEnum.find() → osdDock()
```
### 2. 关键代码位置
#### OsdRouter.java (第 48 行)
```java
GatewayManager gateway = SDKManager.getDeviceSDK(response.getGateway());
```
**问题**:如果草莓机场的 SN 没有注册到 `SDKManager`,这里会抛出 `CloudSDKException`
#### SDKDeviceService.java (第 151-162 行)
```java
@Override
public void osdDock(TopicOsdRequest<OsdDock> request, MessageHeaders headers) {
String from = request.getFrom();
Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(from);
if (deviceOpt.isEmpty() || !StringUtils.hasText(deviceOpt.get().getWorkspaceId())) {
deviceOpt = deviceService.getDeviceBySn(from);
if (deviceOpt.isEmpty()) {
log.error("Please restart the drone.");
return; // ❌ 直接返回,不推送数据
}
}
...
}
```
**问题**:设备不在 Redis 或数据库中时,直接返回,不推送 OSD 数据到前端
### 3. 设备注册时机
设备应该在以下时机注册到 `SDKManager`
- 设备上线时(`updateTopoOnline`
- 应用启动时(`ApplicationBootInitial`
## 可能的原因
### 原因 1:草莓机场未正确上线
- 草莓机场可能没有发送 `thing/product/{gateway_sn}/status` 上线消息
- 或者上线消息的 domain/type/subType 不正确
### 原因 2:设备未绑定到 Workspace
- 设备存在于数据库,但 `workspace_id` 字段为空
- 导致 `osdDock` 方法中的检查失败
### 原因 3:设备类型映射问题
虽然代码中已经定义了草莓机场的类型:
- `DeviceTypeEnum._88101(88101)`
- `DeviceEnum.S24M350S`
- `GatewayTypeEnum.DOCK` 包含 `DeviceEnum.S24M350S`
但可能在某个环节类型映射失败。
## 解决方案
### 方案 1:检查设备是否正确上线和绑定(推荐)
1. **检查数据库中的设备记录**
```sql
SELECT device_sn, device_name, workspace_id, domain, type, sub_type, thing_version, bound_status
FROM tbl_device
WHERE device_sn = '962A0996E2D65E01BF9DB05F75D7D622';
```
2. **检查 Redis 中的设备在线状态**
```bash
redis-cli
> GET device:online:962A0996E2D65E01BF9DB05F75D7D622
```
3. **检查设备是否发送了上线消息**
- 查看 MQTT 日志,确认是否收到 `thing/product/962A0996E2D65E01BF9DB05F75D7D622/status` 消息
- 确认消息内容中的 domain=3, type=88101, sub_type=0
4. **如果设备未绑定,手动绑定到 Workspace**
```sql
UPDATE tbl_device
SET workspace_id = 'your_workspace_id'
WHERE device_sn = '962A0996E2D65E01BF9DB05F75D7D622';
```
### 方案 2:增强错误处理和日志
修改 `OsdRouter.java`,添加异常捕获:
```java
@Bean
public IntegrationFlow osdRouterFlow() {
return IntegrationFlows
.from(ChannelName.INBOUND_OSD)
.transform(Message.class, source -> {
try {
TopicOsdRequest response = Common.getObjectMapper().readValue(
(byte[]) source.getPayload(), new TypeReference<TopicOsdRequest>() {});
String topic = String.valueOf(source.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
return response.setFrom(topic.substring(
(THING_MODEL_PRE + PRODUCT).length(), topic.indexOf(OSD_SUF)));
} catch (IOException e) {
throw new CloudSDKException(e);
}
}, null)
.<TopicOsdRequest>handle((response, headers) -> {
try {
GatewayManager gateway = SDKManager.getDeviceSDK(response.getGateway());
OsdDeviceTypeEnum typeEnum = OsdDeviceTypeEnum.find(
gateway.getType(), response.getFrom().equals(response.getGateway()));
// ... 后续处理
} catch (CloudSDKException e) {
log.error("Failed to process OSD data for gateway: {}, error: {}",
response.getGateway(), e.getMessage());
// 尝试自动注册设备或返回 null 跳过
return null;
}
})
// ... 其他处理
.get();
}
```
### 方案 3:修改 osdDock 方法,允许未绑定设备推送数据
修改 `SDKDeviceService.java``osdDock` 方法:
```java
@Override
public void osdDock(TopicOsdRequest<OsdDock> request, MessageHeaders headers) {
String from = request.getFrom();
Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(from);
if (deviceOpt.isEmpty() || !StringUtils.hasText(deviceOpt.get().getWorkspaceId())) {
deviceOpt = deviceService.getDeviceBySn(from);
if (deviceOpt.isEmpty()) {
log.warn("Device {} not found in database, OSD data will not be pushed", from);
return;
}
if (!StringUtils.hasText(deviceOpt.get().getWorkspaceId())) {
log.warn("Device {} not bound to workspace, OSD data will not be pushed", from);
return;
}
}
DeviceDTO device = deviceOpt.get();
if (StringUtils.hasText(device.getChildDeviceSn())) {
deviceService.getDeviceBySn(device.getChildDeviceSn()).ifPresent(device::setChildren);
}
deviceRedisService.setDeviceOnline(device);
fillDockOsd(from, request.getData());
// 推送 OSD 数据到前端
deviceService.pushOsdDataToWeb(device.getWorkspaceId(), BizCodeEnum.DOCK_OSD, from, request.getData());
}
```
## 调试步骤
1. **启用详细日志**
-`application.yml` 中设置:
```yaml
logging:
level:
com.raytue.sdk.mqtt.osd: DEBUG
com.raytue.sample.manage.service.impl.SDKDeviceService: DEBUG
```
2. **查看后台日志**
- 搜索关键字:`962A0996E2D65E01BF9DB05F75D7D622`
- 查找异常:`CloudSDKException`, `NOT_REGISTERED`
- 查找警告:`Please restart the drone`, `Please bind the dock first`
3. **使用 MQTT 工具订阅主题**
```
thing/product/+/osd
thing/product/962A0996E2D65E01BF9DB05F75D7D622/status
```
4. **检查 WebSocket 连接**
- 前端是否正确连接到 WebSocket
- WebSocket 是否订阅了正确的 workspace
## 预期结果
修复后,草莓机场的 OSD 数据应该能够:
1.`OsdRouter` 正确路由到 `osdDock` 方法
2. 通过 `pushOsdDataToWeb` 推送到前端
3. 前端通过 WebSocket 接收到数据并更新设备状态为在线
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment