diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/CommonChannelController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/CommonChannelController.java index ee9f99f7..fb50a4bf 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/CommonChannelController.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/CommonChannelController.java @@ -82,7 +82,8 @@ public class CommonChannelController { @Operation(summary = "增加通道", security = @SecurityRequirement(name = JwtUtils.HEADER)) @ResponseBody @PostMapping("/add") - public void add(@RequestBody CommonGBChannel channel){ + public CommonGBChannel add(@RequestBody CommonGBChannel channel){ channelService.add(channel); + return channel; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/MediaController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/MediaController.java index ff41a54c..54530295 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/MediaController.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/MediaController.java @@ -95,8 +95,8 @@ public class MediaController { return new StreamContent(streamInfo); }else { //获取流失败,重启拉流后重试一次 - streamProxyService.stop(app,stream); - boolean start = streamProxyService.start(app, stream); + streamProxyService.stopByAppAndStream(app,stream); + boolean start = streamProxyService.startByAppAndStream(app, stream); try { Thread.sleep(1000); } catch (InterruptedException e) { @@ -117,4 +117,30 @@ public class MediaController { } } } + /** + * 获取推流播放地址 + * @param app 应用名 + * @param stream 流id + * @return + */ + @GetMapping(value = "/getPlayUrl") + @ResponseBody + @Operation(summary = "获取推流播放地址", security = @SecurityRequirement(name = JwtUtils.HEADER)) + @Parameter(name = "app", description = "应用名", required = true) + @Parameter(name = "stream", description = "流id", required = true) + @Parameter(name = "mediaServerId", description = "媒体服务器id") + public StreamContent getPlayUrl(@RequestParam String app, @RequestParam String stream, + @RequestParam(required = false) String mediaServerId){ + boolean authority = false; + // 是否登陆用户, 登陆用户返回完整信息 + LoginUser userInfo = SecurityUtils.getUserInfo(); + if (userInfo!= null) { + authority = true; + } + StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority); + if (streamInfo == null){ + throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取播放地址失败"); + } + return new StreamContent(streamInfo); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java index 67208316..9669e976 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java @@ -12,16 +12,12 @@ public interface CommonGBChannelMapper { @Select("select\n" + " id as gb_id,\n" + - " device_db_id,\n" + + " id as gb_id,\n" + + " device_db_id as gb_device_db_id,\n" + " stream_push_id,\n" + " stream_proxy_id,\n" + " create_time,\n" + " update_time,\n" + - " sub_count,\n" + - " stream_id,\n" + - " has_audio,\n" + - " gps_time,\n" + - " stream_identification,\n" + " coalesce(gb_device_id, device_id) as gb_device_id,\n" + " coalesce(gb_name, name) as gb_name,\n" + " coalesce(gb_manufacturer, manufacturer) as gb_manufacturer,\n" + @@ -141,6 +137,7 @@ public interface CommonGBChannelMapper { "#{gbSvcTimeSupportMode}"+ ")" + " ") + @Options(useGeneratedKeys = true, keyProperty = "gbId", keyColumn = "id") int insert(CommonGBChannel commonGBChannel); @Select(" select\n" + @@ -283,16 +280,11 @@ public interface CommonGBChannelMapper { @Select(value = {" "}) void batchDelete(List channelListInDb); + + @Select(value = {"select\n" + + " id as gb_id,\n" + + " device_db_id as gb_device_db_id,\n" + + " stream_push_id,\n" + + " stream_proxy_id,\n" + + " create_time,\n" + + " update_time,\n" + + " coalesce(gb_device_id, device_id) as gb_device_id,\n" + + " coalesce(gb_name, name) as gb_name,\n" + + " coalesce(gb_manufacturer, manufacturer) as gb_manufacturer,\n" + + " coalesce(gb_model, model) as gb_model,\n" + + " coalesce(gb_owner, owner) as gb_owner,\n" + + " coalesce(gb_civil_code, civil_code) as gb_civil_code,\n" + + " coalesce(gb_block, block) as gb_block,\n" + + " coalesce(gb_address, address) as gb_address,\n" + + " coalesce(gb_parental, parental) as gb_parental,\n" + + " coalesce(gb_parent_id, parent_id) as gb_parent_id,\n" + + " coalesce(gb_safety_way, safety_way) as gb_safety_way,\n" + + " coalesce(gb_register_way, register_way) as gb_register_way,\n" + + " coalesce(gb_cert_num, cert_num) as gb_cert_num,\n" + + " coalesce(gb_certifiable, certifiable) as gb_certifiable,\n" + + " coalesce(gb_err_code, err_code) as gb_err_code,\n" + + " coalesce(gb_end_time, end_time) as gb_end_time,\n" + + " coalesce(gb_secrecy, secrecy) as gb_secrecy,\n" + + " coalesce(gb_ip_address, ip_address) as gb_ip_address,\n" + + " coalesce(gb_port, port) as gb_port,\n" + + " coalesce(gb_password, password) as gb_password,\n" + + " coalesce(gb_status, status) as gb_status,\n" + + " coalesce(gb_longitude, longitude) as gb_longitude,\n" + + " coalesce(gb_latitude, latitude) as gb_latitude,\n" + + " coalesce(gb_ptz_type, ptz_type) as gb_ptz_type,\n" + + " coalesce(gb_position_type, position_type) as gb_position_type,\n" + + " coalesce(gb_room_type, room_type) as gb_room_type,\n" + + " coalesce(gb_use_type, use_type) as gb_use_type,\n" + + " coalesce(gb_supply_light_type, supply_light_type) as gb_supply_light_type,\n" + + " coalesce(gb_direction_type, direction_type) as gb_direction_type,\n" + + " coalesce(gb_resolution, resolution) as gb_resolution,\n" + + " coalesce(gb_business_group_id, business_group_id) as gb_business_group_id,\n" + + " coalesce(gb_download_speed, download_speed) as gb_download_speed,\n" + + " coalesce(gb_svc_space_support_mod, svc_space_support_mod) as gb_svc_space_support_mod,\n" + + " coalesce(gb_svc_time_support_mode,svc_time_support_mode) as gb_svc_time_support_mode\n" + + "from wvp_device_channel \n" + + "where stream_push_id = #{streamPushId}"}) + CommonGBChannel queryByStreamPushId(@Param("streamPushId") Integer streamPushId); + + @Select(value = {"select\n" + + " id as gb_id,\n" + + " device_db_id as gb_device_db_id,\n" + + " stream_push_id,\n" + + " stream_proxy_id,\n" + + " create_time,\n" + + " update_time,\n" + + " coalesce(gb_device_id, device_id) as gb_device_id,\n" + + " coalesce(gb_name, name) as gb_name,\n" + + " coalesce(gb_manufacturer, manufacturer) as gb_manufacturer,\n" + + " coalesce(gb_model, model) as gb_model,\n" + + " coalesce(gb_owner, owner) as gb_owner,\n" + + " coalesce(gb_civil_code, civil_code) as gb_civil_code,\n" + + " coalesce(gb_block, block) as gb_block,\n" + + " coalesce(gb_address, address) as gb_address,\n" + + " coalesce(gb_parental, parental) as gb_parental,\n" + + " coalesce(gb_parent_id, parent_id) as gb_parent_id,\n" + + " coalesce(gb_safety_way, safety_way) as gb_safety_way,\n" + + " coalesce(gb_register_way, register_way) as gb_register_way,\n" + + " coalesce(gb_cert_num, cert_num) as gb_cert_num,\n" + + " coalesce(gb_certifiable, certifiable) as gb_certifiable,\n" + + " coalesce(gb_err_code, err_code) as gb_err_code,\n" + + " coalesce(gb_end_time, end_time) as gb_end_time,\n" + + " coalesce(gb_secrecy, secrecy) as gb_secrecy,\n" + + " coalesce(gb_ip_address, ip_address) as gb_ip_address,\n" + + " coalesce(gb_port, port) as gb_port,\n" + + " coalesce(gb_password, password) as gb_password,\n" + + " coalesce(gb_status, status) as gb_status,\n" + + " coalesce(gb_longitude, longitude) as gb_longitude,\n" + + " coalesce(gb_latitude, latitude) as gb_latitude,\n" + + " coalesce(gb_ptz_type, ptz_type) as gb_ptz_type,\n" + + " coalesce(gb_position_type, position_type) as gb_position_type,\n" + + " coalesce(gb_room_type, room_type) as gb_room_type,\n" + + " coalesce(gb_use_type, use_type) as gb_use_type,\n" + + " coalesce(gb_supply_light_type, supply_light_type) as gb_supply_light_type,\n" + + " coalesce(gb_direction_type, direction_type) as gb_direction_type,\n" + + " coalesce(gb_resolution, resolution) as gb_resolution,\n" + + " coalesce(gb_business_group_id, business_group_id) as gb_business_group_id,\n" + + " coalesce(gb_download_speed, download_speed) as gb_download_speed,\n" + + " coalesce(gb_svc_space_support_mod, svc_space_support_mod) as gb_svc_space_support_mod,\n" + + " coalesce(gb_svc_time_support_mode,svc_time_support_mode) as gb_svc_time_support_mode\n" + + "from wvp_device_channel \n" + + "where stream_proxy_id = #{streamProxyId}"}) + CommonGBChannel queryByStreamProxyId(@Param("streamProxyId") Integer streamProxyId); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java index d861f530..1b8de39a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java @@ -34,6 +34,18 @@ public class GbChannelServiceImpl implements IGbChannelService { @Override public int add(CommonGBChannel commonGBChannel) { + if (commonGBChannel.getStreamPushId() != null && commonGBChannel.getStreamPushId() > 0) { + CommonGBChannel commonGBChannelInDb = commonGBChannelMapper.queryByStreamPushId(commonGBChannel.getStreamPushId()); + if (commonGBChannelInDb != null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "此推流已经关联通道"); + } + } + if (commonGBChannel.getStreamProxyId() != null && commonGBChannel.getStreamProxyId() > 0) { + CommonGBChannel commonGBChannelInDb = commonGBChannelMapper.queryByStreamProxyId(commonGBChannel.getStreamProxyId()); + if (commonGBChannelInDb != null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "此代理已经关联通道"); + } + } return commonGBChannelMapper.insert(commonGBChannel); } @@ -74,6 +86,7 @@ public class GbChannelServiceImpl implements IGbChannelService { log.warn("[更新通道] 未找到数据库ID,更新失败, {}", commonGBChannel.getGbDeviceDbId()); return 0; } + commonGBChannel.setUpdateTime(DateUtil.getNow()); int result = commonGBChannelMapper.update(commonGBChannel); if (result > 0) { try { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index fb1bf9df..4e5d79eb 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -743,7 +743,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements log.info("[ app={}, stream={} ] 等待拉流代理流超时", sendRtpItem.getApp(), sendRtpItem.getStream()); hookSubscribe.removeSubscribe(hook); }, userSetting.getPlatformPlayTimeout()); - boolean start = streamProxyService.start(sendRtpItem.getApp(), sendRtpItem.getStream()); + boolean start = streamProxyService.startByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); if (!start) { try { responseAck(request, Response.BUSY_HERE, "channel [" + sendRtpItem.getChannelId() + "] offline"); diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 0bad2598..7c4be2d7 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -749,13 +749,12 @@ public class MediaServerServiceImpl implements IMediaServerService { @Override public StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId, String addr, boolean authority) { - StreamInfo streamInfo = null; if (mediaServerId == null) { mediaServerId = mediaConfig.getId(); } MediaServer mediaInfo = getOne(mediaServerId); if (mediaInfo == null) { - return null; + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到使用的媒体节点"); } String calld = null; StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java index 62acfa4e..b0a02a31 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java @@ -210,7 +210,9 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { streamInfoResult.setRtc(addr, mediaServer.getHttpPort(),mediaServer.getHttpSSlPort(), app, stream, callIdParam, isPlay); streamInfoResult.setMediaInfo(mediaInfo); - streamInfoResult.setOriginType(mediaInfo.getOriginType()); + if (mediaInfo != null) { + streamInfoResult.setOriginType(mediaInfo.getOriginType()); + } return streamInfoResult; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index da6948ed..62454f24 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -266,14 +266,14 @@ public class ZLMRESTfulUtils { return sendPost(mediaServerItem, "getRtpInfo",param, null); } - public JSONObject addFFmpegSource(MediaServer mediaServerItem, String src_url, String dst_url, Integer timeout_ms, + public JSONObject addFFmpegSource(MediaServer mediaServerItem, String src_url, String dst_url, Integer timeout_sec, boolean enable_audio, boolean enable_mp4, String ffmpeg_cmd_key){ log.info(src_url); log.info(dst_url); Map param = new HashMap<>(); param.put("src_url", src_url); param.put("dst_url", dst_url); - param.put("timeout_ms", timeout_ms); + param.put("timeout_ms", timeout_sec*1000); param.put("enable_mp4", enable_mp4); param.put("ffmpeg_cmd_key", ffmpeg_cmd_key); return sendPost(mediaServerItem, "addFFmpegSource",param, null); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index 3bd6617d..581bed4c 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -284,13 +284,13 @@ public class MediaServiceImpl implements IMediaService { if (streamProxy.isEnableRemoveNoneReader()) { // 无人观看自动移除 result = true; - streamProxyService.del(app, stream); + streamProxyService.delteByAppAndStream(app, stream); log.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", app, stream, streamProxy.getSrcUrl()); } else if (streamProxy.isEnableDisableNoneReader()) { // 无人观看停用 result = true; // 修改数据 - streamProxyService.stop(app, stream); + streamProxyService.stopByAppAndStream(app, stream); } else { // 无人观看不做处理 result = false; diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/bean/StreamProxy.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/bean/StreamProxy.java index bae6657a..cdb2470f 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/bean/StreamProxy.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/bean/StreamProxy.java @@ -35,7 +35,7 @@ public class StreamProxy extends CommonGBChannel { @Schema(description = "拉流地址") private String srcUrl; - @Schema(description = "超时时间") + @Schema(description = "超时时间:秒") private int timeout; @Schema(description = "ffmpeg模板KEY") diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/bean/StreamProxyParam.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/bean/StreamProxyParam.java new file mode 100755 index 00000000..1f45d711 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/bean/StreamProxyParam.java @@ -0,0 +1,71 @@ +package com.genersoft.iot.vmp.streamProxy.bean; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +/** + * @author lin + */ +@Data +@Schema(description = "拉流代理的信息") +public class StreamProxyParam { + + @Schema(description = "类型,取值,default: 流媒体直接拉流(默认),ffmpeg: ffmpeg实现拉流") + private String type; + + @Schema(description = "应用名") + private String app; + + @Schema(description = "流ID") + private String stream; + + @Schema(description = "流媒体服务ID") + private String mediaServerId; + + @Schema(description = "拉流地址") + private String url; + + @Schema(description = "超时时间:秒") + private int timeoutMs; + + @Schema(description = "ffmpeg模板KEY") + private String ffmpegCmdKey; + + @Schema(description = "rtsp拉流时,拉流方式,0:tcp,1:udp,2:组播") + private String rtpType; + + @Schema(description = "是否启用") + private boolean enable; + + @Schema(description = "是否启用音频") + private boolean enableAudio; + + @Schema(description = "是否启用MP4") + private boolean enableMp4; + + @Schema(description = "是否 无人观看时删除") + private boolean enableRemoveNoneReader; + + @Schema(description = "是否 无人观看时自动停用") + private boolean enableDisableNoneReader; + + + public StreamProxy buildStreamProxy() { + StreamProxy streamProxy = new StreamProxy(); + streamProxy.setApp(app); + streamProxy.setStream(stream); + streamProxy.setMediaServerId(mediaServerId); + streamProxy.setSrcUrl(url); + streamProxy.setTimeout(timeoutMs/1000); + streamProxy.setRtspType(rtpType); + streamProxy.setEnable(enable); + streamProxy.setEnableAudio(enableAudio); + streamProxy.setEnableMp4(enableMp4); + streamProxy.setEnableRemoveNoneReader(enableRemoveNoneReader); + streamProxy.setEnableDisableNoneReader(enableDisableNoneReader); + streamProxy.setFfmpegCmdKey(ffmpegCmdKey); + + return streamProxy; + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/controller/StreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/controller/StreamProxyController.java index d4b7de75..f6d1a81e 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/controller/StreamProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/controller/StreamProxyController.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.conf.security.JwtUtils; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; +import com.genersoft.iot.vmp.streamProxy.bean.StreamProxyParam; import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; @@ -44,7 +45,6 @@ public class StreamProxyController { @Parameter(name = "page", description = "当前页") @Parameter(name = "count", description = "每页查询数量") @Parameter(name = "query", description = "查询内容") - @Parameter(name = "query", description = "查询内容") @Parameter(name = "pulling", description = "是否正在拉流") @Parameter(name = "mediaServerId", description = "流媒体ID") @GetMapping(value = "/list") @@ -55,6 +55,12 @@ public class StreamProxyController { @RequestParam(required = false)Boolean pulling, @RequestParam(required = false)String mediaServerId){ + if (ObjectUtils.isEmpty(mediaServerId)) { + mediaServerId = null; + } + if (ObjectUtils.isEmpty(query)) { + query = null; + } return streamProxyService.getAll(page, count, query, pulling, mediaServerId); } @@ -73,7 +79,7 @@ public class StreamProxyController { }) @PostMapping(value = "/save") @ResponseBody - public StreamContent save(@RequestBody StreamProxy param){ + public StreamContent save(@RequestBody StreamProxyParam param){ log.info("添加代理: " + JSONObject.toJSONString(param)); if (ObjectUtils.isEmpty(param.getMediaServerId())) { param.setMediaServerId("auto"); @@ -81,13 +87,6 @@ public class StreamProxyController { if (ObjectUtils.isEmpty(param.getType())) { param.setType("default"); } - if (ObjectUtils.isEmpty(param.getGbId())) { - param.setGbDeviceId(null); - } - StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); - if (streamProxyItem != null) { - streamProxyService.del(param.getApp(), param.getStream()); - } StreamInfo streamInfo = streamProxyService.save(param); if (param.isEnable()) { @@ -107,10 +106,10 @@ public class StreamProxyController { }) @PostMapping(value = "/add") @ResponseBody - public StreamContent add(@RequestBody StreamProxy param){ + public StreamProxy add(@RequestBody StreamProxy param){ log.info("添加代理: " + JSONObject.toJSONString(param)); if (ObjectUtils.isEmpty(param.getMediaServerId())) { - param.setMediaServerId("auto"); + param.setMediaServerId(null); } if (ObjectUtils.isEmpty(param.getType())) { param.setType("default"); @@ -118,22 +117,24 @@ public class StreamProxyController { if (ObjectUtils.isEmpty(param.getGbId())) { param.setGbDeviceId(null); } - StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); - if (streamProxyItem != null) { - streamProxyService.del(param.getApp(), param.getStream()); - } + streamProxyService.add(param); + return param; + } - StreamInfo streamInfo = streamProxyService.add(param); - if (param.isEnable()) { - if (streamInfo == null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg()); - }else { - return new StreamContent(streamInfo); - } - }else { - return null; + @Operation(summary = "更新代理", security = @SecurityRequirement(name = JwtUtils.HEADER), parameters = { + @Parameter(name = "param", description = "代理参数", required = true), + }) + @PostMapping(value = "/update") + @ResponseBody + public void update(@RequestBody StreamProxy param){ + log.info("更新代理: " + JSONObject.toJSONString(param)); + if (param.getId() == 0) { + throw new ControllerException(ErrorCode.ERROR400.getCode(), "缺少代理信息的ID"); } - + if (ObjectUtils.isEmpty(param.getGbId())) { + param.setGbDeviceId(null); + } + streamProxyService.update(param); } @GetMapping(value = "/ffmpeg_cmd/list") @@ -160,18 +161,26 @@ public class StreamProxyController { if (app == null || stream == null) { throw new ControllerException(ErrorCode.ERROR400.getCode(), app == null ?"app不能为null":"stream不能为null"); }else { - streamProxyService.del(app, stream); + streamProxyService.delteByAppAndStream(app, stream); } } + @DeleteMapping(value = "/delte") + @ResponseBody + @Operation(summary = "移除代理", security = @SecurityRequirement(name = JwtUtils.HEADER)) + @Parameter(name = "id", description = "代理ID", required = true) + public void delte(int id){ + log.info("移除代理: " + id ); + streamProxyService.delte(id); + } + @GetMapping(value = "/start") @ResponseBody @Operation(summary = "启用代理", security = @SecurityRequirement(name = JwtUtils.HEADER)) - @Parameter(name = "app", description = "应用名", required = true) - @Parameter(name = "stream", description = "流id", required = true) - public void start(String app, String stream){ - log.info("启用代理: " + app + "/" + stream); - boolean result = streamProxyService.start(app, stream); + @Parameter(name = "id", description = "代理Id", required = true) + public void start(int id){ + log.info("启用代理: " + id); + boolean result = streamProxyService.start(id); if (!result) { throw new ControllerException(ErrorCode.ERROR100); } @@ -184,6 +193,6 @@ public class StreamProxyController { @Parameter(name = "stream", description = "流id", required = true) public void stop(String app, String stream){ log.info("停用代理: " + app + "/" + stream); - streamProxyService.stop(app, stream); + streamProxyService.stopByAppAndStream(app, stream); } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/StreamProxyMapper.java index 750eb051..7cf0a213 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/StreamProxyMapper.java @@ -1,7 +1,7 @@ package com.genersoft.iot.vmp.streamProxy.dao; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; -import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; +import com.genersoft.iot.vmp.streamProxy.dao.provider.StreamProxyProvider; import org.apache.ibatis.annotations.*; import org.springframework.stereotype.Repository; @@ -11,10 +11,10 @@ import java.util.List; @Repository public interface StreamProxyMapper { - @Insert("INSERT INTO wvp_stream_proxy (type, name, app, stream,media_server_id, src_url, " + + @Insert("INSERT INTO wvp_stream_proxy (type, app, stream,media_server_id, src_url, " + "timeout, ffmpeg_cmd_key, rtsp_type, enable_audio, enable_mp4, enable, pulling, stream_key, " + "enable_remove_none_reader, enable_disable_none_reader, create_time) VALUES" + - "(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{srcUrl}, " + + "(#{type}, #{app}, #{stream}, #{mediaServerId}, #{srcUrl}, " + "#{timeout}, #{ffmpegCmdKey}, #{rtspType}, #{enableAudio}, #{enableMp4}, #{enable}, #{pulling}, #{streamKey}, " + "#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} )") @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id") @@ -24,10 +24,8 @@ public interface StreamProxyMapper { "SET type=#{type}, " + "app=#{app}," + "stream=#{stream}," + - "name=#{name}," + "app=#{app}," + "stream=#{stream}," + - "url=#{url}, " + "media_server_id=#{mediaServerId}, " + "src_url=#{srcUrl}," + "timeout=#{timeout}, " + @@ -46,45 +44,14 @@ public interface StreamProxyMapper { @Delete("DELETE FROM wvp_stream_proxy WHERE app=#{app} AND stream=#{stream}") int delByAppAndStream(String app, String stream); - @Select("SELECT " + - " st.*, " + - " st.id as stream_proxy_id, " + - " wdc.*, " + - " wdc.id as gb_id" + - " FROM wvp_stream_proxy st " + - " LEFT join wvp_device_channel wdc " + - " on st.id = wdc.stream_proxy_id " + - " WHERE " + - " 1=1 " + - " AND (st.app LIKE concat('%',#{query},'%') OR st.stream LIKE concat('%',#{query},'%') " + - " OR wdc.gb_device_id LIKE concat('%',#{query},'%') OR wdc.gb_name LIKE concat('%',#{query},'%')) " + - " AND st.pulling=1" + - " AND st.pulling=0 " + - " AND st.media_server_id=#{mediaServerId} " + - "order by st.create_time desc") - List selectAll(@Param("query") String query, @Param("pushing") Boolean pushing, @Param("mediaServerId") String mediaServerId); + @SelectProvider(type = StreamProxyProvider.class, method = "selectAll") + List selectAll(@Param("query") String query, @Param("pulling") Boolean pulling, @Param("mediaServerId") String mediaServerId); - @Select("SELECT " + - " st.*, " + - " st.id as stream_proxy_id, " + - " wdc.*, " + - " wdc.id as gb_id" + - " FROM wvp_stream_proxy st " + - " LEFT join wvp_device_channel wdc " + - " on st.id = wdc.stream_proxy_id " + - " WHERE st.app=#{app} AND st.stream=#{stream} order by st.create_time desc") + @SelectProvider(type = StreamProxyProvider.class, method = "selectOneByAppAndStream") StreamProxy selectOneByAppAndStream(@Param("app") String app, @Param("stream") String stream); - @Select("SELECT " + - " st.*, " + - " st.id as stream_proxy_id, " + - " wdc.*, " + - " wdc.id as gb_id" + - " FROM wvp_stream_proxy st " + - " LEFT join wvp_device_channel wdc " + - " on st.id = wdc.stream_proxy_id " + - "WHERE st.enable=#{enable} and st.media_server_id= #{id} order by st.create_time desc") - List selectForEnableInMediaServer(@Param("id") String id, @Param("enable") boolean enable); + @SelectProvider(type = StreamProxyProvider.class, method = "selectForEnableInMediaServer") + List selectForEnableInMediaServer(@Param("mediaServerId") String mediaServerId, @Param("enable") boolean enable); @Select("select count(1) from wvp_stream_proxy") @@ -114,4 +81,7 @@ public interface StreamProxyMapper { "SET pulling=false " + "WHERE id=#{id}") int offline(@Param("id") int id); + + @SelectProvider(type = StreamProxyProvider.class, method = "select") + StreamProxy select(@Param("id") int id); } diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/provider/StreamProxyProvider.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/provider/StreamProxyProvider.java new file mode 100644 index 00000000..faa90bd7 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/provider/StreamProxyProvider.java @@ -0,0 +1,63 @@ +package com.genersoft.iot.vmp.streamProxy.dao.provider; + +import java.util.Map; + +public class StreamProxyProvider { + + public String getBaseSelectSql(){ + return "SELECT " + + " st.*, " + + " st.id as stream_proxy_id, " + + " wdc.*, " + + " wdc.id as gb_id" + + " FROM wvp_stream_proxy st " + + " LEFT join wvp_device_channel wdc " + + " on st.id = wdc.stream_proxy_id "; + } + + public String select(Map params ){ + return getBaseSelectSql() + " WHERE st.id = " + params.get("id"); + } + + public String selectForEnableInMediaServer(Map params ){ + return getBaseSelectSql() + String.format(" WHERE st.enable=%s and st.media_server_id= %s order by st.create_time desc", + params.get("enable"), params.get("mediaServerId")); + } + + public String selectOneByAppAndStream(Map params ){ + return getBaseSelectSql() + String.format(" WHERE st.app=%s AND st.stream=%s order by st.create_time desc", + params.get("app"), params.get("stream")); + } + + public String selectAll(Map params ){ + StringBuilder sqlBuild = new StringBuilder(); + sqlBuild.append(getBaseSelectSql()); + sqlBuild.append(" WHERE 1=1 "); + if (params.get("query") != null) { + sqlBuild.append(" AND ") + .append(" (") + .append(" st.app LIKE ").append("'%").append(params.get("query")).append("%'") + .append(" OR") + .append(" st.stream LIKE ").append("'%").append(params.get("query")).append("%'") + .append(" OR") + .append(" wdc.gb_device_id LIKE ").append("'%").append(params.get("query")).append("%'") + .append(" OR") + .append(" wdc.gb_name LIKE ").append("'%").append(params.get("query")).append("%'") + .append(" )") + ; + } + Object pulling = params.get("pulling"); + if (pulling != null) { + if ((Boolean) pulling) { + sqlBuild.append(" AND st.pulling=1 "); + }else { + sqlBuild.append(" AND st.pulling=0 "); + } + } + if (params.get("mediaServerId") != null) { + sqlBuild.append(" AND st.media_server_id='").append(params.get("mediaServerId")).append("'"); + } + sqlBuild.append(" order by st.create_time desc"); + return sqlBuild.toString(); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyService.java index ab224bda..c4c362ee 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyService.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.streamProxy.service; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; +import com.genersoft.iot.vmp.streamProxy.bean.StreamProxyParam; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.github.pagehelper.PageInfo; @@ -14,7 +15,7 @@ public interface IStreamProxyService { * 保存视频代理 * @param param */ - StreamInfo save(StreamProxy param); + StreamInfo save(StreamProxyParam param); /** * 分页查询 @@ -29,7 +30,7 @@ public interface IStreamProxyService { * @param app * @param stream */ - void del(String app, String stream); + void delteByAppAndStream(String app, String stream); /** * 启用视频代理 @@ -37,17 +38,7 @@ public interface IStreamProxyService { * @param stream * @return */ - boolean start(String app, String stream); - - /** - * 更新状态 - * @param status 状态 - * @param app - * @param stream - */ - int updateStatusByAppAndStream(String app, String stream, boolean status); - - + boolean startByAppAndStream(String app, String stream); /** * 停用用视频代理 @@ -55,7 +46,7 @@ public interface IStreamProxyService { * @param stream * @return */ - void stop(String app, String stream); + void stopByAppAndStream(String app, String stream); /** * 获取ffmpeg.cmd模板 @@ -88,7 +79,7 @@ public interface IStreamProxyService { /** * 更新代理流 */ - boolean updateStreamProxy(StreamProxy streamProxyItem); + boolean update(StreamProxy streamProxyItem); /** * 获取统计信息 @@ -97,4 +88,10 @@ public interface IStreamProxyService { ResourceBaseInfo getOverview(); StreamInfo add(StreamProxy streamProxy); + + StreamProxy getStreamProxy(int id); + + void delte(int id); + + boolean start(int id); } diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java index 0386b056..cacd1b06 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java @@ -17,6 +17,7 @@ import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; +import com.genersoft.iot.vmp.streamProxy.bean.StreamProxyParam; import com.genersoft.iot.vmp.streamProxy.dao.StreamProxyMapper; import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService; import com.genersoft.iot.vmp.utils.DateUtil; @@ -72,10 +73,11 @@ public class StreamProxyServiceImpl implements IStreamProxyService { * 流到来的处理 */ @Async("taskExecutor") + @Transactional @org.springframework.context.event.EventListener public void onApplicationEvent(MediaArrivalEvent event) { if ("rtsp".equals(event.getSchema())) { - updateStatusByAppAndStream(event.getApp(), event.getStream(), true); + streamChangeHandler(event.getApp(), event.getStream(), event.getMediaServer().getId(), true); } } @@ -84,9 +86,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService { */ @Async("taskExecutor") @EventListener + @Transactional public void onApplicationEvent(MediaDepartureEvent event) { if ("rtsp".equals(event.getSchema())) { - updateStatusByAppAndStream(event.getApp(), event.getStream(), false); + streamChangeHandler(event.getApp(), event.getStream(), event.getMediaServer().getId(), false); } } @@ -102,7 +105,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { // 拉流代理 StreamProxy streamProxyByAppAndStream = getStreamProxyByAppAndStream(event.getApp(), event.getStream()); if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) { - start(event.getApp(), event.getStream()); + startByAppAndStream(event.getApp(), event.getStream()); } } @@ -129,58 +132,80 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override @Transactional - public StreamInfo save(StreamProxy streamProxy) { - MediaServer mediaServer; - if (ObjectUtils.isEmpty(streamProxy.getMediaServerId()) || "auto".equals(streamProxy.getMediaServerId())){ - mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); + public StreamInfo save(StreamProxyParam param) { + // 兼容旧接口 + StreamProxy streamProxyInDb = getStreamProxyByAppAndStream(param.getApp(), param.getStream()); + if (streamProxyInDb != null && streamProxyInDb.getPulling()) { + stopProxy(streamProxyInDb); + } + if (streamProxyInDb == null){ + return add(param.buildStreamProxy()); }else { - mediaServer = mediaServerService.getOne(streamProxy.getMediaServerId()); - } - if (mediaServer == null) { - log.warn("保存代理未找到在线的ZLM..."); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到在线的ZLM"); + stopProxy(streamProxyInDb); + streamProxyMapper.delete(streamProxyInDb.getId()); + return add(param.buildStreamProxy()); } + } - streamProxy.setMediaServerId(mediaServer.getId()); - boolean saveResult; - // 更新 - if (streamProxyMapper.selectOneByAppAndStream(streamProxy.getApp(), streamProxy.getStream()) != null) { - saveResult = updateStreamProxy(streamProxy); - }else { // 新增 - saveResult = addStreamProxy(streamProxy); + @Override + @Transactional + public StreamInfo add(StreamProxy streamProxy) { + StreamProxy streamProxyInDb = streamProxyMapper.selectOneByAppAndStream(streamProxy.getApp(), streamProxy.getStream()); + if (streamProxyInDb != null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "APP+STREAM已经存在"); } - if (!saveResult) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存失败"); + if (streamProxy.getGbDeviceId() != null) { + gbChannelService.add(streamProxy.buildCommonGBChannel()); } - + streamProxy.setCreateTime(DateUtil.getNow()); + streamProxy.setUpdateTime(DateUtil.getNow()); + streamProxyMapper.add(streamProxy); + streamProxy.setStreamProxyId(streamProxy.getId()); if (streamProxy.isEnable()) { - return mediaServerService.startProxy(mediaServer, streamProxy); + return startProxy(streamProxy); } return null; } - /** - * 新增代理流 - */ - @Transactional - public boolean addStreamProxy(StreamProxy streamProxy) { - String now = DateUtil.getNow(); - streamProxy.setCreateTime(now); - streamProxy.setUpdateTime(now); - - if (streamProxyMapper.add(streamProxy) > 0 && !ObjectUtils.isEmpty(streamProxy.getGbDeviceId())) { - gbChannelService.add(streamProxy.buildCommonGBChannel()); + @Override + public void delte(int id) { + StreamProxy streamProxy = getStreamProxy(id); + if (streamProxy == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "代理不存在"); } - return true; + delte(streamProxy); + } + + private void delte(StreamProxy streamProxy) { + if (streamProxy.getPulling()) { + stopProxy(streamProxy); + } + if(streamProxy.getGbId() > 0) { + gbChannelService.delete(streamProxy.getGbId()); + } + streamProxyMapper.delete(streamProxy.getId()); + } + + @Override + @Transactional + public void delteByAppAndStream(String app, String stream) { + StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(app, stream); + if (streamProxy == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "代理不存在"); + } + delte(streamProxy); } /** * 更新代理流 */ @Override - public boolean updateStreamProxy(StreamProxy streamProxy) { + public boolean update(StreamProxy streamProxy) { streamProxy.setUpdateTime(DateUtil.getNow()); - + StreamProxy streamProxyInDb = streamProxyMapper.select(streamProxy.getId()); + if (streamProxyInDb == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "代理不存在"); + } if (streamProxyMapper.update(streamProxy) > 0 && !ObjectUtils.isEmpty(streamProxy.getGbDeviceId())) { if (streamProxy.getGbId() > 0) { gbChannelService.update(streamProxy.buildCommonGBChannel()); @@ -188,6 +213,15 @@ public class StreamProxyServiceImpl implements IStreamProxyService { gbChannelService.add(streamProxy.buildCommonGBChannel()); } } + // 判断是否需要重启代理 + if (!streamProxyInDb.getApp().equals(streamProxy.getApp()) + || !streamProxyInDb.getStream().equals(streamProxy.getStream()) + || !streamProxyInDb.getMediaServerId().equals(streamProxy.getMediaServerId()) + ) { + // app/stream 变化则重启代理 + stopProxy(streamProxyInDb); + startProxy(streamProxy); + } return true; } @@ -198,63 +232,47 @@ public class StreamProxyServiceImpl implements IStreamProxyService { return new PageInfo<>(all); } - @Override - @Transactional - public void del(String app, String stream) { - StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(app, stream); - if (streamProxy == null) { - return; - } - if (streamProxy.getStreamKey() != null) { - MediaServer mediaServer = mediaServerService.getOne(streamProxy.getMediaServerId()); - if (mediaServer != null) { - mediaServerService.stopProxy(mediaServer, streamProxy.getStreamKey()); - } - } - if (streamProxy.getGbId() > 0) { - gbChannelService.delete(streamProxy.getGbId()); - } - streamProxyMapper.delete(streamProxy.getId()); - } @Override - public boolean start(String app, String stream) { + public boolean startByAppAndStream(String app, String stream) { StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(app, stream); if (streamProxy == null) { throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到"); } + StreamInfo streamInfo = startProxy(streamProxy); + return streamInfo != null; + } + + @Override + public void stopByAppAndStream(String app, String stream) { + StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(app, stream); + if (streamProxy == null) { + throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到"); + } + stopProxy(streamProxy); + } + + private void stopProxy(StreamProxy streamProxy){ + MediaServer mediaServer; - if (ObjectUtils.isEmpty(streamProxy.getMediaServerId()) || "auto".equals(streamProxy.getMediaServerId())){ + String mediaServerId = streamProxy.getMediaServerId(); + if (mediaServerId == null) { mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); }else { - mediaServer = mediaServerService.getOne(streamProxy.getMediaServerId()); + mediaServer = mediaServerService.getOne(mediaServerId); } if (mediaServer == null) { - log.warn("[启用代理] 未找到可用的媒体节点"); throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的媒体节点"); } - StreamInfo streamInfo = mediaServerService.startProxy(mediaServer, streamProxy); - if (streamInfo == null) { - log.warn("[启用代理] 失败"); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "失败"); + if (ObjectUtils.isEmpty(streamProxy.getStreamKey())) { + mediaServerService.closeStreams(mediaServer, streamProxy.getApp(), streamProxy.getStream()); + }else { + mediaServerService.stopProxy(mediaServer, streamProxy.getStreamKey()); } - if (!streamProxy.isEnable()) { - updateStreamProxy(streamProxy); - } - return true; - } - - @Override - public void stop(String app, String stream) { - StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(app, stream); - if (streamProxy == null) { - throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到"); - } - MediaServer mediaServer = mediaServerService.getOne(streamProxy.getMediaServerId()); - if (mediaServer == null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到启用时使用的媒体节点"); - } - mediaServerService.stopProxy(mediaServer, streamProxy.getStreamKey()); + streamProxy.setMediaServerId(mediaServer.getId()); + streamProxy.setStreamKey(null); + streamProxy.setPulling(false); + streamProxyMapper.update(streamProxy); } @Override @@ -264,8 +282,8 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override - public StreamProxy getStreamProxyByAppAndStream(String app, String streamId) { - return streamProxyMapper.selectOneByAppAndStream(app, streamId); + public StreamProxy getStreamProxyByAppAndStream(String app, String stream) { + return streamProxyMapper.selectOneByAppAndStream(app, stream); } @Override @@ -387,16 +405,19 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } } - @Override @Transactional - public int updateStatusByAppAndStream(String app, String stream, boolean status) { + public void streamChangeHandler(String app, String stream, String mediaServerId, boolean status) { // 状态变化时推送到国标上级 StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(app, stream); if (streamProxy == null) { - return 0; + return; } - streamProxy.setPulling(true); - streamProxyMapper.online(streamProxy.getId()); + streamProxy.setPulling(status); + if (!mediaServerId.equals(streamProxy.getMediaServerId())) { + streamProxy.setMediaServerId(mediaServerId); + } + streamProxy.setUpdateTime(DateUtil.getNow()); + streamProxyMapper.update(streamProxy); streamProxy.setGbStatus(status?"ON":"OFF"); if (streamProxy.getGbId() > 0) { if (status) { @@ -405,7 +426,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService { gbChannelService.offline(streamProxy.buildCommonGBChannel()); } } - return 1; } @Override @@ -417,21 +437,16 @@ public class StreamProxyServiceImpl implements IStreamProxyService { return new ResourceBaseInfo(total, online); } + + @Override - @Transactional - public StreamInfo add(StreamProxy streamProxy) { - StreamProxy streamProxyInDb = streamProxyMapper.selectOneByAppAndStream(streamProxy.getApp(), streamProxy.getStream()); - if (streamProxyInDb != null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "APP+STREAM已经存在"); + public boolean start(int id) { + StreamProxy streamProxy = streamProxyMapper.select(id); + if (streamProxy == null) { + throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到"); } - if (streamProxy.getGbDeviceId() != null) { - gbChannelService.add(streamProxy.buildCommonGBChannel()); - } - streamProxyMapper.add(streamProxy); - if (streamProxy.isEnable()) { - return startProxy(streamProxy); - } - return null; + StreamInfo streamInfo = startProxy(streamProxy); + return streamInfo != null; } private StreamInfo startProxy(StreamProxy streamProxy){ @@ -440,7 +455,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } MediaServer mediaServer; String mediaServerId = streamProxy.getMediaServerId(); - if (mediaServerId == null || "auto".equals(mediaServerId)) { + if (mediaServerId == null) { mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); }else { mediaServer = mediaServerService.getOne(mediaServerId); @@ -448,10 +463,22 @@ public class StreamProxyServiceImpl implements IStreamProxyService { if (mediaServer == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的媒体节点"); } - - return mediaServerService.startProxy(mediaServer, streamProxy); - + StreamInfo streamInfo = mediaServerService.startProxy(mediaServer, streamProxy); + if (mediaServerId == null) { + streamProxy.setMediaServerId(mediaServer.getId()); + update(streamProxy); + } + return streamInfo; } + + @Override + public StreamProxy getStreamProxy(int id) { + return streamProxyMapper.select(id); + } + + + + // @Scheduled(cron = "* 0/10 * * * ?") // public void asyncCheckStreamProxyStatus() { // diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/controller/StreamPushController.java b/src/main/java/com/genersoft/iot/vmp/streamPush/controller/StreamPushController.java index 7a50eac2..b7a5f1a1 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/controller/StreamPushController.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/controller/StreamPushController.java @@ -3,12 +3,9 @@ package com.genersoft.iot.vmp.streamPush.controller; import com.alibaba.excel.EasyExcel; import com.alibaba.excel.ExcelReader; import com.alibaba.excel.read.metadata.ReadSheet; -import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.security.JwtUtils; -import com.genersoft.iot.vmp.conf.security.SecurityUtils; -import com.genersoft.iot.vmp.conf.security.dto.LoginUser; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.media.service.IMediaServerService; @@ -19,7 +16,6 @@ import com.genersoft.iot.vmp.streamPush.bean.StreamPushExcelDto; import com.genersoft.iot.vmp.streamPush.enent.StreamPushUploadFileHandler; import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; -import com.genersoft.iot.vmp.vmanager.bean.StreamContent; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.github.pagehelper.PageInfo; import io.swagger.v3.oas.annotations.Operation; @@ -193,33 +189,6 @@ public class StreamPushController { return result; } - /** - * 获取推流播放地址 - * @param app 应用名 - * @param stream 流id - * @return - */ - @GetMapping(value = "/getPlayUrl") - @ResponseBody - @Operation(summary = "获取推流播放地址", security = @SecurityRequirement(name = JwtUtils.HEADER)) - @Parameter(name = "app", description = "应用名", required = true) - @Parameter(name = "stream", description = "流id", required = true) - @Parameter(name = "mediaServerId", description = "媒体服务器id") - public StreamContent getPlayUrl(@RequestParam String app, @RequestParam String stream, - @RequestParam(required = false) String mediaServerId){ - boolean authority = false; - // 是否登陆用户, 登陆用户返回完整信息 - LoginUser userInfo = SecurityUtils.getUserInfo(); - if (userInfo!= null) { - authority = true; - } - StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority); - if (streamInfo == null){ - throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取播放地址失败"); - } - return new StreamContent(streamInfo); - } - /** * 添加推流信息 * @param stream 推流信息 diff --git a/web_src/src/components/StreamProxyEdit.vue b/web_src/src/components/StreamProxyEdit.vue index 0f8ffcac..bf9877c7 100644 --- a/web_src/src/components/StreamProxyEdit.vue +++ b/web_src/src/components/StreamProxyEdit.vue @@ -12,7 +12,7 @@ - + @@ -34,7 +34,7 @@ - + @@ -44,6 +44,7 @@ style="width: 100%" placeholder="请选择拉流节点" > + - +
- 保存 + 保存 取消
@@ -109,6 +110,7 @@ diff --git a/web_src/src/components/StreamProxyList.vue b/web_src/src/components/StreamProxyList.vue index e2c740c0..30d23cf3 100755 --- a/web_src/src/components/StreamProxyList.vue +++ b/web_src/src/components/StreamProxyList.vue @@ -5,6 +5,27 @@