diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java index a160921f..f32d420f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java @@ -56,7 +56,7 @@ public class SIPRequestHeaderPlarformProvider { //via ArrayList viaHeaders = new ArrayList(); ViaHeader viaHeader = SipFactory.getInstance().createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), - Integer.parseInt(parentPlatform.getDevicePort()), parentPlatform.getTransport(), SipUtils.getNewViaTag()); + parentPlatform.getDevicePort(), parentPlatform.getTransport(), SipUtils.getNewViaTag()); viaHeader.setRPort(); viaHeaders.add(viaHeader); //from diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index bbaf22cc..ffd41db3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -6,9 +6,7 @@ import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.SipLayer; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; -import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; @@ -624,9 +622,9 @@ public class SIPCommander implements ISIPCommander { logger.info("[语音喊话] {} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), sendRtpItem.getPort()); HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); - subscribe.addSubscribe(hookSubscribeForStreamChange, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { + subscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItemInUse, hookParam) -> { if (event != null) { - event.response(mediaServerItemInUse, json); + event.response(mediaServerItemInUse, hookParam); subscribe.removeSubscribe(hookSubscribeForStreamChange); } }); @@ -634,9 +632,9 @@ public class SIPCommander implements ISIPCommander { CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport()); callIdHeader.setCallId(callId); HookSubscribeForStreamPush hookSubscribeForStreamPush = HookSubscribeFactory.on_publish("rtp", stream, null, mediaServerItem.getId()); - subscribe.addSubscribe(hookSubscribeForStreamPush, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { + subscribe.addSubscribe(hookSubscribeForStreamPush, (mediaServerItemInUse, hookParam) -> { if (eventForPush != null) { - eventForPush.response(mediaServerItemInUse, json); + eventForPush.response(mediaServerItemInUse, hookParam); } }); // diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 792d9018..30d62567 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -19,6 +19,7 @@ import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -899,9 +900,9 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { logger.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { if (event != null) { - event.response(mediaServerItemInUse, json); + event.response(mediaServerItemInUse, hookParam); subscribe.removeSubscribe(hookSubscribe); } }); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java index ab54d158..a970eb5c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java @@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessag import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlatformService; @@ -127,10 +128,9 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp // 消息发送成功, 向上级发送invite,获取推流 try { - platformService.broadcastInvite(platform, deviceChannel.getChannelId(), mediaServerForMinimumLoad, (mediaServerItem, response)->{ + platformService.broadcastInvite(platform, deviceChannel.getChannelId(), mediaServerForMinimumLoad, (mediaServerItem, hookParam)->{ + OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; // 上级平台推流成功 - String app = response.getString("app"); - String stream = response.getString("stream"); AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(device.getDeviceId(), targetId); if (broadcastCatch != null ) { if (playService.audioBroadcastInUse(device, targetId)) { @@ -138,24 +138,24 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp platform.getServerGBId(), deviceChannel.getChannelId()); // 查看语音通道已经建立且已经占用 回复BYE try { - platformService.stopBroadcast(platform, deviceChannel.getChannelId(), stream); + platformService.stopBroadcast(platform, deviceChannel.getChannelId(), streamChangedHookParam.getStream()); } catch (InvalidArgumentException | ParseException | SsrcTransactionNotFoundException | SipException e) { logger.info("[消息发送失败] 国标级联 语音喊话 platform: {}, channel: {}", platform.getServerGBId(), deviceChannel.getChannelId()); } }else { // 查看语音通道已经建立但是未占用 - broadcastCatch.setApp(app); - broadcastCatch.setStream(stream); + broadcastCatch.setApp(streamChangedHookParam.getApp()); + broadcastCatch.setStream(streamChangedHookParam.getStream()); broadcastCatch.setMediaServerItem(mediaServerItem); audioBroadcastManager.update(broadcastCatch); // 推流到设备 - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, targetId, stream, null); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, targetId, streamChangedHookParam.getStream(), null); if (sendRtpItem == null) { - logger.warn("[国标级联] 语音喊话 异常,未找到发流信息, channelId: {}, stream: {}", targetId, stream); - logger.info("[国标级联] 语音喊话 重新开始,channelId: {}, stream: {}", targetId, stream); + logger.warn("[国标级联] 语音喊话 异常,未找到发流信息, channelId: {}, stream: {}", targetId, streamChangedHookParam.getStream()); + logger.info("[国标级联] 语音喊话 重新开始,channelId: {}, stream: {}", targetId, streamChangedHookParam.getStream()); try { - playService.audioBroadcastCmd(device, targetId, mediaServerItem, app, stream, 60, true, msg -> { + playService.audioBroadcastCmd(device, targetId, mediaServerItem, streamChangedHookParam.getApp(), streamChangedHookParam.getStream(), 60, true, msg -> { logger.info("[语音喊话] 通道建立成功, device: {}, channel: {}", device.getDeviceId(), targetId); }); } catch (SipException | InvalidArgumentException | ParseException e) { @@ -173,7 +173,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp } }else { try { - playService.audioBroadcastCmd(device, targetId, mediaServerItem, app, stream, 60, true, msg -> { + playService.audioBroadcastCmd(device, targetId, mediaServerItem, streamChangedHookParam.getApp(), streamChangedHookParam.getStream(), 60, true, msg -> { logger.info("[语音喊话] 通道建立成功, device: {}, channel: {}", device.getDeviceId(), targetId); }); } catch (SipException | InvalidArgumentException | ParseException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 95fb3aca..44bf11b1 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; @@ -28,7 +29,7 @@ public interface IPlayService { ErrorCallback callback); SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, ErrorCallback callback); - StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId); + StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId); MediaServerItem getNewMediaServerItem(Device device); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index 6c337700..c43591bd 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.service.impl; -import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.conf.DynamicTask; @@ -16,6 +15,7 @@ import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlatformService; @@ -442,10 +442,11 @@ public class PlatformServiceImpl implements IPlatformService { inviteStreamService.removeInviteInfo(inviteInfo); }else { // 流确实尚在推流,直接回调结果 - JSONObject json = new JSONObject(); - json.put("app", inviteInfo.getStreamInfo().getApp()); - json.put("stream", inviteInfo.getStreamInfo().getStream()); - hookEvent.response(mediaServerItemForStreamInfo, json); + OnStreamChangedHookParam hookParam = new OnStreamChangedHookParam(); + hookParam.setApp(inviteInfo.getStreamInfo().getApp()); + hookParam.setStream(inviteInfo.getStreamInfo().getStream()); + + hookEvent.response(mediaServerItemForStreamInfo, hookParam); return; } } @@ -498,14 +499,14 @@ public class PlatformServiceImpl implements IPlatformService { } } }, userSetting.getPlayTimeout()); - commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, response)->{ + commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, hookParam)->{ logger.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId); dynamicTask.stop(timeOutTaskKey); // hook响应 - playService.onPublishHandlerForPlay(mediaServerItemForInvite, response, platform.getServerGBId(), channelId); + playService.onPublishHandlerForPlay(mediaServerItemForInvite, hookParam, platform.getServerGBId(), channelId); // 收到流 if (hookEvent != null) { - hookEvent.response(mediaServerItem, response); + hookEvent.response(mediaServerItem, hookParam); } }, event -> { // 收到200OK 检测ssrc是否有变化,防止上级自定义了ssrc @@ -524,30 +525,20 @@ public class PlatformServiceImpl implements IPlatformService { logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); if (!mediaServerItem.isRtpEnable()) { logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); - - if (!ssrcFactory.checkSsrc(mediaServerItem.getId(), ssrcInResponse)) { - // ssrc 不可用 - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); - event.msg = "下级自定义了ssrc,但是此ssrc不可用"; - event.statusCode = 400; - errorEvent.response(event); - return; - } - + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); // 单端口模式streamId也有变化,需要重新设置监听 if (!mediaServerItem.isRtpEnable()) { // 添加订阅 HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); subscribe.removeSubscribe(hookSubscribe); hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); - subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { - logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString()); + subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { + logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam); dynamicTask.stop(timeOutTaskKey); // hook响应 - playService.onPublishHandlerForPlay(mediaServerItemInUse, response, platform.getServerGBId(), channelId); - hookEvent.response(mediaServerItemInUse, response); + playService.onPublishHandlerForPlay(mediaServerItemInUse, hookParam, platform.getServerGBId(), channelId); + hookEvent.response(mediaServerItemInUse, hookParam); }); } // 关闭rtp server diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index c345770d..65c479f3 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -293,12 +293,12 @@ public class PlayServiceImpl implements IPlayService { // 查看设备是否已经在推流 try { - cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { - logger.info("[语音对讲] 流已生成, 开始推流: " + response.toJSONString()); + cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (mediaServerItemInuse, hookParam) -> { + logger.info("[语音对讲] 流已生成, 开始推流: " + hookParam); dynamicTask.stop(timeOutTaskKey); // TODO 暂不做处理 - }, (MediaServerItem mediaServerItemInuse, JSONObject json) -> { - logger.info("[语音对讲] 设备开始推流: " + json.toJSONString()); + }, (mediaServerItemInuse, hookParam) -> { + logger.info("[语音对讲] 设备开始推流: " + hookParam); dynamicTask.stop(timeOutTaskKey); }, (event) -> { @@ -617,10 +617,10 @@ public class PlayServiceImpl implements IPlayService { } @Override - public StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) { - StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); + public StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId) { + OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam; + StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId); Device device = redisCatchStorage.getDevice(deviceId); - OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; if (streamInfo != null) { DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { @@ -1571,7 +1571,7 @@ public class PlayServiceImpl implements IPlayService { } } - talk(mediaServerItem, device, channelId, stream, (MediaServerItem mediaServerItem1, JSONObject response) -> { + talk(mediaServerItem, device, channelId, stream, (mediaServerItem1, hookParam) -> { logger.info("[语音对讲] 收到设备发来的流"); }, eventResult -> { logger.warn("[语音对讲] 失败,{}/{}, 错误码 {} {}", device.getDeviceId(), channelId, eventResult.statusCode, eventResult.msg);