diff --git a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java index f83eaf1c..873feab5 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java @@ -46,6 +46,9 @@ public class DynamicTask { * @return */ public void startCron(String key, Runnable task, int cycleForCatalog) { + if(ObjectUtils.isEmpty(key)) { + return; + } ScheduledFuture future = futureMap.get(key); if (future != null) { if (future.isCancelled()) { @@ -74,6 +77,9 @@ public class DynamicTask { * @return */ public void startDelay(String key, Runnable task, int delay) { + if(ObjectUtils.isEmpty(key)) { + return; + } stop(key); // 获取执行的时刻 @@ -100,9 +106,12 @@ public class DynamicTask { } public boolean stop(String key) { + if(ObjectUtils.isEmpty(key)) { + return false; + } boolean result = false; if (!ObjectUtils.isEmpty(futureMap.get(key)) && !futureMap.get(key).isCancelled() && !futureMap.get(key).isDone()) { - result = futureMap.get(key).cancel(false); + result = futureMap.get(key).cancel(true); futureMap.remove(key); runnableMap.remove(key); } @@ -110,6 +119,9 @@ public class DynamicTask { } public boolean contains(String key) { + if(ObjectUtils.isEmpty(key)) { + return false; + } return futureMap.get(key) != null; } @@ -118,6 +130,9 @@ public class DynamicTask { } public Runnable get(String key) { + if(ObjectUtils.isEmpty(key)) { + return null; + } return runnableMap.get(key); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 374e5dcc..b6aac9c7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -138,7 +138,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId); } try { - logger.warn("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + logger.info("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null); } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index 5c577ba6..7b9f69a3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -33,7 +33,7 @@ import java.text.ParseException; public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { - private Logger logger = LoggerFactory.getLogger(KeepaliveNotifyMessageHandler.class); + private final Logger logger = LoggerFactory.getLogger(KeepaliveNotifyMessageHandler.class); private final static String cmdType = "Keepalive"; @Autowired @@ -59,14 +59,19 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp // 未注册的设备不做处理 return; } - logger.info("[收到心跳], device: {}", device.getDeviceId()); SIPRequest request = (SIPRequest) evt.getRequest(); + logger.info("[收到心跳], device: {}, callId: {}", device.getDeviceId(), request.getCallIdHeader().getCallId()); + // 回复200 OK try { responseAck(request, Response.OK); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 心跳回复: {}", e.getMessage()); } + if (DateUtil.getDifferenceForNow(device.getKeepaliveTime()) <= 3000L){ + logger.info("[收到心跳] 心跳发送过于频繁,已忽略 device: {}, callId: {}", device.getDeviceId(), request.getCallIdHeader().getCallId()); + return; + } RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, userSetting.getSipUseSourceIpAsRemoteAddress()); if (!device.getIp().equalsIgnoreCase(remoteAddressInfo.getIp()) || device.getPort() != remoteAddressInfo.getPort()) { @@ -80,7 +85,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp }else { long lastTime = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(device.getKeepaliveTime()); if (System.currentTimeMillis()/1000-lastTime > 10) { - device.setKeepaliveIntervalTime(new Long(System.currentTimeMillis()/1000-lastTime).intValue()); + device.setKeepaliveIntervalTime(Long.valueOf(System.currentTimeMillis()/1000-lastTime).intValue()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 9b2864fd..465aa2f7 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -494,6 +494,7 @@ public class ZLMHttpHookListener { Device device = deviceService.getDevice(inviteInfo.getDeviceId()); if (device != null) { try { + // 多查询一次防止已经被处理了 InviteInfo info = inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); if (info != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java index f4128163..6e460823 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java @@ -98,6 +98,9 @@ public class InviteStreamServiceImpl implements IInviteStreamService { "_" + inviteInfo.getChannelId() + "_" + stream; inviteInfoInDb.setStream(stream); + if (inviteInfoInDb.getSsrcInfo() != null) { + inviteInfoInDb.getSsrcInfo().setStream(stream); + } redisTemplate.opsForValue().set(key, inviteInfoInDb); return inviteInfoInDb; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index 142b8100..580561b3 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -151,9 +151,14 @@ public class MediaServerServiceImpl implements IMediaServerService { if (streamId == null) { streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase(); } + int ssrcCheckParam = 0; + if (ssrcCheck && tcpMode > 1) { + // 目前zlm不支持 tcp模式更新ssrc,暂时关闭ssrc校验 + logger.warn("[openRTPServer] TCP被动/TCP主动收流时,默认关闭ssrc检验"); + } int rtpServerPort; if (mediaServerItem.isRtpEnable()) { - rtpServerPort = zlmServerFactory.createRTPServer(mediaServerItem, streamId, ssrcCheck?Integer.parseInt(ssrc):0, port, reUsePort, tcpMode); + rtpServerPort = zlmServerFactory.createRTPServer(mediaServerItem, streamId, (ssrcCheck && tcpMode == 0)?Integer.parseInt(ssrc):0, port, reUsePort, tcpMode); } else { rtpServerPort = mediaServerItem.getRtpProxyPort(); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 113c8592..260b9a42 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -17,6 +17,7 @@ import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; @@ -34,6 +35,7 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import gov.nist.javax.sip.message.SIPResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -95,7 +97,6 @@ public class PlayServiceImpl implements IPlayService { @Autowired private VideoStreamSessionManager streamSession; - @Autowired private IDeviceService deviceService; @@ -108,25 +109,25 @@ public class PlayServiceImpl implements IPlayService { @Autowired private ZlmHttpHookSubscribe subscribe; - @Autowired - private SSRCFactory ssrcFactory; - - @Autowired - private RedisTemplate redisTemplate; - @Override public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback callback) { if (mediaServerItem == null) { + logger.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", deviceId, channelId); throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm"); } Device device = redisCatchStorage.getDevice(deviceId); + if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && !mediaServerItem.isRtpEnable()) { + logger.warn("[点播] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "单端口收流时不支持TCP主动方式收流"); + } InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); if (inviteInfo != null ) { if (inviteInfo.getStreamInfo() == null) { // 点播发起了但是尚未成功, 仅注册回调等待结果即可 inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback); + logger.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId: {}", device.getDeviceId(), channelId); return inviteInfo.getSsrcInfo(); }else { StreamInfo streamInfo = inviteInfo.getStreamInfo(); @@ -149,6 +150,7 @@ public class PlayServiceImpl implements IPlayService { InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + logger.info("[点播已存在] 直接返回, deviceId: {}, channelId: {}", device.getDeviceId(), channelId); return inviteInfo.getSsrcInfo(); }else { // 点播发起了但是尚未成功, 仅注册回调等待结果即可 @@ -171,7 +173,6 @@ public class PlayServiceImpl implements IPlayService { null); return null; } - // TODO 记录点播的状态 play(mediaServerItem, ssrcInfo, device, channelId, callback); return ssrcInfo; } @@ -187,8 +188,8 @@ public class PlayServiceImpl implements IPlayService { null); return; } - logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", - device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", ssrcInfo.getPort(), + logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, STREAM:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", + device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", ssrcInfo.getPort(), ssrcInfo.getStream(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); //端口获取失败的ssrcInfo 没有必要发送点播指令 if (ssrcInfo.getPort() <= 0) { @@ -219,16 +220,6 @@ public class PlayServiceImpl implements IPlayService { device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", ssrcInfo.getPort(), ssrcInfo.getSsrc()); - // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 -// InviteInfo inviteInfoForTimeout = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.play, device.getDeviceId(), channelId); -// if (inviteInfoForTimeout == null) { -// return; -// } -// if (InviteSessionStatus.ok == inviteInfoForTimeout.getStatus() ) { -// // TODO 发送bye -// }else { -// // TODO 发送cancel -// } callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null); inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null); @@ -272,99 +263,10 @@ public class PlayServiceImpl implements IPlayService { logger.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流"); snapOnPlay(mediaServerItemInuse, device.getDeviceId(), channelId, ssrcInfo.getStream()); - }, (event) -> { - inviteInfo.setStatus(InviteSessionStatus.ok); - - ResponseEvent responseEvent = (ResponseEvent) event.event; - String contentString = new String(responseEvent.getResponse().getRawContent()); - // 获取ssrc - int ssrcIndex = contentString.indexOf("y="); - // 检查是否有y字段 - if (ssrcIndex >= 0) { - //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容 - String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12).trim(); - // 查询到ssrc不一致且开启了ssrc校验则需要针对处理 - if (ssrcInfo.getSsrc().equals(ssrcInResponse)) { - if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { - tcpActiveHandler(device, channelId, contentString, mediaServerItem, timeOutTaskKey, ssrcInfo, callback); - } - return; - } - logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); - if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { - logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - // 单端口模式streamId也有变化,重新设置监听即可 - if (!mediaServerItem.isRtpEnable()) { - // 添加订阅 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); - subscribe.removeSubscribe(hookSubscribe); - String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase(); - hookSubscribe.getContent().put("stream", stream); - inviteStreamService.updateInviteInfoForStream(inviteInfo, stream); - subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { - logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam); - dynamicTask.stop(timeOutTaskKey); - // hook响应 - StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, hookParam, device.getDeviceId(), channelId); - if (streamInfo == null){ - callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), - InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, - InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), - InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); - return; - } - callback.run(InviteErrorCode.SUCCESS.getCode(), - InviteErrorCode.SUCCESS.getMsg(), streamInfo); - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, - InviteErrorCode.SUCCESS.getCode(), - InviteErrorCode.SUCCESS.getMsg(), - streamInfo); - snapOnPlay(mediaServerItemInUse, device.getDeviceId(), channelId, stream); - }); - return; - } - - // 更新ssrc - Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse); - if (!result) { - try { - logger.warn("[点播] 更新ssrc失败,停止点播 {}/{}", device.getDeviceId(), channelId); - cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null); - } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { - logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage()); - } - - dynamicTask.stop(timeOutTaskKey); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - - callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), - "下级自定义了ssrc,重新设置收流信息失败", null); - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, - InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), - "下级自定义了ssrc,重新设置收流信息失败", null); - - }else { - if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) { - inviteStreamService.removeInviteInfo(inviteInfo); - } - ssrcInfo.setSsrc(ssrcInResponse); - inviteInfo.setSsrcInfo(ssrcInfo); - inviteInfo.setStream(ssrcInfo.getStream()); - if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { - tcpActiveHandler(device, channelId, contentString, mediaServerItem, timeOutTaskKey, ssrcInfo, callback); - } - } - }else { - logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正"); - } - } - inviteStreamService.updateInviteInfo(inviteInfo); + }, (eventResult) -> { + // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题 + InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId, + timeOutTaskKey, callback, inviteInfo, InviteSessionType.PLAY); }, (event) -> { dynamicTask.stop(timeOutTaskKey); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); @@ -548,19 +450,23 @@ public class PlayServiceImpl implements IPlayService { String endTime, ErrorCallback callback) { Device device = storager.queryVideoDevice(deviceId); if (device == null) { - return; + logger.warn("[录像回放] 未找到设备 deviceId: {},channelId:{}", deviceId, channelId); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到设备:" + deviceId); } + MediaServerItem newMediaServerItem = getNewMediaServerItem(device); + if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && ! newMediaServerItem.isRtpEnable()) { + logger.warn("[录像回放] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "单端口收流时不支持TCP主动方式收流"); + } String stream = null; if (newMediaServerItem.isRtpEnable()) { String startTimeStr = startTime.replace("-", "") .replace(":", "") .replace(" ", ""); - System.out.println(startTimeStr); String endTimeTimeStr = endTime.replace("-", "") .replace(":", "") .replace(" ", ""); - System.out.println(endTimeTimeStr); stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr; } SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); @@ -636,84 +542,13 @@ public class PlayServiceImpl implements IPlayService { try { cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, hookEvent, eventResult -> { - inviteInfo.setStatus(InviteSessionStatus.ok); - ResponseEvent responseEvent = (ResponseEvent) eventResult.event; - String contentString = new String(responseEvent.getResponse().getRawContent()); - // 获取ssrc - int ssrcIndex = contentString.indexOf("y="); - // 检查是否有y字段 - if (ssrcIndex >= 0) { - //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容 - String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); - // 查询到ssrc不一致且开启了ssrc校验则需要针对处理 - if (ssrcInfo.getSsrc().equals(ssrcInResponse)) { - if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { - tcpActiveHandler(device, channelId, contentString, mediaServerItem, playBackTimeOutTaskKey, ssrcInfo, callback); - } - return; - } - logger.info("[录像回放] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); - if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { - logger.info("[录像回放] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); + // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题 + InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId, + playBackTimeOutTaskKey, callback, inviteInfo, InviteSessionType.PLAYBACK); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - - // 单端口模式streamId也有变化,需要重新设置监听 - if (!mediaServerItem.isRtpEnable()) { - // 添加订阅 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); - subscribe.removeSubscribe(hookSubscribe); - String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase(); - hookSubscribe.getContent().put("stream", stream); - inviteStreamService.updateInviteInfoForStream(inviteInfo, stream); - subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { - logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam); - dynamicTask.stop(playBackTimeOutTaskKey); - // hook响应 - hookEvent.response(mediaServerItemInUse, hookParam); - }); - } - // 更新ssrc - Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse); - if (!result) { - try { - logger.warn("[录像回放] 更新ssrc失败,停止录像回放 {}/{}", device.getDeviceId(), channelId); - cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null); - } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { - logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage()); - - } - - dynamicTask.stop(playBackTimeOutTaskKey); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - - callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), - "下级自定义了ssrc,重新设置收流信息失败", null); - - }else { - if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) { - inviteStreamService.removeInviteInfo(inviteInfo); - } - - ssrcInfo.setSsrc(ssrcInResponse); - inviteInfo.setSsrcInfo(ssrcInfo); - inviteInfo.setStream(ssrcInfo.getStream()); - if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { - tcpActiveHandler(device, channelId, contentString, mediaServerItem, playBackTimeOutTaskKey, ssrcInfo, callback); - } - } - }else { - logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正"); - } - } - inviteStreamService.updateInviteInfo(inviteInfo); }, errorEvent); } catch (InvalidArgumentException | SipException | ParseException e) { - logger.error("[命令发送失败] 回放: {}", e.getMessage()); + logger.error("[命令发送失败] 录像回放: {}", e.getMessage()); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(); eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent; @@ -724,6 +559,121 @@ public class PlayServiceImpl implements IPlayService { } + private void InviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, MediaServerItem mediaServerItem, + Device device, String channelId, String timeOutTaskKey, ErrorCallback callback, + InviteInfo inviteInfo, InviteSessionType inviteSessionType){ + inviteInfo.setStatus(InviteSessionStatus.ok); + ResponseEvent responseEvent = (ResponseEvent) eventResult.event; + String contentString = new String(responseEvent.getResponse().getRawContent()); + String ssrcInResponse = SipUtils.getSsrcFromSdp(contentString); + if (ssrcInfo.getSsrc().equals(ssrcInResponse)) { + // ssrc 一致 + if (mediaServerItem.isRtpEnable()) { + // 多端口 + if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { + tcpActiveHandler(device, channelId, contentString, mediaServerItem, timeOutTaskKey, ssrcInfo, callback); + } + }else { + // 单端口 + if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { + logger.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流"); + } + + } + }else { + logger.info("[Invite 200OK] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); + // ssrc 不一致 + if (mediaServerItem.isRtpEnable()) { + // 多端口 + if (device.isSsrcCheck()) { + // ssrc检验 + // 更新ssrc + logger.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse); + if (!result) { + try { + logger.warn("[Invite 200OK] 更新ssrc失败,停止点播 {}/{}", device.getDeviceId(), channelId); + cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null); + } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { + logger.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage()); + } + + dynamicTask.stop(timeOutTaskKey); + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); + + callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), + "下级自定义了ssrc,重新设置收流信息失败", null); + inviteStreamService.call(inviteSessionType, device.getDeviceId(), channelId, null, + InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), + "下级自定义了ssrc,重新设置收流信息失败", null); + + }else { + ssrcInfo.setSsrc(ssrcInResponse); + inviteInfo.setSsrcInfo(ssrcInfo); + inviteInfo.setStream(ssrcInfo.getStream()); + if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { + if (mediaServerItem.isRtpEnable()) { + tcpActiveHandler(device, channelId, contentString, mediaServerItem, timeOutTaskKey, ssrcInfo, callback); + }else { + logger.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流"); + } + } + inviteStreamService.updateInviteInfo(inviteInfo); + } + } + }else { + if (ssrcInResponse != null) { + // 单端口 + // 重新订阅流上线 + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", + ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); + subscribe.removeSubscribe(hookSubscribe); + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(inviteInfo.getDeviceId(), + inviteInfo.getChannelId(), null, inviteInfo.getStream()); + streamSession.remove(inviteInfo.getDeviceId(), + inviteInfo.getChannelId(), inviteInfo.getStream()); + + String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase(); + hookSubscribe.getContent().put("stream", stream); + + inviteStreamService.updateInviteInfoForStream(inviteInfo, stream); + streamSession.put(device.getDeviceId(), channelId, ssrcTransaction.getCallId(), + stream, ssrcInResponse, mediaServerItem.getId(), (SIPResponse) responseEvent.getResponse(), inviteSessionType); + subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { + logger.info("[Invite 200OK] ssrc修正后收到订阅消息: " + hookParam); + dynamicTask.stop(timeOutTaskKey); + subscribe.removeSubscribe(hookSubscribe); + // hook响应 + StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, hookParam, device.getDeviceId(), channelId); + if (streamInfo == null){ + callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), + InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); + inviteStreamService.call(inviteSessionType, device.getDeviceId(), channelId, null, + InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), + InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); + return; + } + callback.run(InviteErrorCode.SUCCESS.getCode(), + InviteErrorCode.SUCCESS.getMsg(), streamInfo); + inviteStreamService.call(inviteSessionType, device.getDeviceId(), channelId, null, + InviteErrorCode.SUCCESS.getCode(), + InviteErrorCode.SUCCESS.getMsg(), + streamInfo); + if (inviteSessionType == InviteSessionType.PLAY) { + snapOnPlay(mediaServerItemInUse, device.getDeviceId(), channelId, stream); + } + }); + } + } + } + } + + @Override public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback callback) { @@ -738,7 +688,17 @@ public class PlayServiceImpl implements IPlayService { null); return; } - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); + String stream = null; + if (newMediaServerItem.isRtpEnable()) { + String startTimeStr = startTime.replace("-", "") + .replace(":", "") + .replace(" ", ""); + String endTimeTimeStr = endTime.replace("-", "") + .replace(":", "") + .replace(" ", ""); + stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr; + } + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, callback); } @@ -806,79 +766,9 @@ public class PlayServiceImpl implements IPlayService { try { cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, hookEvent, errorEvent, eventResult ->{ - inviteInfo.setStatus(InviteSessionStatus.ok); - ResponseEvent responseEvent = (ResponseEvent) eventResult.event; - String contentString = new String(responseEvent.getResponse().getRawContent()); - // 获取ssrc - int ssrcIndex = contentString.indexOf("y="); - // 检查是否有y字段 - if (ssrcIndex >= 0) { - //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容 - String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); - // 查询到ssrc不一致且开启了ssrc校验则需要针对处理 - if (ssrcInfo.getSsrc().equals(ssrcInResponse)) { - if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { - tcpActiveHandler(device, channelId, contentString, mediaServerItem, downLoadTimeOutTaskKey, ssrcInfo, callback); - } - return; - } - logger.info("[录像下载] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); - if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { - logger.info("[录像下载] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); - - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - - // 单端口模式streamId也有变化,需要重新设置监听 - if (!mediaServerItem.isRtpEnable()) { - // 添加订阅 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); - subscribe.removeSubscribe(hookSubscribe); - String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase(); - hookSubscribe.getContent().put("stream", stream); - inviteStreamService.updateInviteInfoForStream(inviteInfo, stream); - subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { - logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam); - dynamicTask.stop(downLoadTimeOutTaskKey); - hookEvent.response(mediaServerItemInUse, hookParam); - }); - } - - // 更新ssrc - Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse); - if (!result) { - try { - logger.warn("[录像下载] 更新ssrc失败,停止录像回放 {}/{}", device.getDeviceId(), channelId); - cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null); - } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { - logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage()); - } - - dynamicTask.stop(downLoadTimeOutTaskKey); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - - callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), - "下级自定义了ssrc,重新设置收流信息失败", null); - - }else { - if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) { - inviteStreamService.removeInviteInfo(inviteInfo); - } - ssrcInfo.setSsrc(ssrcInResponse); - inviteInfo.setSsrcInfo(ssrcInfo); - inviteInfo.setStream(ssrcInfo.getStream()); - if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { - tcpActiveHandler(device, channelId, contentString, mediaServerItem, downLoadTimeOutTaskKey, ssrcInfo, callback); - } - } - }else { - logger.info("[录像下载] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正"); - } - } - inviteStreamService.updateInviteInfo(inviteInfo); + // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题 + InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId, + downLoadTimeOutTaskKey, callback, inviteInfo, InviteSessionType.DOWNLOAD); }); } catch (InvalidArgumentException | SipException | ParseException e) { logger.error("[命令发送失败] 录像下载: {}", e.getMessage()); diff --git a/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java b/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java index 923f8346..23cb9dac 100644 --- a/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java @@ -7,6 +7,7 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeParseException; +import java.time.temporal.ChronoUnit; import java.time.temporal.TemporalAccessor; import java.util.Locale; @@ -106,4 +107,9 @@ public class DateUtil { LocalDateTime nowDateTime = LocalDateTime.now(); return formatterISO8601.format(nowDateTime); } + + public static long getDifferenceForNow(String keepaliveTime) { + Instant beforeInstant = Instant.from(formatter.parse(keepaliveTime)); + return ChronoUnit.MILLIS.between(beforeInstant, Instant.now()); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java index 445e42f4..3d233dcf 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java @@ -111,7 +111,7 @@ public class PlayController { wvpResult.setCode(ErrorCode.ERROR100.getCode()); wvpResult.setMsg("点播超时"); requestMessage.setData(wvpResult); - resultHolder.invokeResult(requestMessage); + resultHolder.invokeAllResult(requestMessage); inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); storager.stopPlay(deviceId, channelId); }); @@ -166,7 +166,7 @@ public class PlayController { } if (InviteSessionStatus.ok == inviteInfo.getStatus()) { try { - logger.warn("[停止点播] {}/{}", device.getDeviceId(), channelId); + logger.info("[停止点播] {}/{}", device.getDeviceId(), channelId); cmder.streamByeCmd(device, channelId, inviteInfo.getStream(), null, null); } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());