Merge pull request #74 from lawrencehj/wvp-28181-2.0

实现语音广播信令等(web语音推流开发中)
pull/80/head
648540858 2021-03-27 13:56:56 +08:00 committed by GitHub
commit f8fe76add2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 362 additions and 131 deletions

View File

@ -33,6 +33,9 @@ public class SipPlatformRunner implements CommandLineRunner {
// 设置所有平台离线
storager.outlineForAllParentPlatform();
// 清理所有平台注册缓存
redisCatchStorage.cleanPlatformRegisterInfos();
List<ParentPlatform> parentPlatforms = storager.queryEnableParentPlatformList(true);
for (ParentPlatform parentPlatform : parentPlatforms) {

View File

@ -16,7 +16,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
// import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

View File

@ -41,6 +41,8 @@ public class DeferredResultHolder {
public static final String CALLBACK_CMD_ALARM = "CALLBACK_ALARM";
public static final String CALLBACK_CMD_BROADCAST = "CALLBACK_BROADCAST";
private Map<String, DeferredResult> map = new ConcurrentHashMap<String, DeferredResult>();
public void put(String key, DeferredResult result) {

View File

@ -119,6 +119,14 @@ public interface ISIPCommander {
*/
boolean audioBroadcastCmd(Device device,String channelId);
/**
* 广
*
* @param device
*/
void audioBroadcastCmd(Device device, SipSubscribe.Event okEvent);
boolean audioBroadcastCmd(Device device);
/**
*
*

View File

@ -3,7 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
// import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.DigestUtils;

View File

@ -6,14 +6,14 @@ import java.util.ArrayList;
import javax.sip.InvalidArgumentException;
import javax.sip.PeerUnavailableException;
import javax.sip.SipFactory;
import javax.sip.SipProvider;
// import javax.sip.SipProvider;
import javax.sip.address.Address;
import javax.sip.address.SipURI;
import javax.sip.header.*;
import javax.sip.message.Request;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
// import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import com.genersoft.iot.vmp.conf.SipConfig;

View File

@ -23,7 +23,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.ComponentScan;
// import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
@ -48,7 +48,6 @@ public class SIPCommander implements ISIPCommander {
private final Logger logger = LoggerFactory.getLogger(SIPCommander.class);
@Autowired
private SipConfig sipConfig;
@ -623,10 +622,66 @@ public class SIPCommander implements ISIPCommander {
*/
@Override
public boolean audioBroadcastCmd(Device device, String channelId) {
// TODO Auto-generated method stub
// 改为新的实现
return false;
}
/**
* 广
*
* @param device
* @param channelId
*/
@Override
public boolean audioBroadcastCmd(Device device) {
try {
StringBuffer broadcastXml = new StringBuffer(200);
broadcastXml.append("<?xml version=\"1.0\" ?>\r\n");
broadcastXml.append("<Notify>\r\n");
broadcastXml.append("<CmdType>Broadcast</CmdType>\r\n");
broadcastXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
broadcastXml.append("<SourceID>" + sipConfig.getSipId() + "</SourceID>\r\n");
broadcastXml.append("<TargetID>" + device.getDeviceId() + "</TargetID>\r\n");
broadcastXml.append("</Notify>\r\n");
String tm = Long.toString(System.currentTimeMillis());
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
Request request = headerProvider.createMessageRequest(device, broadcastXml.toString(), "z9hG4bK-ViaBcst-" + tm, "FromBcst" + tm, null, callIdHeader);
transmitRequest(device, request);
return true;
} catch (SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
}
return false;
}
@Override
public void audioBroadcastCmd(Device device, SipSubscribe.Event errorEvent) {
try {
StringBuffer broadcastXml = new StringBuffer(200);
broadcastXml.append("<?xml version=\"1.0\" ?>\r\n");
broadcastXml.append("<Notify>\r\n");
broadcastXml.append("<CmdType>Broadcast</CmdType>\r\n");
broadcastXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
broadcastXml.append("<SourceID>" + sipConfig.getSipId() + "</SourceID>\r\n");
broadcastXml.append("<TargetID>" + device.getDeviceId() + "</TargetID>\r\n");
broadcastXml.append("</Notify>\r\n");
String tm = Long.toString(System.currentTimeMillis());
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
Request request = headerProvider.createMessageRequest(device, broadcastXml.toString(), "z9hG4bK-ViaBcst-" + tm, "FromBcst" + tm, null, callIdHeader);
transmitRequest(device, request, errorEvent);
} catch (SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
}
}
/**
*
*

View File

@ -15,7 +15,7 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.ComponentScan;
// import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Lazy;
import org.springframework.lang.Nullable;

View File

@ -14,6 +14,7 @@ import javax.sip.message.Response;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
@ -74,37 +75,38 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
Request request = evt.getRequest();
SipURI sipURI = (SipURI) request.getRequestURI();
String channelId = sipURI.getUser();
String platformId = null;
String requesterId = null;
FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME);
AddressImpl address = (AddressImpl) fromHeader.getAddress();
SipUri uri = (SipUri) address.getURI();
platformId = uri.getUser();
requesterId = uri.getUser();
if (platformId == null || channelId == null) {
logger.info("无法从FromHeader的Address中获取到平台id返回404");
if (requesterId == null || channelId == null) {
logger.info("无法从FromHeader的Address中获取到平台id返回400");
responseAck(evt, Response.BAD_REQUEST); // 参数不全, 发400请求错误
return;
}
// 查询请求方是否上级平台
ParentPlatform platform = storager.queryParentPlatById(requesterId);
if (platform != null) {
// 查询平台下是否有该通道
DeviceChannel channel = storager.queryChannelInParentPlatform(platformId, channelId);
DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
if (channel == null) {
logger.info("通道不存在返回404");
responseAck(evt, Response.NOT_FOUND); // 通道不存在发404资源不存在
return;
}else {
responseAck(evt, Response.TRYING); // 通道存在发100trying
responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在发181呼叫转接中
}
// 解析sdp消息, 使用jainsip 自带的sdp解析方式
String contentString = new String(request.getRawContent());
// jainSip不支持y=字段, 移除移除以解析。
int ssrcIndex = contentString.indexOf("y=");
String ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
//ssrc规定长度为10字节不取余下长度以避免后续还有“f=”字段
// String ssrc = contentString.substring(ssrcIndex + 2, contentString.length())
// .replace("\r\n", "").replace("\n", "");
String ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
String substring = contentString.substring(0, contentString.indexOf("y="));
SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
@ -152,13 +154,13 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
//String sessionName = sdp.getSessionName().getValue();
logger.info("[上级点播]用户:{} 地址:{}:{} ssrc{}", username, addressStr, port, ssrc);
Device device = storager.queryVideoDeviceByPlatformIdAndChannelId(platformId, channelId);
Device device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId);
if (device == null) {
logger.warn("点播平台{}的通道{}时未找到设备信息", platformId, channel);
logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel);
responseAck(evt, Response.SERVER_INTERNAL_ERROR);
return;
}
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(addressStr, port, ssrc, platformId, device.getDeviceId(), channelId,
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(addressStr, port, ssrc, requesterId, device.getDeviceId(), channelId,
mediaTransmissionTCP);
if (tcpActive != null) {
sendRtpItem.setTcpActive(tcpActive);
@ -205,7 +207,6 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
try {
response = getMessageFactory().createResponse(event.getResponse().getStatusCode(), evt.getRequest());
getServerTransaction(evt).sendResponse(response);
} catch (ParseException | SipException | InvalidArgumentException e) {
e.printStackTrace();
}
@ -213,6 +214,78 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
if (logger.isDebugEnabled()) {
logger.debug(playResult.getResult().toString());
}
} else {
// 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
Device device = storager.queryVideoDevice(requesterId);
if (device != null) {
logger.info("收到设备" + requesterId + "的语音广播Invite请求");
responseAck(evt, Response.TRYING);
String contentString = new String(request.getRawContent());
// jainSip不支持y=字段, 移除移除以解析。
String substring = contentString;
String ssrc = "0000000404";
int ssrcIndex = contentString.indexOf("y=");
if (ssrcIndex > 0) {
substring = contentString.substring(0, ssrcIndex);
ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
}
ssrcIndex = substring.indexOf("f=");
if (ssrcIndex > 0) {
substring = contentString.substring(0, ssrcIndex);
}
SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
// 获取支持的格式
Vector mediaDescriptions = sdp.getMediaDescriptions(true);
// 查看是否支持PS 负载96
int port = -1;
//boolean recvonly = false;
boolean mediaTransmissionTCP = false;
Boolean tcpActive = null;
for (int i = 0; i < mediaDescriptions.size(); i++) {
MediaDescription mediaDescription = (MediaDescription)mediaDescriptions.get(i);
Media media = mediaDescription.getMedia();
Vector mediaFormats = media.getMediaFormats(false);
if (mediaFormats.contains("8")) {
port = media.getMediaPort();
String protocol = media.getProtocol();
// 区分TCP发流还是udp 当前默认udp
if ("TCP/RTP/AVP".equals(protocol)) {
String setup = mediaDescription.getAttribute("setup");
if (setup != null) {
mediaTransmissionTCP = true;
if ("active".equals(setup)) {
tcpActive = true;
} else if ("passive".equals(setup)) {
tcpActive = false;
}
}
}
break;
}
}
if (port == -1) {
logger.info("不支持的媒体格式返回415");
// 回复不支持的格式
responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式发415
return;
}
String username = sdp.getOrigin().getUsername();
String addressStr = sdp.getOrigin().getAddress();
logger.info("设备{}请求语音流,地址:{}:{}ssrc{}", username, addressStr, port, ssrc);
} else {
logger.warn("来自无效设备/平台的请求");
responseAck(evt, Response.BAD_REQUEST);
}
}
} catch (SipException | InvalidArgumentException | ParseException e) {
e.printStackTrace();

View File

@ -93,7 +93,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
private static final String MESSAGE_ALARM = "Alarm";
private static final String MESSAGE_RECORD_INFO = "RecordInfo";
private static final String MESSAGE_MEDIA_STATUS = "MediaStatus";
// private static final String MESSAGE_BROADCAST = "Broadcast";
private static final String MESSAGE_BROADCAST = "Broadcast";
private static final String MESSAGE_DEVICE_STATUS = "DeviceStatus";
private static final String MESSAGE_DEVICE_CONTROL = "DeviceControl";
private static final String MESSAGE_DEVICE_CONFIG = "DeviceConfig";
@ -123,7 +123,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
logger.info("接收到Catalog消息");
processMessageCatalogList(evt);
} else if (MESSAGE_DEVICE_INFO.equals(cmd)) {
//DeviceInfo消息处理
// DeviceInfo消息处理
processMessageDeviceInfo(evt);
} else if (MESSAGE_DEVICE_STATUS.equals(cmd)) {
// DeviceStatus消息处理
@ -149,6 +149,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
} else if (MESSAGE_PRESET_QUERY.equals(cmd)) {
logger.info("接收到PresetQuery消息");
processMessagePresetQuery(evt);
} else if (MESSAGE_BROADCAST.equals(cmd)) {
// Broadcast消息处理
processMessageBroadcast(evt);
} else {
logger.info("接收到消息:" + cmd);
responseAck(evt);
@ -298,7 +301,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
// 远程启动功能
if (!XmlUtil.isEmpty(XmlUtil.getText(rootElement, "TeleBoot"))) {
if (deviceId.equals(targetGBId)) {
// 远程启动功能需要在重新启动程序后先对SipStack解绑
// 远程启动本平台需要在重新启动程序后先对SipStack解绑
logger.info("执行远程启动本平台命令");
ParentPlatform parentPlatform = storager.queryParentPlatById(platformId);
cmderFroPlatform.unregister(parentPlatform, null, null);
@ -333,6 +336,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
// 远程启动指定设备
}
}
// 云台/前端控制命令
if (!XmlUtil.isEmpty(XmlUtil.getText(rootElement,"PTZCmd")) && !deviceId.equals(targetGBId)) {
String cmdString = XmlUtil.getText(rootElement,"PTZCmd");
Device device = storager.queryVideoDeviceByPlatformIdAndChannelId(platformId, deviceId);
@ -895,6 +899,37 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
}
}
/**
* AudioBroadcast广Message
*
* @param evt
*/
private void processMessageBroadcast(RequestEvent evt) {
try {
Element rootElement = getRootElement(evt);
String deviceId = XmlUtil.getText(rootElement, "DeviceID");
// 回复200 OK
responseAck(evt);
if (rootElement.getName().equals("Response")) {
// 此处是对本平台发出Broadcast指令的应答
JSONObject json = new JSONObject();
XmlUtil.node2Json(rootElement, json);
if (logger.isDebugEnabled()) {
logger.debug(json.toJSONString());
}
RequestMessage msg = new RequestMessage();
msg.setDeviceId(deviceId);
msg.setType(DeferredResultHolder.CALLBACK_CMD_BROADCAST);
msg.setData(json);
deferredResultHolder.invokeResult(msg);
} else {
// 此处是上级发出的Broadcast指令
}
} catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
e.printStackTrace();
}
}
/***
* 200 OK

View File

@ -50,7 +50,6 @@ public class RegisterResponseProcessor implements ISIPResponseProcessor {
*/
@Override
public void process(ResponseEvent evt, SipLayer layer, SipConfig config) {
// TODO Auto-generated method stub
Response response = evt.getResponse();
CallIdHeader callIdHeader = (CallIdHeader) response.getHeader(CallIdHeader.NAME);
String callId = callIdHeader.getCallId();

View File

@ -81,6 +81,8 @@ public interface IRedisCatchStorage {
void delPlatformRegisterInfo(String callId);
void cleanPlatformRegisterInfos();
void updateSendRTPSever(SendRtpItem sendRtpItem);
/**

View File

@ -13,6 +13,7 @@ import org.springframework.stereotype.Component;
import java.util.*;
@SuppressWarnings("rawtypes")
@Component
public class RedisCatchStorageImpl implements IRedisCatchStorage {
@ -212,6 +213,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
redis.del(VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + callId);
}
@Override
public void cleanPlatformRegisterInfos() {
List regInfos = redis.scan(VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + "*");
for (Object key : regInfos) {
redis.del(key.toString());
}
}
@Override
public void updateSendRTPSever(SendRtpItem sendRtpItem) {
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + sendRtpItem.getPlatformId() + "_" + sendRtpItem.getChannelId();

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.vmanager.play;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
@ -27,6 +28,8 @@ import org.springframework.web.context.request.async.DeferredResult;
import java.util.UUID;
import javax.sip.message.Response;
@CrossOrigin
@RestController
@RequestMapping("/api")
@ -204,5 +207,47 @@ public class PlayController {
}
return new ResponseEntity<String>( result.toJSONString(), HttpStatus.OK);
}
/**
* 广API
*
* @param deviceId
*/
@GetMapping("/broadcast/{deviceId}")
@PostMapping("/broadcast/{deviceId}")
public DeferredResult<ResponseEntity<String>> broadcastApi(@PathVariable String deviceId) {
if (logger.isDebugEnabled()) {
logger.debug("语音广播API调用");
}
Device device = storager.queryVideoDevice(deviceId);
cmder.audioBroadcastCmd(device, event -> {
Response response = event.getResponse();
RequestMessage msg = new RequestMessage();
msg.setId(DeferredResultHolder.CALLBACK_CMD_BROADCAST + deviceId);
JSONObject json = new JSONObject();
json.put("DeviceID", deviceId);
json.put("CmdType", "Broadcast");
json.put("Result", "Failed");
json.put("Description", String.format("语音广播操作失败,错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase()));
msg.setData(json);
resultHolder.invokeResult(msg);
});
DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>(3 * 1000L);
result.onTimeout(() -> {
logger.warn(String.format("语音广播操作超时, 设备未返回应答指令"));
RequestMessage msg = new RequestMessage();
msg.setId(DeferredResultHolder.CALLBACK_CMD_BROADCAST + deviceId);
JSONObject json = new JSONObject();
json.put("DeviceID", deviceId);
json.put("CmdType", "Broadcast");
json.put("Result", "Failed");
json.put("Error", "Timeout. Device did not response to broadcast command.");
msg.setData(json);
resultHolder.invokeResult(msg);
});
resultHolder.put(DeferredResultHolder.CALLBACK_CMD_BROADCAST + deviceId, result);
return result;
}
}

View File

@ -99,7 +99,7 @@ export default {
currentPage: parseInt(this.$route.params.page),
count: parseInt(this.$route.params.count),
total: 0,
beforeUrl: "/videoList",
beforeUrl: "/deviceList",
isLoging: false,
autoList: true
};
@ -131,7 +131,7 @@ export default {
this.currentPage = parseInt(this.$route.params.page);
this.count = parseInt(this.$route.params.count);
if (this.parentChannelId == "" || this.parentChannelId == 0) {
this.beforeUrl = "/videoList"
this.beforeUrl = "/deviceList"
}
},

View File

@ -81,7 +81,7 @@ export default {
parentChannelId: this.$route.params.parentChannelId,
updateLooper: 0, //
total: 0,
beforeUrl: "/videoList",
beforeUrl: "/deviceList",
isLoging: false,
autoList: false,
};
@ -111,7 +111,7 @@ export default {
// this.currentPage = parseInt(this.$route.params.page);
// this.count = parseInt(this.$route.params.count);
// if (this.parentChannelId == "" || this.parentChannelId == 0) {
// this.beforeUrl = "/videoList";
// this.beforeUrl = "/deviceList";
// }
},
initBaiduMap() {