适配拉流代理

pull/1642/head
648540858 2024-07-01 17:12:43 +08:00
parent e531e497c8
commit d1df17c007
3 changed files with 103 additions and 87 deletions

View File

@ -25,4 +25,6 @@ public interface IGbChannelService {
void closeSend(CommonGBChannel commonGBChannel); void closeSend(CommonGBChannel commonGBChannel);
void batchAdd(List<CommonGBChannel> commonGBChannels); void batchAdd(List<CommonGBChannel> commonGBChannels);
void updateStatus(List<CommonGBChannel> channelList);
} }

View File

@ -95,4 +95,5 @@ public interface StreamProxyMapper {
int delete(int id); int delete(int id);
void deleteByList(List<StreamProxy> streamProxiesForRemove);
} }

View File

@ -2,59 +2,51 @@ package com.genersoft.iot.vmp.streamProxy.service.impl;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.baomidou.dynamic.datasource.annotation.DS; import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.GeneralCallback;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent; import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOfflineEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOfflineEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.dao.StreamProxyMapper; import com.genersoft.iot.vmp.streamProxy.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo; import com.github.pagehelper.PageInfo;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
/** /**
* *
@ -301,74 +293,122 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
} }
@Override @Override
@Transactional
public void zlmServerOnline(String mediaServerId) { public void zlmServerOnline(String mediaServerId) {
MediaServer mediaServer = mediaServerService.getOne(mediaServerId); MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
if (mediaServer == null) { if (mediaServer == null) {
return; return;
} }
// 这里主要是控制数据库/redis缓存/以及zlm中存在的代理流 三者状态一致。以数据库中数据为根本 // 这里主要是控制数据库/redis缓存/以及zlm中存在的代理流 三者状态一致。以数据库中数据为根本
redisCatchStorage.removeStream(mediaServerId, "pull");
List<StreamProxy> streamProxies = streamProxyMapper.selectForEnableInMediaServer(mediaServerId, true); List<StreamProxy> streamProxies = streamProxyMapper.selectForEnableInMediaServer(mediaServerId, true);
if (streamProxies.isEmpty()){ if (streamProxies.isEmpty()){
return; return;
} }
Map<String, StreamProxy> streamProxyMapForDb = new HashMap<>();
for (StreamProxy streamProxy : streamProxies) {
streamProxyMapForDb.put(streamProxy.getApp() + "_" + streamProxy.getStream(), streamProxy);
}
List<StreamInfo> streamInfoList = mediaServerService.getMediaList(mediaServer, null, null, null);
List<CommonGBChannel> channelListForOnline = new ArrayList<>();
for (StreamInfo streamInfo : streamInfoList) {
String key = streamInfo.getApp() + streamInfo.getStream();
StreamProxy streamProxy = streamProxyMapForDb.get(key);
if (streamProxy == null) {
// 流媒体存在,数据库中不存在
continue;
}
if (streamInfo.getOriginType() == OriginType.PULL.ordinal()
|| streamInfo.getOriginType() == OriginType.FFMPEG_PULL.ordinal() ) {
if (streamProxyMapForDb.get(key) != null) {
redisCatchStorage.addStream(mediaServer, "pull", streamInfo.getApp(), streamInfo.getStream(), streamInfo.getMediaInfo());
if (!streamProxy.getGbStatus() && streamProxy.getGbId() > 0) {
streamProxy.setGbStatus(true);
channelListForOnline.add(streamProxy.getCommonGBChannel());
}
streamProxyMapForDb.remove(key);
}
}
}
if (!channelListForOnline.isEmpty()) {
gbChannelService.online(channelListForOnline);
}
List<CommonGBChannel> channelListForOffline = new ArrayList<>();
List<StreamProxy> streamProxiesForRemove = new ArrayList<>();
if (!streamProxyMapForDb.isEmpty()) {
for (StreamProxy streamProxy : streamProxyMapForDb.values()) {
if (streamProxy.getGbStatus() && streamProxy.getGbId() > 0) {
streamProxy.setGbStatus(false);
channelListForOffline.add(streamProxy.getCommonGBChannel());
}
// 移除开启了无人观看自动移除的流
if (streamProxy.getGbDeviceId() == null && streamProxy.isEnableRemoveNoneReader()) {
streamProxiesForRemove.add(streamProxy);
streamProxyMapForDb.remove(streamProxy.getApp() + streamProxy.getStream());
}
}
}
if (!channelListForOffline.isEmpty()) {
gbChannelService.offline(channelListForOffline);
}
if (!streamProxiesForRemove.isEmpty()) {
streamProxyMapper.deleteByList(streamProxiesForRemove);
}
// 移除开启了无人观看自动移除的流 if (!streamProxyMapForDb.isEmpty()) {
// List<StreamProxy> streamProxyItemList = streamProxyMapper.selectWithAutoRemoveAndWithoutGbDeviceIdByMediaServerId(mediaServerId); for (StreamProxy streamProxy : streamProxyMapForDb.values()) {
streamProxyMapper.deleteWithAutoRemoveAndWithoutGbDeviceIdByMediaServerId(mediaServerId); log.info("恢复流代理," + streamProxy.getApp() + "/" + streamProxy.getStream());
mediaServerService.startProxy(mediaServer, streamProxy);
// 移除拉流代理生成的流信息
syncPullStream(mediaServerId);
// 恢复流代理, 只查找这个这个流媒体
List<StreamProxy> streamProxyListForEnable = storager.getStreamProxyListForEnableInMediaServer(
mediaServerId, true);
for (StreamProxy streamProxyDto : streamProxyListForEnable) {
log.info("恢复流代理," + streamProxyDto.getApp() + "/" + streamProxyDto.getStream());
StreamInfo streamInfo = mediaServerService.startProxy(mediaServer, streamProxy);
WVPResult<String> wvpResult = addStreamProxyToZlm(streamProxyDto);
if (wvpResult == null) {
// 设置为离线
log.info("恢复流代理失败" + streamProxyDto.getApp() + "/" + streamProxyDto.getStream());
updateStatusByAppAndStream(streamProxyDto.getApp(), streamProxyDto.getStream(), false);
}else {
updateStatusByAppAndStream(streamProxyDto.getApp(), streamProxyDto.getStream(), true);
} }
} }
} }
@Override @Override
public void zlmServerOffline(String mediaServerId) { public void zlmServerOffline(String mediaServerId) {
// 移除开启了无人观看自动移除的流 List<StreamProxy> streamProxies = streamProxyMapper.selectForEnableInMediaServer(mediaServerId, true);
List<StreamProxy> streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId);
if (streamProxyItemList.size() > 0) {
gbStreamMapper.batchDel(streamProxyItemList);
}
streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
// 其他的流设置离线
streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
String type = "PULL";
// 发送redis消息 // 清理redis相关的缓存
List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, type); redisCatchStorage.removeStream(mediaServerId, "pull");
if (mediaInfoList.size() > 0) {
for (MediaInfo mediaInfo : mediaInfoList) { if (streamProxies.isEmpty()){
JSONObject jsonObject = new JSONObject(); return;
jsonObject.put("serverId", userSetting.getServerId()); }
jsonObject.put("app", mediaInfo.getApp()); List<StreamProxy> streamProxiesForRemove = new ArrayList<>();
jsonObject.put("stream", mediaInfo.getStream()); List<StreamProxy> streamProxiesForSendMessage = new ArrayList<>();
jsonObject.put("register", false); List<CommonGBChannel> channelListForOffline = new ArrayList<>();
jsonObject.put("mediaServerId", mediaServerId);
redisCatchStorage.sendStreamChangeMsg(type, jsonObject); for (StreamProxy streamProxy : streamProxies) {
// 移除redis内流的信息 if (streamProxy.getGbId() > 0 && streamProxy.getGbStatus()) {
redisCatchStorage.removeStream(mediaServerId, type, mediaInfo.getApp(), mediaInfo.getStream()); channelListForOffline.add(streamProxy.getCommonGBChannel());
}
if (streamProxy.getGbId() == 0 && streamProxy.isEnableRemoveNoneReader()) {
streamProxiesForRemove.add(streamProxy);
}
if (streamProxy.getGbStatus()) {
streamProxiesForSendMessage.add(streamProxy);
} }
} }
} // 移除开启了无人观看自动移除的流
streamProxyMapper.deleteByList(streamProxiesForRemove);
@Override // 修改国标关联的国标通道的状态
public void clean() { gbChannelService.offline(channelListForOffline);
if (!streamProxiesForSendMessage.isEmpty()) {
for (StreamProxy streamProxy : streamProxiesForSendMessage) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetting.getServerId());
jsonObject.put("app", streamProxy.getApp());
jsonObject.put("stream", streamProxy.getStream());
jsonObject.put("register", false);
jsonObject.put("mediaServerId", mediaServerId);
redisCatchStorage.sendStreamChangeMsg("pull", jsonObject);
}
}
} }
@Override @Override
@ -385,33 +425,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
return result; return result;
} }
private void syncPullStream(String mediaServerId){
MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
if (mediaServer != null) {
List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, "PULL");
if (!mediaInfoList.isEmpty()) {
List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaServer, null, null, null);
Map<String, StreamInfo> stringStreamInfoMap = new HashMap<>();
if (mediaList != null && !mediaList.isEmpty()) {
for (StreamInfo streamInfo : mediaList) {
stringStreamInfoMap.put(streamInfo.getApp() + streamInfo.getStream(), streamInfo);
}
}
if (stringStreamInfoMap.isEmpty()) {
redisCatchStorage.removeStream(mediaServerId, "PULL");
}else {
for (String key : stringStreamInfoMap.keySet()) {
StreamInfo streamInfo = stringStreamInfoMap.get(key);
if (stringStreamInfoMap.get(streamInfo.getApp() + streamInfo.getStream()) == null) {
redisCatchStorage.removeStream(mediaServerId, "PULL", streamInfo.getApp(),
streamInfo.getStream());
}
}
}
}
}
}
@Override @Override
public ResourceBaseInfo getOverview() { public ResourceBaseInfo getOverview() {