diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java index a9842123..816917d4 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java @@ -7,6 +7,9 @@ import lombok.Data; @Schema(description = "国标通道") public class CommonGBChannel { + @Schema(description = "国标-数据库自增ID") + private int gbId; + @Schema(description = "国标-编码") private String gbDeviceId; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java new file mode 100644 index 00000000..4858bc5a --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java @@ -0,0 +1,25 @@ +package com.genersoft.iot.vmp.gb28181.service; + +import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; + +import java.util.List; + +public interface IGbChannelService { + + CommonGBChannel queryByDeviceId(String gbDeviceId); + + int add(CommonGBChannel commonGBChannel); + + int delete(int gbId); + + int update(CommonGBChannel commonGBChannel); + + int offline(CommonGBChannel commonGBChannel); + + int online(CommonGBChannel commonGBChannel); + + void closeSend(CommonGBChannel commonGBChannel); + + void batchAdd(List commonGBChannels); + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java new file mode 100644 index 00000000..adf5251c --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java @@ -0,0 +1,10 @@ +package com.genersoft.iot.vmp.gb28181.service.impl; + +import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class GbChannelServiceImpl implements IGbChannelService { +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java index 2413f567..1e1fcca1 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java @@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.media.event.mediaServer; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.IStreamProxyService; -import com.genersoft.iot.vmp.service.IStreamPushService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -23,9 +22,6 @@ public class MediaServerStatusEventListener { private final static Logger logger = LoggerFactory.getLogger(MediaServerStatusEventListener.class); - @Autowired - private IStreamPushService streamPushService; - @Autowired private IStreamProxyService streamProxyService; @@ -36,7 +32,6 @@ public class MediaServerStatusEventListener { @EventListener public void onApplicationEvent(MediaServerOnlineEvent event) { logger.info("[媒体节点] 上线 ID:" + event.getMediaServerId()); - streamPushService.zlmServerOnline(event.getMediaServerId()); streamProxyService.zlmServerOnline(event.getMediaServerId()); playService.zlmServerOnline(event.getMediaServerId()); } @@ -48,7 +43,6 @@ public class MediaServerStatusEventListener { logger.info("[媒体节点] 离线,ID:" + event.getMediaServerId()); // 处理ZLM离线 streamProxyService.zlmServerOffline(event.getMediaServerId()); - streamPushService.zlmServerOffline(event.getMediaServerId()); playService.zlmServerOffline(event.getMediaServerId()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPush.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPush.java index 41d4af96..eeb444dc 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPush.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPush.java @@ -8,6 +8,7 @@ import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; import lombok.EqualsAndHashCode; import org.jetbrains.annotations.NotNull; +import org.springframework.util.ObjectUtils; @Data @@ -102,6 +103,16 @@ public class StreamPush extends CommonGBChannel implements Comparable getAllGBId(); void updateStatus(StreamPush push); + + void deleteByAppAndStream(String app, String stream); + + void updatePushStatus(Integer streamPushId, boolean pushIng); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 2f159336..31dfe4cc 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -5,19 +5,22 @@ import com.baomidou.dynamic.datasource.annotation.DS; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; import com.genersoft.iot.vmp.media.bean.MediaInfo; +import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOfflineEvent; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.StreamPush; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; @@ -28,6 +31,7 @@ import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; +import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -36,18 +40,17 @@ import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; -import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import java.util.*; import java.util.stream.Collectors; @Service +@Slf4j @DS("master") public class StreamPushServiceImpl implements IStreamPushService { - private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class); - @Autowired private StreamPushMapper streamPushMapper; @@ -84,6 +87,9 @@ public class StreamPushServiceImpl implements IStreamPushService { @Autowired private MediaConfig mediaConfig; + @Autowired + private IGbChannelService gbChannelService; + /** * 流到来的处理 */ @@ -107,17 +113,17 @@ public class StreamPushServiceImpl implements IStreamPushService { streamAuthorityInfo.setOriginType(mediaInfo.getOriginType()); } redisCatchStorage.updateStreamAuthorityInfo(event.getApp(), event.getStream(), streamAuthorityInfo); - StreamPush transform = StreamPush.getInstance(event, userSetting.getServerId()); - transform.setPushIng(true); - transform.setUpdateTime(DateUtil.getNow()); - transform.setPushTime(DateUtil.getNow()); - transform.setSelf(true); - StreamPush pushInDb = getPush(event.getApp(), event.getStream()); - if (pushInDb == null) { - transform.setCreateTime(DateUtil.getNow()); - add(transform); + + StreamPush streamPushInDb = getPush(event.getApp(), event.getStream()); + if (streamPushInDb == null) { + StreamPush streamPush = StreamPush.getInstance(event, userSetting.getServerId()); + streamPush.setPushIng(true); + streamPush.setUpdateTime(DateUtil.getNow()); + streamPush.setPushTime(DateUtil.getNow()); + streamPush.setSelf(true); + add(streamPush); }else { - update(transform); + updatePushStatus(streamPushInDb.getId(), true); } // 冗余数据,自己系统中自用 if (!"broadcast".equals(event.getApp()) && !"talk".equals(event.getApp())) { @@ -143,6 +149,7 @@ public class StreamPushServiceImpl implements IStreamPushService { @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaDepartureEvent event) { + // 兼容流注销时类型从redis记录获取 MediaInfo mediaInfo = redisCatchStorage.getStreamInfo( event.getApp(), event.getStream(), event.getMediaServer().getId()); @@ -178,37 +185,22 @@ public class StreamPushServiceImpl implements IStreamPushService { } } - - private List handleJSON(List streamInfoList) { - if (streamInfoList == null || streamInfoList.isEmpty()) { - return null; - } - Map result = new HashMap<>(); - for (StreamInfo streamInfo : streamInfoList) { - // 不保存国标推理以及拉流代理的流 - if (streamInfo.getOriginType() == OriginType.RTSP_PUSH.ordinal() - || streamInfo.getOriginType() == OriginType.RTMP_PUSH.ordinal() - || streamInfo.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { - String key = streamInfo.getApp() + "_" + streamInfo.getStream(); - StreamPush streamPushItem = result.get(key); - if (streamPushItem == null) { - streamPushItem = streamPushItem.getInstance(streamInfo); - result.put(key, streamPushItem); - } - } - } - return new ArrayList<>(result.values()); + /** + * 流媒体节点上线 + */ + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaServerOnlineEvent event) { + zlmServerOnline(event.getMediaServerId()); } - @Override - public StreamPush transform(OnStreamChangedHookParam item) { - StreamPush streamPushItem = new StreamPush(); - streamPushItem.setApp(item.getApp()); - streamPushItem.setMediaServerId(item.getMediaServerId()); - streamPushItem.setStream(item.getStream()); - streamPushItem.setCreateTime(DateUtil.getNow()); - streamPushItem.setServerId(item.getSeverId()); - return streamPushItem; + /** + * 流媒体节点离线 + */ + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaServerOfflineEvent event) { + zlmServerOffline(event.getMediaServerId()); } @Override @@ -225,29 +217,113 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override - public StreamPush getPush(String app, String streamId) { - return streamPushMapper.selectOne(app, streamId); + public StreamPush getPush(String app, String stream) { + return streamPushMapper.selectByAppAndStream(app, stream); } @Override - public boolean stop(String app, String stream) { - logger.info("[推流] 停止推流: {}/{}", app, stream); - StreamPush streamPushItem = streamPushMapper.selectOne(app, stream); - if (streamPushItem != null) { - gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); + @Transactional + public boolean add(StreamPush stream) { + log.info("[添加推流] app: {}, stream: {}, 国标编号: {}", stream.getApp(), stream.getStream(), stream.getGbDeviceId()); + stream.setUpdateTime(DateUtil.getNow()); + stream.setCreateTime(DateUtil.getNow()); + int addResult = streamPushMapper.add(stream); + if (addResult <= 0) { + return false; } + if (ObjectUtils.isEmpty(stream.getGbDeviceId())) { + return true; + } + CommonGBChannel channel = gbChannelService.queryByDeviceId(stream.getGbDeviceId()); + if (channel != null) { + log.info("[添加推流]失败,国标编号已存在: {} app: {}, stream: {}, ", stream.getGbDeviceId(), stream.getApp(), stream.getStream()); + } + int addChannelResult = gbChannelService.add(stream.getCommonGBChannel()); + return addChannelResult > 0; + } - platformGbStreamMapper.delByAppAndStream(app, stream); - gbStreamMapper.del(app, stream); - int delStream = streamPushMapper.del(app, stream); - if (delStream > 0) { - MediaServer mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); - mediaServerService.closeStreams(mediaServerItem,app, stream); + @Override + @Transactional + public void deleteByAppAndStream(String app, String stream) { + log.info("[删除推流] app: {}, stream: {}, ", app, stream); + StreamPush streamPush = streamPushMapper.selectByAppAndStream(app, stream); + if (streamPush == null) { + log.info("[删除推流]失败, 不存在 app: {}, stream: {}, ", app, stream); + return; + } + if (streamPush.isPushIng()) { + stop(streamPush); + } + if (streamPush.getGbId() > 0) { + gbChannelService.delete(streamPush.getGbId()); + } + } + @Override + @Transactional + public boolean update(StreamPush streamPush) { + log.info("[更新推流]:id: {}, app: {}, stream: {}, ", streamPush.getId(), streamPush.getApp(), streamPush.getStream()); + assert streamPush.getId() != null; + streamPush.setUpdateTime(DateUtil.getNow()); + streamPushMapper.update(streamPush); + if (streamPush.getGbId() > 0) { + gbChannelService.update(streamPush.getCommonGBChannel()); + } + return true; + } + + + @Override + @Transactional + public boolean stop(StreamPush streamPush) { + log.info("[主动停止推流] id: {}, app: {}, stream: {}, ", streamPush.getId(), streamPush.getApp(), streamPush.getStream()); + MediaServer mediaServer = null; + if (streamPush.getMediaServerId() == null) { + log.info("[主动停止推流]未找到使用MediaServer,开始自动检索 id: {}, app: {}, stream: {}, ", streamPush.getId(), streamPush.getApp(), streamPush.getStream()); + mediaServer = mediaServerService.getMediaServerByAppAndStream(streamPush.getApp(), streamPush.getStream()); + if (mediaServer != null) { + log.info("[主动停止推流] 检索到MediaServer为{}, id: {}, app: {}, stream: {}, ", mediaServer.getId(), streamPush.getId(), streamPush.getApp(), streamPush.getStream()); + }else { + log.info("[主动停止推流]未找到使用MediaServer id: {}, app: {}, stream: {}, ", streamPush.getId(), streamPush.getApp(), streamPush.getStream()); + } + }else { + mediaServer = mediaServerService.getOne(streamPush.getMediaServerId()); + if (mediaServer == null) { + log.info("[主动停止推流]未找到使用的MediaServer: {},开始自动检索 id: {}, app: {}, stream: {}, ",streamPush.getMediaServerId(), streamPush.getId(), streamPush.getApp(), streamPush.getStream()); + mediaServer = mediaServerService.getMediaServerByAppAndStream(streamPush.getApp(), streamPush.getStream()); + if (mediaServer != null) { + log.info("[主动停止推流] 检索到MediaServer为{}, id: {}, app: {}, stream: {}, ", mediaServer.getId(), streamPush.getId(), streamPush.getApp(), streamPush.getStream()); + }else { + log.info("[主动停止推流]未找到使用MediaServer id: {}, app: {}, stream: {}, ", streamPush.getId(), streamPush.getApp(), streamPush.getStream()); + } + } + } + if (mediaServer != null) { + mediaServerService.closeStreams(mediaServer, streamPush.getApp(), streamPush.getStream()); + } + streamPush.setPushIng(false); + if (userSetting.isUsePushingAsStatus()) { + streamPush.setGbStatus(false); + gbChannelService.offline(streamPush.getCommonGBChannel()); + } + gbChannelService.closeSend(streamPush.getCommonGBChannel()); + streamPush.setUpdateTime(DateUtil.getNow()); + streamPushMapper.update(streamPush); + return true; + } + + @Override + @Transactional + public boolean stopByAppAndStream(String app, String stream) { + log.info("[主动停止推流] : app: {}, stream: {}, ", app, stream); + StreamPush streamPushItem = streamPushMapper.selectByAppAndStream(app, stream); + if (streamPushItem != null) { + stop(streamPushItem); } return true; } @Override + @Transactional public void zlmServerOnline(String mediaServerId) { // 同步zlm推流信息 MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId); @@ -260,14 +336,14 @@ public class StreamPushServiceImpl implements IStreamPushService { // redis记录 List mediaInfoList = redisCatchStorage.getStreams(mediaServerId, "PUSH"); Map streamInfoPushItemMap = new HashMap<>(); - if (pushList.size() > 0) { + if (!pushList.isEmpty()) { for (StreamPush streamPushItem : pushList) { if (ObjectUtils.isEmpty(streamPushItem.getGbId())) { pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem); } } } - if (mediaInfoList.size() > 0) { + if (!mediaInfoList.isEmpty()) { for (MediaInfo mediaInfo : mediaInfoList) { streamInfoPushItemMap.put(mediaInfo.getApp() + mediaInfo.getStream(), mediaInfo); } @@ -290,26 +366,33 @@ public class StreamPushServiceImpl implements IStreamPushService { streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); } } - List offlinePushItems = new ArrayList<>(pushItemMap.values()); - if (offlinePushItems.size() > 0) { - String type = "PUSH"; - int runLimit = 300; - if (offlinePushItems.size() > runLimit) { - for (int i = 0; i < offlinePushItems.size(); i += runLimit) { - int toIndex = i + runLimit; - if (i + runLimit > offlinePushItems.size()) { - toIndex = offlinePushItems.size(); - } - List streamPushItemsSub = offlinePushItems.subList(i, toIndex); - streamPushMapper.delAll(streamPushItemsSub); - } - }else { - streamPushMapper.delAll(offlinePushItems); + List changedStreamPushList = new ArrayList<>(pushItemMap.values()); + if (!changedStreamPushList.isEmpty()) { + for (StreamPush streamPush : changedStreamPushList) { + stop(streamPush); } - } + + +// if (!changedStreamPushList.isEmpty()) { +// String type = "PUSH"; +// int runLimit = 300; +// if (changedStreamPushList.size() > runLimit) { +// for (int i = 0; i < changedStreamPushList.size(); i += runLimit) { +// int toIndex = i + runLimit; +// if (i + runLimit > changedStreamPushList.size()) { +// toIndex = changedStreamPushList.size(); +// } +// List streamPushItemsSub = changedStreamPushList.subList(i, toIndex); +// streamPushMapper.delAll(streamPushItemsSub); +// } +// }else { +// streamPushMapper.delAll(changedStreamPushList); +// } +// +// } Collection mediaInfos = streamInfoPushItemMap.values(); - if (mediaInfos.size() > 0) { + if (!mediaInfos.isEmpty()) { String type = "PUSH"; for (MediaInfo mediaInfo : mediaInfos) { JSONObject jsonObject = new JSONObject(); @@ -327,7 +410,7 @@ public class StreamPushServiceImpl implements IStreamPushService { } Collection streamAuthorityInfos = streamAuthorityInfoInfoMap.values(); - if (streamAuthorityInfos.size() > 0) { + if (!streamAuthorityInfos.isEmpty()) { for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) { // 移除redis内流的信息 redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream()); @@ -336,19 +419,24 @@ public class StreamPushServiceImpl implements IStreamPushService { } @Override + @Transactional public void zlmServerOffline(String mediaServerId) { - List streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId); - // 移除没有GBId的推流 - streamPushMapper.deleteWithoutGBId(mediaServerId); - gbStreamMapper.deleteWithoutGBId("push", mediaServerId); - // 其他的流设置未启用 - streamPushMapper.updateStatusByMediaServerId(mediaServerId, false); - streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false); + List streamPushItems = streamPushMapper.selectAllByMediaServerId(mediaServerId); + if (!streamPushItems.isEmpty()) { + for (StreamPush streamPushItem : streamPushItems) { + stop(streamPushItem); + } + } +// // 移除没有GBId的推流 +// streamPushMapper.deleteWithoutGBId(mediaServerId); +// // 其他的流设置未启用 +// streamPushMapper.updateStatusByMediaServerId(mediaServerId, false); +// streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false); // 发送流停止消息 String type = "PUSH"; // 发送redis消息 List mediaInfoList = redisCatchStorage.getStreams(mediaServerId, type); - if (mediaInfoList.size() > 0) { + if (!mediaInfoList.isEmpty()) { for (MediaInfo mediaInfo : mediaInfoList) { // 移除redis内流的信息 redisCatchStorage.removeStream(mediaServerId, type, mediaInfo.getApp(), mediaInfo.getStream()); @@ -367,41 +455,16 @@ public class StreamPushServiceImpl implements IStreamPushService { } @Override - public void clean() { - - } - - @Override - public boolean saveToRandomGB() { - List streamPushItems = streamPushMapper.selectAll(); - long gbId = 100001; - for (StreamPush streamPushItem : streamPushItems) { - streamPushItem.setStreamType("push"); - streamPushItem.setStatus(true); - streamPushItem.setGbId("34020000004111" + gbId); - streamPushItem.setCreateTime(DateUtil.getNow()); - gbId ++; - } - int limitCount = 30; - - if (streamPushItems.size() > limitCount) { - for (int i = 0; i < streamPushItems.size(); i += limitCount) { - int toIndex = i + limitCount; - if (i + limitCount > streamPushItems.size()) { - toIndex = streamPushItems.size(); - } - gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex)); - } - }else { - gbStreamMapper.batchAdd(streamPushItems); - } - return true; - } - - @Override + @Transactional public void batchAdd(List streamPushItems) { streamPushMapper.addAll(streamPushItems); - gbStreamMapper.batchAdd(streamPushItems); + List commonGBChannels = new ArrayList<>(); + for (StreamPush streamPush : streamPushItems) { + if (ObjectUtils.isEmpty(streamPush.getGbDeviceId())) { + commonGBChannels.add(streamPush.getCommonGBChannel()); + } + } + gbChannelService.batchAdd(commonGBChannels); } @@ -459,7 +522,7 @@ public class StreamPushServiceImpl implements IStreamPushService { // 不存在这个平台,则忽略导入此关联关系 if (platformInfoMap.get(platFormInfoArray[0]) == null || platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) { - logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] ); + log.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] ); continue; } streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]); @@ -544,38 +607,6 @@ public class StreamPushServiceImpl implements IStreamPushService { eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON); } - @Override - public boolean add(StreamPush stream) { - stream.setUpdateTime(DateUtil.getNow()); - stream.setCreateTime(DateUtil.getNow()); - stream.setServerId(userSetting.getServerId()); - stream.setMediaServerId(mediaConfig.getId()); - stream.setSelf(true); - stream.setPushIng(true); - - // 放在事务内执行 - boolean result = false; - TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); - try { - int addStreamResult = streamPushMapper.add(stream); - if (!ObjectUtils.isEmpty(stream.getGbId())) { - stream.setStreamType("push"); - gbStreamMapper.add(stream); - } - dataSourceTransactionManager.commit(transactionStatus); - result = true; - }catch (Exception e) { - logger.error("批量移除流与平台的关系时错误", e); - dataSourceTransactionManager.rollback(transactionStatus); - } - return result; - } - - @Override - public boolean update(StreamPush stream) { - - } - @Override public List getAllAppAndStream() { @@ -600,32 +631,41 @@ public class StreamPushServiceImpl implements IStreamPushService { return streamPushMapper.getAllGBId(); } - @Override - public void updatePush(OnStreamChangedHookParam param) { - StreamPush transform = transform(param); - StreamPush pushInDb = getPush(param.getApp(), param.getStream()); - transform.setPushIng(param.isRegist()); - transform.setUpdateTime(DateUtil.getNow()); - transform.setPushTime(DateUtil.getNow()); - transform.setSelf(userSetting.getServerId().equals(param.getSeverId())); - if (pushInDb == null) { - transform.setCreateTime(DateUtil.getNow()); - streamPushMapper.add(transform); - }else { - streamPushMapper.update(transform); - gbStreamMapper.updateMediaServer(param.getApp(), param.getStream(), param.getMediaServerId()); - } - } - @Override public void updateStatus(StreamPush push) { } - @Override - public void deleteByAppAndStream(String app, String stream) { + + @Override + public void updatePushStatus(Integer streamPushId, boolean pushIng) { + streamPushInDb.setPushIng(true); + if (userSetting.isUsePushingAsStatus()) { + streamPushInDb.setGbStatus(true); + } + streamPushInDb.setPushTime(DateUtil.getNow()); } + private List handleJSON(List streamInfoList) { + if (streamInfoList == null || streamInfoList.isEmpty()) { + return null; + } + Map result = new HashMap<>(); + for (StreamInfo streamInfo : streamInfoList) { + // 不保存国标推理以及拉流代理的流 + if (streamInfo.getOriginType() == OriginType.RTSP_PUSH.ordinal() + || streamInfo.getOriginType() == OriginType.RTMP_PUSH.ordinal() + || streamInfo.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { + String key = streamInfo.getApp() + "_" + streamInfo.getStream(); + StreamPush streamPushItem = result.get(key); + if (streamPushItem == null) { + streamPushItem = streamPushItem.getInstance(streamInfo); + result.put(key, streamPushItem); + } + } + } + return new ArrayList<>(result.values()); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisCloseStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisCloseStreamMsgListener.java index 38eb5344..d9c35e27 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisCloseStreamMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisCloseStreamMsgListener.java @@ -44,7 +44,7 @@ public class RedisCloseStreamMsgListener implements MessageListener { JSONObject jsonObject = JSON.parseObject(msg.getBody()); String app = jsonObject.getString("app"); String stream = jsonObject.getString("stream"); - pushService.stop(app, stream); + pushService.stopByAppAndStream(app, stream); }catch (Exception e) { log.warn("[REDIS的关闭推流通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); log.error("[REDIS的关闭推流通知] 异常内容: ", e); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java index 6c6f93d0..10372851 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java @@ -87,7 +87,7 @@ public interface StreamPushMapper { List selectAll(); @Select("SELECT st.*, gs.gb_id, gs.name, gs.longitude, gs.latitude FROM wvp_stream_push st LEFT join wvp_gb_stream gs on st.app = gs.app AND st.stream = gs.stream WHERE st.app=#{app} AND st.stream=#{stream}") - StreamPush selectOne(@Param("app") String app, @Param("stream") String stream); + StreamPush selectByAppAndStream(@Param("app") String app, @Param("stream") String stream); @Insert("