Refactor thunder providers
This commit is contained in:
parent
9fa70c0044
commit
6bfc57f114
10 changed files with 214 additions and 230 deletions
|
|
@ -1,45 +1,59 @@
|
|||
package eu.m724.wtapi.provider.thunder;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import eu.m724.wtapi.object.Coordinates;
|
||||
import eu.m724.wtapi.provider.exception.ProviderException;
|
||||
|
||||
public abstract class ThunderProvider {
|
||||
private final ArrayList<Consumer<TimedStrike>> strikeHandlers = new ArrayList<>();
|
||||
private final ArrayList<Consumer<ThunderProviderEvent>> eventHandlers = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* initialize and test provider
|
||||
* @throws ProviderException
|
||||
*/
|
||||
public abstract void init() throws ProviderException;
|
||||
private long latencyMillis;
|
||||
private int latencySamplesCount;
|
||||
|
||||
/**
|
||||
* connects to remote server and starts processing data
|
||||
* @throws ProviderException
|
||||
* Connect to the provider and start to process data
|
||||
*
|
||||
* @throws ProviderException If something failed
|
||||
*/
|
||||
public abstract void start() throws ProviderException;
|
||||
|
||||
|
||||
public abstract void close();
|
||||
|
||||
|
||||
public void registerStrikeConsumer(Consumer<TimedStrike> runnable) {
|
||||
strikeHandlers.add(runnable);
|
||||
}
|
||||
|
||||
public void registerEventConsumer(Consumer<ThunderProviderEvent> runnable) {
|
||||
eventHandlers.add(runnable);
|
||||
}
|
||||
|
||||
/**
|
||||
* disconnects from remote server
|
||||
* @return Latency to the provider in milliseconds
|
||||
*/
|
||||
public abstract void stop();
|
||||
|
||||
/**
|
||||
* check for new data and call callback and stuff
|
||||
*/
|
||||
public abstract void tick();
|
||||
|
||||
public abstract void registerStrikeHandler(Consumer<Coordinates> runnable);
|
||||
|
||||
/**
|
||||
* delay between irl strike and receiving of data
|
||||
* @return delay in ms
|
||||
*/
|
||||
public abstract int getDelay();
|
||||
|
||||
/**
|
||||
* this is not {@link/getDelay}
|
||||
* @return latency to api in ms
|
||||
*/
|
||||
public abstract long getLatency();
|
||||
public long getLatency() {
|
||||
return this.latencyMillis;
|
||||
}
|
||||
|
||||
|
||||
protected void submitStrike(TimedStrike strike) {
|
||||
long now = System.nanoTime() / 1_000_000;
|
||||
|
||||
long strikeDelay = now - strike.timestamp();
|
||||
this.latencyMillis += (strikeDelay - latencyMillis) / ++latencySamplesCount;
|
||||
|
||||
strikeHandlers.forEach(con -> con.accept(strike));
|
||||
}
|
||||
|
||||
protected void submitEvent(String message) {
|
||||
submitEvent(message, null);
|
||||
}
|
||||
|
||||
protected void submitEvent(String message, Throwable exception) {
|
||||
ThunderProviderEvent event = new ThunderProviderEvent(System.currentTimeMillis(), message, exception);
|
||||
|
||||
eventHandlers.forEach(con -> con.accept(event));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,15 @@
|
|||
package eu.m724.wtapi.provider.thunder;
|
||||
|
||||
/**
|
||||
* Represents a system message from the provider.
|
||||
*
|
||||
* @param timestamp Timestamp of the event in unix millis
|
||||
* @param message Event message
|
||||
* @param exception Exception or null
|
||||
*/
|
||||
public record ThunderProviderEvent(
|
||||
long timestamp,
|
||||
String message,
|
||||
Throwable exception
|
||||
) {
|
||||
}
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
package eu.m724.wtapi.provider.thunder;
|
||||
|
||||
import eu.m724.wtapi.object.Coordinates;
|
||||
|
||||
/**
|
||||
* Represents a lightning strike
|
||||
*
|
||||
* @param timestamp Timestamp of when the strike occurred, in unix milliseconds
|
||||
* @param coordinates Coordinates of the strike
|
||||
*/
|
||||
public record TimedStrike(
|
||||
long timestamp,
|
||||
Coordinates coordinates
|
||||
) { }
|
||||
|
|
@ -1,94 +1,62 @@
|
|||
package eu.m724.wtapi.provider.thunder.impl.blitzortung;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import eu.m724.wtapi.object.Coordinates;
|
||||
import eu.m724.wtapi.provider.exception.ProviderException;
|
||||
import eu.m724.wtapi.provider.thunder.ThunderProvider;
|
||||
import eu.m724.wtapi.provider.thunder.TimedStrike;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class BlitzortungProvider extends ThunderProvider {
|
||||
BlitzortungWebsocketClient websocketClient = new BlitzortungWebsocketClient(this);
|
||||
ArrayList<Consumer<Coordinates>> strikeHandlers = new ArrayList<>();
|
||||
CopyOnWriteArrayList<TimedStrike> strikes = new CopyOnWriteArrayList<>(); // TODO optimize?
|
||||
|
||||
private long reconnectPending;
|
||||
private final int delay = 10000;
|
||||
|
||||
private long latency;
|
||||
private int latencySamplesCount;
|
||||
|
||||
@Override
|
||||
public void init() throws ProviderException {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
private final BlitzortungWebsocketClient websocketClient = new BlitzortungWebsocketClient(this);
|
||||
|
||||
@Override
|
||||
public void start() throws ProviderException {
|
||||
submitEvent("Connecting");
|
||||
|
||||
try {
|
||||
websocketClient.connectBlocking();
|
||||
} catch (InterruptedException e) {
|
||||
throw new ProviderException("unexpected interruptedexception");
|
||||
throw new ProviderException(e);
|
||||
}
|
||||
|
||||
submitEvent("Connected");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
public void close() {
|
||||
try {
|
||||
websocketClient.closeBlocking();
|
||||
} catch (InterruptedException e) {
|
||||
websocketClient.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerStrikeHandler(Consumer<Coordinates> runnable) {
|
||||
strikeHandlers.add(runnable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getDelay() {
|
||||
return this.delay;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tick() {
|
||||
long now = System.currentTimeMillis();
|
||||
|
||||
if (reconnectPending > 0) {
|
||||
if (now > reconnectPending) {
|
||||
websocketClient.reconnect();
|
||||
reconnectPending = 0;
|
||||
}
|
||||
}
|
||||
|
||||
for (TimedStrike strike : strikes) {
|
||||
if (strike.timestamp > now)
|
||||
break;
|
||||
strikeHandlers.forEach(con -> con.accept(strike.coordinates));
|
||||
strikes.remove(0);
|
||||
}
|
||||
}
|
||||
|
||||
void reconnect() {
|
||||
System.out.println("reconnecting in 5 seconds");
|
||||
reconnectPending = System.currentTimeMillis() + 3000;
|
||||
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
|
||||
|
||||
scheduledExecutorService.schedule(() -> {
|
||||
try {
|
||||
Thread.sleep(5000);
|
||||
submitEvent("Reconnecting");
|
||||
websocketClient.reconnectBlocking();
|
||||
} catch (InterruptedException e) {
|
||||
submitEvent("Failed to reconnect", e);
|
||||
}
|
||||
}, 5, TimeUnit.SECONDS);
|
||||
|
||||
submitEvent("Reconnecting in 5 seconds");
|
||||
}
|
||||
|
||||
void submitStrike(Coordinates coordinates, long timestamp) {
|
||||
long now = System.currentTimeMillis();
|
||||
long delay = now - timestamp;
|
||||
|
||||
latency += (delay - latency) / ++latencySamplesCount;
|
||||
|
||||
strikes.add(new TimedStrike(timestamp, coordinates));
|
||||
TimedStrike strike = new TimedStrike(timestamp, coordinates);
|
||||
|
||||
submitStrike(strike);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLatency() {
|
||||
return latency;
|
||||
void wsSubmitEvent(String message, Throwable exception) {
|
||||
submitEvent(message, exception);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,32 +6,35 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import com.google.gson.JsonParseException;
|
||||
import org.java_websocket.client.WebSocketClient;
|
||||
import org.java_websocket.handshake.ServerHandshake;
|
||||
|
||||
import com.google.gson.JsonObject;
|
||||
import com.google.gson.JsonParser;
|
||||
import com.google.gson.JsonSyntaxException;
|
||||
|
||||
import eu.m724.wtapi.object.Coordinates;
|
||||
|
||||
class BlitzortungWebsocketClient extends WebSocketClient {
|
||||
BlitzortungProvider lightningMapsProvider;
|
||||
private final BlitzortungProvider blitzortungProvider;
|
||||
|
||||
private static URI[] uris = new URI[] {
|
||||
private static final URI[] uris = new URI[] {
|
||||
URI.create("wss://ws1.blitzortung.org/"),
|
||||
URI.create("wss://ws2.blitzortung.org/"),
|
||||
URI.create("wss://ws7.blitzortung.org/"),
|
||||
URI.create("wss://ws8.blitzortung.org/")
|
||||
};
|
||||
private int currentUri = 0;
|
||||
|
||||
public BlitzortungWebsocketClient(BlitzortungProvider lightningMapsProvider) {
|
||||
super(uris[0]);
|
||||
this.lightningMapsProvider = lightningMapsProvider;
|
||||
public BlitzortungWebsocketClient(BlitzortungProvider blitzortungProvider) {
|
||||
super(uris[0]); // It's rotated before reconnecting
|
||||
this.blitzortungProvider = blitzortungProvider;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen(ServerHandshake handshakedata) {
|
||||
public void onOpen(ServerHandshake handshakeData) {
|
||||
blitzortungProvider.wsSubmitEvent("Websocket connected", null);
|
||||
|
||||
this.send("{\"a\":111}");
|
||||
}
|
||||
|
||||
|
|
@ -61,14 +64,19 @@ class BlitzortungWebsocketClient extends WebSocketClient {
|
|||
JsonObject json =
|
||||
JsonParser.parseString(decode(message))
|
||||
.getAsJsonObject();
|
||||
|
||||
long time = json.getAsJsonPrimitive("timestamp").getAsLong() / 1000000;
|
||||
|
||||
if (!json.has("time")) {
|
||||
return;
|
||||
}
|
||||
|
||||
long time = json.getAsJsonPrimitive("time").getAsLong() / 1000000;
|
||||
double lat = json.getAsJsonPrimitive("lat").getAsDouble();
|
||||
double lon = json.getAsJsonPrimitive("lon").getAsDouble();
|
||||
|
||||
Coordinates coordinates = new Coordinates(lat, lon);
|
||||
lightningMapsProvider.submitStrike(coordinates, time);
|
||||
} catch (JsonSyntaxException e) {
|
||||
blitzortungProvider.submitStrike(coordinates, time);
|
||||
} catch (JsonParseException e) {
|
||||
blitzortungProvider.wsSubmitEvent("Websocket received invalid JSON", e);
|
||||
// ignore invalid json
|
||||
}
|
||||
|
||||
|
|
@ -76,16 +84,18 @@ class BlitzortungWebsocketClient extends WebSocketClient {
|
|||
|
||||
@Override
|
||||
public void onClose(int code, String reason, boolean remote) {
|
||||
System.out.printf("%s Closed: %d %s\n", this.uri, code, reason);
|
||||
blitzortungProvider.wsSubmitEvent("Websocket closed: %s".formatted(reason), null);
|
||||
|
||||
this.uri = uris[++currentUri % uris.length];
|
||||
lightningMapsProvider.reconnect();
|
||||
|
||||
blitzortungProvider.reconnect();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Exception ex) {
|
||||
blitzortungProvider.wsSubmitEvent("Websocket error", ex);
|
||||
|
||||
this.uri = uris[++currentUri % uris.length];
|
||||
lightningMapsProvider.reconnect();
|
||||
blitzortungProvider.reconnect();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,20 +0,0 @@
|
|||
package eu.m724.wtapi.provider.thunder.impl.blitzortung;
|
||||
|
||||
import eu.m724.wtapi.object.Coordinates;
|
||||
|
||||
public class TimedStrike {
|
||||
/**
|
||||
* unix millis
|
||||
*/
|
||||
public long timestamp;
|
||||
|
||||
/**
|
||||
* coordinates of strike
|
||||
*/
|
||||
public Coordinates coordinates;
|
||||
|
||||
public TimedStrike(long timestamp, Coordinates coordinates) {
|
||||
this.timestamp = timestamp;
|
||||
this.coordinates = coordinates;
|
||||
}
|
||||
}
|
||||
|
|
@ -2,35 +2,37 @@ package eu.m724.wtapi.thunder;
|
|||
|
||||
import java.util.ArrayList;
|
||||
|
||||
import eu.m724.wtapi.provider.thunder.TimedStrike;
|
||||
import org.junit.Test;
|
||||
|
||||
import eu.m724.wtapi.object.Coordinates;
|
||||
import eu.m724.wtapi.provider.thunder.ThunderProvider;
|
||||
import eu.m724.wtapi.provider.thunder.impl.blitzortung.BlitzortungProvider;
|
||||
|
||||
public class BlitzortungTest {
|
||||
private final ArrayList<TimedStrike> strikes = new ArrayList<>();
|
||||
|
||||
@Test
|
||||
public void blitzortungTest() throws InterruptedException {
|
||||
ArrayList<Coordinates> coordinatesList = new ArrayList<>();
|
||||
|
||||
ThunderProvider provider = new BlitzortungProvider();
|
||||
|
||||
provider.registerStrikeHandler(coordinatesList::add);
|
||||
|
||||
provider.init();
|
||||
|
||||
provider.registerEventConsumer((event) -> {
|
||||
System.out.println("Event: " + event.message() + " " + (event.exception() != null ? event.exception().getMessage() : ""));
|
||||
});
|
||||
|
||||
provider.registerStrikeConsumer(strikes::add);
|
||||
|
||||
provider.start();
|
||||
|
||||
for (int i=0; i < 100; i++) {
|
||||
provider.tick();
|
||||
int size = coordinatesList.size();
|
||||
int size = strikes.size();
|
||||
if (size > 0)
|
||||
System.out.printf("Last from tick: %f %f (total %d)\n", coordinatesList.get(size - 1).latitude(), coordinatesList.get(size - 1).longitude(), size);
|
||||
System.out.printf("Last: %f %f (total %d)\n", strikes.get(size - 1).coordinates().latitude(), strikes.get(size - 1).coordinates().longitude(), size);
|
||||
Thread.sleep(25);
|
||||
}
|
||||
|
||||
provider.close();
|
||||
|
||||
System.out.printf("Strikes in the last 3s: %d\n", coordinatesList.size());
|
||||
System.out.printf("Strikes in the last 3s: %d\n", strikes.size());
|
||||
System.out.printf("Latency: %dms\n", provider.getLatency());
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,78 +1,54 @@
|
|||
package eu.m724.wtapi.thunder;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Random;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import eu.m724.wtapi.object.Coordinates;
|
||||
import eu.m724.wtapi.provider.exception.ProviderException;
|
||||
import eu.m724.wtapi.provider.thunder.ThunderProvider;
|
||||
import eu.m724.wtapi.provider.thunder.impl.blitzortung.TimedStrike;
|
||||
import eu.m724.wtapi.provider.thunder.TimedStrike;
|
||||
|
||||
public class MockThunderProvider extends ThunderProvider {
|
||||
ArrayList<Consumer<Coordinates>> strikeHandlers = new ArrayList<>();
|
||||
ArrayList<TimedStrike> strikes = new ArrayList<>();
|
||||
Random rnd = new Random();
|
||||
private final Random rnd = new Random();
|
||||
|
||||
@Override
|
||||
public void init() throws ProviderException {
|
||||
System.out.println("mock thunder init");
|
||||
|
||||
}
|
||||
private Future<Void> strikeFuture;
|
||||
|
||||
@Override
|
||||
public void start() throws ProviderException {
|
||||
System.out.println("mock thunder start");
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
|
||||
for (int i=0; i<40; i++) {
|
||||
strikes.add(new TimedStrike(now + i * 50,
|
||||
new Coordinates(
|
||||
rnd.nextDouble(-90, 90),
|
||||
rnd.nextDouble(-180, 180)
|
||||
)));
|
||||
}
|
||||
|
||||
System.out.println("Mock thunder provider started");
|
||||
|
||||
submitEvent("Mock thunder provider started");
|
||||
|
||||
strikeFuture = Executors.newSingleThreadExecutor().submit(() -> {
|
||||
for (int i=0; i<20; i++) {
|
||||
Coordinates coordinates = new Coordinates(
|
||||
rnd.nextDouble(-90, 90),
|
||||
rnd.nextDouble(-180, 180)
|
||||
);
|
||||
TimedStrike strike = new TimedStrike(System.currentTimeMillis(), coordinates);
|
||||
|
||||
submitStrike(strike);
|
||||
}
|
||||
|
||||
submitEvent("Done ticking");
|
||||
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
System.out.println("mock thunder stop");
|
||||
|
||||
}
|
||||
public void close() {
|
||||
strikeFuture.cancel(true);
|
||||
|
||||
@Override
|
||||
public void tick() {
|
||||
long now = System.currentTimeMillis();
|
||||
|
||||
while (!strikes.isEmpty()) {
|
||||
TimedStrike str = strikes.get(0);
|
||||
if (now > str.timestamp) {
|
||||
System.out.printf("mock thunder given: %d\n", str.timestamp);
|
||||
strikeHandlers.forEach(con -> con.accept(str.coordinates));
|
||||
strikes.remove(0);
|
||||
} else break;
|
||||
}
|
||||
System.out.println("mock thunder tick");
|
||||
|
||||
}
|
||||
System.out.println("Mock thunder provider closed");
|
||||
|
||||
@Override
|
||||
public void registerStrikeHandler(Consumer<Coordinates> runnable) {
|
||||
strikeHandlers.add(runnable);
|
||||
System.out.println("mock thunder strike handler added");
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getDelay() {
|
||||
// TODO Auto-generated method stub
|
||||
return 10000;
|
||||
submitEvent("Mock thunder provider closed");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLatency() {
|
||||
return 690;
|
||||
return 700;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,43 @@
|
|||
package eu.m724.wtapi.thunder;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
import eu.m724.wtapi.provider.thunder.TimedStrike;
|
||||
import org.junit.Test;
|
||||
|
||||
import eu.m724.wtapi.provider.thunder.ThunderProvider;
|
||||
|
||||
public class MockThunderProviderTest {
|
||||
private final ArrayList<TimedStrike> strikes = new ArrayList<>();
|
||||
private volatile boolean done = false;
|
||||
|
||||
@Test
|
||||
public void mockThunderProviderTest() {
|
||||
ThunderProvider provider = new MockThunderProvider();
|
||||
|
||||
provider.registerEventConsumer((event) -> {
|
||||
if (event.message().equals("Done ticking")) {
|
||||
done = true;
|
||||
}
|
||||
|
||||
System.out.println("Event: " + event.message());
|
||||
});
|
||||
|
||||
provider.registerStrikeConsumer((strike) -> {
|
||||
strikes.add(strike);
|
||||
|
||||
int size = strikes.size();
|
||||
System.out.printf("Strike: %f %f #%d%n", strike.coordinates().latitude(), strike.coordinates().longitude(), size);
|
||||
});
|
||||
|
||||
provider.start();
|
||||
|
||||
while (!done) {
|
||||
Thread.onSpinWait();
|
||||
}
|
||||
|
||||
provider.close();
|
||||
|
||||
assert strikes.size() == 20;
|
||||
}
|
||||
}
|
||||
|
|
@ -1,38 +0,0 @@
|
|||
package eu.m724.wtapi.thunder;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import eu.m724.wtapi.object.Coordinates;
|
||||
import eu.m724.wtapi.provider.thunder.ThunderProvider;
|
||||
|
||||
public class ThunderProviderTest {
|
||||
@Test
|
||||
public void testThunderProvider() throws InterruptedException {
|
||||
ArrayList<Coordinates> coordinatesList = new ArrayList<>();
|
||||
|
||||
ThunderProvider provider = new MockThunderProvider();
|
||||
|
||||
provider.registerStrikeHandler(coordinatesList::add);
|
||||
|
||||
provider.init();
|
||||
|
||||
provider.start();
|
||||
|
||||
for (int i=0; i < 50; i++) {
|
||||
provider.tick();
|
||||
int size = coordinatesList.size();
|
||||
if (size > 0)
|
||||
System.out.printf("Last from tick: %f %f (total %d)\n", coordinatesList.get(size - 1).latitude(), coordinatesList.get(size - 1).longitude(), size);
|
||||
Thread.sleep(20);
|
||||
}
|
||||
|
||||
provider.stop();
|
||||
|
||||
System.out.printf("Strikes in the last 1s: %d\n", coordinatesList.size());
|
||||
System.out.printf("Latency: %dms\n", provider.getLatency());
|
||||
|
||||
assert coordinatesList.size() == 20; // TODO this is timestamp sensitive and fails under loaded system. Also, the entire test is suboptimal
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue