diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index c4d49a84..e1d7c495 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -22,6 +22,9 @@ public class VideoManagerConstants { public static final String DEVICE_PREFIX = "VMP_DEVICE_"; + // 设备同步完成 + public static final String DEVICE_SYNC_PREFIX = "VMP_DEVICE_SYNC_"; + public static final String CACHEKEY_PREFIX = "VMP_CHANNEL_"; public static final String KEEPLIVEKEY_PREFIX = "VMP_KEEPALIVE_"; @@ -69,6 +72,7 @@ public class VideoManagerConstants { public static final String SYSTEM_INFO_NET_PREFIX = "VMP_SYSTEM_INFO_NET_"; + //************************** redis 消息********************************* // 流变化的通知 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java index c74fb87b..40f676e5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java @@ -8,6 +8,12 @@ public class CatalogData { private List channelList; private Date lastTime; private Device device; + private String errorMsg; + + public enum CatalogDataStatus{ + ready, runIng, end + } + private CatalogDataStatus status; public int getTotal() { return total; @@ -40,4 +46,20 @@ public class CatalogData { public void setDevice(Device device) { this.device = device; } + + public String getErrorMsg() { + return errorMsg; + } + + public void setErrorMsg(String errorMsg) { + this.errorMsg = errorMsg; + } + + public CatalogDataStatus getStatus() { + return status; + } + + public void setStatus(CatalogDataStatus status) { + this.status = status; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SyncStatus.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SyncStatus.java new file mode 100644 index 00000000..2ae50e69 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SyncStatus.java @@ -0,0 +1,34 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +/** + * 摄像机同步状态 + */ +public class SyncStatus { + private int total; + private int current; + private String errorMsg; + + public int getTotal() { + return total; + } + + public void setTotal(int total) { + this.total = total; + } + + public int getCurrent() { + return current; + } + + public void setCurrent(int current) { + this.current = current; + } + + public String getErrorMsg() { + return errorMsg; + } + + public void setErrorMsg(String errorMsg) { + this.errorMsg = errorMsg; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java index 755967be..fc9eb274 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java @@ -97,8 +97,6 @@ public class OnlineEventListener implements ApplicationListener { } // 处理上线监听 storager.updateDevice(device); - List deviceChannelList = storager.queryOnlineChannelsByDeviceId(device.getDeviceId()); - eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON); // 上线添加订阅 if (device.getSubscribeCycleForCatalog() > 0) { deviceService.addCatalogSubscribe(device); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java index 199f8267..fbc2a323 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.session; import com.genersoft.iot.vmp.gb28181.bean.CatalogData; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -25,6 +26,17 @@ public class CatalogDataCatch { @Autowired private IVideoManagerStorage storager; + public void addReady(String key) { + CatalogData catalogData = data.get(key); + if (catalogData == null || catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) { + catalogData = new CatalogData(); + catalogData.setChannelList(new ArrayList<>()); + catalogData.setStatus(CatalogData.CatalogDataStatus.ready); + catalogData.setLastTime(new Date(System.currentTimeMillis())); + data.put(key, catalogData); + } + } + public void put(String key, int total, Device device, List deviceChannelList) { CatalogData catalogData = data.get(key); if (catalogData == null) { @@ -32,10 +44,16 @@ public class CatalogDataCatch { catalogData.setTotal(total); catalogData.setDevice(device); catalogData.setChannelList(new ArrayList<>()); + catalogData.setStatus(CatalogData.CatalogDataStatus.runIng); + catalogData.setLastTime(new Date(System.currentTimeMillis())); data.put(key, catalogData); + }else { + catalogData.setTotal(total); + catalogData.setDevice(device); + catalogData.setStatus(CatalogData.CatalogDataStatus.runIng); + catalogData.getChannelList().addAll(deviceChannelList); + catalogData.setLastTime(new Date(System.currentTimeMillis())); } - catalogData.getChannelList().addAll(deviceChannelList); - catalogData.setLastTime(new Date(System.currentTimeMillis())); } public List get(String key) { @@ -50,6 +68,16 @@ public class CatalogDataCatch { return catalogData.getTotal(); } + public SyncStatus getSyncStatus(String key) { + CatalogData catalogData = data.get(key); + if (catalogData == null) return null; + SyncStatus syncStatus = new SyncStatus(); + syncStatus.setCurrent(catalogData.getChannelList().size()); + syncStatus.setTotal(catalogData.getTotal()); + syncStatus.setErrorMsg(catalogData.getErrorMsg()); + return syncStatus; + } + public void del(String key) { data.remove(key); } @@ -57,24 +85,32 @@ public class CatalogDataCatch { @Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 private void timerTask(){ Set keys = data.keySet(); - Calendar calendar = Calendar.getInstance(); - calendar.setTime(new Date()); - calendar.set(Calendar.SECOND, calendar.get(Calendar.SECOND) - 5); + Calendar calendarBefore5S = Calendar.getInstance(); + calendarBefore5S.setTime(new Date()); + calendarBefore5S.set(Calendar.SECOND, calendarBefore5S.get(Calendar.SECOND) - 5); + + Calendar calendarBefore30S = Calendar.getInstance(); + calendarBefore30S.setTime(new Date()); + calendarBefore30S.set(Calendar.SECOND, calendarBefore30S.get(Calendar.SECOND) - 30); for (String key : keys) { CatalogData catalogData = data.get(key); - if (catalogData.getLastTime().before(calendar.getTime())) { - + if (catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时, 只更新这一部分数据 storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList()); - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - WVPResult result = new WVPResult<>(); - result.setCode(0); - result.setMsg("更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条"); - result.setData(catalogData.getDevice()); - msg.setData(result); - deferredResultHolder.invokeAllResult(msg); + String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条"; + catalogData.setStatus(CatalogData.CatalogDataStatus.end); + catalogData.setErrorMsg(errorMsg); + } + if (catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒,如果标记为end则删除 data.remove(key); } } } + + + public void setChannelSyncEnd(String key, String errorMsg) { + CatalogData catalogData = data.get(key); + if (catalogData == null)return; + catalogData.setStatus(CatalogData.CatalogDataStatus.end); + catalogData.setErrorMsg(errorMsg); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java index 1f58a151..a900819c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java @@ -46,6 +46,7 @@ public interface ISIPCommanderForPlatform { * @return */ boolean catalogQuery(DeviceChannel channel, ParentPlatform parentPlatform, String sn, String fromTag, int size); + boolean catalogQuery(List channels, ParentPlatform parentPlatform, String sn, String fromTag); /** * 向上级回复DeviceInfo查询信息 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 299799fb..123d0e78 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -235,7 +235,7 @@ public class SIPCommander implements ISIPCommander { String cmdStr= cmdString(leftRight, upDown, inOut, moveSpeed, zoomSpeed); StringBuffer ptzXml = new StringBuffer(200); String charset = device.getCharset(); - ptzXml.append("\r\n"); + ptzXml.append("\r\n"); ptzXml.append("\r\n"); ptzXml.append("DeviceControl\r\n"); ptzXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -278,7 +278,7 @@ public class SIPCommander implements ISIPCommander { logger.debug("控制字符串:" + cmdStr); StringBuffer ptzXml = new StringBuffer(200); String charset = device.getCharset(); - ptzXml.append("\r\n"); + ptzXml.append("\r\n"); ptzXml.append("\r\n"); ptzXml.append("DeviceControl\r\n"); ptzXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -314,7 +314,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer ptzXml = new StringBuffer(200); String charset = device.getCharset(); - ptzXml.append("\r\n"); + ptzXml.append("\r\n"); ptzXml.append("\r\n"); ptzXml.append("DeviceControl\r\n"); ptzXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -778,7 +778,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer broadcastXml = new StringBuffer(200); String charset = device.getCharset(); - broadcastXml.append("\r\n"); + broadcastXml.append("\r\n"); broadcastXml.append("\r\n"); broadcastXml.append("Broadcast\r\n"); broadcastXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -804,7 +804,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer broadcastXml = new StringBuffer(200); String charset = device.getCharset(); - broadcastXml.append("\r\n"); + broadcastXml.append("\r\n"); broadcastXml.append("\r\n"); broadcastXml.append("Broadcast\r\n"); broadcastXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -837,7 +837,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer cmdXml = new StringBuffer(200); String charset = device.getCharset(); - cmdXml.append("\r\n"); + cmdXml.append("\r\n"); cmdXml.append("\r\n"); cmdXml.append("DeviceControl\r\n"); cmdXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -873,7 +873,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer cmdXml = new StringBuffer(200); String charset = device.getCharset(); - cmdXml.append("\r\n"); + cmdXml.append("\r\n"); cmdXml.append("\r\n"); cmdXml.append("DeviceControl\r\n"); cmdXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -906,7 +906,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer cmdXml = new StringBuffer(200); String charset = device.getCharset(); - cmdXml.append("\r\n"); + cmdXml.append("\r\n"); cmdXml.append("\r\n"); cmdXml.append("DeviceControl\r\n"); cmdXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -938,7 +938,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer cmdXml = new StringBuffer(200); String charset = device.getCharset(); - cmdXml.append("\r\n"); + cmdXml.append("\r\n"); cmdXml.append("\r\n"); cmdXml.append("DeviceControl\r\n"); cmdXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -983,7 +983,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer cmdXml = new StringBuffer(200); String charset = device.getCharset(); - cmdXml.append("\r\n"); + cmdXml.append("\r\n"); cmdXml.append("\r\n"); cmdXml.append("DeviceControl\r\n"); cmdXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -1022,7 +1022,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer cmdXml = new StringBuffer(200); String charset = device.getCharset(); - cmdXml.append("\r\n"); + cmdXml.append("\r\n"); cmdXml.append("\r\n"); cmdXml.append("DeviceControl\r\n"); cmdXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -1091,7 +1091,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer cmdXml = new StringBuffer(200); String charset = device.getCharset(); - cmdXml.append("\r\n"); + cmdXml.append("\r\n"); cmdXml.append("\r\n"); cmdXml.append("DeviceConfig\r\n"); cmdXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -1146,7 +1146,7 @@ public class SIPCommander implements ISIPCommander { try { String charset = device.getCharset(); StringBuffer catalogXml = new StringBuffer(200); - catalogXml.append("\r\n"); + catalogXml.append("\r\n"); catalogXml.append("\r\n"); catalogXml.append("DeviceStatus\r\n"); catalogXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -1179,7 +1179,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer catalogXml = new StringBuffer(200); String charset = device.getCharset(); - catalogXml.append("\r\n"); + catalogXml.append("\r\n"); catalogXml.append("\r\n"); catalogXml.append("DeviceInfo\r\n"); catalogXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -1212,7 +1212,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer catalogXml = new StringBuffer(200); String charset = device.getCharset(); - catalogXml.append("\r\n"); + catalogXml.append("\r\n"); catalogXml.append("\r\n"); catalogXml.append("Catalog\r\n"); catalogXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -1252,7 +1252,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer recordInfoXml = new StringBuffer(200); String charset = device.getCharset(); - recordInfoXml.append("\r\n"); + recordInfoXml.append("\r\n"); recordInfoXml.append("\r\n"); recordInfoXml.append("RecordInfo\r\n"); recordInfoXml.append("" + sn + "\r\n"); @@ -1306,7 +1306,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer cmdXml = new StringBuffer(200); String charset = device.getCharset(); - cmdXml.append("\r\n"); + cmdXml.append("\r\n"); cmdXml.append("\r\n"); cmdXml.append("Alarm\r\n"); cmdXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -1357,7 +1357,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer cmdXml = new StringBuffer(200); String charset = device.getCharset(); - cmdXml.append("\r\n"); + cmdXml.append("\r\n"); cmdXml.append("\r\n"); cmdXml.append("ConfigDownload\r\n"); cmdXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -1393,7 +1393,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer cmdXml = new StringBuffer(200); String charset = device.getCharset(); - cmdXml.append("\r\n"); + cmdXml.append("\r\n"); cmdXml.append("\r\n"); cmdXml.append("PresetQuery\r\n"); cmdXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -1428,7 +1428,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer mobilePostitionXml = new StringBuffer(200); String charset = device.getCharset(); - mobilePostitionXml.append("\r\n"); + mobilePostitionXml.append("\r\n"); mobilePostitionXml.append("\r\n"); mobilePostitionXml.append("MobilePosition\r\n"); mobilePostitionXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -1462,7 +1462,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer subscribePostitionXml = new StringBuffer(200); String charset = device.getCharset(); - subscribePostitionXml.append("\r\n"); + subscribePostitionXml.append("\r\n"); subscribePostitionXml.append("\r\n"); subscribePostitionXml.append("MobilePosition\r\n"); subscribePostitionXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -1513,7 +1513,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer cmdXml = new StringBuffer(200); String charset = device.getCharset(); - cmdXml.append("\r\n"); + cmdXml.append("\r\n"); cmdXml.append("\r\n"); cmdXml.append("Alarm\r\n"); cmdXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -1559,7 +1559,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer cmdXml = new StringBuffer(200); String charset = device.getCharset(); - cmdXml.append("\r\n"); + cmdXml.append("\r\n"); cmdXml.append("\r\n"); cmdXml.append("Catalog\r\n"); cmdXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -1590,7 +1590,7 @@ public class SIPCommander implements ISIPCommander { try { StringBuffer dragXml = new StringBuffer(200); String charset = device.getCharset(); - dragXml.append("\r\n"); + dragXml.append("\r\n"); dragXml.append("\r\n"); dragXml.append("DeviceControl\r\n"); dragXml.append("" + (int) ((Math.random() * 9 + 1) * 100000) + "\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 102aff89..0dc11e01 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -147,7 +147,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { try { String characterSet = parentPlatform.getCharacterSet(); StringBuffer keepaliveXml = new StringBuffer(200); - keepaliveXml.append("\r\n"); + keepaliveXml.append("\r\n"); keepaliveXml.append("\r\n"); keepaliveXml.append("Keepalive\r\n"); keepaliveXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -215,44 +215,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { return false; } try { - String characterSet = parentPlatform.getCharacterSet(); - StringBuffer catalogXml = new StringBuffer(600); - catalogXml.append("\r\n"); - catalogXml.append("\r\n"); - catalogXml.append("Catalog\r\n"); - catalogXml.append("" +sn + "\r\n"); - catalogXml.append("" + parentPlatform.getDeviceGBId() + "\r\n"); - catalogXml.append("" + size + "\r\n"); - catalogXml.append("\r\n"); - catalogXml.append("\r\n"); - if (channel != null) { - catalogXml.append("" + channel.getChannelId() + "\r\n"); - catalogXml.append("" + channel.getName() + "\r\n"); - catalogXml.append("" + channel.getManufacture() + "\r\n"); - catalogXml.append("" + channel.getModel() + "\r\n"); - catalogXml.append("" + channel.getOwner() + "\r\n"); - catalogXml.append("" + channel.getCivilCode() + "\r\n"); - catalogXml.append("
" + channel.getAddress() + "
\r\n"); - catalogXml.append("" + channel.getParental() + "\r\n"); - if (channel.getParentId() != null) { - catalogXml.append("" + channel.getParentId() + "\r\n"); - } - catalogXml.append("" + channel.getSecrecy() + "\r\n"); - catalogXml.append("" + channel.getRegisterWay() + "\r\n"); - catalogXml.append("" + (channel.getStatus() == 0?"OFF":"ON") + "\r\n"); - catalogXml.append("" + channel.getLongitude() + "\r\n"); - catalogXml.append("" + channel.getLatitude() + "\r\n"); - catalogXml.append("" + channel.getIpAddress() + "\r\n"); - catalogXml.append("" + channel.getPort() + "\r\n"); - catalogXml.append("\r\n"); - catalogXml.append("" + channel.getPTZType() + "\r\n"); - catalogXml.append("\r\n"); - } - - - catalogXml.append("
\r\n"); - catalogXml.append("
\r\n"); - catalogXml.append("
\r\n"); + String catalogXml = getCatalogXml(channel, sn, parentPlatform, size); // callid CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() @@ -268,6 +231,77 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { return true; } + @Override + public boolean catalogQuery(List channels, ParentPlatform parentPlatform, String sn, String fromTag) { + if ( parentPlatform ==null) { + return false; + } + sendCatalogResponse(channels, parentPlatform, sn, fromTag, 0); + return true; + } + private String getCatalogXml(DeviceChannel channel, String sn, ParentPlatform parentPlatform, int size) { + String characterSet = parentPlatform.getCharacterSet(); + StringBuffer catalogXml = new StringBuffer(600); + catalogXml.append("\r\n"); + catalogXml.append("\r\n"); + catalogXml.append("Catalog\r\n"); + catalogXml.append("" +sn + "\r\n"); + catalogXml.append("" + parentPlatform.getDeviceGBId() + "\r\n"); + catalogXml.append("" + size + "\r\n"); + catalogXml.append("\r\n"); + catalogXml.append("\r\n"); + if (channel != null) { + catalogXml.append("" + channel.getChannelId() + "\r\n"); + catalogXml.append("" + channel.getName() + "\r\n"); + catalogXml.append("" + channel.getManufacture() + "\r\n"); + catalogXml.append("" + channel.getModel() + "\r\n"); + catalogXml.append("" + channel.getOwner() + "\r\n"); + catalogXml.append("" + channel.getCivilCode() + "\r\n"); + catalogXml.append("
" + channel.getAddress() + "
\r\n"); + catalogXml.append("" + channel.getParental() + "\r\n"); + if (channel.getParentId() != null) { + catalogXml.append("" + channel.getParentId() + "\r\n"); + } + catalogXml.append("" + channel.getSecrecy() + "\r\n"); + catalogXml.append("" + channel.getRegisterWay() + "\r\n"); + catalogXml.append("" + (channel.getStatus() == 0?"OFF":"ON") + "\r\n"); + catalogXml.append("" + channel.getLongitude() + "\r\n"); + catalogXml.append("" + channel.getLatitude() + "\r\n"); + catalogXml.append("" + channel.getIpAddress() + "\r\n"); + catalogXml.append("" + channel.getPort() + "\r\n"); + catalogXml.append("\r\n"); + catalogXml.append("" + channel.getPTZType() + "\r\n"); + catalogXml.append("\r\n"); + } + + + catalogXml.append("
\r\n"); + catalogXml.append("
\r\n"); + catalogXml.append("
\r\n"); + return catalogXml.toString(); + } + + private void sendCatalogResponse(List channels, ParentPlatform parentPlatform, String sn, String fromTag, int index) { + if (index >= channels.size()) { + return; + } + try { + DeviceChannel deviceChannel = channels.get(index); + String catalogXml = getCatalogXml(deviceChannel, sn, parentPlatform, channels.size()); + // callid + CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() + : udpSipProvider.getNewCallId(); + + Request request = headerProviderPlarformProvider.createMessageRequest(parentPlatform, catalogXml, fromTag, callIdHeader); + transmitRequest(parentPlatform, request, null, eventResult -> { + int indexNext = index + 1; + sendCatalogResponse(channels, parentPlatform, sn, fromTag, indexNext); + }); + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + } + } + /** * 向上级回复DeviceInfo查询信息 * @param parentPlatform 平台信息 @@ -283,7 +317,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { try { String characterSet = parentPlatform.getCharacterSet(); StringBuffer deviceInfoXml = new StringBuffer(600); - deviceInfoXml.append("\r\n"); + deviceInfoXml.append("\r\n"); deviceInfoXml.append("\r\n"); deviceInfoXml.append("DeviceInfo\r\n"); deviceInfoXml.append("" +sn + "\r\n"); @@ -323,7 +357,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { try { String characterSet = parentPlatform.getCharacterSet(); StringBuffer deviceStatusXml = new StringBuffer(600); - deviceStatusXml.append("\r\n"); + deviceStatusXml.append("\r\n"); deviceStatusXml.append("\r\n"); deviceStatusXml.append("DeviceStatus\r\n"); deviceStatusXml.append("" +sn + "\r\n"); @@ -355,7 +389,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { try { String characterSet = parentPlatform.getCharacterSet(); StringBuffer deviceStatusXml = new StringBuffer(600); - deviceStatusXml.append("\r\n"); + deviceStatusXml.append("\r\n"); deviceStatusXml.append("\r\n"); deviceStatusXml.append("MobilePosition\r\n"); deviceStatusXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -472,7 +506,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { channel.setParentId(parentPlatform.getDeviceGBId()); } String characterSet = parentPlatform.getCharacterSet(); - catalogXml.append("\r\n"); + catalogXml.append("\r\n"); catalogXml.append("\r\n"); catalogXml.append("Catalog\r\n"); catalogXml.append("" + (int) ((Math.random() * 9 + 1) * 100000) + "\r\n"); @@ -546,7 +580,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { } String characterSet = parentPlatform.getCharacterSet(); StringBuffer catalogXml = new StringBuffer(600); - catalogXml.append("\r\n"); + catalogXml.append("\r\n"); catalogXml.append("\r\n"); catalogXml.append("Catalog\r\n"); catalogXml.append("" + (int) ((Math.random() * 9 + 1) * 100000) + "\r\n"); @@ -569,7 +603,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { try { String characterSet = parentPlatform.getCharacterSet(); StringBuffer recordXml = new StringBuffer(600); - recordXml.append("\r\n"); + recordXml.append("\r\n"); recordXml.append("\r\n"); recordXml.append("RecordInfo\r\n"); recordXml.append("" +recordInfo.getSn() + "\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java index 4f1249ab..136b9120 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java @@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.DocumentException; import org.dom4j.Element; import org.slf4j.Logger; @@ -23,6 +24,7 @@ import org.springframework.stereotype.Component; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; +import javax.sip.address.SipURI; import javax.sip.header.CSeqHeader; import javax.sip.header.CallIdHeader; import javax.sip.message.Response; @@ -81,6 +83,17 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement // 查询上级平台是否存在 ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(deviceId); try { + if (device != null && parentPlatform != null) { + logger.warn("[重复]平台与设备编号重复:{}", deviceId); + SIPRequest request = (SIPRequest) evt.getRequest(); + String hostAddress = request.getRemoteAddress().getHostAddress(); + int remotePort = request.getRemotePort(); + if (device.getHostAddress().equals(hostAddress + ":" + remotePort)) { + parentPlatform = null; + }else { + device = null; + } + } if (device == null && parentPlatform == null) { // 不存在则回复404 responseAck(evt, Response.NOT_FOUND, "device "+ deviceId +" not found"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/CatalogNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/CatalogNotifyMessageHandler.java index e548cfe9..419a5c26 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/CatalogNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/CatalogNotifyMessageHandler.java @@ -18,6 +18,7 @@ import javax.sip.SipException; import javax.sip.header.FromHeader; import javax.sip.message.Response; import java.text.ParseException; +import java.util.ArrayList; import java.util.List; @Component @@ -58,7 +59,8 @@ public class CatalogNotifyMessageHandler extends SIPRequestProcessorParent imple List deviceChannels = storage.queryChannelListInParentPlatform(parentPlatform.getServerGBId()); // 查询关联的直播通道 List gbStreams = storage.queryGbStreamListInPlatform(parentPlatform.getServerGBId()); - int size = deviceChannels.size() + gbStreams.size(); + + List allChannels = new ArrayList<>(); // 回复目录信息 List catalogs = storage.queryCatalogInPlatform(parentPlatform.getServerGBId()); if (catalogs.size() > 0) { @@ -81,9 +83,7 @@ public class CatalogNotifyMessageHandler extends SIPRequestProcessorParent imple deviceChannel.setModel("live"); deviceChannel.setOwner("wvp-pro"); deviceChannel.setSecrecy("0"); - cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size); - // 防止发送过快 - Thread.sleep(100); + allChannels.add(deviceChannel); } } // 回复级联的通道 @@ -96,9 +96,7 @@ public class CatalogNotifyMessageHandler extends SIPRequestProcessorParent imple deviceChannel.setParental(0); deviceChannel.setParentId(channel.getCatalogId()); deviceChannel.setCivilCode(parentPlatform.getDeviceGBId().substring(0, 6)); - cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size); - // 防止发送过快 - Thread.sleep(100); + allChannels.add(deviceChannel); } } // 回复直播的通道 @@ -123,16 +121,16 @@ public class CatalogNotifyMessageHandler extends SIPRequestProcessorParent imple deviceChannel.setOwner("wvp-pro"); deviceChannel.setParental(0); deviceChannel.setSecrecy("0"); - cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size); - // 防止发送过快 - Thread.sleep(100); + allChannels.add(deviceChannel); } } - if (size == 0) { + if (allChannels.size() > 0) { + cmderFroPlatform.catalogQuery(allChannels, parentPlatform, sn, fromHeader.getTag()); + }else { // 回复无通道 - cmderFroPlatform.catalogQuery(null, parentPlatform, sn, fromHeader.getTag(), size); + cmderFroPlatform.catalogQuery(null, parentPlatform, sn, fromHeader.getTag(), 0); } - } catch (SipException | InvalidArgumentException | ParseException | InterruptedException e) { + } catch (SipException | InvalidArgumentException | ParseException e) { e.printStackTrace(); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java index d16506d3..6a33da45 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java @@ -22,6 +22,7 @@ import javax.sip.SipException; import javax.sip.header.FromHeader; import javax.sip.message.Response; import java.text.ParseException; +import java.util.ArrayList; import java.util.List; @Component @@ -45,6 +46,9 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem @Autowired private EventPublisher publisher; + @Autowired + private IVideoManagerStorage storage; + @Override public void afterPropertiesSet() throws Exception { queryMessageHandler.addHandler(cmdType, this); @@ -71,10 +75,11 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem List gbStreams = storager.queryGbStreamListInPlatform(parentPlatform.getServerGBId()); // 回复目录信息 List catalogs = storager.queryCatalogInPlatform(parentPlatform.getServerGBId()); - int size = catalogs.size() + deviceChannelInPlatforms.size() + gbStreams.size(); + + List allChannels = new ArrayList<>(); if (catalogs.size() > 0) { for (PlatformCatalog catalog : catalogs) { - if (catalog.getParentId().equals(parentPlatform.getServerGBId())) { + if (catalog.getParentId().equals(catalog.getPlatformId())) { catalog.setParentId(parentPlatform.getDeviceGBId()); } DeviceChannel deviceChannel = new DeviceChannel(); @@ -92,9 +97,7 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem deviceChannel.setModel("live"); deviceChannel.setOwner("wvp-pro"); deviceChannel.setSecrecy("0"); - cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size); - // 防止发送过快 - Thread.sleep(100); + allChannels.add(deviceChannel); } } // 回复级联的通道 @@ -103,20 +106,18 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem if (channel.getCatalogId().equals(parentPlatform.getServerGBId())) { channel.setCatalogId(parentPlatform.getDeviceGBId()); } - DeviceChannel deviceChannel = storager.queryChannel(channel.getDeviceId(), channel.getChannelId()); + DeviceChannel deviceChannel = storage.queryChannel(channel.getDeviceId(), channel.getChannelId()); deviceChannel.setParental(0); deviceChannel.setParentId(channel.getCatalogId()); deviceChannel.setCivilCode(parentPlatform.getDeviceGBId().substring(0, 6)); - cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size); - // 防止发送过快 - Thread.sleep(100); + allChannels.add(deviceChannel); } } // 回复直播的通道 if (gbStreams.size() > 0) { for (GbStream gbStream : gbStreams) { if (gbStream.getCatalogId().equals(parentPlatform.getServerGBId())) { - gbStream.setCatalogId(parentPlatform.getDeviceGBId()); + gbStream.setCatalogId(null); } DeviceChannel deviceChannel = new DeviceChannel(); deviceChannel.setChannelId(gbStream.getGbId()); @@ -134,15 +135,14 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem deviceChannel.setOwner("wvp-pro"); deviceChannel.setParental(0); deviceChannel.setSecrecy("0"); - - cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size); - // 防止发送过快 - Thread.sleep(100); + allChannels.add(deviceChannel); } } - if (size == 0) { + if (allChannels.size() > 0) { + cmderFroPlatform.catalogQuery(allChannels, parentPlatform, sn, fromHeader.getTag()); + }else { // 回复无通道 - cmderFroPlatform.catalogQuery(null, parentPlatform, sn, fromHeader.getTag(), size); + cmderFroPlatform.catalogQuery(null, parentPlatform, sn, fromHeader.getTag(), 0); } } catch (SipException e) { e.printStackTrace(); @@ -150,8 +150,6 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 32b645c0..1bafb59d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -123,7 +123,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp channelList.add(deviceChannel); } - logger.debug("收到来自设备【{}】的通道: {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.get(key) == null ? 0 :catalogDataCatch.get(key).size(), sumNum); + logger.info("收到来自设备【{}】的通道: {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.get(key) == null ? 0 :catalogDataCatch.get(key).size(), sumNum); catalogDataCatch.put(key, sumNum, device, channelList); if (catalogDataCatch.get(key).size() == sumNum) { // 数据已经完整接收 @@ -230,8 +230,22 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp } } - public String getChannelSyncProgress(String deviceId) { + public SyncStatus getChannelSyncProgress(String deviceId) { String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId; - return catalogDataCatch.get(key) == null ? "0/0" : catalogDataCatch.get(key).size() + "/" + catalogDataCatch.getTotal(key); + if (catalogDataCatch.get(key) == null) { + return null; + }else { + return catalogDataCatch.getSyncStatus(key); + } + } + + public void setChannelSyncReady(String deviceId) { + String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId; + catalogDataCatch.addReady(key); + } + + public void setChannelSyncEnd(String deviceId, String errorMsg) { + String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId; + catalogDataCatch.setChannelSyncEnd(key, errorMsg); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java index 68a2af39..17cf7f42 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; /** * 设备相关业务处理 @@ -34,4 +35,24 @@ public interface IDeviceService { * @return */ boolean removeMobilePositionSubscribe(Device device); + + /** + * 移除移动位置订阅 + * @param deviceId 设备ID + * @return + */ + SyncStatus getChannelSyncStatus(String deviceId); + + /** + * 设置通道同步状态 + * @param deviceId 设备ID + */ + void setChannelSyncReady(String deviceId); + + /** + * 设置同步结束 + * @param deviceId 设备ID + * @param errorMsg 错误信息 + */ + void setChannelSyncEnd(String deviceId, String errorMsg); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index e9f7968f..8cd2c77b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -3,9 +3,12 @@ package com.genersoft.iot.vmp.service.impl; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask; +import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -25,6 +28,12 @@ public class DeviceServiceImpl implements IDeviceService { @Autowired private ISIPCommander sipCommander; + @Autowired + private CatalogResponseMessageHandler catalogResponseMessageHandler; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + @Override public boolean addCatalogSubscribe(Device device) { if (device == null || device.getSubscribeCycleForCatalog() < 0) { @@ -86,4 +95,19 @@ public class DeviceServiceImpl implements IDeviceService { dynamicTask.stop(device.getDeviceId() + "mobile_position"); return true; } + + @Override + public SyncStatus getChannelSyncStatus(String deviceId) { + return catalogResponseMessageHandler.getChannelSyncProgress(deviceId); + } + + @Override + public void setChannelSyncReady(String deviceId) { + catalogResponseMessageHandler.setChannelSyncReady(deviceId); + } + + @Override + public void setChannelSyncEnd(String deviceId, String errorMsg) { + catalogResponseMessageHandler.setChannelSyncEnd(deviceId, errorMsg); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index e66dfa0b..c6b7e648 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -216,4 +216,5 @@ public interface IRedisCatchStorage { void sendMobilePositionMsg(JSONObject jsonObject); void sendStreamPushRequestedMsg(MessageForPushChannel messageForPushChannel); + } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 1fb7fd0c..fc78de4a 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -638,4 +638,5 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { logger.info("[redis 推流被请求通知] {}: {}-{}", key, msg.getApp(), msg.getStream()); redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg)); } + } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java index 667a6197..20e56d95 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java @@ -445,8 +445,6 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { device.setOnline(1); logger.info("更新设备在线: " + deviceId); redisCatchStorage.updateDevice(device); - List deviceChannelList = deviceChannelMapper.queryOnlineChannelsByDeviceId(deviceId); - eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON); return deviceMapper.update(device) > 0; } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java index 64133963..1b6d31eb 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; @@ -18,6 +19,7 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; +import org.kxml2.wap.wv.WV; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -149,49 +151,30 @@ public class DeviceQuery { @ApiImplicitParam(name="deviceId", value = "设备id", required = true, dataTypeClass = String.class), }) @PostMapping("/devices/{deviceId}/sync") - public DeferredResult> devicesSync(@PathVariable String deviceId){ + public WVPResult devicesSync(@PathVariable String deviceId){ if (logger.isDebugEnabled()) { logger.debug("设备通道信息同步API调用,deviceId:" + deviceId); } Device device = storager.queryVideoDevice(deviceId); - String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId; - String uuid = UUID.randomUUID().toString(); - // 默认超时时间为30分钟 - DeferredResult> result = new DeferredResult>(30*60*1000L); - result.onTimeout(()->{ - logger.warn("设备[{}]通道信息同步超时", deviceId); - // 释放rtpserver - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setId(uuid); - WVPResult wvpResult = new WVPResult<>(); - wvpResult.setCode(-1); - wvpResult.setData(device); - wvpResult.setMsg("更新超时"); - msg.setData(wvpResult); - resultHolder.invokeAllResult(msg); - - }); - // 等待其他相同请求返回时一起返回 - if (resultHolder.exist(key, null)) { - resultHolder.put(key, uuid, result); - return result; - }else { - cmder.catalogQuery(device, event -> { - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setId(uuid); - WVPResult wvpResult = new WVPResult<>(); - wvpResult.setCode(-1); - wvpResult.setData(device); - wvpResult.setMsg(String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg)); - msg.setData(wvpResult); - resultHolder.invokeAllResult(msg); - }); - resultHolder.put(key, uuid, result); - return result; + SyncStatus syncStatus = deviceService.getChannelSyncStatus(deviceId); + // 已存在则返回进度 + if (syncStatus != null && syncStatus.getErrorMsg() == null) { + WVPResult wvpResult = new WVPResult<>(); + wvpResult.setCode(0); + wvpResult.setData(syncStatus); + return wvpResult; } + SyncStatus syncStatusReady = new SyncStatus(); + deviceService.setChannelSyncReady(deviceId); + cmder.catalogQuery(device, event -> { + String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg); + deviceService.setChannelSyncEnd(deviceId, errorMsg); + }); + WVPResult wvpResult = new WVPResult<>(); + wvpResult.setCode(0); + wvpResult.setMsg("开始同步"); + return wvpResult; } /** @@ -468,4 +451,22 @@ public class DeviceQuery { public WVPResult> tree(@PathVariable String deviceId) { return WVPResult.Data(storager.tree(deviceId)); } + + @GetMapping("/{deviceId}/sync_status") + @ApiOperation(value = "获取通道同步进度", notes = "获取通道同步进度") + public WVPResult getSyncStatus(@PathVariable String deviceId) { + SyncStatus channelSyncStatus = deviceService.getChannelSyncStatus(deviceId); + WVPResult wvpResult = new WVPResult<>(); + if (channelSyncStatus == null) { + wvpResult.setCode(-1); + wvpResult.setMsg("同步尚未开始"); + }else { + wvpResult.setCode(0); + wvpResult.setData(channelSyncStatus); + if (channelSyncStatus.getErrorMsg() != null) { + wvpResult.setMsg(channelSyncStatus.getErrorMsg()); + } + } + return wvpResult; + } } diff --git a/web_src/src/components/DeviceList.vue b/web_src/src/components/DeviceList.vue index 8a156d5b..4f8b4606 100644 --- a/web_src/src/components/DeviceList.vue +++ b/web_src/src/components/DeviceList.vue @@ -57,7 +57,7 @@