Commit e2718944 by guoxuejian Committed by gdj

feat: add client ID support for live session management and optimize session reuse logic

parent 3d9d432b
...@@ -36,4 +36,12 @@ public class LiveTypeDTO { ...@@ -36,4 +36,12 @@ public class LiveTypeDTO {
*/ */
@JsonProperty("session_id") @JsonProperty("session_id")
private String sessionId; private String sessionId;
/**
* 客户端标识(前端浏览器标签页级别),用于 liveStart 时去重。
* 同一 clientId 再次开流时,后端会自动清理该 clientId 对应的旧会话,避免刷新页面产生幽灵会话。
* 存储于 sessionStorage,刷新不变、新标签页不同。
*/
@JsonProperty("client_id")
private String clientId;
} }
\ No newline at end of file
...@@ -128,27 +128,46 @@ public class LiveStreamServiceImpl implements ILiveStreamService { ...@@ -128,27 +128,46 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
return responseResult; return responseResult;
} }
ILivestreamUrl url = LiveStreamProperty.get(liveParam.getUrlType()); ILivestreamUrl url = LiveStreamProperty.get(liveParam.getUrlType());
url = setExt(liveParam.getUrlType(), url, liveParam.getVideoId()); url = setExt(liveParam.getUrlType(), url, liveParam.getVideoId());
TopicServicesResponse<ServicesReplyData<String>> response = abstractLivestreamService.liveStartPush( // ========== 优化:先查 Redis,已有活跃会话则直接复用,跳过 MQTT 推流 ==========
SDKManager.getDeviceSDK(responseResult.getData().getDeviceSn()), Set<String> existingSessions = findSessionKeys(liveParam.getVideoId());
new LiveStartPushRequest() boolean streamAlreadyRunning = existingSessions != null && !existingSessions.isEmpty();
.setUrl(url) String rtspOutputUrl = null; // RTSP 特殊:播放地址来自设备响应
.setUrlType(liveParam.getUrlType())
.setVideoId(liveParam.getVideoId()) if (streamAlreadyRunning) {
.setVideoQuality(liveParam.getVideoQuality())); log.info("Stream already running with {} viewer(s), skip MQTT push and reuse. videoId={}",
existingSessions.size(), liveParam.getVideoId());
boolean isStartPushSuccess = response.getData().getResult().isSuccess(); } else {
if (!isStartPushSuccess) { // 首个观看者,真正向设备发送 MQTT 推流指令
HttpResultResponse errorResponse = HttpResultResponse.error(response.getData().getResult()); TopicServicesResponse<ServicesReplyData<String>> response = abstractLivestreamService.liveStartPush(
// 复用场景:设备已在直播时,不应阻断新观看者会话注册 SDKManager.getDeviceSDK(responseResult.getData().getDeviceSn()),
if (errorResponse.getCode() != LIVE_ALREADY_STREAMING_CODE) { new LiveStartPushRequest()
return errorResponse; .setUrl(url)
.setUrlType(liveParam.getUrlType())
.setVideoId(liveParam.getVideoId())
.setVideoQuality(liveParam.getVideoQuality()));
if (!response.getData().getResult().isSuccess()) {
int errorCode = response.getData().getResult().getCode();
// 设备报"已在直播"但 Redis 无会话(可能 Redis 被清过),视为可复用
if (errorCode == LIVE_ALREADY_STREAMING_CODE) {
log.info("Device reports already streaming but no Redis sessions, treating as reuse. videoId={}",
liveParam.getVideoId());
} else {
return HttpResultResponse.error(response.getData().getResult());
}
}
// 保存 RTSP 设备返回的 output(仅首次推流时设备会返回)
if (StringUtils.hasText(response.getData().getOutput())) {
rtspOutputUrl = response.getData().getOutput();
} }
log.info("Live stream already started on device, reuse existing stream. videoId={}", liveParam.getVideoId());
} }
// ========== 构造播放地址(无论首个还是复用,URL 构造逻辑一致) ==========
LiveDTO live = new LiveDTO(); LiveDTO live = new LiveDTO();
switch (liveParam.getUrlType()) { switch (liveParam.getUrlType()) {
...@@ -169,13 +188,10 @@ public class LiveStreamServiceImpl implements ILiveStreamService { ...@@ -169,13 +188,10 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
.toString()); .toString());
break; break;
case RTSP: case RTSP:
if (StringUtils.hasText(response.getData().getOutput())) { // RTSP 优先用设备返回的 output 地址,复用时回退到配置 URL
live.setUrl(response.getData().getOutput()); if (StringUtils.hasText(rtspOutputUrl)) {
live.setUrl(rtspOutputUrl);
} else { } else {
// RTSP 在“已在直播”且无 output 时无法构造可用地址,返回原始错误
if (!isStartPushSuccess) {
return HttpResultResponse.error(response.getData().getResult());
}
live.setUrl(url.toString()); live.setUrl(url.toString());
} }
break; break;
...@@ -186,11 +202,18 @@ public class LiveStreamServiceImpl implements ILiveStreamService { ...@@ -186,11 +202,18 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
return HttpResultResponse.error(LiveErrorCodeEnum.URL_TYPE_NOT_SUPPORTED); return HttpResultResponse.error(LiveErrorCodeEnum.URL_TYPE_NOT_SUPPORTED);
} }
// 注册观看者会话(仅在 MQTT 推流成功后),生成唯一 sessionId // ========== 无论首个还是复用,都注册新的观看者会话 ==========
// 如果前端传了 clientId,先清理同一 clientId 的旧会话(处理页面刷新场景)
if (StringUtils.hasText(liveParam.getClientId())) {
cleanupClientSessions(liveParam.getVideoId(), liveParam.getClientId());
}
String sessionId = UUID.randomUUID().toString(); String sessionId = UUID.randomUUID().toString();
registerSession(liveParam.getVideoId(), sessionId); registerSession(liveParam.getVideoId(), sessionId, liveParam.getClientId());
live.setSessionId(sessionId); live.setSessionId(sessionId);
log.info("Registered live session sessionId={}, videoId={}", sessionId, liveParam.getVideoId()); log.info("Registered live session sessionId={}, clientId={}, videoId={}, totalViewers={}",
sessionId, liveParam.getClientId(), liveParam.getVideoId(),
countSessionKeys(liveParam.getVideoId()));
return HttpResultResponse.success(live); return HttpResultResponse.success(live);
} }
...@@ -348,10 +371,38 @@ public class LiveStreamServiceImpl implements ILiveStreamService { ...@@ -348,10 +371,38 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
/** /**
* 注册观看者会话到 Redis,设置 TTL。 * 注册观看者会话到 Redis,设置 TTL。
* value 存储 clientId(用于刷新去重),无 clientId 则存 sessionId。
*/ */
private void registerSession(VideoId videoId, String sessionId) { private void registerSession(VideoId videoId, String sessionId, String clientId) {
String key = buildSessionKey(videoId, sessionId); String key = buildSessionKey(videoId, sessionId);
RedisOpsUtils.setWithExpire(key, sessionId, RedisConst.LIVE_SESSION_TTL_SECOND); String value = StringUtils.hasText(clientId) ? clientId : sessionId;
RedisOpsUtils.setWithExpire(key, value, RedisConst.LIVE_SESSION_TTL_SECOND);
}
/**
* 清理同一 clientId 的旧会话(处理页面刷新 / 重复开流场景)。
* 扫描该 videoId 下所有会话 key,value == clientId 的全部删除。
*/
private void cleanupClientSessions(VideoId videoId, String clientId) {
Set<String> keys = findSessionKeys(videoId);
if (keys == null || keys.isEmpty()) {
return;
}
for (String key : keys) {
Object val = RedisOpsUtils.get(key);
if (clientId.equals(val)) {
RedisOpsUtils.del(key);
log.info("Cleaned up stale session for clientId={}, key={}", clientId, key);
}
}
}
/**
* 统计某路直播流当前活跃观看者数量。
*/
private int countSessionKeys(VideoId videoId) {
Set<String> keys = findSessionKeys(videoId);
return keys != null ? keys.size() : 0;
} }
@Override @Override
...@@ -408,4 +459,4 @@ public class LiveStreamServiceImpl implements ILiveStreamService { ...@@ -408,4 +459,4 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
// 直接执行真正停流 // 直接执行真正停流
return this.liveStop(videoId); return this.liveStop(videoId);
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment