优化SIP消息回复判定条件,修复目录推送BUG #1725

master
648540858 2025-01-03 17:21:09 +08:00
parent cb7c41c3c1
commit 5cf5c506b1
5 changed files with 16 additions and 71 deletions

View File

@ -5,7 +5,6 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent; import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.ISIPResponseProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.response.ISIPResponseProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.timeout.ITimeoutProcessor;
import gov.nist.javax.sip.message.SIPResponse; import gov.nist.javax.sip.message.SIPResponse;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -13,6 +12,7 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.sip.*; import javax.sip.*;
import javax.sip.header.CSeqHeader;
import javax.sip.header.CallIdHeader; import javax.sip.header.CallIdHeader;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.util.Map; import java.util.Map;
@ -27,9 +27,8 @@ import java.util.concurrent.ConcurrentHashMap;
@Component @Component
public class SIPProcessorObserver implements ISIPProcessorObserver { public class SIPProcessorObserver implements ISIPProcessorObserver {
private static Map<String, ISIPRequestProcessor> requestProcessorMap = new ConcurrentHashMap<>(); private static final Map<String, ISIPRequestProcessor> requestProcessorMap = new ConcurrentHashMap<>();
private static Map<String, ISIPResponseProcessor> responseProcessorMap = new ConcurrentHashMap<>(); private static final Map<String, ISIPResponseProcessor> responseProcessorMap = new ConcurrentHashMap<>();
private static ITimeoutProcessor timeoutProcessor;
@Autowired @Autowired
private SipSubscribe sipSubscribe; private SipSubscribe sipSubscribe;
@ -55,14 +54,6 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
responseProcessorMap.put(method, processor); responseProcessorMap.put(method, processor);
} }
/**
*
* @param processor
*/
public void addTimeoutProcessor(ITimeoutProcessor processor) {
timeoutProcessor = processor;
}
/** /**
* RequestEvent * RequestEvent
* @param requestEvent RequestEvent * @param requestEvent RequestEvent
@ -95,8 +86,9 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) { if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) {
if (status != Response.UNAUTHORIZED && responseEvent.getResponse() != null && !sipSubscribe.isEmpty() ) { if (status != Response.UNAUTHORIZED && responseEvent.getResponse() != null && !sipSubscribe.isEmpty() ) {
CallIdHeader callIdHeader = response.getCallIdHeader(); CallIdHeader callIdHeader = response.getCallIdHeader();
CSeqHeader cSeqHeader = response.getCSeqHeader();
if (callIdHeader != null) { if (callIdHeader != null) {
SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId()); SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId() + cSeqHeader.getSeqNumber());
if (sipEvent != null) { if (sipEvent != null) {
if (sipEvent.getOkEvent() != null) { if (sipEvent.getOkEvent() != null) {
SipSubscribe.EventResult<ResponseEvent> eventResult = new SipSubscribe.EventResult<>(responseEvent); SipSubscribe.EventResult<ResponseEvent> eventResult = new SipSubscribe.EventResult<>(responseEvent);
@ -117,9 +109,10 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
} else { } else {
log.warn("接收到失败的response响应status" + status + ",message:" + response.getReasonPhrase()); log.warn("接收到失败的response响应status" + status + ",message:" + response.getReasonPhrase());
if (responseEvent.getResponse() != null && !sipSubscribe.isEmpty() ) { if (responseEvent.getResponse() != null && !sipSubscribe.isEmpty() ) {
CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); CallIdHeader callIdHeader = response.getCallIdHeader();
CSeqHeader cSeqHeader = response.getCSeqHeader();
if (callIdHeader != null) { if (callIdHeader != null) {
SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId()); SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId() + cSeqHeader.getSeqNumber());
if (sipEvent != null ) { if (sipEvent != null ) {
if (sipEvent.getErrorEvent() != null) { if (sipEvent.getErrorEvent() != null) {
SipSubscribe.EventResult<ResponseEvent> eventResult = new SipSubscribe.EventResult<>(responseEvent); SipSubscribe.EventResult<ResponseEvent> eventResult = new SipSubscribe.EventResult<>(responseEvent);

View File

@ -13,6 +13,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import javax.sip.SipException; import javax.sip.SipException;
import javax.sip.header.CSeqHeader;
import javax.sip.header.CallIdHeader; import javax.sip.header.CallIdHeader;
import javax.sip.header.UserAgentHeader; import javax.sip.header.UserAgentHeader;
import javax.sip.header.ViaHeader; import javax.sip.header.ViaHeader;
@ -71,7 +72,9 @@ public class SIPSender {
if (okEvent != null || errorEvent != null) { if (okEvent != null || errorEvent != null) {
CallIdHeader callIdHeader = (CallIdHeader) message.getHeader(CallIdHeader.NAME); CallIdHeader callIdHeader = (CallIdHeader) message.getHeader(CallIdHeader.NAME);
SipEvent sipEvent = SipEvent.getInstance(callIdHeader.getCallId(), eventResult -> { CSeqHeader cSeqHeader = (CSeqHeader) message.getHeader(CSeqHeader.NAME);
String key = callIdHeader.getCallId() + cSeqHeader.getSeqNumber();
SipEvent sipEvent = SipEvent.getInstance(key, eventResult -> {
sipSubscribe.removeSubscribe(callIdHeader.getCallId()); sipSubscribe.removeSubscribe(callIdHeader.getCallId());
if(okEvent != null) { if(okEvent != null) {
okEvent.response(eventResult); okEvent.response(eventResult);
@ -82,7 +85,7 @@ public class SIPSender {
errorEvent.response(eventResult); errorEvent.response(eventResult);
} }
}), timeout == null ? sipConfig.getTimeout() : timeout); }), timeout == null ? sipConfig.getTimeout() : timeout);
sipSubscribe.addSubscribe(callIdHeader.getCallId(), sipEvent); sipSubscribe.addSubscribe(key, sipEvent);
} }
if ("TCP".equals(transport)) { if ("TCP".equals(transport)) {

View File

@ -24,6 +24,7 @@ import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent; import javax.sip.RequestEvent;
import javax.sip.SipException; import javax.sip.SipException;
import javax.sip.header.CSeqHeader;
import javax.sip.header.CallIdHeader; import javax.sip.header.CallIdHeader;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.text.ParseException; import java.text.ParseException;
@ -69,6 +70,7 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement
// logger.info("接收到消息:" + evt.getRequest()); // logger.info("接收到消息:" + evt.getRequest());
String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
CallIdHeader callIdHeader = sipRequest.getCallIdHeader(); CallIdHeader callIdHeader = sipRequest.getCallIdHeader();
CSeqHeader cSeqHeader = sipRequest.getCSeqHeader();
// 先从会话内查找 // 先从会话内查找
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByCallId(callIdHeader.getCallId()); SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByCallId(callIdHeader.getCallId());
// 兼容海康 媒体通知 消息from字段不是设备ID的问题 // 兼容海康 媒体通知 消息from字段不是设备ID的问题
@ -94,7 +96,7 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement
// 不存在则回复404 // 不存在则回复404
responseAck(request, Response.NOT_FOUND, "device "+ deviceId +" not found"); responseAck(request, Response.NOT_FOUND, "device "+ deviceId +" not found");
log.warn("[设备未找到 ]deviceId: {}, callId: {}", deviceId, callIdHeader.getCallId()); log.warn("[设备未找到 ]deviceId: {}, callId: {}", deviceId, callIdHeader.getCallId());
SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId()); SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId() + cSeqHeader.getSeqNumber());
if (sipEvent != null && sipEvent.getErrorEvent() != null){ if (sipEvent != null && sipEvent.getErrorEvent() != null){
DeviceNotFoundEvent deviceNotFoundEvent = new DeviceNotFoundEvent(evt.getDialog()); DeviceNotFoundEvent deviceNotFoundEvent = new DeviceNotFoundEvent(evt.getDialog());
deviceNotFoundEvent.setCallId(callIdHeader.getCallId()); deviceNotFoundEvent.setCallId(callIdHeader.getCallId());

View File

@ -1,7 +0,0 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.timeout;
import javax.sip.TimeoutEvent;
public interface ITimeoutProcessor {
void process(TimeoutEvent event);
}

View File

@ -1,46 +0,0 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.timeout.impl;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.timeout.ITimeoutProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.TimeoutEvent;
import javax.sip.header.CallIdHeader;
@Slf4j
@Component
public class TimeoutProcessorImpl implements InitializingBean, ITimeoutProcessor {
@Autowired
private SIPProcessorObserver processorObserver;
@Autowired
private SipSubscribe sipSubscribe;
@Override
public void afterPropertiesSet() throws Exception {
processorObserver.addTimeoutProcessor(this);
}
@Override
public void process(TimeoutEvent event) {
try {
// TODO Auto-generated method stub
CallIdHeader callIdHeader = event.getClientTransaction().getDialog().getCallId();
String callId = callIdHeader.getCallId();
SipEvent sipEvent = sipSubscribe.getSubscribe(callId);
if (sipEvent != null && sipEvent.getErrorEvent() != null) {
SipSubscribe.EventResult<TimeoutEvent> timeoutEventEventResult = new SipSubscribe.EventResult<>(event);
sipEvent.getErrorEvent().response(timeoutEventEventResult);
sipSubscribe.removeSubscribe(callId);
}
} catch (Exception e) {
log.error("[超时事件失败]: {}", e.getMessage());
}
}
}