Skip to content

Commit d38eac5

Browse files
committed
feat: add a LLMStreamingGenerator class
Convert the langchain4j StreamingResponseHandler to AsyncStream work on #31
1 parent 9d0a9a6 commit d38eac5

File tree

1 file changed

+57
-0
lines changed

1 file changed

+57
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package org.bsc.langgraph4j.langchain4j.generators;
2+
3+
import dev.langchain4j.data.message.AiMessage;
4+
import dev.langchain4j.model.StreamingResponseHandler;
5+
import dev.langchain4j.model.output.Response;
6+
import org.bsc.async.AsyncGenerator;
7+
import org.bsc.async.AsyncGeneratorQueue;
8+
9+
import java.util.concurrent.BlockingQueue;
10+
import java.util.concurrent.CompletableFuture;
11+
import java.util.concurrent.LinkedBlockingQueue;
12+
13+
import static java.util.concurrent.CompletableFuture.completedFuture;
14+
15+
public class LLMStreamingGenerator<T> implements AsyncGenerator<String> {
16+
private final AsyncGeneratorQueue.Generator<String> generator ;
17+
18+
public LLMStreamingGenerator( BlockingQueue<AsyncGenerator.Data<String>> queue ) {
19+
this.generator = new AsyncGeneratorQueue.Generator<>( queue );
20+
}
21+
22+
public LLMStreamingGenerator() {
23+
this( new LinkedBlockingQueue<>());
24+
}
25+
26+
@Override
27+
public Data<String> next() {
28+
return generator.next();
29+
}
30+
31+
public AsyncGenerator<String> generator( ) {
32+
return generator;
33+
}
34+
35+
public StreamingResponseHandler<T> handler() {
36+
return new StreamingResponseHandler<T>() {
37+
38+
@Override
39+
public void onNext(String token) {
40+
generator.queue().add( AsyncGenerator.Data.of(completedFuture(token)) );
41+
}
42+
43+
@Override
44+
public void onComplete(Response<T> response) {
45+
generator.queue().add(AsyncGenerator.Data.done());
46+
}
47+
48+
@Override
49+
public void onError(Throwable error) {
50+
CompletableFuture<String> future = new CompletableFuture<>();
51+
future.completeExceptionally(error);
52+
generator.queue().add( AsyncGenerator.Data.of(future) );
53+
}
54+
};
55+
}
56+
57+
}

0 commit comments

Comments
 (0)