[FEATURE][EXPERIMENTAL] Исправлена работа Netty на клиенте

This commit is contained in:
Gravit 2019-04-29 11:00:37 +07:00
parent 9480e94778
commit 1cc08204e5
No known key found for this signature in database
GPG key ID: 061981E1E85D3216
4 changed files with 54 additions and 38 deletions

View file

@ -5,8 +5,8 @@
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.EmptyHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
@ -16,27 +16,33 @@
import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslContextBuilder;
import ru.gravit.utils.helper.LogHelper; import ru.gravit.utils.helper.LogHelper;
import javax.net.ssl.SSLException;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.Map;
public class ClientJSONPoint { public abstract class ClientJSONPoint {
private final URI uri; private final URI uri;
private Channel ch; protected Channel ch;
private static final EventLoopGroup group = new NioEventLoopGroup(); private static final EventLoopGroup group = new NioEventLoopGroup();
protected WebSocketClientHandler webSocketClientHandler; protected WebSocketClientHandler webSocketClientHandler;
protected Bootstrap bootstrap = new Bootstrap();
public boolean isClosed;
public ClientJSONPoint(final String uri) { public ClientJSONPoint(final String uri) throws SSLException {
this.uri = URI.create(uri); this(URI.create(uri));
} }
public ClientJSONPoint(URI uri) { public ClientJSONPoint(URI uri) throws SSLException {
this.uri = uri; this.uri = uri;
} // Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.
// If you change it to V00, ping is not supported and remember to change
// HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
webSocketClientHandler =
new WebSocketClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, 1280000), this);
public void open() throws Exception {
Bootstrap b = new Bootstrap();
String protocol = uri.getScheme(); String protocol = uri.getScheme();
if (!"ws".equals(protocol) && !"wss".equals(protocol)) { if (!"ws".equals(protocol) && !"wss".equals(protocol)) {
throw new IllegalArgumentException("Unsupported protocol: " + protocol); throw new IllegalArgumentException("Unsupported protocol: " + protocol);
@ -49,19 +55,8 @@ public void open() throws Exception {
final SslContext sslCtx; final SslContext sslCtx;
if (ssl) { if (ssl) {
sslCtx = SslContextBuilder.forClient().build(); sslCtx = SslContextBuilder.forClient().build();
} else { } else sslCtx = null;
sslCtx = null; bootstrap.group(group)
}
// Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.
// If you change it to V00, ping is not supported and remember to change
// HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
webSocketClientHandler =
new WebSocketClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, false, HttpHeaders.EMPTY_HEADERS, 1280000));
b.group(group)
.channel(NioSocketChannel.class) .channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() { .handler(new ChannelInitializer<SocketChannel>() {
@Override @Override
@ -75,9 +70,11 @@ public void initChannel(SocketChannel ch) throws Exception {
pipeline.addLast("ws-handler", webSocketClientHandler); pipeline.addLast("ws-handler", webSocketClientHandler);
} }
}); });
}
public void open() throws Exception {
//System.out.println("WebSocket Client connecting"); //System.out.println("WebSocket Client connecting");
ch = b.connect(uri.getHost(), uri.getPort()).sync().channel(); ch = bootstrap.connect(uri.getHost(), uri.getPort()).sync().channel();
webSocketClientHandler.handshakeFuture().sync(); webSocketClientHandler.handshakeFuture().sync();
} }
public ChannelFuture send(String text) public ChannelFuture send(String text)
@ -85,9 +82,12 @@ public ChannelFuture send(String text)
LogHelper.dev("Send: %s", text); LogHelper.dev("Send: %s", text);
return ch.writeAndFlush(new TextWebSocketFrame(text)); return ch.writeAndFlush(new TextWebSocketFrame(text));
} }
abstract void onMessage(String message) throws Exception;
abstract void onDisconnect() throws Exception;
public void close() throws InterruptedException { public void close() throws InterruptedException {
//System.out.println("WebSocket Client sending close"); //System.out.println("WebSocket Client sending close");
isClosed = true;
ch.writeAndFlush(new CloseWebSocketFrame()); ch.writeAndFlush(new CloseWebSocketFrame());
ch.closeFuture().sync(); ch.closeFuture().sync();
//group.shutdownGracefully(); //group.shutdownGracefully();

View file

@ -9,6 +9,7 @@
import ru.gravit.launcher.request.ResultInterface; import ru.gravit.launcher.request.ResultInterface;
import ru.gravit.utils.helper.LogHelper; import ru.gravit.utils.helper.LogHelper;
import javax.net.ssl.SSLException;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.net.URI; import java.net.URI;
@ -25,7 +26,7 @@ public class ClientWebSocketService extends ClientJSONPoint {
private HashMap<String, Class<? extends ResultInterface>> results; private HashMap<String, Class<? extends ResultInterface>> results;
private HashSet<EventHandler> handlers; private HashSet<EventHandler> handlers;
public ClientWebSocketService(GsonBuilder gsonBuilder, String address, int i) { public ClientWebSocketService(GsonBuilder gsonBuilder, String address, int i) throws SSLException {
super(createURL(address)); super(createURL(address));
requests = new HashMap<>(); requests = new HashMap<>();
results = new HashMap<>(); results = new HashMap<>();
@ -49,14 +50,17 @@ private static URI createURL(String address) {
} }
@Override @Override
public void open() throws Exception { void onMessage(String message) {
super.open();
webSocketClientHandler.onMessageCallback = (message) -> {
ResultInterface result = gson.fromJson(message, ResultInterface.class); ResultInterface result = gson.fromJson(message, ResultInterface.class);
for (EventHandler handler : handlers) { for (EventHandler handler : handlers) {
handler.process(result); handler.process(result);
} }
}; }
@Override
void onDisconnect() {
LogHelper.info("WebSocket client disconnect");
if(onCloseCallback != null) onCloseCallback.onClose(0,"unsupported param", !isClosed);
} }
@FunctionalInterface @FunctionalInterface
@ -133,6 +137,7 @@ public void waitIfNotConnected()
public void sendObject(Object obj) throws IOException { public void sendObject(Object obj) throws IOException {
waitIfNotConnected(); waitIfNotConnected();
if(ch == null || !ch.isActive()) reconnectCallback.onReconnect();
//if(isClosed() && reconnectCallback != null) //if(isClosed() && reconnectCallback != null)
// reconnectCallback.onReconnect(); // reconnectCallback.onReconnect();
send(gson.toJson(obj, RequestInterface.class)); send(gson.toJson(obj, RequestInterface.class));
@ -140,6 +145,7 @@ public void sendObject(Object obj) throws IOException {
public void sendObject(Object obj, Type type) throws IOException { public void sendObject(Object obj, Type type) throws IOException {
waitIfNotConnected(); waitIfNotConnected();
if(ch == null || !ch.isActive()) reconnectCallback.onReconnect();
//if(isClosed() && reconnectCallback != null) //if(isClosed() && reconnectCallback != null)
// reconnectCallback.onReconnect(); // reconnectCallback.onReconnect();
send(gson.toJson(obj, type)); send(gson.toJson(obj, type));

View file

@ -8,6 +8,7 @@
import ru.gravit.utils.helper.JVMHelper; import ru.gravit.utils.helper.JVMHelper;
import ru.gravit.utils.helper.LogHelper; import ru.gravit.utils.helper.LogHelper;
import javax.net.ssl.SSLException;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -15,7 +16,7 @@
public class StandartClientWebSocketService extends ClientWebSocketService { public class StandartClientWebSocketService extends ClientWebSocketService {
public WaitEventHandler waitEventHandler = new WaitEventHandler(); public WaitEventHandler waitEventHandler = new WaitEventHandler();
public StandartClientWebSocketService(GsonBuilder gsonBuilder, String address, int i) { public StandartClientWebSocketService(GsonBuilder gsonBuilder, String address, int i) throws SSLException {
super(gsonBuilder, address, i); super(gsonBuilder, address, i);
} }
public class RequestFuture implements Future<ResultInterface> public class RequestFuture implements Future<ResultInterface>
@ -101,7 +102,13 @@ public RequestFuture asyncSendRequest(RequestInterface request) throws IOExcepti
} }
public static StandartClientWebSocketService initWebSockets(String address, boolean async) { public static StandartClientWebSocketService initWebSockets(String address, boolean async) {
StandartClientWebSocketService service = new StandartClientWebSocketService(new GsonBuilder(), address, 5000); StandartClientWebSocketService service;
try {
service = new StandartClientWebSocketService(new GsonBuilder(), address, 5000);
} catch (SSLException e) {
LogHelper.error(e);
return null;
}
service.registerResults(); service.registerResults();
service.registerRequests(); service.registerRequests();
service.registerHandler(service.waitEventHandler); service.registerHandler(service.waitEventHandler);

View file

@ -21,6 +21,7 @@
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> { public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
private final WebSocketClientHandshaker handshaker; private final WebSocketClientHandshaker handshaker;
private final ClientJSONPoint clientJSONPoint;
private ChannelPromise handshakeFuture; private ChannelPromise handshakeFuture;
interface OnMessageCallback interface OnMessageCallback
{ {
@ -28,8 +29,9 @@ interface OnMessageCallback
} }
public OnMessageCallback onMessageCallback; public OnMessageCallback onMessageCallback;
public WebSocketClientHandler(final WebSocketClientHandshaker handshaker) { public WebSocketClientHandler(final WebSocketClientHandshaker handshaker, ClientJSONPoint clientJSONPoint) {
this.handshaker = handshaker; this.handshaker = handshaker;
this.clientJSONPoint = clientJSONPoint;
} }
public ChannelFuture handshakeFuture() { public ChannelFuture handshakeFuture() {
@ -49,6 +51,7 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception {
@Override @Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception { public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
//System.out.println("WebSocket Client disconnected!"); //System.out.println("WebSocket Client disconnected!");
clientJSONPoint.onDisconnect();
} }
@Override @Override
@ -70,7 +73,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Except
final WebSocketFrame frame = (WebSocketFrame) msg; final WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) { if (frame instanceof TextWebSocketFrame) {
final TextWebSocketFrame textFrame = (TextWebSocketFrame) frame; final TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
if(onMessageCallback != null) onMessageCallback.onMessage(textFrame.text()); clientJSONPoint.onMessage(textFrame.text());
LogHelper.dev("Message: %s", textFrame.text()); LogHelper.dev("Message: %s", textFrame.text());
// uncomment to print request // uncomment to print request
// logger.info(textFrame.text()); // logger.info(textFrame.text());