From ac3c2b0386847a06486604f6c48fde5d3bc49b30 Mon Sep 17 00:00:00 2001 From: Minecon724 Date: Wed, 28 Aug 2024 15:48:52 +0200 Subject: [PATCH] make it streamable --- src/main/java/eu/m724/Main.java | 16 ++++++++-- .../java/eu/m724/example/ExampleSource.groovy | 30 ++++++++++++++----- .../eu/m724/responsesource/ChatResponse.java | 16 ++++++---- 3 files changed, 46 insertions(+), 16 deletions(-) diff --git a/src/main/java/eu/m724/Main.java b/src/main/java/eu/m724/Main.java index 1c68170..a72f30a 100644 --- a/src/main/java/eu/m724/Main.java +++ b/src/main/java/eu/m724/Main.java @@ -8,15 +8,25 @@ import eu.m724.responsesource.ChatResponseSource; import groovy.lang.GroovyShell; public class Main { - public static void main(String[] args) { + public static void main(String[] args) throws InterruptedException { ChatResponseSource source = new ExampleSource(); Chat chat = new Chat(); chat.messages.add(new ChatMessage(false, "hello")); ChatResponse chatResponse = source.ask(chat); + String token; + int tokens = 0; - System.out.println(chatResponse.text().join()); - System.out.println(chatResponse.message().text); + System.out.println("Streaming response now\n"); + + while (!(token = chatResponse.textQueue().take()).equals("END_OF_TEXT")) { + System.out.print(token); + tokens++; + } + + System.out.println("\n"); + System.out.printf("Tokens: %d\n", tokens); + System.out.printf("Text: %s\n", chatResponse.message().join().text); } } \ No newline at end of file diff --git a/src/main/java/eu/m724/example/ExampleSource.groovy b/src/main/java/eu/m724/example/ExampleSource.groovy index 4d86ba0..a730126 100644 --- a/src/main/java/eu/m724/example/ExampleSource.groovy +++ b/src/main/java/eu/m724/example/ExampleSource.groovy @@ -7,6 +7,9 @@ import eu.m724.responsesource.ChatResponseSource import eu.m724.responsesource.ChatResponseSourceInfo import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executors +import java.util.concurrent.LinkedBlockingDeque +import java.util.concurrent.LinkedBlockingQueue /** * an example chatresponsesource chatresponsesource ChatResponseSource CHATRESPONSESOURCE CAHTSERREPOSNECSOURCE @@ -15,6 +18,7 @@ import java.util.concurrent.CompletableFuture class ExampleSource implements ChatResponseSource { private ChatResponseSourceInfo info = new ChatResponseSourceInfo("yo", "ye", "1.0", 1) + private Random random = new Random() @Override ChatResponseSourceInfo info() { @@ -23,23 +27,35 @@ class ExampleSource implements ChatResponseSource { @Override ChatResponse ask(Chat chat) { - return new ChatResponse() { - String[] parts - CompletableFuture completableFuture = new CompletableFuture<>(); + String[] parts = "hello how can I assist you today".split(" ") + LinkedBlockingQueue queue = new LinkedBlockingQueue<>() + + CompletableFuture future = CompletableFuture.supplyAsync { + for (int i=0; i 0 ? " " : "") + parts[i] + queue.put(token); + Thread.sleep(random.nextInt(200, 500)) + } + + queue.put("END_OF_TEXT") + return new ChatMessage(true, parts.join(" ")) + } + + return new ChatResponse() { @Override boolean isStreaming() { return false } @Override - CompletableFuture text() { - return CompletableFuture.completedFuture("hello how can i assist you today") + LinkedBlockingQueue textQueue() { + return queue } @Override - ChatMessage message() { - return new ChatMessage(true, "i assisted you already bye") + CompletableFuture message() { + return future } } } diff --git a/src/main/java/eu/m724/responsesource/ChatResponse.java b/src/main/java/eu/m724/responsesource/ChatResponse.java index 2abb90e..9f36909 100644 --- a/src/main/java/eu/m724/responsesource/ChatResponse.java +++ b/src/main/java/eu/m724/responsesource/ChatResponse.java @@ -3,10 +3,13 @@ package eu.m724.responsesource; import eu.m724.chat.ChatMessage; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; public interface ChatResponse { /** * is this response streaming + * if it's not, the queue will get one element that is the whole response + * * @return is this response streaming */ boolean isStreaming(); @@ -14,14 +17,15 @@ public interface ChatResponse { /** * if streamed, text token by token as it goes (or other splitting depending on the source) * if not, the {@link CompletableFuture} returns just the whole response after it's ready - * @return yeah + * + * @return the fifo queue with each element being a part. null ends the sequence */ - CompletableFuture text(); // TODO completablefuture is not correct here also fix the doc + LinkedBlockingQueue textQueue(); /** - * gets the resulting {@link ChatMessage} - * TODO I think it should be available after streaming is done so maybe wrap this in {@link CompletableFuture} - * @return the resulting {@link ChatMessage} + * gets the resulting {@link ChatMessage} when it's ready + * + * @return the resulting {@link ChatMessage} as soon as the response is complete */ - ChatMessage message(); + CompletableFuture message(); }