增加RPC调用停止实时流播放
parent
0dda8298bf
commit
2e254c6c8c
|
@ -17,6 +17,7 @@ import org.springframework.data.redis.core.RedisTemplate;
|
||||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.sip.message.Response;
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -128,7 +129,7 @@ public class RedisRpcConfig implements MessageListener {
|
||||||
if (method == null) {
|
if (method == null) {
|
||||||
// 回复404结果
|
// 回复404结果
|
||||||
RedisRpcResponse response = request.getResponse();
|
RedisRpcResponse response = request.getResponse();
|
||||||
response.setStatusCode(404);
|
response.setStatusCode(Response.NOT_FOUND);
|
||||||
sendResponse(response);
|
sendResponse(response);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package com.genersoft.iot.vmp.gb28181.service;
|
package com.genersoft.iot.vmp.gb28181.service;
|
||||||
|
|
||||||
|
import com.genersoft.iot.vmp.common.InviteSessionType;
|
||||||
import com.genersoft.iot.vmp.common.StreamInfo;
|
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
|
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.InviteInfo;
|
import com.genersoft.iot.vmp.gb28181.bean.InviteInfo;
|
||||||
|
@ -10,11 +11,20 @@ public interface IGbChannelPlayService {
|
||||||
|
|
||||||
void start(CommonGBChannel channel, InviteInfo inviteInfo, Platform platform, ErrorCallback<StreamInfo> callback);
|
void start(CommonGBChannel channel, InviteInfo inviteInfo, Platform platform, ErrorCallback<StreamInfo> callback);
|
||||||
|
|
||||||
void play(CommonGBChannel channel, Platform platform, ErrorCallback<StreamInfo> callback);
|
void stopPlay(InviteSessionType type, CommonGBChannel channel, String stream);
|
||||||
|
|
||||||
|
void play(CommonGBChannel channel, Platform platform, ErrorCallback<StreamInfo> callback);
|
||||||
|
|
||||||
void playGbDeviceChannel(CommonGBChannel channel, ErrorCallback<StreamInfo> callback);
|
void playGbDeviceChannel(CommonGBChannel channel, ErrorCallback<StreamInfo> callback);
|
||||||
|
|
||||||
|
void stopPlayDeviceChannel(CommonGBChannel channel, String stream);
|
||||||
|
|
||||||
void playProxy(CommonGBChannel channel, ErrorCallback<StreamInfo> callback);
|
void playProxy(CommonGBChannel channel, ErrorCallback<StreamInfo> callback);
|
||||||
|
|
||||||
|
void stopPlayProxy(CommonGBChannel channel);
|
||||||
|
|
||||||
void playPush(CommonGBChannel channel, String platformDeviceId, String platformName, ErrorCallback<StreamInfo> callback);
|
void playPush(CommonGBChannel channel, String platformDeviceId, String platformName, ErrorCallback<StreamInfo> callback);
|
||||||
|
|
||||||
|
|
||||||
|
void stopPlayPush(CommonGBChannel channel);
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,9 +68,12 @@ public interface IPlayService {
|
||||||
|
|
||||||
void play(CommonGBChannel channel, ErrorCallback<StreamInfo> callback);
|
void play(CommonGBChannel channel, ErrorCallback<StreamInfo> callback);
|
||||||
|
|
||||||
|
void stop(InviteSessionType inviteSessionType, CommonGBChannel channel, String stream);
|
||||||
|
|
||||||
void playBack(CommonGBChannel channel, Long startTime, Long stopTime, ErrorCallback<StreamInfo> callback);
|
void playBack(CommonGBChannel channel, Long startTime, Long stopTime, ErrorCallback<StreamInfo> callback);
|
||||||
|
|
||||||
void download(CommonGBChannel channel, Long startTime, Long stopTime, Integer downloadSpeed, ErrorCallback<StreamInfo> callback);
|
void download(CommonGBChannel channel, Long startTime, Long stopTime, Integer downloadSpeed, ErrorCallback<StreamInfo> callback);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package com.genersoft.iot.vmp.gb28181.service.impl;
|
package com.genersoft.iot.vmp.gb28181.service.impl;
|
||||||
|
|
||||||
|
import com.genersoft.iot.vmp.common.InviteSessionType;
|
||||||
import com.genersoft.iot.vmp.common.StreamInfo;
|
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
|
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.InviteInfo;
|
import com.genersoft.iot.vmp.gb28181.bean.InviteInfo;
|
||||||
|
@ -88,6 +89,24 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stopPlay(InviteSessionType type, CommonGBChannel channel, String stream) {
|
||||||
|
if (channel.getGbDeviceDbId() != null) {
|
||||||
|
// 国标通道
|
||||||
|
stopPlayDeviceChannel(channel, stream);
|
||||||
|
} else if (channel.getStreamProxyId() != null) {
|
||||||
|
// 拉流代理
|
||||||
|
stopPlayProxy(channel);
|
||||||
|
} else if (channel.getStreamPushId() != null) {
|
||||||
|
// 推流
|
||||||
|
stopPlayPush(channel);
|
||||||
|
} else {
|
||||||
|
// 通道数据异常
|
||||||
|
log.error("[点播通用通道] 通道数据异常,无法识别通道来源: {}({})", channel.getGbName(), channel.getGbDeviceId());
|
||||||
|
throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void play(CommonGBChannel channel, Platform platform, ErrorCallback<StreamInfo> callback) {
|
public void play(CommonGBChannel channel, Platform platform, ErrorCallback<StreamInfo> callback) {
|
||||||
if (channel.getGbDeviceDbId() != null) {
|
if (channel.getGbDeviceDbId() != null) {
|
||||||
|
@ -124,6 +143,18 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stopPlayDeviceChannel(CommonGBChannel channel, String stream) {
|
||||||
|
// 国标通道
|
||||||
|
try {
|
||||||
|
deviceChannelPlayService.stop(InviteSessionType.PLAY, channel, stream);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[停止点播失败] {}({})", channel.getGbName(), channel.getGbDeviceId(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void playProxy(CommonGBChannel channel, ErrorCallback<StreamInfo> callback){
|
public void playProxy(CommonGBChannel channel, ErrorCallback<StreamInfo> callback){
|
||||||
// 拉流代理通道
|
// 拉流代理通道
|
||||||
|
@ -139,6 +170,16 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stopPlayProxy(CommonGBChannel channel) {
|
||||||
|
// 拉流代理通道
|
||||||
|
try {
|
||||||
|
streamProxyPlayService.stop(channel.getStreamProxyId());
|
||||||
|
}catch (Exception e) {
|
||||||
|
log.error("[停止点播失败] {}({})", channel.getGbName(), channel.getGbDeviceId(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void playPush(CommonGBChannel channel, String platformDeviceId, String platformName, ErrorCallback<StreamInfo> callback){
|
public void playPush(CommonGBChannel channel, String platformDeviceId, String platformName, ErrorCallback<StreamInfo> callback){
|
||||||
// 推流
|
// 推流
|
||||||
|
@ -152,6 +193,16 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stopPlayPush(CommonGBChannel channel) {
|
||||||
|
// 推流
|
||||||
|
try {
|
||||||
|
streamPushPlayService.stop(channel.getStreamPushId());
|
||||||
|
}catch (Exception e) {
|
||||||
|
log.error("[停止点播失败] {}({})", channel.getGbName(), channel.getGbDeviceId(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void playbackGbDeviceChannel(CommonGBChannel channel, Long startTime, Long stopTime, ErrorCallback<StreamInfo> callback){
|
private void playbackGbDeviceChannel(CommonGBChannel channel, Long startTime, Long stopTime, ErrorCallback<StreamInfo> callback){
|
||||||
try {
|
try {
|
||||||
deviceChannelPlayService.playBack(channel, startTime, stopTime, callback);
|
deviceChannelPlayService.playBack(channel, startTime, stopTime, callback);
|
||||||
|
|
|
@ -1606,30 +1606,34 @@ public class PlayServiceImpl implements IPlayService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop(InviteSessionType type, Device device, DeviceChannel channel, String stream) {
|
public void stop(InviteSessionType type, Device device, DeviceChannel channel, String stream) {
|
||||||
InviteInfo inviteInfo = inviteStreamService.getInviteInfo(type, channel.getId(), stream);
|
if (!userSetting.getServerId().equals(device.getServerId())) {
|
||||||
if (inviteInfo == null) {
|
redisRpcPlayService.stop(device.getServerId(), type, channel.getId(), stream);
|
||||||
if (type == InviteSessionType.PLAY) {
|
}else {
|
||||||
|
InviteInfo inviteInfo = inviteStreamService.getInviteInfo(type, channel.getId(), stream);
|
||||||
|
if (inviteInfo == null) {
|
||||||
|
if (type == InviteSessionType.PLAY) {
|
||||||
|
deviceChannelService.stopPlay(channel.getId());
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
inviteStreamService.removeInviteInfo(inviteInfo);
|
||||||
|
if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
|
||||||
|
try {
|
||||||
|
log.info("[停止点播/回放/下载] {}/{}", device.getDeviceId(), channel.getDeviceId());
|
||||||
|
cmder.streamByeCmd(device, channel.getDeviceId(), inviteInfo.getStream(), null, null);
|
||||||
|
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
|
||||||
|
log.error("[命令发送失败] 停止点播/回放/下载, 发送BYE: {}", e.getMessage());
|
||||||
|
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (inviteInfo.getType() == InviteSessionType.PLAY) {
|
||||||
deviceChannelService.stopPlay(channel.getId());
|
deviceChannelService.stopPlay(channel.getId());
|
||||||
}
|
}
|
||||||
return;
|
if (inviteInfo.getStreamInfo() != null) {
|
||||||
}
|
receiveRtpServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getSsrcInfo());
|
||||||
inviteStreamService.removeInviteInfo(inviteInfo);
|
|
||||||
if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
|
|
||||||
try {
|
|
||||||
log.info("[停止点播/回放/下载] {}/{}", device.getDeviceId(), channel.getDeviceId());
|
|
||||||
cmder.streamByeCmd(device, channel.getDeviceId(), inviteInfo.getStream(), null, null);
|
|
||||||
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
|
|
||||||
log.error("[命令发送失败] 停止点播/回放/下载, 发送BYE: {}", e.getMessage());
|
|
||||||
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (inviteInfo.getType() == InviteSessionType.PLAY) {
|
|
||||||
deviceChannelService.stopPlay(channel.getId());
|
|
||||||
}
|
|
||||||
if (inviteInfo.getStreamInfo() != null) {
|
|
||||||
receiveRtpServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getSsrcInfo());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1674,6 +1678,17 @@ public class PlayServiceImpl implements IPlayService {
|
||||||
play(device, deviceChannel, callback);
|
play(device, deviceChannel, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop(InviteSessionType inviteSessionType, CommonGBChannel channel, String stream) {
|
||||||
|
Device device = deviceService.getDevice(channel.getGbDeviceDbId());
|
||||||
|
if (device == null) {
|
||||||
|
log.warn("[停止播放] 未找到通道{}的设备信息", channel);
|
||||||
|
throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error");
|
||||||
|
}
|
||||||
|
DeviceChannel deviceChannel = deviceChannelService.getOneForSourceById(channel.getGbId());
|
||||||
|
stop(InviteSessionType.PLAY, device, deviceChannel, stream);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void playBack(CommonGBChannel channel, Long startTime, Long stopTime, ErrorCallback<StreamInfo> callback) {
|
public void playBack(CommonGBChannel channel, Long startTime, Long stopTime, ErrorCallback<StreamInfo> callback) {
|
||||||
if (startTime == null || stopTime == null) {
|
if (startTime == null || stopTime == null) {
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package com.genersoft.iot.vmp.service.redisMsg;
|
package com.genersoft.iot.vmp.service.redisMsg;
|
||||||
|
|
||||||
|
import com.genersoft.iot.vmp.common.InviteSessionType;
|
||||||
import com.genersoft.iot.vmp.common.StreamInfo;
|
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||||
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
|
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
|
||||||
|
|
||||||
|
@ -7,4 +8,6 @@ public interface IRedisRpcPlayService {
|
||||||
|
|
||||||
|
|
||||||
void play(String serverId, Integer channelId, ErrorCallback<StreamInfo> callback);
|
void play(String serverId, Integer channelId, ErrorCallback<StreamInfo> callback);
|
||||||
|
|
||||||
|
void stop(String serverId, InviteSessionType type, int channelId, String stream);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
package com.genersoft.iot.vmp.service.redisMsg.control;
|
package com.genersoft.iot.vmp.service.redisMsg.control;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import com.genersoft.iot.vmp.common.InviteSessionType;
|
||||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||||
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
|
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
|
||||||
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
|
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
|
||||||
|
@ -79,4 +81,42 @@ public class RedisRpcChannelPlayController extends RpcController {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 停止点播国标设备
|
||||||
|
*/
|
||||||
|
@RedisRpcMapping("stop")
|
||||||
|
public RedisRpcResponse stop(RedisRpcRequest request) {
|
||||||
|
System.out.println(request.getParam().toString());
|
||||||
|
JSONObject jsonObject = JSONObject.parseObject(request.getParam().toString());
|
||||||
|
|
||||||
|
RedisRpcResponse response = request.getResponse();
|
||||||
|
|
||||||
|
Integer channelId = jsonObject.getIntValue("channelId");
|
||||||
|
if (channelId == null || channelId <= 0) {
|
||||||
|
response.setStatusCode(Response.BAD_REQUEST);
|
||||||
|
response.setBody("param error");
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
String stream = jsonObject.getString("stream");
|
||||||
|
InviteSessionType type = jsonObject.getObject("inviteSessionType", InviteSessionType.class);
|
||||||
|
|
||||||
|
// 获取对应的设备和通道信息
|
||||||
|
CommonGBChannel channel = channelService.getOne(channelId);
|
||||||
|
if (channel == null) {
|
||||||
|
response.setStatusCode(Response.BAD_REQUEST);
|
||||||
|
response.setBody("param error");
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
channelPlayService.stopPlay(type, channel, stream);
|
||||||
|
response.setStatusCode(Response.OK);
|
||||||
|
}catch (Exception e){
|
||||||
|
response.setStatusCode(Response.SERVER_INTERNAL_ERROR);
|
||||||
|
response.setBody(e.getMessage());
|
||||||
|
}
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,11 @@
|
||||||
package com.genersoft.iot.vmp.service.redisMsg.service;
|
package com.genersoft.iot.vmp.service.redisMsg.service;
|
||||||
|
|
||||||
import com.alibaba.fastjson2.JSON;
|
import com.alibaba.fastjson2.JSON;
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import com.genersoft.iot.vmp.common.InviteSessionType;
|
||||||
import com.genersoft.iot.vmp.common.StreamInfo;
|
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||||
|
import com.genersoft.iot.vmp.conf.exception.ControllerException;
|
||||||
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
|
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
|
||||||
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
|
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
|
||||||
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
|
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
|
||||||
|
@ -51,5 +54,23 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop(String serverId, InviteSessionType type, int channelId, String stream) {
|
||||||
|
JSONObject jsonObject = new JSONObject();
|
||||||
|
jsonObject.put("channelId", channelId);
|
||||||
|
jsonObject.put("stream", stream);
|
||||||
|
jsonObject.put("inviteSessionType", type);
|
||||||
|
RedisRpcRequest request = buildRequest("channel/stop", jsonObject.toJSONString());
|
||||||
|
request.setToId(serverId);
|
||||||
|
RedisRpcResponse response = redisRpcConfig.request(request, 50);
|
||||||
|
if (response == null) {
|
||||||
|
throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg());
|
||||||
|
}else {
|
||||||
|
if (response.getStatusCode() != Response.OK) {
|
||||||
|
throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,4 +5,6 @@ import com.genersoft.iot.vmp.service.bean.ErrorCallback;
|
||||||
|
|
||||||
public interface IStreamPushPlayService {
|
public interface IStreamPushPlayService {
|
||||||
void start(Integer id, ErrorCallback<StreamInfo> callback, String platformDeviceId, String platformName );
|
void start(Integer id, ErrorCallback<StreamInfo> callback, String platformDeviceId, String platformName );
|
||||||
|
|
||||||
|
void stop(Integer streamPushId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,4 +98,9 @@ public class StreamPushPlayServiceImpl implements IStreamPushPlayService {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop(Integer streamPushId) {
|
||||||
|
// 推流无需主动停止
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue