Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring. Split interface/implementation for Shard/Workflow Context #7443

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions service/history/api/addtasks/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/shard"
historyi "go.temporal.io/server/service/history/interfaces"
"go.temporal.io/server/service/history/tasks"
)

Expand All @@ -59,7 +59,7 @@ const (
// any validation on the shard ID because that must have been done by whoever provided the shard.Context to this method.
func Invoke(
ctx context.Context,
shardContext shard.Context,
shardContext historyi.ShardContext,
deserializer TaskDeserializer,
numShards int,
req *historyservice.AddTasksRequest,
Expand Down
12 changes: 6 additions & 6 deletions service/history/api/addtasks/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/service/history/api/addtasks"
"go.temporal.io/server/service/history/shard"
historyi "go.temporal.io/server/service/history/interfaces"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/tests"
"go.uber.org/mock/gomock"
Expand All @@ -48,7 +48,7 @@ import (
type (
// testParams contains the arguments for invoking addtasks.Invoke.
testParams struct {
shardContext shard.Context
shardContext historyi.ShardContext
deserializer addtasks.TaskDeserializer
numShards int
req *historyservice.AddTasksRequest
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestInvoke(t *testing.T) {
{
name: "happy path",
configure: func(t *testing.T, params *testParams) {
params.shardContext.(*shard.MockContext).EXPECT().AddTasks(
params.shardContext.(*historyi.MockShardContext).EXPECT().AddTasks(
gomock.Any(),
gomock.Any(),
).Return(nil)
Expand All @@ -97,7 +97,7 @@ func TestInvoke(t *testing.T) {
configure: func(t *testing.T, params *testParams) {
numWorkflows := 2
requests := make([]*persistence.AddHistoryTasksRequest, 0, numWorkflows)
params.shardContext.(*shard.MockContext).EXPECT().AddTasks(
params.shardContext.(*historyi.MockShardContext).EXPECT().AddTasks(
gomock.Any(),
gomock.Any(),
).DoAndReturn(func(_ context.Context, req *persistence.AddHistoryTasksRequest) error {
Expand Down Expand Up @@ -230,7 +230,7 @@ func TestInvoke(t *testing.T) {
{
name: "add tasks error",
configure: func(t *testing.T, params *testParams) {
params.shardContext.(*shard.MockContext).EXPECT().AddTasks(
params.shardContext.(*historyi.MockShardContext).EXPECT().AddTasks(
gomock.Any(),
gomock.Any(),
).Return(assert.AnError)
Expand Down Expand Up @@ -265,7 +265,7 @@ func getDefaultTestParams(t *testing.T) *testParams {
blob, err := serializer.SerializeTask(task)
require.NoError(t, err)
ctrl := gomock.NewController(t)
shardContext := shard.NewMockContext(ctrl)
shardContext := historyi.NewMockShardContext(ctrl)
shardContext.EXPECT().GetShardID().Return(int32(1)).AnyTimes()
shardContext.EXPECT().GetRangeID().Return(int64(1)).AnyTimes()
params := &testParams{
Expand Down
5 changes: 2 additions & 3 deletions service/history/api/consistency_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/service/history/consts"
historyi "go.temporal.io/server/service/history/interfaces"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/vclock"
wcache "go.temporal.io/server/service/history/workflow/cache"
)
Expand Down Expand Up @@ -71,13 +70,13 @@ type (
}

WorkflowConsistencyCheckerImpl struct {
shardContext shard.Context
shardContext historyi.ShardContext
workflowCache wcache.Cache
}
)

func NewWorkflowConsistencyChecker(
shardContext shard.Context,
shardContext historyi.ShardContext,
workflowCache wcache.Cache,
) *WorkflowConsistencyCheckerImpl {
return &WorkflowConsistencyCheckerImpl{
Expand Down
8 changes: 3 additions & 5 deletions service/history/api/consistency_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ import (
"go.temporal.io/server/common/testing/protomock"
"go.temporal.io/server/service/history/configs"
historyi "go.temporal.io/server/service/history/interfaces"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tests"
"go.temporal.io/server/service/history/workflow"
wcache "go.temporal.io/server/service/history/workflow/cache"
"go.uber.org/mock/gomock"
)
Expand All @@ -55,7 +53,7 @@ type (
*require.Assertions

controller *gomock.Controller
shardContext *shard.MockContext
shardContext *historyi.MockShardContext
workflowCache *wcache.MockCache
config *configs.Config

Expand Down Expand Up @@ -83,7 +81,7 @@ func (s *workflowConsistencyCheckerSuite) SetupTest() {
s.Assertions = require.New(s.T())

s.controller = gomock.NewController(s.T())
s.shardContext = shard.NewMockContext(s.controller)
s.shardContext = historyi.NewMockShardContext(s.controller)
s.workflowCache = wcache.NewMockCache(s.controller)
s.config = tests.NewDynamicConfig()

Expand All @@ -105,7 +103,7 @@ func (s *workflowConsistencyCheckerSuite) TearDownTest() {
func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck_Success_PassCheck() {
ctx := context.Background()

wfContext := workflow.NewMockContext(s.controller)
wfContext := historyi.NewMockWorkflowContext(s.controller)
mutableState := historyi.NewMockMutableState(s.controller)
released := false
releaseFn := func(err error) { released = true }
Expand Down
17 changes: 8 additions & 9 deletions service/history/api/create_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"go.temporal.io/server/common/rpc/interceptor"
"go.temporal.io/server/common/worker_versioning"
historyi "go.temporal.io/server/service/history/interfaces"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
wcache "go.temporal.io/server/service/history/workflow/cache"
"google.golang.org/protobuf/types/known/durationpb"
Expand All @@ -63,13 +62,13 @@ type (
}
CreateOrUpdateLeaseFunc func(
WorkflowLease,
shard.Context,
historyi.ShardContext,
historyi.MutableState,
) (WorkflowLease, error)
)

func NewWorkflowWithSignal(
shard shard.Context,
shard historyi.ShardContext,
namespaceEntry *namespace.Namespace,
workflowID string,
runID string,
Expand Down Expand Up @@ -152,7 +151,7 @@ func NewWorkflowWithSignal(
// NOTE: must implement CreateOrUpdateLeaseFunc.
func NewWorkflowLeaseAndContext(
existingLease WorkflowLease,
shardCtx shard.Context,
shardCtx historyi.ShardContext,
ms historyi.MutableState,
) (WorkflowLease, error) {
// TODO(stephanos): remove this hack
Expand All @@ -177,7 +176,7 @@ func NewWorkflowLeaseAndContext(
}

func CreateMutableState(
shard shard.Context,
shard historyi.ShardContext,
namespaceEntry *namespace.Namespace,
executionTimeout *durationpb.Duration,
runTimeout *durationpb.Duration,
Expand Down Expand Up @@ -213,7 +212,7 @@ func GenerateFirstWorkflowTask(
}

func NewWorkflowVersionCheck(
shard shard.Context,
shard historyi.ShardContext,
prevLastWriteVersion int64,
newMutableState historyi.MutableState,
) error {
Expand All @@ -236,7 +235,7 @@ func NewWorkflowVersionCheck(

func ValidateStart(
ctx context.Context,
shard shard.Context,
shard historyi.ShardContext,
namespaceEntry *namespace.Namespace,
workflowID string,
workflowInputSize int,
Expand Down Expand Up @@ -284,7 +283,7 @@ func ValidateStart(
func ValidateStartWorkflowExecutionRequest(
ctx context.Context,
request *workflowservice.StartWorkflowExecutionRequest,
shard shard.Context,
shard historyi.ShardContext,
namespaceEntry *namespace.Namespace,
operation string,
) error {
Expand Down Expand Up @@ -342,7 +341,7 @@ func ValidateStartWorkflowExecutionRequest(
func OverrideStartWorkflowExecutionRequest(
request *workflowservice.StartWorkflowExecutionRequest,
operation string,
shard shard.Context,
shard historyi.ShardContext,
metricsHandler metrics.Handler,
) {
// workflow execution timeout is left as is
Expand Down
10 changes: 5 additions & 5 deletions service/history/api/deleteworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ import (
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/deletemanager"
"go.temporal.io/server/service/history/shard"
historyi "go.temporal.io/server/service/history/interfaces"
"go.temporal.io/server/service/history/workflow"
)

func Invoke(
ctx context.Context,
request *historyservice.DeleteWorkflowExecutionRequest,
shard shard.Context,
shardContext historyi.ShardContext,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
workflowDeleteManager deletemanager.DeleteManager,
) (_ *historyservice.DeleteWorkflowExecutionResponse, retError error) {
Expand Down Expand Up @@ -76,14 +76,14 @@ func Invoke(
// skip delete open workflow
return &historyservice.DeleteWorkflowExecutionResponse{}, nil
}
ns, err := shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(request.GetNamespaceId()))
ns, err := shardContext.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(request.GetNamespaceId()))
if err != nil {
return nil, err
}
if ns.ActiveInCluster(shard.GetClusterMetadata().GetCurrentClusterName()) {
if ns.ActiveInCluster(shardContext.GetClusterMetadata().GetCurrentClusterName()) {
// If workflow execution is running and in active cluster.
if err := api.UpdateWorkflowWithNew(
shard,
shardContext,
ctx,
workflowLease,
func(workflowLease api.WorkflowLease) (*api.UpdateWorkflowAction, error) {
Expand Down
4 changes: 2 additions & 2 deletions service/history/api/describemutablestate/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ import (
"go.temporal.io/server/common/locks"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/shard"
historyi "go.temporal.io/server/service/history/interfaces"
"go.temporal.io/server/service/history/workflow"
)

func Invoke(
ctx context.Context,
req *historyservice.DescribeMutableStateRequest,
shardContext shard.Context,
shardContext historyi.ShardContext,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
) (_ *historyservice.DescribeMutableStateResponse, retError error) {
namespaceID := namespace.ID(req.GetNamespaceId())
Expand Down
4 changes: 2 additions & 2 deletions service/history/api/describeworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import (
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/circuitbreakerpool"
"go.temporal.io/server/service/history/hsm"
"go.temporal.io/server/service/history/shard"
historyi "go.temporal.io/server/service/history/interfaces"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow"
"google.golang.org/protobuf/types/known/durationpb"
Expand Down Expand Up @@ -80,7 +80,7 @@ func clonePayloadMap(source map[string]*commonpb.Payload) map[string]*commonpb.P
func Invoke(
ctx context.Context,
req *historyservice.DescribeWorkflowExecutionRequest,
shard shard.Context,
shard historyi.ShardContext,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
persistenceVisibilityMgr manager.VisibilityManager,
outboundQueueCBPool *circuitbreakerpool.OutboundQueueCircuitBreakerPool,
Expand Down
Loading
Loading