Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
G
GeoFlyApi
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
GeoFly
GeoFlyApi
Commits
b7b1a226
Commit
b7b1a226
authored
Mar 04, 2026
by
guoxuejian
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
feat: enhance MQTT configuration and add dynamic subscription recovery for online devices
parent
1cfe2bbc
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
306 additions
and
1 deletions
+306
-1
cloud-sdk/src/main/java/com/dji/sdk/mqtt/MqttConfiguration.java
+1
-0
sample/src/main/java/com/dji/sample/component/GlobalScheduleService.java
+94
-0
sample/src/main/java/com/dji/sample/component/mqtt/config/MqttPropertyConfiguration.java
+1
-1
草莓机场OSD数据不转发问题分析.md
+210
-0
No files found.
cloud-sdk/src/main/java/com/dji/sdk/mqtt/MqttConfiguration.java
View file @
b7b1a226
...
@@ -53,6 +53,7 @@ public class MqttConfiguration {
...
@@ -53,6 +53,7 @@ public class MqttConfiguration {
adapter
.
setConverter
(
converter
);
adapter
.
setConverter
(
converter
);
adapter
.
setQos
(
1
);
adapter
.
setQos
(
1
);
adapter
.
setOutputChannel
(
inboundChannel
);
adapter
.
setOutputChannel
(
inboundChannel
);
adapter
.
setRecoveryInterval
(
5000
);
return
adapter
;
return
adapter
;
}
}
...
...
sample/src/main/java/com/dji/sample/component/GlobalScheduleService.java
View file @
b7b1a226
...
@@ -3,16 +3,21 @@ package com.dji.sample.component;
...
@@ -3,16 +3,21 @@ package com.dji.sample.component;
import
com.dji.sample.component.redis.RedisConst
;
import
com.dji.sample.component.redis.RedisConst
;
import
com.dji.sample.component.redis.RedisOpsUtils
;
import
com.dji.sample.component.redis.RedisOpsUtils
;
import
com.dji.sample.manage.model.dto.DeviceDTO
;
import
com.dji.sample.manage.model.dto.DeviceDTO
;
import
com.dji.sample.manage.service.IDeviceRedisService
;
import
com.dji.sample.manage.service.IDeviceService
;
import
com.dji.sample.manage.service.IDeviceService
;
import
com.dji.sdk.common.SDKManager
;
import
com.dji.sdk.cloudapi.device.DeviceDomainEnum
;
import
com.dji.sdk.cloudapi.device.DeviceDomainEnum
;
import
com.dji.sdk.mqtt.IMqttTopicService
;
import
com.dji.sdk.mqtt.IMqttTopicService
;
import
org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
lombok.extern.slf4j.Slf4j
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.scheduling.annotation.Scheduled
;
import
org.springframework.scheduling.annotation.Scheduled
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
import
org.springframework.util.StringUtils
;
import
java.util.Arrays
;
import
java.util.Arrays
;
import
java.util.Optional
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeUnit
;
/**
/**
...
@@ -31,6 +36,12 @@ public class GlobalScheduleService {
...
@@ -31,6 +36,12 @@ public class GlobalScheduleService {
private
IMqttTopicService
topicService
;
private
IMqttTopicService
topicService
;
@Autowired
@Autowired
private
IDeviceRedisService
deviceRedisService
;
@Autowired
private
MqttPahoMessageDrivenChannelAdapter
mqttInbound
;
@Autowired
private
ObjectMapper
mapper
;
private
ObjectMapper
mapper
;
/**
/**
* Check the status of the devices every 30 seconds. It is recommended to use cache.
* Check the status of the devices every 30 seconds. It is recommended to use cache.
...
@@ -58,4 +69,86 @@ public class GlobalScheduleService {
...
@@ -58,4 +69,86 @@ public class GlobalScheduleService {
log
.
info
(
"Subscriptions: {}"
,
Arrays
.
toString
(
topicService
.
getSubscribedTopic
()));
log
.
info
(
"Subscriptions: {}"
,
Arrays
.
toString
(
topicService
.
getSubscribedTopic
()));
}
}
/**
* Check MQTT connection/subscriptions every 30 seconds and recover missing dynamic subscriptions.
*/
@Scheduled
(
initialDelay
=
30
,
fixedRate
=
30
,
timeUnit
=
TimeUnit
.
SECONDS
)
private
void
mqttConnectionCheck
()
{
if
(!
mqttInbound
.
isRunning
())
{
log
.
warn
(
"MQTT adapter is not running, attempting to restart and re-subscribe..."
);
try
{
mqttInbound
.
start
();
resubscribeOnlineDevices
();
}
catch
(
Exception
e
)
{
log
.
error
(
"Failed to restart MQTT adapter"
,
e
);
}
return
;
}
String
[]
currentTopics
=
topicService
.
getSubscribedTopic
();
int
start
=
RedisConst
.
DEVICE_ONLINE_PREFIX
.
length
();
boolean
missingSubscription
=
false
;
for
(
String
key
:
RedisOpsUtils
.
getAllKeys
(
RedisConst
.
DEVICE_ONLINE_PREFIX
+
"*"
))
{
String
sn
=
key
.
substring
(
start
);
String
osdTopic
=
"thing/product/"
+
sn
+
"/osd"
;
if
(!
containsTopic
(
currentTopics
,
osdTopic
)
&&
!
containsTopic
(
currentTopics
,
"thing/product/+/osd"
))
{
missingSubscription
=
true
;
break
;
}
}
if
(
missingSubscription
)
{
log
.
warn
(
"Detected missing MQTT subscriptions after reconnect, re-subscribing online devices..."
);
resubscribeOnlineDevices
();
}
}
private
boolean
containsTopic
(
String
[]
topics
,
String
expected
)
{
if
(
topics
==
null
||
topics
.
length
==
0
||
!
StringUtils
.
hasText
(
expected
))
{
return
false
;
}
for
(
String
topic
:
topics
)
{
if
(
expected
.
equals
(
topic
))
{
return
true
;
}
}
return
false
;
}
/**
* Re-subscribe all currently online non-drone devices.
*/
private
void
resubscribeOnlineDevices
()
{
int
start
=
RedisConst
.
DEVICE_ONLINE_PREFIX
.
length
();
RedisOpsUtils
.
getAllKeys
(
RedisConst
.
DEVICE_ONLINE_PREFIX
+
"*"
)
.
stream
()
.
map
(
key
->
key
.
substring
(
start
))
.
map
(
deviceRedisService:
:
getDeviceOnline
)
.
filter
(
Optional:
:
isPresent
)
.
map
(
Optional:
:
get
)
.
forEach
(
device
->
{
try
{
if
(
DeviceDomainEnum
.
DRONE
==
device
.
getDomain
())
{
deviceService
.
subDroneOnlineSubscribeTopic
(
device
.
getDeviceSn
());
log
.
info
(
"Re-subscribed drone device: {}"
,
device
.
getDeviceSn
());
return
;
}
String
childSn
=
device
.
getChildDeviceSn
();
String
childThingVersion
=
StringUtils
.
hasText
(
childSn
)
?
deviceRedisService
.
getDeviceOnline
(
childSn
).
map
(
DeviceDTO:
:
getThingVersion
).
orElse
(
null
)
:
null
;
deviceService
.
subDeviceOnlineSubscribeTopic
(
SDKManager
.
registerDevice
(
device
.
getDeviceSn
(),
childSn
,
device
.
getDomain
(),
device
.
getType
(),
device
.
getSubType
(),
device
.
getThingVersion
(),
childThingVersion
));
log
.
info
(
"Re-subscribed device: {}"
,
device
.
getDeviceSn
());
}
catch
(
Exception
e
)
{
log
.
error
(
"Failed to re-subscribe device: {}"
,
device
.
getDeviceSn
(),
e
);
}
});
}
}
}
\ No newline at end of file
sample/src/main/java/com/dji/sample/component/mqtt/config/MqttPropertyConfiguration.java
View file @
b7b1a226
...
@@ -125,7 +125,7 @@ public class MqttPropertyConfiguration {
...
@@ -125,7 +125,7 @@ public class MqttPropertyConfiguration {
mqttConnectOptions
.
setPassword
(
StringUtils
.
hasText
(
customizeOptions
.
getPassword
())
?
mqttConnectOptions
.
setPassword
(
StringUtils
.
hasText
(
customizeOptions
.
getPassword
())
?
customizeOptions
.
getPassword
().
toCharArray
()
:
new
char
[
0
]);
customizeOptions
.
getPassword
().
toCharArray
()
:
new
char
[
0
]);
mqttConnectOptions
.
setAutomaticReconnect
(
true
);
mqttConnectOptions
.
setAutomaticReconnect
(
true
);
mqttConnectOptions
.
setKeepAliveInterval
(
1
0
);
mqttConnectOptions
.
setKeepAliveInterval
(
6
0
);
return
mqttConnectOptions
;
return
mqttConnectOptions
;
}
}
...
...
草莓机场OSD数据不转发问题分析.md
0 → 100644
View file @
b7b1a226
# 草莓机场 OSD 数据不转发到前端问题分析
# 草莓机场 OSD 数据不转发到前端问题分析
## 问题描述
草莓机场(S24M350S,type=88101)持续通过 MQTT 上报 OSD 数据,但前端显示设备一直离线,数据未能成功转发到 Web 浏览器端。
## 上报的 OSD 数据示例
```
json
{
"tid"
:
"dc856cbc-439d-42a9-bda4-60fec750f3c1"
,
"bid"
:
"e4f6db87-30c7-423b-9e3d-5f6bdcde5550"
,
"timestamp"
:
1770624907673
,
"gateway"
:
"962A0996E2D65E01BF9DB05F75D7D622"
,
"data"
:
{
"network_state"
:
{
"type"
:
2
,
"quality"
:
4
,
"rate"
:
10
},
"drone_charge_state"
:
{
"state"
:
0
,
"capacity_percent"
:
0
},
"drone_in_dock"
:
1
,
...
}
}
```
## 问题根因分析
### 1. OSD 数据处理流程
```
MQTT Topic → OsdRouter.osdRouterFlow() → SDKManager.getDeviceSDK() → OsdDeviceTypeEnum.find() → osdDock()
```
### 2. 关键代码位置
#### OsdRouter.java (第 48 行)
```
java
GatewayManager
gateway
=
SDKManager
.
getDeviceSDK
(
response
.
getGateway
());
```
**问题**
:如果草莓机场的 SN 没有注册到
`SDKManager`
,这里会抛出
`CloudSDKException`
#### SDKDeviceService.java (第 151-162 行)
```
java
@Override
public
void
osdDock
(
TopicOsdRequest
<
OsdDock
>
request
,
MessageHeaders
headers
)
{
String
from
=
request
.
getFrom
();
Optional
<
DeviceDTO
>
deviceOpt
=
deviceRedisService
.
getDeviceOnline
(
from
);
if
(
deviceOpt
.
isEmpty
()
||
!
StringUtils
.
hasText
(
deviceOpt
.
get
().
getWorkspaceId
()))
{
deviceOpt
=
deviceService
.
getDeviceBySn
(
from
);
if
(
deviceOpt
.
isEmpty
())
{
log
.
error
(
"Please restart the drone."
);
return
;
// ❌ 直接返回,不推送数据
}
}
...
}
```
**问题**
:设备不在 Redis 或数据库中时,直接返回,不推送 OSD 数据到前端
### 3. 设备注册时机
设备应该在以下时机注册到
`SDKManager`
:
-
设备上线时(
`updateTopoOnline`
)
-
应用启动时(
`ApplicationBootInitial`
)
## 可能的原因
### 原因 1:草莓机场未正确上线
-
草莓机场可能没有发送
`thing/product/{gateway_sn}/status`
上线消息
-
或者上线消息的 domain/type/subType 不正确
### 原因 2:设备未绑定到 Workspace
-
设备存在于数据库,但
`workspace_id`
字段为空
-
导致
`osdDock`
方法中的检查失败
### 原因 3:设备类型映射问题
虽然代码中已经定义了草莓机场的类型:
-
`DeviceTypeEnum._88101(88101)`
-
`DeviceEnum.S24M350S`
-
`GatewayTypeEnum.DOCK`
包含
`DeviceEnum.S24M350S`
但可能在某个环节类型映射失败。
## 解决方案
### 方案 1:检查设备是否正确上线和绑定(推荐)
1.
**检查数据库中的设备记录**
```
sql
SELECT
device_sn
,
device_name
,
workspace_id
,
domain
,
type
,
sub_type
,
thing_version
,
bound_status
FROM
tbl_device
WHERE
device_sn
=
'962A0996E2D65E01BF9DB05F75D7D622'
;
```
2.
**检查 Redis 中的设备在线状态**
```
bash
redis-cli
>
GET device:online:962A0996E2D65E01BF9DB05F75D7D622
```
3.
**检查设备是否发送了上线消息**
-
查看 MQTT 日志,确认是否收到
`thing/product/962A0996E2D65E01BF9DB05F75D7D622/status`
消息
-
确认消息内容中的 domain=3, type=88101, sub_type=0
4.
**如果设备未绑定,手动绑定到 Workspace**
```
sql
UPDATE
tbl_device
SET
workspace_id
=
'your_workspace_id'
WHERE
device_sn
=
'962A0996E2D65E01BF9DB05F75D7D622'
;
```
### 方案 2:增强错误处理和日志
修改
`OsdRouter.java`
,添加异常捕获:
```
java
@Bean
public
IntegrationFlow
osdRouterFlow
()
{
return
IntegrationFlows
.
from
(
ChannelName
.
INBOUND_OSD
)
.
transform
(
Message
.
class
,
source
->
{
try
{
TopicOsdRequest
response
=
Common
.
getObjectMapper
().
readValue
(
(
byte
[])
source
.
getPayload
(),
new
TypeReference
<
TopicOsdRequest
>()
{});
String
topic
=
String
.
valueOf
(
source
.
getHeaders
().
get
(
MqttHeaders
.
RECEIVED_TOPIC
));
return
response
.
setFrom
(
topic
.
substring
(
(
THING_MODEL_PRE
+
PRODUCT
).
length
(),
topic
.
indexOf
(
OSD_SUF
)));
}
catch
(
IOException
e
)
{
throw
new
CloudSDKException
(
e
);
}
},
null
)
.<
TopicOsdRequest
>
handle
((
response
,
headers
)
->
{
try
{
GatewayManager
gateway
=
SDKManager
.
getDeviceSDK
(
response
.
getGateway
());
OsdDeviceTypeEnum
typeEnum
=
OsdDeviceTypeEnum
.
find
(
gateway
.
getType
(),
response
.
getFrom
().
equals
(
response
.
getGateway
()));
// ... 后续处理
}
catch
(
CloudSDKException
e
)
{
log
.
error
(
"Failed to process OSD data for gateway: {}, error: {}"
,
response
.
getGateway
(),
e
.
getMessage
());
// 尝试自动注册设备或返回 null 跳过
return
null
;
}
})
// ... 其他处理
.
get
();
}
```
### 方案 3:修改 osdDock 方法,允许未绑定设备推送数据
修改
`SDKDeviceService.java`
的
`osdDock`
方法:
```
java
@Override
public
void
osdDock
(
TopicOsdRequest
<
OsdDock
>
request
,
MessageHeaders
headers
)
{
String
from
=
request
.
getFrom
();
Optional
<
DeviceDTO
>
deviceOpt
=
deviceRedisService
.
getDeviceOnline
(
from
);
if
(
deviceOpt
.
isEmpty
()
||
!
StringUtils
.
hasText
(
deviceOpt
.
get
().
getWorkspaceId
()))
{
deviceOpt
=
deviceService
.
getDeviceBySn
(
from
);
if
(
deviceOpt
.
isEmpty
())
{
log
.
warn
(
"Device {} not found in database, OSD data will not be pushed"
,
from
);
return
;
}
if
(!
StringUtils
.
hasText
(
deviceOpt
.
get
().
getWorkspaceId
()))
{
log
.
warn
(
"Device {} not bound to workspace, OSD data will not be pushed"
,
from
);
return
;
}
}
DeviceDTO
device
=
deviceOpt
.
get
();
if
(
StringUtils
.
hasText
(
device
.
getChildDeviceSn
()))
{
deviceService
.
getDeviceBySn
(
device
.
getChildDeviceSn
()).
ifPresent
(
device:
:
setChildren
);
}
deviceRedisService
.
setDeviceOnline
(
device
);
fillDockOsd
(
from
,
request
.
getData
());
// 推送 OSD 数据到前端
deviceService
.
pushOsdDataToWeb
(
device
.
getWorkspaceId
(),
BizCodeEnum
.
DOCK_OSD
,
from
,
request
.
getData
());
}
```
## 调试步骤
1.
**启用详细日志**
-
在
`application.yml`
中设置:
```yaml
logging:
level:
com.raytue.sdk.mqtt.osd: DEBUG
com.raytue.sample.manage.service.impl.SDKDeviceService: DEBUG
```
2.
**查看后台日志**
-
搜索关键字:
`962A0996E2D65E01BF9DB05F75D7D622`
-
查找异常:
`CloudSDKException`
,
`NOT_REGISTERED`
-
查找警告:
`Please restart the drone`
,
`Please bind the dock first`
3.
**使用 MQTT 工具订阅主题**
```
thing/product/+/osd
thing/product/962A0996E2D65E01BF9DB05F75D7D622/status
```
4.
**检查 WebSocket 连接**
-
前端是否正确连接到 WebSocket
-
WebSocket 是否订阅了正确的 workspace
## 预期结果
修复后,草莓机场的 OSD 数据应该能够:
1.
被
`OsdRouter`
正确路由到
`osdDock`
方法
2.
通过
`pushOsdDataToWeb`
推送到前端
3.
前端通过 WebSocket 接收到数据并更新设备状态为在线
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment