diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java index 003478f9..d9f7e731 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java @@ -10,15 +10,13 @@ import com.genersoft.iot.vmp.conf.security.JwtUtils; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; -import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; -import com.genersoft.iot.vmp.gb28181.service.IDeviceService; -import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; -import com.genersoft.iot.vmp.gb28181.service.IPlayService; +import com.genersoft.iot.vmp.gb28181.service.*; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; @@ -64,6 +62,9 @@ public class PlayController { @Autowired private IPlayService playService; + @Autowired + private IGbChannelRpcPlayService playRpcService; + @Autowired private IMediaServerService mediaServerService; @@ -88,14 +89,10 @@ public class PlayController { Assert.notNull(channelId, "通道国标编号不可为NULL"); // 获取可用的zlm Device device = deviceService.getDeviceByDeviceId(deviceId); - Assert.notNull(deviceId, "设备不存在"); DeviceChannel channel = deviceChannelService.getOne(deviceId, channelId); Assert.notNull(channel, "通道不存在"); - - MediaServer newMediaServerItem = playService.getNewMediaServerItem(device); - RequestMessage requestMessage = new RequestMessage(); String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; requestMessage.setKey(key); @@ -118,7 +115,7 @@ public class PlayController { // 录像查询以channelId作为deviceId查询 resultHolder.put(key, uuid, result); - playService.play(newMediaServerItem, deviceId, channelId, null, (code, msg, streamInfo) -> { + ErrorCallback callback = (code, msg, streamInfo) -> { WVPResult wvpResult = new WVPResult<>(); if (code == InviteErrorCode.SUCCESS.getCode()) { wvpResult.setCode(ErrorCode.SUCCESS.getCode()); @@ -136,8 +133,8 @@ public class PlayController { } streamInfo.channgeStreamIp(host); } - if (!ObjectUtils.isEmpty(newMediaServerItem.getTranscodeSuffix()) && !"null".equalsIgnoreCase(newMediaServerItem.getTranscodeSuffix())) { - streamInfo.setStream(streamInfo.getStream() + "_" + newMediaServerItem.getTranscodeSuffix()); + if (!ObjectUtils.isEmpty(streamInfo.getMediaServer().getTranscodeSuffix()) && !"null".equalsIgnoreCase(streamInfo.getMediaServer().getTranscodeSuffix())) { + streamInfo.setStream(streamInfo.getStream() + "_" + streamInfo.getMediaServer().getTranscodeSuffix()); } wvpResult.setData(new StreamContent(streamInfo)); }else { @@ -151,7 +148,13 @@ public class PlayController { requestMessage.setData(wvpResult); // 此处必须释放所有请求 resultHolder.invokeAllResult(requestMessage); - }); + }; + // 判断设备是否属于当前平台, 如果不属于则发起自动调用 + if (userSetting.getServerId().equals(device.getServerId())) { + playRpcService.play(device.getServerId(), channel.getId(), callback); + }else { + playService.play(device, channel, callback); + } return result; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelRpcPlayService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelRpcPlayService.java new file mode 100644 index 00000000..1847e430 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelRpcPlayService.java @@ -0,0 +1,8 @@ +package com.genersoft.iot.vmp.gb28181.service; + +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; + +public interface IGbChannelRpcPlayService { + void play(String serverId, Integer channelId, ErrorCallback callback); +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java index aa2ca3ab..4a1403a0 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java @@ -25,6 +25,8 @@ public interface IPlayService { SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback callback); + void play(Device device, DeviceChannel channel, ErrorCallback callback); + StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, MediaInfo mediaInfo, Device device, DeviceChannel channel); MediaServer getNewMediaServerItem(Device device); @@ -70,4 +72,5 @@ public interface IPlayService { void download(CommonGBChannel channel, Long startTime, Long stopTime, Integer downloadSpeed, ErrorCallback callback); + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayRpcService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayRpcService.java new file mode 100644 index 00000000..96eb3ced --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayRpcService.java @@ -0,0 +1,54 @@ +package com.genersoft.iot.vmp.gb28181.service.impl; + +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; +import com.genersoft.iot.vmp.gb28181.service.IGbChannelRpcPlayService; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.sip.message.Response; + +@Slf4j +@Service("playRpcService") +public class PlayRpcService implements IGbChannelRpcPlayService { + + @Autowired + private RedisRpcConfig redisRpcConfig; + + @Autowired + private UserSetting userSetting; + + + private RedisRpcRequest buildRequest(String uri, Object param) { + RedisRpcRequest request = new RedisRpcRequest(); + request.setFromId(userSetting.getServerId()); + request.setParam(param); + request.setUri(uri); + return request; + } + + @Override + public void play(String serverId, Integer channelId, ErrorCallback callback) { + log.info("[点播其他WVP的设备] 通道Id:{}", channelId); + RedisRpcRequest request = buildRequest("playChannel", channelId); + request.setToId(serverId); + RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout()); + if (response == null) { + callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null); + }else { + if (response.getStatusCode() == Response.OK) { + StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class); + callback.run(response.getStatusCode(), "success", streamInfo); + }else { + callback.run(response.getStatusCode(), response.getBody().toString(), null); + } + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index 2c9b777b..8b3d9f06 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -63,7 +63,7 @@ import java.util.Vector; @SuppressWarnings(value = {"rawtypes", "unchecked"}) @Slf4j -@Service +@Service("playService") public class PlayServiceImpl implements IPlayService { @Autowired @@ -285,6 +285,15 @@ public class PlayServiceImpl implements IPlayService { } } + @Override + public void play(Device device, DeviceChannel channel, ErrorCallback callback) { + MediaServer mediaServerItem = getNewMediaServerItem(device); + if (mediaServerItem == null) { + log.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", device.getDeviceId(), channel.getDeviceId()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm"); + } + play(mediaServerItem, device, channel, null, callback); + } @Override public SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback callback) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java index b74df842..946178bd 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java @@ -8,7 +8,10 @@ import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; +import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; +import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService; +import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; @@ -17,6 +20,7 @@ import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.ISendRtpServerService; +import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import lombok.extern.slf4j.Slf4j; @@ -24,6 +28,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; +import javax.sip.message.Response; + /** * 其他wvp发起的rpc调用,这里的方法被 RedisRpcConfig 通过反射寻找对应的方法名称调用 */ @@ -49,6 +55,12 @@ public class RedisRpcController { @Autowired private RedisTemplate redisTemplate; + @Autowired + private IGbChannelService channelService; + + @Autowired + private IGbChannelPlayService channelPlayService; + /** * 获取发流的信息 @@ -255,7 +267,7 @@ public class RedisRpcController { String callId = request.getParam().toString(); SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId); RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); + response.setStatusCode(Response.OK); if (sendRtpItem == null) { log.info("[redis-rpc] 停止推流, 未找到redis中的发流信息, key:{}", callId); WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息"); @@ -290,4 +302,37 @@ public class RedisRpcController { message.setResponse(response); redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message); } + + /** + * 点播国标设备 + */ + public RedisRpcResponse playChannel(RedisRpcRequest request) { + int channelId = Integer.parseInt(request.getParam().toString()); + RedisRpcResponse response = request.getResponse(); + + if (channelId <= 0) { + response.setStatusCode(Response.BAD_REQUEST); + response.setBody("param error"); + return response; + } + // 获取对应的设备和通道信息 + CommonGBChannel channel = channelService.getOne(channelId); + if (channel == null) { + response.setStatusCode(Response.BAD_REQUEST); + response.setBody("param error"); + return response; + } + + channelPlayService.play(channel, null, (code, msg, data) ->{ + if (code == InviteErrorCode.SUCCESS.getCode()) { + response.setStatusCode(Response.OK); + response.setBody(data); + }else { + response.setStatusCode(code); + } + // 手动发送结果 + sendResponse(response); + }); + return null; + } }