Skip to content

Commit 7980161

Browse files
authored
Pause activity by rules in executeActivityRetryTimerTask (#7555)
## What changed? <!-- Describe what has changed in this PR --> Add code to pause the activity while processing activity retry task. ## Why? <!-- Tell your future self why have you made these changes --> Part of the workflow rules efforts. This is another place where we need to check rules related to the activity. ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> Add functional test. ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> N/A. ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> N/A.
1 parent 36d3382 commit 7980161

File tree

2 files changed

+198
-5
lines changed

2 files changed

+198
-5
lines changed

service/history/timer_queue_active_task_executor.go

+50
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,18 @@ func (t *timerQueueActiveTaskExecutor) executeActivityRetryTimerTask(
549549
return consts.ErrWorkflowCompleted
550550
}
551551

552+
err = t.processActivityWorkflowRules(ctx, weContext, mutableState, activityInfo)
553+
if err != nil {
554+
return err
555+
}
556+
557+
// task can be paused as the result of processing activity workflow rules, so we need to check again
558+
if task.Stamp != activityInfo.Stamp || activityInfo.Paused {
559+
// if retry task event is from an old stamp of if activity is paused we should ignore the event.
560+
release(nil) // release(nil) so mutable state is not unloaded from cache
561+
return consts.ErrActivityTaskNotFound
562+
}
563+
552564
taskQueue := &taskqueuepb.TaskQueue{
553565
Name: activityInfo.TaskQueue,
554566
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
@@ -880,3 +892,41 @@ func (t *timerQueueActiveTaskExecutor) emitTimeoutMetricScopeWithNamespaceTag(
880892
metrics.HeartbeatTimeoutCounter.With(metricsScope).Record(1)
881893
}
882894
}
895+
896+
func (t *timerQueueActiveTaskExecutor) processActivityWorkflowRules(
897+
ctx context.Context,
898+
weContext historyi.WorkflowContext,
899+
ms historyi.MutableState,
900+
ai *persistencespb.ActivityInfo,
901+
) error {
902+
if ai.Paused {
903+
return nil
904+
}
905+
906+
activityChanged := workflow.ActivityMatchWorkflowRules(ms, t.logger, ai)
907+
if !activityChanged {
908+
return nil
909+
}
910+
if ai.Paused {
911+
// need to update activity
912+
if err := ms.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ historyi.MutableState) error {
913+
activityInfo.StartedEventId = common.EmptyEventID
914+
activityInfo.StartedTime = nil
915+
activityInfo.RequestId = ""
916+
return nil
917+
}); err != nil {
918+
return err
919+
}
920+
921+
// need to update mutable state
922+
err := weContext.UpdateWorkflowExecutionAsActive(
923+
ctx,
924+
t.shardContext,
925+
)
926+
if err != nil {
927+
return err
928+
}
929+
}
930+
931+
return nil
932+
}

tests/activity_api_rules_test.go

+148-5
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"go.temporal.io/sdk/temporal"
4141
"go.temporal.io/sdk/workflow"
4242
"go.temporal.io/server/common/dynamicconfig"
43+
"go.temporal.io/server/common/log"
4344
"go.temporal.io/server/common/util"
4445
"go.temporal.io/server/tests/testcore"
4546
)
@@ -72,10 +73,11 @@ type internalRulesTestWorkflow struct {
7273
activityFailedCn chan struct{}
7374

7475
testSuite *testcore.FunctionalTestBase
76+
logger log.Logger
7577
ctx context.Context
7678
}
7779

78-
func newInternalRulesTestWorkflow(ctx context.Context, testSuite *testcore.FunctionalTestBase) *internalRulesTestWorkflow {
80+
func newInternalRulesTestWorkflow(ctx context.Context, testSuite *testcore.FunctionalTestBase, logger log.Logger) *internalRulesTestWorkflow {
7981
wf := &internalRulesTestWorkflow{
8082
initialRetryInterval: 1 * time.Second,
8183
scheduleToCloseTimeout: 30 * time.Minute,
@@ -84,6 +86,7 @@ func newInternalRulesTestWorkflow(ctx context.Context, testSuite *testcore.Funct
8486
activityFailedCn: make(chan struct{}),
8587
testSuite: testSuite,
8688
ctx: ctx,
89+
logger: logger,
8790
}
8891
wf.activityRetryPolicy = &temporal.RetryPolicy{
8992
InitialInterval: wf.initialRetryInterval,
@@ -305,7 +308,7 @@ func (s *ActivityApiRulesClientTestSuite) TestActivityRulesApi_RetryActivity() {
305308
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
306309
defer cancel()
307310

308-
testWorkflow := newInternalRulesTestWorkflow(ctx, &s.FunctionalTestBase)
311+
testWorkflow := newInternalRulesTestWorkflow(ctx, &s.FunctionalTestBase, s.Logger)
309312

310313
s.Worker().RegisterWorkflow(testWorkflow.WorkflowFuncForRetryActivity)
311314
s.Worker().RegisterActivity(testWorkflow.ActivityFuncForRetryActivity)
@@ -339,8 +342,9 @@ func (s *ActivityApiRulesClientTestSuite) TestActivityRulesApi_RetryActivity() {
339342
}
340343
}, 5*time.Second, 200*time.Millisecond)
341344

342-
// Let activity fail
343-
err = util.InterruptibleSleep(ctx, 1*time.Second)
345+
// Let namespace config propagate.
346+
// There is no good way to check if the namespace config has propagated to the history service
347+
err = util.InterruptibleSleep(ctx, 4*time.Second)
344348
s.NoError(err)
345349

346350
testWorkflow.activityFailedCn <- struct{}{}
@@ -378,6 +382,11 @@ func (s *ActivityApiRulesClientTestSuite) TestActivityRulesApi_RetryActivity() {
378382
assert.Len(s.T(), nsResp.Rules, 0)
379383
}, 5*time.Second, 200*time.Millisecond)
380384

385+
// Let namespace config propagate.
386+
// There is no good way to check if the namespace config has propagated to the history service
387+
err = util.InterruptibleSleep(ctx, 4*time.Second)
388+
s.NoError(err)
389+
381390
// unpause the activity
382391
_, err = s.FrontendClient().UnpauseActivity(ctx, &workflowservice.UnpauseActivityRequest{
383392
Namespace: s.Namespace().String(),
@@ -397,7 +406,7 @@ func (s *ActivityApiRulesClientTestSuite) TestActivityRulesApi_RetryActivity() {
397406
assert.True(t, description.PendingActivities[0].GetActivityType().GetName() == activityType)
398407
assert.False(t, description.PendingActivities[0].GetPaused())
399408
}
400-
assert.Equal(t, int32(1), testWorkflow.startedActivityCount.Load())
409+
assert.LessOrEqual(t, int32(1), testWorkflow.startedActivityCount.Load())
401410
}, 5*time.Second, 200*time.Millisecond)
402411

403412
// let activity complete
@@ -409,6 +418,140 @@ func (s *ActivityApiRulesClientTestSuite) TestActivityRulesApi_RetryActivity() {
409418
s.NoError(err)
410419
}
411420

421+
func (s *ActivityApiRulesClientTestSuite) TestActivityRulesApi_RetryTask() {
422+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
423+
defer cancel()
424+
425+
// overall test execution plan:
426+
// 1. start workflow
427+
// 2. wait for activity to start and fail exactly once
428+
// 3. create rule to pause activity
429+
// 4. wait for activity to be paused by rule
430+
// 5. let activity succeed
431+
// 6. Remove the rule so it didn't interfere with the activity
432+
// 7. Make sure there is no rules
433+
// 6. Unpause the activity. this will also trigger the activity
434+
// 7. Wait for activity to be unpaused
435+
// 8. Let activity complete
436+
// 9. Wait for workflow to finish
437+
438+
testRetryTaskWorkflow := newInternalRulesTestWorkflow(ctx, &s.FunctionalTestBase, s.Logger)
439+
440+
// set much longer retry interval to make sure that activity is retried at least once
441+
s.initialRetryInterval = 4 * time.Second
442+
s.activityRetryPolicy.InitialInterval = s.initialRetryInterval
443+
444+
s.Worker().RegisterWorkflow(testRetryTaskWorkflow.WorkflowFuncForRetryTask)
445+
s.Worker().RegisterActivity(testRetryTaskWorkflow.ActivityFuncForRetryTask)
446+
447+
// 1. Start workflow
448+
workflowRun := s.createWorkflow(ctx, testRetryTaskWorkflow.WorkflowFuncForRetryTask)
449+
450+
// 2. Wait for activity to start and fail exactly once
451+
s.EventuallyWithT(func(t *assert.CollectT) {
452+
description, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID())
453+
assert.NoError(t, err)
454+
if description.GetPendingActivities() != nil {
455+
assert.Len(t, description.PendingActivities, 1)
456+
}
457+
assert.Equal(t, int32(1), testRetryTaskWorkflow.startedActivityCount.Load())
458+
}, 2*time.Second, 200*time.Millisecond)
459+
460+
// 3. Create rule to pause activity
461+
ruleID := "pause-activity"
462+
activityType := "ActivityFuncForRetryTask"
463+
createRuleRequest := s.createPauseRuleRequest(activityType, ruleID)
464+
createRuleResponse, err := s.FrontendClient().CreateWorkflowRule(ctx, createRuleRequest)
465+
s.NoError(err)
466+
s.NotNil(createRuleResponse)
467+
468+
// 4. verify that frontend has updated namespaces
469+
s.EventuallyWithT(func(t *assert.CollectT) {
470+
nsResp, err := s.FrontendClient().ListWorkflowRules(ctx, &workflowservice.ListWorkflowRulesRequest{
471+
Namespace: s.Namespace().String(),
472+
})
473+
assert.NoError(s.T(), err)
474+
assert.NotNil(s.T(), nsResp)
475+
assert.NotNil(s.T(), nsResp.Rules)
476+
if nsResp.GetRules() != nil {
477+
assert.Len(s.T(), nsResp.Rules, 1)
478+
assert.Equal(s.T(), ruleID, nsResp.Rules[0].Spec.Id)
479+
}
480+
}, 5*time.Second, 200*time.Millisecond)
481+
482+
// Let namespace config propagate.
483+
// There is no good way to check if the namespace config has propagated to the history service
484+
err = util.InterruptibleSleep(ctx, 2*time.Second)
485+
s.NoError(err)
486+
487+
// 5. wait for activity to be paused by rule. This should happen in the activity retry task
488+
s.EventuallyWithT(func(t *assert.CollectT) {
489+
description, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID())
490+
assert.NoError(t, err)
491+
if description.GetPendingActivities() != nil {
492+
assert.Len(t, description.PendingActivities, 1)
493+
assert.True(t, description.PendingActivities[0].GetActivityType().GetName() == activityType)
494+
assert.True(t, description.PendingActivities[0].GetPaused())
495+
}
496+
assert.Equal(t, int32(1), testRetryTaskWorkflow.startedActivityCount.Load())
497+
}, 5*time.Second, 200*time.Millisecond)
498+
499+
// let activity succeed
500+
testRetryTaskWorkflow.letActivitySucceed.Store(true)
501+
502+
// remove the rule so it didn't interfere with the activity
503+
deleteRuleResponse, err := s.FrontendClient().DeleteWorkflowRule(ctx, &workflowservice.DeleteWorkflowRuleRequest{
504+
Namespace: s.Namespace().String(),
505+
RuleId: ruleID,
506+
})
507+
s.NoError(err)
508+
s.NotNil(deleteRuleResponse)
509+
510+
// make sure there is no rules
511+
s.EventuallyWithT(func(t *assert.CollectT) {
512+
nsResp, err := s.FrontendClient().ListWorkflowRules(ctx, &workflowservice.ListWorkflowRulesRequest{
513+
Namespace: s.Namespace().String(),
514+
})
515+
assert.NoError(s.T(), err)
516+
assert.NotNil(s.T(), nsResp)
517+
assert.Len(s.T(), nsResp.Rules, 0)
518+
}, 5*time.Second, 200*time.Millisecond)
519+
520+
// Let namespace config propagate.
521+
// There is no good way to check if the namespace config has propagated to the history service
522+
err = util.InterruptibleSleep(ctx, 2*time.Second)
523+
s.NoError(err)
524+
525+
// unpause the activity. this will also trigger the activity
526+
_, err = s.FrontendClient().UnpauseActivity(ctx, &workflowservice.UnpauseActivityRequest{
527+
Namespace: s.Namespace().String(),
528+
Execution: &commonpb.WorkflowExecution{
529+
WorkflowId: workflowRun.GetID(),
530+
},
531+
Activity: &workflowservice.UnpauseActivityRequest_Type{Type: activityType},
532+
})
533+
s.NoError(err)
534+
535+
// wait for activity to be unpaused
536+
s.EventuallyWithT(func(t *assert.CollectT) {
537+
description, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID())
538+
assert.NoError(t, err)
539+
if description.GetPendingActivities() != nil {
540+
assert.Len(t, description.PendingActivities, 1)
541+
assert.True(t, description.PendingActivities[0].GetActivityType().GetName() == activityType)
542+
assert.False(t, description.PendingActivities[0].GetPaused())
543+
}
544+
assert.LessOrEqual(t, int32(1), testRetryTaskWorkflow.startedActivityCount.Load())
545+
}, 5*time.Second, 200*time.Millisecond)
546+
547+
// let activity complete
548+
testRetryTaskWorkflow.activityCompleteCn <- struct{}{}
549+
// wait for workflow to finish
550+
var out string
551+
err = workflowRun.Get(ctx, &out)
552+
s.NoError(err)
553+
}
554+
412555
func (s *ActivityApiRulesClientTestSuite) createPauseRuleRequest(
413556
activityType string, ruleID string,
414557
) *workflowservice.CreateWorkflowRuleRequest {

0 commit comments

Comments
 (0)