From 0ff4ed217de599e6c6336f0330787b7593641dc4 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 17 Oct 2022 12:39:58 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=B6=88=E6=81=AF=E5=A4=84?= =?UTF-8?q?=E7=90=86=E4=B8=AD=E5=AD=98=E5=9C=A8=E5=8F=AF=E8=83=BD=E5=BC=82?= =?UTF-8?q?=E5=B8=B8=E7=9A=84=E5=A4=84=E7=90=86=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../request/impl/InviteRequestProcessor.java | 256 +++++++++++------- .../request/impl/NotifyRequestProcessor.java | 72 +++-- .../cmd/DeviceControlQueryMessageHandler.java | 2 +- .../cmd/KeepaliveNotifyMessageHandler.java | 49 ++-- .../MobilePositionNotifyMessageHandler.java | 16 +- .../query/cmd/CatalogQueryMessageHandler.java | 52 ++-- .../ConfigDownloadResponseMessageHandler.java | 23 +- .../DeviceControlResponseMessageHandler.java | 23 +- .../cmd/DeviceInfoResponseMessageHandler.java | 24 +- .../MobilePositionResponseMessageHandler.java | 15 +- .../PresetQueryResponseMessageHandler.java | 20 +- .../cmd/RecordInfoResponseMessageHandler.java | 168 ++++++------ 12 files changed, 414 insertions(+), 306 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index cfca8e1a..42ae826c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -132,7 +132,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (requesterId == null || channelId == null) { logger.info("无法从FromHeader的Address中获取到平台id,返回400"); // 参数不全, 发400,请求错误 - responseAck(serverTransaction, Response.BAD_REQUEST); + try { + responseAck(serverTransaction, Response.BAD_REQUEST); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage()); + } return; } @@ -141,6 +145,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); if (platform == null) { inviteFromDeviceHandle(serverTransaction, requesterId); + } else { // 查询平台下是否有该通道 DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); @@ -158,7 +163,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // return; // } // 通道存在,发100,TRYING - responseAck(serverTransaction, Response.TRYING); + try { + responseAck(serverTransaction, Response.TRYING); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite TRYING: {}", e.getMessage()); + } } else if (channel == null && gbStream != null) { String mediaServerId = gbStream.getMediaServerId(); @@ -166,13 +175,21 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (mediaServerItem == null) { if ("proxy".equals(gbStream.getStreamType())) { logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); - responseAck(serverTransaction, Response.GONE); + try { + responseAck(serverTransaction, Response.GONE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite GONE: {}", e.getMessage()); + } return; } else { streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); if (streamPushItem == null || streamPushItem.getServerId().equals(userSetting.getServerId())) { logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); - responseAck(serverTransaction, Response.GONE); + try { + responseAck(serverTransaction, Response.GONE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite GONE: {}", e.getMessage()); + } return; } } @@ -181,25 +198,47 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); if (streamPushItem == null) { logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); - responseAck(serverTransaction, Response.GONE); + try { + responseAck(serverTransaction, Response.GONE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite GONE: {}", e.getMessage()); + } return; } }else if("proxy".equals(gbStream.getStreamType())){ proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream()); if (proxyByAppAndStream == null) { logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); - responseAck(serverTransaction, Response.GONE); + try { + responseAck(serverTransaction, Response.GONE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite GONE: {}", e.getMessage()); + } return; } } } - responseAck(serverTransaction, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中 + try { + responseAck(serverTransaction, Response.CALL_IS_BEING_FORWARDED); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite CALL_IS_BEING_FORWARDED: {}", e.getMessage()); + } } else if (catalog != null) { - responseAck(serverTransaction, Response.BAD_REQUEST, "catalog channel can not play"); // 目录不支持点播 + try { + // 目录不支持点播 + responseAck(serverTransaction, Response.BAD_REQUEST, "catalog channel can not play"); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite 目录不支持点播: {}", e.getMessage()); + } return; } else { logger.info("通道不存在,返回404"); - responseAck(serverTransaction, Response.NOT_FOUND); // 通道不存在,发404,资源不存在 + try { + // 通道不存在,发404,资源不存在 + responseAck(serverTransaction, Response.NOT_FOUND); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite 通道不存在: {}", e.getMessage()); + } return; } // 解析sdp消息, 使用jainsip 自带的sdp解析方式 @@ -270,7 +309,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (port == -1) { logger.info("不支持的媒体格式,返回415"); // 回复不支持的格式 - responseAck(serverTransaction, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 + try { + // 不支持的格式,发415 + responseAck(serverTransaction, Response.UNSUPPORTED_MEDIA_TYPE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite 不支持的格式: {}", e.getMessage()); + } return; } String username = sdp.getOrigin().getUsername(); @@ -283,13 +327,21 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId); if (device == null) { logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel); - responseAck(serverTransaction, Response.SERVER_INTERNAL_ERROR); + try { + responseAck(serverTransaction, Response.SERVER_INTERNAL_ERROR); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite 未找到设备信息: {}", e.getMessage()); + } return; } mediaServerItem = playService.getNewMediaServerItem(device); if (mediaServerItem == null) { logger.warn("未找到可用的zlm"); - responseAck(serverTransaction, Response.BUSY_HERE); + try { + responseAck(serverTransaction, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite BUSY_HERE: {}", e.getMessage()); + } return; } SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, @@ -301,7 +353,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); - responseAck(serverTransaction, Response.BUSY_HERE); + try { + responseAck(serverTransaction, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); + } return; } sendRtpItem.setCallId(callIdHeader.getCallId()); @@ -474,13 +530,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } } - - } catch (SipException | InvalidArgumentException | ParseException e) { - e.printStackTrace(); - logger.warn("sdp解析错误"); - e.printStackTrace(); } catch (SdpParseException e) { - e.printStackTrace(); + logger.error("sdp解析错误", e); } catch (SdpException e) { e.printStackTrace(); } @@ -492,7 +543,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements private void pushProxyStream(RequestEvent evt, ServerTransaction serverTransaction, GbStream gbStream, ParentPlatform platform, CallIdHeader callIdHeader, MediaServerItem mediaServerItem, int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { + String channelId, String addressStr, String ssrc, String requesterId) { Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); if (streamReady) { // 自平台内容 @@ -502,7 +553,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); - responseAck(serverTransaction, Response.BUSY_HERE); + try { + responseAck(serverTransaction, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); + } return; } if (tcpActive != null) { @@ -527,7 +582,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements private void pushStream(RequestEvent evt, ServerTransaction serverTransaction, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, CallIdHeader callIdHeader, MediaServerItem mediaServerItem, int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { + String channelId, String addressStr, String ssrc, String requesterId) { // 推流 if (streamPushItem.isSelf()) { Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); @@ -539,7 +594,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); - responseAck(serverTransaction, Response.BUSY_HERE); + try { + responseAck(serverTransaction, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); + } return; } if (tcpActive != null) { @@ -577,15 +636,23 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements private void notifyStreamOnline(RequestEvent evt, ServerTransaction serverTransaction, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, CallIdHeader callIdHeader, MediaServerItem mediaServerItem, int port, Boolean tcpActive, boolean mediaTransmissionTCP, - String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { + String channelId, String addressStr, String ssrc, String requesterId) { if ("proxy".equals(gbStream.getStreamType())) { // TODO 控制启用以使设备上线 logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream()); - responseAck(serverTransaction, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline"); + try { + responseAck(serverTransaction, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline"); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); + } } else if ("push".equals(gbStream.getStreamType())) { if (!platform.isStartOfflinePush()) { // 平台设置中关闭了拉起离线的推流则直接回复 - responseAck(serverTransaction, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); + try { + responseAck(serverTransaction, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); + } return; } // 发送redis消息以使设备上线 @@ -713,38 +780,28 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } redisCatchStorage.updateSendRTPSever(sendRtpItem); }, (wvpResult) -> { - try { - // 错误 - if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) { - // 离线 - // 查询是否在本机上线了 - StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream()); - if (currentStreamPushItem.isPushIng()) { - // 在线状态 - pushStream(evt, serverTransaction, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } else { - // 不在线 拉起 - notifyStreamOnline(evt, serverTransaction, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } + // 错误 + if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) { + // 离线 + // 查询是否在本机上线了 + StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream()); + if (currentStreamPushItem.isPushIng()) { + // 在线状态 + pushStream(evt, serverTransaction, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + + } else { + // 不在线 拉起 + notifyStreamOnline(evt, serverTransaction, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage()); } - - try { responseAck(serverTransaction, Response.BUSY_HERE); - } catch (SipException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); + } catch (InvalidArgumentException | ParseException | SipException e) { + logger.error("[命令发送失败] 国标级联 点播回复 BUSY_HERE: {}", e.getMessage()); } - return; }); } @@ -782,14 +839,17 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements return null; } - public void inviteFromDeviceHandle(ServerTransaction serverTransaction, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException { + public void inviteFromDeviceHandle(ServerTransaction serverTransaction, String requesterId) { // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) Device device = redisCatchStorage.getDevice(requesterId); if (device != null) { logger.info("收到设备" + requesterId + "的语音广播Invite请求"); - responseAck(serverTransaction, Response.TRYING); - + try { + responseAck(serverTransaction, Response.TRYING); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage()); + } String contentString = new String(serverTransaction.getRequest().getRawContent()); // jainSip不支持y=字段, 移除移除以解析。 String substring = contentString; @@ -803,51 +863,65 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (ssrcIndex > 0) { substring = contentString.substring(0, ssrcIndex); } - SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring); + SessionDescription sdp = null; + try { + sdp = SdpFactory.getInstance().createSessionDescription(substring); + // 获取支持的格式 + Vector mediaDescriptions = sdp.getMediaDescriptions(true); + // 查看是否支持PS 负载96 + int port = -1; + //boolean recvonly = false; + boolean mediaTransmissionTCP = false; + Boolean tcpActive = null; + for (int i = 0; i < mediaDescriptions.size(); i++) { + MediaDescription mediaDescription = (MediaDescription) mediaDescriptions.get(i); + Media media = mediaDescription.getMedia(); - // 获取支持的格式 - Vector mediaDescriptions = sdp.getMediaDescriptions(true); - // 查看是否支持PS 负载96 - int port = -1; - //boolean recvonly = false; - boolean mediaTransmissionTCP = false; - Boolean tcpActive = null; - for (int i = 0; i < mediaDescriptions.size(); i++) { - MediaDescription mediaDescription = (MediaDescription) mediaDescriptions.get(i); - Media media = mediaDescription.getMedia(); - - Vector mediaFormats = media.getMediaFormats(false); - if (mediaFormats.contains("8")) { - port = media.getMediaPort(); - String protocol = media.getProtocol(); - // 区分TCP发流还是udp, 当前默认udp - if ("TCP/RTP/AVP".equals(protocol)) { - String setup = mediaDescription.getAttribute("setup"); - if (setup != null) { - mediaTransmissionTCP = true; - if ("active".equals(setup)) { - tcpActive = true; - } else if ("passive".equals(setup)) { - tcpActive = false; + Vector mediaFormats = media.getMediaFormats(false); + if (mediaFormats.contains("8")) { + port = media.getMediaPort(); + String protocol = media.getProtocol(); + // 区分TCP发流还是udp, 当前默认udp + if ("TCP/RTP/AVP".equals(protocol)) { + String setup = mediaDescription.getAttribute("setup"); + if (setup != null) { + mediaTransmissionTCP = true; + if ("active".equals(setup)) { + tcpActive = true; + } else if ("passive".equals(setup)) { + tcpActive = false; + } } } + break; } - break; } + if (port == -1) { + logger.info("不支持的媒体格式,返回415"); + // 回复不支持的格式 + try { + responseAck(serverTransaction, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite 不支持的媒体格式,返回415, {}", e.getMessage()); + } + return; + } + String username = sdp.getOrigin().getUsername(); + String addressStr = sdp.getOrigin().getAddress(); + logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc); + } catch (SdpException e) { + logger.error("[SDP解析异常]", e); } - if (port == -1) { - logger.info("不支持的媒体格式,返回415"); - // 回复不支持的格式 - responseAck(serverTransaction, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 - return; - } - String username = sdp.getOrigin().getUsername(); - String addressStr = sdp.getOrigin().getAddress(); - logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc); + + } else { logger.warn("来自无效设备/平台的请求"); - responseAck(serverTransaction, Response.BAD_REQUEST); + try { + responseAck(serverTransaction, Response.BAD_REQUEST);; // 不支持的格式,发415 + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite 来自无效设备/平台的请求, {}", e.getMessage()); + } } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index f78ee181..930ddb59 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -93,46 +93,44 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @Override public void process(RequestEvent evt) { + ServerTransaction serverTransaction = getServerTransaction(evt); try { - taskQueue.offer(new HandlerCatchData(evt, null, null)); - ServerTransaction serverTransaction = getServerTransaction(evt); responseAck(serverTransaction, Response.OK); - if (!taskQueueHandlerRun) { - taskQueueHandlerRun = true; - taskExecutor.execute(()-> { - while (!taskQueue.isEmpty()) { - try { - HandlerCatchData take = taskQueue.poll(); - Element rootElement = getRootElement(take.getEvt()); - if (rootElement == null) { - logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest()); - continue; - } - String cmd = XmlUtil.getText(rootElement, "CmdType"); - - if (CmdType.CATALOG.equals(cmd)) { - logger.info("接收到Catalog通知"); - processNotifyCatalogList(take.getEvt()); - } else if (CmdType.ALARM.equals(cmd)) { - logger.info("接收到Alarm通知"); - processNotifyAlarm(take.getEvt()); - } else if (CmdType.MOBILE_POSITION.equals(cmd)) { - logger.info("接收到MobilePosition通知"); - processNotifyMobilePosition(take.getEvt()); - } else { - logger.info("接收到消息:" + cmd); - } - } catch (DocumentException e) { - logger.error("处理NOTIFY消息时错误", e); - } - } - taskQueueHandlerRun = false; - }); - } - } catch (SipException | InvalidArgumentException | ParseException e) { + }catch (SipException | InvalidArgumentException | ParseException e) { e.printStackTrace(); - } finally { - taskQueueHandlerRun = false; + } + taskQueue.offer(new HandlerCatchData(evt, null, null)); + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(()-> { + while (!taskQueue.isEmpty()) { + try { + HandlerCatchData take = taskQueue.poll(); + Element rootElement = getRootElement(take.getEvt()); + if (rootElement == null) { + logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest()); + continue; + } + String cmd = XmlUtil.getText(rootElement, "CmdType"); + + if (CmdType.CATALOG.equals(cmd)) { + logger.info("接收到Catalog通知"); + processNotifyCatalogList(take.getEvt()); + } else if (CmdType.ALARM.equals(cmd)) { + logger.info("接收到Alarm通知"); + processNotifyAlarm(take.getEvt()); + } else if (CmdType.MOBILE_POSITION.equals(cmd)) { + logger.info("接收到MobilePosition通知"); + processNotifyMobilePosition(take.getEvt()); + } else { + logger.info("接收到消息:" + cmd); + } + } catch (DocumentException e) { + logger.error("处理NOTIFY消息时错误", e); + } + } + taskQueueHandlerRun = false; + }); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java index 855ad3b2..cd70dd0d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java @@ -112,10 +112,10 @@ public class DeviceControlQueryMessageHandler extends SIPRequestProcessorParent if (deviceForPlatform == null) { try { responseAck(serverTransaction, Response.NOT_FOUND); - return; } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 错误信息: {}", e.getMessage()); } + return; } try { cmder.fronEndCmd(deviceForPlatform, channelId, cmdString, eventResult -> { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index 289f1628..cd0dcd13 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -52,35 +52,36 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp // 未注册的设备不做处理 return; } + // 回复200 OK try { - // 判断RPort是否改变,改变则说明路由nat信息变化,修改设备信息 - // 获取到通信地址等信息 - ViaHeader viaHeader = (ViaHeader) evt.getRequest().getHeader(ViaHeader.NAME); - String received = viaHeader.getReceived(); - int rPort = viaHeader.getRPort(); - // 解析本地地址替代 - if (ObjectUtils.isEmpty(received) || rPort == -1) { - received = viaHeader.getHost(); - rPort = viaHeader.getPort(); - } - if (device.getPort() != rPort) { - device.setPort(rPort); - device.setHostAddress(received.concat(":").concat(String.valueOf(rPort))); - } - device.setKeepaliveTime(DateUtil.getNow()); - // 回复200 OK responseAck(getServerTransaction(evt), Response.OK); - if (device.getOnline() == 1) { - deviceService.updateDevice(device); - }else { - // 对于已经离线的设备判断他的注册是否已经过期 - if (!deviceService.expire(device)){ - deviceService.online(device); - } - } } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 国标级联 心跳回复: {}", e.getMessage()); } + // 判断RPort是否改变,改变则说明路由nat信息变化,修改设备信息 + // 获取到通信地址等信息 + ViaHeader viaHeader = (ViaHeader) evt.getRequest().getHeader(ViaHeader.NAME); + String received = viaHeader.getReceived(); + int rPort = viaHeader.getRPort(); + // 解析本地地址替代 + if (ObjectUtils.isEmpty(received) || rPort == -1) { + received = viaHeader.getHost(); + rPort = viaHeader.getPort(); + } + if (device.getPort() != rPort) { + device.setPort(rPort); + device.setHostAddress(received.concat(":").concat(String.valueOf(rPort))); + } + device.setKeepaliveTime(DateUtil.getNow()); + + if (device.getOnline() == 1) { + deviceService.updateDevice(device); + }else { + // 对于已经离线的设备判断他的注册是否已经过期 + if (!deviceService.expire(device)){ + deviceService.online(device); + } + } } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java index 652cd830..82cd5528 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java @@ -81,8 +81,12 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen try { Element rootElementAfterCharset = getRootElement(sipMsgInfo.getEvt(), sipMsgInfo.getDevice().getCharset()); if (rootElementAfterCharset == null) { - logger.warn("[ 移动设备位置数据通知 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest()); - responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.BAD_REQUEST); + try { + logger.warn("[ 移动设备位置数据通知 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest()); + responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.BAD_REQUEST); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 移动设备位置数据通知 内容为空: {}", e.getMessage()); + } continue; } MobilePosition mobilePosition = new MobilePosition(); @@ -133,7 +137,11 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen } storager.updateChannelPosition(deviceChannel); //回复 200 OK - responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK); + try { + responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 移动设备位置数据回复200: {}", e.getMessage()); + } // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject(); @@ -147,7 +155,7 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen jsonObject.put("speed", mobilePosition.getSpeed()); redisCatchStorage.sendMobilePositionMsg(jsonObject); - } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { + } catch (DocumentException e) { e.printStackTrace(); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java index 82f4a257..9e79f3f0 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java @@ -67,33 +67,37 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem try { // 回复200 OK responseAck(getServerTransaction(evt), Response.OK); - Element snElement = rootElement.element("SN"); - String sn = snElement.getText(); - // 准备回复通道信息 - List deviceChannelInPlatforms = storager.queryChannelWithCatalog(parentPlatform.getServerGBId()); - // 查询关联的直播通道 - List gbStreams = storager.queryGbStreamListInPlatform(parentPlatform.getServerGBId()); - // 回复目录信息 - List catalogs = storager.queryCatalogInPlatform(parentPlatform.getServerGBId()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 目录查询回复200OK: {}", e.getMessage()); + } + Element snElement = rootElement.element("SN"); + String sn = snElement.getText(); + // 准备回复通道信息 + List deviceChannelInPlatforms = storager.queryChannelWithCatalog(parentPlatform.getServerGBId()); + // 查询关联的直播通道 + List gbStreams = storager.queryGbStreamListInPlatform(parentPlatform.getServerGBId()); + // 回复目录信息 + List catalogs = storager.queryCatalogInPlatform(parentPlatform.getServerGBId()); - List allChannels = new ArrayList<>(); + List allChannels = new ArrayList<>(); - // 回复平台 + // 回复平台 // DeviceChannel deviceChannel = getChannelForPlatform(parentPlatform); // allChannels.add(deviceChannel); - // 回复目录 - if (catalogs.size() > 0) { - allChannels.addAll(catalogs); - } - // 回复级联的通道 - if (deviceChannelInPlatforms.size() > 0) { - allChannels.addAll(deviceChannelInPlatforms); - } - // 回复直播的通道 - if (gbStreams.size() > 0) { - allChannels.addAll(gbStreams); - } + // 回复目录 + if (catalogs.size() > 0) { + allChannels.addAll(catalogs); + } + // 回复级联的通道 + if (deviceChannelInPlatforms.size() > 0) { + allChannels.addAll(deviceChannelInPlatforms); + } + // 回复直播的通道 + if (gbStreams.size() > 0) { + allChannels.addAll(gbStreams); + } + try { if (allChannels.size() > 0) { cmderFroPlatform.catalogQuery(allChannels, parentPlatform, sn, fromHeader.getTag()); }else { @@ -101,9 +105,11 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem cmderFroPlatform.catalogQuery(null, parentPlatform, sn, fromHeader.getTag(), 0); } } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 目录查询: {}", e.getMessage()); + logger.error("[命令发送失败] 国标级联 目录查询回复: {}", e.getMessage()); } + + } private DeviceChannel getChannelForPlatform(ParentPlatform platform) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/ConfigDownloadResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/ConfigDownloadResponseMessageHandler.java index 1c4aa8a8..175b89bb 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/ConfigDownloadResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/ConfigDownloadResponseMessageHandler.java @@ -53,19 +53,20 @@ public class ConfigDownloadResponseMessageHandler extends SIPRequestProcessorPar try { // 回复200 OK responseAck(getServerTransaction(evt), Response.OK); - // 此处是对本平台发出DeviceControl指令的应答 - JSONObject json = new JSONObject(); - XmlUtil.node2Json(element, json); - if (logger.isDebugEnabled()) { - logger.debug(json.toJSONString()); - } - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setData(json); - deferredResultHolder.invokeAllResult(msg); } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 设备配置查询: {}", e.getMessage()); + logger.error("[命令发送失败] 设备配置查询: {}", e.getMessage()); } + // 此处是对本平台发出DeviceControl指令的应答 + JSONObject json = new JSONObject(); + XmlUtil.node2Json(element, json); + if (logger.isDebugEnabled()) { + logger.debug(json.toJSONString()); + } + RequestMessage msg = new RequestMessage(); + msg.setKey(key); + msg.setData(json); + deferredResultHolder.invokeAllResult(msg); + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceControlResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceControlResponseMessageHandler.java index 12c84686..d068aeff 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceControlResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceControlResponseMessageHandler.java @@ -47,20 +47,21 @@ public class DeviceControlResponseMessageHandler extends SIPRequestProcessorPare // 此处是对本平台发出DeviceControl指令的应答 try { responseAck(getServerTransaction(evt), Response.OK); - JSONObject json = new JSONObject(); - String channelId = getText(element, "DeviceID"); - XmlUtil.node2Json(element, json); - if (logger.isDebugEnabled()) { - logger.debug(json.toJSONString()); - } - RequestMessage msg = new RequestMessage(); - String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL + device.getDeviceId() + channelId; - msg.setKey(key); - msg.setData(json); - deferredResultHolder.invokeAllResult(msg); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 国标级联 设备控制: {}", e.getMessage()); } + JSONObject json = new JSONObject(); + String channelId = getText(element, "DeviceID"); + XmlUtil.node2Json(element, json); + if (logger.isDebugEnabled()) { + logger.debug(json.toJSONString()); + } + RequestMessage msg = new RequestMessage(); + String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL + device.getDeviceId() + channelId; + msg.setKey(key); + msg.setData(json); + deferredResultHolder.invokeAllResult(msg); + } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceInfoResponseMessageHandler.java index a0109edd..1544289b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceInfoResponseMessageHandler.java @@ -78,9 +78,14 @@ public class DeviceInfoResponseMessageHandler extends SIPRequestProcessorParent ServerTransaction serverTransaction = getServerTransaction(evt); try { rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { + + if (rootElement == null) { logger.warn("[ 接收到DeviceInfo应答消息 ] content cannot be null, {}", evt.getRequest()); - responseAck(serverTransaction, Response.BAD_REQUEST); + try { + responseAck(serverTransaction, Response.BAD_REQUEST); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] DeviceInfo应答消息 BAD_REQUEST: {}", e.getMessage()); + } return; } Element deviceIdElement = rootElement.element("DeviceID"); @@ -100,17 +105,16 @@ public class DeviceInfoResponseMessageHandler extends SIPRequestProcessorParent msg.setKey(key); msg.setData(device); deferredResultHolder.invokeAllResult(msg); + } catch (DocumentException e) { + throw new RuntimeException(e); + } + try { // 回复200 OK responseAck(serverTransaction, Response.OK); - } catch (DocumentException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); - } catch (SipException e) { - e.printStackTrace(); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] DeviceInfo应答消息 200: {}", e.getMessage()); } + } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java index 158f5b71..5e6c39da 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java @@ -71,7 +71,11 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar rootElement = getRootElement(evt, device.getCharset()); if (rootElement == null) { logger.warn("[ 移动设备位置数据查询回复 ] content cannot be null, {}", evt.getRequest()); - responseAck(serverTransaction, Response.BAD_REQUEST); + try { + responseAck(serverTransaction, Response.BAD_REQUEST); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 移动设备位置数据查询 BAD_REQUEST: {}", e.getMessage()); + } return; } MobilePosition mobilePosition = new MobilePosition(); @@ -133,8 +137,13 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar jsonObject.put("speed", mobilePosition.getSpeed()); redisCatchStorage.sendMobilePositionMsg(jsonObject); //回复 200 OK - responseAck(serverTransaction, Response.OK); - } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { + try { + responseAck(serverTransaction, Response.OK); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 移动设备位置数据查询 200: {}", e.getMessage()); + } + + } catch (DocumentException e) { e.printStackTrace(); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java index 366c7add..92091833 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java @@ -58,7 +58,11 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent if (rootElement == null) { logger.warn("[ 设备预置位查询应答 ] content cannot be null, {}", evt.getRequest()); - responseAck(serverTransaction, Response.BAD_REQUEST); + try { + responseAck(serverTransaction, Response.BAD_REQUEST); + } catch (InvalidArgumentException | ParseException | SipException e) { + logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage()); + } return; } Element presetListNumElement = rootElement.element("PresetList"); @@ -67,7 +71,11 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent String deviceId = getText(rootElement, "DeviceID"); String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + deviceId; if (snElement == null || presetListNumElement == null) { - responseAck(serverTransaction, Response.BAD_REQUEST, "xml error"); + try { + responseAck(serverTransaction, Response.BAD_REQUEST, "xml error"); + } catch (InvalidArgumentException | ParseException | SipException e) { + logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage()); + } return; } int sumNum = Integer.parseInt(presetListNumElement.attributeValue("Num")); @@ -94,11 +102,13 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent requestMessage.setKey(key); requestMessage.setData(presetQuerySipReqList); deferredResultHolder.invokeAllResult(requestMessage); - responseAck(serverTransaction, Response.OK); + try { + responseAck(serverTransaction, Response.OK); + } catch (InvalidArgumentException | ParseException | SipException e) { + logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage()); + } } catch (DocumentException e) { logger.error("[解析xml]失败: ", e); - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java index 50a4ed8d..3c755367 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -69,95 +69,91 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { - - // 回复200 OK try { + // 回复200 OK responseAck(getServerTransaction(evt), Response.OK); - taskQueue.offer(new HandlerCatchData(evt, device, rootElement)); - if (!taskQueueHandlerRun) { - taskQueueHandlerRun = true; - taskExecutor.execute(()->{ - while (!taskQueue.isEmpty()) { - try { - HandlerCatchData take = taskQueue.poll(); - Element rootElementForCharset = getRootElement(take.getEvt(), take.getDevice().getCharset()); - if (rootElement == null) { - logger.warn("[ 国标录像 ] content cannot be null, {}", evt.getRequest()); - continue; - } - String sn = getText(rootElementForCharset, "SN"); - String channelId = getText(rootElementForCharset, "DeviceID"); - RecordInfo recordInfo = new RecordInfo(); - recordInfo.setChannelId(channelId); - recordInfo.setDeviceId(take.getDevice().getDeviceId()); - recordInfo.setSn(sn); - recordInfo.setName(getText(rootElementForCharset, "Name")); - String sumNumStr = getText(rootElementForCharset, "SumNum"); - int sumNum = 0; - if (!ObjectUtils.isEmpty(sumNumStr)) { - sumNum = Integer.parseInt(sumNumStr); - } - recordInfo.setSumNum(sumNum); - Element recordListElement = rootElementForCharset.element("RecordList"); - if (recordListElement == null || sumNum == 0) { - logger.info("无录像数据"); - eventPublisher.recordEndEventPush(recordInfo); - recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, new ArrayList<>()); - releaseRequest(take.getDevice().getDeviceId(), sn); - } else { - Iterator recordListIterator = recordListElement.elementIterator(); - if (recordListIterator != null) { - List recordList = new ArrayList<>(); - // 遍历DeviceList - while (recordListIterator.hasNext()) { - Element itemRecord = recordListIterator.next(); - Element recordElement = itemRecord.element("DeviceID"); - if (recordElement == null) { - logger.info("记录为空,下一个..."); - continue; - } - RecordItem record = new RecordItem(); - record.setDeviceId(getText(itemRecord, "DeviceID")); - record.setName(getText(itemRecord, "Name")); - record.setFilePath(getText(itemRecord, "FilePath")); - record.setFileSize(getText(itemRecord, "FileSize")); - record.setAddress(getText(itemRecord, "Address")); - - String startTimeStr = getText(itemRecord, "StartTime"); - record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr)); - - String endTimeStr = getText(itemRecord, "EndTime"); - record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr)); - - record.setSecrecy(itemRecord.element("Secrecy") == null ? 0 - : Integer.parseInt(getText(itemRecord, "Secrecy"))); - record.setType(getText(itemRecord, "Type")); - record.setRecorderId(getText(itemRecord, "RecorderID")); - recordList.add(record); - } - recordInfo.setRecordList(recordList); - // 发送消息,如果是上级查询此录像,则会通过这里通知给上级 - eventPublisher.recordEndEventPush(recordInfo); - int count = recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, recordList); - logger.info("[国标录像], {}->{}: {}/{}", take.getDevice().getDeviceId(), sn, count, sumNum); - } - - if (recordDataCatch.isComplete(take.getDevice().getDeviceId(), sn)){ - releaseRequest(take.getDevice().getDeviceId(), sn); - } - } - } catch (DocumentException e) { - logger.error("xml解析异常: ", e); - } - } - taskQueueHandlerRun = false; - }); - } - - } catch (SipException | InvalidArgumentException | ParseException e) { + }catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 国标级联 国标录像: {}", e.getMessage()); - } finally { - taskQueueHandlerRun = false; + } + taskQueue.offer(new HandlerCatchData(evt, device, rootElement)); + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(()->{ + while (!taskQueue.isEmpty()) { + try { + HandlerCatchData take = taskQueue.poll(); + Element rootElementForCharset = getRootElement(take.getEvt(), take.getDevice().getCharset()); + if (rootElement == null) { + logger.warn("[ 国标录像 ] content cannot be null, {}", evt.getRequest()); + continue; + } + String sn = getText(rootElementForCharset, "SN"); + String channelId = getText(rootElementForCharset, "DeviceID"); + RecordInfo recordInfo = new RecordInfo(); + recordInfo.setChannelId(channelId); + recordInfo.setDeviceId(take.getDevice().getDeviceId()); + recordInfo.setSn(sn); + recordInfo.setName(getText(rootElementForCharset, "Name")); + String sumNumStr = getText(rootElementForCharset, "SumNum"); + int sumNum = 0; + if (!ObjectUtils.isEmpty(sumNumStr)) { + sumNum = Integer.parseInt(sumNumStr); + } + recordInfo.setSumNum(sumNum); + Element recordListElement = rootElementForCharset.element("RecordList"); + if (recordListElement == null || sumNum == 0) { + logger.info("无录像数据"); + eventPublisher.recordEndEventPush(recordInfo); + recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, new ArrayList<>()); + releaseRequest(take.getDevice().getDeviceId(), sn); + } else { + Iterator recordListIterator = recordListElement.elementIterator(); + if (recordListIterator != null) { + List recordList = new ArrayList<>(); + // 遍历DeviceList + while (recordListIterator.hasNext()) { + Element itemRecord = recordListIterator.next(); + Element recordElement = itemRecord.element("DeviceID"); + if (recordElement == null) { + logger.info("记录为空,下一个..."); + continue; + } + RecordItem record = new RecordItem(); + record.setDeviceId(getText(itemRecord, "DeviceID")); + record.setName(getText(itemRecord, "Name")); + record.setFilePath(getText(itemRecord, "FilePath")); + record.setFileSize(getText(itemRecord, "FileSize")); + record.setAddress(getText(itemRecord, "Address")); + + String startTimeStr = getText(itemRecord, "StartTime"); + record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr)); + + String endTimeStr = getText(itemRecord, "EndTime"); + record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr)); + + record.setSecrecy(itemRecord.element("Secrecy") == null ? 0 + : Integer.parseInt(getText(itemRecord, "Secrecy"))); + record.setType(getText(itemRecord, "Type")); + record.setRecorderId(getText(itemRecord, "RecorderID")); + recordList.add(record); + } + recordInfo.setRecordList(recordList); + // 发送消息,如果是上级查询此录像,则会通过这里通知给上级 + eventPublisher.recordEndEventPush(recordInfo); + int count = recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, recordList); + logger.info("[国标录像], {}->{}: {}/{}", take.getDevice().getDeviceId(), sn, count, sumNum); + } + + if (recordDataCatch.isComplete(take.getDevice().getDeviceId(), sn)){ + releaseRequest(take.getDevice().getDeviceId(), sn); + } + } + } catch (DocumentException e) { + logger.error("xml解析异常: ", e); + } + } + taskQueueHandlerRun = false; + }); } }