From bf66f8f694607fdd41ff9d7f6149459eef96bb67 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 11 Feb 2022 16:45:06 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8E=A8=E6=B5=81=E5=AF=BC=E5=85=A5=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E6=B7=BB=E5=8A=A0=E5=B9=B3=E5=8F=B0=E4=BF=A1=E6=81=AF?= =?UTF-8?q?=E4=B8=8E=E7=9B=AE=E5=BD=95=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/event/EventPublisher.java | 7 +- .../event/subscribe/catalog/CatalogEvent.java | 6 +- .../subscribe/catalog/CatalogEventLister.java | 4 +- .../vmp/media/zlm/ZLMHttpHookListener.java | 2 +- .../iot/vmp/service/IStreamPushService.java | 2 + .../service/impl/StreamPushServiceImpl.java | 33 ++++++-- .../impl/StreamPushUploadFileHandler.java | 81 ++++++++++++++++--- .../storager/dao/PlatformGbStreamMapper.java | 15 ++++ .../vmp/vmanager/bean/StreamPushExcelDto.java | 23 ++++++ web_src/src/components/PushVideoList.vue | 2 +- 10 files changed, 145 insertions(+), 30 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index 426e2e58..8c8565be 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -131,7 +131,7 @@ public class EventPublisher { } @Async - public void catalogEventPublishForStream(String platformId, List gbStreams, String type) { + public void catalogEventPublishForStream(String platformId, GbStream[] gbStreams, String type) { CatalogEvent outEvent = new CatalogEvent(this); outEvent.setGbStreams(gbStreams); outEvent.setType(type); @@ -141,8 +141,7 @@ public class EventPublisher { @Async public void catalogEventPublishForStream(String platformId, GbStream gbStream, String type) { - List gbStreamList = new ArrayList<>(); - gbStreamList.add(gbStream); - catalogEventPublishForStream(platformId, gbStreamList, type); + GbStream[] gbStreams = {gbStream}; + catalogEventPublishForStream(platformId, gbStreams, type); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEvent.java index c035b808..e343fec7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEvent.java @@ -20,7 +20,7 @@ public class CatalogEvent extends ApplicationEvent { public static final String UPDATE = "UPDATE"; // 更新 private List deviceChannels; - private List gbStreams; + private GbStream[] gbStreams; private String type; private String platformId; @@ -48,11 +48,11 @@ public class CatalogEvent extends ApplicationEvent { this.platformId = platformId; } - public List getGbStreams() { + public GbStream[] getGbStreams() { return gbStreams; } - public void setGbStreams(List gbStreams) { + public void setGbStreams(GbStream[] gbStreams) { this.gbStreams = gbStreams; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index 52442cc7..997031b6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -94,7 +94,7 @@ public class CatalogEventLister implements ApplicationListener { if (event.getDeviceChannels() != null) { deviceChannelList.addAll(event.getDeviceChannels()); } - if (event.getGbStreams() != null && event.getGbStreams().size() > 0){ + if (event.getGbStreams() != null && event.getGbStreams().length > 0){ for (GbStream gbStream : event.getGbStreams()) { DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform.getDeviceGBId()); deviceChannelList.add(deviceChannelByStream); @@ -134,7 +134,7 @@ public class CatalogEventLister implements ApplicationListener { if (event.getDeviceChannels() != null) { deviceChannelList.addAll(event.getDeviceChannels()); } - if (event.getGbStreams() != null && event.getGbStreams().size() > 0){ + if (event.getGbStreams() != null && event.getGbStreams().length > 0){ for (GbStream gbStream : event.getGbStreams()) { DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform.getDeviceGBId()); deviceChannelList.add(deviceChannelByStream); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 6e6b7b50..34ee4a86 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -371,7 +371,7 @@ public class ZLMHttpHookListener { } } if (gbStreams.size() > 0) { - eventPublisher.catalogEventPublishForStream(null, gbStreams, CatalogEvent.ON); + eventPublisher.catalogEventPublishForStream(null, gbStreams.toArray(new GbStream[0]), CatalogEvent.ON); } }else { diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index 53ca6c41..1d57d468 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -68,4 +68,6 @@ public interface IStreamPushService { void batchAdd(List streamPushExcelDtoList); boolean batchStop(List streamPushItems); + + void batchAddForUpload(String platformId, String catalogId, List streamPushItems); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 2ce9234a..ff31d8a7 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; @@ -18,10 +19,7 @@ import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; -import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; +import com.genersoft.iot.vmp.storager.dao.*; import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; @@ -43,6 +41,9 @@ public class StreamPushServiceImpl implements IStreamPushService { @Autowired private ParentPlatformMapper parentPlatformMapper; + @Autowired + private PlatformCatalogMapper platformCatalogMapper; + @Autowired private PlatformGbStreamMapper platformGbStreamMapper; @@ -95,13 +96,12 @@ public class StreamPushServiceImpl implements IStreamPushService { streamPushItem.setMediaServerId(item.getMediaServerId()); streamPushItem.setStream(item.getStream()); streamPushItem.setAliveSecond(item.getAliveSecond()); - streamPushItem.setCreateStamp(item.getCreateStamp()); streamPushItem.setOriginSock(item.getOriginSock()); streamPushItem.setTotalReaderCount(item.getTotalReaderCount()); streamPushItem.setOriginType(item.getOriginType()); streamPushItem.setOriginTypeStr(item.getOriginTypeStr()); streamPushItem.setOriginUrl(item.getOriginUrl()); - streamPushItem.setCreateStamp(item.getCreateStamp()); + streamPushItem.setCreateStamp(item.getCreateStamp() * 1000); streamPushItem.setAliveSecond(item.getAliveSecond()); streamPushItem.setStatus(true); streamPushItem.setStreamType("push"); @@ -358,6 +358,27 @@ public class StreamPushServiceImpl implements IStreamPushService { } } + @Override + public void batchAddForUpload(String platformId, String catalogId, List streamPushItems) { + streamPushMapper.addAll(streamPushItems); + gbStreamMapper.batchAdd(streamPushItems); + if (platformId != null) { + ParentPlatform platform = parentPlatformMapper.getParentPlatByServerGBId(platformId); + if (platform != null) { + if (catalogId == null) { + catalogId = platform.getCatalogId(); + }else { + PlatformCatalog catalog = platformCatalogMapper.select(catalogId); + if (catalog == null) { + return; + } + } + platformGbStreamMapper.batchAdd(platformId, catalogId, streamPushItems); + eventPublisher.catalogEventPublishForStream(platformId, streamPushItems.toArray(new GbStream[0]), CatalogEvent.ADD); + } + } + } + @Override public boolean batchStop(List gbStreams) { if (gbStreams == null || gbStreams.size() == 0) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java index 586b5b49..2f947097 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java @@ -7,10 +7,7 @@ import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto; import org.springframework.util.StringUtils; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; public class StreamPushUploadFileHandler extends AnalysisEventListener { @@ -18,10 +15,13 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener streamPushItems = new ArrayList<>(); + private Map streamPushItemsForPlatform = new HashMap<>(); private Set streamPushStreamSet = new HashSet<>(); private Set streamPushGBSet = new HashSet<>(); private List errorStreamList = new ArrayList<>(); private List errorGBList = new ArrayList<>(); + // 读取数量计数器 + private int loadedSize = 0; public StreamPushUploadFileHandler(IStreamPushService pushService, String defaultMediaServerId, ErrorDataHandler errorDataHandler) { this.pushService = pushService; @@ -33,6 +33,16 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener streams, List gbId); } + private class UploadData{ + public String platformId; + public Map> catalogData = new HashMap<>(); + public List streamPushItems = new ArrayList<>(); + + public UploadData(String platformId) { + this.platformId = platformId; + } + } + @Override public void invoke(StreamPushExcelDto streamPushExcelDto, AnalysisContext analysisContext) { if (StringUtils.isEmpty(streamPushExcelDto.getApp()) @@ -43,10 +53,10 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener streamPushItems = uploadData.catalogData.get(streamPushExcelDto.getCatalogId()); + if (streamPushItems == null) { + streamPushItems = new ArrayList<>(); + uploadData.catalogData.put(streamPushExcelDto.getCatalogId(), streamPushItems); + } + streamPushItems.add(streamPushItem); + }else { + uploadData.streamPushItems.add(streamPushItem); + } + + } + streamPushGBSet.add(streamPushExcelDto.getGbId()); streamPushStreamSet.add(streamPushExcelDto.getApp()+streamPushExcelDto.getStream()); - if (streamPushItems.size() > 300) { - pushService.batchAdd(streamPushItems); - // 存储完成清理 list + loadedSize ++; + if (loadedSize > 1000) { + saveData(); streamPushItems.clear(); + streamPushItemsForPlatform.clear(); + loadedSize = 0; } + } @Override public void doAfterAllAnalysed(AnalysisContext analysisContext) { // 这里也要保存数据,确保最后遗留的数据也存储到数据库 - if (streamPushItems.size() > 0) { - pushService.batchAdd(streamPushItems); - } + saveData(); streamPushGBSet.clear(); streamPushStreamSet.clear(); errorDataHandler.handle(errorStreamList, errorGBList); } + + private void saveData(){ + if (streamPushItems.size() > 0) { + pushService.batchAddForUpload(null, null, streamPushItems); + } + // 处理已分配到平台的流 + if (streamPushItemsForPlatform.size() > 0){ + for (String platformId : streamPushItemsForPlatform.keySet()) { + UploadData uploadData = streamPushItemsForPlatform.get(platformId); + if (uploadData.streamPushItems.size() > 0) { + pushService.batchAddForUpload(platformId, null, uploadData.streamPushItems); + } + if (uploadData.catalogData.size() > 0) { + for (String catalogId : uploadData.catalogData.keySet()) { + if (uploadData.catalogData.get(catalogId).size() > 0) { + pushService.batchAddForUpload(platformId, catalogId, uploadData.catalogData.get(catalogId)); + } + } + } + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java index e4639e71..1e153608 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.bean.PlatformGbStream; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import org.apache.ibatis.annotations.*; import org.springframework.stereotype.Repository; @@ -19,6 +20,17 @@ public interface PlatformGbStreamMapper { "('${app}', '${stream}', '${platformId}', '${catalogId}')") int add(PlatformGbStream platformGbStream); + + @Insert("") + int batchAdd(String platformId, String catalogId, List streamPushItems); + @Delete("DELETE FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream}") int delByAppAndStream(String app, String stream); @@ -82,4 +94,7 @@ public interface PlatformGbStreamMapper { "" + "") void delByGbStreams(List gbStreams); + + + } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/StreamPushExcelDto.java b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/StreamPushExcelDto.java index c0c7611f..956f6479 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/StreamPushExcelDto.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/StreamPushExcelDto.java @@ -16,6 +16,12 @@ public class StreamPushExcelDto { @ExcelProperty("国标ID") private String gbId; + @ExcelProperty("平台ID") + private String platformId; + + @ExcelProperty("目录ID") + private String catalogId; + public String getName() { return name; } @@ -47,4 +53,21 @@ public class StreamPushExcelDto { public void setGbId(String gbId) { this.gbId = gbId; } + + + public String getPlatformId() { + return platformId; + } + + public void setPlatformId(String platformId) { + this.platformId = platformId; + } + + public String getCatalogId() { + return catalogId; + } + + public void setCatalogId(String catalogId) { + this.catalogId = catalogId; + } } diff --git a/web_src/src/components/PushVideoList.vue b/web_src/src/components/PushVideoList.vue index 5fa162b5..3831cdcf 100644 --- a/web_src/src/components/PushVideoList.vue +++ b/web_src/src/components/PushVideoList.vue @@ -233,7 +233,7 @@ dateFormat: function(/** timestamp=0 **/) { let ts = arguments[0] || 0; let t,y,m,d,h,i,s; - t = ts ? new Date(ts*1000) : new Date(); + t = ts ? new Date(ts) : new Date(); y = t.getFullYear(); m = t.getMonth()+1; d = t.getDate();