增加RPC录像列表查询

dev/数据库统合
648540858 2024-12-13 14:35:44 +08:00
parent 5a63e7f958
commit a9099e0412
25 changed files with 409 additions and 204 deletions

View File

@ -37,6 +37,11 @@ public class UserSetting {
*/
private Integer playTimeout = 10000;
/**
* ,
*/
private Integer recordInfoTimeout = 15000;
/**
* ,
*/

View File

@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
@ -129,7 +130,7 @@ public class RedisRpcConfig implements MessageListener {
if (method == null) {
// 回复404结果
RedisRpcResponse response = request.getResponse();
response.setStatusCode(Response.NOT_FOUND);
response.setStatusCode(ErrorCode.ERROR404.getCode());
sendResponse(response);
return;
}
@ -185,7 +186,7 @@ public class RedisRpcConfig implements MessageListener {
} catch (InterruptedException e) {
log.warn("[redis rpc timeout] uri: {}, sn: {}", request.getUri(), request.getSn(), e);
RedisRpcResponse redisRpcResponse = new RedisRpcResponse();
redisRpcResponse.setStatusCode(Response.BUSY_HERE);
redisRpcResponse.setStatusCode(ErrorCode.ERROR486.getCode());
return redisRpcResponse;
} finally {
this.unsubscribe(request.getSn());

View File

@ -36,6 +36,7 @@ import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@Tag(name = "国标录像")
@Slf4j
@ -72,7 +73,7 @@ public class GBRecordController {
if (log.isDebugEnabled()) {
log.debug(String.format("录像信息查询 API调用deviceId%s startTime%s endTime%s",deviceId, startTime, endTime));
}
DeferredResult<WVPResult<RecordInfo>> result = new DeferredResult<>();
DeferredResult<WVPResult<RecordInfo>> result = new DeferredResult<>(Long.valueOf(userSetting.getRecordInfoTimeout()), TimeUnit.MILLISECONDS);
if (!DateUtil.verification(startTime, DateUtil.formatter)){
throw new ControllerException(ErrorCode.ERROR100.getCode(), "startTime格式为" + DateUtil.PATTERN);
}
@ -81,35 +82,24 @@ public class GBRecordController {
}
Device device = deviceService.getDeviceByDeviceId(deviceId);
// 指定超时时间 1分钟30秒
String uuid = UUID.randomUUID().toString();
int sn = (int)((Math.random()*9+1)*100000);
String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + sn;
RequestMessage msg = new RequestMessage();
msg.setId(uuid);
msg.setKey(key);
try {
cmder.recordInfoQuery(device, channelId, startTime, endTime, sn, null, null, null, (eventResult -> {
WVPResult<RecordInfo> wvpResult = new WVPResult<>();
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg("查询录像失败, status: " + eventResult.statusCode + ", message: " + eventResult.msg);
msg.setData(wvpResult);
resultHolder.invokeResult(msg);
}));
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 查询录像: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
if (device == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), deviceId + " 不存在");
}
// 录像查询以channelId作为deviceId查询
resultHolder.put(key, uuid, result);
DeviceChannel channel = channelService.getOneForSource(device.getId(), channelId);
if (channel == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), channelId + " 不存在");
}
channelService.queryRecordInfo(device, channel, startTime, endTime, (code, msg, data)->{
WVPResult<RecordInfo> wvpResult = new WVPResult<>();
wvpResult.setCode(code);
wvpResult.setMsg(msg);
result.setResult(wvpResult);
});
result.onTimeout(()->{
msg.setData("timeout");
WVPResult<RecordInfo> wvpResult = new WVPResult<>();
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg("timeout");
msg.setData(wvpResult);
resultHolder.invokeResult(msg);
result.setResult(wvpResult);
});
return result;
}

View File

@ -3,7 +3,7 @@ package com.genersoft.iot.vmp.gb28181.event;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEvent;
import com.genersoft.iot.vmp.gb28181.event.device.RequestTimeoutEvent;
import com.genersoft.iot.vmp.gb28181.event.record.RecordEndEvent;
import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEvent;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent;
import com.genersoft.iot.vmp.media.bean.MediaServer;
@ -101,9 +101,5 @@ public class EventPublisher {
applicationEventPublisher.publishEvent(event);
}
public void recordEndEventPush(RecordInfo recordInfo) {
RecordEndEvent outEvent = new RecordEndEvent(this);
outEvent.setRecordInfo(recordInfo);
applicationEventPublisher.publishEvent(outEvent);
}
}

View File

