Skip to content

Commit c838f61

Browse files
committed
feat: make LLMStreamingGenerator an implementation of AsyncGenerator.WithResult
work on #31
1 parent 49eea02 commit c838f61

File tree

1 file changed

+12
-15
lines changed

1 file changed

+12
-15
lines changed

langchain4j/src/main/java/org/bsc/langgraph4j/langchain4j/generators/LLMStreamingGenerator.java

+12-15
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import dev.langchain4j.model.StreamingResponseHandler;
44
import dev.langchain4j.model.output.Response;
5+
import lombok.extern.slf4j.Slf4j;
56
import org.bsc.async.AsyncGenerator;
67
import org.bsc.async.AsyncGeneratorQueue;
78

@@ -11,44 +12,40 @@
1112

1213
import static java.util.concurrent.CompletableFuture.completedFuture;
1314

14-
public class LLMStreamingGenerator<T> implements AsyncGenerator<String> {
15-
private final AsyncGeneratorQueue.Generator<String> generator ;
15+
@Slf4j
16+
public class LLMStreamingGenerator<T> extends AsyncGenerator.WithResult<String> {
17+
final BlockingQueue<AsyncGenerator.Data<String>> queue;
1618

1719
public LLMStreamingGenerator( BlockingQueue<AsyncGenerator.Data<String>> queue ) {
18-
this.generator = new AsyncGeneratorQueue.Generator<>( queue );
20+
super(new AsyncGeneratorQueue.Generator<>( queue ));
21+
this.queue = queue;
1922
}
2023

2124
public LLMStreamingGenerator() {
2225
this( new LinkedBlockingQueue<>());
2326
}
2427

25-
@Override
26-
public Data<String> next() {
27-
return generator.next();
28-
}
29-
30-
public AsyncGenerator<String> generator( ) {
31-
return generator;
32-
}
33-
3428
public StreamingResponseHandler<T> handler() {
3529
return new StreamingResponseHandler<T>() {
3630

3731
@Override
3832
public void onNext(String token) {
39-
generator.queue().add( AsyncGenerator.Data.of(completedFuture(token)) );
33+
log.trace("onNext: {}", token);
34+
queue.add( AsyncGenerator.Data.of(completedFuture(token)) );
4035
}
4136

4237
@Override
4338
public void onComplete(Response<T> response) {
44-
generator.queue().add(AsyncGenerator.Data.done());
39+
log.trace("onComplete: {}", response);
40+
queue.add(AsyncGenerator.Data.done(response));
4541
}
4642

4743
@Override
4844
public void onError(Throwable error) {
45+
log.trace("onError", error);
4946
CompletableFuture<String> future = new CompletableFuture<>();
5047
future.completeExceptionally(error);
51-
generator.queue().add( AsyncGenerator.Data.of(future) );
48+
queue.add( AsyncGenerator.Data.of(future) );
5249
}
5350
};
5451
}

0 commit comments

Comments
 (0)