diff --git a/sql/2.6.9更新.sql b/sql/2.6.9更新.sql new file mode 100644 index 00000000..2e047948 --- /dev/null +++ b/sql/2.6.9更新.sql @@ -0,0 +1,2 @@ +alter table wvp_device_channel + change stream_id stream_id varying(255) \ No newline at end of file diff --git a/sql/初始化.sql b/sql/初始化.sql index f2952ba1..93eef4ed 100644 --- a/sql/初始化.sql +++ b/sql/初始化.sql @@ -79,7 +79,7 @@ create table wvp_device_channel ( custom_longitude double precision, latitude double precision, custom_latitude double precision, - stream_id character varying(50), + stream_id character varying(255), device_id character varying(50) not null, parental character varying(50), has_audio bool default false, diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 33daca61..7dd23127 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -98,6 +98,10 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In logger.warn("[收到ACK]:未找到来自{},目标为({})的推流信息",fromUserId, toUserId); return; } + // tcp主动时,此时是级联下级平台,在回复200ok时,本地已经请求zlm开启监听,跳过下面步骤 + if (sendRtpItem.isTcpActive()) { + return; + } logger.info("[收到ACK]:rtp/{}开始级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); // 取消设置的超时任务 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 1710a9cc..2d26b4a2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -50,6 +50,8 @@ import javax.sip.header.CallIdHeader; import javax.sip.message.Response; import java.text.ParseException; import java.time.Instant; +import java.util.HashMap; +import java.util.Map; import java.util.Random; import java.util.Vector; @@ -406,6 +408,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements content.append("y=" + sendRtpItem.getSsrc() + "\r\n"); content.append("f=\r\n"); + try { // 超时未收到Ack应该回复bye,当前等待时间为10秒 dynamicTask.startDelay(callIdHeader.getCallId(), () -> { @@ -418,8 +421,34 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); } }, 60 * 1000); - - responseSdpAck(request, content.toString(), platform); + responseSdpAck(request, content.toString(), platform); + // tcp主动模式,回复sdp后开启监听 + if (sendRtpItem.isTcpActive()) { + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Map param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStreamId()); + param.put("ssrc", sendRtpItem.getSsrc()); + if (!sendRtpItem.isTcpActive()) { + param.put("dst_url",sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + } + String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; + param.put("is_udp", is_Udp); + param.put("src_port", localPort); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + if (!sendRtpItem.isTcp()) { + // 开启rtcp保活 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); + } + JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpStreamForPassive(mediaInfo, param); + if (startSendRtpStreamResult != null) { + startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader); + } + } } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 国标级联 回复SdpAck", e); } @@ -553,6 +582,18 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } + private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform, + JSONObject jsonObject, Map param, CallIdHeader callIdHeader) { + if (jsonObject == null) { + logger.error("下级TCP被动启动监听失败: 请检查ZLM服务"); + } else if (jsonObject.getInteger("code") == 0) { + logger.info("调用ZLM-TCP被动推流接口, 结果: {}", jsonObject); + logger.info("启动监听TCP被动推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); + } else { + logger.error("启动监听TCP被动推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param)); + } + } + /** * 安排推流 */ diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index 729eec39..5c577ba6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -59,6 +59,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp // 未注册的设备不做处理 return; } + logger.info("[收到心跳], device: {}", device.getDeviceId()); SIPRequest request = (SIPRequest) evt.getRequest(); // 回复200 OK try { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index a33665c7..7197ff47 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -25,8 +25,10 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; @@ -641,7 +643,7 @@ public class ZLMHttpHookListener { if ("rtp".equals(param.getApp())) { String[] s = param.getStream().split("_"); - if (!mediaInfo.isRtpEnable() || s.length != 2) { + if (!mediaInfo.isRtpEnable() || (s.length != 2 && s.length != 4)) { defaultResult.setResult(HookResult.SUCCESS()); return defaultResult; } @@ -657,33 +659,79 @@ public class ZLMHttpHookListener { defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), ErrorCode.ERROR404.getMsg())); return defaultResult; } - logger.info("[ZLM HOOK] 流未找到, 发起自动点播:{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); + if (s.length == 2) { + logger.info("[ZLM HOOK] 预览流未找到, 发起自动点播:{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); - RequestMessage msg = new RequestMessage(); - String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; - boolean exist = resultHolder.exist(key, null); - msg.setKey(key); - String uuid = UUID.randomUUID().toString(); - msg.setId(uuid); - DeferredResult result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); + RequestMessage msg = new RequestMessage(); + String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; + boolean exist = resultHolder.exist(key, null); + msg.setKey(key); + String uuid = UUID.randomUUID().toString(); + msg.setId(uuid); + DeferredResult result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); - result.onTimeout(() -> { - logger.info("[ZLM HOOK] 自动点播, 等待超时"); - // 释放rtpserver - msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "点播超时")); - resultHolder.invokeResult(msg); - }); - - // 录像查询以channelId作为deviceId查询 - resultHolder.put(key, uuid, result); - - if (!exist) { - playService.play(mediaInfo, deviceId, channelId, null, (code, message, data) -> { - msg.setData(new HookResult(code, message)); + result.onTimeout(() -> { + logger.info("[ZLM HOOK] 预览流自动点播, 等待超时"); + // 释放rtpserver + msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "点播超时")); resultHolder.invokeResult(msg); }); + + resultHolder.put(key, uuid, result); + + if (!exist) { + playService.play(mediaInfo, deviceId, channelId, null, (code, message, data) -> { + msg.setData(new HookResult(code, message)); + resultHolder.invokeResult(msg); + }); + } + return result; + }else if(s.length == 4){ + // 此时为录像回放, 录像回放格式为> 设备ID_通道ID_开始时间_结束时间 + String startTimeStr = s[2]; + String endTimeStr = s[3]; + if (startTimeStr == null || endTimeStr == null || startTimeStr.length() != 14 || endTimeStr.length() != 14) { + defaultResult.setResult(HookResult.SUCCESS()); + return defaultResult; + } + String startTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(startTimeStr); + String endTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(endTimeStr); + logger.info("[ZLM HOOK] 回放流未找到, 发起自动点播:{}->{}->{}/{}-{}-{}", + param.getMediaServerId(), param.getSchema(), + param.getApp(), param.getStream(), + startTime, endTime + ); + RequestMessage msg = new RequestMessage(); + String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId; + boolean exist = resultHolder.exist(key, null); + msg.setKey(key); + String uuid = UUID.randomUUID().toString(); + msg.setId(uuid); + DeferredResult result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); + + result.onTimeout(() -> { + logger.info("[ZLM HOOK] 回放流自动点播, 等待超时"); + // 释放rtpserver + msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "点播超时")); + resultHolder.invokeResult(msg); + }); + + resultHolder.put(key, uuid, result); + + if (!exist) { + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaInfo, param.getStream(), null, + device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); + playService.playBack(mediaInfo, ssrcInfo, deviceId, channelId, startTime, endTime, (code, message, data) -> { + msg.setData(new HookResult(code, message)); + resultHolder.invokeResult(msg); + }); + } + return result; + }else { + defaultResult.setResult(HookResult.SUCCESS()); + return defaultResult; } - return result; + } else { // 拉流代理 StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 60d377da..765cc843 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -40,7 +40,6 @@ public interface IPlayService { void playBack(String deviceId, String channelId, String startTime, String endTime, ErrorCallback callback); void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, ErrorCallback callback); - void zlmServerOffline(String mediaServerId); void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback callback); @@ -72,4 +71,6 @@ public interface IPlayService { void stopTalk(Device device, String channelId, Boolean streamIsReady); void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback); + + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java index fa164289..517cb046 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java @@ -126,7 +126,15 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { List deviceChannelList = new ArrayList<>(); if (channelReduces.size() > 0){ PlatformCatalog catalog = catalogManager.selectByPlatFormAndCatalogId(platform.getServerGBId(),catalogId); - if (catalog == null || !catalogId.equals(platform.getDeviceGBId())) { + if (catalog == null && catalogId.equals(platform.getDeviceGBId())) { + for (ChannelReduce channelReduce : channelReduces) { + DeviceChannel deviceChannel = deviceChannelMapper.queryChannel(channelReduce.getDeviceId(), channelReduce.getChannelId()); + deviceChannel.setParental(0); + deviceChannel.setCivilCode(platform.getServerGBDomain()); + deviceChannelList.add(deviceChannel); + } + return deviceChannelList; + } else if (catalog == null || !catalogId.equals(platform.getDeviceGBId())) { logger.warn("未查询到目录{}的信息", catalogId); return null; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 3b9d6174..cce87cfa 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -710,7 +710,19 @@ public class PlayServiceImpl implements IPlayService { return; } MediaServerItem newMediaServerItem = getNewMediaServerItem(device); - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(), true, 0, false,false, device.getStreamModeForParam()); + String stream = null; + if (newMediaServerItem.isRtpEnable()) { + String startTimeStr = startTime.replace("-", "") + .replace(":", "") + .replace(" ", ""); + System.out.println(startTimeStr); + String endTimeTimeStr = endTime.replace("-", "") + .replace(":", "") + .replace(" ", ""); + System.out.println(endTimeTimeStr); + stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr; + } + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false,false, device.getStreamModeForParam()); playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, callback); } diff --git a/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java b/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java index 7a65a610..923f8346 100644 --- a/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java @@ -53,6 +53,10 @@ public class DateUtil { return formatter.format(formatterCompatibleISO8601.parse(formatTime)); } + public static String urlToyyyy_MM_dd_HH_mm_ss(String formatTime) { + return formatter.format(urlFormatter.parse(formatTime)); + } + /** * yyyy_MM_dd_HH_mm_ss 转时间戳 * @param formatTime @@ -82,6 +86,7 @@ public class DateUtil { return urlFormatter.format(nowDateTime); } + /** * 格式校验 * @param timeStr 时间字符串