[FEATURE][EXPERIMENTAL] Netty в качестве библиотеки вебсокетов

This commit is contained in:
Gravit 2019-04-29 10:19:47 +07:00
parent 91d2690761
commit 9480e94778
No known key found for this signature in database
GPG key ID: 061981E1E85D3216
9 changed files with 212 additions and 64 deletions

View file

@ -30,6 +30,7 @@
shadowJar {
classifier = null
relocate 'org.objectweb.asm', 'ru.gravit.repackage.org.objectweb.asm'
relocate 'io.netty', 'ru.gravit.repackage.io.netty'
configurations = [project.configurations.pack]
exclude 'module-info.class'
}
@ -39,7 +40,7 @@ pack project(':LauncherAPI') // Not error on obf.
bundle 'com.github.oshi:oshi-core:3.13.0'
bundle 'com.jfoenix:jfoenix:8.0.8'
bundle 'de.jensd:fontawesomefx:8.9'
bundle 'org.fusesource.jansi:jansi:1.17.1'
pack 'io.netty:netty-all:4.1.32.Final'
pack 'org.ow2.asm:asm-tree:7.1'
}

View file

@ -81,9 +81,9 @@ public void start(String... args) throws Throwable {
{
LogHelper.debug("WebSocket connect closed. Try reconnect");
try {
if (!Request.service.reconnectBlocking()) LogHelper.error("Error connecting");
Request.service.open();
LogHelper.debug("Connect to %s", Launcher.getConfig().address);
} catch (InterruptedException e) {
} catch (Exception e) {
e.printStackTrace();
}
try {

View file

@ -459,9 +459,9 @@ public static void main(String... args) throws Throwable {
{
LogHelper.debug("WebSocket connect closed. Try reconnect");
try {
if (!Request.service.reconnectBlocking()) LogHelper.error("Error connecting");
Request.service.open();
LogHelper.debug("Connect to %s", Launcher.getConfig().address);
} catch (InterruptedException e) {
} catch (Exception e) {
e.printStackTrace();
}
try {

View file

@ -3,7 +3,7 @@
dependencies {
compile project(':libLauncher')
compile 'org.java-websocket:Java-WebSocket:1.3.9'
compileOnly 'io.netty:netty-all:4.1.32.Final'
compile 'org.apache.httpcomponents:httpclient:4.5.7'
compileOnly 'com.google.guava:guava:26.0-jre'
compile files('../compat/authlib/authlib-clean.jar')

View file

@ -1,36 +1,100 @@
package ru.gravit.launcher.request.websockets;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import ru.gravit.utils.helper.LogHelper;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
public class ClientJSONPoint extends WebSocketClient {
public class ClientJSONPoint {
public ClientJSONPoint(URI serverUri, Map<String, String> httpHeaders, int connectTimeout) {
super(serverUri, new Draft_6455(), httpHeaders, connectTimeout);
private final URI uri;
private Channel ch;
private static final EventLoopGroup group = new NioEventLoopGroup();
protected WebSocketClientHandler webSocketClientHandler;
public ClientJSONPoint(final String uri) {
this.uri = URI.create(uri);
}
@Override
public void onOpen(ServerHandshake handshakedata) {
public ClientJSONPoint(URI uri) {
this.uri = uri;
}
@Override
public void onMessage(String message) {
public void open() throws Exception {
Bootstrap b = new Bootstrap();
String protocol = uri.getScheme();
if (!"ws".equals(protocol) && !"wss".equals(protocol)) {
throw new IllegalArgumentException("Unsupported protocol: " + protocol);
}
boolean ssl = false;
if("wss".equals(protocol))
{
ssl = true;
}
final SslContext sslCtx;
if (ssl) {
sslCtx = SslContextBuilder.forClient().build();
} else {
sslCtx = null;
}
// Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.
// If you change it to V00, ping is not supported and remember to change
// HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
webSocketClientHandler =
new WebSocketClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, false, HttpHeaders.EMPTY_HEADERS, 1280000));
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
pipeline.addLast("http-codec", new HttpClientCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
pipeline.addLast("ws-handler", webSocketClientHandler);
}
});
//System.out.println("WebSocket Client connecting");
ch = b.connect(uri.getHost(), uri.getPort()).sync().channel();
webSocketClientHandler.handshakeFuture().sync();
}
public ChannelFuture send(String text)
{
LogHelper.dev("Send: %s", text);
return ch.writeAndFlush(new TextWebSocketFrame(text));
}
@Override
public void onClose(int code, String reason, boolean remote) {
LogHelper.debug("Disconnected: " + code + " " + remote + " " + reason != null ? reason : "no reason");
public void close() throws InterruptedException {
//System.out.println("WebSocket Client sending close");
ch.writeAndFlush(new CloseWebSocketFrame());
ch.closeFuture().sync();
//group.shutdownGracefully();
}
@Override
public void onError(Exception ex) {
LogHelper.error(ex);
public void eval(final String text) throws IOException {
ch.writeAndFlush(new TextWebSocketFrame(text));
}
}

View file

@ -2,7 +2,6 @@
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.java_websocket.handshake.ServerHandshake;
import ru.gravit.launcher.events.ExceptionEvent;
import ru.gravit.launcher.events.request.*;
import ru.gravit.launcher.hasher.HashedEntry;
@ -13,7 +12,6 @@
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -28,7 +26,7 @@ public class ClientWebSocketService extends ClientJSONPoint {
private HashSet<EventHandler> handlers;
public ClientWebSocketService(GsonBuilder gsonBuilder, String address, int i) {
super(createURL(address), Collections.emptyMap(), i);
super(createURL(address));
requests = new HashMap<>();
results = new HashMap<>();
handlers = new HashSet<>();
@ -51,33 +49,16 @@ private static URI createURL(String address) {
}
@Override
public void onMessage(String message) {
ResultInterface result = gson.fromJson(message, ResultInterface.class);
for (EventHandler handler : handlers) {
handler.process(result);
}
public void open() throws Exception {
super.open();
webSocketClientHandler.onMessageCallback = (message) -> {
ResultInterface result = gson.fromJson(message, ResultInterface.class);
for (EventHandler handler : handlers) {
handler.process(result);
}
};
}
@Override
public void onError(Exception e) {
LogHelper.error(e);
}
@Override
public void onOpen(ServerHandshake handshakedata) {
//Notify open
synchronized (onConnect)
{
onConnect.notifyAll();
}
}
@Override
public void onClose(int code, String reason, boolean remote)
{
LogHelper.debug("Disconnected: " + code + " " + remote + " " + (reason != null ? reason : "no reason"));
if(onCloseCallback != null)
onCloseCallback.onClose(code, reason, remote);
}
@FunctionalInterface
public interface OnCloseCallback
{
@ -136,7 +117,7 @@ public void registerHandler(EventHandler eventHandler) {
}
public void waitIfNotConnected()
{
if(!isOpen() && !isClosed() && !isClosing())
/*if(!isOpen() && !isClosed() && !isClosing())
{
LogHelper.warning("WebSocket not connected. Try wait onConnect object");
synchronized (onConnect)
@ -147,20 +128,20 @@ public void waitIfNotConnected()
LogHelper.error(e);
}
}
}
}*/
}
public void sendObject(Object obj) throws IOException {
waitIfNotConnected();
if(isClosed() && reconnectCallback != null)
reconnectCallback.onReconnect();
//if(isClosed() && reconnectCallback != null)
// reconnectCallback.onReconnect();
send(gson.toJson(obj, RequestInterface.class));
}
public void sendObject(Object obj, Type type) throws IOException {
waitIfNotConnected();
if(isClosed() && reconnectCallback != null)
reconnectCallback.onReconnect();
//if(isClosed() && reconnectCallback != null)
// reconnectCallback.onReconnect();
send(gson.toJson(obj, type));
}

View file

@ -108,20 +108,25 @@ public static StandartClientWebSocketService initWebSockets(String address, bool
if(!async)
{
try {
if (!service.connectBlocking()) LogHelper.error("Error connecting");
service.open();
LogHelper.debug("Connect to %s", address);
} catch (InterruptedException e) {
} catch (Exception e) {
e.printStackTrace();
}
}
else
{
service.connect();
try {
service.open();
} catch (Exception e) {
e.printStackTrace();
}
}
JVMHelper.RUNTIME.addShutdownHook(new Thread(() -> {
try {
if(service.isOpen())
service.closeBlocking();
//if(service.isOpen())
// service.closeBlocking();
service.close();
} catch (InterruptedException e) {
LogHelper.error(e);
}

View file

@ -0,0 +1,97 @@
package ru.gravit.launcher.request.websockets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.CharsetUtil;
import ru.gravit.utils.helper.LogHelper;
/**
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
private final WebSocketClientHandshaker handshaker;
private ChannelPromise handshakeFuture;
interface OnMessageCallback
{
void onMessage(String text);
}
public OnMessageCallback onMessageCallback;
public WebSocketClientHandler(final WebSocketClientHandshaker handshaker) {
this.handshaker = handshaker;
}
public ChannelFuture handshakeFuture() {
return handshakeFuture;
}
@Override
public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
handshakeFuture = ctx.newPromise();
}
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
handshaker.handshake(ctx.channel());
}
@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
//System.out.println("WebSocket Client disconnected!");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
final Channel ch = ctx.channel();
if (!handshaker.isHandshakeComplete()) {
// web socket client connected
handshaker.finishHandshake(ch, (FullHttpResponse) msg);
handshakeFuture.setSuccess();
return;
}
if (msg instanceof FullHttpResponse) {
final FullHttpResponse response = (FullHttpResponse) msg;
throw new Exception("Unexpected FullHttpResponse (getStatus=" + response.getStatus() + ", content="
+ response.content().toString(CharsetUtil.UTF_8) + ')');
}
final WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
final TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
if(onMessageCallback != null) onMessageCallback.onMessage(textFrame.text());
LogHelper.dev("Message: %s", textFrame.text());
// uncomment to print request
// logger.info(textFrame.text());
} else if (frame instanceof PongWebSocketFrame) {
} else if (frame instanceof CloseWebSocketFrame)
ch.close();
else if (frame instanceof BinaryWebSocketFrame) {
// uncomment to print request
// logger.info(frame.content().toString());
}
}
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
cause.printStackTrace();
if (!handshakeFuture.isDone()) {
handshakeFuture.setFailure(cause);
}
ctx.close();
}
}

View file

@ -162,9 +162,9 @@ public void run(String... args) throws Throwable {
{
LogHelper.debug("WebSocket connect closed. Try reconnect");
try {
if (!Request.service.reconnectBlocking()) LogHelper.error("Error connecting");
Request.service.open();
LogHelper.debug("Connect to %s", config.websocket.address);
} catch (InterruptedException e) {
} catch (Exception e) {
e.printStackTrace();
}
auth();