Skip to content

Commit bd2385e

Browse files
authored
State Based replication fix (#7553)
## What changed? <!-- Describe what has changed in this PR --> 1. Update the mutable state's UpdateWorkflowStateStatus to be a no-op when neither the state nor the status has changed. 2. Update the callers of PartialRefresh to pass an inclusive minimum versioned transition (the local versioned transition with its TransitionCount incremented by one). 3. Add events to the events cache when processing a sync versioned transition task. ## Why? <!-- Tell your future self why have you made these changes --> 1 and 2 reduce the number of task refreshes. 3 reduces the number of ReadHistoryBranch calls. ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> Unit tests. ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> No risk. ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> N/A ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) --> Yes
1 parent 2b610f1 commit bd2385e

File tree

5 files changed

+48
-48
lines changed

5 files changed

+48
-48
lines changed

service/history/ndc/workflow_state_replicator.go

+41-3
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ import (
6060
"go.temporal.io/server/common/primitives/timestamp"
6161
serviceerrors "go.temporal.io/server/common/serviceerror"
6262
"go.temporal.io/server/service/history/consts"
63+
"go.temporal.io/server/service/history/events"
6364
"go.temporal.io/server/service/history/historybuilder"
6465
historyi "go.temporal.io/server/service/history/interfaces"
6566
"go.temporal.io/server/service/history/workflow"
@@ -404,8 +405,9 @@ func (r *WorkflowStateReplicatorImpl) applyMutation(
404405
return err
405406
}
406407
}
407-
408-
err = r.taskRefresher.PartialRefresh(ctx, localMutableState, localVersionedTransition)
408+
nextVersionedTransition := transitionhistory.CopyVersionedTransition(localVersionedTransition)
409+
nextVersionedTransition.TransitionCount++
410+
err = r.taskRefresher.PartialRefresh(ctx, localMutableState, nextVersionedTransition)
409411
if err != nil {
410412
return err
411413
}
@@ -550,7 +552,9 @@ func (r *WorkflowStateReplicatorImpl) applySnapshotWhenWorkflowExist(
550552
return err
551553
}
552554
} else {
553-
err = r.taskRefresher.PartialRefresh(ctx, localMutableState, localVersionedTransition)
555+
nextVersionedTransition := transitionhistory.CopyVersionedTransition(localVersionedTransition)
556+
nextVersionedTransition.TransitionCount++
557+
err = r.taskRefresher.PartialRefresh(ctx, localMutableState, nextVersionedTransition)
554558
if err != nil {
555559
return err
556560
}
@@ -839,6 +843,7 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
839843
}
840844
for _, event := range events {
841845
localMutableState.AddReapplyCandidateEvent(event)
846+
r.addEventToCache(localMutableState.GetWorkflowKey(), event)
842847
}
843848
_, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{
844849
ShardID: r.shardContext.GetShardID(),
@@ -888,6 +893,7 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
888893
}
889894
for _, event := range events {
890895
localMutableState.AddReapplyCandidateEvent(event)
896+
r.addEventToCache(localMutableState.GetWorkflowKey(), event)
891897
}
892898
_, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{
893899
ShardID: r.shardContext.GetShardID(),
@@ -1202,6 +1208,7 @@ BackfillLoop:
12021208
}
12031209
for _, event := range events {
12041210
mutableState.AddReapplyCandidateEvent(event)
1211+
r.addEventToCache(mutableState.GetWorkflowKey(), event)
12051212
}
12061213
}
12071214

@@ -1348,6 +1355,37 @@ func (r *WorkflowStateReplicatorImpl) getHistoryFromRemotePaginationFn(
13481355
}
13491356
}
13501357

1358+
func (r *WorkflowStateReplicatorImpl) addEventToCache(
1359+
workflowKey definition.WorkflowKey,
1360+
event *historypb.HistoryEvent,
1361+
) {
1362+
switch event.EventType {
1363+
case enumspb.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED,
1364+
enumspb.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED,
1365+
enumspb.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED,
1366+
enumspb.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED,
1367+
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED,
1368+
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED,
1369+
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW,
1370+
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED,
1371+
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT,
1372+
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED,
1373+
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:
1374+
r.shardContext.GetEventsCache().PutEvent(
1375+
events.EventKey{
1376+
NamespaceID: namespace.ID(workflowKey.NamespaceID),
1377+
WorkflowID: workflowKey.WorkflowID,
1378+
RunID: workflowKey.RunID,
1379+
EventID: event.GetEventId(),
1380+
Version: event.GetVersion(),
1381+
},
1382+
event,
1383+
)
1384+
default:
1385+
}
1386+
1387+
}
1388+
13511389
func sortAncestors(ans []*persistencespb.HistoryBranchRange) []*persistencespb.HistoryBranchRange {
13521390
if len(ans) > 0 {
13531391
// sort ans based on EndNodeID so that we can set BeginNodeID

service/history/ndc/workflow_state_replicator_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,7 @@ func (s *workflowReplicatorSuite) Test_ReplicateVersionedTransition_SameBranch_S
695695
mockTaskRefresher.EXPECT().
696696
PartialRefresh(gomock.Any(), gomock.Any(), EqVersionedTransition(&persistencespb.VersionedTransition{
697697
NamespaceFailoverVersion: 2,
698-
TransitionCount: 18,
698+
TransitionCount: 19,
699699
}),
700700
).Return(nil).Times(1)
701701

@@ -870,7 +870,7 @@ func (s *workflowReplicatorSuite) Test_ReplicateVersionedTransition_SameBranch_S
870870
mockTaskRefresher.EXPECT().
871871
PartialRefresh(gomock.Any(), gomock.Any(), EqVersionedTransition(&persistencespb.VersionedTransition{
872872
NamespaceFailoverVersion: 2,
873-
TransitionCount: 18,
873+
TransitionCount: 19,
874874
}),
875875
).Return(nil).Times(1)
876876

@@ -1053,6 +1053,7 @@ func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_W
10531053
mockMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{
10541054
RunId: s.runID,
10551055
}).AnyTimes()
1056+
mockMutableState.EXPECT().GetWorkflowKey().Return(definition.NewWorkflowKey(namespaceID, s.workflowID, s.runID)).AnyTimes()
10561057
mockMutableState.EXPECT().SetHistoryBuilder(gomock.Any()).Times(1)
10571058

10581059
allEvents := append(gapEvents, requestedEvents...)

service/history/workflow/mutable_state_impl.go

+3
Original file line numberDiff line numberDiff line change
@@ -5754,6 +5754,9 @@ func (ms *MutableStateImpl) UpdateWorkflowStateStatus(
57545754
state enumsspb.WorkflowExecutionState,
57555755
status enumspb.WorkflowExecutionStatus,
57565756
) error {
5757+
if state == ms.executionState.State && status == ms.executionState.Status {
5758+
return nil
5759+
}
57575760
if state != enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE {
57585761
ms.executionStateUpdated = true
57595762
ms.visibilityUpdated = true // workflow status & state change triggers visibility change as well

service/history/workflow/task_refresher.go

+1-18
Original file line numberDiff line numberDiff line change
@@ -98,17 +98,6 @@ func (r *TaskRefresherImpl) PartialRefresh(
9898
mutableState historyi.MutableState,
9999
minVersionedTransition *persistencespb.VersionedTransition,
100100
) error {
101-
if transitionhistory.Compare(minVersionedTransition, EmptyVersionedTransition) != 0 {
102-
// Perform a sanity check to make sure that the minVersionedTransition, if provided,
103-
// is on the current branch of transition history.
104-
if err := TransitionHistoryStalenessCheck(
105-
mutableState.GetExecutionInfo().TransitionHistory,
106-
minVersionedTransition,
107-
); err != nil {
108-
return err
109-
}
110-
}
111-
112101
taskGenerator := r.taskGeneratorProvider.NewTaskGenerator(
113102
r.shard,
114103
mutableState,
@@ -410,13 +399,8 @@ func (r *TaskRefresherImpl) refreshTasksForActivity(
410399
continue
411400
}
412401

413-
scheduleEvent, err := mutableState.GetActivityScheduledEvent(ctx, activityInfo.ScheduledEventId)
414-
if err != nil {
415-
return err
416-
}
417-
418402
if err := taskGenerator.GenerateActivityTasks(
419-
scheduleEvent.GetEventId(),
403+
activityInfo.ScheduledEventId,
420404
); err != nil {
421405
return err
422406
}
@@ -605,7 +589,6 @@ func (r *TaskRefresherImpl) refreshTasksForWorkflowSearchAttr(
605589
) < 0 {
606590
return nil
607591
}
608-
609592
return taskGenerator.GenerateUpsertVisibilityTask()
610593
}
611594

service/history/workflow/task_refresher_test.go

-25
Original file line numberDiff line numberDiff line change
@@ -613,31 +613,6 @@ func (s *taskRefresherSuite) TestRefreshActivityTasks() {
613613
10,
614614
)
615615
s.NoError(err)
616-
617-
for _, eventID := range tc.getActivityScheduledEventIDs {
618-
// only the first activity will actually refresh the transfer activity task
619-
scheduledEvent := &historypb.HistoryEvent{
620-
EventId: eventID,
621-
Version: common.EmptyVersion,
622-
EventType: enumspb.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED,
623-
Attributes: &historypb.HistoryEvent_ActivityTaskScheduledEventAttributes{
624-
ActivityTaskScheduledEventAttributes: &historypb.ActivityTaskScheduledEventAttributes{},
625-
},
626-
}
627-
s.mockShard.MockEventsCache.EXPECT().GetEvent(
628-
gomock.Any(),
629-
s.mockShard.GetShardID(),
630-
events.EventKey{
631-
NamespaceID: tests.NamespaceID,
632-
WorkflowID: tests.WorkflowID,
633-
RunID: tests.RunID,
634-
EventID: eventID,
635-
Version: common.EmptyVersion,
636-
},
637-
int64(4),
638-
branchToken,
639-
).Return(scheduledEvent, nil).Times(1)
640-
}
641616
for _, eventID := range tc.generateActivityTaskIDs {
642617
s.mockTaskGenerator.EXPECT().GenerateActivityTasks(int64(eventID)).Return(nil).Times(1)
643618
}

0 commit comments

Comments
 (0)