diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java index a23a4590..413381c4 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java @@ -97,9 +97,6 @@ public class ParentPlatform { @Schema(description = "是否作为消息通道") private boolean asMessageChannel; - @Schema(description = "是否作为消息通道") - private boolean autoPushChannel; - @Schema(description = "点播回复200OK使用次IP") private String sendStreamIp; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java index 8a08a315..96e68cf4 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java @@ -32,6 +32,7 @@ import io.swagger.v3.oas.annotations.security.SecurityRequirement; import io.swagger.v3.oas.annotations.tags.Tag; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; @@ -90,9 +91,8 @@ public class PlayController { @PathVariable String channelId) { log.info("[开始点播] deviceId:{}, channelId:{}, ", deviceId, channelId); - if (ObjectUtils.isEmpty(deviceId) || ObjectUtils.isEmpty(channelId)) { - throw new ControllerException(ErrorCode.ERROR400); - } + Assert.notNull(deviceId, "设备国标编号不可为NULL"); + Assert.notNull(channelId, "通道国标编号不可为NULL"); // 获取可用的zlm Device device = deviceService.getDevice(deviceId); MediaServer newMediaServerItem = playService.getNewMediaServerItem(device); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 7575f996..c6c0a050 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -24,6 +24,7 @@ import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; @@ -597,10 +598,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setPlayType(InviteStreamType.PUSH); if (streamPushItem != null) { // 从redis查询是否正在接收这个推流 - OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); - if (pushListItem != null) { - sendRtpItem.setServerId(pushListItem.getSeverId()); - sendRtpItem.setMediaServerId(pushListItem.getMediaServerId()); + MediaArrivalEvent mediaArrivalEvent = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); + if (mediaArrivalEvent != null) { + sendRtpItem.setServerId(mediaArrivalEvent.getServerId()); + sendRtpItem.setMediaServerId(mediaArrivalEvent.getMediaServer().getId()); redisCatchStorage.updateSendRTPSever(sendRtpItem); // 开始推流 diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaArrivalEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaArrivalEvent.java index 8273bf9d..78b50384 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaArrivalEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaArrivalEvent.java @@ -4,12 +4,16 @@ import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; +import lombok.Data; +import lombok.Getter; +import lombok.Setter; import java.util.Map; /** * 流到来事件 */ + public class MediaArrivalEvent extends MediaEvent { public MediaArrivalEvent(Object source) { super(source); @@ -28,55 +32,29 @@ public class MediaArrivalEvent extends MediaEvent { return mediaArrivalEvent; } + @Getter + @Setter private MediaInfo mediaInfo; + @Getter + @Setter private String callId; + @Getter + @Setter private OnStreamChangedHookParam hookParam; + @Getter + @Setter private StreamContent streamInfo; + @Getter + @Setter private Map paramMap; - public MediaInfo getMediaInfo() { - return mediaInfo; - } - - public void setMediaInfo(MediaInfo mediaInfo) { - this.mediaInfo = mediaInfo; - } + @Getter + @Setter + private String serverId; - public String getCallId() { - return callId; - } - - public void setCallId(String callId) { - this.callId = callId; - } - - public OnStreamChangedHookParam getHookParam() { - return hookParam; - } - - public void setHookParam(OnStreamChangedHookParam hookParam) { - this.hookParam = hookParam; - } - - public StreamContent getStreamInfo() { - return streamInfo; - } - - public void setStreamInfo(StreamContent streamInfo) { - this.streamInfo = streamInfo; - } - - - public Map getParamMap() { - return paramMap; - } - - public void setParamMap(Map paramMap) { - this.paramMap = paramMap; - } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 869d39bf..2b70d6ec 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -216,7 +216,7 @@ public interface IRedisCatchStorage { void addPushListItem(String app, String stream, MediaArrivalEvent param); - OnStreamChangedHookParam getPushListItem(String app, String stream); + MediaArrivalEvent getPushListItem(String app, String stream); void removePushListItem(String app, String stream, String mediaServerId); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index fc0fcbc3..86d36228 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -696,9 +696,9 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } @Override - public OnStreamChangedHookParam getPushListItem(String app, String stream) { + public MediaArrivalEvent getPushListItem(String app, String stream) { String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream; - return (OnStreamChangedHookParam)redisTemplate.opsForValue().get(key); + return (MediaArrivalEvent)redisTemplate.opsForValue().get(key); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/bean/StreamPush.java b/src/main/java/com/genersoft/iot/vmp/streamPush/bean/StreamPush.java index 782db5ef..86036a05 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/bean/StreamPush.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/bean/StreamPush.java @@ -70,6 +70,12 @@ public class StreamPush extends CommonGBChannel implements Comparable> batchStop(Integer id){ + Assert.notNull(id, "推流ID不可为NULL"); + DeferredResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); + result.onTimeout(()->{ + WVPResult fail = WVPResult.fail(ErrorCode.ERROR100.getCode(), "等待推流超时"); + result.setResult(fail); + }); + streamPushPlayService.start(id, streamInfo -> { + if (streamInfo != null) { + WVPResult success = WVPResult.success(new StreamContent(streamInfo)); + result.setResult(success); + } + }); + return result; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/streamPush/dao/StreamPushMapper.java index 53acd282..ba5f09f1 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/dao/StreamPushMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/dao/StreamPushMapper.java @@ -116,7 +116,7 @@ public interface StreamPushMapper { Map getAllGBId(); @Select("SELECT st.*, st.id as stream_push_id, wdc.*, wdc.id as gb_id FROM wvp_stream_push st LEFT join wvp_device_channel wdc on st.id = wdc.stream_push_id WHERE st.id=#{id}") - StreamPush select(@Param("id") int id); + StreamPush queryOne(@Param("id") int id); @Select("") void batchDel(List streamPushList); + } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java new file mode 100644 index 00000000..0157abbd --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java @@ -0,0 +1,8 @@ +package com.genersoft.iot.vmp.streamPush.service; + +import com.genersoft.iot.vmp.common.CommonCallback; +import com.genersoft.iot.vmp.common.StreamInfo; + +public interface IStreamPushPlayService { + void start(Integer id, CommonCallback callback); +} diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java new file mode 100644 index 00000000..47703230 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java @@ -0,0 +1,206 @@ +package com.genersoft.iot.vmp.streamPush.service.impl; + +import com.baomidou.dynamic.datasource.annotation.DS; +import com.genersoft.iot.vmp.common.CommonCallback; +import com.genersoft.iot.vmp.common.InviteInfo; +import com.genersoft.iot.vmp.common.InviteSessionType; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; +import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; +import com.genersoft.iot.vmp.media.event.hook.HookType; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; +import com.genersoft.iot.vmp.service.bean.InviteErrorCode; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.streamPush.bean.StreamPush; +import com.genersoft.iot.vmp.streamPush.dao.StreamPushMapper; +import com.genersoft.iot.vmp.streamPush.service.IStreamPushPlayService; +import gov.nist.javax.sip.message.SIPResponse; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.util.Assert; + +import javax.sip.InvalidArgumentException; +import javax.sip.SipException; +import javax.sip.message.Response; +import java.text.ParseException; +import java.util.UUID; + +@Service +@Slf4j +@DS("master") +public class StreamPushPlayServiceImpl implements IStreamPushPlayService { + + @Autowired + private StreamPushMapper streamPushMapper; + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private UserSetting userSetting; + + @Autowired + private DynamicTask dynamicTask; + + @Autowired + private HookSubscribe subscribe; + + @Override + public void start(Integer id, CommonCallback callback) { + StreamPush streamPush = streamPushMapper.queryOne(id); + Assert.notNull(streamPush, "推流信息未找到"); +// if (streamPush.isPushing() && streamPush.getMediaServerId() != null) { +// // 检查流是否准备就绪 +// MediaServer mediaServer = mediaServerService.getOne(streamPush.getMediaServerId()); +// if (mediaServer != null) { +// Boolean streamReady = mediaServerService.isStreamReady(mediaServer, streamPush.getApp(), streamPush.getStream()); +// if (streamReady != null && streamReady) { +// String callId = null; +// StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(streamPush.getApp(), streamPush.getStream()); +// if (streamAuthorityInfo != null) { +// callId = streamAuthorityInfo.getCallId(); +// } +// callback.run(mediaServerService.getStreamInfoByAppAndStream(mediaServer, +// streamPush.getApp(), streamPush.getStream(), null, callId)); +// return; +// } +// } +// } +// Assert.isTrue(streamPush.isAutoPushChannel(), "通道未推流"); +// // 发送redis消息,通知流上线 +// String timeOutTaskKey = UUID.randomUUID().toString(); +// Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, streamPush.getApp(), streamPush.getStream(), null); +// // 开启流上线监听 +// subscribe.addSubscribe(rtpHook, (hookData) -> { +// dynamicTask.stop(timeOutTaskKey); +// subscribe.removeSubscribe(rtpHook); +// if (hookData == null) { +// return; +// } +// String callId = null; +// StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(streamPush.getApp(), streamPush.getStream()); +// if (streamAuthorityInfo != null) { +// callId = streamAuthorityInfo.getCallId(); +// } +// callback.run(mediaServerService.getStreamInfoByAppAndStream(hookData.getMediaServer(), +// streamPush.getApp(), streamPush.getStream(), null, callId)); +// }); +// // 设置超时事件 +// dynamicTask.startDelay(timeOutTaskKey, () -> { +// // 取消流监听 +// subscribe.removeSubscribe(rtpHook); +// }, userSetting.getPlayTimeout()); + // 发送redis消息, 同时监听可能返回的拒绝消息 + + + MediaArrivalEvent pushListItem = redisCatchStorage.getPushListItem(streamPush.getApp(), streamPush.getStream()); + if (pushListItem != null) { + String callId = null; + StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(streamPush.getApp(), streamPush.getStream()); + if (streamAuthorityInfo != null) { + callId = streamAuthorityInfo.getCallId(); + } + callback.run(mediaServerService.getStreamInfoByAppAndStream(pushListItem.getMediaServer(), + streamPush.getApp(), streamPush.getStream(), null, callId)); + return; + } + Assert.isTrue(streamPush.isAutoPushChannel(), "通道未推流"); + // 发送redis消息以使设备上线,流上线后被 + log.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", streamPush.getApp(), streamPush.getStream()); + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, + streamPush.getApp(), streamPush.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(), + platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); + // 设置超时 + dynamicTask.startDelay(sendRtpItem.getCallId(), () -> { + redisRpcService.stopWaitePushStreamOnline(sendRtpItem); + log.info("[ app={}, stream={} ] 等待设备开始推流超时", streamPush.getApp(), streamPush.getStream()); + try { + responseAck(request, Response.REQUEST_TIMEOUT); // 超时 + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("未处理的异常 ", e); + } + }, userSetting.getPlatformPlayTimeout()); + // + long key = redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemKey) -> { + dynamicTask.stop(sendRtpItem.getCallId()); + if (sendRtpItemKey == null) { + log.warn("[级联点播] 等待推流得到结果未空: {}/{}", streamPush.getApp(), streamPush.getStream()); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("未处理的异常 ", e); + } + return; + } + SendRtpItem sendRtpItemFromRedis = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey); + if (sendRtpItemFromRedis == null) { + log.warn("[级联点播] 等待推流, 未找到redis中缓存的发流信息: {}/{}", streamPush.getApp(), streamPush.getStream()); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("未处理的异常 ", e); + } + return; + } + if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) { + log.info("[级联点播] 等待的推流在本平台上线 {}/{}", streamPush.getApp(), streamPush.getStream()); + int localPort = sendRtpPortManager.getNextPort(mediaServerItem); + if (localPort == 0) { + log.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足"); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("未处理的异常 ", e); + } + return; + } + sendRtpItem.setLocalPort(localPort); + if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { + sendRtpItem.setLocalIp(platform.getSendStreamIp()); + } + + // 写入redis, 超时时回复 + sendRtpItem.setStatus(1); + SIPResponse response = sendStreamAck(request, sendRtpItem, platform); + if (response != null) { + sendRtpItem.setToTag(response.getToTag()); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); + } else { + // 其他平台内容 + otherWvpPushStream(sendRtpItemFromRedis, request, platform); + } + }); + // 添加回复的拒绝或者错误的通知 + // redis消息例如: PUBLISH VM_MSG_STREAM_PUSH_RESPONSE '{"code":1,"msg":"失败","app":"1","stream":"2"}' + redisPushStreamResponseListener.addEvent(streamPush.getApp(), streamPush.getStream(), response -> { + if (response.getCode() != 0) { + dynamicTask.stop(sendRtpItem.getCallId()); + redisRpcService.stopWaitePushStreamOnline(sendRtpItem); + redisRpcService.removeCallback(key); + try { + responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage()); + } + } + }); + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java index c7194311..1f388145 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java @@ -251,7 +251,7 @@ public class StreamPushServiceImpl implements IStreamPushService { Assert.notNull(streamPush, "推流信息不可为NULL"); Assert.isTrue(streamPush.getId() > 0, "推流信息ID必须存在"); log.info("[更新推流]:id: {}, app: {}, stream: {}, ", streamPush.getId(), streamPush.getApp(), streamPush.getStream()); - StreamPush streamPushInDb = streamPushMapper.select(streamPush.getId()); + StreamPush streamPushInDb = streamPushMapper.queryOne(streamPush.getId()); if (!streamPushInDb.getApp().equals(streamPush.getApp()) || !streamPushInDb.getStream().equals(streamPush.getStream())) { // app或者stream变化 StreamPush streamPushInDbForAppAndStream = streamPushMapper.selectByAppAndStream(streamPush.getApp(), streamPush.getStream()); @@ -538,7 +538,7 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override public void updatePushStatus(Integer streamPushId, boolean pushIng) { - StreamPush streamPushInDb = streamPushMapper.select(streamPushId); + StreamPush streamPushInDb = streamPushMapper.queryOne(streamPushId); streamPushInDb.setPushing(pushIng); if (userSetting.isUsePushingAsStatus()) { streamPushInDb.setGbStatus(pushIng?"ON":"OFF"); @@ -576,7 +576,7 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override @Transactional public int delete(int id) { - StreamPush streamPush = streamPushMapper.select(id); + StreamPush streamPush = streamPushMapper.queryOne(id); if (streamPush == null) { return 0; } diff --git a/web_src/src/components/StreamPushEdit.vue b/web_src/src/components/StreamPushEdit.vue index 7973e25d..4fbdb0bd 100644 --- a/web_src/src/components/StreamPushEdit.vue +++ b/web_src/src/components/StreamPushEdit.vue @@ -13,7 +13,8 @@ - + + 基础信息 @@ -21,10 +22,14 @@ - + + 策略 + + + 拉起离线推流 + - 少年包青天 保存 diff --git a/web_src/src/components/StreamPushList.vue b/web_src/src/components/StreamPushList.vue index 4753ea9a..d8d32835 100755 --- a/web_src/src/components/StreamPushList.vue +++ b/web_src/src/components/StreamPushList.vue @@ -71,9 +71,7 @@