优化hook订阅机制

pull/556/head^2
648540858 2022-07-28 16:18:41 +08:00
parent f84eebdb75
commit fd091e545b
12 changed files with 296 additions and 112 deletions

View File

@ -10,6 +10,9 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
@ -348,25 +351,19 @@ public class SIPCommander implements ISIPCommander {
@Override @Override
public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
ZLMHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) { ZLMHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) {
String streamId = ssrcInfo.getStream(); String stream = ssrcInfo.getStream();
try { try {
if (device == null) { if (device == null) {
return; return;
} }
String streamMode = device.getStreamMode().toUpperCase(); String streamMode = device.getStreamMode().toUpperCase();
logger.info("{} 分配的ZLM为: {} [{}:{}]", streamId, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); logger.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
// 添加订阅 HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId());
JSONObject subscribeKey = new JSONObject(); subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
subscribeKey.put("app", "rtp");
subscribeKey.put("stream", streamId);
subscribeKey.put("regist", true);
subscribeKey.put("schema", "rtmp");
subscribeKey.put("mediaServerId", mediaServerItem.getId());
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
(MediaServerItem mediaServerItemInUse, JSONObject json)->{
if (event != null) { if (event != null) {
event.response(mediaServerItemInUse, json); event.response(mediaServerItemInUse, json);
subscribe.removeSubscribe(hookSubscribe);
} }
}); });
// //
@ -440,7 +437,7 @@ public class SIPCommander implements ISIPCommander {
errorEvent.response(e); errorEvent.response(e);
}), e ->{ }), e ->{
// 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值 // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
streamSession.put(device.getDeviceId(), channelId ,"play", streamId, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction(), VideoStreamSessionManager.SessionType.play); streamSession.put(device.getDeviceId(), channelId ,"play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction(), VideoStreamSessionManager.SessionType.play);
streamSession.put(device.getDeviceId(), channelId ,"play", e.dialog); streamSession.put(device.getDeviceId(), channelId ,"play", e.dialog);
okEvent.response(e); okEvent.response(e);
}); });
@ -530,21 +527,14 @@ public class SIPCommander implements ISIPCommander {
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId(); : udpSipProvider.getNewCallId();
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtmp", mediaServerItem.getId());
// 添加订阅 // 添加订阅
JSONObject subscribeKey = new JSONObject(); subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
subscribeKey.put("app", "rtp");
subscribeKey.put("stream", ssrcInfo.getStream());
subscribeKey.put("regist", true);
subscribeKey.put("schema", "rtmp");
subscribeKey.put("mediaServerId", mediaServerItem.getId());
logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey);
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
(MediaServerItem mediaServerItemInUse, JSONObject json)->{
if (hookEvent != null) { if (hookEvent != null) {
InviteStreamInfo inviteStreamInfo = new InviteStreamInfo(mediaServerItemInUse, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()); InviteStreamInfo inviteStreamInfo = new InviteStreamInfo(mediaServerItemInUse, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream());
hookEvent.call(inviteStreamInfo); hookEvent.call(inviteStreamInfo);
} }
subscribe.removeSubscribe(hookSubscribe);
}); });
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc()); Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc());
@ -643,21 +633,15 @@ public class SIPCommander implements ISIPCommander {
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId(); : udpSipProvider.getNewCallId();
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, null, mediaServerItem.getId());
// 添加订阅 // 添加订阅
JSONObject subscribeKey = new JSONObject(); subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
subscribeKey.put("app", "rtp");
subscribeKey.put("stream", ssrcInfo.getStream());
subscribeKey.put("regist", true);
subscribeKey.put("mediaServerId", mediaServerItem.getId());
logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString());
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
(MediaServerItem mediaServerItemInUse, JSONObject json)->{
hookEvent.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream())); hookEvent.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()));
subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); subscribe.removeSubscribe(hookSubscribe);
subscribeKey.put("regist", false); hookSubscribe.getContent().put("regist", false);
subscribeKey.put("schema", "rtmp"); hookSubscribe.getContent().put("schema", "rtmp");
// 添加流注销的订阅注销了后向设备发送bye // 添加流注销的订阅注销了后向设备发送bye
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, subscribe.addSubscribe(hookSubscribe,
(MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd)->{ (MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd)->{
ClientTransaction transaction = streamSession.getTransaction(device.getDeviceId(), channelId, ssrcInfo.getStream(), callIdHeader.getCallId()); ClientTransaction transaction = streamSession.getTransaction(device.getDeviceId(), channelId, ssrcInfo.getStream(), callIdHeader.getCallId());
if (transaction != null) { if (transaction != null) {

View File

@ -102,7 +102,7 @@ public class ZLMHttpHookListener {
logger.debug("[ ZLM HOOK ] on_server_keepalive API调用参数" + json.toString()); logger.debug("[ ZLM HOOK ] on_server_keepalive API调用参数" + json.toString());
} }
String mediaServerId = json.getString("mediaServerId"); String mediaServerId = json.getString("mediaServerId");
List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_keepalive); List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
if (subscribes != null && subscribes.size() > 0) { if (subscribes != null && subscribes.size() > 0) {
for (ZLMHttpHookSubscribe.Event subscribe : subscribes) { for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
subscribe.response(null, json); subscribe.response(null, json);
@ -168,7 +168,7 @@ public class ZLMHttpHookListener {
logger.debug("[ ZLM HOOK ]on_play API调用参数" + JSON.toJSONString(param)); logger.debug("[ ZLM HOOK ]on_play API调用参数" + JSON.toJSONString(param));
} }
String mediaServerId = param.getMediaServerId(); String mediaServerId = param.getMediaServerId();
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_play, json); ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_play, json);
if (subscribe != null ) { if (subscribe != null ) {
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (mediaInfo != null) { if (mediaInfo != null) {
@ -253,7 +253,7 @@ public class ZLMHttpHookListener {
} }
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json); ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
if (subscribe != null) { if (subscribe != null) {
if (mediaInfo != null) { if (mediaInfo != null) {
subscribe.response(mediaInfo, json); subscribe.response(mediaInfo, json);
@ -377,7 +377,7 @@ public class ZLMHttpHookListener {
logger.debug("[ ZLM HOOK ]on_shell_login API调用参数" + json.toString()); logger.debug("[ ZLM HOOK ]on_shell_login API调用参数" + json.toString());
} }
String mediaServerId = json.getString("mediaServerId"); String mediaServerId = json.getString("mediaServerId");
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_shell_login, json); ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_shell_login, json);
if (subscribe != null ) { if (subscribe != null ) {
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (mediaInfo != null) { if (mediaInfo != null) {
@ -403,7 +403,7 @@ public class ZLMHttpHookListener {
logger.info("[ ZLM HOOK ]on_stream_changed API调用参数" + JSONObject.toJSONString(item)); logger.info("[ ZLM HOOK ]on_stream_changed API调用参数" + JSONObject.toJSONString(item));
String mediaServerId = item.getMediaServerId(); String mediaServerId = item.getMediaServerId();
JSONObject json = (JSONObject) JSON.toJSON(item); JSONObject json = (JSONObject) JSON.toJSON(item);
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, json); ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
if (subscribe != null ) { if (subscribe != null ) {
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (mediaInfo != null) { if (mediaInfo != null) {
@ -614,7 +614,7 @@ public class ZLMHttpHookListener {
} }
String remoteAddr = request.getRemoteAddr(); String remoteAddr = request.getRemoteAddr();
jsonObject.put("ip", remoteAddr); jsonObject.put("ip", remoteAddr);
List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_started); List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started);
if (subscribes != null && subscribes.size() > 0) { if (subscribes != null && subscribes.size() > 0) {
for (ZLMHttpHookSubscribe.Event subscribe : subscribes) { for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
subscribe.response(null, jsonObject); subscribe.response(null, jsonObject);

View File

@ -1,12 +1,16 @@
package com.genersoft.iot.vmp.media.zlm; package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.time.Instant;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/** /**
* @description: ZLMediaServerhook * @description: ZLMediaServerhook
@ -16,49 +20,38 @@ import java.util.concurrent.ConcurrentHashMap;
@Component @Component
public class ZLMHttpHookSubscribe { public class ZLMHttpHookSubscribe {
public enum HookType{
on_flow_report,
on_http_access,
on_play,
on_publish,
on_record_mp4,
on_rtsp_auth,
on_rtsp_realm,
on_shell_login,
on_stream_changed,
on_stream_none_reader,
on_stream_not_found,
on_server_started,
on_server_keepalive
}
@FunctionalInterface @FunctionalInterface
public interface Event{ public interface Event{
void response(MediaServerItem mediaServerItem, JSONObject response); void response(MediaServerItem mediaServerItem, JSONObject response);
} }
private Map<HookType, Map<JSONObject, ZLMHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>(); private Map<HookType, Map<IHookSubscribe, ZLMHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
public void addSubscribe(HookType type, JSONObject hookResponse, ZLMHttpHookSubscribe.Event event) { public void addSubscribe(IHookSubscribe hookSubscribe, ZLMHttpHookSubscribe.Event event) {
allSubscribes.computeIfAbsent(type, k -> new ConcurrentHashMap<>()).put(hookResponse, event); if (hookSubscribe.getExpires() == null) {
// 默认5分钟过期
Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5));
hookSubscribe.setExpires(expiresInstant);
}
allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event);
} }
public ZLMHttpHookSubscribe.Event getSubscribe(HookType type, JSONObject hookResponse) { public ZLMHttpHookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) {
ZLMHttpHookSubscribe.Event event= null; ZLMHttpHookSubscribe.Event event= null;
Map<JSONObject, Event> eventMap = allSubscribes.get(type); Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
if (eventMap == null) { if (eventMap == null) {
return null; return null;
} }
for (JSONObject key : eventMap.keySet()) { for (IHookSubscribe key : eventMap.keySet()) {
Boolean result = null; Boolean result = null;
for (String s : key.keySet()) { for (String s : key.getContent().keySet()) {
if (result == null) { if (result == null) {
result = key.getString(s).equals(hookResponse.getString(s)); result = key.getContent().getString(s).equals(hookResponse.getString(s));
}else { }else {
if (key.getString(s) == null) { if (key.getContent().getString(s) == null) {
continue; continue;
} }
result = result && key.getString(s).equals(hookResponse.getString(s)); result = result && key.getContent().getString(s).equals(hookResponse.getString(s));
} }
} }
@ -69,26 +62,30 @@ public class ZLMHttpHookSubscribe {
return event; return event;
} }
public void removeSubscribe(HookType type, JSONObject hookResponse) { public void removeSubscribe(IHookSubscribe hookSubscribe) {
Map<JSONObject, Event> eventMap = allSubscribes.get(type); Map<IHookSubscribe, Event> eventMap = allSubscribes.get(hookSubscribe.getHookType());
if (eventMap == null) { if (eventMap == null) {
return; return;
} }
Set<Map.Entry<JSONObject, Event>> entries = eventMap.entrySet(); Set<Map.Entry<IHookSubscribe, Event>> entries = eventMap.entrySet();
if (entries.size() > 0) { if (entries.size() > 0) {
List<Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>(); List<Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>();
for (Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event> entry : entries) { for (Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event> entry : entries) {
JSONObject key = entry.getKey(); JSONObject content = entry.getKey().getContent();
if (content == null || content.size() == 0) {
entriesToRemove.add(entry);
continue;
}
Boolean result = null; Boolean result = null;
for (String s : key.keySet()) { for (String s : content.keySet()) {
if (result == null) { if (result == null) {
result = key.getString(s).equals(hookResponse.getString(s)); result = content.getString(s).equals(hookSubscribe.getContent().getString(s));
}else { }else {
if (key.getString(s) == null) { if (content.getString(s) == null) {
continue; continue;
} }
result = result && key.getString(s).equals(hookResponse.getString(s)); result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s));
} }
} }
if (null != result && result){ if (null != result && result){
@ -97,7 +94,7 @@ public class ZLMHttpHookSubscribe {
} }
if (!CollectionUtils.isEmpty(entriesToRemove)) { if (!CollectionUtils.isEmpty(entriesToRemove)) {
for (Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event> entry : entriesToRemove) { for (Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event> entry : entriesToRemove) {
entries.remove(entry); entries.remove(entry);
} }
} }
@ -111,17 +108,25 @@ public class ZLMHttpHookSubscribe {
* @return * @return
*/ */
public List<ZLMHttpHookSubscribe.Event> getSubscribes(HookType type) { public List<ZLMHttpHookSubscribe.Event> getSubscribes(HookType type) {
// ZLMHttpHookSubscribe.Event event= null; Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
Map<JSONObject, Event> eventMap = allSubscribes.get(type);
if (eventMap == null) { if (eventMap == null) {
return null; return null;
} }
List<ZLMHttpHookSubscribe.Event> result = new ArrayList<>(); List<ZLMHttpHookSubscribe.Event> result = new ArrayList<>();
for (JSONObject key : eventMap.keySet()) { for (IHookSubscribe key : eventMap.keySet()) {
result.add(eventMap.get(key)); result.add(eventMap.get(key));
} }
return result; return result;
} }
public List<IHookSubscribe> getAll(){
ArrayList<IHookSubscribe> result = new ArrayList<>();
Collection<Map<IHookSubscribe, Event>> values = allSubscribes.values();
for (Map<IHookSubscribe, Event> value : values) {
result.addAll(value.keySet());
}
return result;
}
} }

View File

@ -6,22 +6,22 @@ import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.time.Instant;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit;
@Component @Component
@Order(value=1) @Order(value=1)
@ -37,18 +37,12 @@ public class ZLMRunner implements CommandLineRunner {
@Autowired @Autowired
private ZLMHttpHookSubscribe hookSubscribe; private ZLMHttpHookSubscribe hookSubscribe;
@Autowired
private IStreamProxyService streamProxyService;
@Autowired @Autowired
private EventPublisher publisher; private EventPublisher publisher;
@Autowired @Autowired
private IMediaServerService mediaServerService; private IMediaServerService mediaServerService;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired @Autowired
private MediaConfig mediaConfig; private MediaConfig mediaConfig;
@ -67,17 +61,25 @@ public class ZLMRunner implements CommandLineRunner {
mediaServerService.updateToDatabase(mediaSerItem); mediaServerService.updateToDatabase(mediaSerItem);
} }
mediaServerService.syncCatchFromDatabase(); mediaServerService.syncCatchFromDatabase();
HookSubscribeForServerStarted hookSubscribeForServerStarted = HookSubscribeFactory.on_server_started();
// Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.SECONDS.toSeconds(60));
// hookSubscribeForStreamChange.setExpires(expiresInstant);
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统 // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started,new JSONObject(), hookSubscribe.addSubscribe(hookSubscribeForServerStarted,
(MediaServerItem mediaServerItem, JSONObject response)->{ (MediaServerItem mediaServerItem, JSONObject response)->{
ZLMServerConfig zlmServerConfig = JSONObject.toJavaObject(response, ZLMServerConfig.class); ZLMServerConfig zlmServerConfig = JSONObject.toJavaObject(response, ZLMServerConfig.class);
if (zlmServerConfig !=null ) { if (zlmServerConfig !=null ) {
if (startGetMedia != null) { if (startGetMedia != null) {
startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId()); startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId());
if (startGetMedia.size() == 0) {
hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
}
} }
} }
}); });
// 获取zlm信息 // 获取zlm信息
logger.info("[zlm] 等待默认zlm中..."); logger.info("[zlm] 等待默认zlm中...");
@ -103,7 +105,6 @@ public class ZLMRunner implements CommandLineRunner {
} }
startGetMedia = null; startGetMedia = null;
} }
hookSubscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started, new JSONObject());
// TODO 清理数据库中与redis不匹配的zlm // TODO 清理数据库中与redis不匹配的zlm
}, 60 * 1000 ); }, 60 * 1000 );
} }
@ -116,6 +117,9 @@ public class ZLMRunner implements CommandLineRunner {
zlmServerConfigFirst.setIp(mediaServerItem.getIp()); zlmServerConfigFirst.setIp(mediaServerItem.getIp());
zlmServerConfigFirst.setHttpPort(mediaServerItem.getHttpPort()); zlmServerConfigFirst.setHttpPort(mediaServerItem.getHttpPort());
startGetMedia.remove(mediaServerItem.getId()); startGetMedia.remove(mediaServerItem.getId());
if (startGetMedia.size() == 0) {
hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
}
mediaServerService.zlmServerOnline(zlmServerConfigFirst); mediaServerService.zlmServerOnline(zlmServerConfigFirst);
}else { }else {
logger.info("[ {} ]-[ {}:{} ]主动连接失败, 清理相关资源, 开始尝试重试连接", logger.info("[ {} ]-[ {}:{} ]主动连接失败, 清理相关资源, 开始尝试重试连接",
@ -130,6 +134,9 @@ public class ZLMRunner implements CommandLineRunner {
zlmServerConfig.setIp(mediaServerItem.getIp()); zlmServerConfig.setIp(mediaServerItem.getIp());
zlmServerConfig.setHttpPort(mediaServerItem.getHttpPort()); zlmServerConfig.setHttpPort(mediaServerItem.getHttpPort());
startGetMedia.remove(mediaServerItem.getId()); startGetMedia.remove(mediaServerItem.getId());
if (startGetMedia.size() == 0) {
hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
}
mediaServerService.zlmServerOnline(zlmServerConfig); mediaServerService.zlmServerOnline(zlmServerConfig);
} }
}, 2000); }, 2000);

View File

@ -0,0 +1,33 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.alibaba.fastjson.JSONObject;
/**
* hook
* @author lin
*/
public class HookSubscribeFactory {
public static HookSubscribeForStreamChange on_stream_changed(String app, String stream, boolean regist, String scheam, String mediaServerId) {
HookSubscribeForStreamChange hookSubscribe = new HookSubscribeForStreamChange();
JSONObject subscribeKey = new com.alibaba.fastjson.JSONObject();
subscribeKey.put("app", app);
subscribeKey.put("stream", stream);
subscribeKey.put("regist", regist);
if (scheam != null) {
subscribeKey.put("schema", scheam);
}
subscribeKey.put("mediaServerId", mediaServerId);
hookSubscribe.setContent(subscribeKey);
return hookSubscribe;
}
public static HookSubscribeForServerStarted on_server_started() {
HookSubscribeForServerStarted hookSubscribe = new HookSubscribeForServerStarted();
hookSubscribe.setContent(new JSONObject());
return hookSubscribe;
}
}

View File

@ -0,0 +1,44 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.annotation.JSONField;
import java.time.Instant;
/**
* hook-
* @author lin
*/
public class HookSubscribeForServerStarted implements IHookSubscribe{
private HookType hookType = HookType.on_server_started;
private JSONObject content;
@JSONField(format="yyyy-MM-dd HH:mm:ss")
private Instant expires;
@Override
public HookType getHookType() {
return hookType;
}
@Override
public JSONObject getContent() {
return content;
}
public void setContent(JSONObject content) {
this.content = content;
}
@Override
public Instant getExpires() {
return expires;
}
@Override
public void setExpires(Instant expires) {
this.expires = expires;
}
}

View File

@ -0,0 +1,43 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.annotation.JSONField;
import java.time.Instant;
/**
* hook-
* @author lin
*/
public class HookSubscribeForStreamChange implements IHookSubscribe{
private HookType hookType = HookType.on_stream_changed;
private JSONObject content;
private Instant expires;
@Override
public HookType getHookType() {
return hookType;
}
@Override
public JSONObject getContent() {
return content;
}
public void setContent(JSONObject content) {
this.content = content;
}
@Override
public Instant getExpires() {
return expires;
}
@Override
public void setExpires(Instant expires) {
this.expires = expires;
}
}

View File

@ -0,0 +1,23 @@
package com.genersoft.iot.vmp.media.zlm.dto;
/**
* hook
* @author lin
*/
public enum HookType {
on_flow_report,
on_http_access,
on_play,
on_publish,
on_record_mp4,
on_rtsp_auth,
on_rtsp_realm,
on_shell_login,
on_stream_changed,
on_stream_none_reader,
on_stream_not_found,
on_server_started,
on_server_keepalive
}

View File

@ -0,0 +1,36 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.alibaba.fastjson.JSONObject;
import java.time.Instant;
/**
* zlm hook
* @author lin
*/
public interface IHookSubscribe {
/**
* hook
* @return hook
*/
HookType getHookType();
/**
* hook
* @return hook
*/
JSONObject getContent();
/**
*
* @param instant
*/
void setExpires(Instant instant);
/**
*
* @return
*/
Instant getExpires();
}

View File

@ -13,6 +13,9 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
@ -296,16 +299,10 @@ public class PlayServiceImpl implements IPlayService {
// 单端口模式streamId也有变化需要重新设置监听 // 单端口模式streamId也有变化需要重新设置监听
if (!mediaServerItem.isRtpEnable()) { if (!mediaServerItem.isRtpEnable()) {
// 添加订阅 // 添加订阅
JSONObject subscribeKey = new JSONObject(); HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId());
subscribeKey.put("app", "rtp"); subscribe.removeSubscribe(hookSubscribe);
subscribeKey.put("stream", stream); hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
subscribeKey.put("regist", true); subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response)->{
subscribeKey.put("schema", "rtmp");
subscribeKey.put("mediaServerId", mediaServerItem.getId());
subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed,subscribeKey);
subscribeKey.put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
(MediaServerItem mediaServerItemInUse, JSONObject response)->{
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + response.toJSONString()); logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + response.toJSONString());
dynamicTask.stop(timeOutTaskKey); dynamicTask.stop(timeOutTaskKey);
// hook响应 // hook响应

View File

@ -8,6 +8,9 @@ import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.service.bean.*;
@ -270,14 +273,9 @@ public class RedisGbPlayMsgListener implements MessageListener {
}, userSetting.getPlatformPlayTimeout()); }, userSetting.getPlatformPlayTimeout());
// 添加订阅 // 添加订阅
JSONObject subscribeKey = new JSONObject(); HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtmp", mediaServerItem.getId());
subscribeKey.put("app", content.getApp());
subscribeKey.put("stream", content.getStream()); subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
subscribeKey.put("regist", true);
subscribeKey.put("schema", "rtmp");
subscribeKey.put("mediaServerId", mediaServerItem.getId());
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
(MediaServerItem mediaServerItemInUse, JSONObject json)->{
dynamicTask.stop(taskKey); dynamicTask.stop(taskKey);
responseSendItem(mediaServerItem, content, toId, serial); responseSendItem(mediaServerItem, content, toId, serial);
}); });

View File

@ -8,6 +8,8 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.VersionInfo; import com.genersoft.iot.vmp.conf.VersionInfo;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.utils.SpringBeanFactory; import com.genersoft.iot.vmp.utils.SpringBeanFactory;
@ -38,7 +40,7 @@ import java.util.Set;
public class ServerController { public class ServerController {
@Autowired @Autowired
private ConfigurableApplicationContext context; private ZLMHttpHookSubscribe zlmHttpHookSubscribe;
@Autowired @Autowired
private IMediaServerService mediaServerService; private IMediaServerService mediaServerService;
@ -254,6 +256,18 @@ public class ServerController {
return result; return result;
} }
@ApiOperation("获取当前所有hook")
@GetMapping(value = "/hooks")
@ResponseBody
public WVPResult<List<IHookSubscribe>> getHooks(){
WVPResult<List<IHookSubscribe>> result = new WVPResult<>();
result.setCode(0);
result.setMsg("success");
List<IHookSubscribe> all = zlmHttpHookSubscribe.getAll();
result.setData(all);
return result;
}
// @ApiOperation("当前进行中的动态任务") // @ApiOperation("当前进行中的动态任务")
// @GetMapping(value = "/dynamicTask") // @GetMapping(value = "/dynamicTask")
// @ResponseBody // @ResponseBody