Skip to content

Commit 0f1a5bc

Browse files
committed
fix: synchronize buffering and committing
1 parent d95feb6 commit 0f1a5bc

File tree

1 file changed

+26
-17
lines changed

1 file changed

+26
-17
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

+26-17
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@
6161
import java.util.concurrent.Executor;
6262
import java.util.concurrent.TimeUnit;
6363
import java.util.concurrent.TimeoutException;
64-
import java.util.concurrent.atomic.AtomicBoolean;
6564
import java.util.concurrent.atomic.AtomicInteger;
6665
import java.util.logging.Level;
6766
import java.util.logging.Logger;
@@ -152,7 +151,10 @@ public void removeListener(Runnable listener) {
152151
}
153152
}
154153

155-
private final AtomicBoolean committing = new AtomicBoolean();
154+
private final Object committingLock = new Object();
155+
156+
@GuardedBy("committingLock")
157+
private volatile boolean committing;
156158

157159
@GuardedBy("lock")
158160
private volatile SettableApiFuture<Void> finishedAsyncOperations = SettableApiFuture.create();
@@ -284,8 +286,15 @@ void commit() {
284286
volatile ApiFuture<CommitResponse> commitFuture;
285287

286288
ApiFuture<CommitResponse> commitAsync() {
287-
if (committing.getAndSet(true)) {
288-
throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
289+
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
290+
synchronized (committingLock) {
291+
if (committing) {
292+
throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
293+
}
294+
committing = true;
295+
if (!mutations.isEmpty()) {
296+
Mutation.toProto(mutations, mutationsProto);
297+
}
289298
}
290299
final SettableApiFuture<CommitResponse> res = SettableApiFuture.create();
291300
final SettableApiFuture<Void> finishOps;
@@ -311,11 +320,7 @@ ApiFuture<CommitResponse> commitAsync() {
311320
finishOps = finishedAsyncOperations;
312321
}
313322
}
314-
if (!mutations.isEmpty()) {
315-
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
316-
Mutation.toProto(mutations, mutationsProto);
317-
builder.addAllMutations(mutationsProto);
318-
}
323+
builder.addAllMutations(mutationsProto);
319324
finishOps.addListener(
320325
new CommitRunnable(res, finishOps, builder), MoreExecutors.directExecutor());
321326
return res;
@@ -608,10 +613,12 @@ public void onDone(boolean withBeginTransaction) {
608613

609614
@Override
610615
public void buffer(Mutation mutation) {
611-
if (committing.get()) {
612-
throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
616+
synchronized (committingLock) {
617+
if (committing) {
618+
throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
619+
}
620+
mutations.add(checkNotNull(mutation));
613621
}
614-
mutations.add(checkNotNull(mutation));
615622
}
616623

617624
@Override
@@ -625,11 +632,13 @@ public ApiFuture<Void> bufferAsync(Mutation mutation) {
625632

626633
@Override
627634
public void buffer(Iterable<Mutation> mutations) {
628-
if (committing.get()) {
629-
throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
630-
}
631-
for (Mutation mutation : mutations) {
632-
this.mutations.add(checkNotNull(mutation));
635+
synchronized (committingLock) {
636+
if (committing) {
637+
throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
638+
}
639+
for (Mutation mutation : mutations) {
640+
this.mutations.add(checkNotNull(mutation));
641+
}
633642
}
634643
}
635644

0 commit comments

Comments
 (0)