From 8d1088f45277897260ebd85efc368f11176481d6 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Fri, 26 Jun 2026 09:06:41 -0700 Subject: [PATCH] feat(stovepipe): wire SourceControl + stores into ingest, publish to process MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary ### Why? Ingest was a thin stub: it minted a request id but never resolved the commit URI, persisted anything, or moved the request onto the pipeline, so `Request.URI` stayed empty and nothing consumed the work. This makes ingest the real pipeline entry and adds the first internal queue contract so the pipeline can hand work to the next stage. ### What? Ingest now resolves the queue's head URI via the SourceControl extension, dedups on the (queue, URI) pair, persists the Request and its URI mapping via storage, and publishes the request id to a new process stage over the messaging queue. Ingestion is idempotent: a re-reported head resolves to the already-minted request and nothing is published again. The URI mapping is claimed before the request row is written, so a lost race leaves no orphan row. Adds the first internal proto message-queue contract under `stovepipe/core/messagequeue` (proto3 + protojson, mirroring `api/runway/messagequeue`): a `ProcessRequest` payload carrying the id, the `TopicKeyProcess` constant, and the protojson glue, wired into the proto codegen (`tool/proto`, `PROTO_PACKAGES`). Per CLAUDE.md, internal contracts live under the domain's `core/`, not `api/`, and the contract package owns both the payload and its topic keys. Adds a minimal `process` consumer (`stovepipe/controller/process`) that reloads the Request by id and logs it; a not-yet-visible request is retryable so redelivery converges. The build-strategy/ancestry logic the RFC assigns to `process` is deferred. Wires the example server (`example/stovepipe/server`) into a MySQL storage + MySQL queue + fake SourceControl stack with the process consumer running, plus docker-compose (two databases) and a schema-init make target. ## Test Plan ✅ `bazel test //stovepipe/...` — contract round-trip + topic-key binding, ingest (happy/dedup/race/unknown-queue/infra-error paths), and process consumer unit tests. ✅ `bazel test //test/integration/stovepipe:stovepipe_test` — compose-backed: Ingest persists the request + URI mapping, publishes to the process topic, and a re-ingest dedups to the same id. ✅ `bazel build //...`, `make fmt`. --- Makefile | 21 +- example/stovepipe/docker-compose.yml | 46 +++- example/stovepipe/server/BUILD.bazel | 12 + example/stovepipe/server/main.go | 129 ++++++++++- stovepipe/controller/BUILD.bazel | 13 ++ stovepipe/controller/ingest.go | 185 +++++++++++++-- stovepipe/controller/ingest_test.go | 217 +++++++++++++++--- stovepipe/controller/process/BUILD.bazel | 38 +++ stovepipe/controller/process/process.go | 121 ++++++++++ stovepipe/controller/process/process_test.go | 99 ++++++++ stovepipe/core/messagequeue/BUILD.bazel | 29 +++ stovepipe/core/messagequeue/README.md | 11 + stovepipe/core/messagequeue/messagequeue.go | 81 +++++++ stovepipe/core/messagequeue/process_test.go | 70 ++++++ stovepipe/core/messagequeue/proto/BUILD.bazel | 47 ++++ .../core/messagequeue/proto/process.proto | 35 +++ .../core/messagequeue/protopb/BUILD.bazel | 13 ++ .../core/messagequeue/protopb/process.pb.go | 144 ++++++++++++ stovepipe/core/messagequeue/topics.go | 30 +++ test/integration/stovepipe/BUILD.bazel | 3 + test/integration/stovepipe/suite_test.go | 71 +++++- tool/proto/BUILD.bazel | 12 + 22 files changed, 1360 insertions(+), 67 deletions(-) create mode 100644 stovepipe/controller/process/BUILD.bazel create mode 100644 stovepipe/controller/process/process.go create mode 100644 stovepipe/controller/process/process_test.go create mode 100644 stovepipe/core/messagequeue/BUILD.bazel create mode 100644 stovepipe/core/messagequeue/README.md create mode 100644 stovepipe/core/messagequeue/messagequeue.go create mode 100644 stovepipe/core/messagequeue/process_test.go create mode 100644 stovepipe/core/messagequeue/proto/BUILD.bazel create mode 100644 stovepipe/core/messagequeue/proto/process.proto create mode 100644 stovepipe/core/messagequeue/protopb/BUILD.bazel create mode 100644 stovepipe/core/messagequeue/protopb/process.pb.go create mode 100644 stovepipe/core/messagequeue/topics.go diff --git a/Makefile b/Makefile index 70bed4fc..92546046 100644 --- a/Makefile +++ b/Makefile @@ -35,7 +35,7 @@ GOIMPORTS_VERSION ?= v0.33.0 # (the out_dir convention in tool/proto/BUILD.bazel) and copied back here. A # package may hold multiple .proto files (e.g. an RPC contract plus messagequeue # contracts); all generated stubs land in the same protopb/ dir. -PROTO_PACKAGES = api/base/change api/base/mergestrategy api/base/messagequeue api/runway/messagequeue api/runway api/submitqueue/gateway api/submitqueue/orchestrator api/stovepipe +PROTO_PACKAGES = api/base/change api/base/mergestrategy api/base/messagequeue api/runway/messagequeue api/runway api/submitqueue/gateway api/submitqueue/orchestrator api/stovepipe stovepipe/core/messagequeue # Set REPO_ROOT for docker-compose export REPO_ROOT := $(shell pwd) @@ -51,7 +51,7 @@ define assert_clean fi endef -.PHONY: build build-all-linux build-runway-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-submitqueue-consumer integration-test-extensions integration-test-submitqueue-gateway integration-test-submitqueue-orchestrator license-fix lint lint-fmt lint-license local-init-runway-queue-schema local-runway-start local-runway-stop local-submitqueue-clean local-submitqueue-gateway-start local-submitqueue-gateway-stop local-init-submitqueue-schemas local-submitqueue-logs local-submitqueue-orchestrator-start local-submitqueue-orchestrator-stop local-submitqueue-ps local-submitqueue-restart local-submitqueue-start local-stop local-stovepipe-logs local-stovepipe-start local-stovepipe-stop mocks proto query-deps query-targets run-client-runway run-client-submitqueue-gateway run-client-submitqueue-orchestrator run-client-stovepipe run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help +.PHONY: build build-all-linux build-runway-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-submitqueue-consumer integration-test-extensions integration-test-submitqueue-gateway integration-test-submitqueue-orchestrator license-fix lint lint-fmt lint-license local-init-runway-queue-schema local-init-stovepipe-schemas local-runway-start local-runway-stop local-submitqueue-clean local-submitqueue-gateway-start local-submitqueue-gateway-stop local-init-submitqueue-schemas local-submitqueue-logs local-submitqueue-orchestrator-start local-submitqueue-orchestrator-stop local-submitqueue-ps local-submitqueue-restart local-submitqueue-start local-stop local-stovepipe-logs local-stovepipe-start local-stovepipe-stop mocks proto query-deps query-targets run-client-runway run-client-submitqueue-gateway run-client-submitqueue-orchestrator run-client-stovepipe run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help build: ## Build all services and examples @@ -221,6 +221,19 @@ local-init-runway-queue-schema: ## Apply queue schema only (mysql-queue) for Run done @echo "✅ Runway queue schema applied successfully" +local-init-stovepipe-schemas: ## Apply storage (mysql-app) and queue (mysql-queue) schemas for Stovepipe compose stacks + @echo "Applying storage schema to mysql-app..." + @for file in stovepipe/extension/storage/mysql/schema/*.sql; do \ + echo " - Applying $$(basename $$file)..."; \ + docker exec -i $(STOVEPIPE_LOCAL_PROJECT)-mysql-app-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \ + done + @echo "Applying queue schema to mysql-queue..." + @for file in platform/extension/messagequeue/mysql/schema/*.sql; do \ + echo " - Applying $$(basename $$file)..."; \ + docker exec -i $(STOVEPIPE_LOCAL_PROJECT)-mysql-queue-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \ + done + @echo "✅ Stovepipe schemas applied successfully" + local-runway-start: build-runway-linux ## Start Runway locally (runway + MySQL queue) @echo "Starting Runway with compose..." @$(COMPOSE) -f $(RUNWAY_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) up -d --build --wait @@ -310,9 +323,11 @@ local-stop: ## Stop all services (keep data) local-stovepipe-logs: ## View logs from the running Stovepipe service @$(COMPOSE) -f $(STOVEPIPE_COMPOSE_FILE) -p $(STOVEPIPE_LOCAL_PROJECT) logs -f -local-stovepipe-start: build-stovepipe-linux ## Start Stovepipe service (single Ping-only gRPC service) +local-stovepipe-start: build-stovepipe-linux ## Start Stovepipe service (gRPC service + MySQL storage + MySQL queue) @echo "Starting Stovepipe service with compose..." @$(COMPOSE) -f $(STOVEPIPE_COMPOSE_FILE) -p $(STOVEPIPE_LOCAL_PROJECT) up -d --build --wait + @echo "Applying storage and queue schemas..." + @$(MAKE) -s local-init-stovepipe-schemas @echo "" @echo "✅ Stovepipe service is running!" @echo "" diff --git a/example/stovepipe/docker-compose.yml b/example/stovepipe/docker-compose.yml index aa3866d1..7bc64cc9 100644 --- a/example/stovepipe/docker-compose.yml +++ b/example/stovepipe/docker-compose.yml @@ -1,7 +1,9 @@ # Docker Compose for the Stovepipe service. # -# Stovepipe is currently a single Ping-only gRPC service with no storage or -# queue dependencies, so this stack runs just the one service. +# Stovepipe ingests a queue's head commit, persists the Request, and publishes it +# to the process stage over a messaging queue; the process consumer reloads the +# Request. The stack therefore runs the service plus two MySQL databases: one for +# storage (request, request_uri) and one for the messaging queue. # # IMPORTANT: Before running compose, build the Linux binary: # make build-stovepipe-linux @@ -12,8 +14,40 @@ # Quick start: # make local-stovepipe-start # +# After `up`, the storage and queue schemas are applied (local-init-stovepipe-schemas). services: + # Storage database - Stovepipe's request and request_uri tables. + mysql-app: + image: mysql:8.0 + environment: + MYSQL_ROOT_PASSWORD: root + MYSQL_DATABASE: submitqueue + ports: + - "3306" # Random ephemeral port to avoid conflicts + healthcheck: + # Use 127.0.0.1 (TCP) instead of localhost (Unix socket). MySQL treats + # "localhost" as a socket connection, which can be ready before the TCP + # listener — causing dependent services that connect over TCP to fail. + test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-proot"] + interval: 5s + timeout: 5s + retries: 10 + + # Queue database - messaging infrastructure (messages, offsets, partition leases). + mysql-queue: + image: mysql:8.0 + environment: + MYSQL_ROOT_PASSWORD: root + MYSQL_DATABASE: submitqueue + ports: + - "3306" # Random ephemeral port to avoid conflicts + healthcheck: + test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-proot"] + interval: 5s + timeout: 5s + retries: 10 + stovepipe-service: build: context: ${REPO_ROOT} @@ -22,3 +56,11 @@ services: - "8080" # Random ephemeral port to avoid conflicts environment: - PORT=:8080 + - STORAGE_MYSQL_DSN=root:root@tcp(mysql-app:3306)/submitqueue?parseTime=true + - QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true + - HOSTNAME=stovepipe-dev + depends_on: + mysql-app: + condition: service_healthy + mysql-queue: + condition: service_healthy diff --git a/example/stovepipe/server/BUILD.bazel b/example/stovepipe/server/BUILD.bazel index 017d8a96..68572b79 100644 --- a/example/stovepipe/server/BUILD.bazel +++ b/example/stovepipe/server/BUILD.bazel @@ -7,7 +7,19 @@ go_library( visibility = ["//visibility:private"], deps = [ "//api/stovepipe/protopb", + "//platform/consumer", + "//platform/errs", + "//platform/errs/generic", + "//platform/errs/mysql", + "//platform/extension/messagequeue", + "//platform/extension/messagequeue/mysql", "//stovepipe/controller", + "//stovepipe/controller/process", + "//stovepipe/core/messagequeue", + "//stovepipe/extension/sourcecontrol", + "//stovepipe/extension/sourcecontrol/fake", + "//stovepipe/extension/storage/mysql", + "@com_github_go_sql_driver_mysql//:mysql", "@com_github_uber_go_tally//:tally", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//reflection", diff --git a/example/stovepipe/server/main.go b/example/stovepipe/server/main.go index 8ab97ae1..e082ba5d 100644 --- a/example/stovepipe/server/main.go +++ b/example/stovepipe/server/main.go @@ -16,6 +16,7 @@ package main import ( "context" + "database/sql" "errors" "fmt" "net" @@ -25,9 +26,21 @@ import ( "syscall" "time" + _ "github.com/go-sql-driver/mysql" "github.com/uber-go/tally" pb "github.com/uber/submitqueue/api/stovepipe/protopb" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/errs" + genericerrs "github.com/uber/submitqueue/platform/errs/generic" + mysqlerrs "github.com/uber/submitqueue/platform/errs/mysql" + extqueue "github.com/uber/submitqueue/platform/extension/messagequeue" + queueMySQL "github.com/uber/submitqueue/platform/extension/messagequeue/mysql" "github.com/uber/submitqueue/stovepipe/controller" + "github.com/uber/submitqueue/stovepipe/controller/process" + stovepipemq "github.com/uber/submitqueue/stovepipe/core/messagequeue" + "github.com/uber/submitqueue/stovepipe/extension/sourcecontrol" + sourcecontrolfake "github.com/uber/submitqueue/stovepipe/extension/sourcecontrol/fake" + storageMySQL "github.com/uber/submitqueue/stovepipe/extension/storage/mysql" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/reflection" @@ -70,6 +83,15 @@ func (c *inMemoryCounter) Next(_ context.Context, domain string) (int64, error) return c.values[domain], nil } +// fakeSourceControlFactory is the example SourceControl factory. It seeds each queue with a +// deterministic single-commit history so ingest resolves a stable head URI (and re-ingesting +// the same queue exercises the dedup path). A real deployment supplies a VCS-backed factory. +type fakeSourceControlFactory struct{} + +func (fakeSourceControlFactory) For(cfg sourcecontrol.Config) (sourcecontrol.SourceControl, error) { + return sourcecontrolfake.New([]string{fmt.Sprintf("git://%s/HEAD", cfg.QueueName)}), nil +} + func main() { code := 0 if err := run(); err != nil { @@ -131,12 +153,85 @@ func run() error { metricsWgDone.Wait() }() + // Storage database (request + request_uri tables). + storageDSN := os.Getenv("STORAGE_MYSQL_DSN") + if storageDSN == "" { + return fmt.Errorf("STORAGE_MYSQL_DSN environment variable is required") + } + storageDB, err := sql.Open("mysql", storageDSN) + if err != nil { + return fmt.Errorf("failed to open storage database: %w", err) + } + defer storageDB.Close() + + store, err := storageMySQL.NewStorage(storageDB, scope.SubScope("storage")) + if err != nil { + return fmt.Errorf("failed to create storage: %w", err) + } + defer store.Close() + + // Queue database (messaging infrastructure for the process stage). + queueDSN := os.Getenv("QUEUE_MYSQL_DSN") + if queueDSN == "" { + return fmt.Errorf("QUEUE_MYSQL_DSN environment variable is required") + } + queueDB, err := sql.Open("mysql", queueDSN) + if err != nil { + return fmt.Errorf("failed to open queue database: %w", err) + } + defer queueDB.Close() + + mysqlQueue, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: queueDB, + Logger: logger, + MetricsScope: scope.SubScope("queue"), + }) + if err != nil { + return fmt.Errorf("failed to create queue: %w", err) + } + defer mysqlQueue.Close() + + subscriberName := os.Getenv("HOSTNAME") + if subscriberName == "" { + subscriberName = fmt.Sprintf("stovepipe-%d", time.Now().Unix()) + } + + registry, err := newTopicRegistry(mysqlQueue, subscriberName) + if err != nil { + return fmt.Errorf("failed to create topic registry: %w", err) + } + + // Consumer running the process stage. + primaryConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry, + errs.NewClassifierProcessor( + genericerrs.Classifier, + mysqlerrs.Classifier, + ), + ) + + processController := process.NewController(logger.Sugar(), scope, store, stovepipemq.TopicKeyProcess, "stovepipe-process") + if err := primaryConsumer.Register(processController); err != nil { + return fmt.Errorf("failed to register process controller: %w", err) + } + + if err := primaryConsumer.Start(ctx); err != nil { + return fmt.Errorf("failed to start consumer: %w", err) + } + logger.Info("consumer started") + // Create gRPC server grpcServer := grpc.NewServer() // Create controllers and wrap them for gRPC pingController := controller.NewPingController(logger, scope) - ingestController := controller.NewIngestController(logger.Sugar(), scope, newInMemoryCounter()) + ingestController := controller.NewIngestController( + logger.Sugar(), + scope, + newInMemoryCounter(), + fakeSourceControlFactory{}, + store, + registry, + ) srv := &StovepipeServer{ pingController: pingController, ingestController: ingestController, @@ -166,16 +261,13 @@ func run() error { }() // Wait for interrupt signal or server critical error - // If interruption is signaled, gracefully stop the server - // If an error happens during shutdown, return the actual error, not the context cancellation error var serverErr error select { case <-ctx.Done(): fmt.Println("Shutting down stovepipe server due to interruption signal...") // Set the error to the context cancellation error to be surfaced as a desired exit code by the main function - // to indicate that the server was stopped as intended - // It may be overridden by the server error if any + // to indicate that the server was stopped as intended. It may be overridden by the server error if any. err = ctx.Err() // stop GRPC server and wait for it to exit @@ -183,11 +275,36 @@ func run() error { serverErr = <-serverErrCh case serverErr = <-serverErrCh: fmt.Println("Shutting down stovepipe server due to critical GRPC server error...") + cancel() } if serverErr != nil { - err = fmt.Errorf("GRPC server exited with error: %w", serverErr) + serverErr = fmt.Errorf("GRPC server exited with error: %w", serverErr) + } + + consumerStopErr := primaryConsumer.Stop(30000) + if consumerStopErr != nil { + consumerStopErr = fmt.Errorf("failed to stop consumer: %w", consumerStopErr) + } + + if consumerStopErr != nil || serverErr != nil { + err = errors.Join(err, consumerStopErr, serverErr) } return err } + +// newTopicRegistry builds the TopicRegistry for Stovepipe's internal pipeline queues. ingest +// publishes to the process topic and the process consumer subscribes to it. +func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRegistry, error) { + return consumer.NewTopicRegistry([]consumer.TopicConfig{ + { + Key: stovepipemq.TopicKeyProcess, + Name: "process", + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "stovepipe-process", + ), + }, + }) +} diff --git a/stovepipe/controller/BUILD.bazel b/stovepipe/controller/BUILD.bazel index 51aa4636..f0faffa5 100644 --- a/stovepipe/controller/BUILD.bazel +++ b/stovepipe/controller/BUILD.bazel @@ -10,10 +10,15 @@ go_library( visibility = ["//visibility:public"], deps = [ "//api/stovepipe/protopb", + "//platform/base/messagequeue", + "//platform/consumer", "//platform/errs", "//platform/extension/counter", "//platform/metrics", + "//stovepipe/core/messagequeue", "//stovepipe/entity", + "//stovepipe/extension/sourcecontrol", + "//stovepipe/extension/storage", "@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", ], @@ -28,7 +33,15 @@ go_test( embed = [":controller"], deps = [ "//api/stovepipe/protopb", + "//platform/consumer", "//platform/extension/counter/mock", + "//platform/extension/messagequeue/mock", + "//stovepipe/core/messagequeue", + "//stovepipe/entity", + "//stovepipe/extension/sourcecontrol", + "//stovepipe/extension/sourcecontrol/mock", + "//stovepipe/extension/storage", + "//stovepipe/extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally//:tally", diff --git a/stovepipe/controller/ingest.go b/stovepipe/controller/ingest.go index 8d1fd180..697432c9 100644 --- a/stovepipe/controller/ingest.go +++ b/stovepipe/controller/ingest.go @@ -21,10 +21,15 @@ import ( "github.com/uber-go/tally" pb "github.com/uber/submitqueue/api/stovepipe/protopb" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" "github.com/uber/submitqueue/platform/errs" "github.com/uber/submitqueue/platform/extension/counter" "github.com/uber/submitqueue/platform/metrics" + stovepipemq "github.com/uber/submitqueue/stovepipe/core/messagequeue" "github.com/uber/submitqueue/stovepipe/entity" + "github.com/uber/submitqueue/stovepipe/extension/sourcecontrol" + "github.com/uber/submitqueue/stovepipe/extension/storage" "go.uber.org/zap" ) @@ -40,25 +45,48 @@ func IsInvalidRequest(err error) bool { // IngestController handles ingest business logic for stovepipe: it admits a queue's newly // observed commit into the validation pipeline. // -// This is the thin entry point. It mints a request ID namespaced by the queue and records the -// resulting Request. Resolving the commit URI via the SourceControl extension, persisting the -// Request, and publishing it onto the pipeline are deliberately out of scope for now. +// It resolves the queue's head commit URI via the SourceControl extension, dedups on the +// (queue, URI) pair, persists the Request and its URI mapping via storage, and publishes the +// request ID onto the process stage. Ingestion is idempotent: a re-reported head resolves to the +// already-minted request and no new work is published. type IngestController struct { - logger *zap.SugaredLogger - metricsScope tally.Scope - counter counter.Counter + logger *zap.SugaredLogger + metricsScope tally.Scope + counter counter.Counter + sourceControl sourcecontrol.Factory + store storage.Storage + registry consumer.TopicRegistry } -// NewIngestController creates a new instance of the stovepipe ingest controller. -func NewIngestController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter) *IngestController { +// NewIngestController creates a new instance of the stovepipe ingest controller. It publishes +// accepted requests to the topic registered under messagequeue.TopicKeyProcess in the registry. +func NewIngestController( + logger *zap.SugaredLogger, + scope tally.Scope, + counter counter.Counter, + sourceControl sourcecontrol.Factory, + store storage.Storage, + registry consumer.TopicRegistry, +) *IngestController { return &IngestController{ - logger: logger, - metricsScope: scope.SubScope("ingest_controller"), - counter: counter, + logger: logger, + metricsScope: scope.SubScope("ingest_controller"), + counter: counter, + sourceControl: sourceControl, + store: store, + registry: registry, } } -// Ingest admits a queue's newly observed commit into the validation pipeline and returns the minted request ID. +// Ingest admits a queue's newly observed commit into the validation pipeline and returns the +// request ID validating it. +// +// It is idempotent and runs to completion on every call, each step tolerant of having already +// run: it resolves (or claims) the (queue, URI) mapping, ensures the Request row exists, and +// publishes the request to the process stage. A retry after a partial failure — for example the +// URI mapping committed but the request write failed — completes the missing steps instead of +// returning a dangling reference. The (queue, URI) mapping is the dedup gate, so concurrent +// ingests of the same head converge on one request. func (c *IngestController) Ingest(ctx context.Context, req *pb.IngestRequest) (resp *pb.IngestResponse, retErr error) { const opName = "ingest" @@ -68,29 +96,142 @@ func (c *IngestController) Ingest(ctx context.Context, req *pb.IngestRequest) (r if req.Queue == "" { return nil, fmt.Errorf("IngestController requires the request to have a queue name specified: %w", ErrInvalidRequest) } - queue := req.Queue - // Generate a globally unique request ID namespaced by the queue. The counter domain + // Resolve the queue's current head commit to its opaque URI via SourceControl. + // An unresolvable queue/ref is a caller error (unknown queue), not infrastructure. + sc, err := c.sourceControl.For(sourcecontrol.Config{QueueName: queue}) + if err != nil { + return nil, fmt.Errorf("IngestController failed to resolve source control for queue=%s: %w", queue, err) + } + uri, err := sc.Latest(ctx) + if err != nil { + if sourcecontrol.IsNotFound(err) { + return nil, fmt.Errorf("IngestController could not resolve head for queue=%s: %w: %w", queue, err, ErrInvalidRequest) + } + return nil, fmt.Errorf("IngestController failed to resolve head for queue=%s: %w", queue, err) + } + + // The (queue, URI) mapping is the dedup gate and the source of truth for "does this head + // have a request id". + id, err := c.resolveID(ctx, queue, uri) + if err != nil { + return nil, err + } + + // Ensure the request row exists, healing a prior partial write where the mapping committed + // but the request did not. + request, err := c.ensureRequest(ctx, id, queue, uri) + if err != nil { + return nil, err + } + + // Publish while the request is still pre-pipeline (Accepted). The process consumer is + // idempotent (keyed on the request id, at-least-once), so re-publishing on a retry or a + // duplicate report is safe and closes the "request created but publish failed" gap. Once + // process advances the request past Accepted, ingest stops re-publishing. + if request.State == entity.RequestStateAccepted { + if err := c.publishProcess(ctx, id, queue); err != nil { + return nil, fmt.Errorf("IngestController failed to publish request %s to process: %w", id, err) + } + } + + c.logger.Infow("ingested request", + "id", request.ID, + "queue", request.Queue, + "uri", request.URI, + "state", request.State, + ) + + return &pb.IngestResponse{Id: id}, nil +} + +// resolveID returns the request id mapped to (queue, URI), minting and claiming a new one if the +// pair is not yet mapped. Claiming the mapping is the dedup gate: a concurrent ingest that loses +// the claim re-reads and returns the winner's id, so no orphan request row is created (only the +// minted counter value is spent). +func (c *IngestController) resolveID(ctx context.Context, queue, uri string) (string, error) { + uriStore := c.store.GetRequestURIStore() + + if id, err := uriStore.GetIDByURI(ctx, queue, uri); err == nil { + return id, nil + } else if !errors.Is(err, storage.ErrNotFound) { + return "", fmt.Errorf("IngestController failed to look up existing request for queue=%s: %w", queue, err) + } + + // Mint a globally unique request ID namespaced by the queue. The counter domain // ("request/") doubles as the ID prefix, so the ID is "/". domain := "request/" + queue seq, err := c.counter.Next(ctx, domain) if err != nil { - return nil, fmt.Errorf("IngestController failed to generate request ID for queue=%s: %w", queue, err) + return "", fmt.Errorf("IngestController failed to generate request ID for queue=%s: %w", queue, err) + } + id := fmt.Sprintf("%s/%d", domain, seq) + + if err := uriStore.Create(ctx, queue, uri, id); err != nil { + if errors.Is(err, storage.ErrAlreadyExists) { + existing, getErr := uriStore.GetIDByURI(ctx, queue, uri) + if getErr != nil { + return "", fmt.Errorf("IngestController failed to resolve raced request for queue=%s: %w", queue, getErr) + } + return existing, nil + } + return "", fmt.Errorf("IngestController failed to map URI for queue=%s: %w", queue, err) + } + return id, nil +} + +// ensureRequest returns the request for id, creating it in the Accepted state if it does not yet +// exist. A concurrent creator (ErrAlreadyExists) is resolved by re-reading the canonical row. +func (c *IngestController) ensureRequest(ctx context.Context, id, queue, uri string) (entity.Request, error) { + reqStore := c.store.GetRequestStore() + + got, err := reqStore.Get(ctx, id) + if err == nil { + return got, nil + } + if !errors.Is(err, storage.ErrNotFound) { + return entity.Request{}, fmt.Errorf("IngestController failed to load request %s: %w", id, err) } request := entity.Request{ - ID: fmt.Sprintf("%s/%d", domain, seq), + ID: id, Queue: queue, + URI: uri, State: entity.RequestStateAccepted, Version: 1, } + if err := reqStore.Create(ctx, request); err != nil { + if !errors.Is(err, storage.ErrAlreadyExists) { + return entity.Request{}, fmt.Errorf("IngestController failed to persist request %s: %w", id, err) + } + // Raced with a concurrent creator; read the canonical row. + return reqStore.Get(ctx, id) + } + return request, nil +} + +// publishProcess publishes the request ID to the process stage, partitioned by queue so a +// queue's requests stay ordered. +func (c *IngestController) publishProcess(ctx context.Context, id, queue string) error { + payload, err := stovepipemq.Marshal(&stovepipemq.ProcessRequest{Id: id}) + if err != nil { + return fmt.Errorf("failed to serialize process request: %w", err) + } - c.logger.Infow("accepted request", - "id", request.ID, - "queue", request.Queue, - "state", request.State, - ) + msg := entityqueue.NewMessage(id, payload, queue, nil) - return &pb.IngestResponse{Id: request.ID}, nil + q, ok := c.registry.Queue(stovepipemq.TopicKeyProcess) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", stovepipemq.TopicKeyProcess) + } + topicName, ok := c.registry.TopicName(stovepipemq.TopicKeyProcess) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", stovepipemq.TopicKeyProcess) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish process request: %w", err) + } + return nil } diff --git a/stovepipe/controller/ingest_test.go b/stovepipe/controller/ingest_test.go index c0500251..484fbee1 100644 --- a/stovepipe/controller/ingest_test.go +++ b/stovepipe/controller/ingest_test.go @@ -23,48 +23,209 @@ import ( "github.com/stretchr/testify/require" "github.com/uber-go/tally" pb "github.com/uber/submitqueue/api/stovepipe/protopb" + "github.com/uber/submitqueue/platform/consumer" countermock "github.com/uber/submitqueue/platform/extension/counter/mock" + mqmock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + stovepipemq "github.com/uber/submitqueue/stovepipe/core/messagequeue" + "github.com/uber/submitqueue/stovepipe/entity" + "github.com/uber/submitqueue/stovepipe/extension/sourcecontrol" + scmock "github.com/uber/submitqueue/stovepipe/extension/sourcecontrol/mock" + "github.com/uber/submitqueue/stovepipe/extension/storage" + storagemock "github.com/uber/submitqueue/stovepipe/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap" ) -func newIngestController(t *testing.T, c *countermock.MockCounter) *IngestController { - t.Helper() - return NewIngestController(zap.NewNop().Sugar(), tally.NewTestScope("test", nil), c) +const ( + testQueue = "monorepo/main" + testURI = "git://repo/monorepo/main/abc123" +) + +// ingestMocks bundles the mocks an Ingest test case wires expectations on. +type ingestMocks struct { + counter *countermock.MockCounter + factory *scmock.MockFactory + sc *scmock.MockSourceControl + reqStore *storagemock.MockRequestStore + uriStore *storagemock.MockRequestURIStore + publisher *mqmock.MockPublisher } -func TestIngestController_Ingest(t *testing.T) { - ctrl := gomock.NewController(t) - mockCounter := countermock.NewMockCounter(ctrl) - mockCounter.EXPECT().Next(gomock.Any(), "request/monorepo/main").Return(int64(7), nil) +func newIngestController(t *testing.T, ctrl *gomock.Controller) (*IngestController, ingestMocks) { + t.Helper() - c := newIngestController(t, mockCounter) + m := ingestMocks{ + counter: countermock.NewMockCounter(ctrl), + factory: scmock.NewMockFactory(ctrl), + sc: scmock.NewMockSourceControl(ctrl), + reqStore: storagemock.NewMockRequestStore(ctrl), + uriStore: storagemock.NewMockRequestURIStore(ctrl), + publisher: mqmock.NewMockPublisher(ctrl), + } - resp, err := c.Ingest(context.Background(), &pb.IngestRequest{Queue: "monorepo/main"}) - require.NoError(t, err) - assert.Equal(t, "request/monorepo/main/7", resp.Id) -} + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(m.reqStore).AnyTimes() + store.EXPECT().GetRequestURIStore().Return(m.uriStore).AnyTimes() + + queue := mqmock.NewMockQueue(ctrl) + queue.EXPECT().Publisher().Return(m.publisher).AnyTimes() -func TestIngestController_Ingest_EmptyQueue(t *testing.T) { - ctrl := gomock.NewController(t) - mockCounter := countermock.NewMockCounter(ctrl) - // Counter must not be consulted when the queue is missing. + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: stovepipemq.TopicKeyProcess, Name: "process", Queue: queue}, + }) + require.NoError(t, err) - c := newIngestController(t, mockCounter) + c := NewIngestController(zap.NewNop().Sugar(), tally.NewTestScope("test", nil), m.counter, m.factory, store, registry) + return c, m +} - _, err := c.Ingest(context.Background(), &pb.IngestRequest{Queue: ""}) - require.Error(t, err) - assert.True(t, IsInvalidRequest(err)) +// expectResolve wires the SourceControl factory + Latest happy path returning testURI. +func expectResolve(m ingestMocks) { + m.factory.EXPECT().For(sourcecontrol.Config{QueueName: testQueue}).Return(m.sc, nil) + m.sc.EXPECT().Latest(gomock.Any()).Return(testURI, nil) } -func TestIngestController_Ingest_CounterError(t *testing.T) { - ctrl := gomock.NewController(t) - mockCounter := countermock.NewMockCounter(ctrl) - mockCounter.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(0), errors.New("counter unavailable")) +func TestIngestController_Ingest(t *testing.T) { + tests := []struct { + name string + queue string + setup func(m ingestMocks) + wantID string + wantErr bool + wantInvalid bool + }{ + { + name: "happy path mints persists and publishes", + queue: testQueue, + setup: func(m ingestMocks) { + expectResolve(m) + m.uriStore.EXPECT().GetIDByURI(gomock.Any(), testQueue, testURI).Return("", storage.ErrNotFound) + m.counter.EXPECT().Next(gomock.Any(), "request/"+testQueue).Return(int64(7), nil) + m.uriStore.EXPECT().Create(gomock.Any(), testQueue, testURI, "request/monorepo/main/7").Return(nil) + m.reqStore.EXPECT().Get(gomock.Any(), "request/monorepo/main/7").Return(entity.Request{}, storage.ErrNotFound) + m.reqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + m.publisher.EXPECT().Publish(gomock.Any(), "process", gomock.Any()).Return(nil) + }, + wantID: "request/monorepo/main/7", + }, + { + name: "dedup with existing accepted request republishes without minting", + queue: testQueue, + setup: func(m ingestMocks) { + expectResolve(m) + m.uriStore.EXPECT().GetIDByURI(gomock.Any(), testQueue, testURI).Return("request/monorepo/main/3", nil) + m.reqStore.EXPECT().Get(gomock.Any(), "request/monorepo/main/3").Return(entity.Request{ID: "request/monorepo/main/3", State: entity.RequestStateAccepted}, nil) + m.publisher.EXPECT().Publish(gomock.Any(), "process", gomock.Any()).Return(nil) + }, + wantID: "request/monorepo/main/3", + }, + { + name: "heals when uri mapped but request missing", + queue: testQueue, + setup: func(m ingestMocks) { + // Prior attempt committed the URI mapping but failed before the request write. + expectResolve(m) + m.uriStore.EXPECT().GetIDByURI(gomock.Any(), testQueue, testURI).Return("request/monorepo/main/3", nil) + m.reqStore.EXPECT().Get(gomock.Any(), "request/monorepo/main/3").Return(entity.Request{}, storage.ErrNotFound) + m.reqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + m.publisher.EXPECT().Publish(gomock.Any(), "process", gomock.Any()).Return(nil) + }, + wantID: "request/monorepo/main/3", + }, + { + name: "dedup race returns winner id and completes", + queue: testQueue, + setup: func(m ingestMocks) { + expectResolve(m) + m.uriStore.EXPECT().GetIDByURI(gomock.Any(), testQueue, testURI).Return("", storage.ErrNotFound) + m.counter.EXPECT().Next(gomock.Any(), "request/"+testQueue).Return(int64(7), nil) + m.uriStore.EXPECT().Create(gomock.Any(), testQueue, testURI, "request/monorepo/main/7").Return(storage.ErrAlreadyExists) + m.uriStore.EXPECT().GetIDByURI(gomock.Any(), testQueue, testURI).Return("request/monorepo/main/3", nil) + m.reqStore.EXPECT().Get(gomock.Any(), "request/monorepo/main/3").Return(entity.Request{ID: "request/monorepo/main/3", State: entity.RequestStateAccepted}, nil) + m.publisher.EXPECT().Publish(gomock.Any(), "process", gomock.Any()).Return(nil) + }, + wantID: "request/monorepo/main/3", + }, + { + name: "empty queue is invalid", + queue: "", + setup: func(m ingestMocks) {}, + wantErr: true, + wantInvalid: true, + }, + { + name: "unknown queue head is invalid", + queue: testQueue, + setup: func(m ingestMocks) { + m.factory.EXPECT().For(sourcecontrol.Config{QueueName: testQueue}).Return(m.sc, nil) + m.sc.EXPECT().Latest(gomock.Any()).Return("", sourcecontrol.ErrNotFound) + }, + wantErr: true, + wantInvalid: true, + }, + { + name: "source control infra error is not invalid", + queue: testQueue, + setup: func(m ingestMocks) { + m.factory.EXPECT().For(sourcecontrol.Config{QueueName: testQueue}).Return(m.sc, nil) + m.sc.EXPECT().Latest(gomock.Any()).Return("", errors.New("sc unavailable")) + }, + wantErr: true, + }, + { + name: "counter error", + queue: testQueue, + setup: func(m ingestMocks) { + expectResolve(m) + m.uriStore.EXPECT().GetIDByURI(gomock.Any(), testQueue, testURI).Return("", storage.ErrNotFound) + m.counter.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(0), errors.New("counter unavailable")) + }, + wantErr: true, + }, + { + name: "request store create error", + queue: testQueue, + setup: func(m ingestMocks) { + expectResolve(m) + m.uriStore.EXPECT().GetIDByURI(gomock.Any(), testQueue, testURI).Return("", storage.ErrNotFound) + m.counter.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(7), nil) + m.uriStore.EXPECT().Create(gomock.Any(), testQueue, testURI, gomock.Any()).Return(nil) + m.reqStore.EXPECT().Get(gomock.Any(), gomock.Any()).Return(entity.Request{}, storage.ErrNotFound) + m.reqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(errors.New("db down")) + }, + wantErr: true, + }, + { + name: "publish error", + queue: testQueue, + setup: func(m ingestMocks) { + expectResolve(m) + m.uriStore.EXPECT().GetIDByURI(gomock.Any(), testQueue, testURI).Return("", storage.ErrNotFound) + m.counter.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(7), nil) + m.uriStore.EXPECT().Create(gomock.Any(), testQueue, testURI, gomock.Any()).Return(nil) + m.reqStore.EXPECT().Get(gomock.Any(), gomock.Any()).Return(entity.Request{}, storage.ErrNotFound) + m.reqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + m.publisher.EXPECT().Publish(gomock.Any(), "process", gomock.Any()).Return(errors.New("queue down")) + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + c, m := newIngestController(t, ctrl) + tt.setup(m) - c := newIngestController(t, mockCounter) + resp, err := c.Ingest(context.Background(), &pb.IngestRequest{Queue: tt.queue}) - _, err := c.Ingest(context.Background(), &pb.IngestRequest{Queue: "monorepo/main"}) - require.Error(t, err) - assert.False(t, IsInvalidRequest(err)) + if tt.wantErr { + require.Error(t, err) + assert.Equal(t, tt.wantInvalid, IsInvalidRequest(err)) + return + } + require.NoError(t, err) + assert.Equal(t, tt.wantID, resp.Id) + }) + } } diff --git a/stovepipe/controller/process/BUILD.bazel b/stovepipe/controller/process/BUILD.bazel new file mode 100644 index 00000000..8298b0fc --- /dev/null +++ b/stovepipe/controller/process/BUILD.bazel @@ -0,0 +1,38 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "process", + srcs = ["process.go"], + importpath = "github.com/uber/submitqueue/stovepipe/controller/process", + visibility = ["//visibility:public"], + deps = [ + "//platform/consumer", + "//platform/errs", + "//platform/metrics", + "//stovepipe/core/messagequeue", + "//stovepipe/extension/storage", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "process_test", + srcs = ["process_test.go"], + embed = [":process"], + deps = [ + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/errs", + "//platform/extension/messagequeue/mock", + "//stovepipe/core/messagequeue", + "//stovepipe/entity", + "//stovepipe/extension/storage", + "//stovepipe/extension/storage/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//:zap", + ], +) diff --git a/stovepipe/controller/process/process.go b/stovepipe/controller/process/process.go new file mode 100644 index 00000000..71da8a54 --- /dev/null +++ b/stovepipe/controller/process/process.go @@ -0,0 +1,121 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package process holds the process-stage queue controller. It consumes the +// request ids ingest publishes, reloads the Request from storage, and (in a +// future change) decides the build strategy by asking SourceControl how the new +// head relates to the queue's last-green URI. For now it is a thin consumer that +// reloads and logs the request, establishing the stage and its wiring. +package process + +import ( + "context" + "errors" + "fmt" + + "github.com/uber-go/tally" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/errs" + "github.com/uber/submitqueue/platform/metrics" + stovepipemq "github.com/uber/submitqueue/stovepipe/core/messagequeue" + "github.com/uber/submitqueue/stovepipe/extension/storage" + "go.uber.org/zap" +) + +// Controller consumes ProcessRequest messages from the process stage, reloads the +// referenced Request from storage, and logs it. Implements consumer.Controller. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + store storage.Storage + topicKey consumer.TopicKey + consumerGroup string +} + +// Verify Controller implements consumer.Controller interface at compile time. +var _ consumer.Controller = (*Controller)(nil) + +// NewController creates a new process controller. +func NewController( + logger *zap.SugaredLogger, + scope tally.Scope, + store storage.Storage, + topicKey consumer.TopicKey, + consumerGroup string, +) *Controller { + return &Controller{ + logger: logger.Named("process_controller"), + metricsScope: scope.SubScope("process_controller"), + store: store, + topicKey: topicKey, + consumerGroup: consumerGroup, + } +} + +// Process reloads the request referenced by the delivery and logs it. Returns nil +// to ack (success) or an error to nack (retry). A not-yet-visible request is +// retryable: ingest persists and publishes, but a stale read may not see the row +// yet, so redelivery converges. +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + pr := &stovepipemq.ProcessRequest{} + if err := stovepipemq.Unmarshal(msg.Payload, pr); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + // Non-retryable: a malformed message will never succeed regardless of retries. + return fmt.Errorf("failed to deserialize process request: %w", err) + } + + request, err := c.store.GetRequestStore().Get(ctx, pr.Id) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + if errors.Is(err, storage.ErrNotFound) { + // Retryable: the request row may not be visible yet; redelivery converges. + return errs.NewRetryableError(fmt.Errorf("request %s not found yet: %w", pr.Id, err)) + } + return fmt.Errorf("failed to load request %s: %w", pr.Id, err) + } + + c.logger.Infow("processing request", + "request_id", request.ID, + "queue", request.Queue, + "uri", request.URI, + "state", string(request.State), + "version", request.Version, + "attempt", delivery.Attempt(), + "partition_key", msg.PartitionKey, + ) + + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "process" +} + +// TopicKey returns the topic key this controller subscribes to. +func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/stovepipe/controller/process/process_test.go b/stovepipe/controller/process/process_test.go new file mode 100644 index 00000000..e9083bbc --- /dev/null +++ b/stovepipe/controller/process/process_test.go @@ -0,0 +1,99 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package process + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/errs" + queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + stovepipemq "github.com/uber/submitqueue/stovepipe/core/messagequeue" + "github.com/uber/submitqueue/stovepipe/entity" + "github.com/uber/submitqueue/stovepipe/extension/storage" + storagemock "github.com/uber/submitqueue/stovepipe/extension/storage/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap" +) + +const testID = "request/monorepo/main/7" + +func newController(t *testing.T, ctrl *gomock.Controller) (*Controller, *storagemock.MockRequestStore) { + t.Helper() + reqStore := storagemock.NewMockRequestStore(ctrl) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() + c := NewController(zap.NewNop().Sugar(), tally.NewTestScope("test", nil), store, stovepipemq.TopicKeyProcess, "stovepipe-process") + return c, reqStore +} + +// delivery wraps raw payload bytes in a MockDelivery (which satisfies consumer.Delivery). +func delivery(t *testing.T, ctrl *gomock.Controller, payload []byte) consumer.Delivery { + t.Helper() + d := queuemock.NewMockDelivery(ctrl) + d.EXPECT().Message().Return(entityqueue.NewMessage(testID, payload, "monorepo/main", nil)).AnyTimes() + d.EXPECT().Attempt().Return(1).AnyTimes() + return d +} + +func processPayload(t *testing.T, id string) []byte { + t.Helper() + b, err := stovepipemq.Marshal(&stovepipemq.ProcessRequest{Id: id}) + require.NoError(t, err) + return b +} + +func TestProcess_Success(t *testing.T) { + ctrl := gomock.NewController(t) + c, reqStore := newController(t, ctrl) + reqStore.EXPECT().Get(gomock.Any(), testID).Return(entity.Request{ID: testID, Queue: "monorepo/main", URI: "git://x", State: entity.RequestStateAccepted, Version: 1}, nil) + + require.NoError(t, c.Process(context.Background(), delivery(t, ctrl, processPayload(t, testID)))) +} + +func TestProcess_NotFoundIsRetryable(t *testing.T) { + ctrl := gomock.NewController(t) + c, reqStore := newController(t, ctrl) + reqStore.EXPECT().Get(gomock.Any(), testID).Return(entity.Request{}, storage.ErrNotFound) + + err := c.Process(context.Background(), delivery(t, ctrl, processPayload(t, testID))) + require.Error(t, err) + assert.True(t, errs.IsRetryable(err), "a not-yet-visible request must be retryable") +} + +func TestProcess_StorageErrorNotRetryable(t *testing.T) { + ctrl := gomock.NewController(t) + c, reqStore := newController(t, ctrl) + reqStore.EXPECT().Get(gomock.Any(), testID).Return(entity.Request{}, errors.New("db down")) + + err := c.Process(context.Background(), delivery(t, ctrl, processPayload(t, testID))) + require.Error(t, err) + assert.False(t, errs.IsRetryable(err)) +} + +func TestProcess_MalformedPayload(t *testing.T) { + ctrl := gomock.NewController(t) + c, _ := newController(t, ctrl) + + err := c.Process(context.Background(), delivery(t, ctrl, []byte("not-json"))) + require.Error(t, err) + assert.False(t, errs.IsRetryable(err)) +} diff --git a/stovepipe/core/messagequeue/BUILD.bazel b/stovepipe/core/messagequeue/BUILD.bazel new file mode 100644 index 00000000..f52c5df2 --- /dev/null +++ b/stovepipe/core/messagequeue/BUILD.bazel @@ -0,0 +1,29 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "messagequeue", + srcs = [ + "messagequeue.go", + "topics.go", + ], + importpath = "github.com/uber/submitqueue/stovepipe/core/messagequeue", + visibility = ["//visibility:public"], + deps = [ + "//api/base/messagequeue/protopb", # keep + "//platform/consumer", + "//stovepipe/core/messagequeue/protopb", # keep + "@org_golang_google_protobuf//encoding/protojson", + "@org_golang_google_protobuf//proto", + ], +) + +go_test( + name = "messagequeue_test", + srcs = ["process_test.go"], + embed = [":messagequeue"], + deps = [ + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@org_golang_google_protobuf//proto", + ], +) diff --git a/stovepipe/core/messagequeue/README.md b/stovepipe/core/messagequeue/README.md new file mode 100644 index 00000000..2a4b32a3 --- /dev/null +++ b/stovepipe/core/messagequeue/README.md @@ -0,0 +1,11 @@ +# Stovepipe internal message-queue contract + +Wire payloads for the queues internal to the Stovepipe pipeline. It is **internal** — used only within the Stovepipe domain — so it lives under `stovepipe/core` rather than `api/` (Bazel visibility keeps it domain-scoped). + +Payloads are defined in proto3 (`proto/`, generated into `protopb/`) and serialized as **protobuf JSON** (protojson), so the MySQL-backed queue keeps storing self-describing JSON. The contract package adds only generic glue — `Marshal`/`Unmarshal` and the `TopicKeys` reflection lookup — and owns the `TopicKey` constants for the stages it carries. Each payload declares the topic key(s) that carry it via the `topic_keys` proto option (defined in `api/base/messagequeue`); a contract test round-trips every payload and asserts each topic key is bound to exactly one message. + +## Stages + +- **process** (`TopicKeyProcess`, `ProcessRequest`) — ingest publishes the minted request id here once it accepts a new head; the process controller reloads the `Request` from storage and decides the build strategy. Only the id travels: producer and consumer share the store, so messages stay small and redelivery is idempotent. + +See [doc/rfc/messagequeue-contract.md](../../../doc/rfc/messagequeue-contract.md) for the contract conventions and `api/runway/messagequeue` for the external reference example. diff --git a/stovepipe/core/messagequeue/messagequeue.go b/stovepipe/core/messagequeue/messagequeue.go new file mode 100644 index 00000000..f8e03fb6 --- /dev/null +++ b/stovepipe/core/messagequeue/messagequeue.go @@ -0,0 +1,81 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package messagequeue holds Stovepipe's internal message-queue contract: the +// wire payloads for the pipeline queues Stovepipe owns, defined by the proto +// files in proto/ and generated into protopb/. The proto is the language-neutral +// authority; the generated Go types in protopb are the binding for Go callers. +// +// It is internal — used only within the Stovepipe domain — so it lives under +// stovepipe/core rather than api/. The message types are generated into protopb; +// this package adds only generic protojson glue (Marshal/Unmarshal) and the +// topic-key reflection lookup (TopicKeys), so there is no per-message +// serialization code. Payloads are serialized as protobuf JSON, not binary, so +// the MySQL-backed queue keeps storing self-describing JSON. The topic key that +// carries each payload is declared on the message itself via the topic_keys +// proto option (see api/base/messagequeue). +package messagequeue + +import ( + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + + basemqpb "github.com/uber/submitqueue/api/base/messagequeue/protopb" + "github.com/uber/submitqueue/stovepipe/core/messagequeue/protopb" +) + +// Wire payload types. These alias the generated protobuf bindings so callers +// reference the contract through this curated package rather than protopb. +type ( + // ProcessRequest is the payload ingest publishes to the process stage: the + // minted request id to validate. + ProcessRequest = protopb.ProcessRequest +) + +// marshalOpts keeps the JSON field names identical to the proto field names +// (snake_case), so the wire shape matches the declared contract rather than +// protojson's default lowerCamelCase. Zero-valued fields are omitted. +var marshalOpts = protojson.MarshalOptions{UseProtoNames: true} + +// unmarshalOpts tolerates unknown fields so an additive contract change (a new +// field a producer sends but this consumer does not yet know) is ignored rather +// than rejected. +var unmarshalOpts = protojson.UnmarshalOptions{DiscardUnknown: true} + +// Marshal serializes any contract message to protojson bytes for the queue +// payload, keeping the proto field names (snake_case) on the wire. +func Marshal(m proto.Message) ([]byte, error) { + return marshalOpts.Marshal(m) +} + +// Unmarshal deserializes protojson bytes into the contract message m, tolerating +// unknown fields so an additive contract change is ignored rather than rejected. +func Unmarshal[T proto.Message](b []byte, m T) error { + return unmarshalOpts.Unmarshal(b, m) +} + +// TopicKeys returns the stable logical topic keys bound to a message via the +// topic_keys proto option — not concrete wire names; a caller maps each key to +// its backend's topic name. Returns nil for a message that declares no keys. +func TopicKeys(m proto.Message) []string { + opts := m.ProtoReflect().Descriptor().Options() + if opts == nil { + return nil + } + keys, ok := proto.GetExtension(opts, basemqpb.E_TopicKeys).([]string) + if !ok { + return nil + } + return keys +} diff --git a/stovepipe/core/messagequeue/process_test.go b/stovepipe/core/messagequeue/process_test.go new file mode 100644 index 00000000..4c9345f3 --- /dev/null +++ b/stovepipe/core/messagequeue/process_test.go @@ -0,0 +1,70 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package messagequeue + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" +) + +func TestProcessRequestRoundTrip(t *testing.T) { + req := &ProcessRequest{Id: "request/monorepo/main/42"} + + data, err := Marshal(req) + require.NoError(t, err) + + got := &ProcessRequest{} + require.NoError(t, Unmarshal(data, got)) + assert.True(t, proto.Equal(req, got), "round-tripped ProcessRequest should equal the original") +} + +// TestWireFormat locks the protojson encoding decision the contract relies on: +// snake_case field names (UseProtoNames). +func TestWireFormat(t *testing.T) { + data, err := Marshal(&ProcessRequest{Id: "request/monorepo/main/42"}) + require.NoError(t, err) + + assert.Contains(t, string(data), `"id"`, "fields must serialize as snake_case") +} + +// TestTopicKeysBindEveryTopicKey is the topic-binding drift guard: every +// Stovepipe topic key is carried by exactly one message's topic_keys option, and +// no topic_keys option names an unknown key. +func TestTopicKeysBindEveryTopicKey(t *testing.T) { + bound := map[string]int{} + for _, m := range []proto.Message{&ProcessRequest{}} { + keys := TopicKeys(m) + require.NotEmpty(t, keys, "message must declare a non-empty topic_keys option") + for _, key := range keys { + bound[key]++ + } + } + + keys := []TopicKey{ + TopicKeyProcess, + } + + valid := map[string]bool{} + for _, k := range keys { + valid[k.String()] = true + assert.Equalf(t, 1, bound[k.String()], "topic key %q must be bound to exactly one message via the topic_keys option", k) + } + for key := range bound { + assert.Truef(t, valid[key], "topic_keys option names unknown key %q", key) + } +} diff --git a/stovepipe/core/messagequeue/proto/BUILD.bazel b/stovepipe/core/messagequeue/proto/BUILD.bazel new file mode 100644 index 00000000..047be015 --- /dev/null +++ b/stovepipe/core/messagequeue/proto/BUILD.bazel @@ -0,0 +1,47 @@ +load("@rules_go//go:def.bzl", "go_library") +load("@rules_go//proto:def.bzl", "go_proto_library") +load("@rules_proto//proto:defs.bzl", "proto_library") + +exports_files( + ["process.proto"], + visibility = ["//tool/proto:__pkg__"], +) + +proto_library( + name = "processpb_proto", + srcs = ["process.proto"], + visibility = ["//visibility:public"], + deps = [ + "//api/base/messagequeue/proto:messagequeuepb_proto", + ], +) + +# keep +go_proto_library( + name = "processpb_go_proto", + compilers = [ + "@rules_go//proto:go_proto", + "@rules_go//proto:go_grpc_v2", + ], + importpath = "github.com/uber/submitqueue/stovepipe/core/messagequeue/proto", + proto = ":processpb_proto", + visibility = ["//visibility:public"], + # keep + deps = [ + "//api/base/messagequeue/proto:messagequeuepb_go_proto", + ], +) + +go_library( + name = "proto", + embed = [":processpb_go_proto"], + importpath = "github.com/uber/submitqueue/stovepipe/core/messagequeue/proto", + visibility = ["//visibility:public"], +) + +go_library( + name = "protopb", + embed = [":processpb_go_proto"], + importpath = "github.com/uber/submitqueue/stovepipe/core/messagequeue/protopb", + visibility = ["//visibility:public"], +) diff --git a/stovepipe/core/messagequeue/proto/process.proto b/stovepipe/core/messagequeue/proto/process.proto new file mode 100644 index 00000000..999e149c --- /dev/null +++ b/stovepipe/core/messagequeue/proto/process.proto @@ -0,0 +1,35 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package uber.stovepipe.messagequeue; + +import "api/base/messagequeue/proto/messagequeue.proto"; + +option go_package = "github.com/uber/submitqueue/stovepipe/core/messagequeue/protopb"; +option java_multiple_files = true; +option java_outer_classname = "ProcessProto"; +option java_package = "com.uber.submitqueue.stovepipe.messagequeue"; + +// ProcessRequest is the payload ingest publishes to the process stage once it has +// accepted a new head: only the minted request id travels on the queue. process +// reloads the full Request from storage by this id (producer and consumer share +// the store, so the id is enough and redelivery stays idempotent). +message ProcessRequest { + option (uber.base.messagequeue.topic_keys) = "process"; + + // id is the minted request id to process. Format: "request//". + string id = 1; +} diff --git a/stovepipe/core/messagequeue/protopb/BUILD.bazel b/stovepipe/core/messagequeue/protopb/BUILD.bazel new file mode 100644 index 00000000..4ae8bd74 --- /dev/null +++ b/stovepipe/core/messagequeue/protopb/BUILD.bazel @@ -0,0 +1,13 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "protopb", + srcs = ["process.pb.go"], + importpath = "github.com/uber/submitqueue/stovepipe/core/messagequeue/protopb", + visibility = ["//visibility:public"], + deps = [ + "//api/base/messagequeue/protopb", # keep + "@org_golang_google_protobuf//reflect/protoreflect", + "@org_golang_google_protobuf//runtime/protoimpl", + ], +) diff --git a/stovepipe/core/messagequeue/protopb/process.pb.go b/stovepipe/core/messagequeue/protopb/process.pb.go new file mode 100644 index 00000000..5cffa3c2 --- /dev/null +++ b/stovepipe/core/messagequeue/protopb/process.pb.go @@ -0,0 +1,144 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.10 +// protoc v5.29.3 +// source: process.proto + +package protopb + +import ( + reflect "reflect" + sync "sync" + unsafe "unsafe" + + _ "github.com/uber/submitqueue/api/base/messagequeue/protopb" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// ProcessRequest is the payload ingest publishes to the process stage once it has +// accepted a new head: only the minted request id travels on the queue. process +// reloads the full Request from storage by this id (producer and consumer share +// the store, so the id is enough and redelivery stays idempotent). +type ProcessRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // id is the minted request id to process. Format: "request//". + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ProcessRequest) Reset() { + *x = ProcessRequest{} + mi := &file_process_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ProcessRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ProcessRequest) ProtoMessage() {} + +func (x *ProcessRequest) ProtoReflect() protoreflect.Message { + mi := &file_process_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ProcessRequest.ProtoReflect.Descriptor instead. +func (*ProcessRequest) Descriptor() ([]byte, []int) { + return file_process_proto_rawDescGZIP(), []int{0} +} + +func (x *ProcessRequest) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +var File_process_proto protoreflect.FileDescriptor + +const file_process_proto_rawDesc = "" + + "\n" + + "\rprocess.proto\x12\x1buber.stovepipe.messagequeue\x1a.api/base/messagequeue/proto/messagequeue.proto\"-\n" + + "\x0eProcessRequest\x12\x0e\n" + + "\x02id\x18\x01 \x01(\tR\x02id:\v\x8a\xb5\x18\aprocessB~\n" + + "+com.uber.submitqueue.stovepipe.messagequeueB\fProcessProtoP\x01Z?github.com/uber/submitqueue/stovepipe/core/messagequeue/protopbb\x06proto3" + +var ( + file_process_proto_rawDescOnce sync.Once + file_process_proto_rawDescData []byte +) + +func file_process_proto_rawDescGZIP() []byte { + file_process_proto_rawDescOnce.Do(func() { + file_process_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_process_proto_rawDesc), len(file_process_proto_rawDesc))) + }) + return file_process_proto_rawDescData +} + +var file_process_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_process_proto_goTypes = []any{ + (*ProcessRequest)(nil), // 0: uber.stovepipe.messagequeue.ProcessRequest +} +var file_process_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_process_proto_init() } +func file_process_proto_init() { + if File_process_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_process_proto_rawDesc), len(file_process_proto_rawDesc)), + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_process_proto_goTypes, + DependencyIndexes: file_process_proto_depIdxs, + MessageInfos: file_process_proto_msgTypes, + }.Build() + File_process_proto = out.File + file_process_proto_goTypes = nil + file_process_proto_depIdxs = nil +} diff --git a/stovepipe/core/messagequeue/topics.go b/stovepipe/core/messagequeue/topics.go new file mode 100644 index 00000000..f4a0109c --- /dev/null +++ b/stovepipe/core/messagequeue/topics.go @@ -0,0 +1,30 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package messagequeue + +import "github.com/uber/submitqueue/platform/consumer" + +// TopicKey is the typed identifier used to look up a queue backend, topic name, +// and subscription config in a consumer.TopicRegistry. The constants below are +// the logical topic keys for Stovepipe's internal pipeline stages; they are the +// same strings each message lists in its topic_keys option. +type TopicKey = consumer.TopicKey + +const ( + // TopicKeyProcess carries newly accepted requests from ingest to the process + // stage. ingest publishes a ProcessRequest (the request id) here; the process + // controller consumes it, reloads the Request, and decides the build strategy. + TopicKeyProcess TopicKey = "process" +) diff --git a/test/integration/stovepipe/BUILD.bazel b/test/integration/stovepipe/BUILD.bazel index f91f8042..8f8acfef 100644 --- a/test/integration/stovepipe/BUILD.bazel +++ b/test/integration/stovepipe/BUILD.bazel @@ -7,6 +7,8 @@ go_test( "//:MODULE.bazel", "//:go.mod", "//example/stovepipe:docker-compose.yml", + "//platform/extension/messagequeue/mysql/schema", + "//stovepipe/extension/storage/mysql/schema", ], tags = [ "external", @@ -15,6 +17,7 @@ go_test( deps = [ "//api/stovepipe/protopb", "//test/testutil", + "@com_github_go_sql_driver_mysql//:mysql", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_stretchr_testify//suite", diff --git a/test/integration/stovepipe/suite_test.go b/test/integration/stovepipe/suite_test.go index 871fd78a..79f9edc3 100644 --- a/test/integration/stovepipe/suite_test.go +++ b/test/integration/stovepipe/suite_test.go @@ -18,8 +18,9 @@ package stovepipe // // These tests use compose from example/stovepipe/docker-compose.yml and require // a pre-built Linux binary (make integration-test runs //test/integration/... -// and builds all Linux binaries via build-all-linux). Stovepipe is currently a -// Ping-only service with no storage or queue dependencies. +// and builds all Linux binaries via build-all-linux). The stack runs the +// Stovepipe gRPC service plus a storage MySQL (request, request_uri) and a queue +// MySQL (process stage). // // Run with: // make integration-test @@ -28,9 +29,11 @@ package stovepipe import ( "context" + "database/sql" "path/filepath" "testing" + _ "github.com/go-sql-driver/mysql" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -41,10 +44,12 @@ import ( type StovepipeIntegrationSuite struct { suite.Suite - ctx context.Context - log *testutil.TestLogger - stack *testutil.ComposeStack - client pb.StovepipeClient + ctx context.Context + log *testutil.TestLogger + stack *testutil.ComposeStack + client pb.StovepipeClient + db *sql.DB // storage database (request, request_uri) + queueDB *sql.DB // queue database } func TestStovepipeIntegration(t *testing.T) { @@ -67,6 +72,17 @@ func (s *StovepipeIntegrationSuite) SetupSuite() { err := s.stack.Up() require.NoError(t, err, "failed to start compose stack") + s.db, err = s.stack.ConnectMySQLService("mysql-app") + require.NoError(t, err, "failed to connect to storage MySQL") + + s.queueDB, err = s.stack.ConnectMySQLService("mysql-queue") + require.NoError(t, err, "failed to connect to queue MySQL") + + // Apply schemas after the stack is up; the service connects lazily and the + // consumer retries, so the boot ordering is tolerated. + testutil.ApplySchema(t, s.log, s.db, testutil.SchemaDir("stovepipe/extension/storage/mysql/schema")) + testutil.ApplySchema(t, s.log, s.queueDB, testutil.SchemaDir("platform/extension/messagequeue/mysql/schema")) + var conn *grpc.ClientConn conn, err = s.stack.ConnectGRPC("stovepipe-service", 8080) require.NoError(t, err, "failed to connect to stovepipe service") @@ -86,3 +102,46 @@ func (s *StovepipeIntegrationSuite) TestPingAPI() { assert.NotEmpty(t, resp.Message) assert.NotZero(t, resp.Timestamp) } + +// TestIngestAPI exercises the full ingest path: the controller resolves the head +// URI via the (fake) SourceControl, persists the Request and its (queue, URI) +// mapping, and publishes the request id to the process stage. A second ingest of +// the same queue resolves the same head and dedups to the same id. +func (s *StovepipeIntegrationSuite) TestIngestAPI() { + t := s.T() + + const queue = "monorepo/main" + + resp, err := s.client.Ingest(s.ctx, &pb.IngestRequest{Queue: queue}) + require.NoError(t, err, "Ingest failed") + require.NotEmpty(t, resp.Id, "minted request id should not be empty") + id := resp.Id + + // Request persisted. + var reqCount int + require.NoError(t, s.db.QueryRow("SELECT COUNT(*) FROM request WHERE id = ?", id).Scan(&reqCount)) + assert.Equal(t, 1, reqCount, "request row should be persisted") + + // (queue, URI) mapping persisted and points at the minted id. + var mappedID string + require.NoError(t, s.db.QueryRow("SELECT request_id FROM request_uri WHERE queue = ?", queue).Scan(&mappedID)) + assert.Equal(t, id, mappedID, "URI mapping should point at the minted request id") + + // Message published to the process topic. + var msgCount int + require.NoError(t, s.queueDB.QueryRow("SELECT COUNT(*) FROM queue_messages WHERE id = ?", id).Scan(&msgCount)) + assert.Equal(t, 1, msgCount, "should have published one process message") + + // Re-ingesting the same queue resolves the same head URI and dedups. + resp2, err := s.client.Ingest(s.ctx, &pb.IngestRequest{Queue: queue}) + require.NoError(t, err, "second Ingest failed") + assert.Equal(t, id, resp2.Id, "re-ingest of the same head should dedup to the same id") +} + +// TestIngestEmptyQueue verifies the request-validation error surfaces over gRPC. +func (s *StovepipeIntegrationSuite) TestIngestEmptyQueue() { + t := s.T() + + _, err := s.client.Ingest(s.ctx, &pb.IngestRequest{Queue: ""}) + require.Error(t, err, "Ingest with empty queue should fail") +} diff --git a/tool/proto/BUILD.bazel b/tool/proto/BUILD.bazel index 5719d284..f710f7c0 100644 --- a/tool/proto/BUILD.bazel +++ b/tool/proto/BUILD.bazel @@ -63,6 +63,17 @@ go_proto_generated_files( out_dir = "api_stovepipe", ) +# Stovepipe internal queue contract (message-only, no RPC service). +go_proto_generated_files( + name = "stovepipe_core_messagequeue", + srcs = ["//stovepipe/core/messagequeue/proto:process.proto"], + gen_services = False, + imports = [ + "//api/base/messagequeue/proto:messagequeue.proto", + ], + out_dir = "stovepipe_core_messagequeue", +) + filegroup( name = "generated", srcs = [ @@ -74,5 +85,6 @@ filegroup( ":api_stovepipe", ":api_submitqueue_gateway", ":api_submitqueue_orchestrator", + ":stovepipe_core_messagequeue", ], )