Merge branch 'wvp-28181-2.0' into wvp-28181-2.0

pull/353/head
mrjackwang 2022-03-15 10:28:23 +08:00 committed by GitHub
commit 42901d0374
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 759 additions and 548 deletions

View File

@ -85,7 +85,7 @@ RUN echo '#!/bin/bash' > run.sh && \
echo 'nohup /opt/media/MediaServer -d -m 3 &' >> run.sh && \ echo 'nohup /opt/media/MediaServer -d -m 3 &' >> run.sh && \
echo 'cd /opt/wvp' >> run.sh && \ echo 'cd /opt/wvp' >> run.sh && \
echo 'if [${WVP_CONFIG}]; then' >> run.sh && \ echo 'if [${WVP_CONFIG}]; then' >> run.sh && \
echo ' java -jar *.jar --spring.confi g.location=/opt/wvp/config/application.yml --media.record-assist-port=18081 ${WVP_CONFIG}' >> run.sh && \ echo ' java -jar *.jar --spring.config.location=/opt/wvp/config/application.yml --media.record-assist-port=18081 ${WVP_CONFIG}' >> run.sh && \
echo 'else' >> run.sh && \ echo 'else' >> run.sh && \
echo ' java -jar *.jar --spring.config.location=/opt/wvp/config/application.yml --media.record-assist-port=18081 --media.ip=127.0.0.1 --media.sdp-ip=${WVP_IP} --sip.ip=${WVP_IP} --media.stream-ip=${WVP_IP}' >> run.sh && \ echo ' java -jar *.jar --spring.config.location=/opt/wvp/config/application.yml --media.record-assist-port=18081 --media.ip=127.0.0.1 --media.sdp-ip=${WVP_IP} --sip.ip=${WVP_IP} --media.stream-ip=${WVP_IP}' >> run.sh && \
echo 'fi' >> run.sh echo 'fi' >> run.sh

10
pom.xml
View File

@ -169,13 +169,6 @@
<version>1.2.73</version> <version>1.2.73</version>
</dependency> </dependency>
<!--Guava是一种基于开源的Java库-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.0-jre</version>
</dependency>
<!-- okhttp --> <!-- okhttp -->
<dependency> <dependency>
<groupId>com.squareup.okhttp3</groupId> <groupId>com.squareup.okhttp3</groupId>
@ -279,6 +272,9 @@
<plugin> <plugin>
<groupId>pl.project13.maven</groupId> <groupId>pl.project13.maven</groupId>
<artifactId>git-commit-id-plugin</artifactId> <artifactId>git-commit-id-plugin</artifactId>
<configuration>
<offline>true</offline>
</configuration>
</plugin> </plugin>
<plugin> <plugin>

View File

@ -60,12 +60,9 @@ public class SipPlatformRunner implements CommandLineRunner {
// 取消订阅 // 取消订阅
sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{ sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{
ParentPlatform platform = storager.queryParentPlatByServerGBId(parentPlatform.getServerGBId()); // 发送平台未注册消息
sipCommanderForPlatform.register(platform, null, null); publisher.platformNotRegisterEventPublish(parentPlatform.getServerGBId());
}); });
// 发送平台未注册消息
publisher.platformNotRegisterEventPublish(parentPlatform.getServerGBId());
} }
} }
} }

View File

@ -0,0 +1,5 @@
package com.genersoft.iot.vmp.gb28181.bean;
public interface InviteStreamCallback {
void call(InviteStreamInfo inviteStreamInfo);
}

View File

@ -0,0 +1,61 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
public class InviteStreamInfo {
public InviteStreamInfo(MediaServerItem mediaServerItem, JSONObject response, String callId, String app, String stream) {
this.mediaServerItem = mediaServerItem;
this.response = response;
this.callId = callId;
this.app = app;
this.stream = stream;
}
private MediaServerItem mediaServerItem;
private JSONObject response;
private String callId;
private String app;
private String stream;
public MediaServerItem getMediaServerItem() {
return mediaServerItem;
}
public void setMediaServerItem(MediaServerItem mediaServerItem) {
this.mediaServerItem = mediaServerItem;
}
public JSONObject getResponse() {
return response;
}
public void setResponse(JSONObject response) {
this.response = response;
}
public String getCallId() {
return callId;
}
public void setCallId(String callId) {
this.callId = callId;
}
public String getApp() {
return app;
}
public void setApp(String app) {
this.app = app;
}
public String getStream() {
return stream;
}
public void setStream(String stream) {
this.stream = stream;
}
}

View File

@ -114,6 +114,21 @@ public class ParentPlatform {
*/ */
private String catalogId; private String catalogId;
/**
*
*/
private boolean catalogSubscribe;
/**
*
*/
private boolean alarmSubscribe;
/**
* GPS
*/
private boolean gpsSubscribe;
public Integer getId() { public Integer getId() {
return id; return id;
} }
@ -290,4 +305,28 @@ public class ParentPlatform {
public void setCatalogId(String catalogId) { public void setCatalogId(String catalogId) {
this.catalogId = catalogId; this.catalogId = catalogId;
} }
public boolean isCatalogSubscribe() {
return catalogSubscribe;
}
public void setCatalogSubscribe(boolean catalogSubscribe) {
this.catalogSubscribe = catalogSubscribe;
}
public boolean isAlarmSubscribe() {
return alarmSubscribe;
}
public void setAlarmSubscribe(boolean alarmSubscribe) {
this.alarmSubscribe = alarmSubscribe;
}
public boolean isGpsSubscribe() {
return gpsSubscribe;
}
public void setGpsSubscribe(boolean gpsSubscribe) {
this.gpsSubscribe = gpsSubscribe;
}
} }

View File

@ -2,6 +2,8 @@ package com.genersoft.iot.vmp.gb28181.bean;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@Component @Component
@ -34,4 +36,14 @@ public class SubscribeHolder {
public void removeMobilePositionSubscribe(String platformId) { public void removeMobilePositionSubscribe(String platformId) {
mobilePositionMap.remove(platformId); mobilePositionMap.remove(platformId);
} }
public List<String> getAllCatalogSubscribePlatform() {
List<String> platforms = new ArrayList<>();
if(catalogMap.size() > 0) {
for (String key : catalogMap.keySet()) {
platforms.add(catalogMap.get(key).getId());
}
}
return platforms;
}
} }

View File

@ -14,19 +14,15 @@ public class SubscribeInfo {
public SubscribeInfo(RequestEvent evt, String id) { public SubscribeInfo(RequestEvent evt, String id) {
this.id = id; this.id = id;
Request request = evt.getRequest(); Request request = evt.getRequest();
CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
this.callId = callIdHeader.getCallId();
FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME);
this.fromTag = fromHeader.getTag();
ExpiresHeader expiresHeader = (ExpiresHeader)request.getHeader(ExpiresHeader.NAME); ExpiresHeader expiresHeader = (ExpiresHeader)request.getHeader(ExpiresHeader.NAME);
this.expires = expiresHeader.getExpires(); this.expires = expiresHeader.getExpires();
EventHeader eventHeader = (EventHeader)request.getHeader(EventHeader.NAME); EventHeader eventHeader = (EventHeader)request.getHeader(EventHeader.NAME);
this.eventId = eventHeader.getEventId(); this.eventId = eventHeader.getEventId();
this.eventType = eventHeader.getEventType(); this.eventType = eventHeader.getEventType();
ViaHeader viaHeader = (ViaHeader)request.getHeader(ViaHeader.NAME);
this.branch = viaHeader.getBranch();
this.transaction = evt.getServerTransaction(); this.transaction = evt.getServerTransaction();
this.dialog = evt.getDialog(); this.dialog = evt.getDialog();
CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
this.callId = callIdHeader.getCallId();
} }
private String id; private String id;
@ -34,9 +30,6 @@ public class SubscribeInfo {
private String callId; private String callId;
private String eventId; private String eventId;
private String eventType; private String eventType;
private String fromTag;
private String toTag;
private String branch;
private ServerTransaction transaction; private ServerTransaction transaction;
private Dialog dialog; private Dialog dialog;
@ -52,18 +45,6 @@ public class SubscribeInfo {
return callId; return callId;
} }
public String getFromTag() {
return fromTag;
}
public void setToTag(String toTag) {
this.toTag = toTag;
}
public String getToTag() {
return toTag;
}
public void setId(String id) { public void setId(String id) {
this.id = id; this.id = id;
} }
@ -76,10 +57,6 @@ public class SubscribeInfo {
this.callId = callId; this.callId = callId;
} }
public void setFromTag(String fromTag) {
this.fromTag = fromTag;
}
public String getEventId() { public String getEventId() {
return eventId; return eventId;
} }
@ -96,14 +73,6 @@ public class SubscribeInfo {
this.eventType = eventType; this.eventType = eventType;
} }
public String getBranch() {
return branch;
}
public void setBranch(String branch) {
this.branch = branch;
}
public ServerTransaction getTransaction() { public ServerTransaction getTransaction() {
return transaction; return transaction;
} }

View File

@ -30,7 +30,7 @@ public class SipSubscribe {
// @Scheduled(fixedRate= 100 * 60 * 60 ) // @Scheduled(fixedRate= 100 * 60 * 60 )
@Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次 @Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次
public void execute(){ public void execute(){
logger.info("[定时任务] 清理过期的订阅信息"); logger.info("[定时任务] 清理过期的SIP订阅信息");
Calendar calendar = Calendar.getInstance(); Calendar calendar = Calendar.getInstance();
calendar.setTime(new Date()); calendar.setTime(new Date());
calendar.set(Calendar.MINUTE, calendar.get(Calendar.MINUTE) - 5); calendar.set(Calendar.MINUTE, calendar.get(Calendar.MINUTE) - 5);
@ -49,10 +49,10 @@ public class SipSubscribe {
errorTimeSubscribes.remove(key); errorTimeSubscribes.remove(key);
} }
} }
logger.info("okTimeSubscribes.size:{}",okTimeSubscribes.size()); logger.debug("okTimeSubscribes.size:{}",okTimeSubscribes.size());
logger.info("okSubscribes.size:{}",okSubscribes.size()); logger.debug("okSubscribes.size:{}",okSubscribes.size());
logger.info("errorTimeSubscribes.size:{}",errorTimeSubscribes.size()); logger.debug("errorTimeSubscribes.size:{}",errorTimeSubscribes.size());
logger.info("errorSubscribes.size:{}",errorSubscribes.size()); logger.debug("errorSubscribes.size:{}",errorSubscribes.size());
} }
public interface Event { public interface Event {

View File

@ -68,8 +68,6 @@ public class OnlineEventListener implements ApplicationListener<OnlineEvent> {
String key = VideoManagerConstants.KEEPLIVEKEY_PREFIX + userSetup.getServerId() + "_" + event.getDevice().getDeviceId(); String key = VideoManagerConstants.KEEPLIVEKEY_PREFIX + userSetup.getServerId() + "_" + event.getDevice().getDeviceId();
Device deviceInStore = storager.queryVideoDevice(device.getDeviceId()); Device deviceInStore = storager.queryVideoDevice(device.getDeviceId());
device.setOnline(1); device.setOnline(1);
// 处理上线监听
storager.updateDevice(device);
switch (event.getFrom()) { switch (event.getFrom()) {
// 注册时触发的在线事件先在redis中增加超时超时监听 // 注册时触发的在线事件先在redis中增加超时超时监听
case VideoManagerConstants.EVENT_ONLINE_REGISTER: case VideoManagerConstants.EVENT_ONLINE_REGISTER:
@ -98,7 +96,8 @@ public class OnlineEventListener implements ApplicationListener<OnlineEvent> {
break; break;
} }
// 处理上线监听
storager.updateDevice(device);
List<DeviceChannel> deviceChannelList = storager.queryOnlineChannelsByDeviceId(device.getDeviceId()); List<DeviceChannel> deviceChannelList = storager.queryOnlineChannelsByDeviceId(device.getDeviceId());
eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON); eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON);
// 上线添加订阅 // 上线添加订阅

View File

@ -74,7 +74,7 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
} }
}else { }else {
// 获取所用订阅 // 获取所用订阅
List<String> platforms = redisCatchStorage.getAllSubscribePlatform(); List<String> platforms = subscribeHolder.getAllCatalogSubscribePlatform();
if (event.getDeviceChannels() != null) { if (event.getDeviceChannels() != null) {
if (platforms.size() > 0) { if (platforms.size() > 0) {
for (DeviceChannel deviceChannel : event.getDeviceChannels()) { for (DeviceChannel deviceChannel : event.getDeviceChannels()) {
@ -117,8 +117,6 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
List<ParentPlatform> parentPlatforms = parentPlatformMap.get(gbId); List<ParentPlatform> parentPlatforms = parentPlatformMap.get(gbId);
if (parentPlatforms != null && parentPlatforms.size() > 0) { if (parentPlatforms != null && parentPlatforms.size() > 0) {
for (ParentPlatform platform : parentPlatforms) { for (ParentPlatform platform : parentPlatforms) {
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId();
// SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key);
SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(platform.getServerGBId()); SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(platform.getServerGBId());
if (subscribeInfo == null) continue; if (subscribeInfo == null) continue;
logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId); logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId);

View File

@ -95,14 +95,14 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
logger.debug("\n收到响应\n{}", responseEvent.getResponse()); logger.debug("\n收到响应\n{}", responseEvent.getResponse());
int status = response.getStatusCode(); int status = response.getStatusCode();
if (((status >= 200) && (status < 300)) || status == 401) { // Success! if (((status >= 200) && (status < 300)) || status == Response.UNAUTHORIZED) { // Success!
CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
String method = cseqHeader.getMethod(); String method = cseqHeader.getMethod();
ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method); ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
if (sipRequestProcessor != null) { if (sipRequestProcessor != null) {
sipRequestProcessor.process(responseEvent); sipRequestProcessor.process(responseEvent);
} }
if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) { if (status != Response.UNAUTHORIZED && responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
if (callIdHeader != null) { if (callIdHeader != null) {
SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId()); SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
@ -103,7 +104,7 @@ public interface ISIPCommander {
* @param startTime ,yyyy-MM-dd HH:mm:ss * @param startTime ,yyyy-MM-dd HH:mm:ss
* @param endTime ,yyyy-MM-dd HH:mm:ss * @param endTime ,yyyy-MM-dd HH:mm:ss
*/ */
void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInf, Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent); void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInf, Device device, String channelId, String startTime, String endTime,InviteStreamCallback inviteStreamCallback, InviteStreamCallback event, SipSubscribe.Event errorEvent);
/** /**
* *
@ -114,13 +115,13 @@ public interface ISIPCommander {
* @param endTime ,yyyy-MM-dd HH:mm:ss * @param endTime ,yyyy-MM-dd HH:mm:ss
* @param downloadSpeed * @param downloadSpeed
*/ */
void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, String downloadSpeed, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent); void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, String downloadSpeed, InviteStreamCallback event, SipSubscribe.Event errorEvent);
/** /**
* *
*/ */
void streamByeCmd(String deviceId, String channelId, String stream, SipSubscribe.Event okEvent); void streamByeCmd(String deviceId, String channelId, String stream, String callId, SipSubscribe.Event okEvent);
void streamByeCmd(String deviceId, String channelId, String stream); void streamByeCmd(String deviceId, String channelId, String stream, String callId);
/** /**
* *

View File

@ -6,6 +6,8 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
@ -445,27 +447,13 @@ public class SIPCommander implements ISIPCommander {
* @param endTime ,yyyy-MM-dd HH:mm:ss * @param endTime ,yyyy-MM-dd HH:mm:ss
*/ */
@Override @Override
public void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event public void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
, SipSubscribe.Event errorEvent) { String startTime, String endTime, InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent,
SipSubscribe.Event errorEvent) {
try { try {
logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
// 添加订阅
JSONObject subscribeKey = new JSONObject();
subscribeKey.put("app", "rtp");
subscribeKey.put("stream", ssrcInfo.getStream());
subscribeKey.put("regist", true);
subscribeKey.put("schema", "rtmp");
subscribeKey.put("mediaServerId", mediaServerItem.getId());
logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString());
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
(MediaServerItem mediaServerItemInUse, JSONObject json)->{
if (event != null) {
event.response(mediaServerItemInUse, json);
}
});
StringBuffer content = new StringBuffer(200); StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n"); content.append("v=0\r\n");
content.append("o="+sipConfig.getId()+" 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); content.append("o="+sipConfig.getId()+" 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
@ -530,6 +518,21 @@ public class SIPCommander implements ISIPCommander {
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId(); : udpSipProvider.getNewCallId();
// 添加订阅
JSONObject subscribeKey = new JSONObject();
subscribeKey.put("app", "rtp");
subscribeKey.put("stream", ssrcInfo.getStream());
subscribeKey.put("regist", true);
subscribeKey.put("schema", "rtmp");
subscribeKey.put("mediaServerId", mediaServerItem.getId());
logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey);
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
(MediaServerItem mediaServerItemInUse, JSONObject json)->{
if (hookEvent != null) {
InviteStreamInfo inviteStreamInfo = new InviteStreamInfo(mediaServerItemInUse, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream());
hookEvent.call(inviteStreamInfo);
}
});
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc()); Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc());
transmitRequest(device, request, errorEvent, okEvent -> { transmitRequest(device, request, errorEvent, okEvent -> {
@ -537,6 +540,9 @@ public class SIPCommander implements ISIPCommander {
streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), responseEvent.getClientTransaction()); streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), responseEvent.getClientTransaction());
streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), okEvent.dialog); streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), okEvent.dialog);
}); });
if (inviteStreamCallback != null) {
inviteStreamCallback.call(new InviteStreamInfo(mediaServerItem, null, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()));
}
} catch ( SipException | ParseException | InvalidArgumentException e) { } catch ( SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace(); e.printStackTrace();
} }
@ -552,24 +558,11 @@ public class SIPCommander implements ISIPCommander {
* @param downloadSpeed * @param downloadSpeed
*/ */
@Override @Override
public void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, String downloadSpeed, ZLMHttpHookSubscribe.Event event public void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, String downloadSpeed, InviteStreamCallback event
, SipSubscribe.Event errorEvent) { , SipSubscribe.Event errorEvent) {
try { try {
logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
// 添加订阅
JSONObject subscribeKey = new JSONObject();
subscribeKey.put("app", "rtp");
subscribeKey.put("stream", ssrcInfo.getStream());
subscribeKey.put("regist", true);
subscribeKey.put("mediaServerId", mediaServerItem.getId());
logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString());
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
(MediaServerItem mediaServerItemInUse, JSONObject json)->{
event.response(mediaServerItemInUse, json);
subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
});
StringBuffer content = new StringBuffer(200); StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n"); content.append("v=0\r\n");
content.append("o="+sipConfig.getId()+" 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); content.append("o="+sipConfig.getId()+" 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
@ -637,6 +630,19 @@ public class SIPCommander implements ISIPCommander {
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId(); : udpSipProvider.getNewCallId();
// 添加订阅
JSONObject subscribeKey = new JSONObject();
subscribeKey.put("app", "rtp");
subscribeKey.put("stream", ssrcInfo.getStream());
subscribeKey.put("regist", true);
subscribeKey.put("mediaServerId", mediaServerItem.getId());
logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString());
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
(MediaServerItem mediaServerItemInUse, JSONObject json)->{
event.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()));
subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
});
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc()); Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc());
ClientTransaction transaction = transmitRequest(device, request, errorEvent); ClientTransaction transaction = transmitRequest(device, request, errorEvent);
@ -652,15 +658,15 @@ public class SIPCommander implements ISIPCommander {
* , 使 * , 使
*/ */
@Override @Override
public void streamByeCmd(String deviceId, String channelId, String stream) { public void streamByeCmd(String deviceId, String channelId, String stream, String callId) {
streamByeCmd(deviceId, channelId, stream, null); streamByeCmd(deviceId, channelId, stream, callId, null);
} }
/** /**
* *
*/ */
@Override @Override
public void streamByeCmd(String deviceId, String channelId, String stream, SipSubscribe.Event okEvent) { public void streamByeCmd(String deviceId, String channelId, String stream, String callId, SipSubscribe.Event okEvent) {
try { try {
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream); SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream);
ClientTransaction transaction = streamSession.getTransactionByStream(deviceId, channelId, stream); ClientTransaction transaction = streamSession.getTransactionByStream(deviceId, channelId, stream);
@ -672,7 +678,15 @@ public class SIPCommander implements ISIPCommander {
} }
return; return;
} }
SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, stream); SIPDialog dialog;
if (callId != null) {
dialog = streamSession.getDialogByCallId(deviceId, channelId, callId);
}else {
if (stream == null) return;
dialog = streamSession.getDialogByStream(deviceId, channelId, stream);
}
if (dialog == null) { if (dialog == null) {
logger.warn("[ {} -> {}]停止视频流的时候发现对话已丢失", deviceId, channelId); logger.warn("[ {} -> {}]停止视频流的时候发现对话已丢失", deviceId, channelId);
return; return;

View File

@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.SerializeUtils; import com.genersoft.iot.vmp.utils.SerializeUtils;
import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.SipStackImpl; import gov.nist.javax.sip.SipStackImpl;
import gov.nist.javax.sip.message.MessageFactoryImpl;
import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.stack.SIPDialog; import gov.nist.javax.sip.stack.SIPDialog;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -77,11 +78,11 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
@Override @Override
public boolean unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) { public boolean unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) {
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
parentPlatform.setExpires("0");
if (parentPlatformCatch != null) { if (parentPlatformCatch != null) {
parentPlatformCatch.setParentPlatform(parentPlatform); parentPlatformCatch.setParentPlatform(parentPlatform);
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
} }
parentPlatform.setExpires("0");
return register(parentPlatform, null, null, errorEvent, okEvent, false); return register(parentPlatform, null, null, errorEvent, okEvent, false);
} }
@ -101,7 +102,9 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
callIdHeader = udpSipProvider.getNewCallId(); callIdHeader = udpSipProvider.getNewCallId();
} }
request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), "FromRegister" + tm, null, callIdHeader); request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform,
redisCatchStorage.getCSEQ(Request.REGISTER), "FromRegister" + tm,
"z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), callIdHeader);
// 将 callid 写入缓存, 等注册成功可以更新状态 // 将 callid 写入缓存, 等注册成功可以更新状态
String callIdFromHeader = callIdHeader.getCallId(); String callIdFromHeader = callIdHeader.getCallId();
redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, parentPlatform.getServerGBId()); redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, parentPlatform.getServerGBId());
@ -414,11 +417,13 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
private void sendNotify(ParentPlatform parentPlatform, String catalogXmlContent, private void sendNotify(ParentPlatform parentPlatform, String catalogXmlContent,
SubscribeInfo subscribeInfo, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent ) SubscribeInfo subscribeInfo, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent )
throws NoSuchFieldException, IllegalAccessException, SipException, ParseException { throws NoSuchFieldException, IllegalAccessException, SipException, ParseException {
MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory();
// 设置编码, 防止中文乱码
messageFactory.setDefaultContentEncodingCharset("gb2312");
Dialog dialog = subscribeInfo.getDialog(); Dialog dialog = subscribeInfo.getDialog();
Request notifyRequest = dialog.createRequest(Request.NOTIFY); if (dialog == null) return;
SIPRequest notifyRequest = (SIPRequest)dialog.createRequest(Request.NOTIFY);
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
notifyRequest.setContent(catalogXmlContent, contentTypeHeader); notifyRequest.setContent(catalogXmlContent, contentTypeHeader);
SubscriptionStateHeader subscriptionState = sipFactory.createHeaderFactory() SubscriptionStateHeader subscriptionState = sipFactory.createHeaderFactory()
@ -509,7 +514,8 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
} }
@Override @Override
public boolean sendNotifyForCatalogOther(String type, ParentPlatform parentPlatform, List<DeviceChannel> deviceChannels, SubscribeInfo subscribeInfo, Integer index) { public boolean sendNotifyForCatalogOther(String type, ParentPlatform parentPlatform, List<DeviceChannel> deviceChannels,
SubscribeInfo subscribeInfo, Integer index) {
if (parentPlatform == null if (parentPlatform == null
|| deviceChannels == null || deviceChannels == null
|| deviceChannels.size() == 0 || deviceChannels.size() == 0
@ -577,24 +583,30 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
recordXml.append("<SN>" +recordInfo.getSn() + "</SN>\r\n"); recordXml.append("<SN>" +recordInfo.getSn() + "</SN>\r\n");
recordXml.append("<DeviceID>" + recordInfo.getDeviceId() + "</DeviceID>\r\n"); recordXml.append("<DeviceID>" + recordInfo.getDeviceId() + "</DeviceID>\r\n");
recordXml.append("<SumNum>" + recordInfo.getSumNum() + "</SumNum>\r\n"); recordXml.append("<SumNum>" + recordInfo.getSumNum() + "</SumNum>\r\n");
recordXml.append("<RecordList Num=\"" + recordInfo.getRecordList().size()+"\">\r\n"); if (recordInfo.getRecordList() == null ) {
for (RecordItem recordItem : recordInfo.getRecordList()) { recordXml.append("<RecordList Num=\"0\">\r\n");
recordXml.append("<Item>\r\n"); }else {
if (deviceChannel != null) { recordXml.append("<RecordList Num=\"" + recordInfo.getRecordList().size()+"\">\r\n");
recordXml.append("<DeviceID>" + recordItem.getDeviceId() + "</DeviceID>\r\n"); if (recordInfo.getRecordList().size() > 0) {
recordXml.append("<Name>" + recordItem.getName() + "</Name>\r\n"); for (RecordItem recordItem : recordInfo.getRecordList()) {
recordXml.append("<StartTime>" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(recordItem.getStartTime()) + "</StartTime>\r\n"); recordXml.append("<Item>\r\n");
recordXml.append("<EndTime>" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(recordItem.getEndTime()) + "</EndTime>\r\n"); if (deviceChannel != null) {
recordXml.append("<Secrecy>" + recordItem.getSecrecy() + "</Secrecy>\r\n"); recordXml.append("<DeviceID>" + recordItem.getDeviceId() + "</DeviceID>\r\n");
recordXml.append("<Type>" + recordItem.getType() + "</Type>\r\n"); recordXml.append("<Name>" + recordItem.getName() + "</Name>\r\n");
if (!StringUtils.isEmpty(recordItem.getFileSize())) { recordXml.append("<StartTime>" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(recordItem.getStartTime()) + "</StartTime>\r\n");
recordXml.append("<FileSize>" + recordItem.getFileSize() + "</FileSize>\r\n"); recordXml.append("<EndTime>" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(recordItem.getEndTime()) + "</EndTime>\r\n");
} recordXml.append("<Secrecy>" + recordItem.getSecrecy() + "</Secrecy>\r\n");
if (!StringUtils.isEmpty(recordItem.getFilePath())) { recordXml.append("<Type>" + recordItem.getType() + "</Type>\r\n");
recordXml.append("<FilePath>" + recordItem.getFilePath() + "</FilePath>\r\n"); if (!StringUtils.isEmpty(recordItem.getFileSize())) {
recordXml.append("<FileSize>" + recordItem.getFileSize() + "</FileSize>\r\n");
}
if (!StringUtils.isEmpty(recordItem.getFilePath())) {
recordXml.append("<FilePath>" + recordItem.getFilePath() + "</FilePath>\r\n");
}
}
recordXml.append("</Item>\r\n");
} }
} }
recordXml.append("</Item>\r\n");
} }
recordXml.append("</RecordList>\r\n"); recordXml.append("</RecordList>\r\n");

View File

@ -27,10 +27,7 @@ import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader; import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress; import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader; import javax.sip.header.ToHeader;
import java.util.HashMap; import java.util.*;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
/** /**
* SIP ACK * SIP ACK
@ -84,44 +81,72 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId()); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId());
String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
String deviceId = sendRtpItem.getDeviceId(); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
StreamInfo streamInfo = null; logger.info("收到ACK开始向上级推流 rtp/{}", sendRtpItem.getStreamId());
if (sendRtpItem.isPlay()) {
streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
}else {
streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId);
}
if (streamInfo == null) {
streamInfo = new StreamInfo();
streamInfo.setApp(sendRtpItem.getApp());
streamInfo.setStream(sendRtpItem.getStreamId());
}
redisCatchStorage.updateSendRTPSever(sendRtpItem);
Map<String, Object> param = new HashMap<>(); Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__"); param.put("vhost","__defaultVhost__");
param.put("app",streamInfo.getApp()); param.put("app",sendRtpItem.getApp());
param.put("stream",streamInfo.getStream()); param.put("stream",sendRtpItem.getStreamId());
param.put("ssrc", sendRtpItem.getSsrc()); param.put("ssrc", sendRtpItem.getSsrc());
param.put("dst_url",sendRtpItem.getIp()); param.put("dst_url",sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort()); param.put("dst_port", sendRtpItem.getPort());
param.put("is_udp", is_Udp); param.put("is_udp", is_Udp);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); param.put("src_port", sendRtpItem.getLocalPort());
JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
if (jsonObject.getInteger("code") != 0) {
logger.info("监听流以等待流上线{}/{}", streamInfo.getApp(), streamInfo.getStream());
// 监听流上线
// 添加订阅 // if (streamInfo == null) { // 流还没上来对方就回复ack
JSONObject subscribeKey = new JSONObject(); // logger.info("监听流以等待流上线1 rtp/{}", sendRtpItem.getStreamId());
subscribeKey.put("app", "rtp"); // // 监听流上线
subscribeKey.put("stream", streamInfo.getStream()); // // 添加订阅
subscribeKey.put("regist", true); // JSONObject subscribeKey = new JSONObject();
subscribeKey.put("schema", "rtmp"); // subscribeKey.put("app", "rtp");
subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId()); // subscribeKey.put("stream", sendRtpItem.getStreamId());
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, // subscribeKey.put("regist", true);
(MediaServerItem mediaServerItemInUse, JSONObject json)->{ // subscribeKey.put("schema", "rtmp");
zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); // subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId());
}); // subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
} // (MediaServerItem mediaServerItemInUse, JSONObject json)->{
// Map<String, Object> param = new HashMap<>();
// param.put("vhost","__defaultVhost__");
// param.put("app",json.getString("app"));
// param.put("stream",json.getString("stream"));
// param.put("ssrc", sendRtpItem.getSsrc());
// param.put("dst_url",sendRtpItem.getIp());
// param.put("dst_port", sendRtpItem.getPort());
// param.put("is_udp", is_Udp);
// param.put("src_port", sendRtpItem.getLocalPort());
// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
// });
// }else {
// Map<String, Object> param = new HashMap<>();
// param.put("vhost","__defaultVhost__");
// param.put("app",streamInfo.getApp());
// param.put("stream",streamInfo.getStream());
// param.put("ssrc", sendRtpItem.getSsrc());
// param.put("dst_url",sendRtpItem.getIp());
// param.put("dst_port", sendRtpItem.getPort());
// param.put("is_udp", is_Udp);
// param.put("src_port", sendRtpItem.getLocalPort());
//
// JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
// if (jsonObject.getInteger("code") != 0) {
// logger.info("监听流以等待流上线2 {}/{}", streamInfo.getApp(), streamInfo.getStream());
// // 监听流上线
// // 添加订阅
// JSONObject subscribeKey = new JSONObject();
// subscribeKey.put("app", "rtp");
// subscribeKey.put("stream", streamInfo.getStream());
// subscribeKey.put("regist", true);
// subscribeKey.put("schema", "rtmp");
// subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId());
// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
// (MediaServerItem mediaServerItemInUse, JSONObject json)->{
// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
// });
// }
// }
} }
} }
} }

View File

@ -93,14 +93,16 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
param.put("app",sendRtpItem.getApp()); param.put("app",sendRtpItem.getApp());
param.put("stream",streamId); param.put("stream",streamId);
param.put("ssrc",sendRtpItem.getSsrc()); param.put("ssrc",sendRtpItem.getSsrc());
logger.info("停止向上级推流:" + streamId); logger.info("收到bye:停止向上级推流:" + streamId);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null); redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null);
int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
if (totalReaderCount <= 0) { if (totalReaderCount <= 0) {
logger.info(streamId + "无其它观看者,通知设备停止推流"); logger.info("收到bye: {}无其它观看者,通知设备停止推流", streamId);
cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId); if (sendRtpItem.isPlay()) {
cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId, null);
}
} }
} }
// 可能是设备主动停止 // 可能是设备主动停止

View File

@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
@ -91,6 +92,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired @Autowired
private SIPProcessorObserver sipProcessorObserver; private SIPProcessorObserver sipProcessorObserver;
@Autowired
private VideoStreamSessionManager sessionManager;
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
@ -233,6 +237,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
String username = sdp.getOrigin().getUsername(); String username = sdp.getOrigin().getUsername();
String addressStr = sdp.getOrigin().getAddress(); String addressStr = sdp.getOrigin().getAddress();
logger.info("[上级点播]用户:{} 地址:{}:{} ssrc{}", username, addressStr, port, ssrc); logger.info("[上级点播]用户:{} 地址:{}:{} ssrc{}", username, addressStr, port, ssrc);
Device device = null; Device device = null;
// 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标
@ -266,13 +271,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem.setDialog(dialogByteArray); sendRtpItem.setDialog(dialogByteArray);
byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
sendRtpItem.setTransaction(transactionByteArray); sendRtpItem.setTransaction(transactionByteArray);
// 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem);
Long finalStartTime = startTime; Long finalStartTime = startTime;
Long finalStopTime = stopTime; Long finalStopTime = stopTime;
ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{ ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{
logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP) {}/{}", sendRtpItem.getApp(), sendRtpItem.getStreamId()); String app = responseJSON.getString("app");
String stream = responseJSON.getString("stream");
logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP) {}/{}", app, stream);
// * 0 等待设备推流上来 // * 0 等待设备推流上来
// * 1 下级已经推流等待上级平台回复ack // * 1 下级已经推流等待上级平台回复ack
// * 2 推流中 // * 2 推流中
@ -325,46 +331,66 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
e.printStackTrace(); e.printStackTrace();
} }
}); });
sendRtpItem.setApp("rtp");
if ("Playback".equals(sessionName)) { if ("Playback".equals(sessionName)) {
sendRtpItem.setPlay(false); sendRtpItem.setPlay(false);
sendRtpItem.setStreamId(ssrc); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true);
sendRtpItem.setStreamId(ssrcInfo.getStream());
// 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem);
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
playService.playBack(device.getDeviceId(), channelId, format.format(start), format.format(end),result -> { playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, format.format(start),
if (result.getCode() != 0){ format.format(end), null, result -> {
logger.warn("录像回放失败"); if (result.getCode() != 0){
if (result.getEvent() != null) { logger.warn("录像回放失败");
errorEvent.response(result.getEvent()); if (result.getEvent() != null) {
errorEvent.response(result.getEvent());
}
redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
try {
responseAck(evt, Response.REQUEST_TIMEOUT);
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}
}else {
if (result.getMediaServerItem() != null) {
hookEvent.response(result.getMediaServerItem(), result.getResponse());
}
} }
redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); });
try {
responseAck(evt, Response.REQUEST_TIMEOUT);
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}
}else {
if (result.getMediaServerItem() != null) {
hookEvent.response(result.getMediaServerItem(), result.getResponse());
}
}
});
}else { }else {
sendRtpItem.setPlay(true); sendRtpItem.setPlay(true);
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); SsrcTransaction playTransaction = sessionManager.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
if (streamInfo == null) { if (playTransaction != null) {
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream());
if (!streamReady) {
playTransaction = null;
}
}
if (playTransaction == null) {
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true);
if (mediaServerItem.isRtpEnable()) { if (mediaServerItem.isRtpEnable()) {
sendRtpItem.setStreamId(String.format("%s_%s", device.getDeviceId(), channelId)); sendRtpItem.setStreamId(String.format("%s_%s", device.getDeviceId(), channelId));
}else {
sendRtpItem.setStreamId(ssrcInfo.getStream());
} }
sendRtpItem.setPlay(false); // 写入redis 超时时回复
playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent, errorEvent, ()->{ redisCatchStorage.updateSendRTPSever(sendRtpItem);
playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg)->{
redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
}); }, null);
}else { }else {
sendRtpItem.setStreamId(streamInfo.getStream()); sendRtpItem.setStreamId(playTransaction.getStream());
hookEvent.response(mediaServerItem, null); // 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem);
JSONObject jsonObject = new JSONObject();
jsonObject.put("app", sendRtpItem.getApp());
jsonObject.put("stream", sendRtpItem.getStreamId());
hookEvent.response(mediaServerItem, jsonObject);
} }
} }
}else if (gbStream != null) { }else if (gbStream != null) {

View File

@ -233,7 +233,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
*/ */
private void processNotifyCatalogList(RequestEvent evt) { private void processNotifyCatalogList(RequestEvent evt) {
try { try {
System.out.println(343434);
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);

View File

@ -158,20 +158,14 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔 String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval)); dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval));
subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo); subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
// redisCatchStorage.updateSubscribe(key, subscribeInfo);
}else if (subscribeInfo.getExpires() == 0) { }else if (subscribeInfo.getExpires() == 0) {
dynamicTask.stop(key); dynamicTask.stop(key);
// redisCatchStorage.delSubscribe(key);
subscribeHolder.removeMobilePositionSubscribe(platformId); subscribeHolder.removeMobilePositionSubscribe(platformId);
} }
try { try {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId); ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
Response response = responseXmlAck(evt, resultXml.toString(), parentPlatform); responseXmlAck(evt, resultXml.toString(), parentPlatform);
ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME);
subscribeInfo.setToTag(toHeader.getTag());
redisCatchStorage.updateSubscribe(key, subscribeInfo);
} catch (SipException e) { } catch (SipException e) {
e.printStackTrace(); e.printStackTrace();
} catch (InvalidArgumentException e) { } catch (InvalidArgumentException e) {
@ -211,21 +205,14 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
.append("</Response>\r\n"); .append("</Response>\r\n");
if (subscribeInfo.getExpires() > 0) { if (subscribeInfo.getExpires() > 0) {
// redisCatchStorage.updateSubscribe(key, subscribeInfo);
subscribeHolder.putCatalogSubscribe(platformId, subscribeInfo); subscribeHolder.putCatalogSubscribe(platformId, subscribeInfo);
}else if (subscribeInfo.getExpires() == 0) { }else if (subscribeInfo.getExpires() == 0) {
// redisCatchStorage.delSubscribe(key);
subscribeHolder.removeCatalogSubscribe(platformId); subscribeHolder.removeCatalogSubscribe(platformId);
} }
try { try {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId); ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
Response response = responseXmlAck(evt, resultXml.toString(), parentPlatform); responseXmlAck(evt, resultXml.toString(), parentPlatform);
ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME);
subscribeInfo.setToTag(toHeader.getTag());
// redisCatchStorage.updateSubscribe(key, subscribeInfo);
subscribeHolder.putCatalogSubscribe(platformId, subscribeInfo);
} catch (SipException e) { } catch (SipException e) {
e.printStackTrace(); e.printStackTrace();
} catch (InvalidArgumentException e) { } catch (InvalidArgumentException e) {

View File

@ -67,9 +67,6 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement
// 查询设备是否存在 // 查询设备是否存在
CSeqHeader cseqHeader = (CSeqHeader) evt.getRequest().getHeader(CSeqHeader.NAME); CSeqHeader cseqHeader = (CSeqHeader) evt.getRequest().getHeader(CSeqHeader.NAME);
String method = cseqHeader.getMethod(); String method = cseqHeader.getMethod();
if (method.equals("MESSAGE")) {
System.out.println();
}
Device device = redisCatchStorage.getDevice(deviceId); Device device = redisCatchStorage.getDevice(deviceId);
// 查询上级平台是否存在 // 查询上级平台是否存在
ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(deviceId); ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(deviceId);

View File

@ -18,6 +18,7 @@ import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent; import javax.sip.RequestEvent;
import javax.sip.SipException; import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.text.ParseException; import java.text.ParseException;
@ -56,14 +57,15 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
} catch (ParseException e) { } catch (ParseException e) {
e.printStackTrace(); e.printStackTrace();
} }
CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
String NotifyType =getText(rootElement, "NotifyType"); String NotifyType =getText(rootElement, "NotifyType");
if (NotifyType.equals("121")){ if (NotifyType.equals("121")){
logger.info("媒体播放完毕,通知关流"); logger.info("媒体播放完毕,通知关流");
StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(device.getDeviceId(), "*"); String channelId =getText(rootElement, "DeviceID");
if (streamInfo != null) { redisCatchStorage.stopPlayback(device.getDeviceId(), channelId, null, callIdHeader.getCallId());
redisCatchStorage.stopPlayback(streamInfo); cmder.streamByeCmd(device.getDeviceId(), channelId, null, callIdHeader.getCallId());
cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId(), streamInfo.getStream()); // TODO 如果级联播放,需要给上级发送此通知
}
} }
} }

View File

@ -88,7 +88,7 @@ public class RecordInfoQueryMessageHandler extends SIPRequestProcessorParent imp
Element secrecyElement = rootElement.element("Secrecy"); Element secrecyElement = rootElement.element("Secrecy");
int secrecy = 0; int secrecy = 0;
if (secrecyElement != null) { if (secrecyElement != null) {
secrecy = Integer.parseInt(secrecyElement.getText()); secrecy = Integer.parseInt(secrecyElement.getText().trim());
} }
String type = "all"; String type = "all";
Element typeElement = rootElement.element("Type"); Element typeElement = rootElement.element("Type");

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract; import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
@ -40,6 +41,9 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
@Autowired @Autowired
private SIPProcessorObserver sipProcessorObserver; private SIPProcessorObserver sipProcessorObserver;
@Autowired
private SubscribeHolder subscribeHolder;
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅 // 添加消息处理的订阅
@ -83,19 +87,19 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
// 注册/注销成功 // 注册/注销成功
logger.info(String.format("%s %s成功", platformGBId, action)); logger.info(String.format("%s %s成功", platformGBId, action));
redisCatchStorage.delPlatformRegisterInfo(callId); redisCatchStorage.delPlatformRegisterInfo(callId);
parentPlatform.setStatus("注册".equals(action)); redisCatchStorage.delPlatformCatchInfo(platformGBId);
// 取回Expires设置避免注销过程中被置为0 // 取回Expires设置避免注销过程中被置为0
if (!parentPlatformCatch.getParentPlatform().getExpires().equals("0")) { ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId);
ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId); parentPlatformTmp.setStatus("注册".equals(action));
String expires = parentPlatformTmp.getExpires(); redisCatchStorage.updatePlatformRegister(parentPlatformTmp);
parentPlatform.setExpires(expires); redisCatchStorage.updatePlatformKeepalive(parentPlatformTmp);
parentPlatform.setId(parentPlatformTmp.getId()); parentPlatformCatch.setParentPlatform(parentPlatformTmp);
redisCatchStorage.updatePlatformRegister(parentPlatform); redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
redisCatchStorage.updatePlatformKeepalive(parentPlatform);
parentPlatformCatch.setParentPlatform(parentPlatform);
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
}
storager.updateParentPlatformStatus(platformGBId, "注册".equals(action)); storager.updateParentPlatformStatus(platformGBId, "注册".equals(action));
if ("注销".equals(action)) {
subscribeHolder.removeCatalogSubscribe(platformGBId);
subscribeHolder.removeMobilePositionSubscribe(platformGBId);
}
} }
} }

