make it streamable

This commit is contained in:
Minecon724 2024-08-28 15:48:52 +02:00
parent ed10ade455
commit ac3c2b0386
Signed by: Minecon724
GPG key ID: 3CCC4D267742C8E8
3 changed files with 46 additions and 16 deletions

View file

@ -8,15 +8,25 @@ import eu.m724.responsesource.ChatResponseSource;
import groovy.lang.GroovyShell; import groovy.lang.GroovyShell;
public class Main { public class Main {
public static void main(String[] args) { public static void main(String[] args) throws InterruptedException {
ChatResponseSource source = new ExampleSource(); ChatResponseSource source = new ExampleSource();
Chat chat = new Chat(); Chat chat = new Chat();
chat.messages.add(new ChatMessage(false, "hello")); chat.messages.add(new ChatMessage(false, "hello"));
ChatResponse chatResponse = source.ask(chat); ChatResponse chatResponse = source.ask(chat);
String token;
int tokens = 0;
System.out.println(chatResponse.text().join()); System.out.println("Streaming response now\n");
System.out.println(chatResponse.message().text);
while (!(token = chatResponse.textQueue().take()).equals("END_OF_TEXT")) {
System.out.print(token);
tokens++;
}
System.out.println("\n");
System.out.printf("Tokens: %d\n", tokens);
System.out.printf("Text: %s\n", chatResponse.message().join().text);
} }
} }

View file

@ -7,6 +7,9 @@ import eu.m724.responsesource.ChatResponseSource
import eu.m724.responsesource.ChatResponseSourceInfo import eu.m724.responsesource.ChatResponseSourceInfo
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.LinkedBlockingQueue
/** /**
* an example chatresponsesource chatresponsesource ChatResponseSource CHATRESPONSESOURCE CAHTSERREPOSNECSOURCE * an example chatresponsesource chatresponsesource ChatResponseSource CHATRESPONSESOURCE CAHTSERREPOSNECSOURCE
@ -15,6 +18,7 @@ import java.util.concurrent.CompletableFuture
class ExampleSource implements ChatResponseSource { class ExampleSource implements ChatResponseSource {
private ChatResponseSourceInfo info = private ChatResponseSourceInfo info =
new ChatResponseSourceInfo("yo", "ye", "1.0", 1) new ChatResponseSourceInfo("yo", "ye", "1.0", 1)
private Random random = new Random()
@Override @Override
ChatResponseSourceInfo info() { ChatResponseSourceInfo info() {
@ -23,23 +27,35 @@ class ExampleSource implements ChatResponseSource {
@Override @Override
ChatResponse ask(Chat chat) { ChatResponse ask(Chat chat) {
return new ChatResponse() { String[] parts = "hello how can I assist you today".split(" ")
String[] parts
CompletableFuture<String> completableFuture = new CompletableFuture<>();
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>()
CompletableFuture<ChatMessage> future = CompletableFuture.supplyAsync {
for (int i=0; i<parts.length; i++) {
String token = (i > 0 ? " " : "") + parts[i]
queue.put(token);
Thread.sleep(random.nextInt(200, 500))
}
queue.put("END_OF_TEXT")
return new ChatMessage(true, parts.join(" "))
}
return new ChatResponse() {
@Override @Override
boolean isStreaming() { boolean isStreaming() {
return false return false
} }
@Override @Override
CompletableFuture<String> text() { LinkedBlockingQueue<String> textQueue() {
return CompletableFuture.completedFuture("hello how can i assist you today") return queue
} }
@Override @Override
ChatMessage message() { CompletableFuture<ChatMessage> message() {
return new ChatMessage(true, "i assisted you already bye") return future
} }
} }
} }

View file

@ -3,10 +3,13 @@ package eu.m724.responsesource;
import eu.m724.chat.ChatMessage; import eu.m724.chat.ChatMessage;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
public interface ChatResponse { public interface ChatResponse {
/** /**
* is this response streaming * is this response streaming
* if it's not, the queue will get one element that is the whole response
*
* @return is this response streaming * @return is this response streaming
*/ */
boolean isStreaming(); boolean isStreaming();
@ -14,14 +17,15 @@ public interface ChatResponse {
/** /**
* if streamed, text token by token as it goes (or other splitting depending on the source) * if streamed, text token by token as it goes (or other splitting depending on the source)
* if not, the {@link CompletableFuture} returns just the whole response after it's ready * if not, the {@link CompletableFuture} returns just the whole response after it's ready
* @return yeah *
* @return the fifo queue with each element being a part. null ends the sequence
*/ */
CompletableFuture<String> text(); // TODO completablefuture is not correct here also fix the doc LinkedBlockingQueue<String> textQueue();
/** /**
* gets the resulting {@link ChatMessage} * gets the resulting {@link ChatMessage} when it's ready
* TODO I think it should be available after streaming is done so maybe wrap this in {@link CompletableFuture} *
* @return the resulting {@link ChatMessage} * @return the resulting {@link ChatMessage} as soon as the response is complete
*/ */
ChatMessage message(); CompletableFuture<ChatMessage> message();
} }