From c9bfdf2525c5e000d1f02e0e15183ca1b4fee9f3 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 24 Feb 2022 21:10:34 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=9B=BD=E6=A0=87?= =?UTF-8?q?=E7=9A=84=E7=BA=A7=E8=81=94=E5=BD=95=E5=83=8F=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/Device.java | 13 -- .../iot/vmp/gb28181/bean/MobilePosition.java | 11 -- .../iot/vmp/gb28181/bean/RecordItem.java | 12 +- .../iot/vmp/gb28181/event/EventPublisher.java | 13 +- .../KeepaliveTimeoutListenerForPlatform.java | 5 +- .../event/record/RecordEndEventListener.java | 16 ++- .../cmd/ISIPCommanderForPlatform.java | 9 ++ .../transmit/cmd/impl/SIPCommander.java | 18 ++- .../cmd/impl/SIPCommanderFroPlatform.java | 58 +++++++- .../cmd/RecordInfoQueryMessageHandler.java | 124 ++++++++++++------ .../cmd/RecordInfoResponseMessageHandler.java | 8 +- .../storager/dao/ParentPlatformMapper.java | 2 +- .../storager/dao/PlatformChannelMapper.java | 3 + 13 files changed, 203 insertions(+), 89 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java index aff16711..761437fc 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java @@ -3,11 +3,6 @@ package com.genersoft.iot.vmp.gb28181.bean; public class Device { - /** - * Id - */ - private int id; - /** * 设备Id */ @@ -119,13 +114,7 @@ public class Device { */ private int subscribeCycleForCatalog ; - public int getId() { - return id; - } - public void setId(int id) { - this.id = id; - } public String getDeviceId() { return deviceId; @@ -294,6 +283,4 @@ public class Device { public void setSubscribeCycleForCatalog(int subscribeCycleForCatalog) { this.subscribeCycleForCatalog = subscribeCycleForCatalog; } - - } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/MobilePosition.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/MobilePosition.java index 7c1f0adf..c6cf7825 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/MobilePosition.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/MobilePosition.java @@ -7,10 +7,6 @@ package com.genersoft.iot.vmp.gb28181.bean; */ public class MobilePosition { - /** - * Id - */ - private int id; /** * 设备Id */ @@ -76,13 +72,6 @@ public class MobilePosition { */ private String cnLat; - public int getId() { - return id; - } - - public void setId(int id) { - this.id = id; - } public String getDeviceId() { return deviceId; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordItem.java index 39f894ce..ca7fd54c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordItem.java @@ -19,7 +19,9 @@ public class RecordItem implements Comparable{ private String name; private String filePath; - + + private String fileSize; + private String address; private String startTime; @@ -104,6 +106,14 @@ public class RecordItem implements Comparable{ this.recorderId = recorderId; } + public String getFileSize() { + return fileSize; + } + + public void setFileSize(String fileSize) { + this.fileSize = fileSize; + } + @Override public int compareTo(@NotNull RecordItem recordItem) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index 8c8565be..1ab53128 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -1,12 +1,11 @@ package com.genersoft.iot.vmp.gb28181.event; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; -import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.offline.OfflineEvent; import com.genersoft.iot.vmp.gb28181.event.platformKeepaliveExpire.PlatformKeepaliveExpireEvent; import com.genersoft.iot.vmp.gb28181.event.platformNotRegister.PlatformCycleRegisterEvent; import com.genersoft.iot.vmp.gb28181.event.platformNotRegister.PlatformNotRegisterEvent; +import com.genersoft.iot.vmp.gb28181.event.record.RecordEndEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.event.ZLMOfflineEvent; import com.genersoft.iot.vmp.media.zlm.event.ZLMOnlineEvent; @@ -15,7 +14,6 @@ import org.springframework.context.ApplicationEventPublisher; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; -import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEvent; import com.genersoft.iot.vmp.gb28181.event.online.OnlineEvent; @@ -144,4 +142,11 @@ public class EventPublisher { GbStream[] gbStreams = {gbStream}; catalogEventPublishForStream(platformId, gbStreams, type); } + + public void recordEndEventPush(RecordInfo recordInfo) { + RecordEndEvent outEvent = new RecordEndEvent(this); + outEvent.setRecordInfo(recordInfo); + applicationEventPublisher.publishEvent(outEvent); + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java index aaf5b5df..9ba0c055 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java @@ -76,10 +76,7 @@ public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEvent eventResult.callId = callid; eventResult.msg = "注册超时"; eventResult.type = "register timeout"; - if (sipSubscribe.getErrorSubscribe(callid) != null) { - sipSubscribe.getErrorSubscribe(callid).response(eventResult); - } - + sipSubscribe.getErrorSubscribe(callid).response(eventResult); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java index d7b33f21..95ffbfa0 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.event.record; +import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; import com.genersoft.iot.vmp.gb28181.bean.RecordItem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,12 +24,8 @@ public class RecordEndEventListener implements ApplicationListener sseEmitters = new Hashtable<>(); - public void addSseEmitters(String browserId, SseEmitter sseEmitter) { - sseEmitters.put(browserId, sseEmitter); - } - public interface RecordEndEventHandler{ - void handler(List recordItems); + void handler(RecordInfo recordInfo); } private Map handlerMap = new HashMap<>(); @@ -38,6 +35,15 @@ public class RecordEndEventListener implements ApplicationListener 0) { + for (RecordEndEventHandler recordEndEventHandler : handlerMap.values()) { + recordEndEventHandler.handler(event.getRecordInfo()); + } + } } + + public void addEndEventHandler(String device, String channelId, RecordEndEventHandler recordEndEventHandler) { + handlerMap.put(device + channelId, recordEndEventHandler); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java index 7325889f..2550cb43 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; @@ -87,4 +88,12 @@ public interface ISIPCommanderForPlatform { */ boolean sendNotifyForCatalogOther(String type, ParentPlatform parentPlatform, List deviceChannels, SubscribeInfo subscribeInfo); + /** + * 回复recordInfo + * @param deviceChannel 通道信息 + * @param parentPlatform 平台信息 + * @param fromTag fromTag + * @param recordInfo 录像信息 + */ + boolean recordInfo(DeviceChannel deviceChannel, ParentPlatform parentPlatform, String fromTag, RecordInfo recordInfo); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 56681066..b0593bd4 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -1210,11 +1210,19 @@ public class SIPCommander implements ISIPCommander { recordInfoXml.append("RecordInfo\r\n"); recordInfoXml.append("" + sn + "\r\n"); recordInfoXml.append("" + channelId + "\r\n"); - recordInfoXml.append("" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(startTime) + "\r\n"); - recordInfoXml.append("" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(endTime) + "\r\n"); - recordInfoXml.append(" "+ secrecy + " \r\n"); - // 大华NVR要求必须增加一个值为all的文本元素节点Type - recordInfoXml.append("" + type+"\r\n"); + if (startTime != null) { + recordInfoXml.append("" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(startTime) + "\r\n"); + } + if (endTime != null) { + recordInfoXml.append("" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(endTime) + "\r\n"); + } + if (secrecy != null) { + recordInfoXml.append(" "+ secrecy + " \r\n"); + } + if (type != null) { + // 大华NVR要求必须增加一个值为all的文本元素节点Type + recordInfoXml.append("" + type+"\r\n"); + } recordInfoXml.append("\r\n"); String tm = Long.toString(System.currentTimeMillis()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index b83e70b0..bd41ddb6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -1,12 +1,10 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; -import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider; +import com.genersoft.iot.vmp.gb28181.utils.DateUtil; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.slf4j.Logger; @@ -17,6 +15,7 @@ import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.Lazy; import org.springframework.lang.Nullable; import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; import javax.sip.*; import javax.sip.header.CallIdHeader; @@ -470,4 +469,55 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { return true; } + @Override + public boolean recordInfo(DeviceChannel deviceChannel, ParentPlatform parentPlatform, String fromTag, RecordInfo recordInfo) { + if ( parentPlatform ==null) { + return false; + } + try { + StringBuffer recordXml = new StringBuffer(600); + recordXml.append("\r\n"); + recordXml.append("\r\n"); + recordXml.append("RecordInfo\r\n"); + recordXml.append("" +recordInfo.getSn() + "\r\n"); + recordXml.append("" + recordInfo.getDeviceId() + "\r\n"); + recordXml.append("" + recordInfo.getSumNum() + "\r\n"); + recordXml.append("\r\n"); + for (RecordItem recordItem : recordInfo.getRecordList()) { + recordXml.append("\r\n"); + if (deviceChannel != null) { + recordXml.append("" + recordItem.getDeviceId() + "\r\n"); + recordXml.append("" + recordItem.getName() + "\r\n"); + recordXml.append("" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(recordItem.getStartTime()) + "\r\n"); + recordXml.append("" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(recordItem.getEndTime()) + "\r\n"); + recordXml.append("" + recordItem.getSecrecy() + "\r\n"); + recordXml.append("" + recordItem.getType() + "\r\n"); + if (!StringUtils.isEmpty(recordItem.getFileSize())) { + recordXml.append("" + recordItem.getFileSize() + "\r\n"); + } + if (!StringUtils.isEmpty(recordItem.getFilePath())) { + recordXml.append("" + recordItem.getFilePath() + "\r\n"); + } + } + recordXml.append("\r\n"); + } + + recordXml.append("\r\n"); + recordXml.append("\r\n"); + + // callid + CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() + : udpSipProvider.getNewCallId(); + System.out.println( + recordXml.toString() + ); + Request request = headerProviderPlarformProvider.createMessageRequest(parentPlatform, recordXml.toString(), fromTag, callIdHeader); + transmitRequest(parentPlatform, request); + + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + return false; + } + return true; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/RecordInfoQueryMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/RecordInfoQueryMessageHandler.java index 08a5d772..a2733143 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/RecordInfoQueryMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/RecordInfoQueryMessageHandler.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.query. import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.record.RecordEndEventListener; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; @@ -46,6 +47,9 @@ public class RecordInfoQueryMessageHandler extends SIPRequestProcessorParent imp @Autowired private SIPCommander commander; + @Autowired + private RecordEndEventListener recordEndEventListener; + @Autowired private SipConfig config; @@ -65,49 +69,89 @@ public class RecordInfoQueryMessageHandler extends SIPRequestProcessorParent imp @Override public void handForPlatform(RequestEvent evt, ParentPlatform parentPlatform, Element rootElement) { - String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + parentPlatform.getServerGBId(); FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - try { - // 回复200 OK - responseAck(evt, Response.OK); - Element snElement = rootElement.element("SN"); - int sn = Integer.parseInt(snElement.getText()); - Element deviceIDElement = rootElement.element("DeviceID"); - String channelId = deviceIDElement.getText(); - Element startTimeElement = rootElement.element("StartTime"); - String startTime = startTimeElement.getText(); - Element endTimeElement = rootElement.element("EndTime"); - String endTime = endTimeElement.getText(); - Element secrecyElement = rootElement.element("Secrecy"); - int secrecy = Integer.parseInt(secrecyElement.getText()); - Element typeElement = rootElement.element("Type"); - String type = typeElement.getText(); - // 确认是直播还是国标, 国标直接请求下级,直播请求录像管理服务 - List channelSources = storager.getChannelSource(parentPlatform.getServerGBId(), channelId); - if (channelSources.get(0).getCount() > 0) { // 国标 - // 向国标设备请求录像数据 - Device device = storager.queryVideoDeviceByPlatformIdAndChannelId(parentPlatform.getServerGBId(), channelId); - commander.recordInfoQuery(device, channelId, DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTime), - DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTime), sn, secrecy, type, (eventResult -> { - // 查询成功 - }),(eventResult -> { - // 查询失败 - - })); - - }else if (channelSources.get(0).getCount() > 0) { // 直播流 - // TODO - }else { // 错误的请求 - - } - } catch (SipException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); + Element snElement = rootElement.element("SN"); + int sn = Integer.parseInt(snElement.getText()); + Element deviceIDElement = rootElement.element("DeviceID"); + String channelId = deviceIDElement.getText(); + Element startTimeElement = rootElement.element("StartTime"); + String startTime = null; + if (startTimeElement != null) { + startTime = startTimeElement.getText(); } + Element endTimeElement = rootElement.element("EndTime"); + String endTime = null; + if (endTimeElement != null) { + endTime = endTimeElement.getText(); + } + Element secrecyElement = rootElement.element("Secrecy"); + int secrecy = 0; + if (secrecyElement != null) { + secrecy = Integer.parseInt(secrecyElement.getText()); + } + String type = "all"; + Element typeElement = rootElement.element("Type"); + if (typeElement != null) { + type = typeElement.getText(); + } + // 确认是直播还是国标, 国标直接请求下级,直播请求录像管理服务 + List channelSources = storager.getChannelSource(parentPlatform.getServerGBId(), channelId); + if (channelSources.get(0).getCount() > 0) { // 国标 + // 向国标设备请求录像数据 + Device device = storager.queryVideoDeviceByPlatformIdAndChannelId(parentPlatform.getServerGBId(), channelId); + DeviceChannel deviceChannel = storager.queryChannelInParentPlatform(parentPlatform.getServerGBId(), channelId); + // 接收录像数据 + recordEndEventListener.addEndEventHandler(deviceChannel.getDeviceId(), channelId, (recordInfo)->{ + cmderFroPlatform.recordInfo(deviceChannel, parentPlatform, fromHeader.getTag(), recordInfo); + }); + commander.recordInfoQuery(device, channelId, DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTime), + DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTime), sn, secrecy, type, (eventResult -> { + // 回复200 OK + try { + responseAck(evt, Response.OK); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + }),(eventResult -> { + // 查询失败 + try { + responseAck(evt, eventResult.statusCode, eventResult.msg); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + })); + + }else if (channelSources.get(1).getCount() > 0) { // 直播流 + // TODO + try { + responseAck(evt, Response.NOT_IMPLEMENTED); // 回复未实现 + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + }else { // 错误的请求 + try { + responseAck(evt, Response.BAD_REQUEST); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java index f0f84213..45b7e56b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; import com.genersoft.iot.vmp.gb28181.bean.RecordItem; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.callback.CheckForAllRecordsThread; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; @@ -49,6 +50,9 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent @Autowired private DeferredResultHolder deferredResultHolder; + @Autowired + private EventPublisher eventPublisher; + @Override public void afterPropertiesSet() throws Exception { responseMessageHandler.addHandler(cmdType, this); @@ -77,6 +81,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent Element recordListElement = rootElement.element("RecordList"); if (recordListElement == null || recordInfo.getSumNum() == 0) { logger.info("无录像数据"); + eventPublisher.recordEndEventPush(recordInfo); RequestMessage msg = new RequestMessage(); msg.setKey(key); msg.setData(recordInfo); @@ -99,6 +104,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent record.setDeviceId(getText(itemRecord, "DeviceID")); record.setName(getText(itemRecord, "Name")); record.setFilePath(getText(itemRecord, "FilePath")); + record.setFileSize(getText(itemRecord, "FileSize")); record.setAddress(getText(itemRecord, "Address")); record.setStartTime( DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(getText(itemRecord, "StartTime"))); @@ -112,7 +118,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent } recordInfo.setRecordList(recordList); } - + eventPublisher.recordEndEventPush(recordInfo); // 改用单独线程统计已获取录像文件数量,避免多包并行分别统计不完整的问题 String cacheKey = CACHE_RECORDINFO_KEY + device.getDeviceId() + sn; redis.set(cacheKey + "_" + uuid, recordList, 90); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java index bb44325c..d10dde51 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java @@ -90,6 +90,6 @@ public interface ParentPlatformMapper { @Select("select 'channel' as name, count(pgc.platformId) count from platform_gb_channel pgc where pgc.platformId=#{platformId} and pgc.channelId =#{gbId} " + "union " + - "select 'stream' as name, count(pgs.platformId) count from platform_gb_stream pgs left join gb_stream gs on pgs.gbStreamId = gs.id where pgs.platformId=#{platformId} and gs.gbId = #{gbId}") + "select 'stream' as name, count(pgs.platformId) count from platform_gb_stream pgs left join gb_stream gs on pgs.gbStreamId = gs.gbStreamId where pgs.platformId=#{platformId} and gs.gbId = #{gbId}") List getChannelSource(String platformId, String gbId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java index 37c86a81..4f5de2b8 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java @@ -34,6 +34,7 @@ public interface PlatformChannelMapper { "") int addChannels(String platformId, List channelReducesToAdd); + @Delete("") int cleanChannelForGB(String platformId); + @Select("SELECT * FROM device_channel WHERE deviceId = (SELECT deviceId FROM platform_gb_channel WHERE " + "platformId='${platformId}' AND channelId='${channelId}' ) AND channelId='${channelId}'") DeviceChannel queryChannelInParentPlatform(String platformId, String channelId); + @Select("select dc.channelId as id, dc.name as name, pgc.platformId as platformId, pgc.catalogId as parentId, 0 as childrenCount, 1 as type " + "from device_channel dc left join platform_gb_channel pgc on dc.deviceId = pgc.deviceId and dc.channelId = pgc.channelId " + "where pgc.platformId=#{platformId} and pgc.catalogId=#{catalogId}") From e94b99d11c46246532edc93cd25cbf8c0b88f03f Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Sun, 27 Feb 2022 20:01:31 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=9B=BD=E6=A0=87?= =?UTF-8?q?=E5=BD=95=E5=83=8F=E7=BA=A7=E8=81=94=E6=92=AD=E6=94=BE=EF=BC=8C?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=82=B9=E6=92=AD=E6=B5=81=E7=A8=8B=EF=BC=8C?= =?UTF-8?q?=E5=8A=A0=E5=BF=AB=E7=82=B9=E6=92=AD=E9=80=9F=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/SendRtpItem.java | 26 ++++++ .../iot/vmp/gb28181/session/SsrcConfig.java | 1 - .../vmp/gb28181/task/GPSSubscribeTask.java | 1 - .../transmit/SIPProcessorObserver.java | 2 - .../transmit/cmd/impl/SIPCommander.java | 16 ++-- .../cmd/impl/SIPCommanderFroPlatform.java | 3 - .../request/impl/AckRequestProcessor.java | 84 +++++++++++------- .../request/impl/ByeRequestProcessor.java | 19 +++- .../request/impl/InviteRequestProcessor.java | 88 ++++++++++++++++--- .../impl/InviteResponseProcessor.java | 3 - .../vmp/media/zlm/ZLMHttpHookListener.java | 2 +- .../vmp/media/zlm/ZLMHttpHookSubscribe.java | 30 ++++--- .../iot/vmp/media/zlm/ZLMRESTfulUtils.java | 1 - .../vmp/media/zlm/ZLMRTPServerFactory.java | 9 ++ .../iot/vmp/service/impl/PlayServiceImpl.java | 5 +- .../storager/dao/ParentPlatformMapper.java | 2 +- .../storager/dao/PlatformChannelMapper.java | 8 +- .../storager/impl/RedisCatchStorageImpl.java | 1 - .../impl/DeviceAlarmServiceImplTest.java | 7 -- 19 files changed, 208 insertions(+), 100 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index 2c9c494c..3e5d222a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -71,6 +71,16 @@ public class SendRtpItem { */ private String mediaServerId; + /** + * invite的callId + */ + private String CallId; + + /** + * 是否是play, false是playback + */ + private boolean isPlay; + public String getIp() { return ip; } @@ -174,4 +184,20 @@ public class SendRtpItem { public void setMediaServerId(String mediaServerId) { this.mediaServerId = mediaServerId; } + + public String getCallId() { + return CallId; + } + + public void setCallId(String callId) { + CallId = callId; + } + + public boolean isPlay() { + return isPlay; + } + + public void setPlay(boolean play) { + isPlay = play; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SsrcConfig.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SsrcConfig.java index e96e6a5b..ac54c2d9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SsrcConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SsrcConfig.java @@ -81,7 +81,6 @@ public class SsrcConfig { isUsed.remove(sn); notUsed.add(sn); }catch (NullPointerException e){ - System.out.printf("11111"); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java index 0d56bd58..f0d90336 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java @@ -36,7 +36,6 @@ public class GPSSubscribeTask implements Runnable{ SubscribeInfo subscribe = redisCatchStorage.getSubscribe(key); if (subscribe != null) { - System.out.println("发送GPS消息"); ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId); if (parentPlatform == null || parentPlatform.isStatus()) { // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java index 71025c00..d352bb2b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java @@ -141,7 +141,6 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { */ @Override public void processTimeout(TimeoutEvent timeoutEvent) { - System.out.println("processTimeout"); if(timeoutProcessor != null) { timeoutProcessor.process(timeoutEvent); } @@ -173,7 +172,6 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { @Override public void processDialogTerminated(DialogTerminatedEvent dialogTerminatedEvent) { - System.out.println("processDialogTerminated"); CallIdHeader callId = dialogTerminatedEvent.getDialog().getCallId(); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index b0593bd4..da664dd9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -346,8 +346,11 @@ public class SIPCommander implements ISIPCommander { subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return; - event.response(mediaServerItemInUse, json); - subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); + if (event != null) { + event.response(mediaServerItemInUse, json); + } + +// subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); }); // StringBuffer content = new StringBuffer(200); @@ -452,9 +455,11 @@ public class SIPCommander implements ISIPCommander { logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString()); subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + System.out.println(344444); if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return; - event.response(mediaServerItemInUse, json); - subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); + if (event != null) { + event.response(mediaServerItemInUse, json); + } }); StringBuffer content = new StringBuffer(200); @@ -466,8 +471,6 @@ public class SIPCommander implements ISIPCommander { content.append("t="+DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime)+" " +DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime) +"\r\n"); - - String streamMode = device.getStreamMode().toUpperCase(); if (userSetup.isSeniorSdp()) { @@ -1202,7 +1205,6 @@ public class SIPCommander implements ISIPCommander { if (type == null) { type = "all"; } - try { StringBuffer recordInfoXml = new StringBuffer(200); recordInfoXml.append("\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index bd41ddb6..637381f7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -508,9 +508,6 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { // callid CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); - System.out.println( - recordXml.toString() - ); Request request = headerProviderPlarformProvider.createMessageRequest(parentPlatform, recordXml.toString(), fromTag, callIdHeader); transmitRequest(parentPlatform, request); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 127ef29a..1e99c0b9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -1,10 +1,13 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; @@ -24,6 +27,8 @@ import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; import java.util.HashMap; import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; /** * SIP命令类型: ACK请求 @@ -52,6 +57,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private IMediaServerService mediaServerService; + @Autowired + private ZLMHttpHookSubscribe subscribe; + /** * 处理 ACK请求 @@ -60,6 +68,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In */ @Override public void process(RequestEvent evt) { + logger.debug("ACK请求: {}", ((System.currentTimeMillis()))); Dialog dialog = evt.getDialog(); if (dialog == null) return; if (dialog.getState()== DialogState.CONFIRMED) { @@ -69,16 +78,17 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; String deviceId = sendRtpItem.getDeviceId(); StreamInfo streamInfo = null; - if (deviceId == null) { + if (sendRtpItem.isPlay()) { + streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); + }else { + streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId); + } + System.out.println(JSON.toJSON(streamInfo)); + if (streamInfo == null) { streamInfo = new StreamInfo(); streamInfo.setApp(sendRtpItem.getApp()); streamInfo.setStreamId(sendRtpItem.getStreamId()); - }else { - streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); - sendRtpItem.setStreamId(streamInfo.getStreamId()); - streamInfo.setApp("rtp"); } - redisCatchStorage.updateSendRTPSever(sendRtpItem); logger.info(platformGbId); logger.info(channelId); @@ -90,34 +100,42 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In param.put("dst_url",sendRtpItem.getIp()); param.put("dst_port", sendRtpItem.getPort()); param.put("is_udp", is_Udp); - //param.put ("src_port", sendRtpItem.getLocalPort()); // 设备推流查询,成功后才能转推 - boolean rtpPushed = false; - long startTime = System.currentTimeMillis(); - while (!rtpPushed) { - try { - if (System.currentTimeMillis() - startTime < 30 * 1000) { - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId())) { - rtpPushed = true; - logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]", - streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort()); - zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - } else { - logger.info("等待设备推流[{}/{}].......", - streamInfo.getApp() ,streamInfo.getStreamId()); - Thread.sleep(1000); - continue; - } - } else { - rtpPushed = true; - logger.info("设备推流[{}/{}]超时,终止向上级推流", - streamInfo.getApp() ,streamInfo.getStreamId()); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - } + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); +// if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId())) { +// logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]", +// streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort()); +// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); +// } else { +// // 对hook进行订阅 +// logger.info("等待设备推流[{}/{}].......", +// streamInfo.getApp(), streamInfo.getStreamId()); +// Timer timer = new Timer(); +// timer.schedule(new TimerTask() { +// @Override +// public void run() { +// logger.info("设备推流[{}/{}]超时,终止向上级推流", +// finalStreamInfo.getApp() , finalStreamInfo.getStreamId()); +// +// } +// }, 30*1000L); +// // 添加订阅 +// JSONObject subscribeKey = new JSONObject(); +// subscribeKey.put("app", "rtp"); +// subscribeKey.put("stream", streamInfo.getStreamId()); +// subscribeKey.put("mediaServerId", streamInfo.getMediaServerId()); +// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, subscribeKey, +// (MediaServerItem mediaServerItemInUse, JSONObject json) -> { +// logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]", +// finalStreamInfo.getApp(), finalStreamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort()); +// timer.cancel(); +// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); +// subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); +// }); +// } + + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index feb44c54..9b6e7276 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -87,18 +87,29 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); redisCatchStorage.deleteSendRTPServer(platformGbId, channelId); - if (zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId) == 0) { + int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); + if (totalReaderCount == 0) { logger.info(streamId + "无其它观看者,通知设备停止推流"); cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId); + }else if (totalReaderCount == -1){ + logger.warn(streamId + " 查找其它观看者失败"); } } // 可能是设备主动停止 Device device = storager.queryVideoDeviceByChannelId(platformGbId); if (device != null) { - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); - if (streamInfo != null) { - redisCatchStorage.stopPlay(streamInfo); + if (sendRtpItem.isPlay()) { + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); + if (streamInfo != null) { + redisCatchStorage.stopPlay(streamInfo); + } + }else { + StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(device.getDeviceId(), channelId); + if (streamInfo != null) { + redisCatchStorage.stopPlayback(streamInfo); + } } + storager.stopPlay(device.getDeviceId(), channelId); mediaServerService.closeRTPServer(device, channelId); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index ae2819ca..a157a5c3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -1,18 +1,28 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; +import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; +import gov.nist.javax.sdp.TimeDescriptionImpl; +import gov.nist.javax.sdp.fields.TimeField; import gov.nist.javax.sip.address.AddressImpl; import gov.nist.javax.sip.address.SipUri; import org.slf4j.Logger; @@ -27,10 +37,13 @@ import javax.sip.RequestEvent; import javax.sip.ServerTransaction; import javax.sip.SipException; import javax.sip.address.SipURI; +import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.message.Request; import javax.sip.message.Response; import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.List; import java.util.Vector; @@ -60,6 +73,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private IPlayService playService; + @Autowired + private ISIPCommander commander; + @Autowired private ZLMRTPServerFactory zlmrtpServerFactory; @@ -69,6 +85,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private SIPProcessorObserver sipProcessorObserver; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -84,6 +101,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Override public void process(RequestEvent evt) { // Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令 + Long startTimeForInvite = System.currentTimeMillis(); try { Request request = evt.getRequest(); SipURI sipURI = (SipURI) request.getRequestURI(); @@ -91,6 +109,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements String requesterId = null; FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME); + CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME); AddressImpl address = (AddressImpl) fromHeader.getAddress(); SipUri uri = (SipUri) address.getURI(); requesterId = uri.getUser(); @@ -101,7 +120,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements return; } - // 查询请求方是否上级平台 + // 查询请求是否来自上级平台\设备 ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); if (platform != null) { // 查询平台下是否有该通道 @@ -158,7 +177,21 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements ssrc = ssrcDefault; sdp = SdpFactory.getInstance().createSessionDescription(contentString); } + String sessionName = sdp.getSessionName().getValue(); + Long startTime = null; + Long stopTime = null; + Date start = null; + Date end = null; + if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) { + TimeDescriptionImpl timeDescription = (TimeDescriptionImpl)(sdp.getTimeDescriptions(false).get(0)); + TimeField startTimeFiled = (TimeField)timeDescription.getTime(); + startTime = startTimeFiled.getStartTime(); + stopTime = startTimeFiled.getStopTime(); + + start = new Date(startTime*1000); + end = new Date(stopTime*1000); + } // 获取支持的格式 Vector mediaDescriptions = sdp.getMediaDescriptions(true); // 查看是否支持PS 负载96 @@ -228,23 +261,31 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(evt, Response.BUSY_HERE); return; } - + sendRtpItem.setCallId(callIdHeader.getCallId()); + sendRtpItem.setPlay("Play".equals(sessionName)); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); - // 通知下级推流, - PlayResult playResult = playService.play(mediaServerItem,device.getDeviceId(), channelId, (mediaServerItemInUSe, responseJSON)->{ - // 收到推流, 回复200OK, 等待ack + + Device finalDevice = device; + MediaServerItem finalMediaServerItem = mediaServerItem; + Long finalStartTime = startTime; + Long finalStopTime = stopTime; + ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{ + logger.info("[上级点播]收到下级开始点播订阅, {}/{}", sendRtpItem.getApp(), sendRtpItem.getStreamId()); // if (sendRtpItem == null) return; sendRtpItem.setStatus(1); redisCatchStorage.updateSendRTPSever(sendRtpItem); - // TODO 添加对tcp的支持 StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n"); - content.append("s=Play\r\n"); + content.append("s=" + sessionName+"\r\n"); content.append("c=IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n"); - content.append("t=0 0\r\n"); + if ("Playback".equals(sessionName)) { + content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n"); + }else { + content.append("t=0 0\r\n"); + } content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n"); content.append("a=sendonly\r\n"); content.append("a=rtpmap:96 PS/90000\r\n"); @@ -260,7 +301,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } catch (ParseException e) { e.printStackTrace(); } - } ,((event) -> { + if ("Playback".equals(sessionName) && responseJSON != null) { + playService.onPublishHandlerForPlayBack(finalMediaServerItem, responseJSON, finalDevice.getDeviceId(), channelId, null); + } + }; + SipSubscribe.Event errorEvent = ((event) -> { // 未知错误。直接转发设备点播的错误 Response response = null; try { @@ -271,11 +316,27 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } catch (ParseException | SipException | InvalidArgumentException e) { e.printStackTrace(); } - })); - if (logger.isDebugEnabled()) { - logger.debug(playResult.getResult().toString()); + }); + if ("Playback".equals(sessionName)) { + sendRtpItem.setPlay(false); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, sendRtpItem.getSsrc(), true); + sendRtpItem.setStreamId(ssrc); + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + commander.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, format.format(start), format.format(end), hookEvent, errorEvent); + }else { + sendRtpItem.setPlay(true); + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); + if (streamInfo == null) { + if (mediaServerItem.isRtpEnable()) { + sendRtpItem.setStreamId(String.format("%s_%s", device.getDeviceId(), channelId)); + } + sendRtpItem.setPlay(false); + playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent,errorEvent); + }else { + sendRtpItem.setStreamId(streamInfo.getStreamId()); + hookEvent.response(mediaServerItem, null); + } } - }else if (gbStream != null) { SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, gbStream.getApp(), gbStream.getStream(), channelId, @@ -295,7 +356,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setStatus(1); redisCatchStorage.updateSendRTPSever(sendRtpItem); - // TODO 添加对tcp的支持 StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java index 5446a902..1b5081b1 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java @@ -82,9 +82,6 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract { requestURI.setPort(event.getRemotePort()); reqAck.setRequestURI(requestURI); logger.info("向 " + event.getRemoteIpAddress() + ":" + event.getRemotePort() + "回复ack"); - SipURI sipURI = (SipURI)dialog.getRemoteParty().getURI(); - String deviceId = requestURI.getUser(); - String channelId = sipURI.getUser(); dialog.sendAck(reqAck); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 4f458135..e2c83ea9 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -181,7 +181,7 @@ public class ZLMHttpHookListener { @PostMapping(value = "/on_publish", produces = "application/json;charset=UTF-8") public ResponseEntity onPublish(@RequestBody JSONObject json) { - logger.debug("[ ZLM HOOK ]on_publish API调用,参数:" + json.toString()); + logger.info("[ ZLM HOOK ]on_publish API调用,参数:" + json.toString()); JSONObject ret = new JSONObject(); ret.put("code", 0); ret.put("msg", "success"); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java index c8cca53e..84b36e3e 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java @@ -77,21 +77,23 @@ public class ZLMHttpHookSubscribe { if (eventMap == null) { return; } - Iterator> iterator = eventMap.entrySet().iterator(); - while (iterator.hasNext()){ - Map.Entry next = iterator.next(); - JSONObject key = next.getKey(); - Boolean result = null; - for (String s : key.keySet()) { - if (result == null) { - result = key.getString(s).equals(hookResponse.getString(s)); - }else { - if (key.getString(s) == null) continue; - result = result && key.getString(s).equals(hookResponse.getString(s)); + + Set> entries = eventMap.entrySet(); + if (entries.size() > 0) { + for (Map.Entry entry : entries) { + JSONObject key = entry.getKey(); + Boolean result = null; + for (String s : key.keySet()) { + if (result == null) { + result = key.getString(s).equals(hookResponse.getString(s)); + }else { + if (key.getString(s) == null) continue; + result = result && key.getString(s).equals(hookResponse.getString(s)); + } + } + if (null != result && result){ + entries.remove(entry); } - } - if (null != result && result){ - iterator.remove(); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 05ecd3fb..d0b1cb2d 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -72,7 +72,6 @@ public class ZLMRESTfulUtils { ResponseBody responseBody = response.body(); if (responseBody != null) { String responseStr = responseBody.string(); - System.out.println(responseStr); responseJSON = JSON.parseObject(responseStr); } }else { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 30a15096..76bab9c0 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -242,9 +242,18 @@ public class ZLMRTPServerFactory { */ public int totalReaderCount(MediaServerItem mediaServerItem, String app, String streamId) { JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtmp", streamId); + Integer code = mediaInfo.getInteger("code"); if (mediaInfo == null) { return 0; } + if ( code < 0) { + logger.warn("查询流({}/{})是否有其它观看者时得到: {}", app, streamId, mediaInfo.getString("msg")); + return -1; + } + if ( code == 0 && ! mediaInfo.getBoolean("online")) { + logger.warn("查询流({}/{})是否有其它观看者时得到: {}", app, streamId, mediaInfo.getString("msg")); + return -1; + } return mediaInfo.getInteger("totalReaderCount"); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index cf30a794..d3f6976c 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -122,7 +122,6 @@ public class PlayServiceImpl implements IPlayService { // 点播结束时调用截图接口 try { String classPath = ResourceUtils.getURL("classpath:").getPath(); - // System.out.println(classPath); // 兼容打包为jar的class路径 if(classPath.contains("jar")) { classPath = classPath.substring(0, classPath.lastIndexOf(".")); @@ -238,11 +237,11 @@ public class PlayServiceImpl implements IPlayService { } @Override - public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) { + public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) { RequestMessage msg = new RequestMessage(); msg.setId(uuid); msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId); - StreamInfo streamInfo = onPublishHandler(mediaServerItem, resonse, deviceId, channelId, uuid); + StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId, uuid); if (streamInfo != null) { DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java index d10dde51..f74b6d4d 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java @@ -88,7 +88,7 @@ public interface ParentPlatformMapper { ""}) int setDefaultCatalog(String platformId, String catalogId); - @Select("select 'channel' as name, count(pgc.platformId) count from platform_gb_channel pgc where pgc.platformId=#{platformId} and pgc.channelId =#{gbId} " + + @Select("select 'channel' as name, count(pgc.platformId) count from platform_gb_channel pgc left join device_channel dc on dc.id = pgc.deviceChannelId where pgc.platformId=#{platformId} and dc.channelId =#{gbId} " + "union " + "select 'stream' as name, count(pgs.platformId) count from platform_gb_stream pgs left join gb_stream gs on pgs.gbStreamId = gs.gbStreamId where pgs.platformId=#{platformId} and gs.gbId = #{gbId}") List getChannelSource(String platformId, String gbId); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java index 3df38fc3..0abff270 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java @@ -52,8 +52,8 @@ public interface PlatformChannelMapper { int cleanChannelForGB(String platformId); - @Select("SELECT * FROM device_channel WHERE deviceId = (SELECT deviceId FROM platform_gb_channel WHERE " + - "platformId='${platformId}' AND channelId='${channelId}' ) AND channelId='${channelId}'") + @Select("SELECT dc.* FROM platform_gb_channel pgc left join device_channel dc on dc.id = pgc.deviceChannelId WHERE " + + "pgc.platformId=#{platformId} AND dc.channelId=#{channelId}") DeviceChannel queryChannelInParentPlatform(String platformId, String channelId); @@ -62,7 +62,7 @@ public interface PlatformChannelMapper { "where pgc.platformId=#{platformId} and pgc.catalogId=#{catalogId}") List queryChannelInParentPlatformAndCatalog(String platformId, String catalogId); - @Select("SELECT * FROM device WHERE deviceId = (SELECT deviceId FROM platform_gb_channel WHERE platformId='${platformId}' AND channelId='${channelId}')") + @Select("SELECT * FROM device WHERE deviceId = (SELECT deviceId FROM platform_gb_channel pgc left join device_channel dc on dc.id = pgc.deviceChannelId WHERE pgc.platformId='${platformId}' AND dc.channelId='${channelId}')") Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId); @Delete("") int delByCatalogIdAndChannelIdAndPlatformId(PlatformCatalog platformCatalog); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 92fdf6c5..1baefbe8 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -139,7 +139,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public StreamInfo queryPlayByDevice(String deviceId, String channelId) { -// List playLeys = redis.keys(String.format("%S_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX, List playLeys = redis.scan(String.format("%S_%s_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX, userSetup.getServerId(), deviceId, diff --git a/src/test/java/com/genersoft/iot/vmp/service/impl/DeviceAlarmServiceImplTest.java b/src/test/java/com/genersoft/iot/vmp/service/impl/DeviceAlarmServiceImplTest.java index 3cb9aa56..23b9f6b2 100644 --- a/src/test/java/com/genersoft/iot/vmp/service/impl/DeviceAlarmServiceImplTest.java +++ b/src/test/java/com/genersoft/iot/vmp/service/impl/DeviceAlarmServiceImplTest.java @@ -50,14 +50,7 @@ class DeviceAlarmServiceImplTest { // System.out.println(deviceAlarmService.getAllAlarm(0, 10000, "11111111111111111111", null, "1", null, // null, null).getSize()); - System.out.println(deviceAlarmService.getAllAlarm(0, 10000, "11111111111111111111", null, null, null, - "2021-01-01 00:00:00", null).getSize()); - System.out.println(deviceAlarmService.getAllAlarm(0, 10000, "11111111111111111111", null, null, null, - null, "2021-04-01 09:00:00").getSize()); - - System.out.println(deviceAlarmService.getAllAlarm(0, 10000, "11111111111111111111", null, null, null, - "2021-02-01 01:00:00", "2021-04-01 04:00:00").getSize()); } From d21322a93258206eb910d7ac3a70a4812fc48cbc Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 3 Mar 2022 18:23:52 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=9B=BD=E6=A0=87?= =?UTF-8?q?=E7=BA=A7=E8=81=94=E5=BD=95=E5=83=8F=E9=A2=84=E8=A7=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/SDPInfo.java | 14 ++ .../transmit/cmd/impl/SIPCommander.java | 2 + .../request/impl/AckRequestProcessor.java | 6 +- .../request/impl/ByeRequestProcessor.java | 31 +-- .../request/impl/InviteRequestProcessor.java | 188 ++++++++++-------- .../iot/vmp/service/IMediaServerService.java | 2 +- .../vmp/service/bean/PlayBackCallback.java | 3 +- .../iot/vmp/service/bean/PlayBackResult.java | 55 +++++ .../service/impl/MediaServerServiceImpl.java | 10 +- .../iot/vmp/service/impl/PlayServiceImpl.java | 36 +++- .../vmanager/gb28181/play/PlayController.java | 1 - .../gb28181/playback/PlaybackController.java | 4 +- 12 files changed, 227 insertions(+), 125 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/SDPInfo.java create mode 100644 src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackResult.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SDPInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SDPInfo.java new file mode 100644 index 00000000..39225b57 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SDPInfo.java @@ -0,0 +1,14 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import javax.sdp.SessionDescription; + +public class SDPInfo { + private byte[] source; + private SessionDescription sdpSource; + private String sessionName; + private Long startTime; + private Long stopTime; + private String username; + private String address; + private String ssrc; +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index ff871e3c..437c69d9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -453,6 +453,7 @@ public class SIPCommander implements ISIPCommander { subscribeKey.put("app", "rtp"); subscribeKey.put("stream", ssrcInfo.getStream()); subscribeKey.put("regist", true); + subscribeKey.put("schema", "rtmp"); subscribeKey.put("mediaServerId", mediaServerItem.getId()); logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString()); subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, @@ -718,6 +719,7 @@ public class SIPCommander implements ISIPCommander { if (ssrcTransaction != null) { MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId()); mediaServerService.releaseSsrc(mediaServerItem, ssrcTransaction.getSsrc()); + mediaServerService.closeRTPServer(deviceId, channelId, ssrcTransaction.getStream()); streamSession.remove(deviceId, channelId, ssrcTransaction.getStream()); } } catch (SipException | ParseException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index cb03d4cf..d5bc99b7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -68,7 +68,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In */ @Override public void process(RequestEvent evt) { - logger.debug("ACK请求: {}", ((System.currentTimeMillis()))); + logger.info("ACK请求: {}", ((System.currentTimeMillis()))); Dialog dialog = evt.getDialog(); if (dialog == null) return; if (dialog.getState()== DialogState.CONFIRMED) { @@ -88,10 +88,6 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In streamInfo = new StreamInfo(); streamInfo.setApp(sendRtpItem.getApp()); streamInfo.setStream(sendRtpItem.getStreamId()); - }else { - streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); - sendRtpItem.setStreamId(streamInfo.getStream()); - streamInfo.setApp("rtp"); } redisCatchStorage.updateSendRTPSever(sendRtpItem); logger.info(platformGbId); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 60ea11bb..deda7832 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; @@ -90,29 +91,31 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); if (totalReaderCount == 0) { logger.info(streamId + "无其它观看者,通知设备停止推流"); - cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId); + cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId); }else if (totalReaderCount == -1){ logger.warn(streamId + " 查找其它观看者失败"); } } // 可能是设备主动停止 Device device = storager.queryVideoDeviceByChannelId(platformGbId); - if (device != null) { - if (sendRtpItem.isPlay()) { - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); - if (streamInfo != null) { - redisCatchStorage.stopPlay(streamInfo); + if (device != null) { + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); + if (sendRtpItem != null) { + if (sendRtpItem.isPlay()) { + if (streamInfo != null) { + redisCatchStorage.stopPlay(streamInfo); + } + }else { + if (streamInfo != null) { + redisCatchStorage.stopPlayback(streamInfo); + } } - }else { - StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(device.getDeviceId(), channelId); - if (streamInfo != null) { - redisCatchStorage.stopPlayback(streamInfo); - } - } - storager.stopPlay(device.getDeviceId(), channelId); - mediaServerService.closeRTPServer(device, channelId, streamInfo.getStream()); + storager.stopPlay(device.getDeviceId(), channelId); + mediaServerService.closeRTPServer(device.getDeviceId(), channelId, streamInfo.getStream()); + } } + } } catch (SipException e) { e.printStackTrace(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index e08d1fb9..52859e6a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; @@ -101,19 +102,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Override public void process(RequestEvent evt) { // Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令 - Long startTimeForInvite = System.currentTimeMillis(); try { Request request = evt.getRequest(); SipURI sipURI = (SipURI) request.getRequestURI(); String channelId = sipURI.getUser(); - String requesterId = null; - - FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME); + String requesterId = SipUtils.getUserIdFromFromHeader(request); CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME); - AddressImpl address = (AddressImpl) fromHeader.getAddress(); - SipUri uri = (SipUri) address.getURI(); - requesterId = uri.getUser(); - if (requesterId == null || channelId == null) { logger.info("无法从FromHeader的Address中获取到平台id,返回400"); responseAck(evt, Response.BAD_REQUEST); // 参数不全, 发400,请求错误 @@ -122,7 +116,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 查询请求是否来自上级平台\设备 ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); - if (platform != null) { + if (platform == null) { + inviteFromDeviceHandle(evt, requesterId); + }else { // 查询平台下是否有该通道 DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId); @@ -141,7 +137,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem == null) { logger.info("[ app={}, stream={} ]找不到zlm {},返回410",gbStream.getApp(), gbStream.getStream(), mediaServerId); - responseAck(evt, Response.GONE, "media server not found"); + responseAck(evt, Response.GONE); return; } Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); @@ -197,7 +193,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 查看是否支持PS 负载96 //String ip = null; int port = -1; - //boolean recvonly = false; boolean mediaTransmissionTCP = false; Boolean tcpActive = null; for (Object description : mediaDescriptions) { @@ -233,7 +228,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } String username = sdp.getOrigin().getUsername(); String addressStr = sdp.getOrigin().getAddress(); - //String sessionName = sdp.getSessionName().getValue(); logger.info("[上级点播]用户:{}, 地址:{}:{}, ssrc:{}", username, addressStr, port, ssrc); Device device = null; // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 @@ -271,8 +265,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements Long finalStartTime = startTime; Long finalStopTime = stopTime; ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{ - logger.info("[上级点播]收到下级开始点播订阅, {}/{}", sendRtpItem.getApp(), sendRtpItem.getStreamId()); - // if (sendRtpItem == null) return; + logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", sendRtpItem.getApp(), sendRtpItem.getStreamId()); + // * 0 等待设备推流上来 + // * 1 下级已经推流,等待上级平台回复ack + // * 2 推流中 sendRtpItem.setStatus(1); redisCatchStorage.updateSendRTPSever(sendRtpItem); @@ -301,9 +297,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } catch (ParseException e) { e.printStackTrace(); } - if ("Playback".equals(sessionName) && responseJSON != null) { - playService.onPublishHandlerForPlayBack(finalMediaServerItem, responseJSON, finalDevice.getDeviceId(), channelId, null); - } }; SipSubscribe.Event errorEvent = ((event) -> { // 未知错误。直接转发设备点播的错误 @@ -319,10 +312,29 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements }); if ("Playback".equals(sessionName)) { sendRtpItem.setPlay(false); - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, sendRtpItem.getSsrc(), true); sendRtpItem.setStreamId(ssrc); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - commander.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, format.format(start), format.format(end), hookEvent, errorEvent); + playService.playBack(device.getDeviceId(), channelId, format.format(start), format.format(end),result -> { + if (result.getCode() != 0){ + logger.warn("录像回放失败"); + if (result.getEvent() != null) { + errorEvent.response(result.getEvent()); + } + try { + responseAck(evt, Response.REQUEST_TIMEOUT); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + }else { + if (result.getMediaServerItem() != null) { + hookEvent.response(result.getMediaServerItem(), result.getResponse()); + } + } + }); }else { sendRtpItem.setPlay(true); StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); @@ -333,7 +345,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setPlay(false); playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent,errorEvent); }else { - sendRtpItem.setStreamId(streamInfo.getStreamId()); + sendRtpItem.setStreamId(streamInfo.getStream()); hookEvent.response(mediaServerItem, null); } } @@ -379,72 +391,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } - } else { - // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) - Device device = redisCatchStorage.getDevice(requesterId); - if (device != null) { - logger.info("收到设备" + requesterId + "的语音广播Invite请求"); - responseAck(evt, Response.TRYING); - - String contentString = new String(request.getRawContent()); - // jainSip不支持y=字段, 移除移除以解析。 - String substring = contentString; - String ssrc = "0000000404"; - int ssrcIndex = contentString.indexOf("y="); - if (ssrcIndex > 0) { - substring = contentString.substring(0, ssrcIndex); - ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); - } - ssrcIndex = substring.indexOf("f="); - if (ssrcIndex > 0) { - substring = contentString.substring(0, ssrcIndex); - } - SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring); - - // 获取支持的格式 - Vector mediaDescriptions = sdp.getMediaDescriptions(true); - // 查看是否支持PS 负载96 - int port = -1; - //boolean recvonly = false; - boolean mediaTransmissionTCP = false; - Boolean tcpActive = null; - for (int i = 0; i < mediaDescriptions.size(); i++) { - MediaDescription mediaDescription = (MediaDescription)mediaDescriptions.get(i); - Media media = mediaDescription.getMedia(); - - Vector mediaFormats = media.getMediaFormats(false); - if (mediaFormats.contains("8")) { - port = media.getMediaPort(); - String protocol = media.getProtocol(); - // 区分TCP发流还是udp, 当前默认udp - if ("TCP/RTP/AVP".equals(protocol)) { - String setup = mediaDescription.getAttribute("setup"); - if (setup != null) { - mediaTransmissionTCP = true; - if ("active".equals(setup)) { - tcpActive = true; - } else if ("passive".equals(setup)) { - tcpActive = false; - } - } - } - break; - } - } - if (port == -1) { - logger.info("不支持的媒体格式,返回415"); - // 回复不支持的格式 - responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 - return; - } - String username = sdp.getOrigin().getUsername(); - String addressStr = sdp.getOrigin().getAddress(); - logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc); - - } else { - logger.warn("来自无效设备/平台的请求"); - responseAck(evt, Response.BAD_REQUEST); - } } } catch (SipException | InvalidArgumentException | ParseException e) { @@ -457,4 +403,74 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements e.printStackTrace(); } } + + public void inviteFromDeviceHandle(RequestEvent evt, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException { + + // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) + Device device = redisCatchStorage.getDevice(requesterId); + Request request = evt.getRequest(); + if (device != null) { + logger.info("收到设备" + requesterId + "的语音广播Invite请求"); + responseAck(evt, Response.TRYING); + + String contentString = new String(request.getRawContent()); + // jainSip不支持y=字段, 移除移除以解析。 + String substring = contentString; + String ssrc = "0000000404"; + int ssrcIndex = contentString.indexOf("y="); + if (ssrcIndex > 0) { + substring = contentString.substring(0, ssrcIndex); + ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); + } + ssrcIndex = substring.indexOf("f="); + if (ssrcIndex > 0) { + substring = contentString.substring(0, ssrcIndex); + } + SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring); + + // 获取支持的格式 + Vector mediaDescriptions = sdp.getMediaDescriptions(true); + // 查看是否支持PS 负载96 + int port = -1; + //boolean recvonly = false; + boolean mediaTransmissionTCP = false; + Boolean tcpActive = null; + for (int i = 0; i < mediaDescriptions.size(); i++) { + MediaDescription mediaDescription = (MediaDescription)mediaDescriptions.get(i); + Media media = mediaDescription.getMedia(); + + Vector mediaFormats = media.getMediaFormats(false); + if (mediaFormats.contains("8")) { + port = media.getMediaPort(); + String protocol = media.getProtocol(); + // 区分TCP发流还是udp, 当前默认udp + if ("TCP/RTP/AVP".equals(protocol)) { + String setup = mediaDescription.getAttribute("setup"); + if (setup != null) { + mediaTransmissionTCP = true; + if ("active".equals(setup)) { + tcpActive = true; + } else if ("passive".equals(setup)) { + tcpActive = false; + } + } + } + break; + } + } + if (port == -1) { + logger.info("不支持的媒体格式,返回415"); + // 回复不支持的格式 + responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 + return; + } + String username = sdp.getOrigin().getUsername(); + String addressStr = sdp.getOrigin().getAddress(); + logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc); + + } else { + logger.warn("来自无效设备/平台的请求"); + responseAck(evt, Response.BAD_REQUEST); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java index 2e8a68ee..8c12c787 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java @@ -48,7 +48,7 @@ public interface IMediaServerService { SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, boolean isPlayback); - void closeRTPServer(Device device, String channelId, String ssrc); + void closeRTPServer(String deviceId, String channelId, String ssrc); void clearRTPServer(MediaServerItem mediaServerItem); diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackCallback.java b/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackCallback.java index 089523f9..5ed6cf34 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackCallback.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackCallback.java @@ -1,9 +1,10 @@ package com.genersoft.iot.vmp.service.bean; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; public interface PlayBackCallback { - void call(RequestMessage msg); + void call(PlayBackResult msg); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackResult.java b/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackResult.java new file mode 100644 index 00000000..10a2759f --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackResult.java @@ -0,0 +1,55 @@ +package com.genersoft.iot.vmp.service.bean; + +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; + +import javax.sip.RequestEvent; + +public class PlayBackResult { + private int code; + private T data; + private MediaServerItem mediaServerItem; + private JSONObject response; + private SipSubscribe.EventResult event; + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + public T getData() { + return data; + } + + public void setData(T data) { + this.data = data; + } + + public MediaServerItem getMediaServerItem() { + return mediaServerItem; + } + + public void setMediaServerItem(MediaServerItem mediaServerItem) { + this.mediaServerItem = mediaServerItem; + } + + public JSONObject getResponse() { + return response; + } + + public void setResponse(JSONObject response) { + this.response = response; + } + + public SipSubscribe.EventResult getEvent() { + return event; + } + + public void setEvent(SipSubscribe.EventResult event) { + this.event = event; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index 4f08c99d..f226a37f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -160,16 +160,16 @@ public class MediaServerServiceImpl implements IMediaServerService { } @Override - public void closeRTPServer(Device device, String channelId, String stream) { - String mediaServerId = streamSession.getMediaServerId(device.getDeviceId(), channelId, stream); - String ssrc = streamSession.getSSRC(device.getDeviceId(), channelId, stream); + public void closeRTPServer(String deviceId, String channelId, String stream) { + String mediaServerId = streamSession.getMediaServerId(deviceId, channelId, stream); + String ssrc = streamSession.getSSRC(deviceId, channelId, stream); MediaServerItem mediaServerItem = this.getOne(mediaServerId); if (mediaServerItem != null) { - String streamId = String.format("%s_%s", device.getDeviceId(), channelId); + String streamId = String.format("%s_%s", deviceId, channelId); zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId); releaseSsrc(mediaServerItem, ssrc); } - streamSession.remove(device.getDeviceId(), channelId, stream); + streamSession.remove(deviceId, channelId, stream); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index de665082..0fefb0cc 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -17,6 +17,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.PlayBackCallback; +import com.genersoft.iot.vmp.service.bean.PlayBackResult; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; @@ -115,11 +116,8 @@ public class PlayServiceImpl implements IPlayService { msg.setData(wvpResult); // 点播超时回复BYE cmder.streamByeCmd(device.getDeviceId(), channelId, streamInfo.getStream()); - // 释放rtpserver - mediaServerService.closeRTPServer(playResult.getDevice(), channelId, streamInfo.getStream()); // 回复之前所有的点播请求 resultHolder.invokeAllResult(msg); - // TODO 释放ssrc }); result.onCompletion(()->{ // 点播结束时调用截图接口 @@ -173,7 +171,10 @@ public class PlayServiceImpl implements IPlayService { WVPResult wvpResult = new WVPResult(); wvpResult.setCode(-1); // 点播返回sip错误 - mediaServerService.closeRTPServer(playResult.getDevice(), channelId, ssrcInfo.getStream()); + mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream()); + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc()); + streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg)); msg.setData(wvpResult); resultHolder.invokeAllResult(msg); @@ -222,7 +223,10 @@ public class PlayServiceImpl implements IPlayService { logger.info("收到订阅消息: " + response.toJSONString()); onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid); }, (event) -> { - mediaServerService.closeRTPServer(playResult.getDevice(), channelId, ssrcInfo.getStream()); + mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream()); + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc()); + streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); WVPResult wvpResult = new WVPResult(); wvpResult.setCode(-1); wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg)); @@ -240,7 +244,7 @@ public class PlayServiceImpl implements IPlayService { RequestMessage msg = new RequestMessage(); msg.setId(uuid); msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId); - StreamInfo streamInfo = onPublishHandler(mediaServerItem, resonse, deviceId, channelId); + StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); if (streamInfo != null) { DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { @@ -298,9 +302,12 @@ public class PlayServiceImpl implements IPlayService { RequestMessage msg = new RequestMessage(); msg.setId(uuid); msg.setKey(key); + PlayBackResult playBackResult = new PlayBackResult<>(); result.onTimeout(()->{ msg.setData("回放超时"); - callback.call(msg); + playBackResult.setCode(-1); + playBackResult.setData(msg); + callback.call(playBackResult); }); cmder.playbackStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, (MediaServerItem mediaServerItem, JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); @@ -308,15 +315,24 @@ public class PlayServiceImpl implements IPlayService { if (streamInfo == null) { logger.warn("设备回放API调用失败!"); msg.setData("设备回放API调用失败!"); - callback.call(msg); + playBackResult.setCode(-1); + playBackResult.setData(msg); + callback.call(playBackResult); return; } redisCatchStorage.startPlayback(streamInfo); msg.setData(JSON.toJSONString(streamInfo)); - callback.call(msg); + playBackResult.setCode(0); + playBackResult.setData(msg); + playBackResult.setMediaServerItem(mediaServerItem); + playBackResult.setResponse(response); + callback.call(playBackResult); }, event -> { msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg)); - callback.call(msg); + playBackResult.setCode(-1); + playBackResult.setData(msg); + playBackResult.setEvent(event); + callback.call(playBackResult); }); return result; } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java index 8350d293..fd70690e 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java @@ -129,7 +129,6 @@ public class PlayController { //Response response = event.getResponse(); msg.setData(String.format("success")); resultHolder.invokeAllResult(msg); - mediaServerService.closeRTPServer(device, channelId, streamInfo.getStream()); }); if (deviceId != null || channelId != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java index 3607a8d4..b864f466 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java @@ -77,8 +77,8 @@ public class PlaybackController { logger.debug(String.format("设备回放 API调用,deviceId:%s ,channelId:%s", deviceId, channelId)); } - DeferredResult> result = playService.playBack(deviceId, channelId, startTime, endTime, msg->{ - resultHolder.invokeResult(msg); + DeferredResult> result = playService.playBack(deviceId, channelId, startTime, endTime, wvpResult->{ + resultHolder.invokeResult(wvpResult.getData()); }); return result;