From 8892975d774752c298e0522d7268226edbbb6c16 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 26 Jan 2024 18:03:58 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/GB28181ResourceServiceImpl.java | 6 - .../iot/vmp/gb28181/bean/ControlCommand.java | 9 ++ .../request/impl/InviteRequestProcessor.java | 10 +- .../cmd/DeviceControlQueryMessageHandler.java | 140 ++++++++---------- .../vmp/media/zlm/ZLMHttpHookListener.java | 2 +- .../media/zlm/impl/StreamSendManagerImpl.java | 19 ++- .../iot/vmp/service/impl/PlayServiceImpl.java | 95 ++++++++---- 7 files changed, 150 insertions(+), 131 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/ControlCommand.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java index 3f777acb..d18049cd 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java @@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.IResourcePlayCallback; import com.genersoft.iot.vmp.service.IResourceService; @@ -14,17 +13,12 @@ import com.genersoft.iot.vmp.service.bean.CommonGbChannelType; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.storager.dao.DeviceMapper; -import com.genersoft.iot.vmp.storager.impl.RedisCatchStorageImpl; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; -import com.genersoft.iot.vmp.vmanager.bean.StreamContent; -import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.net.MalformedURLException; -import java.net.URL; /** * 国标的资源实现类 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ControlCommand.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ControlCommand.java new file mode 100644 index 00000000..fb751d61 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ControlCommand.java @@ -0,0 +1,9 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +/** + * 国标28181控制指令 + */ +public class ControlCommand { + + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 12a4793b..776c5650 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -83,6 +83,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired private IStreamSendManager streamSendManager; @Autowired @@ -94,15 +95,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private RedisPushStreamResponseListener redisPushStreamResponseListener; - @Autowired - private IPlayService playService; - @Autowired private Map resourceServiceMap; - @Autowired - private SIPSender sipSender; - @Autowired private ZLMServerFactory zlmServerFactory; @@ -121,7 +116,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private ZLMMediaListManager mediaListManager; - @Autowired private RedisGbPlayMsgListener redisGbPlayMsgListener; @@ -226,7 +220,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements IResourcePlayCallback callback = (commonGbChannel, mediaServerItem, code, message, streamInfo) -> { if (code != 0) { - logger.info("[上级Invite] 获取资源流失败。{}, {}/{}", message, streamInfo.getApp(), streamInfo.getStream()); + logger.info("[上级Invite] channelId: {}, 获取资源流失败。{},", channelId, message); try { cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); } catch (SipException | InvalidArgumentException | ParseException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java index 9efc191d..3dc19cb2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java @@ -14,6 +14,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessag import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.control.ControlMessageHandler; import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.service.IPlatformChannelService; +import com.genersoft.iot.vmp.service.IResourceService; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.Element; @@ -31,20 +32,19 @@ import javax.sip.address.SipURI; import javax.sip.message.Response; import java.text.ParseException; import java.util.List; +import java.util.Map; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.*; @Component public class DeviceControlQueryMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { - private Logger logger = LoggerFactory.getLogger(DeviceControlQueryMessageHandler.class); + private final Logger logger = LoggerFactory.getLogger(DeviceControlQueryMessageHandler.class); private final String cmdType = "DeviceControl"; @Autowired private ControlMessageHandler controlMessageHandler; - @Autowired - private IVideoManagerStorage storager; @Autowired private SIPCommander cmder; @@ -53,7 +53,7 @@ public class DeviceControlQueryMessageHandler extends SIPRequestProcessorParent private IPlatformChannelService platformChannelService; @Autowired - private IDeviceChannelService deviceChannelService; + private Map resourceServiceMap; @Qualifier("taskExecutor") @Autowired @@ -81,36 +81,6 @@ public class DeviceControlQueryMessageHandler extends SIPRequestProcessorParent if (!ObjectUtils.isEmpty(getText(rootElement, "TeleBoot"))) { // TODO 拒绝远程启动命令 logger.warn("[国标级联]收到平台的远程启动命令, 不处理"); - -// if (parentPlatform.getServerGBId().equals(targetGBId)) { -// // 远程启动本平台:需要在重新启动程序后先对SipStack解绑 -// logger.info("执行远程启动本平台命令"); -// try { -// cmderFroPlatform.unregister(parentPlatform, null, null); -// } catch (InvalidArgumentException | ParseException | SipException e) { -// logger.error("[命令发送失败] 国标级联 注销: {}", e.getMessage()); -// } -// taskExecutor.execute(() -> { -// // 远程启动 -//// try { -//// Thread.sleep(3000); -//// SipProvider up = (SipProvider) SpringBeanFactory.getBean("udpSipProvider"); -//// SipStackImpl stack = (SipStackImpl)up.getSipStack(); -//// stack.stop(); -//// Iterator listener = stack.getListeningPoints(); -//// while (listener.hasNext()) { -//// stack.deleteListeningPoint((ListeningPoint) listener.next()); -//// } -//// Iterator providers = stack.getSipProviders(); -//// while (providers.hasNext()) { -//// stack.deleteSipProvider((SipProvider) providers.next()); -//// } -//// VManageBootstrap.restart(); -//// } catch (InterruptedException | ObjectInUseException e) { -//// logger.error("[任务执行失败] 服务重启: {}", e.getMessage()); -//// } -// }); -// } } DeviceControlType deviceControlType = DeviceControlType.typeOf(rootElement); logger.info("[接受deviceControl命令] 命令: {}", deviceControlType); @@ -125,44 +95,35 @@ public class DeviceControlQueryMessageHandler extends SIPRequestProcessorParent } return; } - //判断是否存在该通道 - Device deviceForPlatform = deviceChannelService.getDeviceByChannelCommonGbId(commonGbChannel.getCommonGbId()); - if (deviceForPlatform == null) { - try { - responseAck(request, Response.NOT_FOUND); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 错误信息: {}", e.getMessage()); - } - return; - } + switch (deviceControlType) { case PTZ: - handlePtzCmd(deviceForPlatform, channelId, rootElement, request, DeviceControlType.PTZ); - break; - case ALARM: - handleAlarmCmd(deviceForPlatform, rootElement, request); - break; - case GUARD: - handleGuardCmd(deviceForPlatform, rootElement, request, DeviceControlType.GUARD); - break; - case RECORD: - handleRecordCmd(deviceForPlatform, channelId, rootElement, request, DeviceControlType.RECORD); - break; - case I_FRAME: - handleIFameCmd(deviceForPlatform, request, channelId); - break; - case TELE_BOOT: - handleTeleBootCmd(deviceForPlatform, request); - break; - case DRAG_ZOOM_IN: - handleDragZoom(deviceForPlatform, channelId, rootElement, request, DeviceControlType.DRAG_ZOOM_IN); - break; - case DRAG_ZOOM_OUT: - handleDragZoom(deviceForPlatform, channelId, rootElement, request, DeviceControlType.DRAG_ZOOM_OUT); - break; - case HOME_POSITION: - handleHomePositionCmd(deviceForPlatform, channelId, rootElement, request, DeviceControlType.HOME_POSITION); + handlePtzCmd(commonGbChannel, rootElement, request, DeviceControlType.PTZ); break; +// case ALARM: +// handleAlarmCmd(deviceForPlatform, rootElement, request); +// break; +// case GUARD: +// handleGuardCmd(deviceForPlatform, rootElement, request, DeviceControlType.GUARD); +// break; +// case RECORD: +// handleRecordCmd(deviceForPlatform, channelId, rootElement, request, DeviceControlType.RECORD); +// break; +// case I_FRAME: +// handleIFameCmd(deviceForPlatform, request, channelId); +// break; +// case TELE_BOOT: +// handleTeleBootCmd(deviceForPlatform, request); +// break; +// case DRAG_ZOOM_IN: +// handleDragZoom(deviceForPlatform, channelId, rootElement, request, DeviceControlType.DRAG_ZOOM_IN); +// break; +// case DRAG_ZOOM_OUT: +// handleDragZoom(deviceForPlatform, channelId, rootElement, request, DeviceControlType.DRAG_ZOOM_OUT); +// break; +// case HOME_POSITION: +// handleHomePositionCmd(deviceForPlatform, channelId, rootElement, request, DeviceControlType.HOME_POSITION); +// break; default: break; } @@ -171,21 +132,36 @@ public class DeviceControlQueryMessageHandler extends SIPRequestProcessorParent /** * 处理云台指令 - * - * @param device 设备 - * @param channelId 通道id - * @param rootElement - * @param request */ - private void handlePtzCmd(Device device, String channelId, Element rootElement, SIPRequest request, DeviceControlType type) { - String cmdString = getText(rootElement, type.getVal()); - try { - cmder.fronEndCmd(device, channelId, cmdString, - errorResult -> onError(request, errorResult), - okResult -> onOk(request, okResult)); - } catch (InvalidArgumentException | SipException | ParseException e) { - logger.error("[命令发送失败] 云台/前端: {}", e.getMessage()); + private void handlePtzCmd(CommonGbChannel commonGbChannel, Element rootElement, SIPRequest request, DeviceControlType type) { + IResourceService resourceService = resourceServiceMap.get(commonGbChannel.getType()); + if (resourceService == null) { + try { + responseAck(request, Response.FORBIDDEN); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 错误信息: {}", e.getMessage()); + } + return; } + // 解析云台控制参数 + +// resourceService.ptzControl(commonGbChannel) + + String cmdString = getText(rootElement, type.getVal()); + byte[] bytes = cmdString.getBytes(); + System.out.println(cmdString); + for (byte aByte : bytes) { + System.out.print(aByte); + System.out.print(" "); + } + System.out.println(" "); +// try { +// cmder.fronEndCmd(device, channelId, cmdString, +// errorResult -> onError(request, errorResult), +// okResult -> onOk(request, okResult)); +// } catch (InvalidArgumentException | SipException | ParseException e) { +// logger.error("[命令发送失败] 云台/前端: {}", e.getMessage()); +// } } /** diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 547f3727..99e7be8c 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -445,7 +445,7 @@ public class ZLMHttpHookListener { } if (!param.isRegist()) { List sendRtpItems = streamSendManager.getByAppAndStream(param.getApp(), param.getStream()); - if (!sendRtpItems.isEmpty()) { + if (sendRtpItems != null && !sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) { String platformId = sendRtpItem.getDestId(); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/impl/StreamSendManagerImpl.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/impl/StreamSendManagerImpl.java index a90de2e9..5ed2e031 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/impl/StreamSendManagerImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/impl/StreamSendManagerImpl.java @@ -30,7 +30,7 @@ public class StreamSendManagerImpl implements IStreamSendManager { @Override public void update(SendRtpItem sendRtpItem) { - if (sendRtpItem.getId() != null) { + if (sendRtpItem.getId() == null) { sendRtpItem.setId(UUID.randomUUID().toString()); } String dateId = datePrefix + sendRtpItem.getId(); @@ -94,6 +94,10 @@ public class StreamSendManagerImpl implements IStreamSendManager { return result; } + private SendRtpItem getById(String id) { + return (SendRtpItem)redisTemplate.opsForValue().get(datePrefix + id); + } + @Override public SendRtpItem getByCallId(String callId) { String dateId = (String) redisTemplate.opsForValue().get(getCallIdKey(callId)); @@ -147,24 +151,25 @@ public class StreamSendManagerImpl implements IStreamSendManager { @Override public void remove(SendRtpItem sendRtpItem) { - redisTemplate.delete(datePrefix); + String dateId = datePrefix + sendRtpItem.getId(); + redisTemplate.delete(dateId); if (sendRtpItem.getCallId() != null) { redisTemplate.delete(getCallIdKey(sendRtpItem.getCallId())); } if (sendRtpItem.getApp() != null && sendRtpItem.getStreamId() != null) { - redisTemplate.opsForSet().remove(getAppAndStreamKey(sendRtpItem.getApp(), sendRtpItem.getStreamId())); + redisTemplate.opsForSet().remove(getAppAndStreamKey(sendRtpItem.getApp(), sendRtpItem.getStreamId()), dateId); } if (sendRtpItem.getMediaServerId() != null) { - redisTemplate.opsForSet().remove(getMediaServerIdKey(sendRtpItem.getMediaServerId())); + redisTemplate.opsForSet().remove(getMediaServerIdKey(sendRtpItem.getMediaServerId()), dateId); } if (sendRtpItem.getDestId() != null) { - redisTemplate.opsForSet().remove(getDestIdKey(sendRtpItem.getDestId())); + redisTemplate.opsForSet().remove(getDestIdKey(sendRtpItem.getDestId()), dateId); } if (sendRtpItem.getSourceId() != null) { - redisTemplate.opsForSet().remove(getSourceIdKey(sendRtpItem.getSourceId())); + redisTemplate.opsForSet().remove(getSourceIdKey(sendRtpItem.getSourceId()), dateId); } if (sendRtpItem.getChannelId() != null) { - redisTemplate.opsForSet().remove(getChannelIdKey(sendRtpItem.getChannelId())); + redisTemplate.opsForSet().remove(getChannelIdKey(sendRtpItem.getChannelId()), dateId); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 435a1a9a..635d79aa 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -30,6 +30,7 @@ import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.storager.dao.DeviceMapper; import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper; import com.genersoft.iot.vmp.utils.CloudRecordUtils; @@ -107,7 +108,7 @@ public class PlayServiceImpl implements IPlayService { private DynamicTask dynamicTask; @Autowired - private CloudRecordServiceMapper cloudRecordServiceMapper; + private DeviceChannelMapper deviceChannelMapper; @Override @@ -155,7 +156,7 @@ public class PlayServiceImpl implements IPlayService { }else { // 点播发起了但是尚未成功, 仅注册回调等待结果即可 inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback); - storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); + deviceChannelMapper.stopPlay(deviceId, channelId); inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); } } @@ -254,21 +255,37 @@ public class PlayServiceImpl implements IPlayService { // hook响应 StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, hookParam, device.getDeviceId(), channelId); if (streamInfo == null){ - callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), - InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, - InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), - InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); + try { + callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), + InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); + }catch (Exception e) { + logger.warn("[invite hook响应] 发送回调失败", e); + } + try { + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, + InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), + InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); + }catch (Exception e) { + logger.warn("[invite] 发送回调失败", e); + } return; } - callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, - InviteErrorCode.SUCCESS.getCode(), - InviteErrorCode.SUCCESS.getMsg(), - streamInfo); logger.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流"); snapOnPlay(mediaServerItemInuse, device.getDeviceId(), channelId, ssrcInfo.getStream()); + try { + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + }catch (Exception e) { + logger.warn("[invite] 发送回调失败", e); + } + try { + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, + InviteErrorCode.SUCCESS.getCode(), + InviteErrorCode.SUCCESS.getMsg(), + streamInfo); + }catch (Exception e) { + logger.warn("[invite] 发送回调失败", e); + } }, (eventResult) -> { // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题 InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId, @@ -281,14 +298,21 @@ public class PlayServiceImpl implements IPlayService { mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - - callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(), - String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null); - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, - InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), - String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null); - inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId); + + try { + callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(), + String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null); + }catch (Exception e) { + logger.warn("[invite] 发送回调失败", e); + } + try { + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, + InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), + String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null); + }catch (Exception e) { + logger.warn("[invite] 发送回调失败", e); + } }); } catch (InvalidArgumentException | SipException | ParseException e) { @@ -300,13 +324,20 @@ public class PlayServiceImpl implements IPlayService { streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(), - InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null); - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, - InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(), - InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null); - inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId); + try { + callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(), + InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null); + }catch (Exception exception) { + logger.warn("[invite] 发送回调失败", exception); + } + try { + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, + InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(), + InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null); + }catch (Exception exception) { + logger.warn("[invite] 发送回调失败", exception); + } } } @@ -320,6 +351,16 @@ public class PlayServiceImpl implements IPlayService { if (inviteInfo == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "点播未找到"); } + // 调用zlm关闭流 + StreamInfo streamInfo = inviteInfo.getStreamInfo(); + if (streamInfo != null && streamInfo.getMediaServerId() != null) { + MediaServerItem mediaServerItem = mediaServerService.getOne(streamInfo.getMediaServerId()); + if (mediaServerItem != null) { + logger.info("[关闭流] {}/{}", streamInfo.getApp(), streamInfo.getStream()); + mediaService.closeStream(mediaServerItem, streamInfo.getApp(), streamInfo.getStream()); + } + } + if (InviteSessionStatus.ok == inviteInfo.getStatus()) { try { logger.info("[停止点播] {}/{}", deviceId, channelId); @@ -330,7 +371,7 @@ public class PlayServiceImpl implements IPlayService { } } inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); - storager.stopPlay(deviceId, channelId); + deviceChannelMapper.stopPlay(deviceId, channelId); } private void tcpActiveHandler(Device device, String channelId, String contentString, @@ -887,7 +928,7 @@ public class PlayServiceImpl implements IPlayService { public void zlmServerOffline(String mediaServerId) { // 处理正在向上推流的上级平台 List sendRtpItems = streamSendManager.getByMediaServerId(mediaServerId); - if (!sendRtpItems.isEmpty()) { + if (sendRtpItems != null && !sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { if (sendRtpItem.getMediaServerId().equals(mediaServerId)) { ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getDestId());