diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index 557563ee..18ad2b01 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -49,6 +49,7 @@ public class CatalogEventLister implements ApplicationListener { ParentPlatform parentPlatform = null; Map> parentPlatformMap = new HashMap<>(); + Map channelMap = new HashMap<>(); if (!ObjectUtils.isEmpty(event.getPlatformId())) { subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId()); if (subscribe == null) { @@ -67,6 +68,7 @@ public class CatalogEventLister implements ApplicationListener { for (DeviceChannel deviceChannel : event.getDeviceChannels()) { List parentPlatformsForGB = storager.queryPlatFormListForGBWithGBId(deviceChannel.getChannelId(), platforms); parentPlatformMap.put(deviceChannel.getChannelId(), parentPlatformsForGB); + channelMap.put(deviceChannel.getChannelId(), deviceChannel); } } }else if (event.getGbStreams() != null) { @@ -174,7 +176,7 @@ public class CatalogEventLister implements ApplicationListener { } logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId); List deviceChannelList = new ArrayList<>(); - DeviceChannel deviceChannel = storager.queryChannelInParentPlatform(platform.getServerGBId(), gbId); + DeviceChannel deviceChannel = channelMap.get(gbId); deviceChannelList.add(deviceChannel); GbStream gbStream = storager.queryStreamInParentPlatform(platform.getServerGBId(), gbId); if(gbStream != null){ diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 2082d39e..a9ce759f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -597,6 +597,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { Integer finalIndex = index; String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channels, deviceChannels.size(), type, subscribeInfo); + System.out.println(catalogXmlContent); logger.info("[发送NOTIFY通知]类型: {},发送数量: {}", type, channels.size()); sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> { logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg); @@ -626,7 +627,6 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { private String getCatalogXmlContentForCatalogAddOrUpdate(ParentPlatform parentPlatform, List channels, int sumNum, String type, SubscribeInfo subscribeInfo) { StringBuffer catalogXml = new StringBuffer(600); - String characterSet = parentPlatform.getCharacterSet(); catalogXml.append("\r\n") .append("\r\n") diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java index 7cbfe70e..f3f74317 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java @@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.google.common.primitives.Bytes; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; -import org.apache.commons.lang3.ArrayUtils; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Element; @@ -172,6 +171,7 @@ public abstract class SIPRequestProcessorParent { return getRootElement(evt, "gb2312"); } public Element getRootElement(RequestEvent evt, String charset) throws DocumentException { + if (charset == null) { charset = "gb2312"; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 460a5078..1b7daf02 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -25,6 +25,7 @@ import org.springframework.util.ObjectUtils; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -67,7 +68,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor private final static String talkKey = "notify-request-for-mobile-position-task"; -// @Async("taskExecutor") + @Async("taskExecutor") public void process(RequestEvent evt) { try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); @@ -172,11 +173,11 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor deviceChannel.setGpsTime(mobilePosition.getTime()); updateChannelMap.put(deviceId + channelId, deviceChannel); addMobilePositionList.add(mobilePosition); - if(updateChannelMap.size() > 100) { + if(updateChannelMap.size() > 2000) { executeSaveChannel(); } if (userSetting.isSavePositionHistory()) { - if(addMobilePositionList.size() > 100) { + if(addMobilePositionList.size() > 2000) { executeSaveMobilePosition(); } } @@ -212,8 +213,8 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor dynamicTask.execute(); try { logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size()); -// ArrayList deviceChannels = new ArrayList<>(updateChannelMap.values()); -// deviceChannelService.batchUpdateChannelGPS(deviceChannels); + ArrayList deviceChannels = new ArrayList<>(updateChannelMap.values()); + deviceChannelService.batchUpdateChannelGPS(deviceChannels); updateChannelMap.clear(); }catch (Exception e) { @@ -223,8 +224,8 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor public void executeSaveMobilePosition(){ if (userSetting.isSavePositionHistory()) { try { -// logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size()); -// deviceChannelService.batchAddMobilePosition(addMobilePositionList); + logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size()); + deviceChannelService.batchAddMobilePosition(addMobilePositionList); addMobilePositionList.clear(); }catch (Exception e) { logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 2dd107a7..da348b78 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -136,9 +136,9 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements } else if (CmdType.MOBILE_POSITION.equals(cmd)) { // logger.info("接收到MobilePosition通知"); // processNotifyMobilePosition(take.getEvt()); - taskExecutor.execute(() -> { +// taskExecutor.execute(() -> { notifyRequestForMobilePositionProcessor.process(take.getEvt()); - }); +// }); } else { logger.info("接收到消息:" + cmd); @@ -226,8 +226,8 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements } else { mobilePosition.setAltitude(0.0); } - logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), - mobilePosition.getLongitude(), mobilePosition.getLatitude()); +// logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), +// mobilePosition.getLongitude(), mobilePosition.getLatitude()); mobilePosition.setReportSource("Mobile Position"); // 更新device channel 的经纬度 @@ -370,8 +370,8 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements this.redisCatchStorage = redisCatchStorage; } - @Scheduled(fixedRate = 1000) //每1秒执行一次 + @Scheduled(fixedRate = 10000) //每1秒执行一次 public void execute(){ - System.out.println("待处理消息数量: " + taskQueue.size()); + logger.info("[待处理Notify消息数量]: {}", taskQueue.size()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index bc052767..26867eed 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -149,7 +149,6 @@ public class PlayServiceImpl implements IPlayService { logger.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", deviceId, channelId); throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm"); } - Device device = redisCatchStorage.getDevice(deviceId); if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && !mediaServerItem.isRtpEnable()) { logger.warn("[点播] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId); @@ -163,6 +162,8 @@ public class PlayServiceImpl implements IPlayService { InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); if (inviteInfo != null ) { if (inviteInfo.getStreamInfo() == null) { + // 释放生成的ssrc,使用上一次申请的 + ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc); // 点播发起了但是尚未成功, 仅注册回调等待结果即可 inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback); logger.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId: {}", device.getDeviceId(), channelId);