From a19fa8f2fafa5c4c800b091e6f51ffe0fd5c7f12 Mon Sep 17 00:00:00 2001 From: Minecon724 Date: Mon, 23 Dec 2024 15:24:31 +0100 Subject: [PATCH] A --- pom.xml | 5 + .../m724/autopeerer/client/PacketHandler.java | 8 +- .../autopeerer/common/packet/Packets.java | 8 +- .../{s2c => c2s}/PingResponsePacket.java | 2 +- .../{s2c => c2s}/SessionResponsePacket.java | 2 +- .../{c2s => s2c}/PingRequestPacket.java | 2 +- .../{c2s => s2c}/SessionRequestPacket.java | 2 +- .../m724/autopeerer/server/ClientState.java | 44 ++++ .../java/eu/m724/autopeerer/server/Main.java | 17 +- .../m724/autopeerer/server/MyHttpHandler.java | 195 ++++++++++++++++++ .../autopeerer/server/MyWebsocketServer.java | 13 +- .../java/eu/m724/autopeerer/server/Node.java | 88 +++++++- .../m724/autopeerer/server/PacketHandler.java | 21 +- src/main/resources/server.properties | 8 +- 14 files changed, 371 insertions(+), 44 deletions(-) rename src/main/java/eu/m724/autopeerer/common/packet/{s2c => c2s}/PingResponsePacket.java (97%) rename src/main/java/eu/m724/autopeerer/common/packet/{s2c => c2s}/SessionResponsePacket.java (97%) rename src/main/java/eu/m724/autopeerer/common/packet/{c2s => s2c}/PingRequestPacket.java (96%) rename src/main/java/eu/m724/autopeerer/common/packet/{c2s => s2c}/SessionRequestPacket.java (98%) create mode 100644 src/main/java/eu/m724/autopeerer/server/MyHttpHandler.java diff --git a/pom.xml b/pom.xml index e25da8f..db1d5c3 100644 --- a/pom.xml +++ b/pom.xml @@ -25,6 +25,11 @@ ipaddress 5.5.1 + + org.json + json + 20240303 + diff --git a/src/main/java/eu/m724/autopeerer/client/PacketHandler.java b/src/main/java/eu/m724/autopeerer/client/PacketHandler.java index c979700..b047a1c 100644 --- a/src/main/java/eu/m724/autopeerer/client/PacketHandler.java +++ b/src/main/java/eu/m724/autopeerer/client/PacketHandler.java @@ -5,10 +5,10 @@ import eu.m724.autopeerer.client.wireguard.WireGuardLive; import eu.m724.autopeerer.client.wireguard.WireGuardSession; import eu.m724.autopeerer.common.packet.Packet; import eu.m724.autopeerer.common.packet.Packets; -import eu.m724.autopeerer.common.packet.c2s.PingRequestPacket; -import eu.m724.autopeerer.common.packet.c2s.SessionRequestPacket; -import eu.m724.autopeerer.common.packet.s2c.PingResponsePacket; -import eu.m724.autopeerer.common.packet.s2c.SessionResponsePacket; +import eu.m724.autopeerer.common.packet.s2c.PingRequestPacket; +import eu.m724.autopeerer.common.packet.s2c.SessionRequestPacket; +import eu.m724.autopeerer.common.packet.c2s.PingResponsePacket; +import eu.m724.autopeerer.common.packet.c2s.SessionResponsePacket; import java.io.BufferedReader; import java.io.IOException; diff --git a/src/main/java/eu/m724/autopeerer/common/packet/Packets.java b/src/main/java/eu/m724/autopeerer/common/packet/Packets.java index 1681b50..01b0528 100644 --- a/src/main/java/eu/m724/autopeerer/common/packet/Packets.java +++ b/src/main/java/eu/m724/autopeerer/common/packet/Packets.java @@ -1,11 +1,11 @@ package eu.m724.autopeerer.common.packet; import eu.m724.autopeerer.common.packet.c2s.LoginPacket; -import eu.m724.autopeerer.common.packet.c2s.PingRequestPacket; -import eu.m724.autopeerer.common.packet.c2s.SessionRequestPacket; +import eu.m724.autopeerer.common.packet.s2c.PingRequestPacket; +import eu.m724.autopeerer.common.packet.s2c.SessionRequestPacket; import eu.m724.autopeerer.common.packet.s2c.LoginResponsePacket; -import eu.m724.autopeerer.common.packet.s2c.PingResponsePacket; -import eu.m724.autopeerer.common.packet.s2c.SessionResponsePacket; +import eu.m724.autopeerer.common.packet.c2s.PingResponsePacket; +import eu.m724.autopeerer.common.packet.c2s.SessionResponsePacket; import java.nio.ByteBuffer; import java.util.function.Consumer; diff --git a/src/main/java/eu/m724/autopeerer/common/packet/s2c/PingResponsePacket.java b/src/main/java/eu/m724/autopeerer/common/packet/c2s/PingResponsePacket.java similarity index 97% rename from src/main/java/eu/m724/autopeerer/common/packet/s2c/PingResponsePacket.java rename to src/main/java/eu/m724/autopeerer/common/packet/c2s/PingResponsePacket.java index 010c06d..ea3b35c 100644 --- a/src/main/java/eu/m724/autopeerer/common/packet/s2c/PingResponsePacket.java +++ b/src/main/java/eu/m724/autopeerer/common/packet/c2s/PingResponsePacket.java @@ -1,4 +1,4 @@ -package eu.m724.autopeerer.common.packet.s2c; +package eu.m724.autopeerer.common.packet.c2s; import eu.m724.autopeerer.common.packet.Packet; diff --git a/src/main/java/eu/m724/autopeerer/common/packet/s2c/SessionResponsePacket.java b/src/main/java/eu/m724/autopeerer/common/packet/c2s/SessionResponsePacket.java similarity index 97% rename from src/main/java/eu/m724/autopeerer/common/packet/s2c/SessionResponsePacket.java rename to src/main/java/eu/m724/autopeerer/common/packet/c2s/SessionResponsePacket.java index 52bdc4b..9c135a7 100644 --- a/src/main/java/eu/m724/autopeerer/common/packet/s2c/SessionResponsePacket.java +++ b/src/main/java/eu/m724/autopeerer/common/packet/c2s/SessionResponsePacket.java @@ -1,4 +1,4 @@ -package eu.m724.autopeerer.common.packet.s2c; +package eu.m724.autopeerer.common.packet.c2s; import eu.m724.autopeerer.common.packet.Packet; diff --git a/src/main/java/eu/m724/autopeerer/common/packet/c2s/PingRequestPacket.java b/src/main/java/eu/m724/autopeerer/common/packet/s2c/PingRequestPacket.java similarity index 96% rename from src/main/java/eu/m724/autopeerer/common/packet/c2s/PingRequestPacket.java rename to src/main/java/eu/m724/autopeerer/common/packet/s2c/PingRequestPacket.java index c1432c7..dea0978 100644 --- a/src/main/java/eu/m724/autopeerer/common/packet/c2s/PingRequestPacket.java +++ b/src/main/java/eu/m724/autopeerer/common/packet/s2c/PingRequestPacket.java @@ -1,4 +1,4 @@ -package eu.m724.autopeerer.common.packet.c2s; +package eu.m724.autopeerer.common.packet.s2c; import eu.m724.autopeerer.common.packet.Packet; diff --git a/src/main/java/eu/m724/autopeerer/common/packet/c2s/SessionRequestPacket.java b/src/main/java/eu/m724/autopeerer/common/packet/s2c/SessionRequestPacket.java similarity index 98% rename from src/main/java/eu/m724/autopeerer/common/packet/c2s/SessionRequestPacket.java rename to src/main/java/eu/m724/autopeerer/common/packet/s2c/SessionRequestPacket.java index 28520af..8a92d1b 100644 --- a/src/main/java/eu/m724/autopeerer/common/packet/c2s/SessionRequestPacket.java +++ b/src/main/java/eu/m724/autopeerer/common/packet/s2c/SessionRequestPacket.java @@ -1,4 +1,4 @@ -package eu.m724.autopeerer.common.packet.c2s; +package eu.m724.autopeerer.common.packet.s2c; import eu.m724.autopeerer.common.packet.Packet; import inet.ipaddr.HostName; diff --git a/src/main/java/eu/m724/autopeerer/server/ClientState.java b/src/main/java/eu/m724/autopeerer/server/ClientState.java index c9ee405..d003d73 100644 --- a/src/main/java/eu/m724/autopeerer/server/ClientState.java +++ b/src/main/java/eu/m724/autopeerer/server/ClientState.java @@ -2,9 +2,19 @@ package eu.m724.autopeerer.server; import eu.m724.autopeerer.common.packet.Packet; import eu.m724.autopeerer.common.packet.Packets; +import eu.m724.autopeerer.common.packet.c2s.PingResponsePacket; +import eu.m724.autopeerer.common.packet.c2s.SessionResponsePacket; +import eu.m724.autopeerer.common.packet.s2c.PingRequestPacket; import org.java_websocket.WebSocket; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; import java.util.function.Consumer; public class ClientState { @@ -28,4 +38,38 @@ public class ClientState { void end() { socket.close(); } + + CompletableFuture ping(InetAddress host) { + var future = new CompletableFuture(); + + var id = (short) (ThreadLocalRandom.current().nextInt() & 0xFFFF); + + pingConsumers.put(id, future::complete); + send(new PingRequestPacket(id, host)); + + return future; + } + + + /* Packet functions */ + + private Map> pingConsumers = new HashMap<>(); + + + void onPacketReceived(Packet p) { + if (p instanceof PingResponsePacket packet) { + handlePingResponse(packet); + } else if (p instanceof SessionResponsePacket packet) { + handleSessionResponse(packet); + } + } + + private void handlePingResponse(PingResponsePacket packet) { + System.out.printf("[%d] Ping response #%d: %s avg %.3f / mdev %.3f ms\n", clientId, packet.requestId, packet.status, packet.average, packet.meanDeviation); + pingConsumers.remove(packet.requestId).accept(packet); + } + + private void handleSessionResponse(SessionResponsePacket packet) { + System.out.printf("[%d] Session response #%d: %s\n", clientId, packet.sessionId, packet.result); + } } diff --git a/src/main/java/eu/m724/autopeerer/server/Main.java b/src/main/java/eu/m724/autopeerer/server/Main.java index 215ea8b..8ded69c 100644 --- a/src/main/java/eu/m724/autopeerer/server/Main.java +++ b/src/main/java/eu/m724/autopeerer/server/Main.java @@ -1,11 +1,13 @@ package eu.m724.autopeerer.server; +import com.sun.net.httpserver.HttpServer; + import java.io.IOException; import java.net.InetSocketAddress; import java.util.Set; public class Main { - public static void main(String[] args) throws InterruptedException { + public static void main(String[] args) throws IOException { System.out.println("Hello world!"); var config = new ServerConfiguration(); @@ -23,10 +25,21 @@ public class Main { var packetHandler = new PacketHandler(nodes); var server = new MyWebsocketServer( - new InetSocketAddress(config.getString("listen.address"), config.getInt("listen.port")), + new InetSocketAddress(config.getString("socket.address"), config.getInt("socket.port")), packetHandler ); server.start(); + + HttpServer httpServer = HttpServer.create( + new InetSocketAddress(config.getString("http.address"), config.getInt("http.port")), + 0 + ); + + httpServer.createContext("/", new MyHttpHandler(nodes, config.getString("http.key"))); + httpServer.setExecutor(null); + httpServer.start(); + + System.out.println("HTTP server started on " + httpServer.getAddress().getHostString() + ":" + httpServer.getAddress().getPort()); } } diff --git a/src/main/java/eu/m724/autopeerer/server/MyHttpHandler.java b/src/main/java/eu/m724/autopeerer/server/MyHttpHandler.java new file mode 100644 index 0000000..14e7a00 --- /dev/null +++ b/src/main/java/eu/m724/autopeerer/server/MyHttpHandler.java @@ -0,0 +1,195 @@ +package eu.m724.autopeerer.server; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import eu.m724.autopeerer.common.packet.c2s.PingResponsePacket; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +public class MyHttpHandler implements HttpHandler { + private final Set nodes; + private final String key; + + public MyHttpHandler(Set nodes, String key) { + this.nodes = nodes; + this.key = key; + } + + @Override + public void handle(HttpExchange exchange) throws IOException { + String key = exchange.getRequestHeaders().getFirst("key"); + + if (!this.key.equals(key)) { + exchange.sendResponseHeaders(403, -1); + return; + } + + try { + inner(exchange); + } catch (IOException e) { + exchange.sendResponseHeaders(500, -1); + } catch (EndException e) { } + } + + private void inner(HttpExchange exchange) throws IOException, EndException { + String[] path = exchange.getRequestURI().getPath().substring(1).split("/"); + if (path.length == 0) { + sendResponse(exchange, 200, "Hello"); + } else { + if (path[0].equals("nodes")) { + requireMethod(exchange, "GET"); + + var json = new JSONObject(); + nodes.forEach(node -> { + var val = new JSONObject() + .put("name", node.name()) + .put("host", node.host()) + .put("online", node.connected()); + json.put(node.id(), val); + }); + + sendResponse(exchange, 200, json); + } else if (path[0].equals("ping")) { + requireMethod(exchange, "POST"); + + var json = getJsonBody(exchange); + + Set selectedNodes = this.nodes; + if (json.has("node")) { + selectedNodes = nodes.stream() + .filter(n -> n.id().equals(json.getString("node"))) + .collect(Collectors.toSet()); + + if (selectedNodes.isEmpty()) // no node with that ID found + sendResponse(exchange, 400, "node"); + } + + InetAddress target = null; + try { + target = InetAddress.getByName(json.getString("target")); + } catch (JSONException | UnknownHostException e) { + sendResponse(exchange, 400, "target"); + } + + sseStart(exchange); + + var futures = new HashSet>(); + + for (Node node : selectedNodes) { + if (!node.connected()) { // node is offline + var response = new JSONObject() + .put("node", node.id()) + .put("status", "OFFLINE"); + sseWrite(exchange, response.toString()); + continue; + } + + var future = node.getClient().ping(target).handle((result, ex) -> { + try { + if (ex != null) { + sendResponse(exchange, 500); + } + var response = new JSONObject() + .put("node", node.id()) + .put("status", result.status); + + if (result.status == PingResponsePacket.PingResponseStatus.OK) { + response.put("average", result.average) + .put("meanDeviation", result.meanDeviation); + } + + sseWrite(exchange, response.toString()); + } catch (IOException e) { + e.printStackTrace(); + } catch (EndException e) { } + + return (Void) null; + }); + + futures.add(future); + } + + CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).handle((v, ex) -> { + // TODO I don't know what to do with the errors + + if (ex != null) { + ex.printStackTrace(); + } + + try { + sseClose(exchange); + } catch (IOException e) { + e.printStackTrace(); + } + + return null; + }); + } else if (path[0].equals("peer")) { + + } + } + } + + private void sseStart(HttpExchange exchange) throws IOException { + exchange.getResponseHeaders().add("Content-Type", "text/event-stream; charset=utf-8"); + + exchange.sendResponseHeaders(200, 0); // 0 for chunked encoding + } + + private void sseWrite(HttpExchange exchange, String message) throws IOException { + var body = exchange.getResponseBody(); + body.write(("data: " + message + "\n\n").getBytes(StandardCharsets.UTF_8)); + body.flush(); + } + + private void sseClose(HttpExchange exchange) throws IOException { + exchange.getResponseBody().close(); + } + + private JSONObject getJsonBody(HttpExchange exchange) throws IOException { + String body; + try (var is = exchange.getRequestBody()) { + body = new String(is.readAllBytes()); + } + + return new JSONObject(body); + } + + private void requireMethod(HttpExchange exchange, String method) throws IOException, EndException { + if (!exchange.getRequestMethod().equals(method)) { + sendResponse(exchange, 405); + } + } + + private void sendResponse(HttpExchange exchange, int statusCode) throws IOException, EndException { + exchange.sendResponseHeaders(statusCode, -1); + throw new EndException(); + } + + private void sendResponse(HttpExchange exchange, int statusCode, Object response) throws IOException, EndException { + var type = (response instanceof JSONObject || response instanceof JSONArray) ? "application/json; charset=utf8" : "text/plain; charset=utf8"; + exchange.getResponseHeaders().add("Content-Type", type); + + byte[] responseBytes = response.toString().getBytes(StandardCharsets.UTF_8); + exchange.sendResponseHeaders(statusCode, responseBytes.length); + + OutputStream outputStream = exchange.getResponseBody(); + outputStream.write(responseBytes); + outputStream.close(); + + throw new EndException(); + } + + private static class EndException extends Exception { } +} diff --git a/src/main/java/eu/m724/autopeerer/server/MyWebsocketServer.java b/src/main/java/eu/m724/autopeerer/server/MyWebsocketServer.java index 2397a35..4981fac 100644 --- a/src/main/java/eu/m724/autopeerer/server/MyWebsocketServer.java +++ b/src/main/java/eu/m724/autopeerer/server/MyWebsocketServer.java @@ -1,7 +1,7 @@ package eu.m724.autopeerer.server; -import eu.m724.autopeerer.common.packet.c2s.PingRequestPacket; -import eu.m724.autopeerer.common.packet.c2s.SessionRequestPacket; +import eu.m724.autopeerer.common.packet.s2c.PingRequestPacket; +import eu.m724.autopeerer.common.packet.s2c.SessionRequestPacket; import inet.ipaddr.IPAddressString; import org.java_websocket.WebSocket; import org.java_websocket.handshake.ClientHandshake; @@ -35,10 +35,9 @@ public class MyWebsocketServer extends WebSocketServer { System.out.printf("[%d] Connected: %s\n", id, conn.getRemoteSocketAddress().getHostString()); // TODO testing - try { - state.send(new PingRequestPacket((short) 1, InetAddress.getByName("1.1.1.1"))); + /*state.send(new PingRequestPacket((short) 1, InetAddress.getByName("1.1.1.1"))); state.send(new PingRequestPacket((short) 2, InetAddress.getByName("1.2.3.4"))); - state.send(new PingRequestPacket((short) 3, InetAddress.getByName("1.1.1.2"))); + state.send(new PingRequestPacket((short) 3, InetAddress.getByName("1.1.1.2")));*/ state.send(new SessionRequestPacket( (short) 1, @@ -48,14 +47,12 @@ public class MyWebsocketServer extends WebSocketServer { 51820, 4242420000L )); - } catch (UnknownHostException e) { - throw new RuntimeException(e); - } } @Override public void onClose(WebSocket conn, int code, String reason, boolean remote) { var state = states.remove(conn); + state.node.setClient(null); var id = state.authenticated ? state.node.id() : conn.getRemoteSocketAddress().getHostString(); System.out.printf("[%d] Disconnected: %s\n", state.clientId, id); } diff --git a/src/main/java/eu/m724/autopeerer/server/Node.java b/src/main/java/eu/m724/autopeerer/server/Node.java index 876a21d..fc0a3da 100644 --- a/src/main/java/eu/m724/autopeerer/server/Node.java +++ b/src/main/java/eu/m724/autopeerer/server/Node.java @@ -1,14 +1,48 @@ package eu.m724.autopeerer.server; +import eu.m724.autopeerer.common.packet.Packet; +import eu.m724.autopeerer.common.packet.Packets; + import java.util.Base64; +import java.util.Objects; import java.util.Properties; -public record Node( - String id, - byte[] key, - String name, - String host -) { +public final class Node { + private final String id; + private final byte[] key; + private final String name; + private final String host; + + private ClientState client; + + public Node( + String id, + byte[] key, + String name, + String host + ) { + this.id = id; + this.key = key; + this.name = name; + this.host = host; + } + + void setClient(ClientState client) { + this.client = client; + } + + ClientState getClient() { + return client; + } + + boolean connected() { + return client != null; + } + + void send(Packet packet) { + client.send(packet); + } + static Node fromProperties(String id, Properties properties) { return new Node( id, @@ -17,4 +51,46 @@ public record Node( properties.getProperty("host") ); } + + public String id() { + return id; + } + + public byte[] key() { + return key; + } + + public String name() { + return name; + } + + public String host() { + return host; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) return true; + if (obj == null || obj.getClass() != this.getClass()) return false; + var that = (Node) obj; + return Objects.equals(this.id, that.id) && + Objects.equals(this.key, that.key) && + Objects.equals(this.name, that.name) && + Objects.equals(this.host, that.host); + } + + @Override + public int hashCode() { + return Objects.hash(id, key, name, host); + } + + @Override + public String toString() { + return "Node[" + + "id=" + id + ", " + + "key=" + key + ", " + + "name=" + name + ", " + + "host=" + host + ']'; + } + } diff --git a/src/main/java/eu/m724/autopeerer/server/PacketHandler.java b/src/main/java/eu/m724/autopeerer/server/PacketHandler.java index 4147ac3..71be9f2 100644 --- a/src/main/java/eu/m724/autopeerer/server/PacketHandler.java +++ b/src/main/java/eu/m724/autopeerer/server/PacketHandler.java @@ -3,8 +3,8 @@ package eu.m724.autopeerer.server; import eu.m724.autopeerer.common.packet.Packet; import eu.m724.autopeerer.common.packet.Packets; import eu.m724.autopeerer.common.packet.c2s.LoginPacket; -import eu.m724.autopeerer.common.packet.s2c.PingResponsePacket; -import eu.m724.autopeerer.common.packet.s2c.SessionResponsePacket; +import eu.m724.autopeerer.common.packet.c2s.PingResponsePacket; +import eu.m724.autopeerer.common.packet.c2s.SessionResponsePacket; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; @@ -46,10 +46,10 @@ public class PacketHandler { return; } - if (p instanceof PingResponsePacket packet) { - handlePingResponse(state, packet); - } else if (p instanceof SessionResponsePacket packet) { - handleSessionResponse(state, packet); + try { + state.onPacketReceived(p); + } catch (Exception e) { + throw new RuntimeException("Exception handling packet by ClientState", e); } } @@ -73,6 +73,7 @@ public class PacketHandler { System.out.printf("[%d] Logged in as %s\n", state.clientId, node.getKey().id()); nodes.put(node.getKey(), state); + node.getKey().setClient(state); state.authenticated = true; state.node = node.getKey(); } else { @@ -80,12 +81,4 @@ public class PacketHandler { state.end(); } } - - private void handlePingResponse(ClientState state, PingResponsePacket packet) { - System.out.printf("[%d] Ping response #%d: %s avg %.3f / mdev %.3f ms\n", state.clientId, packet.requestId, packet.status, packet.average, packet.meanDeviation); - } - - private void handleSessionResponse(ClientState state, SessionResponsePacket packet) { - System.out.printf("[%d] Session response #%d: %s\n", state.clientId, packet.sessionId, packet.result); - } } diff --git a/src/main/resources/server.properties b/src/main/resources/server.properties index 84ea1a0..32d8a99 100644 --- a/src/main/resources/server.properties +++ b/src/main/resources/server.properties @@ -1,2 +1,6 @@ -listen.address=127.0.0.1 -listen.port=8002 \ No newline at end of file +socket.address=127.0.0.1 +socket.port=8002 + +http.address=127.0.0.1 +http.port=8003 +http.key=very_secret_key \ No newline at end of file