From a9099e0412aa5ba9b00ef5d0dcc0dc70e9b3627d Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 13 Dec 2024 14:35:44 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0RPC=E5=BD=95=E5=83=8F?= =?UTF-8?q?=E5=88=97=E8=A1=A8=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/conf/UserSetting.java | 5 + .../iot/vmp/conf/redis/RedisRpcConfig.java | 5 +- .../controller/GBRecordController.java | 40 ++-- .../iot/vmp/gb28181/event/EventPublisher.java | 8 +- ...dEndEvent.java => RecordInfoEndEvent.java} | 17 +- .../gb28181/event/record/RecordInfoEvent.java | 27 +++ ...ener.java => RecordInfoEventListener.java} | 11 +- .../service/IDeviceChannelService.java | 9 +- .../vmp/gb28181/service/IDeviceService.java | 5 +- .../service/IGbChannelPlayService.java | 2 +- .../gb28181/service/IGbChannelService.java | 2 + .../impl/DeviceChannelServiceImpl.java | 86 +++++++- .../service/impl/DeviceServiceImpl.java | 11 +- .../impl/GbChannelPlayServiceImpl.java | 7 +- .../service/impl/GbChannelServiceImpl.java | 28 +++ .../vmp/gb28181/session/RecordDataCatch.java | 6 +- .../cmd/RecordInfoQueryMessageHandler.java | 6 +- .../cmd/RecordInfoResponseMessageHandler.java | 204 +++++++++--------- .../redisMsg/IRedisRpcPlayService.java | 6 +- .../RedisRpcChannelPlayController.java | 70 ++++-- .../control/RedisRpcSendRtpController.java | 12 +- .../control/RedisRpcStreamPushController.java | 13 +- .../service/RedisRpcPlayServiceImpl.java | 30 ++- .../iot/vmp/vmanager/bean/ErrorCode.java | 1 + src/main/resources/配置详情.yml | 2 + 25 files changed, 409 insertions(+), 204 deletions(-) rename src/main/java/com/genersoft/iot/vmp/gb28181/event/record/{RecordEndEvent.java => RecordInfoEndEvent.java} (54%) create mode 100755 src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordInfoEvent.java rename src/main/java/com/genersoft/iot/vmp/gb28181/event/record/{RecordEndEventListener.java => RecordInfoEventListener.java} (85%) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java index 604c8c56..b9356592 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java @@ -37,6 +37,11 @@ public class UserSetting { */ private Integer playTimeout = 10000; + /** + * 获取设备录像数据超时时间,单位:毫秒 + */ + private Integer recordInfoTimeout = 15000; + /** * 上级点播等待超时时间,单位:毫秒 */ diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java index 02293685..29e090ad 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -129,7 +130,7 @@ public class RedisRpcConfig implements MessageListener { if (method == null) { // 回复404结果 RedisRpcResponse response = request.getResponse(); - response.setStatusCode(Response.NOT_FOUND); + response.setStatusCode(ErrorCode.ERROR404.getCode()); sendResponse(response); return; } @@ -185,7 +186,7 @@ public class RedisRpcConfig implements MessageListener { } catch (InterruptedException e) { log.warn("[redis rpc timeout] uri: {}, sn: {}", request.getUri(), request.getSn(), e); RedisRpcResponse redisRpcResponse = new RedisRpcResponse(); - redisRpcResponse.setStatusCode(Response.BUSY_HERE); + redisRpcResponse.setStatusCode(ErrorCode.ERROR486.getCode()); return redisRpcResponse; } finally { this.unsubscribe(request.getSn()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/GBRecordController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/GBRecordController.java index e48ae616..87357d27 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/GBRecordController.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/GBRecordController.java @@ -36,6 +36,7 @@ import javax.sip.InvalidArgumentException; import javax.sip.SipException; import java.text.ParseException; import java.util.UUID; +import java.util.concurrent.TimeUnit; @Tag(name = "国标录像") @Slf4j @@ -72,7 +73,7 @@ public class GBRecordController { if (log.isDebugEnabled()) { log.debug(String.format("录像信息查询 API调用,deviceId:%s ,startTime:%s, endTime:%s",deviceId, startTime, endTime)); } - DeferredResult> result = new DeferredResult<>(); + DeferredResult> result = new DeferredResult<>(Long.valueOf(userSetting.getRecordInfoTimeout()), TimeUnit.MILLISECONDS); if (!DateUtil.verification(startTime, DateUtil.formatter)){ throw new ControllerException(ErrorCode.ERROR100.getCode(), "startTime格式为" + DateUtil.PATTERN); } @@ -81,35 +82,24 @@ public class GBRecordController { } Device device = deviceService.getDeviceByDeviceId(deviceId); - // 指定超时时间 1分钟30秒 - String uuid = UUID.randomUUID().toString(); - int sn = (int)((Math.random()*9+1)*100000); - String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + sn; - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - try { - cmder.recordInfoQuery(device, channelId, startTime, endTime, sn, null, null, null, (eventResult -> { - WVPResult wvpResult = new WVPResult<>(); - wvpResult.setCode(ErrorCode.ERROR100.getCode()); - wvpResult.setMsg("查询录像失败, status: " + eventResult.statusCode + ", message: " + eventResult.msg); - msg.setData(wvpResult); - resultHolder.invokeResult(msg); - })); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 查询录像: {}", e.getMessage()); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); + if (device == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), deviceId + " 不存在"); } - - // 录像查询以channelId作为deviceId查询 - resultHolder.put(key, uuid, result); + DeviceChannel channel = channelService.getOneForSource(device.getId(), channelId); + if (channel == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), channelId + " 不存在"); + } + channelService.queryRecordInfo(device, channel, startTime, endTime, (code, msg, data)->{ + WVPResult wvpResult = new WVPResult<>(); + wvpResult.setCode(code); + wvpResult.setMsg(msg); + result.setResult(wvpResult); + }); result.onTimeout(()->{ - msg.setData("timeout"); WVPResult wvpResult = new WVPResult<>(); wvpResult.setCode(ErrorCode.ERROR100.getCode()); wvpResult.setMsg("timeout"); - msg.setData(wvpResult); - resultHolder.invokeResult(msg); + result.setResult(wvpResult); }); return result; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index 7d4c876a..a19a1009 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -3,7 +3,7 @@ package com.genersoft.iot.vmp.gb28181.event; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEvent; import com.genersoft.iot.vmp.gb28181.event.device.RequestTimeoutEvent; -import com.genersoft.iot.vmp.gb28181.event.record.RecordEndEvent; +import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent; import com.genersoft.iot.vmp.media.bean.MediaServer; @@ -101,9 +101,5 @@ public class EventPublisher { applicationEventPublisher.publishEvent(event); } - public void recordEndEventPush(RecordInfo recordInfo) { - RecordEndEvent outEvent = new RecordEndEvent(this); - outEvent.setRecordInfo(recordInfo); - applicationEventPublisher.publishEvent(outEvent); - } + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordInfoEndEvent.java similarity index 54% rename from src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEvent.java rename to src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordInfoEndEvent.java index cfd2985c..4788eb62 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordInfoEndEvent.java @@ -1,7 +1,8 @@ package com.genersoft.iot.vmp.gb28181.event.record; -import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; +import lombok.Getter; +import lombok.Setter; import org.springframework.context.ApplicationEvent; /** @@ -9,24 +10,18 @@ import org.springframework.context.ApplicationEvent; * @author: pan * @data: 2022-02-23 */ - -public class RecordEndEvent extends ApplicationEvent { +@Setter +@Getter +public class RecordInfoEndEvent extends ApplicationEvent { /** * */ private static final long serialVersionUID = 1L; - public RecordEndEvent(Object source) { + public RecordInfoEndEvent(Object source) { super(source); } private RecordInfo recordInfo; - public RecordInfo getRecordInfo() { - return recordInfo; - } - - public void setRecordInfo(RecordInfo recordInfo) { - this.recordInfo = recordInfo; - } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordInfoEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordInfoEvent.java new file mode 100755 index 00000000..d78a22b2 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordInfoEvent.java @@ -0,0 +1,27 @@ +package com.genersoft.iot.vmp.gb28181.event.record; + +import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; +import lombok.Getter; +import lombok.Setter; +import org.springframework.context.ApplicationEvent; + +/** + * @description: 录像查询结束时间 + * @author: pan + * @data: 2022-02-23 + */ + +@Setter +@Getter +public class RecordInfoEvent extends ApplicationEvent { + /** + * + */ + private static final long serialVersionUID = 1L; + + public RecordInfoEvent(Object source) { + super(source); + } + + private RecordInfo recordInfo; +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordInfoEventListener.java similarity index 85% rename from src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java rename to src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordInfoEventListener.java index 411b54d5..f2a0986d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordInfoEventListener.java @@ -15,15 +15,15 @@ import java.util.concurrent.ConcurrentHashMap; */ @Slf4j @Component -public class RecordEndEventListener implements ApplicationListener { +public class RecordInfoEventListener implements ApplicationListener { - private Map handlerMap = new ConcurrentHashMap<>(); + private final Map handlerMap = new ConcurrentHashMap<>(); public interface RecordEndEventHandler{ void handler(RecordInfo recordInfo); } @Override - public void onApplicationEvent(RecordEndEvent event) { + public void onApplicationEvent(RecordInfoEvent event) { String deviceId = event.getRecordInfo().getDeviceId(); String channelId = event.getRecordInfo().getChannelId(); int count = event.getRecordInfo().getCount(); @@ -45,9 +45,6 @@ public class RecordEndEventListener implements ApplicationListener queryChaneListByDeviceDbId(Integer deviceDbId); List queryChaneIdListByDeviceDbIds(List deviceDbId); + + void queryRecordInfo(Device device, DeviceChannel channel, String startTime, String endTime, ErrorCallback object); + + void queryRecordInfo(CommonGBChannel channel, String startTime, String endTime, ErrorCallback object); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java index a8f48a47..43a965f4 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java @@ -1,9 +1,8 @@ package com.genersoft.iot.vmp.gb28181.service; import com.genersoft.iot.vmp.common.CommonCallback; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; -import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; +import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.github.pagehelper.PageInfo; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java index 967661a8..61c0b8cb 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.InviteMessageInfo; import com.genersoft.iot.vmp.gb28181.bean.Platform; +import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; import com.genersoft.iot.vmp.service.bean.ErrorCallback; public interface IGbChannelPlayService { @@ -25,6 +26,5 @@ public interface IGbChannelPlayService { void playPush(CommonGBChannel channel, String platformDeviceId, String platformName, ErrorCallback callback); - void stopPlayPush(CommonGBChannel channel); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java index 8dc67df2..441b524f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.service; import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.streamPush.bean.StreamPush; import com.github.pagehelper.PageInfo; @@ -87,4 +88,5 @@ public interface IGbChannelService { PageInfo queryList(int page, int count, String query, Boolean online, Boolean hasRecordPlan, Integer channelType); + void queryRecordInfo(CommonGBChannel channel, String startTime, String endTime, ErrorCallback callback); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java index b397f4d4..062fd1d5 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java @@ -5,37 +5,48 @@ import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; -import com.genersoft.iot.vmp.gb28181.bean.GbCode; -import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce; import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper; import com.genersoft.iot.vmp.gb28181.dao.DeviceMobilePositionMapper; import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEndEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; +import javax.sip.InvalidArgumentException; +import javax.sip.SipException; +import javax.sip.message.Response; +import java.text.ParseException; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; /** * @author lin @@ -71,6 +82,26 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { @Autowired private IPlatformChannelService platformChannelService; + @Autowired + private IRedisRpcPlayService redisRpcPlayService; + + @Autowired + private ISIPCommander commander; + + // 记录录像查询的结果等待 + private final Map> topicSubscribers = new ConcurrentHashMap<>(); + + /** + * 监听录像查询结束事件 + */ + @Async("taskExecutor") + @org.springframework.context.event.EventListener + public void onApplicationEvent(RecordInfoEndEvent event) { + SynchronousQueue queue = topicSubscribers.get("record" + event.getRecordInfo().getSn()); + if (queue != null) { + queue.offer(event.getRecordInfo()); + } + } @Override public int updateChannels(Device device, List channels) { @@ -164,10 +195,8 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { @Override public ResourceBaseInfo getOverview() { - int online = channelMapper.getOnlineCount(); int total = channelMapper.getAllChannelCount(); - return new ResourceBaseInfo(total, online); } @@ -705,4 +734,49 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { public void updateChannelForNotify(DeviceChannel channel) { channelMapper.updateChannelForNotify(channel); } + + @Override + public void queryRecordInfo(Device device, DeviceChannel channel, String startTime, String endTime, ErrorCallback callback) { + if (!userSetting.getServerId().equals(device.getServerId())){ + redisRpcPlayService.queryRecordInfo(device.getServerId(), channel.getId(), startTime, endTime, callback); + return; + } + try { + int sn = (int)((Math.random()*9+1)*100000); + commander.recordInfoQuery(device, channel.getDeviceId(), startTime, endTime, sn, null, null, eventResult -> { + try { + // 消息发送成功, 监听等待数据到来 + SynchronousQueue queue = new SynchronousQueue<>(); + topicSubscribers.put("record" + sn, queue); + RecordInfo recordInfo = queue.poll(userSetting.getRecordInfoTimeout(), TimeUnit.MILLISECONDS); + if (recordInfo == null) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), recordInfo); + } + } catch (InterruptedException e) { + callback.run(ErrorCode.ERROR100.getCode(), e.getMessage(), null); + } finally { + this.topicSubscribers.remove("record" + sn); + } + + }, (eventResult -> { + callback.run(ErrorCode.ERROR100.getCode(), "查询录像失败, status: " + eventResult.statusCode + ", message: " + eventResult.msg, null); + })); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 查询录像: {}", e.getMessage()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); + } + } + + @Override + public void queryRecordInfo(CommonGBChannel channel, String startTime, String endTime, ErrorCallback callback) { + Device device = deviceMapper.query(channel.getGbDeviceDbId()); + if (device == null) { + log.warn("[点播] 未找到通道{}的设备信息", channel); + callback.run(ErrorCode.ERROR100.getCode(), "设备不存在", null); + return; + } + DeviceChannel deviceChannel = getOneForSourceById(channel.getGbId()); + queryRecordInfo(device, deviceChannel, startTime, endTime, callback); + + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 4773b9fd..3c69da75 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper; @@ -23,10 +24,13 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.respons import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.ISendRtpServerService; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; +import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import lombok.extern.slf4j.Slf4j; @@ -36,9 +40,13 @@ import org.springframework.transaction.annotation.Transactional; import javax.sip.InvalidArgumentException; import javax.sip.SipException; +import javax.sip.message.Response; import java.text.ParseException; import java.time.Instant; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; /** @@ -69,9 +77,6 @@ public class DeviceServiceImpl implements IDeviceService { @Autowired private PlatformChannelMapper platformChannelMapper; - @Autowired - private IDeviceChannelService deviceChannelService; - @Autowired private DeviceChannelMapper deviceChannelMapper; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java index c1c009d3..cd3bf212 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java @@ -2,10 +2,7 @@ package com.genersoft.iot.vmp.gb28181.service.impl; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; -import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; -import com.genersoft.iot.vmp.gb28181.bean.InviteMessageInfo; -import com.genersoft.iot.vmp.gb28181.bean.Platform; -import com.genersoft.iot.vmp.gb28181.bean.PlayException; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService; import com.genersoft.iot.vmp.gb28181.service.IPlayService; import com.genersoft.iot.vmp.service.bean.ErrorCallback; @@ -223,4 +220,6 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService { callback.run(Response.BUSY_HERE, "busy here", null); } } + + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java index 792fd88b..5fb01d4b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java @@ -8,11 +8,15 @@ import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.RegionMapper; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; +import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.streamPush.bean.StreamPush; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import lombok.extern.slf4j.Slf4j; @@ -22,6 +26,7 @@ import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; +import javax.sip.message.Response; import java.util.*; @Slf4j @@ -46,6 +51,9 @@ public class GbChannelServiceImpl implements IGbChannelService { @Autowired private GroupMapper groupMapper; + @Autowired + private IDeviceChannelService deviceChannelService; + @Override public CommonGBChannel queryByDeviceId(String gbDeviceId) { return commonGBChannelMapper.queryByDeviceId(gbDeviceId); @@ -726,4 +734,24 @@ public class GbChannelServiceImpl implements IGbChannelService { List all = commonGBChannelMapper.queryList(query, online, hasRecordPlan, channelType); return new PageInfo<>(all); } + + @Override + public void queryRecordInfo(CommonGBChannel channel, String startTime, String endTime, ErrorCallback callback) { + if (channel.getGbDeviceDbId() != null) { + + deviceChannelService.queryRecordInfo(channel, startTime, endTime, callback); + } else if (channel.getStreamProxyId() != null) { + // 拉流代理 + log.warn("[下载通用通道录像] 不支持下载拉流代理的录像: {}({})", channel.getGbName(), channel.getGbDeviceId()); + throw new PlayException(Response.FORBIDDEN, "forbidden"); + } else if (channel.getStreamPushId() != null) { + // 推流 + log.warn("[下载通用通道录像] 不支持下载推流的录像: {}({})", channel.getGbName(), channel.getGbDeviceId()); + throw new PlayException(Response.FORBIDDEN, "forbidden"); + } else { + // 通道数据异常 + log.error("[回放通用通道] 通道数据异常,无法识别通道来源: {}({})", channel.getGbName(), channel.getGbDeviceId()); + throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/RecordDataCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/RecordDataCatch.java index 3f24dbee..e89cedf2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/RecordDataCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/RecordDataCatch.java @@ -1,10 +1,9 @@ package com.genersoft.iot.vmp.gb28181.session; import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.gb28181.event.record.RecordEndEventListener; +import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEventListener; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; -import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -24,8 +23,9 @@ public class RecordDataCatch { @Autowired private DeferredResultHolder deferredResultHolder; + @Autowired - private RecordEndEventListener recordEndEventListener; + private RecordInfoEventListener recordEndEventListener; public int put(String deviceId,String channelId, String sn, int sumNum, List recordItems) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/RecordInfoQueryMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/RecordInfoQueryMessageHandler.java index 79deb042..ccb1b2c9 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/RecordInfoQueryMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/RecordInfoQueryMessageHandler.java @@ -4,7 +4,7 @@ import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.Platform; -import com.genersoft.iot.vmp.gb28181.event.record.RecordEndEventListener; +import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEventListener; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; @@ -52,7 +52,7 @@ public class RecordInfoQueryMessageHandler extends SIPRequestProcessorParent imp private SIPCommander commander; @Autowired - private RecordEndEventListener recordEndEventListener; + private RecordInfoEventListener recordInfoEventListener; @Override public void afterPropertiesSet() throws Exception { @@ -126,7 +126,7 @@ public class RecordInfoQueryMessageHandler extends SIPRequestProcessorParent imp // 获取通道的原始信息 DeviceChannel deviceChannel = deviceChannelService.getOneForSourceById(channel.getGbId()); // 接收录像数据 - recordEndEventListener.addEndEventHandler(device.getDeviceId(), deviceChannel.getDeviceId(), (recordInfo)->{ + recordInfoEventListener.addEndEventHandler(device.getDeviceId(), deviceChannel.getDeviceId(), (recordInfo)->{ try { log.info("[国标级联] 录像查询收到数据, 通道: {},准备转发===", channelId); cmderFroPlatform.recordInfo(channel, platform, request.getFromTag(), recordInfo); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java index 655c0551..0ab97e87 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -6,6 +6,8 @@ import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; import com.genersoft.iot.vmp.gb28181.bean.RecordItem; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEndEvent; +import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEvent; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; @@ -19,6 +21,7 @@ import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; @@ -48,14 +51,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent private ResponseMessageHandler responseMessageHandler; @Autowired - private DeferredResultHolder deferredResultHolder; - - @Autowired - private EventPublisher eventPublisher; - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; + private ApplicationEventPublisher applicationEventPublisher; @Autowired private RedisTemplate redisTemplate; @@ -75,88 +71,89 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent }catch (SipException | InvalidArgumentException | ParseException e) { log.error("[命令发送失败] 国标级联 国标录像: {}", e.getMessage()); } - taskExecutor.execute(()->{ - try { - String sn = getText(rootElement, "SN"); - String channelId = getText(rootElement, "DeviceID"); - RecordInfo recordInfo = new RecordInfo(); - recordInfo.setChannelId(channelId); - recordInfo.setDeviceId(device.getDeviceId()); - recordInfo.setSn(sn); - recordInfo.setName(getText(rootElement, "Name")); - String sumNumStr = getText(rootElement, "SumNum"); - int sumNum = 0; - if (!ObjectUtils.isEmpty(sumNumStr)) { - sumNum = Integer.parseInt(sumNumStr); - } - recordInfo.setSumNum(sumNum); - Element recordListElement = rootElement.element("RecordList"); - if (recordListElement == null || sumNum == 0) { - log.info("无录像数据"); - recordInfo.setCount(sumNum); - eventPublisher.recordEndEventPush(recordInfo); - releaseRequest(device.getDeviceId(), sn,recordInfo); - } else { - Iterator recordListIterator = recordListElement.elementIterator(); - if (recordListIterator != null) { - List recordList = new ArrayList<>(); - // 遍历DeviceList - while (recordListIterator.hasNext()) { - Element itemRecord = recordListIterator.next(); - Element recordElement = itemRecord.element("DeviceID"); - if (recordElement == null) { - log.info("记录为空,下一个..."); - continue; - } - RecordItem record = new RecordItem(); - record.setDeviceId(getText(itemRecord, "DeviceID")); - record.setName(getText(itemRecord, "Name")); - record.setFilePath(getText(itemRecord, "FilePath")); - record.setFileSize(getText(itemRecord, "FileSize")); - record.setAddress(getText(itemRecord, "Address")); - - String startTimeStr = getText(itemRecord, "StartTime"); - record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr)); - - String endTimeStr = getText(itemRecord, "EndTime"); - record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr)); - - record.setSecrecy(itemRecord.element("Secrecy") == null ? 0 - : Integer.parseInt(getText(itemRecord, "Secrecy"))); - record.setType(getText(itemRecord, "Type")); - record.setRecorderId(getText(itemRecord, "RecorderID")); - recordList.add(record); - } - Map map = recordList.stream() - .filter(record -> record.getDeviceId() != null) - .collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson)); - // 获取任务结果数据 - String resKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_PRE + channelId + sn; - redisTemplate.opsForHash().putAll(resKey, map); - redisTemplate.expire(resKey, recordInfoTtl, TimeUnit.SECONDS); - String resCountKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_COUNT_PRE + channelId + sn; - long incr = redisTemplate.opsForValue().increment(resCountKey, map.size()); - redisTemplate.expire(resCountKey, recordInfoTtl, TimeUnit.SECONDS); - recordInfo.setRecordList(recordList); - recordInfo.setCount(Math.toIntExact(incr)); - eventPublisher.recordEndEventPush(recordInfo); - if (incr < sumNum) { - return; - } - // 已接收完成 - List resList = redisTemplate.opsForHash().entries(resKey).values().stream().map(e -> UJson.readJson(e.toString(), RecordItem.class)).collect(Collectors.toList()); - if (resList.size() < sumNum) { - return; - } - recordInfo.setRecordList(resList); - releaseRequest(device.getDeviceId(), sn,recordInfo); - } - } - } catch (Exception e) { - log.error("[国标录像] 发现未处理的异常, \r\n{}", evt.getRequest()); - log.error("[国标录像] 异常内容: ", e); + try { + String sn = getText(rootElement, "SN"); + String channelId = getText(rootElement, "DeviceID"); + RecordInfo recordInfo = new RecordInfo(); + recordInfo.setChannelId(channelId); + recordInfo.setDeviceId(device.getDeviceId()); + recordInfo.setSn(sn); + recordInfo.setName(getText(rootElement, "Name")); + String sumNumStr = getText(rootElement, "SumNum"); + int sumNum = 0; + if (!ObjectUtils.isEmpty(sumNumStr)) { + sumNum = Integer.parseInt(sumNumStr); } - }); + recordInfo.setSumNum(sumNum); + Element recordListElement = rootElement.element("RecordList"); + if (recordListElement == null || sumNum == 0) { + log.info("无录像数据"); + recordInfo.setCount(sumNum); + recordInfoEventPush(recordInfo); + recordInfoEndEventPush(recordInfo); + } else { + Iterator recordListIterator = recordListElement.elementIterator(); + if (recordListIterator != null) { + List recordList = new ArrayList<>(); + // 遍历DeviceList + while (recordListIterator.hasNext()) { + Element itemRecord = recordListIterator.next(); + Element recordElement = itemRecord.element("DeviceID"); + if (recordElement == null) { + log.info("记录为空,下一个..."); + continue; + } + RecordItem record = new RecordItem(); + record.setDeviceId(getText(itemRecord, "DeviceID")); + record.setName(getText(itemRecord, "Name")); + record.setFilePath(getText(itemRecord, "FilePath")); + record.setFileSize(getText(itemRecord, "FileSize")); + record.setAddress(getText(itemRecord, "Address")); + + String startTimeStr = getText(itemRecord, "StartTime"); + record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr)); + + String endTimeStr = getText(itemRecord, "EndTime"); + record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr)); + + record.setSecrecy(itemRecord.element("Secrecy") == null ? 0 + : Integer.parseInt(getText(itemRecord, "Secrecy"))); + record.setType(getText(itemRecord, "Type")); + record.setRecorderId(getText(itemRecord, "RecorderID")); + recordList.add(record); + } + Map map = recordList.stream() + .filter(record -> record.getDeviceId() != null) + .collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson)); + // 获取任务结果数据 + String resKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_PRE + channelId + sn; + redisTemplate.opsForHash().putAll(resKey, map); + redisTemplate.expire(resKey, recordInfoTtl, TimeUnit.SECONDS); + String resCountKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_COUNT_PRE + channelId + sn; + Long incr = redisTemplate.opsForValue().increment(resCountKey, map.size()); + if (incr == null) { + incr = 0L; + } + redisTemplate.expire(resCountKey, recordInfoTtl, TimeUnit.SECONDS); + recordInfo.setRecordList(recordList); + recordInfo.setCount(Math.toIntExact(incr)); + recordInfoEventPush(recordInfo); + if (incr < sumNum) { + return; + } + // 已接收完成 + List resList = redisTemplate.opsForHash().entries(resKey).values().stream().map(e -> UJson.readJson(e.toString(), RecordItem.class)).collect(Collectors.toList()); + if (resList.size() < sumNum) { + return; + } + recordInfo.setRecordList(resList); + recordInfoEndEventPush(recordInfo); + } + } + } catch (Exception e) { + log.error("[国标录像] 发现未处理的异常, \r\n{}", evt.getRequest()); + log.error("[国标录像] 异常内容: ", e); + } } @Override @@ -164,18 +161,31 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent } - public void releaseRequest(String deviceId, String sn,RecordInfo recordInfo){ - String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + sn; - // 对数据进行排序 - if(recordInfo!=null && recordInfo.getRecordList()!=null) { + private void recordInfoEventPush(RecordInfo recordInfo) { + if (recordInfo == null) { + return; + } + if(recordInfo.getRecordList() != null) { Collections.sort(recordInfo.getRecordList()); }else{ recordInfo.setRecordList(new ArrayList<>()); } + RecordInfoEvent outEvent = new RecordInfoEvent(this); + outEvent.setRecordInfo(recordInfo); + applicationEventPublisher.publishEvent(outEvent); + } - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setData(recordInfo); - deferredResultHolder.invokeAllResult(msg); + private void recordInfoEndEventPush(RecordInfo recordInfo) { + if (recordInfo == null) { + return; + } + if(recordInfo.getRecordList() != null) { + Collections.sort(recordInfo.getRecordList()); + }else{ + recordInfo.setRecordList(new ArrayList<>()); + } + RecordInfoEndEvent outEvent = new RecordInfoEndEvent(this); + outEvent.setRecordInfo(recordInfo); + applicationEventPublisher.publishEvent(outEvent); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java index 4c1bc066..11f80ee8 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; import com.genersoft.iot.vmp.service.bean.ErrorCallback; public interface IRedisRpcPlayService { @@ -13,5 +14,8 @@ public interface IRedisRpcPlayService { void playback(String serverId, Integer channelId, String startTime, String endTime, ErrorCallback callback); - void download(String serverId, Integer id, String startTime, String endTime, int downloadSpeed, ErrorCallback callback); + void download(String serverId, Integer channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback callback); + + void queryRecordInfo(String serverId, Integer channelId, String startTime, String endTime, ErrorCallback callback); + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java index f28aa6f1..3eb9c1e3 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; @@ -17,6 +18,7 @@ import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController; import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping; import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController; import com.genersoft.iot.vmp.utils.DateUtil; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -60,14 +62,14 @@ public class RedisRpcChannelPlayController extends RpcController { RedisRpcResponse response = request.getResponse(); if (channelId <= 0) { - response.setStatusCode(Response.BAD_REQUEST); + response.setStatusCode(ErrorCode.ERROR400.getCode()); response.setBody("param error"); return response; } // 获取对应的设备和通道信息 CommonGBChannel channel = channelService.getOne(channelId); if (channel == null) { - response.setStatusCode(Response.BAD_REQUEST); + response.setStatusCode(ErrorCode.ERROR400.getCode()); response.setBody("param error"); return response; } @@ -76,7 +78,7 @@ public class RedisRpcChannelPlayController extends RpcController { inviteInfo.setSessionName("Play"); channelPlayService.start(channel, inviteInfo, null, (code, msg, data) ->{ if (code == InviteErrorCode.SUCCESS.getCode()) { - response.setStatusCode(Response.OK); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); response.setBody(data); }else { response.setStatusCode(code); @@ -88,6 +90,50 @@ public class RedisRpcChannelPlayController extends RpcController { } + /** + * 点播国标设备 + */ + @RedisRpcMapping("queryRecordInfo") + public RedisRpcResponse queryRecordInfo(RedisRpcRequest request) { + JSONObject paramJson = JSONObject.parseObject(request.getParam().toString()); + int channelId = paramJson.getIntValue("channelId"); + String startTime = paramJson.getString("startTime"); + String endTime = paramJson.getString("endTime"); + RedisRpcResponse response = request.getResponse(); + + if (channelId <= 0) { + response.setStatusCode(ErrorCode.ERROR400.getCode()); + response.setBody("param error"); + return response; + } + // 获取对应的设备和通道信息 + CommonGBChannel channel = channelService.getOne(channelId); + if (channel == null) { + response.setStatusCode(ErrorCode.ERROR400.getCode()); + response.setBody("param error"); + return response; + } + + try { + channelService.queryRecordInfo(channel, startTime, endTime, (code, msg, data) ->{ + if (code == InviteErrorCode.SUCCESS.getCode()) { + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + response.setBody(data); + }else { + response.setStatusCode(code); + } + // 手动发送结果 + sendResponse(response); + }); + }catch (ControllerException e) { + response.setStatusCode(ErrorCode.ERROR100.getCode()); + response.setBody(e.getMessage()); + } + + return null; + } + + /** * 停止点播国标设备 */ @@ -99,7 +145,7 @@ public class RedisRpcChannelPlayController extends RpcController { Integer channelId = jsonObject.getIntValue("channelId"); if (channelId == null || channelId <= 0) { - response.setStatusCode(Response.BAD_REQUEST); + response.setStatusCode(ErrorCode.ERROR400.getCode()); response.setBody("param error"); return response; } @@ -110,13 +156,13 @@ public class RedisRpcChannelPlayController extends RpcController { // 获取对应的设备和通道信息 CommonGBChannel channel = channelService.getOne(channelId); if (channel == null) { - response.setStatusCode(Response.BAD_REQUEST); + response.setStatusCode(ErrorCode.ERROR400.getCode()); response.setBody("param error"); return response; } try { channelPlayService.stopPlay(type, channel, stream); - response.setStatusCode(Response.OK); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); }catch (Exception e){ response.setStatusCode(Response.SERVER_INTERNAL_ERROR); response.setBody(e.getMessage()); @@ -136,14 +182,14 @@ public class RedisRpcChannelPlayController extends RpcController { RedisRpcResponse response = request.getResponse(); if (channelId <= 0) { - response.setStatusCode(Response.BAD_REQUEST); + response.setStatusCode(ErrorCode.ERROR400.getCode()); response.setBody("param error"); return response; } // 获取对应的设备和通道信息 CommonGBChannel channel = channelService.getOne(channelId); if (channel == null) { - response.setStatusCode(Response.BAD_REQUEST); + response.setStatusCode(ErrorCode.ERROR400.getCode()); response.setBody("param error"); return response; } @@ -154,7 +200,7 @@ public class RedisRpcChannelPlayController extends RpcController { inviteInfo.setStopTime(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime)); channelPlayService.start(channel, inviteInfo, null, (code, msg, data) ->{ if (code == InviteErrorCode.SUCCESS.getCode()) { - response.setStatusCode(Response.OK); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); response.setBody(data); }else { response.setStatusCode(code); @@ -178,14 +224,14 @@ public class RedisRpcChannelPlayController extends RpcController { RedisRpcResponse response = request.getResponse(); if (channelId <= 0) { - response.setStatusCode(Response.BAD_REQUEST); + response.setStatusCode(ErrorCode.ERROR400.getCode()); response.setBody("param error"); return response; } // 获取对应的设备和通道信息 CommonGBChannel channel = channelService.getOne(channelId); if (channel == null) { - response.setStatusCode(Response.BAD_REQUEST); + response.setStatusCode(ErrorCode.ERROR400.getCode()); response.setBody("param error"); return response; } @@ -197,7 +243,7 @@ public class RedisRpcChannelPlayController extends RpcController { inviteInfo.setDownloadSpeed(downloadSpeed + ""); channelPlayService.start(channel, inviteInfo, null, (code, msg, data) ->{ if (code == InviteErrorCode.SUCCESS.getCode()) { - response.setStatusCode(Response.OK); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); response.setBody(data); }else { response.setStatusCode(code); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcSendRtpController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcSendRtpController.java index ee5d1e18..fa818025 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcSendRtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcSendRtpController.java @@ -49,7 +49,7 @@ public class RedisRpcSendRtpController extends RpcController { if (sendRtpItem == null) { log.info("[redis-rpc] 获取发流的信息, 未找到redis中的发流信息, callId:{}", callId); RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); return response; } log.info("[redis-rpc] 获取发流的信息: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); @@ -57,14 +57,14 @@ public class RedisRpcSendRtpController extends RpcController { MediaServer mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream()); if (mediaServerItem == null) { RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); } // 自平台内容 int localPort = sendRtpServerService.getNextPort(mediaServerItem); if (localPort == 0) { log.info("[redis-rpc] getSendRtpItem->服务器端口资源不足" ); RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); } // 写入redis, 超时时回复 sendRtpItem.setStatus(1); @@ -77,7 +77,7 @@ public class RedisRpcSendRtpController extends RpcController { } sendRtpServerService.update(sendRtpItem); RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); response.setBody(callId); return response; } @@ -90,7 +90,7 @@ public class RedisRpcSendRtpController extends RpcController { String callId = request.getParam().toString(); SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId); RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); if (sendRtpItem == null) { log.info("[redis-rpc] 开始发流, 未找到redis中的发流信息, callId:{}", callId); WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息"); @@ -134,7 +134,7 @@ public class RedisRpcSendRtpController extends RpcController { String callId = request.getParam().toString(); SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId); RedisRpcResponse response = request.getResponse(); - response.setStatusCode(Response.OK); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); if (sendRtpItem == null) { log.info("[redis-rpc] 停止推流, 未找到redis中的发流信息, key:{}", callId); WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息"); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java index e1ef57b5..1cf46890 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java @@ -18,6 +18,7 @@ import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController; import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping; import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; @@ -79,7 +80,7 @@ public class RedisRpcStreamPushController extends RpcController { sendRtpServerService.update(sendRtpItem); RedisRpcResponse response = request.getResponse(); response.setBody(sendRtpItem.getChannelId()); - response.setStatusCode(200); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); } // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null); @@ -98,7 +99,7 @@ public class RedisRpcStreamPushController extends RpcController { redisTemplate.opsForValue().set(sendRtpItem.getChannelId(), sendRtpItem); RedisRpcResponse response = request.getResponse(); response.setBody(sendRtpItem.getChannelId()); - response.setStatusCode(200); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); // 手动发送结果 sendResponse(response); hookSubscribe.removeSubscribe(hook); @@ -120,7 +121,7 @@ public class RedisRpcStreamPushController extends RpcController { log.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}", streamInfo.getApp(), streamInfo.getStream()); RedisRpcResponse response = request.getResponse(); response.setBody(JSONObject.toJSONString(streamInfoInServer)); - response.setStatusCode(200); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); return response; } // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 @@ -133,7 +134,7 @@ public class RedisRpcStreamPushController extends RpcController { streamInfo.getApp(), streamInfo.getStream(), hookData.getMediaInfo(), hookData.getMediaInfo() != null ? hookData.getMediaInfo().getCallId() : null); response.setBody(JSONObject.toJSONString(streamInfoByAppAndStream)); - response.setStatusCode(200); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); // 手动发送结果 sendResponse(response); hookSubscribe.removeSubscribe(hook); @@ -152,7 +153,7 @@ public class RedisRpcStreamPushController extends RpcController { Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null); hookSubscribe.removeSubscribe(hook); RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); return response; } @@ -167,7 +168,7 @@ public class RedisRpcStreamPushController extends RpcController { Hook hook = Hook.getInstance(HookType.on_media_arrival, streamInfo.getApp(), streamInfo.getStream(), null); hookSubscribe.removeSubscribe(hook); RedisRpcResponse response = request.getResponse(); - response.setStatusCode(200); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); return response; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java index 623aae13..a22e9dad 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java @@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; +import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService; @@ -48,7 +49,7 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService { if (response == null) { callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null); }else { - if (response.getStatusCode() == Response.OK) { + if (response.getStatusCode() == ErrorCode.SUCCESS.getCode()) { StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class); callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); }else { @@ -69,12 +70,33 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService { if (response == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg()); }else { - if (response.getStatusCode() != Response.OK) { + if (response.getStatusCode() != ErrorCode.SUCCESS.getCode()) { throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg()); } } } + @Override + public void queryRecordInfo(String serverId, Integer channelId, String startTime, String endTime, ErrorCallback callback) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("channelId", channelId); + jsonObject.put("startTime", startTime); + jsonObject.put("endTime", endTime); + RedisRpcRequest request = buildRequest("channel/queryRecordInfo", jsonObject); + request.setToId(serverId); + RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getRecordInfoTimeout(), TimeUnit.MILLISECONDS); + if (response == null) { + callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null); + }else { + if (response.getStatusCode() == ErrorCode.SUCCESS.getCode()) { + RecordInfo recordInfo = JSON.parseObject(response.getBody().toString(), RecordInfo.class); + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), recordInfo); + }else { + callback.run(response.getStatusCode(), response.getBody().toString(), null); + } + } + } + @Override public void playback(String serverId, Integer channelId, String startTime, String endTime, ErrorCallback callback) { @@ -88,7 +110,7 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService { if (response == null) { callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null); }else { - if (response.getStatusCode() == Response.OK) { + if (response.getStatusCode() == ErrorCode.SUCCESS.getCode()) { StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class); callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); }else { @@ -111,7 +133,7 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService { if (response == null) { callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null); }else { - if (response.getStatusCode() == Response.OK) { + if (response.getStatusCode() == ErrorCode.SUCCESS.getCode()) { StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class); callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); }else { diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/ErrorCode.java b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/ErrorCode.java index e2e3879b..53b8be60 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/ErrorCode.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/ErrorCode.java @@ -9,6 +9,7 @@ public enum ErrorCode { ERROR400(400, "参数或方法错误"), ERROR404(404, "资源未找到"), ERROR403(403, "无权限操作"), + ERROR486(486, "超时或无响应"), ERROR401(401, "请登录后重新请求"), ERROR500(500, "系统异常"); diff --git a/src/main/resources/配置详情.yml b/src/main/resources/配置详情.yml index cef72151..167dd834 100644 --- a/src/main/resources/配置详情.yml +++ b/src/main/resources/配置详情.yml @@ -198,6 +198,8 @@ user-settings: save-position-history: false # 点播/录像回放 等待超时时间,单位:毫秒 play-timeout: 18000 + # 获取设备录像数据超时时间,单位:毫秒 + record-info-timeout: 10000 # 上级点播等待超时时间,单位:毫秒 platform-play-timeout: 60000 # 是否开启接口鉴权