将device信息写入redis以提高sip处理速度

pull/276/head
648540858 2021-12-13 17:20:23 +08:00
parent b6fa459bc3
commit bc0319b3f3
12 changed files with 111 additions and 10 deletions

View File

@ -318,7 +318,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} else { } else {
// 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
Device device = storager.queryVideoDevice(requesterId); Device device = redisCatchStorage.getDevice(requesterId);
if (device != null) { if (device != null) {
logger.info("收到设备" + requesterId + "的语音广播Invite请求"); logger.info("收到设备" + requesterId + "的语音广播Invite请求");
responseAck(evt, Response.TRYING); responseAck(evt, Response.TRYING);

View File

@ -112,7 +112,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
MobilePosition mobilePosition = new MobilePosition(); MobilePosition mobilePosition = new MobilePosition();
Element deviceIdElement = rootElement.element("DeviceID"); Element deviceIdElement = rootElement.element("DeviceID");
String deviceId = deviceIdElement.getTextTrim().toString(); String deviceId = deviceIdElement.getTextTrim().toString();
Device device = storager.queryVideoDevice(deviceId); Device device = redisCatchStorage.getDevice(deviceId);
if (device != null) { if (device != null) {
if (!StringUtils.isEmpty(device.getName())) { if (!StringUtils.isEmpty(device.getName())) {
mobilePosition.setDeviceName(device.getName()); mobilePosition.setDeviceName(device.getName());
@ -168,7 +168,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
Element deviceIdElement = rootElement.element("DeviceID"); Element deviceIdElement = rootElement.element("DeviceID");
String deviceId = deviceIdElement.getText().toString(); String deviceId = deviceIdElement.getText().toString();
Device device = storager.queryVideoDevice(deviceId); Device device = redisCatchStorage.getDevice(deviceId);
if (device == null) { if (device == null) {
return; return;
} }
@ -235,7 +235,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
Element rootElement = getRootElement(evt); Element rootElement = getRootElement(evt);
Device device = storager.queryVideoDevice(deviceId); Device device = redisCatchStorage.getDevice(deviceId);
if (device == null) { if (device == null) {
return; return;
} }

View File

@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import gov.nist.javax.sip.RequestEventExt; import gov.nist.javax.sip.RequestEventExt;
import gov.nist.javax.sip.address.AddressImpl; import gov.nist.javax.sip.address.AddressImpl;
@ -51,6 +52,9 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
@Autowired @Autowired
private RegisterLogicHandler handler; private RegisterLogicHandler handler;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired @Autowired
private IVideoManagerStorager storager; private IVideoManagerStorager storager;
@ -86,7 +90,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
AddressImpl address = (AddressImpl) fromHeader.getAddress(); AddressImpl address = (AddressImpl) fromHeader.getAddress();
SipUri uri = (SipUri) address.getURI(); SipUri uri = (SipUri) address.getURI();
String deviceId = uri.getUser(); String deviceId = uri.getUser();
Device device = storager.queryVideoDevice(deviceId); Device device = redisCatchStorage.getDevice(deviceId);
AuthorizationHeader authorhead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME); AuthorizationHeader authorhead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME);
// 校验密码是否正确 // 校验密码是否正确
if (authorhead != null) { if (authorhead != null) {

View File

@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
import org.dom4j.Element; import org.dom4j.Element;
@ -38,6 +39,9 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement
@Autowired @Autowired
private IVideoManagerStorager storage; private IVideoManagerStorager storage;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅 // 添加消息处理的订阅
@ -53,7 +57,7 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement
logger.debug("接收到消息:" + evt.getRequest()); logger.debug("接收到消息:" + evt.getRequest());
String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
// 查询设备是否存在 // 查询设备是否存在
Device device = storage.queryVideoDevice(deviceId); Device device = redisCatchStorage.getDevice(deviceId);
// 查询上级平台是否存在 // 查询上级平台是否存在
ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(deviceId); ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(deviceId);
try { try {

View File

@ -467,7 +467,7 @@ public class ZLMHttpHookListener {
if (s.length == 2) { if (s.length == 2) {
String deviceId = s[0]; String deviceId = s[0];
String channelId = s[1]; String channelId = s[1];
Device device = storager.queryVideoDevice(deviceId); Device device = redisCatchStorage.getDevice(deviceId);
if (device != null) { if (device != null) {
UUID uuid = UUID.randomUUID(); UUID uuid = UUID.randomUUID();
SSRCInfo ssrcInfo; SSRCInfo ssrcInfo;

View File

@ -63,4 +63,6 @@ public interface IStreamPushService {
void zlmServerOffline(String mediaServerId); void zlmServerOffline(String mediaServerId);
void clean(); void clean();
boolean saveToRandomGB();
} }

View File

@ -96,7 +96,7 @@ public class PlayServiceImpl implements IPlayService {
resultHolder.invokeResult(msg); resultHolder.invokeResult(msg);
return playResult; return playResult;
} }
Device device = storager.queryVideoDevice(deviceId); Device device = redisCatchStorage.getDevice(deviceId);
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
playResult.setDevice(device); playResult.setDevice(device);
// 超时处理 // 超时处理

View File

@ -255,4 +255,30 @@ public class StreamPushServiceImpl implements IStreamPushService {
public void clean() { public void clean() {
} }
@Override
public boolean saveToRandomGB() {
List<StreamPushItem> streamPushItems = streamPushMapper.selectAll();
long gbId = 100001;
for (StreamPushItem streamPushItem : streamPushItems) {
streamPushItem.setStreamType("push");
streamPushItem.setStatus(true);
streamPushItem.setGbId("34020000004111" + gbId);
gbId ++;
}
int limitCount = 30;
if (streamPushItems.size() > limitCount) {
for (int i = 0; i < streamPushItems.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > streamPushItems.size()) {
toIndex = streamPushItems.size();
}
gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex));
}
}else {
gbStreamMapper.batchAdd(streamPushItems);
}
return true;
}
} }

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.storager;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
@ -169,4 +170,15 @@ public interface IRedisCatchStorage {
ThirdPartyGB queryMemberNoGBId(String queryKey); ThirdPartyGB queryMemberNoGBId(String queryKey);
List<StreamInfo> getStreams(String mediaServerId, String pull); List<StreamInfo> getStreams(String mediaServerId, String pull);
/**
* deviceredis
* @param device
*/
void updateDevice(Device device);
/**
* Device
*/
Device getDevice(String deviceId);
} }

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.storager.dao;
import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import org.apache.ibatis.annotations.*; import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
@ -79,4 +80,17 @@ public interface GbStreamMapper {
"</foreach>" + "</foreach>" +
"</script>") "</script>")
void batchDel(List<StreamProxyItem> streamProxyItemList); void batchDel(List<StreamProxyItem> streamProxyItemList);
@Insert("<script> " +
"insert into gb_stream " +
"(app, stream, gbId, name, " +
"longitude, latitude, streamType, mediaServerId, status)" +
"values " +
"<foreach collection='subList' index='index' item='item' separator=','> " +
"('${item.app}', '${item.stream}', '${item.gbId}', '${item.name}', " +
"'${item.longitude}', '${item.latitude}', '${item.streamType}', " +
"'${item.mediaServerId}', ${item.status}) "+
"</foreach> " +
"</script>")
void batchAdd(List<StreamPushItem> subList);
} }

View File

@ -377,4 +377,16 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
} }
return result; return result;
} }
@Override
public void updateDevice(Device device) {
String key = VideoManagerConstants.DEVICE_PREFIX + userSetup.getServerId() + "_" + device.getDeviceId();
redis.set(key, device);
}
@Override
public Device getDevice(String deviceId) {
String key = VideoManagerConstants.DEVICE_PREFIX + userSetup.getServerId() + "_" + deviceId;
return (Device)redis.get(key);
}
} }