@ -1,7 +1,8 @@
package com.genersoft.iot.vmp.gb28181.event.record;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
import lombok.Getter;
import lombok.Setter;
import org.springframework.context.ApplicationEvent;
/**
@ -9,24 +10,18 @@ import org.springframework.context.ApplicationEvent;
* @author: pan
* @data: 2022-02-23
*/
public class RecordEndEvent extends ApplicationEvent {
@Setter
@Getter
public class RecordInfoEndEvent extends ApplicationEvent {
/**
*
*/
private static final long serialVersionUID = 1L;
public RecordEndEvent(Object source) {
public RecordInfoEndEvent(Object source) {
super(source);
}
private RecordInfo recordInfo;
public RecordInfo getRecordInfo() {
return recordInfo;
}
public void setRecordInfo(RecordInfo recordInfo) {
this.recordInfo = recordInfo;
}
}

View File

@ -0,0 +1,27 @@
package com.genersoft.iot.vmp.gb28181.event.record;
import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
import lombok.Getter;
import lombok.Setter;
import org.springframework.context.ApplicationEvent;
/**
* @description:
* @author: pan
* @data: 2022-02-23
*/
@Setter
@Getter
public class RecordInfoEvent extends ApplicationEvent {
/**
*
*/
private static final long serialVersionUID = 1L;
public RecordInfoEvent(Object source) {
super(source);
}
private RecordInfo recordInfo;
}

View File

@ -15,15 +15,15 @@ import java.util.concurrent.ConcurrentHashMap;
*/
@Slf4j
@Component
public class RecordEndEventListener implements ApplicationListener<RecordEndEvent> {
public class RecordInfoEventListener implements ApplicationListener<RecordInfoEvent> {
private Map<String, RecordEndEventHandler> handlerMap = new ConcurrentHashMap<>();
private final Map<String, RecordEndEventHandler> handlerMap = new ConcurrentHashMap<>();
public interface RecordEndEventHandler{
void handler(RecordInfo recordInfo);
}
@Override
public void onApplicationEvent(RecordEndEvent event) {
public void onApplicationEvent(RecordInfoEvent event) {
String deviceId = event.getRecordInfo().getDeviceId();
String channelId = event.getRecordInfo().getChannelId();
int count = event.getRecordInfo().getCount();
@ -45,9 +45,6 @@ public class RecordEndEventListener implements ApplicationListener<RecordEndEven
/**
*
* @param device
* @param channelId
* @param recordEndEventHandler
*/
public void addEndEventHandler(String device, String channelId, RecordEndEventHandler recordEndEventHandler) {
log.info("录像查询事件添加监听deviceId{}, channelId: {}", device, channelId);
@ -55,8 +52,6 @@ public class RecordEndEventListener implements ApplicationListener<RecordEndEven
}
/**
*
* @param device
* @param channelId
*/
public void delEndEventHandler(String device, String channelId) {
log.info("录像查询事件移除监听deviceId{}, channelId: {}", device, channelId);

View File

@ -1,9 +1,8 @@
package com.genersoft.iot.vmp.gb28181.service;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend;
import com.github.pagehelper.PageInfo;
@ -125,4 +124,8 @@ public interface IDeviceChannelService {
List<DeviceChannel> queryChaneListByDeviceDbId(Integer deviceDbId);
List<Integer> queryChaneIdListByDeviceDbIds(List<Integer> deviceDbId);
void queryRecordInfo(Device device, DeviceChannel channel, String startTime, String endTime, ErrorCallback<RecordInfo> object);
void queryRecordInfo(CommonGBChannel channel, String startTime, String endTime, ErrorCallback<RecordInfo> object);
}

View File

@ -1,9 +1,8 @@
package com.genersoft.iot.vmp.gb28181.service;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.github.pagehelper.PageInfo;

View File

@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.InviteMessageInfo;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
public interface IGbChannelPlayService {
@ -25,6 +26,5 @@ public interface IGbChannelPlayService {
void playPush(CommonGBChannel channel, String platformDeviceId, String platformName, ErrorCallback<StreamInfo> callback);
void stopPlayPush(CommonGBChannel channel);
}

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.gb28181.service;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
import com.github.pagehelper.PageInfo;
@ -87,4 +88,5 @@ public interface IGbChannelService {
PageInfo<CommonGBChannel> queryList(int page, int count, String query, Boolean online, Boolean hasRecordPlan, Integer channelType);
void queryRecordInfo(CommonGBChannel channel, String startTime, String endTime, ErrorCallback<RecordInfo> callback);
}

View File

@ -5,37 +5,48 @@ import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.GbCode;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMobilePositionMapper;
import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEndEvent;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* @author lin
@ -71,6 +82,26 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
@Autowired
private IPlatformChannelService platformChannelService;
@Autowired
private IRedisRpcPlayService redisRpcPlayService;
@Autowired
private ISIPCommander commander;
// 记录录像查询的结果等待
private final Map<String, SynchronousQueue<RecordInfo>> topicSubscribers = new ConcurrentHashMap<>();
/**
*
*/
@Async("taskExecutor")
@org.springframework.context.event.EventListener
public void onApplicationEvent(RecordInfoEndEvent event) {
SynchronousQueue<RecordInfo> queue = topicSubscribers.get("record" + event.getRecordInfo().getSn());
if (queue != null) {
queue.offer(event.getRecordInfo());
}
}
@Override
public int updateChannels(Device device, List<DeviceChannel> channels) {
@ -164,10 +195,8 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
@Override
public ResourceBaseInfo getOverview() {
int online = channelMapper.getOnlineCount();
int total = channelMapper.getAllChannelCount();
return new ResourceBaseInfo(total, online);
}
@ -705,4 +734,49 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
public void updateChannelForNotify(DeviceChannel channel) {
channelMapper.updateChannelForNotify(channel);
}
@Override
public void queryRecordInfo(Device device, DeviceChannel channel, String startTime, String endTime, ErrorCallback<RecordInfo> callback) {
if (!userSetting.getServerId().equals(device.getServerId())){
redisRpcPlayService.queryRecordInfo(device.getServerId(), channel.getId(), startTime, endTime, callback);
return;
}
try {
int sn = (int)((Math.random()*9+1)*100000);
commander.recordInfoQuery(device, channel.getDeviceId(), startTime, endTime, sn, null, null, eventResult -> {
try {
// 消息发送成功, 监听等待数据到来
SynchronousQueue<RecordInfo> queue = new SynchronousQueue<>();
topicSubscribers.put("record" + sn, queue);
RecordInfo recordInfo = queue.poll(userSetting.getRecordInfoTimeout(), TimeUnit.MILLISECONDS);
if (recordInfo == null) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), recordInfo);
}
} catch (InterruptedException e) {
callback.run(ErrorCode.ERROR100.getCode(), e.getMessage(), null);
} finally {
this.topicSubscribers.remove("record" + sn);
}
}, (eventResult -> {
callback.run(ErrorCode.ERROR100.getCode(), "查询录像失败, status: " + eventResult.statusCode + ", message: " + eventResult.msg, null);
}));
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 查询录像: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
}
}
@Override
public void queryRecordInfo(CommonGBChannel channel, String startTime, String endTime, ErrorCallback<RecordInfo> callback) {
Device device = deviceMapper.query(channel.getGbDeviceDbId());
if (device == null) {
log.warn("[点播] 未找到通道{}的设备信息", channel);
callback.run(ErrorCode.ERROR100.getCode(), "设备不存在", null);
return;
}
DeviceChannel deviceChannel = getOneForSourceById(channel.getGbId());
queryRecordInfo(device, deviceChannel, startTime, endTime, callback);
}
}

View File

@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
@ -23,10 +24,13 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.respons
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
@ -36,9 +40,13 @@ import org.springframework.transaction.annotation.Transactional;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import javax.sip.message.Response;
import java.text.ParseException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
@ -69,9 +77,6 @@ public class DeviceServiceImpl implements IDeviceService {
@Autowired
private PlatformChannelMapper platformChannelMapper;
@Autowired
private IDeviceChannelService deviceChannelService;
@Autowired
private DeviceChannelMapper deviceChannelMapper;

View File

@ -2,10 +2,7 @@ package com.genersoft.iot.vmp.gb28181.service.impl;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.InviteMessageInfo;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.PlayException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService;
import com.genersoft.iot.vmp.gb28181.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
@ -223,4 +220,6 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
callback.run(Response.BUSY_HERE, "busy here", null);
}
}
}

View File

@ -8,11 +8,15 @@ import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.RegionMapper;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
@ -22,6 +26,7 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import javax.sip.message.Response;
import java.util.*;
@Slf4j
@ -46,6 +51,9 @@ public class GbChannelServiceImpl implements IGbChannelService {
@Autowired
private GroupMapper groupMapper;
@Autowired
private IDeviceChannelService deviceChannelService;
@Override
public CommonGBChannel queryByDeviceId(String gbDeviceId) {
return commonGBChannelMapper.queryByDeviceId(gbDeviceId);
@ -726,4 +734,24 @@ public class GbChannelServiceImpl implements IGbChannelService {
List<CommonGBChannel> all = commonGBChannelMapper.queryList(query, online, hasRecordPlan, channelType);
return new PageInfo<>(all);
}
@Override
public void queryRecordInfo(CommonGBChannel channel, String startTime, String endTime, ErrorCallback<RecordInfo> callback) {
if (channel.getGbDeviceDbId() != null) {
deviceChannelService.queryRecordInfo(channel, startTime, endTime, callback);
} else if (channel.getStreamProxyId() != null) {
// 拉流代理
log.warn("[下载通用通道录像] 不支持下载拉流代理的录像: {}({})", channel.getGbName(), channel.getGbDeviceId());
throw new PlayException(Response.FORBIDDEN, "forbidden");
} else if (channel.getStreamPushId() != null) {
// 推流
log.warn("[下载通用通道录像] 不支持下载推流的录像: {}({})", channel.getGbName(), channel.getGbDeviceId());
throw new PlayException(Response.FORBIDDEN, "forbidden");
} else {
// 通道数据异常
log.error("[回放通用通道] 通道数据异常,无法识别通道来源: {}({})", channel.getGbName(), channel.getGbDeviceId());
throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error");
}
}
}

View File

@ -1,10 +1,9 @@
package com.genersoft.iot.vmp.gb28181.session;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.record.RecordEndEventListener;
import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEventListener;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@ -24,8 +23,9 @@ public class RecordDataCatch {
@Autowired
private DeferredResultHolder deferredResultHolder;
@Autowired
private RecordEndEventListener recordEndEventListener;
private RecordInfoEventListener recordEndEventListener;
public int put(String deviceId,String channelId, String sn, int sumNum, List<RecordItem> recordItems) {

View File

@ -4,7 +4,7 @@ import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.event.record.RecordEndEventListener;
import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEventListener;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
@ -52,7 +52,7 @@ public class RecordInfoQueryMessageHandler extends SIPRequestProcessorParent imp
private SIPCommander commander;
@Autowired
private RecordEndEventListener recordEndEventListener;
private RecordInfoEventListener recordInfoEventListener;
@Override
public void afterPropertiesSet() throws Exception {
@ -126,7 +126,7 @@ public class RecordInfoQueryMessageHandler extends SIPRequestProcessorParent imp
// 获取通道的原始信息
DeviceChannel deviceChannel = deviceChannelService.getOneForSourceById(channel.getGbId());
// 接收录像数据
recordEndEventListener.addEndEventHandler(device.getDeviceId(), deviceChannel.getDeviceId(), (recordInfo)->{
recordInfoEventListener.addEndEventHandler(device.getDeviceId(), deviceChannel.getDeviceId(), (recordInfo)->{
try {
log.info("[国标级联] 录像查询收到数据, 通道: {},准备转发===", channelId);
cmderFroPlatform.recordInfo(channel, platform, request.getFromTag(), recordInfo);

View File

@ -6,6 +6,8 @@ import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
import com.genersoft.iot.vmp.gb28181.bean.RecordItem;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEndEvent;
import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEvent;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
@ -19,6 +21,7 @@ import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
@ -48,14 +51,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
private ResponseMessageHandler responseMessageHandler;
@Autowired
private DeferredResultHolder deferredResultHolder;
@Autowired
private EventPublisher eventPublisher;
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
private ApplicationEventPublisher applicationEventPublisher;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@ -75,88 +71,89 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
}catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 国标级联 国标录像: {}", e.getMessage());
}
taskExecutor.execute(()->{
try {
String sn = getText(rootElement, "SN");
String channelId = getText(rootElement, "DeviceID");
RecordInfo recordInfo = new RecordInfo();
recordInfo.setChannelId(channelId);
recordInfo.setDeviceId(device.getDeviceId());
recordInfo.setSn(sn);
recordInfo.setName(getText(rootElement, "Name"));
String sumNumStr = getText(rootElement, "SumNum");
int sumNum = 0;
if (!ObjectUtils.isEmpty(sumNumStr)) {
sumNum = Integer.parseInt(sumNumStr);
}
recordInfo.setSumNum(sumNum);
Element recordListElement = rootElement.element("RecordList");
if (recordListElement == null || sumNum == 0) {
log.info("无录像数据");
recordInfo.setCount(sumNum);
eventPublisher.recordEndEventPush(recordInfo);
releaseRequest(device.getDeviceId(), sn,recordInfo);
} else {
Iterator<Element> recordListIterator = recordListElement.elementIterator();
if (recordListIterator != null) {
List<RecordItem> recordList = new ArrayList<>();
// 遍历DeviceList
while (recordListIterator.hasNext()) {
Element itemRecord = recordListIterator.next();
Element recordElement = itemRecord.element("DeviceID");
if (recordElement == null) {
log.info("记录为空,下一个...");
continue;
}
RecordItem record = new RecordItem();
record.setDeviceId(getText(itemRecord, "DeviceID"));
record.setName(getText(itemRecord, "Name"));
record.setFilePath(getText(itemRecord, "FilePath"));
record.setFileSize(getText(itemRecord, "FileSize"));
record.setAddress(getText(itemRecord, "Address"));
String startTimeStr = getText(itemRecord, "StartTime");
record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr));
String endTimeStr = getText(itemRecord, "EndTime");
record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr));
record.setSecrecy(itemRecord.element("Secrecy") == null ? 0
: Integer.parseInt(getText(itemRecord, "Secrecy")));
record.setType(getText(itemRecord, "Type"));
record.setRecorderId(getText(itemRecord, "RecorderID"));
recordList.add(record);
}
Map<String, String> map = recordList.stream()
.filter(record -> record.getDeviceId() != null)
.collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson));
// 获取任务结果数据
String resKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_PRE + channelId + sn;
redisTemplate.opsForHash().putAll(resKey, map);
redisTemplate.expire(resKey, recordInfoTtl, TimeUnit.SECONDS);
String resCountKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_COUNT_PRE + channelId + sn;
long incr = redisTemplate.opsForValue().increment(resCountKey, map.size());
redisTemplate.expire(resCountKey, recordInfoTtl, TimeUnit.SECONDS);
recordInfo.setRecordList(recordList);
recordInfo.setCount(Math.toIntExact(incr));
eventPublisher.recordEndEventPush(recordInfo);
if (incr < sumNum) {
return;
}
// 已接收完成
List<RecordItem> resList = redisTemplate.opsForHash().entries(resKey).values().stream().map(e -> UJson.readJson(e.toString(), RecordItem.class)).collect(Collectors.toList());
if (resList.size() < sumNum) {
return;
}
recordInfo.setRecordList(resList);
releaseRequest(device.getDeviceId(), sn,recordInfo);
}
}
} catch (Exception e) {
log.error("[国标录像] 发现未处理的异常, \r\n{}", evt.getRequest());
log.error("[国标录像] 异常内容: ", e);
try {
String sn = getText(rootElement, "SN");
String channelId = getText(rootElement, "DeviceID");
RecordInfo recordInfo = new RecordInfo();
recordInfo.setChannelId(channelId);
recordInfo.setDeviceId(device.getDeviceId());
recordInfo.setSn(sn);
recordInfo.setName(getText(rootElement, "Name"));
String sumNumStr = getText(rootElement, "SumNum");
int sumNum = 0;
if (!ObjectUtils.isEmpty(sumNumStr)) {
sumNum = Integer.parseInt(sumNumStr);
}
});
recordInfo.setSumNum(sumNum);
Element recordListElement = rootElement.element("RecordList");
if (recordListElement == null || sumNum == 0) {
log.info("无录像数据");
recordInfo.setCount(sumNum);
recordInfoEventPush(recordInfo);
recordInfoEndEventPush(recordInfo);
} else {
Iterator<Element> recordListIterator = recordListElement.elementIterator();
if (recordListIterator != null) {
List<RecordItem> recordList = new ArrayList<>();
// 遍历DeviceList
while (recordListIterator.hasNext()) {
Element itemRecord = recordListIterator.next();
Element recordElement = itemRecord.element("DeviceID");
if (recordElement == null) {
log.info("记录为空,下一个...");
continue;
}
RecordItem record = new RecordItem();
record.setDeviceId(getText(itemRecord, "DeviceID"));
record.setName(getText(itemRecord, "Name"));
record.setFilePath(getText(itemRecord, "FilePath"));
record.setFileSize(getText(itemRecord, "FileSize"));
record.setAddress(getText(itemRecord, "Address"));
String startTimeStr = getText(itemRecord, "StartTime");
record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr));
String endTimeStr = getText(itemRecord, "EndTime");
record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr));
record.setSecrecy(itemRecord.element("Secrecy") == null ? 0
: Integer.parseInt(getText(itemRecord, "Secrecy")));
record.setType(getText(itemRecord, "Type"));
record.setRecorderId(getText(itemRecord, "RecorderID"));
recordList.add(record);
}
Map<String, String> map = recordList.stream()
.filter(record -> record.getDeviceId() != null)
.collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson));
// 获取任务结果数据
String resKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_PRE + channelId + sn;
redisTemplate.opsForHash().putAll(resKey, map);
redisTemplate.expire(resKey, recordInfoTtl, TimeUnit.SECONDS);
String resCountKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_COUNT_PRE + channelId + sn;
Long incr = redisTemplate.opsForValue().increment(resCountKey, map.size());
if (incr == null) {
incr = 0L;
}
redisTemplate.expire(resCountKey, recordInfoTtl, TimeUnit.SECONDS);
recordInfo.setRecordList(recordList);
recordInfo.setCount(Math.toIntExact(incr));
recordInfoEventPush(recordInfo);
if (incr < sumNum) {
return;
}
// 已接收完成
List<RecordItem> resList = redisTemplate.opsForHash().entries(resKey).values().stream().map(e -> UJson.readJson(e.toString(), RecordItem.class)).collect(Collectors.toList());
if (resList.size() < sumNum) {
return;
}
recordInfo.setRecordList(resList);
recordInfoEndEventPush(recordInfo);
}
}
} catch (Exception e) {
log.error("[国标录像] 发现未处理的异常, \r\n{}", evt.getRequest());
log.error("[国标录像] 异常内容: ", e);
}
}
@Override
@ -164,18 +161,31 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
}
public void releaseRequest(String deviceId, String sn,RecordInfo recordInfo){
String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + sn;
// 对数据进行排序
if(recordInfo!=null && recordInfo.getRecordList()!=null) {
private void recordInfoEventPush(RecordInfo recordInfo) {
if (recordInfo == null) {
return;
}
if(recordInfo.getRecordList() != null) {
Collections.sort(recordInfo.getRecordList());
}else{
recordInfo.setRecordList(new ArrayList<>());
}
RecordInfoEvent outEvent = new RecordInfoEvent(this);
outEvent.setRecordInfo(recordInfo);
applicationEventPublisher.publishEvent(outEvent);
}
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setData(recordInfo);
deferredResultHolder.invokeAllResult(msg);
private void recordInfoEndEventPush(RecordInfo recordInfo) {
if (recordInfo == null) {
return;
}
if(recordInfo.getRecordList() != null) {
Collections.sort(recordInfo.getRecordList());
}else{
recordInfo.setRecordList(new ArrayList<>());
}
RecordInfoEndEvent outEvent = new RecordInfoEndEvent(this);
outEvent.setRecordInfo(recordInfo);
applicationEventPublisher.publishEvent(outEvent);
}
}

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service.redisMsg;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
public interface IRedisRpcPlayService {
@ -13,5 +14,8 @@ public interface IRedisRpcPlayService {
void playback(String serverId, Integer channelId, String startTime, String endTime, ErrorCallback<StreamInfo> callback);
void download(String serverId, Integer id, String startTime, String endTime, int downloadSpeed, ErrorCallback<StreamInfo> callback);
void download(String serverId, Integer channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<StreamInfo> callback);
void queryRecordInfo(String serverId, Integer channelId, String startTime, String endTime, ErrorCallback<RecordInfo> callback);
}

View File

@ -4,6 +4,7 @@ import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
@ -17,6 +18,7 @@ import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping;
import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -60,14 +62,14 @@ public class RedisRpcChannelPlayController extends RpcController {
RedisRpcResponse response = request.getResponse();
if (channelId <= 0) {
response.setStatusCode(Response.BAD_REQUEST);
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
// 获取对应的设备和通道信息
CommonGBChannel channel = channelService.getOne(channelId);
if (channel == null) {
response.setStatusCode(Response.BAD_REQUEST);
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
@ -76,7 +78,7 @@ public class RedisRpcChannelPlayController extends RpcController {
inviteInfo.setSessionName("Play");
channelPlayService.start(channel, inviteInfo, null, (code, msg, data) ->{
if (code == InviteErrorCode.SUCCESS.getCode()) {
response.setStatusCode(Response.OK);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(data);
}else {
response.setStatusCode(code);
@ -88,6 +90,50 @@ public class RedisRpcChannelPlayController extends RpcController {
}
/**
*
*/
@RedisRpcMapping("queryRecordInfo")
public RedisRpcResponse queryRecordInfo(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
int channelId = paramJson.getIntValue("channelId");
String startTime = paramJson.getString("startTime");
String endTime = paramJson.getString("endTime");
RedisRpcResponse response = request.getResponse();
if (channelId <= 0) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
// 获取对应的设备和通道信息
CommonGBChannel channel = channelService.getOne(channelId);
if (channel == null) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
channelService.queryRecordInfo(channel, startTime, endTime, (code, msg, data) ->{
if (code == InviteErrorCode.SUCCESS.getCode()) {
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(data);
}else {
response.setStatusCode(code);
}
// 手动发送结果
sendResponse(response);
});
}catch (ControllerException e) {
response.setStatusCode(ErrorCode.ERROR100.getCode());
response.setBody(e.getMessage());
}
return null;
}
/**
*
*/
@ -99,7 +145,7 @@ public class RedisRpcChannelPlayController extends RpcController {
Integer channelId = jsonObject.getIntValue("channelId");
if (channelId == null || channelId <= 0) {
response.setStatusCode(Response.BAD_REQUEST);
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
@ -110,13 +156,13 @@ public class RedisRpcChannelPlayController extends RpcController {
// 获取对应的设备和通道信息
CommonGBChannel channel = channelService.getOne(channelId);
if (channel == null) {
response.setStatusCode(Response.BAD_REQUEST);
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
channelPlayService.stopPlay(type, channel, stream);
response.setStatusCode(Response.OK);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
}catch (Exception e){
response.setStatusCode(Response.SERVER_INTERNAL_ERROR);
response.setBody(e.getMessage());
@ -136,14 +182,14 @@ public class RedisRpcChannelPlayController extends RpcController {
RedisRpcResponse response = request.getResponse();
if (channelId <= 0) {
response.setStatusCode(Response.BAD_REQUEST);
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
// 获取对应的设备和通道信息
CommonGBChannel channel = channelService.getOne(channelId);
if (channel == null) {
response.setStatusCode(Response.BAD_REQUEST);
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
@ -154,7 +200,7 @@ public class RedisRpcChannelPlayController extends RpcController {
inviteInfo.setStopTime(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime));
channelPlayService.start(channel, inviteInfo, null, (code, msg, data) ->{
if (code == InviteErrorCode.SUCCESS.getCode()) {
response.setStatusCode(Response.OK);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(data);
}else {
response.setStatusCode(code);
@ -178,14 +224,14 @@ public class RedisRpcChannelPlayController extends RpcController {
RedisRpcResponse response = request.getResponse();
if (channelId <= 0) {
response.setStatusCode(Response.BAD_REQUEST);
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
// 获取对应的设备和通道信息
CommonGBChannel channel = channelService.getOne(channelId);
if (channel == null) {
response.setStatusCode(Response.BAD_REQUEST);
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
@ -197,7 +243,7 @@ public class RedisRpcChannelPlayController extends RpcController {
inviteInfo.setDownloadSpeed(downloadSpeed + "");
channelPlayService.start(channel, inviteInfo, null, (code, msg, data) ->{
if (code == InviteErrorCode.SUCCESS.getCode()) {
response.setStatusCode(Response.OK);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(data);
}else {
response.setStatusCode(code);

View File

@ -49,7 +49,7 @@ public class RedisRpcSendRtpController extends RpcController {
if (sendRtpItem == null) {
log.info("[redis-rpc] 获取发流的信息, 未找到redis中的发流信息 callId{}", callId);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
return response;
}
log.info("[redis-rpc] 获取发流的信息: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
@ -57,14 +57,14 @@ public class RedisRpcSendRtpController extends RpcController {
MediaServer mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
if (mediaServerItem == null) {
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
}
// 自平台内容
int localPort = sendRtpServerService.getNextPort(mediaServerItem);
if (localPort == 0) {
log.info("[redis-rpc] getSendRtpItem->服务器端口资源不足" );
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
}
// 写入redis 超时时回复
sendRtpItem.setStatus(1);
@ -77,7 +77,7 @@ public class RedisRpcSendRtpController extends RpcController {
}
sendRtpServerService.update(sendRtpItem);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(callId);
return response;
}
@ -90,7 +90,7 @@ public class RedisRpcSendRtpController extends RpcController {
String callId = request.getParam().toString();
SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
if (sendRtpItem == null) {
log.info("[redis-rpc] 开始发流, 未找到redis中的发流信息 callId{}", callId);
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");
@ -134,7 +134,7 @@ public class RedisRpcSendRtpController extends RpcController {
String callId = request.getParam().toString();
SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(Response.OK);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
if (sendRtpItem == null) {
log.info("[redis-rpc] 停止推流, 未找到redis中的发流信息 key{}", callId);
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");

View File

@ -18,6 +18,7 @@ import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping;
import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
@ -79,7 +80,7 @@ public class RedisRpcStreamPushController extends RpcController {
sendRtpServerService.update(sendRtpItem);
RedisRpcResponse response = request.getResponse();
response.setBody(sendRtpItem.getChannelId());
response.setStatusCode(200);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
}
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
@ -98,7 +99,7 @@ public class RedisRpcStreamPushController extends RpcController {
redisTemplate.opsForValue().set(sendRtpItem.getChannelId(), sendRtpItem);
RedisRpcResponse response = request.getResponse();
response.setBody(sendRtpItem.getChannelId());
response.setStatusCode(200);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
// 手动发送结果
sendResponse(response);
hookSubscribe.removeSubscribe(hook);
@ -120,7 +121,7 @@ public class RedisRpcStreamPushController extends RpcController {
log.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}", streamInfo.getApp(), streamInfo.getStream());
RedisRpcResponse response = request.getResponse();
response.setBody(JSONObject.toJSONString(streamInfoInServer));
response.setStatusCode(200);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
return response;
}
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
@ -133,7 +134,7 @@ public class RedisRpcStreamPushController extends RpcController {
streamInfo.getApp(), streamInfo.getStream(), hookData.getMediaInfo(),
hookData.getMediaInfo() != null ? hookData.getMediaInfo().getCallId() : null);
response.setBody(JSONObject.toJSONString(streamInfoByAppAndStream));
response.setStatusCode(200);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
// 手动发送结果
sendResponse(response);
hookSubscribe.removeSubscribe(hook);
@ -152,7 +153,7 @@ public class RedisRpcStreamPushController extends RpcController {
Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
hookSubscribe.removeSubscribe(hook);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
return response;
}
@ -167,7 +168,7 @@ public class RedisRpcStreamPushController extends RpcController {
Hook hook = Hook.getInstance(HookType.on_media_arrival, streamInfo.getApp(), streamInfo.getStream(), null);
hookSubscribe.removeSubscribe(hook);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
return response;
}

View File

@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService;
@ -48,7 +49,7 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService {
if (response == null) {
callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null);
}else {
if (response.getStatusCode() == Response.OK) {
if (response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}else {
@ -69,12 +70,33 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService {
if (response == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg());
}else {
if (response.getStatusCode() != Response.OK) {
if (response.getStatusCode() != ErrorCode.SUCCESS.getCode()) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg());
}
}
}
@Override
public void queryRecordInfo(String serverId, Integer channelId, String startTime, String endTime, ErrorCallback<RecordInfo> callback) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("channelId", channelId);
jsonObject.put("startTime", startTime);
jsonObject.put("endTime", endTime);
RedisRpcRequest request = buildRequest("channel/queryRecordInfo", jsonObject);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getRecordInfoTimeout(), TimeUnit.MILLISECONDS);
if (response == null) {
callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null);
}else {
if (response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
RecordInfo recordInfo = JSON.parseObject(response.getBody().toString(), RecordInfo.class);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), recordInfo);
}else {
callback.run(response.getStatusCode(), response.getBody().toString(), null);
}
}
}
@Override
public void playback(String serverId, Integer channelId, String startTime, String endTime, ErrorCallback<StreamInfo> callback) {
@ -88,7 +110,7 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService {
if (response == null) {
callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null);
}else {
if (response.getStatusCode() == Response.OK) {
if (response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}else {
@ -111,7 +133,7 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService {
if (response == null) {
callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null);
}else {
if (response.getStatusCode() == Response.OK) {
if (response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}else {

View File

@ -9,6 +9,7 @@ public enum ErrorCode {
ERROR400(400, "参数或方法错误"),
ERROR404(404, "资源未找到"),
ERROR403(403, "无权限操作"),
ERROR486(486, "超时或无响应"),
ERROR401(401, "请登录后重新请求"),
ERROR500(500, "系统异常");

View File

@ -198,6 +198,8 @@ user-settings:
save-position-history: false
# 点播/录像回放 等待超时时间,单位:毫秒
play-timeout: 18000
# 获取设备录像数据超时时间,单位:毫秒
record-info-timeout: 10000
# 上级点播等待超时时间,单位:毫秒
platform-play-timeout: 60000
# 是否开启接口鉴权