diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index ccf8151a..7faf428d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -1,13 +1,14 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; -import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.SipLayer; -import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; +import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; @@ -19,6 +20,7 @@ import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.utils.DateUtil; @@ -276,9 +278,9 @@ public class SIPCommander implements ISIPCommander { logger.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getSdpIp(), ssrcInfo.getPort()); HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { if (event != null) { - event.response(mediaServerItemInUse, json); + event.response(mediaServerItemInUse, hookParam); subscribe.removeSubscribe(hookSubscribe); } }); @@ -466,9 +468,9 @@ public class SIPCommander implements ISIPCommander { HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); // 添加订阅 - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { if (hookEvent != null) { - hookEvent.response(mediaServerItemInUse, json); + hookEvent.response(mediaServerItemInUse, hookParam); } subscribe.removeSubscribe(hookSubscribe); }); @@ -569,15 +571,15 @@ public class SIPCommander implements ISIPCommander { // 添加订阅 CallIdHeader newCallIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport()); String callId= newCallIdHeader.getCallId(); - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { + subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { logger.debug("sipc 添加订阅===callId {}",callId); - hookEvent.response(mediaServerItemInUse, json); + hookEvent.response(mediaServerItemInUse, hookParam); subscribe.removeSubscribe(hookSubscribe); hookSubscribe.getContent().put("regist", false); hookSubscribe.getContent().put("schema", "rtsp"); // 添加流注销的订阅,注销了后向设备发送bye subscribe.addSubscribe(hookSubscribe, - (MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd) -> { + (mediaServerItemForEnd, hookParam1) -> { logger.info("[录像]下载结束, 发送BYE"); try { streamByeCmd(device, channelId, ssrcInfo.getStream(), callId); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 2d28a219..aaf56843 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -15,6 +15,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.IStreamProxyService; @@ -646,10 +647,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream()); // 监听流上线 HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId()); - zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, responseJSON) -> { - String app = responseJSON.getString("app"); - String stream = responseJSON.getString("stream"); - logger.info("[上级点播]拉流代理已经就绪, {}/{}", app, stream); + zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> { + OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; + logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream()); dynamicTask.stop(callIdHeader.getCallId()); pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 3810eca8..7f27b896 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -124,14 +124,12 @@ public class ZLMHttpHookListener { @PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8") public HookResult onServerKeepalive(@RequestBody OnServerKeepaliveHookParam param) { -// logger.info("[ZLM HOOK] 收到zlm心跳:" + param.getMediaServerId()); taskExecutor.execute(() -> { List subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive); - JSONObject json = (JSONObject) JSON.toJSON(param); if (subscribes != null && subscribes.size() > 0) { for (ZlmHttpHookSubscribe.Event subscribe : subscribes) { - subscribe.response(null, json); + subscribe.response(null, param); } } }); @@ -158,7 +156,7 @@ public class ZLMHttpHookListener { if (subscribe != null) { MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); if (mediaInfo != null) { - subscribe.response(mediaInfo, json); + subscribe.response(mediaInfo, param); } } }); @@ -234,7 +232,7 @@ public class ZLMHttpHookListener { ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json); if (subscribe != null) { if (mediaInfo != null) { - subscribe.response(mediaInfo, json); + subscribe.response(mediaInfo, param); } else { new HookResultForOnPublish(1, "zlm not register"); } @@ -306,7 +304,7 @@ public class ZLMHttpHookListener { return; } if (subscribe != null) { - subscribe.response(mediaInfo, json); + subscribe.response(mediaInfo, param); } List tracks = param.getTracks(); @@ -649,7 +647,7 @@ public class ZLMHttpHookListener { List subscribes = this.subscribe.getSubscribes(HookType.on_server_started); if (subscribes != null && subscribes.size() > 0) { for (ZlmHttpHookSubscribe.Event subscribe : subscribes) { - subscribe.response(null, jsonObject); + subscribe.response(null, zlmServerConfig); } } mediaServerService.zlmServerOnline(zlmServerConfig); @@ -704,7 +702,7 @@ public class ZLMHttpHookListener { List subscribes = this.subscribe.getSubscribes(HookType.on_rtp_server_timeout); if (subscribes != null && subscribes.size() > 0) { for (ZlmHttpHookSubscribe.Event subscribe : subscribes) { - subscribe.response(null, json); + subscribe.response(null, param); } } }); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 2378d529..a350469d 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -270,6 +270,11 @@ public class ZLMRESTfulUtils { } public JSONObject openRtpServer(MediaServerItem mediaServerItem, Map param){ + System.out.println("==============openRtpServer================="); + for (String s : param.keySet()) { + System.out.println(s + "-->" + param.get(s)); + } + System.out.println("==============================="); return sendPost(mediaServerItem, "openRtpServer",param, null); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 081d9191..13631683 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -9,6 +9,8 @@ import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRtpServerTimeoutHookParam; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -225,7 +227,8 @@ public class ZLMRTPServerFactory { int localPort = 0; if (userSetting.getGbSendStreamStrict()) { if (userSetting.getGbSendStreamStrict()) { - localPort = keepPort(serverItem, ssrc); + System.out.println("createSendRtpItem1"); + localPort = keepPort(serverItem, ssrc, localPort); if (localPort == 0) { return null; } @@ -261,7 +264,8 @@ public class ZLMRTPServerFactory { // 默认为随机端口 int localPort = 0; if (userSetting.getGbSendStreamStrict()) { - localPort = keepPort(serverItem, ssrc); + System.out.println("createSendRtpItem2"); + localPort = keepPort(serverItem, ssrc, localPort); if (localPort == 0) { return null; } @@ -285,30 +289,37 @@ public class ZLMRTPServerFactory { /** * 保持端口,直到需要需要发流时再释放 */ - public int keepPort(MediaServerItem serverItem, String ssrc) { - int localPort = 0; + public int keepPort(MediaServerItem serverItem, String ssrc, Integer localPort) { Map param = new HashMap<>(3); - param.put("port", 0); + param.put("port", localPort); param.put("enable_tcp", 1); param.put("stream_id", ssrc); + System.out.println("用于收流"); JSONObject jsonObject = zlmresTfulUtils.openRtpServer(serverItem, param); + System.out.println(jsonObject); if (jsonObject.getInteger("code") == 0) { localPort = jsonObject.getInteger("port"); HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId()); + Integer finalLocalPort = localPort; hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout, - (MediaServerItem mediaServerItem, JSONObject response)->{ - logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc); - int port = keepPort(serverItem, ssrc); + (MediaServerItem mediaServerItem, HookParam hookParam)->{ + logger.info("[上级点播] {}->监听端口到期继续保持监听: {}", ssrc, finalLocalPort); + OnRtpServerTimeoutHookParam rtpServerTimeoutHookParam = (OnRtpServerTimeoutHookParam) hookParam; + if (!ssrc.equals(rtpServerTimeoutHookParam.getSsrc())) { + return; + } + int port = keepPort(serverItem, ssrc, finalLocalPort); if (port == 0) { logger.info("[上级点播] {}->监听端口失败,移除监听", ssrc); hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); } }); logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort); + return localPort; }else { - logger.info("[上级点播] 监听端口失败: {}", ssrc); + logger.info("[上级点播] 监听端口失败: {}->{}", ssrc, localPort); + return 0; } - return localPort; } /** diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java index afbbafa8..6e594024 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java @@ -65,8 +65,8 @@ public class ZLMRunner implements CommandLineRunner { HookSubscribeForServerStarted hookSubscribeForServerStarted = HookSubscribeFactory.on_server_started(); // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 hookSubscribe.addSubscribe(hookSubscribeForServerStarted, - (MediaServerItem mediaServerItem, JSONObject response)->{ - ZLMServerConfig zlmServerConfig = response.to(ZLMServerConfig.class); + (mediaServerItem, hookParam)->{ + ZLMServerConfig zlmServerConfig = (ZLMServerConfig)hookParam; if (zlmServerConfig !=null ) { if (startGetMedia != null) { startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId()); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java index 36862c0b..80910c02 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java @@ -1,8 +1,9 @@ package com.genersoft.iot.vmp.media.zlm; import com.alibaba.fastjson2.annotation.JSONField; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; -public class ZLMServerConfig { +public class ZLMServerConfig extends HookParam { @JSONField(name = "api.apiDebug") private String apiDebug; diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZlmHttpHookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZlmHttpHookSubscribe.java index 8f6a3faf..cad119ad 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZlmHttpHookSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZlmHttpHookSubscribe.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.media.zlm.dto.HookType; import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; @@ -26,7 +27,7 @@ public class ZlmHttpHookSubscribe { @FunctionalInterface public interface Event{ - void response(MediaServerItem mediaServerItem, JSONObject response); + void response(MediaServerItem mediaServerItem, HookParam hookParam); } private Map> allSubscribes = new ConcurrentHashMap<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java index cb8e7383..3a58d156 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java @@ -50,4 +50,6 @@ public class HookResultForOnPublish extends HookResult{ public void setMp4_save_path(String mp4_save_path) { this.mp4_save_path = mp4_save_path; } + + } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnPlayHookParam.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnPlayHookParam.java index 4d2c26fd..6e41ccea 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnPlayHookParam.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnPlayHookParam.java @@ -81,6 +81,15 @@ public class OnPlayHookParam extends HookParam{ @Override public String toString() { - return String.format("%s://%s:%s/%s/%s?%s", schema, ip, port, app, stream, params); + return "OnPlayHookParam{" + + "id='" + id + '\'' + + ", app='" + app + '\'' + + ", stream='" + stream + '\'' + + ", ip='" + ip + '\'' + + ", params='" + params + '\'' + + ", port=" + port + + ", schema='" + schema + '\'' + + ", vhost='" + vhost + '\'' + + '}'; } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnPublishHookParam.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnPublishHookParam.java index e353163f..8a3e084d 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnPublishHookParam.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnPublishHookParam.java @@ -81,6 +81,15 @@ public class OnPublishHookParam extends HookParam{ @Override public String toString() { - return String.format("%s://%s:%s/%s/%s?%s", schema, ip, port, app, stream, params); + return "OnPublishHookParam{" + + "id='" + id + '\'' + + ", app='" + app + '\'' + + ", stream='" + stream + '\'' + + ", ip='" + ip + '\'' + + ", params='" + params + '\'' + + ", port=" + port + + ", schema='" + schema + '\'' + + ", vhost='" + vhost + '\'' + + '}'; } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnRtpServerTimeoutHookParam.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnRtpServerTimeoutHookParam.java index e1912bbf..6179ce46 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnRtpServerTimeoutHookParam.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnRtpServerTimeoutHookParam.java @@ -50,4 +50,15 @@ public class OnRtpServerTimeoutHookParam extends HookParam{ public void setSsrc(String ssrc) { this.ssrc = ssrc; } + + @Override + public String toString() { + return "OnRtpServerTimeoutHookParam{" + + "local_port=" + local_port + + ", stream_id='" + stream_id + '\'' + + ", tcpMode=" + tcpMode + + ", re_use_port=" + re_use_port + + ", ssrc='" + ssrc + '\'' + + '}'; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnSendRtpStoppedHookParam.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnSendRtpStoppedHookParam.java index bbdada9d..4989b4ab 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnSendRtpStoppedHookParam.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnSendRtpStoppedHookParam.java @@ -24,4 +24,12 @@ public class OnSendRtpStoppedHookParam extends HookParam{ public void setStream(String stream) { this.stream = stream; } + + @Override + public String toString() { + return "OnSendRtpStoppedHookParam{" + + "app='" + app + '\'' + + ", stream='" + stream + '\'' + + '}'; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnServerKeepaliveHookParam.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnServerKeepaliveHookParam.java index 23360560..5439f20f 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnServerKeepaliveHookParam.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnServerKeepaliveHookParam.java @@ -17,4 +17,11 @@ public class OnServerKeepaliveHookParam extends HookParam{ public void setData(ServerKeepaliveData data) { this.data = data; } + + @Override + public String toString() { + return "OnServerKeepaliveHookParam{" + + "data=" + data + + '}'; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamChangedHookParam.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamChangedHookParam.java index 07c09e69..4587fb0f 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamChangedHookParam.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamChangedHookParam.java @@ -430,4 +430,14 @@ public class OnStreamChangedHookParam extends HookParam{ public void setCallId(String callId) { this.callId = callId; } + + @Override + public String toString() { + return "OnStreamChangedHookParam{" + + "regist=" + regist + + ", app='" + app + '\'' + + ", stream='" + stream + '\'' + + ", severId='" + severId + '\'' + + '}'; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamNoneReaderHookParam.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamNoneReaderHookParam.java index 0282ee59..3b628428 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamNoneReaderHookParam.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamNoneReaderHookParam.java @@ -38,4 +38,14 @@ public class OnStreamNoneReaderHookParam extends HookParam{ public void setVhost(String vhost) { this.vhost = vhost; } + + @Override + public String toString() { + return "OnStreamNoneReaderHookParam{" + + "schema='" + schema + '\'' + + ", app='" + app + '\'' + + ", stream='" + stream + '\'' + + ", vhost='" + vhost + '\'' + + '}'; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamNotFoundHookParam.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamNotFoundHookParam.java index 20fdf823..76e6a721 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamNotFoundHookParam.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnStreamNotFoundHookParam.java @@ -81,6 +81,15 @@ public class OnStreamNotFoundHookParam extends HookParam{ @Override public String toString() { - return String.format("%s://%s:%s/%s/%s?%s", schema, ip, port, app, stream, params); + return "OnStreamNotFoundHookParam{" + + "id='" + id + '\'' + + ", app='" + app + '\'' + + ", stream='" + stream + '\'' + + ", ip='" + ip + '\'' + + ", params='" + params + '\'' + + ", port=" + port + + ", schema='" + schema + '\'' + + ", vhost='" + vhost + '\'' + + '}'; } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index cebfa1d5..e52fac2f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.service.impl; -import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionStatus; @@ -25,6 +24,8 @@ import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; @@ -321,11 +322,11 @@ public class PlayServiceImpl implements IPlayService { }, userSetting.getPlayTimeout()); try { - cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId,isSubStream, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { - logger.info("收到订阅消息: " + response.toJSONString()); + cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId,isSubStream, (mediaServerItemInuse, hookParam ) -> { + logger.info("收到订阅消息: " + hookParam); dynamicTask.stop(timeOutTaskKey); // hook响应 - StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId,isSubStream); + StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, hookParam, device.getDeviceId(), channelId,isSubStream); if (streamInfo == null){ callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); @@ -438,11 +439,11 @@ public class PlayServiceImpl implements IPlayService { String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase(); hookSubscribe.getContent().put("stream", stream); inviteInfo.setStream(stream); - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { - logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString()); + subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { + logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam); dynamicTask.stop(timeOutTaskKey); // hook响应 - StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId,isSubStream); + StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, hookParam, device.getDeviceId(), channelId,isSubStream); if (streamInfo == null){ callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); @@ -568,13 +569,14 @@ public class PlayServiceImpl implements IPlayService { } } - private StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId,boolean isSubStream) { + private StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId, boolean isSubStream) { StreamInfo streamInfo = null; Device device = redisCatchStorage.getDevice(deviceId); + OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; if( device.isSwitchPrimarySubStream() ){ - streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId,isSubStream); + streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId,isSubStream); }else { - streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); + streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId); } if (streamInfo != null) { InviteInfo inviteInfo; @@ -603,9 +605,9 @@ public class PlayServiceImpl implements IPlayService { } - private StreamInfo onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String startTime, String endTime) { - - StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); + private StreamInfo onPublishHandlerForPlayback(MediaServerItem mediaServerItem, HookParam param, String deviceId, String channelId, String startTime, String endTime) { + OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) param; + StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId); if (streamInfo != null) { streamInfo.setStartTime(startTime); streamInfo.setEndTime(endTime); @@ -724,10 +726,10 @@ public class PlayServiceImpl implements IPlayService { inviteStreamService.removeInviteInfo(inviteInfo); }; - ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, jsonObject) -> { - logger.info("收到回放订阅消息: " + jsonObject); + ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> { + logger.info("收到回放订阅消息: " + hookParam); dynamicTask.stop(playBackTimeOutTaskKey); - StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, jsonObject, deviceId, channelId, startTime, endTime); + StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime); if (streamInfo == null) { logger.warn("设备回放API调用失败!"); callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), @@ -804,11 +806,11 @@ public class PlayServiceImpl implements IPlayService { String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase(); hookSubscribe.getContent().put("stream", stream); inviteInfo.setStream(stream); - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { - logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString()); + subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { + logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam); dynamicTask.stop(playBackTimeOutTaskKey); // hook响应 - hookEvent.response(mediaServerItemInUse, response); + hookEvent.response(mediaServerItemInUse, hookParam); }); } // 更新ssrc @@ -920,10 +922,10 @@ public class PlayServiceImpl implements IPlayService { streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); inviteStreamService.removeInviteInfo(inviteInfo); }; - ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, jsonObject) -> { - logger.info("[录像下载]收到订阅消息: " + jsonObject); + ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> { + logger.info("[录像下载]收到订阅消息: " + hookParam); dynamicTask.stop(downLoadTimeOutTaskKey); - StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, jsonObject, deviceId, channelId, startTime, endTime); + StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime); if (streamInfo == null) { logger.warn("[录像下载] 获取流地址信息失败"); callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), @@ -997,10 +999,10 @@ public class PlayServiceImpl implements IPlayService { HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); subscribe.removeSubscribe(hookSubscribe); hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { - logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString()); + subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { + logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam); dynamicTask.stop(downLoadTimeOutTaskKey); - hookEvent.response(mediaServerItemInUse, response); + hookEvent.response(mediaServerItemInUse, hookParam); }); } @@ -1090,8 +1092,9 @@ public class PlayServiceImpl implements IPlayService { return null; } - private StreamInfo onPublishHandlerForDownload(MediaServerItem mediaServerItemInuse, JSONObject response, String deviceId, String channelId, String startTime, String endTime) { - StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, response, deviceId, channelId); + private StreamInfo onPublishHandlerForDownload(MediaServerItem mediaServerItemInuse, HookParam hookParam, String deviceId, String channelId, String startTime, String endTime) { + OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam; + StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, streamChangedHookParam, deviceId, channelId); if (streamInfo != null) { streamInfo.setProgress(0); streamInfo.setStartTime(startTime); @@ -1108,10 +1111,8 @@ public class PlayServiceImpl implements IPlayService { } - public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) { - String streamId = resonse.getString("stream"); - JSONArray tracks = resonse.getJSONArray("tracks"); - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", streamId, tracks, null); + public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId) { + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookParam.getStream(), hookParam.getTracks(), null); streamInfo.setDeviceID(deviceId); streamInfo.setChannelId(channelId); return streamInfo; @@ -1307,9 +1308,9 @@ public class PlayServiceImpl implements IPlayService { /*======================设备主子码流逻辑START=========================*/ - public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId,boolean isSubStream) { - String streamId = resonse.getString("stream"); - JSONArray tracks = resonse.getJSONArray("tracks"); + public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId,boolean isSubStream) { + String streamId = hookParam.getStream(); + List tracks = hookParam.getTracks(); StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", streamId, tracks, null); streamInfo.setDeviceID(deviceId); streamInfo.setChannelId(channelId); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java index 7399b2a7..9f04950d 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -289,7 +289,7 @@ public class RedisGbPlayMsgListener implements MessageListener { // 添加订阅 HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtsp", mediaServerItem.getId()); - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam)->{ dynamicTask.stop(taskKey); responseSendItem(mediaServerItem, content, toId, serial); }); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 80de5efb..3f478442 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -2,4 +2,4 @@ spring: application: name: wvp profiles: - active: dev \ No newline at end of file + active: local \ No newline at end of file