支持推流和拉流代理通道状态变化发送通知

pull/829/head
648540858 2023-04-23 15:54:34 +08:00
parent 269ad8cedb
commit b944f8867c
5 changed files with 93 additions and 68 deletions

View File

@ -1,14 +1,9 @@
package com.genersoft.iot.vmp.gb28181.event.subscribe.catalog; package com.genersoft.iot.vmp.gb28181.event.subscribe.catalog;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -16,12 +11,14 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
import javax.sip.SipException; import javax.sip.SipException;
import java.text.ParseException; import java.text.ParseException;
import java.util.*; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** /**
* catalog * catalog
@ -43,6 +40,9 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
@Autowired @Autowired
private SubscribeHolder subscribeHolder; private SubscribeHolder subscribeHolder;
@Autowired
private UserSetting userSetting;
@Override @Override
public void onApplicationEvent(CatalogEvent event) { public void onApplicationEvent(CatalogEvent event) {
SubscribeInfo subscribe = null; SubscribeInfo subscribe = null;
@ -93,6 +93,9 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
} }
if (event.getGbStreams() != null && event.getGbStreams().size() > 0){ if (event.getGbStreams() != null && event.getGbStreams().size() > 0){
for (GbStream gbStream : event.getGbStreams()) { for (GbStream gbStream : event.getGbStreams()) {
if (gbStream.getStreamType().equals("push") && !userSetting.isUsePushingAsStatus()) {
continue;
}
DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform); DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform);
deviceChannelList.add(deviceChannelByStream); deviceChannelList.add(deviceChannelByStream);
} }

View File

@ -118,6 +118,8 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
// 离线 // 离线
logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
if (userSetting.getRefuseChannelStatusChannelFormNotify()) { if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
}else {
updateChannelOfflineList.add(channel); updateChannelOfflineList.add(channel);
if (updateChannelOfflineList.size() > 300) { if (updateChannelOfflineList.size() > 300) {
executeSaveForOffline(); executeSaveForOffline();
@ -126,14 +128,14 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
// 发送redis消息 // 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
} }
}else {
logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
} }
break; break;
case CatalogEvent.VLOST: case CatalogEvent.VLOST:
// 视频丢失 // 视频丢失
logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
if (userSetting.getRefuseChannelStatusChannelFormNotify()) { if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
}else {
updateChannelOfflineList.add(channel); updateChannelOfflineList.add(channel);
if (updateChannelOfflineList.size() > 300) { if (updateChannelOfflineList.size() > 300) {
executeSaveForOffline(); executeSaveForOffline();
@ -142,14 +144,14 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
// 发送redis消息 // 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
} }
}else {
logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
} }
break; break;
case CatalogEvent.DEFECT: case CatalogEvent.DEFECT:
// 故障 // 故障
logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
if (userSetting.getRefuseChannelStatusChannelFormNotify()) { if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
}else {
updateChannelOfflineList.add(channel); updateChannelOfflineList.add(channel);
if (updateChannelOfflineList.size() > 300) { if (updateChannelOfflineList.size() > 300) {
executeSaveForOffline(); executeSaveForOffline();
@ -158,8 +160,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
// 发送redis消息 // 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
} }
}else {
logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
} }
break; break;
case CatalogEvent.ADD: case CatalogEvent.ADD:

View File

@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
@ -292,22 +293,24 @@ public class ZLMHttpHookListener {
JSONObject json = (JSONObject) JSON.toJSON(param); JSONObject json = (JSONObject) JSON.toJSON(param);
taskExecutor.execute(() -> { taskExecutor.execute(() -> {
ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json); ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId());
if (mediaInfo == null) {
logger.info("[ZLM HOOK] 流变化未找到ZLM, {}", param.getMediaServerId());
return;
}
if (subscribe != null) { if (subscribe != null) {
MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); subscribe.response(mediaInfo, json);
if (mediaInfo != null) {
subscribe.response(mediaInfo, json);
}
} }
List<OnStreamChangedHookParam.MediaTrack> tracks = param.getTracks(); List<OnStreamChangedHookParam.MediaTrack> tracks = param.getTracks();
// TODO 重构此处逻辑 // TODO 重构此处逻辑
boolean isPush = false;
if (param.isRegist()) { if (param.isRegist()) {
// 处理流注册的鉴权信息 // 处理流注册的鉴权信息
if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal() if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
|| param.getOriginType() == OriginType.RTSP_PUSH.ordinal() || param.getOriginType() == OriginType.RTSP_PUSH.ordinal()
|| param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
isPush = true;
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream()); StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream());
if (streamAuthorityInfo == null) { if (streamAuthorityInfo == null) {
streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param); streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
@ -329,7 +332,10 @@ public class ZLMHttpHookListener {
mediaServerService.removeCount(param.getMediaServerId()); mediaServerService.removeCount(param.getMediaServerId());
} }
// 设置拉流代理上线/离线 // 设置拉流代理上线/离线
streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream()); int updateStatusResult = streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream());
if (updateStatusResult > 0) {
}
if ("rtp".equals(param.getApp()) && !param.isRegist()) { if ("rtp".equals(param.getApp()) && !param.isRegist()) {
StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(param.getStream()); StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(param.getStream());
@ -337,7 +343,8 @@ public class ZLMHttpHookListener {
redisCatchStorage.stopPlay(streamInfo); redisCatchStorage.stopPlay(streamInfo);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
} else { } else {
streamInfo = redisCatchStorage.queryPlayback(null, null, param.getStream(), null); streamInfo = redisCatchStorage.queryPlayback(null, null,
param.getStream(), null);
if (streamInfo != null) { if (streamInfo != null) {
redisCatchStorage.stopPlayback(streamInfo.getDeviceID(), streamInfo.getChannelId(), redisCatchStorage.stopPlayback(streamInfo.getDeviceID(), streamInfo.getChannelId(),
streamInfo.getStream(), null); streamInfo.getStream(), null);
@ -346,48 +353,50 @@ public class ZLMHttpHookListener {
} else { } else {
if (!"rtp".equals(param.getApp())) { if (!"rtp".equals(param.getApp())) {
String type = OriginType.values()[param.getOriginType()].getType(); String type = OriginType.values()[param.getOriginType()].getType();
MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId()); if (param.isRegist()) {
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(
if (mediaServerItem != null) { param.getApp(), param.getStream());
if (param.isRegist()) { String callId = null;
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream()); if (streamAuthorityInfo != null) {
String callId = null; callId = streamAuthorityInfo.getCallId();
if (streamAuthorityInfo != null) { }
callId = streamAuthorityInfo.getCallId(); StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaInfo,
} param.getApp(), param.getStream(), tracks, callId);
StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaServerItem, param.setStreamInfo(new StreamContent(streamInfoByAppAndStream));
param.getApp(), param.getStream(), tracks, callId); redisCatchStorage.addStream(mediaInfo, type, param.getApp(), param.getStream(), param);
param.setStreamInfo(new StreamContent(streamInfoByAppAndStream)); if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal()
redisCatchStorage.addStream(mediaServerItem, type, param.getApp(), param.getStream(), param); || param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal() || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
|| param.getOriginType() == OriginType.RTMP_PUSH.ordinal() param.setSeverId(userSetting.getServerId());
|| param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { zlmMediaListManager.addPush(param);
param.setSeverId(userSetting.getServerId()); }
zlmMediaListManager.addPush(param); } else {
} // 兼容流注销时类型从redis记录获取
} else { OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo(
// 兼容流注销时类型从redis记录获取 param.getApp(), param.getStream(), param.getMediaServerId());
OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo(param.getApp(), param.getStream(), param.getMediaServerId()); if (onStreamChangedHookParam != null) {
if (onStreamChangedHookParam != null) { type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType();
type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType(); redisCatchStorage.removeStream(mediaInfo.getId(), type, param.getApp(), param.getStream());
redisCatchStorage.removeStream(mediaServerItem.getId(), type, param.getApp(), param.getStream()); }
} GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); if (gbStream != null) {
if (gbStream != null) {
// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); // eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
}
zlmMediaListManager.removeMedia(param.getApp(), param.getStream());
}
if (type != null) {
// 发送流变化redis消息
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetting.getServerId());
jsonObject.put("app", param.getApp());
jsonObject.put("stream", param.getStream());
jsonObject.put("register", param.isRegist());
jsonObject.put("mediaServerId", param.getMediaServerId());
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
} }
zlmMediaListManager.removeMedia(param.getApp(), param.getStream());
}
GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
if (gbStream != null) {
eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist()?CatalogEvent.ON:CatalogEvent.OFF);
}
if (type != null) {
// 发送流变化redis消息
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetting.getServerId());
jsonObject.put("app", param.getApp());
jsonObject.put("stream", param.getStream());
jsonObject.put("register", param.isRegist());
jsonObject.put("mediaServerId", param.getMediaServerId());
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
} }
} }
} }
@ -403,7 +412,8 @@ public class ZLMHttpHookListener {
try { try {
if (platform != null) { if (platform != null) {
commanderFroPlatform.streamByeCmd(platform, sendRtpItem); commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId()); redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
sendRtpItem.getCallId(), sendRtpItem.getStreamId());
} else { } else {
cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
} }
@ -428,7 +438,8 @@ public class ZLMHttpHookListener {
@PostMapping(value = "/on_stream_none_reader", produces = "application/json;charset=UTF-8") @PostMapping(value = "/on_stream_none_reader", produces = "application/json;charset=UTF-8")
public JSONObject onStreamNoneReader(@RequestBody OnStreamNoneReaderHookParam param) { public JSONObject onStreamNoneReader(@RequestBody OnStreamNoneReaderHookParam param) {
logger.info("[ZLM HOOK]流无人观看:{]->{}->{}/{}" + param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); logger.info("[ZLM HOOK]流无人观看:{]->{}->{}/{}" + param.getMediaServerId(), param.getSchema(),
param.getApp(), param.getStream());
JSONObject ret = new JSONObject(); JSONObject ret = new JSONObject();
ret.put("code", 0); ret.put("code", 0);
// 国标类型的流 // 国标类型的流
@ -440,7 +451,8 @@ public class ZLMHttpHookListener {
if (streamInfoForPlayCatch != null) { if (streamInfoForPlayCatch != null) {
// 收到无人观看说明流也没有在往上级推送 // 收到无人观看说明流也没有在往上级推送
if (redisCatchStorage.isChannelSendingRTP(streamInfoForPlayCatch.getChannelId())) { if (redisCatchStorage.isChannelSendingRTP(streamInfoForPlayCatch.getChannelId())) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(streamInfoForPlayCatch.getChannelId()); List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
streamInfoForPlayCatch.getChannelId());
if (sendRtpItems.size() > 0) { if (sendRtpItems.size() > 0) {
for (SendRtpItem sendRtpItem : sendRtpItems) { for (SendRtpItem sendRtpItem : sendRtpItems) {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
@ -470,7 +482,8 @@ public class ZLMHttpHookListener {
return ret; return ret;
} }
// 录像回放 // 录像回放
StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlayback(null, null, param.getStream(), null); StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlayback(null, null,
param.getStream(), null);
if (streamInfoForPlayBackCatch != null) { if (streamInfoForPlayBackCatch != null) {
if (streamInfoForPlayBackCatch.isPause()) { if (streamInfoForPlayBackCatch.isPause()) {
ret.put("close", false); ret.put("close", false);
@ -491,7 +504,8 @@ public class ZLMHttpHookListener {
return ret; return ret;
} }
// 录像下载 // 录像下载
StreamInfo streamInfoForDownload = redisCatchStorage.queryDownload(null, null, param.getStream(), null); StreamInfo streamInfoForDownload = redisCatchStorage.queryDownload(null, null,
param.getStream(), null);
// 进行录像下载时无人观看不断流 // 进行录像下载时无人观看不断流
if (streamInfoForDownload != null) { if (streamInfoForDownload != null) {
ret.put("close", false); ret.put("close", false);

View File

@ -18,6 +18,10 @@ public class HookResult {
return new HookResult(0, "success"); return new HookResult(0, "success");
} }
public static HookResult Fail(){
return new HookResult(-1, "fail");
}
public int getCode() { public int getCode() {
return code; return code;
} }

View File

@ -24,6 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.*; import java.util.*;
@ -43,6 +44,9 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Autowired @Autowired
private RedisTemplate<Object, Object> redisTemplate; private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Override @Override
public Long getCSEQ() { public Long getCSEQ() {
String key = VideoManagerConstants.SIP_CSEQ_PREFIX + userSetting.getServerId(); String key = VideoManagerConstants.SIP_CSEQ_PREFIX + userSetting.getServerId();
@ -913,7 +917,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
msg.append(":").append(channelId); msg.append(":").append(channelId);
} }
msg.append(" ").append(online? "ON":"OFF"); msg.append(" ").append(online? "ON":"OFF");
// 使用 RedisTemplate<Object, Object> 发送字符串消息会导致发送的消息多带了双引号
redisTemplate.convertAndSend(key, msg.toString()); stringRedisTemplate.convertAndSend(key, msg.toString());
} }
} }