diff --git a/src/main/java/eu/m724/wtapi/provider/thunder/ThunderProvider.java b/src/main/java/eu/m724/wtapi/provider/thunder/ThunderProvider.java index ba209c5..770d7b5 100644 --- a/src/main/java/eu/m724/wtapi/provider/thunder/ThunderProvider.java +++ b/src/main/java/eu/m724/wtapi/provider/thunder/ThunderProvider.java @@ -1,45 +1,59 @@ package eu.m724.wtapi.provider.thunder; +import java.util.ArrayList; import java.util.function.Consumer; -import eu.m724.wtapi.object.Coordinates; import eu.m724.wtapi.provider.exception.ProviderException; public abstract class ThunderProvider { + private final ArrayList> strikeHandlers = new ArrayList<>(); + private final ArrayList> eventHandlers = new ArrayList<>(); - /** - * initialize and test provider - * @throws ProviderException - */ - public abstract void init() throws ProviderException; + private long latencyMillis; + private int latencySamplesCount; /** - * connects to remote server and starts processing data - * @throws ProviderException + * Connect to the provider and start to process data + * + * @throws ProviderException If something failed */ public abstract void start() throws ProviderException; - + + public abstract void close(); + + + public void registerStrikeConsumer(Consumer runnable) { + strikeHandlers.add(runnable); + } + + public void registerEventConsumer(Consumer runnable) { + eventHandlers.add(runnable); + } + /** - * disconnects from remote server + * @return Latency to the provider in milliseconds */ - public abstract void stop(); - - /** - * check for new data and call callback and stuff - */ - public abstract void tick(); - - public abstract void registerStrikeHandler(Consumer runnable); - - /** - * delay between irl strike and receiving of data - * @return delay in ms - */ - public abstract int getDelay(); - - /** - * this is not {@link/getDelay} - * @return latency to api in ms - */ - public abstract long getLatency(); + public long getLatency() { + return this.latencyMillis; + } + + + protected void submitStrike(TimedStrike strike) { + long now = System.nanoTime() / 1_000_000; + + long strikeDelay = now - strike.timestamp(); + this.latencyMillis += (strikeDelay - latencyMillis) / ++latencySamplesCount; + + strikeHandlers.forEach(con -> con.accept(strike)); + } + + protected void submitEvent(String message) { + submitEvent(message, null); + } + + protected void submitEvent(String message, Throwable exception) { + ThunderProviderEvent event = new ThunderProviderEvent(System.currentTimeMillis(), message, exception); + + eventHandlers.forEach(con -> con.accept(event)); + } } diff --git a/src/main/java/eu/m724/wtapi/provider/thunder/ThunderProviderEvent.java b/src/main/java/eu/m724/wtapi/provider/thunder/ThunderProviderEvent.java new file mode 100644 index 0000000..4ccf8bc --- /dev/null +++ b/src/main/java/eu/m724/wtapi/provider/thunder/ThunderProviderEvent.java @@ -0,0 +1,15 @@ +package eu.m724.wtapi.provider.thunder; + +/** + * Represents a system message from the provider. + * + * @param timestamp Timestamp of the event in unix millis + * @param message Event message + * @param exception Exception or null + */ +public record ThunderProviderEvent( + long timestamp, + String message, + Throwable exception +) { +} diff --git a/src/main/java/eu/m724/wtapi/provider/thunder/TimedStrike.java b/src/main/java/eu/m724/wtapi/provider/thunder/TimedStrike.java new file mode 100644 index 0000000..bfa6603 --- /dev/null +++ b/src/main/java/eu/m724/wtapi/provider/thunder/TimedStrike.java @@ -0,0 +1,14 @@ +package eu.m724.wtapi.provider.thunder; + +import eu.m724.wtapi.object.Coordinates; + +/** + * Represents a lightning strike + * + * @param timestamp Timestamp of when the strike occurred, in unix milliseconds + * @param coordinates Coordinates of the strike + */ +public record TimedStrike( + long timestamp, + Coordinates coordinates +) { } diff --git a/src/main/java/eu/m724/wtapi/provider/thunder/impl/blitzortung/BlitzortungProvider.java b/src/main/java/eu/m724/wtapi/provider/thunder/impl/blitzortung/BlitzortungProvider.java index 466996a..caf2052 100644 --- a/src/main/java/eu/m724/wtapi/provider/thunder/impl/blitzortung/BlitzortungProvider.java +++ b/src/main/java/eu/m724/wtapi/provider/thunder/impl/blitzortung/BlitzortungProvider.java @@ -1,94 +1,62 @@ package eu.m724.wtapi.provider.thunder.impl.blitzortung; -import java.util.ArrayList; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.function.Consumer; - import eu.m724.wtapi.object.Coordinates; import eu.m724.wtapi.provider.exception.ProviderException; import eu.m724.wtapi.provider.thunder.ThunderProvider; +import eu.m724.wtapi.provider.thunder.TimedStrike; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; public class BlitzortungProvider extends ThunderProvider { - BlitzortungWebsocketClient websocketClient = new BlitzortungWebsocketClient(this); - ArrayList> strikeHandlers = new ArrayList<>(); - CopyOnWriteArrayList strikes = new CopyOnWriteArrayList<>(); // TODO optimize? - - private long reconnectPending; - private final int delay = 10000; - - private long latency; - private int latencySamplesCount; - - @Override - public void init() throws ProviderException { - // TODO Auto-generated method stub - - } + private final BlitzortungWebsocketClient websocketClient = new BlitzortungWebsocketClient(this); @Override public void start() throws ProviderException { + submitEvent("Connecting"); + try { websocketClient.connectBlocking(); } catch (InterruptedException e) { - throw new ProviderException("unexpected interruptedexception"); + throw new ProviderException(e); } + + submitEvent("Connected"); } @Override - public void stop() { + public void close() { try { websocketClient.closeBlocking(); } catch (InterruptedException e) { websocketClient.close(); } } - - @Override - public void registerStrikeHandler(Consumer runnable) { - strikeHandlers.add(runnable); - } - - @Override - public int getDelay() { - return this.delay; - } - - @Override - public void tick() { - long now = System.currentTimeMillis(); - - if (reconnectPending > 0) { - if (now > reconnectPending) { - websocketClient.reconnect(); - reconnectPending = 0; - } - } - - for (TimedStrike strike : strikes) { - if (strike.timestamp > now) - break; - strikeHandlers.forEach(con -> con.accept(strike.coordinates)); - strikes.remove(0); - } - } void reconnect() { - System.out.println("reconnecting in 5 seconds"); - reconnectPending = System.currentTimeMillis() + 3000; + ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + + scheduledExecutorService.schedule(() -> { + try { + Thread.sleep(5000); + submitEvent("Reconnecting"); + websocketClient.reconnectBlocking(); + } catch (InterruptedException e) { + submitEvent("Failed to reconnect", e); + } + }, 5, TimeUnit.SECONDS); + + submitEvent("Reconnecting in 5 seconds"); } void submitStrike(Coordinates coordinates, long timestamp) { - long now = System.currentTimeMillis(); - long delay = now - timestamp; - - latency += (delay - latency) / ++latencySamplesCount; - - strikes.add(new TimedStrike(timestamp, coordinates)); + TimedStrike strike = new TimedStrike(timestamp, coordinates); + + submitStrike(strike); } - @Override - public long getLatency() { - return latency; + void wsSubmitEvent(String message, Throwable exception) { + submitEvent(message, exception); } - } diff --git a/src/main/java/eu/m724/wtapi/provider/thunder/impl/blitzortung/BlitzortungWebsocketClient.java b/src/main/java/eu/m724/wtapi/provider/thunder/impl/blitzortung/BlitzortungWebsocketClient.java index 4a06e21..c8ae7b6 100644 --- a/src/main/java/eu/m724/wtapi/provider/thunder/impl/blitzortung/BlitzortungWebsocketClient.java +++ b/src/main/java/eu/m724/wtapi/provider/thunder/impl/blitzortung/BlitzortungWebsocketClient.java @@ -6,32 +6,35 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import com.google.gson.JsonParseException; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; import eu.m724.wtapi.object.Coordinates; class BlitzortungWebsocketClient extends WebSocketClient { - BlitzortungProvider lightningMapsProvider; + private final BlitzortungProvider blitzortungProvider; - private static URI[] uris = new URI[] { + private static final URI[] uris = new URI[] { URI.create("wss://ws1.blitzortung.org/"), + URI.create("wss://ws2.blitzortung.org/"), URI.create("wss://ws7.blitzortung.org/"), URI.create("wss://ws8.blitzortung.org/") }; private int currentUri = 0; - public BlitzortungWebsocketClient(BlitzortungProvider lightningMapsProvider) { - super(uris[0]); - this.lightningMapsProvider = lightningMapsProvider; + public BlitzortungWebsocketClient(BlitzortungProvider blitzortungProvider) { + super(uris[0]); // It's rotated before reconnecting + this.blitzortungProvider = blitzortungProvider; } @Override - public void onOpen(ServerHandshake handshakedata) { + public void onOpen(ServerHandshake handshakeData) { + blitzortungProvider.wsSubmitEvent("Websocket connected", null); + this.send("{\"a\":111}"); } @@ -61,14 +64,19 @@ class BlitzortungWebsocketClient extends WebSocketClient { JsonObject json = JsonParser.parseString(decode(message)) .getAsJsonObject(); - - long time = json.getAsJsonPrimitive("timestamp").getAsLong() / 1000000; + + if (!json.has("time")) { + return; + } + + long time = json.getAsJsonPrimitive("time").getAsLong() / 1000000; double lat = json.getAsJsonPrimitive("lat").getAsDouble(); double lon = json.getAsJsonPrimitive("lon").getAsDouble(); Coordinates coordinates = new Coordinates(lat, lon); - lightningMapsProvider.submitStrike(coordinates, time); - } catch (JsonSyntaxException e) { + blitzortungProvider.submitStrike(coordinates, time); + } catch (JsonParseException e) { + blitzortungProvider.wsSubmitEvent("Websocket received invalid JSON", e); // ignore invalid json } @@ -76,16 +84,18 @@ class BlitzortungWebsocketClient extends WebSocketClient { @Override public void onClose(int code, String reason, boolean remote) { - System.out.printf("%s Closed: %d %s\n", this.uri, code, reason); + blitzortungProvider.wsSubmitEvent("Websocket closed: %s".formatted(reason), null); + this.uri = uris[++currentUri % uris.length]; - lightningMapsProvider.reconnect(); - + blitzortungProvider.reconnect(); } @Override public void onError(Exception ex) { + blitzortungProvider.wsSubmitEvent("Websocket error", ex); + this.uri = uris[++currentUri % uris.length]; - lightningMapsProvider.reconnect(); + blitzortungProvider.reconnect(); } } diff --git a/src/main/java/eu/m724/wtapi/provider/thunder/impl/blitzortung/TimedStrike.java b/src/main/java/eu/m724/wtapi/provider/thunder/impl/blitzortung/TimedStrike.java deleted file mode 100644 index 00580ff..0000000 --- a/src/main/java/eu/m724/wtapi/provider/thunder/impl/blitzortung/TimedStrike.java +++ /dev/null @@ -1,20 +0,0 @@ -package eu.m724.wtapi.provider.thunder.impl.blitzortung; - -import eu.m724.wtapi.object.Coordinates; - -public class TimedStrike { - /** - * unix millis - */ - public long timestamp; - - /** - * coordinates of strike - */ - public Coordinates coordinates; - - public TimedStrike(long timestamp, Coordinates coordinates) { - this.timestamp = timestamp; - this.coordinates = coordinates; - } -} diff --git a/src/test/java/eu/m724/wtapi/thunder/BlitzortungTest.java b/src/test/java/eu/m724/wtapi/thunder/BlitzortungTest.java index 51ce79e..3176614 100644 --- a/src/test/java/eu/m724/wtapi/thunder/BlitzortungTest.java +++ b/src/test/java/eu/m724/wtapi/thunder/BlitzortungTest.java @@ -2,35 +2,37 @@ package eu.m724.wtapi.thunder; import java.util.ArrayList; +import eu.m724.wtapi.provider.thunder.TimedStrike; import org.junit.Test; -import eu.m724.wtapi.object.Coordinates; import eu.m724.wtapi.provider.thunder.ThunderProvider; import eu.m724.wtapi.provider.thunder.impl.blitzortung.BlitzortungProvider; public class BlitzortungTest { + private final ArrayList strikes = new ArrayList<>(); + @Test public void blitzortungTest() throws InterruptedException { - ArrayList coordinatesList = new ArrayList<>(); - ThunderProvider provider = new BlitzortungProvider(); - - provider.registerStrikeHandler(coordinatesList::add); - - provider.init(); + + provider.registerEventConsumer((event) -> { + System.out.println("Event: " + event.message() + " " + (event.exception() != null ? event.exception().getMessage() : "")); + }); + + provider.registerStrikeConsumer(strikes::add); provider.start(); for (int i=0; i < 100; i++) { - provider.tick(); - int size = coordinatesList.size(); + int size = strikes.size(); if (size > 0) - System.out.printf("Last from tick: %f %f (total %d)\n", coordinatesList.get(size - 1).latitude(), coordinatesList.get(size - 1).longitude(), size); + System.out.printf("Last: %f %f (total %d)\n", strikes.get(size - 1).coordinates().latitude(), strikes.get(size - 1).coordinates().longitude(), size); Thread.sleep(25); } + + provider.close(); - System.out.printf("Strikes in the last 3s: %d\n", coordinatesList.size()); + System.out.printf("Strikes in the last 3s: %d\n", strikes.size()); System.out.printf("Latency: %dms\n", provider.getLatency()); - } } diff --git a/src/test/java/eu/m724/wtapi/thunder/MockThunderProvider.java b/src/test/java/eu/m724/wtapi/thunder/MockThunderProvider.java index f864362..9324012 100644 --- a/src/test/java/eu/m724/wtapi/thunder/MockThunderProvider.java +++ b/src/test/java/eu/m724/wtapi/thunder/MockThunderProvider.java @@ -1,78 +1,54 @@ package eu.m724.wtapi.thunder; -import java.util.ArrayList; import java.util.Random; -import java.util.function.Consumer; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import eu.m724.wtapi.object.Coordinates; import eu.m724.wtapi.provider.exception.ProviderException; import eu.m724.wtapi.provider.thunder.ThunderProvider; -import eu.m724.wtapi.provider.thunder.impl.blitzortung.TimedStrike; +import eu.m724.wtapi.provider.thunder.TimedStrike; public class MockThunderProvider extends ThunderProvider { - ArrayList> strikeHandlers = new ArrayList<>(); - ArrayList strikes = new ArrayList<>(); - Random rnd = new Random(); + private final Random rnd = new Random(); - @Override - public void init() throws ProviderException { - System.out.println("mock thunder init"); - - } + private Future strikeFuture; @Override public void start() throws ProviderException { - System.out.println("mock thunder start"); - - long now = System.currentTimeMillis(); - - for (int i=0; i<40; i++) { - strikes.add(new TimedStrike(now + i * 50, - new Coordinates( - rnd.nextDouble(-90, 90), - rnd.nextDouble(-180, 180) - ))); - } - + System.out.println("Mock thunder provider started"); + + submitEvent("Mock thunder provider started"); + + strikeFuture = Executors.newSingleThreadExecutor().submit(() -> { + for (int i=0; i<20; i++) { + Coordinates coordinates = new Coordinates( + rnd.nextDouble(-90, 90), + rnd.nextDouble(-180, 180) + ); + TimedStrike strike = new TimedStrike(System.currentTimeMillis(), coordinates); + + submitStrike(strike); + } + + submitEvent("Done ticking"); + + return null; + }); } @Override - public void stop() { - System.out.println("mock thunder stop"); - - } + public void close() { + strikeFuture.cancel(true); - @Override - public void tick() { - long now = System.currentTimeMillis(); - - while (!strikes.isEmpty()) { - TimedStrike str = strikes.get(0); - if (now > str.timestamp) { - System.out.printf("mock thunder given: %d\n", str.timestamp); - strikeHandlers.forEach(con -> con.accept(str.coordinates)); - strikes.remove(0); - } else break; - } - System.out.println("mock thunder tick"); - - } + System.out.println("Mock thunder provider closed"); - @Override - public void registerStrikeHandler(Consumer runnable) { - strikeHandlers.add(runnable); - System.out.println("mock thunder strike handler added"); - } - - @Override - public int getDelay() { - // TODO Auto-generated method stub - return 10000; + submitEvent("Mock thunder provider closed"); } @Override public long getLatency() { - return 690; + return 700; } } diff --git a/src/test/java/eu/m724/wtapi/thunder/MockThunderProviderTest.java b/src/test/java/eu/m724/wtapi/thunder/MockThunderProviderTest.java new file mode 100644 index 0000000..9023cd7 --- /dev/null +++ b/src/test/java/eu/m724/wtapi/thunder/MockThunderProviderTest.java @@ -0,0 +1,43 @@ +package eu.m724.wtapi.thunder; + +import java.util.ArrayList; + +import eu.m724.wtapi.provider.thunder.TimedStrike; +import org.junit.Test; + +import eu.m724.wtapi.provider.thunder.ThunderProvider; + +public class MockThunderProviderTest { + private final ArrayList strikes = new ArrayList<>(); + private volatile boolean done = false; + + @Test + public void mockThunderProviderTest() { + ThunderProvider provider = new MockThunderProvider(); + + provider.registerEventConsumer((event) -> { + if (event.message().equals("Done ticking")) { + done = true; + } + + System.out.println("Event: " + event.message()); + }); + + provider.registerStrikeConsumer((strike) -> { + strikes.add(strike); + + int size = strikes.size(); + System.out.printf("Strike: %f %f #%d%n", strike.coordinates().latitude(), strike.coordinates().longitude(), size); + }); + + provider.start(); + + while (!done) { + Thread.onSpinWait(); + } + + provider.close(); + + assert strikes.size() == 20; + } +} diff --git a/src/test/java/eu/m724/wtapi/thunder/ThunderProviderTest.java b/src/test/java/eu/m724/wtapi/thunder/ThunderProviderTest.java deleted file mode 100644 index 6778d7d..0000000 --- a/src/test/java/eu/m724/wtapi/thunder/ThunderProviderTest.java +++ /dev/null @@ -1,38 +0,0 @@ -package eu.m724.wtapi.thunder; - -import java.util.ArrayList; - -import org.junit.Test; - -import eu.m724.wtapi.object.Coordinates; -import eu.m724.wtapi.provider.thunder.ThunderProvider; - -public class ThunderProviderTest { - @Test - public void testThunderProvider() throws InterruptedException { - ArrayList coordinatesList = new ArrayList<>(); - - ThunderProvider provider = new MockThunderProvider(); - - provider.registerStrikeHandler(coordinatesList::add); - - provider.init(); - - provider.start(); - - for (int i=0; i < 50; i++) { - provider.tick(); - int size = coordinatesList.size(); - if (size > 0) - System.out.printf("Last from tick: %f %f (total %d)\n", coordinatesList.get(size - 1).latitude(), coordinatesList.get(size - 1).longitude(), size); - Thread.sleep(20); - } - - provider.stop(); - - System.out.printf("Strikes in the last 1s: %d\n", coordinatesList.size()); - System.out.printf("Latency: %dms\n", provider.getLatency()); - - assert coordinatesList.size() == 20; // TODO this is timestamp sensitive and fails under loaded system. Also, the entire test is suboptimal - } -}