Commit 2eafe0b9 by guoxuejian

feat: add client ID support for live session management and optimize session reuse logic

parent 8056704b
...@@ -36,4 +36,12 @@ public class LiveTypeDTO { ...@@ -36,4 +36,12 @@ public class LiveTypeDTO {
*/ */
@JsonProperty("session_id") @JsonProperty("session_id")
private String sessionId; private String sessionId;
/**
* 客户端标识(前端浏览器标签页级别),用于 liveStart 时去重。
* 同一 clientId 再次开流时,后端会自动清理该 clientId 对应的旧会话,避免刷新页面产生幽灵会话。
* 存储于 sessionStorage,刷新不变、新标签页不同。
*/
@JsonProperty("client_id")
private String clientId;
} }
\ No newline at end of file
...@@ -128,9 +128,20 @@ public class LiveStreamServiceImpl implements ILiveStreamService { ...@@ -128,9 +128,20 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
return responseResult; return responseResult;
} }
ILivestreamUrl url = LiveStreamProperty.get(liveParam.getUrlType()); ILivestreamUrl url = LiveStreamProperty.get(liveParam.getUrlType());
url = setExt(liveParam.getUrlType(), url, liveParam.getVideoId()); url = setExt(liveParam.getUrlType(), url, liveParam.getVideoId());
// ========== 优化:先查 Redis,已有活跃会话则直接复用,跳过 MQTT 推流 ==========
Set<String> existingSessions = findSessionKeys(liveParam.getVideoId());
boolean streamAlreadyRunning = existingSessions != null && !existingSessions.isEmpty();
String rtspOutputUrl = null; // RTSP 特殊:播放地址来自设备响应
if (streamAlreadyRunning) {
log.info("Stream already running with {} viewer(s), skip MQTT push and reuse. videoId={}",
existingSessions.size(), liveParam.getVideoId());
} else {
// 首个观看者,真正向设备发送 MQTT 推流指令
TopicServicesResponse<ServicesReplyData<String>> response = abstractLivestreamService.liveStartPush( TopicServicesResponse<ServicesReplyData<String>> response = abstractLivestreamService.liveStartPush(
SDKManager.getDeviceSDK(responseResult.getData().getDeviceSn()), SDKManager.getDeviceSDK(responseResult.getData().getDeviceSn()),
new LiveStartPushRequest() new LiveStartPushRequest()
...@@ -139,16 +150,24 @@ public class LiveStreamServiceImpl implements ILiveStreamService { ...@@ -139,16 +150,24 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
.setVideoId(liveParam.getVideoId()) .setVideoId(liveParam.getVideoId())
.setVideoQuality(liveParam.getVideoQuality())); .setVideoQuality(liveParam.getVideoQuality()));
boolean isStartPushSuccess = response.getData().getResult().isSuccess(); if (!response.getData().getResult().isSuccess()) {
if (!isStartPushSuccess) { int errorCode = response.getData().getResult().getCode();
HttpResultResponse errorResponse = HttpResultResponse.error(response.getData().getResult()); // 设备报"已在直播"但 Redis 无会话(可能 Redis 被清过),视为可复用
// 复用场景:设备已在直播时,不应阻断新观看者会话注册 if (errorCode == LIVE_ALREADY_STREAMING_CODE) {
if (errorResponse.getCode() != LIVE_ALREADY_STREAMING_CODE) { log.info("Device reports already streaming but no Redis sessions, treating as reuse. videoId={}",
return errorResponse; liveParam.getVideoId());
} else {
return HttpResultResponse.error(response.getData().getResult());
} }
log.info("Live stream already started on device, reuse existing stream. videoId={}", liveParam.getVideoId());
} }
// 保存 RTSP 设备返回的 output(仅首次推流时设备会返回)
if (StringUtils.hasText(response.getData().getOutput())) {
rtspOutputUrl = response.getData().getOutput();
}
}
// ========== 构造播放地址(无论首个还是复用,URL 构造逻辑一致) ==========
LiveDTO live = new LiveDTO(); LiveDTO live = new LiveDTO();
switch (liveParam.getUrlType()) { switch (liveParam.getUrlType()) {
...@@ -169,13 +188,10 @@ public class LiveStreamServiceImpl implements ILiveStreamService { ...@@ -169,13 +188,10 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
.toString()); .toString());
break; break;
case RTSP: case RTSP:
if (StringUtils.hasText(response.getData().getOutput())) { // RTSP 优先用设备返回的 output 地址,复用时回退到配置 URL
live.setUrl(response.getData().getOutput()); if (StringUtils.hasText(rtspOutputUrl)) {
live.setUrl(rtspOutputUrl);
} else { } else {
// RTSP 在“已在直播”且无 output 时无法构造可用地址,返回原始错误
if (!isStartPushSuccess) {
return HttpResultResponse.error(response.getData().getResult());
}
live.setUrl(url.toString()); live.setUrl(url.toString());
} }
break; break;
...@@ -186,11 +202,18 @@ public class LiveStreamServiceImpl implements ILiveStreamService { ...@@ -186,11 +202,18 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
return HttpResultResponse.error(LiveErrorCodeEnum.URL_TYPE_NOT_SUPPORTED); return HttpResultResponse.error(LiveErrorCodeEnum.URL_TYPE_NOT_SUPPORTED);
} }
// 注册观看者会话(仅在 MQTT 推流成功后),生成唯一 sessionId // ========== 无论首个还是复用,都注册新的观看者会话 ==========
// 如果前端传了 clientId,先清理同一 clientId 的旧会话(处理页面刷新场景)
if (StringUtils.hasText(liveParam.getClientId())) {
cleanupClientSessions(liveParam.getVideoId(), liveParam.getClientId());
}
String sessionId = UUID.randomUUID().toString(); String sessionId = UUID.randomUUID().toString();
registerSession(liveParam.getVideoId(), sessionId); registerSession(liveParam.getVideoId(), sessionId, liveParam.getClientId());
live.setSessionId(sessionId); live.setSessionId(sessionId);
log.info("Registered live session sessionId={}, videoId={}", sessionId, liveParam.getVideoId()); log.info("Registered live session sessionId={}, clientId={}, videoId={}, totalViewers={}",
sessionId, liveParam.getClientId(), liveParam.getVideoId(),
countSessionKeys(liveParam.getVideoId()));
return HttpResultResponse.success(live); return HttpResultResponse.success(live);
} }
...@@ -348,10 +371,38 @@ public class LiveStreamServiceImpl implements ILiveStreamService { ...@@ -348,10 +371,38 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
/** /**
* 注册观看者会话到 Redis,设置 TTL。 * 注册观看者会话到 Redis,设置 TTL。
* value 存储 clientId(用于刷新去重),无 clientId 则存 sessionId。
*/ */
private void registerSession(VideoId videoId, String sessionId) { private void registerSession(VideoId videoId, String sessionId, String clientId) {
String key = buildSessionKey(videoId, sessionId); String key = buildSessionKey(videoId, sessionId);
RedisOpsUtils.setWithExpire(key, sessionId, RedisConst.LIVE_SESSION_TTL_SECOND); String value = StringUtils.hasText(clientId) ? clientId : sessionId;
RedisOpsUtils.setWithExpire(key, value, RedisConst.LIVE_SESSION_TTL_SECOND);
}
/**
* 清理同一 clientId 的旧会话(处理页面刷新 / 重复开流场景)。
* 扫描该 videoId 下所有会话 key,value == clientId 的全部删除。
*/
private void cleanupClientSessions(VideoId videoId, String clientId) {
Set<String> keys = findSessionKeys(videoId);
if (keys == null || keys.isEmpty()) {
return;
}
for (String key : keys) {
Object val = RedisOpsUtils.get(key);
if (clientId.equals(val)) {
RedisOpsUtils.del(key);
log.info("Cleaned up stale session for clientId={}, key={}", clientId, key);
}
}
}
/**
* 统计某路直播流当前活跃观看者数量。
*/
private int countSessionKeys(VideoId videoId) {
Set<String> keys = findSessionKeys(videoId);
return keys != null ? keys.size() : 0;
} }
@Override @Override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment