Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ GOIMPORTS_VERSION ?= v0.33.0
# (the out_dir convention in tool/proto/BUILD.bazel) and copied back here. A
# package may hold multiple .proto files (e.g. an RPC contract plus messagequeue
# contracts); all generated stubs land in the same protopb/ dir.
PROTO_PACKAGES = api/base/change api/base/mergestrategy api/base/messagequeue api/runway/messagequeue api/runway api/submitqueue/gateway api/submitqueue/orchestrator api/stovepipe
PROTO_PACKAGES = api/base/change api/base/mergestrategy api/base/messagequeue api/runway/messagequeue api/runway api/submitqueue/gateway api/submitqueue/orchestrator api/stovepipe stovepipe/core/messagequeue

# Set REPO_ROOT for docker-compose
export REPO_ROOT := $(shell pwd)
Expand All @@ -51,7 +51,7 @@ define assert_clean
fi
endef

.PHONY: build build-all-linux build-runway-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-submitqueue-consumer integration-test-extensions integration-test-submitqueue-gateway integration-test-submitqueue-orchestrator license-fix lint lint-fmt lint-license local-init-runway-queue-schema local-runway-start local-runway-stop local-submitqueue-clean local-submitqueue-gateway-start local-submitqueue-gateway-stop local-init-submitqueue-schemas local-submitqueue-logs local-submitqueue-orchestrator-start local-submitqueue-orchestrator-stop local-submitqueue-ps local-submitqueue-restart local-submitqueue-start local-stop local-stovepipe-logs local-stovepipe-start local-stovepipe-stop mocks proto query-deps query-targets run-client-runway run-client-submitqueue-gateway run-client-submitqueue-orchestrator run-client-stovepipe run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help
.PHONY: build build-all-linux build-runway-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-submitqueue-consumer integration-test-extensions integration-test-submitqueue-gateway integration-test-submitqueue-orchestrator license-fix lint lint-fmt lint-license local-init-runway-queue-schema local-init-stovepipe-schemas local-runway-start local-runway-stop local-submitqueue-clean local-submitqueue-gateway-start local-submitqueue-gateway-stop local-init-submitqueue-schemas local-submitqueue-logs local-submitqueue-orchestrator-start local-submitqueue-orchestrator-stop local-submitqueue-ps local-submitqueue-restart local-submitqueue-start local-stop local-stovepipe-logs local-stovepipe-start local-stovepipe-stop mocks proto query-deps query-targets run-client-runway run-client-submitqueue-gateway run-client-submitqueue-orchestrator run-client-stovepipe run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help


build: ## Build all services and examples
Expand Down Expand Up @@ -221,6 +221,19 @@ local-init-runway-queue-schema: ## Apply queue schema only (mysql-queue) for Run
done
@echo "✅ Runway queue schema applied successfully"

