处理服务重启或设备重新上线时的订阅,优化通道导入重复的处理

pull/317/head
648540858 2022-01-20 16:48:30 +08:00
parent c5ddf59858
commit 7d88827415
14 changed files with 234 additions and 25 deletions

View File

@ -3,7 +3,6 @@ package com.genersoft.iot.vmp.conf;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import java.util.Map;
@ -40,4 +39,8 @@ public class DynamicTask {
}
}
public boolean contains(String key) {
return futureMap.get(key) != null;
}
}

View File

@ -1,7 +1,8 @@
package com.genersoft.iot.vmp.conf.runner;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.springframework.beans.factory.annotation.Autowired;
@ -28,6 +29,9 @@ public class SipDeviceRunner implements CommandLineRunner {
@Autowired
private UserSetup userSetup;
@Autowired
private IDeviceService deviceService;
@Override
public void run(String... args) throws Exception {
// 读取redis没有心跳信息的则设置为离线等收到下次心跳设置为在线
@ -36,9 +40,15 @@ public class SipDeviceRunner implements CommandLineRunner {
List<String> onlineForAll = redisCatchStorage.getOnlineForAll();
for (String deviceId : onlineForAll) {
storager.online(deviceId);
Device device = redisCatchStorage.getDevice(deviceId);
if (device != null && device.getSubscribeCycleForCatalog() > 0) {
// 查询在线设备那些开启了订阅,为设备开启定时的目录订阅
deviceService.addCatalogSubscribe(device);
}
}
// 重置cseq计数
redisCatchStorage.resetAllCSEQ();
// TODO 查询在线设备那些开启了订阅,为设备开启定时的目录订阅
}
}

View File

@ -35,6 +35,9 @@ public class OnlineEventListener implements ApplicationListener<OnlineEvent> {
@Autowired
private IVideoManagerStorager storager;
@Autowired
private IDeviceService deviceService;
@Autowired
private RedisUtil redis;
@ -57,6 +60,7 @@ public class OnlineEventListener implements ApplicationListener<OnlineEvent> {
logger.debug("设备上线事件触发deviceId" + event.getDevice().getDeviceId() + ",from:" + event.getFrom());
}
Device device = event.getDevice();
if (device == null) return;
String key = VideoManagerConstants.KEEPLIVEKEY_PREFIX + userSetup.getServerId() + "_" + event.getDevice().getDeviceId();
switch (event.getFrom()) {
@ -84,15 +88,18 @@ public class OnlineEventListener implements ApplicationListener<OnlineEvent> {
}
device.setOnline(1);
Device deviceInstore = storager.queryVideoDevice(device.getDeviceId());
if (deviceInstore != null && deviceInstore.getOnline() == 0) {
Device deviceInStore = storager.queryVideoDevice(device.getDeviceId());
if (deviceInStore != null && deviceInStore.getOnline() == 0) {
List<DeviceChannel> deviceChannelList = storager.queryOnlineChannelsByDeviceId(device.getDeviceId());
eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON);
}
// 处理上线监听
storager.updateDevice(device);
// TODO 上线添加订阅
// 上线添加订阅
if (device.getSubscribeCycleForCatalog() > 0) {
deviceService.addCatalogSubscribe(device);
}
}
}

View File

@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.*;
@ -76,12 +77,12 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
}else if (event.getGbStreams() != null) {
if (platforms.size() > 0) {
for (GbStream gbStream : event.getGbStreams()) {
if (gbStream == null || StringUtils.isEmpty(gbStream.getGbId())) continue;
List<ParentPlatform> parentPlatformsForGB = storager.queryPlatFormListForStreamWithGBId(gbStream.getApp(),gbStream.getStream(), platforms);
parentPlatformMap.put(gbStream.getGbId(), parentPlatformsForGB);
}
}
}
}
switch (event.getType()) {
case CatalogEvent.ON:

View File

@ -41,6 +41,8 @@ public class DeferredResultHolder {
public static final String CALLBACK_CMD_STOP = "CALLBACK_STOP";
public static final String UPLOAD_FILE_CHANNEL = "UPLOAD_FILE_CHANNEL";
public static final String CALLBACK_CMD_MOBILEPOSITION = "CALLBACK_MOBILEPOSITION";
public static final String CALLBACK_CMD_PRESETQUERY = "CALLBACK_PRESETQUERY";

View File

@ -80,17 +80,14 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
Element rootElement = getRootElement(evt);
String cmd = XmlUtil.getText(rootElement, "CmdType");
if (CmdType.MOBILE_POSITION.equals(cmd)) {
logger.info("接收到MobilePosition订阅");
processNotifyMobilePosition(evt, rootElement);
// } else if (CmdType.ALARM.equals(cmd)) {
// logger.info("接收到Alarm订阅");
// processNotifyAlarm(evt, rootElement);
} else if (CmdType.CATALOG.equals(cmd)) {
logger.info("接收到Catalog订阅");
processNotifyCatalogList(evt, rootElement);
} else {
logger.info("接收到消息:" + cmd);
// responseAck(evt, Response.OK);
Response response = null;
response = getMessageFactory().createResponse(200, request);
@ -132,7 +129,7 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId);
String sn = XmlUtil.getText(rootElement, "SN");
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_MobilePosition_" + platformId;
logger.info("接收到{}的MobilePosition订阅", platformId);
StringBuilder resultXml = new StringBuilder(200);
resultXml.append("<?xml version=\"1.0\" ?>\r\n")
.append("<Response>\r\n")
@ -182,7 +179,7 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId);
String sn = XmlUtil.getText(rootElement, "SN");
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platformId;
logger.info("接收到{}的Catalog订阅", platformId);
StringBuilder resultXml = new StringBuilder(200);
resultXml.append("<?xml version=\"1.0\" ?>\r\n")
.append("<Response>\r\n")

View File

@ -30,11 +30,15 @@ public class DeviceServiceImpl implements IDeviceService {
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
return false;
}
if (dynamicTask.contains(device.getDeviceId())) {
logger.info("[添加目录订阅] 设备{}的目录订阅以存在", device.getDeviceId());
return false;
}
logger.info("[添加目录订阅] 设备{}", device.getDeviceId());
// 添加目录订阅
CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander);
catalogSubscribeTask.run();
// 提前开始刷新订阅
// TODO 使用jain sip的当时刷新订阅
int subscribeCycleForCatalog = device.getSubscribeCycleForCatalog();
// 设置最小值为30
subscribeCycleForCatalog = Math.max(subscribeCycleForCatalog, 30);

View File

@ -14,15 +14,23 @@ import java.util.Set;
public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPushExcelDto> {
private ErrorDataHandler errorDataHandler;
private IStreamPushService pushService;
private String defaultMediaServerId;
private List<StreamPushItem> streamPushItems = new ArrayList<>();
private Set<String> streamPushStreamSet = new HashSet<>();
private Set<String> streamPushGBSet = new HashSet<>();
private List<String> errorStreamList = new ArrayList<>();
private List<String> errorGBList = new ArrayList<>();
public StreamPushUploadFileHandler(IStreamPushService pushService, String defaultMediaServerId) {
public StreamPushUploadFileHandler(IStreamPushService pushService, String defaultMediaServerId, ErrorDataHandler errorDataHandler) {
this.pushService = pushService;
this.defaultMediaServerId = defaultMediaServerId;
this.errorDataHandler = errorDataHandler;
}
public interface ErrorDataHandler{
void handle(List<String> streams, List<String> gbId);
}
@Override
@ -32,9 +40,16 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
|| StringUtils.isEmpty(streamPushExcelDto.getGbId())) {
return;
}
if (streamPushGBSet.contains(streamPushExcelDto.getGbId())) {
errorGBList.add(streamPushExcelDto.getGbId());
}
if (streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream())) {
errorStreamList.add(streamPushExcelDto.getApp() + "/" + streamPushExcelDto.getStream());
}
if (streamPushGBSet.contains(streamPushExcelDto.getGbId()) || streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream())) {
return;
}
StreamPushItem streamPushItem = new StreamPushItem();
streamPushItem.setApp(streamPushExcelDto.getApp());
streamPushItem.setStream(streamPushExcelDto.getStream());
@ -60,8 +75,11 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
@Override
public void doAfterAllAnalysed(AnalysisContext analysisContext) {
// 这里也要保存数据,确保最后遗留的数据也存储到数据库
pushService.batchAdd(streamPushItems);
if (streamPushItems.size() > 0) {
pushService.batchAdd(streamPushItems);
}
streamPushGBSet.clear();
streamPushStreamSet.clear();
errorDataHandler.handle(errorStreamList, errorGBList);
}
}

View File

@ -50,7 +50,7 @@ public interface StreamPushMapper {
StreamPushItem selectOne(String app, String stream);
@Insert("<script>" +
"INSERT INTO stream_push (app, stream, totalReaderCount, originType, originTypeStr, " +
"REPLACE INTO stream_push (app, stream, totalReaderCount, originType, originTypeStr, " +
"createStamp, aliveSecond, mediaServerId) " +
"VALUES <foreach collection='streamPushItems' item='item' index='index' separator=','>" +
"( '${item.app}', '${item.stream}', '${item.totalReaderCount}', '${item.originType}', " +

View File

@ -4,11 +4,14 @@ import com.alibaba.excel.EasyExcel;
import com.alibaba.excel.ExcelReader;
import com.alibaba.excel.read.metadata.ReadSheet;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.impl.StreamPushUploadFileHandler;
import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
@ -18,12 +21,19 @@ import org.apache.poi.sl.usermodel.Sheet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@Api(tags = "推流信息管理")
@Controller
@ -39,6 +49,9 @@ public class StreamPushController {
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private DeferredResultHolder resultHolder;
@ApiOperation("推流列表查询")
@ApiImplicitParams({
@ApiImplicitParam(name="page", value = "当前页", required = true, dataTypeClass = Integer.class),
@ -103,10 +116,44 @@ public class StreamPushController {
}
@PostMapping(value = "upload")
@ResponseBody
public String uploadChannelFile(@RequestParam(value = "file") MultipartFile file){
public DeferredResult<ResponseEntity<WVPResult<Object>>> uploadChannelFile(@RequestParam(value = "file") MultipartFile file){
// 最多处理文件一个小时
DeferredResult<ResponseEntity<WVPResult<Object>>> result = new DeferredResult<>(60*60*1000L);
// 录像查询以channelId作为deviceId查询
String key = DeferredResultHolder.UPLOAD_FILE_CHANNEL;
String uuid = UUID.randomUUID().toString();
if (file.isEmpty()) {
return "fail";
logger.warn("通道导入文件为空");
WVPResult<Object> wvpResult = new WVPResult<>();
wvpResult.setCode(-1);
wvpResult.setMsg("文件为空");
result.setResult(ResponseEntity.status(HttpStatus.BAD_REQUEST).body(wvpResult));
return result;
}
// 同时只处理一个文件
if (resultHolder.exist(key, null)) {
logger.warn("已有导入任务正在执行");
WVPResult<Object> wvpResult = new WVPResult<>();
wvpResult.setCode(-1);
wvpResult.setMsg("已有导入任务正在执行");
result.setResult(ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(wvpResult));
return result;
}
resultHolder.put(key, uuid, result);
result.onTimeout(()->{
logger.warn("通道导入超时,可能文件过大");
RequestMessage msg = new RequestMessage();
msg.setKey(key);
WVPResult<Object> wvpResult = new WVPResult<>();
wvpResult.setCode(-1);
wvpResult.setMsg("导入超时,可能文件过大");
msg.setData(wvpResult);
resultHolder.invokeAllResult(msg);
});
//获取文件流
InputStream inputStream = null;
try {
@ -117,11 +164,29 @@ public class StreamPushController {
}
//传入参数
ExcelReader excelReader = EasyExcel.read(inputStream, StreamPushExcelDto.class,
new StreamPushUploadFileHandler(streamPushService, mediaServerService.getDefaultMediaServer().getId())).build();
new StreamPushUploadFileHandler(streamPushService, mediaServerService.getDefaultMediaServer().getId(), (errorStreams, errorGBs)->{
logger.info("通道导入成功存在重复App+Stream为{}个存在国标ID为{}个", errorStreams.size(), errorGBs.size());
RequestMessage msg = new RequestMessage();
msg.setKey(key);
WVPResult<Map<String, List<String>>> wvpResult = new WVPResult<>();
if (errorStreams.size() == 0 && errorGBs.size() == 0) {
wvpResult.setCode(0);
wvpResult.setMsg("成功");
}else {
wvpResult.setCode(1);
wvpResult.setMsg("导入成功。但是存在重复数据");
Map<String, List<String>> errorData = new HashMap<>();
errorData.put("gbId", errorGBs);
errorData.put("stream", errorStreams);
wvpResult.setData(errorData);
}
msg.setData(wvpResult);
resultHolder.invokeAllResult(msg);
})).build();
ReadSheet readSheet = EasyExcel.readSheet(0).build();
excelReader.read(readSheet);
excelReader.finish();
return "success";
return result;
}

View File

@ -127,8 +127,6 @@
count: that.count
}
}).then(function (res) {
console.log(res);
console.log(res.data.list);
that.total = res.data.total;
that.pushList = res.data.list;
that.getDeviceListLoading = false;

View File

@ -16,6 +16,8 @@
drag
:action="uploadUrl"
name="file"
:on-success="successHook"
:on-error="errorHook"
>
<i class="el-icon-upload"></i>
<div class="el-upload__text">将文件拖到此处<em>点击上传</em></div>
@ -23,14 +25,19 @@
</el-upload>
</div>
</el-dialog>
<ShowErrorData ref="showErrorData" :gbIds="errorGBIds" :streams="errorStreams" ></ShowErrorData>
</div>
</template>
<script>
import ShowErrorData from './importChannelShowErrorData.vue'
export default {
name: "importChannel",
computed: {},
components: {
ShowErrorData,
},
created() {},
data() {
return {
@ -38,6 +45,8 @@ export default {
showDialog: false,
isLoging: false,
isEdit: false,
errorStreams: null,
errorGBIds: null,
uploadUrl: process.env.NODE_ENV === 'development'?`debug/api/push/upload`:`api/push/upload`,
};
},
@ -73,8 +82,35 @@ export default {
},
close: function () {
this.showDialog = false;
this.$refs.form.resetFields();
},
successHook: function(response, file, fileList){
if (response.code === 0) {
this.$message({
showClose: true,
message: response.msg,
type: "success",
});
}else if (response.code === 1) {
this.errorGBIds = response.data.gbId
this.errorStreams = response.data.stream
console.log(this.$refs)
console.log(this.$refs.showErrorData)
this.$refs.showErrorData.openDialog()
}else {
this.$message({
showClose: true,
message: response.msg,
type: "error",
});
}
},
errorHook: function (err, file, fileList) {
this.$message({
showClose: true,
message: err,
type: "error",
});
}
},
};
</script>
@ -82,4 +118,8 @@ export default {
.upload-box{
text-align: center;
}
.errDataBox{
max-height: 15rem;
overflow: auto;
}
</style>

View File

@ -0,0 +1,64 @@
<template>
<div id="importChannelShowErrorData" v-loading="isLoging">
<el-dialog
title="导入通道数据成功,但数据存在重复"
width="30rem"
top="2rem"
:append-to-body="true"
:close-on-click-modal="false"
:visible.sync="showDialog"
:destroy-on-close="true"
@close="close()"
>
<div >
重复国标ID:
<el-button style="float: right;" type="primary" size="mini" icon="el-icon-document-copy" title="点击拷贝" v-clipboard="gbIds.join(',')" @success="$message({type:'success', message:'成功拷贝到粘贴板'})"></el-button>
<ul class="errDataBox">
<li v-for="id in gbIds" >
{{ id }}
</li>
</ul>
</div>
<div >
重复App/stream:
<el-button style="float: right;" type="primary" size="mini" icon="el-icon-document-copy" title="点击拷贝" v-clipboard="streams.join(',')" @success="$message({type:'success', message:'成功拷贝到粘贴板'})"></el-button>
<ul class="errDataBox">
<li v-for="id in streams" >
{{ id }}
</li>
</ul>
</div>
</el-dialog>
</div>
</template>
<script>
export default {
name: "importChannelShowErrorData",
computed: {},
created() {},
props: ['gbIds', 'streams'],
data() {
return {
isLoging: false,
showDialog: false,
};
},
methods: {
openDialog: function () {
this.showDialog = true;
},
close: function () {
this.showDialog = false;
},
},
};
</script>
<style>
.errDataBox{
max-height: 15rem;
overflow: auto;
}
</style>

View File

@ -78,7 +78,7 @@ export default {
this.jessibuca = new window.Jessibuca(Object.assign(
{
container: this.$refs.container,
videoBuffer: 0.5, //
videoBuffer: 0.2, //
isResize: true,
isFlv: true,
decoder: "./static/js/jessibuca/index.js",