[FEATURE] WebSocket hook rework

This commit is contained in:
Gravita 2023-03-20 00:25:27 +07:00
parent 71739f5670
commit 17f9c28f3d
8 changed files with 59 additions and 162 deletions

View file

@ -0,0 +1,12 @@
package pro.gravit.launchserver.modules.events;
import pro.gravit.launcher.modules.LauncherModule;
import pro.gravit.launchserver.LaunchServer;
public class LaunchServerNettyFullInitEvent extends LauncherModule.Event {
public final LaunchServer server;
public LaunchServerNettyFullInitEvent(LaunchServer server) {
this.server = server;
}
}

View file

@ -5,7 +5,6 @@
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelMatchers;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -21,7 +20,6 @@
import pro.gravit.launchserver.socket.response.auth.*; import pro.gravit.launchserver.socket.response.auth.*;
import pro.gravit.launchserver.socket.response.management.FeaturesResponse; import pro.gravit.launchserver.socket.response.management.FeaturesResponse;
import pro.gravit.launchserver.socket.response.management.GetPublicKeyResponse; import pro.gravit.launchserver.socket.response.management.GetPublicKeyResponse;
import pro.gravit.launchserver.socket.response.management.ServerStatusResponse;
import pro.gravit.launchserver.socket.response.profile.BatchProfileByUsername; import pro.gravit.launchserver.socket.response.profile.BatchProfileByUsername;
import pro.gravit.launchserver.socket.response.profile.ProfileByUUIDResponse; import pro.gravit.launchserver.socket.response.profile.ProfileByUUIDResponse;
import pro.gravit.launchserver.socket.response.profile.ProfileByUsername; import pro.gravit.launchserver.socket.response.profile.ProfileByUsername;
@ -33,26 +31,21 @@
import pro.gravit.launchserver.socket.response.update.UpdateListResponse; import pro.gravit.launchserver.socket.response.update.UpdateListResponse;
import pro.gravit.launchserver.socket.response.update.UpdateResponse; import pro.gravit.launchserver.socket.response.update.UpdateResponse;
import pro.gravit.utils.BiHookSet; import pro.gravit.utils.BiHookSet;
import pro.gravit.utils.HookSet;
import pro.gravit.utils.ProviderMap; import pro.gravit.utils.ProviderMap;
import pro.gravit.utils.helper.IOHelper; import pro.gravit.utils.helper.IOHelper;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
public class WebSocketService { public class WebSocketService {
public static final ProviderMap<WebSocketServerResponse> providers = new ProviderMap<>(); public static final ProviderMap<WebSocketServerResponse> providers = new ProviderMap<>();
public final ChannelGroup channels; public final ChannelGroup channels;
public final BiHookSet<WebSocketRequestContext, ChannelHandlerContext> hook = new BiHookSet<>(); public final HookSet<WebSocketRequestContext> hookBeforeParsing = new HookSet<>();
//Statistic data public final HookSet<WebSocketRequestContext> hookBeforeExecute = new HookSet<>();
public final AtomicLong shortRequestLatency = new AtomicLong(); public final HookSet<WebSocketRequestContext> hookComplete = new HookSet<>();
public final AtomicLong shortRequestCounter = new AtomicLong(); public final BiHookSet<Channel, Object> hookSend = new BiHookSet<>();
public final AtomicLong middleRequestLatency = new AtomicLong();
public final AtomicLong middleRequestCounter = new AtomicLong();
public final AtomicLong longRequestLatency = new AtomicLong();
public final AtomicLong longRequestCounter = new AtomicLong();
public final AtomicLong lastRequestTime = new AtomicLong();
private final LaunchServer server; private final LaunchServer server;
private final Gson gson; private final Gson gson;
private transient final Logger logger = LogManager.getLogger(); private transient final Logger logger = LogManager.getLogger();
@ -83,7 +76,6 @@ public static void registerResponses() {
providers.register("verifySecureLevelKey", VerifySecureLevelKeyResponse.class); providers.register("verifySecureLevelKey", VerifySecureLevelKeyResponse.class);
providers.register("securityReport", SecurityReportResponse.class); providers.register("securityReport", SecurityReportResponse.class);
providers.register("hardwareReport", HardwareReportResponse.class); providers.register("hardwareReport", HardwareReportResponse.class);
providers.register("serverStatus", ServerStatusResponse.class);
providers.register("currentUser", CurrentUserResponse.class); providers.register("currentUser", CurrentUserResponse.class);
providers.register("features", FeaturesResponse.class); providers.register("features", FeaturesResponse.class);
providers.register("refreshToken", RefreshTokenResponse.class); providers.register("refreshToken", RefreshTokenResponse.class);
@ -119,55 +111,27 @@ public void forEachActiveChannels(BiConsumer<Channel, WebSocketFrameHandler> cal
} }
public void process(ChannelHandlerContext ctx, TextWebSocketFrame frame, Client client, String ip) { public void process(ChannelHandlerContext ctx, TextWebSocketFrame frame, Client client, String ip) {
long startTimeNanos = System.nanoTime();
String request = frame.text(); String request = frame.text();
WebSocketRequestContext context = new WebSocketRequestContext(ctx, request, client, ip);
if(hookBeforeParsing.hook(context)) {
return;
}
WebSocketServerResponse response = gson.fromJson(request, WebSocketServerResponse.class); WebSocketServerResponse response = gson.fromJson(request, WebSocketServerResponse.class);
context.response = response;
if (response == null) { if (response == null) {
RequestEvent event = new ErrorRequestEvent("This type of request is not supported"); RequestEvent event = new ErrorRequestEvent("This type of request is not supported");
sendObject(ctx, event); hookComplete.hook(context);
sendObject(ctx.channel(), event, WebSocketEvent.class);
return; return;
} }
process(ctx, response, client, ip); process(context, response, client, ip);
long executeTime = System.nanoTime() - startTimeNanos;
if (executeTime > 0) {
addRequestTimeToStats(executeTime);
}
} }
public void addRequestTimeToStats(long nanos) { void process(WebSocketRequestContext context, WebSocketServerResponse response, Client client, String ip) {
if (nanos < 100_000_000L) // < 100 millis if (hookBeforeExecute.hook(context)) {
{
shortRequestCounter.getAndIncrement();
shortRequestLatency.getAndAdd(nanos);
} else if (nanos < 1_000_000_000L) // > 100 millis and < 1 second
{
middleRequestCounter.getAndIncrement();
middleRequestLatency.getAndAdd(nanos);
} else // > 1 second
{
longRequestCounter.getAndIncrement();
longRequestLatency.getAndAdd(nanos);
}
long lastTime = lastRequestTime.get();
long currentTime = System.currentTimeMillis();
if (currentTime - lastTime > 60 * 1000) //1 minute
{
lastRequestTime.set(currentTime);
shortRequestLatency.set(0);
shortRequestCounter.set(0);
middleRequestCounter.set(0);
middleRequestLatency.set(0);
longRequestCounter.set(0);
longRequestLatency.set(0);
}
}
void process(ChannelHandlerContext ctx, WebSocketServerResponse response, Client client, String ip) {
WebSocketRequestContext context = new WebSocketRequestContext(response, client, ip);
if (hook.hook(context, ctx)) {
return; return;
} }
ChannelHandlerContext ctx = context.context;
if (response instanceof SimpleResponse simpleResponse) { if (response instanceof SimpleResponse simpleResponse) {
simpleResponse.server = server; simpleResponse.server = server;
simpleResponse.service = this; simpleResponse.service = this;
@ -177,38 +141,27 @@ void process(ChannelHandlerContext ctx, WebSocketServerResponse response, Client
} }
try { try {
response.execute(ctx, client); response.execute(ctx, client);
} catch (Exception e) { } catch (Throwable e) {
context.exception = e;
logger.error("WebSocket request processing failed", e); logger.error("WebSocket request processing failed", e);
RequestEvent event; RequestEvent event;
event = new ErrorRequestEvent("Fatal server error. Contact administrator"); event = new ErrorRequestEvent("Fatal server error. Contact administrator");
if (response instanceof SimpleResponse) { if (response instanceof SimpleResponse) {
event.requestUUID = ((SimpleResponse) response).requestUUID; event.requestUUID = ((SimpleResponse) response).requestUUID;
} }
sendObject(ctx, event); sendObject(ctx.channel(), event);
} }
hookComplete.hook(context);
} }
public void registerClient(Channel channel) { public void registerClient(Channel channel) {
channels.add(channel); channels.add(channel);
} }
public void sendObject(ChannelHandlerContext ctx, Object obj) {
String msg = gson.toJson(obj, WebSocketEvent.class);
if (logger.isTraceEnabled()) {
logger.trace("Send to {}: {}", getIPFromContext(ctx), msg);
}
ctx.writeAndFlush(new TextWebSocketFrame(msg), ctx.voidPromise());
}
public void sendObject(ChannelHandlerContext ctx, Object obj, Type type) {
String msg = gson.toJson(obj, type);
if (logger.isTraceEnabled()) {
logger.trace("Send to {}: {}", getIPFromContext(ctx), msg);
}
ctx.writeAndFlush(new TextWebSocketFrame(msg), ctx.voidPromise());
}
public void sendObject(Channel channel, Object obj) { public void sendObject(Channel channel, Object obj) {
if(hookSend.hook(channel, obj)) {
return;
}
String msg = gson.toJson(obj, WebSocketEvent.class); String msg = gson.toJson(obj, WebSocketEvent.class);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Send to channel {}: {}", getIPFromChannel(channel), msg); logger.trace("Send to channel {}: {}", getIPFromChannel(channel), msg);
@ -217,6 +170,9 @@ public void sendObject(Channel channel, Object obj) {
} }
public void sendObject(Channel channel, Object obj, Type type) { public void sendObject(Channel channel, Object obj, Type type) {
if(hookSend.hook(channel, obj)) {
return;
}
String msg = gson.toJson(obj, type); String msg = gson.toJson(obj, type);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Send to channel {}: {}", getIPFromChannel(channel), msg); logger.trace("Send to channel {}: {}", getIPFromChannel(channel), msg);
@ -224,23 +180,9 @@ public void sendObject(Channel channel, Object obj, Type type) {
channel.writeAndFlush(new TextWebSocketFrame(msg), channel.voidPromise()); channel.writeAndFlush(new TextWebSocketFrame(msg), channel.voidPromise());
} }
public void sendObjectAll(Object obj) {
String msg = gson.toJson(obj, WebSocketEvent.class);
if (logger.isTraceEnabled()) {
logger.trace("Send to all: {}", msg);
}
for (Channel ch : channels) {
ch.writeAndFlush(new TextWebSocketFrame(msg), ch.voidPromise());
}
}
public void sendObjectAll(Object obj, Type type) { public void sendObjectAll(Object obj, Type type) {
String msg = gson.toJson(obj, type);
if (logger.isTraceEnabled()) {
logger.trace("Send to all: {}", msg);
}
for (Channel ch : channels) { for (Channel ch : channels) {
ch.writeAndFlush(new TextWebSocketFrame(msg), ch.voidPromise()); sendObject(ch, obj, type);
} }
} }
@ -251,6 +193,9 @@ public void sendObjectToUUID(UUID userUuid, Object obj, Type type) {
if (wsHandler == null) continue; if (wsHandler == null) continue;
Client client = wsHandler.getClient(); Client client = wsHandler.getClient();
if (client == null || !userUuid.equals(client.uuid)) continue; if (client == null || !userUuid.equals(client.uuid)) continue;
if(hookSend.hook(ch, obj)) {
continue;
}
String msg = gson.toJson(obj, type); String msg = gson.toJson(obj, type);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Send to {}({}): {}", getIPFromChannel(ch), userUuid, msg); logger.trace("Send to {}({}): {}", getIPFromChannel(ch), userUuid, msg);
@ -319,6 +264,9 @@ public boolean kickByIP(String ip, boolean isClose) {
} }
public void sendObjectAndClose(ChannelHandlerContext ctx, Object obj) { public void sendObjectAndClose(ChannelHandlerContext ctx, Object obj) {
if(hookSend.hook(ctx.channel(), obj)) {
return;
}
String msg = gson.toJson(obj, WebSocketEvent.class); String msg = gson.toJson(obj, WebSocketEvent.class);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Send and close {}: {}", getIPFromContext(ctx), msg); logger.trace("Send and close {}: {}", getIPFromContext(ctx), msg);
@ -327,6 +275,9 @@ public void sendObjectAndClose(ChannelHandlerContext ctx, Object obj) {
} }
public void sendObjectAndClose(ChannelHandlerContext ctx, Object obj, Type type) { public void sendObjectAndClose(ChannelHandlerContext ctx, Object obj, Type type) {
if(hookSend.hook(ctx.channel(), obj)) {
return;
}
String msg = gson.toJson(obj, type); String msg = gson.toJson(obj, type);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Send and close {}: {}", getIPFromContext(ctx), msg); logger.trace("Send and close {}: {}", getIPFromContext(ctx), msg);
@ -334,22 +285,17 @@ public void sendObjectAndClose(ChannelHandlerContext ctx, Object obj, Type type)
ctx.writeAndFlush(new TextWebSocketFrame(msg)).addListener(ChannelFutureListener.CLOSE); ctx.writeAndFlush(new TextWebSocketFrame(msg)).addListener(ChannelFutureListener.CLOSE);
} }
@Deprecated
public void sendEvent(EventResult obj) {
String msg = gson.toJson(obj, WebSocketEvent.class);
if (logger.isTraceEnabled()) {
logger.trace("Send event: {}", msg);
}
channels.writeAndFlush(new TextWebSocketFrame(msg), ChannelMatchers.all(), true);
}
public static class WebSocketRequestContext { public static class WebSocketRequestContext {
public final WebSocketServerResponse response; public final ChannelHandlerContext context;
public final String text;
public final Client client; public final Client client;
public final String ip; public final String ip;
public WebSocketServerResponse response;
public Throwable exception;
public WebSocketRequestContext(WebSocketServerResponse response, Client client, String ip) { public WebSocketRequestContext(ChannelHandlerContext context, String text, Client client, String ip) {
this.response = response; this.context = context;
this.text = text;
this.client = client; this.client = client;
this.ip = ip; this.ip = ip;
} }

View file

@ -4,6 +4,7 @@
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import pro.gravit.launchserver.LaunchServer; import pro.gravit.launchserver.LaunchServer;
import pro.gravit.launchserver.config.LaunchServerConfig; import pro.gravit.launchserver.config.LaunchServerConfig;
import pro.gravit.launchserver.modules.events.LaunchServerNettyFullInitEvent;
import pro.gravit.launchserver.socket.LauncherNettyServer; import pro.gravit.launchserver.socket.LauncherNettyServer;
import javax.net.ssl.SSLServerSocketFactory; import javax.net.ssl.SSLServerSocketFactory;
@ -34,5 +35,6 @@ public void run() {
for (LaunchServerConfig.NettyBindAddress address : server.config.netty.binds) { for (LaunchServerConfig.NettyBindAddress address : server.config.netty.binds) {
nettyServer.bind(new InetSocketAddress(address.address, address.port)); nettyServer.bind(new InetSocketAddress(address.address, address.port));
} }
server.modulesManager.invokeEvent(new LaunchServerNettyFullInitEvent(server));
} }
} }

View file

@ -17,7 +17,7 @@ public abstract class SimpleResponse implements WebSocketServerResponse {
public void sendResult(RequestEvent result) { public void sendResult(RequestEvent result) {
result.requestUUID = requestUUID; result.requestUUID = requestUUID;
service.sendObject(ctx, result); service.sendObject(ctx.channel(), result);
} }
public void sendResultAndClose(RequestEvent result) { public void sendResultAndClose(RequestEvent result) {
@ -28,6 +28,6 @@ public void sendResultAndClose(RequestEvent result) {
public void sendError(String errorMessage) { public void sendError(String errorMessage) {
ErrorRequestEvent event = new ErrorRequestEvent(errorMessage); ErrorRequestEvent event = new ErrorRequestEvent(errorMessage);
event.requestUUID = requestUUID; event.requestUUID = requestUUID;
service.sendObject(ctx, event); service.sendObject(ctx.channel(), event);
} }
} }

View file

@ -1,27 +0,0 @@
package pro.gravit.launchserver.socket.response.management;
import io.netty.channel.ChannelHandlerContext;
import pro.gravit.launcher.events.request.ServerStatusRequestEvent;
import pro.gravit.launchserver.socket.Client;
import pro.gravit.launchserver.socket.response.SimpleResponse;
import pro.gravit.utils.helper.JVMHelper;
public class ServerStatusResponse extends SimpleResponse {
@Override
public String getType() {
return "serverStatus";
}
@Override
public void execute(ChannelHandlerContext ctx, Client client) {
ServerStatusRequestEvent event = new ServerStatusRequestEvent(server.config.projectName);
event.totalJavaMemory = JVMHelper.RUNTIME.totalMemory();
event.freeJavaMemory = JVMHelper.RUNTIME.freeMemory();
event.shortLatency = (service.shortRequestLatency.get() / service.shortRequestCounter.get()) / 1_000_000;
event.middleLatency = (service.middleRequestLatency.get() / service.middleRequestCounter.get()) / 1_000_000;
event.longLatency = (service.longRequestLatency.get() / service.longRequestCounter.get()) / 1_000_000;
event.latency = ((service.shortRequestLatency.get() + service.middleRequestLatency.get() + service.longRequestLatency.get()) /
(service.shortRequestCounter.get() + service.middleRequestCounter.get() + service.longRequestCounter.get())) / 1_000_000;
sendResult(event);
}
}

View file

@ -1,24 +0,0 @@
package pro.gravit.launcher.events.request;
import pro.gravit.launcher.events.RequestEvent;
public class ServerStatusRequestEvent extends RequestEvent {
public final String projectName;
public long totalJavaMemory;
public long freeJavaMemory;
//Latency
public long shortLatency; //Millis
public long middleLatency; //Millis
public long longLatency; //Millis
public long latency;
public ServerStatusRequestEvent(String projectName) {
this.projectName = projectName;
}
@Override
public String getType() {
return "serverStatus";
}
}

View file

@ -1,11 +0,0 @@
package pro.gravit.launcher.request.management;
import pro.gravit.launcher.events.request.ServerStatusRequestEvent;
import pro.gravit.launcher.request.Request;
public class ServerStatusRequest extends Request<ServerStatusRequestEvent> {
@Override
public String getType() {
return "serverStatus";
}
}

View file

@ -102,7 +102,6 @@ public void registerResults() {
results.register("verifySecureLevelKey", VerifySecureLevelKeyRequestEvent.class); results.register("verifySecureLevelKey", VerifySecureLevelKeyRequestEvent.class);
results.register("securityReport", SecurityReportRequestEvent.class); results.register("securityReport", SecurityReportRequestEvent.class);
results.register("hardwareReport", HardwareReportRequestEvent.class); results.register("hardwareReport", HardwareReportRequestEvent.class);
results.register("serverStatus", ServerStatusRequestEvent.class);
results.register("currentUser", CurrentUserRequestEvent.class); results.register("currentUser", CurrentUserRequestEvent.class);
results.register("features", FeaturesRequestEvent.class); results.register("features", FeaturesRequestEvent.class);
results.register("refreshToken", RefreshTokenRequestEvent.class); results.register("refreshToken", RefreshTokenRequestEvent.class);