From efc4a7bc8eb8a967198d70ff4d88670e71541164 Mon Sep 17 00:00:00 2001 From: 648540858 <456panlinlin> Date: Wed, 13 Apr 2022 17:59:21 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BA=A7=E8=81=94?= =?UTF-8?q?=E7=A7=BB=E5=8A=A8=E4=BD=8D=E7=BD=AE=E8=AE=A2=E9=98=85=E4=BD=8D?= =?UTF-8?q?=E7=BD=AE=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/conf/DynamicTask.java | 5 +- .../event/online/OnlineEventListener.java | 3 + .../iot/vmp/gb28181/task/ISubscribeTask.java | 4 ++ .../task/impl/CatalogSubscribeTask.java | 8 +++ .../MobilePositionSubscribeHandlerTask.java | 65 +++++++++++-------- .../impl/MobilePositionSubscribeTask.java | 7 ++ .../transmit/cmd/impl/SIPCommander.java | 27 +++++--- .../cmd/impl/SIPCommanderFroPlatform.java | 2 +- .../request/impl/NotifyRequestProcessor.java | 4 +- .../impl/SubscribeRequestProcessor.java | 23 +++++-- .../vmanager/gb28181/device/DeviceQuery.java | 38 ++++++++++- 11 files changed, 137 insertions(+), 49 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java index 0c57bdef..b8423829 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java @@ -39,7 +39,7 @@ public class DynamicTask { public void startCron(String key, Runnable task, int cycleForCatalog) { stop(key); // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔 - ScheduledFuture future = threadPoolTaskScheduler.scheduleWithFixedDelay(task, cycleForCatalog * 1000L); + ScheduledFuture future = threadPoolTaskScheduler.scheduleAtFixedRate(task, cycleForCatalog * 1000L); futureMap.put(key, future); runnableMap.put(key, task); } @@ -78,4 +78,7 @@ public class DynamicTask { return futureMap.keySet(); } + public Runnable get(String key) { + return runnableMap.get(key); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java index fc9eb274..e454d490 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java @@ -99,7 +99,10 @@ public class OnlineEventListener implements ApplicationListener { storager.updateDevice(device); // 上线添加订阅 if (device.getSubscribeCycleForCatalog() > 0) { + // 查询在线设备那些开启了订阅,为设备开启定时的目录订阅 deviceService.addCatalogSubscribe(device); + } + if (device.getSubscribeCycleForMobilePosition() > 0) { deviceService.addMobilePositionSubscribe(device); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java index 4c6a18ac..b6ec4519 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java @@ -1,5 +1,9 @@ package com.genersoft.iot.vmp.gb28181.task; +import javax.sip.DialogState; + public interface ISubscribeTask extends Runnable{ void stop(); + + DialogState getDialogState(); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java index 51356d55..433eb3b1 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Async; import javax.sip.Dialog; import javax.sip.DialogState; @@ -45,6 +46,7 @@ public class CatalogSubscribeTask implements ISubscribeTask { }); } + @Async @Override public void stop() { /** @@ -72,4 +74,10 @@ public class CatalogSubscribeTask implements ISubscribeTask { }); } } + + @Override + public DialogState getDialogState() { + if (dialog == null) return null; + return dialog.getState(); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java index fcac3e9d..569a9b7e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java @@ -1,16 +1,16 @@ package com.genersoft.iot.vmp.gb28181.task.impl; -import com.genersoft.iot.vmp.gb28181.bean.GbStream; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; -import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Async; -import java.text.SimpleDateFormat; +import javax.sip.DialogState; import java.util.List; /** @@ -18,6 +18,8 @@ import java.util.List; */ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask { + private Logger logger = LoggerFactory.getLogger(MobilePositionSubscribeHandlerTask.class); + private IRedisCatchStorage redisCatchStorage; private IVideoManagerStorage storager; private ISIPCommanderForPlatform sipCommanderForPlatform; @@ -26,8 +28,6 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask { private String sn; private String key; - private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - public MobilePositionSubscribeHandlerTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorage storager, String platformId, String sn, String key, SubscribeHolder subscribeInfo) { this.redisCatchStorage = redisCatchStorage; this.storager = storager; @@ -38,40 +38,51 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask { this.subscribeHolder = subscribeInfo; } + @Async @Override public void run() { + logger.info("执行MobilePositionSubscribeHandlerTask"); SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platformId); - if (subscribe != null) { ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId); - if (parentPlatform == null || parentPlatform.isStatus()) { - // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持 - List gbStreams = storager.queryGbStreamListInPlatform(platformId); - if (gbStreams.size() > 0) { - for (GbStream gbStream : gbStreams) { - String gbId = gbStream.getGbId(); - GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId); - if (gpsMsgInfo != null) { - // 发送GPS消息 - sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe); - }else { - // 没有在redis找到新的消息就使用数据库的消息 - gpsMsgInfo = new GPSMsgInfo(); - gpsMsgInfo.setId(gbId); - gpsMsgInfo.setLat(gbStream.getLongitude()); - gpsMsgInfo.setLng(gbStream.getLongitude()); - // 发送GPS消息 - sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe); - } + if (parentPlatform == null ) { + logger.info("发送订阅时未找到平台信息:{}", platformId); + return; + } + if (!parentPlatform.isStatus()) { + logger.info("发送订阅时发现平台已经离线:{}", platformId); + return; + } + // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持 + List gbStreams = storager.queryGbStreamListInPlatform(platformId); + if (gbStreams.size() == 0) { + logger.info("发送订阅时发现平台已经没有关联的直播流:{}", platformId); + return; + } + for (GbStream gbStream : gbStreams) { + String gbId = gbStream.getGbId(); + GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId); + if (gpsMsgInfo != null) { // 无最新位置不发送 + // 经纬度都为0不发送 + if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) { + continue; } + // 发送GPS消息 + sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe); } } } + logger.info("结束执行MobilePositionSubscribeHandlerTask"); } @Override public void stop() { } + + @Override + public DialogState getDialogState() { + return null; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java index e86c601c..7203ee2d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Async; import javax.sip.Dialog; import javax.sip.DialogState; @@ -25,6 +26,7 @@ public class MobilePositionSubscribeTask implements ISubscribeTask { this.sipCommander = sipCommander; } + @Async @Override public void run() { sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> { @@ -74,4 +76,9 @@ public class MobilePositionSubscribeTask implements ISubscribeTask { }); } } + @Override + public DialogState getDialogState() { + if (dialog == null) return null; + return dialog.getState(); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 123d0e78..027238b7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -1566,17 +1566,28 @@ public class SIPCommander implements ISIPCommander { cmdXml.append("" + device.getDeviceId() + "\r\n"); cmdXml.append("\r\n"); - String tm = Long.toString(System.currentTimeMillis()); - CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() - : udpSipProvider.getNewCallId(); + Request request; + if (dialog != null) { + logger.info("发送目录订阅消息时 dialog的状态为: {}", dialog.getState()); + request = dialog.createRequest(Request.SUBSCRIBE); + ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); + request.setContent(cmdXml.toString(), contentTypeHeader); + ExpiresHeader expireHeader = sipFactory.createHeaderFactory().createExpiresHeader(device.getSubscribeCycleForMobilePosition()); + request.addHeader(expireHeader); + }else { + String tm = Long.toString(System.currentTimeMillis()); - // 有效时间默认为60秒以上 - Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, - "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" , - callIdHeader); + CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() + : udpSipProvider.getNewCallId(); + + // 有效时间默认为60秒以上 + request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, + "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" , + callIdHeader); + + } transmitRequest(device, request, errorEvent, okEvent); - return true; } catch ( NumberFormatException | ParseException | InvalidArgumentException | SipException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 0dc11e01..7768ed42 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -405,7 +405,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); callIdHeader.setCallId(subscribeInfo.getCallId()); - logger.info("[发送Notify-MobilePosition] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat()); + logger.info("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat()); sendNotify(parentPlatform, deviceStatusXml.toString(), subscribeInfo, eventResult -> { logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg); }, null); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 8c87be99..8e03510e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -146,7 +146,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements } else { mobilePosition.setAltitude(0.0); } - logger.info("[收到Notify-MobilePosition]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), + logger.info("[收到 移动位置订阅]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), mobilePosition.getLongitude(), mobilePosition.getLatitude()); mobilePosition.setReportSource("Mobile Position"); BaiduPoint bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()), String.valueOf(mobilePosition.getLatitude())); @@ -281,7 +281,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements Element eventElement = itemDevice.element("Event"); DeviceChannel channel = XmlUtil.channelContentHander(itemDevice); channel.setDeviceId(device.getDeviceId()); - logger.info("[收到Notify-Catalog]:{}/{}", device.getDeviceId(), channel.getChannelId()); + logger.info("[收到 目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId()); switch (eventElement.getText().toUpperCase()) { case CatalogEvent.ON: // 上线 logger.info("收到来自设备【{}】的通道【{}】上线通知", device.getDeviceId(), channel.getChannelId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java index 5ae80530..979849ed 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java @@ -150,7 +150,7 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme } String sn = XmlUtil.getText(rootElement, "SN"); String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_MobilePosition_" + platformId; - logger.info("[notify-MobilePosition]: {}", platformId); + logger.info("[回复 移动位置订阅]: {}", platformId); StringBuilder resultXml = new StringBuilder(200); resultXml.append("\r\n") .append("\r\n") @@ -161,12 +161,21 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme .append("\r\n"); if (subscribeInfo.getExpires() > 0) { - if (subscribeHolder.getMobilePositionSubscribe(platformId) != null) { - dynamicTask.stop(key); + + if (subscribeHolder.getMobilePositionSubscribe(platformId) == null ) { + String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔 + subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo); + dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval)); + }else { + if (subscribeHolder.getMobilePositionSubscribe(platformId).getDialog() != null + && subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState() != null + && !subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState().equals(DialogState.CONFIRMED)) { + dynamicTask.stop(key); + String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔 + subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo); + dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval)); + } } - String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔 - dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval) -1 ); - subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo); }else if (subscribeInfo.getExpires() == 0) { dynamicTask.stop(key); subscribeHolder.removeMobilePositionSubscribe(platformId); @@ -203,7 +212,7 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme } String sn = XmlUtil.getText(rootElement, "SN"); String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_Catalog_" + platformId; - logger.info("[notify-Catalog]: {}", platformId); + logger.info("[回复 目录订阅]: {}/{}", platformId, deviceID); StringBuilder resultXml = new StringBuilder(200); resultXml.append("\r\n") .append("\r\n") diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java index 1b6d31eb..12136f47 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java @@ -4,8 +4,13 @@ import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; +import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; +import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask; +import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeHandlerTask; +import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; @@ -29,9 +34,8 @@ import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; -import java.util.List; -import java.util.Set; -import java.util.UUID; +import javax.sip.DialogState; +import java.util.*; @Api(tags = "国标设备查询", value = "国标设备查询") @SuppressWarnings("rawtypes") @@ -63,6 +67,9 @@ public class DeviceQuery { @Autowired private DynamicTask dynamicTask; + @Autowired + private SubscribeHolder subscribeHolder; + /** * 使用ID查询国标设备 * @param deviceId 国标ID @@ -469,4 +476,29 @@ public class DeviceQuery { } return wvpResult; } + + @GetMapping("/{deviceId}/subscribe_info") + @ApiOperation(value = "获取设备的订阅状态", notes = "获取设备的订阅状态") + public WVPResult> getSubscribeInfo(@PathVariable String deviceId) { + Set allKeys = dynamicTask.getAllKeys(); + Map dialogStateMap = new HashMap<>(); + for (String key : allKeys) { + if (key.startsWith(deviceId)) { + ISubscribeTask subscribeTask = (ISubscribeTask)dynamicTask.get(key); + DialogState dialogState = subscribeTask.getDialogState(); + if (dialogState == null) { + continue; + } + if (subscribeTask instanceof CatalogSubscribeTask) { + dialogStateMap.put("catalog", dialogState.toString()); + }else if (subscribeTask instanceof MobilePositionSubscribeTask) { + dialogStateMap.put("mobilePosition", dialogState.toString()); + } + } + } + WVPResult> wvpResult = new WVPResult<>(); + wvpResult.setCode(0); + wvpResult.setData(dialogStateMap); + return wvpResult; + } } From e6ee7fe7477b485676ce506b4e971c9a50dfa588 Mon Sep 17 00:00:00 2001 From: 648540858 <456panlinlin> Date: Thu, 14 Apr 2022 16:52:48 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BA=A7=E8=81=94?= =?UTF-8?q?=E7=A7=BB=E5=8A=A8=E4=BD=8D=E7=BD=AE=E8=AE=A2=E9=98=85=E4=BD=8D?= =?UTF-8?q?=E7=BD=AE=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/conf/DynamicTask.java | 55 ++++++++++++++++--- .../iot/vmp/gb28181/bean/SubscribeHolder.java | 51 ++++++++++++++++- .../iot/vmp/gb28181/bean/SubscribeInfo.java | 24 ++++++++ .../SubscribeListenerForPlatform.java | 50 ----------------- .../subscribe/catalog/CatalogEventLister.java | 2 - .../task/impl/CatalogSubscribeTask.java | 1 - .../MobilePositionSubscribeHandlerTask.java | 30 +++++----- .../impl/MobilePositionSubscribeTask.java | 26 +++++++-- .../cmd/impl/SIPCommanderFroPlatform.java | 6 +- .../impl/SubscribeRequestProcessor.java | 32 +++++------ .../cmd/CatalogResponseMessageHandler.java | 3 - .../vmp/service/impl/DeviceServiceImpl.java | 26 ++++----- .../vmp/vmanager/server/ServerController.java | 42 +++++++++++++- 13 files changed, 227 insertions(+), 121 deletions(-) delete mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java diff --git a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java index b8423829..bd10317c 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java @@ -1,6 +1,9 @@ package com.genersoft.iot.vmp.conf; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -18,6 +21,8 @@ import java.util.concurrent.ScheduledFuture; @Component public class DynamicTask { + private Logger logger = LoggerFactory.getLogger(DynamicTask.class); + @Autowired private ThreadPoolTaskScheduler threadPoolTaskScheduler; @@ -26,7 +31,12 @@ public class DynamicTask { @Bean public ThreadPoolTaskScheduler threadPoolTaskScheduler() { - return new ThreadPoolTaskScheduler(); + ThreadPoolTaskScheduler schedulerPool = new ThreadPoolTaskScheduler(); + schedulerPool.setPoolSize(300); + schedulerPool.setWaitForTasksToCompleteOnShutdown(true); + schedulerPool.setAwaitTerminationSeconds(10); + return schedulerPool; + } /** @@ -37,11 +47,24 @@ public class DynamicTask { * @return */ public void startCron(String key, Runnable task, int cycleForCatalog) { - stop(key); + ScheduledFuture future = futureMap.get(key); + if (future != null) { + if (future.isCancelled()) { + logger.info("任务【{}】已存在但是关闭状态!!!", key); + } else { + logger.info("任务【{}】已存在且已启动!!!", key); + return; + } + } // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔 - ScheduledFuture future = threadPoolTaskScheduler.scheduleAtFixedRate(task, cycleForCatalog * 1000L); - futureMap.put(key, future); - runnableMap.put(key, task); + future = threadPoolTaskScheduler.scheduleAtFixedRate(task, cycleForCatalog * 1000L); + if (future != null){ + futureMap.put(key, future); + runnableMap.put(key, task); + logger.info("任务【{}】启动成功!!!", key); + }else { + logger.info("任务【{}】启动失败!!!", key); + } } /** @@ -53,13 +76,31 @@ public class DynamicTask { */ public void startDelay(String key, Runnable task, int delay) { stop(key); + System.out.println("定时任务开始了"); Date starTime = new Date(System.currentTimeMillis() + delay); + + ScheduledFuture future = futureMap.get(key); + if (future != null) { + if (future.isCancelled()) { + logger.info("任务【{}】已存在但是关闭状态!!!", key); + } else { + logger.info("任务【{}】已存在且已启动!!!", key); + return; + } + } // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔 - ScheduledFuture future = threadPoolTaskScheduler.schedule(task, starTime); - futureMap.put(key, future); + future = threadPoolTaskScheduler.schedule(task, starTime); + if (future != null){ + futureMap.put(key, future); + runnableMap.put(key, task); + logger.info("任务【{}】启动成功!!!", key); + }else { + logger.info("任务【{}】启动失败!!!", key); + } } public void stop(String key) { + System.out.println("定时任务结束了"); if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) { futureMap.get(key).cancel(true); Runnable runnable = runnableMap.get(key); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java index 62a45d5a..981fe1ec 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -1,5 +1,12 @@ package com.genersoft.iot.vmp.gb28181.bean; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeHandlerTask; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.ArrayList; @@ -9,12 +16,32 @@ import java.util.concurrent.ConcurrentHashMap; @Component public class SubscribeHolder { + @Autowired + private DynamicTask dynamicTask; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private ISIPCommanderForPlatform sipCommanderForPlatform; + + @Autowired + private IVideoManagerStorage storager; + + private final String taskOverduePrefix = "subscribe_overdue_"; + private static ConcurrentHashMap catalogMap = new ConcurrentHashMap<>(); private static ConcurrentHashMap mobilePositionMap = new ConcurrentHashMap<>(); public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) { catalogMap.put(platformId, subscribeInfo); + // 添加订阅到期 + String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId; + dynamicTask.stop(taskOverdueKey); + // 添加任务处理订阅过期 + dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()), + subscribeInfo.getExpires() * 1000); } public SubscribeInfo getCatalogSubscribe(String platformId) { @@ -23,10 +50,24 @@ public class SubscribeHolder { public void removeCatalogSubscribe(String platformId) { catalogMap.remove(platformId); + String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId; + // 添加任务处理订阅过期 + dynamicTask.stop(taskOverdueKey); } public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo) { mobilePositionMap.put(platformId, subscribeInfo); + String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + "MobilePosition_" + platformId; + // 添加任务处理GPS定时推送 + dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, subscribeInfo.getSn(), key, this), subscribeInfo.getGpsInterval()); + String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId; + dynamicTask.stop(taskOverdueKey); + // 添加任务处理订阅过期 + dynamicTask.startDelay(taskOverdueKey, () -> { + System.out.println("订阅过期"); + removeMobilePositionSubscribe(subscribeInfo.getId()); + }, + subscribeInfo.getExpires() * 1000); } public SubscribeInfo getMobilePositionSubscribe(String platformId) { @@ -35,6 +76,12 @@ public class SubscribeHolder { public void removeMobilePositionSubscribe(String platformId) { mobilePositionMap.remove(platformId); + String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + "MobilePosition_" + platformId; + // 结束任务处理GPS定时推送 + dynamicTask.stop(key); + String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId; + // 添加任务处理订阅过期 + dynamicTask.stop(taskOverdueKey); } public List getAllCatalogSubscribePlatform() { @@ -48,7 +95,7 @@ public class SubscribeHolder { } public void removeAllSubscribe(String platformId) { - mobilePositionMap.remove(platformId); - catalogMap.remove(platformId); + removeMobilePositionSubscribe(platformId); + removeCatalogSubscribe(platformId); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java index 434a639a..feb6a724 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java @@ -33,6 +33,14 @@ public class SubscribeInfo { private ServerTransaction transaction; private Dialog dialog; + /** + * 以下为可选字段 + * @return + */ + private String sn; + private int gpsInterval; + + public String getId() { return id; } @@ -88,4 +96,20 @@ public class SubscribeInfo { public void setDialog(Dialog dialog) { this.dialog = dialog; } + + public String getSn() { + return sn; + } + + public void setSn(String sn) { + this.sn = sn; + } + + public int getGpsInterval() { + return gpsInterval; + } + + public void setGpsInterval(int gpsInterval) { + this.gpsInterval = gpsInterval; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java deleted file mode 100644 index 898e51d6..00000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.event.subscribe; - -import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener; -import com.genersoft.iot.vmp.conf.UserSetting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.listener.RedisMessageListenerContainer; -import org.springframework.stereotype.Component; - -/** - * 平台订阅到期事件 - */ -@Component -public class SubscribeListenerForPlatform extends RedisKeyExpirationEventMessageListener { - - private Logger logger = LoggerFactory.getLogger(SubscribeListenerForPlatform.class); - - @Autowired - private UserSetting userSetting; - - @Autowired - private DynamicTask dynamicTask; - - public SubscribeListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetting userSetting) { - super(listenerContainer, userSetting); - } - - - /** - * 监听失效的key - * @param message - * @param pattern - */ - @Override - public void onMessage(Message message, byte[] pattern) { - // 获取失效的key - String expiredKey = message.toString(); - logger.debug(expiredKey); - // 订阅到期 - String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_"; - if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) { - // 取消定时任务 - dynamicTask.stop(expiredKey); - } - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index f9546f0c..f9ef10cd 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -61,8 +61,6 @@ public class CatalogEventLister implements ApplicationListener { if (event.getPlatformId() != null) { parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformId()); if (parentPlatform != null && !parentPlatform.isStatus())return; - String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_Catalog_" + event.getPlatformId(); -// subscribe = redisCatchStorage.getSubscribe(key); subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId()); if (subscribe == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java index 433eb3b1..bee5fba6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java @@ -46,7 +46,6 @@ public class CatalogSubscribeTask implements ISubscribeTask { }); } - @Async @Override public void stop() { /** diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java index 569a9b7e..f20baf90 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java @@ -24,52 +24,50 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask { private IVideoManagerStorage storager; private ISIPCommanderForPlatform sipCommanderForPlatform; private SubscribeHolder subscribeHolder; - private String platformId; + private ParentPlatform platform; private String sn; private String key; public MobilePositionSubscribeHandlerTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorage storager, String platformId, String sn, String key, SubscribeHolder subscribeInfo) { + System.out.println("MobilePositionSubscribeHandlerTask 初始化"); this.redisCatchStorage = redisCatchStorage; this.storager = storager; - this.platformId = platformId; + this.platform = storager.queryParentPlatByServerGBId(platformId); this.sn = sn; this.key = key; this.sipCommanderForPlatform = sipCommanderForPlatform; this.subscribeHolder = subscribeInfo; } - @Async @Override public void run() { logger.info("执行MobilePositionSubscribeHandlerTask"); - SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platformId); + if (platform == null) return; + SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId()); if (subscribe != null) { - ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId); - if (parentPlatform == null ) { - logger.info("发送订阅时未找到平台信息:{}", platformId); - return; - } - if (!parentPlatform.isStatus()) { - logger.info("发送订阅时发现平台已经离线:{}", platformId); - return; - } + +// if (!parentPlatform.isStatus()) { +// logger.info("发送订阅时发现平台已经离线:{}", platformId); +// return; +// } // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持 - List gbStreams = storager.queryGbStreamListInPlatform(platformId); + List gbStreams = storager.queryGbStreamListInPlatform(platform.getServerGBId()); if (gbStreams.size() == 0) { - logger.info("发送订阅时发现平台已经没有关联的直播流:{}", platformId); + logger.info("发送订阅时发现平台已经没有关联的直播流:{}", platform.getServerGBId()); return; } for (GbStream gbStream : gbStreams) { String gbId = gbStream.getGbId(); GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId); if (gpsMsgInfo != null) { // 无最新位置不发送 + logger.info("无最新位置不发送"); // 经纬度都为0不发送 if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) { continue; } // 发送GPS消息 - sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe); + sipCommanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, subscribe); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java index 7203ee2d..884f0401 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java @@ -11,6 +11,8 @@ import org.springframework.scheduling.annotation.Async; import javax.sip.Dialog; import javax.sip.DialogState; import javax.sip.ResponseEvent; +import java.util.Timer; +import java.util.TimerTask; /** * 移动位置订阅的定时更新 @@ -21,18 +23,23 @@ public class MobilePositionSubscribeTask implements ISubscribeTask { private ISIPCommander sipCommander; private Dialog dialog; + private Timer timer ; + public MobilePositionSubscribeTask(Device device, ISIPCommander sipCommander) { this.device = device; this.sipCommander = sipCommander; } - @Async @Override public void run() { + if (timer != null ) { + timer.cancel(); + timer = null; + } sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> { - if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) { - dialog = eventResult.dialog; - } +// if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) { +// dialog = eventResult.dialog; +// } ResponseEvent event = (ResponseEvent) eventResult.event; if (event.getResponse().getRawContent() != null) { // 成功 @@ -45,6 +52,13 @@ public class MobilePositionSubscribeTask implements ISubscribeTask { dialog = null; // 失败 logger.warn("[移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg); + timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + MobilePositionSubscribeTask.this.run(); + } + }, 2000); }); } @@ -58,6 +72,10 @@ public class MobilePositionSubscribeTask implements ISubscribeTask { * COMPLETED-> Completed Dialog状态-已完成 * TERMINATED-> Terminated Dialog状态-终止 */ + if (timer != null ) { + timer.cancel(); + timer = null; + } if (dialog != null && dialog.getState().equals(DialogState.CONFIRMED)) { logger.info("取消移动订阅时dialog状态为{}", dialog.getState()); device.setSubscribeCycleForMobilePosition(0); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 7768ed42..c5cdae00 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -385,7 +385,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { if (parentPlatform == null) { return false; } - + logger.info("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat()); try { String characterSet = parentPlatform.getCharacterSet(); StringBuffer deviceStatusXml = new StringBuffer(600); @@ -405,7 +405,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); callIdHeader.setCallId(subscribeInfo.getCallId()); - logger.info("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat()); + sendNotify(parentPlatform, deviceStatusXml.toString(), subscribeInfo, eventResult -> { logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg); }, null); @@ -459,7 +459,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { // 设置编码, 防止中文乱码 messageFactory.setDefaultContentEncodingCharset(characterSet); Dialog dialog = subscribeInfo.getDialog(); - if (dialog == null) return; + if (dialog == null || !dialog.getState().equals(DialogState.CONFIRMED)) return; SIPRequest notifyRequest = (SIPRequest)dialog.createRequest(Request.NOTIFY); ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); notifyRequest.setContent(catalogXmlContent, contentTypeHeader); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java index 979849ed..da1088a2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java @@ -149,7 +149,6 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme subscribeInfo.setDialog(dialog); } String sn = XmlUtil.getText(rootElement, "SN"); - String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_MobilePosition_" + platformId; logger.info("[回复 移动位置订阅]: {}", platformId); StringBuilder resultXml = new StringBuilder(200); resultXml.append("\r\n") @@ -161,23 +160,25 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme .append("\r\n"); if (subscribeInfo.getExpires() > 0) { - - if (subscribeHolder.getMobilePositionSubscribe(platformId) == null ) { - String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔 - subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo); - dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval)); + String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔 + if (interval == null) { + subscribeInfo.setGpsInterval(5); }else { - if (subscribeHolder.getMobilePositionSubscribe(platformId).getDialog() != null - && subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState() != null - && !subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState().equals(DialogState.CONFIRMED)) { - dynamicTask.stop(key); - String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔 - subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo); - dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval)); - } + subscribeInfo.setGpsInterval(Integer.parseInt(interval)); } + + subscribeInfo.setSn(sn); + subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo); +// if (subscribeHolder.getMobilePositionSubscribe(platformId) == null ) { +// subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo); +// }else { +// if (subscribeHolder.getMobilePositionSubscribe(platformId).getDialog() != null +// && subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState() != null +// && !subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState().equals(DialogState.CONFIRMED)) { +// subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo); +// } +// } }else if (subscribeInfo.getExpires() == 0) { - dynamicTask.stop(key); subscribeHolder.removeMobilePositionSubscribe(platformId); } @@ -211,7 +212,6 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme subscribeInfo.setDialog(dialog); } String sn = XmlUtil.getText(rootElement, "SN"); - String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_Catalog_" + platformId; logger.info("[回复 目录订阅]: {}/{}", platformId, deviceID); StringBuilder resultXml = new StringBuilder(200); resultXml.append("\r\n") diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 1bafb59d..dbc25fc6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -145,9 +145,6 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp } // 回复200 OK responseAck(evt, Response.OK); - if (offLineDetector.isOnline(device.getDeviceId())) { - publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE); - } } } catch (DocumentException e) { e.printStackTrace(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 8cd2c77b..d3432865 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -14,6 +14,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import javax.sip.DialogState; + /** * 设备业务(目录订阅) */ @@ -39,19 +41,17 @@ public class DeviceServiceImpl implements IDeviceService { if (device == null || device.getSubscribeCycleForCatalog() < 0) { return false; } - if (dynamicTask.contains(device.getDeviceId() + "catalog")) { - // 存在则停止现有的,开启新的 - dynamicTask.stop(device.getDeviceId() + "catalog"); + CatalogSubscribeTask task = (CatalogSubscribeTask)dynamicTask.get(device.getDeviceId() + "catalog"); + if (task != null && task.getDialogState() != null && task.getDialogState().equals(DialogState.CONFIRMED)) { // 已存在不需要再次添加 + return true; } logger.info("[添加目录订阅] 设备{}", device.getDeviceId()); // 添加目录订阅 CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander); - catalogSubscribeTask.run(); // 提前开始刷新订阅 - int subscribeCycleForCatalog = device.getSubscribeCycleForCatalog(); + int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForCatalog(),30); // 设置最小值为30 - subscribeCycleForCatalog = Math.max(subscribeCycleForCatalog, 30); - dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, subscribeCycleForCatalog); + dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, subscribeCycleForCatalog -1); return true; } @@ -70,18 +70,16 @@ public class DeviceServiceImpl implements IDeviceService { if (device == null || device.getSubscribeCycleForMobilePosition() < 0) { return false; } - if (dynamicTask.contains(device.getDeviceId() + "mobile_position")) { - // 存在则停止现有的,开启新的 - dynamicTask.stop(device.getDeviceId() + "mobile_position"); - } logger.info("[添加移动位置订阅] 设备{}", device.getDeviceId()); + MobilePositionSubscribeTask task = (MobilePositionSubscribeTask)dynamicTask.get(device.getDeviceId() + "mobile_position"); + if (task != null && task.getDialogState() != null && task.getDialogState().equals(DialogState.CONFIRMED)) { // 已存在不需要再次添加 + return true; + } // 添加目录订阅 MobilePositionSubscribeTask mobilePositionSubscribeTask = new MobilePositionSubscribeTask(device, sipCommander); - mobilePositionSubscribeTask.run(); // 提前开始刷新订阅 - int subscribeCycleForCatalog = device.getSubscribeCycleForCatalog(); // 设置最小值为30 - subscribeCycleForCatalog = Math.max(subscribeCycleForCatalog, 30); + int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForMobilePosition(),30); dynamicTask.startCron(device.getDeviceId() + "mobile_position" , mobilePositionSubscribeTask, subscribeCycleForCatalog -1 ); return true; } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java index a0e7a737..faed2c82 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.VManageBootstrap; import com.genersoft.iot.vmp.common.VersionPo; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.VersionInfo; @@ -27,6 +28,7 @@ import javax.sip.ObjectInUseException; import javax.sip.SipProvider; import java.util.Iterator; import java.util.List; +import java.util.Set; @SuppressWarnings("rawtypes") @Api(tags = "服务控制") @@ -42,13 +44,16 @@ public class ServerController { private IMediaServerService mediaServerService; @Autowired - VersionInfo versionInfo; + private VersionInfo versionInfo; @Autowired - SipConfig sipConfig; + private SipConfig sipConfig; @Autowired - UserSetting userSetting; + private UserSetting userSetting; + + @Autowired + private DynamicTask dynamicTask; @Value("${server.port}") private int serverPort; @@ -248,4 +253,35 @@ public class ServerController { result.setData(jsonObject); return result; } + +// @ApiOperation("当前进行中的动态任务") +// @GetMapping(value = "/dynamicTask") +// @ResponseBody +// public WVPResult getDynamicTask(){ +// WVPResult result = new WVPResult<>(); +// result.setCode(0); +// result.setMsg("success"); +// +// JSONObject jsonObject = new JSONObject(); +// +// Set allKeys = dynamicTask.getAllKeys(); +// jsonObject.put("server.port", serverPort); +// if (StringUtils.isEmpty(type)) { +// jsonObject.put("sip", JSON.toJSON(sipConfig)); +// jsonObject.put("base", JSON.toJSON(userSetting)); +// }else { +// switch (type){ +// case "sip": +// jsonObject.put("sip", sipConfig); +// break; +// case "base": +// jsonObject.put("base", userSetting); +// break; +// default: +// break; +// } +// } +// result.setData(jsonObject); +// return result; +// } } From 25102229f6ac19d9376309e7917f0da7cd2c8d56 Mon Sep 17 00:00:00 2001 From: 648540858 <456panlinlin> Date: Fri, 15 Apr 2022 18:04:19 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E6=8E=A5=E6=94=B6=E6=89=80=E6=9C=89notify?= =?UTF-8?q?=E8=AF=B7=E6=B1=82=EF=BC=8C=E5=8D=B3=E4=BD=BF=E6=B2=A1=E6=9C=89?= =?UTF-8?q?=E8=AE=A2=E9=98=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bootstrap.sh | 91 +++++++++++++++++++ .../genersoft/iot/vmp/conf/DynamicTask.java | 2 - .../genersoft/iot/vmp/gb28181/SipLayer.java | 1 + .../MobilePositionSubscribeHandlerTask.java | 2 - 4 files changed, 92 insertions(+), 4 deletions(-) create mode 100755 bootstrap.sh diff --git a/bootstrap.sh b/bootstrap.sh new file mode 100755 index 00000000..0f3c4c97 --- /dev/null +++ b/bootstrap.sh @@ -0,0 +1,91 @@ +#!/bin/bash + +###################################################### +# Copyright 2019 Pham Ngoc Hoai +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Repo: https://github.com/tyrion9/spring-boot-startup-script +# +######### PARAM ###################################### + +JAVA_OPT=-Xmx1024m +JARFILE=`ls -1r *.jar 2>/dev/null | head -n 1` +PID_FILE=pid.file +RUNNING=N +PWD=`pwd` + +######### DO NOT MODIFY ######## + +if [ -f $PID_FILE ]; then + PID=`cat $PID_FILE` + if [ ! -z "$PID" ] && kill -0 $PID 2>/dev/null; then + RUNNING=Y + fi +fi + +start() +{ + if [ $RUNNING == "Y" ]; then + echo "Application already started" + else + if [ -z "$JARFILE" ] + then + echo "ERROR: jar file not found" + else + nohup java $JAVA_OPT -Djava.security.egd=file:/dev/./urandom -jar $PWD/$JARFILE > nohup.out 2>&1 & + echo $! > $PID_FILE + echo "Application $JARFILE starting..." + tail -f nohup.out + fi + fi +} + +stop() +{ + if [ $RUNNING == "Y" ]; then + kill -9 $PID + rm -f $PID_FILE + echo "Application stopped" + else + echo "Application not running" + fi +} + +restart() +{ + stop + start +} + +case "$1" in + + 'start') + start + ;; + + 'stop') + stop + ;; + + 'restart') + restart + ;; + + *) + echo "Usage: $0 { start | stop | restart }" + exit 1 + ;; +esac +exit 0 + diff --git a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java index bd10317c..052f5336 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java @@ -76,7 +76,6 @@ public class DynamicTask { */ public void startDelay(String key, Runnable task, int delay) { stop(key); - System.out.println("定时任务开始了"); Date starTime = new Date(System.currentTimeMillis() + delay); ScheduledFuture future = futureMap.get(key); @@ -100,7 +99,6 @@ public class DynamicTask { } public void stop(String key) { - System.out.println("定时任务结束了"); if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) { futureMap.get(key).cancel(true); Runnable runnable = runnableMap.get(key); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java index 1a5cce57..7b7291c1 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java @@ -48,6 +48,7 @@ public class SipLayer{ properties.setProperty("javax.sip.STACK_NAME", "GB28181_SIP"); properties.setProperty("javax.sip.IP_ADDRESS", sipConfig.getMonitorIp()); properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "true"); + properties.setProperty("gov.nist.javax.sip.DELIVER_UNSOLICITED_NOTIFY", "true"); // 接收所有notify请求,即使没有订阅 /** * sip_server_log.log 和 sip_debug_log.log public static final int TRACE_NONE = * 0; public static final int TRACE_MESSAGES = 16; public static final int diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java index f20baf90..034f9de4 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java @@ -42,7 +42,6 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask { @Override public void run() { - logger.info("执行MobilePositionSubscribeHandlerTask"); if (platform == null) return; SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId()); if (subscribe != null) { @@ -71,7 +70,6 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask { } } } - logger.info("结束执行MobilePositionSubscribeHandlerTask"); } @Override From 0dc1807f621ce9077b58dff40ad4485c9a3d6c40 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Sun, 17 Apr 2022 19:48:05 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=80=9A=E9=81=93?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=B7=BB=E5=8A=A0=E5=AF=B9SN=E7=9A=84?= =?UTF-8?q?=E5=88=A4=E6=96=AD=EF=BC=8C=E7=B2=BE=E7=AE=80=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/CatalogData.java | 10 +++ .../event/online/OnlineEventListener.java | 3 +- .../vmp/gb28181/session/CatalogDataCatch.java | 62 +++++++++++-------- .../gb28181/transmit/cmd/ISIPCommander.java | 2 +- .../transmit/cmd/impl/SIPCommander.java | 4 +- .../cmd/CatalogResponseMessageHandler.java | 49 +++++---------- .../iot/vmp/service/IDeviceService.java | 13 +--- .../vmp/service/impl/DeviceServiceImpl.java | 18 +++--- .../impl/VideoManagerStorageImpl.java | 13 ++-- .../vmanager/gb28181/device/DeviceQuery.java | 8 +-- .../components/dialog/SyncChannelProgress.vue | 45 +++++++++----- 11 files changed, 121 insertions(+), 106 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java index 40f676e5..338f8ad5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java @@ -4,6 +4,7 @@ import java.util.Date; import java.util.List; public class CatalogData { + private int sn; // 命令序列号 private int total; private List channelList; private Date lastTime; @@ -15,6 +16,15 @@ public class CatalogData { } private CatalogDataStatus status; + + public int getSn() { + return sn; + } + + public void setSn(int sn) { + this.sn = sn; + } + public int getTotal() { return total; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java index e454d490..b97457a2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java @@ -54,6 +54,7 @@ public class OnlineEventListener implements ApplicationListener { @Autowired private SIPCommander cmder; + private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Override @@ -76,7 +77,7 @@ public class OnlineEventListener implements ApplicationListener { if (deviceInStore == null) { //第一次上线 logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId()); cmder.deviceInfoQuery(device); - cmder.catalogQuery(device, null); + deviceService.sync(device); } break; // 设备主动发送心跳触发的在线事件 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java index fbc2a323..c3de8a29 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java @@ -26,28 +26,35 @@ public class CatalogDataCatch { @Autowired private IVideoManagerStorage storager; - public void addReady(String key) { - CatalogData catalogData = data.get(key); + public void addReady(Device device, int sn ) { + CatalogData catalogData = data.get(device.getDeviceId()); if (catalogData == null || catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) { catalogData = new CatalogData(); catalogData.setChannelList(new ArrayList<>()); + catalogData.setDevice(device); + catalogData.setSn(sn); catalogData.setStatus(CatalogData.CatalogDataStatus.ready); catalogData.setLastTime(new Date(System.currentTimeMillis())); - data.put(key, catalogData); + data.put(device.getDeviceId(), catalogData); } } - public void put(String key, int total, Device device, List deviceChannelList) { - CatalogData catalogData = data.get(key); + public void put(String deviceId, int sn, int total, Device device, List deviceChannelList) { + CatalogData catalogData = data.get(deviceId); if (catalogData == null) { catalogData = new CatalogData(); + catalogData.setSn(sn); catalogData.setTotal(total); catalogData.setDevice(device); catalogData.setChannelList(new ArrayList<>()); catalogData.setStatus(CatalogData.CatalogDataStatus.runIng); catalogData.setLastTime(new Date(System.currentTimeMillis())); - data.put(key, catalogData); + data.put(deviceId, catalogData); }else { + // 同一个设备的通道同步请求只考虑一个,其他的直接忽略 + if (catalogData.getSn() != sn) { + return; + } catalogData.setTotal(total); catalogData.setDevice(device); catalogData.setStatus(CatalogData.CatalogDataStatus.runIng); @@ -56,20 +63,20 @@ public class CatalogDataCatch { } } - public List get(String key) { - CatalogData catalogData = data.get(key); + public List get(String deviceId) { + CatalogData catalogData = data.get(deviceId); if (catalogData == null) return null; return catalogData.getChannelList(); } - public int getTotal(String key) { - CatalogData catalogData = data.get(key); + public int getTotal(String deviceId) { + CatalogData catalogData = data.get(deviceId); if (catalogData == null) return 0; return catalogData.getTotal(); } - public SyncStatus getSyncStatus(String key) { - CatalogData catalogData = data.get(key); + public SyncStatus getSyncStatus(String deviceId) { + CatalogData catalogData = data.get(deviceId); if (catalogData == null) return null; SyncStatus syncStatus = new SyncStatus(); syncStatus.setCurrent(catalogData.getChannelList().size()); @@ -78,10 +85,6 @@ public class CatalogDataCatch { return syncStatus; } - public void del(String key) { - data.remove(key); - } - @Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 private void timerTask(){ Set keys = data.keySet(); @@ -92,23 +95,30 @@ public class CatalogDataCatch { Calendar calendarBefore30S = Calendar.getInstance(); calendarBefore30S.setTime(new Date()); calendarBefore30S.set(Calendar.SECOND, calendarBefore30S.get(Calendar.SECOND) - 30); - for (String key : keys) { - CatalogData catalogData = data.get(key); - if (catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时, 只更新这一部分数据 - storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList()); - String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条"; + for (String deviceId : keys) { + CatalogData catalogData = data.get(deviceId); + if ( catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时, 只更新这一部分数据 + if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) { + storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList()); + if (catalogData.getTotal() != catalogData.getChannelList().size()) { + String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条"; + catalogData.setErrorMsg(errorMsg); + } + }else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready)) { + String errorMsg = "同步失败,等待回复超时"; + catalogData.setErrorMsg(errorMsg); + } catalogData.setStatus(CatalogData.CatalogDataStatus.end); - catalogData.setErrorMsg(errorMsg); } - if (catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒,如果标记为end则删除 - data.remove(key); + if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒,如果标记为end则删除 + data.remove(deviceId); } } } - public void setChannelSyncEnd(String key, String errorMsg) { - CatalogData catalogData = data.get(key); + public void setChannelSyncEnd(String deviceId, String errorMsg) { + CatalogData catalogData = data.get(deviceId); if (catalogData == null)return; catalogData.setStatus(CatalogData.CatalogDataStatus.end); catalogData.setErrorMsg(errorMsg); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index 1de03bd7..aea37b6c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -250,7 +250,7 @@ public interface ISIPCommander { * * @param device 视频设备 */ - boolean catalogQuery(Device device, SipSubscribe.Event errorEvent); + boolean catalogQuery(Device device, int sn, SipSubscribe.Event errorEvent); /** * 查询录像信息 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 027238b7..89e70d0a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -1208,14 +1208,14 @@ public class SIPCommander implements ISIPCommander { * @param device 视频设备 */ @Override - public boolean catalogQuery(Device device, SipSubscribe.Event errorEvent) { + public boolean catalogQuery(Device device, int sn, SipSubscribe.Event errorEvent) { try { StringBuffer catalogXml = new StringBuffer(200); String charset = device.getCharset(); catalogXml.append("\r\n"); catalogXml.append("\r\n"); catalogXml.append("Catalog\r\n"); - catalogXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); + catalogXml.append("" + sn + "\r\n"); catalogXml.append("" + device.getDeviceId() + "\r\n"); catalogXml.append("\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index dbc25fc6..2ec80476 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -86,23 +86,17 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp rootElement = getRootElement(evt, device.getCharset()); Element deviceListElement = rootElement.element("DeviceList"); Element sumNumElement = rootElement.element("SumNum"); - if (sumNumElement == null || deviceListElement == null) { + Element snElement = rootElement.element("SN"); + if (snElement == null || sumNumElement == null || deviceListElement == null) { responseAck(evt, Response.BAD_REQUEST, "xml error"); return; } int sumNum = Integer.parseInt(sumNumElement.getText()); + if (sumNum == 0) { // 数据已经完整接收 storager.cleanChannelsForDevice(device.getDeviceId()); - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - WVPResult result = new WVPResult<>(); - result.setCode(0); - result.setData(device); - msg.setData(result); - result.setMsg("更新成功,共0条"); - deferredResultHolder.invokeAllResult(msg); - catalogDataCatch.del(key); + catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null); }else { Iterator deviceListIterator = deviceListElement.elementIterator(); if (deviceListIterator != null) { @@ -123,24 +117,18 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp channelList.add(deviceChannel); } + int sn = Integer.parseInt(snElement.getText()); logger.info("收到来自设备【{}】的通道: {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.get(key) == null ? 0 :catalogDataCatch.get(key).size(), sumNum); - catalogDataCatch.put(key, sumNum, device, channelList); - if (catalogDataCatch.get(key).size() == sumNum) { + catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device, channelList); + if (catalogDataCatch.get(device.getDeviceId()).size() == sumNum) { // 数据已经完整接收 - boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key)); - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - WVPResult result = new WVPResult<>(); - result.setCode(0); - result.setData(device); - if (resetChannelsResult || sumNum ==0) { - result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条"); + boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(device.getDeviceId())); + if (!resetChannelsResult) { + String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(device.getDeviceId()).size() + "条"; + catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), errorMsg); }else { - result.setMsg("接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(key).size() + "条"); + catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null); } - msg.setData(result); - deferredResultHolder.invokeAllResult(msg); - catalogDataCatch.del(key); } } // 回复200 OK @@ -228,21 +216,18 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp } public SyncStatus getChannelSyncProgress(String deviceId) { - String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId; - if (catalogDataCatch.get(key) == null) { + if (catalogDataCatch.get(deviceId) == null) { return null; }else { - return catalogDataCatch.getSyncStatus(key); + return catalogDataCatch.getSyncStatus(deviceId); } } - public void setChannelSyncReady(String deviceId) { - String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId; - catalogDataCatch.addReady(key); + public void setChannelSyncReady(Device device, int sn) { + catalogDataCatch.addReady(device, sn); } public void setChannelSyncEnd(String deviceId, String errorMsg) { - String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId; - catalogDataCatch.setChannelSyncEnd(key, errorMsg); + catalogDataCatch.setChannelSyncEnd(deviceId, errorMsg); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java index 17cf7f42..08ccfffc 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java @@ -44,15 +44,8 @@ public interface IDeviceService { SyncStatus getChannelSyncStatus(String deviceId); /** - * 设置通道同步状态 - * @param deviceId 设备ID + * 通道同步 + * @param device */ - void setChannelSyncReady(String deviceId); - - /** - * 设置同步结束 - * @param deviceId 设备ID - * @param errorMsg 错误信息 - */ - void setChannelSyncEnd(String deviceId, String errorMsg); + void sync(Device device); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index d3432865..f36b3aed 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -100,12 +100,16 @@ public class DeviceServiceImpl implements IDeviceService { } @Override - public void setChannelSyncReady(String deviceId) { - catalogResponseMessageHandler.setChannelSyncReady(deviceId); - } - - @Override - public void setChannelSyncEnd(String deviceId, String errorMsg) { - catalogResponseMessageHandler.setChannelSyncEnd(deviceId, errorMsg); + public void sync(Device device) { + if (catalogResponseMessageHandler.getChannelSyncProgress(device.getDeviceId()) != null) { + logger.info("开启同步时发现同步已经存在"); + return; + } + int sn = (int)((Math.random()*9+1)*100000); + catalogResponseMessageHandler.setChannelSyncReady(device, sn); + sipCommander.catalogQuery(device, sn, event -> { + String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg); + catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg); + }); } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java index 20e56d95..a3c5c6cf 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java @@ -238,12 +238,15 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { @Override public boolean resetChannels(String deviceId, List deviceChannelList) { + if (deviceChannelList == null) { + return false; + } TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); // 数据去重 List channels = new ArrayList<>(); StringBuilder stringBuilder = new StringBuilder(); Map subContMap = new HashMap<>(); - if (deviceChannelList.size() > 1) { + if (deviceChannelList != null && deviceChannelList.size() > 1) { // 数据去重 Set gbIdSet = new HashSet<>(); for (DeviceChannel deviceChannel : deviceChannelList) { @@ -300,6 +303,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { dataSourceTransactionManager.commit(transactionStatus); //手动提交 return true; }catch (Exception e) { + e.printStackTrace(); dataSourceTransactionManager.rollback(transactionStatus); return false; } @@ -415,10 +419,9 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); boolean result = false; try { - if (platformChannelMapper.delChannelForDeviceId(deviceId) <0 // 删除与国标平台的关联 - || deviceChannelMapper.cleanChannelsByDeviceId(deviceId) < 0 // 删除他的通道 - || deviceMapper.del(deviceId) < 0 // 移除设备信息 - ) { + platformChannelMapper.delChannelForDeviceId(deviceId); + deviceChannelMapper.cleanChannelsByDeviceId(deviceId); + if ( deviceMapper.del(deviceId) < 0 ) { //事务回滚 dataSourceTransactionManager.rollback(transactionStatus); } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java index 12136f47..cbcb4ff5 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java @@ -172,12 +172,8 @@ public class DeviceQuery { wvpResult.setData(syncStatus); return wvpResult; } - SyncStatus syncStatusReady = new SyncStatus(); - deviceService.setChannelSyncReady(deviceId); - cmder.catalogQuery(device, event -> { - String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg); - deviceService.setChannelSyncEnd(deviceId, errorMsg); - }); + deviceService.sync(device); + WVPResult wvpResult = new WVPResult<>(); wvpResult.setCode(0); wvpResult.setMsg("开始同步"); diff --git a/web_src/src/components/dialog/SyncChannelProgress.vue b/web_src/src/components/dialog/SyncChannelProgress.vue index 1ec16f45..246f1ae7 100644 --- a/web_src/src/components/dialog/SyncChannelProgress.vue +++ b/web_src/src/components/dialog/SyncChannelProgress.vue @@ -61,23 +61,36 @@ export default { if (!this.syncFlag) { this.syncFlag = true; } - if (res.data.data == null) { - this.syncStatus = "success" - this.percentage = 100; - this.msg = '同步成功'; - }else if (res.data.data.total == 0){ - this.msg = `等待同步中`; - this.timmer = setTimeout(this.getProgress, 300) - }else if (res.data.data.errorMsg !== null ){ - this.msg = res.data.data.errorMsg; - this.syncStatus = "exception" - }else { - this.total = res.data.data.total; - this.current = res.data.data.current; - this.percentage = Math.floor(Number(res.data.data.current)/Number(res.data.data.total)* 10000)/100; - this.msg = `同步中...[${res.data.data.current}/${res.data.data.total}]`; - this.timmer = setTimeout(this.getProgress, 300) + + if (res.data.data != null) { + if (res.data.data.total == 0) { + if (res.data.data.errorMsg !== null ){ + this.msg = res.data.data.errorMsg; + this.syncStatus = "exception" + }else { + this.msg = `等待同步中`; + this.timmer = setTimeout(this.getProgress, 300) + } + }else { + if (res.data.data.total == res.data.data.current) { + this.syncStatus = "success" + this.percentage = 100; + this.msg = '同步成功'; + }else { + if (res.data.data.errorMsg !== null ){ + this.msg = res.data.data.errorMsg; + this.syncStatus = "exception" + }else { + this.total = res.data.data.total; + this.current = res.data.data.current; + this.percentage = Math.floor(Number(res.data.data.current)/Number(res.data.data.total)* 10000)/100; + this.msg = `同步中...[${res.data.data.current}/${res.data.data.total}]`; + this.timmer = setTimeout(this.getProgress, 300) + } + } + } } + }else { if (this.syncFlag) { this.syncStatus = "success"