添加目录订阅消息与接口
parent
f1217682a9
commit
eb4716ba82
|
@ -0,0 +1,42 @@
|
|||
package com.genersoft.iot.vmp.conf;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
||||
import org.springframework.scheduling.support.CronTrigger;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
|
||||
/**
|
||||
* 动态定时任务
|
||||
*/
|
||||
@Component
|
||||
public class DynamicTask {
|
||||
|
||||
@Autowired
|
||||
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
|
||||
|
||||
private Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
|
||||
|
||||
@Bean
|
||||
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
|
||||
return new ThreadPoolTaskScheduler();
|
||||
}
|
||||
|
||||
public String startCron(String key, Runnable task, String corn) {
|
||||
stopCron(key);
|
||||
ScheduledFuture future = threadPoolTaskScheduler.schedule(task, new CronTrigger(corn));
|
||||
futureMap.put(key, future);
|
||||
return "startCron";
|
||||
}
|
||||
|
||||
public void stopCron(String key) {
|
||||
if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) {
|
||||
futureMap.get(key).cancel(true);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
package com.genersoft.iot.vmp.gb28181.transmit.event.request;
|
||||
|
||||
import javax.sip.RequestEvent;
|
||||
|
||||
/**
|
||||
* @description: 对SIP事件进行处理,包括request, response, timeout, ioException, transactionTerminated,dialogTerminated
|
||||
* @author: panlinlin
|
||||
* @date: 2021年11月5日 15:47
|
||||
*/
|
||||
public interface ISIPRequestProcessor {
|
||||
|
||||
void process(RequestEvent event);
|
||||
|
||||
}
|
|
@ -0,0 +1,179 @@
|
|||
package com.genersoft.iot.vmp.gb28181.transmit.event.request;
|
||||
|
||||
import gov.nist.javax.sip.SipProviderImpl;
|
||||
import gov.nist.javax.sip.SipStackImpl;
|
||||
import gov.nist.javax.sip.message.SIPRequest;
|
||||
import gov.nist.javax.sip.stack.SIPServerTransaction;
|
||||
import org.dom4j.Document;
|
||||
import org.dom4j.DocumentException;
|
||||
import org.dom4j.Element;
|
||||
import org.dom4j.io.SAXReader;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
|
||||
import javax.sip.*;
|
||||
import javax.sip.address.Address;
|
||||
import javax.sip.address.AddressFactory;
|
||||
import javax.sip.address.SipURI;
|
||||
import javax.sip.header.ContentTypeHeader;
|
||||
import javax.sip.header.HeaderFactory;
|
||||
import javax.sip.header.ViaHeader;
|
||||
import javax.sip.message.MessageFactory;
|
||||
import javax.sip.message.Request;
|
||||
import javax.sip.message.Response;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.text.ParseException;
|
||||
|
||||
/**
|
||||
* @description:处理接收IPCamera发来的SIP协议请求消息
|
||||
* @author: songww
|
||||
* @date: 2020年5月3日 下午4:42:22
|
||||
*/
|
||||
public abstract class SIPRequestProcessorAbstract implements InitializingBean, ISIPRequestProcessor {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(SIPRequestProcessorAbstract.class);
|
||||
|
||||
@Autowired
|
||||
@Qualifier(value="tcpSipProvider")
|
||||
private SipProviderImpl tcpSipProvider;
|
||||
|
||||
@Autowired
|
||||
@Qualifier(value="udpSipProvider")
|
||||
private SipProviderImpl udpSipProvider;
|
||||
|
||||
/**
|
||||
* 根据 RequestEvent 获取 ServerTransaction
|
||||
* @param evt
|
||||
* @return
|
||||
*/
|
||||
public ServerTransaction getServerTransaction(RequestEvent evt) {
|
||||
Request request = evt.getRequest();
|
||||
ServerTransaction serverTransaction = evt.getServerTransaction();
|
||||
// 判断TCP还是UDP
|
||||
boolean isTcp = false;
|
||||
ViaHeader reqViaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME);
|
||||
String transport = reqViaHeader.getTransport();
|
||||
if (transport.equals("TCP")) {
|
||||
isTcp = true;
|
||||
}
|
||||
|
||||
if (serverTransaction == null) {
|
||||
try {
|
||||
if (isTcp) {
|
||||
SipStackImpl stack = (SipStackImpl)tcpSipProvider.getSipStack();
|
||||
serverTransaction = (SIPServerTransaction) stack.findTransaction((SIPRequest)request, true);
|
||||
if (serverTransaction == null) {
|
||||
serverTransaction = tcpSipProvider.getNewServerTransaction(request);
|
||||
}
|
||||
} else {
|
||||
SipStackImpl stack = (SipStackImpl)udpSipProvider.getSipStack();
|
||||
serverTransaction = (SIPServerTransaction) stack.findTransaction((SIPRequest)request, true);
|
||||
if (serverTransaction == null) {
|
||||
serverTransaction = udpSipProvider.getNewServerTransaction(request);
|
||||
}
|
||||
}
|
||||
} catch (TransactionAlreadyExistsException e) {
|
||||
logger.error(e.getMessage());
|
||||
} catch (TransactionUnavailableException e) {
|
||||
logger.error(e.getMessage());
|
||||
}
|
||||
}
|
||||
return serverTransaction;
|
||||
}
|
||||
|
||||
public AddressFactory getAddressFactory() {
|
||||
try {
|
||||
return SipFactory.getInstance().createAddressFactory();
|
||||
} catch (PeerUnavailableException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public HeaderFactory getHeaderFactory() {
|
||||
try {
|
||||
return SipFactory.getInstance().createHeaderFactory();
|
||||
} catch (PeerUnavailableException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public MessageFactory getMessageFactory() {
|
||||
try {
|
||||
return SipFactory.getInstance().createMessageFactory();
|
||||
} catch (PeerUnavailableException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/***
|
||||
* 回复状态码
|
||||
* 100 trying
|
||||
* 200 OK
|
||||
* 400
|
||||
* 404
|
||||
* @param evt
|
||||
* @throws SipException
|
||||
* @throws InvalidArgumentException
|
||||
* @throws ParseException
|
||||
*/
|
||||
public void responseAck(RequestEvent evt, int statusCode) throws SipException, InvalidArgumentException, ParseException {
|
||||
Response response = getMessageFactory().createResponse(statusCode, evt.getRequest());
|
||||
ServerTransaction serverTransaction = getServerTransaction(evt);
|
||||
serverTransaction.sendResponse(response);
|
||||
if (statusCode >= 200) {
|
||||
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
|
||||
}
|
||||
}
|
||||
|
||||
public void responseAck(RequestEvent evt, int statusCode, String msg) throws SipException, InvalidArgumentException, ParseException {
|
||||
Response response = getMessageFactory().createResponse(statusCode, evt.getRequest());
|
||||
response.setReasonPhrase(msg);
|
||||
ServerTransaction serverTransaction = getServerTransaction(evt);
|
||||
serverTransaction.sendResponse(response);
|
||||
if (statusCode >= 200) {
|
||||
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 回复带sdp的200
|
||||
* @param evt
|
||||
* @param sdp
|
||||
* @throws SipException
|
||||
* @throws InvalidArgumentException
|
||||
* @throws ParseException
|
||||
*/
|
||||
public void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException {
|
||||
Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
|
||||
SipFactory sipFactory = SipFactory.getInstance();
|
||||
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP");
|
||||
response.setContent(sdp, contentTypeHeader);
|
||||
|
||||
SipURI sipURI = (SipURI)evt.getRequest().getRequestURI();
|
||||
|
||||
Address concatAddress = sipFactory.createAddressFactory().createAddress(
|
||||
sipFactory.createAddressFactory().createSipURI(sipURI.getUser(), sipURI.getHost()+":"+sipURI.getPort()
|
||||
));
|
||||
response.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
|
||||
getServerTransaction(evt).sendResponse(response);
|
||||
}
|
||||
|
||||
public Element getRootElement(RequestEvent evt) throws DocumentException {
|
||||
return getRootElement(evt, "gb2312");
|
||||
}
|
||||
public Element getRootElement(RequestEvent evt, String charset) throws DocumentException {
|
||||
if (charset == null) charset = "gb2312";
|
||||
Request request = evt.getRequest();
|
||||
SAXReader reader = new SAXReader();
|
||||
reader.setEncoding(charset);
|
||||
Document xml = reader.read(new ByteArrayInputStream(request.getRawContent()));
|
||||
return xml.getRootElement();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,123 @@
|
|||
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
|
||||
|
||||
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
|
||||
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||
import com.genersoft.iot.vmp.service.IMediaServerService;
|
||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.sip.Dialog;
|
||||
import javax.sip.DialogState;
|
||||
import javax.sip.RequestEvent;
|
||||
import javax.sip.address.SipURI;
|
||||
import javax.sip.header.FromHeader;
|
||||
import javax.sip.header.HeaderAddress;
|
||||
import javax.sip.header.ToHeader;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @description:ACK请求处理器
|
||||
* @author: swwheihei
|
||||
* @date: 2020年5月3日 下午5:31:45
|
||||
*/
|
||||
@Component
|
||||
public class AckRequestProcessor extends SIPRequestProcessorAbstract {
|
||||
|
||||
private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class);
|
||||
private String method = "ACK";
|
||||
|
||||
@Autowired
|
||||
private SIPProcessorObserver sipProcessorObserver;
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
// 添加消息处理的订阅
|
||||
sipProcessorObserver.addRequestProcessor(method, this);
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private IRedisCatchStorage redisCatchStorage;
|
||||
|
||||
@Autowired
|
||||
private ZLMRTPServerFactory zlmrtpServerFactory;
|
||||
|
||||
@Autowired
|
||||
private IMediaServerService mediaServerService;
|
||||
|
||||
|
||||
/**
|
||||
* 处理 ACK请求
|
||||
*
|
||||
* @param evt
|
||||
*/
|
||||
@Override
|
||||
public void process(RequestEvent evt) {
|
||||
Dialog dialog = evt.getDialog();
|
||||
if (dialog == null) return;
|
||||
if (dialog.getState()== DialogState.CONFIRMED) {
|
||||
String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
|
||||
String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
|
||||
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId);
|
||||
String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
|
||||
String deviceId = sendRtpItem.getDeviceId();
|
||||
StreamInfo streamInfo = null;
|
||||
if (deviceId == null) {
|
||||
streamInfo = new StreamInfo();
|
||||
streamInfo.setApp(sendRtpItem.getApp());
|
||||
streamInfo.setStreamId(sendRtpItem.getStreamId());
|
||||
}else {
|
||||
streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
|
||||
sendRtpItem.setStreamId(streamInfo.getStreamId());
|
||||
streamInfo.setApp("rtp");
|
||||
}
|
||||
|
||||
redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
||||
logger.info(platformGbId);
|
||||
logger.info(channelId);
|
||||
Map<String, Object> param = new HashMap<>();
|
||||
param.put("vhost","__defaultVhost__");
|
||||
param.put("app",streamInfo.getApp());
|
||||
param.put("stream",streamInfo.getStreamId());
|
||||
param.put("ssrc", sendRtpItem.getSsrc());
|
||||
param.put("dst_url",sendRtpItem.getIp());
|
||||
param.put("dst_port", sendRtpItem.getPort());
|
||||
param.put("is_udp", is_Udp);
|
||||
//param.put ("src_port", sendRtpItem.getLocalPort());
|
||||
// 设备推流查询,成功后才能转推
|
||||
boolean rtpPushed = false;
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (!rtpPushed) {
|
||||
try {
|
||||
if (System.currentTimeMillis() - startTime < 30 * 1000) {
|
||||
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
|
||||
if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId())) {
|
||||
rtpPushed = true;
|
||||
logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]",
|
||||
streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort());
|
||||
zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
|
||||
} else {
|
||||
logger.info("等待设备推流[{}/{}].......",
|
||||
streamInfo.getApp() ,streamInfo.getStreamId());
|
||||
Thread.sleep(1000);
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
rtpPushed = true;
|
||||
logger.info("设备推流[{}/{}]超时,终止向上级推流",
|
||||
streamInfo.getApp() ,streamInfo.getStreamId());
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,115 @@
|
|||
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
|
||||
|
||||
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
|
||||
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||
import com.genersoft.iot.vmp.service.IMediaServerService;
|
||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.sip.*;
|
||||
import javax.sip.address.SipURI;
|
||||
import javax.sip.header.FromHeader;
|
||||
import javax.sip.header.HeaderAddress;
|
||||
import javax.sip.header.ToHeader;
|
||||
import javax.sip.message.Response;
|
||||
import java.text.ParseException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @description: BYE请求处理器
|
||||
* @author: lawrencehj
|
||||
* @date: 2021年3月9日
|
||||
*/
|
||||
@Component
|
||||
public class ByeRequestProcessor extends SIPRequestProcessorAbstract {
|
||||
|
||||
private Logger logger = LoggerFactory.getLogger(ByeRequestProcessor.class);
|
||||
|
||||
@Autowired
|
||||
private ISIPCommander cmder;
|
||||
|
||||
@Autowired
|
||||
private IRedisCatchStorage redisCatchStorage;
|
||||
|
||||
@Autowired
|
||||
private IVideoManagerStorager storager;
|
||||
|
||||
@Autowired
|
||||
private ZLMRTPServerFactory zlmrtpServerFactory;
|
||||
|
||||
@Autowired
|
||||
private IMediaServerService mediaServerService;
|
||||
|
||||
private String method = "BYE";
|
||||
|
||||
@Autowired
|
||||
private SIPProcessorObserver sipProcessorObserver;
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
// 添加消息处理的订阅
|
||||
sipProcessorObserver.addRequestProcessor(method, this);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理BYE请求
|
||||
* @param evt
|
||||
*/
|
||||
@Override
|
||||
public void process(RequestEvent evt) {
|
||||
try {
|
||||
responseAck(evt, Response.OK);
|
||||
Dialog dialog = evt.getDialog();
|
||||
if (dialog == null) return;
|
||||
if (dialog.getState().equals(DialogState.TERMINATED)) {
|
||||
String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
|
||||
String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
|
||||
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId);
|
||||
logger.info("收到bye, [{}/{}]", platformGbId, channelId);
|
||||
if (sendRtpItem != null){
|
||||
String streamId = sendRtpItem.getStreamId();
|
||||
Map<String, Object> param = new HashMap<>();
|
||||
param.put("vhost","__defaultVhost__");
|
||||
param.put("app",sendRtpItem.getApp());
|
||||
param.put("stream",streamId);
|
||||
param.put("ssrc",sendRtpItem.getSsrc());
|
||||
logger.info("停止向上级推流:" + streamId);
|
||||
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
|
||||
zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
|
||||
redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);
|
||||
if (zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId) == 0) {
|
||||
logger.info(streamId + "无其它观看者,通知设备停止推流");
|
||||
cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId);
|
||||
}
|
||||
}
|
||||
// 可能是设备主动停止
|
||||
Device device = storager.queryVideoDeviceByChannelId(platformGbId);
|
||||
if (device != null) {
|
||||
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
|
||||
if (streamInfo != null) {
|
||||
redisCatchStorage.stopPlay(streamInfo);
|
||||
}
|
||||
storager.stopPlay(device.getDeviceId(), channelId);
|
||||
mediaServerService.closeRTPServer(device, channelId);
|
||||
}
|
||||
}
|
||||
} catch (SipException e) {
|
||||
e.printStackTrace();
|
||||
} catch (InvalidArgumentException e) {
|
||||
e.printStackTrace();
|
||||
} catch (ParseException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
|
||||
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.sip.RequestEvent;
|
||||
|
||||
/**
|
||||
* @description:CANCEL请求处理器
|
||||
* @author: swwheihei
|
||||
* @date: 2020年5月3日 下午5:32:23
|
||||
*/
|
||||
@Component
|
||||
public class CancelRequestProcessor extends SIPRequestProcessorAbstract {
|
||||
|
||||
private String method = "CANCEL";
|
||||
|
||||
@Autowired
|
||||
private SIPProcessorObserver sipProcessorObserver;
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
// 添加消息处理的订阅
|
||||
sipProcessorObserver.addRequestProcessor(method, this);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理CANCEL请求
|
||||
*
|
||||
* @param evt 事件
|
||||
*/
|
||||
@Override
|
||||
public void process(RequestEvent evt) {
|
||||
// TODO 优先级99 Cancel Request消息实现,此消息一般为级联消息,上级给下级发送请求取消指令
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,386 @@
|
|||
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
|
||||
|
||||
import com.genersoft.iot.vmp.gb28181.bean.*;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
|
||||
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||
import com.genersoft.iot.vmp.service.IMediaServerService;
|
||||
import com.genersoft.iot.vmp.service.IPlayService;
|
||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
|
||||
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
|
||||
import gov.nist.javax.sip.address.AddressImpl;
|
||||
import gov.nist.javax.sip.address.SipUri;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.sdp.*;
|
||||
import javax.sip.InvalidArgumentException;
|
||||
import javax.sip.RequestEvent;
|
||||
import javax.sip.ServerTransaction;
|
||||
import javax.sip.SipException;
|
||||
import javax.sip.address.SipURI;
|
||||
import javax.sip.header.FromHeader;
|
||||
import javax.sip.message.Request;
|
||||
import javax.sip.message.Response;
|
||||
import java.text.ParseException;
|
||||
import java.util.Vector;
|
||||
|
||||
/**
|
||||
* @description:处理INVITE请求
|
||||
* @author: panll
|
||||
* @date: 2021年1月14日
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Component
|
||||
public class InviteRequestProcessor extends SIPRequestProcessorAbstract {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(InviteRequestProcessor.class);
|
||||
|
||||
private String method = "INVITE";
|
||||
|
||||
@Autowired
|
||||
private SIPCommanderFroPlatform cmderFroPlatform;
|
||||
|
||||
@Autowired
|
||||
private IVideoManagerStorager storager;
|
||||
|
||||
@Autowired
|
||||
private IRedisCatchStorage redisCatchStorage;
|
||||
|
||||
@Autowired
|
||||
private SIPCommander cmder;
|
||||
|
||||
@Autowired
|
||||
private IPlayService playService;
|
||||
|
||||
@Autowired
|
||||
private ZLMRTPServerFactory zlmrtpServerFactory;
|
||||
|
||||
@Autowired
|
||||
private IMediaServerService mediaServerService;
|
||||
|
||||
@Autowired
|
||||
private SIPProcessorObserver sipProcessorObserver;
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
// 添加消息处理的订阅
|
||||
sipProcessorObserver.addRequestProcessor(method, this);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理invite请求
|
||||
*
|
||||
* @param evt
|
||||
* 请求消息
|
||||
*/
|
||||
@Override
|
||||
public void process(RequestEvent evt) {
|
||||
// Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令
|
||||
try {
|
||||
Request request = evt.getRequest();
|
||||
SipURI sipURI = (SipURI) request.getRequestURI();
|
||||
String channelId = sipURI.getUser();
|
||||
String requesterId = null;
|
||||
|
||||
FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME);
|
||||
AddressImpl address = (AddressImpl) fromHeader.getAddress();
|
||||
SipUri uri = (SipUri) address.getURI();
|
||||
requesterId = uri.getUser();
|
||||
|
||||
if (requesterId == null || channelId == null) {
|
||||
logger.info("无法从FromHeader的Address中获取到平台id,返回400");
|
||||
responseAck(evt, Response.BAD_REQUEST); // 参数不全, 发400,请求错误
|
||||
return;
|
||||
}
|
||||
|
||||
// 查询请求方是否上级平台
|
||||
ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId);
|
||||
if (platform != null) {
|
||||
// 查询平台下是否有该通道
|
||||
DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
|
||||
GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId);
|
||||
MediaServerItem mediaServerItem = null;
|
||||
// 不是通道可能是直播流
|
||||
if (channel != null && gbStream == null ) {
|
||||
if (channel.getStatus() == 0) {
|
||||
logger.info("通道离线,返回400");
|
||||
responseAck(evt, Response.BAD_REQUEST, "channel [" + channel.getChannelId() + "] offline");
|
||||
return;
|
||||
}
|
||||
responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中
|
||||
}else if(channel == null && gbStream != null){
|
||||
String mediaServerId = gbStream.getMediaServerId();
|
||||
mediaServerItem = mediaServerService.getOne(mediaServerId);
|
||||
if (mediaServerItem == null) {
|
||||
logger.info("[ app={}, stream={} ]找不到zlm {},返回410",gbStream.getApp(), gbStream.getStream(), mediaServerId);
|
||||
responseAck(evt, Response.GONE, "media server not found");
|
||||
return;
|
||||
}
|
||||
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
|
||||
if (!streamReady ) {
|
||||
logger.info("[ app={}, stream={} ]通道离线,返回400",gbStream.getApp(), gbStream.getStream());
|
||||
responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
|
||||
return;
|
||||
}
|
||||
responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中
|
||||
}else {
|
||||
logger.info("通道不存在,返回404");
|
||||
responseAck(evt, Response.NOT_FOUND); // 通道不存在,发404,资源不存在
|
||||
return;
|
||||
}
|
||||
// 解析sdp消息, 使用jainsip 自带的sdp解析方式
|
||||
String contentString = new String(request.getRawContent());
|
||||
|
||||
// jainSip不支持y=字段, 移除移除以解析。
|
||||
int ssrcIndex = contentString.indexOf("y=");
|
||||
//ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段
|
||||
String ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
|
||||
String substring = contentString.substring(0, contentString.indexOf("y="));
|
||||
SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
|
||||
|
||||
// 获取支持的格式
|
||||
Vector mediaDescriptions = sdp.getMediaDescriptions(true);
|
||||
// 查看是否支持PS 负载96
|
||||
//String ip = null;
|
||||
int port = -1;
|
||||
//boolean recvonly = false;
|
||||
boolean mediaTransmissionTCP = false;
|
||||
Boolean tcpActive = null;
|
||||
for (Object description : mediaDescriptions) {
|
||||
MediaDescription mediaDescription = (MediaDescription) description;
|
||||
Media media = mediaDescription.getMedia();
|
||||
|
||||
Vector mediaFormats = media.getMediaFormats(false);
|
||||
if (mediaFormats.contains("96")) {
|
||||
port = media.getMediaPort();
|
||||
//String mediaType = media.getMediaType();
|
||||
String protocol = media.getProtocol();
|
||||
|
||||
// 区分TCP发流还是udp, 当前默认udp
|
||||
if ("TCP/RTP/AVP".equals(protocol)) {
|
||||
String setup = mediaDescription.getAttribute("setup");
|
||||
if (setup != null) {
|
||||
mediaTransmissionTCP = true;
|
||||
if ("active".equals(setup)) {
|
||||
tcpActive = true;
|
||||
} else if ("passive".equals(setup)) {
|
||||
tcpActive = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (port == -1) {
|
||||
logger.info("不支持的媒体格式,返回415");
|
||||
// 回复不支持的格式
|
||||
responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415
|
||||
return;
|
||||
}
|
||||
String username = sdp.getOrigin().getUsername();
|
||||
String addressStr = sdp.getOrigin().getAddress();
|
||||
//String sessionName = sdp.getSessionName().getValue();
|
||||
logger.info("[上级点播]用户:{}, 地址:{}:{}, ssrc:{}", username, addressStr, port, ssrc);
|
||||
Device device = null;
|
||||
// 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标
|
||||
if (channel != null) {
|
||||
device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId);
|
||||
if (device == null) {
|
||||
logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel);
|
||||
responseAck(evt, Response.SERVER_INTERNAL_ERROR);
|
||||
return;
|
||||
}
|
||||
mediaServerItem = playService.getNewMediaServerItem(device);
|
||||
if (mediaServerItem == null) {
|
||||
logger.warn("未找到可用的zlm");
|
||||
responseAck(evt, Response.BUSY_HERE);
|
||||
return;
|
||||
}
|
||||
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
|
||||
device.getDeviceId(), channelId,
|
||||
mediaTransmissionTCP);
|
||||
if (tcpActive != null) {
|
||||
sendRtpItem.setTcpActive(tcpActive);
|
||||
}
|
||||
if (sendRtpItem == null) {
|
||||
logger.warn("服务器端口资源不足");
|
||||
responseAck(evt, Response.BUSY_HERE);
|
||||
return;
|
||||
}
|
||||
|
||||
// 写入redis, 超时时回复
|
||||
redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
||||
// 通知下级推流,
|
||||
PlayResult playResult = playService.play(mediaServerItem,device.getDeviceId(), channelId, (mediaServerItemInUSe, responseJSON)->{
|
||||
// 收到推流, 回复200OK, 等待ack
|
||||
// if (sendRtpItem == null) return;
|
||||
sendRtpItem.setStatus(1);
|
||||
redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
||||
// TODO 添加对tcp的支持
|
||||
|
||||
StringBuffer content = new StringBuffer(200);
|
||||
content.append("v=0\r\n");
|
||||
content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n");
|
||||
content.append("s=Play\r\n");
|
||||
content.append("c=IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n");
|
||||
content.append("t=0 0\r\n");
|
||||
content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n");
|
||||
content.append("a=sendonly\r\n");
|
||||
content.append("a=rtpmap:96 PS/90000\r\n");
|
||||
content.append("y="+ ssrc + "\r\n");
|
||||
content.append("f=\r\n");
|
||||
|
||||
try {
|
||||
responseAck(evt, content.toString());
|
||||
} catch (SipException e) {
|
||||
e.printStackTrace();
|
||||
} catch (InvalidArgumentException e) {
|
||||
e.printStackTrace();
|
||||
} catch (ParseException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
} ,((event) -> {
|
||||
// 未知错误。直接转发设备点播的错误
|
||||
Response response = null;
|
||||
try {
|
||||
response = getMessageFactory().createResponse(event.statusCode, evt.getRequest());
|
||||
ServerTransaction serverTransaction = getServerTransaction(evt);
|
||||
serverTransaction.sendResponse(response);
|
||||
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
|
||||
} catch (ParseException | SipException | InvalidArgumentException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}));
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(playResult.getResult().toString());
|
||||
}
|
||||
|
||||
}else if (gbStream != null) {
|
||||
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
|
||||
gbStream.getApp(), gbStream.getStream(), channelId,
|
||||
mediaTransmissionTCP);
|
||||
|
||||
if (tcpActive != null) {
|
||||
sendRtpItem.setTcpActive(tcpActive);
|
||||
}
|
||||
if (sendRtpItem == null) {
|
||||
logger.warn("服务器端口资源不足");
|
||||
responseAck(evt, Response.BUSY_HERE);
|
||||
return;
|
||||
}
|
||||
|
||||
// 写入redis, 超时时回复
|
||||
redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
||||
|
||||
sendRtpItem.setStatus(1);
|
||||
redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
||||
// TODO 添加对tcp的支持
|
||||
StringBuffer content = new StringBuffer(200);
|
||||
content.append("v=0\r\n");
|
||||
content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
|
||||
content.append("s=Play\r\n");
|
||||
content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
|
||||
content.append("t=0 0\r\n");
|
||||
content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n");
|
||||
content.append("a=sendonly\r\n");
|
||||
content.append("a=rtpmap:96 PS/90000\r\n");
|
||||
content.append("y="+ ssrc + "\r\n");
|
||||
content.append("f=\r\n");
|
||||
|
||||
try {
|
||||
responseAck(evt, content.toString());
|
||||
} catch (SipException e) {
|
||||
e.printStackTrace();
|
||||
} catch (InvalidArgumentException e) {
|
||||
e.printStackTrace();
|
||||
} catch (ParseException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
// 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
|
||||
Device device = storager.queryVideoDevice(requesterId);
|
||||
if (device != null) {
|
||||
logger.info("收到设备" + requesterId + "的语音广播Invite请求");
|
||||
responseAck(evt, Response.TRYING);
|
||||
|
||||
String contentString = new String(request.getRawContent());
|
||||
// jainSip不支持y=字段, 移除移除以解析。
|
||||
String substring = contentString;
|
||||
String ssrc = "0000000404";
|
||||
int ssrcIndex = contentString.indexOf("y=");
|
||||
if (ssrcIndex > 0) {
|
||||
substring = contentString.substring(0, ssrcIndex);
|
||||
ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
|
||||
}
|
||||
ssrcIndex = substring.indexOf("f=");
|
||||
if (ssrcIndex > 0) {
|
||||
substring = contentString.substring(0, ssrcIndex);
|
||||
}
|
||||
SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
|
||||
|
||||
// 获取支持的格式
|
||||
Vector mediaDescriptions = sdp.getMediaDescriptions(true);
|
||||
// 查看是否支持PS 负载96
|
||||
int port = -1;
|
||||
//boolean recvonly = false;
|
||||
boolean mediaTransmissionTCP = false;
|
||||
Boolean tcpActive = null;
|
||||
for (int i = 0; i < mediaDescriptions.size(); i++) {
|
||||
MediaDescription mediaDescription = (MediaDescription)mediaDescriptions.get(i);
|
||||
Media media = mediaDescription.getMedia();
|
||||
|
||||
Vector mediaFormats = media.getMediaFormats(false);
|
||||
if (mediaFormats.contains("8")) {
|
||||
port = media.getMediaPort();
|
||||
String protocol = media.getProtocol();
|
||||
// 区分TCP发流还是udp, 当前默认udp
|
||||
if ("TCP/RTP/AVP".equals(protocol)) {
|
||||
String setup = mediaDescription.getAttribute("setup");
|
||||
if (setup != null) {
|
||||
mediaTransmissionTCP = true;
|
||||
if ("active".equals(setup)) {
|
||||
tcpActive = true;
|
||||
} else if ("passive".equals(setup)) {
|
||||
tcpActive = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (port == -1) {
|
||||
logger.info("不支持的媒体格式,返回415");
|
||||
// 回复不支持的格式
|
||||
responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415
|
||||
return;
|
||||
}
|
||||
String username = sdp.getOrigin().getUsername();
|
||||
String addressStr = sdp.getOrigin().getAddress();
|
||||
logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc);
|
||||
|
||||
} else {
|
||||
logger.warn("来自无效设备/平台的请求");
|
||||
responseAck(evt, Response.BAD_REQUEST);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (SipException | InvalidArgumentException | ParseException e) {
|
||||
e.printStackTrace();
|
||||
logger.warn("sdp解析错误");
|
||||
e.printStackTrace();
|
||||
} catch (SdpParseException e) {
|
||||
e.printStackTrace();
|
||||
} catch (SdpException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,384 @@
|
|||
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
|
||||
|
||||
import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
||||
import com.genersoft.iot.vmp.conf.UserSetup;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.*;
|
||||
import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
|
||||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
|
||||
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
|
||||
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
|
||||
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
|
||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
|
||||
import com.genersoft.iot.vmp.utils.GpsUtil;
|
||||
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
|
||||
import org.dom4j.DocumentException;
|
||||
import org.dom4j.Element;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import javax.sip.InvalidArgumentException;
|
||||
import javax.sip.RequestEvent;
|
||||
import javax.sip.SipException;
|
||||
import javax.sip.header.FromHeader;
|
||||
import javax.sip.message.Response;
|
||||
import java.text.ParseException;
|
||||
import java.util.Iterator;
|
||||
|
||||
/**
|
||||
* @description: Notify请求处理器
|
||||
* @author: lawrencehj
|
||||
* @date: 2021年1月27日
|
||||
*/
|
||||
@Component
|
||||
public class NotifyRequestProcessor extends SIPRequestProcessorAbstract {
|
||||
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(NotifyRequestProcessor.class);
|
||||
|
||||
@Autowired
|
||||
private UserSetup userSetup;
|
||||
|
||||
@Autowired
|
||||
private IVideoManagerStorager storager;
|
||||
|
||||
@Autowired
|
||||
private IRedisCatchStorage redisCatchStorage;
|
||||
|
||||
@Autowired
|
||||
private EventPublisher publisher;
|
||||
|
||||
@Autowired
|
||||
private DeviceOffLineDetector offLineDetector;
|
||||
|
||||
private static final String NOTIFY_CATALOG = "Catalog";
|
||||
private static final String NOTIFY_ALARM = "Alarm";
|
||||
private static final String NOTIFY_MOBILE_POSITION = "MobilePosition";
|
||||
private String method = "NOTIFY";
|
||||
|
||||
@Autowired
|
||||
private SIPProcessorObserver sipProcessorObserver;
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
// 添加消息处理的订阅
|
||||
sipProcessorObserver.addRequestProcessor(method, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(RequestEvent evt) {
|
||||
try {
|
||||
Element rootElement = getRootElement(evt);
|
||||
String cmd = XmlUtil.getText(rootElement, "CmdType");
|
||||
|
||||
if (NOTIFY_CATALOG.equals(cmd)) {
|
||||
logger.info("接收到Catalog通知");
|
||||
processNotifyCatalogList(evt);
|
||||
} else if (NOTIFY_ALARM.equals(cmd)) {
|
||||
logger.info("接收到Alarm通知");
|
||||
processNotifyAlarm(evt);
|
||||
} else if (NOTIFY_MOBILE_POSITION.equals(cmd)) {
|
||||
logger.info("接收到MobilePosition通知");
|
||||
processNotifyMobilePosition(evt);
|
||||
} else {
|
||||
logger.info("接收到消息:" + cmd);
|
||||
responseAck(evt, Response.OK);
|
||||
}
|
||||
} catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理MobilePosition移动位置Notify
|
||||
*
|
||||
* @param evt
|
||||
*/
|
||||
private void processNotifyMobilePosition(RequestEvent evt) {
|
||||
try {
|
||||
// 回复 200 OK
|
||||
Element rootElement = getRootElement(evt);
|
||||
MobilePosition mobilePosition = new MobilePosition();
|
||||
Element deviceIdElement = rootElement.element("DeviceID");
|
||||
String deviceId = deviceIdElement.getTextTrim().toString();
|
||||
Device device = storager.queryVideoDevice(deviceId);
|
||||
if (device != null) {
|
||||
if (!StringUtils.isEmpty(device.getName())) {
|
||||
mobilePosition.setDeviceName(device.getName());
|
||||
}
|
||||
}
|
||||
mobilePosition.setDeviceId(XmlUtil.getText(rootElement, "DeviceID"));
|
||||
mobilePosition.setTime(XmlUtil.getText(rootElement, "Time"));
|
||||
mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
|
||||
mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
|
||||
if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) {
|
||||
mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed")));
|
||||
} else {
|
||||
mobilePosition.setSpeed(0.0);
|
||||
}
|
||||
if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) {
|
||||
mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction")));
|
||||
} else {
|
||||
mobilePosition.setDirection(0.0);
|
||||
}
|
||||
if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) {
|
||||
mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude")));
|
||||
} else {
|
||||
mobilePosition.setAltitude(0.0);
|
||||
}
|
||||
mobilePosition.setReportSource("Mobile Position");
|
||||
BaiduPoint bp = new BaiduPoint();
|
||||
bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()), String.valueOf(mobilePosition.getLatitude()));
|
||||
logger.info("百度坐标:" + bp.getBdLng() + ", " + bp.getBdLat());
|
||||
mobilePosition.setGeodeticSystem("BD-09");
|
||||
mobilePosition.setCnLng(bp.getBdLng());
|
||||
mobilePosition.setCnLat(bp.getBdLat());
|
||||
if (!userSetup.getSavePositionHistory()) {
|
||||
storager.clearMobilePositionsByDeviceId(deviceId);
|
||||
}
|
||||
storager.insertMobilePosition(mobilePosition);
|
||||
responseAck(evt, Response.OK);
|
||||
} catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/***
|
||||
* 处理alarm设备报警Notify
|
||||
*
|
||||
* @param evt
|
||||
*/
|
||||
private void processNotifyAlarm(RequestEvent evt) {
|
||||
try {
|
||||
Element rootElement = getRootElement(evt);
|
||||
Element deviceIdElement = rootElement.element("DeviceID");
|
||||
String deviceId = deviceIdElement.getText().toString();
|
||||
|
||||
Device device = storager.queryVideoDevice(deviceId);
|
||||
if (device == null) {
|
||||
return;
|
||||
}
|
||||
rootElement = getRootElement(evt, device.getCharset());
|
||||
DeviceAlarm deviceAlarm = new DeviceAlarm();
|
||||
deviceAlarm.setDeviceId(deviceId);
|
||||
deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority"));
|
||||
deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod"));
|
||||
deviceAlarm.setAlarmTime(XmlUtil.getText(rootElement, "AlarmTime"));
|
||||
if (XmlUtil.getText(rootElement, "AlarmDescription") == null) {
|
||||
deviceAlarm.setAlarmDescription("");
|
||||
} else {
|
||||
deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription"));
|
||||
}
|
||||
if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) {
|
||||
deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
|
||||
} else {
|
||||
deviceAlarm.setLongitude(0.00);
|
||||
}
|
||||
if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) {
|
||||
deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
|
||||
} else {
|
||||
deviceAlarm.setLatitude(0.00);
|
||||
}
|
||||
|
||||
if (deviceAlarm.getAlarmMethod().equals("4")) {
|
||||
MobilePosition mobilePosition = new MobilePosition();
|
||||
mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
|
||||
mobilePosition.setTime(deviceAlarm.getAlarmTime());
|
||||
mobilePosition.setLongitude(deviceAlarm.getLongitude());
|
||||
mobilePosition.setLatitude(deviceAlarm.getLatitude());
|
||||
mobilePosition.setReportSource("GPS Alarm");
|
||||
BaiduPoint bp = new BaiduPoint();
|
||||
bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()), String.valueOf(mobilePosition.getLatitude()));
|
||||
logger.info("百度坐标:" + bp.getBdLng() + ", " + bp.getBdLat());
|
||||
mobilePosition.setGeodeticSystem("BD-09");
|
||||
mobilePosition.setCnLng(bp.getBdLng());
|
||||
mobilePosition.setCnLat(bp.getBdLat());
|
||||
if (!userSetup.getSavePositionHistory()) {
|
||||
storager.clearMobilePositionsByDeviceId(deviceId);
|
||||
}
|
||||
storager.insertMobilePosition(mobilePosition);
|
||||
}
|
||||
// TODO: 需要实现存储报警信息、报警分类
|
||||
|
||||
// 回复200 OK
|
||||
responseAck(evt, Response.OK);
|
||||
if (offLineDetector.isOnline(deviceId)) {
|
||||
publisher.deviceAlarmEventPublish(deviceAlarm);
|
||||
}
|
||||
} catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/***
|
||||
* 处理catalog设备目录列表Notify
|
||||
*
|
||||
* @param evt
|
||||
*/
|
||||
private void processNotifyCatalogList(RequestEvent evt) {
|
||||
try {
|
||||
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
|
||||
String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
|
||||
|
||||
Element rootElement = getRootElement(evt);
|
||||
Element deviceIdElement = rootElement.element("DeviceID");
|
||||
String channelId = deviceIdElement.getText();
|
||||
Device device = storager.queryVideoDevice(deviceId);
|
||||
if (device == null) {
|
||||
return;
|
||||
}
|
||||
if (device != null ) {
|
||||
rootElement = getRootElement(evt, device.getCharset());
|
||||
}
|
||||
Element deviceListElement = rootElement.element("DeviceList");
|
||||
if (deviceListElement == null) {
|
||||
return;
|
||||
}
|
||||
Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
|
||||
if (deviceListIterator != null) {
|
||||
|
||||
// 遍历DeviceList
|
||||
while (deviceListIterator.hasNext()) {
|
||||
Element itemDevice = deviceListIterator.next();
|
||||
Element channelDeviceElement = itemDevice.element("DeviceID");
|
||||
if (channelDeviceElement == null) {
|
||||
continue;
|
||||
}
|
||||
String channelDeviceId = channelDeviceElement.getTextTrim();
|
||||
Element channdelNameElement = itemDevice.element("Name");
|
||||
String channelName = channdelNameElement != null ? channdelNameElement.getTextTrim().toString() : "";
|
||||
Element statusElement = itemDevice.element("Status");
|
||||
String status = statusElement != null ? statusElement.getTextTrim().toString() : "ON";
|
||||
DeviceChannel deviceChannel = new DeviceChannel();
|
||||
deviceChannel.setName(channelName);
|
||||
deviceChannel.setChannelId(channelDeviceId);
|
||||
// ONLINE OFFLINE HIKVISION DS-7716N-E4 NVR的兼容性处理
|
||||
if (status.equals("ON") || status.equals("On") || status.equals("ONLINE")) {
|
||||
deviceChannel.setStatus(1);
|
||||
}
|
||||
if (status.equals("OFF") || status.equals("Off") || status.equals("OFFLINE")) {
|
||||
deviceChannel.setStatus(0);
|
||||
}
|
||||
|
||||
deviceChannel.setManufacture(XmlUtil.getText(itemDevice, "Manufacturer"));
|
||||
deviceChannel.setModel(XmlUtil.getText(itemDevice, "Model"));
|
||||
deviceChannel.setOwner(XmlUtil.getText(itemDevice, "Owner"));
|
||||
deviceChannel.setCivilCode(XmlUtil.getText(itemDevice, "CivilCode"));
|
||||
deviceChannel.setBlock(XmlUtil.getText(itemDevice, "Block"));
|
||||
deviceChannel.setAddress(XmlUtil.getText(itemDevice, "Address"));
|
||||
if (XmlUtil.getText(itemDevice, "Parental") == null
|
||||
|| XmlUtil.getText(itemDevice, "Parental") == "") {
|
||||
deviceChannel.setParental(0);
|
||||
} else {
|
||||
deviceChannel.setParental(Integer.parseInt(XmlUtil.getText(itemDevice, "Parental")));
|
||||
}
|
||||
deviceChannel.setParentId(XmlUtil.getText(itemDevice, "ParentID"));
|
||||
if (XmlUtil.getText(itemDevice, "SafetyWay") == null
|
||||
|| XmlUtil.getText(itemDevice, "SafetyWay") == "") {
|
||||
deviceChannel.setSafetyWay(0);
|
||||
} else {
|
||||
deviceChannel.setSafetyWay(Integer.parseInt(XmlUtil.getText(itemDevice, "SafetyWay")));
|
||||
}
|
||||
if (XmlUtil.getText(itemDevice, "RegisterWay") == null
|
||||
|| XmlUtil.getText(itemDevice, "RegisterWay") == "") {
|
||||
deviceChannel.setRegisterWay(1);
|
||||
} else {
|
||||
deviceChannel.setRegisterWay(Integer.parseInt(XmlUtil.getText(itemDevice, "RegisterWay")));
|
||||
}
|
||||
deviceChannel.setCertNum(XmlUtil.getText(itemDevice, "CertNum"));
|
||||
if (XmlUtil.getText(itemDevice, "Certifiable") == null
|
||||
|| XmlUtil.getText(itemDevice, "Certifiable") == "") {
|
||||
deviceChannel.setCertifiable(0);
|
||||
} else {
|
||||
deviceChannel.setCertifiable(Integer.parseInt(XmlUtil.getText(itemDevice, "Certifiable")));
|
||||
}
|
||||
if (XmlUtil.getText(itemDevice, "ErrCode") == null
|
||||
|| XmlUtil.getText(itemDevice, "ErrCode") == "") {
|
||||
deviceChannel.setErrCode(0);
|
||||
} else {
|
||||
deviceChannel.setErrCode(Integer.parseInt(XmlUtil.getText(itemDevice, "ErrCode")));
|
||||
}
|
||||
deviceChannel.setEndTime(XmlUtil.getText(itemDevice, "EndTime"));
|
||||
deviceChannel.setSecrecy(XmlUtil.getText(itemDevice, "Secrecy"));
|
||||
deviceChannel.setIpAddress(XmlUtil.getText(itemDevice, "IPAddress"));
|
||||
if (XmlUtil.getText(itemDevice, "Port") == null || XmlUtil.getText(itemDevice, "Port") == "") {
|
||||
deviceChannel.setPort(0);
|
||||
} else {
|
||||
deviceChannel.setPort(Integer.parseInt(XmlUtil.getText(itemDevice, "Port")));
|
||||
}
|
||||
deviceChannel.setPassword(XmlUtil.getText(itemDevice, "Password"));
|
||||
if (NumericUtil.isDouble(XmlUtil.getText(itemDevice, "Longitude"))) {
|
||||
deviceChannel.setLongitude(Double.parseDouble(XmlUtil.getText(itemDevice, "Longitude")));
|
||||
} else {
|
||||
deviceChannel.setLongitude(0.00);
|
||||
}
|
||||
if (NumericUtil.isDouble(XmlUtil.getText(itemDevice, "Latitude"))) {
|
||||
deviceChannel.setLatitude(Double.parseDouble(XmlUtil.getText(itemDevice, "Latitude")));
|
||||
} else {
|
||||
deviceChannel.setLatitude(0.00);
|
||||
}
|
||||
if (XmlUtil.getText(itemDevice, "PTZType") == null
|
||||
|| XmlUtil.getText(itemDevice, "PTZType") == "") {
|
||||
deviceChannel.setPTZType(0);
|
||||
} else {
|
||||
deviceChannel.setPTZType(Integer.parseInt(XmlUtil.getText(itemDevice, "PTZType")));
|
||||
}
|
||||
deviceChannel.setHasAudio(true); // 默认含有音频,播放时再检查是否有音频及是否AAC
|
||||
storager.updateChannel(device.getDeviceId(), deviceChannel);
|
||||
}
|
||||
|
||||
// RequestMessage msg = new RequestMessage();
|
||||
// msg.setDeviceId(deviceId);
|
||||
// msg.setType(DeferredResultHolder.CALLBACK_CMD_CATALOG);
|
||||
// msg.setData(device);
|
||||
// deferredResultHolder.invokeResult(msg);
|
||||
// 回复200 OK
|
||||
responseAck(evt, Response.OK);
|
||||
if (offLineDetector.isOnline(deviceId)) {
|
||||
publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
|
||||
}
|
||||
}
|
||||
} catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
public void setCmder(SIPCommander cmder) {
|
||||
}
|
||||
|
||||
public void setStorager(IVideoManagerStorager storager) {
|
||||
this.storager = storager;
|
||||
}
|
||||
|
||||
public void setPublisher(EventPublisher publisher) {
|
||||
this.publisher = publisher;
|
||||
}
|
||||
|
||||
public void setRedis(RedisUtil redis) {
|
||||
}
|
||||
|
||||
public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) {
|
||||
}
|
||||
|
||||
public void setOffLineDetector(DeviceOffLineDetector offLineDetector) {
|
||||
this.offLineDetector = offLineDetector;
|
||||
}
|
||||
|
||||
public IRedisCatchStorage getRedisCatchStorage() {
|
||||
return redisCatchStorage;
|
||||
}
|
||||
|
||||
public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
|
||||
this.redisCatchStorage = redisCatchStorage;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,197 @@
|
|||
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
|
||||
|
||||
import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
||||
import com.genersoft.iot.vmp.conf.SipConfig;
|
||||
import com.genersoft.iot.vmp.gb28181.auth.DigestServerAuthenticationHelper;
|
||||
import com.genersoft.iot.vmp.gb28181.auth.RegisterLogicHandler;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.WvpSipDate;
|
||||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
|
||||
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
|
||||
import gov.nist.javax.sip.RequestEventExt;
|
||||
import gov.nist.javax.sip.address.AddressImpl;
|
||||
import gov.nist.javax.sip.address.SipUri;
|
||||
import gov.nist.javax.sip.header.Expires;
|
||||
import gov.nist.javax.sip.header.SIPDateHeader;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import javax.sip.InvalidArgumentException;
|
||||
import javax.sip.RequestEvent;
|
||||
import javax.sip.ServerTransaction;
|
||||
import javax.sip.SipException;
|
||||
import javax.sip.header.*;
|
||||
import javax.sip.message.Request;
|
||||
import javax.sip.message.Response;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.text.ParseException;
|
||||
import java.util.Calendar;
|
||||
import java.util.Locale;
|
||||
|
||||
/**
|
||||
* @description:收到注册请求 处理
|
||||
* @author: swwheihei
|
||||
* @date: 2020年5月3日 下午4:47:25
|
||||
*/
|
||||
@Component
|
||||
public class RegisterRequestProcessor extends SIPRequestProcessorAbstract {
|
||||
|
||||
private Logger logger = LoggerFactory.getLogger(RegisterRequestProcessor.class);
|
||||
|
||||
public String method = "REGISTER";
|
||||
|
||||
@Autowired
|
||||
private SipConfig sipConfig;
|
||||
|
||||
@Autowired
|
||||
private RegisterLogicHandler handler;
|
||||
|
||||
@Autowired
|
||||
private IVideoManagerStorager storager;
|
||||
|
||||
@Autowired
|
||||
private EventPublisher publisher;
|
||||
|
||||
@Autowired
|
||||
private SIPProcessorObserver sipProcessorObserver;
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
// 添加消息处理的订阅
|
||||
sipProcessorObserver.addRequestProcessor(method, this);
|
||||
}
|
||||
|
||||
/**
|
||||
* 收到注册请求 处理
|
||||
* @param evt
|
||||
*/
|
||||
@Override
|
||||
public void process(RequestEvent evt) {
|
||||
try {
|
||||
RequestEventExt evtExt = (RequestEventExt)evt;
|
||||
String requestAddress = evtExt.getRemoteIpAddress() + ":" + evtExt.getRemotePort();
|
||||
logger.info("[{}] 收到注册请求,开始处理", requestAddress);
|
||||
Request request = evt.getRequest();
|
||||
|
||||
Response response = null;
|
||||
boolean passwordCorrect = false;
|
||||
// 注册标志 0:未携带授权头或者密码错误 1:注册成功 2:注销成功
|
||||
int registerFlag = 0;
|
||||
FromHeader fromHeader = (FromHeader) request.getHeader(FromHeader.NAME);
|
||||
AddressImpl address = (AddressImpl) fromHeader.getAddress();
|
||||
SipUri uri = (SipUri) address.getURI();
|
||||
String deviceId = uri.getUser();
|
||||
Device device = storager.queryVideoDevice(deviceId);
|
||||
AuthorizationHeader authorhead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME);
|
||||
// 校验密码是否正确
|
||||
if (authorhead != null) {
|
||||
passwordCorrect = new DigestServerAuthenticationHelper().doAuthenticatePlainTextPassword(request,
|
||||
sipConfig.getPassword());
|
||||
}
|
||||
if (StringUtils.isEmpty(sipConfig.getPassword())){
|
||||
passwordCorrect = true;
|
||||
}
|
||||
|
||||
// 未携带授权头或者密码错误 回复401
|
||||
if (authorhead == null ) {
|
||||
|
||||
logger.info("[{}] 未携带授权头 回复401", requestAddress);
|
||||
response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request);
|
||||
new DigestServerAuthenticationHelper().generateChallenge(getHeaderFactory(), response, sipConfig.getDomain());
|
||||
}else {
|
||||
if (!passwordCorrect){
|
||||
// 注册失败
|
||||
response = getMessageFactory().createResponse(Response.FORBIDDEN, request);
|
||||
response.setReasonPhrase("wrong password");
|
||||
logger.info("[{}] 密码/SIP服务器ID错误, 回复403", requestAddress);
|
||||
}else {
|
||||
// 携带授权头并且密码正确
|
||||
response = getMessageFactory().createResponse(Response.OK, request);
|
||||
// 添加date头
|
||||
SIPDateHeader dateHeader = new SIPDateHeader();
|
||||
// 使用自己修改的
|
||||
WvpSipDate wvpSipDate = new WvpSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis());
|
||||
dateHeader.setDate(wvpSipDate);
|
||||
response.addHeader(dateHeader);
|
||||
|
||||
ExpiresHeader expiresHeader = (ExpiresHeader) request.getHeader(Expires.NAME);
|
||||
if (expiresHeader == null) {
|
||||
response = getMessageFactory().createResponse(Response.BAD_REQUEST, request);
|
||||
ServerTransaction serverTransaction = getServerTransaction(evt);
|
||||
serverTransaction.sendResponse(response);
|
||||
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
|
||||
return;
|
||||
}
|
||||
// 添加Contact头
|
||||
response.addHeader(request.getHeader(ContactHeader.NAME));
|
||||
// 添加Expires头
|
||||
response.addHeader(request.getExpires());
|
||||
|
||||
// 获取到通信地址等信息
|
||||
ViaHeader viaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME);
|
||||
String received = viaHeader.getReceived();
|
||||
int rPort = viaHeader.getRPort();
|
||||
// 解析本地地址替代
|
||||
if (StringUtils.isEmpty(received) || rPort == -1) {
|
||||
received = viaHeader.getHost();
|
||||
rPort = viaHeader.getPort();
|
||||
}
|
||||
//
|
||||
|
||||
if (device == null) {
|
||||
device = new Device();
|
||||
device.setStreamMode("UDP");
|
||||
device.setCharset("gb2312");
|
||||
device.setDeviceId(deviceId);
|
||||
device.setFirsRegister(true);
|
||||
}
|
||||
device.setIp(received);
|
||||
device.setPort(rPort);
|
||||
device.setHostAddress(received.concat(":").concat(String.valueOf(rPort)));
|
||||
// 注销成功
|
||||
if (expiresHeader.getExpires() == 0) {
|
||||
registerFlag = 2;
|
||||
}
|
||||
// 注册成功
|
||||
else {
|
||||
device.setExpires(expiresHeader.getExpires());
|
||||
registerFlag = 1;
|
||||
// 判断TCP还是UDP
|
||||
boolean isTcp = false;
|
||||
ViaHeader reqViaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME);
|
||||
String transport = reqViaHeader.getTransport();
|
||||
if (transport.equals("TCP")) {
|
||||
isTcp = true;
|
||||
}
|
||||
device.setTransport(isTcp ? "TCP" : "UDP");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ServerTransaction serverTransaction = getServerTransaction(evt);
|
||||
serverTransaction.sendResponse(response);
|
||||
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
|
||||
// 注册成功
|
||||
// 保存到redis
|
||||
// 下发catelog查询目录
|
||||
if (registerFlag == 1 ) {
|
||||
logger.info("[{}] 注册成功! deviceId:" + device.getDeviceId(), requestAddress);
|
||||
publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_REGISTER);
|
||||
// 重新注册更新设备和通道,以免设备替换或更新后信息无法更新
|
||||
handler.onRegister(device);
|
||||
} else if (registerFlag == 2) {
|
||||
logger.info("[{}] 注销成功! deviceId:" + device.getDeviceId(), requestAddress);
|
||||
publisher.outlineEventPublish(device.getDeviceId(), VideoManagerConstants.EVENT_OUTLINE_UNREGISTER);
|
||||
}
|
||||
} catch (SipException | InvalidArgumentException | NoSuchAlgorithmException | ParseException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,75 @@
|
|||
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
|
||||
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.sip.InvalidArgumentException;
|
||||
import javax.sip.RequestEvent;
|
||||
import javax.sip.ServerTransaction;
|
||||
import javax.sip.SipException;
|
||||
import javax.sip.header.ExpiresHeader;
|
||||
import javax.sip.message.Request;
|
||||
import javax.sip.message.Response;
|
||||
import java.text.ParseException;
|
||||
|
||||
/**
|
||||
* @description:SUBSCRIBE请求处理器
|
||||
* @author: swwheihei
|
||||
* @date: 2020年5月3日 下午5:31:20
|
||||
*/
|
||||
@Component
|
||||
public class SubscribeRequestProcessor extends SIPRequestProcessorAbstract {
|
||||
|
||||
private Logger logger = LoggerFactory.getLogger(SubscribeRequestProcessor.class);
|
||||
private String method = "SUBSCRIBE";
|
||||
|
||||
@Autowired
|
||||
private SIPProcessorObserver sipProcessorObserver;
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
// 添加消息处理的订阅
|
||||
sipProcessorObserver.addRequestProcessor(method, this);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理SUBSCRIBE请求
|
||||
*
|
||||
* @param evt
|
||||
*/
|
||||
@Override
|
||||
public void process(RequestEvent evt) {
|
||||
Request request = evt.getRequest();
|
||||
|
||||
try {
|
||||
Response response = null;
|
||||
response = getMessageFactory().createResponse(200, request);
|
||||
if (response != null) {
|
||||
ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30);
|
||||
response.setExpires(expireHeader);
|
||||
}
|
||||
logger.info("response : " + response.toString());
|
||||
ServerTransaction transaction = getServerTransaction(evt);
|
||||
if (transaction != null) {
|
||||
transaction.sendResponse(response);
|
||||
transaction.getDialog().delete();
|
||||
transaction.terminate();
|
||||
} else {
|
||||
logger.info("processRequest serverTransactionId is null.");
|
||||
}
|
||||
|
||||
} catch (ParseException e) {
|
||||
e.printStackTrace();
|
||||
} catch (SipException e) {
|
||||
e.printStackTrace();
|
||||
} catch (InvalidArgumentException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
package com.genersoft.iot.vmp.gb28181.transmit.event.response;
|
||||
|
||||
import javax.sip.ResponseEvent;
|
||||
|
||||
/**
|
||||
* @description:处理接收IPCamera发来的SIP协议响应消息
|
||||
* @author: swwheihei
|
||||
* @date: 2020年5月3日 下午4:42:22
|
||||
*/
|
||||
public interface ISIPResponseProcessor {
|
||||
|
||||
void process(ResponseEvent evt);
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;
|
||||
|
||||
import com.genersoft.iot.vmp.conf.SipConfig;
|
||||
import com.genersoft.iot.vmp.gb28181.SipLayer;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.sip.ResponseEvent;
|
||||
|
||||
/**
|
||||
* @description: BYE请求响应器
|
||||
* @author: swwheihei
|
||||
* @date: 2020年5月3日 下午5:32:05
|
||||
*/
|
||||
@Component
|
||||
public class ByeResponseProcessor extends SIPResponseProcessorAbstract {
|
||||
|
||||
private String method = "BYE";
|
||||
|
||||
@Autowired
|
||||
private SipLayer sipLayer;
|
||||
|
||||
@Autowired
|
||||
private SipConfig config;
|
||||
|
||||
|
||||
@Autowired
|
||||
private SIPProcessorObserver sipProcessorObserver;
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
// 添加消息处理的订阅
|
||||
sipProcessorObserver.addResponseProcessor(method, this);
|
||||
}
|
||||
/**
|
||||
* 处理BYE响应
|
||||
*
|
||||
* @param evt
|
||||
*/
|
||||
@Override
|
||||
public void process(ResponseEvent evt) {
|
||||
// TODO Auto-generated method stub
|
||||
System.out.println("收到bye");
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;
|
||||
|
||||
import com.genersoft.iot.vmp.conf.SipConfig;
|
||||
import com.genersoft.iot.vmp.gb28181.SipLayer;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.sip.ResponseEvent;
|
||||
|
||||
/**
|
||||
* @description: CANCEL响应处理器
|
||||
* @author: panlinlin
|
||||
* @date: 2021年11月5日 16:35
|
||||
*/
|
||||
@Component
|
||||
public class CancelResponseProcessor extends SIPResponseProcessorAbstract {
|
||||
|
||||
private String method = "CANCEL";
|
||||
|
||||
@Autowired
|
||||
private SipLayer sipLayer;
|
||||
|
||||
@Autowired
|
||||
private SipConfig config;
|
||||
|
||||
@Autowired
|
||||
private SIPProcessorObserver sipProcessorObserver;
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
// 添加消息处理的订阅
|
||||
sipProcessorObserver.addResponseProcessor(method, this);
|
||||
}
|
||||
/**
|
||||
* 处理CANCEL响应
|
||||
*
|
||||
* @param evt
|
||||
*/
|
||||
@Override
|
||||
public void process(ResponseEvent evt) {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,97 @@
|
|||
package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;
|
||||
|
||||
import com.genersoft.iot.vmp.conf.SipConfig;
|
||||
import com.genersoft.iot.vmp.gb28181.SipLayer;
|
||||
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
|
||||
import gov.nist.javax.sip.ResponseEventExt;
|
||||
import gov.nist.javax.sip.stack.SIPDialog;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.sip.InvalidArgumentException;
|
||||
import javax.sip.ResponseEvent;
|
||||
import javax.sip.SipException;
|
||||
import javax.sip.address.SipURI;
|
||||
import javax.sip.header.CSeqHeader;
|
||||
import javax.sip.message.Request;
|
||||
import javax.sip.message.Response;
|
||||
import java.text.ParseException;
|
||||
|
||||
|
||||
/**
|
||||
* @description: 处理INVITE响应
|
||||
* @author: panlinlin
|
||||
* @date: 2021年11月5日 16:40
|
||||
*/
|
||||
@Component
|
||||
public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(InviteResponseProcessor.class);
|
||||
private String method = "INVITE";
|
||||
|
||||
@Autowired
|
||||
private SipLayer sipLayer;
|
||||
|
||||
@Autowired
|
||||
private SipConfig config;
|
||||
|
||||
|
||||
@Autowired
|
||||
private SIPProcessorObserver sipProcessorObserver;
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
// 添加消息处理的订阅
|
||||
sipProcessorObserver.addResponseProcessor(method, this);
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private VideoStreamSessionManager streamSession;
|
||||
|
||||
/**
|
||||
* 处理invite响应
|
||||
*
|
||||
* @param evt 响应消息
|
||||
* @throws ParseException
|
||||
*/
|
||||
@Override
|
||||
public void process(ResponseEvent evt ){
|
||||
try {
|
||||
Response response = evt.getResponse();
|
||||
int statusCode = response.getStatusCode();
|
||||
// trying不会回复
|
||||
if (statusCode == Response.TRYING) {
|
||||
}
|
||||
// 成功响应
|
||||
// 下发ack
|
||||
if (statusCode == Response.OK) {
|
||||
ResponseEventExt event = (ResponseEventExt)evt;
|
||||
SIPDialog dialog = (SIPDialog)evt.getDialog();
|
||||
CSeqHeader cseq = (CSeqHeader) response.getHeader(CSeqHeader.NAME);
|
||||
Request reqAck = dialog.createAck(cseq.getSeqNumber());
|
||||
SipURI requestURI = (SipURI) reqAck.getRequestURI();
|
||||
try {
|
||||
requestURI.setHost(event.getRemoteIpAddress());
|
||||
} catch (ParseException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
requestURI.setPort(event.getRemotePort());
|
||||
reqAck.setRequestURI(requestURI);
|
||||
logger.info("向 " + event.getRemoteIpAddress() + ":" + event.getRemotePort() + "回复ack");
|
||||
SipURI sipURI = (SipURI)dialog.getRemoteParty().getURI();
|
||||
String deviceId = requestURI.getUser();
|
||||
String channelId = sipURI.getUser();
|
||||
|
||||
dialog.sendAck(reqAck);
|
||||
|
||||
}
|
||||
} catch (InvalidArgumentException | SipException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,104 @@
|
|||
package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;
|
||||
|
||||
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
|
||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.sip.ResponseEvent;
|
||||
import javax.sip.header.CallIdHeader;
|
||||
import javax.sip.header.WWWAuthenticateHeader;
|
||||
import javax.sip.message.Response;
|
||||
|
||||
/**
|
||||
* @description:Register响应处理器
|
||||
* @author: swwheihei
|
||||
* @date: 2020年5月3日 下午5:32:23
|
||||
*/
|
||||
@Component
|
||||
public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
|
||||
|
||||
private Logger logger = LoggerFactory.getLogger(RegisterResponseProcessor.class);
|
||||
private String method = "REGISTER";
|
||||
|
||||
@Autowired
|
||||
private ISIPCommanderForPlatform sipCommanderForPlatform;
|
||||
|
||||
@Autowired
|
||||
private IVideoManagerStorager storager;
|
||||
|
||||
@Autowired
|
||||
private IRedisCatchStorage redisCatchStorage;
|
||||
|
||||
@Autowired
|
||||
private SIPProcessorObserver sipProcessorObserver;
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
// 添加消息处理的订阅
|
||||
sipProcessorObserver.addResponseProcessor(method, this);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理Register响应
|
||||
*
|
||||
* @param evt 事件
|
||||
*/
|
||||
@Override
|
||||
public void process(ResponseEvent evt) {
|
||||
Response response = evt.getResponse();
|
||||
CallIdHeader callIdHeader = (CallIdHeader) response.getHeader(CallIdHeader.NAME);
|
||||
String callId = callIdHeader.getCallId();
|
||||
|
||||
String platformGBId = redisCatchStorage.queryPlatformRegisterInfo(callId);
|
||||
if (platformGBId == null) {
|
||||
logger.info(String.format("未找到callId: %s 的注册/注销平台id", callId ));
|
||||
return;
|
||||
}
|
||||
|
||||
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(platformGBId);
|
||||
if (parentPlatformCatch == null) {
|
||||
logger.warn(String.format("收到 %s 的注册/注销%S请求, 但是平台缓存信息未查询到!!!", platformGBId, response.getStatusCode()));
|
||||
return;
|
||||
}
|
||||
String action = parentPlatformCatch.getParentPlatform().getExpires().equals("0") ? "注销" : "注册";
|
||||
logger.info(String.format("收到 %s %s的%S响应", platformGBId, action, response.getStatusCode() ));
|
||||
ParentPlatform parentPlatform = parentPlatformCatch.getParentPlatform();
|
||||
if (parentPlatform == null) {
|
||||
logger.warn(String.format("收到 %s %s的%S请求, 但是平台信息未查询到!!!", platformGBId, action, response.getStatusCode()));
|
||||
return;
|
||||
}
|
||||
|
||||
if (response.getStatusCode() == 401) {
|
||||
WWWAuthenticateHeader www = (WWWAuthenticateHeader)response.getHeader(WWWAuthenticateHeader.NAME);
|
||||
sipCommanderForPlatform.register(parentPlatform, callId, www, null, null);
|
||||
}else if (response.getStatusCode() == 200){
|
||||
// 注册/注销成功
|
||||
logger.info(String.format("%s %s成功", platformGBId, action));
|
||||
redisCatchStorage.delPlatformRegisterInfo(callId);
|
||||
parentPlatform.setStatus("注册".equals(action));
|
||||
// 取回Expires设置,避免注销过程中被置为0
|
||||
ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId);
|
||||
String expires = parentPlatformTmp.getExpires();
|
||||
parentPlatform.setExpires(expires);
|
||||
parentPlatform.setId(parentPlatformTmp.getId());
|
||||
storager.updateParentPlatformStatus(platformGBId, "注册".equals(action));
|
||||
|
||||
redisCatchStorage.updatePlatformRegister(parentPlatform);
|
||||
|
||||
redisCatchStorage.updatePlatformKeepalive(parentPlatform);
|
||||
|
||||
parentPlatformCatch.setParentPlatform(parentPlatform);
|
||||
|
||||
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
package com.genersoft.iot.vmp.service;
|
||||
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
||||
|
||||
/**
|
||||
* 设备相关业务处理
|
||||
*/
|
||||
public interface IDeviceService {
|
||||
|
||||
/**
|
||||
* 添加目录订阅
|
||||
* @param device 设备信息
|
||||
* @return
|
||||
*/
|
||||
boolean addCatalogSubscribe(Device device);
|
||||
|
||||
/**
|
||||
* 移除目录订阅
|
||||
* @param device 设备信息
|
||||
* @return
|
||||
*/
|
||||
boolean removeCatalogSubscribe(Device device);
|
||||
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
package com.genersoft.iot.vmp.service.bean;
|
||||
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
|
||||
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
|
||||
import org.dom4j.DocumentException;
|
||||
import org.dom4j.Element;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.sip.ResponseEvent;
|
||||
|
||||
public class CatalogSubscribeTask implements Runnable{
|
||||
private final Logger logger = LoggerFactory.getLogger(CatalogSubscribeTask.class);
|
||||
private Device device;
|
||||
private ISIPCommander sipCommander;
|
||||
|
||||
public CatalogSubscribeTask(Device device, ISIPCommander sipCommander) {
|
||||
this.device = device;
|
||||
this.sipCommander = sipCommander;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
sipCommander.catalogSubscribe(device, eventResult -> {
|
||||
ResponseEvent event = (ResponseEvent) eventResult.event;
|
||||
Element rootElement = null;
|
||||
try {
|
||||
rootElement = XmlUtil.getRootElement(event.getResponse().getRawContent(), "gb2312");
|
||||
} catch (DocumentException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
Element resultElement = rootElement.element("Result");
|
||||
String result = resultElement.getText();
|
||||
if (result.toUpperCase().equals("OK")){
|
||||
// 成功
|
||||
logger.info("目录订阅成功: {}", device.getDeviceId());
|
||||
}else {
|
||||
// 失败
|
||||
logger.info("目录订阅失败: {}-{}", device.getDeviceId(), result);
|
||||
}
|
||||
|
||||
},eventResult -> {
|
||||
// 失败
|
||||
logger.warn("目录订阅失败: {}-信令发送失败", device.getDeviceId());
|
||||
});
|
||||
}
|
||||
}
|
|
@ -0,0 +1,61 @@
|
|||
package com.genersoft.iot.vmp.service.impl;
|
||||
|
||||
import com.genersoft.iot.vmp.conf.DynamicTask;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
|
||||
import com.genersoft.iot.vmp.service.IDeviceService;
|
||||
import com.genersoft.iot.vmp.service.bean.CatalogSubscribeTask;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public class DeviceServiceImpl implements IDeviceService {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(DeviceServiceImpl.class);
|
||||
|
||||
@Autowired
|
||||
private DynamicTask dynamicTask;
|
||||
;
|
||||
|
||||
@Autowired
|
||||
private ISIPCommander sipCommander;
|
||||
|
||||
@Override
|
||||
public boolean addCatalogSubscribe(Device device) {
|
||||
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
|
||||
return false;
|
||||
}
|
||||
// 添加目录订阅
|
||||
CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander);
|
||||
catalogSubscribeTask.run();
|
||||
// 提前开始刷新订阅
|
||||
String cron = getCron(device.getSubscribeCycleForCatalog() - 60);
|
||||
dynamicTask.startCron(device.getDeviceId(), catalogSubscribeTask, cron);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeCatalogSubscribe(Device device) {
|
||||
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
|
||||
return false;
|
||||
}
|
||||
dynamicTask.stopCron(device.getDeviceId());
|
||||
return true;
|
||||
}
|
||||
|
||||
public String getCron(int time) {
|
||||
if (time <= 59) {
|
||||
return "0/" + time +" * * * * ?";
|
||||
}else if (time <= 60* 59) {
|
||||
int minute = time/(60);
|
||||
return "0 0/" + minute +" * * * ?";
|
||||
}else if (time <= 60* 60* 59) {
|
||||
int hour = time/(60*60);
|
||||
return "0 0 0/" + hour +" * * ?";
|
||||
}else {
|
||||
return "0 0/10 * * * ?";
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue