Merge branch 'wvp-28181-2.0'

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
结构优化
648540858 2022-08-15 15:37:13 +08:00
commit cb16cabb64
18 changed files with 305 additions and 310 deletions

View File

@ -24,7 +24,7 @@
7. 启动服务以linux为例 7. 启动服务以linux为例
**启动WVP-PRO** **启动WVP-PRO**
```shell ```shell
nohup java -jar java -jar wvp-pro-*.jar & nohup java -jar wvp-pro-*.jar &
``` ```
**启动ZLM** **启动ZLM**

12
pom.xml
View File

@ -61,13 +61,6 @@
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId> <artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<!-- 去掉 Lettuce 的依赖, Spring Boot 优先使用 Lettuce 作为 Redis 客户端 -->
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
@ -94,11 +87,6 @@
<artifactId>spring-boot-starter-security</artifactId> <artifactId>spring-boot-starter-security</artifactId>
</dependency> </dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<!-- druid数据库连接池 --> <!-- druid数据库连接池 -->
<dependency> <dependency>
<groupId>com.alibaba</groupId> <groupId>com.alibaba</groupId>

View File

@ -77,38 +77,54 @@ public class VideoManagerConstants {
//************************** redis 消息********************************* //************************** redis 消息*********************************
// 流变化的通知 /**
*
*/
public static final String WVP_MSG_STREAM_CHANGE_PREFIX = "WVP_MSG_STREAM_CHANGE_"; public static final String WVP_MSG_STREAM_CHANGE_PREFIX = "WVP_MSG_STREAM_CHANGE_";
// 接收推流设备的GPS变化通知 /**
* GPS
*/
public static final String VM_MSG_GPS = "VM_MSG_GPS"; public static final String VM_MSG_GPS = "VM_MSG_GPS";
// 接收推流设备的GPS变化通知 /**
* GPS
*/
public static final String VM_MSG_PUSH_STREAM_STATUS_CHANGE = "VM_MSG_PUSH_STREAM_STATUS_CHANGE"; public static final String VM_MSG_PUSH_STREAM_STATUS_CHANGE = "VM_MSG_PUSH_STREAM_STATUS_CHANGE";
// redis 消息通知设备推流到平台 /**
* redis
*/
public static final String VM_MSG_STREAM_PUSH_REQUESTED = "VM_MSG_STREAM_PUSH_REQUESTED"; public static final String VM_MSG_STREAM_PUSH_REQUESTED = "VM_MSG_STREAM_PUSH_REQUESTED";
// redis 消息请求所有的在线通道 /**
* redis 线
*/
public static final String VM_MSG_GET_ALL_ONLINE_REQUESTED = "VM_MSG_GET_ALL_ONLINE_REQUESTED"; public static final String VM_MSG_GET_ALL_ONLINE_REQUESTED = "VM_MSG_GET_ALL_ONLINE_REQUESTED";
// 移动位置订阅通知 /**
*
*/
public static final String VM_MSG_SUBSCRIBE_MOBILE_POSITION = "mobileposition"; public static final String VM_MSG_SUBSCRIBE_MOBILE_POSITION = "mobileposition";
// 报警订阅的通知收到报警向redis发出通知 /**
* redis
*/
public static final String VM_MSG_SUBSCRIBE_ALARM = "alarm"; public static final String VM_MSG_SUBSCRIBE_ALARM = "alarm";
// 报警通知的发送 收到redis发出的通知转发给其他平台 /**
* redis
*/
public static final String VM_MSG_SUBSCRIBE_ALARM_RECEIVE= "alarm_receive"; public static final String VM_MSG_SUBSCRIBE_ALARM_RECEIVE= "alarm_receive";
// 设备状态订阅的通知 /**
*
*/
public static final String VM_MSG_SUBSCRIBE_DEVICE_STATUS = "device"; public static final String VM_MSG_SUBSCRIBE_DEVICE_STATUS = "device";
//************************** 第三方 **************************************** //************************** 第三方 ****************************************
public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_"; public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_";
public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_"; public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_";

View File

@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.conf; package com.genersoft.iot.vmp.conf;
import com.alibaba.fastjson.parser.ParserConfig;
import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.service.impl.*; import com.genersoft.iot.vmp.service.impl.*;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -9,15 +10,14 @@ import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer;
import com.alibaba.fastjson.parser.ParserConfig;
import com.genersoft.iot.vmp.utils.redis.FastJsonRedisSerializer; import com.genersoft.iot.vmp.utils.redis.FastJsonRedisSerializer;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/** /**
* @description:Redis使spring-data-redisapplication.ymlredis * @description:Redis使spring-data-redisapplication.ymlredis
@ -28,23 +28,6 @@ import redis.clients.jedis.JedisPoolConfig;
@Configuration @Configuration
public class RedisConfig extends CachingConfigurerSupport { public class RedisConfig extends CachingConfigurerSupport {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private int port;
@Value("${spring.redis.database}")
private int database;
@Value("${spring.redis.password}")
private String password;
@Value("${spring.redis.timeout}")
private int timeout;
@Value("${spring.redis.poolMaxTotal:1000}")
private int poolMaxTotal;
@Value("${spring.redis.poolMaxIdle:500}")
private int poolMaxIdle;
@Value("${spring.redis.poolMaxWait:5}")
private int poolMaxWait;
@Autowired @Autowired
private RedisGpsMsgListener redisGPSMsgListener; private RedisGpsMsgListener redisGPSMsgListener;
@ -61,36 +44,24 @@ public class RedisConfig extends CachingConfigurerSupport {
private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener; private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener;
@Bean @Bean
public JedisPool jedisPool() { public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
if (StringUtils.isBlank(password)) { RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
password = null; // 使用fastJson序列化
} FastJsonRedisSerializer fastJsonRedisSerializer = new FastJsonRedisSerializer(Object.class);
JedisPoolConfig poolConfig = new JedisPoolConfig(); // value值的序列化采用fastJsonRedisSerializer
poolConfig.setMaxIdle(poolMaxIdle); redisTemplate.setValueSerializer(fastJsonRedisSerializer);
poolConfig.setMaxTotal(poolMaxTotal); redisTemplate.setHashValueSerializer(fastJsonRedisSerializer);
// 秒转毫秒 // 全局开启AutoType不建议使用
poolConfig.setMaxWaitMillis(poolMaxWait * 1000L); ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
JedisPool jp = new JedisPool(poolConfig, host, port, timeout * 1000, password, database); // 建议使用这种方式,小范围指定白名单,需要序列化的类
return jp; // ParserConfig.getGlobalInstance().addAccept("com.avatar");
// key的序列化采用StringRedisSerializer
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setConnectionFactory(redisConnectionFactory);
return redisTemplate;
} }
@Bean("redisTemplate")
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
// 使用fastjson进行序列化处理提高解析效率
FastJsonRedisSerializer<Object> serializer = new FastJsonRedisSerializer<Object>(Object.class);
// value值的序列化采用fastJsonRedisSerializer
template.setValueSerializer(serializer);
template.setHashValueSerializer(serializer);
// key的序列化采用StringRedisSerializer
template.setKeySerializer(new StringRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());
template.setConnectionFactory(redisConnectionFactory);
// 使用fastjson时需设置此项否则会报异常not support type
ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
return template;
}
/** /**
* redis redis * redis redis

View File

@ -28,7 +28,7 @@ public class RedisKeyExpirationEventMessageListener extends KeyExpirationEventMe
RedisConnection connection = this.listenerContainer.getConnectionFactory().getConnection(); RedisConnection connection = this.listenerContainer.getConnectionFactory().getConnection();
Properties config = connection.getConfig("notify-keyspace-events"); Properties config = connection.getConfig("notify-keyspace-events");
try { try {
if (!config.getProperty("notify-keyspace-events").equals(keyspaceNotificationsConfigParameter)) { if (!keyspaceNotificationsConfigParameter.equals(config.getProperty("notify-keyspace-events"))) {
connection.setConfig("notify-keyspace-events", keyspaceNotificationsConfigParameter); connection.setConfig("notify-keyspace-events", keyspaceNotificationsConfigParameter);
} }
} finally { } finally {

View File

@ -53,10 +53,15 @@ public class SipLayer{
* gov/nist/javax/sip/SipStackImpl.class * gov/nist/javax/sip/SipStackImpl.class
*/ */
properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "true"); properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "true");
properties.setProperty("gov.nist.javax.sip.DELIVER_UNSOLICITED_NOTIFY", "true"); // 接收所有notify请求即使没有订阅 // 接收所有notify请求即使没有订阅
properties.setProperty("gov.nist.javax.sip.DELIVER_TERMINATED_EVENT_FOR_NULL_DIALOG", "true"); // 为_NULL _对话框传递_终止的_事件 properties.setProperty("gov.nist.javax.sip.DELIVER_UNSOLICITED_NOTIFY", "true");
properties.setProperty("gov.nist.javax.sip.RELEASE_REFERENCES_STRATEGY", "Normal"); // 会话清理策略 // 为_NULL _对话框传递_终止的_事件
properties.setProperty("gov.nist.javax.sip.RELIABLE_CONNECTION_KEEP_ALIVE_TIMEOUT", "10"); properties.setProperty("gov.nist.javax.sip.DELIVER_TERMINATED_EVENT_FOR_NULL_DIALOG", "true");
// 会话清理策略
properties.setProperty("gov.nist.javax.sip.RELEASE_REFERENCES_STRATEGY", "Normal");
// 处理由该服务器处理的基于底层TCP的保持生存超时
properties.setProperty("gov.nist.javax.sip.RELIABLE_CONNECTION_KEEP_ALIVE_TIMEOUT", "60");
/** /**
* sip_server_log.log sip_debug_log.log public static final int TRACE_NONE = * sip_server_log.log sip_debug_log.log public static final int TRACE_NONE =
* 0; public static final int TRACE_MESSAGES = 16; public static final int * 0; public static final int TRACE_MESSAGES = 16; public static final int

View File

@ -62,7 +62,7 @@ public class SIPRequestHeaderPlarformProvider {
// Forwards // Forwards
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
// ceq // ceq
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.MESSAGE), Request.MESSAGE); CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.MESSAGE);
request = sipFactory.createMessageFactory().createRequest(requestURI, Request.MESSAGE, callIdHeader, cSeqHeader, fromHeader, request = sipFactory.createMessageFactory().createRequest(requestURI, Request.MESSAGE, callIdHeader, cSeqHeader, fromHeader,
toHeader, viaHeaders, maxForwards); toHeader, viaHeaders, maxForwards);
@ -120,7 +120,7 @@ public class SIPRequestHeaderPlarformProvider {
String callId, WWWAuthenticateHeader www , CallIdHeader callIdHeader) throws ParseException, PeerUnavailableException, InvalidArgumentException { String callId, WWWAuthenticateHeader www , CallIdHeader callIdHeader) throws ParseException, PeerUnavailableException, InvalidArgumentException {
Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), fromTag, viaTag, callIdHeader); Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(), fromTag, viaTag, callIdHeader);
SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP() + ":" + parentPlatform.getServerPort()); SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP() + ":" + parentPlatform.getServerPort());
if (www == null) { if (www == null) {
AuthorizationHeader authorizationHeader = sipFactory.createHeaderFactory().createAuthorizationHeader("Digest"); AuthorizationHeader authorizationHeader = sipFactory.createHeaderFactory().createAuthorizationHeader("Digest");
@ -213,7 +213,7 @@ public class SIPRequestHeaderPlarformProvider {
// Forwards // Forwards
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
// ceq // ceq
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.MESSAGE), Request.MESSAGE); CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.MESSAGE);
MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory(); MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory();
// 设置编码, 防止中文乱码 // 设置编码, 防止中文乱码
messageFactory.setDefaultContentEncodingCharset(parentPlatform.getCharacterSet()); messageFactory.setDefaultContentEncodingCharset(parentPlatform.getCharacterSet());

View File

@ -2,11 +2,9 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
import java.text.ParseException; import java.text.ParseException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List;
import javax.sip.Dialog; import javax.sip.*;
import javax.sip.InvalidArgumentException;
import javax.sip.PeerUnavailableException;
import javax.sip.SipFactory;
import javax.sip.address.Address; import javax.sip.address.Address;
import javax.sip.address.SipURI; import javax.sip.address.SipURI;
import javax.sip.header.*; import javax.sip.header.*;
@ -15,7 +13,11 @@ import javax.sip.message.Request;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.SipStackImpl;
import gov.nist.javax.sip.stack.SIPDialog;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.SipConfig;
@ -41,6 +43,14 @@ public class SIPRequestHeaderProvider {
@Autowired @Autowired
private VideoStreamSessionManager streamSession; private VideoStreamSessionManager streamSession;
@Autowired
@Qualifier(value="tcpSipProvider")
private SipProviderImpl tcpSipProvider;
@Autowired
@Qualifier(value="udpSipProvider")
private SipProviderImpl udpSipProvider;
public Request createMessageRequest(Device device, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException { public Request createMessageRequest(Device device, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException {
Request request = null; Request request = null;
// sipuri // sipuri
@ -95,7 +105,7 @@ public class SIPRequestHeaderProvider {
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
//ceq //ceq
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.INVITE), Request.INVITE); CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.INVITE);
request = sipFactory.createMessageFactory().createRequest(requestLine, Request.INVITE, callIdHeader, cSeqHeader,fromHeader, toHeader, viaHeaders, maxForwards); request = sipFactory.createMessageFactory().createRequest(requestLine, Request.INVITE, callIdHeader, cSeqHeader,fromHeader, toHeader, viaHeaders, maxForwards);
Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getIp()+":"+sipConfig.getPort())); Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getIp()+":"+sipConfig.getPort()));
@ -131,7 +141,7 @@ public class SIPRequestHeaderProvider {
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
//ceq //ceq
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.INVITE), Request.INVITE); CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.INVITE);
request = sipFactory.createMessageFactory().createRequest(requestLine, Request.INVITE, callIdHeader, cSeqHeader,fromHeader, toHeader, viaHeaders, maxForwards); request = sipFactory.createMessageFactory().createRequest(requestLine, Request.INVITE, callIdHeader, cSeqHeader,fromHeader, toHeader, viaHeaders, maxForwards);
Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getIp()+":"+sipConfig.getPort())); Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getIp()+":"+sipConfig.getPort()));
@ -200,7 +210,7 @@ public class SIPRequestHeaderProvider {
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
// ceq // ceq
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.SUBSCRIBE), Request.SUBSCRIBE); CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.SUBSCRIBE);
request = sipFactory.createMessageFactory().createRequest(requestURI, Request.SUBSCRIBE, callIdHeader, cSeqHeader, fromHeader, request = sipFactory.createMessageFactory().createRequest(requestURI, Request.SUBSCRIBE, callIdHeader, cSeqHeader, fromHeader,
toHeader, viaHeaders, maxForwards); toHeader, viaHeaders, maxForwards);
@ -226,55 +236,55 @@ public class SIPRequestHeaderProvider {
} }
public Request createInfoRequest(Device device, StreamInfo streamInfo, String content) public Request createInfoRequest(Device device, StreamInfo streamInfo, String content)
throws PeerUnavailableException, ParseException, InvalidArgumentException { throws SipException, ParseException, InvalidArgumentException {
Request request = null;
if (streamInfo == null) { if (streamInfo == null) {
return null; return null;
} }
Dialog dialog = streamSession.getDialogByStream(streamInfo.getDeviceID(), streamInfo.getChannelId(), streamInfo.getStream()); Request request = null;
SIPDialog dialog = streamSession.getDialogByStream(streamInfo.getDeviceID(), streamInfo.getChannelId(), streamInfo.getStream());
if (dialog == null) { if (dialog == null) {
return null; return null;
} }
SipURI requestLine = sipFactory.createAddressFactory().createSipURI(device.getDeviceId(), SipStack sipStack = udpSipProvider.getSipStack();
device.getHostAddress()); SIPDialog sipDialog = ((SipStackImpl) sipStack).putDialog(dialog);
// via if (dialog != sipDialog) {
ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>(); dialog = sipDialog;
ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(device.getIp(), device.getPort(), }else {
device.getTransport(), null); dialog.setSipProvider(udpSipProvider);
}
streamSession.put(streamInfo.getDeviceID(), streamInfo.getChannelId(), dialog.getCallId().getCallId(), dialog);
Request infoRequest = dialog.createRequest(Request.INFO);
SipURI sipURI = (SipURI) infoRequest.getRequestURI();
sipURI.setHost(device.getIp());
sipURI.setPort(device.getPort());
sipURI.setUser(streamInfo.getChannelId());
ViaHeader viaHeader = (ViaHeader) infoRequest.getHeader(ViaHeader.NAME);
viaHeader.setRPort(); viaHeader.setRPort();
viaHeaders.add(viaHeader); // 增加Contact header
// from
SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(sipConfig.getId(),
sipConfig.getDomain());
Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI);
FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, dialog.getLocalTag());
// to
SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(streamInfo.getChannelId(),
sipConfig.getDomain());
Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI);
ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, dialog.getRemoteTag());
// callid
CallIdHeader callIdHeader = dialog.getCallId();
// Forwards
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
Long cseq = redisCatchStorage.getCSEQ(Request.INVITE);
// ceq
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory()
.createCSeqHeader(cseq, Request.INFO);
request = sipFactory.createMessageFactory().createRequest(requestLine, Request.INFO, callIdHeader, cSeqHeader,
fromHeader, toHeader, viaHeaders, maxForwards);
Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory() Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory()
.createSipURI(sipConfig.getId(), sipConfig.getIp() + ":" + sipConfig.getPort())); .createSipURI(sipConfig.getId(), sipConfig.getIp() + ":" + sipConfig.getPort()));
request.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); infoRequest.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
List<String> agentParam = new ArrayList<>();
agentParam.add("wvp-pro");
// TODO 添加版本信息以及日期
UserAgentHeader userAgentHeader = null;
try {
userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
} catch (ParseException e) {
throw new RuntimeException(e);
}
infoRequest.addHeader(userAgentHeader);
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application",
"MANSRTSP"); "MANSRTSP");
request.setContent(content, contentTypeHeader); infoRequest.setContent(content, contentTypeHeader);
return request;
CSeqHeader cSeqHeader = (CSeqHeader)infoRequest.getHeader(CSeqHeader.NAME);
cSeqHeader.setSeqNumber(redisCatchStorage.getCSEQ());
// ceq
infoRequest.addHeader(cSeqHeader);
return infoRequest;
} }
} }

View File

@ -727,21 +727,28 @@ public class SIPCommander implements ISIPCommander {
} }
} }
streamByeCmd(dialog, (SIPRequest)transaction.getRequest(), okEvent);
} catch (SipException | ParseException e) {
e.printStackTrace();
}
}
@Override
public void streamByeCmd(SIPDialog dialog, SIPRequest request, SipSubscribe.Event okEvent) throws SipException, ParseException {
Request byeRequest = dialog.createRequest(Request.BYE); Request byeRequest = dialog.createRequest(Request.BYE);
SipURI byeURI = (SipURI) byeRequest.getRequestURI(); SipURI byeURI = (SipURI) byeRequest.getRequestURI();
SIPRequest request = (SIPRequest)transaction.getRequest();
byeURI.setHost(request.getRemoteAddress().getHostAddress()); byeURI.setHost(request.getRemoteAddress().getHostAddress());
byeURI.setPort(request.getRemotePort()); byeURI.setPort(request.getRemotePort());
byeURI.setUser(channelId);
ViaHeader viaHeader = (ViaHeader) byeRequest.getHeader(ViaHeader.NAME); ViaHeader viaHeader = (ViaHeader) byeRequest.getHeader(ViaHeader.NAME);
String protocol = viaHeader.getTransport().toUpperCase(); String protocol = viaHeader.getTransport().toUpperCase();
viaHeader.setRPort();
// 增加Contact header
Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getIp()+":"+sipConfig.getPort()));
byeRequest.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
List<String> agentParam = new ArrayList<>();
agentParam.add("wvp-pro");
// TODO 添加版本信息以及日期
UserAgentHeader userAgentHeader = null;
try {
userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
} catch (ParseException e) {
throw new RuntimeException(e);
}
byeRequest.addHeader(userAgentHeader);
ClientTransaction clientTransaction = null; ClientTransaction clientTransaction = null;
if("TCP".equals(protocol)) { if("TCP".equals(protocol)) {
clientTransaction = tcpSipProvider.getNewClientTransaction(byeRequest); clientTransaction = tcpSipProvider.getNewClientTransaction(byeRequest);
@ -753,8 +760,15 @@ public class SIPCommander implements ISIPCommander {
if (okEvent != null) { if (okEvent != null) {
sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), okEvent); sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), okEvent);
} }
CSeqHeader cSeqHeader = (CSeqHeader)byeRequest.getHeader(CSeqHeader.NAME);
cSeqHeader.setSeqNumber(redisCatchStorage.getCSEQ());
dialog.sendRequest(clientTransaction); dialog.sendRequest(clientTransaction);
} catch (SipException | ParseException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
throw new RuntimeException(e);
}
} }
/** /**
@ -1450,7 +1464,7 @@ public class SIPCommander implements ISIPCommander {
request.setContent(subscribePostitionXml.toString(), contentTypeHeader); request.setContent(subscribePostitionXml.toString(), contentTypeHeader);
CSeqHeader cSeqHeader = (CSeqHeader)request.getHeader(CSeqHeader.NAME); CSeqHeader cSeqHeader = (CSeqHeader)request.getHeader(CSeqHeader.NAME);
cSeqHeader.setSeqNumber(redisCatchStorage.getCSEQ(Request.SUBSCRIBE)); cSeqHeader.setSeqNumber(redisCatchStorage.getCSEQ());
request.removeHeader(CSeqHeader.NAME); request.removeHeader(CSeqHeader.NAME);
request.addHeader(cSeqHeader); request.addHeader(cSeqHeader);
}else { }else {
@ -1554,7 +1568,7 @@ public class SIPCommander implements ISIPCommander {
request.setContent(cmdXml.toString(), contentTypeHeader); request.setContent(cmdXml.toString(), contentTypeHeader);
CSeqHeader cSeqHeader = (CSeqHeader)request.getHeader(CSeqHeader.NAME); CSeqHeader cSeqHeader = (CSeqHeader)request.getHeader(CSeqHeader.NAME);
cSeqHeader.setSeqNumber(redisCatchStorage.getCSEQ(Request.SUBSCRIBE)); cSeqHeader.setSeqNumber(redisCatchStorage.getCSEQ());
request.removeHeader(CSeqHeader.NAME); request.removeHeader(CSeqHeader.NAME);
request.addHeader(cSeqHeader); request.addHeader(cSeqHeader);
@ -1664,10 +1678,9 @@ public class SIPCommander implements ISIPCommander {
@Override @Override
public void playPauseCmd(Device device, StreamInfo streamInfo) { public void playPauseCmd(Device device, StreamInfo streamInfo) {
try { try {
Long cseq = redisCatchStorage.getCSEQ(Request.INFO);
StringBuffer content = new StringBuffer(200); StringBuffer content = new StringBuffer(200);
content.append("PAUSE RTSP/1.0\r\n"); content.append("PAUSE RTSP/1.0\r\n");
content.append("CSeq: " + cseq + "\r\n"); content.append("CSeq: " + getInfoCseq() + "\r\n");
content.append("PauseTime: now\r\n"); content.append("PauseTime: now\r\n");
Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
if (request == null) { if (request == null) {
@ -1695,10 +1708,9 @@ public class SIPCommander implements ISIPCommander {
@Override @Override
public void playResumeCmd(Device device, StreamInfo streamInfo) { public void playResumeCmd(Device device, StreamInfo streamInfo) {
try { try {
Long cseq = redisCatchStorage.getCSEQ(Request.INFO);
StringBuffer content = new StringBuffer(200); StringBuffer content = new StringBuffer(200);
content.append("PLAY RTSP/1.0\r\n"); content.append("PLAY RTSP/1.0\r\n");
content.append("CSeq: " + cseq + "\r\n"); content.append("CSeq: " + getInfoCseq() + "\r\n");
content.append("Range: npt=now-\r\n"); content.append("Range: npt=now-\r\n");
Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
if (request == null) { if (request == null) {
@ -1725,10 +1737,9 @@ public class SIPCommander implements ISIPCommander {
@Override @Override
public void playSeekCmd(Device device, StreamInfo streamInfo, long seekTime) { public void playSeekCmd(Device device, StreamInfo streamInfo, long seekTime) {
try { try {
Long cseq = redisCatchStorage.getCSEQ(Request.INFO);
StringBuffer content = new StringBuffer(200); StringBuffer content = new StringBuffer(200);
content.append("PLAY RTSP/1.0\r\n"); content.append("PLAY RTSP/1.0\r\n");
content.append("CSeq: " + cseq + "\r\n"); content.append("CSeq: " + getInfoCseq() + "\r\n");
content.append("Range: npt=" + Math.abs(seekTime) + "-\r\n"); content.append("Range: npt=" + Math.abs(seekTime) + "-\r\n");
Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
@ -1756,11 +1767,11 @@ public class SIPCommander implements ISIPCommander {
@Override @Override
public void playSpeedCmd(Device device, StreamInfo streamInfo, Double speed) { public void playSpeedCmd(Device device, StreamInfo streamInfo, Double speed) {
try { try {
Long cseq = redisCatchStorage.getCSEQ(Request.INFO);
StringBuffer content = new StringBuffer(200); StringBuffer content = new StringBuffer(200);
content.append("PLAY RTSP/1.0\r\n"); content.append("PLAY RTSP/1.0\r\n");
content.append("CSeq: " + cseq + "\r\n"); content.append("CSeq: " + getInfoCseq() + "\r\n");
content.append("Scale: " + String.format("%.1f",speed) + "\r\n"); content.append("Scale: " + String.format("%.6f",speed) + "\r\n");
Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
if (request == null) { if (request == null) {
return; return;
@ -1780,6 +1791,10 @@ public class SIPCommander implements ISIPCommander {
} }
} }
private int getInfoCseq() {
return (int) ((Math.random() * 9 + 1) * Math.pow(10, 8));
}
@Override @Override
public void playbackControlCmd(Device device, StreamInfo streamInfo, String content,SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) { public void playbackControlCmd(Device device, StreamInfo streamInfo, String content,SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) {
try { try {
@ -1787,7 +1802,6 @@ public class SIPCommander implements ISIPCommander {
if (request == null) { if (request == null) {
return; return;
} }
logger.info(request.toString());
ClientTransaction clientTransaction = null; ClientTransaction clientTransaction = null;
if ("TCP".equals(device.getTransport())) { if ("TCP".equals(device.getTransport())) {
clientTransaction = tcpSipProvider.getNewClientTransaction(request); clientTransaction = tcpSipProvider.getNewClientTransaction(request);

View File

@ -105,7 +105,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
} }
request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform,
redisCatchStorage.getCSEQ(Request.REGISTER), "FromRegister" + tm, redisCatchStorage.getCSEQ(), "FromRegister" + tm,
"z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), callIdHeader); "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), callIdHeader);
// 将 callid 写入缓存, 等注册成功可以更新状态 // 将 callid 写入缓存, 等注册成功可以更新状态
String callIdFromHeader = callIdHeader.getCallId(); String callIdFromHeader = callIdHeader.getCallId();

View File

@ -2,24 +2,32 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;
import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract; import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
import gov.nist.javax.sip.ResponseEventExt; import gov.nist.javax.sip.ResponseEventExt;
import gov.nist.javax.sip.message.SIPResponse;
import gov.nist.javax.sip.stack.SIPDialog; import gov.nist.javax.sip.stack.SIPDialog;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException; import javax.sdp.SdpFactory;
import javax.sip.ResponseEvent; import javax.sdp.SdpParseException;
import javax.sip.SipException; import javax.sdp.SessionDescription;
import javax.sip.*;
import javax.sip.address.Address;
import javax.sip.address.SipURI; import javax.sip.address.SipURI;
import javax.sip.header.CSeqHeader; import javax.sip.header.CSeqHeader;
import javax.sip.header.CallIdHeader;
import javax.sip.header.UserAgentHeader;
import javax.sip.message.Request; import javax.sip.message.Request;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.text.ParseException; import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
/** /**
@ -34,23 +42,24 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
private final String method = "INVITE"; private final String method = "INVITE";
@Autowired @Autowired
private SipLayer sipLayer; private VideoStreamSessionManager streamSession;
@Autowired
private SipConfig config;
@Autowired @Autowired
private SIPProcessorObserver sipProcessorObserver; private SIPProcessorObserver sipProcessorObserver;
@Autowired
private SipConfig sipConfig;
@Autowired
private SipFactory sipFactory;
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅 // 添加消息处理的订阅
sipProcessorObserver.addResponseProcessor(method, this); sipProcessorObserver.addResponseProcessor(method, this);
} }
@Autowired
private VideoStreamSessionManager streamSession;
/** /**
* invite * invite
@ -74,6 +83,19 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
CSeqHeader cseq = (CSeqHeader) response.getHeader(CSeqHeader.NAME); CSeqHeader cseq = (CSeqHeader) response.getHeader(CSeqHeader.NAME);
Request reqAck = dialog.createAck(cseq.getSeqNumber()); Request reqAck = dialog.createAck(cseq.getSeqNumber());
SipURI requestURI = (SipURI) reqAck.getRequestURI(); SipURI requestURI = (SipURI) reqAck.getRequestURI();
String contentString = new String(response.getRawContent());
// jainSip不支持y=字段, 移除以解析。
int ssrcIndex = contentString.indexOf("y=");
// 检查是否有y字段
SessionDescription sdp;
if (ssrcIndex >= 0) {
//ssrc规定长度为10字节不取余下长度以避免后续还有“f=”字段
String substring = contentString.substring(0, contentString.indexOf("y="));
sdp = SdpFactory.getInstance().createSessionDescription(substring);
} else {
sdp = SdpFactory.getInstance().createSessionDescription(contentString);
}
requestURI.setUser(sdp.getOrigin().getUsername());
try { try {
requestURI.setHost(event.getRemoteIpAddress()); requestURI.setHost(event.getRemoteIpAddress());
} catch (ParseException e) { } catch (ParseException e) {
@ -81,6 +103,18 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
} }
requestURI.setPort(event.getRemotePort()); requestURI.setPort(event.getRemotePort());
reqAck.setRequestURI(requestURI); reqAck.setRequestURI(requestURI);
List<String> agentParam = new ArrayList<>();
agentParam.add("wvp-pro");
// TODO 添加版本信息以及日期
UserAgentHeader userAgentHeader = null;
try {
userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
} catch (ParseException e) {
throw new RuntimeException(e);
}
reqAck.addHeader(userAgentHeader);
Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getIp()+":"+sipConfig.getPort()));
reqAck.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
logger.info("[回复ack] {}-> {}:{} ",requestURI, event.getRemoteIpAddress(), event.getRemotePort()); logger.info("[回复ack] {}-> {}:{} ",requestURI, event.getRemoteIpAddress(), event.getRemotePort());
dialog.sendAck(reqAck); dialog.sendAck(reqAck);
@ -88,6 +122,10 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
} }
} catch (InvalidArgumentException | SipException e) { } catch (InvalidArgumentException | SipException e) {
e.printStackTrace(); e.printStackTrace();
} catch (ParseException e) {
throw new RuntimeException(e);
} catch (SdpParseException e) {
throw new RuntimeException(e);
} }
} }

View File

@ -98,9 +98,7 @@ public class ZLMHttpHookListener {
@PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8") @PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8")
public ResponseEntity<String> onServerKeepalive(@RequestBody JSONObject json){ public ResponseEntity<String> onServerKeepalive(@RequestBody JSONObject json){
if (logger.isDebugEnabled()) { logger.info("[ ZLM HOOK ] on_server_keepalive API调用参数" + json.toString());
logger.debug("[ ZLM HOOK ] on_server_keepalive API调用参数" + json.toString());
}
String mediaServerId = json.getString("mediaServerId"); String mediaServerId = json.getString("mediaServerId");
List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive); List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
if (subscribes != null && subscribes.size() > 0) { if (subscribes != null && subscribes.size() > 0) {
@ -445,12 +443,15 @@ public class ZLMHttpHookListener {
if (streamInfo!=null){ if (streamInfo!=null){
redisCatchStorage.stopPlay(streamInfo); redisCatchStorage.stopPlay(streamInfo);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
// 如果正在给上级推送则发送bye
}else{ }else{
streamInfo = redisCatchStorage.queryPlayback(null, null, stream, null); streamInfo = redisCatchStorage.queryPlayback(null, null, stream, null);
if (streamInfo != null) { if (streamInfo != null) {
redisCatchStorage.stopPlayback(streamInfo.getDeviceID(), streamInfo.getChannelId(), redisCatchStorage.stopPlayback(streamInfo.getDeviceID(), streamInfo.getChannelId(),
streamInfo.getStream(), null); streamInfo.getStream(), null);
} }
// 如果正在给上级推送则发送bye
} }
}else { }else {
if (!"rtp".equals(app)){ if (!"rtp".equals(app)){

View File

@ -38,7 +38,6 @@ import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.dao.MediaServerMapper; import com.genersoft.iot.vmp.storager.dao.MediaServerMapper;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.redis.JedisUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
@ -101,9 +100,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Autowired @Autowired
private EventPublisher publisher; private EventPublisher publisher;
@Autowired
JedisUtil jedisUtil;
/** /**
* *
*/ */
@ -291,13 +287,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
return null; return null;
} }
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId; String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
MediaServerItem serverItem=(MediaServerItem)redisUtil.get(key); return (MediaServerItem)redisUtil.get(key);
if(null==serverItem){
//zlm服务不在线启动重连
reloadZlm();
serverItem=(MediaServerItem)redisUtil.get(key);
}
return serverItem;
} }
@Override @Override
@ -426,7 +416,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
} }
redisUtil.set(key, serverItem); redisUtil.set(key, serverItem);
resetOnlineServerItem(serverItem); resetOnlineServerItem(serverItem);
updateMediaServerKeepalive(serverItem.getId(), null);
if (serverItem.isAutoConfig()) { if (serverItem.isAutoConfig()) {
setZLMConfig(serverItem, "0".equals(zlmServerConfig.getHookEnable())); setZLMConfig(serverItem, "0".equals(zlmServerConfig.getHookEnable()));
} }
@ -490,9 +479,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(); String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) { if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) {
logger.info("获取负载最低的节点时无在线节点,启动重连机制");
//启动重连
reloadZlm();
if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) { if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) {
logger.info("获取负载最低的节点时无在线节点"); logger.info("获取负载最低的节点时无在线节点");
return null; return null;
@ -657,6 +643,11 @@ public class MediaServerServiceImpl implements IMediaServerService {
public void updateMediaServerKeepalive(String mediaServerId, JSONObject data) { public void updateMediaServerKeepalive(String mediaServerId, JSONObject data) {
MediaServerItem mediaServerItem = getOne(mediaServerId); MediaServerItem mediaServerItem = getOne(mediaServerId);
if (mediaServerItem == null) { if (mediaServerItem == null) {
// 缓存不存在,从数据库查询,如果数据库不存在则是错误的
MediaServerItem mediaServerItemFromDatabase = getOneFromDatabase(mediaServerId);
if (mediaServerItemFromDatabase == null) {
return;
}
// zlm连接重试 // zlm连接重试
logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息,尝试重连zlm"); logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息,尝试重连zlm");
reloadZlm(); reloadZlm();
@ -672,6 +663,10 @@ public class MediaServerServiceImpl implements IMediaServerService {
redisUtil.set(key, data, hookAliveInterval); redisUtil.set(key, data, hookAliveInterval);
} }
private MediaServerItem getOneFromDatabase(String mediaServerId) {
return mediaServerMapper.queryOne(mediaServerId);
}
@Override @Override
public void syncCatchFromDatabase() { public void syncCatchFromDatabase() {
List<MediaServerItem> allInCatch = getAll(); List<MediaServerItem> allInCatch = getAll();

View File

@ -2,9 +2,7 @@ package com.genersoft.iot.vmp.service.impl;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.RoundingMode; import java.math.RoundingMode;
import java.util.List; import java.util.*;
import java.util.Objects;
import java.util.UUID;
import javax.sip.ResponseEvent; import javax.sip.ResponseEvent;
@ -12,8 +10,10 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.web.context.request.async.DeferredResult; import org.springframework.web.context.request.async.DeferredResult;
@ -131,6 +131,10 @@ public class PlayServiceImpl implements IPlayService {
private ZLMHttpHookSubscribe subscribe; private ZLMHttpHookSubscribe subscribe;
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Override @Override
@ -162,6 +166,7 @@ public class PlayServiceImpl implements IPlayService {
result.onCompletion(()->{ result.onCompletion(()->{
// 点播结束时调用截图接口 // 点播结束时调用截图接口
taskExecutor.execute(()->{
// TODO 应该在上流时调用更好,结束也可能是错误结束 // TODO 应该在上流时调用更好,结束也可能是错误结束
String path = "snap"; String path = "snap";
String fileName = deviceId + "_" + channelId + ".jpg"; String fileName = deviceId + "_" + channelId + ".jpg";
@ -178,6 +183,7 @@ public class PlayServiceImpl implements IPlayService {
} }
} }
}); });
});
if (streamInfo != null) { if (streamInfo != null) {
String streamId = streamInfo.getStream(); String streamId = streamInfo.getStream();
if (streamId == null) { if (streamId == null) {
@ -759,6 +765,53 @@ public class PlayServiceImpl implements IPlayService {
@Override @Override
public void zlmServerOnline(String mediaServerId) { public void zlmServerOnline(String mediaServerId) {
// 似乎没啥需要做的 // TODO 查找之前的点播流如果不存在则给下级发送bye
// MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
// zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
// Integer code = mediaList.getInteger("code");
// if (code == 0) {
// JSONArray data = mediaList.getJSONArray("data");
// if (data == null || data.size() == 0) {
// zlmServerOffline(mediaServerId);
// }else {
// Map<String, JSONObject> mediaListMap = new HashMap<>();
// for (int i = 0; i < data.size(); i++) {
// JSONObject json = data.getJSONObject(i);
// String app = json.getString("app");
// if ("rtp".equals(app)) {
// String stream = json.getString("stream");
// if (mediaListMap.get(stream) != null) {
// continue;
// }
// mediaListMap.put(stream, json);
// // 处理正在观看的国标设备
// List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(null, null, null, stream);
// if (ssrcTransactions.size() > 0) {
// for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
// if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
// cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(),
// ssrcTransaction.getStream(), null);
// }
// }
// }
// }
// }
// if (mediaListMap.size() > 0 ) {
// // 处理正在向上推流的上级平台
// List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
// if (sendRtpItems.size() > 0) {
// for (SendRtpItem sendRtpItem : sendRtpItems) {
// if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
// if (mediaListMap.get(sendRtpItem.getStreamId()) == null) {
// ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
// sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
// }
// }
// }
// }
// }
// }
// }
// }));
} }
} }

View File

@ -17,10 +17,9 @@ public interface IRedisCatchStorage {
/** /**
* cseq * cseq
* *
* @param method sip
* @return * @return
*/ */
Long getCSEQ(String method); Long getCSEQ();
/** /**
* *

View File

@ -42,8 +42,8 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
private UserSetting userSetting; private UserSetting userSetting;
@Override @Override
public Long getCSEQ(String method) { public Long getCSEQ() {
String key = VideoManagerConstants.SIP_CSEQ_PREFIX + userSetting.getServerId() + "_" + method; String key = VideoManagerConstants.SIP_CSEQ_PREFIX + userSetting.getServerId();
long result = redis.incr(key, 1L); long result = redis.incr(key, 1L);
if (result > Integer.MAX_VALUE) { if (result > Integer.MAX_VALUE) {

View File

@ -1,97 +0,0 @@
package com.genersoft.iot.vmp.utils.redis;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.Set;
/**
* @description:Jedis
* @author: wangshaopeng@sunnybs.com
* @date: 20210322 8:27:29
*/
@Component
public class JedisUtil {
@Autowired
private JedisPool jedisPool;
// ============================== Key ==============================
/**
* key
*
* @param key
* @return
*/
public Boolean exists(String key) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
Boolean exists = jedis.exists(key);
return exists;
} finally {
returnToPool(jedis);
}
}
// ============================== Set ==============================
/**
* SADD key member [member ...]
* member key member
* key member
* key
*/
public Long sadd(String key, String... members) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
Long smove = jedis.sadd(key, members);
return smove;
} finally {
returnToPool(jedis);
}
}
/**
* SMEMBERS key
* key
* key
*/
public Set<String> smembers(String key) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
Set<String> smembers = jedis.smembers(key);
return smembers;
} finally {
returnToPool(jedis);
}
}
/**
* SREM key member1 [member2]
*
*/
public Long srem(String key, String... member) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
Long srem = jedis.srem(key, member);
return srem;
} finally {
returnToPool(jedis);
}
}
private void returnToPool(Jedis jedis) {
if (jedis != null) {
jedis.close();
}
}
}

View File

@ -152,6 +152,8 @@ public class PlayController {
// 超时处理 // 超时处理
result.onTimeout(()->{ result.onTimeout(()->{
logger.warn(String.format("设备预览/回放停止超时deviceId/channelId%s_%s ", deviceId, channelId)); logger.warn(String.format("设备预览/回放停止超时deviceId/channelId%s_%s ", deviceId, channelId));
redisCatchStorage.stopPlay(streamInfo);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
RequestMessage msg = new RequestMessage(); RequestMessage msg = new RequestMessage();
msg.setId(uuid); msg.setId(uuid);
msg.setKey(key); msg.setKey(key);