Commit 17092f18 by guoxuejian

feat: enhance subscription logic to support wildcard checks and improve unsubscribe handling

parent c07b3aed
...@@ -28,8 +28,8 @@ public class MqttTopicServiceImpl implements IMqttTopicService { ...@@ -28,8 +28,8 @@ public class MqttTopicServiceImpl implements IMqttTopicService {
public void subscribe(String... topics) { public void subscribe(String... topics) {
Set<String> topicSet = new HashSet<>(Arrays.asList(getSubscribedTopic())); Set<String> topicSet = new HashSet<>(Arrays.asList(getSubscribedTopic()));
for (String topic : topics) { for (String topic : topics) {
if (topicSet.contains(topic)) { if (topicSet.contains(topic) || isCoveredByWildcard(topicSet, topic)) {
return; continue;
} }
subscribe(topic, 1); subscribe(topic, 1);
} }
...@@ -38,7 +38,7 @@ public class MqttTopicServiceImpl implements IMqttTopicService { ...@@ -38,7 +38,7 @@ public class MqttTopicServiceImpl implements IMqttTopicService {
@Override @Override
public void subscribe(String topic, int qos) { public void subscribe(String topic, int qos) {
Set<String> topicSet = new HashSet<>(Arrays.asList(getSubscribedTopic())); Set<String> topicSet = new HashSet<>(Arrays.asList(getSubscribedTopic()));
if (topicSet.contains(topic)) { if (topicSet.contains(topic) || isCoveredByWildcard(topicSet, topic)) {
return; return;
} }
log.debug("subscribe topic: {}", topic); log.debug("subscribe topic: {}", topic);
...@@ -47,11 +47,41 @@ public class MqttTopicServiceImpl implements IMqttTopicService { ...@@ -47,11 +47,41 @@ public class MqttTopicServiceImpl implements IMqttTopicService {
@Override @Override
public void unsubscribe(String... topics) { public void unsubscribe(String... topics) {
log.debug("unsubscribe topic: {}", Arrays.toString(topics)); Set<String> currentTopics = new HashSet<>(Arrays.asList(getSubscribedTopic()));
adapter.removeTopic(topics); for (String topic : topics) {
if (!currentTopics.contains(topic)) {
continue;
}
if (isCoveredByWildcard(currentTopics, topic)) {
log.debug("skip unsubscribe, topic {} is covered by wildcard", topic);
continue;
}
log.debug("unsubscribe topic: {}", topic);
adapter.removeTopic(topic);
}
} }
public String[] getSubscribedTopic() { public String[] getSubscribedTopic() {
return adapter.getTopic(); return adapter.getTopic();
} }
/**
* Check if a specific topic is already covered by a single-level wildcard (+) subscription.
* e.g. "thing/product/DRONE-001/osd" is covered by "thing/product/+/osd"
*/
private boolean isCoveredByWildcard(Set<String> existingTopics, String topic) {
String[] parts = topic.split("/");
if (parts.length < 3) {
return false;
}
for (int i = 0; i < parts.length; i++) {
String[] wildcardParts = parts.clone();
wildcardParts[i] = "+";
String wildcardTopic = String.join("/", wildcardParts);
if (existingTopics.contains(wildcardTopic)) {
return true;
}
}
return false;
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment