-
Notifications
You must be signed in to change notification settings - Fork 2
feat(runway): wire merge-conflict-check controller to Merger extension #280
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,19 +16,25 @@ | |
| // Runway's merge-conflict-check queue. A request asks whether an ordered sequence | ||
| // of merge steps applies cleanly onto the target branch without committing. | ||
| // | ||
| // Currently a parse-and-log stub: it deserializes the MergeRequest off the queue | ||
| // and logs it. The real check (attempt the merge without committing and publish a | ||
| // MergeResult to the merge-conflict-check-signal queue) is not wired yet. | ||
| // The controller obtains a Merger for the request's landing target, calls | ||
| // CheckMergeability, and publishes the MergeResult to the | ||
| // merge-conflict-check-signal queue. A merge conflict is an expected outcome | ||
| // (ack + publish FAILED result), not an infrastructure error. | ||
| package mergeconflictcheck | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
|
|
||
| "github.com/uber-go/tally" | ||
| runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" | ||
| runwaypb "github.com/uber/submitqueue/api/runway/messagequeue/protopb" | ||
| entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" | ||
| "github.com/uber/submitqueue/platform/consumer" | ||
| "github.com/uber/submitqueue/platform/errs" | ||
| "github.com/uber/submitqueue/platform/metrics" | ||
| "github.com/uber/submitqueue/runway/extension/merger" | ||
| "go.uber.org/zap" | ||
| ) | ||
|
|
||
|
|
@@ -39,6 +45,8 @@ var _ consumer.Controller = (*Controller)(nil) | |
| type Controller struct { | ||
| // logger is the controller's structured logger; NewController names it | ||
| // "mergeconflictcheck_controller". | ||
| logger *zap.SugaredLogger | ||
| // metricsScope is the tally sub-scope ("mergeconflictcheck_controller") | ||
| // under which Process emits its counters. | ||
| metricsScope tally.Scope | ||
| // mergerFactory supplies the Merger for a request's queue name; Process | ||
| // calls its For method once per delivery. | ||
| mergerFactory merger.Factory | ||
| // registry resolves a topic key to its queue and topic name when a | ||
| // result is published. | ||
| registry consumer.TopicRegistry | ||
| // topicKey is copied verbatim from Params.TopicKey. | ||
| // NOTE(review): presumably the topic this controller consumes from — confirm. | ||
| topicKey consumer.TopicKey | ||
| // consumerGroup is copied verbatim from Params.ConsumerGroup. | ||
| consumerGroup string | ||
| } | ||
|
|
@@ -48,6 +56,9 @@ type Params struct { | |
| TopicKey consumer.TopicKey | ||
| ConsumerGroup string | ||
|
|
||
| MergerFactory merger.Factory | ||
| Registry consumer.TopicRegistry | ||
|
|
||
| Scope tally.Scope | ||
| Logger *zap.SugaredLogger | ||
| } | ||
|
|
@@ -57,13 +68,22 @@ func NewController(p Params) *Controller { | |
| return &Controller{ | ||
| logger: p.Logger.Named("mergeconflictcheck_controller"), | ||
| metricsScope: p.Scope.SubScope("mergeconflictcheck_controller"), | ||
| mergerFactory: p.MergerFactory, | ||
| registry: p.Registry, | ||
| topicKey: p.TopicKey, | ||
| consumerGroup: p.ConsumerGroup, | ||
| } | ||
| } | ||
|
|
||
| // Process deserializes the merge request and logs it. Returns nil to ack, or an | ||
| // error to nack. | ||
| // Process deserializes the merge request, performs a dry-run merge check, and | ||
| // publishes the result. Returns nil to ack, or an error to nack. | ||
| // | ||
| // Error classification: deserialize, factory, and check failures are | ||
| // non-retryable (reject to DLQ) — a malformed payload or misconfigured target | ||
| // will not succeed on replay, and a genuinely transient VCS failure is the | ||
| // merger implementation's job to mark retryable (the wrap here preserves it). | ||
| // The signal publish is retryable: it is the hand-off that carries the result | ||
| // back to the orchestrator, so a transient enqueue blip should replay. | ||
| func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { | ||
| const opName = "process" | ||
|
|
||
|
|
@@ -75,13 +95,9 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r | |
| request := &runwaymq.MergeRequest{} | ||
| if err := runwaymq.Unmarshal(msg.Payload, request); err != nil { | ||
| metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) | ||
| // Non-retryable: a malformed payload will never deserialize on retry. | ||
| return fmt.Errorf("failed to deserialize merge request: %w", err) | ||
| } | ||
|
|
||
| // TODO: attempt the ordered merge steps without committing and publish a | ||
| // MergeResult to the merge-conflict-check-signal queue. For now the request | ||
| // is only logged after parsing. | ||
| c.logger.Infow("received merge-conflict-check request", | ||
| "id", request.Id, | ||
| "queue_name", request.QueueName, | ||
|
|
@@ -90,6 +106,71 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r | |
| "partition_key", msg.PartitionKey, | ||
| ) | ||
|
|
||
| m, err := c.mergerFactory.For(merger.Config{QueueName: request.GetQueueName()}) | ||
| if err != nil { | ||
| metrics.NamedCounter(c.metricsScope, opName, "factory_errors", 1) | ||
| return fmt.Errorf("failed to create merger for queue %s: %w", request.GetQueueName(), err) | ||
| } | ||
|
|
||
| result, err := m.CheckMergeability(ctx, request) | ||
| if err != nil { | ||
| if errors.Is(err, merger.ErrConflict) { | ||
| metrics.NamedCounter(c.metricsScope, opName, "merge_conflicts", 1) | ||
| c.logger.Infow("merge conflict detected", | ||
| "id", request.GetId(), | ||
| "queue_name", request.GetQueueName(), | ||
| ) | ||
| result = &runwaymq.MergeResult{ | ||
| Id: request.GetId(), | ||
| Outcome: runwaypb.Outcome_FAILED, | ||
| Reason: err.Error(), | ||
| } | ||
| } else { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Reverse the if statement and return early to avoid branching. |
||
| metrics.NamedCounter(c.metricsScope, opName, "check_errors", 1) | ||
| return fmt.Errorf("failed to check mergeability for %s: %w", request.GetId(), err) | ||
| } | ||
| } | ||
|
|
||
| if err := c.publish(ctx, runwaymq.TopicKeyMergeConflictCheckSignal, result, msg.PartitionKey); err != nil { | ||
| metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) | ||
| // Retryable: publishing the result is the hand-off that keeps this | ||
| // conflict check alive; a transient enqueue blip should replay rather | ||
| // than DLQ the only message carrying the result back to the orchestrator. | ||
| return errs.NewRetryableError(fmt.Errorf("failed to publish merge-conflict-check result for %s: %w", request.GetId(), err)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do not do classification here; not all errors are retryable — for example, programming bugs are not. Only certain errors (like connection failures) are retryable, and they should be recognized by a general-purpose classifier.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fixed the guidance around this in #281. |
||
| } | ||
|
|
||
| c.logger.Infow("published merge-conflict-check result", | ||
| "id", result.GetId(), | ||
| "outcome", result.GetOutcome().String(), | ||
| "topic_key", runwaymq.TopicKeyMergeConflictCheckSignal, | ||
| ) | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // publish serializes result and publishes it to the topic registered under key. | ||
| // | ||
| // The outgoing message is built from result's ID, the serialized payload, and | ||
| // the caller-supplied partitionKey. The queue and the topic name are both | ||
| // resolved from the controller's registry using key. | ||
| // | ||
| // It returns a wrapped error when serialization fails, when the registry has no | ||
| // queue or no topic name for key, or when the publisher's Publish call fails. | ||
| // It does not classify errors as retryable or not; that is left to the caller. | ||
| // | ||
| // NOTE(review): the final nil argument to entityqueue.NewMessage is opaque from | ||
| // here (presumably optional metadata) — confirm against its signature. | ||
| func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, result *runwaymq.MergeResult, partitionKey string) error { | ||
| payload, err := runwaymq.Marshal(result) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to serialize merge result: %w", err) | ||
| } | ||
|
|
||
| msg := entityqueue.NewMessage(result.GetId(), payload, partitionKey, nil) | ||
|
|
||
| q, ok := c.registry.Queue(key) | ||
| if !ok { | ||
| return fmt.Errorf("no queue registered for topic key %s", key) | ||
| } | ||
|
|
||
| topicName, ok := c.registry.TopicName(key) | ||
| if !ok { | ||
| return fmt.Errorf("no topic name registered for topic key %s", key) | ||
| } | ||
|
|
||
| if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { | ||
| return fmt.Errorf("failed to publish message: %w", err) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At some point it would be nice to return and log a list of conflicting files as well.