Commit c2c11e95 by guoxuejian Committed by gdj

feat: implement Redis-based live session management for improved viewer experience

parent 6eb36174
...@@ -69,6 +69,11 @@ public final class RedisConst { ...@@ -69,6 +69,11 @@ public final class RedisConst {
public static final String DRONE_CONTROL_PREFiX = "control_source" + DELIMITER; public static final String DRONE_CONTROL_PREFiX = "control_source" + DELIMITER;
/** 直播观看者会话前缀,完整 key = live_session:{videoIdStr}:{sessionId} */
public static final String LIVE_SESSION_PREFIX = "live_session" + DELIMITER;
/** 直播观看者心跳 TTL(秒),心跳间隔建议为 10s,TTL 设为 30s */
public static final Integer LIVE_SESSION_TTL_SECOND = 30;
public static final String DEVICE_DETAIL_PREFIX = "device_detail" + DELIMITER; public static final String DEVICE_DETAIL_PREFIX = "device_detail" + DELIMITER;
/** /**
......
...@@ -79,6 +79,7 @@ public class LiveStreamController { ...@@ -79,6 +79,7 @@ public class LiveStreamController {
/** /**
* Live streaming according to the parameters passed in from the web side. * Live streaming according to the parameters passed in from the web side.
* Server generates a unique sessionId and returns it in the response.
* @param liveParam Live streaming parameters. * @param liveParam Live streaming parameters.
* @return * @return
*/ */
...@@ -89,6 +90,7 @@ public class LiveStreamController { ...@@ -89,6 +90,7 @@ public class LiveStreamController {
/** /**
* 不需要权限 Live streaming according to the parameters passed in from the web side. * 不需要权限 Live streaming according to the parameters passed in from the web side.
* Server generates a unique sessionId and returns it in the response.
* @param liveParam Live streaming parameters. * @param liveParam Live streaming parameters.
* @return * @return
*/ */
...@@ -98,23 +100,34 @@ public class LiveStreamController { ...@@ -98,23 +100,34 @@ public class LiveStreamController {
} }
/** /**
* Stop live streaming according to the parameters passed in from the web side. * Session-aware stop: only removes the specified sessionId's viewer session.
* @param liveParam Live streaming parameters. * Actual device stop is triggered only when all viewers have left.
* @param liveParam Live streaming parameters (must include session_id from liveStart response).
* @return * @return
*/ */
@PostMapping("/streams/stop") @PostMapping("/streams/stop")
public HttpResultResponse liveStop(@RequestBody LiveTypeDTO liveParam) { public HttpResultResponse liveStop(@RequestBody LiveTypeDTO liveParam) {
return liveStreamService.liveStop(liveParam.getVideoId()); return liveStreamService.liveStopSession(liveParam.getVideoId(), liveParam.getSessionId());
} }
/** /**
* 不需要权限 Stop live streaming according to the parameters passed in from the web side. * 不需要权限 Force stop: clears all viewer sessions and immediately stops the stream.
* @param liveParam Live streaming parameters. * @param liveParam Live streaming parameters.
* @return * @return
*/ */
@PostMapping("/streams/stop2") @PostMapping("/streams/stop2")
public HttpResultResponse liveStop2(@RequestBody LiveTypeDTO liveParam) { public HttpResultResponse liveStop2(@RequestBody LiveTypeDTO liveParam) {
return liveStreamService.liveStop(liveParam.getVideoId()); return liveStreamService.liveForceStop(liveParam.getVideoId());
}
/**
* Heartbeat to keep viewer session alive. Frontend should call every 10 seconds.
* @param liveParam Live streaming parameters (videoId + session_id required).
* @return
*/
@PostMapping("/streams/heartbeat")
public HttpResultResponse liveHeartbeat(@RequestBody LiveTypeDTO liveParam) {
return liveStreamService.liveHeartbeat(liveParam.getVideoId(), liveParam.getSessionId());
} }
/** /**
......
package com.dji.sample.manage.model.dto; package com.dji.sample.manage.model.dto;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data; import lombok.Data;
/** /**
...@@ -17,4 +18,10 @@ public class LiveDTO { ...@@ -17,4 +18,10 @@ public class LiveDTO {
private String username; private String username;
private String password; private String password;
/**
* 观看会话ID,由服务端在 liveStart 时生成,前端需保存并在 heartbeat/stop 时回传。
*/
@JsonProperty("session_id")
private String sessionId;
} }
\ No newline at end of file
...@@ -31,4 +31,9 @@ public class LiveTypeDTO { ...@@ -31,4 +31,9 @@ public class LiveTypeDTO {
@JsonProperty("camera_position") @JsonProperty("camera_position")
private CameraPositionEnum cameraPosition; private CameraPositionEnum cameraPosition;
/**
* 观看会话ID,由服务端在 liveStart 时生成(UUID),前端在 heartbeat/stop 时回传。
*/
@JsonProperty("session_id")
private String sessionId;
} }
\ No newline at end of file
...@@ -71,4 +71,27 @@ public interface ILiveStreamService { ...@@ -71,4 +71,27 @@ public interface ILiveStreamService {
*/ */
HttpResultResponse liveCameraChange(LiveTypeDTO liveParam); HttpResultResponse liveCameraChange(LiveTypeDTO liveParam);
/**
* 会话感知的停流:仅注销指定 sessionId 的观看会话,当观看者计数归零时才真正向设备发送停流指令。
* @param videoId 直播流标识
* @param sessionId 观看会话ID(服务端在 liveStart 时生成的 UUID)
* @return 操作结果
*/
HttpResultResponse liveStopSession(VideoId videoId, String sessionId);
/**
* 心跳续约:刷新指定 sessionId 的观看者会话 Redis key 的 TTL。
* @param videoId 直播流标识
* @param sessionId 观看会话ID
* @return 操作结果
*/
HttpResultResponse liveHeartbeat(VideoId videoId, String sessionId);
/**
* 强制停流:清除所有观看者会话并立即执行真正停流(供 stop2 等无鉴权接口使用)。
* @param videoId 直播流标识
* @return 操作结果
*/
HttpResultResponse liveForceStop(VideoId videoId);
} }
...@@ -5,6 +5,8 @@ import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; ...@@ -5,6 +5,8 @@ import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.dji.sample.common.error.CommonErrorEnum; import com.dji.sample.common.error.CommonErrorEnum;
import com.dji.sample.component.redis.RedisConst;
import com.dji.sample.component.redis.RedisOpsUtils;
import com.dji.sample.component.mqtt.model.EventsReceiver; import com.dji.sample.component.mqtt.model.EventsReceiver;
import com.dji.sample.component.websocket.model.BizCodeEnum; import com.dji.sample.component.websocket.model.BizCodeEnum;
import com.dji.sample.component.websocket.service.IWebSocketMessageService; import com.dji.sample.component.websocket.service.IWebSocketMessageService;
...@@ -57,6 +59,7 @@ import java.time.ZoneId; ...@@ -57,6 +59,7 @@ import java.time.ZoneId;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.dji.sample.common.constant.DeviceConstant.CUSTOM_DOCK_START; import static com.dji.sample.common.constant.DeviceConstant.CUSTOM_DOCK_START;
...@@ -160,6 +163,7 @@ public class DeviceServiceImpl extends ServiceImpl<IDeviceMapper, DeviceEntity> ...@@ -160,6 +163,7 @@ public class DeviceServiceImpl extends ServiceImpl<IDeviceMapper, DeviceEntity>
deviceRedisService.subDeviceOffline(deviceSn); deviceRedisService.subDeviceOffline(deviceSn);
// Publish the latest device topology information in the current workspace. // Publish the latest device topology information in the current workspace.
pushDeviceOfflineTopo(deviceOpt.get().getWorkspaceId(), deviceSn); pushDeviceOfflineTopo(deviceOpt.get().getWorkspaceId(), deviceSn);
clearLiveSessionManagersOnDeviceOffline(deviceSn);
log.debug("{} offline.", deviceSn); log.debug("{} offline.", deviceSn);
} }
...@@ -177,9 +181,38 @@ public class DeviceServiceImpl extends ServiceImpl<IDeviceMapper, DeviceEntity> ...@@ -177,9 +181,38 @@ public class DeviceServiceImpl extends ServiceImpl<IDeviceMapper, DeviceEntity>
offlineUnsubscribeTopic(SDKManager.getDeviceSDK(gatewaySn)); offlineUnsubscribeTopic(SDKManager.getDeviceSDK(gatewaySn));
// Publish the latest device topology information in the current workspace. // Publish the latest device topology information in the current workspace.
pushDeviceOfflineTopo(deviceOpt.get().getWorkspaceId(), gatewaySn); pushDeviceOfflineTopo(deviceOpt.get().getWorkspaceId(), gatewaySn);
clearLiveSessionManagersOnDeviceOffline(deviceOpt.get().getChildDeviceSn());
clearLiveSessionManagersOnDeviceOffline(gatewaySn);
log.debug("{} offline.", gatewaySn); log.debug("{} offline.", gatewaySn);
} }
/**
* 任一可直播设备离线后,清理该设备对应的直播会话管理 Redis key。
* 仅按 SN 前缀清理,避免误删其他机器的会话。
*/
private void clearLiveSessionManagersOnDeviceOffline(String sn) {
if (!StringUtils.hasText(sn)) {
return;
}
// 防御式校验:仅在该设备确实离线时执行清理
if (deviceRedisService.checkDeviceOnline(sn)) {
return;
}
clearLiveSessionKeysBySn(sn);
}
private void clearLiveSessionKeysBySn(String sn) {
String pattern = RedisConst.LIVE_SESSION_PREFIX + sn + "_*";
Set<String> keys = RedisOpsUtils.getAllKeys(pattern);
if (CollectionUtils.isEmpty(keys)) {
return;
}
keys.forEach(RedisOpsUtils::del);
log.info("Cleared {} live session key(s) for sn={}", keys.size(), sn);
}
@Override @Override
public void gatewayOnlineSubscribeTopic(GatewayManager gateway) { public void gatewayOnlineSubscribeTopic(GatewayManager gateway) {
statusSubscribe.subscribe(gateway); statusSubscribe.subscribe(gateway);
......
package com.dji.sample.manage.service.impl; package com.dji.sample.manage.service.impl;
import com.dji.sample.component.redis.RedisConst;
import com.dji.sample.component.redis.RedisOpsUtils;
import com.dji.sample.manage.model.dto.*; import com.dji.sample.manage.model.dto.*;
import com.dji.sample.manage.model.param.DeviceQueryParam; import com.dji.sample.manage.model.param.DeviceQueryParam;
import com.dji.sample.manage.service.*; import com.dji.sample.manage.service.*;
...@@ -11,14 +13,18 @@ import com.dji.sdk.common.HttpResultResponse; ...@@ -11,14 +13,18 @@ import com.dji.sdk.common.HttpResultResponse;
import com.dji.sdk.common.SDKManager; import com.dji.sdk.common.SDKManager;
import com.dji.sdk.mqtt.services.ServicesReplyData; import com.dji.sdk.mqtt.services.ServicesReplyData;
import com.dji.sdk.mqtt.services.TopicServicesResponse; import com.dji.sdk.mqtt.services.TopicServicesResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
...@@ -28,6 +34,7 @@ import java.util.stream.Collectors; ...@@ -28,6 +34,7 @@ import java.util.stream.Collectors;
*/ */
@Service @Service
@Transactional @Transactional
@Slf4j
public class LiveStreamServiceImpl implements ILiveStreamService { public class LiveStreamServiceImpl implements ILiveStreamService {
@Autowired @Autowired
...@@ -163,6 +170,12 @@ public class LiveStreamServiceImpl implements ILiveStreamService { ...@@ -163,6 +170,12 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
return HttpResultResponse.error(LiveErrorCodeEnum.URL_TYPE_NOT_SUPPORTED); return HttpResultResponse.error(LiveErrorCodeEnum.URL_TYPE_NOT_SUPPORTED);
} }
// 注册观看者会话(仅在 MQTT 推流成功后),生成唯一 sessionId
String sessionId = UUID.randomUUID().toString();
registerSession(liveParam.getVideoId(), sessionId);
live.setSessionId(sessionId);
log.info("Registered live session sessionId={}, videoId={}", sessionId, liveParam.getVideoId());
return HttpResultResponse.success(live); return HttpResultResponse.success(live);
} }
...@@ -294,4 +307,89 @@ public class LiveStreamServiceImpl implements ILiveStreamService { ...@@ -294,4 +307,89 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
} }
return url; return url;
} }
// ==================== 直播会话管理(多用户防冲突 + 心跳保活) ====================
/**
* 生成观看者会话 Redis key。
* 格式: live_session:{droneSn}_{payloadIndex}:{sessionId}
*/
private String buildSessionKey(VideoId videoId, String sessionId) {
String videoIdStr = videoId.getDroneSn() + "_" + videoId.getPayloadIndex().toString();
return RedisConst.LIVE_SESSION_PREFIX + videoIdStr + RedisConst.DELIMITER + sessionId;
}
/**
* 查找某路直播流下所有活跃观看者会话 key。
* 模式: live_session:{droneSn}_{payloadIndex}:*
*/
private Set<String> findSessionKeys(VideoId videoId) {
String pattern = RedisConst.LIVE_SESSION_PREFIX
+ videoId.getDroneSn() + "_" + videoId.getPayloadIndex().toString()
+ RedisConst.DELIMITER + "*";
return RedisOpsUtils.getAllKeys(pattern);
}
/**
* 注册观看者会话到 Redis,设置 TTL。
*/
private void registerSession(VideoId videoId, String sessionId) {
String key = buildSessionKey(videoId, sessionId);
RedisOpsUtils.setWithExpire(key, sessionId, RedisConst.LIVE_SESSION_TTL_SECOND);
}
@Override
public HttpResultResponse liveStopSession(VideoId videoId, String sessionId) {
if (Objects.isNull(videoId) || !StringUtils.hasText(sessionId)) {
return HttpResultResponse.error("videoId and sessionId are required");
}
// 1. 删除指定 sessionId 的会话 key
String key = buildSessionKey(videoId, sessionId);
RedisOpsUtils.del(key);
log.info("Removed live session sessionId={}, videoId={}", sessionId, videoId);
// 2. 检查剩余观看者
Set<String> remaining = findSessionKeys(videoId);
if (remaining == null || remaining.isEmpty()) {
// 最后一个观看者离开 → 真正停流
log.info("No remaining viewers for videoId={}, executing real stop", videoId);
return this.liveStop(videoId);
}
// 还有其他观看者,仅注销当前会话
log.info("Still {} viewer(s) watching videoId={}, skip real stop", remaining.size(), videoId);
return HttpResultResponse.success();
}
@Override
public HttpResultResponse liveHeartbeat(VideoId videoId, String sessionId) {
if (Objects.isNull(videoId) || !StringUtils.hasText(sessionId)) {
return HttpResultResponse.error("videoId and sessionId are required");
}
String key = buildSessionKey(videoId, sessionId);
if (!RedisOpsUtils.checkExist(key)) {
return HttpResultResponse.error("Session not found, please restart live");
}
RedisOpsUtils.expireKey(key, RedisConst.LIVE_SESSION_TTL_SECOND);
return HttpResultResponse.success();
}
@Override
public HttpResultResponse liveForceStop(VideoId videoId) {
if (Objects.isNull(videoId)) {
return HttpResultResponse.error("videoId is required");
}
// 清除所有观看者会话
Set<String> keys = findSessionKeys(videoId);
if (keys != null && !keys.isEmpty()) {
keys.forEach(RedisOpsUtils::del);
log.info("Force cleared {} session(s) for videoId={}", keys.size(), videoId);
}
// 直接执行真正停流
return this.liveStop(videoId);
}
} }
\ No newline at end of file
# 直播会话管理器(livestream-session-manager)任务说明
## 背景
为解决多人同时观看同一路直播时,单个用户关闭页面触发停流导致全员断流的问题,后端引入了基于 Redis 的观看会话管理。
## 主要改动
### 1) 会话模型从 userId 调整为 sessionId
- 每次 `/streams/start` 成功后,服务端生成唯一 `session_id`(UUID)并返回前端。
- 同一用户多个页面/标签页拥有独立 `session_id`,互不影响。
### 2) 直播控制接口增强
- `/streams/stop`:按 `videoId + session_id` 仅注销当前观看会话。
- `/streams/heartbeat`:按 `videoId + session_id` 刷新 TTL。
- `/streams/stop2`:强制停流,清空该视频流全部会话并执行真实停流。
### 3) Redis 会话管理
- Key 前缀:`live_session:`
- Key 结构:`live_session:{droneSn}_{payloadIndex}:{sessionId}`
- TTL:30 秒(建议前端 10 秒心跳)
### 4) 设备离线清理策略(按设备)
- 任一可直播设备(机场/无人机)离线后,清理该设备对应的会话 key。
- 清理按设备 SN 精确匹配(`live_session:{sn}_*`),不会清理其他正常设备会话。
## 关键参数与时序(上线必看)
### Redis 参数
- 常量位置:`sample/src/main/java/com/dji/sample/component/redis/RedisConst.java`
- `LIVE_SESSION_PREFIX = "live_session:"`
- `LIVE_SESSION_TTL_SECOND = 30`
### 会话 Key 规则
- 生成位置:`LiveStreamServiceImpl.buildSessionKey(...)`
- 格式:`live_session:{droneSn}_{payloadIndex}:{sessionId}`
- 示例:`live_session:5CGAC123ABCD_0-52-0:3f3e6b9b-0dd8-4f6a-9aab-1f9b6e3efc5e`
说明:
- `{droneSn}_{payloadIndex}` 代表同一路视频源(videoId)
- `sessionId` 是单次观看会话唯一标识(UUID)
### 心跳与超时建议
- 前端心跳周期:**10 秒**
- Redis 会话 TTL:**30 秒**
- 建议重试:
- 心跳失败可立即重试 1 次;
- 连续失败后停止心跳并提示用户重开直播。
容错窗口:
- 10 秒心跳 + 30 秒 TTL,允许短时网络抖动,不会立即误判离线。
## 接口参数说明(关键字段)
### 1) 开始直播 `/streams/start` 或 `/streams/start2`
请求关键字段(`LiveTypeDTO`):
- `video_id`:包含 `drone_sn``payload_index`
- `url_type`:推流协议类型
- `video_quality`:清晰度
响应关键字段(`LiveDTO`):
- `url`:播放器地址
- `session_id`:本次观看会话 ID(后续 heartbeat/stop 必传)
### 2) 心跳 `/streams/heartbeat`
请求必填:
- `video_id`
- `session_id`
后端行为:
- 若 session 存在:刷新 TTL 为 30 秒
- 若 session 不存在:返回错误(提示前端需重新 start)
### 3) 停止 `/streams/stop`
请求必填:
- `video_id`
- `session_id`
后端行为:
1. 删除当前 `session_id` 对应 key
2. 统计同一路流剩余会话数
3. 若剩余为 0:调用真实停流(MQTT `liveStopPush`
4. 若仍有其他会话:不执行真实停流
### 4) 强制停止 `/streams/stop2`
请求关键字段:
- `video_id`
后端行为:
- 删除该路流全部 `live_session` key
- 立即执行真实停流
## 离线清理规则(当前实现)
位置:`DeviceServiceImpl`
- 触发点:
- `subDeviceOffline(deviceSn)`
- `gatewayOffline(gatewaySn)`
- 处理逻辑:
- 任一可直播设备离线后,按 SN 清理该设备会话:`live_session:{sn}_*`
-`gatewayOffline` 中,会同时清理 `gatewaySn``childDeviceSn` 的会话(均为按 SN 范围清理)
保障:
- 清理按 SN 前缀精确过滤,不会误删其他设备会话。
## 前端协同要求(细化)
1. 调用 `/streams/start` 成功后,保存 `session_id`(建议按 `video_id` 维度缓存)。
2. 每 10 秒调用一次 `/streams/heartbeat`,请求体必须带当前 `session_id`
3. 关闭播放器时调用 `/streams/stop`,并携带同一 `session_id`
4. 监听设备离线状态后:
- 立即停止该设备心跳;
- 清理本地该设备对应 `session_id`
- 不再继续对该设备发 heartbeat/stop。
5. 设备重新上线后必须重新 `/streams/start` 获取新 `session_id`,禁止复用旧会话。
## 回归测试建议
1. **同一用户双标签页**
- A/B 各自 start,拿到不同 `session_id`
- 关闭 A(stop A)后 B 仍可看
- 关闭 B 后才真实停流
2. **心跳超时**
- start 后停止 heartbeat
- 等待 >30 秒,会话自动过期
- 再 stop 应保持幂等,不影响其他设备
3. **设备离线清理**
- 某设备离线后,验证该 SN 的 `live_session:{sn}_*` 被清理
- 其他设备直播会话不受影响
## 涉及文件(核心)
- `sample/src/main/java/com/dji/sample/manage/model/dto/LiveTypeDTO.java`
- `sample/src/main/java/com/dji/sample/manage/model/dto/LiveDTO.java`
- `sample/src/main/java/com/dji/sample/manage/service/ILiveStreamService.java`
- `sample/src/main/java/com/dji/sample/manage/service/impl/LiveStreamServiceImpl.java`
- `sample/src/main/java/com/dji/sample/manage/controller/LiveStreamController.java`
- `sample/src/main/java/com/dji/sample/component/redis/RedisConst.java`
- `sample/src/main/java/com/dji/sample/manage/service/impl/DeviceServiceImpl.java`
## 前端协同要求
1. 调用 `/streams/start` 后保存返回的 `session_id`
2. 心跳与停止接口必须携带该 `session_id`
3. 设备离线(前端状态判定)后停止该设备心跳,并清理本地会话缓存。
4. 设备重新上线后重新 `start`,不要复用旧 `session_id`
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment