diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/annotation/MsgId.java b/src/main/java/com/genersoft/iot/vmp/jt1078/annotation/MsgId.java new file mode 100644 index 00000000..d5c2de46 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/annotation/MsgId.java @@ -0,0 +1,15 @@ +package com.genersoft.iot.vmp.jt1078.annotation; + +import java.lang.annotation.*; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:31 + * @email qingtaij@163.com + */ +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface MsgId { + String id(); +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/cmd/JT1078Template.java b/src/main/java/com/genersoft/iot/vmp/jt1078/cmd/JT1078Template.java new file mode 100644 index 00000000..ad3ab006 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/cmd/JT1078Template.java @@ -0,0 +1,56 @@ +package com.genersoft.iot.vmp.jt1078.cmd; + +import com.genersoft.iot.vmp.jt1078.proc.entity.Cmd; +import com.genersoft.iot.vmp.jt1078.proc.response.J9101; +import com.genersoft.iot.vmp.jt1078.proc.response.J9102; +import com.genersoft.iot.vmp.jt1078.session.SessionManager; + +import java.util.Random; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:58 + * @email qingtaij@163.com + */ +public class JT1078Template { + + private final Random random = new Random(); + + /** + * 开启直播视频 + * + * @param devId 设备号 + * @param j9101 开启视频参数 + */ + public String startLive(String devId, J9101 j9101, Integer timeOut) { + Cmd cmd = new Cmd.Builder() + .setDevId(devId) + .setPackageNo(randomInt()) + .setMsgId("9101") + .setRespId("0001") + .setRs(j9101) + .build(); + return SessionManager.INSTANCE.request(cmd, timeOut); + } + + /** + * 关闭直播视频 + * + * @param devId 设备号 + * @param j9102 关闭视频参数 + */ + public String stopLive(String devId, J9102 j9102, Integer timeOut) { + Cmd cmd = new Cmd.Builder() + .setDevId(devId) + .setPackageNo(randomInt()) + .setMsgId("9102") + .setRespId("0001") + .setRs(j9102) + .build(); + return SessionManager.INSTANCE.request(cmd, timeOut); + } + + private Long randomInt() { + return (long) random.nextInt(1000) + 1; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/codec/decode/Jt808Decoder.java b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/decode/Jt808Decoder.java new file mode 100644 index 00000000..4817c665 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/decode/Jt808Decoder.java @@ -0,0 +1,146 @@ +package com.genersoft.iot.vmp.jt1078.codec.decode; + +import com.genersoft.iot.vmp.jt1078.proc.Header; +import com.genersoft.iot.vmp.jt1078.proc.factory.CodecFactory; +import com.genersoft.iot.vmp.jt1078.proc.request.Re; +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:10 + * @email qingtaij@163.com + */ +public class Jt808Decoder extends ByteToMessageDecoder { + private final static Logger log = LoggerFactory.getLogger(Jt808Decoder.class); + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + Session session = ctx.channel().attr(Session.KEY).get(); + log.info("> {} hex:{}", session, ByteBufUtil.hexDump(in)); + + try { + ByteBuf buf = unEscapeAndCheck(in); + + Header header = new Header(); + header.setMsgId(ByteBufUtil.hexDump(buf.readSlice(2))); + header.setMsgPro(buf.readUnsignedShort()); + if (header.is2019Version()) { + header.setVersion(buf.readUnsignedByte()); + String devId = ByteBufUtil.hexDump(buf.readSlice(10)); + header.setDevId(devId.replaceFirst("^0*", "")); + } else { + header.setDevId(ByteBufUtil.hexDump(buf.readSlice(6)).replaceFirst("^0*", "")); + } + header.setSn(buf.readUnsignedShort()); + + Re handler = CodecFactory.getHandler(header.getMsgId()); + if (handler == null) { + log.error("get msgId is null {}", header.getMsgId()); + return; + } + Rs decode = handler.decode(buf, header, session); + if (decode != null) { + out.add(decode); + } + } finally { + in.skipBytes(in.readableBytes()); + } + + + } + + + /** + * 转义与验证校验码 + * + * @param byteBuf 转义Buf + * @return 转义好的数据 + */ + public ByteBuf unEscapeAndCheck(ByteBuf byteBuf) throws Exception { + int low = byteBuf.readerIndex(); + int high = byteBuf.writerIndex(); + byte checkSum = 0; + int calculationCheckSum = 0; + + byte aByte = byteBuf.getByte(high - 2); + byte protocolEscapeFlag7d = 0x7d; + //0x7d转义 + byte protocolEscapeFlag01 = 0x01; + //0x7e转义 + byte protocolEscapeFlag02 = 0x02; + if (aByte == protocolEscapeFlag7d) { + byte b2 = byteBuf.getByte(high - 1); + if (b2 == protocolEscapeFlag01) { + checkSum = protocolEscapeFlag7d; + } else if (b2 == protocolEscapeFlag02) { + checkSum = 0x7e; + } else { + log.error("转义1异常:{}", ByteBufUtil.hexDump(byteBuf)); + throw new Exception("转义错误"); + } + high = high - 2; + } else { + high = high - 1; + checkSum = byteBuf.getByte(high); + } + List bufList = new ArrayList<>(); + int index = low; + while (index < high) { + byte b = byteBuf.getByte(index); + if (b == protocolEscapeFlag7d) { + byte c = byteBuf.getByte(index + 1); + if (c == protocolEscapeFlag01) { + ByteBuf slice = slice0x01(byteBuf, low, index); + bufList.add(slice); + b = protocolEscapeFlag7d; + } else if (c == protocolEscapeFlag02) { + ByteBuf slice = slice0x02(byteBuf, low, index); + bufList.add(slice); + b = 0x7e; + } else { + log.error("转义2异常:{}", ByteBufUtil.hexDump(byteBuf)); + throw new Exception("转义错误"); + } + index += 2; + low = index; + } else { + index += 1; + } + calculationCheckSum = calculationCheckSum ^ b; + } + + if (calculationCheckSum == checkSum) { + if (bufList.size() == 0) { + return byteBuf.slice(low, high); + } else { + bufList.add(byteBuf.slice(low, high - low)); + return new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, bufList.size(), bufList); + } + } else { + log.info("{} 解析校验码:{}--计算校验码:{}", ByteBufUtil.hexDump(byteBuf), checkSum, calculationCheckSum); + throw new Exception("校验码错误!"); + } + } + + + private ByteBuf slice0x01(ByteBuf buf, int low, int sign) { + return buf.slice(low, sign - low + 1); + } + + private ByteBuf slice0x02(ByteBuf buf, int low, int sign) { + buf.setByte(sign, 0x7e); + return buf.slice(low, sign - low + 1); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808Encoder.java b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808Encoder.java new file mode 100644 index 00000000..afb1a797 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808Encoder.java @@ -0,0 +1,33 @@ +package com.genersoft.iot.vmp.jt1078.codec.encode; + + +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:10 + * @email qingtaij@163.com + */ +public class Jt808Encoder extends MessageToByteEncoder { + private final static Logger log = LoggerFactory.getLogger(Jt808Encoder.class); + + @Override + protected void encode(ChannelHandlerContext ctx, Rs msg, ByteBuf out) throws Exception { + Session session = ctx.channel().attr(Session.KEY).get(); + + ByteBuf encode = Jt808EncoderCmd.encode(msg, session, session.nextSerialNo()); + if(encode!=null){ + log.info("< {} hex:{}", session, ByteBufUtil.hexDump(encode)); + out.writeBytes(encode); + } + } + + +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808EncoderCmd.java b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808EncoderCmd.java new file mode 100644 index 00000000..0e9e11f6 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808EncoderCmd.java @@ -0,0 +1,151 @@ +package com.genersoft.iot.vmp.jt1078.codec.encode; + +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import com.genersoft.iot.vmp.jt1078.proc.Header; +import com.genersoft.iot.vmp.jt1078.proc.entity.Cmd; +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import com.genersoft.iot.vmp.jt1078.util.Bin; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.util.ByteProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.StringUtils; + +import java.util.LinkedList; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:25 + * @email qingtaij@163.com + */ +public class Jt808EncoderCmd extends MessageToByteEncoder { + private final static Logger log = LoggerFactory.getLogger(Jt808EncoderCmd.class); + + @Override + protected void encode(ChannelHandlerContext ctx, Cmd cmd, ByteBuf out) throws Exception { + Session session = ctx.channel().attr(Session.KEY).get(); + Rs msg = cmd.getRs(); + ByteBuf encode = encode(msg, session, cmd.getPackageNo().intValue()); + if (encode != null) { + log.info("< {} hex:{}", session, ByteBufUtil.hexDump(encode)); + out.writeBytes(encode); + } + } + + + public static ByteBuf encode(Rs msg, Session session, Integer packageNo) { + String id = msg.getClass().getAnnotation(MsgId.class).id(); + if (!StringUtils.hasLength(id)) { + log.error("Not find msgId"); + return null; + } + + ByteBuf byteBuf = Unpooled.buffer(); + + byteBuf.writeBytes(ByteBufUtil.decodeHexDump(id)); + + ByteBuf encode = msg.encode(); + + Header header = msg.getHeader(); + if (header == null) { + header = session.getHeader(); + } + + if (header.is2019Version()) { + // 消息体属性 + byteBuf.writeShort(encode.readableBytes() | 1 << 14); + + // 版本号 + byteBuf.writeByte(header.getVersion()); + + // 终端手机号 + byteBuf.writeBytes(ByteBufUtil.decodeHexDump(Bin.strHexPaddingLeft(header.getDevId(), 20))); + } else { + // 消息体属性 + byteBuf.writeShort(encode.readableBytes()); + + byteBuf.writeBytes(ByteBufUtil.decodeHexDump(Bin.strHexPaddingLeft(header.getDevId(), 12))); + } + + // 消息体流水号 + byteBuf.writeShort(packageNo); + + // 写入消息体 + byteBuf.writeBytes(encode); + + // 计算校验码,并反转义 + byteBuf = escapeAndCheck0(byteBuf); + return byteBuf; + } + + + private static final ByteProcessor searcher = value -> !(value == 0x7d || value == 0x7e); + + //转义与校验 + public static ByteBuf escapeAndCheck0(ByteBuf source) { + + sign(source); + + int low = source.readerIndex(); + int high = source.writerIndex(); + + LinkedList bufList = new LinkedList<>(); + int mark, len; + while ((mark = source.forEachByte(low, high - low, searcher)) > 0) { + + len = mark + 1 - low; + ByteBuf[] slice = slice(source, low, len); + bufList.add(slice[0]); + bufList.add(slice[1]); + low += len; + } + + if (bufList.size() > 0) { + bufList.add(source.slice(low, high - low)); + } else { + bufList.add(source); + } + + ByteBuf delimiter = Unpooled.buffer(1, 1).writeByte(0x7e).retain(); + bufList.addFirst(delimiter); + bufList.addLast(delimiter); + + CompositeByteBuf byteBufLs = Unpooled.compositeBuffer(bufList.size()); + byteBufLs.addComponents(true, bufList); + return byteBufLs; + } + + public static void sign(ByteBuf buf) { + byte checkCode = bcc(buf); + buf.writeByte(checkCode); + } + + public static byte bcc(ByteBuf byteBuf) { + byte cs = 0; + while (byteBuf.isReadable()) + cs ^= byteBuf.readByte(); + byteBuf.resetReaderIndex(); + return cs; + } + + protected static ByteBuf[] slice(ByteBuf byteBuf, int index, int length) { + byte first = byteBuf.getByte(index + length - 1); + + ByteBuf[] byteBufList = new ByteBuf[2]; + byteBufList[0] = byteBuf.retainedSlice(index, length); + + if (first == 0x7d) { + byteBufList[1] = Unpooled.buffer(1, 1).writeByte(0x01); + } else { + byteBuf.setByte(index + length - 1, 0x7d); + byteBufList[1] = Unpooled.buffer(1, 1).writeByte(0x02); + } + return byteBufList; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/Jt808Handler.java b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/Jt808Handler.java new file mode 100644 index 00000000..fd503027 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/Jt808Handler.java @@ -0,0 +1,72 @@ +package com.genersoft.iot.vmp.jt1078.codec.netty; + +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import com.genersoft.iot.vmp.jt1078.session.SessionManager; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:14 + * @email qingtaij@163.com + */ +public class Jt808Handler extends ChannelInboundHandlerAdapter { + + private final static Logger log = LoggerFactory.getLogger(Jt808Handler.class); + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof Rs) { + ctx.writeAndFlush(msg); + } else { + ctx.fireChannelRead(msg); + } + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + Channel channel = ctx.channel(); + Session session = SessionManager.INSTANCE.newSession(channel); + channel.attr(Session.KEY).set(session); + log.info("> Tcp connect {}", session); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + Session session = ctx.channel().attr(Session.KEY).get(); + log.info("< Tcp disconnect {}", session); + ctx.close(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) { + Session session = ctx.channel().attr(Session.KEY).get(); + String message = e.getMessage(); + if (message.toLowerCase().contains("Connection reset by peer".toLowerCase())) { + log.info("< exception{} {}", session, e.getMessage()); + } else { + log.info("< exception{} {}", session, e.getMessage(), e); + } + + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + if (evt instanceof IdleStateEvent) { + IdleStateEvent event = (IdleStateEvent) evt; + IdleState state = event.state(); + if (state == IdleState.READER_IDLE || state == IdleState.WRITER_IDLE) { + Session session = ctx.channel().attr(Session.KEY).get(); + log.warn("< Proactively disconnect{}", session); + ctx.close(); + } + } + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/TcpServer.java b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/TcpServer.java new file mode 100644 index 00000000..a7e4df82 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/TcpServer.java @@ -0,0 +1,112 @@ +package com.genersoft.iot.vmp.jt1078.codec.netty; + +import com.genersoft.iot.vmp.jt1078.codec.decode.Jt808Decoder; +import com.genersoft.iot.vmp.jt1078.codec.encode.Jt808Encoder; +import com.genersoft.iot.vmp.jt1078.codec.encode.Jt808EncoderCmd; +import com.genersoft.iot.vmp.jt1078.proc.factory.CodecFactory; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioChannelOption; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.concurrent.Future; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:01 + * @email qingtaij@163.com + */ + +public class TcpServer { + private final static Logger log = LoggerFactory.getLogger(TcpServer.class); + + private final Integer port; + private boolean isRunning = false; + private EventLoopGroup bossGroup = null; + private EventLoopGroup workerGroup = null; + + private final ByteBuf DECODER_JT808 = Unpooled.wrappedBuffer(new byte[]{0x7e}); + + public TcpServer(Integer port) { + this.port = port; + } + + private void startTcpServer() { + try { + CodecFactory.init(); + this.bossGroup = new NioEventLoopGroup(); + this.workerGroup = new NioEventLoopGroup(); + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.channel(NioServerSocketChannel.class); + bootstrap.group(bossGroup, workerGroup); + + bootstrap.option(NioChannelOption.SO_BACKLOG, 1024) + .option(NioChannelOption.SO_REUSEADDR, true) + .childOption(NioChannelOption.TCP_NODELAY, true) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(NioSocketChannel channel) { + channel.pipeline() + .addLast(new IdleStateHandler(10, 0, 0, TimeUnit.MINUTES)) + .addLast(new DelimiterBasedFrameDecoder(1024 * 2, DECODER_JT808)) + .addLast(new Jt808Decoder()) + .addLast(new Jt808Encoder()) + .addLast(new Jt808EncoderCmd()) + .addLast(new Jt808Handler()); + } + }); + ChannelFuture channelFuture = bootstrap.bind(port).sync(); + // 监听设备TCP端口是否启动成功 + channelFuture.addListener(future -> { + if (!future.isSuccess()) { + log.error("Binding port:{} fail! cause: {}", port, future.cause().getCause(), future.cause()); + } + }); + log.info("服务:JT808 Server 启动成功, port:{}", port); + channelFuture.channel().closeFuture().sync(); + } catch (Exception e) { + log.warn("服务:JT808 Server 启动异常, port:{},{}", port, e.getMessage(), e); + } finally { + stop(); + } + } + + /** + * 开启一个新的线程,拉起来Netty + */ + public synchronized void start() { + if (this.isRunning) { + log.warn("服务:JT808 Server 已经启动, port:{}", port); + return; + } + this.isRunning = true; + new Thread(this::startTcpServer).start(); + } + + public synchronized void stop() { + if (!this.isRunning) { + log.warn("服务:JT808 Server 已经停止, port:{}", port); + } + this.isRunning = false; + Future future = this.bossGroup.shutdownGracefully(); + if (!future.isSuccess()) { + log.warn("bossGroup 无法正常停止", future.cause()); + } + future = this.workerGroup.shutdownGracefully(); + if (!future.isSuccess()) { + log.warn("workerGroup 无法正常停止", future.cause()); + } + log.warn("服务:JT808 Server 已经停止, port:{}", port); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078Controller.java b/src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078Controller.java new file mode 100644 index 00000000..cffb147d --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078Controller.java @@ -0,0 +1,46 @@ +package com.genersoft.iot.vmp.jt1078.config; + +import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template; +import com.genersoft.iot.vmp.jt1078.proc.response.J9101; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; + +/** + * curl http://localhost:18080/api/jt1078/start/live/18864197066/1 + * + * @author QingtaiJiang + * @date 2023/4/27 18:12 + * @email qingtaij@163.com + */ +@ConditionalOnProperty(value = "jt1078.enable", havingValue = "true") +@RestController +@RequestMapping("/api/jt1078") +public class JT1078Controller { + + @Resource + JT1078Template jt1078Template; + + @GetMapping("/start/live/{deviceId}/{channelId}") + public WVPResult startLive(@PathVariable String deviceId, @PathVariable String channelId) { + J9101 j9101 = new J9101(); + j9101.setChannel(Integer.valueOf(channelId)); + j9101.setIp("192.168.1.1"); + j9101.setRate(1); + j9101.setTcpPort(7618); + j9101.setUdpPort(7618); + j9101.setType(0); + + String s = jt1078Template.startLive(deviceId, j9101, 6); + WVPResult wvpResult = new WVPResult<>(); + wvpResult.setCode(200); + wvpResult.setData(String.format("http://192.168.1.1/rtp/%s_%s.live.mp4", deviceId, channelId)); + return wvpResult; + } +} + diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/config/TcpAutoConfiguration.java b/src/main/java/com/genersoft/iot/vmp/jt1078/config/TcpAutoConfiguration.java new file mode 100644 index 00000000..0b07bb43 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/config/TcpAutoConfiguration.java @@ -0,0 +1,30 @@ +package com.genersoft.iot.vmp.jt1078.config; + +import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template; +import com.genersoft.iot.vmp.jt1078.codec.netty.TcpServer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.Order; + +/** + * @author QingtaiJiang + * @date 2023/4/27 19:35 + * @email qingtaij@163.com + */ +@Order(Integer.MIN_VALUE) +@Configuration +@ConditionalOnProperty(value = "jt1078.enable", havingValue = "true") +public class TcpAutoConfiguration { + + @Bean(initMethod = "start", destroyMethod = "stop") + public TcpServer jt1078Server(@Value("${jt1078.port}") Integer port) { + return new TcpServer(port); + } + + @Bean + public JT1078Template jt1078Template() { + return new JT1078Template(); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/Header.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/Header.java new file mode 100644 index 00000000..86c5fff2 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/Header.java @@ -0,0 +1,76 @@ +package com.genersoft.iot.vmp.jt1078.proc; + +import com.genersoft.iot.vmp.jt1078.util.Bin; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:22 + * @email qingtaij@163.com + */ +public class Header { + // 消息ID + String msgId; + + // 消息体属性 + Integer msgPro; + + // 标识 + String devId; + + // 消息体流水号 + Integer sn; + + // 协议版本号 + Short version = -1; + + + public String getMsgId() { + return msgId; + } + + public void setMsgId(String msgId) { + this.msgId = msgId; + } + + public Integer getMsgPro() { + return msgPro; + } + + public void setMsgPro(Integer msgPro) { + this.msgPro = msgPro; + } + + public String getDevId() { + return devId; + } + + public void setDevId(String devId) { + this.devId = devId; + } + + public Integer getSn() { + return sn; + } + + public void setSn(Integer sn) { + this.sn = sn; + } + + public Short getVersion() { + return version; + } + + public void setVersion(Short version) { + this.version = version; + } + + /** + * 判断是否是2019的版本 + * + * @return true 2019后的版本。false 2013 + */ + public boolean is2019Version() { + return Bin.get(msgPro, 14); + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/entity/Cmd.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/entity/Cmd.java new file mode 100644 index 00000000..19d6d8f1 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/entity/Cmd.java @@ -0,0 +1,105 @@ +package com.genersoft.iot.vmp.jt1078.proc.entity; + +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:23 + * @email qingtaij@163.com + */ +public class Cmd { + String devId; + Long packageNo; + String msgId; + String respId; + Rs rs; + + public Cmd() { + } + + public Cmd(Builder builder) { + this.devId = builder.devId; + this.packageNo = builder.packageNo; + this.msgId = builder.msgId; + this.respId = builder.respId; + this.rs = builder.rs; + } + + public String getDevId() { + return devId; + } + + public void setDevId(String devId) { + this.devId = devId; + } + + public Long getPackageNo() { + return packageNo; + } + + public void setPackageNo(Long packageNo) { + this.packageNo = packageNo; + } + + public String getMsgId() { + return msgId; + } + + public void setMsgId(String msgId) { + this.msgId = msgId; + } + + public String getRespId() { + return respId; + } + + public void setRespId(String respId) { + this.respId = respId; + } + + public Rs getRs() { + return rs; + } + + public void setRs(Rs rs) { + this.rs = rs; + } + + public static class Builder { + String devId; + Long packageNo; + String msgId; + String respId; + Rs rs; + + public Builder setDevId(String devId) { + this.devId = devId.replaceFirst("^0*", ""); + return this; + } + + public Builder setPackageNo(Long packageNo) { + this.packageNo = packageNo; + return this; + } + + public Builder setMsgId(String msgId) { + this.msgId = msgId; + return this; + } + + public Builder setRespId(String respId) { + this.respId = respId; + return this; + } + + public Builder setRs(Rs re) { + this.rs = re; + return this; + } + + public Cmd build() { + return new Cmd(this); + } + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/factory/CodecFactory.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/factory/CodecFactory.java new file mode 100644 index 00000000..45d5fc71 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/factory/CodecFactory.java @@ -0,0 +1,44 @@ +package com.genersoft.iot.vmp.jt1078.proc.factory; + +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import com.genersoft.iot.vmp.jt1078.proc.request.Re; +import com.genersoft.iot.vmp.jt1078.util.ClassUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:29 + * @email qingtaij@163.com + */ + +public class CodecFactory { + private final static Logger log = LoggerFactory.getLogger(CodecFactory.class); + + private static Map> protocolHash; + + public static void init() { + protocolHash = new HashMap<>(); + List> classList = ClassUtil.getClassList("com.genersoft.iot.vmp.jt1078.proc", MsgId.class); + for (Class handlerClass : classList) { + String id = handlerClass.getAnnotation(MsgId.class).id(); + protocolHash.put(id, handlerClass); + } + if (log.isDebugEnabled()) { + log.debug("消息ID缓存表 protocolHash:{}", protocolHash); + } + } + + public static Re getHandler(String msgId) { + Class aClass = protocolHash.get(msgId); + Object bean = ClassUtil.getBean(aClass); + if (bean instanceof Re) { + return (Re) bean; + } + return null; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0001.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0001.java new file mode 100644 index 00000000..1d7f85db --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0001.java @@ -0,0 +1,50 @@ +package com.genersoft.iot.vmp.jt1078.proc.request; + +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import com.genersoft.iot.vmp.jt1078.proc.Header; +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import com.genersoft.iot.vmp.jt1078.session.SessionManager; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; + +/** + * 终端通用应答 + * + * @author QingtaiJiang + * @date 2023/4/27 18:04 + * @email qingtaij@163.com + */ +@MsgId(id = "0001") +public class J0001 extends Re { + int respNo; + String respId; + int result; + + @Override + protected Rs decode0(ByteBuf buf, Header header, Session session) { + respNo = buf.readUnsignedShort(); + respId = ByteBufUtil.hexDump(buf.readSlice(2)); + result = buf.readUnsignedByte(); + return null; + } + + @Override + protected Rs handler(Header header, Session session) { + SessionManager.INSTANCE.response(header.getDevId(), "0001", (long) respNo, JSON.toJSONString(this)); + return null; + } + + public int getRespNo() { + return respNo; + } + + public String getRespId() { + return respId; + } + + public int getResult() { + return result; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0002.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0002.java new file mode 100644 index 00000000..f52303a6 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0002.java @@ -0,0 +1,32 @@ +package com.genersoft.iot.vmp.jt1078.proc.request; + +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import com.genersoft.iot.vmp.jt1078.proc.Header; +import com.genersoft.iot.vmp.jt1078.proc.response.J8001; +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import io.netty.buffer.ByteBuf; + +/** + * 终端心跳 + * + * @author QingtaiJiang + * @date 2023/4/27 18:04 + * @email qingtaij@163.com + */ +@MsgId(id = "0002") +public class J0002 extends Re { + @Override + protected Rs decode0(ByteBuf buf, Header header, Session session) { + return null; + } + + @Override + protected Rs handler(Header header, Session session) { + J8001 j8001 = new J8001(); + j8001.setRespNo(header.getSn()); + j8001.setRespId(header.getMsgId()); + j8001.setResult(J8001.SUCCESS); + return j8001; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0004.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0004.java new file mode 100644 index 00000000..0f00a801 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0004.java @@ -0,0 +1,27 @@ +package com.genersoft.iot.vmp.jt1078.proc.request; + +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import com.genersoft.iot.vmp.jt1078.proc.Header; +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import io.netty.buffer.ByteBuf; + +/** + * 查询服务器时间 + * + * @author QingtaiJiang + * @date 2023/4/27 18:06 + * @email qingtaij@163.com + */ +@MsgId(id = "0004") +public class J0004 extends Re { + @Override + protected Rs decode0(ByteBuf buf, Header header, Session session) { + return null; + } + + @Override + protected Rs handler(Header header, Session session) { + return null; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0100.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0100.java new file mode 100644 index 00000000..a731dda6 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0100.java @@ -0,0 +1,56 @@ +package com.genersoft.iot.vmp.jt1078.proc.request; + +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import com.genersoft.iot.vmp.jt1078.proc.Header; +import com.genersoft.iot.vmp.jt1078.proc.response.J8100; +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import io.netty.buffer.ByteBuf; + +/** + * 终端注册 + * + * @author QingtaiJiang + * @date 2023/4/27 18:06 + * @email qingtaij@163.com + */ +@MsgId(id = "0100") +public class J0100 extends Re { + + private int provinceId; + + private int cityId; + + private String makerId; + + private String deviceModel; + + private String deviceId; + + private int plateColor; + + private String plateNo; + + @Override + protected Rs decode0(ByteBuf buf, Header header, Session session) { + Short version = header.getVersion(); + provinceId = buf.readUnsignedShort(); + if (version > 1) { + cityId = buf.readUnsignedShort(); + // decode as 2019 + } else { + int i = buf.readUnsignedShort(); + // decode as 2013 + } + return null; + } + + @Override + protected Rs handler(Header header, Session session) { + J8100 j8100 = new J8100(); + j8100.setRespNo(header.getSn()); + j8100.setResult(J8100.SUCCESS); + j8100.setCode("WVP_YYDS"); + return j8100; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0102.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0102.java new file mode 100644 index 00000000..8e531ae4 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0102.java @@ -0,0 +1,36 @@ +package com.genersoft.iot.vmp.jt1078.proc.request; + +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import com.genersoft.iot.vmp.jt1078.proc.Header; +import com.genersoft.iot.vmp.jt1078.proc.response.J8001; +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import io.netty.buffer.ByteBuf; + +/** + * 终端鉴权 + * + * @author QingtaiJiang + * @date 2023/4/27 18:06 + * @email qingtaij@163.com + */ +@MsgId(id = "0102") +public class J0102 extends Re { + @Override + protected Rs decode0(ByteBuf buf, Header header, Session session) { + int lenCode = buf.readUnsignedByte(); +// String code = buf.readCharSequence(lenCode, CharsetUtil.UTF_8).toString(); + // if 2019 to decode next + return null; + } + + @Override + protected Rs handler(Header header, Session session) { + J8001 j8001 = new J8001(); + j8001.setRespNo(header.getSn()); + j8001.setRespId(header.getMsgId()); + j8001.setResult(J8001.SUCCESS); + return j8001; + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0200.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0200.java new file mode 100644 index 00000000..d027dd2e --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0200.java @@ -0,0 +1,32 @@ +package com.genersoft.iot.vmp.jt1078.proc.request; + +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import com.genersoft.iot.vmp.jt1078.proc.Header; +import com.genersoft.iot.vmp.jt1078.proc.response.J8001; +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import io.netty.buffer.ByteBuf; + +/** + * 实时消息上报 + * + * @author QingtaiJiang + * @date 2023/4/27 18:06 + * @email qingtaij@163.com + */ +@MsgId(id = "0200") +public class J0200 extends Re { + @Override + protected Rs decode0(ByteBuf buf, Header header, Session session) { + return null; + } + + @Override + protected Rs handler(Header header, Session session) { + J8001 j8001 = new J8001(); + j8001.setRespNo(header.getSn()); + j8001.setRespId(header.getMsgId()); + j8001.setResult(J8001.SUCCESS); + return j8001; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/Re.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/Re.java new file mode 100644 index 00000000..0a24ad27 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/Re.java @@ -0,0 +1,40 @@ +package com.genersoft.iot.vmp.jt1078.proc.request; + +import com.genersoft.iot.vmp.jt1078.proc.Header; +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import io.netty.buffer.ByteBuf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.StringUtils; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:50 + * @email qingtaij@163.com + */ +public abstract class Re { + private final static Logger log = LoggerFactory.getLogger(Re.class); + + protected abstract Rs decode0(ByteBuf buf, Header header, Session session); + + protected abstract Rs handler(Header header, Session session); + + public Rs decode(ByteBuf buf, Header header, Session session) { + if (session != null && !StringUtils.hasLength(session.getDevId())) { + session.register(header.getDevId(), (int) header.getVersion(), header); + } + Rs rs = decode0(buf, header, session); + Rs rsHand = handler(header, session); + if (rs == null && rsHand != null) { + rs = rsHand; + } else if (rs != null && rsHand != null) { + log.warn("decode0:{} 与 handler:{} 返回值冲突,采用decode0返回值", rs, rsHand); + } + if (rs != null) { + rs.setHeader(header); + } + + return rs; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J8001.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J8001.java new file mode 100644 index 00000000..ec9e31f1 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J8001.java @@ -0,0 +1,43 @@ +package com.genersoft.iot.vmp.jt1078.proc.response; + +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:48 + * @email qingtaij@163.com + */ +@MsgId(id = "8001") +public class J8001 extends Rs { + public static final Integer SUCCESS = 0; + + Integer respNo; + String respId; + Integer result; + + @Override + public ByteBuf encode() { + ByteBuf buffer = Unpooled.buffer(); + buffer.writeShort(respNo); + buffer.writeBytes(ByteBufUtil.decodeHexDump(respId)); + buffer.writeByte(result); + + return buffer; + } + + + public void setRespNo(Integer respNo) { + this.respNo = respNo; + } + + public void setRespId(String respId) { + this.respId = respId; + } + + public void setResult(Integer result) { + this.result = result; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J8100.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J8100.java new file mode 100644 index 00000000..48a9c95e --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J8100.java @@ -0,0 +1,41 @@ +package com.genersoft.iot.vmp.jt1078.proc.response; + +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.CharsetUtil; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:40 + * @email qingtaij@163.com + */ +@MsgId(id = "8100") +public class J8100 extends Rs { + public static final Integer SUCCESS = 0; + + Integer respNo; + Integer result; + String code; + + @Override + public ByteBuf encode() { + ByteBuf buffer = Unpooled.buffer(); + buffer.writeShort(respNo); + buffer.writeByte(result); + buffer.writeCharSequence(code, CharsetUtil.UTF_8); + return buffer; + } + + public void setRespNo(Integer respNo) { + this.respNo = respNo; + } + + public void setResult(Integer result) { + this.result = result; + } + + public void setCode(String code) { + this.code = code; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J9101.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J9101.java new file mode 100644 index 00000000..d6713723 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J9101.java @@ -0,0 +1,110 @@ +package com.genersoft.iot.vmp.jt1078.proc.response; + +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.CharsetUtil; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:25 + * @email qingtaij@163.com + */ +@MsgId(id = "9101") +public class J9101 extends Rs { + String ip; + + // TCP端口 + Integer tcpPort; + + // UDP端口 + Integer udpPort; + + // 逻辑通道号 + Integer channel; + + // 数据类型 + /** + * 0:音视频,1:视频,2:双向对讲,3:监听,4:中心广播,5:透传 + */ + Integer type; + + // 码流类型 + /** + * 0:主码流,1:子码流 + */ + Integer rate; + + @Override + public ByteBuf encode() { + ByteBuf buffer = Unpooled.buffer(); + buffer.writeByte(ip.getBytes().length); + buffer.writeCharSequence(ip, CharsetUtil.UTF_8); + buffer.writeShort(tcpPort); + buffer.writeShort(udpPort); + buffer.writeByte(channel); + buffer.writeByte(type); + buffer.writeByte(rate); + return buffer; + } + + public String getIp() { + return ip; + } + + public void setIp(String ip) { + this.ip = ip; + } + + public Integer getTcpPort() { + return tcpPort; + } + + public void setTcpPort(Integer tcpPort) { + this.tcpPort = tcpPort; + } + + public Integer getUdpPort() { + return udpPort; + } + + public void setUdpPort(Integer udpPort) { + this.udpPort = udpPort; + } + + public Integer getChannel() { + return channel; + } + + public void setChannel(Integer channel) { + this.channel = channel; + } + + public Integer getType() { + return type; + } + + public void setType(Integer type) { + this.type = type; + } + + public Integer getRate() { + return rate; + } + + public void setRate(Integer rate) { + this.rate = rate; + } + + @Override + public String toString() { + return "J9101{" + + "ip='" + ip + '\'' + + ", tcpPort=" + tcpPort + + ", udpPort=" + udpPort + + ", channel=" + channel + + ", type=" + type + + ", rate=" + rate + + '}'; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J9102.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J9102.java new file mode 100644 index 00000000..f92fe8e7 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J9102.java @@ -0,0 +1,85 @@ +package com.genersoft.iot.vmp.jt1078.proc.response; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:49 + * @email qingtaij@163.com + */ +public class J9102 extends Rs { + + // 通道号 + Integer channel; + + // 控制指令 + /** + * 0:关闭音视频传输指令; + * 1:切换码流(增加暂停和继续); + * 2:暂停该通道所有流的发送; + * 3:恢复暂停前流的发送,与暂停前的流类型一致; + * 4:关闭双向对讲 + */ + Integer command; + + // 数据类型 + /** + * 0:关闭该通道有关的音视频数据; + * 1:只关闭该通道有关的音频,保留该通道 + * 有关的视频; + * 2:只关闭该通道有关的视频,保留该通道 + * 有关的音频 + */ + Integer closeType; + + // 数据类型 + /** + * 0:主码流; + * 1:子码流 + */ + Integer streamType; + + @Override + public ByteBuf encode() { + ByteBuf buffer = Unpooled.buffer(); + buffer.writeByte(channel); + buffer.writeByte(command); + buffer.writeByte(closeType); + buffer.writeByte(streamType); + return null; + } + + + public Integer getChannel() { + return channel; + } + + public void setChannel(Integer channel) { + this.channel = channel; + } + + public Integer getCommand() { + return command; + } + + public void setCommand(Integer command) { + this.command = command; + } + + public Integer getCloseType() { + return closeType; + } + + public void setCloseType(Integer closeType) { + this.closeType = closeType; + } + + public Integer getStreamType() { + return streamType; + } + + public void setStreamType(Integer streamType) { + this.streamType = streamType; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/Rs.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/Rs.java new file mode 100644 index 00000000..243cd942 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/Rs.java @@ -0,0 +1,27 @@ +package com.genersoft.iot.vmp.jt1078.proc.response; + + +import com.genersoft.iot.vmp.jt1078.proc.Header; +import io.netty.buffer.ByteBuf; + + +/** + * @author QingtaiJiang + * @date 2021/8/30 18:54 + * @email qingtaij@163.com + */ + +public abstract class Rs { + private Header header; + + public abstract ByteBuf encode(); + + + public Header getHeader() { + return header; + } + + public void setHeader(Header header) { + this.header = header; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/session/Session.java b/src/main/java/com/genersoft/iot/vmp/jt1078/session/Session.java new file mode 100644 index 00000000..f7df8de0 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/session/Session.java @@ -0,0 +1,114 @@ +package com.genersoft.iot.vmp.jt1078.session; + +import com.genersoft.iot.vmp.jt1078.proc.Header; +import io.netty.channel.Channel; +import io.netty.util.AttributeKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:54 + * @email qingtaij@163.com + */ +public class Session { + private final static Logger log = LoggerFactory.getLogger(Session.class); + + public static final AttributeKey KEY = AttributeKey.newInstance(Session.class.getName()); + + // Netty的channel + protected final Channel channel; + + // 原子类的自增ID + private final AtomicInteger serialNo = new AtomicInteger(0); + + // 是否注册成功 + private boolean registered = false; + + // 设备ID + private String devId; + + // 创建时间 + private final long creationTime; + + // 协议版本号 + private Integer protocolVersion; + + private Header header; + + protected Session(Channel channel) { + this.channel = channel; + this.creationTime = System.currentTimeMillis(); + } + + public void writeObject(Object message) { + log.info("<<<<<<<<<< cmd{},{}", this, message); + channel.writeAndFlush(message); + } + + /** + * 获得下一个流水号 + * + * @return 流水号 + */ + public int nextSerialNo() { + int current; + int next; + do { + current = serialNo.get(); + next = current > 0xffff ? 0 : current; + } while (!serialNo.compareAndSet(current, next + 1)); + return next; + } + + /** + * 注册session + * + * @param devId 设备ID + */ + public void register(String devId, Integer version, Header header) { + this.devId = devId; + this.registered = true; + this.protocolVersion = version; + this.header = header; + SessionManager.INSTANCE.put(devId, this); + } + + /** + * 获取设备号 + * + * @return 设备号 + */ + public String getDevId() { + return devId; + } + + + public boolean isRegistered() { + return registered; + } + + public long getCreationTime() { + return creationTime; + } + + public Integer getProtocolVersion() { + return protocolVersion; + } + + public Header getHeader() { + return header; + } + + @Override + public String toString() { + return "[" + + "devId=" + devId + + ", reg=" + registered + + ", version=" + protocolVersion + + ",ip=" + channel.remoteAddress() + + ']'; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/session/SessionManager.java b/src/main/java/com/genersoft/iot/vmp/jt1078/session/SessionManager.java new file mode 100644 index 00000000..9347249e --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/session/SessionManager.java @@ -0,0 +1,127 @@ +package com.genersoft.iot.vmp.jt1078.session; + +import com.genersoft.iot.vmp.jt1078.proc.entity.Cmd; +import io.netty.channel.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; + + +/** + * @author QingtaiJiang + * @date 2023/4/27 19:54 + * @email qingtaij@163.com + */ +public enum SessionManager { + INSTANCE; + private final static Logger log = LoggerFactory.getLogger(SessionManager.class); + + // 用与消息的缓存 + private final Map> topicSubscribers = new ConcurrentHashMap<>(); + + // session的缓存 + private final Map sessionMap; + + SessionManager() { + this.sessionMap = new ConcurrentHashMap<>(); + } + + /** + * 创建新的Session + * + * @param channel netty通道 + * @return 创建的session对象 + */ + public Session newSession(Channel channel) { + return new Session(channel); + } + + + /** + * 获取指定设备的Session + * + * @param clientId 设备Id + * @return Session + */ + public Session get(Object clientId) { + return sessionMap.get(clientId); + } + + /** + * 放入新设备连接的session + * + * @param clientId 设备ID + * @param newSession session + */ + protected void put(Object clientId, Session newSession) { + sessionMap.put(clientId, newSession); + } + + + /** + * 发送同步消息,接收响应 + * 默认超时时间6秒 + */ + public String request(Cmd cmd) { + // 默认6秒 + int timeOut = 6000; + return request(cmd, timeOut); + } + + public String request(Cmd cmd, Integer timeOut) { + Session session = this.get(cmd.getDevId()); + if (session == null) { + log.error("DevId: {} not online!", cmd.getDevId()); + return "-1"; + } + String requestKey = requestKey(cmd.getDevId(), cmd.getRespId(), cmd.getPackageNo()); + SynchronousQueue subscribe = subscribe(requestKey); + if (subscribe == null) { + log.error("DevId: {} key:{} send repaid", cmd.getDevId(), requestKey); + return "-1"; + } + session.writeObject(cmd); + try { + return subscribe.poll(timeOut, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.warn("<<<<<<<<<< timeout" + session, e); + } finally { + this.unsubscribe(requestKey); + } + return null; + } + + public Boolean response(String devId, String respId, Long responseNo, String data) { + String requestKey = requestKey(devId, respId, responseNo); + SynchronousQueue queue = topicSubscribers.get(requestKey); + if (queue != null) { + try { + return queue.offer(data, 2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.error("{}", e.getMessage(), e); + } + } + log.warn("未找到对应回复指令,key:{} 消息:{} ", requestKey, data); + return false; + } + + private void unsubscribe(String key) { + topicSubscribers.remove(key); + } + + private SynchronousQueue subscribe(String key) { + SynchronousQueue queue = null; + if (!topicSubscribers.containsKey(key)) + topicSubscribers.put(key, queue = new SynchronousQueue()); + return queue; + } + + private String requestKey(String devId, String respId, Long requestNo) { + return String.join("_", devId.replaceFirst("^0*", ""), respId, requestNo.toString()); + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/util/Bin.java b/src/main/java/com/genersoft/iot/vmp/jt1078/util/Bin.java new file mode 100644 index 00000000..31f8b930 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/util/Bin.java @@ -0,0 +1,41 @@ +package com.genersoft.iot.vmp.jt1078.util; + +/** + * 32位整型的二进制读写 + */ +public class Bin { + + private static final int[] bits = new int[32]; + + static { + bits[0] = 1; + for (int i = 1; i < bits.length; i++) { + bits[i] = bits[i - 1] << 1; + } + } + + /** + * 读取n的第i位 + * + * @param n int32 + * @param i 取值范围0-31 + */ + public static boolean get(int n, int i) { + return (n & bits[i]) == bits[i]; + } + + /** + * 不足位数从左边加0 + */ + public static String strHexPaddingLeft(String data, int length) { + int dataLength = data.length(); + if (dataLength < length) { + StringBuilder dataBuilder = new StringBuilder(data); + for (int i = dataLength; i < length; i++) { + dataBuilder.insert(0, "0"); + } + data = dataBuilder.toString(); + } + return data; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/util/ClassUtil.java b/src/main/java/com/genersoft/iot/vmp/jt1078/util/ClassUtil.java new file mode 100644 index 00000000..3dcb1b86 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/util/ClassUtil.java @@ -0,0 +1,112 @@ +package com.genersoft.iot.vmp.jt1078.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.Resource; +import org.springframework.core.io.support.PathMatchingResourcePatternResolver; +import org.springframework.core.io.support.ResourcePatternResolver; + +import java.lang.annotation.Annotation; +import java.util.LinkedList; +import java.util.List; + +public class ClassUtil { + + private static final Logger logger = LoggerFactory.getLogger(ClassUtil.class); + + + public static Object getBean(Class clazz) { + if (clazz != null) { + try { + return clazz.getDeclaredConstructor().newInstance(); + } catch (Exception ex) { + logger.error("ClassUtil:找不到指定的类", ex); + } + } + return null; + } + + + public static Object getBean(String className) { + Class clazz = null; + try { + clazz = Class.forName(className); + } catch (Exception ex) { + logger.error("ClassUtil:找不到指定的类"); + } + if (clazz != null) { + try { + //获取声明的构造器--》创建实例 + return clazz.getDeclaredConstructor().newInstance(); + } catch (Exception ex) { + logger.error("ClassUtil:找不到指定的类", ex); + } + } + return null; + } + + + /** + * 获取包下所有带注解的class + * + * @param packageName 包名 + * @param annotationClass 注解类型 + * @return list + */ + public static List> getClassList(String packageName, Class annotationClass) { + List> classList = getClassList(packageName); + classList.removeIf(next -> !next.isAnnotationPresent(annotationClass)); + return classList; + } + + public static List> getClassList(String... packageName) { + List> classList = new LinkedList<>(); + for (String s : packageName) { + List> c = getClassList(s); + classList.addAll(c); + } + return classList; + } + + public static List> getClassList(String packageName) { + List> classList = new LinkedList<>(); + try { + ResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver(); + Resource[] resources = resourcePatternResolver.getResources(packageName.replace(".", "/") + "/**/*.class"); + for (Resource resource : resources) { + String url = resource.getURL().toString(); + + String[] split = url.split(packageName.replace(".", "/")); + String s = split[split.length - 1]; + String className = s.replace("/", "."); + className = className.substring(0, className.lastIndexOf(".")); + doAddClass(classList, packageName + className); + } + + } catch (Exception e) { + throw new RuntimeException(e); + } + return classList; + } + + private static void doAddClass(List> classList, String className) { + Class cls = loadClass(className, false); + classList.add(cls); + } + + public static Class loadClass(String className, boolean isInitialized) { + Class cls; + try { + cls = Class.forName(className, isInitialized, getClassLoader()); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + return cls; + } + + + public static ClassLoader getClassLoader() { + return Thread.currentThread().getContextClassLoader(); + } + +} diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml index 0fba9a92..cc2145a4 100644 --- a/src/main/resources/all-application.yml +++ b/src/main/resources/all-application.yml @@ -92,6 +92,15 @@ sip: # 是否存储alarm信息 alarm: false +# 做为JT1078服务器的配置 +jt1078: + #[必须修改] 是否开启1078的服务 + enable: true + #[必修修改] 1708设备接入的端口 + port: 21078 + #[可选] 设备鉴权的密码 + password: admin123 + #zlm 默认服务器配置 media: # [必须修改] zlm服务器唯一id,用于触发hook时区别是哪台服务器,general.mediaServerId