local-init-stovepipe-schemas: ## Apply storage (mysql-app) and queue (mysql-queue) schemas for Stovepipe compose stacks
@echo "Applying storage schema to mysql-app..."
@for file in stovepipe/extension/storage/mysql/schema/*.sql; do \
echo " - Applying $$(basename $$file)..."; \
docker exec -i $(STOVEPIPE_LOCAL_PROJECT)-mysql-app-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \
done
@echo "Applying queue schema to mysql-queue..."
@for file in platform/extension/messagequeue/mysql/schema/*.sql; do \
echo " - Applying $$(basename $$file)..."; \
docker exec -i $(STOVEPIPE_LOCAL_PROJECT)-mysql-queue-1 mysql -uroot -proot submitqueue < $$file 2>&1 | grep -v "Using a password" || true; \
done
@echo "✅ Stovepipe schemas applied successfully"

local-runway-start: build-runway-linux ## Start Runway locally (runway + MySQL queue)
@echo "Starting Runway with compose..."
@$(COMPOSE) -f $(RUNWAY_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) up -d --build --wait
Expand Down Expand Up @@ -310,9 +323,11 @@ local-stop: ## Stop all services (keep data)
local-stovepipe-logs: ## View logs from the running Stovepipe service
@$(COMPOSE) -f $(STOVEPIPE_COMPOSE_FILE) -p $(STOVEPIPE_LOCAL_PROJECT) logs -f

local-stovepipe-start: build-stovepipe-linux ## Start Stovepipe service (single Ping-only gRPC service)
local-stovepipe-start: build-stovepipe-linux ## Start Stovepipe service (gRPC service + MySQL storage + MySQL queue)
@echo "Starting Stovepipe service with compose..."
@$(COMPOSE) -f $(STOVEPIPE_COMPOSE_FILE) -p $(STOVEPIPE_LOCAL_PROJECT) up -d --build --wait
@echo "Applying storage and queue schemas..."
@$(MAKE) -s local-init-stovepipe-schemas
@echo ""
@echo "✅ Stovepipe service is running!"
@echo ""
Expand Down
46 changes: 44 additions & 2 deletions example/stovepipe/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# Docker Compose for the Stovepipe service.
#
# Stovepipe is currently a single Ping-only gRPC service with no storage or
# queue dependencies, so this stack runs just the one service.
# Stovepipe ingests a queue's head commit, persists the Request, and publishes it
# to the process stage over a messaging queue; the process consumer reloads the
# Request. The stack therefore runs the service plus two MySQL databases: one for
# storage (request, request_uri) and one for the messaging queue.
#
# IMPORTANT: Before running compose, build the Linux binary:
# make build-stovepipe-linux
Expand All @@ -12,8 +14,40 @@
# Quick start:
# make local-stovepipe-start
#
# After `up`, the storage and queue schemas are applied (local-init-stovepipe-schemas).

services:
# Storage database - Stovepipe's request and request_uri tables.
mysql-app:
image: mysql:8.0
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: submitqueue
ports:
- "3306" # Random ephemeral port to avoid conflicts
healthcheck:
# Use 127.0.0.1 (TCP) instead of localhost (Unix socket). MySQL treats
# "localhost" as a socket connection, which can be ready before the TCP
# listener — causing dependent services that connect over TCP to fail.
test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-proot"]
interval: 5s
timeout: 5s
retries: 10

# Queue database - messaging infrastructure (messages, offsets, partition leases).
mysql-queue:
image: mysql:8.0
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: submitqueue
ports:
- "3306" # Random ephemeral port to avoid conflicts
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-proot"]
interval: 5s
timeout: 5s
retries: 10

stovepipe-service:
build:
context: ${REPO_ROOT}
Expand All @@ -22,3 +56,11 @@ services:
- "8080" # Random ephemeral port to avoid conflicts
environment:
- PORT=:8080
- STORAGE_MYSQL_DSN=root:root@tcp(mysql-app:3306)/submitqueue?parseTime=true
- QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true
- HOSTNAME=stovepipe-dev
depends_on:
mysql-app:
condition: service_healthy
mysql-queue:
condition: service_healthy
12 changes: 12 additions & 0 deletions example/stovepipe/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,19 @@ go_library(
visibility = ["//visibility:private"],
deps = [
"//api/stovepipe/protopb",
"//platform/consumer",
"//platform/errs",
"//platform/errs/generic",
"//platform/errs/mysql",
"//platform/extension/messagequeue",
"//platform/extension/messagequeue/mysql",
"//stovepipe/controller",
"//stovepipe/controller/process",
"//stovepipe/core/messagequeue",
"//stovepipe/extension/sourcecontrol",
"//stovepipe/extension/sourcecontrol/fake",
"//stovepipe/extension/storage/mysql",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_uber_go_tally//:tally",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//reflection",
Expand Down
129 changes: 123 additions & 6 deletions example/stovepipe/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main

import (
"context"
"database/sql"
"errors"
"fmt"
"net"
Expand All @@ -25,9 +26,21 @@ import (
"syscall"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/uber-go/tally"
pb "github.com/uber/submitqueue/api/stovepipe/protopb"
"github.com/uber/submitqueue/platform/consumer"
"github.com/uber/submitqueue/platform/errs"
genericerrs "github.com/uber/submitqueue/platform/errs/generic"
mysqlerrs "github.com/uber/submitqueue/platform/errs/mysql"
extqueue "github.com/uber/submitqueue/platform/extension/messagequeue"
queueMySQL "github.com/uber/submitqueue/platform/extension/messagequeue/mysql"
"github.com/uber/submitqueue/stovepipe/controller"
"github.com/uber/submitqueue/stovepipe/controller/process"
stovepipemq "github.com/uber/submitqueue/stovepipe/core/messagequeue"
"github.com/uber/submitqueue/stovepipe/extension/sourcecontrol"
sourcecontrolfake "github.com/uber/submitqueue/stovepipe/extension/sourcecontrol/fake"
storageMySQL "github.com/uber/submitqueue/stovepipe/extension/storage/mysql"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
Expand Down Expand Up @@ -70,6 +83,15 @@ func (c *inMemoryCounter) Next(_ context.Context, domain string) (int64, error)
return c.values[domain], nil
}

// fakeSourceControlFactory is the example SourceControl factory. It seeds each queue with a
// deterministic single-commit history so ingest resolves a stable head URI (and re-ingesting
// the same queue exercises the dedup path). A real deployment supplies a VCS-backed factory.
type fakeSourceControlFactory struct{}

func (fakeSourceControlFactory) For(cfg sourcecontrol.Config) (sourcecontrol.SourceControl, error) {
return sourcecontrolfake.New([]string{fmt.Sprintf("git://%s/HEAD", cfg.QueueName)}), nil
}

func main() {
code := 0
if err := run(); err != nil {
Expand Down Expand Up @@ -131,12 +153,85 @@ func run() error {
metricsWgDone.Wait()
}()

// Storage database (request + request_uri tables).
storageDSN := os.Getenv("STORAGE_MYSQL_DSN")
if storageDSN == "" {
return fmt.Errorf("STORAGE_MYSQL_DSN environment variable is required")
}
storageDB, err := sql.Open("mysql", storageDSN)
if err != nil {
return fmt.Errorf("failed to open storage database: %w", err)
}
defer storageDB.Close()

store, err := storageMySQL.NewStorage(storageDB, scope.SubScope("storage"))
if err != nil {
return fmt.Errorf("failed to create storage: %w", err)
}
defer store.Close()

// Queue database (messaging infrastructure for the process stage).
queueDSN := os.Getenv("QUEUE_MYSQL_DSN")
if queueDSN == "" {
return fmt.Errorf("QUEUE_MYSQL_DSN environment variable is required")
}
queueDB, err := sql.Open("mysql", queueDSN)
if err != nil {
return fmt.Errorf("failed to open queue database: %w", err)
}
defer queueDB.Close()

mysqlQueue, err := queueMySQL.NewQueue(queueMySQL.Params{
DB: queueDB,
Logger: logger,
MetricsScope: scope.SubScope("queue"),
})
if err != nil {
return fmt.Errorf("failed to create queue: %w", err)
}
defer mysqlQueue.Close()

subscriberName := os.Getenv("HOSTNAME")
if subscriberName == "" {
subscriberName = fmt.Sprintf("stovepipe-%d", time.Now().Unix())
}

registry, err := newTopicRegistry(mysqlQueue, subscriberName)
if err != nil {
return fmt.Errorf("failed to create topic registry: %w", err)
}

// Consumer running the process stage.
primaryConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry,
errs.NewClassifierProcessor(
genericerrs.Classifier,
mysqlerrs.Classifier,
),
)

processController := process.NewController(logger.Sugar(), scope, store, stovepipemq.TopicKeyProcess, "stovepipe-process")
if err := primaryConsumer.Register(processController); err != nil {
return fmt.Errorf("failed to register process controller: %w", err)
}

if err := primaryConsumer.Start(ctx); err != nil {
return fmt.Errorf("failed to start consumer: %w", err)
}
logger.Info("consumer started")

// Create gRPC server
grpcServer := grpc.NewServer()

// Create controllers and wrap them for gRPC
pingController := controller.NewPingController(logger, scope)
ingestController := controller.NewIngestController(logger.Sugar(), scope, newInMemoryCounter())
ingestController := controller.NewIngestController(
logger.Sugar(),
scope,
newInMemoryCounter(),
fakeSourceControlFactory{},
store,
registry,
)
srv := &StovepipeServer{
pingController: pingController,
ingestController: ingestController,
Expand Down Expand Up @@ -166,28 +261,50 @@ func run() error {
}()

// Wait for interrupt signal or server critical error
// If interruption is signaled, gracefully stop the server
// If an error happens during shutdown, return the actual error, not the context cancellation error
var serverErr error
select {
case <-ctx.Done():
fmt.Println("Shutting down stovepipe server due to interruption signal...")

// Set the error to the context cancellation error to be surfaced as a desired exit code by the main function
// to indicate that the server was stopped as intended
// It may be overridden by the server error if any
// to indicate that the server was stopped as intended. It may be overridden by the server error if any.
err = ctx.Err()

// stop GRPC server and wait for it to exit
grpcServer.GracefulStop()
serverErr = <-serverErrCh
case serverErr = <-serverErrCh:
fmt.Println("Shutting down stovepipe server due to critical GRPC server error...")
cancel()
}

if serverErr != nil {
err = fmt.Errorf("GRPC server exited with error: %w", serverErr)
serverErr = fmt.Errorf("GRPC server exited with error: %w", serverErr)
}

consumerStopErr := primaryConsumer.Stop(30000)
if consumerStopErr != nil {
consumerStopErr = fmt.Errorf("failed to stop consumer: %w", consumerStopErr)
}

if consumerStopErr != nil || serverErr != nil {
err = errors.Join(err, consumerStopErr, serverErr)
}

return err
}

// newTopicRegistry builds the TopicRegistry for Stovepipe's internal pipeline queues. ingest
// publishes to the process topic and the process consumer subscribes to it.
func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRegistry, error) {
return consumer.NewTopicRegistry([]consumer.TopicConfig{
{
Key: stovepipemq.TopicKeyProcess,
Name: "process",
Queue: q,
Subscription: extqueue.DefaultSubscriptionConfig(
subscriberName, "stovepipe-process",
),
},
})
}
13 changes: 13 additions & 0 deletions stovepipe/controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//api/stovepipe/protopb",
"//platform/base/messagequeue",
"//platform/consumer",
"//platform/errs",
"//platform/extension/counter",
"//platform/metrics",
"//stovepipe/core/messagequeue",
"//stovepipe/entity",
"//stovepipe/extension/sourcecontrol",
"//stovepipe/extension/storage",
"@com_github_uber_go_tally//:tally",
"@org_uber_go_zap//:zap",
],
Expand All @@ -28,7 +33,15 @@ go_test(
embed = [":controller"],
deps = [
"//api/stovepipe/protopb",
"//platform/consumer",
"//platform/extension/counter/mock",
"//platform/extension/messagequeue/mock",
"//stovepipe/core/messagequeue",
"//stovepipe/entity",
"//stovepipe/extension/sourcecontrol",
"//stovepipe/extension/sourcecontrol/mock",
"//stovepipe/extension/storage",
"//stovepipe/extension/storage/mock",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_uber_go_tally//:tally",
Expand Down
Loading
Loading