commit 0725168a3e859b7cdce84859139bc7ce6f41077d Author: 剑器近 Date: Tue Nov 3 14:00:28 2020 +0800 Initial commit. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e34678e --- /dev/null +++ b/.gitignore @@ -0,0 +1,42 @@ +# Operating System Files + +*.DS_Store +Thumbs.db +*.sw? +.#* +*# +*~ +*.sublime-* + +# Build Artifacts + +.git* +.gradle/ +build/ +target/ +bin/ +out/ +dependency-reduced-pom.xml + +# Eclipse Project Files + +.classpath +.project +.settings/ + +# IntelliJ IDEA Files + +*.iml +*.ipr +*.iws +*.idea +*.log + +README.html + +# HTML resourse Files +#src/main/resources/static/index.html +#src/main/resources/static/view/ + +# temp ignore +src/main/resources/swagger.yaml \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..6c60621 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2019 剑器近 + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..b25fa36 --- /dev/null +++ b/README.md @@ -0,0 +1,239 @@ +部标808协议快速开发包 +==================== +# 项目介绍 +* 基于Netty,实现JT/T 808部标协议的消息分发,与编码解码; +* 与Spring解耦合,协议编码解码和Netty服务均可独立运行(Android客户端同样适用); +* SpringBoot 仅负责将协议暴露至Web接口,目的是方便测试,且为二次开发提供样例; +* 最简洁、清爽、易用的部标开发框架。 + +问题交流群:[906230542] + +# 主要特性 +* 代码足够精简,便于二次开发; +* 致敬Spring、Hibernate设计理念,熟悉Web开发的同学上手极快; +* 使用注解描述协议,告别繁琐的封包、解包; +* 实时兼容2011、2013、2019部标协议版本,支持分包请求; +* 支持JT/T1078音视频协议,T/JSATL12苏标主动安防协议; +* 支持异步批量处理,显著提升MySQL入库性能; +* 提供报文解释器(解析过程分析工具),编码解码不再抓瞎; +* 全覆盖的测试用例,稳定发版。 + +# 代码仓库 + * Gitee仓库地址:[https://gitee.com/yezhihao/jt808-server/tree/master](https://gitee.com/yezhihao/jt808-server/tree/master) + * Github仓库地址:[https://github.com/yezhihao/jt808-server/tree/master](https://github.com/yezhihao/jt808-server/tree/master) + +# 下载方式 + * Gitee下载命令:`git clone https://gitee.com/yezhihao/jt808-server -b master` + * Github下载命令:`git clone https://github.com/yezhihao/jt808-server -b master` + +# 使用说明 + +## 项目分为四部分: + +## 1.framework,核心模块,不推荐修改,有BUG或扩展的需求,建议提交issues或联系作者 +```sh +└── framework + ├── codec 编码解码 + ├── mvc 消息分发、处理 + ├── netty 网络通信 + ├── orm 序列化相关 + └── session 消息发送和会话管理 + ``` +注解: + +* @Endpoint,服务接入点,等价SpringMVC的 @Controller; +* @Mapping,定义消息ID,等价SpringMVC中 @RequestMapping; +* @AsyncBatch, 异步批量消息,对于并发较高的消息,如0x0200(位置信息汇报),使用该注解,显著提升Netty和MySQL入库性能。 + + +* @Message,协议类型,等价Hibernate的 @Table; +* @Field,属性定义,等价Hibernate的 @Column; +* @Fs,多版本协议支持 + +## 2.protocol 部标协议定义,不推荐做大量修改 +```sh +└── protocol + ├── basics 部标协议通用消息头,以及公共的消息定义 + ├── codec 部标编码解码工具 + ├── commons 部标协议ID,工具类等 + ├── jsatl12 T/JSATL12 苏标协议(已完成) + ├── t808 JT/T808 部标协议(已完成) + └── t1078 JT/T1078 音视频协议(已完成) + ``` + 消息定义样例: + ```java +@Message(JT808.定位数据批量上传) +public class T0704 extends AbstractMessage
{ + + private Integer total; + private Integer type; + private List items; + + @Field(index = 0, type = DataType.WORD, desc = "数据项个数") + public Integer getTotal() { return total; } + public void setTotal(Integer total) { this.total = total; } + + @Field(index = 2, type = DataType.BYTE, desc = "位置数据类型 0:正常位置批量汇报,1:盲区补报") + public Integer getType() { return type; } + public void setType(Integer type) { this.type = type; } + + @Field(index = 3, type = DataType.LIST, desc = "位置汇报数据项") + public List getItems() { return items; } + public void setItems(List items) { this.items = items; this.total = items.size(); } +} +``` + +## 3.web 开箱即用的Demo,业务需求在这个包下开发,可随意修改 +```sh +└── web + ├── config spring 相关配置 + ├── component.mybatis 附赠极简的mybatis分页插件:D + ├── endpoint 808消息入口,所有netty进入的请求都会根据@Mapping转发到此 + └── controller service mapper ... 不再赘述 + ``` +##### 消息接入: +```java +@Endpoint +public class JT808Endpoint { + + @Autowired + private LocationService locationService; + + @Autowired + private DeviceService deviceService; + + //异步批量处理 队列大小20000 最大累积200处理一次 最大等待时间5秒 + @AsyncBatch(capacity = 20000, maxElements = 200, maxWait = 5000) + @Mapping(types = 位置信息汇报, desc = "位置信息汇报") + public void 位置信息汇报(List list) { + locationService.batchInsert(list); + } + + @Async + @Mapping(types = 终端注册, desc = "终端注册") + public T8100 register(T0100 message, Session session) { + Header header = message.getHeader(); + + T8100 result = new T8100(session.nextSerialNo(), header.getMobileNo()); + result.setSerialNo(header.getSerialNo()); + + String token = deviceService.register(message); + if (token != null) { + session.register(header); + + result.setResultCode(T8100.Success); + result.setToken(token); + } else { + + result.setResultCode(T8100.NotFoundTerminal); + } + return result; + } +} +``` + +##### 消息下发: +```java +@Controller +@RestController("terminal") +public class TerminalController { + + private MessageManager messageManager = MessageManager.getInstance(); + + @ApiOperation("设置终端参数") + @PostMapping("{terminalId}/parameters") + public T0001 updateParameters(@PathVariable("terminalId") String terminalId, @RequestBody List parameters) { + T8103 request = new T8103(terminalId); + request.setItems(parameters); + T0001 response = messageManager.request(request, T0001.class); + return response; + } +} +``` +##### 已集成Swagger文档,启动后可访问如下地址 + +* Swagger UI:[http://127.0.0.1:8000/swagger-ui.html](http://127.0.0.1:8000/swagger-ui.html) +* Bootstrap UI:[http://127.0.0.1:8000/doc.html](http://127.0.0.1:8000/doc.html) +![Bootstrap UI](https://images.gitee.com/uploads/images/2020/0731/135035_43dfca8e_670717.png "doc2.png") + +## 4.test 808协议全覆盖的测试用例,以及报文解释器 + +* QuickStart 不依赖Spring的启动,可用于Android客户端 +* Beans 测试数据 +* TestBeans 消息对象的封包解包 +* TestHex 原始报文测试 + +* Elucidator 报文解释器 - 解码 +* DarkRepulsor 报文解释器 - 编码 + +分析报文内每个属性所处的位置以及转换后的值,以便查询报文解析出错的原因 + +Elucidator 运行效果如下: +``` +0 [0200] 消息ID: 512 +2 [4061] 消息体属性: 16481 +4 [01] 协议版本号: 1 +5 [00000000017299841738] 终端手机号: 17299841738 +15 [ffff] 流水号: 65535 +0 [00000400] 报警标志: 1024 +4 [00000800] 状态: 2048 +8 [06eeb6ad] 纬度: 116307629 +12 [02633df7] 经度: 40058359 +16 [0138] 海拔: 312 +18 [0003] 速度: 3 +20 [0063] 方向: 99 +22 [200707192359] 时间: 2020-07-07T19:23:59 +0 [01] 附加信息ID: 1 +1 [04] 参数值长度: 4 +2 [0000000b] 参数值: {0,0,0,11} +0 [02] 附加信息ID: 2 +1 [02] 参数值长度: 2 +2 [0016] 参数值: {0,22} +0 [03] 附加信息ID: 3 +1 [02] 参数值长度: 2 +2 [0021] 参数值: {0,33} +0 [04] 附加信息ID: 4 +1 [02] 参数值长度: 2 +2 [002c] 参数值: {0,44} +0 [05] 附加信息ID: 5 +1 [03] 参数值长度: 3 +2 [373737] 参数值: {55,55,55} +0 [11] 附加信息ID: 17 +1 [05] 参数值长度: 5 +2 [4200000042] 参数值: {66,0,0,0,66} +0 [12] 附加信息ID: 18 +1 [06] 参数值长度: 6 +2 [4d0000004d4d] 参数值: {77,0,0,0,77,77} +0 [13] 附加信息ID: 19 +1 [07] 参数值长度: 7 +2 [00000058005858] 参数值: {0,0,0,88,0,88,88} +0 [25] 附加信息ID: 37 +1 [04] 参数值长度: 4 +2 [00000063] 参数值: {0,0,0,99} +0 [2a] 附加信息ID: 42 +1 [02] 参数值长度: 2 +2 [000a] 参数值: {0,10} +0 [2b] 附加信息ID: 43 +1 [04] 参数值长度: 4 +2 [00000014] 参数值: {0,0,0,20} +0 [30] 附加信息ID: 48 +1 [01] 参数值长度: 1 +2 [1e] 参数值: {30} +0 [31] 附加信息ID: 49 +1 [01] 参数值长度: 1 +2 [28] 参数值: {40} +28 [01040000000b02020016030200210402002c05033737371105420000004212064d0000004d4d1307000000580058582504000000632a02000a2b040000001430011e310128] 位置附加信息: [BytesAttribute[id=1,value={0,0,0,11}], BytesAttribute[id=2,value={0,22}], BytesAttribute[id=3,value={0,33}], BytesAttribute[id=4,value={0,44}], BytesAttribute[id=5,value={55,55,55}], BytesAttribute[id=17,value={66,0,0,0,66}], BytesAttribute[id=18,value={77,0,0,0,77,77}], BytesAttribute[id=19,value={0,0,0,88,0,88,88}], BytesAttribute[id=37,value={0,0,0,99}], BytesAttribute[id=42,value={0,10}], BytesAttribute[id=43,value={0,0,0,20}], BytesAttribute[id=48,value={30}], BytesAttribute[id=49,value={40}]] +020040610100000000017299841738ffff000004000000080006eeb6ad02633df701380003006320070719235901040000000b02020016030200210402002c05033737371105420000004212064d0000004d4d1307000000580058582504000000632a02000a2b040000001430011e31012863 +``` + +使用发包工具模拟请求 +``` +7e020040610100000000017299841738ffff000004000000080006eeb6ad02633df701380003006320070719235901040000000b02020016030200210402002c05033737371105420000004212064d0000004d4d1307000000580058582504000000632a02000a2b040000001430011e310128637e +``` +![使用发包工具模拟请求](https://images.gitee.com/uploads/images/2019/0705/162745_9becaf08_670717.png) + +项目会不定期进行更新,建议star和watch一份,您的支持是我最大的动力。 + +如有任何疑问或者BUG,请联系我,非常感谢。 + +技术交流QQ群:[906230542] diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..d9db2d6 --- /dev/null +++ b/pom.xml @@ -0,0 +1,197 @@ + + 4.0.0 + io.github.yezhihao + netmc + 1.0.0.RELEASE + jar + + Netmc + https://github.com/yezhihao/netmc + MVC framework based on netty implementation + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + https://github.com/yezhihao/netmc + https://github.com/yezhihao/netmc.git + + + + + netmc.yezhihao + netmc + zhihao.ye@qq.com + + + + + 1.8 + @ + UTF-8 + UTF-8 + ${java.version} + ${java.version} + true + + + + + junit + junit + 4.12 + test + + + org.slf4j + slf4j-api + 1.7.30 + provided + + + org.apache.commons + commons-lang3 + 3.11 + + + io.netty + netty-handler + 4.1.51.Final + + + com.github.ben-manes.caffeine + caffeine + 2.8.5 + + + org.springframework + spring-context + 5.2.8.RELEASE + compile + + + + + + nexus-release + + true + + + + + org.apache.maven.plugins + maven-source-plugin + 3.2.1 + + + package + + jar-no-fork + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.2.0 + + + package + + jar + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + + sign-artifacts + verify + + sign + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 1.8 + 1.8 + + + + + org.apache.maven.plugins + maven-release-plugin + 2.5.3 + + + maven-deploy-plugin + 2.8.2 + + + default-deploy + deploy + + deploy + + + + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.8 + true + + central-nexus + https://oss.sonatype.org/ + true + + + + org.apache.maven.plugins + maven-scm-plugin + 1.11.2 + + + + + + nexus-release + https://oss.sonatype.org/service/local/staging/deploy/maven2 + + + nexus-snapshot + https://oss.sonatype.org/content/repositories/snapshots + + + + + + + + central + Maven Central + http://maven.aliyun.com/nexus/content/groups/public/ + default + + true + + + + \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/NettyConfig.java b/src/main/java/io/github/yezhihao/netmc/NettyConfig.java new file mode 100644 index 0000000..f668d1b --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/NettyConfig.java @@ -0,0 +1,138 @@ +package io.github.yezhihao.netmc; + +import io.github.yezhihao.netmc.codec.Delimiter; +import io.github.yezhihao.netmc.codec.LengthField; +import io.github.yezhihao.netmc.codec.MessageDecoder; +import io.github.yezhihao.netmc.codec.MessageEncoder; +import io.github.yezhihao.netmc.core.HandlerInterceptor; +import io.github.yezhihao.netmc.core.HandlerMapping; +import io.github.yezhihao.netmc.session.SessionManager; +import io.netty.channel.ChannelInboundHandlerAdapter; + +/** + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public class NettyConfig { + + protected final int port; + protected final int maxFrameLength; + protected final LengthField lengthField; + protected final Delimiter[] delimiter; + protected final MessageDecoder decoder; + protected final MessageEncoder encoder; + protected final ChannelInboundHandlerAdapter adapter; + protected final HandlerMapping handlerMapping; + protected final HandlerInterceptor handlerInterceptor; + protected final SessionManager sessionManager; + + private NettyConfig(int port, + int maxFrameLength, + LengthField lengthField, + Delimiter[] delimiter, + MessageDecoder decoder, + MessageEncoder encoder, + HandlerMapping handlerMapping, + HandlerInterceptor handlerInterceptor, + SessionManager sessionManager + ) { + this.port = port; + this.maxFrameLength = maxFrameLength; + this.lengthField = lengthField; + this.delimiter = delimiter; + this.decoder = decoder; + this.encoder = encoder; + this.handlerMapping = handlerMapping; + this.handlerInterceptor = handlerInterceptor; + this.sessionManager = sessionManager; + this.adapter = new TCPServerHandler(this.handlerMapping, this.handlerInterceptor, this.sessionManager); + } + + public static NettyConfig.Builder custom() { + return new Builder(); + } + + public static class Builder { + + private int port; + private int maxFrameLength; + private LengthField lengthField; + private Delimiter[] delimiters; + private MessageDecoder decoder; + private MessageEncoder encoder; + private HandlerMapping handlerMapping; + private HandlerInterceptor handlerInterceptor; + private SessionManager sessionManager; + + public Builder() { + } + + public Builder setPort(int port) { + this.port = port; + return this; + } + + public Builder setMaxFrameLength(int maxFrameLength) { + this.maxFrameLength = maxFrameLength; + return this; + } + + public Builder setLengthField(LengthField lengthField) { + this.lengthField = lengthField; + return this; + } + + public Builder setDelimiters(byte[][] delimiters) { + Delimiter[] t = new Delimiter[delimiters.length]; + for (int i = 0; i < delimiters.length; i++) { + t[i] = new Delimiter(delimiters[i]); + } + this.delimiters = t; + return this; + } + + public Builder setDelimiters(Delimiter... delimiters) { + this.delimiters = delimiters; + return this; + } + + public Builder setDecoder(MessageDecoder decoder) { + this.decoder = decoder; + return this; + } + + public Builder setEncoder(MessageEncoder encoder) { + this.encoder = encoder; + return this; + } + + public Builder setHandlerMapping(HandlerMapping handlerMapping) { + this.handlerMapping = handlerMapping; + return this; + } + + public Builder setHandlerInterceptor(HandlerInterceptor handlerInterceptor) { + this.handlerInterceptor = handlerInterceptor; + return this; + } + + public Builder setSessionManager(SessionManager sessionManager) { + this.sessionManager = sessionManager; + return this; + } + + public NettyConfig build() { + return new NettyConfig( + this.port, + this.maxFrameLength, + this.lengthField, + this.delimiters, + this.decoder, + this.encoder, + this.handlerMapping, + this.handlerInterceptor, + this.sessionManager + ); + } + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/TCPServer.java b/src/main/java/io/github/yezhihao/netmc/TCPServer.java new file mode 100644 index 0000000..35c2469 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/TCPServer.java @@ -0,0 +1,111 @@ +package io.github.yezhihao.netmc; + +import io.github.yezhihao.netmc.codec.DelimiterBasedFrameDecoder; +import io.github.yezhihao.netmc.codec.LengthFieldAndDelimiterFrameDecoder; +import io.github.yezhihao.netmc.codec.MessageDecoderWrapper; +import io.github.yezhihao.netmc.codec.MessageEncoderWrapper; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioChannelOption; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.NettyRuntime; +import io.netty.util.concurrent.Future; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public class TCPServer { + + private static final Logger log = LoggerFactory.getLogger(TCPServer.class); + private volatile boolean isRunning = false; + + private EventLoopGroup bossGroup = null; + private EventLoopGroup workerGroup = null; + + private String name; + private NettyConfig config; + + public TCPServer(String name, NettyConfig config) { + this.name = name; + this.config = config; + } + + private void startInternal() { + try { + this.bossGroup = new NioEventLoopGroup(1); + this.workerGroup = new NioEventLoopGroup(NettyRuntime.availableProcessors()); + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.channel(NioServerSocketChannel.class); + bootstrap.group(bossGroup, workerGroup); + bootstrap.option(NioChannelOption.SO_BACKLOG, 1024) + .option(NioChannelOption.SO_REUSEADDR, true) + .childOption(NioChannelOption.TCP_NODELAY, true) + .childHandler(new ChannelInitializer() { + + private MessageEncoderWrapper messageEncoderWrapper = new MessageEncoderWrapper(config.encoder); + private MessageDecoderWrapper messageDecoderWrapper = new MessageDecoderWrapper(config.decoder); + + @Override + public void initChannel(NioSocketChannel channel) { + channel.pipeline() + .addLast(new IdleStateHandler(4, 0, 0, TimeUnit.MINUTES)) + .addLast("frameDecoder", frameDecoder()) + .addLast("decoder", messageDecoderWrapper) + .addLast("encoder", messageEncoderWrapper) + .addLast("adapter", config.adapter); + } + }); + + ChannelFuture channelFuture = bootstrap.bind(config.port).sync(); + log.warn("==={}启动成功, port={}===", name, config.port); + channelFuture.channel().closeFuture().sync(); + } catch (Exception e) { + log.warn("==={}出现异常, port={}===", e); + } finally { + stop(); + } + } + + public ByteToMessageDecoder frameDecoder() { + if (config.lengthField == null) + return new DelimiterBasedFrameDecoder(config.maxFrameLength, config.delimiter); + return new LengthFieldAndDelimiterFrameDecoder(config.maxFrameLength, config.lengthField, config.delimiter); + } + + public synchronized void start() { + if (this.isRunning) { + log.warn("==={}已经启动, port={}===", name, config.port); + return; + } + this.isRunning = true; + new Thread(() -> startInternal()).start(); + } + + public synchronized void stop() { + if (!this.isRunning) { + log.warn("==={}已经停止, port={}===", name, config.port); + } + this.isRunning = false; + + Future future = this.bossGroup.shutdownGracefully(); + if (!future.isSuccess()) + log.warn("bossGroup 无法正常停止", future.cause()); + + future = this.workerGroup.shutdownGracefully(); + if (!future.isSuccess()) + log.warn("workerGroup 无法正常停止", future.cause()); + + log.warn("==={}已经停止, port={}===", name, config.port); + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/TCPServerHandler.java b/src/main/java/io/github/yezhihao/netmc/TCPServerHandler.java new file mode 100644 index 0000000..8a10b96 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/TCPServerHandler.java @@ -0,0 +1,108 @@ +package io.github.yezhihao.netmc; + +import io.github.yezhihao.netmc.core.HandlerInterceptor; +import io.github.yezhihao.netmc.core.HandlerMapping; +import io.github.yezhihao.netmc.core.handler.Handler; +import io.github.yezhihao.netmc.core.model.Message; +import io.github.yezhihao.netmc.session.Session; +import io.github.yezhihao.netmc.session.SessionManager; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +@ChannelHandler.Sharable +public class TCPServerHandler extends ChannelInboundHandlerAdapter { + + private static final Logger log = LoggerFactory.getLogger(TCPServerHandler.class.getSimpleName()); + + private HandlerMapping handlerMapping; + + private HandlerInterceptor interceptor; + + private SessionManager sessionManager; + + public TCPServerHandler(HandlerMapping handlerMapping, HandlerInterceptor interceptor, SessionManager sessionManager) { + this.handlerMapping = handlerMapping; + this.interceptor = interceptor; + this.sessionManager = sessionManager; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (!(msg instanceof Message)) + return; + Message request = (Message) msg; + Message response; + Channel channel = ctx.channel(); + Session session = channel.attr(Session.KEY).get(); + long time = session.access(); + + try { + Handler handler = handlerMapping.getHandler(request.getMessageType()); + if (handler != null) { + if (!interceptor.beforeHandle(request, session)) + return; + + response = handler.invoke(request, session); + if (handler.returnVoid) { + response = interceptor.successful(request, session); + } else { + interceptor.afterHandle(request, response, session); + } + } else { + response = interceptor.notSupported(request, session); + } + } catch (Exception e) { + log.warn(String.valueOf(request), e); + response = interceptor.exceptional(request, session, e); + } + time = System.currentTimeMillis() - time; + if (time > 200) + log.info("=========消息ID{},处理耗时{}ms,", request.getHeader(), time); + if (response != null) + ctx.writeAndFlush(response); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + Channel channel = ctx.channel(); + Session session = sessionManager.newSession(channel); + channel.attr(Session.KEY).set(session); + log.info(">>>>>终端连接{}", session); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + Session session = ctx.channel().attr(Session.KEY).get(); + session.invalidate(); + log.info("<<<<<断开连接{}", session); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) { + Session session = ctx.channel().attr(Session.KEY).get(); + log.info("<<<<<终端异常断开连接" + session); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + if (evt instanceof IdleStateEvent) { + IdleStateEvent event = (IdleStateEvent) evt; + IdleState state = event.state(); + if (state == IdleState.READER_IDLE || state == IdleState.WRITER_IDLE) { + Session session = ctx.channel().attr(Session.KEY).get(); + log.warn("<<<<<终端主动断开连接{}", session); + ctx.close(); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/codec/Delimiter.java b/src/main/java/io/github/yezhihao/netmc/codec/Delimiter.java new file mode 100644 index 0000000..2d8b1ed --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/codec/Delimiter.java @@ -0,0 +1,23 @@ +package io.github.yezhihao.netmc.codec; + +public class Delimiter { + protected byte[] value; + protected boolean strip; + + public Delimiter(byte[] value) { + this(value, true); + } + + public Delimiter(byte[] value, boolean strip) { + this.value = value; + this.strip = strip; + } + + public byte[] getValue() { + return value; + } + + public boolean isStrip() { + return strip; + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/codec/DelimiterBasedFrameDecoder.java b/src/main/java/io/github/yezhihao/netmc/codec/DelimiterBasedFrameDecoder.java new file mode 100644 index 0000000..0696a4b --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/codec/DelimiterBasedFrameDecoder.java @@ -0,0 +1,122 @@ +package io.github.yezhihao.netmc.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.util.internal.ObjectUtil; +import io.github.yezhihao.netmc.util.ByteBufUtils; + +import java.util.List; + +import static io.netty.util.internal.ObjectUtil.checkPositive; + +public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder { + + private final Delimiter[] delimiters; + private final int maxFrameLength; + private final boolean failFast; + private boolean discardingTooLongFrame; + private int tooLongFrameLength; + + public DelimiterBasedFrameDecoder(int maxFrameLength, Delimiter... delimiters) { + this(maxFrameLength, true, delimiters); + } + + public DelimiterBasedFrameDecoder(int maxFrameLength, boolean failFast, Delimiter... delimiters) { + validateMaxFrameLength(maxFrameLength); + ObjectUtil.checkNonEmpty(delimiters, "delimiters"); + + this.delimiters = delimiters; + this.maxFrameLength = maxFrameLength; + this.failFast = failFast; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + Object decoded = decode(ctx, in); + if (decoded != null) { + out.add(decoded); + } + } + + protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { + // Try all delimiters and choose the delimiter which yields the shortest frame. + int minFrameLength = Integer.MAX_VALUE; + Delimiter minDelim = null; + for (Delimiter delim : delimiters) { + int frameLength = ByteBufUtils.indexOf(buffer, delim.value); + if (frameLength >= 0 && frameLength < minFrameLength) { + minFrameLength = frameLength; + minDelim = delim; + } + } + + if (minDelim != null) { + int minDelimLength = minDelim.value.length; + ByteBuf frame = null; + + if (discardingTooLongFrame) { + // We've just finished discarding a very large frame. + // Go back to the initial state. + discardingTooLongFrame = false; + buffer.skipBytes(minFrameLength + minDelimLength); + + int tooLongFrameLength = this.tooLongFrameLength; + this.tooLongFrameLength = 0; + if (!failFast) { + fail(tooLongFrameLength); + } + return null; + } + + if (minFrameLength > maxFrameLength) { + // Discard read frame. + buffer.skipBytes(minFrameLength + minDelimLength); + fail(minFrameLength); + return null; + } + + if (minDelim.strip) { + //忽略长度等于0的报文 + if (minFrameLength != 0) { + frame = buffer.readRetainedSlice(minFrameLength); + } + buffer.skipBytes(minDelimLength); + } else { + frame = buffer.readRetainedSlice(minFrameLength + minDelimLength); + } + + return frame; + } else { + if (!discardingTooLongFrame) { + if (buffer.readableBytes() > maxFrameLength) { + // Discard the content of the buffer until a delimiter is found. + tooLongFrameLength = buffer.readableBytes(); + buffer.skipBytes(buffer.readableBytes()); + discardingTooLongFrame = true; + if (failFast) { + fail(tooLongFrameLength); + } + } + } else { + // Still discarding the buffer since a delimiter is not found. + tooLongFrameLength += buffer.readableBytes(); + buffer.skipBytes(buffer.readableBytes()); + } + return null; + } + } + + private void fail(long frameLength) { + if (frameLength > 0) { + throw new TooLongFrameException("frame length exceeds " + maxFrameLength + ": " + frameLength + " - discarded"); + } else { + throw new TooLongFrameException("frame length exceeds " + maxFrameLength + " - discarding"); + } + } + + private static void validateMaxFrameLength(int maxFrameLength) { + checkPositive(maxFrameLength, "maxFrameLength"); + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/codec/LengthField.java b/src/main/java/io/github/yezhihao/netmc/codec/LengthField.java new file mode 100644 index 0000000..0f0ebc8 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/codec/LengthField.java @@ -0,0 +1,63 @@ +package io.github.yezhihao.netmc.codec; + +import static io.netty.util.internal.ObjectUtil.checkPositive; +import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; + +public class LengthField { + protected final byte[] prefix; + protected final int lengthFieldMaxFrameLength; + protected final int lengthFieldOffset; + protected final int lengthFieldLength; + protected final int lengthFieldEndOffset; + protected final int lengthAdjustment; + protected final int initialBytesToStrip; + + public LengthField(byte[] prefix, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) { + this(prefix, maxFrameLength, lengthFieldOffset, lengthFieldLength, 0, 0); + } + + public LengthField(byte[] prefix, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) { + checkPositive(maxFrameLength, "maxFrameLength_LengthField"); + checkPositiveOrZero(lengthFieldOffset, "lengthFieldOffset"); + checkPositiveOrZero(initialBytesToStrip, "initialBytesToStrip"); + if (lengthFieldOffset > maxFrameLength - lengthFieldLength) { + throw new IllegalArgumentException("maxFrameLength_LengthField (" + maxFrameLength + ") must be equal to or greater than lengthFieldOffset (" + lengthFieldOffset + ") + lengthFieldLength (" + lengthFieldLength + ")."); + } else { + this.prefix = prefix; + this.lengthFieldMaxFrameLength = maxFrameLength; + this.lengthFieldOffset = lengthFieldOffset; + this.lengthFieldLength = lengthFieldLength; + this.lengthAdjustment = lengthAdjustment; + this.lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength; + this.initialBytesToStrip = initialBytesToStrip; + } + } + + public byte[] getPrefix() { + return prefix; + } + + public int getLengthFieldMaxFrameLength() { + return lengthFieldMaxFrameLength; + } + + public int getLengthFieldOffset() { + return lengthFieldOffset; + } + + public int getLengthFieldLength() { + return lengthFieldLength; + } + + public int getLengthFieldEndOffset() { + return lengthFieldEndOffset; + } + + public int getLengthAdjustment() { + return lengthAdjustment; + } + + public int getInitialBytesToStrip() { + return initialBytesToStrip; + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/codec/LengthFieldAndDelimiterFrameDecoder.java b/src/main/java/io/github/yezhihao/netmc/codec/LengthFieldAndDelimiterFrameDecoder.java new file mode 100644 index 0000000..f43c300 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/codec/LengthFieldAndDelimiterFrameDecoder.java @@ -0,0 +1,178 @@ +package io.github.yezhihao.netmc.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.CorruptedFrameException; +import io.netty.handler.codec.DecoderException; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.util.internal.ObjectUtil; +import io.github.yezhihao.netmc.util.ByteBufUtils; + +import java.util.List; + +/** + * @see io.netty.handler.codec.LengthFieldBasedFrameDecoder + */ +public class LengthFieldAndDelimiterFrameDecoder extends DelimiterBasedFrameDecoder { + protected final byte[] prefix; + private final int maxFrameLength; + private final int lengthFieldOffset; + private final int lengthFieldLength; + private final int lengthFieldEndOffset; + private final int lengthAdjustment; + private final int initialBytesToStrip; + private final boolean failFast; + private boolean discardingTooLongFrame; + private int tooLongFrameLength; + private int bytesToDiscard; + + public LengthFieldAndDelimiterFrameDecoder(int maxFrameLength, LengthField lengthField, Delimiter... delimiters) { + this(maxFrameLength, true, lengthField, delimiters); + } + + public LengthFieldAndDelimiterFrameDecoder(int maxFrameLength, boolean failFast, LengthField lengthField, Delimiter... delimiters) { + super(maxFrameLength, failFast, delimiters); + ObjectUtil.checkPositive(maxFrameLength, "delimiterMaxFrameLength"); + ObjectUtil.checkNonEmpty(delimiters, "delimiters"); + + this.prefix = lengthField.prefix; + this.maxFrameLength = lengthField.lengthFieldMaxFrameLength; + this.lengthFieldOffset = lengthField.lengthFieldOffset; + this.lengthFieldLength = lengthField.lengthFieldLength; + this.lengthFieldEndOffset = lengthField.lengthFieldEndOffset; + this.lengthAdjustment = lengthField.lengthAdjustment; + this.initialBytesToStrip = lengthField.initialBytesToStrip; + this.failFast = failFast; + } + + protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + if (discardingTooLongFrame) { + discardingTooLongFrame(in); + } + + Object decoded; + if (ByteBufUtils.startsWith(in, prefix)) { + decoded = this.decode(ctx, in); + } else { + decoded = super.decode(ctx, in); + } + if (decoded != null) { + out.add(decoded); + } + + } + + private void discardingTooLongFrame(ByteBuf in) { + int bytesToDiscard = this.bytesToDiscard; + int localBytesToDiscard = Math.min(bytesToDiscard, in.readableBytes()); + in.skipBytes(localBytesToDiscard); + bytesToDiscard -= localBytesToDiscard; + this.bytesToDiscard = bytesToDiscard; + this.failIfNecessary(false); + } + + private static void failOnNegativeLengthField(ByteBuf in, int frameLength, int lengthFieldEndOffset) { + in.skipBytes(lengthFieldEndOffset); + throw new CorruptedFrameException("negative pre-adjustment length field: " + frameLength); + } + + private static void failOnFrameLengthLessThanLengthFieldEndOffset(ByteBuf in, int frameLength, int lengthFieldEndOffset) { + in.skipBytes(lengthFieldEndOffset); + throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less than lengthFieldEndOffset: " + lengthFieldEndOffset); + } + + private void exceededFrameLength(ByteBuf in, int frameLength) { + int discard = frameLength - in.readableBytes(); + this.tooLongFrameLength = frameLength; + if (discard < 0) { + in.skipBytes(frameLength); + } else { + this.discardingTooLongFrame = true; + this.bytesToDiscard = discard; + in.skipBytes(in.readableBytes()); + } + + this.failIfNecessary(true); + } + + private static void failOnFrameLengthLessThanInitialBytesToStrip(ByteBuf in, int frameLength, int initialBytesToStrip) { + in.skipBytes(frameLength); + throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less than initialBytesToStrip: " + initialBytesToStrip); + } + + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + if (in.readableBytes() < this.lengthFieldEndOffset) { + return null; + } else { + int actualLengthFieldOffset = in.readerIndex() + this.lengthFieldOffset; + int frameLength = this.getUnadjustedFrameLength(in, actualLengthFieldOffset, this.lengthFieldLength); + if (frameLength < 0) { + failOnNegativeLengthField(in, frameLength, this.lengthFieldEndOffset); + } + + frameLength += this.lengthAdjustment + this.lengthFieldEndOffset; + if (frameLength < this.lengthFieldEndOffset) { + failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, this.lengthFieldEndOffset); + } + + if (frameLength > this.maxFrameLength) { + this.exceededFrameLength(in, frameLength); + return null; + } else { + if (in.readableBytes() < frameLength) { + return null; + } else { + if (this.initialBytesToStrip > frameLength) { + failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip); + } + + in.skipBytes(this.initialBytesToStrip); + int readerIndex = in.readerIndex(); + int actualFrameLength = frameLength - this.initialBytesToStrip; + ByteBuf frame = in.retainedSlice(readerIndex, actualFrameLength); + in.readerIndex(readerIndex + actualFrameLength); + return frame; + } + } + } + } + + protected int getUnadjustedFrameLength(ByteBuf buf, int offset, int length) { + int frameLength; + switch (length) { + case 2: + frameLength = buf.getUnsignedShort(offset); + break; + case 3: + frameLength = buf.getUnsignedMedium(offset); + break; + case 4: + frameLength = buf.getInt(offset); + break; + default: + throw new DecoderException("unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 2, 3, 4)"); + } + return frameLength; + } + + private void failIfNecessary(boolean firstDetectionOfTooLongFrame) { + if (this.bytesToDiscard == 0) { + int tooLongFrameLength = this.tooLongFrameLength; + this.tooLongFrameLength = 0; + this.discardingTooLongFrame = false; + if (!this.failFast || firstDetectionOfTooLongFrame) { + this.fail(tooLongFrameLength); + } + } else if (this.failFast && firstDetectionOfTooLongFrame) { + this.fail(this.tooLongFrameLength); + } + } + + private void fail(long frameLength) { + if (frameLength > 0) { + throw new TooLongFrameException("Adjusted frame length exceeds " + this.maxFrameLength + ": " + frameLength + " - discarded"); + } else { + throw new TooLongFrameException("Adjusted frame length exceeds " + this.maxFrameLength + " - discarding"); + } + } +} diff --git a/src/main/java/io/github/yezhihao/netmc/codec/MessageDecoder.java b/src/main/java/io/github/yezhihao/netmc/codec/MessageDecoder.java new file mode 100644 index 0000000..9d9d773 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/codec/MessageDecoder.java @@ -0,0 +1,17 @@ +package io.github.yezhihao.netmc.codec; + +import io.netty.buffer.ByteBuf; +import io.github.yezhihao.netmc.session.Session; + +/** + * 基础消息解码 + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public interface MessageDecoder { + + T decode(ByteBuf buf); + + T decode(ByteBuf buf, Session session); + +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/codec/MessageDecoderWrapper.java b/src/main/java/io/github/yezhihao/netmc/codec/MessageDecoderWrapper.java new file mode 100644 index 0000000..c7a7fee --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/codec/MessageDecoderWrapper.java @@ -0,0 +1,55 @@ +package io.github.yezhihao.netmc.codec; + +import io.github.yezhihao.netmc.session.Session; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.DecoderException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 基础消息解码 + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +@ChannelHandler.Sharable +public class MessageDecoderWrapper extends ChannelInboundHandlerAdapter { + + private static final Logger log = LoggerFactory.getLogger(MessageDecoderWrapper.class.getSimpleName()); + + private MessageDecoder decoder; + + public MessageDecoderWrapper(MessageDecoder decoder) { + this.decoder = decoder; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + try { + if (log.isInfoEnabled()) { + String hex; + if (buf.readableBytes() < 1048) + hex = ByteBufUtil.hexDump(buf); + else + hex = ByteBufUtil.hexDump(buf.slice(0, 32)) + "..." + ByteBufUtil.hexDump(buf.slice(buf.readableBytes() - 32, 32)); + log.info(">>>>>原始报文[ip={}],hex={}", ctx.channel().remoteAddress(), hex); + } + Object message = decoder.decode(buf, ctx.channel().attr(Session.KEY).get()); + if (message != null) + ctx.fireChannelRead(message); + buf.skipBytes(buf.readableBytes()); + } catch (Exception e) { + throw new DecoderException(e); + } finally { + buf.release(); + } + } else { + ctx.fireChannelRead(msg); + } + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/codec/MessageEncoder.java b/src/main/java/io/github/yezhihao/netmc/codec/MessageEncoder.java new file mode 100644 index 0000000..01caa2c --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/codec/MessageEncoder.java @@ -0,0 +1,14 @@ +package io.github.yezhihao.netmc.codec; + +import io.netty.buffer.ByteBuf; + +/** + * 基础消息编码 + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public interface MessageEncoder { + + ByteBuf encode(T message); + +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/codec/MessageEncoderWrapper.java b/src/main/java/io/github/yezhihao/netmc/codec/MessageEncoderWrapper.java new file mode 100644 index 0000000..d1bebab --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/codec/MessageEncoderWrapper.java @@ -0,0 +1,55 @@ +package io.github.yezhihao.netmc.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.EncoderException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 基础消息编码 + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +@ChannelHandler.Sharable +public class MessageEncoderWrapper extends ChannelOutboundHandlerAdapter { + + private static final Logger log = LoggerFactory.getLogger(MessageEncoderWrapper.class.getSimpleName()); + + private MessageEncoder encoder; + + public MessageEncoderWrapper(MessageEncoder encoder) { + this.encoder = encoder; + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + ByteBuf buf = null; + try { + buf = encoder.encode(msg); + if (log.isInfoEnabled()) + log.info("<<<<<原始报文[ip={}],hex={}", ctx.channel().remoteAddress(), ByteBufUtil.hexDump(buf)); + + if (buf.isReadable()) { + ctx.write(buf, promise); + } else { + buf.release(); + ctx.write(Unpooled.EMPTY_BUFFER, promise); + } + buf = null; + } catch (EncoderException e) { + throw e; + } catch (Throwable e) { + throw new EncoderException(e); + } finally { + if (buf != null) { + buf.release(); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/core/AbstractHandlerMapping.java b/src/main/java/io/github/yezhihao/netmc/core/AbstractHandlerMapping.java new file mode 100644 index 0000000..0078204 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/core/AbstractHandlerMapping.java @@ -0,0 +1,62 @@ +package io.github.yezhihao.netmc.core; + +import io.github.yezhihao.netmc.core.annotation.AsyncBatch; +import io.github.yezhihao.netmc.core.annotation.Mapping; +import io.github.yezhihao.netmc.core.handler.AsyncBatchHandler; +import io.github.yezhihao.netmc.core.handler.Handler; +import io.github.yezhihao.netmc.core.handler.SimpleHandler; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +/** + * 消息处理映射 + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public abstract class AbstractHandlerMapping implements HandlerMapping { + + private final Map handlerMap = new HashMap(60); + + /** + * 将Endpoint中被@Mapping标记的方法注册到映射表 + */ + protected synchronized void registerHandlers(Object bean) { + Class beanClass = bean.getClass(); + Method[] methods = beanClass.getDeclaredMethods(); + if (methods == null) + return; + + for (Method method : methods) { + + Mapping mapping = method.getAnnotation(Mapping.class); + if (mapping != null) { + + String desc = mapping.desc(); + int[] types = mapping.types(); + + AsyncBatch asyncBatch = method.getAnnotation(AsyncBatch.class); + Handler handler; + + if (asyncBatch != null) { + handler = new AsyncBatchHandler(bean, method, desc, asyncBatch.poolSize(), asyncBatch.maxElements(), asyncBatch.maxWait()); + + } else { + handler = new SimpleHandler(bean, method, desc); + } + + for (int type : types) { + handlerMap.put(type, handler); + } + } + } + } + + /** + * 根据消息类型获取Handler + */ + public Handler getHandler(Object messageType) { + return handlerMap.get(messageType); + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/core/DefaultHandlerMapping.java b/src/main/java/io/github/yezhihao/netmc/core/DefaultHandlerMapping.java new file mode 100644 index 0000000..b6d8ee0 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/core/DefaultHandlerMapping.java @@ -0,0 +1,26 @@ +package io.github.yezhihao.netmc.core; + +import io.github.yezhihao.netmc.core.annotation.Endpoint; +import io.github.yezhihao.netmc.util.ClassUtils; + +import java.util.List; + +/** + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public class DefaultHandlerMapping extends AbstractHandlerMapping { + + public DefaultHandlerMapping(String endpointPackage) { + List> endpointClasses = ClassUtils.getClassList(endpointPackage, Endpoint.class); + + for (Class endpointClass : endpointClasses) { + try { + Object bean = endpointClass.newInstance(); + super.registerHandlers(bean); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/core/HandlerInterceptor.java b/src/main/java/io/github/yezhihao/netmc/core/HandlerInterceptor.java new file mode 100644 index 0000000..c548c38 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/core/HandlerInterceptor.java @@ -0,0 +1,27 @@ +package io.github.yezhihao.netmc.core; + +import io.github.yezhihao.netmc.core.model.Header; +import io.github.yezhihao.netmc.core.model.Message; +import io.github.yezhihao.netmc.session.Session; + +/** + * 消息拦截器 + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public interface HandlerInterceptor> { + /** 未找到对应的Handle */ + T notSupported(T request, Session session); + + /** 调用之前 */ + boolean beforeHandle(T request, Session session); + + /** 调用之后,返回值为void的 */ + T successful(T request, Session session); + + /** 调用之后,有返回值的 */ + void afterHandle(T request, T response, Session session); + + /** 调用之后抛出异常的 */ + T exceptional(T request, Session session, Exception e); +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/core/HandlerMapping.java b/src/main/java/io/github/yezhihao/netmc/core/HandlerMapping.java new file mode 100644 index 0000000..1f135fc --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/core/HandlerMapping.java @@ -0,0 +1,14 @@ +package io.github.yezhihao.netmc.core; + +import io.github.yezhihao.netmc.core.handler.Handler; + +/** + * 消息映射接口 + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public interface HandlerMapping { + + Handler getHandler(Object messageType); + +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/core/SpringHandlerMapping.java b/src/main/java/io/github/yezhihao/netmc/core/SpringHandlerMapping.java new file mode 100644 index 0000000..19bfee4 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/core/SpringHandlerMapping.java @@ -0,0 +1,23 @@ +package io.github.yezhihao.netmc.core; + +import io.github.yezhihao.netmc.core.annotation.Endpoint; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; + +import java.util.Map; + +/** + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public class SpringHandlerMapping extends AbstractHandlerMapping implements ApplicationContextAware { + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + Map endpoints = applicationContext.getBeansWithAnnotation(Endpoint.class); + for (Object bean : endpoints.values()) { + super.registerHandlers(bean); + } + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/core/annotation/AsyncBatch.java b/src/main/java/io/github/yezhihao/netmc/core/annotation/AsyncBatch.java new file mode 100644 index 0000000..4f02339 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/core/annotation/AsyncBatch.java @@ -0,0 +1,24 @@ +package io.github.yezhihao.netmc.core.annotation; + +import java.lang.annotation.*; + +/** + * 异步批处理消息注解 + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface AsyncBatch { + + //线程数量 + int poolSize() default 2; + + //最大累计消息数 + int maxElements() default 400; + + //最大等待时间 + int maxWait() default 1000; + +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/core/annotation/Endpoint.java b/src/main/java/io/github/yezhihao/netmc/core/annotation/Endpoint.java new file mode 100644 index 0000000..1e00b07 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/core/annotation/Endpoint.java @@ -0,0 +1,15 @@ +package io.github.yezhihao.netmc.core.annotation; + +import java.lang.annotation.*; + +/** + * 消息接入点 + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface Endpoint { + +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/core/annotation/Mapping.java b/src/main/java/io/github/yezhihao/netmc/core/annotation/Mapping.java new file mode 100644 index 0000000..71967db --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/core/annotation/Mapping.java @@ -0,0 +1,19 @@ +package io.github.yezhihao.netmc.core.annotation; + +import java.lang.annotation.*; + +/** + * 消息类型映射 + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface Mapping { + + int[] types(); + + String desc() default ""; + +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/core/handler/AsyncBatchHandler.java b/src/main/java/io/github/yezhihao/netmc/core/handler/AsyncBatchHandler.java new file mode 100644 index 0000000..2bfbb4f --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/core/handler/AsyncBatchHandler.java @@ -0,0 +1,118 @@ +package io.github.yezhihao.netmc.core.handler; + +import io.github.yezhihao.netmc.core.model.Message; +import io.github.yezhihao.netmc.session.Session; +import io.github.yezhihao.netmc.util.VirtualList; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Method; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * 异步批量处理 + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public class AsyncBatchHandler extends Handler { + + private static final Logger log = LoggerFactory.getLogger(AsyncBatchHandler.class.getSimpleName()); + + private ConcurrentLinkedQueue queue; + + private ThreadPoolExecutor executor; + + private int poolSize; + + private int maxElements; + + private int maxWait; + + private int warningLines; + + public AsyncBatchHandler(Object actionClass, Method actionMethod, String desc, int poolSize, int maxElements, int maxWait) { + super(actionClass, actionMethod, desc); + + Class[] parameterTypes = actionMethod.getParameterTypes(); + if (parameterTypes.length > 1) + throw new RuntimeException("@AsyncBatch方法仅支持一个List参数:" + actionMethod); + if (!parameterTypes[0].isAssignableFrom(List.class)) + throw new RuntimeException("@AsyncBatch方法的参数不是List类型:" + actionMethod); + + this.poolSize = poolSize; + this.maxElements = maxElements; + this.maxWait = maxWait; + this.warningLines = maxElements * poolSize * 50; + + this.queue = new ConcurrentLinkedQueue(); + this.executor = new ThreadPoolExecutor(this.poolSize, this.poolSize, 1000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(400), + new BasicThreadFactory.Builder().namingPattern(actionMethod.getName() + "-pool-%d").build()); + + for (int i = 0; i < poolSize; i++) { + boolean master = i == 0; + executor.execute(() -> { + try { + startInternal(master); + } catch (Exception e) { + log.error("批处理线程出错", e); + } + }); + } + } + + public Message invoke(Message request, Session session) { + queue.offer(request); + return null; + } + + public void startInternal(boolean master) { + Message[] array = new Message[maxElements]; + long logtime = 0; + long starttime = 0; + + for (; ; ) { + Message temp; + int i = 0; + while ((temp = queue.poll()) != null) { + array[i++] = temp; + if (i >= maxElements) + break; + } + + if (i > 0) { + starttime = System.currentTimeMillis(); + try { + targetMethod.invoke(targetObject, new VirtualList<>(array, i)); + } catch (Exception e) { + log.warn(targetMethod.getName(), e); + } + long time = System.currentTimeMillis() - starttime; + if (time > 1000L) + log.warn("批处理耗时:{}ms,共{}条记录", time, i); + } + + if (i < maxElements) { + try { + for (int j = 0; j < i; j++) + array[j] = null; + Thread.sleep(maxWait); + } catch (InterruptedException e) { + } + } else if (master) { + if (logtime < starttime) { + logtime = starttime + 5000L; + + int size = queue.size(); + if (size > warningLines) { + log.warn("批处理队列繁忙, size:{}", size); + } + } + } + } + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/core/handler/Handler.java b/src/main/java/io/github/yezhihao/netmc/core/handler/Handler.java new file mode 100644 index 0000000..d775ca5 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/core/handler/Handler.java @@ -0,0 +1,82 @@ +package io.github.yezhihao.netmc.core.handler; + +import io.github.yezhihao.netmc.core.model.Header; +import io.github.yezhihao.netmc.core.model.Message; +import io.github.yezhihao.netmc.session.Session; +import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl; + +import java.lang.reflect.Method; +import java.lang.reflect.Type; + +/** + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +@SuppressWarnings("unchecked") +public abstract class Handler { + + public static final int MESSAGE = 0; + public static final int SESSION = 1; + public static final int HEADER = 2; + + public final Object targetObject; + public final Method targetMethod; + public final int[] parameterTypes; + public final boolean returnVoid; + public final String desc; + + public Handler(Object targetObject, Method targetMethod, String desc) { + this.targetObject = targetObject; + this.targetMethod = targetMethod; + this.returnVoid = targetMethod.getReturnType().isAssignableFrom(Void.TYPE); + this.desc = desc; + + Type[] types = targetMethod.getGenericParameterTypes(); + int[] parameterTypes = new int[types.length]; + try { + for (int i = 0; i < types.length; i++) { + Type type = types[i]; + Class clazz; + if (type instanceof ParameterizedTypeImpl) + clazz = (Class) ((ParameterizedTypeImpl) type).getActualTypeArguments()[0]; + else + clazz = (Class) type; + + if (Message.class.isAssignableFrom(clazz)) + parameterTypes[i] = MESSAGE; + else if (Header.class.isAssignableFrom(clazz)) + parameterTypes[i] = HEADER; + else if (Session.class.isAssignableFrom(clazz)) + parameterTypes[i] = SESSION; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + this.parameterTypes = parameterTypes; + } + + public T invoke(T request, Session session) throws Exception { + Object[] args = new Object[parameterTypes.length]; + + for (int i = 0; i < parameterTypes.length; i++) { + int type = parameterTypes[i]; + switch (type) { + case Handler.MESSAGE: + args[i] = request; + break; + case Handler.SESSION: + args[i] = session; + break; + case Handler.HEADER: + args[i] = request.getHeader(); + break; + } + } + return (T) targetMethod.invoke(targetObject, args); + } + + @Override + public String toString() { + return desc; + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/core/handler/SimpleHandler.java b/src/main/java/io/github/yezhihao/netmc/core/handler/SimpleHandler.java new file mode 100644 index 0000000..7b0c106 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/core/handler/SimpleHandler.java @@ -0,0 +1,22 @@ +package io.github.yezhihao.netmc.core.handler; + +import io.github.yezhihao.netmc.core.model.Message; +import io.github.yezhihao.netmc.session.Session; + +import java.lang.reflect.Method; + +/** + * 同步处理 + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public class SimpleHandler extends Handler { + + public SimpleHandler(Object actionClass, Method actionMethod, String desc) { + super(actionClass, actionMethod, desc); + } + + public Message invoke(Message request, Session session) throws Exception { + return super.invoke(request, session); + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/core/model/Header.java b/src/main/java/io/github/yezhihao/netmc/core/model/Header.java new file mode 100644 index 0000000..f48bad7 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/core/model/Header.java @@ -0,0 +1,19 @@ +package io.github.yezhihao.netmc.core.model; + +import java.io.Serializable; + +/** + * 消息头 + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public interface Header extends Serializable { + + /** 客户端唯一标识 */ + T getClientId(); + + /** 消息流水号 */ + int getSerialNo(); + + void setSerialNo(int serialNo); +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/core/model/Message.java b/src/main/java/io/github/yezhihao/netmc/core/model/Message.java new file mode 100644 index 0000000..824b221 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/core/model/Message.java @@ -0,0 +1,15 @@ +package io.github.yezhihao.netmc.core.model; + +import java.io.Serializable; + +/** + * 消息体 + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public interface Message extends Serializable { + + T getHeader(); + + Object getMessageType(); +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/core/model/Response.java b/src/main/java/io/github/yezhihao/netmc/core/model/Response.java new file mode 100644 index 0000000..ca8e115 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/core/model/Response.java @@ -0,0 +1,12 @@ +package io.github.yezhihao.netmc.core.model; + +/** + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public interface Response { + + /** 应答消息流水号 */ + int getSerialNo(); + +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/session/MessageManager.java b/src/main/java/io/github/yezhihao/netmc/session/MessageManager.java new file mode 100644 index 0000000..32b92c9 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/session/MessageManager.java @@ -0,0 +1,126 @@ +package io.github.yezhihao.netmc.session; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import io.github.yezhihao.netmc.core.model.Header; +import io.github.yezhihao.netmc.core.model.Message; +import io.github.yezhihao.netmc.core.model.Response; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; + +/** + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public class MessageManager { + + private static final Logger log = LoggerFactory.getLogger(MessageManager.class.getSimpleName()); + + private Map topicSubscribers = new ConcurrentHashMap<>(); + + private SessionManager sessionManager; + + public MessageManager(SessionManager sessionManager) { + this.sessionManager = sessionManager; + } + + /** + * 发送通知类消息,不接收响应 + */ + public boolean notify(Message message) { + Header header = message.getHeader(); + Object clientId = header.getClientId(); + + Session session = sessionManager.get(clientId); + if (session == null) { + log.info("<<<<<<<<<<消息发送失败,未注册,{}", message); + return false; + } + + header.setSerialNo(session.nextSerialNo()); + session.writeObject(message); + return true; + } + + /** + * 发送同步消息,接收响应 + * 默认超时时间20秒 + */ + public T request(Message request, Class responseClass) { + return request(request, responseClass, 20000); + } + + public T request(Message request, Class responseClass, long timeout) { + Header header = request.getHeader(); + Object clientId = header.getClientId(); + + Session session = sessionManager.get(clientId); + if (session == null) { + log.info("<<<<<<<<<<消息发送失败,未注册,{}", request); + return null; + } + + header.setSerialNo(session.nextSerialNo()); + + String key = requestKey(header, responseClass); + SynchronousQueue syncQueue = this.subscribe(key); + if (syncQueue == null) { + log.info("<<<<<<<<<<请勿重复发送,{}", request); + } + + try { + session.writeObject(request); + return (T) syncQueue.poll(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.warn("<<<<<<<<<<等待响应超时" + session, e); + } finally { + this.unsubscribe(key); + } + return null; + } + + /** + * 消息响应 + */ + public boolean response(Message message) { + SynchronousQueue queue = topicSubscribers.get(responseKey(message)); + if (queue != null) + return queue.offer(message); + return false; + } + + private SynchronousQueue subscribe(String key) { + SynchronousQueue queue = null; + if (!topicSubscribers.containsKey(key)) + topicSubscribers.put(key, queue = new SynchronousQueue()); + return queue; + } + + private void unsubscribe(String key) { + topicSubscribers.remove(key); + } + + private static String requestKey(Header header, Class responseClass) { + StringBuilder key = new StringBuilder(13 + 1 + 27 + 1 + 5); + key.append(header.getClientId()).append('/').append(responseClass.getName()); + + if (Response.class.isAssignableFrom(responseClass)) + key.append('/').append(header.getSerialNo()); + return key.toString(); + } + + private static String responseKey(Message response) { + Class responseClass = response.getClass(); + Header header = response.getHeader(); + + StringBuilder key = new StringBuilder(13 + 1 + 27 + 1 + 5); + key.append(header.getClientId()).append('/').append(responseClass.getName()); + + if (response instanceof Response) + key.append('/').append(((Response) response).getSerialNo()); + return key.toString(); + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/session/Session.java b/src/main/java/io/github/yezhihao/netmc/session/Session.java new file mode 100644 index 0000000..fc0fb2e --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/session/Session.java @@ -0,0 +1,180 @@ +package io.github.yezhihao.netmc.session; + +import io.netty.channel.Channel; +import io.netty.util.AttributeKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import io.github.yezhihao.netmc.core.model.Header; + +import java.util.Collection; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public class Session { + + private static final Logger log = LoggerFactory.getLogger(Session.class.getSimpleName()); + + public static final AttributeKey KEY = AttributeKey.newInstance(Session.class.getName()); + + protected final Channel channel; + + private AtomicInteger serialNo = new AtomicInteger(0); + private boolean registered = false; + private Object clientId; + + private final long creationTime; + private volatile long lastAccessedTime; + private Map attributes; + private Object subject; + private Object snapshot; + private Integer protocolVersion; + + private SessionManager sessionManager; + + protected Session(Channel channel, SessionManager sessionManager) { + this.channel = channel; + this.sessionManager = sessionManager; + this.creationTime = System.currentTimeMillis(); + this.lastAccessedTime = creationTime; + this.attributes = new TreeMap<>(); + } + + public void writeObject(Object message) { + log.info("<<<<<<<<<<消息下发{},{}", this, message); + channel.writeAndFlush(message); + } + + public int getId() { + return channel.id().hashCode(); + } + + public int nextSerialNo() { + int current; + int next; + do { + current = serialNo.get(); + next = current > 0xffff ? 0 : current; + } while (!serialNo.compareAndSet(current, next + 1)); + return next; + } + + public boolean isRegistered() { + return registered; + } + + /** + * 注册到SessionManager + */ + public void register(Header header) { + this.register(header, null); + } + + public void register(Header header, Object subject) { + this.clientId = header.getClientId(); + this.registered = true; + this.subject = subject; + sessionManager.put(clientId, this); + } + + public Object getClientId() { + return clientId; + } + + + public long getCreationTime() { + return creationTime; + } + + public long getLastAccessedTime() { + return lastAccessedTime; + } + + public long access() { + lastAccessedTime = System.currentTimeMillis(); + return lastAccessedTime; + } + + public Collection getAttributeNames() { + return attributes.keySet(); + } + + public Object getAttribute(String name) { + return attributes.get(name); + } + + public void setAttribute(String name, Object value) { + attributes.put(name, value); + } + + public Object removeAttribute(String name) { + return attributes.remove(name); + } + + public Object getSubject() { + return subject; + } + + public void setSubject(Object subject) { + this.subject = subject; + } + + public Object getSnapshot() { + return snapshot; + } + + public void setSnapshot(Object snapshot) { + this.snapshot = snapshot; + } + + public Integer getProtocolVersion() { + return protocolVersion; + } + + public void setProtocolVersion(int protocolVersion) { + this.protocolVersion = protocolVersion; + } + + public Integer cachedProtocolVersion(Object clientId) { + return this.sessionManager.getVersion(clientId); + } + + public void recordProtocolVersion(Object clientId, int protocolVersion) { + this.protocolVersion = protocolVersion; + this.sessionManager.putVersion(clientId, protocolVersion); + } + + public void invalidate() { + channel.close(); + sessionManager.callSessionDestroyedListener(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + Session that = (Session) o; + return this.getId() == that.getId(); + } + + @Override + public int hashCode() { + return getId(); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(66); + sb.append("[ip=").append(channel.remoteAddress()); + sb.append(", cid=").append(clientId); + sb.append(", reg=").append(registered); + sb.append(']'); + return sb.toString(); + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/session/SessionListener.java b/src/main/java/io/github/yezhihao/netmc/session/SessionListener.java new file mode 100644 index 0000000..0f1e082 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/session/SessionListener.java @@ -0,0 +1,13 @@ +package io.github.yezhihao.netmc.session; + +/** + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public interface SessionListener { + default void sessionCreated(Session session) { + } + + default void sessionDestroyed(Session session) { + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/session/SessionManager.java b/src/main/java/io/github/yezhihao/netmc/session/SessionManager.java new file mode 100644 index 0000000..4f159ed --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/session/SessionManager.java @@ -0,0 +1,81 @@ +package io.github.yezhihao.netmc.session; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public class SessionManager { + + private Map sessionMap; + + private Cache versionCache; + + private ChannelFutureListener remover; + + private SessionListener sessionListener; + + public SessionManager() { + this.sessionMap = new ConcurrentHashMap<>(); + this.versionCache = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build(); + this.remover = future -> { + Session session = future.channel().attr(Session.KEY).get(); + if (session != null) { + sessionMap.remove(session.getClientId(), session); + } + }; + } + + public SessionManager(SessionListener sessionListener) { + this(); + this.sessionListener = sessionListener; + } + + public Session newSession(Channel channel) { + Session session = new Session(channel, this); + callSessionCreatedListener(session); + return session; + } + + protected void callSessionDestroyedListener(Session session) { + if (sessionListener != null) + sessionListener.sessionDestroyed(session); + } + + protected void callSessionCreatedListener(Session session) { + if (sessionListener != null) + sessionListener.sessionCreated(session); + } + + public Session get(Object clientId) { + return sessionMap.get(clientId); + } + + public Collection all() { + return sessionMap.values(); + } + + protected void put(Object clientId, Session newSession) { + Session oldSession = sessionMap.put(clientId, newSession); + if (!newSession.equals(oldSession)) { + newSession.channel.closeFuture().addListener(remover); + } + } + + public void putVersion(Object clientId, int version) { + versionCache.put(clientId, version); + } + + public Integer getVersion(Object clientId) { + return versionCache.getIfPresent(clientId); + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/util/AdapterList.java b/src/main/java/io/github/yezhihao/netmc/util/AdapterList.java new file mode 100644 index 0000000..cf57773 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/util/AdapterList.java @@ -0,0 +1,26 @@ +package io.github.yezhihao.netmc.util; + +import java.util.AbstractList; +import java.util.List; +import java.util.function.Function; + +public final class AdapterList extends AbstractList { + + private final List src; + private final Function function; + + public AdapterList(List src, Function function) { + this.src = src; + this.function = function; + } + + @Override + public T get(int index) { + return function.apply(src.get(index)); + } + + @Override + public int size() { + return src.size(); + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/util/ByteBufUtils.java b/src/main/java/io/github/yezhihao/netmc/util/ByteBufUtils.java new file mode 100644 index 0000000..7ee7224 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/util/ByteBufUtils.java @@ -0,0 +1,45 @@ +package io.github.yezhihao.netmc.util; + +import io.netty.buffer.ByteBuf; + +/** + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public class ByteBufUtils { + + /** + * Returns the number of bytes between the readerIndex of the haystack and + * the first needle found in the haystack. -1 is returned if no needle is + * found in the haystack. + */ + public static int indexOf(ByteBuf haystack, byte[] needle) { + for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i++) { + int haystackIndex = i; + int needleIndex; + for (needleIndex = 0; needleIndex < needle.length; needleIndex++) { + if (haystack.getByte(haystackIndex) != needle[needleIndex]) { + break; + } else { + haystackIndex++; + if (haystackIndex == haystack.writerIndex() && needleIndex != needle.length - 1) { + return -1; + } + } + } + + if (needleIndex == needle.length) { + // Found the needle from the haystack! + return i - haystack.readerIndex(); + } + } + return -1; + } + + public static boolean startsWith(ByteBuf haystack, byte[] prefix) { + for (int i = 0, j = haystack.readerIndex(); i < prefix.length; ) + if (prefix[i++] != haystack.getByte(j++)) + return false; + return true; + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/util/ClassUtils.java b/src/main/java/io/github/yezhihao/netmc/util/ClassUtils.java new file mode 100644 index 0000000..6cf28e7 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/util/ClassUtils.java @@ -0,0 +1,118 @@ +package io.github.yezhihao.netmc.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.lang.annotation.Annotation; +import java.net.JarURLConnection; +import java.net.URL; +import java.util.Enumeration; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.jar.JarEntry; +import java.util.jar.JarFile; + +/** + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public class ClassUtils { + + private static final Logger log = LoggerFactory.getLogger(ClassUtils.class.getSimpleName()); + + public static List> getClassList(String packageName, Class annotationClass) { + List> classList = getClassList(packageName); + Iterator> iterator = classList.iterator(); + while (iterator.hasNext()) { + Class next = iterator.next(); + if (!next.isAnnotationPresent(annotationClass)) + iterator.remove(); + } + return classList; + } + + public static List> getClassList(String packageName) { + List> classList = new LinkedList(); + String path = packageName.replace(".", "/"); + try { + Enumeration urls = ClassUtils.getClassLoader().getResources(path); + while (urls.hasMoreElements()) { + URL url = urls.nextElement(); + + if (url != null) { + String protocol = url.getProtocol(); + + if (protocol.equals("file")) { + addClass(classList, url.toURI().getPath(), packageName); + + } else if (protocol.equals("jar")) { + JarURLConnection jarURLConnection = (JarURLConnection) url.openConnection(); + JarFile jarFile = jarURLConnection.getJarFile(); + + Enumeration jarEntries = jarFile.entries(); + while (jarEntries.hasMoreElements()) { + + JarEntry jarEntry = jarEntries.nextElement(); + String entryName = jarEntry.getName(); + + if (entryName.startsWith(path) && entryName.endsWith(".class")) { + String className = entryName.substring(0, entryName.lastIndexOf(".")).replaceAll("/", "."); + addClass(classList, className); + } + } + } + } + } + } catch (Exception e) { + log.error("获取类出错!", e); + } + return classList; + } + + private static void addClass(List> classList, String packagePath, String packageName) { + try { + File[] files = new File(packagePath).listFiles(file -> (file.isDirectory() || file.getName().endsWith(".class"))); + if (files != null) + for (File file : files) { + String fileName = file.getName(); + if (file.isFile()) { + String className = fileName.substring(0, fileName.lastIndexOf(".")); + if (packageName != null) { + className = packageName + "." + className; + } + addClass(classList, className); + } else { + String subPackagePath = fileName; + if (packageName != null) { + subPackagePath = packagePath + "/" + subPackagePath; + } + String subPackageName = fileName; + if (packageName != null) { + subPackageName = packageName + "." + subPackageName; + } + addClass(classList, subPackagePath, subPackageName); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static void addClass(List> classList, String className) { + classList.add(loadClass(className, false)); + } + + public static Class loadClass(String className, boolean isInitialized) { + try { + return Class.forName(className, isInitialized, getClassLoader()); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + + public static ClassLoader getClassLoader() { + return Thread.currentThread().getContextClassLoader(); + } +} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/util/VirtualList.java b/src/main/java/io/github/yezhihao/netmc/util/VirtualList.java new file mode 100644 index 0000000..a0cb062 --- /dev/null +++ b/src/main/java/io/github/yezhihao/netmc/util/VirtualList.java @@ -0,0 +1,102 @@ +package io.github.yezhihao.netmc.util; + +import java.io.Serializable; +import java.util.*; +import java.util.function.Consumer; +import java.util.function.UnaryOperator; + +/** + * @author yezhihao + * home https://gitee.com/yezhihao/jt808-server + */ +public class VirtualList extends AbstractList implements RandomAccess, Serializable { + + private final E[] elementData; + private final int size; + + public VirtualList(E[] array, int length) { + this.elementData = array; + this.size = length; + } + + @Override + public int size() { + return size; + } + + @Override + public Object[] toArray() { + return elementData.clone(); + } + + @Override + @SuppressWarnings("unchecked") + public T[] toArray(T[] a) { + if (a.length < size) + return Arrays.copyOf(this.elementData, size, + (Class) a.getClass()); + System.arraycopy(this.elementData, 0, a, 0, size); + if (a.length > size) + a[size] = null; + return a; + } + + @Override + public E get(int index) { + return elementData[index]; + } + + @Override + public E set(int index, E element) { + E oldValue = elementData[index]; + elementData[index] = element; + return oldValue; + } + + @Override + public int indexOf(Object o) { + E[] a = this.elementData; + if (o == null) { + for (int i = 0; i < size; i++) + if (a[i] == null) + return i; + } else { + for (int i = 0; i < size; i++) + if (o.equals(a[i])) + return i; + } + return -1; + } + + @Override + public boolean contains(Object o) { + return indexOf(o) != -1; + } + + @Override + public Spliterator spliterator() { + return Spliterators.spliterator(elementData, 0, size, Spliterator.ORDERED); + } + + @Override + public void forEach(Consumer action) { + Objects.requireNonNull(action); + for (int i = 0; i < size; i++) { + action.accept(elementData[i]); + } + } + + @Override + public void replaceAll(UnaryOperator operator) { + Objects.requireNonNull(operator); + E[] a = this.elementData; + for (int i = 0; i < size; i++) { + a[i] = operator.apply(a[i]); + } + } + + @Override + public void sort(Comparator c) { + Arrays.sort(elementData, 0, size, c); + } +} \ No newline at end of file diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml new file mode 100644 index 0000000..241de4e --- /dev/null +++ b/src/main/resources/log4j2.xml @@ -0,0 +1,29 @@ + + + + logs/ + jt808 + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/test/java/io/github/yezhihao/netmc/Test.java b/src/test/java/io/github/yezhihao/netmc/Test.java new file mode 100644 index 0000000..dbeba5f --- /dev/null +++ b/src/test/java/io/github/yezhihao/netmc/Test.java @@ -0,0 +1,7 @@ +package io.github.yezhihao.netmc; + +public class Test { + public static void main(String[] args) { + System.out.println(); + } +}