From ca5139929b8b5853229ca3d63e2bca1ce82fa0ab Mon Sep 17 00:00:00 2001 From: songww Date: Wed, 13 May 2020 14:55:06 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=9D=E8=AF=95=E4=BF=AE=E5=A4=8Dcatalog?= =?UTF-8?q?=E8=8E=B7=E5=8F=96=E5=A4=B1=E8=B4=A5=E3=80=82=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E9=87=8D=E5=90=AF=E5=90=8E=E8=AE=BE=E5=A4=87=E6=9C=AA=E6=B3=A8?= =?UTF-8?q?=E5=86=8C=E4=BB=8D=E4=B8=8A=E6=8A=A5keeplive=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/event/DeviceOffLineDetector.java | 24 +++++++ .../iot/vmp/gb28181/event/EventPublisher.java | 4 +- .../KeepliveTimeoutListener.java | 2 +- .../OfflineEvent.java} | 6 +- .../OfflineEventListener.java} | 10 +-- .../gb28181/transmit/cmd/ISIPCommander.java | 2 +- .../transmit/cmd/impl/SIPCommander.java | 19 +++--- .../request/impl/MessageRequestProcessor.java | 68 +++++++++++++------ .../impl/InviteResponseProcessor.java | 49 +++++++++++-- .../vmanager/playback/PlaybackController.java | 7 +- src/main/resources/application.yml | 2 +- 11 files changed, 145 insertions(+), 48 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/event/DeviceOffLineDetector.java rename src/main/java/com/genersoft/iot/vmp/gb28181/event/{outline => offline}/KeepliveTimeoutListener.java (94%) rename src/main/java/com/genersoft/iot/vmp/gb28181/event/{outline/OutlineEvent.java => offline/OfflineEvent.java} (78%) rename src/main/java/com/genersoft/iot/vmp/gb28181/event/{outline/OutlineEventListener.java => offline/OfflineEventListener.java} (80%) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/DeviceOffLineDetector.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/DeviceOffLineDetector.java new file mode 100644 index 0000000..437e7de --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/DeviceOffLineDetector.java @@ -0,0 +1,24 @@ +package com.genersoft.iot.vmp.gb28181.event; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; + +/** + * @Description:设备离在线状态检测器,用于检测设备状态 + * @author: songww + * @date: 2020年5月13日 下午2:40:29 + */ +@Component +public class DeviceOffLineDetector { + + @Autowired + private RedisUtil redis; + + public boolean isOnline(String deviceId) { + String key = VideoManagerConstants.KEEPLIVEKEY_PREFIX + deviceId; + return redis.hasKey(key); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index de0dbc4..c6f5d44 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -4,8 +4,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Component; +import com.genersoft.iot.vmp.gb28181.event.offline.OfflineEvent; import com.genersoft.iot.vmp.gb28181.event.online.OnlineEvent; -import com.genersoft.iot.vmp.gb28181.event.outline.OutlineEvent; /** * @Description:Event事件通知推送器,支持推送在线事件、离线事件 @@ -26,7 +26,7 @@ public class EventPublisher { } public void outlineEventPublish(String deviceId, String from){ - OutlineEvent outEvent = new OutlineEvent(this); + OfflineEvent outEvent = new OfflineEvent(this); outEvent.setDeviceId(deviceId); outEvent.setFrom(from); applicationEventPublisher.publishEvent(outEvent); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/outline/KeepliveTimeoutListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepliveTimeoutListener.java similarity index 94% rename from src/main/java/com/genersoft/iot/vmp/gb28181/event/outline/KeepliveTimeoutListener.java rename to src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepliveTimeoutListener.java index 7f427e7..9f53d64 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/outline/KeepliveTimeoutListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepliveTimeoutListener.java @@ -1,4 +1,4 @@ -package com.genersoft.iot.vmp.gb28181.event.outline; +package com.genersoft.iot.vmp.gb28181.event.offline; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/outline/OutlineEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEvent.java similarity index 78% rename from src/main/java/com/genersoft/iot/vmp/gb28181/event/outline/OutlineEvent.java rename to src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEvent.java index 7e72936..d2f612a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/outline/OutlineEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEvent.java @@ -1,4 +1,4 @@ -package com.genersoft.iot.vmp.gb28181.event.outline; +package com.genersoft.iot.vmp.gb28181.event.offline; import org.springframework.context.ApplicationEvent; @@ -7,7 +7,7 @@ import org.springframework.context.ApplicationEvent; * @author: songww * @date: 2020年5月6日 上午11:33:13 */ -public class OutlineEvent extends ApplicationEvent { +public class OfflineEvent extends ApplicationEvent { /** * @Title: OutlineEvent @@ -15,7 +15,7 @@ public class OutlineEvent extends ApplicationEvent { * @param: @param source * @throws */ - public OutlineEvent(Object source) { + public OfflineEvent(Object source) { super(source); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/outline/OutlineEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java similarity index 80% rename from src/main/java/com/genersoft/iot/vmp/gb28181/event/outline/OutlineEventListener.java rename to src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java index a87b42e..faa89fa 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/outline/OutlineEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java @@ -1,4 +1,4 @@ -package com.genersoft.iot.vmp.gb28181.event.outline; +package com.genersoft.iot.vmp.gb28181.event.offline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -13,14 +13,14 @@ import com.genersoft.iot.vmp.utils.redis.RedisUtil; /** * @Description: 离线事件监听器,监听到离线后,修改设备离在线状态。 设备离线有两个来源: * 1、设备主动注销,发送注销指令,{@link com.genersoft.iot.vmp.gb28181.transmit.request.impl.RegisterRequestProcessor} - * 2、设备未知原因离线,心跳超时,{@link com.genersoft.iot.vmp.gb28181.event.outline.OutlineEventListener} + * 2、设备未知原因离线,心跳超时,{@link com.genersoft.iot.vmp.gb28181.event.offline.OfflineEventListener} * @author: songww * @date: 2020年5月6日 下午1:51:23 */ @Component -public class OutlineEventListener implements ApplicationListener { +public class OfflineEventListener implements ApplicationListener { - private final static Logger logger = LoggerFactory.getLogger(OutlineEventListener.class); + private final static Logger logger = LoggerFactory.getLogger(OfflineEventListener.class); @Autowired private IVideoManagerStorager storager; @@ -29,7 +29,7 @@ public class OutlineEventListener implements ApplicationListener { private RedisUtil redis; @Override - public void onApplicationEvent(OutlineEvent event) { + public void onApplicationEvent(OfflineEvent event) { if (logger.isDebugEnabled()) { logger.debug("设备离线事件触发,deviceId:" + event.getDeviceId() + ",from:" + event.getFrom()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index b0fa6cd..1039a35 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -79,7 +79,7 @@ public interface ISIPCommander { * @param startTime 开始时间,格式要求:yyyy-MM-dd HH:mm:ss * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss */ - public String playbackStreamCmd(Device device,String channelId, String recordId, String startTime, String endTime); + public String playbackStreamCmd(Device device,String channelId, String startTime, String endTime); /** * 语音广播 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index b012f9a..a6b512f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -1,14 +1,13 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; import java.text.ParseException; -import java.util.Random; +import javax.sip.ClientTransaction; import javax.sip.InvalidArgumentException; import javax.sip.SipException; import javax.sip.message.Request; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.conf.SipConfig; @@ -19,8 +18,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider; import com.genersoft.iot.vmp.gb28181.utils.DateUtil; import com.genersoft.iot.vmp.gb28181.utils.SsrcUtil; -import tk.mybatis.mapper.util.StringUtil; - /** * @Description:设备能力接口,用于定义设备的控制、查询能力 * @author: songww @@ -181,16 +178,16 @@ public class SIPCommander implements ISIPCommander { * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss */ @Override - public String playbackStreamCmd(Device device, String channelId, String recordId, String startTime, String endTime) { + public String playbackStreamCmd(Device device, String channelId, String startTime, String endTime) { try { String ssrc = SsrcUtil.getPlayBackSsrc(); // StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); - content.append("o="+channelId+" 0 0 IN IP4 "+sipConfig.getSipIp()+"\r\n"); + content.append("o="+device.getDeviceId()+" 0 0 IN IP4 "+sipConfig.getSipIp()+"\r\n"); content.append("s=Playback\r\n"); - content.append("u="+recordId+":3\r\n"); + content.append("u="+channelId+":3\r\n"); content.append("c=IN IP4 "+sipConfig.getMediaIp()+"\r\n"); content.append("t="+DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime)+" "+DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime) +"\r\n"); if(device.getTransport().equals("TCP")) { @@ -439,11 +436,15 @@ public class SIPCommander implements ISIPCommander { } private void transmitRequest(Device device, Request request) throws SipException { + ClientTransaction clientTransaction = null; if(device.getTransport().equals("TCP")) { - sipLayer.getTcpSipProvider().sendRequest(request); + clientTransaction = sipLayer.getTcpSipProvider().getNewClientTransaction(request); + //sipLayer.getTcpSipProvider().sendRequest(request); } else if(device.getTransport().equals("UDP")) { - sipLayer.getUdpSipProvider().sendRequest(request); + clientTransaction = sipLayer.getUdpSipProvider().getNewClientTransaction(request); + //sipLayer.getUdpSipProvider().sendRequest(request); } + clientTransaction.sendRequest(); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java index 9a39d5c..8a7c6cf 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java @@ -30,6 +30,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; import com.genersoft.iot.vmp.gb28181.bean.RecordItem; +import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; @@ -69,8 +70,21 @@ public class MessageRequestProcessor implements ISIPRequestProcessor { @Autowired private DeferredResultHolder deferredResultHolder; + @Autowired + private DeviceOffLineDetector offLineDetector; + private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_"; + private static final String MESSAGE_CATALOG = "Catalog"; + private static final String MESSAGE_DEVICE_INFO = "DeviceInfo"; + private static final String MESSAGE_KEEP_ALIVE = "Keepalive"; + private static final String MESSAGE_ALARM = "Alarm"; + private static final String MESSAGE_RECORD_INFO = "RecordInfo"; +// private static final String MESSAGE_BROADCAST = "Broadcast"; +// private static final String MESSAGE_DEVICE_STATUS = "DeviceStatus"; +// private static final String MESSAGE_MOBILE_POSITION = "MobilePosition"; +// private static final String MESSAGE_MOBILE_POSITION_INTERVAL = "Interval"; + /** * 处理MESSAGE请求 * @@ -85,22 +99,31 @@ public class MessageRequestProcessor implements ISIPRequestProcessor { this.transaction = transaction; Request request = evt.getRequest(); - - if (new String(request.getRawContent()).contains("Keepalive")) { - logger.info("接收到KeepAlive消息"); - processMessageKeepAlive(evt); - } else if (new String(request.getRawContent()).contains("Catalog")) { - logger.info("接收到Catalog消息"); - processMessageCatalogList(evt); - } else if (new String(request.getRawContent()).contains("DeviceInfo")) { - logger.info("接收到DeviceInfo消息"); - processMessageDeviceInfo(evt); - } else if (new String(request.getRawContent()).contains("Alarm")) { - logger.info("接收到Alarm消息"); - processMessageAlarm(evt); - } else if (new String(request.getRawContent()).contains("RecordInfo")) { - logger.info("接收到RecordInfo消息"); - processMessageRecordInfo(evt); + SAXReader reader = new SAXReader(); + Document xml; + try { + xml = reader.read(new ByteArrayInputStream(request.getRawContent())); + Element rootElement = xml.getRootElement(); + String cmd = rootElement.element("CmdType").getStringValue(); + + if (MESSAGE_KEEP_ALIVE.equals(cmd)) { + logger.info("接收到KeepAlive消息"); + processMessageKeepAlive(evt); + } else if (MESSAGE_CATALOG.equals(cmd)) { + logger.info("接收到Catalog消息"); + processMessageCatalogList(evt); + } else if (MESSAGE_DEVICE_INFO.equals(cmd)) { + logger.info("接收到DeviceInfo消息"); + processMessageDeviceInfo(evt); + } else if (MESSAGE_ALARM.equals(cmd)) { + logger.info("接收到Alarm消息"); + processMessageAlarm(evt); + } else if (MESSAGE_RECORD_INFO.equals(cmd)) { + logger.info("接收到RecordInfo消息"); + processMessageRecordInfo(evt); + } + } catch (DocumentException e) { + e.printStackTrace(); } } @@ -247,12 +270,17 @@ public class MessageRequestProcessor implements ISIPRequestProcessor { */ private void processMessageKeepAlive(RequestEvent evt){ try { - Request request = evt.getRequest(); - Response response = layer.getMessageFactory().createResponse(Response.OK,request); Element rootElement = getRootElement(evt); - Element deviceIdElement = rootElement.element("DeviceID"); + String deviceId = XmlUtil.getText(rootElement,"DeviceID"); + Request request = evt.getRequest(); + Response response = null; + if (offLineDetector.isOnline(deviceId)) { + response = layer.getMessageFactory().createResponse(Response.OK,request); + publisher.onlineEventPublish(deviceId, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE); + } else { + response = layer.getMessageFactory().createResponse(Response.BAD_REQUEST,request); + } transaction.sendResponse(response); - publisher.onlineEventPublish(deviceIdElement.getText(), VideoManagerConstants.EVENT_ONLINE_KEEPLIVE); } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { e.printStackTrace(); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/response/impl/InviteResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/response/impl/InviteResponseProcessor.java index 4204ce7..f825584 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/response/impl/InviteResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/response/impl/InviteResponseProcessor.java @@ -1,15 +1,25 @@ package com.genersoft.iot.vmp.gb28181.transmit.response.impl; +import java.text.ParseException; + +import javax.sip.ClientTransaction; import javax.sip.Dialog; import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; import javax.sip.SipException; +import javax.sip.address.SipURI; +import javax.sip.header.CSeqHeader; +import javax.sip.header.ViaHeader; import javax.sip.message.Request; +import javax.sip.message.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.SipLayer; +import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorFactory; import com.genersoft.iot.vmp.gb28181.transmit.response.ISIPResponseProcessor; /** @@ -20,20 +30,51 @@ import com.genersoft.iot.vmp.gb28181.transmit.response.ISIPResponseProcessor; @Component public class InviteResponseProcessor implements ISIPResponseProcessor { + private final static Logger logger = LoggerFactory.getLogger(SIPProcessorFactory.class); + /** * 处理invite响应 * - * @param request + * @param evt * 响应消息 */ @Override public void process(ResponseEvent evt, SipLayer layer, SipConfig config) { try { - Dialog dialog = evt.getDialog(); - Request reqAck =dialog.createAck(1L); - dialog.sendAck(reqAck); + Response response = evt.getResponse(); + int statusCode = response.getStatusCode(); + //trying不会回复 + if(statusCode == Response.TRYING){ + + } + //成功响应 + //下发ack + if(statusCode == Response.OK){ + ClientTransaction clientTransaction = evt.getClientTransaction(); + if(clientTransaction == null){ + logger.error("回复ACK时,clientTransaction为null >>> {}",response); + return; + } + Dialog clientDialog = clientTransaction.getDialog(); + + CSeqHeader clientCSeqHeader = (CSeqHeader) response.getHeader(CSeqHeader.NAME); + long cseqId = clientCSeqHeader.getSeqNumber(); + /* + createAck函数,创建的ackRequest,会采用Invite响应的200OK,中的contact字段中的地址,作为目标地址。 + 有的终端传上来的可能还是内网地址,会造成ack发送不出去。接受不到音视频流 + 所以在此处统一替换地址。和响应消息的Via头中的地址保持一致。 + */ + Request ackRequest = clientDialog.createAck(cseqId); + SipURI requestURI = (SipURI) ackRequest.getRequestURI(); + ViaHeader viaHeader = (ViaHeader) response.getHeader(ViaHeader.NAME); + requestURI.setHost(viaHeader.getHost()); + requestURI.setPort(viaHeader.getPort()); + clientDialog.sendAck(ackRequest); + } } catch (InvalidArgumentException | SipException e) { e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/playback/PlaybackController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/playback/PlaybackController.java index 4be6110..20078a7 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/playback/PlaybackController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/playback/PlaybackController.java @@ -10,6 +10,7 @@ import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; @@ -30,7 +31,7 @@ public class PlaybackController { public ResponseEntity play(@PathVariable String deviceId,@PathVariable String channelId, String startTime, String endTime){ Device device = storager.queryVideoDevice(deviceId); - String ssrc = cmder.playStreamCmd(device, channelId); + String ssrc = cmder.playbackStreamCmd(device, channelId, startTime, endTime); if (logger.isDebugEnabled()) { logger.debug(String.format("设备预览 API调用,deviceId:%s ,channelId:%s",deviceId, channelId)); @@ -38,7 +39,9 @@ public class PlaybackController { } if(ssrc!=null) { - return new ResponseEntity(ssrc,HttpStatus.OK); + JSONObject json = new JSONObject(); + json.put("ssrc", ssrc); + return new ResponseEntity(json.toString(),HttpStatus.OK); } else { logger.warn("设备预览API调用失败!"); return new ResponseEntity(HttpStatus.INTERNAL_SERVER_ERROR); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 4ec320e..02dc489 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -26,7 +26,7 @@ spring: server: port: 8080 sip: - ip: 10.200.64.63 + ip: 127.0.0.1 port: 5060 # 根据国标6.1.2中规定,domain宜采用ID统一编码的前十位编码。国标附录D中定义前8位为中心编码(由省级、市级、区级、基层编号组成,参照GB/T 2260-2007) # 后两位为行业编码,定义参照附录D.3