Commit 984de8bc by tntxia

Merge branch 'prod' of http://git.raytue.com/GeoFly/GeoFlyApi into prod

parents 7920a055 f43581be
......@@ -28,8 +28,8 @@ public class MqttTopicServiceImpl implements IMqttTopicService {
public void subscribe(String... topics) {
Set<String> topicSet = new HashSet<>(Arrays.asList(getSubscribedTopic()));
for (String topic : topics) {
if (topicSet.contains(topic)) {
return;
if (topicSet.contains(topic) || isCoveredByWildcard(topicSet, topic)) {
continue;
}
subscribe(topic, 1);
}
......@@ -38,7 +38,7 @@ public class MqttTopicServiceImpl implements IMqttTopicService {
@Override
public void subscribe(String topic, int qos) {
Set<String> topicSet = new HashSet<>(Arrays.asList(getSubscribedTopic()));
if (topicSet.contains(topic)) {
if (topicSet.contains(topic) || isCoveredByWildcard(topicSet, topic)) {
return;
}
log.debug("subscribe topic: {}", topic);
......@@ -47,11 +47,41 @@ public class MqttTopicServiceImpl implements IMqttTopicService {
@Override
public void unsubscribe(String... topics) {
log.debug("unsubscribe topic: {}", Arrays.toString(topics));
adapter.removeTopic(topics);
Set<String> currentTopics = new HashSet<>(Arrays.asList(getSubscribedTopic()));
for (String topic : topics) {
if (!currentTopics.contains(topic)) {
continue;
}
if (isCoveredByWildcard(currentTopics, topic)) {
log.debug("skip unsubscribe, topic {} is covered by wildcard", topic);
continue;
}
log.debug("unsubscribe topic: {}", topic);
adapter.removeTopic(topic);
}
}
public String[] getSubscribedTopic() {
return adapter.getTopic();
}
/**
* Check if a specific topic is already covered by a single-level wildcard (+) subscription.
* e.g. "thing/product/DRONE-001/osd" is covered by "thing/product/+/osd"
*/
private boolean isCoveredByWildcard(Set<String> existingTopics, String topic) {
String[] parts = topic.split("/");
if (parts.length < 3) {
return false;
}
for (int i = 0; i < parts.length; i++) {
String[] wildcardParts = parts.clone();
wildcardParts[i] = "+";
String wildcardTopic = String.join("/", wildcardParts);
if (existingTopics.contains(wildcardTopic)) {
return true;
}
}
return false;
}
}
......@@ -1698,9 +1698,22 @@ public class DeviceServiceImpl extends ServiceImpl<IDeviceMapper, DeviceEntity>
} else if (DeviceDomainEnum.REMOTER_CONTROL == dbDevice.getDomain()) {
recoverGatewayOnline(dbDevice);
} else if (DeviceDomainEnum.DRONE == dbDevice.getDomain()) {
// 通过 child_sn 反查父设备(数据库中没有 parent_sn 字段,parentSn 只存在于 Redis)
List<DeviceDTO> parentDevices = this.getDevicesByParams(
DeviceQueryParam.builder().childSn(deviceSn).build());
if (!parentDevices.isEmpty()) {
// 子设备无人机:通过父设备(机场/遥控器)恢复整个拓扑
DeviceDTO parentDevice = parentDevices.get(0);
log.info("[Auto-Recovery] Drone {} is a sub-device, recovering via gateway {}",
deviceSn, parentDevice.getDeviceSn());
recoverGatewayOnline(parentDevice);
} else {
// 独立无人机:直接恢复
log.info("[Auto-Recovery] Drone {} is an independent device, recovering directly", deviceSn);
recoverDroneOnline(dbDevice);
}
}
}
@Override
public void recoverGatewayOnline(String gatewaySn) {
......@@ -1726,6 +1739,7 @@ public class DeviceServiceImpl extends ServiceImpl<IDeviceMapper, DeviceEntity>
}
DeviceDTO dbDevice = dbDeviceOpt.get();
log.info("[Auto-Recovery] Recovering gateway {} and sub-device {}", gatewaySn, deviceSn);
GatewayManager gatewayManager = SDKManager.registerDevice(gatewaySn, deviceSn,
dbGateway.getDomain(), dbGateway.getType(),
dbGateway.getSubType(), dbGateway.getThingVersion(), dbDevice.getThingVersion());
......@@ -1759,6 +1773,7 @@ public class DeviceServiceImpl extends ServiceImpl<IDeviceMapper, DeviceEntity>
if (DeviceDomainEnum.DRONE != dbDevice.getDomain()) {
return;
}
log.info("[Auto-Recovery] Recovering independent drone {}", dbDevice.getDeviceSn());
deviceRedisService.setDeviceOnline(dbDevice);
this.subDroneOnlineSubscribeTopic(dbDevice.getDeviceSn());
// deviceService.pushDeviceOnlineTopo(dbGateway.getWorkspaceId(), dbGateway.getDeviceSn(), dbDevice.getDeviceSn());
......
......@@ -151,6 +151,7 @@ public class SDKDeviceService extends AbstractDeviceService {
public void osdDock(TopicOsdRequest<OsdDock> request, MessageHeaders headers) {
String from = request.getFrom();
Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(from);
boolean needsRecovery = deviceOpt.isEmpty();
if (deviceOpt.isEmpty() || !StringUtils.hasText(deviceOpt.get().getWorkspaceId())) {
deviceOpt = deviceService.getDeviceBySn(from);
if (deviceOpt.isEmpty()) {
......@@ -168,6 +169,11 @@ public class SDKDeviceService extends AbstractDeviceService {
}
deviceRedisService.setDeviceOnline(device);
if (needsRecovery && StringUtils.hasText(device.getWorkspaceId())) {
triggerDeviceAutoRecovery(from);
}
fillDockOsd(from, request.getData());
deviceService.pushOsdDataToWeb(device.getWorkspaceId(), BizCodeEnum.DOCK_OSD, from, request.getData());
......@@ -177,6 +183,7 @@ public class SDKDeviceService extends AbstractDeviceService {
public void osdDockDrone(TopicOsdRequest<OsdDockDrone> request, MessageHeaders headers) {
String from = request.getFrom();
Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(from);
boolean needsRecovery = deviceOpt.isEmpty();
if (deviceOpt.isEmpty()) {
deviceOpt = deviceService.getDeviceBySn(from);
if (deviceOpt.isEmpty()) {
......@@ -191,6 +198,10 @@ public class SDKDeviceService extends AbstractDeviceService {
DeviceDTO device = deviceOpt.get();
deviceRedisService.setDeviceOnline(device);
if (needsRecovery && StringUtils.hasText(device.getWorkspaceId())) {
triggerDeviceAutoRecovery(from);
}
deviceRedisService.setDeviceOsd(from, request.getData());
// 存放数据
deviceDetailService.saveDeviceOsd(from, request.getData());
......@@ -234,6 +245,7 @@ public class SDKDeviceService extends AbstractDeviceService {
public void osdRemoteControl(TopicOsdRequest<OsdRemoteControl> request, MessageHeaders headers) {
String from = request.getFrom();
Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(from);
boolean needsRecovery = deviceOpt.isEmpty();
if (deviceOpt.isEmpty() || !StringUtils.hasText(deviceOpt.get().getWorkspaceId())) {
deviceOpt = deviceService.getDeviceBySn(from);
if (deviceOpt.isEmpty()) {
......@@ -247,6 +259,10 @@ public class SDKDeviceService extends AbstractDeviceService {
}
deviceRedisService.setDeviceOnline(device);
if (needsRecovery && StringUtils.hasText(device.getWorkspaceId())) {
triggerDeviceAutoRecovery(from);
}
OsdRemoteControl data = request.getData();
deviceService.pushOsdDataToPilot(device.getWorkspaceId(), from,
new DeviceOsdHost()
......@@ -261,6 +277,7 @@ public class SDKDeviceService extends AbstractDeviceService {
public void osdRcDrone(TopicOsdRequest<OsdRcDrone> request, MessageHeaders headers) {
String from = request.getFrom();
Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(from);
boolean needsRecovery = deviceOpt.isEmpty();
if (deviceOpt.isEmpty()) {
deviceOpt = deviceService.getDeviceBySn(from);
if (deviceOpt.isEmpty()) {
......@@ -275,6 +292,10 @@ public class SDKDeviceService extends AbstractDeviceService {
deviceRedisService.setDeviceOnline(device);
if (needsRecovery && StringUtils.hasText(device.getWorkspaceId())) {
triggerDeviceAutoRecovery(from);
}
OsdRcDrone data = request.getData();
deviceService.pushOsdDataToPilot(device.getWorkspaceId(), from,
new DeviceOsdHost()
......@@ -425,6 +446,15 @@ public class SDKDeviceService extends AbstractDeviceService {
}
private void triggerDeviceAutoRecovery(String deviceSn) {
log.warn("[Auto-Recovery] OSD received from offline device {}, triggering online recovery", deviceSn);
try {
deviceService.recoverDeviceOnline(deviceSn);
} catch (Exception e) {
log.error("[Auto-Recovery] Failed to recover device {}: {}", deviceSn, e.getMessage());
}
}
public void deviceOnlineAgain(String workspaceId, String gatewaySn, String deviceSn) {
DeviceDTO device = DeviceDTO.builder().loginTime(LocalDateTime.now()).deviceSn(deviceSn).build();
DeviceDTO gateway = DeviceDTO.builder()
......
......@@ -112,7 +112,7 @@ mqtt:
cloud-sdk:
mqtt:
# Topics that need to be subscribed when initially connecting to mqtt, multiple topics are divided by ",".
inbound-topic: sys/product/+/status,thing/product/+/requests,/ai_info
inbound-topic: sys/product/+/status,thing/product/+/requests,thing/product/+/osd,/ai_info
url:
manage:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment