diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index c11cd7ad..b4c7ff09 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -446,7 +446,7 @@ public class DeviceServiceImpl implements IDeviceService { device.setCreateTime(DateUtil.getNow()); device.setUpdateTime(DateUtil.getNow()); if(device.getStreamMode() == null) { - device.setStreamMode("UDP"); + device.setStreamMode("TCP-PASSIVE"); } deviceMapper.addCustomDevice(device); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java index 8c353ab4..4a0a1b90 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java @@ -162,12 +162,7 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService { public void playProxy(CommonGBChannel channel, ErrorCallback callback){ // 拉流代理通道 try { - StreamInfo streamInfo = streamProxyPlayService.start(channel.getDataDeviceId()); - if (streamInfo == null) { - callback.run(Response.BUSY_HERE, "busy here", null); - }else { - callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); - } + streamProxyPlayService.start(channel.getDataDeviceId(), callback); }catch (Exception e) { callback.run(Response.BUSY_HERE, "busy here", null); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java index 8ad37f1e..85eff410 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java @@ -166,7 +166,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen if (device == null) { device = new Device(); - device.setStreamMode("UDP"); + device.setStreamMode("TCP-PASSIVE"); device.setCharset("GB2312"); device.setGeoCoordSys("WGS84"); device.setMediaServerId("auto"); @@ -174,7 +174,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen device.setOnLine(false); } else { if (ObjectUtils.isEmpty(device.getStreamMode())) { - device.setStreamMode("UDP"); + device.setStreamMode("TCP-PASSIVE"); } if (ObjectUtils.isEmpty(device.getCharset())) { device.setCharset("GB2312"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceInfoResponseMessageHandler.java index 9db37b41..1de3ff7a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceInfoResponseMessageHandler.java @@ -79,7 +79,7 @@ public class DeviceInfoResponseMessageHandler extends SIPRequestProcessorParent device.setModel(getText(rootElement, "Model")); device.setFirmware(getText(rootElement, "Firmware")); if (ObjectUtils.isEmpty(device.getStreamMode())) { - device.setStreamMode("UDP"); + device.setStreamMode("TCP-PASSIVE"); } deviceService.updateDevice(device); diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyPlayService.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyPlayService.java index 9f9ebf6f..18b1499a 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyPlayService.java @@ -1,12 +1,15 @@ package com.genersoft.iot.vmp.streamProxy.service; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; public interface IStreamProxyPlayService { StreamInfo start(int id); + void start(int id, ErrorCallback callback); + StreamInfo startProxy(StreamProxy streamProxy); void stop(int id); diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java index 1f112c1a..a514b090 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java @@ -3,16 +3,28 @@ package com.genersoft.iot.vmp.streamProxy.service.impl; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; +import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.streamProxy.dao.StreamProxyMapper; import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.ConcurrentHashMap; + +import javax.sip.message.Response; + import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; @@ -35,6 +47,54 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService { @Autowired private IRedisRpcPlayService redisRpcPlayService; + private ConcurrentHashMap> callbackMap = new ConcurrentHashMap<>(); + + private ConcurrentHashMap streamInfoMap = new ConcurrentHashMap<>(); + + /** + * 流到来的处理 + */ + @Async("taskExecutor") + @Transactional + @EventListener + public void onApplicationEvent(MediaArrivalEvent event) { + if ("rtsp".equals(event.getSchema())) { + StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(event.getApp(), event.getStream()); + if (streamProxy != null) { + ErrorCallback callback = callbackMap.remove(streamProxy.getId()); + StreamInfo streamInfo = streamInfoMap.remove(streamProxy.getId()); + if (callback != null && streamInfo != null) { + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + } + } + } + } + + @Override + public void start(int id, ErrorCallback callback) { + StreamProxy streamProxy = streamProxyMapper.select(id); + if (streamProxy == null) { + throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到"); + } + StreamInfo streamInfo = startProxy(streamProxy); + if (streamInfo == null) { + callback.run(Response.BUSY_HERE, "busy here", null); + return; + } + callbackMap.put(id, callback); + streamInfoMap.put(id, streamInfo); + + MediaServer mediaServer = mediaServerService.getOne(streamProxy.getMediaServerId()); + if (mediaServer != null) { + MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, streamProxy.getApp(), streamProxy.getStream()); + if (mediaInfo != null) { + callbackMap.remove(id); + streamInfoMap.remove(id); + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + } + } + } + @Override public StreamInfo start(int id) { StreamProxy streamProxy = streamProxyMapper.select(id); diff --git a/数据库/2.7.3/初始化-mysql-2.7.3.sql b/数据库/2.7.3/初始化-mysql-2.7.3.sql index 27e42738..8e486697 100644 --- a/数据库/2.7.3/初始化-mysql-2.7.3.sql +++ b/数据库/2.7.3/初始化-mysql-2.7.3.sql @@ -341,12 +341,11 @@ create table wvp_cloud_record media_server_id character varying(50), server_id character varying(50), file_name character varying(255), - folder character varying(255), - file_path character varying(255), + folder character varying(500), + file_path character varying(500), collect bool default false, file_size bigint, - time_len bigint, - constraint uk_stream_push_app_stream_path unique (app, stream, file_path) + time_len bigint ); create table wvp_user diff --git a/数据库/2.7.3/初始化-postgresql-kingbase-2.7.3.sql b/数据库/2.7.3/初始化-postgresql-kingbase-2.7.3.sql index 8b511957..38a6af81 100644 --- a/数据库/2.7.3/初始化-postgresql-kingbase-2.7.3.sql +++ b/数据库/2.7.3/初始化-postgresql-kingbase-2.7.3.sql @@ -358,8 +358,8 @@ create table wvp_cloud_record media_server_id character varying(50), server_id character varying(50), file_name character varying(255), - folder character varying(255), - file_path character varying(255), + folder character varying(500), + file_path character varying(500), collect bool default false, file_size int8, time_len int8, diff --git a/数据库/2.7.3/更新-mysql-2.7.3.sql b/数据库/2.7.3/更新-mysql-2.7.3.sql index 17101e6b..01f42da4 100644 --- a/数据库/2.7.3/更新-mysql-2.7.3.sql +++ b/数据库/2.7.3/更新-mysql-2.7.3.sql @@ -33,4 +33,11 @@ alter table wvp_device_channel drop stream_proxy_id; /* * 20241231 */ -alter table wvp_stream_proxy add relates_media_server_id character varying(50); \ No newline at end of file +alter table wvp_stream_proxy add relates_media_server_id character varying(50); + +/* +* 20250111 +*/ +drop index uk_stream_push_app_stream_path on wvp_cloud_record; +alter table wvp_cloud_record change folder folder varchar(500) null; +alter table wvp_cloud_record change file_path file_path varchar(500) null; \ No newline at end of file diff --git a/数据库/2.7.3/更新-postgresql-kingbase-2.7.3.sql b/数据库/2.7.3/更新-postgresql-kingbase-2.7.3.sql index 237c9d75..8317bce7 100644 --- a/数据库/2.7.3/更新-postgresql-kingbase-2.7.3.sql +++ b/数据库/2.7.3/更新-postgresql-kingbase-2.7.3.sql @@ -27,3 +27,16 @@ set data_type = 3, data_device_id = (SELECT stream_proxy_id from wvp_device_chan alter table wvp_device_channel drop device_db_id; alter table wvp_device_channel drop stream_push_id; alter table wvp_device_channel drop stream_proxy_id; + +/* +* 20241231 +*/ +alter table wvp_stream_proxy add relates_media_server_id character varying(50); + + +/* +* 20250111 +*/ +drop index uk_stream_push_app_stream_path on wvp_cloud_record; +alter table wvp_cloud_record change folder folder varchar(500) null; +alter table wvp_cloud_record change file_path file_path varchar(500) null; \ No newline at end of file