临时提交

pull/1642/head
648540858 2024-06-27 17:51:13 +08:00
parent 400eef2d2e
commit b2aa5c839d
10 changed files with 65 additions and 307 deletions

View File

@ -16,10 +16,13 @@ public interface IGbChannelService {
int offline(CommonGBChannel commonGBChannel); int offline(CommonGBChannel commonGBChannel);
int offline(List<CommonGBChannel> commonGBChannelList);
int online(CommonGBChannel commonGBChannel); int online(CommonGBChannel commonGBChannel);
int online(List<CommonGBChannel> commonGBChannelList);
void closeSend(CommonGBChannel commonGBChannel); void closeSend(CommonGBChannel commonGBChannel);
void batchAdd(List<CommonGBChannel> commonGBChannels); void batchAdd(List<CommonGBChannel> commonGBChannels);
} }

View File

@ -70,7 +70,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED); dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED);
if (statusChangeFromPushStream.isSetAllOffline()) { if (statusChangeFromPushStream.isSetAllOffline()) {
// 所有设备离线 // 所有设备离线
streamPushService.allStreamOffline(); streamPushService.allOffline();
} }
if (statusChangeFromPushStream.getOfflineStreams() != null if (statusChangeFromPushStream.getOfflineStreams() != null
&& statusChangeFromPushStream.getOfflineStreams().size() > 0) { && statusChangeFromPushStream.getOfflineStreams().size() > 0) {
@ -99,7 +99,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, ()->{ dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, ()->{
logger.info("[REDIS消息]未收到redis回复推流设备状态执行推流设备离线"); logger.info("[REDIS消息]未收到redis回复推流设备状态执行推流设备离线");
// 五秒收不到请求就设置通道离线,然后通知上级离线 // 五秒收不到请求就设置通道离线,然后通知上级离线
streamPushService.allStreamOffline(); streamPushService.allOffline();
}, 5000); }, 5000);
} }
} }

View File

