diff --git a/src/main/java/io/github/yezhihao/netmc/session/MessageManager.java b/src/main/java/io/github/yezhihao/netmc/session/MessageManager.java deleted file mode 100644 index 32b92c9..0000000 --- a/src/main/java/io/github/yezhihao/netmc/session/MessageManager.java +++ /dev/null @@ -1,126 +0,0 @@ -package io.github.yezhihao.netmc.session; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import io.github.yezhihao.netmc.core.model.Header; -import io.github.yezhihao.netmc.core.model.Message; -import io.github.yezhihao.netmc.core.model.Response; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.TimeUnit; - -/** - * @author yezhihao - * home https://gitee.com/yezhihao/jt808-server - */ -public class MessageManager { - - private static final Logger log = LoggerFactory.getLogger(MessageManager.class.getSimpleName()); - - private Map topicSubscribers = new ConcurrentHashMap<>(); - - private SessionManager sessionManager; - - public MessageManager(SessionManager sessionManager) { - this.sessionManager = sessionManager; - } - - /** - * 发送通知类消息,不接收响应 - */ - public boolean notify(Message message) { - Header header = message.getHeader(); - Object clientId = header.getClientId(); - - Session session = sessionManager.get(clientId); - if (session == null) { - log.info("<<<<<<<<<<消息发送失败,未注册,{}", message); - return false; - } - - header.setSerialNo(session.nextSerialNo()); - session.writeObject(message); - return true; - } - - /** - * 发送同步消息,接收响应 - * 默认超时时间20秒 - */ - public T request(Message request, Class responseClass) { - return request(request, responseClass, 20000); - } - - public T request(Message request, Class responseClass, long timeout) { - Header header = request.getHeader(); - Object clientId = header.getClientId(); - - Session session = sessionManager.get(clientId); - if (session == null) { - log.info("<<<<<<<<<<消息发送失败,未注册,{}", request); - return null; - } - - header.setSerialNo(session.nextSerialNo()); - - String key = requestKey(header, responseClass); - SynchronousQueue syncQueue = this.subscribe(key); - if (syncQueue == null) { - log.info("<<<<<<<<<<请勿重复发送,{}", request); - } - - try { - session.writeObject(request); - return (T) syncQueue.poll(timeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - log.warn("<<<<<<<<<<等待响应超时" + session, e); - } finally { - this.unsubscribe(key); - } - return null; - } - - /** - * 消息响应 - */ - public boolean response(Message message) { - SynchronousQueue queue = topicSubscribers.get(responseKey(message)); - if (queue != null) - return queue.offer(message); - return false; - } - - private SynchronousQueue subscribe(String key) { - SynchronousQueue queue = null; - if (!topicSubscribers.containsKey(key)) - topicSubscribers.put(key, queue = new SynchronousQueue()); - return queue; - } - - private void unsubscribe(String key) { - topicSubscribers.remove(key); - } - - private static String requestKey(Header header, Class responseClass) { - StringBuilder key = new StringBuilder(13 + 1 + 27 + 1 + 5); - key.append(header.getClientId()).append('/').append(responseClass.getName()); - - if (Response.class.isAssignableFrom(responseClass)) - key.append('/').append(header.getSerialNo()); - return key.toString(); - } - - private static String responseKey(Message response) { - Class responseClass = response.getClass(); - Header header = response.getHeader(); - - StringBuilder key = new StringBuilder(13 + 1 + 27 + 1 + 5); - key.append(header.getClientId()).append('/').append(responseClass.getName()); - - if (response instanceof Response) - key.append('/').append(((Response) response).getSerialNo()); - return key.toString(); - } -} \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/session/Session.java b/src/main/java/io/github/yezhihao/netmc/session/Session.java index 18358ae..6f961ef 100644 --- a/src/main/java/io/github/yezhihao/netmc/session/Session.java +++ b/src/main/java/io/github/yezhihao/netmc/session/Session.java @@ -1,14 +1,17 @@ package io.github.yezhihao.netmc.session; -import io.github.yezhihao.netmc.core.model.Header; +import io.github.yezhihao.netmc.core.model.Message; +import io.github.yezhihao.netmc.core.model.Response; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.util.AttributeKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; -import java.util.Map; -import java.util.TreeMap; +import java.util.*; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** @@ -29,24 +32,24 @@ public class Session { private final long creationTime; private volatile long lastAccessedTime; - private Map attributes; - private Object subject; - private Object snapshot; + private Map attributes; private Integer protocolVersion; private SessionManager sessionManager; protected Session(Channel channel, SessionManager sessionManager) { + this(null, channel, sessionManager); + } + + protected Session(Class sessionKeyClass, Channel channel, SessionManager sessionManager) { this.channel = channel; this.sessionManager = sessionManager; this.creationTime = System.currentTimeMillis(); this.lastAccessedTime = creationTime; - this.attributes = new TreeMap<>(); - } - - public void writeObject(Object message) { - log.info("<<<<<<<<<<消息下发{},{}", this, message); - channel.writeAndFlush(message); + if (sessionKeyClass != null) + this.attributes = new EnumMap(sessionKeyClass); + else + this.attributes = new TreeMap<>(); } public int getId() { @@ -70,33 +73,16 @@ public class Session { /** * 注册到SessionManager */ - public void register(Header header) { - this.register(header, null); - } - - public void register(Header header, Object subject) { - this.clientId = header.getClientId(); - this.registered = true; - this.subject = subject; - sessionManager.put(clientId, this); - } - public void register(Object clientId) { - this.register(clientId, null); - } - - public void register(Object clientId, Object subject) { - this.clientId = clientId; this.registered = true; - this.subject = subject; - sessionManager.put(clientId, this); + this.clientId = clientId; + this.sessionManager.put(this.clientId, this); } public Object getClientId() { return clientId; } - public long getCreationTime() { return creationTime; } @@ -110,38 +96,22 @@ public class Session { return lastAccessedTime; } - public Collection getAttributeNames() { + public Collection getAttributeNames() { return attributes.keySet(); } - public Object getAttribute(String name) { + public Object getAttribute(Object name) { return attributes.get(name); } - public void setAttribute(String name, Object value) { + public void setAttribute(Object name, Object value) { attributes.put(name, value); } - public Object removeAttribute(String name) { + public Object removeAttribute(Object name) { return attributes.remove(name); } - public Object getSubject() { - return subject; - } - - public void setSubject(Object subject) { - this.subject = subject; - } - - public Object getSnapshot() { - return snapshot; - } - - public void setSnapshot(Object snapshot) { - this.snapshot = snapshot; - } - public Integer getProtocolVersion() { return protocolVersion; } @@ -188,4 +158,84 @@ public class Session { sb.append(']'); return sb.toString(); } + + private transient Map topicSubscribers = new HashMap<>(); + + private static final ChannelFutureListener ERROR_LOG_LISTENER = future -> { + Throwable t = future.cause(); + if (t != null) + log.error("<<<<<<<<<<消息下发失败", t); + }; + + /** + * 发送通知类消息,不接收响应 + */ + public void notify(Object message) { + log.info("<<<<<<<<<<消息通知{},{}", this, message); + channel.writeAndFlush(message).addListener(ERROR_LOG_LISTENER); + } + + /** + * 发送同步消息,接收响应 + * 默认超时时间20秒 + */ + public T request(Message request, Class responseClass) { + return request(request, responseClass, 20000); + } + + public T request(Message request, Class responseClass, long timeout) { + String key = requestKey(request, responseClass); + SynchronousQueue syncQueue = this.subscribe(key); + if (syncQueue == null) { + log.info("<<<<<<<<<<请勿重复发送,{}", request); + } + + T result = null; + try { + log.info("<<<<<<<<<<消息请求{},{}", this, request); + ChannelFuture channelFuture = channel.writeAndFlush(request).addListener(ERROR_LOG_LISTENER); + if (channelFuture.awaitUninterruptibly().isSuccess()) + result = (T) syncQueue.poll(timeout, TimeUnit.MILLISECONDS); + } catch (Throwable e) { + log.warn("<<<<<<<<<<等待响应超时" + this, e); + } finally { + this.unsubscribe(key); + } + return result; + } + + /** + * 消息响应 + */ + public boolean response(Message message) { + SynchronousQueue queue = topicSubscribers.get(responseKey(message)); + if (queue != null) + return queue.offer(message); + return false; + } + + private SynchronousQueue subscribe(String key) { + SynchronousQueue queue = null; + synchronized (this) { + if (!topicSubscribers.containsKey(key)) + topicSubscribers.put(key, queue = new SynchronousQueue()); + } + return queue; + } + + private void unsubscribe(String key) { + topicSubscribers.remove(key); + } + + private static String requestKey(Message request, Class responseClass) { + if (Response.class.isAssignableFrom(responseClass)) + return Integer.toString(request.getSerialNo()); + return responseClass.getName(); + } + + private static String responseKey(Message response) { + if (response instanceof Response) + return Integer.toString(((Response) response).getResponseSerialNo()); + return response.getClass().getName(); + } } \ No newline at end of file diff --git a/src/main/java/io/github/yezhihao/netmc/session/SessionManager.java b/src/main/java/io/github/yezhihao/netmc/session/SessionManager.java index 4f159ed..6895bc7 100644 --- a/src/main/java/io/github/yezhihao/netmc/session/SessionManager.java +++ b/src/main/java/io/github/yezhihao/netmc/session/SessionManager.java @@ -24,24 +24,35 @@ public class SessionManager { private SessionListener sessionListener; + private Class sessionKeyClass; + public SessionManager() { + + } + + public SessionManager(Class sessionKeyClass) { + this(sessionKeyClass, null); + } + + public SessionManager(SessionListener sessionListener) { + this(null, sessionListener); + } + + public SessionManager(Class sessionKeyClass, SessionListener sessionListener) { this.sessionMap = new ConcurrentHashMap<>(); this.versionCache = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build(); + this.sessionKeyClass = sessionKeyClass; this.remover = future -> { Session session = future.channel().attr(Session.KEY).get(); if (session != null) { sessionMap.remove(session.getClientId(), session); } }; - } - - public SessionManager(SessionListener sessionListener) { - this(); this.sessionListener = sessionListener; } public Session newSession(Channel channel) { - Session session = new Session(channel, this); + Session session = new Session(sessionKeyClass, channel, this); callSessionCreatedListener(session); return session; }