diff --git a/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java b/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java index 41a56cd6..928c305e 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java @@ -39,6 +39,8 @@ public class StreamInfo { private String endTime; private double progress; + private boolean pause; + public static class TransactionInfo{ public String callId; public String localTag; @@ -312,4 +314,12 @@ public class StreamInfo { public void setRtcs(String rtcs) { this.rtcs = rtcs; } + + public boolean isPause() { + return pause; + } + + public void setPause(boolean pause) { + this.pause = pause; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/conf/exception/ServiceException.java b/src/main/java/com/genersoft/iot/vmp/conf/exception/ServiceException.java new file mode 100644 index 00000000..5566d4bb --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/exception/ServiceException.java @@ -0,0 +1,27 @@ +package com.genersoft.iot.vmp.conf.exception; + +/** + * @author lin + */ +public class ServiceException extends Exception{ + private String msg; + + + + public ServiceException(String msg) { + this.msg = msg; + } + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } + + @Override + public String getMessage() { + return msg; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/exception/SsrcTransactionNotFoundException.java b/src/main/java/com/genersoft/iot/vmp/conf/exception/SsrcTransactionNotFoundException.java index 4038cd11..05015184 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/exception/SsrcTransactionNotFoundException.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/exception/SsrcTransactionNotFoundException.java @@ -39,12 +39,12 @@ public class SsrcTransactionNotFoundException extends Exception{ @Override public String getMessage() { StringBuffer msg = new StringBuffer(); - msg.append(StringFormatter.format("缓存事务信息未找到,device:%s channel: %s ", deviceId, channelId)); + msg.append(String.format("缓存事务信息未找到,device:%s channel: %s ", deviceId, channelId)); if (callId != null) { - msg.append("callId: " + callId); + msg.append(",callId: " + callId); } if (stream != null) { - msg.append("stream: " + stream); + msg.append(",stream: " + stream); } return msg.toString(); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index 2410e6d0..9eaf52a7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -129,6 +129,7 @@ public interface ISIPCommander { * 视频流停止 */ void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException; + void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException; /** diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index b043cadf..e1689c94 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -521,10 +521,15 @@ public class ZLMHttpHookListener { if (sendRtpItem.getApp().equals(app)) { String platformId = sendRtpItem.getPlatformId(); ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); + Device device = deviceService.queryDevice(platformId); try { - commanderFroPlatform.streamByeCmd(platform, sendRtpItem); - } catch (SipException | InvalidArgumentException | ParseException e) { + if (platform != null) { + commanderFroPlatform.streamByeCmd(platform, sendRtpItem); + }else { + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), stream, sendRtpItem.getCallId()); + } + } catch (SipException | InvalidArgumentException | ParseException | SsrcTransactionNotFoundException e) { logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); } } @@ -587,19 +592,24 @@ public class ZLMHttpHookListener { storager.stopPlay(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId()); }else{ StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlayback(null, null, streamId, null); - if (streamInfoForPlayBackCatch != null) { - Device device = deviceService.queryDevice(streamInfoForPlayCatch.getDeviceID()); - if (device != null) { - try { - cmder.streamByeCmd(device,streamInfoForPlayBackCatch.getChannelId(), - streamInfoForPlayBackCatch.getStream(), null); - } catch (InvalidArgumentException | ParseException | SipException | - SsrcTransactionNotFoundException e) { - logger.error("[无人观看]回放, 发送BYE失败 {}", e.getMessage()); + if (streamInfoForPlayBackCatch != null ) { + if (streamInfoForPlayBackCatch.isPause()) { + ret.put("close", false); + }else { + Device device = deviceService.queryDevice(streamInfoForPlayBackCatch.getDeviceID()); + if (device != null) { + try { + cmder.streamByeCmd(device,streamInfoForPlayBackCatch.getChannelId(), + streamInfoForPlayBackCatch.getStream(), null); + } catch (InvalidArgumentException | ParseException | SipException | + SsrcTransactionNotFoundException e) { + logger.error("[无人观看]回放, 发送BYE失败 {}", e.getMessage()); + } } + redisCatchStorage.stopPlayback(streamInfoForPlayBackCatch.getDeviceID(), + streamInfoForPlayBackCatch.getChannelId(), streamInfoForPlayBackCatch.getStream(), null); } - redisCatchStorage.stopPlayback(streamInfoForPlayBackCatch.getDeviceID(), - streamInfoForPlayBackCatch.getChannelId(), streamInfoForPlayBackCatch.getStream(), null); + }else { StreamInfo streamInfoForDownload = redisCatchStorage.queryDownload(null, null, streamId, null); // 进行录像下载时无人观看不断流 diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 5e0d3171..c0ee6ab3 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -23,6 +23,9 @@ public class ZLMRESTfulUtils { private final static Logger logger = LoggerFactory.getLogger(ZLMRESTfulUtils.class); + + + public interface RequestCallback{ void run(JSONObject response); } @@ -322,10 +325,22 @@ public class ZLMRESTfulUtils { } public void getSnap(MediaServerItem mediaServerItem, String flvUrl, int timeout_sec, int expire_sec, String targetPath, String fileName) { - Map param = new HashMap<>(); + Map param = new HashMap<>(3); param.put("url", flvUrl); param.put("timeout_sec", timeout_sec); param.put("expire_sec", expire_sec); sendGetForImg(mediaServerItem, "getSnap", param, targetPath, fileName); } + + public JSONObject pauseRtpCheck(MediaServerItem mediaServerItem, String streamId) { + Map param = new HashMap<>(1); + param.put("stream_id", streamId); + return sendPost(mediaServerItem, "pauseRtpCheck",param, null); + } + + public JSONObject resumeRtpCheck(MediaServerItem mediaServerItem, String streamId) { + Map param = new HashMap<>(1); + param.put("stream_id", streamId); + return sendPost(mediaServerItem, "resumeRtpCheck",param, null); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 91b5a517..24e0dcfa 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -103,7 +103,6 @@ public class ZLMRTPServerFactory { param.put("stream_id", streamId); JSONObject jsonObject = zlmresTfulUtils.closeRtpServer(mediaServerItem, param); if (jsonObject != null ) { - System.out.println(jsonObject); if (jsonObject.getInteger("code") == 0) { return createRTPServer(mediaServerItem, streamId, ssrc, port); }else { @@ -150,7 +149,6 @@ public class ZLMRTPServerFactory { param.put("stream_id", streamId); JSONObject jsonObject = zlmresTfulUtils.closeRtpServer(serverItem, param); if (jsonObject != null ) { - System.out.println(jsonObject); if (jsonObject.getInteger("code") == 0) { result = jsonObject.getInteger("hit") == 1; }else { diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 1e4dc133..07bddb27 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.exception.ServiceException; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback; import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo; @@ -15,6 +16,10 @@ import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import org.springframework.web.context.request.async.DeferredResult; +import javax.sip.InvalidArgumentException; +import javax.sip.SipException; +import java.text.ParseException; + /** * 点播处理 */ @@ -42,4 +47,8 @@ public interface IPlayService { StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream); void zlmServerOnline(String mediaServerId); + + void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException; + + void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 30d42774..7e174030 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -9,10 +9,13 @@ import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; import javax.sip.SipException; +import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.conf.exception.ServiceException; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.service.IDeviceService; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -822,4 +825,52 @@ public class PlayServiceImpl implements IPlayService { // } // })); } + + @Override + public void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException { + String key = redisCatchStorage.queryPlaybackForKey(null, null, streamId, null); + StreamInfo streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null); + if (null == streamInfo) { + logger.warn("streamId不存在!"); + throw new ServiceException("streamId不存在"); + } + streamInfo.setPause(true); + RedisUtil.set(key, streamInfo); + MediaServerItem mediaServerItem = mediaServerService.getOne(streamInfo.getMediaServerId()); + if (null == mediaServerItem) { + logger.warn("mediaServer 不存在!"); + throw new ServiceException("mediaServer不存在"); + } + // zlm 暂停RTP超时检查 + JSONObject jsonObject = zlmresTfulUtils.pauseRtpCheck(mediaServerItem, streamId); + if (jsonObject == null || jsonObject.getInteger("code") != 0) { + throw new ServiceException("暂停RTP接收失败"); + } + Device device = storager.queryVideoDevice(streamInfo.getDeviceID()); + cmder.playPauseCmd(device, streamInfo); + } + + @Override + public void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException { + String key = redisCatchStorage.queryPlaybackForKey(null, null, streamId, null); + StreamInfo streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null); + if (null == streamInfo) { + logger.warn("streamId不存在!"); + throw new ServiceException("streamId不存在"); + } + streamInfo.setPause(false); + RedisUtil.set(key, streamInfo); + MediaServerItem mediaServerItem = mediaServerService.getOne(streamInfo.getMediaServerId()); + if (null == mediaServerItem) { + logger.warn("mediaServer 不存在!"); + throw new ServiceException("mediaServer不存在"); + } + // zlm 暂停RTP超时检查 + JSONObject jsonObject = zlmresTfulUtils.resumeRtpCheck(mediaServerItem, streamId); + if (jsonObject == null || jsonObject.getInteger("code") != 0) { + throw new ServiceException("继续RTP接收失败"); + } + Device device = storager.queryVideoDevice(streamInfo.getDeviceID()); + cmder.playResumeCmd(device, streamInfo); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index f66b3011..ab9a831b 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -56,6 +56,8 @@ public interface IRedisCatchStorage { StreamInfo queryPlayback(String deviceId, String channelID, String stream, String callId); + String queryPlaybackForKey(String deviceId, String channelId, String stream, String callId); + void updatePlatformCatchInfo(ParentPlatformCatch parentPlatformCatch); ParentPlatformCatch queryPlatformCatchInfo(String platformGbId); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 258b956b..f4118810 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.OnPublishHookParam; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; @@ -22,7 +21,6 @@ import com.genersoft.iot.vmp.utils.redis.RedisUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Component; import java.util.*; @@ -127,6 +125,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } @Override public StreamInfo queryPlayByStreamId(String streamId) { + System.out.println(String.format("%S_%s_%s_*", VideoManagerConstants.PLAYER_PREFIX, userSetting.getServerId(), streamId)); List playLeys = RedisUtil.scan(String.format("%S_%s_%s_*", VideoManagerConstants.PLAYER_PREFIX, userSetting.getServerId(), streamId)); if (playLeys == null || playLeys.size() == 0) { return null; @@ -165,6 +164,8 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public boolean startPlayback(StreamInfo stream, String callId) { + System.out.println(String.format("%S_%s_%s_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, + userSetting.getServerId(), stream.getDeviceID(), stream.getChannelId(), stream.getStream(), callId)); return RedisUtil.set(String.format("%S_%s_%s_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, userSetting.getServerId(), stream.getDeviceID(), stream.getChannelId(), stream.getStream(), callId), stream); } @@ -285,6 +286,34 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } } + @Override + public String queryPlaybackForKey(String deviceId, String channelId, String stream, String callId) { + if (stream == null && callId == null) { + return null; + } + if (deviceId == null) { + deviceId = "*"; + } + if (channelId == null) { + channelId = "*"; + } + if (stream == null) { + stream = "*"; + } + if (callId == null) { + callId = "*"; + } + String key = String.format("%S_%s_%s_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, + userSetting.getServerId(), + deviceId, + channelId, + stream, + callId + ); + List streamInfoScan = RedisUtil.scan(key); + return (String) streamInfoScan.get(0); + } + @Override public void updatePlatformCatchInfo(ParentPlatformCatch parentPlatformCatch) { String key = VideoManagerConstants.PLATFORM_CATCH_PREFIX + userSetting.getServerId() + "_" + parentPlatformCatch.getId(); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java index 20860bfb..696fae0e 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java @@ -2,9 +2,11 @@ package com.genersoft.iot.vmp.vmanager.gb28181.playback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.conf.exception.ServiceException; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; @@ -45,6 +47,9 @@ public class PlaybackController { @Autowired private SIPCommander cmder; + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; + @Autowired private IVideoManagerStorage storager; @@ -113,14 +118,11 @@ public class PlaybackController { @GetMapping("/pause/{streamId}") public void playPause(@PathVariable String streamId) { logger.info("playPause: "+streamId); - StreamInfo streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null); - if (null == streamInfo) { - logger.warn("streamId不存在!"); - throw new ControllerException(ErrorCode.ERROR400.getCode(), "streamId不存在"); - } - Device device = storager.queryVideoDevice(streamInfo.getDeviceID()); + try { - cmder.playPauseCmd(device, streamInfo); + playService.pauseRtp(streamId); + } catch (ServiceException e) { + throw new ControllerException(ErrorCode.ERROR400.getCode(), e.getMessage()); } catch (InvalidArgumentException | ParseException | SipException e) { throw new ControllerException(ErrorCode.ERROR100.getCode(), e.getMessage()); } @@ -132,14 +134,10 @@ public class PlaybackController { @GetMapping("/resume/{streamId}") public void playResume(@PathVariable String streamId) { logger.info("playResume: "+streamId); - StreamInfo streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null); - if (null == streamInfo) { - logger.warn("streamId不存在!"); - throw new ControllerException(ErrorCode.ERROR400.getCode(), "streamId不存在"); - } - Device device = storager.queryVideoDevice(streamInfo.getDeviceID()); try { - cmder.playResumeCmd(device, streamInfo); + playService.resumeRtp(streamId); + } catch (ServiceException e) { + throw new ControllerException(ErrorCode.ERROR400.getCode(), e.getMessage()); } catch (InvalidArgumentException | ParseException | SipException e) { throw new ControllerException(ErrorCode.ERROR100.getCode(), e.getMessage()); }