优化部分hook订阅

pull/556/head
648540858 2022-07-27 16:16:12 +08:00
parent d891b89c4b
commit f84eebdb75
4 changed files with 14 additions and 16 deletions

View File

@ -108,6 +108,7 @@ public class ZLMHttpHookListener {
subscribe.response(null, json); subscribe.response(null, json);
} }
} }
mediaServerService.updateMediaServerKeepalive(mediaServerId, json.getJSONObject("data"));
JSONObject ret = new JSONObject(); JSONObject ret = new JSONObject();
ret.put("code", 0); ret.put("code", 0);
@ -619,10 +620,15 @@ public class ZLMHttpHookListener {
subscribe.response(null, jsonObject); subscribe.response(null, jsonObject);
} }
} }
ZLMServerConfig zlmServerConfig = JSONObject.toJavaObject(jsonObject, ZLMServerConfig.class);
if (zlmServerConfig !=null ) {
mediaServerService.zlmServerOnline(zlmServerConfig);
}
JSONObject ret = new JSONObject(); JSONObject ret = new JSONObject();
ret.put("code", 0); ret.put("code", 0);
ret.put("msg", "success"); ret.put("msg", "success");
return new ResponseEntity<String>(ret.toString(),HttpStatus.OK); return new ResponseEntity<>(ret.toString(),HttpStatus.OK);
} }
private Map<String, String> urlParamToMap(String params) { private Map<String, String> urlParamToMap(String params) {

View File

@ -75,19 +75,9 @@ public class ZLMRunner implements CommandLineRunner {
if (startGetMedia != null) { if (startGetMedia != null) {
startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId()); startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId());
} }
mediaServerService.zlmServerOnline(zlmServerConfig);
} }
}); });
// 订阅 zlm保活事件, 当zlm离线时做业务的处理
hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_keepalive,new JSONObject(),
(MediaServerItem mediaServerItem, JSONObject response)->{
String mediaServerId = response.getString("mediaServerId");
if (mediaServerId !=null ) {
mediaServerService.updateMediaServerKeepalive(mediaServerId, response.getJSONObject("data"));
}
});
// 获取zlm信息 // 获取zlm信息
logger.info("[zlm] 等待默认zlm中..."); logger.info("[zlm] 等待默认zlm中...");
@ -113,6 +103,7 @@ public class ZLMRunner implements CommandLineRunner {
} }
startGetMedia = null; startGetMedia = null;
} }
hookSubscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started, new JSONObject());
// TODO 清理数据库中与redis不匹配的zlm // TODO 清理数据库中与redis不匹配的zlm
}, 60 * 1000 ); }, 60 * 1000 );
} }

View File

@ -355,14 +355,15 @@ public class MediaServerServiceImpl implements IMediaServerService {
*/ */
@Override @Override
public void zlmServerOnline(ZLMServerConfig zlmServerConfig) { public void zlmServerOnline(ZLMServerConfig zlmServerConfig) {
logger.info("[ZLM] 正在连接 : {} -> {}:{}",
zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
MediaServerItem serverItem = mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId()); MediaServerItem serverItem = mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId());
if (serverItem == null) { if (serverItem == null) {
logger.warn("[未注册的zlm] 拒接接入:{}来自{}{}", zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(),zlmServerConfig.getHttpPort() ); logger.warn("[未注册的zlm] 拒接接入:{}来自{}{}", zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(),zlmServerConfig.getHttpPort() );
logger.warn("请检查ZLM的<general.mediaServerId>配置是否与WVP的<media.id>一致"); logger.warn("请检查ZLM的<general.mediaServerId>配置是否与WVP的<media.id>一致");
return; return;
}else {
logger.info("[ZLM] 正在连接 : {} -> {}:{}",
zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
} }
serverItem.setHookAliveInterval(zlmServerConfig.getHookAliveInterval()); serverItem.setHookAliveInterval(zlmServerConfig.getHookAliveInterval());
if (serverItem.getHttpPort() == 0) { if (serverItem.getHttpPort() == 0) {

View File

@ -66,7 +66,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
@Override @Override
public void onMessage(Message message, byte[] bytes) { public void onMessage(Message message, byte[] bytes) {
// TODO 增加队列 // TODO 增加队列
logger.warn("[REDIS 消息-推流设备状态变化] {}", new String(message.getBody())); logger.warn("[REDIS消息-推流设备状态变化] {}", new String(message.getBody()));
taskQueue.offer(message); taskQueue.offer(message);
if (!taskQueueHandlerRun) { if (!taskQueueHandlerRun) {
@ -76,7 +76,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
Message msg = taskQueue.poll(); Message msg = taskQueue.poll();
PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class); PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class);
if (statusChangeFromPushStream == null) { if (statusChangeFromPushStream == null) {
logger.warn("[REDIS 消息]推流设备状态变化消息解析失败"); logger.warn("[REDIS消息]推流设备状态变化消息解析失败");
return; return;
} }
// 取消定时任务 // 取消定时任务
@ -106,7 +106,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
// 启动时设置所有推流通道离线,发起查询请求 // 启动时设置所有推流通道离线,发起查询请求
redisCatchStorage.sendStreamPushRequestedMsgForStatus(); redisCatchStorage.sendStreamPushRequestedMsgForStatus();
dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, ()->{ dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, ()->{
logger.info("[REDIS 消息]未收到redis回复推流设备状态执行推流设备离线"); logger.info("[REDIS消息]未收到redis回复推流设备状态执行推流设备离线");
// 五秒收不到请求就设置通道离线,然后通知上级离线 // 五秒收不到请求就设置通道离线,然后通知上级离线
streamPushService.allStreamOffline(); streamPushService.allStreamOffline();
}, 5000); }, 5000);