View File

@ -110,6 +110,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
*/ */
@Override @Override
public synchronized boolean create(Device device) { public synchronized boolean create(Device device) {
redisCatchStorage.updateDevice(device);
return deviceMapper.add(device) > 0; return deviceMapper.add(device) > 0;
} }
@ -128,11 +129,14 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
Device deviceByDeviceId = deviceMapper.getDeviceByDeviceId(device.getDeviceId()); Device deviceByDeviceId = deviceMapper.getDeviceByDeviceId(device.getDeviceId());
if (deviceByDeviceId == null) { if (deviceByDeviceId == null) {
device.setCreateTime(now); device.setCreateTime(now);
redisCatchStorage.updateDevice(device);
return deviceMapper.add(device) > 0; return deviceMapper.add(device) > 0;
}else { }else {
redisCatchStorage.updateDevice(device);
return deviceMapper.update(device) > 0; return deviceMapper.update(device) > 0;
} }
} }
@Override @Override
@ -185,11 +189,32 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
} }
} }
} }
int limitCount = 300;
if (addChannels.size() > 0) { if (addChannels.size() > 0) {
deviceChannelMapper.batchAdd(addChannels); if (addChannels.size() > limitCount) {
for (int i = 0; i < addChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > addChannels.size()) {
toIndex = addChannels.size();
}
deviceChannelMapper.batchAdd(addChannels.subList(i, toIndex));
}
}else {
deviceChannelMapper.batchAdd(addChannels);
}
} }
if (updateChannels.size() > 0) { if (updateChannels.size() > 0) {
deviceChannelMapper.batchUpdate(updateChannels); if (updateChannels.size() > limitCount) {
for (int i = 0; i < updateChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > updateChannels.size()) {
toIndex = updateChannels.size();
}
deviceChannelMapper.batchAdd(updateChannels.subList(i, toIndex));
}
}else {
deviceChannelMapper.batchUpdate(updateChannels);
}
} }
} }
} }
@ -322,6 +347,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
} }
device.setOnline(1); device.setOnline(1);
logger.info("更新设备在线: " + deviceId); logger.info("更新设备在线: " + deviceId);
redisCatchStorage.updateDevice(device);
return deviceMapper.update(device) > 0; return deviceMapper.update(device) > 0;
} }
@ -337,6 +363,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
Device device = deviceMapper.getDeviceByDeviceId(deviceId); Device device = deviceMapper.getDeviceByDeviceId(deviceId);
if (device == null) return false; if (device == null) return false;
device.setOnline(0); device.setOnline(0);
redisCatchStorage.updateDevice(device);
return deviceMapper.update(device) > 0; return deviceMapper.update(device) > 0;
} }