From dfe19aff6696e9c8c38e1534444a08e7fe197a73 Mon Sep 17 00:00:00 2001 From: "kevin.new" Date: Fri, 26 Jun 2026 17:42:24 +0000 Subject: [PATCH] feat(runway): wire merge-conflict-check controller to Merger extension The merge-conflict-check controller was a parse-and-log stub. Wire it to the Merger extension so it performs the dry-run check and publishes the MergeResult to the merge-conflict-check-signal topic. A merge conflict is an expected outcome (ack + publish FAILED), not an infrastructure error. Co-Authored-By: Claude Opus 4.6 (1M context) --- example/runway/server/BUILD.bazel | 2 + example/runway/server/main.go | 30 ++- .../controller/mergeconflictcheck/BUILD.bazel | 9 + .../mergeconflictcheck/mergeconflictcheck.go | 99 +++++++- .../mergeconflictcheck_test.go | 226 ++++++++++++++++-- 5 files changed, 336 insertions(+), 30 deletions(-) diff --git a/example/runway/server/BUILD.bazel b/example/runway/server/BUILD.bazel index e8bb1200..10d83d2f 100644 --- a/example/runway/server/BUILD.bazel +++ b/example/runway/server/BUILD.bazel @@ -17,6 +17,8 @@ go_library( "//runway/controller", "//runway/controller/merge", "//runway/controller/mergeconflictcheck", + "//runway/extension/merger", + "//runway/extension/merger/noop", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_uber_go_tally//:tally", "@org_golang_google_grpc//:grpc", diff --git a/example/runway/server/main.go b/example/runway/server/main.go index e5b4d485..85fef699 100644 --- a/example/runway/server/main.go +++ b/example/runway/server/main.go @@ -39,6 +39,8 @@ import ( "github.com/uber/submitqueue/runway/controller" "github.com/uber/submitqueue/runway/controller/merge" "github.com/uber/submitqueue/runway/controller/mergeconflictcheck" + "github.com/uber/submitqueue/runway/extension/merger" + "github.com/uber/submitqueue/runway/extension/merger/noop" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/reflection" @@ -152,9 +154,13 @@ func run() error { ), ) + mergerFactory := newMergerFactory() + mergeConflictCheckController := mergeconflictcheck.NewController(mergeconflictcheck.Params{ Logger: logger.Sugar(), Scope: scope, + MergerFactory: mergerFactory, + Registry: registry, TopicKey: runwaymq.TopicKeyMergeConflictCheck, ConsumerGroup: "runway-mergeconflictcheck", }) @@ -235,10 +241,21 @@ func run() error { return err } -// newTopicRegistry builds the TopicRegistry for Runway's consumed merge queues. -// Runway is the consumer of the merge-conflict-check and merge queues; each is -// registered with a consuming subscription. The corresponding signal queues -// (where results are published) are not wired yet. +// newMergerFactory returns a merger.Factory for the example server. The noop +// implementation always succeeds; a real deployment wires a VCS-backed factory. +func newMergerFactory() merger.Factory { + return &noopMergerFactory{} +} + +type noopMergerFactory struct{} + +func (f *noopMergerFactory) For(_ merger.Config) (merger.Merger, error) { + return noop.New(), nil +} + +// newTopicRegistry builds the TopicRegistry for Runway's merge queues. Inbound +// topics (merge-conflict-check, merge) have subscriptions; outbound signal topics +// are publish-only. func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRegistry, error) { return consumer.NewTopicRegistry([]consumer.TopicConfig{ { @@ -249,6 +266,11 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe subscriberName, "runway-mergeconflictcheck", ), }, + { + Key: runwaymq.TopicKeyMergeConflictCheckSignal, + Name: "merge-conflict-check-signal", + Queue: q, + }, { Key: runwaymq.TopicKeyMerge, Name: "merge", diff --git a/runway/controller/mergeconflictcheck/BUILD.bazel b/runway/controller/mergeconflictcheck/BUILD.bazel index 6fa3c304..2c98fed4 100644 --- a/runway/controller/mergeconflictcheck/BUILD.bazel +++ b/runway/controller/mergeconflictcheck/BUILD.bazel @@ -7,8 +7,12 @@ go_library( visibility = ["//visibility:public"], deps = [ "//api/runway/messagequeue", + "//api/runway/messagequeue/protopb", + "//platform/base/messagequeue", "//platform/consumer", + "//platform/errs", "//platform/metrics", + "//runway/extension/merger", "@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", ], @@ -20,8 +24,13 @@ go_test( embed = [":mergeconflictcheck"], deps = [ "//api/runway/messagequeue", + "//api/runway/messagequeue/protopb", "//platform/base/messagequeue", + "//platform/consumer", + "//platform/errs", "//platform/extension/messagequeue/mock", + "//runway/extension/merger", + "//runway/extension/merger/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally//:tally", diff --git a/runway/controller/mergeconflictcheck/mergeconflictcheck.go b/runway/controller/mergeconflictcheck/mergeconflictcheck.go index 689a64e9..343cfe77 100644 --- a/runway/controller/mergeconflictcheck/mergeconflictcheck.go +++ b/runway/controller/mergeconflictcheck/mergeconflictcheck.go @@ -16,19 +16,25 @@ // Runway's merge-conflict-check queue. A request asks whether an ordered sequence // of merge steps applies cleanly onto the target branch without committing. // -// Currently a parse-and-log stub: it deserializes the MergeRequest off the queue -// and logs it. The real check (attempt the merge without committing and publish a -// MergeResult to the merge-conflict-check-signal queue) is not wired yet. +// The controller obtains a Merger for the request's landing target, calls +// CheckMergeability, and publishes the MergeResult to the +// merge-conflict-check-signal queue. A merge conflict is an expected outcome +// (ack + publish FAILED result), not an infrastructure error. package mergeconflictcheck import ( "context" + "errors" "fmt" "github.com/uber-go/tally" runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + runwaypb "github.com/uber/submitqueue/api/runway/messagequeue/protopb" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/errs" "github.com/uber/submitqueue/platform/metrics" + "github.com/uber/submitqueue/runway/extension/merger" "go.uber.org/zap" ) @@ -39,6 +45,8 @@ var _ consumer.Controller = (*Controller)(nil) type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope + mergerFactory merger.Factory + registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string } @@ -48,6 +56,9 @@ type Params struct { TopicKey consumer.TopicKey ConsumerGroup string + MergerFactory merger.Factory + Registry consumer.TopicRegistry + Scope tally.Scope Logger *zap.SugaredLogger } @@ -57,13 +68,22 @@ func NewController(p Params) *Controller { return &Controller{ logger: p.Logger.Named("mergeconflictcheck_controller"), metricsScope: p.Scope.SubScope("mergeconflictcheck_controller"), + mergerFactory: p.MergerFactory, + registry: p.Registry, topicKey: p.TopicKey, consumerGroup: p.ConsumerGroup, } } -// Process deserializes the merge request and logs it. Returns nil to ack, or an -// error to nack. +// Process deserializes the merge request, performs a dry-run merge check, and +// publishes the result. Returns nil to ack, or an error to nack. +// +// Error classification: deserialize, factory, and check failures are +// non-retryable (reject to DLQ) — a malformed payload or misconfigured target +// will not succeed on replay, and a genuinely transient VCS failure is the +// merger implementation's job to mark retryable (the wrap here preserves it). +// The signal publish is retryable: it is the hand-off that carries the result +// back to the orchestrator, so a transient enqueue blip should replay. func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { const opName = "process" @@ -75,13 +95,9 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r request := &runwaymq.MergeRequest{} if err := runwaymq.Unmarshal(msg.Payload, request); err != nil { metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) - // Non-retryable: a malformed payload will never deserialize on retry. return fmt.Errorf("failed to deserialize merge request: %w", err) } - // TODO: attempt the ordered merge steps without committing and publish a - // MergeResult to the merge-conflict-check-signal queue. For now the request - // is only logged after parsing. c.logger.Infow("received merge-conflict-check request", "id", request.Id, "queue_name", request.QueueName, @@ -90,6 +106,71 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r "partition_key", msg.PartitionKey, ) + m, err := c.mergerFactory.For(merger.Config{QueueName: request.GetQueueName()}) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "factory_errors", 1) + return fmt.Errorf("failed to create merger for queue %s: %w", request.GetQueueName(), err) + } + + result, err := m.CheckMergeability(ctx, request) + if err != nil { + if errors.Is(err, merger.ErrConflict) { + metrics.NamedCounter(c.metricsScope, opName, "merge_conflicts", 1) + c.logger.Infow("merge conflict detected", + "id", request.GetId(), + "queue_name", request.GetQueueName(), + ) + result = &runwaymq.MergeResult{ + Id: request.GetId(), + Outcome: runwaypb.Outcome_FAILED, + Reason: err.Error(), + } + } else { + metrics.NamedCounter(c.metricsScope, opName, "check_errors", 1) + return fmt.Errorf("failed to check mergeability for %s: %w", request.GetId(), err) + } + } + + if err := c.publish(ctx, runwaymq.TopicKeyMergeConflictCheckSignal, result, msg.PartitionKey); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) + // Retryable: publishing the result is the hand-off that keeps this + // conflict check alive; a transient enqueue blip should replay rather + // than DLQ the only message carrying the result back to the orchestrator. + return errs.NewRetryableError(fmt.Errorf("failed to publish merge-conflict-check result for %s: %w", request.GetId(), err)) + } + + c.logger.Infow("published merge-conflict-check result", + "id", result.GetId(), + "outcome", result.GetOutcome().String(), + "topic_key", runwaymq.TopicKeyMergeConflictCheckSignal, + ) + + return nil +} + +// publish serializes a MergeResult and publishes it to the given signal topic. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, result *runwaymq.MergeResult, partitionKey string) error { + payload, err := runwaymq.Marshal(result) + if err != nil { + return fmt.Errorf("failed to serialize merge result: %w", err) + } + + msg := entityqueue.NewMessage(result.GetId(), payload, partitionKey, nil) + + q, ok := c.registry.Queue(key) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", key) + } + + topicName, ok := c.registry.TopicName(key) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", key) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + return nil } diff --git a/runway/controller/mergeconflictcheck/mergeconflictcheck_test.go b/runway/controller/mergeconflictcheck/mergeconflictcheck_test.go index 70fd41ad..dba2cc56 100644 --- a/runway/controller/mergeconflictcheck/mergeconflictcheck_test.go +++ b/runway/controller/mergeconflictcheck/mergeconflictcheck_test.go @@ -16,14 +16,20 @@ package mergeconflictcheck import ( "context" + "fmt" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + runwaypb "github.com/uber/submitqueue/api/runway/messagequeue/protopb" entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/errs" queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + "github.com/uber/submitqueue/runway/extension/merger" + mergermock "github.com/uber/submitqueue/runway/extension/merger/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) @@ -34,16 +40,6 @@ const ( testPartitionKey = "test-queue" ) -func newController(t *testing.T) *Controller { - t.Helper() - return NewController(Params{ - Logger: zaptest.NewLogger(t).Sugar(), - Scope: tally.NoopScope, - TopicKey: runwaymq.TopicKeyMergeConflictCheck, - ConsumerGroup: "runway-mergeconflictcheck", - }) -} - func newDelivery(t *testing.T, ctrl *gomock.Controller, payload []byte) *queuemock.MockDelivery { t.Helper() msg := entityqueue.NewMessage(testID, payload, testPartitionKey, nil) @@ -53,26 +49,137 @@ func newDelivery(t *testing.T, ctrl *gomock.Controller, payload []byte) *queuemo return d } -func requestPayload(t *testing.T, req runwaymq.MergeRequest) []byte { +func requestPayload(t *testing.T, req *runwaymq.MergeRequest) []byte { t.Helper() - payload, err := runwaymq.Marshal(&req) + payload, err := runwaymq.Marshal(req) require.NoError(t, err) return payload } +func newRegistry(t *testing.T, ctrl *gomock.Controller, publishErr error) (consumer.TopicRegistry, *queuemock.MockPublisher) { + t.Helper() + pub := queuemock.NewMockPublisher(ctrl) + pub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, _ string, _ entityqueue.Message) error { + return publishErr + }, + ).AnyTimes() + + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: runwaymq.TopicKeyMergeConflictCheckSignal, Name: "merge-conflict-check-signal", Queue: q}, + }) + require.NoError(t, err) + return registry, pub +} + +func newController(t *testing.T, factory merger.Factory, registry consumer.TopicRegistry) *Controller { + t.Helper() + return NewController(Params{ + Logger: zaptest.NewLogger(t).Sugar(), + Scope: tally.NoopScope, + MergerFactory: factory, + Registry: registry, + TopicKey: runwaymq.TopicKeyMergeConflictCheck, + ConsumerGroup: "runway-mergeconflictcheck", + }) +} + func TestNewController(t *testing.T) { - controller := newController(t) + ctrl := gomock.NewController(t) + factory := mergermock.NewMockFactory(ctrl) + registry, _ := newRegistry(t, ctrl, nil) + + controller := newController(t, factory, registry) require.NotNil(t, controller) assert.Equal(t, runwaymq.TopicKeyMergeConflictCheck, controller.TopicKey()) assert.Equal(t, "runway-mergeconflictcheck", controller.ConsumerGroup()) assert.Equal(t, "merge-conflict-check", controller.Name()) } -func TestProcess_LogsParsedRequest(t *testing.T) { +func TestProcess_Success(t *testing.T) { + ctrl := gomock.NewController(t) + + expectedResult := &runwaymq.MergeResult{ + Id: testID, + Outcome: runwaypb.Outcome_SUCCEEDED, + Steps: []*runwaymq.StepResult{ + {StepId: "step-1"}, + }, + } + + m := mergermock.NewMockMerger(ctrl) + m.EXPECT().CheckMergeability(gomock.Any(), gomock.Any()).Return(expectedResult, nil) + + factory := mergermock.NewMockFactory(ctrl) + factory.EXPECT().For(merger.Config{QueueName: testQueue}).Return(m, nil) + + var gotTopic string + var gotPayload []byte + pub := queuemock.NewMockPublisher(ctrl) + pub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, topic string, msg entityqueue.Message) error { + gotTopic = topic + gotPayload = msg.Payload + return nil + }, + ) + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: runwaymq.TopicKeyMergeConflictCheckSignal, Name: "merge-conflict-check-signal", Queue: q}, + }) + require.NoError(t, err) + + controller := newController(t, factory, registry) + + req := &runwaymq.MergeRequest{ + Id: testID, + QueueName: testQueue, + Steps: []*runwaymq.MergeStep{{StepId: "step-1"}}, + } + delivery := newDelivery(t, ctrl, requestPayload(t, req)) + + require.NoError(t, controller.Process(context.Background(), delivery)) + + assert.Equal(t, "merge-conflict-check-signal", gotTopic) + result := &runwaymq.MergeResult{} + require.NoError(t, runwaymq.Unmarshal(gotPayload, result)) + assert.Equal(t, testID, result.Id) + assert.Equal(t, runwaypb.Outcome_SUCCEEDED, result.Outcome) +} + +func TestProcess_MergeConflict(t *testing.T) { ctrl := gomock.NewController(t) - controller := newController(t) - req := runwaymq.MergeRequest{ + m := mergermock.NewMockMerger(ctrl) + m.EXPECT().CheckMergeability(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("conflict in foo.go: %w", merger.ErrConflict)) + + factory := mergermock.NewMockFactory(ctrl) + factory.EXPECT().For(merger.Config{QueueName: testQueue}).Return(m, nil) + + var gotTopic string + var gotPayload []byte + pub := queuemock.NewMockPublisher(ctrl) + pub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, topic string, msg entityqueue.Message) error { + gotTopic = topic + gotPayload = msg.Payload + return nil + }, + ) + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: runwaymq.TopicKeyMergeConflictCheckSignal, Name: "merge-conflict-check-signal", Queue: q}, + }) + require.NoError(t, err) + + controller := newController(t, factory, registry) + + req := &runwaymq.MergeRequest{ Id: testID, QueueName: testQueue, Steps: []*runwaymq.MergeStep{{StepId: "step-1"}}, @@ -80,11 +187,96 @@ func TestProcess_LogsParsedRequest(t *testing.T) { delivery := newDelivery(t, ctrl, requestPayload(t, req)) require.NoError(t, controller.Process(context.Background(), delivery)) + + assert.Equal(t, "merge-conflict-check-signal", gotTopic) + result := &runwaymq.MergeResult{} + require.NoError(t, runwaymq.Unmarshal(gotPayload, result)) + assert.Equal(t, testID, result.Id) + assert.Equal(t, runwaypb.Outcome_FAILED, result.Outcome) + assert.NotEmpty(t, result.Reason) +} + +func TestProcess_MergerInfraError(t *testing.T) { + ctrl := gomock.NewController(t) + + m := mergermock.NewMockMerger(ctrl) + m.EXPECT().CheckMergeability(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("git timeout")) + + factory := mergermock.NewMockFactory(ctrl) + factory.EXPECT().For(merger.Config{QueueName: testQueue}).Return(m, nil) + + registry, _ := newRegistry(t, ctrl, nil) + controller := newController(t, factory, registry) + + req := &runwaymq.MergeRequest{ + Id: testID, + QueueName: testQueue, + Steps: []*runwaymq.MergeStep{{StepId: "step-1"}}, + } + delivery := newDelivery(t, ctrl, requestPayload(t, req)) + + err := controller.Process(context.Background(), delivery) + require.Error(t, err) + // Non-retryable: the merger marks genuinely transient VCS failures itself. + assert.False(t, errs.IsRetryable(err)) +} + +func TestProcess_FactoryError(t *testing.T) { + ctrl := gomock.NewController(t) + + factory := mergermock.NewMockFactory(ctrl) + factory.EXPECT().For(merger.Config{QueueName: testQueue}).Return(nil, fmt.Errorf("unknown queue")) + + registry, _ := newRegistry(t, ctrl, nil) + controller := newController(t, factory, registry) + + req := &runwaymq.MergeRequest{ + Id: testID, + QueueName: testQueue, + Steps: []*runwaymq.MergeStep{{StepId: "step-1"}}, + } + delivery := newDelivery(t, ctrl, requestPayload(t, req)) + + err := controller.Process(context.Background(), delivery) + require.Error(t, err) + assert.False(t, errs.IsRetryable(err)) +} + +func TestProcess_PublishError(t *testing.T) { + ctrl := gomock.NewController(t) + + expectedResult := &runwaymq.MergeResult{ + Id: testID, + Outcome: runwaypb.Outcome_SUCCEEDED, + } + + m := mergermock.NewMockMerger(ctrl) + m.EXPECT().CheckMergeability(gomock.Any(), gomock.Any()).Return(expectedResult, nil) + + factory := mergermock.NewMockFactory(ctrl) + factory.EXPECT().For(merger.Config{QueueName: testQueue}).Return(m, nil) + + registry, _ := newRegistry(t, ctrl, fmt.Errorf("publish failed")) + controller := newController(t, factory, registry) + + req := &runwaymq.MergeRequest{ + Id: testID, + QueueName: testQueue, + Steps: []*runwaymq.MergeStep{{StepId: "step-1"}}, + } + delivery := newDelivery(t, ctrl, requestPayload(t, req)) + + err := controller.Process(context.Background(), delivery) + require.Error(t, err) + // Retryable: the signal publish is the result hand-off and must replay. + assert.True(t, errs.IsRetryable(err)) } func TestProcess_DeserializeError(t *testing.T) { ctrl := gomock.NewController(t) - controller := newController(t) + factory := mergermock.NewMockFactory(ctrl) + registry, _ := newRegistry(t, ctrl, nil) + controller := newController(t, factory, registry) delivery := newDelivery(t, ctrl, []byte(`{"id": not json}`))