Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions example/runway/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ go_library(
"//runway/controller",
"//runway/controller/merge",
"//runway/controller/mergeconflictcheck",
"//runway/extension/merger",
"//runway/extension/merger/noop",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_uber_go_tally//:tally",
"@org_golang_google_grpc//:grpc",
Expand Down
30 changes: 26 additions & 4 deletions example/runway/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
"github.com/uber/submitqueue/runway/controller"
"github.com/uber/submitqueue/runway/controller/merge"
"github.com/uber/submitqueue/runway/controller/mergeconflictcheck"
"github.com/uber/submitqueue/runway/extension/merger"
"github.com/uber/submitqueue/runway/extension/merger/noop"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
Expand Down Expand Up @@ -152,9 +154,13 @@ func run() error {
),
)

mergerFactory := newMergerFactory()

mergeConflictCheckController := mergeconflictcheck.NewController(mergeconflictcheck.Params{
Logger: logger.Sugar(),
Scope: scope,
MergerFactory: mergerFactory,
Registry: registry,
TopicKey: runwaymq.TopicKeyMergeConflictCheck,
ConsumerGroup: "runway-mergeconflictcheck",
})
Expand Down Expand Up @@ -235,10 +241,21 @@ func run() error {
return err
}

// newTopicRegistry builds the TopicRegistry for Runway's consumed merge queues.
// Runway is the consumer of the merge-conflict-check and merge queues; each is
// registered with a consuming subscription. The corresponding signal queues
// (where results are published) are not wired yet.
// newMergerFactory returns a merger.Factory for the example server. The noop
// implementation always succeeds; a real deployment wires a VCS-backed factory.
func newMergerFactory() merger.Factory {
return &noopMergerFactory{}
}

type noopMergerFactory struct{}

func (f *noopMergerFactory) For(_ merger.Config) (merger.Merger, error) {
return noop.New(), nil
}

