优化zlm接入是拉流代理的逻辑
parent
b38e5508f6
commit
8a42f15d34
|
@ -48,8 +48,8 @@ public interface StreamProxyMapper {
|
||||||
@SelectProvider(type = StreamProxyProvider.class, method = "selectOneByAppAndStream")
|
@SelectProvider(type = StreamProxyProvider.class, method = "selectOneByAppAndStream")
|
||||||
StreamProxy selectOneByAppAndStream(@Param("app") String app, @Param("stream") String stream);
|
StreamProxy selectOneByAppAndStream(@Param("app") String app, @Param("stream") String stream);
|
||||||
|
|
||||||
@SelectProvider(type = StreamProxyProvider.class, method = "selectForEnableInMediaServer")
|
@SelectProvider(type = StreamProxyProvider.class, method = "selectForPushingInMediaServer")
|
||||||
List<StreamProxy> selectForEnableInMediaServer(@Param("mediaServerId") String mediaServerId, @Param("enable") boolean enable);
|
List<StreamProxy> selectForPushingInMediaServer(@Param("mediaServerId") String mediaServerId, @Param("enable") boolean enable);
|
||||||
|
|
||||||
|
|
||||||
@Select("select count(1) from wvp_stream_proxy")
|
@Select("select count(1) from wvp_stream_proxy")
|
||||||
|
|
|
@ -19,9 +19,8 @@ public class StreamProxyProvider {
|
||||||
return getBaseSelectSql() + " WHERE st.id = " + params.get("id");
|
return getBaseSelectSql() + " WHERE st.id = " + params.get("id");
|
||||||
}
|
}
|
||||||
|
|
||||||
public String selectForEnableInMediaServer(Map<String, Object> params ){
|
public String selectForPushingInMediaServer(Map<String, Object> params ){
|
||||||
return getBaseSelectSql() + String.format(" WHERE st.enable=%s and st.media_server_id= '%s' order by st.create_time desc",
|
return getBaseSelectSql() + " WHERE st.pulling=1 and st.media_server_id=#{mediaServerId} order by st.create_time desc";
|
||||||
params.get("enable"), params.get("mediaServerId"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String selectOneByAppAndStream(Map<String, Object> params ){
|
public String selectOneByAppAndStream(Map<String, Object> params ){
|
||||||
|
|
|
@ -296,7 +296,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
// 这里主要是控制数据库/redis缓存/以及zlm中存在的代理流 三者状态一致。以数据库中数据为根本
|
// 这里主要是控制数据库/redis缓存/以及zlm中存在的代理流 三者状态一致。以数据库中数据为根本
|
||||||
redisCatchStorage.removeStream(mediaServer.getId(), "PULL");
|
redisCatchStorage.removeStream(mediaServer.getId(), "PULL");
|
||||||
|
|
||||||
List<StreamProxy> streamProxies = streamProxyMapper.selectForEnableInMediaServer(mediaServer.getId(), true);
|
List<StreamProxy> streamProxies = streamProxyMapper.selectForPushingInMediaServer(mediaServer.getId(), true);
|
||||||
if (streamProxies.isEmpty()) {
|
if (streamProxies.isEmpty()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -353,18 +353,16 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
streamProxyMapper.deleteByList(streamProxiesForRemove);
|
streamProxyMapper.deleteByList(streamProxiesForRemove);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (!streamProxyMapForDb.isEmpty()) {
|
if (!streamProxyMapForDb.isEmpty()) {
|
||||||
for (StreamProxy streamProxy : streamProxyMapForDb.values()) {
|
for (StreamProxy streamProxy : streamProxyMapForDb.values()) {
|
||||||
log.info("恢复流代理," + streamProxy.getApp() + "/" + streamProxy.getStream());
|
streamProxyMapper.offline(streamProxy.getId());
|
||||||
mediaServerService.startProxy(mediaServer, streamProxy);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void zlmServerOffline(MediaServer mediaServer) {
|
public void zlmServerOffline(MediaServer mediaServer) {
|
||||||
List<StreamProxy> streamProxies = streamProxyMapper.selectForEnableInMediaServer(mediaServer.getId(), true);
|
List<StreamProxy> streamProxies = streamProxyMapper.selectForPushingInMediaServer(mediaServer.getId(), true);
|
||||||
|
|
||||||
// 清理redis相关的缓存
|
// 清理redis相关的缓存
|
||||||
redisCatchStorage.removeStream(mediaServer.getId(), "PULL");
|
redisCatchStorage.removeStream(mediaServer.getId(), "PULL");
|
||||||
|
|
Loading…
Reference in New Issue