View File

@ -9,9 +9,12 @@ import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@ -81,7 +84,7 @@ public class ZLMHttpHookListener {
private UserSetup userSetup; private UserSetup userSetup;
@Autowired @Autowired
private MediaConfig mediaConfig; private VideoStreamSessionManager sessionManager;
@Autowired @Autowired
private ZLMRESTfulUtils zlmresTfulUtils; private ZLMRESTfulUtils zlmresTfulUtils;
@ -207,15 +210,15 @@ public class ZLMHttpHookListener {
}else { }else {
ret.put("enableMP4", userSetup.isRecordPushLive()); ret.put("enableMP4", userSetup.isRecordPushLive());
} }
StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(stream); List<SsrcTransaction> ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, stream);
if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) {
// 录像回放时不进行录像下载 String deviceId = ssrcTransactionForAll.get(0).getDeviceId();
if (streamInfo != null) { String channelId = ssrcTransactionForAll.get(0).getChannelId();
ret.put("enableMP4", false); DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
}else { if (deviceChannel != null) {
ret.put("enableMP4", userSetup.isRecordPushLive()); ret.put("enable_audio", deviceChannel.isHasAudio());
}
} }
return new ResponseEntity<String>(ret.toString(), HttpStatus.OK); return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
} }
@ -350,8 +353,12 @@ public class ZLMHttpHookListener {
redisCatchStorage.stopPlay(streamInfo); redisCatchStorage.stopPlay(streamInfo);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
}else{ }else{
streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null);
redisCatchStorage.stopPlayback(streamInfo); if (streamInfo != null) {
redisCatchStorage.stopPlayback(streamInfo.getDeviceID(), streamInfo.getChannelId(),
streamInfo.getStream(), null);
}
} }
}else { }else {
if (!"rtp".equals(app)){ if (!"rtp".equals(app)){
@ -443,18 +450,19 @@ public class ZLMHttpHookListener {
ret.put("close", false); ret.put("close", false);
} else { } else {
cmder.streamByeCmd(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId(), cmder.streamByeCmd(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId(),
streamInfoForPlayCatch.getStream()); streamInfoForPlayCatch.getStream(), null);
redisCatchStorage.stopPlay(streamInfoForPlayCatch); redisCatchStorage.stopPlay(streamInfoForPlayCatch);
storager.stopPlay(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId()); storager.stopPlay(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId());
} }
}else{ }else{
StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlaybackByStreamId(streamId); StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlayback(null, null, streamId, null);
if (streamInfoForPlayBackCatch != null) { if (streamInfoForPlayBackCatch != null) {
cmder.streamByeCmd(streamInfoForPlayBackCatch.getDeviceID(), cmder.streamByeCmd(streamInfoForPlayBackCatch.getDeviceID(),
streamInfoForPlayBackCatch.getChannelId(), streamInfoForPlayBackCatch.getStream()); streamInfoForPlayBackCatch.getChannelId(), streamInfoForPlayBackCatch.getStream(), null);
redisCatchStorage.stopPlayback(streamInfoForPlayBackCatch); redisCatchStorage.stopPlayback(streamInfoForPlayBackCatch.getDeviceID(),
streamInfoForPlayBackCatch.getChannelId(), streamInfoForPlayBackCatch.getStream(), null);
}else { }else {
StreamInfo streamInfoForDownload = redisCatchStorage.queryDownloadByStreamId(streamId); StreamInfo streamInfoForDownload = redisCatchStorage.queryDownload(null, null, streamId, null);
// 进行录像下载时无人观看不断流 // 进行录像下载时无人观看不断流
if (streamInfoForDownload != null) { if (streamInfoForDownload != null) {
ret.put("close", false); ret.put("close", false);
@ -462,7 +470,7 @@ public class ZLMHttpHookListener {
} }
} }
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem != null && "-1".equals(mediaServerItem.getStreamNoneReaderDelayMS())) { if (mediaServerItem != null && mediaServerItem.getStreamNoneReaderDelayMS() == -1) {
ret.put("close", false); ret.put("close", false);
} }
return new ResponseEntity<String>(ret.toString(),HttpStatus.OK); return new ResponseEntity<String>(ret.toString(),HttpStatus.OK);

View File

@ -45,12 +45,8 @@ public class ZLMRTPServerFactory {
Map<String, Object> param = new HashMap<>(); Map<String, Object> param = new HashMap<>();
int result = -1; int result = -1;
/** // 不设置推流端口端则使用随机端口
* 使 if (!StringUtils.isEmpty(mediaServerItem.getSendRtpPortRange())){
*/
if (StringUtils.isEmpty(mediaServerItem.getSendRtpPortRange())){
param.put("port", 0);
}else {
int newPort = getPortFromportRange(mediaServerItem); int newPort = getPortFromportRange(mediaServerItem);
param.put("port", newPort); param.put("port", newPort);
} }

View File

@ -2,10 +2,14 @@ package com.genersoft.iot.vmp.service;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;
import com.genersoft.iot.vmp.service.bean.PlayBackCallback; import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import org.springframework.web.context.request.async.DeferredResult; import org.springframework.web.context.request.async.DeferredResult;
@ -17,13 +21,17 @@ public interface IPlayService {
void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid); void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid);
void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
InviteTimeOutCallback timeoutCallback, String uuid);
PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback); PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback);
MediaServerItem getNewMediaServerItem(Device device); MediaServerItem getNewMediaServerItem(Device device);
void onPublishHandlerForDownload(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String toString); void onPublishHandlerForDownload(InviteStreamInfo inviteStreamInfo, String deviceId, String channelId, String toString);
DeferredResult<ResponseEntity<String>> playBack(String deviceId, String channelId, String startTime, String endTime, PlayBackCallback errorCallBack); DeferredResult<ResponseEntity<String>> playBack(String deviceId, String channelId, String startTime, String endTime, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack);
DeferredResult<ResponseEntity<String>> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,String deviceId, String channelId, String startTime, String endTime, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack);
void zlmServerOffline(String mediaServerId); void zlmServerOffline(String mediaServerId);
} }