// newTopicRegistry builds the TopicRegistry for Runway's merge queues. Inbound
// topics (merge-conflict-check, merge) have subscriptions; outbound signal topics
// are publish-only.
func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRegistry, error) {
return consumer.NewTopicRegistry([]consumer.TopicConfig{
{
Expand All @@ -249,6 +266,11 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
subscriberName, "runway-mergeconflictcheck",
),
},
{
Key: runwaymq.TopicKeyMergeConflictCheckSignal,
Name: "merge-conflict-check-signal",
Queue: q,
},
{
Key: runwaymq.TopicKeyMerge,
Name: "merge",
Expand Down
9 changes: 9 additions & 0 deletions runway/controller/mergeconflictcheck/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//api/runway/messagequeue",
"//api/runway/messagequeue/protopb",
"//platform/base/messagequeue",
"//platform/consumer",
"//platform/errs",
"//platform/metrics",
"//runway/extension/merger",
"@com_github_uber_go_tally//:tally",
"@org_uber_go_zap//:zap",
],
Expand All @@ -20,8 +24,13 @@ go_test(
embed = [":mergeconflictcheck"],
deps = [
"//api/runway/messagequeue",
"//api/runway/messagequeue/protopb",
"//platform/base/messagequeue",
"//platform/consumer",
"//platform/errs",
"//platform/extension/messagequeue/mock",
"//runway/extension/merger",
"//runway/extension/merger/mock",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_uber_go_tally//:tally",
Expand Down
99 changes: 90 additions & 9 deletions runway/controller/mergeconflictcheck/mergeconflictcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,25 @@
// Runway's merge-conflict-check queue. A request asks whether an ordered sequence
// of merge steps applies cleanly onto the target branch without committing.
//
// Currently a parse-and-log stub: it deserializes the MergeRequest off the queue
// and logs it. The real check (attempt the merge without committing and publish a
// MergeResult to the merge-conflict-check-signal queue) is not wired yet.
// The controller obtains a Merger for the request's landing target, calls
// CheckMergeability, and publishes the MergeResult to the
// merge-conflict-check-signal queue. A merge conflict is an expected outcome
// (ack + publish FAILED result), not an infrastructure error.
package mergeconflictcheck

import (
"context"
"errors"
"fmt"

"github.com/uber-go/tally"
runwaymq "github.com/uber/submitqueue/api/runway/messagequeue"
runwaypb "github.com/uber/submitqueue/api/runway/messagequeue/protopb"
entityqueue "github.com/uber/submitqueue/platform/base/messagequeue"
"github.com/uber/submitqueue/platform/consumer"
"github.com/uber/submitqueue/platform/errs"
"github.com/uber/submitqueue/platform/metrics"
"github.com/uber/submitqueue/runway/extension/merger"
"go.uber.org/zap"
)

Expand All @@ -39,6 +45,8 @@ var _ consumer.Controller = (*Controller)(nil)
type Controller struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
mergerFactory merger.Factory
registry consumer.TopicRegistry
topicKey consumer.TopicKey
consumerGroup string
}
Expand All @@ -48,6 +56,9 @@ type Params struct {
TopicKey consumer.TopicKey
ConsumerGroup string

MergerFactory merger.Factory
Registry consumer.TopicRegistry

Scope tally.Scope
Logger *zap.SugaredLogger
}
Expand All @@ -57,13 +68,22 @@ func NewController(p Params) *Controller {
return &Controller{
logger: p.Logger.Named("mergeconflictcheck_controller"),
metricsScope: p.Scope.SubScope("mergeconflictcheck_controller"),
mergerFactory: p.MergerFactory,
registry: p.Registry,
topicKey: p.TopicKey,
consumerGroup: p.ConsumerGroup,
}
}

// Process deserializes the merge request and logs it. Returns nil to ack, or an
// error to nack.
// Process deserializes the merge request, performs a dry-run merge check, and
// publishes the result. Returns nil to ack, or an error to nack.
//
// Error classification: deserialize, factory, and check failures are
// non-retryable (reject to DLQ) — a malformed payload or misconfigured target
// will not succeed on replay, and a genuinely transient VCS failure is the
// merger implementation's job to mark retryable (the wrap here preserves it).
// The signal publish is retryable: it is the hand-off that carries the result
// back to the orchestrator, so a transient enqueue blip should replay.
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) {
const opName = "process"

Expand All @@ -75,13 +95,9 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
request := &runwaymq.MergeRequest{}
if err := runwaymq.Unmarshal(msg.Payload, request); err != nil {
metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1)
// Non-retryable: a malformed payload will never deserialize on retry.
return fmt.Errorf("failed to deserialize merge request: %w", err)
}

// TODO: attempt the ordered merge steps without committing and publish a
// MergeResult to the merge-conflict-check-signal queue. For now the request
// is only logged after parsing.
c.logger.Infow("received merge-conflict-check request",
"id", request.Id,
"queue_name", request.QueueName,
Expand All @@ -90,6 +106,71 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
"partition_key", msg.PartitionKey,
)

m, err := c.mergerFactory.For(merger.Config{QueueName: request.GetQueueName()})
if err != nil {
metrics.NamedCounter(c.metricsScope, opName, "factory_errors", 1)
return fmt.Errorf("failed to create merger for queue %s: %w", request.GetQueueName(), err)
}

result, err := m.CheckMergeability(ctx, request)
if err != nil {
if errors.Is(err, merger.ErrConflict) {
metrics.NamedCounter(c.metricsScope, opName, "merge_conflicts", 1)
c.logger.Infow("merge conflict detected",
"id", request.GetId(),
"queue_name", request.GetQueueName(),
)
result = &runwaymq.MergeResult{

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at some point it would be nice to return and log a list of conflicting files as well

Id: request.GetId(),
Outcome: runwaypb.Outcome_FAILED,
Reason: err.Error(),
}
} else {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverse if statement and return early to avoid branching

metrics.NamedCounter(c.metricsScope, opName, "check_errors", 1)
return fmt.Errorf("failed to check mergeability for %s: %w", request.GetId(), err)
}
}

if err := c.publish(ctx, runwaymq.TopicKeyMergeConflictCheckSignal, result, msg.PartitionKey); err != nil {
metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1)
// Retryable: publishing the result is the hand-off that keeps this
// conflict check alive; a transient enqueue blip should replay rather
// than DLQ the only message carrying the result back to the orchestrator.
return errs.NewRetryableError(fmt.Errorf("failed to publish merge-conflict-check result for %s: %w", request.GetId(), err))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do not do classification here; not all errors are retryable, for example programming bugs are not retryable. Only certain errors (like connection failure) are retryable and should be recognized by a generic-purpose classifier.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed guidance around this in #281

}

c.logger.Infow("published merge-conflict-check result",
"id", result.GetId(),
"outcome", result.GetOutcome().String(),
"topic_key", runwaymq.TopicKeyMergeConflictCheckSignal,
)

return nil
}

// publish serializes a MergeResult and publishes it to the given signal topic.
func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, result *runwaymq.MergeResult, partitionKey string) error {
payload, err := runwaymq.Marshal(result)
if err != nil {
return fmt.Errorf("failed to serialize merge result: %w", err)
}

msg := entityqueue.NewMessage(result.GetId(), payload, partitionKey, nil)

q, ok := c.registry.Queue(key)
if !ok {
return fmt.Errorf("no queue registered for topic key %s", key)
}

topicName, ok := c.registry.TopicName(key)
if !ok {
return fmt.Errorf("no topic name registered for topic key %s", key)
}

if err := q.Publisher().Publish(ctx, topicName, msg); err != nil {
return fmt.Errorf("failed to publish message: %w", err)
}

return nil
}

Expand Down
Loading
Loading