添加重启后拉流代理自动恢复

pull/324/head
648540858 2022-01-27 18:08:19 +08:00
parent 66cadafd1c
commit f33c3a3630
12 changed files with 90 additions and 16 deletions

View File

@ -444,6 +444,7 @@ CREATE TABLE `stream_proxy` (
`enable_hls` bit(1) DEFAULT NULL, `enable_hls` bit(1) DEFAULT NULL,
`enable_mp4` bit(1) DEFAULT NULL, `enable_mp4` bit(1) DEFAULT NULL,
`enable` bit(1) NOT NULL, `enable` bit(1) NOT NULL,
`status` bit(1) NOT NULL,
`enable_remove_none_reader` bit(1) NOT NULL, `enable_remove_none_reader` bit(1) NOT NULL,
`createTime` varchar(50) NOT NULL, `createTime` varchar(50) NOT NULL,
PRIMARY KEY (`app`,`stream`) PRIMARY KEY (`app`,`stream`)

1
sql/update.sql Normal file
View File

@ -0,0 +1 @@
ALTER TABLE stream_proxy ADD status bit(1) not null;

View File

@ -332,6 +332,11 @@ public class ZLMHttpHookListener {
}else { }else {
mediaServerService.removeCount(mediaServerId); mediaServerService.removeCount(mediaServerId);
} }
if (item.getOriginType() == OriginType.PULL.ordinal()
|| item.getOriginType() == OriginType.FFMPEG_PULL.ordinal()) {
// 设置拉流代理上线/离线
streamProxyService.updateStatus(regist, app, streamId);
}
if ("rtp".equals(app) && !regist ) { if ("rtp".equals(app) && !regist ) {
StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
if (streamInfo!=null){ if (streamInfo!=null){
@ -355,6 +360,7 @@ public class ZLMHttpHookListener {
|| item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
streamPushItem = zlmMediaListManager.addPush(item); streamPushItem = zlmMediaListManager.addPush(item);
} }
List<GbStream> gbStreams = new ArrayList<>(); List<GbStream> gbStreams = new ArrayList<>();
if (streamPushItem == null || streamPushItem.getGbId() == null) { if (streamPushItem == null || streamPushItem.getGbId() == null) {
GbStream gbStream = storager.getGbStream(app, streamId); GbStream gbStream = storager.getGbStream(app, streamId);

View File

@ -151,4 +151,5 @@ public class StreamProxyItem extends GbStream {
public void setEnable_remove_none_reader(boolean enable_remove_none_reader) { public void setEnable_remove_none_reader(boolean enable_remove_none_reader) {
this.enable_remove_none_reader = enable_remove_none_reader; this.enable_remove_none_reader = enable_remove_none_reader;
} }
} }

View File

@ -55,6 +55,16 @@ public interface IStreamProxyService {
*/ */
boolean start(String app, String stream); boolean start(String app, String stream);
/**
*
* @param status
* @param app
* @param stream
*/
int updateStatus(boolean status, String app, String stream);
/** /**
* *
* @param app * @param app

View File

@ -14,8 +14,11 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.storager.dao.MediaServerMapper; import com.genersoft.iot.vmp.storager.dao.MediaServerMapper;
import com.genersoft.iot.vmp.utils.redis.JedisUtil; import com.genersoft.iot.vmp.utils.redis.JedisUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil;
@ -70,6 +73,12 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
@Autowired @Autowired
private RedisUtil redisUtil; private RedisUtil redisUtil;
@Autowired
private IVideoManagerStorager storager;
@Autowired
private IStreamProxyService streamProxyService;
@Autowired @Autowired
private EventPublisher publisher; private EventPublisher publisher;
@ -231,6 +240,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
public List<MediaServerItem> getAllOnline() { public List<MediaServerItem> getAllOnline() {
String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId(); String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId();
Set<String> mediaServerIdSet = redisUtil.zRevRange(key, 0, -1); Set<String> mediaServerIdSet = redisUtil.zRevRange(key, 0, -1);
List<MediaServerItem> result = new ArrayList<>(); List<MediaServerItem> result = new ArrayList<>();
if (mediaServerIdSet != null && mediaServerIdSet.size() > 0) { if (mediaServerIdSet != null && mediaServerIdSet.size() > 0) {
for (String mediaServerId : mediaServerIdSet) { for (String mediaServerId : mediaServerIdSet) {
@ -238,6 +248,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
result.add((MediaServerItem) redisUtil.get(serverKey)); result.add((MediaServerItem) redisUtil.get(serverKey));
} }
} }
Collections.reverse(result);
return result; return result;
} }
@ -374,6 +385,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
resetOnlineServerItem(serverItem); resetOnlineServerItem(serverItem);
updateMediaServerKeepalive(serverItem.getId(), null); updateMediaServerKeepalive(serverItem.getId(), null);
setZLMConfig(serverItem, "0".equals(zlmServerConfig.getHookEnable())); setZLMConfig(serverItem, "0".equals(zlmServerConfig.getHookEnable()));
publisher.zlmOnlineEventPublish(serverItem.getId()); publisher.zlmOnlineEventPublish(serverItem.getId());
} }

View File

@ -58,6 +58,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Autowired @Autowired
private IRedisCatchStorage redisCatchStorage; private IRedisCatchStorage redisCatchStorage;
@Autowired
private IVideoManagerStorager storager;
@Autowired @Autowired
private UserSetup userSetup; private UserSetup userSetup;
@ -278,7 +281,27 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override @Override
public void zlmServerOnline(String mediaServerId) { public void zlmServerOnline(String mediaServerId) {
zlmServerOffline(mediaServerId); // 移除开启了无人观看自动移除的流
List<StreamProxyItem> streamProxyItemList = streamProxyMapper.selecAutoRemoveItemByMediaServerId(mediaServerId);
if (streamProxyItemList.size() > 0) {
gbStreamMapper.batchDel(streamProxyItemList);
}
streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
// 恢复流代理, 只查找这个这个流媒体
List<StreamProxyItem> streamProxyListForEnable = storager.getStreamProxyListForEnableInMediaServer(
mediaServerId, true, false);
for (StreamProxyItem streamProxyDto : streamProxyListForEnable) {
logger.info("恢复流代理," + streamProxyDto.getApp() + "/" + streamProxyDto.getStream());
JSONObject jsonObject = addStreamProxyToZlm(streamProxyDto);
if (jsonObject == null) {
// 设置为离线
logger.info("恢复流代理失败" + streamProxyDto.getApp() + "/" + streamProxyDto.getStream());
updateStatus(false, streamProxyDto.getApp(), streamProxyDto.getStream());
}else {
updateStatus(true, streamProxyDto.getApp(), streamProxyDto.getStream());
}
}
} }
@Override @Override
@ -289,8 +312,8 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
gbStreamMapper.batchDel(streamProxyItemList); gbStreamMapper.batchDel(streamProxyItemList);
} }
streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId); streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
// 其他的流设置未启用 // 其他的流设置离线
streamProxyMapper.updateStatus(false, mediaServerId); streamProxyMapper.updateStatusByMediaServerId(false, mediaServerId);
String type = "PULL"; String type = "PULL";
// 发送redis消息 // 发送redis消息
@ -314,4 +337,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
public void clean() { public void clean() {
} }
@Override
public int updateStatus(boolean status, String app, String stream) {
return streamProxyMapper.updateStatus(status, app, stream);
}
} }

View File

@ -398,10 +398,11 @@ public interface IVideoManagerStorager {
/** /**
* ID/ * ID/
* @param id ID * @param id ID
* @param b / * @param enable /
* @param status
* @return * @return
*/ */
List<StreamProxyItem> getStreamProxyListForEnableInMediaServer(String id, boolean b); List<StreamProxyItem> getStreamProxyListForEnableInMediaServer(String id, boolean enable, boolean status);
/** /**
* ID * ID

View File

@ -11,9 +11,9 @@ import java.util.List;
public interface StreamProxyMapper { public interface StreamProxyMapper {
@Insert("INSERT INTO stream_proxy (type, app, stream,mediaServerId, url, src_url, dst_url, " + @Insert("INSERT INTO stream_proxy (type, app, stream,mediaServerId, url, src_url, dst_url, " +
"timeout_ms, ffmpeg_cmd_key, rtp_type, enable_hls, enable_mp4, enable, enable_remove_none_reader, createTime) VALUES" + "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_hls, enable_mp4, enable, status, enable_remove_none_reader, createTime) VALUES" +
"('${type}','${app}', '${stream}', '${mediaServerId}','${url}', '${src_url}', '${dst_url}', " + "('${type}','${app}', '${stream}', '${mediaServerId}','${url}', '${src_url}', '${dst_url}', " +
"'${timeout_ms}', '${ffmpeg_cmd_key}', '${rtp_type}', ${enable_hls}, ${enable_mp4}, ${enable}, " + "'${timeout_ms}', '${ffmpeg_cmd_key}', '${rtp_type}', ${enable_hls}, ${enable_mp4}, ${enable}, ${status}, " +
"${enable_remove_none_reader}, '${createTime}' )") "${enable_remove_none_reader}, '${createTime}' )")
int add(StreamProxyItem streamProxyDto); int add(StreamProxyItem streamProxyDto);
@ -30,6 +30,7 @@ public interface StreamProxyMapper {
"rtp_type=#{rtp_type}, " + "rtp_type=#{rtp_type}, " +
"enable_hls=#{enable_hls}, " + "enable_hls=#{enable_hls}, " +
"enable=#{enable}, " + "enable=#{enable}, " +
"status=#{status}, " +
"enable_remove_none_reader=#{enable_remove_none_reader}, " + "enable_remove_none_reader=#{enable_remove_none_reader}, " +
"enable_mp4=#{enable_mp4} " + "enable_mp4=#{enable_mp4} " +
"WHERE app=#{app} AND stream=#{stream}") "WHERE app=#{app} AND stream=#{stream}")
@ -49,8 +50,8 @@ public interface StreamProxyMapper {
@Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st " + @Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st " +
"LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " + "LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " +
"WHERE st.enable=${enable} and st.mediaServerId = '${id}' order by st.createTime desc") "WHERE st.enable=${enable} and st.status=${status} and st.mediaServerId = '${id}' order by st.createTime desc")
List<StreamProxyItem> selectForEnableInMediaServer(String id, boolean enable); List<StreamProxyItem> selectForEnableInMediaServer(String id, boolean enable, boolean status);
@Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st " + @Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st " +
"LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " + "LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " +
@ -58,9 +59,14 @@ public interface StreamProxyMapper {
List<StreamProxyItem> selectInMediaServer(String id); List<StreamProxyItem> selectInMediaServer(String id);
@Update("UPDATE stream_proxy " + @Update("UPDATE stream_proxy " +
"SET enable=#{status} " + "SET status=#{status} " +
"WHERE mediaServerId=#{mediaServerId}") "WHERE mediaServerId=#{mediaServerId}")
void updateStatus(boolean status, String mediaServerId); void updateStatusByMediaServerId(boolean status, String mediaServerId);
@Update("UPDATE stream_proxy " +
"SET status=${status} " +
"WHERE app=#{app} AND stream=#{stream}")
int updateStatus(boolean status, String app, String stream);
@Delete("DELETE FROM stream_proxy WHERE enable_remove_none_reader=true AND mediaServerId=#{mediaServerId}") @Delete("DELETE FROM stream_proxy WHERE enable_remove_none_reader=true AND mediaServerId=#{mediaServerId}")
void deleteAutoRemoveItemByMediaServerId(String mediaServerId); void deleteAutoRemoveItemByMediaServerId(String mediaServerId);

View File

@ -860,8 +860,8 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
} }
@Override @Override
public List<StreamProxyItem> getStreamProxyListForEnableInMediaServer(String id, boolean enable) { public List<StreamProxyItem> getStreamProxyListForEnableInMediaServer(String id, boolean enable, boolean status) {
return streamProxyMapper.selectForEnableInMediaServer(id, enable); return streamProxyMapper.selectForEnableInMediaServer(id, enable, status);
} }

View File

@ -42,6 +42,14 @@
</el-table-column> </el-table-column>
<el-table-column prop="gbId" label="国标编码" width="180" align="center" show-overflow-tooltip/> <el-table-column prop="gbId" label="国标编码" width="180" align="center" show-overflow-tooltip/>
<el-table-column label="状态" width="120" align="center">
<template slot-scope="scope">
<div slot="reference" class="name-wrapper">
<el-tag size="medium" v-if="scope.row.status">线</el-tag>
<el-tag size="medium" type="info" v-if="!scope.row.status">线</el-tag>
</div>
</template>
</el-table-column>
<el-table-column label="启用" width="120" align="center"> <el-table-column label="启用" width="120" align="center">
<template slot-scope="scope"> <template slot-scope="scope">
<div slot="reference" class="name-wrapper"> <div slot="reference" class="name-wrapper">

View File

@ -46,7 +46,6 @@
style="width: 100%" style="width: 100%"
placeholder="请选择拉流节点" placeholder="请选择拉流节点"
> >
<el-option label="自动选择" value="auto"></el-option>
<el-option <el-option
v-for="item in mediaServerList" v-for="item in mediaServerList"
:key="item.id" :key="item.id"
@ -172,7 +171,7 @@ export default {
enable_mp4: false, enable_mp4: false,
enable_remove_none_reader: false, enable_remove_none_reader: false,
platformGbId: null, platformGbId: null,
mediaServerId: "auto", mediaServerId: null,
}, },
mediaServerList:{}, mediaServerList:{},
ffmpegCmdList:{}, ffmpegCmdList:{},
@ -206,7 +205,8 @@ export default {
console.log(error); console.log(error);
}); });
this.mediaServer.getOnlineMediaServerList((data)=>{ this.mediaServer.getOnlineMediaServerList((data)=>{
this.mediaServerList = data; this.mediaServerList = data.data;
this.proxyParam.mediaServerId = this.mediaServerList[0].id
}) })
}, },
mediaServerIdChange:function (){ mediaServerIdChange:function (){