diff --git a/pom.xml b/pom.xml
index e25da8f..db1d5c3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,6 +25,11 @@
ipaddress
5.5.1
+
+ org.json
+ json
+ 20240303
+
diff --git a/src/main/java/eu/m724/autopeerer/client/PacketHandler.java b/src/main/java/eu/m724/autopeerer/client/PacketHandler.java
index c979700..b047a1c 100644
--- a/src/main/java/eu/m724/autopeerer/client/PacketHandler.java
+++ b/src/main/java/eu/m724/autopeerer/client/PacketHandler.java
@@ -5,10 +5,10 @@ import eu.m724.autopeerer.client.wireguard.WireGuardLive;
import eu.m724.autopeerer.client.wireguard.WireGuardSession;
import eu.m724.autopeerer.common.packet.Packet;
import eu.m724.autopeerer.common.packet.Packets;
-import eu.m724.autopeerer.common.packet.c2s.PingRequestPacket;
-import eu.m724.autopeerer.common.packet.c2s.SessionRequestPacket;
-import eu.m724.autopeerer.common.packet.s2c.PingResponsePacket;
-import eu.m724.autopeerer.common.packet.s2c.SessionResponsePacket;
+import eu.m724.autopeerer.common.packet.s2c.PingRequestPacket;
+import eu.m724.autopeerer.common.packet.s2c.SessionRequestPacket;
+import eu.m724.autopeerer.common.packet.c2s.PingResponsePacket;
+import eu.m724.autopeerer.common.packet.c2s.SessionResponsePacket;
import java.io.BufferedReader;
import java.io.IOException;
diff --git a/src/main/java/eu/m724/autopeerer/common/packet/Packets.java b/src/main/java/eu/m724/autopeerer/common/packet/Packets.java
index 1681b50..01b0528 100644
--- a/src/main/java/eu/m724/autopeerer/common/packet/Packets.java
+++ b/src/main/java/eu/m724/autopeerer/common/packet/Packets.java
@@ -1,11 +1,11 @@
package eu.m724.autopeerer.common.packet;
import eu.m724.autopeerer.common.packet.c2s.LoginPacket;
-import eu.m724.autopeerer.common.packet.c2s.PingRequestPacket;
-import eu.m724.autopeerer.common.packet.c2s.SessionRequestPacket;
+import eu.m724.autopeerer.common.packet.s2c.PingRequestPacket;
+import eu.m724.autopeerer.common.packet.s2c.SessionRequestPacket;
import eu.m724.autopeerer.common.packet.s2c.LoginResponsePacket;
-import eu.m724.autopeerer.common.packet.s2c.PingResponsePacket;
-import eu.m724.autopeerer.common.packet.s2c.SessionResponsePacket;
+import eu.m724.autopeerer.common.packet.c2s.PingResponsePacket;
+import eu.m724.autopeerer.common.packet.c2s.SessionResponsePacket;
import java.nio.ByteBuffer;
import java.util.function.Consumer;
diff --git a/src/main/java/eu/m724/autopeerer/common/packet/s2c/PingResponsePacket.java b/src/main/java/eu/m724/autopeerer/common/packet/c2s/PingResponsePacket.java
similarity index 97%
rename from src/main/java/eu/m724/autopeerer/common/packet/s2c/PingResponsePacket.java
rename to src/main/java/eu/m724/autopeerer/common/packet/c2s/PingResponsePacket.java
index 010c06d..ea3b35c 100644
--- a/src/main/java/eu/m724/autopeerer/common/packet/s2c/PingResponsePacket.java
+++ b/src/main/java/eu/m724/autopeerer/common/packet/c2s/PingResponsePacket.java
@@ -1,4 +1,4 @@
-package eu.m724.autopeerer.common.packet.s2c;
+package eu.m724.autopeerer.common.packet.c2s;
import eu.m724.autopeerer.common.packet.Packet;
diff --git a/src/main/java/eu/m724/autopeerer/common/packet/s2c/SessionResponsePacket.java b/src/main/java/eu/m724/autopeerer/common/packet/c2s/SessionResponsePacket.java
similarity index 97%
rename from src/main/java/eu/m724/autopeerer/common/packet/s2c/SessionResponsePacket.java
rename to src/main/java/eu/m724/autopeerer/common/packet/c2s/SessionResponsePacket.java
index 52bdc4b..9c135a7 100644
--- a/src/main/java/eu/m724/autopeerer/common/packet/s2c/SessionResponsePacket.java
+++ b/src/main/java/eu/m724/autopeerer/common/packet/c2s/SessionResponsePacket.java
@@ -1,4 +1,4 @@
-package eu.m724.autopeerer.common.packet.s2c;
+package eu.m724.autopeerer.common.packet.c2s;
import eu.m724.autopeerer.common.packet.Packet;
diff --git a/src/main/java/eu/m724/autopeerer/common/packet/c2s/PingRequestPacket.java b/src/main/java/eu/m724/autopeerer/common/packet/s2c/PingRequestPacket.java
similarity index 96%
rename from src/main/java/eu/m724/autopeerer/common/packet/c2s/PingRequestPacket.java
rename to src/main/java/eu/m724/autopeerer/common/packet/s2c/PingRequestPacket.java
index c1432c7..dea0978 100644
--- a/src/main/java/eu/m724/autopeerer/common/packet/c2s/PingRequestPacket.java
+++ b/src/main/java/eu/m724/autopeerer/common/packet/s2c/PingRequestPacket.java
@@ -1,4 +1,4 @@
-package eu.m724.autopeerer.common.packet.c2s;
+package eu.m724.autopeerer.common.packet.s2c;
import eu.m724.autopeerer.common.packet.Packet;
diff --git a/src/main/java/eu/m724/autopeerer/common/packet/c2s/SessionRequestPacket.java b/src/main/java/eu/m724/autopeerer/common/packet/s2c/SessionRequestPacket.java
similarity index 98%
rename from src/main/java/eu/m724/autopeerer/common/packet/c2s/SessionRequestPacket.java
rename to src/main/java/eu/m724/autopeerer/common/packet/s2c/SessionRequestPacket.java
index 28520af..8a92d1b 100644
--- a/src/main/java/eu/m724/autopeerer/common/packet/c2s/SessionRequestPacket.java
+++ b/src/main/java/eu/m724/autopeerer/common/packet/s2c/SessionRequestPacket.java
@@ -1,4 +1,4 @@
-package eu.m724.autopeerer.common.packet.c2s;
+package eu.m724.autopeerer.common.packet.s2c;
import eu.m724.autopeerer.common.packet.Packet;
import inet.ipaddr.HostName;
diff --git a/src/main/java/eu/m724/autopeerer/server/ClientState.java b/src/main/java/eu/m724/autopeerer/server/ClientState.java
index c9ee405..d003d73 100644
--- a/src/main/java/eu/m724/autopeerer/server/ClientState.java
+++ b/src/main/java/eu/m724/autopeerer/server/ClientState.java
@@ -2,9 +2,19 @@ package eu.m724.autopeerer.server;
import eu.m724.autopeerer.common.packet.Packet;
import eu.m724.autopeerer.common.packet.Packets;
+import eu.m724.autopeerer.common.packet.c2s.PingResponsePacket;
+import eu.m724.autopeerer.common.packet.c2s.SessionResponsePacket;
+import eu.m724.autopeerer.common.packet.s2c.PingRequestPacket;
import org.java_websocket.WebSocket;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
public class ClientState {
@@ -28,4 +38,38 @@ public class ClientState {
void end() {
socket.close();
}
+
+ CompletableFuture ping(InetAddress host) {
+ var future = new CompletableFuture();
+
+ var id = (short) (ThreadLocalRandom.current().nextInt() & 0xFFFF);
+
+ pingConsumers.put(id, future::complete);
+ send(new PingRequestPacket(id, host));
+
+ return future;
+ }
+
+
+ /* Packet functions */
+
+ private Map> pingConsumers = new HashMap<>();
+
+
+ void onPacketReceived(Packet> p) {
+ if (p instanceof PingResponsePacket packet) {
+ handlePingResponse(packet);
+ } else if (p instanceof SessionResponsePacket packet) {
+ handleSessionResponse(packet);
+ }
+ }
+
+ private void handlePingResponse(PingResponsePacket packet) {
+ System.out.printf("[%d] Ping response #%d: %s avg %.3f / mdev %.3f ms\n", clientId, packet.requestId, packet.status, packet.average, packet.meanDeviation);
+ pingConsumers.remove(packet.requestId).accept(packet);
+ }
+
+ private void handleSessionResponse(SessionResponsePacket packet) {
+ System.out.printf("[%d] Session response #%d: %s\n", clientId, packet.sessionId, packet.result);
+ }
}
diff --git a/src/main/java/eu/m724/autopeerer/server/Main.java b/src/main/java/eu/m724/autopeerer/server/Main.java
index 215ea8b..8ded69c 100644
--- a/src/main/java/eu/m724/autopeerer/server/Main.java
+++ b/src/main/java/eu/m724/autopeerer/server/Main.java
@@ -1,11 +1,13 @@
package eu.m724.autopeerer.server;
+import com.sun.net.httpserver.HttpServer;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Set;
public class Main {
- public static void main(String[] args) throws InterruptedException {
+ public static void main(String[] args) throws IOException {
System.out.println("Hello world!");
var config = new ServerConfiguration();
@@ -23,10 +25,21 @@ public class Main {
var packetHandler = new PacketHandler(nodes);
var server = new MyWebsocketServer(
- new InetSocketAddress(config.getString("listen.address"), config.getInt("listen.port")),
+ new InetSocketAddress(config.getString("socket.address"), config.getInt("socket.port")),
packetHandler
);
server.start();
+
+ HttpServer httpServer = HttpServer.create(
+ new InetSocketAddress(config.getString("http.address"), config.getInt("http.port")),
+ 0
+ );
+
+ httpServer.createContext("/", new MyHttpHandler(nodes, config.getString("http.key")));
+ httpServer.setExecutor(null);
+ httpServer.start();
+
+ System.out.println("HTTP server started on " + httpServer.getAddress().getHostString() + ":" + httpServer.getAddress().getPort());
}
}
diff --git a/src/main/java/eu/m724/autopeerer/server/MyHttpHandler.java b/src/main/java/eu/m724/autopeerer/server/MyHttpHandler.java
new file mode 100644
index 0000000..14e7a00
--- /dev/null
+++ b/src/main/java/eu/m724/autopeerer/server/MyHttpHandler.java
@@ -0,0 +1,195 @@
+package eu.m724.autopeerer.server;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import eu.m724.autopeerer.common.packet.c2s.PingResponsePacket;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+public class MyHttpHandler implements HttpHandler {
+ private final Set nodes;
+ private final String key;
+
+ public MyHttpHandler(Set nodes, String key) {
+ this.nodes = nodes;
+ this.key = key;
+ }
+
+ @Override
+ public void handle(HttpExchange exchange) throws IOException {
+ String key = exchange.getRequestHeaders().getFirst("key");
+
+ if (!this.key.equals(key)) {
+ exchange.sendResponseHeaders(403, -1);
+ return;
+ }
+
+ try {
+ inner(exchange);
+ } catch (IOException e) {
+ exchange.sendResponseHeaders(500, -1);
+ } catch (EndException e) { }
+ }
+
+ private void inner(HttpExchange exchange) throws IOException, EndException {
+ String[] path = exchange.getRequestURI().getPath().substring(1).split("/");
+ if (path.length == 0) {
+ sendResponse(exchange, 200, "Hello");
+ } else {
+ if (path[0].equals("nodes")) {
+ requireMethod(exchange, "GET");
+
+ var json = new JSONObject();
+ nodes.forEach(node -> {
+ var val = new JSONObject()
+ .put("name", node.name())
+ .put("host", node.host())
+ .put("online", node.connected());
+ json.put(node.id(), val);
+ });
+
+ sendResponse(exchange, 200, json);
+ } else if (path[0].equals("ping")) {
+ requireMethod(exchange, "POST");
+
+ var json = getJsonBody(exchange);
+
+ Set selectedNodes = this.nodes;
+ if (json.has("node")) {
+ selectedNodes = nodes.stream()
+ .filter(n -> n.id().equals(json.getString("node")))
+ .collect(Collectors.toSet());
+
+ if (selectedNodes.isEmpty()) // no node with that ID found
+ sendResponse(exchange, 400, "node");
+ }
+
+ InetAddress target = null;
+ try {
+ target = InetAddress.getByName(json.getString("target"));
+ } catch (JSONException | UnknownHostException e) {
+ sendResponse(exchange, 400, "target");
+ }
+
+ sseStart(exchange);
+
+ var futures = new HashSet>();
+
+ for (Node node : selectedNodes) {
+ if (!node.connected()) { // node is offline
+ var response = new JSONObject()
+ .put("node", node.id())
+ .put("status", "OFFLINE");
+ sseWrite(exchange, response.toString());
+ continue;
+ }
+
+ var future = node.getClient().ping(target).handle((result, ex) -> {
+ try {
+ if (ex != null) {
+ sendResponse(exchange, 500);
+ }
+ var response = new JSONObject()
+ .put("node", node.id())
+ .put("status", result.status);
+
+ if (result.status == PingResponsePacket.PingResponseStatus.OK) {
+ response.put("average", result.average)
+ .put("meanDeviation", result.meanDeviation);
+ }
+
+ sseWrite(exchange, response.toString());
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (EndException e) { }
+
+ return (Void) null;
+ });
+
+ futures.add(future);
+ }
+
+ CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).handle((v, ex) -> {
+ // TODO I don't know what to do with the errors
+
+ if (ex != null) {
+ ex.printStackTrace();
+ }
+
+ try {
+ sseClose(exchange);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return null;
+ });
+ } else if (path[0].equals("peer")) {
+
+ }
+ }
+ }
+
+ private void sseStart(HttpExchange exchange) throws IOException {
+ exchange.getResponseHeaders().add("Content-Type", "text/event-stream; charset=utf-8");
+
+ exchange.sendResponseHeaders(200, 0); // 0 for chunked encoding
+ }
+
+ private void sseWrite(HttpExchange exchange, String message) throws IOException {
+ var body = exchange.getResponseBody();
+ body.write(("data: " + message + "\n\n").getBytes(StandardCharsets.UTF_8));
+ body.flush();
+ }
+
+ private void sseClose(HttpExchange exchange) throws IOException {
+ exchange.getResponseBody().close();
+ }
+
+ private JSONObject getJsonBody(HttpExchange exchange) throws IOException {
+ String body;
+ try (var is = exchange.getRequestBody()) {
+ body = new String(is.readAllBytes());
+ }
+
+ return new JSONObject(body);
+ }
+
+ private void requireMethod(HttpExchange exchange, String method) throws IOException, EndException {
+ if (!exchange.getRequestMethod().equals(method)) {
+ sendResponse(exchange, 405);
+ }
+ }
+
+ private void sendResponse(HttpExchange exchange, int statusCode) throws IOException, EndException {
+ exchange.sendResponseHeaders(statusCode, -1);
+ throw new EndException();
+ }
+
+ private void sendResponse(HttpExchange exchange, int statusCode, Object response) throws IOException, EndException {
+ var type = (response instanceof JSONObject || response instanceof JSONArray) ? "application/json; charset=utf8" : "text/plain; charset=utf8";
+ exchange.getResponseHeaders().add("Content-Type", type);
+
+ byte[] responseBytes = response.toString().getBytes(StandardCharsets.UTF_8);
+ exchange.sendResponseHeaders(statusCode, responseBytes.length);
+
+ OutputStream outputStream = exchange.getResponseBody();
+ outputStream.write(responseBytes);
+ outputStream.close();
+
+ throw new EndException();
+ }
+
+ private static class EndException extends Exception { }
+}
diff --git a/src/main/java/eu/m724/autopeerer/server/MyWebsocketServer.java b/src/main/java/eu/m724/autopeerer/server/MyWebsocketServer.java
index 2397a35..4981fac 100644
--- a/src/main/java/eu/m724/autopeerer/server/MyWebsocketServer.java
+++ b/src/main/java/eu/m724/autopeerer/server/MyWebsocketServer.java
@@ -1,7 +1,7 @@
package eu.m724.autopeerer.server;
-import eu.m724.autopeerer.common.packet.c2s.PingRequestPacket;
-import eu.m724.autopeerer.common.packet.c2s.SessionRequestPacket;
+import eu.m724.autopeerer.common.packet.s2c.PingRequestPacket;
+import eu.m724.autopeerer.common.packet.s2c.SessionRequestPacket;
import inet.ipaddr.IPAddressString;
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
@@ -35,10 +35,9 @@ public class MyWebsocketServer extends WebSocketServer {
System.out.printf("[%d] Connected: %s\n", id, conn.getRemoteSocketAddress().getHostString());
// TODO testing
- try {
- state.send(new PingRequestPacket((short) 1, InetAddress.getByName("1.1.1.1")));
+ /*state.send(new PingRequestPacket((short) 1, InetAddress.getByName("1.1.1.1")));
state.send(new PingRequestPacket((short) 2, InetAddress.getByName("1.2.3.4")));
- state.send(new PingRequestPacket((short) 3, InetAddress.getByName("1.1.1.2")));
+ state.send(new PingRequestPacket((short) 3, InetAddress.getByName("1.1.1.2")));*/
state.send(new SessionRequestPacket(
(short) 1,
@@ -48,14 +47,12 @@ public class MyWebsocketServer extends WebSocketServer {
51820,
4242420000L
));
- } catch (UnknownHostException e) {
- throw new RuntimeException(e);
- }
}
@Override
public void onClose(WebSocket conn, int code, String reason, boolean remote) {
var state = states.remove(conn);
+ state.node.setClient(null);
var id = state.authenticated ? state.node.id() : conn.getRemoteSocketAddress().getHostString();
System.out.printf("[%d] Disconnected: %s\n", state.clientId, id);
}
diff --git a/src/main/java/eu/m724/autopeerer/server/Node.java b/src/main/java/eu/m724/autopeerer/server/Node.java
index 876a21d..fc0a3da 100644
--- a/src/main/java/eu/m724/autopeerer/server/Node.java
+++ b/src/main/java/eu/m724/autopeerer/server/Node.java
@@ -1,14 +1,48 @@
package eu.m724.autopeerer.server;
+import eu.m724.autopeerer.common.packet.Packet;
+import eu.m724.autopeerer.common.packet.Packets;
+
import java.util.Base64;
+import java.util.Objects;
import java.util.Properties;
-public record Node(
- String id,
- byte[] key,
- String name,
- String host
-) {
+public final class Node {
+ private final String id;
+ private final byte[] key;
+ private final String name;
+ private final String host;
+
+ private ClientState client;
+
+ public Node(
+ String id,
+ byte[] key,
+ String name,
+ String host
+ ) {
+ this.id = id;
+ this.key = key;
+ this.name = name;
+ this.host = host;
+ }
+
+ void setClient(ClientState client) {
+ this.client = client;
+ }
+
+ ClientState getClient() {
+ return client;
+ }
+
+ boolean connected() {
+ return client != null;
+ }
+
+ void send(Packet> packet) {
+ client.send(packet);
+ }
+
static Node fromProperties(String id, Properties properties) {
return new Node(
id,
@@ -17,4 +51,46 @@ public record Node(
properties.getProperty("host")
);
}
+
+ public String id() {
+ return id;
+ }
+
+ public byte[] key() {
+ return key;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public String host() {
+ return host;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) return true;
+ if (obj == null || obj.getClass() != this.getClass()) return false;
+ var that = (Node) obj;
+ return Objects.equals(this.id, that.id) &&
+ Objects.equals(this.key, that.key) &&
+ Objects.equals(this.name, that.name) &&
+ Objects.equals(this.host, that.host);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, key, name, host);
+ }
+
+ @Override
+ public String toString() {
+ return "Node[" +
+ "id=" + id + ", " +
+ "key=" + key + ", " +
+ "name=" + name + ", " +
+ "host=" + host + ']';
+ }
+
}
diff --git a/src/main/java/eu/m724/autopeerer/server/PacketHandler.java b/src/main/java/eu/m724/autopeerer/server/PacketHandler.java
index 4147ac3..71be9f2 100644
--- a/src/main/java/eu/m724/autopeerer/server/PacketHandler.java
+++ b/src/main/java/eu/m724/autopeerer/server/PacketHandler.java
@@ -3,8 +3,8 @@ package eu.m724.autopeerer.server;
import eu.m724.autopeerer.common.packet.Packet;
import eu.m724.autopeerer.common.packet.Packets;
import eu.m724.autopeerer.common.packet.c2s.LoginPacket;
-import eu.m724.autopeerer.common.packet.s2c.PingResponsePacket;
-import eu.m724.autopeerer.common.packet.s2c.SessionResponsePacket;
+import eu.m724.autopeerer.common.packet.c2s.PingResponsePacket;
+import eu.m724.autopeerer.common.packet.c2s.SessionResponsePacket;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
@@ -46,10 +46,10 @@ public class PacketHandler {
return;
}
- if (p instanceof PingResponsePacket packet) {
- handlePingResponse(state, packet);
- } else if (p instanceof SessionResponsePacket packet) {
- handleSessionResponse(state, packet);
+ try {
+ state.onPacketReceived(p);
+ } catch (Exception e) {
+ throw new RuntimeException("Exception handling packet by ClientState", e);
}
}
@@ -73,6 +73,7 @@ public class PacketHandler {
System.out.printf("[%d] Logged in as %s\n", state.clientId, node.getKey().id());
nodes.put(node.getKey(), state);
+ node.getKey().setClient(state);
state.authenticated = true;
state.node = node.getKey();
} else {
@@ -80,12 +81,4 @@ public class PacketHandler {
state.end();
}
}
-
- private void handlePingResponse(ClientState state, PingResponsePacket packet) {
- System.out.printf("[%d] Ping response #%d: %s avg %.3f / mdev %.3f ms\n", state.clientId, packet.requestId, packet.status, packet.average, packet.meanDeviation);
- }
-
- private void handleSessionResponse(ClientState state, SessionResponsePacket packet) {
- System.out.printf("[%d] Session response #%d: %s\n", state.clientId, packet.sessionId, packet.result);
- }
}
diff --git a/src/main/resources/server.properties b/src/main/resources/server.properties
index 84ea1a0..32d8a99 100644
--- a/src/main/resources/server.properties
+++ b/src/main/resources/server.properties
@@ -1,2 +1,6 @@
-listen.address=127.0.0.1
-listen.port=8002
\ No newline at end of file
+socket.address=127.0.0.1
+socket.port=8002
+
+http.address=127.0.0.1
+http.port=8003
+http.key=very_secret_key
\ No newline at end of file