Regional Go Reconciler Module

This module combines a workqueue with a regional Go service to create a complete reconciler setup. It stands up both the workqueue infrastructure (receiver and dispatcher) and a reconciler service that processes events from the workqueue.

Usage

module "my-reconciler" {
  source  = "driftlessaf/reconcilers/infra//modules/regional-go-reconciler"
  version = "~> 1.0"

  project_id = var.project_id
  name       = "my-reconciler"
  regions    = var.regions

  service_account = google_service_account.reconciler.email

  containers = {
    "reconciler" = {
      source = {
        working_dir = path.module
        importpath  = "./cmd/reconciler"
      }
      ports = [{
        container_port = 8080
      }]
    }
  }

  notification_channels = [var.notification_channel]
}

Features

  • Integrated Workqueue: Automatically sets up a workqueue with receiver and dispatcher services
  • Regional Deployment: Deploys reconciler services in multiple regions with workqueue infrastructure
  • Flexible Configuration: Supports both regional and global workqueue scopes
  • Built from Source: Uses ko to build Go binaries directly from source
  • Monitoring: Includes dashboards and metrics for both workqueue and service

Architecture

The module creates:

  1. A workqueue (using the workqueue module) with:
    • Receiver service (${name}-rcv) that accepts events
    • Dispatcher service (${name}-dsp) that processes the queue
    • GCS buckets for queue storage
    • Pub/Sub topics and subscriptions
  2. A reconciler service (using the regional-go-service module) that:
    • Receives events from the dispatcher
    • Processes them according to your business logic
    • Can be configured with custom containers and environment variables

Workqueue Service Protocol

Your reconciler should implement the workqueue proto service:

import (
    "context"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    // ...plus the generated workqueue proto package for this module.
)

type Reconciler struct {
    workqueue.UnimplementedWorkqueueServiceServer
    // your fields
}

func (r *Reconciler) Process(ctx context.Context, req *workqueue.ProcessRequest) (*workqueue.ProcessResponse, error) {
    // Process the event identified by the work item key.
    err := r.reconcile(ctx, req.Key)
    if err != nil {
        // Requeue with a delay: the key is re-enqueued after the interval.
        if delay, ok := workqueue.GetRequeueDelay(err); ok {
            return &workqueue.ProcessResponse{
                RequeueAfterSeconds: int64(delay.Seconds()),
            }, nil
        }
        // Non-retriable error: return InvalidArgument so the key is not retried.
        if workqueue.IsNonRetriableError(err) {
            return nil, status.Error(codes.InvalidArgument, err.Error())
        }
        // Retriable error: returning it causes the key to be retried,
        // up to max-retry attempts before dead-lettering.
        return nil, err
    }
    return &workqueue.ProcessResponse{}, nil
}

Monitoring

A dedicated dashboard module is available for monitoring:

module "my-reconciler-dashboard" {
  source  = "driftlessaf/reconcilers/infra//modules/dashboard/workqueue"
  version = "~> 1.0"

  name       = "my-reconciler"
  project_id = var.project_id

  max_retry       = module.my-reconciler.max_retry
  concurrent_work = module.my-reconciler.concurrent_work
  scope           = module.my-reconciler.scope
}

Variables

See variables.tf for all available configuration options.

Outputs

Name                  Description
workqueue-receiver    The name of the workqueue receiver service
reconciler-uri        The URI of the reconciler service
workqueue-topic       The workqueue topic name
workqueue-dashboards  Dashboard outputs for monitoring

Requirements

No requirements.

Providers

No providers.

Modules

Name               Source                   Version
reconciler         ../regional-go-service   n/a
workqueue          ../workqueue             n/a
workqueue-sharded  ../workqueue/hyperqueue  n/a

Resources

No resources.

Inputs

batch-size (number, default: null)
  Optional cap on how much work to launch per dispatcher pass.

concurrent-work (number, default: 20)
  The amount of concurrent work to dispatch at a given time.

containers (default: {})
  The containers to run in the service. Each container will be run in each region.

  map(object({
    source = object({
      base_image  = optional(string, "cgr.dev/chainguard/static:latest-glibc@sha256:a301031ffd4ed67f35ca7fa6cf3dad9937b5fa47d7493955a18d9b4ca5412d1a")
      working_dir = string
      importpath  = string
      env         = optional(list(string), [])
    })
    args = optional(list(string), [])
    ports = optional(list(object({
      name           = optional(string, "h2c")
      container_port = number
    })), [])
    resources = optional(object({
      limits = optional(object({
        cpu    = string
        memory = string
      }), null)
      cpu_idle          = optional(bool)
      startup_cpu_boost = optional(bool, true)
    }), {})
    env = optional(list(object({
      name  = string
      value = optional(string)
      value_source = optional(object({
        secret_key_ref = object({
          secret  = string
          version = string
        })
      }), null)
    })), [])
    regional-env = optional(list(object({
      name  = string
      value = map(string)
    })), [])
    regional-cpu-idle = optional(map(bool), {})
    volume_mounts = optional(list(object({
      name       = string
      mount_path = string
    })), [])
    startup_probe = optional(object({
      initial_delay_seconds = optional(number)
      timeout_seconds       = optional(number, 240)
      period_seconds        = optional(number, 240)
      failure_threshold     = optional(number, 1)
      tcp_socket = optional(object({
        port = optional(number)
      }), null)
      grpc = optional(object({
        port    = optional(number)
        service = optional(string)
      }), null)
    }), null)
    liveness_probe = optional(object({
      initial_delay_seconds = optional(number)
      timeout_seconds       = optional(number)
      period_seconds        = optional(number)
      failure_threshold     = optional(number)
      http_get = optional(object({
        path = optional(string)
        http_headers = optional(list(object({
          name  = string
          value = string
        })), [])
      }), null)
      grpc = optional(object({
        port    = optional(number)
        service = optional(string)
      }), null)
    }), null)
  }))

deletion_protection (bool, default: true)
  Whether to enable deletion protection for the service.

egress (string, default: "ALL_TRAFFIC")
  Which type of egress traffic to send through the VPC.
  • ALL_TRAFFIC sends all traffic through the regional VPC network. Use this if the service is not expected to egress to the Internet.
  • PRIVATE_RANGES_ONLY sends only traffic to private IP addresses through the regional VPC network.

enable_dead_letter_alerting (bool, default: true)
  Whether to enable alerting for dead-lettered keys.

enable_profiler (bool, default: true)
  Enable continuous profiling for the service. This has a small performance impact, which shouldn't matter for production services.

execution_environment (string, default: "EXECUTION_ENVIRONMENT_GEN2")
  The execution environment for the service (options: EXECUTION_ENVIRONMENT_GEN1, EXECUTION_ENVIRONMENT_GEN2).

labels (map(string), default: {})
  Additional labels to add to all resources.

max-retry (number, default: 100)
  The maximum number of times a task will be retried before being moved to the dead-letter queue. Set to 0 for unlimited retries.

multi_regional_location (string, default: "US")
  The multi-regional location for the global workqueue bucket. Options: US, EU, ASIA.

name (string, required)
  The base name for the reconciler and workqueue services.

notification_channels (list(string), default: [])
  The channels to send notifications to. List of channel IDs.

otel_resources (map(string), default: {})
  Resources to add to the OpenTelemetry resource.

primary-region (string, default: null)
  The primary region for single-homed resources like the reenqueue job. Defaults to the first region in the regions map.

product (string, default: "")
  The product that this service belongs to.

project_id (string, required)
  The project in which to create the resources.

regional-volumes (default: [])
  The volumes to make available to the containers in the service for mounting.

  list(object({
    name = string
    gcs = optional(map(object({
      bucket        = string
      read_only     = optional(bool, true)
      mount_options = optional(list(string), [])
    })), {})
    nfs = optional(map(object({
      server    = string
      path      = string
      read_only = optional(bool, true)
    })), {})
  }))

regions (required)
  A map from region names to a network and subnetwork. A service will be created in each region, configured to egress the specified traffic via the specified subnetwork.

  map(object({
    network = string
    subnet  = string
  }))

request_timeout_seconds (number, default: 300)
  The request timeout for the service in seconds.

scaling (default: {})
  The scaling configuration for the service.

  object({
    min_instances                    = optional(number, 0)
    max_instances                    = optional(number, 100)
    max_instance_request_concurrency = optional(number, 1000)
  })

service_account (string, required)
  The service account as which to run the reconciler service.

shards (number, default: 1)
  Number of workqueue shards. When 1, uses the standard workqueue. When >1, uses the hyperqueue.

slo (default: {})
  Configuration for setting up an SLO for the Cloud Run service.

  object({
    enable          = optional(bool, false)
    enable_alerting = optional(bool, false)
    success = optional(object({
      multi_region_goal = optional(number, 0.999)
      per_region_goal   = optional(number, 0.999)
    }), null)
    monitor_gclb = optional(bool, false)
  })

team (string, required)
  Team label to apply to resources (replaces the deprecated 'squad').

volumes (default: [])
  The volumes to attach to the service.

  list(object({
    name = string
    empty_dir = optional(object({
      medium     = optional(string, "MEMORY")
      size_limit = optional(string, "1Gi")
    }), null)
    csi = optional(object({
      driver = string
      volume_attributes = optional(object({
        bucketName = string
      }), null)
    }), null)
  }))

workqueue_cpu_idle (map(map(bool)), default: { dispatcher = {}, receiver = {} })
  Set to false for a region in order to use instance-based billing for the workqueue services (dispatcher and receiver). Defaults to true. To control the reconciler's cpu_idle, use the 'regional-cpu-idle' field in the 'containers' variable.
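For example, setting shards above 1 switches the module from the standard workqueue to the sharded hyperqueue. A hypothetical configuration sketch (the values here are illustrative; only the input semantics come from this module):

```hcl
module "my-sharded-reconciler" {
  source  = "driftlessaf/reconcilers/infra//modules/regional-go-reconciler"
  version = "~> 1.0"

  # ...same required inputs as in the Usage example above...

  shards          = 4  # >1 selects the hyperqueue implementation
  concurrent-work = 50
  max-retry       = 10

  workqueue_cpu_idle = {
    # Instance-based billing for the dispatcher in one region, illustratively.
    dispatcher = { "us-central1" = false }
    receiver   = {}
  }
}
```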

Outputs

Name             Description
receiver         The workqueue receiver object for connecting triggers.
reconciler-uris  The URIs of the reconciler service by region.