diff --git a/pom.xml b/pom.xml index a675c6ff..7cf44e76 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ com.genersoft wvp-pro - 2.6.7 + 2.6.8 web video platform 国标28181视频平台 ${project.packaging} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java index b3f144a7..0a8405b1 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java @@ -40,17 +40,20 @@ public class SipPlatformRunner implements CommandLineRunner { List parentPlatforms = storager.queryEnableParentPlatformList(true); for (ParentPlatform parentPlatform : parentPlatforms) { + + ParentPlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); + // 更新缓存 ParentPlatformCatch parentPlatformCatch = new ParentPlatformCatch(); parentPlatformCatch.setParentPlatform(parentPlatform); parentPlatformCatch.setId(parentPlatform.getServerGBId()); redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); - // 设置所有平台离线 - platformService.offline(parentPlatform, true); // 取消订阅 - sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{ + sipCommanderForPlatform.unregister(parentPlatform, parentPlatformCatchOld.getSipTransactionInfo(), null, (eventResult)->{ platformService.login(parentPlatform); }); + // 设置所有平台离线 + platformService.offline(parentPlatform, true); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java index 83b9c6af..9093970c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java @@ -191,6 +191,9 @@ public class Device { @Schema(description = "是否作为消息通道") private boolean asMessageChannel; + @Schema(description = "设备注册的事务信息") + private SipTransactionInfo sipTransactionInfo; + public String getDeviceId() { return deviceId; @@ -439,4 +442,12 @@ public class Device { public void setAsMessageChannel(boolean asMessageChannel) { this.asMessageChannel = asMessageChannel; } + + public SipTransactionInfo getSipTransactionInfo() { + return sipTransactionInfo; + } + + public void setSipTransactionInfo(SipTransactionInfo sipTransactionInfo) { + this.sipTransactionInfo = sipTransactionInfo; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatformCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatformCatch.java index a53d26e4..6ff2fe32 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatformCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatformCatch.java @@ -16,6 +16,8 @@ public class ParentPlatformCatch { private ParentPlatform parentPlatform; + private SipTransactionInfo sipTransactionInfo; + public String getId() { return id; } @@ -55,4 +57,12 @@ public class ParentPlatformCatch { public void setCallId(String callId) { this.callId = callId; } + + public SipTransactionInfo getSipTransactionInfo() { + return sipTransactionInfo; + } + + public void setSipTransactionInfo(SipTransactionInfo sipTransactionInfo) { + this.sipTransactionInfo = sipTransactionInfo; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java index 25f07f51..3352838f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java @@ -63,7 +63,7 @@ public class SipRunner implements CommandLineRunner { if (deviceService.expire(device)){ deviceService.offline(device.getDeviceId(), "注册已过期"); }else { - deviceService.online(device); + deviceService.online(device, null); } } // 重置cseq计数 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java index 0425356a..08d5b2e7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java @@ -18,14 +18,16 @@ public interface ISIPCommanderForPlatform { * @return */ void register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException; - void register(ParentPlatform parentPlatform, String callId, WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister) throws SipException, InvalidArgumentException, ParseException; + + void register(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException; + void register(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister) throws SipException, InvalidArgumentException, ParseException; /** * 向上级平台注销 * @param parentPlatform * @return */ - void unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException; + void unregister(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException; /** diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java index 0fe11c01..86da5365 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java @@ -14,7 +14,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.DigestUtils; -import javax.sip.*; +import javax.sip.InvalidArgumentException; +import javax.sip.PeerUnavailableException; import javax.sip.address.Address; import javax.sip.address.SipURI; import javax.sip.header.*; @@ -22,7 +23,6 @@ import javax.sip.message.Request; import javax.validation.constraints.NotNull; import java.text.ParseException; import java.util.ArrayList; -import java.util.List; import java.util.UUID; /** @@ -45,7 +45,7 @@ public class SIPRequestHeaderPlarformProvider { @Autowired private IRedisCatchStorage redisCatchStorage; - public Request createRegisterRequest(@NotNull ParentPlatform parentPlatform, long CSeq, String fromTag, String viaTag, CallIdHeader callIdHeader, boolean isRegister) throws ParseException, InvalidArgumentException, PeerUnavailableException { + public Request createRegisterRequest(@NotNull ParentPlatform parentPlatform, long CSeq, String fromTag, String toTag, CallIdHeader callIdHeader, boolean isRegister) throws ParseException, InvalidArgumentException, PeerUnavailableException { Request request = null; String sipAddress = parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort(); //请求行 @@ -53,7 +53,8 @@ public class SIPRequestHeaderPlarformProvider { parentPlatform.getServerIP() + ":" + parentPlatform.getServerPort()); //via ArrayList viaHeaders = new ArrayList(); - ViaHeader viaHeader = sipLayer.getSipFactory().createHeaderFactory().createViaHeader(parentPlatform.getServerIP(), parentPlatform.getServerPort(), parentPlatform.getTransport(), viaTag); + ViaHeader viaHeader = sipLayer.getSipFactory().createHeaderFactory().createViaHeader(parentPlatform.getServerIP(), + parentPlatform.getServerPort(), parentPlatform.getTransport(), SipUtils.getNewViaTag()); viaHeader.setRPort(); viaHeaders.add(viaHeader); //from @@ -63,7 +64,7 @@ public class SIPRequestHeaderPlarformProvider { //to SipURI toSipURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(), sipConfig.getDomain()); Address toAddress = sipLayer.getSipFactory().createAddressFactory().createAddress(toSipURI); - ToHeader toHeader = sipLayer.getSipFactory().createHeaderFactory().createToHeader(toAddress,null); + ToHeader toHeader = sipLayer.getSipFactory().createHeaderFactory().createToHeader(toAddress,toTag); //Forwards MaxForwardsHeader maxForwards = sipLayer.getSipFactory().createHeaderFactory().createMaxForwardsHeader(70); @@ -85,11 +86,11 @@ public class SIPRequestHeaderPlarformProvider { return request; } - public Request createRegisterRequest(@NotNull ParentPlatform parentPlatform, String fromTag, String viaTag, - String callId, WWWAuthenticateHeader www , CallIdHeader callIdHeader, boolean isRegister) throws ParseException, PeerUnavailableException, InvalidArgumentException { + public Request createRegisterRequest(@NotNull ParentPlatform parentPlatform, String fromTag, String toTag, + WWWAuthenticateHeader www , CallIdHeader callIdHeader, boolean isRegister) throws ParseException, PeerUnavailableException, InvalidArgumentException { - Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(), fromTag, viaTag, callIdHeader, isRegister); + Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(), fromTag, toTag, callIdHeader, isRegister); SipURI requestURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP() + ":" + parentPlatform.getServerPort()); if (www == null) { AuthorizationHeader authorizationHeader = sipLayer.getSipFactory().createHeaderFactory().createAuthorizationHeader("Digest"); @@ -107,8 +108,6 @@ public class SIPRequestHeaderPlarformProvider { // qop 保护质量 包含auth(默认的)和auth-int(增加了报文完整性检测)两种策略 String qop = www.getQop(); - callIdHeader.setCallId(callId); - String cNonce = null; String nc = "00000001"; if (qop != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index dc5c31ad..af31818d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -75,20 +75,40 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { } @Override - public void unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException { - register(parentPlatform, null, null, errorEvent, okEvent, false, false); + public void register(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException { + + register(parentPlatform, sipTransactionInfo, null, errorEvent, okEvent, false, true); } @Override - public void register(ParentPlatform parentPlatform, @Nullable String callId, @Nullable WWWAuthenticateHeader www, + public void unregister(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException { + register(parentPlatform, sipTransactionInfo, null, errorEvent, okEvent, false, false); + } + + @Override + public void register(ParentPlatform parentPlatform, @Nullable SipTransactionInfo sipTransactionInfo, @Nullable WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister) throws SipException, InvalidArgumentException, ParseException { Request request; - if (!registerAgain ) { - CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport()); + CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport()); + String fromTag = SipUtils.getNewFromTag(); + String toTag = null; + if (sipTransactionInfo != null ) { + if (sipTransactionInfo.getCallId() != null) { + callIdHeader.setCallId(sipTransactionInfo.getCallId()); + } + if (sipTransactionInfo.getFromTag() != null) { + fromTag = sipTransactionInfo.getFromTag(); + } + if (sipTransactionInfo.getToTag() != null) { + toTag = sipTransactionInfo.getToTag(); + } + } + + if (!registerAgain ) { request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, - redisCatchStorage.getCSEQ(), SipUtils.getNewFromTag(), - SipUtils.getNewViaTag(), callIdHeader, isRegister); + redisCatchStorage.getCSEQ(), fromTag, + toTag, callIdHeader, isRegister); // 将 callid 写入缓存, 等注册成功可以更新状态 String callIdFromHeader = callIdHeader.getCallId(); redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, PlatformRegisterInfo.getInstance(parentPlatform.getServerGBId(), isRegister)); @@ -106,8 +126,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { }); }else { - CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport()); - request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, SipUtils.getNewFromTag(), null, callId, www, callIdHeader, isRegister); + request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, fromTag, toTag, www, callIdHeader, isRegister); } sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, null, okEvent); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java index d307941f..72da6608 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.auth.DigestServerAuthenticationHelper; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.RemoteAddressInfo; +import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; import com.genersoft.iot.vmp.gb28181.bean.WvpSipDate; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; @@ -18,6 +19,7 @@ import gov.nist.javax.sip.address.AddressImpl; import gov.nist.javax.sip.address.SipUri; import gov.nist.javax.sip.header.SIPDateHeader; import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.message.SIPResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; @@ -31,6 +33,7 @@ import javax.sip.header.AuthorizationHeader; import javax.sip.header.ContactHeader; import javax.sip.header.FromHeader; import javax.sip.header.ViaHeader; +import javax.sip.message.Request; import javax.sip.message.Response; import java.security.NoSuchAlgorithmException; import java.text.ParseException; @@ -102,6 +105,30 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen SipUri uri = (SipUri) address.getURI(); String deviceId = uri.getUser(); Device device = deviceService.getDevice(deviceId); + + RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, + userSetting.getSipUseSourceIpAsRemoteAddress()); + + if (device != null && + device.getSipTransactionInfo() != null && + request.getCallIdHeader().getCallId().equals(device.getSipTransactionInfo().getCallId())) { + logger.info("[注册请求] 注册续订: {}", device.getDeviceId()); + device.setExpires(request.getExpires().getExpires()); + device.setIp(remoteAddressInfo.getIp()); + device.setPort(remoteAddressInfo.getPort()); + device.setHostAddress(remoteAddressInfo.getIp().concat(":").concat(String.valueOf(remoteAddressInfo.getPort()))); + device.setLocalIp(request.getLocalAddress().getHostAddress()); + Response registerOkResponse = getRegisterOkResponse(request); + // 判断TCP还是UDP + ViaHeader reqViaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME); + String transport = reqViaHeader.getTransport(); + device.setTransport("TCP".equalsIgnoreCase(transport) ? "TCP" : "UDP"); + sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), registerOkResponse); + device.setRegisterTime(DateUtil.getNow()); + SipTransactionInfo sipTransactionInfo = new SipTransactionInfo((SIPResponse)registerOkResponse); + deviceService.online(device, sipTransactionInfo); + return; + } String password = (device != null && !ObjectUtils.isEmpty(device.getPassword()))? device.getPassword() : sipConfig.getPassword(); AuthorizationHeader authHead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME); if (authHead == null && !ObjectUtils.isEmpty(password)) { @@ -144,9 +171,6 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen // 添加Expires头 response.addHeader(request.getExpires()); - RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, - userSetting.getSipUseSourceIpAsRemoteAddress()); - if (device == null) { device = new Device(); device.setStreamMode("UDP"); @@ -179,7 +203,8 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen if (registerFlag) { logger.info("[注册成功] deviceId: {}->{}", deviceId, requestAddress); device.setRegisterTime(DateUtil.getNow()); - deviceService.online(device); + SipTransactionInfo sipTransactionInfo = new SipTransactionInfo((SIPResponse)response); + deviceService.online(device, sipTransactionInfo); } else { logger.info("[注销成功] deviceId: {}->{}" ,deviceId, requestAddress); deviceService.offline(deviceId, "主动注销"); @@ -188,4 +213,23 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen logger.error("未处理的异常 ", e); } } + + private Response getRegisterOkResponse(Request request) throws ParseException { + // 携带授权头并且密码正确 + Response response = getMessageFactory().createResponse(Response.OK, request); + // 添加date头 + SIPDateHeader dateHeader = new SIPDateHeader(); + // 使用自己修改的 + WvpSipDate wvpSipDate = new WvpSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis()); + dateHeader.setDate(wvpSipDate); + response.addHeader(dateHeader); + + // 添加Contact头 + response.addHeader(request.getHeader(ContactHeader.NAME)); + // 添加Expires头 + response.addHeader(request.getExpires()); + + return response; + + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java index 4ac83de7..b2dd76b5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java @@ -73,35 +73,38 @@ public class DeviceControlQueryMessageHandler extends SIPRequestProcessorParent String channelId = getText(rootElement, "DeviceID"); // 远程启动功能 if (!ObjectUtils.isEmpty(getText(rootElement, "TeleBoot"))) { - if (parentPlatform.getServerGBId().equals(targetGBId)) { - // 远程启动本平台:需要在重新启动程序后先对SipStack解绑 - logger.info("执行远程启动本平台命令"); - try { - cmderFroPlatform.unregister(parentPlatform, null, null); - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[命令发送失败] 国标级联 注销: {}", e.getMessage()); - } - taskExecutor.execute(() -> { - // 远程启动 -// try { -// Thread.sleep(3000); -// SipProvider up = (SipProvider) SpringBeanFactory.getBean("udpSipProvider"); -// SipStackImpl stack = (SipStackImpl)up.getSipStack(); -// stack.stop(); -// Iterator listener = stack.getListeningPoints(); -// while (listener.hasNext()) { -// stack.deleteListeningPoint((ListeningPoint) listener.next()); -// } -// Iterator providers = stack.getSipProviders(); -// while (providers.hasNext()) { -// stack.deleteSipProvider((SipProvider) providers.next()); -// } -// VManageBootstrap.restart(); -// } catch (InterruptedException | ObjectInUseException e) { -// logger.error("[任务执行失败] 服务重启: {}", e.getMessage()); -// } - }); - } + // TODO 拒绝远程启动命令 + logger.warn("[国标级联]收到平台的远程启动命令, 不处理"); + +// if (parentPlatform.getServerGBId().equals(targetGBId)) { +// // 远程启动本平台:需要在重新启动程序后先对SipStack解绑 +// logger.info("执行远程启动本平台命令"); +// try { +// cmderFroPlatform.unregister(parentPlatform, null, null); +// } catch (InvalidArgumentException | ParseException | SipException e) { +// logger.error("[命令发送失败] 国标级联 注销: {}", e.getMessage()); +// } +// taskExecutor.execute(() -> { +// // 远程启动 +//// try { +//// Thread.sleep(3000); +//// SipProvider up = (SipProvider) SpringBeanFactory.getBean("udpSipProvider"); +//// SipStackImpl stack = (SipStackImpl)up.getSipStack(); +//// stack.stop(); +//// Iterator listener = stack.getListeningPoints(); +//// while (listener.hasNext()) { +//// stack.deleteListeningPoint((ListeningPoint) listener.next()); +//// } +//// Iterator providers = stack.getSipProviders(); +//// while (providers.hasNext()) { +//// stack.deleteSipProvider((SipProvider) providers.next()); +//// } +//// VManageBootstrap.restart(); +//// } catch (InterruptedException | ObjectInUseException e) { +//// logger.error("[任务执行失败] 服务重启: {}", e.getMessage()); +//// } +// }); +// } } DeviceControlType deviceControlType = DeviceControlType.typeOf(rootElement); logger.info("[接受deviceControl命令] 命令: {}", deviceControlType); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index b6c27abc..865b6623 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -88,7 +88,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp // 对于已经离线的设备判断他的注册是否已经过期 if (!deviceService.expire(device)){ device.setOnline(0); - deviceService.online(device); + deviceService.online(device, null); } } // 刷新过期任务 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceStatusResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceStatusResponseMessageHandler.java index 7dee8dca..da4743f6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceStatusResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceStatusResponseMessageHandler.java @@ -71,7 +71,7 @@ public class DeviceStatusResponseMessageHandler extends SIPRequestProcessorParen } String text = onlineElement.getText(); if ("ONLINE".equalsIgnoreCase(text.trim())) { - deviceService.online(device); + deviceService.online(device, null); }else { deviceService.offline(device.getDeviceId(), "设备状态查询结果:" + text.trim()); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java index 14d1f84b..0294ba27 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; +import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; @@ -10,6 +11,7 @@ import com.genersoft.iot.vmp.service.IPlatformService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo; +import gov.nist.javax.sip.message.SIPResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -18,7 +20,6 @@ import org.springframework.stereotype.Component; import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; import javax.sip.SipException; -import javax.sip.header.CallIdHeader; import javax.sip.header.WWWAuthenticateHeader; import javax.sip.message.Response; import java.text.ParseException; @@ -65,9 +66,8 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract { */ @Override public void process(ResponseEvent evt) { - Response response = evt.getResponse(); - CallIdHeader callIdHeader = (CallIdHeader) response.getHeader(CallIdHeader.NAME); - String callId = callIdHeader.getCallId(); + SIPResponse response = (SIPResponse)evt.getResponse(); + String callId = response.getCallIdHeader().getCallId(); PlatformRegisterInfo platformRegisterInfo = redisCatchStorage.queryPlatformRegisterInfo(callId); if (platformRegisterInfo == null) { logger.info(String.format("[国标级联]未找到callId: %s 的注册/注销平台id", callId )); @@ -90,15 +90,17 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract { if (response.getStatusCode() == Response.UNAUTHORIZED) { WWWAuthenticateHeader www = (WWWAuthenticateHeader)response.getHeader(WWWAuthenticateHeader.NAME); + SipTransactionInfo sipTransactionInfo = new SipTransactionInfo(response); try { - sipCommanderForPlatform.register(parentPlatform, callId, www, null, null, true, platformRegisterInfo.isRegister()); + sipCommanderForPlatform.register(parentPlatform, sipTransactionInfo, www, null, null, true, platformRegisterInfo.isRegister()); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 国标级联 再次注册: {}", e.getMessage()); } }else if (response.getStatusCode() == Response.OK){ if (platformRegisterInfo.isRegister()) { - platformService.online(parentPlatform); + SipTransactionInfo sipTransactionInfo = new SipTransactionInfo(response); + platformService.online(parentPlatform, sipTransactionInfo); }else { platformService.offline(parentPlatform, false); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index ca5bb13e..f2eed112 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -116,7 +116,7 @@ public class ZLMHttpHookListener { @PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8") public HookResult onServerKeepalive(@RequestBody OnServerKeepaliveHookParam param) { - logger.info("[ZLM HOOK] 收到zlm心跳:" + param.getMediaServerId()); +// logger.info("[ZLM HOOK] 收到zlm心跳:" + param.getMediaServerId()); taskExecutor.execute(() -> { List subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java index 5ef4ef45..d977c473 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; import com.genersoft.iot.vmp.vmanager.bean.BaseTree; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo; @@ -18,7 +19,7 @@ public interface IDeviceService { * 设备上线 * @param device 设备信息 */ - void online(Device device); + void online(Device device, SipTransactionInfo sipTransactionInfo); /** * 设备下线 diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java index 12a03b47..e9cddff8 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; import com.github.pagehelper.PageInfo; /** @@ -35,7 +36,7 @@ public interface IPlatformService { * 平台上线 * @param parentPlatform 平台信息 */ - void online(ParentPlatform parentPlatform); + void online(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo); /** * 平台离线 diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 91b4805e..a3dd6a7e 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -89,7 +89,7 @@ public class DeviceServiceImpl implements IDeviceService { private IMediaServerService mediaServerService; @Override - public void online(Device device) { + public void online(Device device, SipTransactionInfo sipTransactionInfo) { logger.info("[设备上线] deviceId:{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort()); Device deviceInRedis = redisCatchStorage.getDevice(device.getDeviceId()); Device deviceInDb = deviceMapper.getDeviceByDeviceId(device.getDeviceId()); @@ -104,6 +104,14 @@ public class DeviceServiceImpl implements IDeviceService { // 默认心跳间隔60 device.setKeepaliveIntervalTime(60); } + if (sipTransactionInfo != null) { + device.setSipTransactionInfo(sipTransactionInfo); + }else { + if (deviceInRedis != null) { + device.setSipTransactionInfo(deviceInRedis.getSipTransactionInfo()); + } + } + // 第一次上线 或则设备之前是离线状态--进行通道同步和设备信息查询 if (device.getCreateTime() == null) { device.setOnline(1); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index d7d9f6c0..a1735572 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -123,8 +123,10 @@ public class PlatformServiceImpl implements IPlatformService { @Override public boolean update(ParentPlatform parentPlatform) { + logger.info("[国标级联]更新平台 {}", parentPlatform.getDeviceGBId()); parentPlatform.setCharacterSet(parentPlatform.getCharacterSet().toUpperCase()); ParentPlatform parentPlatformOld = platformMapper.getParentPlatById(parentPlatform.getId()); + ParentPlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(parentPlatformOld.getServerGBId()); parentPlatform.setUpdateTime(DateUtil.getNow()); if (!parentPlatformOld.getTreeType().equals(parentPlatform.getTreeType())) { // 目录结构发生变化,清空之前的关联关系 @@ -134,6 +136,7 @@ public class PlatformServiceImpl implements IPlatformService { platformGbStreamMapper.delByPlatformId(parentPlatformOld.getServerGBId()); } + // 停止心跳定时 final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatformOld.getServerGBId(); dynamicTask.stop(keepaliveTaskKey); @@ -142,9 +145,13 @@ public class PlatformServiceImpl implements IPlatformService { dynamicTask.stop(registerTaskKey); // 注销旧的 try { - commanderForPlatform.unregister(parentPlatformOld, null, eventResult -> { - logger.info("[国标级联] 注销成功, 平台:{}", parentPlatformOld.getServerGBId()); - }); + if (parentPlatformOld.isStatus()) { + logger.info("保存平台{}时发现救平台在线,发送注销命令", parentPlatform.getDeviceGBId()); + commanderForPlatform.unregister(parentPlatformOld, parentPlatformCatchOld.getSipTransactionInfo(), null, eventResult -> { + logger.info("[国标级联] 注销成功, 平台:{}", parentPlatformOld.getServerGBId()); + }); + } + } catch (InvalidArgumentException | ParseException | SipException e) { logger.error("[命令发送失败] 国标级联 注销: {}", e.getMessage()); } @@ -185,36 +192,36 @@ public class PlatformServiceImpl implements IPlatformService { @Override - public void online(ParentPlatform parentPlatform) { - logger.info("[国标级联]:{}, 平台上线/更新注册", parentPlatform.getServerGBId()); + public void online(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo) { + logger.info("[国标级联]:{}, 平台上线", parentPlatform.getServerGBId()); platformMapper.updateParentPlatformStatus(parentPlatform.getServerGBId(), true); ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); - if (parentPlatformCatch != null) { - parentPlatformCatch.getParentPlatform().setStatus(true); - redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); - }else { + if (parentPlatformCatch == null) { parentPlatformCatch = new ParentPlatformCatch(); parentPlatformCatch.setParentPlatform(parentPlatform); parentPlatformCatch.setId(parentPlatform.getServerGBId()); parentPlatform.setStatus(true); parentPlatformCatch.setParentPlatform(parentPlatform); - redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); } + parentPlatformCatch.getParentPlatform().setStatus(true); + parentPlatformCatch.setSipTransactionInfo(sipTransactionInfo); + redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); + final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId(); if (!dynamicTask.isAlive(registerTaskKey)) { + logger.info("[国标级联]:{}, 添加定时注册任务", parentPlatform.getServerGBId()); // 添加注册任务 dynamicTask.startCron(registerTaskKey, // 注册失败(注册成功时由程序直接调用了online方法) - ()-> { - registerTask(parentPlatform); - }, - (parentPlatform.getExpires()) *1000); + ()-> registerTask(parentPlatform, sipTransactionInfo), + parentPlatform.getExpires() * 1000); } final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId(); if (!dynamicTask.contains(keepaliveTaskKey)) { + logger.info("[国标级联]:{}, 添加定时心跳任务", parentPlatform.getServerGBId()); // 添加心跳任务 dynamicTask.startCron(keepaliveTaskKey, ()-> { @@ -259,7 +266,7 @@ public class PlatformServiceImpl implements IPlatformService { } } - private void registerTask(ParentPlatform parentPlatform){ + private void registerTask(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo){ try { // 设置超时重发, 后续从底层支持消息重发 String key = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId() + "_timeout"; @@ -267,10 +274,10 @@ public class PlatformServiceImpl implements IPlatformService { return; } dynamicTask.startDelay(key, ()->{ - registerTask(parentPlatform); + registerTask(parentPlatform, sipTransactionInfo); }, 1000); - logger.info("[国标级联] 平台:{}注册即将到期,重新注册", parentPlatform.getServerGBId()); - commanderForPlatform.register(parentPlatform, eventResult -> { + logger.info("[国标级联] 平台:{}注册即将到期,开始续订", parentPlatform.getServerGBId()); + commanderForPlatform.register(parentPlatform, sipTransactionInfo, eventResult -> { dynamicTask.stop(key); offline(parentPlatform, false); },eventResult -> { diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java index e233d1b1..1b2a8f3f 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; @@ -229,12 +230,16 @@ public class PlatformController { throw new ControllerException(ErrorCode.ERROR400); } ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(serverGBId); + ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(serverGBId); if (parentPlatform == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "平台不存在"); } + if (parentPlatformCatch == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "平台不存在"); + } // 发送离线消息,无论是否成功都删除缓存 try { - commanderForPlatform.unregister(parentPlatform, (event -> { + commanderForPlatform.unregister(parentPlatform, parentPlatformCatch.getSipTransactionInfo(), (event -> { // 清空redis缓存 redisCatchStorage.delPlatformCatchInfo(parentPlatform.getServerGBId()); redisCatchStorage.delPlatformKeepalive(parentPlatform.getServerGBId());