diff --git a/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java b/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java index 7a2e03cc..758dd8e1 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.common; import com.genersoft.iot.vmp.media.bean.MediaInfo; +import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.service.bean.DownloadFileInfo; import io.swagger.v3.oas.annotations.media.Schema; @@ -69,8 +70,8 @@ public class StreamInfo implements Serializable, Cloneable{ @Schema(description = "RTCS流地址") private StreamURL rtcs; - @Schema(description = "流媒体ID") - private String mediaServerId; + @Schema(description = "流媒体节点") + private MediaServer mediaServer; @Schema(description = "流编码信息") private MediaInfo mediaInfo; @Schema(description = "开始时间") @@ -91,6 +92,9 @@ public class StreamInfo implements Serializable, Cloneable{ @Schema(description = "转码后的视频流") private StreamInfo transcodeStream; + @Schema(description = "使用的WVP ID") + private String serverId; + public void setFlv(StreamURL flv) { this.flv = flv; } @@ -481,12 +485,12 @@ public class StreamInfo implements Serializable, Cloneable{ return rtcs; } - public String getMediaServerId() { - return mediaServerId; + public MediaServer getMediaServer() { + return mediaServer; } - public void setMediaServerId(String mediaServerId) { - this.mediaServerId = mediaServerId; + public void setMediaServer(MediaServer mediaServer) { + this.mediaServer = mediaServer; } public MediaInfo getMediaInfo() { @@ -647,4 +651,12 @@ public class StreamInfo implements Serializable, Cloneable{ public void setOriginType(int originType) { this.originType = originType; } + + public String getServerId() { + return serverId; + } + + public void setServerId(String serverId) { + this.serverId = serverId; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonChannelPlayInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonChannelPlayInfo.java deleted file mode 100644 index 974ccf4f..00000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonChannelPlayInfo.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.bean; - -import com.genersoft.iot.vmp.common.StreamInfo; -import com.genersoft.iot.vmp.media.bean.MediaServer; -import lombok.Data; - -@Data -public class CommonChannelPlayInfo { - - private StreamInfo streamInfo; - - private MediaServer mediaServer; - - public static CommonChannelPlayInfo build(MediaServer mediaServer, StreamInfo data) { - CommonChannelPlayInfo commonChannelPlayInfo = new CommonChannelPlayInfo(); - commonChannelPlayInfo.setMediaServer(mediaServer); - commonChannelPlayInfo.setStreamInfo(data); - return commonChannelPlayInfo; - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlatformController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlatformController.java index 0a4f6533..91fbd0ab 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlatformController.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlatformController.java @@ -256,7 +256,7 @@ public class PlatformController { boolean deleteResult = storager.deleteParentPlatform(parentPlatform); // storager.delCatalogByPlatformId(parentPlatform.getServerGBId()); - storager.delRelationByPlatformId(parentPlatform.getServerGBId()); +// storager.delRelationByPlatformId(parentPlatform.getServerGBId()); // 停止发送位置订阅定时任务 String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_MobilePosition_" + parentPlatform.getServerGBId(); dynamicTask.stop(key); @@ -396,11 +396,11 @@ public class PlatformController { if (log.isDebugEnabled()) { log.debug("删除关联,{}", JSON.toJSONString(platformCatalog)); } - int delResult = storager.delRelation(platformCatalog); - - if (delResult <= 0) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "写入数据库失败"); - } +// int delResult = storager.delRelation(platformCatalog); +// +// if (delResult <= 0) { +// throw new ControllerException(ErrorCode.ERROR100.getCode(), "写入数据库失败"); +// } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java new file mode 100644 index 00000000..da530aca --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java @@ -0,0 +1,12 @@ +package com.genersoft.iot.vmp.gb28181.service; + +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; +import com.genersoft.iot.vmp.gb28181.bean.InviteInfo; +import com.genersoft.iot.vmp.gb28181.bean.Platform; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; + +public interface IGbChannelPlayService { + + void start(CommonGBChannel channel, InviteInfo inviteInfo, Platform platform, ErrorCallback callback); +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java index f86dcbea..e33655a9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java @@ -1,7 +1,6 @@ package com.genersoft.iot.vmp.gb28181.service; import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.github.pagehelper.PageInfo; import java.util.Collection; @@ -79,5 +78,4 @@ public interface IGbChannelService { CommonGBChannel queryOneWithPlatform(Integer platformId, String channelDeviceId); - void start(CommonGBChannel channel, InviteInfo inviteInfo, ErrorCallback callback); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IInviteStreamService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IInviteStreamService.java index 651a3dfa..88c43b39 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IInviteStreamService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IInviteStreamService.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.service; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.service.bean.ErrorCallback; /** @@ -58,12 +59,12 @@ public interface IInviteStreamService { /** * 添加一个invite回调 */ - void once(InviteSessionType type, String deviceId, String channelId, String stream, ErrorCallback callback); + void once(InviteSessionType type, String deviceId, String channelId, String stream, ErrorCallback callback); /** * 调用一个invite回调 */ - void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data); + void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, StreamInfo data); /** * 清空一个设备的所有invite信息 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java index 17287f4e..45f88406 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java @@ -2,10 +2,7 @@ package com.genersoft.iot.vmp.gb28181.service; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.exception.ServiceException; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.Platform; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.service.bean.ErrorCallback; @@ -25,19 +22,19 @@ import java.text.ParseException; public interface IPlayService { void play(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channelId, - ErrorCallback callback); + ErrorCallback callback); SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback callback); StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId); MediaServer getNewMediaServerItem(Device device); - void playBack(String deviceId, String channelId, String startTime, String endTime, ErrorCallback callback); - void playBack(MediaServer mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, ErrorCallback callback); + void playBack(String deviceId, String channelId, String startTime, String endTime, ErrorCallback callback); + void playBack(MediaServer mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, ErrorCallback callback); void zlmServerOffline(String mediaServerId); - void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback callback); - void download(MediaServer mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback callback); + void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback callback); + void download(MediaServer mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback callback); StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream); @@ -66,4 +63,10 @@ public interface IPlayService { void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback); void stopPlay(Device device, String channelId); + + void play(CommonGBChannel channel, ErrorCallback callback); + + void playBack(CommonGBChannel channel, Long startTime, Long stopTime, ErrorCallback callback); + + void download(CommonGBChannel channel, Long startTime, Long stopTime, Integer downloadSpeed, ErrorCallback callback); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java new file mode 100644 index 00000000..57af9c9c --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java @@ -0,0 +1,159 @@ +package com.genersoft.iot.vmp.gb28181.service.impl; + +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; +import com.genersoft.iot.vmp.gb28181.bean.InviteInfo; +import com.genersoft.iot.vmp.gb28181.bean.Platform; +import com.genersoft.iot.vmp.gb28181.bean.PlayException; +import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService; +import com.genersoft.iot.vmp.gb28181.service.IPlayService; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; +import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService; +import com.genersoft.iot.vmp.streamPush.service.IStreamPushPlayService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.sip.message.Response; + +@Slf4j +@Service +public class GbChannelPlayServiceImpl implements IGbChannelPlayService { + + @Autowired + private IPlayService deviceChannelPlayService; + + @Autowired + private IStreamProxyPlayService streamProxyPlayService; + + @Autowired + private IStreamPushPlayService streamPushPlayService; + + + @Override + public void start(CommonGBChannel channel, InviteInfo inviteInfo, Platform platform, ErrorCallback callback) { + if (channel == null || inviteInfo == null || callback == null) { + log.warn("[通用通道点播] 参数异常, channel: {}, inviteInfo: {}, callback: {}", channel != null, inviteInfo != null, callback != null); + throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); + } + log.info("[点播通用通道] 类型:{}, 通道: {}({})", inviteInfo.getSessionName(), channel.getGbName(), channel.getGbDeviceId()); + if ("Play".equalsIgnoreCase(inviteInfo.getSessionName())) { + if (channel.getGbDeviceDbId() > 0) { + // 国标通道 + playGbDeviceChannel(channel, callback); + } else if (channel.getStreamProxyId() > 0) { + // 拉流代理 + playProxy(channel, callback); + } else if (channel.getStreamPushId() > 0) { + // 推流 + playPush(channel, platform.getServerGBId(), platform.getName(), callback); + } else { + // 通道数据异常 + log.error("[点播通用通道] 通道数据异常,无法识别通道来源: {}({})", channel.getGbName(), channel.getGbDeviceId()); + throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); + } + }else if ("Playback".equals(inviteInfo.getSessionName())) { + if (channel.getGbDeviceDbId() > 0) { + // 国标通道 + playbackGbDeviceChannel(channel, inviteInfo.getStartTime(), inviteInfo.getStopTime(), callback); + } else if (channel.getStreamProxyId() > 0) { + // 拉流代理 + log.warn("[回放通用通道] 不支持回放拉流代理的录像: {}({})", channel.getGbName(), channel.getGbDeviceId()); + throw new PlayException(Response.FORBIDDEN, "forbidden"); + } else if (channel.getStreamPushId() > 0) { + // 推流 + log.warn("[回放通用通道] 不支持回放推流的录像: {}({})", channel.getGbName(), channel.getGbDeviceId()); + throw new PlayException(Response.FORBIDDEN, "forbidden"); + } else { + // 通道数据异常 + log.error("[回放通用通道] 通道数据异常,无法识别通道来源: {}({})", channel.getGbName(), channel.getGbDeviceId()); + throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); + } + }else if ("Download".equals(inviteInfo.getSessionName())) { + if (channel.getGbDeviceDbId() > 0) { + int downloadSpeed = 4; + try { + if (inviteInfo.getDownloadSpeed() != null){ + downloadSpeed = Integer.parseInt(inviteInfo.getDownloadSpeed()); + } + }catch (Exception ignored) {} + + // 国标通道 + downloadGbDeviceChannel(channel, inviteInfo.getStartTime(), inviteInfo.getStopTime(), downloadSpeed, callback); + } else if (channel.getStreamProxyId() > 0) { + // 拉流代理 + log.warn("[下载通用通道录像] 不支持下载拉流代理的录像: {}({})", channel.getGbName(), channel.getGbDeviceId()); + throw new PlayException(Response.FORBIDDEN, "forbidden"); + } else if (channel.getStreamPushId() > 0) { + // 推流 + log.warn("[下载通用通道录像] 不支持下载推流的录像: {}({})", channel.getGbName(), channel.getGbDeviceId()); + throw new PlayException(Response.FORBIDDEN, "forbidden"); + } else { + // 通道数据异常 + log.error("[回放通用通道] 通道数据异常,无法识别通道来源: {}({})", channel.getGbName(), channel.getGbDeviceId()); + throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); + } + }else { + // 不支持的点播方式 + log.error("[点播通用通道] 不支持的点播方式:{}, {}({})", inviteInfo.getSessionName(), channel.getGbName(), channel.getGbDeviceId()); + throw new PlayException(Response.BAD_REQUEST, "bad request"); + } + } + + private void playGbDeviceChannel(CommonGBChannel channel, ErrorCallback callback){ + // 国标通道 + try { + deviceChannelPlayService.play(channel, callback); + } catch (PlayException e) { + callback.run(e.getCode(), e.getMsg(), null); + } catch (Exception e) { + callback.run(Response.BUSY_HERE, "busy here", null); + } + } + + private void playProxy(CommonGBChannel channel, ErrorCallback callback){ + // 拉流代理通道 + try { + StreamInfo streamInfo = streamProxyPlayService.start(channel.getStreamProxyId()); + if (streamInfo == null) { + callback.run(Response.BUSY_HERE, "busy here", null); + }else { + callback.run(Response.OK, "success", streamInfo); + } + }catch (Exception e) { + callback.run(Response.BUSY_HERE, "busy here", null); + } + } + + private void playPush(CommonGBChannel channel, String platformDeviceId, String platformName, ErrorCallback callback){ + // 推流 + try { + streamPushPlayService.start(channel.getStreamPushId(), callback, platformDeviceId, platformName); + }catch (PlayException e) { + callback.run(e.getCode(), e.getMsg(), null); + }catch (Exception e) { + callback.run(Response.BUSY_HERE, "busy here", null); + } + } + + private void playbackGbDeviceChannel(CommonGBChannel channel, Long startTime, Long stopTime, ErrorCallback callback){ + try { + deviceChannelPlayService.playBack(channel, startTime, stopTime, callback); + } catch (PlayException e) { + callback.run(e.getCode(), e.getMsg(), null); + } catch (Exception e) { + callback.run(Response.BUSY_HERE, "busy here", null); + } + } + + private void downloadGbDeviceChannel(CommonGBChannel channel, Long startTime, Long stopTime, Integer downloadSpeed, + ErrorCallback callback){ + try { + deviceChannelPlayService.download(channel, startTime, stopTime, downloadSpeed, callback); + } catch (PlayException e) { + callback.run(e.getCode(), e.getMsg(), null); + } catch (Exception e) { + callback.run(Response.BUSY_HERE, "busy here", null); + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java index e6c9f1a2..8fcefeea 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.gb28181.service.impl; -import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.dao.CommonGBChannelMapper; @@ -8,11 +7,7 @@ import com.genersoft.iot.vmp.gb28181.dao.GroupMapper; import com.genersoft.iot.vmp.gb28181.dao.RegionMapper; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; -import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; -import com.genersoft.iot.vmp.gb28181.service.IPlayService; -import com.genersoft.iot.vmp.media.bean.MediaServer; -import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.github.pagehelper.PageHelper; @@ -24,7 +19,6 @@ import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; -import javax.sip.message.Response; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -40,21 +34,12 @@ public class GbChannelServiceImpl implements IGbChannelService { @Autowired private CommonGBChannelMapper commonGBChannelMapper; - @Autowired - private IDeviceService deviceService; - - @Autowired - private IPlayService playService; - @Autowired private RegionMapper regionMapper; @Autowired private GroupMapper groupMapper; - @Autowired - private UserSetting userSetting; - @Override public CommonGBChannel queryByDeviceId(String gbDeviceId) { return commonGBChannelMapper.queryByDeviceId(gbDeviceId); @@ -652,48 +637,4 @@ public class GbChannelServiceImpl implements IGbChannelService { public CommonGBChannel queryOneWithPlatform(Integer platformId, String channelDeviceId) { return commonGBChannelMapper.queryOneWithPlatform(platformId, channelDeviceId); } - - @Override - public void start(CommonGBChannel channel, InviteInfo inviteInfo, ErrorCallback callback) { - if (channel == null || inviteInfo == null || callback == null) { - log.warn("[通用通道点播] 参数异常, channel: {}, inviteInfo: {}, callback: {}", channel != null, inviteInfo != null, callback != null); - throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); - } - log.info("[点播通用通道] 类型:{}, 通道: {}({})", inviteInfo.getSessionName(), channel.getGbName(), channel.getGbDeviceId()); - if ("Play".equalsIgnoreCase(inviteInfo.getSessionName())) { - if (channel.getGbDeviceDbId() > 0) { - // 国标通道 - Device device = deviceService.getDevice(channel.getGbDeviceDbId()); - if (device == null) { - log.warn("[点播] 未找到通道{}的设备信息", channel); - throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); - } - MediaServer mediaServer = playService.getNewMediaServerItem(device); - if (mediaServer == null) { - log.warn("[点播] 未找到可用媒体节点"); - throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); - } - - playService.play(mediaServer, device.getDeviceId(), channel.getGbDeviceId(), null, (code, msg, data) -> { - if (callback != null) { - callback.run(code, msg, CommonChannelPlayInfo.build(mediaServer, data)); - } - }); - } else if (channel.getStreamProxyId() > 0) { - // 拉流代理 - } else if (channel.getStreamPushId() > 0) { - // 推流 - } else { - // 通道数据异常 - log.error("[点播通用通道] 通道数据异常,无法识别通道来源: {}({})", channel.getGbName(), channel.getGbDeviceId()); - throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); - } - }else if ("Playback".equals(inviteInfo.getSessionName())) { - - }else if ("Download".equals(inviteInfo.getSessionName())) { - - }else { - - } - } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java index 951e11a3..b81f07a0 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java @@ -2,10 +2,7 @@ package com.genersoft.iot.vmp.gb28181.service.impl; import com.alibaba.fastjson2.JSON; import com.baomidou.dynamic.datasource.annotation.DS; -import com.genersoft.iot.vmp.common.InviteInfo; -import com.genersoft.iot.vmp.common.InviteSessionStatus; -import com.genersoft.iot.vmp.common.InviteSessionType; -import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.common.*; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; @@ -33,7 +30,7 @@ import java.util.concurrent.TimeUnit; @DS("master") public class InviteStreamServiceImpl implements IInviteStreamService { - private final Map>> inviteErrorCallbackMap = new ConcurrentHashMap<>(); + private final Map>> inviteErrorCallbackMap = new ConcurrentHashMap<>(); @Autowired private RedisTemplate redisTemplate; @@ -233,9 +230,9 @@ public class InviteStreamServiceImpl implements IInviteStreamService { } @Override - public void once(InviteSessionType type, String deviceId, String channelId, String stream, ErrorCallback callback) { + public void once(InviteSessionType type, String deviceId, String channelId, String stream, ErrorCallback callback) { String key = buildKey(type, deviceId, channelId, stream); - List> callbacks = inviteErrorCallbackMap.get(key); + List> callbacks = inviteErrorCallbackMap.get(key); if (callbacks == null) { callbacks = new CopyOnWriteArrayList<>(); inviteErrorCallbackMap.put(key, callbacks); @@ -270,7 +267,10 @@ public class InviteStreamServiceImpl implements IInviteStreamService { for (Object keyObj : scanResult) { String keyStr = (String) keyObj; InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(keyStr); - if (inviteInfo != null && inviteInfo.getStreamInfo() != null && inviteInfo.getStreamInfo().getMediaServerId().equals(mediaServerId)) { + if (inviteInfo != null + && inviteInfo.getStreamInfo() != null + && inviteInfo.getStreamInfo().getMediaServer() != null + && inviteInfo.getStreamInfo().getMediaServer().getId().equals(mediaServerId)) { if (inviteInfo.getType().equals(InviteSessionType.DOWNLOAD) && inviteInfo.getStreamInfo().getProgress() == 1) { continue; } @@ -282,13 +282,13 @@ public class InviteStreamServiceImpl implements IInviteStreamService { } @Override - public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) { + public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, StreamInfo data) { String key = buildSubStreamKey(type, deviceId, channelId, stream); - List> callbacks = inviteErrorCallbackMap.get(key); + List> callbacks = inviteErrorCallbackMap.get(key); if (callbacks == null) { return; } - for (ErrorCallback callback : callbacks) { + for (ErrorCallback callback : callbacks) { callback.run(code, msg, data); } inviteErrorCallbackMap.remove(key); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java index b7991648..61b2d75a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java @@ -120,29 +120,29 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { private List getDeviceChannelListByChannelReduceList(List channelReduces, String catalogId, Platform platform) { List deviceChannelList = new ArrayList<>(); - if (!channelReduces.isEmpty()){ - PlatformCatalog catalog = catalogManager.selectByPlatFormAndCatalogId(platform.getServerGBId(),catalogId); - if (catalog == null && catalogId.equals(platform.getDeviceGBId())) { - for (ChannelReduce channelReduce : channelReduces) { - DeviceChannel deviceChannel = deviceChannelMapper.getOne(channelReduce.getId()); - deviceChannel.setParental(0); - deviceChannel.setCivilCode(platform.getServerGBDomain()); - deviceChannelList.add(deviceChannel); - } - return deviceChannelList; - } else if (catalog == null && !catalogId.equals(platform.getDeviceGBId())) { - log.warn("未查询到目录{}的信息", catalogId); - return null; - } - for (ChannelReduce channelReduce : channelReduces) { - DeviceChannel deviceChannel = deviceChannelMapper.getOne(channelReduce.getId()); - deviceChannel.setParental(0); - deviceChannel.setCivilCode(catalog.getCivilCode()); - deviceChannel.setParentId(catalog.getParentId()); - deviceChannel.setBusinessGroupId(catalog.getBusinessGroupId()); - deviceChannelList.add(deviceChannel); - } - } +// if (!channelReduces.isEmpty()){ +// PlatformCatalog catalog = catalogManager.selectByPlatFormAndCatalogId(platform.getServerGBId(),catalogId); +// if (catalog == null && catalogId.equals(platform.getDeviceGBId())) { +// for (ChannelReduce channelReduce : channelReduces) { +// DeviceChannel deviceChannel = deviceChannelMapper.getOne(channelReduce.getId()); +// deviceChannel.setParental(0); +// deviceChannel.setCivilCode(platform.getServerGBDomain()); +// deviceChannelList.add(deviceChannel); +// } +// return deviceChannelList; +// } else if (catalog == null && !catalogId.equals(platform.getDeviceGBId())) { +// log.warn("未查询到目录{}的信息", catalogId); +// return null; +// } +// for (ChannelReduce channelReduce : channelReduces) { +// DeviceChannel deviceChannel = deviceChannelMapper.getOne(channelReduce.getId()); +// deviceChannel.setParental(0); +// deviceChannel.setCivilCode(catalog.getCivilCode()); +// deviceChannel.setParentId(catalog.getParentId()); +// deviceChannel.setBusinessGroupId(catalog.getBusinessGroupId()); +// deviceChannelList.add(deviceChannel); +// } +// } return deviceChannelList; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index 20332854..2cf3dc4a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -4,28 +4,29 @@ import com.baomidou.dynamic.datasource.annotation.DS; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionStatus; import com.genersoft.iot.vmp.common.InviteSessionType; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.dao.PlatformMapper; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; +import com.genersoft.iot.vmp.gb28181.service.IPlatformService; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.hook.HookData; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; -import com.genersoft.iot.vmp.gb28181.service.IPlatformService; -import com.genersoft.iot.vmp.gb28181.service.IPlayService; import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; -import com.genersoft.iot.vmp.gb28181.dao.PlatformMapper; import com.genersoft.iot.vmp.utils.DateUtil; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; @@ -88,9 +89,6 @@ public class PlatformServiceImpl implements IPlatformService { @Autowired private VideoStreamSessionManager streamSession; - @Autowired - private IPlayService playService; - @Autowired private IInviteStreamService inviteStreamService; @@ -493,7 +491,7 @@ public class PlatformServiceImpl implements IPlatformService { if (inviteInfoForOld != null && inviteInfoForOld.getStreamInfo() != null) { // 如果zlm不存在这个流,则删除数据即可 - MediaServer mediaServerItemForStreamInfo = mediaServerService.getOne(inviteInfoForOld.getStreamInfo().getMediaServerId()); + MediaServer mediaServerItemForStreamInfo = mediaServerService.getOne(inviteInfoForOld.getStreamInfo().getMediaServer().getId()); if (mediaServerItemForStreamInfo != null) { Boolean ready = mediaServerService.isStreamReady(mediaServerItemForStreamInfo, inviteInfoForOld.getStreamInfo().getApp(), inviteInfoForOld.getStreamInfo().getStream()); if (!ready) { @@ -567,7 +565,7 @@ public class PlatformServiceImpl implements IPlatformService { log.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId); dynamicTask.stop(timeOutTaskKey); // hook响应 - playService.onPublishHandlerForPlay(hookData.getMediaServer(), hookData.getMediaInfo(), platform.getServerGBId(), channelId); + onPublishHandlerForBroadcast(hookData.getMediaServer(), hookData.getMediaInfo(), platform.getServerGBId(), channelId); // 收到流 if (hookEvent != null) { hookEvent.response(hookData); @@ -623,6 +621,18 @@ public class PlatformServiceImpl implements IPlatformService { }); } + public void onPublishHandlerForBroadcast(MediaServer mediaServerItem, MediaInfo mediaInfo, String platFormServerId, String channelId) { + StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, mediaInfo.getApp(), mediaInfo.getStream(), mediaInfo, null); + streamInfo.setChannelId(channelId); + + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, platFormServerId, channelId); + if (inviteInfo != null) { + inviteInfo.setStatus(InviteSessionStatus.ok); + inviteInfo.setStreamInfo(streamInfo); + inviteStreamService.updateInviteInfo(inviteInfo); + } + } + private void inviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, int tcpMode, boolean ssrcCheck, MediaServer mediaServerItem, Platform platform, String channelId, String timeOutTaskKey, ErrorCallback callback, InviteInfo inviteInfo, InviteSessionType inviteSessionType){ diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index 60d6ede6..227431b9 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.service.impl; import com.baomidou.dynamic.datasource.annotation.DS; +import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.*; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; @@ -8,7 +9,12 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ServiceException; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.controller.bean.AudioBroadcastEvent; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; +import com.genersoft.iot.vmp.gb28181.service.IDeviceService; +import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; +import com.genersoft.iot.vmp.gb28181.service.IPlayService; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; @@ -26,10 +32,6 @@ import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; -import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; -import com.genersoft.iot.vmp.gb28181.service.IDeviceService; -import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; -import com.genersoft.iot.vmp.gb28181.service.IPlayService; import com.genersoft.iot.vmp.service.bean.DownloadFileInfo; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; @@ -41,7 +43,6 @@ import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; -import com.genersoft.iot.vmp.gb28181.controller.bean.AudioBroadcastEvent; import gov.nist.javax.sip.message.SIPResponse; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -55,6 +56,7 @@ import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; import javax.sip.SipException; import javax.sip.header.CallIdHeader; +import javax.sip.message.Response; import java.io.File; import java.math.BigDecimal; import java.math.RoundingMode; @@ -265,7 +267,7 @@ public class PlayServiceImpl implements IPlayService { @Override - public SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback callback) { + public SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback callback) { if (mediaServerItem == null) { log.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", deviceId, channelId); throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm"); @@ -300,8 +302,7 @@ public class PlayServiceImpl implements IPlayService { null); return inviteInfo.getSsrcInfo(); } - String mediaServerId = streamInfo.getMediaServerId(); - MediaServer mediaInfo = mediaServerService.getOne(mediaServerId); + MediaServer mediaInfo = streamInfo.getMediaServer(); Boolean ready = mediaServerService.isStreamReady(mediaInfo, "rtp", streamId); if (ready != null && ready) { callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); @@ -465,7 +466,7 @@ public class PlayServiceImpl implements IPlayService { @Override public void play(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, - ErrorCallback callback) { + ErrorCallback callback) { if (mediaServerItem == null || ssrcInfo == null) { if (callback != null) { @@ -608,7 +609,7 @@ public class PlayServiceImpl implements IPlayService { private void tcpActiveHandler(Device device, String channelId, String contentString, MediaServer mediaServerItem, - String timeOutTaskKey, SSRCInfo ssrcInfo, ErrorCallback callback){ + String timeOutTaskKey, SSRCInfo ssrcInfo, ErrorCallback callback){ if (!device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { return; } @@ -752,7 +753,7 @@ public class PlayServiceImpl implements IPlayService { @Override public void playBack(String deviceId, String channelId, String startTime, - String endTime, ErrorCallback callback) { + String endTime, ErrorCallback callback) { Device device = deviceService.getDeviceByDeviceId(deviceId); if (device == null) { log.warn("[录像回放] 未找到设备 deviceId: {},channelId:{}", deviceId, channelId); @@ -785,7 +786,7 @@ public class PlayServiceImpl implements IPlayService { @Override public void playBack(MediaServer mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, - String endTime, ErrorCallback callback) { + String endTime, ErrorCallback callback) { if (mediaServerItem == null || ssrcInfo == null) { callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), @@ -868,7 +869,7 @@ public class PlayServiceImpl implements IPlayService { private void InviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, MediaServer mediaServerItem, - Device device, String channelId, String timeOutTaskKey, ErrorCallback callback, + Device device, String channelId, String timeOutTaskKey, ErrorCallback callback, InviteInfo inviteInfo, InviteSessionType inviteSessionType){ inviteInfo.setStatus(InviteSessionStatus.ok); ResponseEvent responseEvent = (ResponseEvent) eventResult.event; @@ -958,7 +959,7 @@ public class PlayServiceImpl implements IPlayService { @Override - public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback callback) { + public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback callback) { Device device = deviceService.getDeviceByDeviceId(deviceId); if (device == null) { return; @@ -982,7 +983,7 @@ public class PlayServiceImpl implements IPlayService { @Override - public void download(MediaServer mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback callback) { + public void download(MediaServer mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback callback) { if (mediaServerItem == null || ssrcInfo == null) { callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), @@ -1092,8 +1093,7 @@ public class PlayServiceImpl implements IPlayService { } // 获取当前已下载时长 - String mediaServerId = inviteInfo.getStreamInfo().getMediaServerId(); - MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId); + MediaServer mediaServerItem = inviteInfo.getStreamInfo().getMediaServer(); if (mediaServerItem == null) { log.warn("[获取下载进度] 查询录像信息时发现节点不存在"); return null; @@ -1375,7 +1375,7 @@ public class PlayServiceImpl implements IPlayService { } inviteInfo.getStreamInfo().setPause(true); inviteStreamService.updateInviteInfo(inviteInfo); - MediaServer mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId()); + MediaServer mediaServerItem = inviteInfo.getStreamInfo().getMediaServer(); if (null == mediaServerItem) { log.warn("mediaServer 不存在!"); throw new ServiceException("mediaServer不存在"); @@ -1403,7 +1403,7 @@ public class PlayServiceImpl implements IPlayService { } inviteInfo.getStreamInfo().setPause(false); inviteStreamService.updateInviteInfo(inviteInfo); - MediaServer mediaServerItem = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId()); + MediaServer mediaServerItem = inviteInfo.getStreamInfo().getMediaServer(); if (null == mediaServerItem) { log.warn("mediaServer 不存在!"); throw new ServiceException("mediaServer不存在"); @@ -1580,7 +1580,7 @@ public class PlayServiceImpl implements IPlayService { if (inviteInfo != null) { if (inviteInfo.getStreamInfo() != null) { // 已存在线直接截图 - MediaServer mediaServerItemInuse = mediaServerService.getOne(inviteInfo.getStreamInfo().getMediaServerId()); + MediaServer mediaServerItemInuse = inviteInfo.getStreamInfo().getMediaServer(); String streamUrl; if (mediaServerItemInuse.getRtspPort() != 0) { streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", inviteInfo.getStreamInfo().getStream()); @@ -1635,7 +1635,55 @@ public class PlayServiceImpl implements IPlayService { channelService.stopPlay(device.getDeviceId(), channelId); channelService.stopPlay(device.getDeviceId(), channelId); if (inviteInfo.getStreamInfo() != null) { - mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStream()); + mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getStream()); } } + + @Override + public void play(CommonGBChannel channel, ErrorCallback callback) { + Device device = deviceService.getDevice(channel.getGbDeviceDbId()); + if (device == null) { + log.warn("[点播] 未找到通道{}的设备信息", channel); + throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); + } + MediaServer mediaServer = getNewMediaServerItem(device); + if (mediaServer == null) { + log.warn("[点播] 未找到可用媒体节点"); + throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); + } + play(mediaServer, device.getDeviceId(), channel.getGbDeviceId(), null, callback); + } + + @Override + public void playBack(CommonGBChannel channel, Long startTime, Long stopTime, ErrorCallback callback) { + if (startTime == null || stopTime == null) { + throw new PlayException(Response.BAD_REQUEST, "bad request"); + } + // 国标通道 + Device device = deviceService.getDevice(channel.getGbDeviceDbId()); + if (device == null) { + log.warn("[点播] 未找到通道{}的设备信息", channel); + throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); + } + String startTimeStr = DateUtil.timestampTo_yyyy_MM_dd_HH_mm_ss(startTime); + String stopTimeStr = DateUtil.timestampTo_yyyy_MM_dd_HH_mm_ss(stopTime); + playBack(device.getDeviceId(), channel.getGbDeviceId(), startTimeStr, stopTimeStr, callback); + } + + @Override + public void download(CommonGBChannel channel, Long startTime, Long stopTime, Integer downloadSpeed, ErrorCallback callback) { + if (startTime == null || stopTime == null || downloadSpeed == null) { + throw new PlayException(Response.BAD_REQUEST, "bad request"); + } + // 国标通道 + Device device = deviceService.getDevice(channel.getGbDeviceDbId()); + if (device == null) { + log.warn("[点播] 未找到通道{}的设备信息", channel); + throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); + } + String startTimeStr = DateUtil.timestampTo_yyyy_MM_dd_HH_mm_ss(startTime); + String stopTimeStr = DateUtil.timestampTo_yyyy_MM_dd_HH_mm_ss(stopTime); + download(device.getDeviceId(), channel.getGbDeviceId(), startTimeStr, stopTimeStr, downloadSpeed, callback); + + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/RegionServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/RegionServiceImpl.java index 9824beba..d052a6ba 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/RegionServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/RegionServiceImpl.java @@ -35,11 +35,9 @@ public class RegionServiceImpl implements IRegionService { @Autowired private RegionMapper regionMapper; - @Autowired private CommonGBChannelMapper commonGBChannelMapper; - @Autowired private IGbChannelService gbChannelService; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 5903cd15..a57bca1a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -222,7 +222,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In deviceChannelService.stopPlay(ssrcTransaction.getDeviceId(), channel.getDeviceId()); inviteStreamService.removeInviteInfo(inviteInfo); if (inviteInfo.getStreamInfo() != null) { - mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStreamInfo().getStream()); + mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getStreamInfo().getStream()); } } break; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index baa5a04c..c6e2bace 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -9,7 +9,7 @@ import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.gb28181.service.IDeviceService; +import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; import com.genersoft.iot.vmp.gb28181.service.IPlayService; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; @@ -34,7 +34,6 @@ import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService; -import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; import gov.nist.javax.sdp.TimeDescriptionImpl; import gov.nist.javax.sdp.fields.TimeField; import gov.nist.javax.sdp.fields.URIField; @@ -76,14 +75,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private IVideoManagerStorage storager; - @Autowired - private IDeviceService deviceService; - @Autowired private IGbChannelService channelService; @Autowired - private IStreamPushService streamPushService; + private IGbChannelPlayService channelPlayService; @Autowired private IStreamProxyService streamProxyService; @@ -160,7 +156,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (platform == null) { inviteFromDeviceHandle(request, inviteInfo.getRequesterId(), inviteInfo.getChannelId()); } else { - // 查询平台下是否有该通道 CommonGBChannel channel= channelService.queryOneWithPlatform(platform.getId(), inviteInfo.getChannelId()); if (channel == null) { @@ -184,7 +179,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements log.error("[命令发送失败] 上级Invite TRYING: {}", e.getMessage()); } - channelService.start(channel, inviteInfo, ((code, msg, commonChannelPlayInfo) -> { + channelPlayService.start(channel, inviteInfo, platform, ((code, msg, streamInfo) -> { if (code != Response.OK) { try { responseAck(request, code, msg); @@ -194,9 +189,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements }else { // 点播成功, TODO 可以在此处检测cancel命令是否存在,存在则不发送 // 构建sendRTP内容 - SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(commonChannelPlayInfo.getMediaServer(), + SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(streamInfo.getMediaServer(), inviteInfo.getIp(), inviteInfo.getPort(), inviteInfo.getSsrc(), platform.getServerGBId(), - commonChannelPlayInfo.getStreamInfo().getApp(), commonChannelPlayInfo.getStreamInfo().getStream(), + streamInfo.getApp(), streamInfo.getStream(), channel.getGbDeviceId(), inviteInfo.isTcp(), platform.isRtcp()); if (inviteInfo.isTcp() && inviteInfo.isTcpActive()) { sendRtpItem.setTcpActive(true); @@ -206,7 +201,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setPlayType("Play".equalsIgnoreCase(inviteInfo.getSessionName()) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK); redisCatchStorage.updateSendRTPSever(sendRtpItem); - String sdpIp = commonChannelPlayInfo.getMediaServer().getSdpIp(); + String sdpIp = streamInfo.getMediaServer().getSdpIp(); if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { sdpIp = platform.getSendStreamIp(); } @@ -214,7 +209,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 超时未收到Ack应该回复bye,当前等待时间为10秒 dynamicTask.startDelay(inviteInfo.getCallId(), () -> { log.info("Ack 等待超时"); - mediaServerService.releaseSsrc(commonChannelPlayInfo.getMediaServer().getId(), sendRtpItem.getSsrc()); + mediaServerService.releaseSsrc(streamInfo.getMediaServer().getId(), sendRtpItem.getSsrc()); // 回复bye sendBye(platform, inviteInfo.getCallId()); }, 60 * 1000); diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 39d65ad7..9a0fda0f 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -787,7 +787,7 @@ public class MediaServerServiceImpl implements IMediaServerService { } streamInfoResult.setIp(addr); - streamInfoResult.setMediaServerId(mediaServer.getId()); + streamInfoResult.setMediaServer(mediaServer); String callIdParam = ObjectUtils.isEmpty(callId)?"":"?callId=" + callId; streamInfoResult.setRtmp(addr, mediaServer.getRtmpPort(),mediaServer.getRtmpSSlPort(), app, stream, callIdParam); streamInfoResult.setRtsp(addr, mediaServer.getRtspPort(),mediaServer.getRtspSSLPort(), app, stream, callIdParam); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java index 5e7f4563..d880fe75 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java @@ -197,11 +197,12 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { public StreamInfo getStreamInfoByAppAndStream(MediaServer mediaServer, String app, String stream, MediaInfo mediaInfo, String callId, boolean isPlay) { StreamInfo streamInfoResult = new StreamInfo(); + streamInfoResult.setServerId(userSetting.getServerId()); streamInfoResult.setStream(stream); streamInfoResult.setApp(app); String addr = mediaServer.getStreamIp(); streamInfoResult.setIp(addr); - streamInfoResult.setMediaServerId(mediaServer.getId()); + streamInfoResult.setMediaServer(mediaServer); String callIdParam = ObjectUtils.isEmpty(callId)?"":"?callId=" + callId; streamInfoResult.setRtmp(addr, mediaServer.getRtmpPort(),mediaServer.getRtmpSSlPort(), app, stream, callIdParam); streamInfoResult.setRtsp(addr, mediaServer.getRtspPort(),mediaServer.getRtspSSLPort(), app, stream, callIdParam); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java index 5564b12b..8d4e10b3 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java @@ -67,7 +67,7 @@ public class RedisGpsMsgListener implements MessageListener { public void execute(){ List gpsMsgInfo = redisCatchStorage.getAllGpsMsgInfo(); if (gpsMsgInfo.size() > 0) { - storager.updateStreamGPS(gpsMsgInfo); +// storager.updateStreamGPS(gpsMsgInfo); for (GPSMsgInfo msgInfo : gpsMsgInfo) { msgInfo.setStored(true); redisCatchStorage.updateGpsMsgInfo(msgInfo); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java index 8afbd590..8c7e87f9 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java @@ -1,8 +1,9 @@ package com.genersoft.iot.vmp.storager; -import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; -import com.genersoft.iot.vmp.storager.dao.dto.ChannelSourceInfo; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; +import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce; import java.util.List; @@ -76,13 +77,6 @@ public interface IVideoManagerStorage { */ public MobilePosition queryLatestPosition(String deviceId); - /** - * 根据国标ID获取平台关联的直播流 - * @param platformId - * @param channelId - * @return - */ - GbStream queryStreamInParentPlatform(String platformId, String channelId); /** * 根据通道ID获取其所在设备 @@ -91,16 +85,8 @@ public interface IVideoManagerStorage { */ Device queryVideoDeviceByChannelId(String channelId); - int delRelation(PlatformCatalog platformCatalog); - - int updateStreamGPS(List gpsMsgInfo); - List queryPlatFormListForGBWithGBId(String channelId, List platforms); - void delRelationByPlatformId(String serverGBId); - - List getChannelSource(String platformId, String gbId); - List queryEnablePlatformListWithAsMessageChannel(); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java index 013b0779..873708b7 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java @@ -1,21 +1,14 @@ package com.genersoft.iot.vmp.storager.impl; import com.baomidou.dynamic.datasource.annotation.DS; -import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce; import com.genersoft.iot.vmp.gb28181.dao.*; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.dto.ChannelSourceInfo; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Component; -import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.annotation.Transactional; import java.util.ArrayList; @@ -33,19 +26,6 @@ import java.util.List; public class VideoManagerStorageImpl implements IVideoManagerStorage { - @Autowired - EventPublisher eventPublisher; - - @Autowired - SipConfig sipConfig; - - - @Autowired - TransactionDefinition transactionDefinition; - - @Autowired - DataSourceTransactionManager dataSourceTransactionManager; - @Autowired private DeviceMapper deviceMapper; @@ -64,20 +44,6 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { @Autowired private PlatformChannelMapper platformChannelMapper; - @Autowired - private PlatformCatalogMapper platformCatalogMapper; - - @Autowired - private GbStreamMapper gbStreamMapper; - - @Autowired - private PlatformCatalogMapper catalogMapper; - - @Autowired - private PlatformGbStreamMapper platformGbStreamMapper; - - - /** * 查询移动位置轨迹 @@ -98,9 +64,6 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { } PlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); // .getDeviceGBId()); if (parentPlatform.getId() == null ) { - if (parentPlatform.getCatalogId() == null) { - parentPlatform.setCatalogId(parentPlatform.getServerGBId()); - } result = platformMapper.addParentPlatform(parentPlatform); if (parentPlatformCatch == null) { parentPlatformCatch = new PlatformCatch(); @@ -218,17 +181,6 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { return deviceMobilePositionMapper.queryLatestPositionByDevice(deviceId); } - /** - * 根据国标ID获取平台关联的直播流 - * @param platformId - * @param gbId - * @return - */ - @Override - public GbStream queryStreamInParentPlatform(String platformId, String gbId) { - return gbStreamMapper.queryStreamInPlatform(platformId, gbId); - } - @Override public Device queryVideoDeviceByChannelId( String channelId) { @@ -240,46 +192,8 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { return result; } - @Override - public int delRelation(PlatformCatalog platformCatalog) { - if (platformCatalog.getType() == 1) { - CommonGBChannel deviceChannel = new CommonGBChannel(); - deviceChannel.setGbDeviceId(platformCatalog.getId()); -// eventPublisher.catalogEventPublish(platformCatalog.getPlatformId(), deviceChannel, CatalogEvent.DEL); - return platformChannelMapper.delByCatalogIdAndChannelIdAndPlatformId(platformCatalog); - }else if (platformCatalog.getType() == 2) { - List gbStreams = platformGbStreamMapper.queryChannelInParentPlatformAndCatalog(platformCatalog.getPlatformId(), platformCatalog.getParentId()); - for (GbStream gbStream : gbStreams) { - if (gbStream.getGbId().equals(platformCatalog.getId())) { - CommonGBChannel deviceChannel = new CommonGBChannel(); - deviceChannel.setGbDeviceId(gbStream.getGbId()); -// eventPublisher.catalogEventPublish(platformCatalog.getPlatformId(), deviceChannel, CatalogEvent.DEL); - return platformGbStreamMapper.delByAppAndStream(gbStream.getApp(), gbStream.getStream()); - } - } - } - return 0; - } - - @Override - public int updateStreamGPS(List gpsMsgInfos) { - return gbStreamMapper.updateStreamGPS(gpsMsgInfos); - } - @Override public List queryPlatFormListForGBWithGBId(String channelId, List platforms) { return platformChannelMapper.queryPlatFormListForGBWithGBId(channelId, platforms); } - - @Override - public void delRelationByPlatformId(String serverGBId) { - platformGbStreamMapper.delByPlatformId(serverGBId); - platformChannelMapper.delByPlatformId(serverGBId); - } - - - @Override - public List getChannelSource(String platformId, String gbId) { - return platformMapper.getChannelSource(platformId, gbId); - } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/controller/StreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/controller/StreamProxyController.java index bbee224c..861dea30 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/controller/StreamProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/controller/StreamProxyController.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxyParam; +import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService; import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; @@ -40,6 +41,9 @@ public class StreamProxyController { @Autowired private IStreamProxyService streamProxyService; + @Autowired + private IStreamProxyPlayService streamProxyPlayService; + @Operation(summary = "分页查询流代理", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "page", description = "当前页") @@ -181,7 +185,7 @@ public class StreamProxyController { @Parameter(name = "id", description = "代理Id", required = true) public StreamContent start(int id){ log.info("播放代理: {}", id); - StreamInfo streamInfo = streamProxyService.start(id); + StreamInfo streamInfo = streamProxyPlayService.start(id); if (streamInfo == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg()); }else { @@ -195,6 +199,6 @@ public class StreamProxyController { @Parameter(name = "id", description = "代理Id", required = true) public void stop(int id){ log.info("停用代理: {}", id); - streamProxyService.stop(id); + streamProxyPlayService.stop(id); } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyPlayService.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyPlayService.java new file mode 100755 index 00000000..9f9ebf6f --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyPlayService.java @@ -0,0 +1,15 @@ +package com.genersoft.iot.vmp.streamProxy.service; + +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; + +public interface IStreamProxyPlayService { + + StreamInfo start(int id); + + StreamInfo startProxy(StreamProxy streamProxy); + + void stop(int id); + + void stopProxy(StreamProxy streamProxy); +} diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyService.java index f445cc16..90221862 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyService.java @@ -92,8 +92,4 @@ public interface IStreamProxyService { StreamProxy getStreamProxy(int id); void delete(int id); - - StreamInfo start(int id); - - void stop(int id); } diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java new file mode 100755 index 00000000..915dade1 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java @@ -0,0 +1,100 @@ +package com.genersoft.iot.vmp.streamProxy.service.impl; + +import com.baomidou.dynamic.datasource.annotation.DS; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; +import com.genersoft.iot.vmp.streamProxy.dao.StreamProxyMapper; +import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.util.ObjectUtils; + +/** + * 视频代理业务 + */ +@Slf4j +@Service +@DS("master") +public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService { + + @Autowired + private StreamProxyMapper streamProxyMapper; + + @Autowired + private IMediaServerService mediaServerService; + + + + + + @Override + public StreamInfo start(int id) { + StreamProxy streamProxy = streamProxyMapper.select(id); + if (streamProxy == null) { + throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到"); + } + return startProxy(streamProxy); + } + + @Override + public StreamInfo startProxy(StreamProxy streamProxy){ + if (!streamProxy.isEnable()) { + return null; + } + MediaServer mediaServer; + String mediaServerId = streamProxy.getMediaServerId(); + if (mediaServerId == null) { + mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); + }else { + mediaServer = mediaServerService.getOne(mediaServerId); + } + if (mediaServer == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的媒体节点"); + } + StreamInfo streamInfo = mediaServerService.startProxy(mediaServer, streamProxy); + if (mediaServerId == null) { + streamProxy.setMediaServerId(mediaServer.getId()); + streamProxyMapper.update(streamProxy); + } + return streamInfo; + } + + @Override + public void stop(int id) { + StreamProxy streamProxy = streamProxyMapper.select(id); + if (streamProxy == null) { + throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到"); + } + stopProxy(streamProxy); + } + + @Override + public void stopProxy(StreamProxy streamProxy){ + + MediaServer mediaServer; + String mediaServerId = streamProxy.getMediaServerId(); + if (mediaServerId == null) { + mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); + }else { + mediaServer = mediaServerService.getOne(mediaServerId); + } + if (mediaServer == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的媒体节点"); + } + if (ObjectUtils.isEmpty(streamProxy.getStreamKey())) { + mediaServerService.closeStreams(mediaServer, streamProxy.getApp(), streamProxy.getStream()); + }else { + mediaServerService.stopProxy(mediaServer, streamProxy.getStreamKey()); + } + streamProxy.setMediaServerId(mediaServer.getId()); + streamProxy.setStreamKey(null); + streamProxy.setPulling(false); + streamProxyMapper.update(streamProxy); + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java index 2c6384b2..eafd0fbf 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java @@ -19,6 +19,7 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxyParam; import com.genersoft.iot.vmp.streamProxy.dao.StreamProxyMapper; +import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService; import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; @@ -59,11 +60,14 @@ public class StreamProxyServiceImpl implements IStreamProxyService { private UserSetting userSetting; @Autowired - private IGbChannelService gbChannelService; + private IStreamProxyPlayService playService; @Autowired private IMediaServerService mediaServerService; + @Autowired + private IGbChannelService gbChannelService; + @Autowired DataSourceTransactionManager dataSourceTransactionManager; @@ -137,12 +141,12 @@ public class StreamProxyServiceImpl implements IStreamProxyService { // 兼容旧接口 StreamProxy streamProxyInDb = getStreamProxyByAppAndStream(param.getApp(), param.getStream()); if (streamProxyInDb != null && streamProxyInDb.getPulling()) { - stopProxy(streamProxyInDb); + playService.stopProxy(streamProxyInDb); } if (streamProxyInDb == null){ return add(param.buildStreamProxy()); }else { - stopProxy(streamProxyInDb); + playService.stopProxy(streamProxyInDb); streamProxyMapper.delete(streamProxyInDb.getId()); return add(param.buildStreamProxy()); } @@ -177,7 +181,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { private void delete(StreamProxy streamProxy) { Assert.notNull(streamProxy, "代理不可为NULL"); if (streamProxy.getPulling() != null && streamProxy.getPulling()) { - stopProxy(streamProxy); + playService.stopProxy(streamProxy); } if(streamProxy.getGbId() > 0) { gbChannelService.delete(streamProxy.getGbId()); @@ -219,8 +223,8 @@ public class StreamProxyServiceImpl implements IStreamProxyService { || (streamProxyInDb.getMediaServerId() == null && streamProxy.getMediaServerId() != null) ) { // 变化则重启代理 - stopProxy(streamProxyInDb); - startProxy(streamProxy); + playService.stopProxy(streamProxyInDb); + playService.startProxy(streamProxy); } return true; } @@ -239,7 +243,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { if (streamProxy == null) { throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到"); } - StreamInfo streamInfo = startProxy(streamProxy); + StreamInfo streamInfo = playService.startProxy(streamProxy); return streamInfo != null; } @@ -249,31 +253,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService { if (streamProxy == null) { throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到"); } - stopProxy(streamProxy); + playService.stopProxy(streamProxy); } - private void stopProxy(StreamProxy streamProxy){ - MediaServer mediaServer; - String mediaServerId = streamProxy.getMediaServerId(); - if (mediaServerId == null) { - mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); - }else { - mediaServer = mediaServerService.getOne(mediaServerId); - } - if (mediaServer == null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的媒体节点"); - } - if (ObjectUtils.isEmpty(streamProxy.getStreamKey())) { - mediaServerService.closeStreams(mediaServer, streamProxy.getApp(), streamProxy.getStream()); - }else { - mediaServerService.stopProxy(mediaServer, streamProxy.getStreamKey()); - } - streamProxy.setMediaServerId(mediaServer.getId()); - streamProxy.setStreamKey(null); - streamProxy.setPulling(false); - streamProxyMapper.update(streamProxy); - } @Override public Map getFFmpegCMDs(MediaServer mediaServer) { @@ -437,87 +420,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { return new ResourceBaseInfo(total, online); } - - - @Override - public StreamInfo start(int id) { - StreamProxy streamProxy = streamProxyMapper.select(id); - if (streamProxy == null) { - throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到"); - } - return startProxy(streamProxy); - } - - private StreamInfo startProxy(StreamProxy streamProxy){ - if (!streamProxy.isEnable()) { - return null; - } - MediaServer mediaServer; - String mediaServerId = streamProxy.getMediaServerId(); - if (mediaServerId == null) { - mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); - }else { - mediaServer = mediaServerService.getOne(mediaServerId); - } - if (mediaServer == null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的媒体节点"); - } - StreamInfo streamInfo = mediaServerService.startProxy(mediaServer, streamProxy); - if (mediaServerId == null) { - streamProxy.setMediaServerId(mediaServer.getId()); - update(streamProxy); - } - return streamInfo; - } - @Override public StreamProxy getStreamProxy(int id) { return streamProxyMapper.select(id); } - @Override - public void stop(int id) { - StreamProxy streamProxy = streamProxyMapper.select(id); - if (streamProxy == null) { - throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到"); - } - stopProxy(streamProxy); - } - - // @Scheduled(cron = "* 0/10 * * * ?") -// public void asyncCheckStreamProxyStatus() { -// -// List all = mediaServerService.getAllOnline(); -// -// if (CollectionUtils.isEmpty(all)){ -// return; -// } -// -// Map serverItemMap = all.stream().collect(Collectors.toMap(MediaServer::getId, Function.identity(), (m1, m2) -> m1)); -// -// List list = streamProxyMapper.selectForEnable(true); -// -// if (CollectionUtils.isEmpty(list)){ -// return; -// } -// -// for (StreamProxy streamProxyItem : list) { -// -// MediaServer mediaServerItem = serverItemMap.get(streamProxyItem.getMediaServerId()); -// -// MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, streamProxyItem.getApp(), streamProxyItem.getStream()); -// -// if (mediaInfo == null){ -// streamProxyItem.setStatus(false); -// } else { -// if (mediaInfo.getOnline() != null && mediaInfo.getOnline()) { -// streamProxyItem.setStatus(true); -// } else { -// streamProxyItem.setStatus(false); -// } -// } -// -// updateStreamProxy(streamProxyItem); -// } -// } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/bean/StreamPush.java b/src/main/java/com/genersoft/iot/vmp/streamPush/bean/StreamPush.java index 95b85662..2c2da81e 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/bean/StreamPush.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/bean/StreamPush.java @@ -85,10 +85,13 @@ public class StreamPush extends CommonGBChannel implements Comparable