From 0f0be7d7c77d4382961ca0f48ee7d290efca0c3b Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: Fri, 29 Nov 2024 15:38:55 +0800
Subject: [PATCH] =?UTF-8?q?[=E5=BD=95=E5=88=B6=E8=AE=A1=E5=88=92]=20?=
=?UTF-8?q?=E5=AE=8C=E5=96=84=E5=92=8C=E4=BC=98=E5=8C=96=E5=BD=95=E5=88=B6?=
=?UTF-8?q?=E8=AE=A1=E5=88=92=E6=89=A7=E8=A1=8C?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../gb28181/dao/CommonGBChannelMapper.java | 48 -------
.../iot/vmp/service/IRecordPlanService.java | 2 +-
.../vmp/service/impl/MediaServiceImpl.java | 2 +-
.../service/impl/RecordPlanServiceImpl.java | 132 ++++++++++++------
.../vmp/storager/dao/RecordPlanMapper.java | 7 +-
5 files changed, 93 insertions(+), 98 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java
index d14ba09c..64f7dad8 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java
@@ -550,52 +550,4 @@ public interface CommonGBChannelMapper {
@Param("channelType") Integer channelType, @Param("online") Boolean online,
@Param("hasLink") Boolean hasLink);
- @Select("")
- List queryForRecordPlan(List planIdList);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IRecordPlanService.java b/src/main/java/com/genersoft/iot/vmp/service/IRecordPlanService.java
index abec8e0c..f3b34912 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/IRecordPlanService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IRecordPlanService.java
@@ -27,5 +27,5 @@ public interface IRecordPlanService {
void cleanAll(Integer planId);
- boolean recording(String app, String stream);
+ Integer recording(String app, String stream);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
index 0959c784..fcb7570f 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
@@ -209,7 +209,7 @@ public class MediaServiceImpl implements IMediaService {
@Override
public boolean closeStreamOnNoneReader(String mediaServerId, String app, String stream, String schema) {
boolean result = false;
- if (recordPlanService.recording(app, stream)) {
+ if (recordPlanService.recording(app, stream) != null) {
return false;
}
// 国标类型的流
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RecordPlanServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RecordPlanServiceImpl.java
index 4bcc945a..ac146e86 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/RecordPlanServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RecordPlanServiceImpl.java
@@ -17,6 +17,7 @@ import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
+import com.google.common.base.Joiner;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
@@ -27,6 +28,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.*;
+import java.util.concurrent.TimeUnit;
@Service
@Slf4j
@@ -53,63 +55,94 @@ public class RecordPlanServiceImpl implements IRecordPlanService {
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
// 流断开,检查是否还处于录像状态, 如果是则继续录像
- if (recording(event.getApp(), event.getStream())) {
- // 重新拉起
-
+ Integer channelId = recording(event.getApp(), event.getStream());
+ if(channelId == null) {
+ return;
}
+ // 重新拉起
+ CommonGBChannel channel = channelMapper.queryById(channelId);
+ if (channel == null) {
+ log.warn("[录制计划] 流离开时拉起需要录像的流时, 发现通道不存在, id: {}", channelId);
+ return;
+ }
+ // 开启点播,
+ channelPlayService.play(channel, null, ((code, msg, streamInfo) -> {
+ if (code == InviteErrorCode.SUCCESS.getCode() && streamInfo != null) {
+ log.info("[录像] 流离开时拉起需要录像的流, 开启成功, 通道ID: {}", channel.getGbId());
+ recordStreamMap.put(channel.getGbId(), streamInfo);
+ } else {
+ recordStreamMap.remove(channelId);
+ log.info("[录像] 流离开时拉起需要录像的流, 开启失败, 十分钟后重试, 通道ID: {}", channel.getGbId());
+ }
+ }));
}
Map recordStreamMap = new HashMap<>();
- @Scheduled(cron = "0 */30 * * * *")
+// @Scheduled(cron = "0 */30 * * * *")
+ @Scheduled(fixedRate = 10, timeUnit = TimeUnit.MINUTES)
public void execution() {
- // 执行计划
+ log.info("[录制计划] 执行");
+ // 查询现在需要录像的通道Id
+ List startChannelIdList = queryCurrentChannelRecord();
- // 获取当前时间在一周内的序号
- LocalDateTime now = LocalDateTime.now();
- int week = now.getDayOfWeek().getValue();
- int index = now.getHour() * 2 + (now.getMinute() > 30?1:0);
- // 查询startTime等于现在的, 开始录像
- List startPlanList = recordPlanMapper.queryStart(week, index);
-
- Map channelMapWithoutRecord = new HashMap<>();
- if (startPlanList.isEmpty()) {
- // 停止所有正在录像的
- if(recordStreamMap.isEmpty()) {
- // 暂无录像任务
- return;
- }else {
- channelMapWithoutRecord.putAll(recordStreamMap);
+ if (startChannelIdList.isEmpty()) {
+ // 当前没有录像任务, 如果存在旧的正在录像的就移除
+ if(!recordStreamMap.isEmpty()) {
+ stopStreams(recordStreamMap.keySet(), recordStreamMap);
recordStreamMap.clear();
}
}else {
- channelMapWithoutRecord.putAll(recordStreamMap);
- // 获取所有的关联的通道
- List channelList = channelMapper.queryForRecordPlan(startPlanList);
- if (channelList.isEmpty()) {
- recordStreamMap.clear();
- }else {
- // 查找是否已经开启录像, 如果没有则开启录像
- for (CommonGBChannel channel : channelList) {
- if (recordStreamMap.get(channel.getGbId()) != null) {
- channelMapWithoutRecord.remove(channel.getGbId());
- }else {
+ // 当前存在录像任务, 获取正在录像中存在但是当前录制列表不存在的内容,进行停止; 获取正在录像中没有但是当前需录制的列表中存在的进行开启.
+ Set recordStreamSet = new HashSet<>(recordStreamMap.keySet());
+ startChannelIdList.forEach(recordStreamSet::remove);
+ if (!recordStreamSet.isEmpty()) {
+ // 正在录像中存在但是当前录制列表不存在的内容,进行停止;
+ stopStreams(recordStreamSet, recordStreamMap);
+ }
+
+ // 移除startChannelIdList中已经在录像的部分, 剩下的都是需要新添加的(正在录像中没有但是当前需录制的列表中存在的进行开启)
+ recordStreamMap.keySet().forEach(startChannelIdList::remove);
+ if (!startChannelIdList.isEmpty()) {
+ // 获取所有的关联的通道
+ List channelList = channelMapper.queryByIds(startChannelIdList);
+ if (!channelList.isEmpty()) {
+ // 查找是否已经开启录像, 如果没有则开启录像
+ for (CommonGBChannel channel : channelList) {
// 开启点播,
channelPlayService.play(channel, null, ((code, msg, streamInfo) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && streamInfo != null) {
log.info("[录像] 开启成功, 通道ID: {}", channel.getGbId());
recordStreamMap.put(channel.getGbId(), streamInfo);
- channelMapWithoutRecord.remove(channel.getGbId(), streamInfo);
+ } else {
+ log.info("[录像] 开启失败, 十分钟后重试, 通道ID: {}", channel.getGbId());
}
}));
}
+ } else {
+ log.error("[录制计划] 数据异常, 这些关联的通道已经不存在了: {}", Joiner.on(",").join(startChannelIdList));
}
}
}
- // 结束录像
- if(!channelMapWithoutRecord.isEmpty()) {
- for (Integer channelId : channelMapWithoutRecord.keySet()) {
- StreamInfo streamInfo = channelMapWithoutRecord.get(channelId);
+ }
+
+ /**
+ * 获取当前时间段应该录像的通道Id列表
+ */
+ private List queryCurrentChannelRecord(){
+ // 获取当前时间在一周内的序号, 数据库存储的从第几个30分钟开始, 0-47, 包括首尾
+ LocalDateTime now = LocalDateTime.now();
+ int week = now.getDayOfWeek().getValue();
+ int index = now.getHour() * 2 + (now.getMinute() > 30?1:0);
+
+ // 查询现在需要录像的通道Id
+ return recordPlanMapper.queryRecordIng(week, index);
+ }
+
+ private void stopStreams(Collection channelIds, Map recordStreamMap) {
+ for (Integer channelId : channelIds) {
+ try {
+ StreamInfo streamInfo = recordStreamMap.get(channelId);
if (streamInfo == null) {
continue;
}
@@ -117,23 +150,25 @@ public class RecordPlanServiceImpl implements IRecordPlanService {
MediaInfo mediaInfo = mediaServerService.getMediaInfo(streamInfo.getMediaServer(), streamInfo.getApp(), streamInfo.getStream());
if (mediaInfo.getReaderCount() == null || mediaInfo.getReaderCount() == 0) {
mediaServerService.closeStreams(streamInfo.getMediaServer(), streamInfo.getApp(), streamInfo.getStream());
- log.info("[录像] 停止, 通道ID: {}", channelId);
+ log.info("[录制计划] 停止, 通道ID: {}", channelId);
}
+ }catch (Exception e) {
+ log.error("[录制计划] 停止时异常", e);
+ }finally {
+ recordStreamMap.remove(channelId);
}
}
}
- // 系统启动时
-
-
@Override
- public boolean recording(String app, String stream) {
- for (StreamInfo streamInfo : recordStreamMap.values()) {
- if (streamInfo.getApp().equals(app) && streamInfo.getStream().equals(stream)) {
- return true;
+ public Integer recording(String app, String stream) {
+ for (Integer channelId : recordStreamMap.keySet()) {
+ StreamInfo streamInfo = recordStreamMap.get(channelId);
+ if (streamInfo != null && streamInfo.getApp().equals(app) && streamInfo.getStream().equals(stream)) {
+ return channelId;
}
}
- return false;
+ return null;
}
@Override
@@ -226,7 +261,12 @@ public class RecordPlanServiceImpl implements IRecordPlanService {
}else {
channelMapper.addRecordPlan(channelIds, planId);
}
- // TODO 更新录像队列
+ // 查看当前的待录制列表是否变化,如果变化,则调用录制计划马上开始录制
+ List currentChannelRecord = queryCurrentChannelRecord();
+ recordStreamMap.keySet().forEach(currentChannelRecord::remove);
+ if (!currentChannelRecord.isEmpty()) {
+ execution();
+ }
}
@Override
diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/RecordPlanMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/RecordPlanMapper.java
index e9304df0..ae0649aa 100644
--- a/src/main/java/com/genersoft/iot/vmp/storager/dao/RecordPlanMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/RecordPlanMapper.java
@@ -59,6 +59,9 @@ public interface RecordPlanMapper {
@Delete("DELETE FROM wvp_record_plan_item WHERE plan_id = #{planId}")
void cleanItems(@Param("planId") Integer planId);
- @Select("select plan_id from wvp_record_plan_item where week_day = #{week} and start >= #{index} and stop <= #{index} group by plan_id")
- List queryStart(@Param("week") int week, @Param("index") int index);
+ @Select(" ")
+ List queryRecordIng(@Param("week") int week, @Param("index") int index);
}