diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java index d932a208..e7b7ab80 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -52,7 +52,7 @@ public class SubscribeHolder { Runnable runnable = dynamicTask.get(taskOverdueKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(); + subscribeTask.stop(null); } // 添加任务处理订阅过期 dynamicTask.stop(taskOverdueKey); @@ -87,7 +87,7 @@ public class SubscribeHolder { Runnable runnable = dynamicTask.get(taskOverdueKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(); + subscribeTask.stop(null); } // 添加任务处理订阅过期 dynamicTask.stop(taskOverdueKey); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java index a4e711d9..8d1c7d2e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java @@ -1,10 +1,10 @@ package com.genersoft.iot.vmp.gb28181.task; -import javax.sip.DialogState; +import com.genersoft.iot.vmp.common.CommonCallback; /** * @author lin */ public interface ISubscribeTask extends Runnable{ - void stop(); + void stop(CommonCallback callback); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java index 2ffbfe40..d9270bbf 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.task.impl; +import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; @@ -7,14 +8,13 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import gov.nist.javax.sip.message.SIPRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.scheduling.annotation.Async; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import javax.sip.*; +import javax.sip.DialogState; +import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; +import javax.sip.SipException; import javax.sip.header.ToHeader; import java.text.ParseException; -import java.util.Timer; -import java.util.TimerTask; /** * 目录订阅任务 @@ -71,7 +71,7 @@ public class CatalogSubscribeTask implements ISubscribeTask { } @Override - public void stop() { + public void stop(CommonCallback callback) { /** * dialog 的各个状态 * EARLY-> Early state状态-初始请求发送以后,收到了一个临时响应消息 @@ -94,6 +94,9 @@ public class CatalogSubscribeTask implements ISubscribeTask { // 成功 logger.info("[取消目录订阅]成功: {}", device.getDeviceId()); } + if (callback != null) { + callback.run(event.getResponse().getRawContent() != null); + } },eventResult -> { // 失败 logger.warn("[取消目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java index 2e792c1c..a4512f35 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java @@ -1,20 +1,9 @@ package com.genersoft.iot.vmp.gb28181.task.impl; -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.service.IPlatformService; -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.SpringBeanFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.scheduling.annotation.Async; - -import javax.sip.DialogState; -import java.util.List; /** * 向已经订阅(移动位置)的上级发送MobilePosition消息 @@ -38,7 +27,7 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask { } @Override - public void stop() { + public void stop(CommonCallback callback) { } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java index 0abd3cae..9fed0793 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java @@ -1,21 +1,19 @@ package com.genersoft.iot.vmp.gb28181.task.impl; +import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import gov.nist.javax.sip.message.SIPRequest; -import gov.nist.javax.sip.message.SIPResponse; -import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.scheduling.annotation.Async; -import javax.sip.*; +import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; +import javax.sip.SipException; import javax.sip.header.ToHeader; import java.text.ParseException; -import java.util.Timer; -import java.util.TimerTask; /** * 移动位置订阅的定时更新 @@ -70,7 +68,7 @@ public class MobilePositionSubscribeTask implements ISubscribeTask { } @Override - public void stop() { + public void stop(CommonCallback callback) { /** * dialog 的各个状态 * EARLY-> Early state状态-初始请求发送以后,收到了一个临时响应消息 @@ -92,6 +90,9 @@ public class MobilePositionSubscribeTask implements ISubscribeTask { // 成功 logger.info("[取消移动位置订阅]成功: {}", device.getDeviceId()); } + if (callback != null) { + callback.run(event.getResponse().getRawContent() != null); + } },eventResult -> { // 失败 logger.warn("[取消移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index 22c34a64..034e24f9 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -82,8 +82,9 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp device.setIp(remoteAddressInfo.getIp()); // 设备地址变化会引起目录订阅任务失效,需要重新添加 if (device.getSubscribeCycleForCatalog() > 0) { - deviceService.removeCatalogSubscribe(device); - deviceService.addCatalogSubscribe(device); + deviceService.removeCatalogSubscribe(device, result->{ + deviceService.addCatalogSubscribe(device); + }); } } if (device.getKeepaliveTime() == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java index e20c3bf2..afa00442 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.service; +import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; @@ -39,7 +40,7 @@ public interface IDeviceService { * @param device 设备信息 * @return 布尔 */ - boolean removeCatalogSubscribe(Device device); + boolean removeCatalogSubscribe(Device device, CommonCallback callback); /** * 添加移动位置订阅 @@ -53,7 +54,7 @@ public interface IDeviceService { * @param device 设备信息 * @return 布尔 */ - boolean removeMobilePositionSubscribe(Device device); + boolean removeMobilePositionSubscribe(Device device, CommonCallback callback); /** * 移除移动位置订阅 diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 1df9ee45..8665e058 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service.impl; import com.baomidou.dynamic.datasource.annotation.DS; +import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; @@ -231,8 +232,8 @@ public class DeviceServiceImpl implements IDeviceService { } } // 移除订阅 - removeCatalogSubscribe(device); - removeMobilePositionSubscribe(device); + removeCatalogSubscribe(device, null); + removeMobilePositionSubscribe(device, null); } @Override @@ -251,7 +252,7 @@ public class DeviceServiceImpl implements IDeviceService { } @Override - public boolean removeCatalogSubscribe(Device device) { + public boolean removeCatalogSubscribe(Device device, CommonCallback callback) { if (device == null || device.getSubscribeCycleForCatalog() < 0) { return false; } @@ -261,7 +262,7 @@ public class DeviceServiceImpl implements IDeviceService { Runnable runnable = dynamicTask.get(taskKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(); + subscribeTask.stop(callback); } } dynamicTask.stop(taskKey); @@ -284,7 +285,7 @@ public class DeviceServiceImpl implements IDeviceService { } @Override - public boolean removeMobilePositionSubscribe(Device device) { + public boolean removeMobilePositionSubscribe(Device device, CommonCallback callback) { if (device == null || device.getSubscribeCycleForCatalog() < 0) { return false; } @@ -294,7 +295,7 @@ public class DeviceServiceImpl implements IDeviceService { Runnable runnable = dynamicTask.get(taskKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(); + subscribeTask.stop(callback); } } dynamicTask.stop(taskKey); @@ -522,39 +523,54 @@ public class DeviceServiceImpl implements IDeviceService { if (!ObjectUtils.isEmpty(device.getStreamMode())) { deviceInStore.setStreamMode(device.getStreamMode()); } - - // 目录订阅相关的信息 if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) { if (device.getSubscribeCycleForCatalog() > 0) { // 若已开启订阅,但订阅周期不同,则先取消 if (deviceInStore.getSubscribeCycleForCatalog() != 0) { - removeCatalogSubscribe(deviceInStore); + removeCatalogSubscribe(deviceInStore, result->{ + // 开启订阅 + deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); + addCatalogSubscribe(deviceInStore); + // 因为是异步执行,需要在这里更新下数据 + deviceMapper.updateCustom(deviceInStore); + redisCatchStorage.updateDevice(deviceInStore); + }); + }else { + // 开启订阅 + deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); + addCatalogSubscribe(deviceInStore); } - // 开启订阅 - deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); - addCatalogSubscribe(deviceInStore); + }else if (device.getSubscribeCycleForCatalog() == 0) { // 取消订阅 - deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); - removeCatalogSubscribe(deviceInStore); + deviceInStore.setSubscribeCycleForCatalog(0); + removeCatalogSubscribe(deviceInStore, null); } } - // 移动位置订阅相关的信息 - if (device.getSubscribeCycleForMobilePosition() > 0) { - if (deviceInStore.getSubscribeCycleForMobilePosition() == 0 || deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) { - deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval()); - deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); - // 开启订阅 - addMobilePositionSubscribe(deviceInStore); - } - }else if (device.getSubscribeCycleForMobilePosition() == 0) { - if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) { - deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval()); - deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); + if (deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) { + if (device.getSubscribeCycleForMobilePosition() > 0) { + // 若已开启订阅,但订阅周期不同,则先取消 + if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) { + removeMobilePositionSubscribe(deviceInStore, result->{ + // 开启订阅 + deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); + addMobilePositionSubscribe(deviceInStore); + // 因为是异步执行,需要在这里更新下数据 + deviceMapper.updateCustom(deviceInStore); + redisCatchStorage.updateDevice(deviceInStore); + }); + }else { + // 开启订阅 + deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); + addMobilePositionSubscribe(deviceInStore); + } + + }else if (device.getSubscribeCycleForMobilePosition() == 0) { // 取消订阅 - removeMobilePositionSubscribe(deviceInStore); + deviceInStore.setSubscribeCycleForCatalog(0); + removeCatalogSubscribe(deviceInStore, null); } } if (deviceInStore.getGeoCoordSys() != null) { @@ -574,9 +590,8 @@ public class DeviceServiceImpl implements IDeviceService { //作为消息通道 deviceInStore.setAsMessageChannel(device.isAsMessageChannel()); - // 更新redis deviceMapper.updateCustom(deviceInStore); - redisCatchStorage.removeDevice(deviceInStore.getDeviceId()); + redisCatchStorage.updateDevice(deviceInStore); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java index 91c992fa..05501cc4 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java @@ -153,10 +153,7 @@ public class MobilePositionController { Device device = storager.queryVideoDevice(deviceId); device.setSubscribeCycleForMobilePosition(Integer.parseInt(expires)); device.setMobilePositionSubmissionInterval(Integer.parseInt(interval)); - deviceService.updateDevice(device); - if (!deviceService.removeMobilePositionSubscribe(device)) { - throw new ControllerException(ErrorCode.ERROR100); - } + deviceService.updateCustomDevice(device); } /** diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java index f0af27f4..852f1c43 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java @@ -199,7 +199,7 @@ public class DeviceQuery { Runnable runnable = dynamicTask.get(key); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(); + subscribeTask.stop(null); } dynamicTask.stop(key); }