View File

@ -0,0 +1,6 @@
package com.genersoft.iot.vmp.service.bean;
public interface InviteTimeOutCallback {
void run(int code, String msg); // code: 0 sip超时, 1 收流超时
}

View File

@ -7,9 +7,9 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import javax.sip.RequestEvent; import javax.sip.RequestEvent;
public class PlayBackResult<T> { public class PlayBackResult<T> {
private int code; private int code;
private T data; private T data;
private MediaServerItem mediaServerItem; private MediaServerItem mediaServerItem;
private JSONObject response; private JSONObject response;
private SipSubscribe.EventResult event; private SipSubscribe.EventResult event;

View File

@ -512,7 +512,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
param.put("hook.on_stream_not_found",String.format("%s/on_stream_not_found", hookPrex)); param.put("hook.on_stream_not_found",String.format("%s/on_stream_not_found", hookPrex));
param.put("hook.on_server_keepalive",String.format("%s/on_server_keepalive", hookPrex)); param.put("hook.on_server_keepalive",String.format("%s/on_server_keepalive", hookPrex));
param.put("hook.timeoutSec","20"); param.put("hook.timeoutSec","20");
param.put("general.streamNoneReaderDelayMS","-1".equals(mediaServerItem.getStreamNoneReaderDelayMS())?"3600000":mediaServerItem.getStreamNoneReaderDelayMS() ); param.put("general.streamNoneReaderDelayMS",mediaServerItem.getStreamNoneReaderDelayMS()==-1?"3600000":mediaServerItem.getStreamNoneReaderDelayMS() );
// 推流断开后可以在超时时间内重新连接上继续推流,这样播放器会接着播放。 // 推流断开后可以在超时时间内重新连接上继续推流,这样播放器会接着播放。
// 置0关闭此特性(推流断开会导致立即断开播放器) // 置0关闭此特性(推流断开会导致立即断开播放器)
// 此参数不应大于播放器超时时间 // 此参数不应大于播放器超时时间

View File

@ -16,6 +16,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;
import com.genersoft.iot.vmp.service.bean.PlayBackCallback; import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
import com.genersoft.iot.vmp.service.bean.PlayBackResult; import com.genersoft.iot.vmp.service.bean.PlayBackResult;
import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@ -27,6 +28,7 @@ import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.IPlayService;
import gov.nist.javax.sip.stack.SIPDialog; import gov.nist.javax.sip.stack.SIPDialog;
import jdk.nashorn.internal.ir.RuntimeNode;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -36,6 +38,9 @@ import org.springframework.stereotype.Service;
import org.springframework.util.ResourceUtils; import org.springframework.util.ResourceUtils;
import org.springframework.web.context.request.async.DeferredResult; import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.header.CallIdHeader;
import javax.sip.header.Header;
import javax.sip.message.Request;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.util.*; import java.util.*;
@ -79,6 +84,8 @@ public class PlayServiceImpl implements IPlayService {
private UserSetup userSetup; private UserSetup userSetup;
@Override @Override
public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId,
ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
@ -141,67 +148,7 @@ public class PlayServiceImpl implements IPlayService {
e.printStackTrace(); e.printStackTrace();
} }
}); });
if (streamInfo == null) { if (streamInfo != null) {
String streamId = null;
if (mediaServerItem.isRtpEnable()) {
streamId = String.format("%s_%s", device.getDeviceId(), channelId);
}
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId);
// 超时处理
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
logger.warn(String.format("设备点播超时deviceId%s channelId%s", deviceId, channelId));
if (timeoutCallback != null) {
timeoutCallback.run();
}
WVPResult wvpResult = new WVPResult();
wvpResult.setCode(-1);
SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
if (dialog != null) {
wvpResult.setMsg("收流超时,请稍候重试");
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream());
}else {
wvpResult.setMsg("点播超时,请稍候重试");
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
}
msg.setData(wvpResult);
// 回复之前所有的点播请求
resultHolder.invokeAllResult(msg);
}
}, userSetup.getPlayTimeout());
// 发送点播消息
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString());
timer.cancel();
onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId, uuid);
if (hookEvent != null) {
hookEvent.response(mediaServerItem, response);
}
}, (event) -> {
timer.cancel();
WVPResult wvpResult = new WVPResult();
wvpResult.setCode(-1);
// 点播返回sip错误
mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
msg.setData(wvpResult);
resultHolder.invokeAllResult(msg);
if (errorEvent != null) {
errorEvent.response(event);
}
});
} else {
String streamId = streamInfo.getStream(); String streamId = streamInfo.getStream();
if (streamId == null) { if (streamId == null) {
WVPResult wvpResult = new WVPResult(); WVPResult wvpResult = new WVPResult();
@ -227,67 +174,109 @@ public class PlayServiceImpl implements IPlayService {
if (hookEvent != null) { if (hookEvent != null) {
hookEvent.response(mediaServerItem, JSONObject.parseObject(JSON.toJSONString(streamInfo))); hookEvent.response(mediaServerItem, JSONObject.parseObject(JSON.toJSONString(streamInfo)));
} }
} else { }else {
// TODO 点播前是否重置状态
redisCatchStorage.stopPlay(streamInfo); redisCatchStorage.stopPlay(streamInfo);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
String streamId2 = null; streamInfo = null;
if (mediaServerItem.isRtpEnable()) {
streamId2 = String.format("%s_%s", device.getDeviceId(), channelId);
}
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId2);
// 超时处理
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
logger.warn(String.format("设备点播超时deviceId%s channelId%s", deviceId, channelId));
if (timeoutCallback != null) {
timeoutCallback.run();
}
WVPResult wvpResult = new WVPResult();
wvpResult.setCode(-1);
SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
if (dialog != null) {
wvpResult.setMsg("收流超时,请稍候重试");
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream());
}else {
wvpResult.setMsg("点播超时,请稍候重试");
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
}
msg.setData(wvpResult);
// 回复之前所有的点播请求
resultHolder.invokeAllResult(msg);
}
}, userSetup.getPlayTimeout());
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString());
onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid);
}, (event) -> {
mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
WVPResult wvpResult = new WVPResult();
wvpResult.setCode(-1);
wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
msg.setData(wvpResult);
resultHolder.invokeAllResult(msg);
});
} }
}
if (streamInfo == null) {
String streamId = null;
if (mediaServerItem.isRtpEnable()) {
streamId = String.format("%s_%s", device.getDeviceId(), channelId);
}
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId);
play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{
if (hookEvent != null) {
hookEvent.response(mediaServerItem, response);
}
}, event -> {
// sip error错误
WVPResult wvpResult = new WVPResult();
wvpResult.setCode(-1);
wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
msg.setData(wvpResult);
resultHolder.invokeAllResult(msg);
if (errorEvent != null) {
errorEvent.response(event);
}
}, (code, msgStr)->{
// invite点播超时
WVPResult wvpResult = new WVPResult();
wvpResult.setCode(-1);
if (code == 0) {
wvpResult.setMsg("点播超时,请稍候重试");
}else if (code == 1) {
wvpResult.setMsg("收流超时,请稍候重试");
}
msg.setData(wvpResult);
// 回复之前所有的点播请求
resultHolder.invokeAllResult(msg);
}, uuid);
}
return playResult;
}
@Override
public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
InviteTimeOutCallback timeoutCallback, String uuid) {
String streamId = null;
if (mediaServerItem.isRtpEnable()) {
streamId = String.format("%s_%s", device.getDeviceId(), channelId);
}
if (ssrcInfo == null) {
ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId);
} }
return playResult; // 超时处理
Timer timer = new Timer();
SSRCInfo finalSsrcInfo = ssrcInfo;
timer.schedule(new TimerTask() {
@Override
public void run() {
logger.warn(String.format("设备点播超时deviceId%s channelId%s", device.getDeviceId(), channelId));
SIPDialog dialog = streamSession.getDialogByStream(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
if (dialog != null) {
timeoutCallback.run(1, "收流超时");
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
cmder.streamByeCmd(device.getDeviceId(), channelId, finalSsrcInfo.getStream(), null);
}else {
timeoutCallback.run(0, "点播超时");
mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
}
}
}, userSetup.getPlayTimeout());
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString());
timer.cancel();
// hook响应
onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid);
hookEvent.response(mediaServerItemInuse, response);
}, (event) -> {
timer.cancel();
mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
errorEvent.response(event);
});
} }
@Override @Override
public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) { public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) {
RequestMessage msg = new RequestMessage(); RequestMessage msg = new RequestMessage();
msg.setId(uuid); if (uuid != null) {
msg.setId(uuid);
}
msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId); msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
if (streamInfo != null) { if (streamInfo != null) {
@ -297,7 +286,6 @@ public class PlayServiceImpl implements IPlayService {
storager.startPlay(deviceId, channelId, streamInfo.getStream()); storager.startPlay(deviceId, channelId, streamInfo.getStream());
} }
redisCatchStorage.startPlay(streamInfo); redisCatchStorage.startPlay(streamInfo);
msg.setData(JSON.toJSONString(streamInfo));
WVPResult wvpResult = new WVPResult(); WVPResult wvpResult = new WVPResult();
wvpResult.setCode(0); wvpResult.setCode(0);
@ -329,9 +317,24 @@ public class PlayServiceImpl implements IPlayService {
return mediaServerItem; return mediaServerItem;
} }
@Override
public DeferredResult<ResponseEntity<String>> playBack(String deviceId, String channelId, String startTime,
String endTime,InviteStreamCallback inviteStreamCallback,
PlayBackCallback callback) {
Device device = storager.queryVideoDevice(deviceId);
if (device == null) return null;
MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true);
return playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, inviteStreamCallback, callback);
}
@Override @Override
public DeferredResult<ResponseEntity<String>> playBack(String deviceId, String channelId, String startTime, String endTime, PlayBackCallback callback) { public DeferredResult<ResponseEntity<String>> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
String deviceId, String channelId, String startTime,
String endTime, InviteStreamCallback infoCallBack,
PlayBackCallback playBackCallback) {
if (mediaServerItem == null || ssrcInfo == null) return null;
String uuid = UUID.randomUUID().toString(); String uuid = UUID.randomUUID().toString();
String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId; String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId;
DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(30000L); DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(30000L);
@ -341,8 +344,6 @@ public class PlayServiceImpl implements IPlayService {
return result; return result;
} }
MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true);
resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId, uuid, result); resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId, uuid, result);
RequestMessage msg = new RequestMessage(); RequestMessage msg = new RequestMessage();
msg.setId(uuid); msg.setId(uuid);
@ -356,63 +357,62 @@ public class PlayServiceImpl implements IPlayService {
logger.warn(String.format("设备回放超时deviceId%s channelId%s", deviceId, channelId)); logger.warn(String.format("设备回放超时deviceId%s channelId%s", deviceId, channelId));
playBackResult.setCode(-1); playBackResult.setCode(-1);
playBackResult.setData(msg); playBackResult.setData(msg);
callback.call(playBackResult); playBackCallback.call(playBackResult);
SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream()); SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源 // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
if (dialog != null) { if (dialog != null) {
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源 // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream()); cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
}else { }else {
mediaServerService.releaseSsrc(newMediaServerItem.getId(), ssrcInfo.getSsrc()); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream()); mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
} }
cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream()); cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
// 回复之前所有的点播请求 // 回复之前所有的点播请求
callback.call(playBackResult); playBackCallback.call(playBackResult);
} }
}, userSetup.getPlayTimeout()); }, userSetup.getPlayTimeout());
cmder.playbackStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, (MediaServerItem mediaServerItem, JSONObject response) -> { cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, infoCallBack,
logger.info("收到订阅消息: " + response.toJSONString()); (InviteStreamInfo inviteStreamInfo) -> {
timer.cancel(); logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); timer.cancel();
if (streamInfo == null) { StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
logger.warn("设备回放API调用失败"); if (streamInfo == null) {
msg.setData("设备回放API调用失败"); logger.warn("设备回放API调用失败");
playBackResult.setCode(-1); msg.setData("设备回放API调用失败");
playBackResult.setData(msg); playBackResult.setCode(-1);
callback.call(playBackResult); playBackResult.setData(msg);
return; playBackCallback.call(playBackResult);
} return;
redisCatchStorage.startPlayback(streamInfo); }
msg.setData(JSON.toJSONString(streamInfo)); redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId());
playBackResult.setCode(0); msg.setData(JSON.toJSONString(streamInfo));
playBackResult.setData(msg); playBackResult.setCode(0);
playBackResult.setMediaServerItem(mediaServerItem); playBackResult.setData(msg);
playBackResult.setResponse(response); playBackResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
callback.call(playBackResult); playBackResult.setResponse(inviteStreamInfo.getResponse());
}, event -> { playBackCallback.call(playBackResult);
timer.cancel(); }, event -> {
msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg)); timer.cancel();
playBackResult.setCode(-1); msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
playBackResult.setData(msg); playBackResult.setCode(-1);
playBackResult.setEvent(event); playBackResult.setData(msg);
callback.call(playBackResult); playBackResult.setEvent(event);
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); playBackCallback.call(playBackResult);
}); streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
});
return result; return result;
} }
@Override @Override
public void onPublishHandlerForDownload(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) { public void onPublishHandlerForDownload(InviteStreamInfo inviteStreamInfo, String deviceId, String channelId, String uuid) {
RequestMessage msg = new RequestMessage(); RequestMessage msg = new RequestMessage();
msg.setKey(DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId); msg.setKey(DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId);
msg.setId(uuid); msg.setId(uuid);
StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
if (streamInfo != null) { if (streamInfo != null) {
redisCatchStorage.startDownload(streamInfo); redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
msg.setData(JSON.toJSONString(streamInfo)); msg.setData(JSON.toJSONString(streamInfo));
resultHolder.invokeResult(msg); resultHolder.invokeResult(msg);
} else { } else {
@ -449,7 +449,8 @@ public class PlayServiceImpl implements IPlayService {
if (allSsrc.size() > 0) { if (allSsrc.size() > 0) {
for (SsrcTransaction ssrcTransaction : allSsrc) { for (SsrcTransaction ssrcTransaction : allSsrc) {
if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) { if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(),
ssrcTransaction.getStream(), null);
} }
} }
} }

View File

@ -47,17 +47,15 @@ public interface IRedisCatchStorage {
StreamInfo queryPlayByStreamId(String steamId); StreamInfo queryPlayByStreamId(String steamId);
StreamInfo queryPlaybackByStreamId(String steamId);
StreamInfo queryPlayByDevice(String deviceId, String channelId); StreamInfo queryPlayByDevice(String deviceId, String channelId);
Map<String, StreamInfo> queryPlayByDeviceId(String deviceId); Map<String, StreamInfo> queryPlayByDeviceId(String deviceId);
boolean startPlayback(StreamInfo stream); boolean startPlayback(StreamInfo stream, String callId);
boolean stopPlayback(StreamInfo streamInfo); boolean stopPlayback(String deviceId, String channelId, String stream, String callId);
StreamInfo queryPlaybackByDevice(String deviceId, String code); StreamInfo queryPlayback(String deviceId, String channelID, String stream, String callId);
void updatePlatformCatchInfo(ParentPlatformCatch parentPlatformCatch); void updatePlatformCatchInfo(ParentPlatformCatch parentPlatformCatch);
@ -167,9 +165,9 @@ public interface IRedisCatchStorage {
* *
* @param streamInfo * @param streamInfo
*/ */
boolean startDownload(StreamInfo streamInfo); boolean startDownload(StreamInfo streamInfo, String callId);
StreamInfo queryDownloadByStreamId(String streamId); StreamInfo queryDownload(String deviceId, String channelId, String stream, String callId);
/** /**
* *
@ -204,18 +202,8 @@ public interface IRedisCatchStorage {
void resetAllSN(); void resetAllSN();
void updateSubscribe(String key, SubscribeInfo subscribeInfo);
SubscribeInfo getSubscribe(String key);
void delSubscribe(String key);
MediaItem getStreamInfo(String app, String streamId, String mediaServerId); MediaItem getStreamInfo(String app, String streamId, String mediaServerId);
List<SubscribeInfo> getAllSubscribe();
List<String> getAllSubscribePlatform();
void addCpuInfo(double cpuInfo); void addCpuInfo(double cpuInfo);
void addMemInfo(double memInfo); void addMemInfo(double memInfo);

View File

@ -231,7 +231,6 @@ public interface DeviceChannelMapper {
" name as title,\n" + " name as title,\n" +
" channelId as \"value\",\n" + " channelId as \"value\",\n" +
" channelId as \"key\",\n" + " channelId as \"key\",\n" +
" channelId,\n" +
" longitude,\n" + " longitude,\n" +
" latitude\n" + " latitude\n" +
" from device_channel\n" + " from device_channel\n" +
@ -248,4 +247,13 @@ public interface DeviceChannelMapper {
"<foreach collection='channels' item='item' open='(' separator=',' close=')' > #{item.channelId}</foreach>" + "<foreach collection='channels' item='item' open='(' separator=',' close=')' > #{item.channelId}</foreach>" +
" </script>"}) " </script>"})
int cleanChannelsNotInList(String deviceId, List<DeviceChannel> channels); int cleanChannelsNotInList(String deviceId, List<DeviceChannel> channels);
@Update(" update device_channel" +
" set subCount = (select *" +
" from (select count(0)" +
" from device_channel" +
" where deviceId = #{deviceId} and parentId = #{channelId}) as temp)" +
" where deviceId = #{deviceId} " +
" and channelId = #{channelId}")
int updateChannelSubCount(String deviceId, String channelId);
} }

View File

@ -55,7 +55,7 @@ public interface PlatformChannelMapper {
int cleanChannelForGB(String platformId); int cleanChannelForGB(String platformId);
@Select("SELECT dc.* FROM platform_gb_channel pgc left join device_channel dc on dc.id = pgc.deviceChannelId WHERE dc.channelId='${channelId}' and pgc.platformId='${platformId}'") @Select("SELECT dc.* FROM platform_gb_channel pgc left join device_channel dc on dc.id = pgc.deviceChannelId WHERE dc.channelId='${channelId}' and pgc.platformId='${platformId}'")
DeviceChannel queryChannelInParentPlatform(String platformId, String channelId); List<DeviceChannel> queryChannelInParentPlatform(String platformId, String channelId);
@Select(" select dc.channelId as id, dc.name as name, pgc.platformId as platformId, pgc.catalogId as parentId, 0 as childrenCount, 1 as type " + @Select(" select dc.channelId as id, dc.name as name, pgc.platformId as platformId, pgc.catalogId as parentId, 0 as childrenCount, 1 as type " +
" from device_channel dc left join platform_gb_channel pgc on dc.id = pgc.deviceChannelId " + " from device_channel dc left join platform_gb_channel pgc on dc.id = pgc.deviceChannelId " +
@ -67,7 +67,7 @@ public interface PlatformChannelMapper {
" left join device_channel dc on dc.id = pgc.deviceChannelId\n" + " left join device_channel dc on dc.id = pgc.deviceChannelId\n" +
" left join device d on dc.deviceId = d.deviceId\n" + " left join device d on dc.deviceId = d.deviceId\n" +
"where dc.channelId = #{channelId} and pgc.platformId=#{platformId}") "where dc.channelId = #{channelId} and pgc.platformId=#{platformId}")
Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId); List<Device> queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId);
@Delete("<script> "+ @Delete("<script> "+
"DELETE FROM platform_gb_channel WHERE catalogId=#{id}" + "DELETE FROM platform_gb_channel WHERE catalogId=#{id}" +

View File

@ -133,13 +133,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
return (StreamInfo)redis.get(playLeys.get(0).toString()); return (StreamInfo)redis.get(playLeys.get(0).toString());
} }
@Override
public StreamInfo queryPlaybackByStreamId(String streamId) {
List<Object> playLeys = redis.scan(String.format("%S_%s_%s_*", VideoManagerConstants.PLAY_BLACK_PREFIX, userSetup.getServerId(), streamId));
if (playLeys == null || playLeys.size() == 0) return null;
return (StreamInfo)redis.get(playLeys.get(0).toString());
}
@Override @Override
public StreamInfo queryPlayByDevice(String deviceId, String channelId) { public StreamInfo queryPlayByDevice(String deviceId, String channelId) {
List<Object> playLeys = redis.scan(String.format("%S_%s_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX, List<Object> playLeys = redis.scan(String.format("%S_%s_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX,
@ -166,49 +159,67 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override @Override
public boolean startPlayback(StreamInfo stream) { public boolean startPlayback(StreamInfo stream, String callId) {
return redis.set(String.format("%S_%s_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, return redis.set(String.format("%S_%s_%s_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX,
userSetup.getServerId(), stream.getStream(), stream.getDeviceID(), stream.getChannelId()), stream); userSetup.getServerId(), stream.getDeviceID(), stream.getChannelId(), stream.getStream(), callId), stream);
} }
@Override @Override
public boolean startDownload(StreamInfo streamInfo) { public boolean startDownload(StreamInfo stream, String callId) {
return redis.set(String.format("%S_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX, userSetup.getServerId(), return redis.set(String.format("%S_%s_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX,
streamInfo.getStream(), streamInfo.getDeviceID(), streamInfo.getChannelId()), streamInfo); userSetup.getServerId(), stream.getDeviceID(), stream.getChannelId(), stream.getStream(), callId), stream);
} }
@Override @Override
public boolean stopPlayback(StreamInfo streamInfo) { public boolean stopPlayback(String deviceId, String channelId, String stream, String callId) {
if (streamInfo == null) return false; DeviceChannel deviceChannel = deviceChannelMapper.queryChannel(deviceId, channelId);
DeviceChannel deviceChannel = deviceChannelMapper.queryChannel(streamInfo.getDeviceID(), streamInfo.getChannelId());
if (deviceChannel != null) { if (deviceChannel != null) {
deviceChannel.setStreamId(null); deviceChannel.setStreamId(null);
deviceChannel.setDeviceId(streamInfo.getDeviceID()); deviceChannel.setDeviceId(deviceId);
deviceChannelMapper.update(deviceChannel); deviceChannelMapper.update(deviceChannel);
} }
return redis.del(String.format("%S_%s_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, if (deviceId == null) deviceId = "*";
if (channelId == null) channelId = "*";
if (stream == null) stream = "*";
if (callId == null) callId = "*";
String key = String.format("%S_%s_%s_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX,
userSetup.getServerId(), userSetup.getServerId(),
streamInfo.getStream(), deviceId,
streamInfo.getDeviceID(), channelId,
streamInfo.getChannelId())); stream,
callId
);
List<Object> scan = redis.scan(key);
if (scan.size() > 0) {
for (Object keyObj : scan) {
redis.del((String) keyObj);
}
}
return true;
} }
@Override @Override
public StreamInfo queryPlaybackByDevice(String deviceId, String code) { public StreamInfo queryPlayback(String deviceId, String channelId, String stream, String callId) {
// String format = String.format("%S_*_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, if (stream == null && callId == null) {
// deviceId, return null;
// code); }
List<Object> playLeys = redis.scan(String.format("%S_%s_*_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, if (deviceId == null) deviceId = "*";
if (channelId == null) channelId = "*";
if (stream == null) stream = "*";
if (callId == null) callId = "*";
String key = String.format("%S_%s_%s_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX,
userSetup.getServerId(), userSetup.getServerId(),
deviceId, deviceId,
code)); channelId,
if (playLeys == null || playLeys.size() == 0) { stream,
playLeys = redis.scan(String.format("%S_%s_*_*_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, callId
userSetup.getServerId(), );
deviceId)); List<Object> streamInfoScan = redis.scan(key);
if (streamInfoScan.size() > 0) {
return (StreamInfo) redis.get((String) streamInfoScan.get(0));
}else {
return null;
} }
if (playLeys == null || playLeys.size() == 0) return null;
return (StreamInfo)redis.get(playLeys.get(0).toString());
} }
@Override @Override
@ -361,7 +372,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
} }
} }
List<Object> playBackers = redis.scan(String.format("%S_%s_*_%s_*", VideoManagerConstants.PLAY_BLACK_PREFIX, List<Object> playBackers = redis.scan(String.format("%S_%s_%s_*_*_*", VideoManagerConstants.PLAY_BLACK_PREFIX,
userSetup.getServerId(), userSetup.getServerId(),
deviceId)); deviceId));
if (playBackers.size() > 0) { if (playBackers.size() > 0) {
@ -426,10 +437,27 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
} }
@Override @Override
public StreamInfo queryDownloadByStreamId(String streamId) { public StreamInfo queryDownload(String deviceId, String channelId, String stream, String callId) {
List<Object> playLeys = redis.scan(String.format("%S_%s_%s_*", VideoManagerConstants.DOWNLOAD_PREFIX, userSetup.getServerId(), streamId)); if (stream == null && callId == null) {
if (playLeys == null || playLeys.size() == 0) return null; return null;
return (StreamInfo)redis.get(playLeys.get(0).toString()); }
if (deviceId == null) deviceId = "*";
if (channelId == null) channelId = "*";
if (stream == null) stream = "*";
if (callId == null) callId = "*";
String key = String.format("%S_%s_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX,
userSetup.getServerId(),
deviceId,
channelId,
stream,
callId
);
List<Object> streamInfoScan = redis.scan(key);
if (streamInfoScan.size() > 0) {
return (StreamInfo) redis.get((String) streamInfoScan.get(0));
}else {
return null;
}
} }
@Override @Override
@ -490,21 +518,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
return (GPSMsgInfo)redis.get(key); return (GPSMsgInfo)redis.get(key);
} }
@Override
public void updateSubscribe(String key, SubscribeInfo subscribeInfo) {
redis.set(key, subscribeInfo, subscribeInfo.getExpires());
}
@Override
public SubscribeInfo getSubscribe(String key) {
return (SubscribeInfo)redis.get(key);
}
@Override
public void delSubscribe(String key) {
redis.del(key);
}
@Override @Override
public List<GPSMsgInfo> getAllGpsMsgInfo() { public List<GPSMsgInfo> getAllGpsMsgInfo() {
String scanKey = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_*"; String scanKey = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_*";
@ -535,32 +548,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
return result; return result;
} }
@Override
public List<SubscribeInfo> getAllSubscribe() {
String scanKey = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_*";
List<SubscribeInfo> result = new ArrayList<>();
List<Object> keys = redis.scan(scanKey);
for (int i = 0; i < keys.size(); i++) {
String key = (String) keys.get(i);
SubscribeInfo subscribeInfo = (SubscribeInfo) redis.get(key);
result.add(subscribeInfo);
}
return result;
}
@Override
public List<String> getAllSubscribePlatform() {
String scanKey = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_*";
List<String> result = new ArrayList<>();
List<Object> keys = redis.scan(scanKey);
for (int i = 0; i < keys.size(); i++) {
String key = (String) keys.get(i);
String platformId = key.substring(scanKey.length() - 1);
result.add(platformId);
}
return result;
}
@Override @Override
public void addCpuInfo(double cpuInfo) { public void addCpuInfo(double cpuInfo) {
String key = VideoManagerConstants.SYSTEM_INFO_CPU_PREFIX + userSetup.getServerId(); String key = VideoManagerConstants.SYSTEM_INFO_CPU_PREFIX + userSetup.getServerId();

View File

@ -42,7 +42,7 @@ import java.util.*;
@Component @Component
public class VideoManagerStoragerImpl implements IVideoManagerStorager { public class VideoManagerStoragerImpl implements IVideoManagerStorager {
private Logger logger = LoggerFactory.getLogger(VideoManagerStoragerImpl.class); private final Logger logger = LoggerFactory.getLogger(VideoManagerStoragerImpl.class);
@Autowired @Autowired
EventPublisher eventPublisher; EventPublisher eventPublisher;
@ -171,6 +171,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
}else { }else {
deviceChannelMapper.update(channel); deviceChannelMapper.update(channel);
} }
deviceChannelMapper.updateChannelSubCount(deviceId,channel.getParentId());
} }
@Override @Override
@ -542,7 +543,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
if (parentPlatformCatch == null) { // serverGBId 已变化 if (parentPlatformCatch == null) { // serverGBId 已变化
ParentPlatform parentPlatById = platformMapper.getParentPlatById(parentPlatform.getId()); ParentPlatform parentPlatById = platformMapper.getParentPlatById(parentPlatform.getId());
// 使用旧的查出缓存ID // 使用旧的查出缓存ID
parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatById.getServerGBId()); parentPlatformCatch = new ParentPlatformCatch();
parentPlatformCatch.setId(parentPlatform.getServerGBId()); parentPlatformCatch.setId(parentPlatform.getServerGBId());
redisCatchStorage.delPlatformCatchInfo(parentPlatById.getServerGBId()); redisCatchStorage.delPlatformCatchInfo(parentPlatById.getServerGBId());
} }
@ -662,8 +663,16 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
@Override @Override
public DeviceChannel queryChannelInParentPlatform(String platformId, String channelId) { public DeviceChannel queryChannelInParentPlatform(String platformId, String channelId) {
DeviceChannel channel = platformChannelMapper.queryChannelInParentPlatform(platformId, channelId); List<DeviceChannel> channels = platformChannelMapper.queryChannelInParentPlatform(platformId, channelId);
return channel; if (channels.size() > 1) {
// 出现长度大于0的时候肯定是国标通道的ID重复了
logger.warn("国标ID存在重复{}", channelId);
}
if (channels.size() == 0) {
return null;
}else {
return channels.get(0);
}
} }
@Override @Override
@ -680,8 +689,18 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
@Override @Override
public Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId) { public Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId) {
Device device = platformChannelMapper.queryVideoDeviceByPlatformIdAndChannelId(platformId, channelId); List<Device> devices = platformChannelMapper.queryVideoDeviceByPlatformIdAndChannelId(platformId, channelId);
return device; if (devices.size() > 1) {
// 出现长度大于0的时候肯定是国标通道的ID重复了
logger.warn("国标ID存在重复{}", channelId);
}
if (devices.size() == 0) {
return null;
}else {
return devices.get(0);
}
} }
/** /**
@ -1083,6 +1102,9 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
@Override @Override
public List<ParentPlatform> queryPlatFormListForStreamWithGBId(String app, String stream, List<String> platforms) { public List<ParentPlatform> queryPlatFormListForStreamWithGBId(String app, String stream, List<String> platforms) {
if (platforms == null || platforms.size() == 0) {
return new ArrayList<>();
}
return platformGbStreamMapper.queryPlatFormListForGBWithGBId(app, stream, platforms); return platformGbStreamMapper.queryPlatFormListForGBWithGBId(app, stream, platforms);
} }

View File

@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
@ -49,6 +50,9 @@ public class PlatformController {
@Autowired @Autowired
private IRedisCatchStorage redisCatchStorage; private IRedisCatchStorage redisCatchStorage;
@Autowired
private SubscribeHolder subscribeHolder;
@Autowired @Autowired
private ISIPCommanderForPlatform commanderForPlatform; private ISIPCommanderForPlatform commanderForPlatform;
@ -110,10 +114,14 @@ public class PlatformController {
}) })
public PageInfo<ParentPlatform> platforms(@PathVariable int page, @PathVariable int count) { public PageInfo<ParentPlatform> platforms(@PathVariable int page, @PathVariable int count) {
// if (logger.isDebugEnabled()) { PageInfo<ParentPlatform> parentPlatformPageInfo = storager.queryParentPlatformList(page, count);
// logger.debug("查询所有上级设备API调用"); if (parentPlatformPageInfo.getList().size() > 0) {
// } for (ParentPlatform platform : parentPlatformPageInfo.getList()) {
return storager.queryParentPlatformList(page, count); platform.setGpsSubscribe(subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId()) != null);
platform.setCatalogSubscribe(subscribeHolder.getCatalogSubscribe(platform.getServerGBId()) != null);
}
}
return parentPlatformPageInfo;
} }
/** /**

View File

@ -120,7 +120,7 @@ public class PlayController {
storager.stopPlay(deviceId, channelId); storager.stopPlay(deviceId, channelId);
return result; return result;
} }
cmder.streamByeCmd(deviceId, channelId, streamInfo.getStream(), (event) -> { cmder.streamByeCmd(deviceId, channelId, streamInfo.getStream(), null, (event) -> {
redisCatchStorage.stopPlay(streamInfo); redisCatchStorage.stopPlay(streamInfo);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
RequestMessage msg = new RequestMessage(); RequestMessage msg = new RequestMessage();
@ -174,7 +174,7 @@ public class PlayController {
public ResponseEntity<String> playConvert(@PathVariable String streamId) { public ResponseEntity<String> playConvert(@PathVariable String streamId) {
StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
if (streamInfo == null) { if (streamInfo == null) {
streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null);
} }
if (streamInfo == null) { if (streamInfo == null) {
logger.warn("视频转码API调用失败, 视频流已经停止!"); logger.warn("视频转码API调用失败, 视频流已经停止!");

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.vmanager.gb28181.playback; package com.genersoft.iot.vmp.vmanager.gb28181.playback;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
@ -93,11 +94,6 @@ public class DownloadController {
} }
resultHolder.put(key, uuid, result); resultHolder.put(key, uuid, result);
Device device = storager.queryVideoDevice(deviceId); Device device = storager.queryVideoDevice(deviceId);
StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId);
if (streamInfo != null) {
// 停止之前的下载
cmder.streamByeCmd(deviceId, channelId, streamInfo.getStream());
}
MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device); MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device);
if (newMediaServerItem == null) { if (newMediaServerItem == null) {
@ -112,9 +108,9 @@ public class DownloadController {
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true);
cmder.downloadStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, (MediaServerItem mediaServerItem, JSONObject response) -> { cmder.downloadStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, (InviteStreamInfo inviteStreamInfo) -> {
logger.info("收到订阅消息: " + response.toJSONString()); logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
playService.onPublishHandlerForDownload(mediaServerItem, response, deviceId, channelId, uuid); playService.onPublishHandlerForDownload(inviteStreamInfo, deviceId, channelId, uuid);
}, event -> { }, event -> {
RequestMessage msg = new RequestMessage(); RequestMessage msg = new RequestMessage();
msg.setId(uuid); msg.setId(uuid);
@ -135,7 +131,7 @@ public class DownloadController {
@GetMapping("/stop/{deviceId}/{channelId}/{stream}") @GetMapping("/stop/{deviceId}/{channelId}/{stream}")
public ResponseEntity<String> playStop(@PathVariable String deviceId, @PathVariable String channelId, @PathVariable String stream) { public ResponseEntity<String> playStop(@PathVariable String deviceId, @PathVariable String channelId, @PathVariable String stream) {
cmder.streamByeCmd(deviceId, channelId, stream); cmder.streamByeCmd(deviceId, channelId, stream, null);
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug(String.format("设备历史媒体下载停止 API调用deviceId/channelId%s_%s", deviceId, channelId)); logger.debug(String.format("设备历史媒体下载停止 API调用deviceId/channelId%s_%s", deviceId, channelId));

View File

@ -77,7 +77,7 @@ public class PlaybackController {
logger.debug(String.format("设备回放 API调用deviceId%s channelId%s", deviceId, channelId)); logger.debug(String.format("设备回放 API调用deviceId%s channelId%s", deviceId, channelId));
} }
DeferredResult<ResponseEntity<String>> result = playService.playBack(deviceId, channelId, startTime, endTime, wvpResult->{ DeferredResult<ResponseEntity<String>> result = playService.playBack(deviceId, channelId, startTime, endTime, null, wvpResult->{
resultHolder.invokeResult(wvpResult.getData()); resultHolder.invokeResult(wvpResult.getData());
}); });
@ -96,7 +96,7 @@ public class PlaybackController {
@PathVariable String channelId, @PathVariable String channelId,
@PathVariable String stream) { @PathVariable String stream) {
cmder.streamByeCmd(deviceId, channelId, stream); cmder.streamByeCmd(deviceId, channelId, stream, null);
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug(String.format("设备录像回放停止 API调用deviceId/channelId%s/%s", deviceId, channelId)); logger.debug(String.format("设备录像回放停止 API调用deviceId/channelId%s/%s", deviceId, channelId));
@ -124,7 +124,7 @@ public class PlaybackController {
public ResponseEntity<String> playPause(@PathVariable String streamId) { public ResponseEntity<String> playPause(@PathVariable String streamId) {
logger.info("playPause: "+streamId); logger.info("playPause: "+streamId);
JSONObject json = new JSONObject(); JSONObject json = new JSONObject();
StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); StreamInfo streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null);
if (null == streamInfo) { if (null == streamInfo) {
json.put("msg", "streamId不存在"); json.put("msg", "streamId不存在");
logger.warn("streamId不存在!"); logger.warn("streamId不存在!");
@ -144,7 +144,7 @@ public class PlaybackController {
public ResponseEntity<String> playResume(@PathVariable String streamId) { public ResponseEntity<String> playResume(@PathVariable String streamId) {
logger.info("playResume: "+streamId); logger.info("playResume: "+streamId);
JSONObject json = new JSONObject(); JSONObject json = new JSONObject();
StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); StreamInfo streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null);
if (null == streamInfo) { if (null == streamInfo) {
json.put("msg", "streamId不存在"); json.put("msg", "streamId不存在");
logger.warn("streamId不存在!"); logger.warn("streamId不存在!");
@ -165,7 +165,7 @@ public class PlaybackController {
public ResponseEntity<String> playSeek(@PathVariable String streamId, @PathVariable long seekTime) { public ResponseEntity<String> playSeek(@PathVariable String streamId, @PathVariable long seekTime) {
logger.info("playSeek: "+streamId+", "+seekTime); logger.info("playSeek: "+streamId+", "+seekTime);
JSONObject json = new JSONObject(); JSONObject json = new JSONObject();
StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); StreamInfo streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null);
if (null == streamInfo) { if (null == streamInfo) {
json.put("msg", "streamId不存在"); json.put("msg", "streamId不存在");
logger.warn("streamId不存在!"); logger.warn("streamId不存在!");
@ -186,7 +186,7 @@ public class PlaybackController {
public ResponseEntity<String> playSpeed(@PathVariable String streamId, @PathVariable Double speed) { public ResponseEntity<String> playSpeed(@PathVariable String streamId, @PathVariable Double speed) {
logger.info("playSpeed: "+streamId+", "+speed); logger.info("playSpeed: "+streamId+", "+speed);
JSONObject json = new JSONObject(); JSONObject json = new JSONObject();
StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); StreamInfo streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null);
if (null == streamInfo) { if (null == streamInfo) {
json.put("msg", "streamId不存在"); json.put("msg", "streamId不存在");
logger.warn("streamId不存在!"); logger.warn("streamId不存在!");

View File

@ -177,7 +177,7 @@ public class ApiStreamController {
result.put("error","未找到流信息"); result.put("error","未找到流信息");
return result; return result;
} }
cmder.streamByeCmd(serial, code, streamInfo.getStream()); cmder.streamByeCmd(serial, code, streamInfo.getStream(), null);
redisCatchStorage.stopPlay(streamInfo); redisCatchStorage.stopPlay(streamInfo);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
return null; return null;

View File

@ -13,7 +13,7 @@
</div> </div>
<!--设备列表--> <!--设备列表-->
<el-table :data="platformList" border style="width: 100%" :height="winHeight"> <el-table :data="platformList" border style="width: 100%" :height="winHeight">
<el-table-column prop="name" label="名称" width="240" align="center"></el-table-column> <el-table-column prop="name" label="名称" align="center"></el-table-column>
<el-table-column prop="serverGBId" label="平台编号" width="180" align="center"></el-table-column> <el-table-column prop="serverGBId" label="平台编号" width="180" align="center"></el-table-column>
<el-table-column label="是否启用" width="120" align="center"> <el-table-column label="是否启用" width="120" align="center">
<template slot-scope="scope"> <template slot-scope="scope">
@ -38,9 +38,19 @@
</div> </div>
</template> </template>
</el-table-column> </el-table-column>
<el-table-column prop="deviceGBId" label="设备国标编号" width="240" align="center"></el-table-column> <el-table-column prop="deviceGBId" label="设备国标编号" width="200" align="center"></el-table-column>
<el-table-column prop="transport" label="信令传输模式" width="120" align="center"></el-table-column> <el-table-column prop="transport" label="信令传输模式" width="120" align="center"></el-table-column>
<el-table-column prop="channelCount" label="通道数" align="center"></el-table-column> <el-table-column prop="channelCount" label="通道数" width="120" align="center"></el-table-column>
<el-table-column label="订阅信息" width="240" align="center" fixed="right">
<template slot-scope="scope">
<i v-if="scope.row.alarmSubscribe" style="font-size: 1.5rem;" title="报警订阅" class="subscribe-on iconfont icon-gbaojings" ></i>
<i v-if="!scope.row.alarmSubscribe" style="font-size: 1.5rem;" title="报警订阅" class="subscribe-off iconfont icon-gbaojings" ></i>
<i v-if="scope.row.catalogSubscribe" title="目录订阅" class="subscribe-on iconfont icon-gjichus" ></i>
<i v-if="!scope.row.catalogSubscribe" title="目录订阅" class="subscribe-off iconfont icon-gjichus" ></i>
<i v-if="scope.row.gpsSubscribe" title="位置订阅" class="subscribe-on iconfont icon-gxunjians" ></i>
<i v-if="!scope.row.gpsSubscribe" title="位置订阅" class="subscribe-off iconfont icon-gxunjians" ></i>
</template>
</el-table-column>
<el-table-column label="操作" width="300" align="center" fixed="right"> <el-table-column label="操作" width="300" align="center" fixed="right">
<template slot-scope="scope"> <template slot-scope="scope">
@ -169,3 +179,13 @@ export default {
} }
}; };
</script> </script>
<style>
.subscribe-on{
color: #409EFF;
font-size: 1.3rem;
}
.subscribe-off{
color: #afafb3;
font-size: 1.3rem;
}
</style>

View File

@ -193,6 +193,7 @@ export default {
this.mediaServer.getOnlineMediaServerList((data)=>{ this.mediaServer.getOnlineMediaServerList((data)=>{
this.mediaServerList = data.data; this.mediaServerList = data.data;
this.proxyParam.mediaServerId = this.mediaServerList[0].id this.proxyParam.mediaServerId = this.mediaServerList[0].id
this.mediaServerIdChange()
}) })
}, },
mediaServerIdChange:function (){ mediaServerIdChange:function (){
@ -206,6 +207,7 @@ export default {
} }
}).then(function (res) { }).then(function (res) {
that.ffmpegCmdList = res.data.data; that.ffmpegCmdList = res.data.data;
that.proxyParam.ffmpeg_cmd_key = Object.keys(res.data.data)[0];
}).catch(function (error) { }).catch(function (error) {
console.log(error); console.log(error);
}); });

View File

@ -1,8 +1,8 @@
<template> <template>
<div id="chooseChannelForGb" > <div id="chooseChannelForGb" >
<div style="font-size: 17px; color: #606060; white-space: nowrap; line-height: 30px; font-family: monospace;"> <div style="font-size: 17px; color: #606060; white-space: nowrap; line-height: 30px; font-family: monospace;">
<span v-if="catalogId == null">{{catalogName}}</span> <span v-if="catalogId == null">{{catalogName}}</span>
<span v-if="catalogId != null">{{catalogName}}({{catalogId}})</span> <span v-if="catalogId != null">{{catalogName}}({{catalogId}})</span>
</div> </div>
<div style="background-color: #FFFFFF; position: relative; padding: 0.5rem; text-align: left;font-size: 14px;"> <div style="background-color: #FFFFFF; position: relative; padding: 0.5rem; text-align: left;font-size: 14px;">
搜索: <el-input @input="search" style="margin-right: 1rem; width: auto;" size="mini" placeholder="关键字" prefix-icon="el-icon-search" v-model="searchSrt" clearable> </el-input> 搜索: <el-input @input="search" style="margin-right: 1rem; width: auto;" size="mini" placeholder="关键字" prefix-icon="el-icon-search" v-model="searchSrt" clearable> </el-input>

View File

@ -1,8 +1,8 @@
<template> <template>
<div id="chooseChannelFoStream" > <div id="chooseChannelFoStream" >
<div style="font-size: 17px; color: #606060; white-space: nowrap; line-height: 30px; font-family: monospace;"> <div style="font-size: 17px; color: #606060; white-space: nowrap; line-height: 30px; font-family: monospace;">
<span v-if="catalogId == null">{{catalogName}}</span> <span v-if="catalogId == null">{{catalogName}}</span>
<span v-if="catalogId != null">{{catalogName}}({{catalogId}})</span> <span v-if="catalogId != null">{{catalogName}}({{catalogId}})</span>
</div> </div>
<div style="background-color: #FFFFFF; margin-bottom: 1rem; position: relative; padding: 0.5rem; text-align: left;font-size: 14px;"> <div style="background-color: #FFFFFF; margin-bottom: 1rem; position: relative; padding: 0.5rem; text-align: left;font-size: 14px;">

View File

@ -1,8 +1,8 @@
@font-face { @font-face {
font-family: "iconfont"; /* Project id 1291092 */ font-family: "iconfont"; /* Project id 1291092 */
src: url('iconfont.woff2?t=1644809302709') format('woff2'), src: url('iconfont.woff2?t=1647245982270') format('woff2'),
url('iconfont.woff?t=1644809302709') format('woff'), url('iconfont.woff?t=1647245982270') format('woff'),
url('iconfont.ttf?t=1644809302709') format('truetype'); url('iconfont.ttf?t=1647245982270') format('truetype');
} }
.iconfont { .iconfont {
@ -13,6 +13,22 @@
-moz-osx-font-smoothing: grayscale; -moz-osx-font-smoothing: grayscale;
} }
.icon-xitongxinxi:before {
content: "\e7d6";
}
.icon-gbaojings:before {
content: "\e7d7";
}
.icon-gjichus:before {
content: "\e7d8";
}
.icon-gxunjians:before {
content: "\e7d9";
}
.icon-ziyuan:before { .icon-ziyuan:before {
content: "\e7d5"; content: "\e7d5";
} }

Binary file not shown.