Skip to content

Commit c8ae36a

Browse files
committed
feat(CompiledGraph): Refactor compiled graph edge processing
- Update edge mapping logic to handle parallel nodes and conditional edges - Introduce parallel action nodes for handling multiple targets - Remove deprecated methods getEntryPoint() and getFinishPoint() work on #72
1 parent ed475c9 commit c8ae36a

File tree

1 file changed

+56
-8
lines changed

1 file changed

+56
-8
lines changed

core/src/main/java/org/bsc/langgraph4j/CompiledGraph.java

+56-8
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
import java.util.*;
1717
import java.util.concurrent.CompletableFuture;
1818
import java.util.concurrent.CompletionException;
19+
import java.util.function.Supplier;
1920
import java.util.stream.Collectors;
21+
import java.util.stream.Stream;
2022

2123
import static java.lang.String.format;
2224
import static java.util.concurrent.CompletableFuture.completedFuture;
@@ -59,9 +61,55 @@ protected CompiledGraph(StateGraph<State> stateGraph, CompileConfig compileConfi
5961
nodes.put(n.id(), factory.apply(compileConfig));
6062
}
6163

62-
stateGraph.edges.forEach(e ->
63-
edges.put(e.sourceId(), e.target())
64-
);
64+
for( var e : stateGraph.edges ) {
65+
var targets = e.targets();
66+
if (targets.size() == 1) {
67+
edges.put(e.sourceId(), targets.get(0));
68+
}
69+
else {
70+
Supplier<Stream<EdgeValue<State>>> parallelNodeStream = () ->
71+
targets.stream().filter( target -> nodes.containsKey(target.id()) );
72+
73+
var parallelNodeEdges = parallelNodeStream.get()
74+
.map( target -> new Edge<State>(target.id()))
75+
.filter( ee -> stateGraph.edges.contains( ee ) )
76+
.map( ee -> stateGraph.edges.indexOf( ee ) )
77+
.map( index -> stateGraph.edges.get(index) )
78+
.toList();
79+
80+
var parallelNodeTargets = parallelNodeEdges.stream()
81+
.map( ee -> ee.target().id() )
82+
.collect(Collectors.toSet());
83+
84+
if( parallelNodeTargets.size() > 1 ) {
85+
86+
var conditionalEdges = parallelNodeEdges.stream()
87+
.filter( ee -> ee.target().value() != null )
88+
.toList();
89+
if(!conditionalEdges.isEmpty()) {
90+
throw StateGraph.Errors.unsupportedConditionalEdgeOnParallelNode.exception(
91+
e.sourceId(),
92+
conditionalEdges.stream().map(Edge::sourceId).toList() );
93+
}
94+
throw StateGraph.Errors.illegalMultipleTargetsOnParallelNode.exception(e.sourceId(), parallelNodeTargets );
95+
}
96+
97+
var actions = parallelNodeStream.get()
98+
.map( target -> nodes.remove(target.id()) )
99+
.toList();
100+
101+
var parallelNode = Node.parallel( e.sourceId(), actions, stateGraph.getChannels() );
102+
103+
nodes.put( parallelNode.id(), parallelNode.actionFactory().apply(compileConfig) );
104+
105+
edges.put( e.sourceId(), new EdgeValue<>( parallelNode.id(), null ) );
106+
107+
edges.put( parallelNode.id(), new EdgeValue<>( parallelNodeTargets.iterator().next(), null ));
108+
109+
}
110+
111+
112+
}
65113
}
66114

67115
public Collection<StateSnapshot<State>> getStateHistory( RunnableConfig config ) {
@@ -145,12 +193,12 @@ public RunnableConfig updateState( RunnableConfig config, Map<String,Object> val
145193
return updateState(config, values, null);
146194
}
147195

148-
@Deprecated
196+
@Deprecated( forRemoval = true )
149197
public EdgeValue<State> getEntryPoint() {
150198
return stateGraph.getEntryPoint();
151199
}
152200

153-
@Deprecated
201+
@Deprecated( forRemoval = true )
154202
public String getFinishPoint() {
155203
return stateGraph.getFinishPoint();
156204
}
@@ -203,7 +251,8 @@ private String nextNodeId(String nodeId, Map<String,Object> state) throws Except
203251
}
204252

205253
private String getEntryPoint( Map<String,Object> state ) throws Exception {
206-
return nextNodeId(stateGraph.getEntryPoint(), state, "entryPoint");
254+
var entryPoint = this.edges.get(START);
255+
return nextNodeId(entryPoint, state, "entryPoint");
207256
}
208257

209258
private boolean shouldInterruptBefore(@NonNull String nodeId, String previousNodeId ) {
@@ -557,6 +606,5 @@ public Data<Output> next() {
557606
}
558607
}
559608

560-
561-
562609
}
610+

0 commit comments

Comments
 (0)