完善国标级联点播推流,拉流代理,国标

pull/1642/head
648540858 2024-08-17 22:31:13 +08:00
parent 20f93d52d7
commit 9247c7e805
13 changed files with 53 additions and 40 deletions

View File

@ -38,32 +38,33 @@ public class SipPlatformRunner implements CommandLineRunner {
@Override
public void run(String... args) throws Exception {
// 获取所有启用的平台
List<Platform> parentPlatforms = storager.queryEnableParentPlatformList(true);
List<Platform> parentPlatforms = platformService.queryEnablePlatformList();
for (Platform parentPlatform : parentPlatforms) {
for (Platform platform : parentPlatforms) {
PlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
PlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(platform.getServerGBId());
// 更新缓存
PlatformCatch parentPlatformCatch = new PlatformCatch();
parentPlatformCatch.setPlatform(parentPlatform);
parentPlatformCatch.setId(parentPlatform.getServerGBId());
parentPlatformCatch.setPlatform(platform);
parentPlatformCatch.setId(platform.getServerGBId());
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
if (parentPlatformCatchOld != null) {
// 取消订阅
try {
sipCommanderForPlatform.unregister(parentPlatform, parentPlatformCatchOld.getSipTransactionInfo(), null, (eventResult)->{
platformService.login(parentPlatform);
log.info("[平台主动注销] {}({})", platform.getName(), platform.getServerGBId());
sipCommanderForPlatform.unregister(platform, parentPlatformCatchOld.getSipTransactionInfo(), null, (eventResult)->{
platformService.login(platform);
});
} catch (Exception e) {
log.error("[命令发送失败] 国标级联 注销: {}", e.getMessage());
platformService.offline(parentPlatform, true);
platformService.offline(platform, true);
continue;
}
}
// 设置所有平台离线
platformService.offline(parentPlatform, false);
// 设置平台离线
platformService.offline(platform, false);
}
}
}

View File

@ -93,7 +93,7 @@ public interface PlatformChannelMapper {
"left join wvp_device_channel dc on " +
"dc.id = pgc.device_channel_id " +
"WHERE " +
"dc.channel_id = #{channelId} and pp.status = true " +
"dc.device_id = #{channelId} and pp.status = true " +
"AND pp.server_gb_id IN" +
"<foreach collection='platforms' item='item' open='(' separator=',' close=')' > #{item}</foreach>" +
"</script> ")

View File

@ -83,4 +83,7 @@ public interface PlatformMapper {
@Update("UPDATE wvp_platform SET status=#{online} WHERE server_gb_id=#{platformGbID}" )
int updateStatus(@Param("platformGbID") String platformGbID, @Param("online") boolean online);
@Select("SELECT * FROM wvp_platform WHERE enable=true")
List<Platform> queryEnablePlatformList();
}

View File

@ -28,11 +28,11 @@ public class RecordEndEventListener implements ApplicationListener<RecordEndEven
String channelId = event.getRecordInfo().getChannelId();
int count = event.getRecordInfo().getCount();
int sumNum = event.getRecordInfo().getSumNum();
log.info("录像查询完成事件触发deviceId{}, channelId: {}, 录像数量{}/{}条", event.getRecordInfo().getDeviceId(),
log.info("录像查询事件触发deviceId{}, channelId: {}, 录像数量{}/{}条", event.getRecordInfo().getDeviceId(),
event.getRecordInfo().getChannelId(), count,sumNum);
if (!handlerMap.isEmpty()) {
RecordEndEventHandler handler = handlerMap.get(deviceId + channelId);
log.info("录像查询完成事件触发, 发送订阅deviceId{}, channelId: {}",
log.info("录像查询事件触发, 发送订阅deviceId{}, channelId: {}",
event.getRecordInfo().getDeviceId(), event.getRecordInfo().getChannelId());
if (handler !=null){
handler.handler(event.getRecordInfo());

View File

@ -12,6 +12,7 @@ import com.github.pagehelper.PageInfo;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.List;
/**
*
@ -84,4 +85,8 @@ public interface IPlatformService {
void addSimulatedSubscribeInfo(Platform parentPlatform);
Platform queryOne(Integer platformId);
List<Platform> queryEnablePlatformList();
}

View File

@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.gb28181.bean.PlayException;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService;
import com.genersoft.iot.vmp.gb28181.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService;
import com.genersoft.iot.vmp.streamPush.service.IStreamPushPlayService;
import lombok.extern.slf4j.Slf4j;
@ -38,13 +39,13 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
}
log.info("[点播通用通道] 类型:{} 通道: {}({})", inviteInfo.getSessionName(), channel.getGbName(), channel.getGbDeviceId());
if ("Play".equalsIgnoreCase(inviteInfo.getSessionName())) {
if (channel.getGbDeviceDbId() > 0) {
if (channel.getGbDeviceDbId() != null) {
// 国标通道
playGbDeviceChannel(channel, callback);
} else if (channel.getStreamProxyId() > 0) {
} else if (channel.getStreamProxyId() != null) {
// 拉流代理
playProxy(channel, callback);
} else if (channel.getStreamPushId() > 0) {
} else if (channel.getStreamPushId() != null) {
// 推流
playPush(channel, platform.getServerGBId(), platform.getName(), callback);
} else {
@ -53,14 +54,14 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error");
}
}else if ("Playback".equals(inviteInfo.getSessionName())) {
if (channel.getGbDeviceDbId() > 0) {
if (channel.getGbDeviceDbId() != null) {
// 国标通道
playbackGbDeviceChannel(channel, inviteInfo.getStartTime(), inviteInfo.getStopTime(), callback);
} else if (channel.getStreamProxyId() > 0) {
} else if (channel.getStreamProxyId() != null) {
// 拉流代理
log.warn("[回放通用通道] 不支持回放拉流代理的录像: {}({})", channel.getGbName(), channel.getGbDeviceId());
throw new PlayException(Response.FORBIDDEN, "forbidden");
} else if (channel.getStreamPushId() > 0) {
} else if (channel.getStreamPushId() != null) {
// 推流
log.warn("[回放通用通道] 不支持回放推流的录像: {}({})", channel.getGbName(), channel.getGbDeviceId());
throw new PlayException(Response.FORBIDDEN, "forbidden");
@ -70,7 +71,7 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error");
}
}else if ("Download".equals(inviteInfo.getSessionName())) {
if (channel.getGbDeviceDbId() > 0) {
if (channel.getGbDeviceDbId() != null) {
int downloadSpeed = 4;
try {
if (inviteInfo.getDownloadSpeed() != null){
@ -80,11 +81,11 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
// 国标通道
downloadGbDeviceChannel(channel, inviteInfo.getStartTime(), inviteInfo.getStopTime(), downloadSpeed, callback);
} else if (channel.getStreamProxyId() > 0) {
} else if (channel.getStreamProxyId() != null) {
// 拉流代理
log.warn("[下载通用通道录像] 不支持下载拉流代理的录像: {}({})", channel.getGbName(), channel.getGbDeviceId());
throw new PlayException(Response.FORBIDDEN, "forbidden");
} else if (channel.getStreamPushId() > 0) {
} else if (channel.getStreamPushId() != null) {
// 推流
log.warn("[下载通用通道录像] 不支持下载推流的录像: {}({})", channel.getGbName(), channel.getGbDeviceId());
throw new PlayException(Response.FORBIDDEN, "forbidden");
@ -107,6 +108,7 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
} catch (PlayException e) {
callback.run(e.getCode(), e.getMsg(), null);
} catch (Exception e) {
log.error("[点播失败] {}({})", channel.getGbName(), channel.getGbDeviceId(), e);
callback.run(Response.BUSY_HERE, "busy here", null);
}
}
@ -118,7 +120,7 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
if (streamInfo == null) {
callback.run(Response.BUSY_HERE, "busy here", null);
}else {
callback.run(Response.OK, "success", streamInfo);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}
}catch (Exception e) {
callback.run(Response.BUSY_HERE, "busy here", null);

View File

@ -370,7 +370,6 @@ public class GbChannelServiceImpl implements IGbChannelService {
return channelList;
}
private Set<Group> getAllGroup(Set<Group> regionChannelList ) {
if (regionChannelList.isEmpty()) {
return new HashSet<>();

View File

@ -43,6 +43,7 @@ import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.Vector;
@ -365,7 +366,7 @@ public class PlatformServiceImpl implements IPlatformService {
@Override
public void offline(Platform platform, boolean stopRegister) {
log.info("[平台离线]{}", platform.getServerGBId());
log.info("[平台离线]{}({})", platform.getName(), platform.getServerGBId());
PlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(platform.getServerGBId());
platformCatch.setKeepAliveReply(0);
platformCatch.setRegisterAliveReply(0);
@ -376,17 +377,17 @@ public class PlatformServiceImpl implements IPlatformService {
platformMapper.updateStatus(platform.getServerGBId(), false);
// 停止所有推流
log.info("[平台离线] {}, 停止所有推流", platform.getServerGBId());
log.info("[平台离线] {}({}), 停止所有推流", platform.getName(), platform.getServerGBId());
stopAllPush(platform.getServerGBId());
// 清除注册定时
log.info("[平台离线] {}, 停止定时注册任务", platform.getServerGBId());
log.info("[平台离线] {}({}), 停止定时注册任务", platform.getName(), platform.getServerGBId());
final String registerTaskKey = REGISTER_KEY_PREFIX + platform.getServerGBId();
if (dynamicTask.contains(registerTaskKey)) {
dynamicTask.stop(registerTaskKey);
}
// 清除心跳定时
log.info("[平台离线] {}, 停止定时发送心跳任务", platform.getServerGBId());
log.info("[平台离线] {}({}), 停止定时发送心跳任务", platform.getName(), platform.getServerGBId());
final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + platform.getServerGBId();
if (dynamicTask.contains(keepaliveTaskKey)) {
// 清除心跳任务
@ -396,11 +397,11 @@ public class PlatformServiceImpl implements IPlatformService {
SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(platform.getServerGBId());
if (catalogSubscribe != null) {
if (catalogSubscribe.getExpires() > 0) {
log.info("[平台离线] {}, 停止目录订阅回复", platform.getServerGBId());
log.info("[平台离线] {}({}), 停止目录订阅回复", platform.getName(), platform.getServerGBId());
subscribeHolder.removeCatalogSubscribe(platform.getServerGBId());
}
}
log.info("[平台离线] {}, 停止移动位置订阅回复", platform.getServerGBId());
log.info("[平台离线] {}({}), 停止移动位置订阅回复", platform.getName(), platform.getServerGBId());
subscribeHolder.removeMobilePositionSubscribe(platform.getServerGBId());
// 发起定时自动重新注册
if (!stopRegister) {
@ -436,7 +437,7 @@ public class PlatformServiceImpl implements IPlatformService {
// 添加注册任务
dynamicTask.startCron(registerTaskKey,
// 注册失败注册成功时由程序直接调用了online方法
()-> log.info("[国标级联] {},平台离线后持续发起注册,失败", platform.getServerGBId()),
()-> log.info("[国标级联] {}({}),平台离线后持续发起注册,失败", platform.getName(), platform.getServerGBId()),
60*1000);
}, null);
} catch (InvalidArgumentException | ParseException | SipException e) {
@ -809,4 +810,9 @@ public class PlatformServiceImpl implements IPlatformService {
public Platform queryOne(Integer platformId) {
return platformMapper.query(platformId);
}
@Override
public List<Platform> queryEnablePlatformList() {
return platformMapper.queryEnablePlatformList();
}
}

View File

@ -585,7 +585,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
recordXml.append("</RecordList>\r\n")
.append("</Response>\r\n");
log.info("[国标级联] 发送录像数据通道:{}, 内容: {}", recordInfo.getChannelId(), recordXml);
log.debug("[国标级联] 发送录像数据通道:{}, 内容: {}", recordInfo.getChannelId(), recordXml);
// callid
CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport());

View File

@ -28,6 +28,7 @@ import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
@ -180,7 +181,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
channelPlayService.start(channel, inviteInfo, platform, ((code, msg, streamInfo) -> {
if (code != Response.OK) {
if (code != InviteErrorCode.SUCCESS.getCode()) {
try {
responseAck(request, code, msg);
} catch (SipException | InvalidArgumentException | ParseException e) {

View File

@ -106,7 +106,7 @@ public class RecordInfoQueryMessageHandler extends SIPRequestProcessorParent imp
}
return;
}
if (channel.getStreamProxyId() > 0 || channel.getStreamPushId() > 0) {
if (channel.getStreamProxyId() != null || channel.getStreamPushId() != null ) {
log.info("[平台查询录像记录] 不支持查询推流和拉流代理的录像数据 {}/{}", parentPlatform.getName(), channelId );
try {
responseAck(request, Response.NOT_IMPLEMENTED); // 回复未实现
@ -126,7 +126,7 @@ public class RecordInfoQueryMessageHandler extends SIPRequestProcessorParent imp
return;
}
// 接收录像数据
recordEndEventListener.addEndEventHandler(channel.getGbDeviceId(), channelId, (recordInfo)->{
recordEndEventListener.addEndEventHandler(device.getDeviceId(), channelId, (recordInfo)->{
try {
log.info("[国标级联] 录像查询收到数据, 通道: {},准备转发===", channelId);
cmderFroPlatform.recordInfo(channel, parentPlatform, request.getFromTag(), recordInfo);

View File

@ -847,7 +847,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
log.info("[startSendRtpStream] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类");
}
log.info("[开始推流] rtp/{}, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getStream(),
log.info("[开始推流] {}/{}, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getApp(), sendRtpItem.getStream(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
mediaNodeServerService.startSendRtpStream(mediaServer, sendRtpItem);
}

View File

@ -28,10 +28,6 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService {
@Autowired
private IMediaServerService mediaServerService;
@Override
public StreamInfo start(int id) {
StreamProxy streamProxy = streamProxyMapper.select(id);