Commit 50829767 by guoxuejian

feat: implement live refresh and restart functionality with event broadcasting

parent 4f9774ea
......@@ -2,6 +2,8 @@ package com.dji.sample.manage.service.impl;
import com.dji.sample.component.redis.RedisConst;
import com.dji.sample.component.redis.RedisOpsUtils;
import com.dji.sample.component.websocket.model.BizCodeEnum;
import com.dji.sample.component.websocket.service.IWebSocketMessageService;
import com.dji.sample.manage.model.dto.*;
import com.dji.sample.manage.model.param.DeviceQueryParam;
import com.dji.sample.manage.service.*;
......@@ -21,6 +23,8 @@ import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
......@@ -38,6 +42,7 @@ import java.util.stream.Collectors;
public class LiveStreamServiceImpl implements ILiveStreamService {
private static final int LIVE_ALREADY_STREAMING_CODE = 513003;
private static final int LIVE_NO_STREAM_CODE = 513011;
@Autowired
private ICapacityCameraService capacityCameraService;
......@@ -54,6 +59,9 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
@Autowired
private AbstractLivestreamService abstractLivestreamService;
@Autowired
private IWebSocketMessageService webSocketMessageService;
@Override
public CapacityDeviceDTO getLiveCapacity(String workspaceId, String deviceSn) {
......@@ -122,33 +130,49 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
@Override
public HttpResultResponse liveStart(LiveTypeDTO liveParam) {
return doLiveStart(liveParam, false);
}
/**
* 直播开流核心逻辑。
* @param liveParam 开流参数
* @param forceMqttPush true = 强制向设备发送 MQTT 推流指令(refresh 场景),
* false = 存在活跃会话时复用、跳过 MQTT(正常 start 场景)
*/
private HttpResultResponse doLiveStart(LiveTypeDTO liveParam, boolean forceMqttPush) {
// Check if this lens is available live.
HttpResultResponse<DeviceDTO> responseResult = this.checkBeforeLive(liveParam.getVideoId());
if (HttpResultResponse.CODE_SUCCESS != responseResult.getCode()) {
return responseResult;
}
ILivestreamUrl url = LiveStreamProperty.get(liveParam.getUrlType());
url = setExt(liveParam.getUrlType(), url, liveParam.getVideoId());
// ========== 优化:先查 Redis,已有活跃会话则直接复用,跳过 MQTT 推流 ==========
Set<String> existingSessions = findSessionKeys(liveParam.getVideoId());
boolean streamAlreadyRunning = existingSessions != null && !existingSessions.isEmpty();
boolean streamAlreadyRunning = !forceMqttPush
&& existingSessions != null && !existingSessions.isEmpty();
String rtspOutputUrl = null; // RTSP 特殊:播放地址来自设备响应
if (streamAlreadyRunning) {
log.info("Stream already running with {} viewer(s), skip MQTT push and reuse. videoId={}",
existingSessions.size(), liveParam.getVideoId());
} else {
// 首个观看者,真正向设备发送 MQTT 推流指令
// 首个观看者 / 强制推流,真正向设备发送 MQTT 推流指令
if (forceMqttPush) {
log.info("Force MQTT push (refresh mode). videoId={}", liveParam.getVideoId());
}
// videoQuality 为空时默认 AUTO(refresh 场景前端可能不传)
VideoQualityEnum quality = liveParam.getVideoQuality() != null
? liveParam.getVideoQuality() : VideoQualityEnum.AUTO;
TopicServicesResponse<ServicesReplyData<String>> response = abstractLivestreamService.liveStartPush(
SDKManager.getDeviceSDK(responseResult.getData().getDeviceSn()),
new LiveStartPushRequest()
.setUrl(url)
.setUrlType(liveParam.getUrlType())
.setVideoId(liveParam.getVideoId())
.setVideoQuality(liveParam.getVideoQuality()));
.setVideoQuality(quality));
if (!response.getData().getResult().isSuccess()) {
int errorCode = response.getData().getResult().getCode();
......@@ -219,6 +243,120 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
}
@Override
public HttpResultResponse liveRestart(LiveTypeDTO liveParam) {
if (Objects.isNull(liveParam) || Objects.isNull(liveParam.getVideoId())) {
return HttpResultResponse.error("videoId is required");
}
if (!StringUtils.hasText(liveParam.getWorkspaceId())) {
return HttpResultResponse.error("workspaceId is required");
}
if (hitRestartDebounce(liveParam.getVideoId())) {
Map<String, Object> result = new HashMap<>(2);
result.put("debounced", true);
result.put("debounce_seconds", RedisConst.LIVE_RESTART_DEBOUNCE_SECOND);
return HttpResultResponse.success(result);
}
String eventId = UUID.randomUUID().toString();
long beginMs = System.currentTimeMillis();
LiveRestartNotifyDTO beginPayload = LiveRestartNotifyDTO.builder()
.eventId(eventId)
.workspaceId(liveParam.getWorkspaceId())
.videoId(liveParam.getVideoId())
.status("begin")
.reason(liveParam.getReason())
.operatorClientId(liveParam.getClientId())
.build();
broadcastRestartEvent(liveParam.getWorkspaceId(), BizCodeEnum.LIVE_RESTART_BEGIN, beginPayload);
try {
HttpResultResponse stopResult = this.liveForceStop(liveParam.getVideoId());
if (HttpResultResponse.CODE_SUCCESS != stopResult.getCode()) {
LiveRestartNotifyDTO failedPayload = LiveRestartNotifyDTO.builder()
.eventId(eventId)
.workspaceId(liveParam.getWorkspaceId())
.videoId(liveParam.getVideoId())
.status("failed")
.reason(liveParam.getReason())
.operatorClientId(liveParam.getClientId())
.errorCode(stopResult.getCode())
.errorMsg(stopResult.getMessage())
.durationMs(System.currentTimeMillis() - beginMs)
.build();
broadcastRestartEvent(liveParam.getWorkspaceId(), BizCodeEnum.LIVE_RESTART_FAILED, failedPayload);
return stopResult;
}
Thread.sleep(500L);
HttpResultResponse startResult = this.liveStart(liveParam);
if (HttpResultResponse.CODE_SUCCESS != startResult.getCode()) {
LiveRestartNotifyDTO failedPayload = LiveRestartNotifyDTO.builder()
.eventId(eventId)
.workspaceId(liveParam.getWorkspaceId())
.videoId(liveParam.getVideoId())
.status("failed")
.reason(liveParam.getReason())
.operatorClientId(liveParam.getClientId())
.errorCode(startResult.getCode())
.errorMsg(startResult.getMessage())
.durationMs(System.currentTimeMillis() - beginMs)
.build();
broadcastRestartEvent(liveParam.getWorkspaceId(), BizCodeEnum.LIVE_RESTART_FAILED, failedPayload);
return startResult;
}
LiveDTO live = startResult.getData() instanceof LiveDTO ? (LiveDTO) startResult.getData() : null;
LiveRestartNotifyDTO successPayload = LiveRestartNotifyDTO.builder()
.eventId(eventId)
.workspaceId(liveParam.getWorkspaceId())
.videoId(liveParam.getVideoId())
.status("success")
.reason(liveParam.getReason())
.operatorClientId(liveParam.getClientId())
.url(Objects.nonNull(live) ? live.getUrl() : null)
.sessionId(Objects.nonNull(live) ? live.getSessionId() : null)
.durationMs(System.currentTimeMillis() - beginMs)
.build();
broadcastRestartEvent(liveParam.getWorkspaceId(), BizCodeEnum.LIVE_RESTART_SUCCESS, successPayload);
return startResult;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Live restart interrupted. eventId={}, videoId={}", eventId, liveParam.getVideoId(), e);
LiveRestartNotifyDTO failedPayload = LiveRestartNotifyDTO.builder()
.eventId(eventId)
.workspaceId(liveParam.getWorkspaceId())
.videoId(liveParam.getVideoId())
.status("failed")
.reason(liveParam.getReason())
.operatorClientId(liveParam.getClientId())
.errorCode(HttpResultResponse.CODE_FAILED)
.errorMsg("restart interrupted")
.durationMs(System.currentTimeMillis() - beginMs)
.build();
broadcastRestartEvent(liveParam.getWorkspaceId(), BizCodeEnum.LIVE_RESTART_FAILED, failedPayload);
return HttpResultResponse.error("restart interrupted");
} catch (Exception e) {
log.error("Live restart failed unexpectedly. eventId={}, videoId={}", eventId, liveParam.getVideoId(), e);
LiveRestartNotifyDTO failedPayload = LiveRestartNotifyDTO.builder()
.eventId(eventId)
.workspaceId(liveParam.getWorkspaceId())
.videoId(liveParam.getVideoId())
.status("failed")
.reason(liveParam.getReason())
.operatorClientId(liveParam.getClientId())
.errorCode(HttpResultResponse.CODE_FAILED)
.errorMsg(e.getMessage())
.durationMs(System.currentTimeMillis() - beginMs)
.build();
broadcastRestartEvent(liveParam.getWorkspaceId(), BizCodeEnum.LIVE_RESTART_FAILED, failedPayload);
return HttpResultResponse.error("restart failed");
}
}
@Override
public HttpResultResponse liveStop(VideoId videoId) {
HttpResultResponse<DeviceDTO> responseResult = this.checkBeforeLive(videoId);
if (HttpResultResponse.CODE_SUCCESS != responseResult.getCode()) {
......@@ -405,6 +543,33 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
return keys != null ? keys.size() : 0;
}
private String buildRestartDebounceKey(VideoId videoId) {
return RedisConst.LIVE_RESTART_DEBOUNCE_PREFIX
+ videoId.getDroneSn() + "_" + videoId.getPayloadIndex().toString();
}
private boolean hitRestartDebounce(VideoId videoId) {
String key = buildRestartDebounceKey(videoId);
if (RedisOpsUtils.checkExist(key)) {
return true;
}
RedisOpsUtils.setWithExpire(key, "1", RedisConst.LIVE_RESTART_DEBOUNCE_SECOND);
return false;
}
private void broadcastRestartEvent(String workspaceId, BizCodeEnum bizCode, LiveRestartNotifyDTO payload) {
if (!StringUtils.hasText(workspaceId)) {
log.warn("Skip restart broadcast because workspaceId is empty. payload={}", payload);
return;
}
try {
webSocketMessageService.sendBatch(workspaceId, bizCode.getCode(), payload);
} catch (Exception e) {
log.warn("Failed to broadcast restart event. workspaceId={}, bizCode={}, payload={}",
workspaceId, bizCode.getCode(), payload, e);
}
}
@Override
public HttpResultResponse liveStopSession(VideoId videoId, String sessionId) {
if (Objects.isNull(videoId) || !StringUtils.hasText(sessionId)) {
......@@ -459,4 +624,161 @@ public class LiveStreamServiceImpl implements ILiveStreamService {
// 直接执行真正停流
return this.liveStop(videoId);
}
@Override
public HttpResultResponse liveRefreshStop(VideoId videoId) {
if (Objects.isNull(videoId)) {
return HttpResultResponse.error("videoId is required");
}
// 只停设备流,不清除会话(其他客户端保持 session_id 等待重新推流)
HttpResultResponse stopResult = this.liveStop(videoId);
if (HttpResultResponse.CODE_SUCCESS != stopResult.getCode()) {
// 513011: 设备本身已无活跃流(可能前端 stopSession 已触发了真正停流),视为成功
if (stopResult.getCode() == LIVE_NO_STREAM_CODE) {
log.info("RefreshStop: device reports no active stream (513011), treating as success. videoId={}", videoId);
return HttpResultResponse.success();
}
// 其他错误:停流失败,清除所有会话(兜底逻辑)
Set<String> keys = findSessionKeys(videoId);
if (keys != null && !keys.isEmpty()) {
keys.forEach(RedisOpsUtils::del);
log.warn("RefreshStop failed (code={}), cleared {} session(s) as fallback. videoId={}",
stopResult.getCode(), keys.size(), videoId);
}
} else {
log.info("RefreshStop succeeded, sessions preserved for videoId={}", videoId);
}
return stopResult;
}
@Override
public HttpResultResponse liveRefresh(LiveTypeDTO liveParam) {
if (Objects.isNull(liveParam) || Objects.isNull(liveParam.getVideoId())) {
return HttpResultResponse.error("videoId is required");
}
if (!StringUtils.hasText(liveParam.getWorkspaceId())) {
return HttpResultResponse.error("workspaceId is required");
}
if (Objects.isNull(liveParam.getUrlType())) {
return HttpResultResponse.error("urlType is required");
}
// 防抖:同一路流短时间内不允许重复刷新
if (hitRefreshDebounce(liveParam.getVideoId())) {
Map<String, Object> result = new HashMap<>(2);
result.put("debounced", true);
result.put("debounce_seconds", RedisConst.LIVE_REFRESH_DEBOUNCE_SECOND);
return HttpResultResponse.success(result);
}
try {
// 1. 停设备流(不清会话),失败时内部已兜底清除会话
HttpResultResponse stopResult = this.liveRefreshStop(liveParam.getVideoId());
if (HttpResultResponse.CODE_SUCCESS != stopResult.getCode()) {
// refreshStop 内部已清除会话,广播失败通知
broadcastRefreshFailed(liveParam, "stop failed: " + stopResult.getMessage());
return stopResult;
}
Thread.sleep(500L);
// 2. 强制 MQTT 推流(绕过会话复用检查)
HttpResultResponse startResult = this.doLiveStart(liveParam, true);
if (HttpResultResponse.CODE_SUCCESS != startResult.getCode()) {
// start 失败:设备流已停但会话还在,必须清除并通知
clearAllSessions(liveParam.getVideoId(), "refresh start failed");
broadcastRefreshFailed(liveParam, "start failed: " + startResult.getMessage());
return startResult;
}
// 3. 广播成功通知,其他客户端复用 session_id 等待 MQTT 推流恢复
LiveDTO live = startResult.getData() instanceof LiveDTO ? (LiveDTO) startResult.getData() : null;
LiveRestartNotifyDTO notifyPayload = LiveRestartNotifyDTO.builder()
.workspaceId(liveParam.getWorkspaceId())
.videoId(liveParam.getVideoId())
.status("refresh")
.reason(liveParam.getReason())
.operatorClientId(liveParam.getClientId())
.url(Objects.nonNull(live) ? live.getUrl() : null)
.build();
broadcastRefreshEvent(liveParam.getWorkspaceId(), notifyPayload);
return startResult;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Live refresh interrupted. videoId={}", liveParam.getVideoId(), e);
clearAllSessions(liveParam.getVideoId(), "refresh interrupted");
broadcastRefreshFailed(liveParam, "refresh interrupted");
return HttpResultResponse.error("refresh interrupted");
} catch (Exception e) {
log.error("Live refresh failed unexpectedly. videoId={}", liveParam.getVideoId(), e);
clearAllSessions(liveParam.getVideoId(), "refresh exception");
broadcastRefreshFailed(liveParam, "refresh failed");
return HttpResultResponse.error("refresh failed");
}
}
/**
* 清除指定 videoId 的所有会话(兜底用)。
*/
private void clearAllSessions(VideoId videoId, String reason) {
Set<String> keys = findSessionKeys(videoId);
if (keys != null && !keys.isEmpty()) {
keys.forEach(RedisOpsUtils::del);
log.warn("Cleared {} session(s) for videoId={}, reason={}", keys.size(), videoId, reason);
}
}
private String buildRefreshDebounceKey(VideoId videoId) {
return RedisConst.LIVE_REFRESH_DEBOUNCE_PREFIX
+ videoId.getDroneSn() + "_" + videoId.getPayloadIndex().toString();
}
private boolean hitRefreshDebounce(VideoId videoId) {
String key = buildRefreshDebounceKey(videoId);
if (RedisOpsUtils.checkExist(key)) {
return true;
}
RedisOpsUtils.setWithExpire(key, "1", RedisConst.LIVE_REFRESH_DEBOUNCE_SECOND);
return false;
}
private void broadcastRefreshEvent(String workspaceId, LiveRestartNotifyDTO payload) {
if (!StringUtils.hasText(workspaceId)) {
log.warn("Skip refresh broadcast because workspaceId is empty. payload={}", payload);
return;
}
try {
webSocketMessageService.sendBatch(workspaceId, BizCodeEnum.LIVE_REFRESH_NOTIFY.getCode(), payload);
} catch (Exception e) {
log.warn("Failed to broadcast refresh event. workspaceId={}, payload={}",
workspaceId, payload, e);
}
}
private void broadcastRefreshFailed(LiveTypeDTO liveParam, String errorMsg) {
if (!StringUtils.hasText(liveParam.getWorkspaceId())) {
return;
}
try {
LiveRestartNotifyDTO failedPayload = LiveRestartNotifyDTO.builder()
.workspaceId(liveParam.getWorkspaceId())
.videoId(liveParam.getVideoId())
.status("failed")
.reason(liveParam.getReason())
.operatorClientId(liveParam.getClientId())
.errorMsg(errorMsg)
.build();
webSocketMessageService.sendBatch(
liveParam.getWorkspaceId(),
BizCodeEnum.LIVE_REFRESH_FAILED.getCode(),
failedPayload);
} catch (Exception e) {
log.warn("Failed to broadcast refresh-failed event. videoId={}",
liveParam.getVideoId(), e);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment