临时提交

结构优化
648540858 2024-01-26 18:03:58 +08:00
parent 2a84e44610
commit 8892975d77
7 changed files with 150 additions and 131 deletions

View File

@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.IResourcePlayCallback;
import com.genersoft.iot.vmp.service.IResourceService;
@ -14,17 +13,12 @@ import com.genersoft.iot.vmp.service.bean.CommonGbChannelType;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
import com.genersoft.iot.vmp.storager.impl.RedisCatchStorageImpl;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.net.MalformedURLException;
import java.net.URL;
/**
*

View File

@ -0,0 +1,9 @@
package com.genersoft.iot.vmp.gb28181.bean;
/**
* 28181
*/
public class ControlCommand {
}

View File

@ -83,6 +83,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IStreamSendManager streamSendManager;
@Autowired
@ -94,15 +95,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired
private RedisPushStreamResponseListener redisPushStreamResponseListener;
@Autowired
private IPlayService playService;
@Autowired
private Map<String, IResourceService> resourceServiceMap;
@Autowired
private SIPSender sipSender;
@Autowired
private ZLMServerFactory zlmServerFactory;
@ -121,7 +116,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired
private ZLMMediaListManager mediaListManager;
@Autowired
private RedisGbPlayMsgListener redisGbPlayMsgListener;
@ -226,7 +220,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
IResourcePlayCallback callback = (commonGbChannel, mediaServerItem, code, message, streamInfo) -> {
if (code != 0) {
logger.info("[上级Invite] 获取资源流失败。{}, {}/{}", message, streamInfo.getApp(), streamInfo.getStream());
logger.info("[上级Invite] channelId: {}, 获取资源流失败。{},", channelId, message);
try {
cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
} catch (SipException | InvalidArgumentException | ParseException e) {

View File

@ -14,6 +14,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessag
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.control.ControlMessageHandler;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IPlatformChannelService;
import com.genersoft.iot.vmp.service.IResourceService;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest;
import org.dom4j.Element;
@ -31,20 +32,19 @@ import javax.sip.address.SipURI;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.*;
@Component
public class DeviceControlQueryMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
private Logger logger = LoggerFactory.getLogger(DeviceControlQueryMessageHandler.class);
private final Logger logger = LoggerFactory.getLogger(DeviceControlQueryMessageHandler.class);
private final String cmdType = "DeviceControl";
@Autowired
private ControlMessageHandler controlMessageHandler;
@Autowired
private IVideoManagerStorage storager;
@Autowired
private SIPCommander cmder;
@ -53,7 +53,7 @@ public class DeviceControlQueryMessageHandler extends SIPRequestProcessorParent
private IPlatformChannelService platformChannelService;
@Autowired
private IDeviceChannelService deviceChannelService;
private Map<String, IResourceService> resourceServiceMap;
@Qualifier("taskExecutor")
@Autowired
@ -81,36 +81,6 @@ public class DeviceControlQueryMessageHandler extends SIPRequestProcessorParent
if (!ObjectUtils.isEmpty(getText(rootElement, "TeleBoot"))) {
// TODO 拒绝远程启动命令
logger.warn("[国标级联]收到平台的远程启动命令, 不处理");
// if (parentPlatform.getServerGBId().equals(targetGBId)) {
// // 远程启动本平台需要在重新启动程序后先对SipStack解绑
// logger.info("执行远程启动本平台命令");
// try {
// cmderFroPlatform.unregister(parentPlatform, null, null);
// } catch (InvalidArgumentException | ParseException | SipException e) {
// logger.error("[命令发送失败] 国标级联 注销: {}", e.getMessage());
// }
// taskExecutor.execute(() -> {
// // 远程启动
//// try {
//// Thread.sleep(3000);
//// SipProvider up = (SipProvider) SpringBeanFactory.getBean("udpSipProvider");
//// SipStackImpl stack = (SipStackImpl)up.getSipStack();
//// stack.stop();
//// Iterator listener = stack.getListeningPoints();
//// while (listener.hasNext()) {
//// stack.deleteListeningPoint((ListeningPoint) listener.next());
//// }
//// Iterator providers = stack.getSipProviders();
//// while (providers.hasNext()) {
//// stack.deleteSipProvider((SipProvider) providers.next());
//// }
//// VManageBootstrap.restart();
//// } catch (InterruptedException | ObjectInUseException e) {
//// logger.error("[任务执行失败] 服务重启: {}", e.getMessage());
//// }
// });
// }
}
DeviceControlType deviceControlType = DeviceControlType.typeOf(rootElement);
logger.info("[接受deviceControl命令] 命令: {}", deviceControlType);
@ -125,44 +95,35 @@ public class DeviceControlQueryMessageHandler extends SIPRequestProcessorParent
}
return;
}
//判断是否存在该通道
Device deviceForPlatform = deviceChannelService.getDeviceByChannelCommonGbId(commonGbChannel.getCommonGbId());
if (deviceForPlatform == null) {
try {
responseAck(request, Response.NOT_FOUND);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 错误信息: {}", e.getMessage());
}
return;
}
switch (deviceControlType) {
case PTZ:
handlePtzCmd(deviceForPlatform, channelId, rootElement, request, DeviceControlType.PTZ);
break;
case ALARM:
handleAlarmCmd(deviceForPlatform, rootElement, request);
break;
case GUARD:
handleGuardCmd(deviceForPlatform, rootElement, request, DeviceControlType.GUARD);
break;
case RECORD:
handleRecordCmd(deviceForPlatform, channelId, rootElement, request, DeviceControlType.RECORD);
break;
case I_FRAME:
handleIFameCmd(deviceForPlatform, request, channelId);
break;
case TELE_BOOT:
handleTeleBootCmd(deviceForPlatform, request);
break;
case DRAG_ZOOM_IN:
handleDragZoom(deviceForPlatform, channelId, rootElement, request, DeviceControlType.DRAG_ZOOM_IN);
break;
case DRAG_ZOOM_OUT:
handleDragZoom(deviceForPlatform, channelId, rootElement, request, DeviceControlType.DRAG_ZOOM_OUT);
break;
case HOME_POSITION:
handleHomePositionCmd(deviceForPlatform, channelId, rootElement, request, DeviceControlType.HOME_POSITION);
handlePtzCmd(commonGbChannel, rootElement, request, DeviceControlType.PTZ);
break;
// case ALARM:
// handleAlarmCmd(deviceForPlatform, rootElement, request);
// break;
// case GUARD:
// handleGuardCmd(deviceForPlatform, rootElement, request, DeviceControlType.GUARD);
// break;
// case RECORD:
// handleRecordCmd(deviceForPlatform, channelId, rootElement, request, DeviceControlType.RECORD);
// break;
// case I_FRAME:
// handleIFameCmd(deviceForPlatform, request, channelId);
// break;
// case TELE_BOOT:
// handleTeleBootCmd(deviceForPlatform, request);
// break;
// case DRAG_ZOOM_IN:
// handleDragZoom(deviceForPlatform, channelId, rootElement, request, DeviceControlType.DRAG_ZOOM_IN);
// break;
// case DRAG_ZOOM_OUT:
// handleDragZoom(deviceForPlatform, channelId, rootElement, request, DeviceControlType.DRAG_ZOOM_OUT);
// break;
// case HOME_POSITION:
// handleHomePositionCmd(deviceForPlatform, channelId, rootElement, request, DeviceControlType.HOME_POSITION);
// break;
default:
break;
}
@ -171,21 +132,36 @@ public class DeviceControlQueryMessageHandler extends SIPRequestProcessorParent
/**
*
*
* @param device
* @param channelId id
* @param rootElement
* @param request
*/
private void handlePtzCmd(Device device, String channelId, Element rootElement, SIPRequest request, DeviceControlType type) {
String cmdString = getText(rootElement, type.getVal());
try {
cmder.fronEndCmd(device, channelId, cmdString,
errorResult -> onError(request, errorResult),
okResult -> onOk(request, okResult));
} catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 云台/前端: {}", e.getMessage());
private void handlePtzCmd(CommonGbChannel commonGbChannel, Element rootElement, SIPRequest request, DeviceControlType type) {
IResourceService resourceService = resourceServiceMap.get(commonGbChannel.getType());
if (resourceService == null) {
try {
responseAck(request, Response.FORBIDDEN);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 错误信息: {}", e.getMessage());
}
return;
}
// 解析云台控制参数
// resourceService.ptzControl(commonGbChannel)
String cmdString = getText(rootElement, type.getVal());
byte[] bytes = cmdString.getBytes();
System.out.println(cmdString);
for (byte aByte : bytes) {
System.out.print(aByte);
System.out.print(" ");
}
System.out.println(" ");
// try {
// cmder.fronEndCmd(device, channelId, cmdString,
// errorResult -> onError(request, errorResult),
// okResult -> onOk(request, okResult));
// } catch (InvalidArgumentException | SipException | ParseException e) {
// logger.error("[命令发送失败] 云台/前端: {}", e.getMessage());
// }
}
/**

View File

@ -445,7 +445,7 @@ public class ZLMHttpHookListener {
}
if (!param.isRegist()) {
List<SendRtpItem> sendRtpItems = streamSendManager.getByAppAndStream(param.getApp(), param.getStream());
if (!sendRtpItems.isEmpty()) {
if (sendRtpItems != null && !sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) {
String platformId = sendRtpItem.getDestId();

View File

@ -30,7 +30,7 @@ public class StreamSendManagerImpl implements IStreamSendManager {
@Override
public void update(SendRtpItem sendRtpItem) {
if (sendRtpItem.getId() != null) {
if (sendRtpItem.getId() == null) {
sendRtpItem.setId(UUID.randomUUID().toString());
}
String dateId = datePrefix + sendRtpItem.getId();
@ -94,6 +94,10 @@ public class StreamSendManagerImpl implements IStreamSendManager {
return result;
}
private SendRtpItem getById(String id) {
return (SendRtpItem)redisTemplate.opsForValue().get(datePrefix + id);
}
@Override
public SendRtpItem getByCallId(String callId) {
String dateId = (String) redisTemplate.opsForValue().get(getCallIdKey(callId));
@ -147,24 +151,25 @@ public class StreamSendManagerImpl implements IStreamSendManager {
@Override
public void remove(SendRtpItem sendRtpItem) {
redisTemplate.delete(datePrefix);
String dateId = datePrefix + sendRtpItem.getId();
redisTemplate.delete(dateId);
if (sendRtpItem.getCallId() != null) {
redisTemplate.delete(getCallIdKey(sendRtpItem.getCallId()));
}
if (sendRtpItem.getApp() != null && sendRtpItem.getStreamId() != null) {
redisTemplate.opsForSet().remove(getAppAndStreamKey(sendRtpItem.getApp(), sendRtpItem.getStreamId()));
redisTemplate.opsForSet().remove(getAppAndStreamKey(sendRtpItem.getApp(), sendRtpItem.getStreamId()), dateId);
}
if (sendRtpItem.getMediaServerId() != null) {
redisTemplate.opsForSet().remove(getMediaServerIdKey(sendRtpItem.getMediaServerId()));
redisTemplate.opsForSet().remove(getMediaServerIdKey(sendRtpItem.getMediaServerId()), dateId);
}
if (sendRtpItem.getDestId() != null) {
redisTemplate.opsForSet().remove(getDestIdKey(sendRtpItem.getDestId()));
redisTemplate.opsForSet().remove(getDestIdKey(sendRtpItem.getDestId()), dateId);
}
if (sendRtpItem.getSourceId() != null) {
redisTemplate.opsForSet().remove(getSourceIdKey(sendRtpItem.getSourceId()));
redisTemplate.opsForSet().remove(getSourceIdKey(sendRtpItem.getSourceId()), dateId);
}
if (sendRtpItem.getChannelId() != null) {
redisTemplate.opsForSet().remove(getChannelIdKey(sendRtpItem.getChannelId()));
redisTemplate.opsForSet().remove(getChannelIdKey(sendRtpItem.getChannelId()), dateId);
}
}

View File

@ -30,6 +30,7 @@ import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper;
import com.genersoft.iot.vmp.utils.CloudRecordUtils;
@ -107,7 +108,7 @@ public class PlayServiceImpl implements IPlayService {
private DynamicTask dynamicTask;
@Autowired
private CloudRecordServiceMapper cloudRecordServiceMapper;
private DeviceChannelMapper deviceChannelMapper;
@Override
@ -155,7 +156,7 @@ public class PlayServiceImpl implements IPlayService {
}else {
// 点播发起了但是尚未成功, 仅注册回调等待结果即可
inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
deviceChannelMapper.stopPlay(deviceId, channelId);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
}
}
@ -254,21 +255,37 @@ public class PlayServiceImpl implements IPlayService {
// hook响应
StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, hookParam, device.getDeviceId(), channelId);
if (streamInfo == null){
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
try {
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
}catch (Exception e) {
logger.warn("[invite hook响应] 发送回调失败", e);
}
try {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
}catch (Exception e) {
logger.warn("[invite] 发送回调失败", e);
}
return;
}
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(),
streamInfo);
logger.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), channelId,
device.isSwitchPrimarySubStream() ? "辅码流" : "主码流");
snapOnPlay(mediaServerItemInuse, device.getDeviceId(), channelId, ssrcInfo.getStream());
try {
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}catch (Exception e) {
logger.warn("[invite] 发送回调失败", e);
}
try {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(),
streamInfo);
}catch (Exception e) {
logger.warn("[invite] 发送回调失败", e);
}
}, (eventResult) -> {
// 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId,
@ -281,14 +298,21 @@ public class PlayServiceImpl implements IPlayService {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
try {
callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
}catch (Exception e) {
logger.warn("[invite] 发送回调失败", e);
}
try {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
}catch (Exception e) {
logger.warn("[invite] 发送回调失败", e);
}
});
} catch (InvalidArgumentException | SipException | ParseException e) {
@ -300,13 +324,20 @@ public class PlayServiceImpl implements IPlayService {
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
try {
callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
}catch (Exception exception) {
logger.warn("[invite] 发送回调失败", exception);
}
try {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
}catch (Exception exception) {
logger.warn("[invite] 发送回调失败", exception);
}
}
}
@ -320,6 +351,16 @@ public class PlayServiceImpl implements IPlayService {
if (inviteInfo == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "点播未找到");
}
// 调用zlm关闭流
StreamInfo streamInfo = inviteInfo.getStreamInfo();
if (streamInfo != null && streamInfo.getMediaServerId() != null) {
MediaServerItem mediaServerItem = mediaServerService.getOne(streamInfo.getMediaServerId());
if (mediaServerItem != null) {
logger.info("[关闭流] {}/{}", streamInfo.getApp(), streamInfo.getStream());
mediaService.closeStream(mediaServerItem, streamInfo.getApp(), streamInfo.getStream());
}
}
if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
try {
logger.info("[停止点播] {}/{}", deviceId, channelId);
@ -330,7 +371,7 @@ public class PlayServiceImpl implements IPlayService {
}
}
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
storager.stopPlay(deviceId, channelId);
deviceChannelMapper.stopPlay(deviceId, channelId);
}
private void tcpActiveHandler(Device device, String channelId, String contentString,
@ -887,7 +928,7 @@ public class PlayServiceImpl implements IPlayService {
public void zlmServerOffline(String mediaServerId) {
// 处理正在向上推流的上级平台
List<SendRtpItem> sendRtpItems = streamSendManager.getByMediaServerId(mediaServerId);
if (!sendRtpItems.isEmpty()) {
if (sendRtpItems != null && !sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getDestId());