diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java index efc1d91d..dfb7a5d8 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java @@ -5,7 +5,6 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.response.ISIPResponseProcessor; -import com.genersoft.iot.vmp.gb28181.transmit.event.timeout.ITimeoutProcessor; import gov.nist.javax.sip.message.SIPResponse; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -13,6 +12,7 @@ import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import javax.sip.*; +import javax.sip.header.CSeqHeader; import javax.sip.header.CallIdHeader; import javax.sip.message.Response; import java.util.Map; @@ -27,9 +27,8 @@ import java.util.concurrent.ConcurrentHashMap; @Component public class SIPProcessorObserver implements ISIPProcessorObserver { - private static Map requestProcessorMap = new ConcurrentHashMap<>(); - private static Map responseProcessorMap = new ConcurrentHashMap<>(); - private static ITimeoutProcessor timeoutProcessor; + private static final Map requestProcessorMap = new ConcurrentHashMap<>(); + private static final Map responseProcessorMap = new ConcurrentHashMap<>(); @Autowired private SipSubscribe sipSubscribe; @@ -55,14 +54,6 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { responseProcessorMap.put(method, processor); } - /** - * 添加 超时事件订阅 - * @param processor 处理程序 - */ - public void addTimeoutProcessor(ITimeoutProcessor processor) { - timeoutProcessor = processor; - } - /** * 分发RequestEvent事件 * @param requestEvent RequestEvent事件 @@ -95,8 +86,9 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) { if (status != Response.UNAUTHORIZED && responseEvent.getResponse() != null && !sipSubscribe.isEmpty() ) { CallIdHeader callIdHeader = response.getCallIdHeader(); + CSeqHeader cSeqHeader = response.getCSeqHeader(); if (callIdHeader != null) { - SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId()); + SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId() + cSeqHeader.getSeqNumber()); if (sipEvent != null) { if (sipEvent.getOkEvent() != null) { SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult<>(responseEvent); @@ -117,9 +109,10 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { } else { log.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()); if (responseEvent.getResponse() != null && !sipSubscribe.isEmpty() ) { - CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); + CallIdHeader callIdHeader = response.getCallIdHeader(); + CSeqHeader cSeqHeader = response.getCSeqHeader(); if (callIdHeader != null) { - SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId()); + SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId() + cSeqHeader.getSeqNumber()); if (sipEvent != null ) { if (sipEvent.getErrorEvent() != null) { SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult<>(responseEvent); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java index fcda7a72..f033f53e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java @@ -13,6 +13,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import javax.sip.SipException; +import javax.sip.header.CSeqHeader; import javax.sip.header.CallIdHeader; import javax.sip.header.UserAgentHeader; import javax.sip.header.ViaHeader; @@ -71,7 +72,9 @@ public class SIPSender { if (okEvent != null || errorEvent != null) { CallIdHeader callIdHeader = (CallIdHeader) message.getHeader(CallIdHeader.NAME); - SipEvent sipEvent = SipEvent.getInstance(callIdHeader.getCallId(), eventResult -> { + CSeqHeader cSeqHeader = (CSeqHeader) message.getHeader(CSeqHeader.NAME); + String key = callIdHeader.getCallId() + cSeqHeader.getSeqNumber(); + SipEvent sipEvent = SipEvent.getInstance(key, eventResult -> { sipSubscribe.removeSubscribe(callIdHeader.getCallId()); if(okEvent != null) { okEvent.response(eventResult); @@ -82,7 +85,7 @@ public class SIPSender { errorEvent.response(eventResult); } }), timeout == null ? sipConfig.getTimeout() : timeout); - sipSubscribe.addSubscribe(callIdHeader.getCallId(), sipEvent); + sipSubscribe.addSubscribe(key, sipEvent); } if ("TCP".equals(transport)) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java index 237eab16..6d5cadda 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java @@ -24,6 +24,7 @@ import org.springframework.stereotype.Component; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; +import javax.sip.header.CSeqHeader; import javax.sip.header.CallIdHeader; import javax.sip.message.Response; import java.text.ParseException; @@ -69,6 +70,7 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement // logger.info("接收到消息:" + evt.getRequest()); String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); CallIdHeader callIdHeader = sipRequest.getCallIdHeader(); + CSeqHeader cSeqHeader = sipRequest.getCSeqHeader(); // 先从会话内查找 SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByCallId(callIdHeader.getCallId()); // 兼容海康 媒体通知 消息from字段不是设备ID的问题 @@ -94,7 +96,7 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement // 不存在则回复404 responseAck(request, Response.NOT_FOUND, "device "+ deviceId +" not found"); log.warn("[设备未找到 ]deviceId: {}, callId: {}", deviceId, callIdHeader.getCallId()); - SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId()); + SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId() + cSeqHeader.getSeqNumber()); if (sipEvent != null && sipEvent.getErrorEvent() != null){ DeviceNotFoundEvent deviceNotFoundEvent = new DeviceNotFoundEvent(evt.getDialog()); deviceNotFoundEvent.setCallId(callIdHeader.getCallId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/timeout/ITimeoutProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/timeout/ITimeoutProcessor.java deleted file mode 100755 index e0bb1f89..00000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/timeout/ITimeoutProcessor.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.transmit.event.timeout; - -import javax.sip.TimeoutEvent; - -public interface ITimeoutProcessor { - void process(TimeoutEvent event); -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/timeout/impl/TimeoutProcessorImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/timeout/impl/TimeoutProcessorImpl.java deleted file mode 100755 index c36a2e54..00000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/timeout/impl/TimeoutProcessorImpl.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.transmit.event.timeout.impl; - -import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; -import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent; -import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; -import com.genersoft.iot.vmp.gb28181.transmit.event.timeout.ITimeoutProcessor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import javax.sip.TimeoutEvent; -import javax.sip.header.CallIdHeader; - -@Slf4j -@Component -public class TimeoutProcessorImpl implements InitializingBean, ITimeoutProcessor { - - @Autowired - private SIPProcessorObserver processorObserver; - - @Autowired - private SipSubscribe sipSubscribe; - - @Override - public void afterPropertiesSet() throws Exception { - processorObserver.addTimeoutProcessor(this); - } - - @Override - public void process(TimeoutEvent event) { - try { - // TODO Auto-generated method stub - CallIdHeader callIdHeader = event.getClientTransaction().getDialog().getCallId(); - String callId = callIdHeader.getCallId(); - SipEvent sipEvent = sipSubscribe.getSubscribe(callId); - if (sipEvent != null && sipEvent.getErrorEvent() != null) { - SipSubscribe.EventResult timeoutEventEventResult = new SipSubscribe.EventResult<>(event); - sipEvent.getErrorEvent().response(timeoutEventEventResult); - sipSubscribe.removeSubscribe(callId); - } - } catch (Exception e) { - log.error("[超时事件失败]: {}", e.getMessage()); - } - } -}