Merge branch '648540858:wvp-28181-2.0' into develop
commit
941b9a8374
|
@ -5,4 +5,4 @@ alter table wvp_platform
|
||||||
add auto_push_channel bool default false
|
add auto_push_channel bool default false
|
||||||
|
|
||||||
alter table wvp_stream_proxy
|
alter table wvp_stream_proxy
|
||||||
add stream_key varying(255)
|
add stream_key character varying(255)
|
||||||
|
|
|
@ -89,17 +89,17 @@ public class CatalogSubscribeTask implements ISubscribeTask {
|
||||||
ResponseEvent event = (ResponseEvent) eventResult.event;
|
ResponseEvent event = (ResponseEvent) eventResult.event;
|
||||||
if (event.getResponse().getRawContent() != null) {
|
if (event.getResponse().getRawContent() != null) {
|
||||||
// 成功
|
// 成功
|
||||||
logger.info("[取消目录订阅订阅]成功: {}", device.getDeviceId());
|
logger.info("[取消目录订阅]成功: {}", device.getDeviceId());
|
||||||
}else {
|
}else {
|
||||||
// 成功
|
// 成功
|
||||||
logger.info("[取消目录订阅订阅]成功: {}", device.getDeviceId());
|
logger.info("[取消目录订阅]成功: {}", device.getDeviceId());
|
||||||
}
|
}
|
||||||
},eventResult -> {
|
},eventResult -> {
|
||||||
// 失败
|
// 失败
|
||||||
logger.warn("[取消目录订阅订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
|
logger.warn("[取消目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
|
||||||
});
|
});
|
||||||
} catch (InvalidArgumentException | SipException | ParseException e) {
|
} catch (InvalidArgumentException | SipException | ParseException e) {
|
||||||
logger.error("[命令发送失败] 取消目录订阅订阅: {}", e.getMessage());
|
logger.error("[命令发送失败] 取消目录订阅: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -132,7 +132,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
|
||||||
|
|
||||||
if (CmdType.CATALOG.equals(cmd)) {
|
if (CmdType.CATALOG.equals(cmd)) {
|
||||||
logger.info("接收到Catalog通知");
|
logger.info("接收到Catalog通知");
|
||||||
processNotifyCatalogList(take.getEvt());
|
|
||||||
notifyRequestForCatalogProcessor.process(take.getEvt());
|
notifyRequestForCatalogProcessor.process(take.getEvt());
|
||||||
} else if (CmdType.ALARM.equals(cmd)) {
|
} else if (CmdType.ALARM.equals(cmd)) {
|
||||||
logger.info("接收到Alarm通知");
|
logger.info("接收到Alarm通知");
|
||||||
|
@ -371,114 +370,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/***
|
|
||||||
* 处理catalog设备目录列表Notify
|
|
||||||
*
|
|
||||||
* @param evt
|
|
||||||
*/
|
|
||||||
private void processNotifyCatalogList(RequestEvent evt) {
|
|
||||||
try {
|
|
||||||
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
|
|
||||||
String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
|
|
||||||
|
|
||||||
Device device = redisCatchStorage.getDevice(deviceId);
|
|
||||||
if (device == null || !device.isOnLine()) {
|
|
||||||
logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" ));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Element rootElement = getRootElement(evt, device.getCharset());
|
|
||||||
if (rootElement == null) {
|
|
||||||
logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Element deviceListElement = rootElement.element("DeviceList");
|
|
||||||
if (deviceListElement == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
|
|
||||||
if (deviceListIterator != null) {
|
|
||||||
|
|
||||||
// 遍历DeviceList
|
|
||||||
while (deviceListIterator.hasNext()) {
|
|
||||||
Element itemDevice = deviceListIterator.next();
|
|
||||||
Element channelDeviceElement = itemDevice.element("DeviceID");
|
|
||||||
if (channelDeviceElement == null) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Element eventElement = itemDevice.element("Event");
|
|
||||||
String event;
|
|
||||||
if (eventElement == null) {
|
|
||||||
logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" ));
|
|
||||||
event = CatalogEvent.ADD;
|
|
||||||
}else {
|
|
||||||
event = eventElement.getText().toUpperCase();
|
|
||||||
}
|
|
||||||
DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event, civilCodeFileConf);
|
|
||||||
if (channel == null) {
|
|
||||||
logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
|
|
||||||
channel.setParentId(null);
|
|
||||||
}
|
|
||||||
channel.setDeviceId(device.getDeviceId());
|
|
||||||
logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
|
|
||||||
switch (event) {
|
|
||||||
case CatalogEvent.ON:
|
|
||||||
// 上线
|
|
||||||
logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
|
|
||||||
storager.deviceChannelOnline(deviceId, channel.getChannelId());
|
|
||||||
break;
|
|
||||||
case CatalogEvent.OFF :
|
|
||||||
// 离线
|
|
||||||
logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
|
|
||||||
if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
|
|
||||||
storager.deviceChannelOffline(deviceId, channel.getChannelId());
|
|
||||||
}else {
|
|
||||||
logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case CatalogEvent.VLOST:
|
|
||||||
// 视频丢失
|
|
||||||
logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
|
|
||||||
if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
|
|
||||||
storager.deviceChannelOffline(deviceId, channel.getChannelId());
|
|
||||||
}else {
|
|
||||||
logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case CatalogEvent.DEFECT:
|
|
||||||
// 故障
|
|
||||||
break;
|
|
||||||
case CatalogEvent.ADD:
|
|
||||||
// 增加
|
|
||||||
logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
|
|
||||||
deviceChannelService.updateChannel(deviceId, channel);
|
|
||||||
break;
|
|
||||||
case CatalogEvent.DEL:
|
|
||||||
// 删除
|
|
||||||
logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
|
|
||||||
storager.delChannel(deviceId, channel.getChannelId());
|
|
||||||
break;
|
|
||||||
case CatalogEvent.UPDATE:
|
|
||||||
// 更新
|
|
||||||
logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
|
|
||||||
deviceChannelService.updateChannel(deviceId, channel);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
logger.warn("[ NotifyCatalog ] event not found : {}", event );
|
|
||||||
|
|
||||||
}
|
|
||||||
// 转发变化信息
|
|
||||||
eventPublisher.catalogEventPublish(null, channel, event);
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (DocumentException e) {
|
|
||||||
logger.error("未处理的异常 ", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setCmder(SIPCommander cmder) {
|
public void setCmder(SIPCommander cmder) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -215,6 +215,21 @@ public class ZLMRESTfulUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public JSONObject isMediaOnline(MediaServerItem mediaServerItem, String app, String stream, String schema){
|
||||||
|
Map<String, Object> param = new HashMap<>();
|
||||||
|
if (app != null) {
|
||||||
|
param.put("app",app);
|
||||||
|
}
|
||||||
|
if (stream != null) {
|
||||||
|
param.put("stream",stream);
|
||||||
|
}
|
||||||
|
if (schema != null) {
|
||||||
|
param.put("schema",schema);
|
||||||
|
}
|
||||||
|
param.put("vhost","__defaultVhost__");
|
||||||
|
return sendPost(mediaServerItem, "isMediaOnline", param, null);
|
||||||
|
}
|
||||||
|
|
||||||
public JSONObject getMediaList(MediaServerItem mediaServerItem, String app, String stream, String schema, RequestCallback callback){
|
public JSONObject getMediaList(MediaServerItem mediaServerItem, String app, String stream, String schema, RequestCallback callback){
|
||||||
Map<String, Object> param = new HashMap<>();
|
Map<String, Object> param = new HashMap<>();
|
||||||
if (app != null) {
|
if (app != null) {
|
||||||
|
|
|
@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
|
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted;
|
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||||
|
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
|
||||||
import com.genersoft.iot.vmp.service.IMediaServerService;
|
import com.genersoft.iot.vmp.service.IMediaServerService;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
|
@ -520,16 +520,18 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||||
|
|
||||||
|
|
||||||
// 目录订阅相关的信息
|
// 目录订阅相关的信息
|
||||||
if (device.getSubscribeCycleForCatalog() > 0) {
|
if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) {
|
||||||
if (deviceInStore.getSubscribeCycleForCatalog() == 0 || deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) {
|
if (device.getSubscribeCycleForCatalog() > 0) {
|
||||||
deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
|
// 若已开启订阅,但订阅周期不同,则先取消
|
||||||
|
if (deviceInStore.getSubscribeCycleForCatalog() != 0) {
|
||||||
|
removeCatalogSubscribe(deviceInStore);
|
||||||
|
}
|
||||||
// 开启订阅
|
// 开启订阅
|
||||||
addCatalogSubscribe(deviceInStore);
|
|
||||||
}
|
|
||||||
}else if (device.getSubscribeCycleForCatalog() == 0) {
|
|
||||||
if (deviceInStore.getSubscribeCycleForCatalog() != 0) {
|
|
||||||
deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
|
deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
|
||||||
|
addCatalogSubscribe(deviceInStore);
|
||||||
|
}else if (device.getSubscribeCycleForCatalog() == 0) {
|
||||||
// 取消订阅
|
// 取消订阅
|
||||||
|
deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
|
||||||
removeCatalogSubscribe(deviceInStore);
|
removeCatalogSubscribe(deviceInStore);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -544,6 +546,8 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||||
}
|
}
|
||||||
}else if (device.getSubscribeCycleForMobilePosition() == 0) {
|
}else if (device.getSubscribeCycleForMobilePosition() == 0) {
|
||||||
if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) {
|
if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) {
|
||||||
|
deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
|
||||||
|
deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
|
||||||
// 取消订阅
|
// 取消订阅
|
||||||
removeMobilePositionSubscribe(deviceInStore);
|
removeMobilePositionSubscribe(deviceInStore);
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,15 +35,19 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
|
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.TransactionDefinition;
|
import org.springframework.transaction.TransactionDefinition;
|
||||||
import org.springframework.transaction.TransactionStatus;
|
import org.springframework.transaction.TransactionStatus;
|
||||||
|
import org.springframework.util.CollectionUtils;
|
||||||
import org.springframework.util.ObjectUtils;
|
import org.springframework.util.ObjectUtils;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 视频代理业务
|
* 视频代理业务
|
||||||
|
@ -560,4 +564,43 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
|
|
||||||
return new ResourceBaseInfo(total, online);
|
return new ResourceBaseInfo(total, online);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Scheduled(cron = "* 0/10 * * * ?")
|
||||||
|
public void asyncCheckStreamProxyStatus() {
|
||||||
|
|
||||||
|
List<MediaServerItem> all = mediaServerService.getAllOnline();
|
||||||
|
|
||||||
|
if (CollectionUtils.isEmpty(all)){
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, MediaServerItem> serverItemMap = all.stream().collect(Collectors.toMap(MediaServerItem::getId, Function.identity(), (m1, m2) -> m1));
|
||||||
|
|
||||||
|
List<StreamProxyItem> list = videoManagerStorager.getStreamProxyListForEnable(true);
|
||||||
|
|
||||||
|
if (CollectionUtils.isEmpty(list)){
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (StreamProxyItem streamProxyItem : list) {
|
||||||
|
|
||||||
|
MediaServerItem mediaServerItem = serverItemMap.get(streamProxyItem.getMediaServerId());
|
||||||
|
|
||||||
|
// TODO 支持其他 schema
|
||||||
|
JSONObject mediaInfo = zlmresTfulUtils.isMediaOnline(mediaServerItem, streamProxyItem.getApp(), streamProxyItem.getStream(), "rtsp");
|
||||||
|
|
||||||
|
if (mediaInfo == null){
|
||||||
|
streamProxyItem.setStatus(false);
|
||||||
|
} else {
|
||||||
|
if (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")) {
|
||||||
|
streamProxyItem.setStatus(true);
|
||||||
|
} else {
|
||||||
|
streamProxyItem.setStatus(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
updateStreamProxy(streamProxyItem);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -506,6 +506,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||||
stream.setUpdateTime(DateUtil.getNow());
|
stream.setUpdateTime(DateUtil.getNow());
|
||||||
stream.setCreateTime(DateUtil.getNow());
|
stream.setCreateTime(DateUtil.getNow());
|
||||||
stream.setServerId(userSetting.getServerId());
|
stream.setServerId(userSetting.getServerId());
|
||||||
|
stream.setMediaServerId(mediaConfig.getId());
|
||||||
|
stream.setSelf(true);
|
||||||
|
stream.setPushIng(true);
|
||||||
|
|
||||||
// 放在事务内执行
|
// 放在事务内执行
|
||||||
boolean result = false;
|
boolean result = false;
|
||||||
|
|
Loading…
Reference in New Issue