From c186ce94c1cdcb7d500936e8c2f90223f4a553bc Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Sun, 7 Jan 2024 01:50:02 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=8E=A8=E6=B5=81=E5=88=97?= =?UTF-8?q?=E8=A1=A8=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/media/zlm/ZLMHttpHookListener.java | 6 +- .../vmp/media/zlm/ZLMMediaListManager.java | 41 ++++--- .../iot/vmp/service/IStreamPushService.java | 14 ++- .../service/impl/StreamPushServiceImpl.java | 111 ++++++++++++++---- .../vmp/storager/dao/CommonChannelMapper.java | 1 + .../vmp/storager/dao/StreamPushMapper.java | 29 ++++- .../vmp/vmanager/bean/BatchGBStreamParam.java | 10 ++ .../streamPush/StreamPushController.java | 50 ++++++-- 8 files changed, 209 insertions(+), 53 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index eb5b5bb8..45d89b15 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -86,6 +86,9 @@ public class ZLMHttpHookListener { @Autowired private IStreamProxyService streamProxyService; + @Autowired + private IStreamPushService streamPushService; + @Autowired private DeferredResultHolder resultHolder; @@ -336,6 +339,7 @@ public class ZLMHttpHookListener { JSONObject json = (JSONObject) JSON.toJSON(param); taskExecutor.execute(() -> { + // 发送hook订阅通知 ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json); MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); if (mediaInfo == null) { @@ -347,7 +351,6 @@ public class ZLMHttpHookListener { } List tracks = param.getTracks(); - // TODO 重构此处逻辑 if (param.isRegist()) { // 处理流注册的鉴权信息, 流注销这里不再删除鉴权信息,下次来了新的鉴权信息会对就的进行覆盖 if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal() @@ -416,6 +419,7 @@ public class ZLMHttpHookListener { if ("PUSH".equalsIgnoreCase(type)) { // 冗余数据,自己系统中自用 redisCatchStorage.removePushListItem(param.getApp(), param.getStream(), param.getMediaServerId()); + zlmMediaListManager.removePush(param); } } zlmMediaListManager.streamOffline(param.getApp(), param.getStream()); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java index 1ffc58fc..ba846e50 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.media.zlm; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; +import com.genersoft.iot.vmp.service.ICommonGbChannelService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IResourceService; import com.genersoft.iot.vmp.service.IStreamPushService; @@ -10,10 +11,12 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; import com.genersoft.iot.vmp.utils.DateUtil; +import io.netty.util.internal.ObjectUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; import java.text.ParseException; import java.util.*; @@ -27,27 +30,12 @@ public class ZLMMediaListManager { private Logger logger = LoggerFactory.getLogger("ZLMMediaListManager"); - @Autowired - private ZLMRESTfulUtils zlmresTfulUtils; - - @Autowired - private IRedisCatchStorage redisCatchStorage; - - @Autowired - private IVideoManagerStorage storager; - @Autowired private IStreamPushService streamPushService; @Autowired private Map resourceServiceMap; - @Autowired - private StreamPushMapper streamPushMapper; - - @Autowired - private ZlmHttpHookSubscribe subscribe; - @Autowired private UserSetting userSetting; @@ -57,6 +45,9 @@ public class ZLMMediaListManager { @Autowired private IMediaServerService mediaServerService; + @Autowired + private ICommonGbChannelService commonGbChannelService; + private Map channelOnPublishEvents = new ConcurrentHashMap<>(); public StreamPush addPush(OnStreamChangedHookParam onStreamChangedHookParam) { @@ -64,7 +55,8 @@ public class ZLMMediaListManager { StreamPush pushInDb = streamPushService.getPush(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); if (pushInDb == null) { - + transform.setPushIng(true); + transform.setPushTime(DateUtil.getNow()); streamPushService.add(transform); }else { pushInDb.setPushIng(onStreamChangedHookParam.isRegist()); @@ -85,6 +77,22 @@ public class ZLMMediaListManager { return transform; } + public void removePush(OnStreamChangedHookParam param) { + StreamPush pushInDb = streamPushService.getPush(param.getApp(), param.getStream()); + if (pushInDb == null) { + return; + } + if (ObjectUtils.isEmpty(pushInDb.getGbId())) { + streamPushService.remove(pushInDb.getId()); + }else { + List onlinePushers = new ArrayList<>(); + onlinePushers.add(pushInDb.getCommonGbChannelId()); + commonGbChannelService.offlineForList(onlinePushers); + streamPushService.offline(pushInDb.getId()); + } + + } + public void sendStreamEvent(String app, String stream, String mediaServerId) { MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); // 查看推流状态 @@ -119,5 +127,4 @@ public class ZLMMediaListManager { public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) { return this.channelOnPublishEvents.get(app + "_" + stream); } - } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index 1c2aa58b..ad8827c5 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -60,7 +60,7 @@ public interface IStreamPushService { /** * 中止多个推流 */ - boolean batchStop(List streamPushItems); + boolean batchStop(List streamPushIds); /** * 导入时批量增加 @@ -101,10 +101,20 @@ public interface IStreamPushService { void batchUpdate(List streamPushItemForUpdate); - void update(StreamPush transform); + boolean update(StreamPush transform); /** * 更新redis发来的gps更新消息 */ void updateStreamGPS(List gpsMsgInfoList); + + /** + * 移除推流信息 + */ + boolean remove(Integer id); + + /** + * 设置推流离线 + */ + void offline(Integer id); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 900f8fa5..42204479 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.common.BatchLimit; import com.genersoft.iot.vmp.common.CommonGbChannel; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.*; @@ -19,6 +20,7 @@ import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.*; import com.genersoft.iot.vmp.utils.DateUtil; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto; import com.github.pagehelper.PageHelper; @@ -31,6 +33,7 @@ import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; @Service public class StreamPushServiceImpl implements IStreamPushService { @@ -121,20 +124,20 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override public StreamPush getPush(String app, String streamId) { - return streamPushMapper.selectOne(app, streamId); + return streamPushMapper.selectOneByAppAndStream(app, streamId); } @Override public boolean stop(String app, String streamId) { logger.info("[停止推流 ] {}/{}", app, streamId); - StreamPush streamPushItem = streamPushMapper.selectOne(app, streamId); + StreamPush streamPushItem = streamPushMapper.selectOneByAppAndStream(app, streamId); if (streamPushItem == null) { logger.info("[停止推流] 不存在 {}/{} ", app, streamId); return false; } if (streamPushItem.getCommonGbChannelId() == 0) { - streamPushMapper.del(app, streamId); + streamPushMapper.del(streamPushItem.getId()); } MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId); @@ -290,13 +293,19 @@ public class StreamPushServiceImpl implements IStreamPushService { streamPushListWithoutChannel.add(streamPush); } }); + Map commonGbChanneIdMap = new ConcurrentHashMap(); + if (!commonGbChannelList.isEmpty()) { - commonGbChannelService.batchAdd(commonGbChannelList); - - for (int i = 0; i < commonGbChannelList.size(); i++) { - streamPushListForChannel.get(i).setCommonGbChannelId(commonGbChannelList.get(i).getCommonGbId()); - } + commonGbChannelList.stream().forEach(commonGbChannel -> { + commonGbChanneIdMap.put(commonGbChannel.getCommonGbDeviceID(), commonGbChannel.getCommonGbId()); + }); + streamPushListForChannel.stream().forEach(streamPush -> { + String gbId = streamPush.getGbId(); + if (commonGbChanneIdMap.get(gbId) != null) { + streamPush.setCommonGbChannelId(commonGbChanneIdMap.get(gbId)); + } + }); streamPushListWithoutChannel.addAll(streamPushListForChannel); } if (streamPushListWithoutChannel.size() > BatchLimit.count) { @@ -342,10 +351,17 @@ public class StreamPushServiceImpl implements IStreamPushService { } }); + Map commonGbChanneIdMap = new ConcurrentHashMap(); commonGbChannelService.batchAdd(commonGbChannelList); - for (int i = 0; i < commonGbChannelList.size(); i++) { - streamPushListForChannel.get(i).setCommonGbChannelId(commonGbChannelList.get(i).getCommonGbId()); - } + commonGbChannelList.stream().forEach(commonGbChannel -> { + commonGbChanneIdMap.put(commonGbChannel.getCommonGbDeviceID(), commonGbChannel.getCommonGbId()); + }); + streamPushListForChannel.stream().forEach(streamPush -> { + String gbId = streamPush.getGbId(); + if (commonGbChanneIdMap.get(gbId) != null) { + streamPush.setCommonGbChannelId(commonGbChanneIdMap.get(gbId)); + } + }); streamPushListWithoutChannel.addAll(streamPushListForChannel); if (streamPushListWithoutChannel.size() > BatchLimit.count) { for (int i = 0; i < streamPushListWithoutChannel.size(); i += BatchLimit.count) { @@ -362,14 +378,27 @@ public class StreamPushServiceImpl implements IStreamPushService { } @Override - public boolean batchStop(List streamPushList) { - if (streamPushList == null || streamPushList.size() == 0) { + public boolean batchStop(List streamPushIdList) { + if (streamPushIdList == null || streamPushIdList.isEmpty()) { return false; } - int delStream = streamPushMapper.delAllByAppAndStream(streamPushList); - if (delStream > 0) { + List streamPushList = streamPushMapper.getListInIds(streamPushIdList); + List commonGbChannelIds = new ArrayList<>(); + streamPushList.stream().forEach(streamPush -> { + if (streamPush.getCommonGbChannelId() > 0) { + commonGbChannelIds.add(streamPush.getCommonGbChannelId()); + } + }); + if (!commonGbChannelIds.isEmpty()) { + commonGbChannelService.deleteByIdList(commonGbChannelIds); + } + Map mediaServerItemMap = new HashMap<>(); + if (streamPushMapper.delAllByIds(streamPushIdList) > 0) { for (StreamPush streamPush : streamPushList) { - MediaServerItem mediaServerItem = mediaServerService.getOne(streamPush.getMediaServerId()); + MediaServerItem mediaServerItem = mediaServerItemMap.get(streamPush.getMediaServerId()); + if (mediaServerItem == null) { + mediaServerItem = mediaServerService.getOne(streamPush.getMediaServerId()); + } zlmresTfulUtils.closeStreams(mediaServerItem, streamPush.getApp(), streamPush.getStream()); } } @@ -377,7 +406,6 @@ public class StreamPushServiceImpl implements IStreamPushService { } - @Override public void allStreamOffline() { List onlinePushers = streamPushMapper.getOnlinePusherForGb(); @@ -423,6 +451,10 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override @Transactional public boolean add(StreamPush stream) { + StreamPush streamPush = streamPushMapper.selectOneByAppAndStream(stream.getApp(), stream.getStream()); + if (streamPush != null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "应用名/流ID已存在"); + } String now = DateUtil.getNow(); CommonGbChannel commonGbChannel = null; if (!ObjectUtils.isEmpty(stream.getGbId())) { @@ -437,13 +469,12 @@ public class StreamPushServiceImpl implements IStreamPushService { stream.setServerId(userSetting.getServerId()); stream.setMediaServerId(mediaConfig.getId()); stream.setSelf(true); - stream.setPushIng(true); - return streamPushMapper.add(stream) > 1; + return streamPushMapper.add(stream) > 0; } @Override @Transactional - public void update(StreamPush streamPush) { + public boolean update(StreamPush streamPush) { assert streamPush.getId() > 0; StreamPush streamPushIDb = streamPushMapper.getOne(streamPush.getId()); assert streamPushIDb != null; @@ -456,6 +487,7 @@ public class StreamPushServiceImpl implements IStreamPushService { } streamPush.setUpdateTime(DateUtil.getNow()); streamPushMapper.update(streamPush); + return true; } @Override @@ -475,4 +507,43 @@ public class StreamPushServiceImpl implements IStreamPushService { public void updateStreamGPS(List gpsMsgInfoList) { streamPushMapper.updateStreamGPS(gpsMsgInfoList); } + + @Override + public boolean remove(Integer id) { + if (id == null) { + return false; + } + StreamPush streamPush = streamPushMapper.selectOne(id); + if (streamPush == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "ID不存在"); + } + if (streamPush.getCommonGbChannelId() > 0) { + commonGbChannelService.deleteById(streamPush.getCommonGbChannelId()); + } + if (streamPush.isPushIng()) { + String mediaServerId = streamPush.getMediaServerId(); + MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); + zlmresTfulUtils.closeStreams(mediaServerItem, streamPush.getApp(), streamPush.getStream()); + } + return streamPushMapper.del(id) > 0; + } + + @Override + public void offline(Integer id) { + if (id == null) { + return; + } + StreamPush streamPush = streamPushMapper.selectOne(id); + if (streamPush == null) { + return; + } + if (userSetting.isUsePushingAsStatus()) { + if (streamPush.getCommonGbChannelId() > 0) { + List pushers = new ArrayList<>(); + pushers.add(streamPush.getCommonGbChannelId()); + commonGbChannelService.offlineForList(pushers); + } + streamPushMapper.offlineById(streamPush.getId()); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonChannelMapper.java index 1be3b8e9..4f3ae8af 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonChannelMapper.java @@ -520,6 +520,7 @@ public interface CommonChannelMapper { "#{item.createTime}" + ") " + "") + @Options(useGeneratedKeys=true, keyProperty="commonGbId", keyColumn="common_gb_id") int batchAdd(@Param("commonGbChannels") List commonGbChannels); @Update({""}) int update(StreamPush streamPushItem); - @Delete("DELETE FROM wvp_stream_push WHERE app=#{app} AND stream=#{stream}") - int del(String app, String stream); + @Delete("DELETE FROM wvp_stream_push WHERE id=#{id}") + int del(@Param("id") int id); @Delete("") void offline(List offlineStreams); + @Update("UPDATE wvp_stream_push SET status=0 where id = #{id}" ) + void offlineById(@Param("id") int id); + @Update(""}) void updateStreamGPS(List gpsMsgInfoList); + + @Select("select * from wvp_stream_push where id=#{id}") + StreamPush selectOne(@Param("id") Integer id); + + + @Select("" ) + List getListInIds(List streamPushIdList); + + @Delete("") + int delAllByIds(List streamPushIdList); } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/BatchGBStreamParam.java b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/BatchGBStreamParam.java index 496edbff..373c67ff 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/BatchGBStreamParam.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/BatchGBStreamParam.java @@ -12,6 +12,8 @@ import java.util.List; public class BatchGBStreamParam { @Schema(description = "推流信息列表") private List streamPushes; + @Schema(description = "推流信息列表") + private List streamPushIds; public List getStreamPushes() { return streamPushes; @@ -20,4 +22,12 @@ public class BatchGBStreamParam { public void setStreamPushes(List streamPushes) { this.streamPushes = streamPushes; } + + public List getStreamPushIds() { + return streamPushIds; + } + + public void setStreamPushIds(List streamPushIds) { + this.streamPushIds = streamPushIds; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java index f4258de3..bddbaf3d 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java @@ -85,11 +85,16 @@ public class StreamPushController { return pushList; } - @PostMapping(value = "/save") + @PostMapping(value = "/update") @ResponseBody - @Operation(summary = "将推流添加到资源", security = @SecurityRequirement(name = JwtUtils.HEADER)) - public void saveToCommonChannel(@RequestBody StreamPushWithCommonChannelParam param){ - + @Operation(summary = "更新", security = @SecurityRequirement(name = JwtUtils.HEADER)) + public void update(@RequestBody StreamPush streamPush){ + if (ObjectUtils.isEmpty(streamPush.getApp()) && ObjectUtils.isEmpty(streamPush.getStream())) { + throw new ControllerException(ErrorCode.ERROR400.getCode(), "app或stream不可为空"); + } + if (!streamPushService.update(streamPush)) { + throw new ControllerException(ErrorCode.ERROR100); + } } @@ -104,14 +109,26 @@ public class StreamPushController { } } - @DeleteMapping(value = "/batchStop") +// @DeleteMapping(value = "/batchStop") +// @ResponseBody +// @Operation(summary = "中止多个推流", security = @SecurityRequirement(name = JwtUtils.HEADER)) +// public void batchStop(@RequestBody BatchGBStreamParam batchGBStreamParam){ +// if (batchGBStreamParam.getStreamPushes().size() == 0) { +// throw new ControllerException(ErrorCode.ERROR100); +// } +// if (!streamPushService.batchStop(batchGBStreamParam.getStreamPushes())){ +// throw new ControllerException(ErrorCode.ERROR100); +// } +// } + + @DeleteMapping(value = "/batchDelete") @ResponseBody - @Operation(summary = "中止多个推流", security = @SecurityRequirement(name = JwtUtils.HEADER)) - public void batchStop(@RequestBody BatchGBStreamParam batchGBStreamParam){ - if (batchGBStreamParam.getStreamPushes().size() == 0) { + @Operation(summary = "删除多个推流", security = @SecurityRequirement(name = JwtUtils.HEADER)) + public void batchDelete(@RequestBody BatchGBStreamParam batchGBStreamParam){ + if (batchGBStreamParam.getStreamPushIds().size() == 0) { throw new ControllerException(ErrorCode.ERROR100); } - if (!streamPushService.batchStop(batchGBStreamParam.getStreamPushes())){ + if (!streamPushService.batchStop(batchGBStreamParam.getStreamPushIds())){ throw new ControllerException(ErrorCode.ERROR100); } } @@ -250,9 +267,22 @@ public class StreamPushController { if (ObjectUtils.isEmpty(param.getApp()) && ObjectUtils.isEmpty(param.getStream())) { throw new ControllerException(ErrorCode.ERROR400.getCode(), "app或stream不可为空"); } - + param.setPushIng(false); if (!streamPushService.add(param)) { throw new ControllerException(ErrorCode.ERROR100); } } + + /** + * 移除推流信息 + */ + @DeleteMapping(value = "/delete") + @ResponseBody + @Operation(summary = "移除推流信息", security = @SecurityRequirement(name = JwtUtils.HEADER)) + public void delete(@RequestBody StreamPush param){ + + if (!streamPushService.remove(param.getId())) { + throw new ControllerException(ErrorCode.ERROR100); + } + } }