@ -18,11 +18,11 @@ public class StreamPushExcelDto {
@ExcelProperty("国标ID") @ExcelProperty("国标ID")
private String gbId; private String gbId;
@ExcelProperty("平台ID") // @ExcelProperty("平台ID")
private String platformId; // private String platformId;
//
@ExcelProperty("目录ID") // @ExcelProperty("目录ID")
private String catalogId; // private String catalogId;
@ExcelProperty("在线状态") @ExcelProperty("在线状态")
private boolean status; private boolean status;

View File

@ -1,16 +0,0 @@
package com.genersoft.iot.vmp.streamPush.bean;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
*
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class StreamPushInfoForUpdateLoad extends StreamPush{
private String platformId;
private String catalogId;
}

View File

@ -100,18 +100,6 @@ public class StreamPushController {
} }
} }
@DeleteMapping(value = "/batchStop")
@ResponseBody
@Operation(summary = "中止多个推流", security = @SecurityRequirement(name = JwtUtils.HEADER))
public void batchStop(@RequestBody BatchGBStreamParam batchGBStreamParam){
if (batchGBStreamParam.getGbStreams().size() == 0) {
throw new ControllerException(ErrorCode.ERROR100);
}
if (!streamPushService.batchStop(batchGBStreamParam.getGbStreams())){
throw new ControllerException(ErrorCode.ERROR100);
}
}
@PostMapping(value = "upload") @PostMapping(value = "upload")
@ResponseBody @ResponseBody
public DeferredResult<ResponseEntity<WVPResult<Object>>> uploadChannelFile(@RequestParam(value = "file") MultipartFile file){ public DeferredResult<ResponseEntity<WVPResult<Object>>> uploadChannelFile(@RequestParam(value = "file") MultipartFile file){
@ -252,10 +240,8 @@ public class StreamPushController {
if (ObjectUtils.isEmpty(stream.getApp()) && ObjectUtils.isEmpty(stream.getStream())) { if (ObjectUtils.isEmpty(stream.getApp()) && ObjectUtils.isEmpty(stream.getStream())) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "app或stream不可为空"); throw new ControllerException(ErrorCode.ERROR400.getCode(), "app或stream不可为空");
} }
stream.setStatus(false); stream.setGbStatus(false);
stream.setPushIng(false); stream.setPushIng(false);
stream.setAliveSecond(0L);
stream.setTotalReaderCount(0);
if (!streamPushService.add(stream)) { if (!streamPushService.add(stream)) {
throw new ControllerException(ErrorCode.ERROR100); throw new ControllerException(ErrorCode.ERROR100);
} }

View File

@ -144,7 +144,7 @@ public interface StreamPushMapper {
"(#{item.app}, #{item.stream}) " + "(#{item.app}, #{item.stream}) " +
"</foreach>" + "</foreach>" +
")</script>") ")</script>")
List<GbStream> getOnlinePusherForGbInList(List<StreamPushItemFromRedis> offlineStreams); List<StreamPush> getOnlinePusherForGbInList(List<StreamPushItemFromRedis> offlineStreams);
@Update("<script> "+ @Update("<script> "+
"UPDATE wvp_stream_push SET status=0 where (app, stream) in (" + "UPDATE wvp_stream_push SET status=0 where (app, stream) in (" +
@ -161,7 +161,7 @@ public interface StreamPushMapper {
"(#{item.app}, #{item.stream}) " + "(#{item.app}, #{item.stream}) " +
"</foreach>" + "</foreach>" +
") </script>") ") </script>")
List<GbStream> getOfflinePusherForGbInList(List<StreamPushItemFromRedis> onlineStreams); List<StreamPush> getOfflinePusherForGbInList(List<StreamPushItemFromRedis> onlineStreams);
@Update("<script> "+ @Update("<script> "+
"UPDATE wvp_stream_push SET status=1 where (app, stream) in (" + "UPDATE wvp_stream_push SET status=1 where (app, stream) in (" +
@ -172,7 +172,7 @@ public interface StreamPushMapper {
void online(List<StreamPushItemFromRedis> onlineStreams); void online(List<StreamPushItemFromRedis> onlineStreams);
@Select("SELECT gs.* FROM wvp_stream_push sp left join wvp_gb_stream gs on sp.app = gs.app AND sp.stream = gs.stream where sp.status = true") @Select("SELECT gs.* FROM wvp_stream_push sp left join wvp_gb_stream gs on sp.app = gs.app AND sp.stream = gs.stream where sp.status = true")
List<GbStream> getOnlinePusherForGb(); List<StreamPush> getOnlinePusherForGb();
@Update("UPDATE wvp_stream_push SET status=0") @Update("UPDATE wvp_stream_push SET status=0")
void setAllStreamOffline(); void setAllStreamOffline();
@ -210,4 +210,5 @@ public interface StreamPushMapper {
" left join wvp_gb_stream wgs on wgs.app = wsp.app and wgs.stream = wsp.stream") " left join wvp_gb_stream wgs on wgs.app = wsp.app and wgs.stream = wsp.stream")
Map<String, StreamPush> getAllGBId(); Map<String, StreamPush> getAllGBId();
StreamPush select(Integer streamPushId);
} }

View File

@ -2,8 +2,8 @@ package com.genersoft.iot.vmp.streamPush.enent;
import com.alibaba.excel.context.AnalysisContext; import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.event.AnalysisEventListener; import com.alibaba.excel.event.AnalysisEventListener;
import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
import com.genersoft.iot.vmp.streamPush.bean.StreamPushExcelDto; import com.genersoft.iot.vmp.streamPush.bean.StreamPushExcelDto;
import com.genersoft.iot.vmp.streamPush.bean.StreamPushInfoForUpdateLoad;
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
import com.google.common.collect.BiMap; import com.google.common.collect.BiMap;
@ -17,37 +17,22 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
/** /**
* *
*/ */
private ErrorDataHandler errorDataHandler; private final ErrorDataHandler errorDataHandler;
/** /**
* *
*/ */
private IStreamPushService pushService; private final IStreamPushService pushService;
/** /**
* ID * ID
*/ */
private String defaultMediaServerId; private final String defaultMediaServerId;
/**
*
*/
private final List<StreamPushInfoForUpdateLoad> streamPushItems = new ArrayList<>();
/** /**
* APP+Streamstream_pushgb_stream * APP+Streamstream_pushgb_stream
*/ */
private final Map<String, StreamPushInfoForUpdateLoad> streamPushItemForSave = new HashMap<>(); private final Map<String, StreamPush> streamPushItemForSave = new HashMap<>();
/**
* APP+StreamKEY ID+Id valuegb_streamapp+Stream
*/
private final Map<String, List<String[]>> streamPushItemsForPlatform = new HashMap<>();
/**
* app+Stream+ID
*/
private final Set<String> streamPushStreamSet = new HashSet<>();
/** /**
* APP+Stream->ID , APP+Stream->ID * APP+Stream->ID , APP+Stream->ID
@ -115,49 +100,21 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
} }
} }
if (streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId())) { StreamPush streamPush = new StreamPush();
errorStreamList.add("行:" + rowIndex + ", " + streamPushExcelDto.getApp() + "/" + streamPushExcelDto.getStream()+ " 平台信息重复"); streamPush.setApp(streamPushExcelDto.getApp());
return; streamPush.setStream(streamPushExcelDto.getStream());
}else { streamPush.setGbDeviceId(streamPushExcelDto.getGbId());
if (pushMapInDb.get(streamPushExcelDto.getApp()+streamPushExcelDto.getStream()) != null) { streamPush.setGbStatus(streamPushExcelDto.isStatus());
errorStreamList.add("行:" + rowIndex + ", " + streamPushExcelDto.getApp() + "/" + streamPushExcelDto.getStream()+ " 数据已存在"); streamPush.setCreateTime(DateUtil.getNow());
return; streamPush.setMediaServerId(defaultMediaServerId);
} streamPush.setGbName(streamPushExcelDto.getName());
streamPushStreamSet.add(streamPushExcelDto.getApp()+streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId());
}
StreamPushInfoForUpdateLoad streamPushInfoForUpdateLoad = new StreamPushInfoForUpdateLoad(); streamPushItemForSave.put(streamPush.getApp() + streamPush.getStream(), streamPush);
streamPushInfoForUpdateLoad.setApp(streamPushExcelDto.getApp());
streamPushInfoForUpdateLoad.setStream(streamPushExcelDto.getStream());
streamPushInfoForUpdateLoad.setGbDeviceId(streamPushExcelDto.getGbId());
streamPushInfoForUpdateLoad.setGbStatus(streamPushExcelDto.isStatus());
streamPushInfoForUpdateLoad.setCreateTime(DateUtil.getNow());
streamPushInfoForUpdateLoad.setMediaServerId(defaultMediaServerId);
streamPushInfoForUpdateLoad.setGbName(streamPushExcelDto.getName());
streamPushInfoForUpdateLoad.setPlatformId(streamPushExcelDto.getPlatformId());
streamPushInfoForUpdateLoad.setCatalogId(streamPushExcelDto.getCatalogId());
// 存入所有的通道信息
streamPushItems.add(streamPushInfoForUpdateLoad);
streamPushItemForSave.put(streamPushInfoForUpdateLoad.getApp() + streamPushInfoForUpdateLoad.getStream(), streamPushInfoForUpdateLoad);
if (!ObjectUtils.isEmpty(streamPushExcelDto.getPlatformId())) {
List<String[]> platformList = streamPushItemsForPlatform.computeIfAbsent(streamPushInfoForUpdateLoad.getApp() + streamPushInfoForUpdateLoad.getStream(), k -> new ArrayList<>());
String platformId = streamPushExcelDto.getPlatformId();
String catalogId = streamPushExcelDto.getCatalogId();
if (ObjectUtils.isEmpty(streamPushExcelDto.getCatalogId())) {
catalogId = null;
}
String[] platFormInfoArray = new String[]{platformId, catalogId};
platformList.add(platFormInfoArray);
}
loadedSize ++; loadedSize ++;
if (loadedSize > 1000) { if (loadedSize > 1000) {
saveData(); saveData();
streamPushItems.clear();
streamPushItemForSave.clear(); streamPushItemForSave.clear();
streamPushItemsForPlatform.clear();
loadedSize = 0; loadedSize = 0;
} }
@ -167,18 +124,15 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
public void doAfterAllAnalysed(AnalysisContext analysisContext) { public void doAfterAllAnalysed(AnalysisContext analysisContext) {
// 这里也要保存数据,确保最后遗留的数据也存储到数据库 // 这里也要保存数据,确保最后遗留的数据也存储到数据库
saveData(); saveData();
streamPushItems.clear();
streamPushItemForSave.clear(); streamPushItemForSave.clear();
gBMap.clear(); gBMap.clear();
streamPushStreamSet.clear();
streamPushItemsForPlatform.clear();
errorDataHandler.handle(errorStreamList, errorInfoList); errorDataHandler.handle(errorStreamList, errorInfoList);
} }
private void saveData(){ private void saveData(){
if (!streamPushItemForSave.isEmpty()) { if (!streamPushItemForSave.isEmpty()) {
// 向数据库查询是否存在重复的app // 向数据库查询是否存在重复的app
pushService.batchAddForUpload(new ArrayList<>(streamPushItemForSave.values()), streamPushItemsForPlatform); pushService.batchAdd(new ArrayList<>(streamPushItemForSave.values()));
} }
} }
} }

View File

@ -1,13 +1,9 @@
package com.genersoft.iot.vmp.streamPush.service; package com.genersoft.iot.vmp.streamPush.service;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
import com.genersoft.iot.vmp.streamPush.bean.StreamPush; import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
import com.genersoft.iot.vmp.streamPush.bean.StreamPushInfoForUpdateLoad;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.github.pagehelper.PageInfo; import com.github.pagehelper.PageInfo;
import org.springframework.transaction.annotation.Transactional;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -17,20 +13,6 @@ import java.util.Map;
*/ */
public interface IStreamPushService { public interface IStreamPushService {
/**
* ID
* @param stream
* @return
*/
boolean saveToGB(GbStream stream);
/**
* ID
* @param stream
* @return
*/
boolean removeFromGB(GbStream stream);
/** /**
* *
*/ */
@ -38,8 +20,6 @@ public interface IStreamPushService {
List<StreamPush> getPushList(String mediaSererId); List<StreamPush> getPushList(String mediaSererId);
StreamPush transform(OnStreamChangedHookParam item);
StreamPush getPush(String app, String streamId); StreamPush getPush(String app, String streamId);
boolean stop(StreamPush streamPush); boolean stop(StreamPush streamPush);
@ -61,36 +41,16 @@ public interface IStreamPushService {
*/ */
void zlmServerOffline(String mediaServerId); void zlmServerOffline(String mediaServerId);
/**
*
*/
void clean();
boolean saveToRandomGB();
/** /**
* *
*/ */
void batchAdd(List<StreamPush> streamPushExcelDtoList); void batchAdd(List<StreamPush> streamPushExcelDtoList);
@Transactional
void batchAddForUpload(List<StreamPushInfoForUpdateLoad> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll);
/**
*
*/
boolean batchStop(List<GbStream> streamPushItems);
/**
*
*/
void batchAddForUpload(List<StreamPushInfoForUpdateLoad> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll);
/** /**
* 线 * 线
*/ */
void allStreamOffline(); void allOffline();
/** /**
* 线 * 线
@ -123,9 +83,6 @@ public interface IStreamPushService {
Map<String, StreamPush> getAllAppAndStreamMap(); Map<String, StreamPush> getAllAppAndStreamMap();
void updatePush(OnStreamChangedHookParam param);
Map<String, StreamPush> getAllGBId(); Map<String, StreamPush> getAllGBId();
void updateStatus(StreamPush push); void updateStatus(StreamPush push);

View File

@ -3,14 +3,8 @@ package com.genersoft.iot.vmp.streamPush.service.impl;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.baomidou.dynamic.datasource.annotation.DS; import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.bean.MediaServer;
@ -23,12 +17,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformCatalogMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.streamPush.bean.StreamPush; import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
import com.genersoft.iot.vmp.streamPush.bean.StreamPushInfoForUpdateLoad;
import com.genersoft.iot.vmp.streamPush.dao.StreamPushMapper; import com.genersoft.iot.vmp.streamPush.dao.StreamPushMapper;
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
@ -56,21 +45,6 @@ public class StreamPushServiceImpl implements IStreamPushService {
@Autowired @Autowired
private StreamPushMapper streamPushMapper; private StreamPushMapper streamPushMapper;
@Autowired
private StreamProxyMapper streamProxyMapper;
@Autowired
private ParentPlatformMapper parentPlatformMapper;
@Autowired
private PlatformCatalogMapper platformCatalogMapper;
@Autowired
private PlatformGbStreamMapper platformGbStreamMapper;
@Autowired
private EventPublisher eventPublisher;
@Autowired @Autowired
private IRedisCatchStorage redisCatchStorage; private IRedisCatchStorage redisCatchStorage;
@ -86,9 +60,6 @@ public class StreamPushServiceImpl implements IStreamPushService {
@Autowired @Autowired
TransactionDefinition transactionDefinition; TransactionDefinition transactionDefinition;
@Autowired
private MediaConfig mediaConfig;
@Autowired @Autowired
private IGbChannelService gbChannelService; private IGbChannelService gbChannelService;
@ -473,146 +444,39 @@ public class StreamPushServiceImpl implements IStreamPushService {
gbChannelService.batchAdd(commonGBChannels); gbChannelService.batchAdd(commonGBChannels);
} }
@Override @Override
@Transactional public void allOffline() {
public void batchAddForUpload(List<StreamPushInfoForUpdateLoad> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) { List<StreamPush> onlinePushers = streamPushMapper.getOnlinePusherForGb();
// 存储数据到stream_push表 if (onlinePushers.isEmpty()) {
streamPushMapper.addAll(streamPushItems);
List<CommonGBChannel> channelList = new ArrayList<>();
for (StreamPush streamPushItem : streamPushItems) {
if (streamPushItem.getGbDeviceId() != null && streamPushItem.getId() > 0) {
channelList.add(streamPushItem.getCommonGBChannel());
}
}
// 存储数据到gb_stream表 id会返回到streamPushItemForGbStream里
if (!channelList.isEmpty()) {
gbChannelService.batchAdd(channelList);
}
if (!streamPushItemsForPlatform.isEmpty()) {
// 获取所有平台,平台和目录信息一般不会特别大量。
List<ParentPlatform> parentPlatformList = parentPlatformMapper.getParentPlatformList();
Map<String, Map<String, PlatformCatalog>> platformInfoMap = new HashMap<>();
if (parentPlatformList.isEmpty()) {
return; return;
} }
for (ParentPlatform platform : parentPlatformList) { List<CommonGBChannel> commonGBChannelList = new ArrayList<>();
Map<String, PlatformCatalog> catalogMap = new HashMap<>(); for (StreamPush onlinePusher : onlinePushers) {
commonGBChannelList.add(onlinePusher.getCommonGBChannel());
// 创建根节点
PlatformCatalog platformCatalog = new PlatformCatalog();
platformCatalog.setId(platform.getServerGBId());
catalogMap.put(platform.getServerGBId(), platformCatalog);
// 查询所有节点信息
List<PlatformCatalog> platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId());
if (platformCatalogs.size() > 0) {
for (PlatformCatalog catalog : platformCatalogs) {
catalogMap.put(catalog.getId(), catalog);
} }
} gbChannelService.offline(commonGBChannelList);
platformInfoMap.put(platform.getServerGBId(), catalogMap);
}
List<StreamPush> streamPushItemListFroPlatform = new ArrayList<>();
Map<String, List<GbStream>> platformForEvent = new HashMap<>();
// 遍历存储结果查找app+Stream->platformId+catalogId的对应关系然后执行批量写入
for (StreamPush streamPushItem : streamPushItemsForPlatform) {
List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream());
if (platFormInfoList != null && platFormInfoList.size() > 0) {
for (String[] platFormInfoArray : platFormInfoList) {
StreamPush streamPushItemForPlatform = new StreamPush();
streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId());
if (platFormInfoArray.length > 0) {
// 数组 platFormInfoArray 0 为平台ID。 1为目录ID
// 不存在这个平台,则忽略导入此关联关系
if (platformInfoMap.get(platFormInfoArray[0]) == null
|| platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) {
log.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] );
continue;
}
streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]);
List<GbStream> gbStreamList = platformForEvent.get(platFormInfoArray[0]);
if (gbStreamList == null) {
gbStreamList = new ArrayList<>();
platformForEvent.put(platFormInfoArray[0], gbStreamList);
}
// 为发送通知整理数据
streamPushItemForPlatform.setName(streamPushItem.getName());
streamPushItemForPlatform.setApp(streamPushItem.getApp());
streamPushItemForPlatform.setStream(streamPushItem.getStream());
streamPushItemForPlatform.setGbId(streamPushItem.getGbId());
gbStreamList.add(streamPushItemForPlatform);
}
if (platFormInfoArray.length > 1) {
streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]);
}
streamPushItemListFroPlatform.add(streamPushItemForPlatform);
}
}
}
if (!streamPushItemListFroPlatform.isEmpty()) {
platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform);
// 发送通知
for (String platformId : platformForEvent.keySet()) {
eventPublisher.catalogEventPublishForStream(
platformId, platformForEvent.get(platformId), CatalogEvent.ADD);
}
}
}
}
@Override
public boolean batchStop(List<GbStream> gbStreams) {
if (gbStreams == null || gbStreams.size() == 0) {
return false;
}
gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL);
platformGbStreamMapper.delByGbStreams(gbStreams);
gbStreamMapper.batchDelForGbStream(gbStreams);
int delStream = streamPushMapper.delAllForGbStream(gbStreams);
if (delStream > 0) {
for (GbStream gbStream : gbStreams) {
MediaServer mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());
mediaServerService.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream());
}
}
return true;
}
@Override
public void allStreamOffline() {
List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGb();
if (onlinePushers.size() == 0) {
return;
}
streamPushMapper.setAllStreamOffline();
// 发送通知
eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
} }
@Override @Override
public void offline(List<StreamPushItemFromRedis> offlineStreams) { public void offline(List<StreamPushItemFromRedis> offlineStreams) {
// 更新部分设备离线 // 更新部分设备离线
List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGbInList(offlineStreams); List<StreamPush> onlinePushers = streamPushMapper.getOnlinePusherForGbInList(offlineStreams);
streamPushMapper.offline(offlineStreams); List<CommonGBChannel> commonGBChannelList = new ArrayList<>();
// 发送通知 for (StreamPush onlinePusher : onlinePushers) {
eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF); commonGBChannelList.add(onlinePusher.getCommonGBChannel());
}
gbChannelService.offline(commonGBChannelList);
} }
@Override @Override
public void online(List<StreamPushItemFromRedis> onlineStreams) { public void online(List<StreamPushItemFromRedis> onlineStreams) {
// 更新部分设备上线streamPushService // 更新部分设备上线streamPushService
List<GbStream> onlinePushers = streamPushMapper.getOfflinePusherForGbInList(onlineStreams); List<StreamPush> offlinePushers = streamPushMapper.getOfflinePusherForGbInList(onlineStreams);
streamPushMapper.online(onlineStreams); List<CommonGBChannel> commonGBChannelList = new ArrayList<>();
// 发送通知 for (StreamPush onlinePusher : offlinePushers) {
eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON); commonGBChannelList.add(onlinePusher.getCommonGBChannel());
}
gbChannelService.online(commonGBChannelList);
} }
@Override @Override
@ -641,18 +505,27 @@ public class StreamPushServiceImpl implements IStreamPushService {
@Override @Override
public void updateStatus(StreamPush push) { public void updateStatus(StreamPush push) {
if (push.getGbDeviceId() != null) {
return;
}
if (push.getGbStatus()) {
gbChannelService.online(push.getCommonGBChannel());
}else {
gbChannelService.offline(push.getCommonGBChannel());
}
} }
@Override @Override
public void updatePushStatus(Integer streamPushId, boolean pushIng) { public void updatePushStatus(Integer streamPushId, boolean pushIng) {
streamPushInDb.setPushIng(true); StreamPush streamPushInDb = streamPushMapper.select(streamPushId);
streamPushInDb.setPushIng(pushIng);
if (userSetting.isUsePushingAsStatus()) { if (userSetting.isUsePushingAsStatus()) {
streamPushInDb.setGbStatus(true); streamPushInDb.setGbStatus(pushIng);
} }
streamPushInDb.setPushTime(DateUtil.getNow()); streamPushInDb.setPushTime(DateUtil.getNow());
updateStatus(streamPushInDb);
} }
private List<StreamPush> handleJSON(List<StreamInfo> streamInfoList) { private List<StreamPush> handleJSON(List<StreamInfo> streamInfoList) {

View File

@ -2,4 +2,4 @@ spring:
application: application:
name: wvp name: wvp
profiles: profiles:
active: local271 active: local272