diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index dda53d0b..5f51fb26 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -16,8 +16,11 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; +import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg; +import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.message.SIPRequest; @@ -93,6 +96,12 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private UserSetting userSetting; + @Autowired + private IStreamPushService pushService; + + @Autowired + private RedisGbPlayMsgListener redisGbPlayMsgListener; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -116,7 +125,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In // 收流端发送的停止 if (sendRtpItem != null){ - logger.info("[收到bye] 来自{},停止通道:{}, 类型: {}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getPlayType()); + logger.info("[收到bye] 来自{},停止通道:{}, 类型: {}, callId: {}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getPlayType(), callIdHeader.getCallId()); String streamId = sendRtpItem.getStream(); Map param = new HashMap<>(); @@ -124,59 +133,82 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In param.put("app",sendRtpItem.getApp()); param.put("stream",streamId); param.put("ssrc",sendRtpItem.getSsrc()); - logger.info("[收到bye] 停止推流:{}", streamId); - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), - callIdHeader.getCallId(), null); - zlmServerFactory.stopSendRtpStream(mediaInfo, param); - if (userSetting.getUseCustomSsrcForParentInvite()) { - mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc()); - } + logger.info("[收到bye] 停止推流:{}, 媒体节点: {}", streamId, sendRtpItem.getMediaServerId()); + if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { - ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); - if (platform != null) { - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, - sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), - sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); - messageForPushChannel.setPlatFormIndex(platform.getId()); - redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); + // 查询这路流是否是本平台的 + StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream()); + if (push!= null && !push.isSelf()) { + // 不是本平台的就发送redis消息让其他wvp停止发流 + ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); + if (platform != null) { + RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId()); + redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg); + } }else { - logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId()); + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), + callIdHeader.getCallId(), null); + zlmServerFactory.stopSendRtpStream(mediaInfo, param); + if (userSetting.getUseCustomSsrcForParentInvite()) { + mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc()); + } + + ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); + if (platform != null) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(platform.getId()); + redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); + }else { + logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId()); + } + } + }else { + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), + callIdHeader.getCallId(), null); + zlmServerFactory.stopSendRtpStream(mediaInfo, param); + if (userSetting.getUseCustomSsrcForParentInvite()) { + mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc()); } } + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (mediaInfo != null) { + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) { + // 来自上级平台的停止对讲 + logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + } - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) { - // 来自上级平台的停止对讲 - logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - } - - int totalReaderCount = zlmServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); - if (totalReaderCount <= 0) { - logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId); - if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { - Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); - if (device == null) { - logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId); - } - try { - logger.info("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null); - } catch (InvalidArgumentException | ParseException | SipException | - SsrcTransactionNotFoundException e) { - logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage()); + int totalReaderCount = zlmServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); + if (totalReaderCount <= 0) { + logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId); + if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { + Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); + if (device == null) { + logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId); + } + try { + logger.info("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null); + } catch (InvalidArgumentException | ParseException | SipException | + SsrcTransactionNotFoundException e) { + logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage()); + } } } } } - // 可能是设备发送的停止 - SsrcTransaction ssrcTransaction = streamSession.getSsrcTransactionByCallId(callIdHeader.getCallId()); - if (ssrcTransaction == null) { - return; - } - logger.info("[收到bye] 来自设备:{}, 通道已停止推流: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId()); + // 可能是设备发送的停止 + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransactionByCallId(callIdHeader.getCallId()); + if (ssrcTransaction == null) { + return; + } + logger.info("[收到bye] 来自设备:{}, 通道已停止推流: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId()); ParentPlatform platform = platformService.queryPlatformByServerGBId(ssrcTransaction.getDeviceId()); if (platform != null ) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 3ecfc6ce..ac68bfea 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -618,9 +618,9 @@ public class ZLMHttpHookListener { } // 收到无人观看说明流也没有在往上级推送 if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) { - List sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId( + List sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId( inviteInfo.getChannelId()); - if (sendRtpItems.size() > 0) { + if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); try { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java index 15a1d7d9..737427e6 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java @@ -289,6 +289,10 @@ public class ZLMServerFactory { * 调用zlm RESTful API —— stopSendRtp */ public Boolean stopSendRtpStream(MediaServerItem mediaServerItem, Mapparam) { + if (mediaServerItem == null) { + logger.error("[停止RTP推流] 失败: 媒体节点为NULL"); + return false; + } Boolean result = false; JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaServerItem, param); if (jsonObject == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/RequestStopPushStreamMsg.java b/src/main/java/com/genersoft/iot/vmp/service/bean/RequestStopPushStreamMsg.java new file mode 100755 index 00000000..fcba5111 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/RequestStopPushStreamMsg.java @@ -0,0 +1,49 @@ +package com.genersoft.iot.vmp.service.bean; + +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; + + +public class RequestStopPushStreamMsg { + + + private SendRtpItem sendRtpItem; + + + private String platformName; + + + private int platFormIndex; + + public SendRtpItem getSendRtpItem() { + return sendRtpItem; + } + + public void setSendRtpItem(SendRtpItem sendRtpItem) { + this.sendRtpItem = sendRtpItem; + } + + public String getPlatformName() { + return platformName; + } + + public void setPlatformName(String platformName) { + this.platformName = platformName; + } + + + public int getPlatFormIndex() { + return platFormIndex; + } + + public void setPlatFormIndex(int platFormIndex) { + this.platFormIndex = platFormIndex; + } + + public static RequestStopPushStreamMsg getInstance(SendRtpItem sendRtpItem, String platformName, int platFormIndex) { + RequestStopPushStreamMsg streamMsg = new RequestStopPushStreamMsg(); + streamMsg.setSendRtpItem(sendRtpItem); + streamMsg.setPlatformName(platformName); + streamMsg.setPlatFormIndex(platFormIndex); + return streamMsg; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java b/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java index cb118865..e9ee4cba 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java @@ -6,7 +6,17 @@ package com.genersoft.iot.vmp.service.bean; public class WvpRedisMsgCmd { + /** + * 请求获取推流信息 + */ public static final String GET_SEND_ITEM = "GetSendItem"; + /** + * 请求推流的请求 + */ public static final String REQUEST_PUSH_STREAM = "RequestPushStream"; + /** + * 停止推流的请求 + */ + public static final String REQUEST_STOP_PUSH_STREAM = "RequestStopPushStream"; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 65bd9f5d..a1c56ee8 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -262,6 +262,8 @@ public class DeviceServiceImpl implements IDeviceService { int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForCatalog(),30); // 设置最小值为30 dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, (subscribeCycleForCatalog -1) * 1000); + + catalogSubscribeTask.run(); return true; } @@ -295,6 +297,7 @@ public class DeviceServiceImpl implements IDeviceService { int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForMobilePosition(),30); // 刷新订阅 dynamicTask.startCron(device.getDeviceId() + "mobile_position" , mobilePositionSubscribeTask, subscribeCycleForCatalog * 1000); + mobilePositionSubscribeTask.run(); return true; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java index fb6f0686..29bd734e 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -133,7 +133,10 @@ public class RedisGbPlayMsgListener implements MessageListener { case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent()); requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); - + break; + case WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM: + RequestStopPushStreamMsg streamMsg = JSON.to(RequestStopPushStreamMsg.class, wvpRedisMsg.getContent()); + requestStopPushStreamMsgHand(streamMsg, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); break; default: break; @@ -397,6 +400,19 @@ public class RedisGbPlayMsgListener implements MessageListener { redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } + /** + * 发送请求推流的消息 + */ + public void sendMsgForStopSendRtpStream(String serverId, RequestStopPushStreamMsg streamMsg) { + String key = UUID.randomUUID().toString(); + WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, + WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM, key, JSON.toJSONString(streamMsg)); + + JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); + logger.info("[REDIS 请求其他平台停止推流] {}: {}", serverId, jsonObject); + redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + } + private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) { if (platformGbId == null) { platformGbId = "*"; @@ -423,4 +439,36 @@ public class RedisGbPlayMsgListener implements MessageListener { return null; } } + + /** + * 处理收到的请求推流的请求 + */ + private void requestStopPushStreamMsgHand(RequestStopPushStreamMsg streamMsg, String fromId, String serial) { + SendRtpItem sendRtpItem = streamMsg.getSendRtpItem(); + if (sendRtpItem == null) { + logger.info("[REDIS 执行其他平台的请求停止推流] 失败: sendRtpItem为NULL"); + return; + } + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (mediaInfo == null) { + // TODO 回复错误 + return; + } + Map param = new HashMap<>(); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStream()); + param.put("ssrc", sendRtpItem.getSsrc()); + + if (zlmServerFactory.stopSendRtpStream(mediaInfo, param)) { + logger.info("[REDIS 执行其他平台的请求停止推流] 成功: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + // 发送redis消息 + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), streamMsg.getPlatformName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(streamMsg.getPlatFormIndex()); + redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); + } + + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java index f741a105..8620e13b 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java @@ -73,7 +73,7 @@ public class RedisPushStreamCloseResponseListener implements MessageListener { MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class); StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream()); if (push != null) { - List sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId( + List sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId( push.getGbId()); if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index b663c5c6..78fd2801 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -181,7 +181,7 @@ public interface IRedisCatchStorage { */ void sendStreamPushRequestedMsgForStatus(); - List querySendRTPServerByChnnelId(String channelId); + List querySendRTPServerByChannelId(String channelId); List querySendRTPServerByStream(String stream); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 27bbdba6..18a037dd 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -184,7 +184,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } @Override - public List querySendRTPServerByChnnelId(String channelId) { + public List querySendRTPServerByChannelId(String channelId) { if (channelId == null) { return null; }