Skip to content

[FEATURE] Miner Revenue Reconciliation System #216

@epappas

Description

@epappas

Miner Revenue Reconciliation System - Architecture Document

Executive Summary

This document outlines the architecture and implementation plan for the Miner Revenue Reconciliation System, a financial reporting feature that aggregates rental revenue per miner to facilitate accurate payout calculations for GPU compute providers on the Basilica network.

Critical Context: This is a financial feature directly affecting miner payouts. Accuracy, auditability, and reliability are paramount.

1. System Overview

1.1 Business Requirements

The system must provide:

  1. Accurate Revenue Aggregation: Total revenue generated per miner (node_id) over configurable time periods
  2. Transaction-Level Audit Trail: Complete traceability from rental to credit transaction to revenue
  3. Multiple Report Formats: CSV for accounting team, JSON for API consumers
  4. Temporal Flexibility: Support for custom date ranges and predefined periods (daily, weekly, monthly, quarterly)
  5. Validator Attribution: Ability to split revenue by validator when needed
  6. Historical Accuracy: Handle backfilled transactions and incomplete balance snapshots

1.2 Key Stakeholders

  • Accounting Team: Primary consumer for payout calculations (CSV format)
  • Miner Operators: Need transparency on earnings
  • Finance/Audit: Require audit trail and reconciliation capabilities
  • Operations Team: Monitor system health and revenue patterns

2. Current System Analysis

2.1 Database Schema Assessment

Primary Tables for Revenue Calculation:

billing.rentals
├── rental_id (UUID, PK)
├── node_id (VARCHAR) -- Miner identifier
├── user_id (VARCHAR) -- Auth0 external_id
├── validator_id (VARCHAR, nullable)
├── package_id (VARCHAR, FK)
├── status (VARCHAR) -- 'pending', 'active', 'completed', 'cancelled', 'failed'
├── hourly_rate (DECIMAL 10,2)
├── start_time (TIMESTAMPTZ)
├── end_time (TIMESTAMPTZ, nullable)
├── total_cost (DECIMAL 10,2, nullable) -- Final computed cost
└── created_at, updated_at (TIMESTAMPTZ)

billing.credit_transactions
├── id (UUID, PK)
├── user_id (UUID, FK to users.user_id)
├── transaction_type (VARCHAR) -- 'credit', 'debit', 'reserve', 'release'
├── amount (DECIMAL 20,8)
├── reference_id (VARCHAR) -- Links to rental_id::text
├── reference_type (VARCHAR) -- 'rental', 'payment', 'refund'
├── balance_before, balance_after (DECIMAL 20,8)
├── metadata (JSONB)
└── created_at (TIMESTAMPTZ)

billing.active_rentals_facts
├── rental_id (UUID, PK)
├── node_id (VARCHAR)
├── validator_id (VARCHAR)
├── user_id (UUID, FK)
├── total_cost (DECIMAL 20,8)
├── duration_hours (DECIMAL 10,2)
├── start_time, end_time (TIMESTAMPTZ)
└── [additional resource metrics]

Key Findings:

  1. No Foreign Key from rentals to nodes: node_id is a free-form VARCHAR identifier
  2. User Identity Split: rentals.user_id uses Auth0 external_id (VARCHAR), while credits.user_id uses internal UUID
  3. Historical Data Gaps: Credit transactions before migration 025 have balance_before/after = 0 (marked with metadata->>'backfilled' = true)
  4. Revenue States: Only status = 'completed' rentals have finalized total_cost
  5. Incremental Charging: Active rentals accumulate cost incrementally via telemetry processing

2.2 Existing Code Patterns

Repository Pattern:

#[async_trait]
pub trait RentalRepository {
    async fn get_rental(&self, id: &RentalId) -> Result<Option<Rental>>;
    async fn get_active_rentals(&self, user_id: Option<&UserId>) -> Result<Vec<Rental>>;
    async fn get_rental_statistics(&self, user_id: Option<&UserId>) -> Result<RentalStatistics>;
}

Domain Types:

  • RentalId: UUID-based rental identifier
  • UserId: Auth0 external_id wrapper
  • CreditBalance: Decimal-based with 6 decimal precision
  • RentalState: Lifecycle enum (Pending, Active, Suspended, Terminating, Completed, Failed)

gRPC Service Structure:

  • BillingServiceImpl provides streaming telemetry ingestion and rental management
  • Existing endpoints: track_rental, finalize_rental, get_active_rentals, get_usage_report

3. Architecture Design

3.1 Design Principles

Following SOLID, DRY, KISS:

  1. Single Responsibility: Separate concerns for data aggregation, report generation, and API serving
  2. Open/Closed: Extensible for new report formats without modifying core logic
  3. Liskov Substitution: Repository abstractions allow testing with mock implementations
  4. Interface Segregation: Narrow, focused traits for each operation
  5. Dependency Inversion: Depend on abstractions, not concrete implementations

3.2 Component Architecture

┌─────────────────────────────────────────────────────────────┐
│                     gRPC API Layer                          │
│  - GetMinerRevenueReport (streaming response)               │
│  - ExportMinerRevenueReport (file generation)               │
└─────────────────┬───────────────────────────────────────────┘
                  │
┌─────────────────▼───────────────────────────────────────────┐
│              Domain Service Layer                           │
│  - MinerRevenueService                                      │
│    ├── aggregate_revenue()                                  │
│    ├── generate_report()                                    │
│    └── export_to_format()                                   │
└─────────────────┬───────────────────────────────────────────┘
                  │
┌─────────────────▼───────────────────────────────────────────┐
│             Repository Layer                                │
│  - MinerRevenueRepository                                   │
│    ├── get_miner_revenue()                                  │
│    ├── get_miner_revenue_details()                          │
│    └── get_revenue_by_time_period()                         │
└─────────────────┬───────────────────────────────────────────┘
                  │
┌─────────────────▼───────────────────────────────────────────┐
│              PostgreSQL Database                            │
│  - billing.rentals                                          │
│  - billing.credit_transactions                              │
│  - billing.active_rentals_facts (materialized view)         │
│  - billing.miner_revenue_facts (NEW aggregation table)      │
└─────────────────────────────────────────────────────────────┘

3.3 Data Flow

1. Report Request (API)
   └─> MinerRevenueService
       ├─> Validate parameters
       └─> MinerRevenueRepository
           ├─> Query rentals + credit_transactions
           ├─> Apply date filters
           ├─> Aggregate by node_id
           └─> Return MinerRevenueRecord[]

2. MinerRevenueService
   ├─> Transform to report format
   ├─> Apply sorting/filtering
   └─> Export (CSV/JSON)
       └─> Return to client

4. Detailed Implementation Plan

4.1 Phase 1: Database Layer (Days 1-2)

4.1.1 Create Aggregation Table

Migration: 027_miner_revenue_facts.sql

-- Materialized view for efficient miner revenue queries
CREATE TABLE billing.miner_revenue_facts (
    node_id VARCHAR(128) PRIMARY KEY,
    validator_id VARCHAR(128),

    -- Aggregated metrics
    total_rentals INTEGER NOT NULL DEFAULT 0,
    completed_rentals INTEGER NOT NULL DEFAULT 0,
    failed_rentals INTEGER NOT NULL DEFAULT 0,
    total_revenue DECIMAL(20,8) NOT NULL DEFAULT 0,
    total_hours DECIMAL(15,2) NOT NULL DEFAULT 0,

    -- Temporal bounds
    first_rental_at TIMESTAMPTZ,
    last_rental_at TIMESTAMPTZ,
    last_completed_rental_at TIMESTAMPTZ,

    -- Computed metrics
    avg_hourly_rate DECIMAL(10,4),
    avg_rental_duration_hours DECIMAL(10,2),

    -- Audit fields
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    computation_version INTEGER NOT NULL DEFAULT 1
);

CREATE INDEX idx_miner_revenue_facts_validator ON billing.miner_revenue_facts(validator_id);
CREATE INDEX idx_miner_revenue_facts_updated_at ON billing.miner_revenue_facts(updated_at);
CREATE INDEX idx_miner_revenue_facts_revenue ON billing.miner_revenue_facts(total_revenue DESC);

-- Function to refresh miner revenue facts
CREATE OR REPLACE FUNCTION billing.refresh_miner_revenue_facts()
RETURNS void AS $$
BEGIN
    INSERT INTO billing.miner_revenue_facts (
        node_id,
        validator_id,
        total_rentals,
        completed_rentals,
        failed_rentals,
        total_revenue,
        total_hours,
        first_rental_at,
        last_rental_at,
        last_completed_rental_at,
        avg_hourly_rate,
        avg_rental_duration_hours,
        updated_at
    )
    SELECT
        r.node_id,
        r.validator_id,
        COUNT(*) AS total_rentals,
        COUNT(*) FILTER (WHERE r.status = 'completed') AS completed_rentals,
        COUNT(*) FILTER (WHERE r.status = 'failed') AS failed_rentals,
        COALESCE(SUM(r.total_cost), 0) AS total_revenue,
        COALESCE(
            SUM(EXTRACT(EPOCH FROM (COALESCE(r.end_time, NOW()) - r.start_time)) / 3600),
            0
        ) AS total_hours,
        MIN(r.start_time) AS first_rental_at,
        MAX(r.start_time) AS last_rental_at,
        MAX(r.end_time) FILTER (WHERE r.status = 'completed') AS last_completed_rental_at,
        AVG(r.hourly_rate) AS avg_hourly_rate,
        AVG(EXTRACT(EPOCH FROM (COALESCE(r.end_time, NOW()) - r.start_time)) / 3600) AS avg_rental_duration_hours,
        NOW() AS updated_at
    FROM billing.rentals r
    WHERE r.status IN ('completed', 'failed')
    GROUP BY r.node_id, r.validator_id
    ON CONFLICT (node_id) DO UPDATE SET
        validator_id = EXCLUDED.validator_id,
        total_rentals = EXCLUDED.total_rentals,
        completed_rentals = EXCLUDED.completed_rentals,
        failed_rentals = EXCLUDED.failed_rentals,
        total_revenue = EXCLUDED.total_revenue,
        total_hours = EXCLUDED.total_hours,
        first_rental_at = EXCLUDED.first_rental_at,
        last_rental_at = EXCLUDED.last_rental_at,
        last_completed_rental_at = EXCLUDED.last_completed_rental_at,
        avg_hourly_rate = EXCLUDED.avg_hourly_rate,
        avg_rental_duration_hours = EXCLUDED.avg_rental_duration_hours,
        updated_at = NOW();
END;
$$ LANGUAGE plpgsql;

Rationale:

  • Pre-aggregated table reduces query complexity and latency
  • ON CONFLICT DO UPDATE provides idempotent refresh operation
  • Separates completed/failed rentals for reporting clarity
  • Includes validator_id for future split-revenue scenarios

4.1.2 Repository Trait Definition

File: src/storage/miner_revenue.rs

use crate::domain::types::{CreditBalance, UserId};
use crate::error::Result;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MinerRevenueRecord {
    pub node_id: String,
    pub validator_id: Option<String>,
    pub total_rentals: u64,
    pub completed_rentals: u64,
    pub failed_rentals: u64,
    pub total_revenue: CreditBalance,
    pub total_hours: Decimal,
    pub first_rental_at: Option<DateTime<Utc>>,
    pub last_rental_at: Option<DateTime<Utc>>,
    pub last_completed_rental_at: Option<DateTime<Utc>>,
    pub avg_hourly_rate: Option<Decimal>,
    pub avg_rental_duration_hours: Option<Decimal>,
}

#[derive(Debug, Clone)]
pub struct MinerRevenueFilter {
    pub node_ids: Option<Vec<String>>,
    pub validator_ids: Option<Vec<String>>,
    pub start_time: Option<DateTime<Utc>>,
    pub end_time: Option<DateTime<Utc>>,
    pub min_revenue: Option<CreditBalance>,
    pub include_failed: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MinerRevenueDetail {
    pub rental_id: uuid::Uuid,
    pub node_id: String,
    pub user_id: String,
    pub validator_id: Option<String>,
    pub package_id: String,
    pub hourly_rate: Decimal,
    pub start_time: DateTime<Utc>,
    pub end_time: Option<DateTime<Utc>>,
    pub duration_hours: Decimal,
    pub total_cost: CreditBalance,
    pub status: String,
}

#[async_trait]
pub trait MinerRevenueRepository: Send + Sync {
    async fn get_miner_revenue(
        &self,
        filter: &MinerRevenueFilter,
    ) -> Result<Vec<MinerRevenueRecord>>;

    async fn get_miner_revenue_details(
        &self,
        node_id: &str,
        start_time: Option<DateTime<Utc>>,
        end_time: Option<DateTime<Utc>>,
    ) -> Result<Vec<MinerRevenueDetail>>;

    async fn refresh_miner_revenue_facts(&self) -> Result<u64>;

    async fn get_total_network_revenue(
        &self,
        start_time: Option<DateTime<Utc>>,
        end_time: Option<DateTime<Utc>>,
    ) -> Result<CreditBalance>;
}

Design Rationale:

  • MinerRevenueRecord: Aggregate summary per miner
  • MinerRevenueDetail: Transaction-level drill-down for audit
  • MinerRevenueFilter: Flexible query parameters
  • refresh_miner_revenue_facts(): Trigger manual refresh for real-time accuracy

4.1.3 SQL Repository Implementation

File: src/storage/miner_revenue.rs (continued)

use crate::storage::rds::RdsConnection;
use sqlx::Row;
use std::sync::Arc;

pub struct SqlMinerRevenueRepository {
    connection: Arc<RdsConnection>,
}

impl SqlMinerRevenueRepository {
    pub fn new(connection: Arc<RdsConnection>) -> Self {
        Self { connection }
    }
}

#[async_trait]
impl MinerRevenueRepository for SqlMinerRevenueRepository {
    async fn get_miner_revenue(
        &self,
        filter: &MinerRevenueFilter,
    ) -> Result<Vec<MinerRevenueRecord>> {
        // Build dynamic query based on filter
        let mut query = String::from(
            r#"
            SELECT
                r.node_id,
                r.validator_id,
                COUNT(*) AS total_rentals,
                COUNT(*) FILTER (WHERE r.status = 'completed') AS completed_rentals,
                COUNT(*) FILTER (WHERE r.status = 'failed') AS failed_rentals,
                COALESCE(SUM(r.total_cost), 0) AS total_revenue,
                COALESCE(
                    SUM(EXTRACT(EPOCH FROM (COALESCE(r.end_time, NOW()) - r.start_time)) / 3600),
                    0
                ) AS total_hours,
                MIN(r.start_time) AS first_rental_at,
                MAX(r.start_time) AS last_rental_at,
                MAX(r.end_time) FILTER (WHERE r.status = 'completed') AS last_completed_rental_at,
                AVG(r.hourly_rate) AS avg_hourly_rate,
                AVG(EXTRACT(EPOCH FROM (COALESCE(r.end_time, NOW()) - r.start_time)) / 3600) AS avg_rental_duration_hours
            FROM billing.rentals r
            WHERE 1=1
            "#
        );

        let mut conditions = Vec::new();

        if !filter.include_failed {
            conditions.push("r.status = 'completed'");
        } else {
            conditions.push("r.status IN ('completed', 'failed')");
        }

        if filter.start_time.is_some() {
            conditions.push("r.start_time >= $start_time");
        }

        if filter.end_time.is_some() {
            conditions.push("r.start_time < $end_time");
        }

        if let Some(ref node_ids) = filter.node_ids {
            if !node_ids.is_empty() {
                conditions.push("r.node_id = ANY($node_ids)");
            }
        }

        if let Some(ref validator_ids) = filter.validator_ids {
            if !validator_ids.is_empty() {
                conditions.push("r.validator_id = ANY($validator_ids)");
            }
        }

        for condition in conditions {
            query.push_str(&format!(" AND {}", condition));
        }

        query.push_str(" GROUP BY r.node_id, r.validator_id");

        if filter.min_revenue.is_some() {
            query.push_str(" HAVING SUM(r.total_cost) >= $min_revenue");
        }

        query.push_str(" ORDER BY total_revenue DESC");

        // Execute query with bind parameters
        let mut q = sqlx::query(&query);

        if let Some(start) = filter.start_time {
            q = q.bind(start);
        }
        if let Some(end) = filter.end_time {
            q = q.bind(end);
        }
        if let Some(ref node_ids) = filter.node_ids {
            q = q.bind(node_ids);
        }
        if let Some(ref validator_ids) = filter.validator_ids {
            q = q.bind(validator_ids);
        }
        if let Some(min_rev) = filter.min_revenue {
            q = q.bind(min_rev.as_decimal());
        }

        let rows = q.fetch_all(self.connection.pool()).await?;

        Ok(rows
            .iter()
            .map(|row| MinerRevenueRecord {
                node_id: row.get("node_id"),
                validator_id: row.get("validator_id"),
                total_rentals: row.get::<i64, _>("total_rentals") as u64,
                completed_rentals: row.get::<i64, _>("completed_rentals") as u64,
                failed_rentals: row.get::<i64, _>("failed_rentals") as u64,
                total_revenue: CreditBalance::from_decimal(
                    row.get::<Decimal, _>("total_revenue")
                ),
                total_hours: row.get("total_hours"),
                first_rental_at: row.get("first_rental_at"),
                last_rental_at: row.get("last_rental_at"),
                last_completed_rental_at: row.get("last_completed_rental_at"),
                avg_hourly_rate: row.get("avg_hourly_rate"),
                avg_rental_duration_hours: row.get("avg_rental_duration_hours"),
            })
            .collect())
    }

    async fn get_miner_revenue_details(
        &self,
        node_id: &str,
        start_time: Option<DateTime<Utc>>,
        end_time: Option<DateTime<Utc>>,
    ) -> Result<Vec<MinerRevenueDetail>> {
        let mut query = sqlx::query(
            r#"
            SELECT
                rental_id,
                node_id,
                user_id,
                validator_id,
                package_id,
                hourly_rate,
                start_time,
                end_time,
                EXTRACT(EPOCH FROM (COALESCE(end_time, NOW()) - start_time)) / 3600 AS duration_hours,
                COALESCE(total_cost, 0) AS total_cost,
                status
            FROM billing.rentals
            WHERE node_id = $1
              AND status IN ('completed', 'failed')
            "#
        )
        .bind(node_id);

        if let Some(start) = start_time {
            query = sqlx::query(
                r#"
                SELECT
                    rental_id, node_id, user_id, validator_id, package_id,
                    hourly_rate, start_time, end_time,
                    EXTRACT(EPOCH FROM (COALESCE(end_time, NOW()) - start_time)) / 3600 AS duration_hours,
                    COALESCE(total_cost, 0) AS total_cost,
                    status
                FROM billing.rentals
                WHERE node_id = $1
                  AND status IN ('completed', 'failed')
                  AND start_time >= $2
                "#
            )
            .bind(node_id)
            .bind(start);
        }

        if let Some(end) = end_time {
            query = sqlx::query(
                r#"
                SELECT
                    rental_id, node_id, user_id, validator_id, package_id,
                    hourly_rate, start_time, end_time,
                    EXTRACT(EPOCH FROM (COALESCE(end_time, NOW()) - start_time)) / 3600 AS duration_hours,
                    COALESCE(total_cost, 0) AS total_cost,
                    status
                FROM billing.rentals
                WHERE node_id = $1
                  AND status IN ('completed', 'failed')
                  AND start_time >= $2
                  AND start_time < $3
                "#
            )
            .bind(node_id)
            .bind(start_time.unwrap())
            .bind(end);
        }

        query = query.bind("ORDER BY start_time ASC");

        let rows = query.fetch_all(self.connection.pool()).await?;

        Ok(rows
            .iter()
            .map(|row| MinerRevenueDetail {
                rental_id: row.get("rental_id"),
                node_id: row.get("node_id"),
                user_id: row.get("user_id"),
                validator_id: row.get("validator_id"),
                package_id: row.get::<Option<String>, _>("package_id").unwrap_or_default(),
                hourly_rate: row.get("hourly_rate"),
                start_time: row.get("start_time"),
                end_time: row.get("end_time"),
                duration_hours: row.get("duration_hours"),
                total_cost: CreditBalance::from_decimal(row.get("total_cost")),
                status: row.get("status"),
            })
            .collect())
    }

    async fn refresh_miner_revenue_facts(&self) -> Result<u64> {
        let result = sqlx::query("SELECT billing.refresh_miner_revenue_facts()")
            .execute(self.connection.pool())
            .await?;

        Ok(result.rows_affected())
    }

    async fn get_total_network_revenue(
        &self,
        start_time: Option<DateTime<Utc>>,
        end_time: Option<DateTime<Utc>>,
    ) -> Result<CreditBalance> {
        let mut query = String::from(
            "SELECT COALESCE(SUM(total_cost), 0) AS total FROM billing.rentals WHERE status = 'completed'"
        );

        let mut bind_values = Vec::new();

        if let Some(start) = start_time {
            query.push_str(" AND start_time >= $1");
            bind_values.push(start);
        }

        if let Some(end) = end_time {
            let param_num = if start_time.is_some() { 2 } else { 1 };
            query.push_str(&format!(" AND start_time < ${}", param_num));
            bind_values.push(end);
        }

        let mut q = sqlx::query(&query);
        for value in bind_values {
            q = q.bind(value);
        }

        let row = q.fetch_one(self.connection.pool()).await?;
        Ok(CreditBalance::from_decimal(row.get("total")))
    }
}

Implementation Notes:

  • Dynamic query construction for flexible filtering
  • Handles optional time ranges and node/validator filtering
  • Direct aggregation from rentals table for accuracy
  • refresh_miner_revenue_facts() calls PostgreSQL function for materialized view refresh

4.2 Phase 2: Domain Service Layer (Days 3-4)

4.2.1 Domain Types

File: src/domain/miner_revenue.rs

use crate::domain::types::CreditBalance;
use crate::error::Result;
use chrono::{DateTime, Utc};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MinerRevenueReport {
    pub report_id: uuid::Uuid,
    pub generated_at: DateTime<Utc>,
    pub period_start: DateTime<Utc>,
    pub period_end: DateTime<Utc>,
    pub total_miners: u64,
    pub total_network_revenue: CreditBalance,
    pub entries: Vec<MinerRevenueEntry>,
    pub metadata: ReportMetadata,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MinerRevenueEntry {
    pub node_id: String,
    pub validator_id: Option<String>,
    pub total_rentals: u64,
    pub completed_rentals: u64,
    pub total_revenue: CreditBalance,
    pub total_hours: Decimal,
    pub avg_hourly_rate: Option<Decimal>,
    pub revenue_share_percentage: Decimal,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReportMetadata {
    pub report_type: ReportType,
    pub filter_applied: String,
    pub computation_version: u32,
    pub data_quality_notes: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ReportType {
    MinerRevenue,
    ValidatorRevenue,
    NetworkSummary,
}

#[derive(Debug, Clone)]
pub enum ReportFormat {
    Json,
    Csv,
    Xlsx,
}

#[derive(Debug, Clone)]
pub struct ReportParameters {
    pub start_time: DateTime<Utc>,
    pub end_time: DateTime<Utc>,
    pub node_ids: Option<Vec<String>>,
    pub validator_ids: Option<Vec<String>>,
    pub min_revenue: Option<CreditBalance>,
    pub include_failed_rentals: bool,
    pub format: ReportFormat,
}

4.2.2 Service Implementation

File: src/domain/miner_revenue.rs (continued)

use crate::storage::miner_revenue::{MinerRevenueFilter, MinerRevenueRepository};
use std::sync::Arc;

pub struct MinerRevenueService {
    repository: Arc<dyn MinerRevenueRepository>,
}

impl MinerRevenueService {
    pub fn new(repository: Arc<dyn MinerRevenueRepository>) -> Self {
        Self { repository }
    }

    pub async fn generate_report(
        &self,
        params: ReportParameters,
    ) -> Result<MinerRevenueReport> {
        let filter = MinerRevenueFilter {
            node_ids: params.node_ids.clone(),
            validator_ids: params.validator_ids.clone(),
            start_time: Some(params.start_time),
            end_time: Some(params.end_time),
            min_revenue: params.min_revenue,
            include_failed: params.include_failed_rentals,
        };

        let records = self.repository.get_miner_revenue(&filter).await?;

        let total_network_revenue = self
            .repository
            .get_total_network_revenue(Some(params.start_time), Some(params.end_time))
            .await?;

        let entries: Vec<MinerRevenueEntry> = records
            .into_iter()
            .map(|record| {
                let revenue_share = if total_network_revenue.as_decimal() > Decimal::ZERO {
                    (record.total_revenue.as_decimal() / total_network_revenue.as_decimal()) * Decimal::from(100)
                } else {
                    Decimal::ZERO
                };

                MinerRevenueEntry {
                    node_id: record.node_id,
                    validator_id: record.validator_id,
                    total_rentals: record.total_rentals,
                    completed_rentals: record.completed_rentals,
                    total_revenue: record.total_revenue,
                    total_hours: record.total_hours,
                    avg_hourly_rate: record.avg_hourly_rate,
                    revenue_share_percentage: revenue_share,
                }
            })
            .collect();

        let metadata = ReportMetadata {
            report_type: ReportType::MinerRevenue,
            filter_applied: format!(
                "period={} to {}, filters={{node_ids:{:?}, validator_ids:{:?}}}",
                params.start_time, params.end_time, params.node_ids, params.validator_ids
            ),
            computation_version: 1,
            data_quality_notes: vec![],
        };

        Ok(MinerRevenueReport {
            report_id: uuid::Uuid::new_v4(),
            generated_at: Utc::now(),
            period_start: params.start_time,
            period_end: params.end_time,
            total_miners: entries.len() as u64,
            total_network_revenue,
            entries,
            metadata,
        })
    }

    pub async fn export_report(
        &self,
        report: &MinerRevenueReport,
        format: ReportFormat,
    ) -> Result<Vec<u8>> {
        match format {
            ReportFormat::Json => self.export_json(report),
            ReportFormat::Csv => self.export_csv(report),
            ReportFormat::Xlsx => self.export_xlsx(report),
        }
    }

    fn export_json(&self, report: &MinerRevenueReport) -> Result<Vec<u8>> {
        let json = serde_json::to_vec_pretty(report)?;
        Ok(json)
    }

    fn export_csv(&self, report: &MinerRevenueReport) -> Result<Vec<u8>> {
        use std::io::Write;

        let mut wtr = csv::Writer::from_writer(vec![]);

        wtr.write_record(&[
            "node_id",
            "validator_id",
            "total_rentals",
            "completed_rentals",
            "total_revenue",
            "total_hours",
            "avg_hourly_rate",
            "revenue_share_percentage",
        ])?;

        for entry in &report.entries {
            wtr.write_record(&[
                entry.node_id.clone(),
                entry.validator_id.clone().unwrap_or_default(),
                entry.total_rentals.to_string(),
                entry.completed_rentals.to_string(),
                entry.total_revenue.to_string(),
                entry.total_hours.to_string(),
                entry.avg_hourly_rate.map(|r| r.to_string()).unwrap_or_default(),
                entry.revenue_share_percentage.to_string(),
            ])?;
        }

        Ok(wtr.into_inner()?)
    }

    fn export_xlsx(&self, report: &MinerRevenueReport) -> Result<Vec<u8>> {
        Err(crate::error::BillingError::NotImplemented {
            feature: "XLSX export".to_string(),
        })
    }

    pub async fn get_miner_details(
        &self,
        node_id: &str,
        start_time: Option<DateTime<Utc>>,
        end_time: Option<DateTime<Utc>>,
    ) -> Result<Vec<crate::storage::miner_revenue::MinerRevenueDetail>> {
        self.repository
            .get_miner_revenue_details(node_id, start_time, end_time)
            .await
    }
}

Service Responsibilities:

  1. Report Generation: Orchestrates data retrieval and transformation
  2. Format Export: Converts report to CSV/JSON/XLSX
  3. Revenue Share Calculation: Computes percentage of total network revenue
  4. Drill-Down Support: Provides transaction-level detail for specific miners

4.3 Phase 3: gRPC API Layer (Days 5-6)

4.3.1 Protocol Buffer Definitions

File: crates/protocol/proto/billing.proto (additions)

// Miner Revenue Reporting

message GetMinerRevenueReportRequest {
  google.protobuf.Timestamp start_time = 1;
  google.protobuf.Timestamp end_time = 2;
  repeated string node_ids = 3;
  repeated string validator_ids = 4;
  string min_revenue = 5;  // Decimal string
  bool include_failed_rentals = 6;
  ReportFormat format = 7;
}

enum ReportFormat {
  REPORT_FORMAT_UNSPECIFIED = 0;
  REPORT_FORMAT_JSON = 1;
  REPORT_FORMAT_CSV = 2;
  REPORT_FORMAT_XLSX = 3;
}

message MinerRevenueReportResponse {
  string report_id = 1;
  google.protobuf.Timestamp generated_at = 2;
  google.protobuf.Timestamp period_start = 3;
  google.protobuf.Timestamp period_end = 4;
  uint64 total_miners = 5;
  string total_network_revenue = 6;
  repeated MinerRevenueEntry entries = 7;
  ReportMetadata metadata = 8;
}

message MinerRevenueEntry {
  string node_id = 1;
  string validator_id = 2;
  uint64 total_rentals = 3;
  uint64 completed_rentals = 4;
  string total_revenue = 5;
  string total_hours = 6;
  string avg_hourly_rate = 7;
  string revenue_share_percentage = 8;
}

message ReportMetadata {
  string report_type = 1;
  string filter_applied = 2;
  uint32 computation_version = 3;
  repeated string data_quality_notes = 4;
}

message ExportMinerRevenueReportRequest {
  string report_id = 1;  // Optional: re-export existing report
  GetMinerRevenueReportRequest params = 2;  // Or generate new report
  ReportFormat format = 3;
}

message ExportMinerRevenueReportResponse {
  bytes file_data = 1;
  string filename = 2;
  string content_type = 3;
}

message GetMinerRevenueDetailsRequest {
  string node_id = 1;
  google.protobuf.Timestamp start_time = 2;
  google.protobuf.Timestamp end_time = 3;
}

message GetMinerRevenueDetailsResponse {
  string node_id = 1;
  repeated RentalDetailEntry rentals = 2;
}

message RentalDetailEntry {
  string rental_id = 1;
  string user_id = 2;
  string validator_id = 3;
  string package_id = 4;
  string hourly_rate = 5;
  google.protobuf.Timestamp start_time = 6;
  google.protobuf.Timestamp end_time = 7;
  string duration_hours = 8;
  string total_cost = 9;
  string status = 10;
}

service BillingService {
  // ... existing methods ...

  rpc GetMinerRevenueReport(GetMinerRevenueReportRequest) returns (MinerRevenueReportResponse);
  rpc ExportMinerRevenueReport(ExportMinerRevenueReportRequest) returns (ExportMinerRevenueReportResponse);
  rpc GetMinerRevenueDetails(GetMinerRevenueDetailsRequest) returns (GetMinerRevenueDetailsResponse);
}

4.3.2 gRPC Service Implementation

File: src/grpc/billing_service.rs (additions)

async fn get_miner_revenue_report(
    &self,
    request: Request<GetMinerRevenueReportRequest>,
) -> std::result::Result<Response<MinerRevenueReportResponse>, Status> {
    let req = request.into_inner();

    let start_time = req
        .start_time
        .ok_or_else(|| Status::invalid_argument("start_time is required"))?;
    let end_time = req
        .end_time
        .ok_or_else(|| Status::invalid_argument("end_time is required"))?;

    let params = crate::domain::miner_revenue::ReportParameters {
        start_time: chrono::DateTime::from_timestamp(start_time.seconds, start_time.nanos as u32)
            .unwrap(),
        end_time: chrono::DateTime::from_timestamp(end_time.seconds, end_time.nanos as u32)
            .unwrap(),
        node_ids: if req.node_ids.is_empty() {
            None
        } else {
            Some(req.node_ids)
        },
        validator_ids: if req.validator_ids.is_empty() {
            None
        } else {
            Some(req.validator_ids)
        },
        min_revenue: if req.min_revenue.is_empty() {
            None
        } else {
            Some(CreditBalance::from_decimal(
                Decimal::from_str(&req.min_revenue).map_err(|e| {
                    Status::invalid_argument(format!("Invalid min_revenue: {}", e))
                })?,
            ))
        },
        include_failed_rentals: req.include_failed_rentals,
        format: match req.format() {
            basilica_protocol::billing::ReportFormat::Json => {
                crate::domain::miner_revenue::ReportFormat::Json
            }
            basilica_protocol::billing::ReportFormat::Csv => {
                crate::domain::miner_revenue::ReportFormat::Csv
            }
            basilica_protocol::billing::ReportFormat::Xlsx => {
                crate::domain::miner_revenue::ReportFormat::Xlsx
            }
            _ => crate::domain::miner_revenue::ReportFormat::Json,
        },
    };

    let report = self
        .miner_revenue_service
        .generate_report(params)
        .await
        .map_err(|e| Status::internal(format!("Failed to generate report: {}", e)))?;

    let response = MinerRevenueReportResponse {
        report_id: report.report_id.to_string(),
        generated_at: Some(prost_types::Timestamp {
            seconds: report.generated_at.timestamp(),
            nanos: report.generated_at.timestamp_subsec_nanos() as i32,
        }),
        period_start: Some(prost_types::Timestamp {
            seconds: report.period_start.timestamp(),
            nanos: report.period_start.timestamp_subsec_nanos() as i32,
        }),
        period_end: Some(prost_types::Timestamp {
            seconds: report.period_end.timestamp(),
            nanos: report.period_end.timestamp_subsec_nanos() as i32,
        }),
        total_miners: report.total_miners,
        total_network_revenue: report.total_network_revenue.to_string(),
        entries: report
            .entries
            .into_iter()
            .map(|e| basilica_protocol::billing::MinerRevenueEntry {
                node_id: e.node_id,
                validator_id: e.validator_id.unwrap_or_default(),
                total_rentals: e.total_rentals,
                completed_rentals: e.completed_rentals,
                total_revenue: e.total_revenue.to_string(),
                total_hours: e.total_hours.to_string(),
                avg_hourly_rate: e.avg_hourly_rate.map(|r| r.to_string()).unwrap_or_default(),
                revenue_share_percentage: e.revenue_share_percentage.to_string(),
            })
            .collect(),
        metadata: Some(basilica_protocol::billing::ReportMetadata {
            report_type: report.metadata.report_type.to_string(),
            filter_applied: report.metadata.filter_applied,
            computation_version: report.metadata.computation_version,
            data_quality_notes: report.metadata.data_quality_notes,
        }),
    };

    Ok(Response::new(response))
}

async fn export_miner_revenue_report(
    &self,
    request: Request<ExportMinerRevenueReportRequest>,
) -> std::result::Result<Response<ExportMinerRevenueReportResponse>, Status> {
    let req = request.into_inner();

    let params = req
        .params
        .ok_or_else(|| Status::invalid_argument("params are required"))?;

    // Generate report using existing logic
    let report_request = GetMinerRevenueReportRequest {
        start_time: params.start_time,
        end_time: params.end_time,
        node_ids: params.node_ids,
        validator_ids: params.validator_ids,
        min_revenue: params.min_revenue,
        include_failed_rentals: params.include_failed_rentals,
        format: req.format,
    };

    let report_response = self
        .get_miner_revenue_report(Request::new(report_request))
        .await?
        .into_inner();

    // Convert to internal report structure
    let report = self.proto_to_domain_report(report_response)?;

    // Export to requested format
    let format = match req.format() {
        basilica_protocol::billing::ReportFormat::Json => {
            crate::domain::miner_revenue::ReportFormat::Json
        }
        basilica_protocol::billing::ReportFormat::Csv => {
            crate::domain::miner_revenue::ReportFormat::Csv
        }
        basilica_protocol::billing::ReportFormat::Xlsx => {
            crate::domain::miner_revenue::ReportFormat::Xlsx
        }
        _ => crate::domain::miner_revenue::ReportFormat::Json,
    };

    let file_data = self
        .miner_revenue_service
        .export_report(&report, format)
        .await
        .map_err(|e| Status::internal(format!("Failed to export report: {}", e)))?;

    let (filename, content_type) = match req.format() {
        basilica_protocol::billing::ReportFormat::Json => {
            (format!("miner_revenue_{}.json", report.report_id), "application/json")
        }
        basilica_protocol::billing::ReportFormat::Csv => {
            (format!("miner_revenue_{}.csv", report.report_id), "text/csv")
        }
        basilica_protocol::billing::ReportFormat::Xlsx => {
            (format!("miner_revenue_{}.xlsx", report.report_id), "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
        }
        _ => (format!("miner_revenue_{}.json", report.report_id), "application/json"),
    };

    let response = ExportMinerRevenueReportResponse {
        file_data,
        filename,
        content_type: content_type.to_string(),
    };

    Ok(Response::new(response))
}

async fn get_miner_revenue_details(
    &self,
    request: Request<GetMinerRevenueDetailsRequest>,
) -> std::result::Result<Response<GetMinerRevenueDetailsResponse>, Status> {
    let req = request.into_inner();

    let start_time = req.start_time.map(|ts| {
        chrono::DateTime::from_timestamp(ts.seconds, ts.nanos as u32).unwrap()
    });

    let end_time = req.end_time.map(|ts| {
        chrono::DateTime::from_timestamp(ts.seconds, ts.nanos as u32).unwrap()
    });

    let details = self
        .miner_revenue_service
        .get_miner_details(&req.node_id, start_time, end_time)
        .await
        .map_err(|e| Status::internal(format!("Failed to get miner details: {}", e)))?;

    let rentals = details
        .into_iter()
        .map(|detail| basilica_protocol::billing::RentalDetailEntry {
            rental_id: detail.rental_id.to_string(),
            user_id: detail.user_id,
            validator_id: detail.validator_id.unwrap_or_default(),
            package_id: detail.package_id,
            hourly_rate: detail.hourly_rate.to_string(),
            start_time: Some(prost_types::Timestamp {
                seconds: detail.start_time.timestamp(),
                nanos: detail.start_time.timestamp_subsec_nanos() as i32,
            }),
            end_time: detail.end_time.map(|t| prost_types::Timestamp {
                seconds: t.timestamp(),
                nanos: t.timestamp_subsec_nanos() as i32,
            }),
            duration_hours: detail.duration_hours.to_string(),
            total_cost: detail.total_cost.to_string(),
            status: detail.status,
        })
        .collect();

    let response = GetMinerRevenueDetailsResponse {
        node_id: req.node_id,
        rentals,
    };

    Ok(Response::new(response))
}

4.4 Phase 4: Testing (Days 7-8)

4.4.1 Unit Tests

File: src/storage/miner_revenue.rs (tests module)

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    #[tokio::test]
    async fn test_get_miner_revenue_no_filter() {
        // Arrange
        let connection = setup_test_db().await;
        seed_test_data(&connection).await;
        let repo = SqlMinerRevenueRepository::new(Arc::new(connection));

        let filter = MinerRevenueFilter {
            node_ids: None,
            validator_ids: None,
            start_time: None,
            end_time: None,
            min_revenue: None,
            include_failed: false,
        };

        // Act
        let result = repo.get_miner_revenue(&filter).await;

        // Assert
        assert!(result.is_ok());
        let records = result.unwrap();
        assert_eq!(records.len(), 3); // Expect 3 miners from seed
        assert!(records[0].total_revenue > CreditBalance::zero());
    }

    #[tokio::test]
    async fn test_get_miner_revenue_with_time_filter() {
        let connection = setup_test_db().await;
        seed_test_data(&connection).await;
        let repo = SqlMinerRevenueRepository::new(Arc::new(connection));

        let now = Utc::now();
        let filter = MinerRevenueFilter {
            node_ids: None,
            validator_ids: None,
            start_time: Some(now - chrono::Duration::days(7)),
            end_time: Some(now),
            min_revenue: None,
            include_failed: false,
        };

        let result = repo.get_miner_revenue(&filter).await;

        assert!(result.is_ok());
        let records = result.unwrap();
        for record in records {
            assert!(record.last_rental_at.unwrap() >= now - chrono::Duration::days(7));
        }
    }

    #[tokio::test]
    async fn test_get_miner_revenue_details() {
        let connection = setup_test_db().await;
        seed_test_data(&connection).await;
        let repo = SqlMinerRevenueRepository::new(Arc::new(connection));

        let result = repo.get_miner_revenue_details("node-1", None, None).await;

        assert!(result.is_ok());
        let details = result.unwrap();
        assert!(!details.is_empty());
        assert_eq!(details[0].node_id, "node-1");
    }

    #[tokio::test]
    async fn test_refresh_miner_revenue_facts() {
        let connection = setup_test_db().await;
        seed_test_data(&connection).await;
        let repo = SqlMinerRevenueRepository::new(Arc::new(connection));

        let result = repo.refresh_miner_revenue_facts().await;

        assert!(result.is_ok());
        assert!(result.unwrap() > 0);
    }
}

4.4.2 Integration Tests

File: tests/miner_revenue_integration_test.rs

use basilica_billing::domain::miner_revenue::{MinerRevenueService, ReportParameters, ReportFormat};
use basilica_billing::storage::miner_revenue::SqlMinerRevenueRepository;
use chrono::Utc;
use std::sync::Arc;

#[tokio::test]
async fn test_generate_miner_revenue_report_end_to_end() {
    // Setup
    let db_connection = setup_integration_test_db().await;
    seed_realistic_rental_data(&db_connection).await;

    let repo = Arc::new(SqlMinerRevenueRepository::new(Arc::new(db_connection)));
    let service = MinerRevenueService::new(repo);

    // Generate report
    let params = ReportParameters {
        start_time: Utc::now() - chrono::Duration::days(30),
        end_time: Utc::now(),
        node_ids: None,
        validator_ids: None,
        min_revenue: None,
        include_failed_rentals: false,
        format: ReportFormat::Json,
    };

    let report = service.generate_report(params).await.unwrap();

    // Assertions
    assert!(report.entries.len() > 0);
    assert!(report.total_network_revenue > CreditBalance::zero());

    let total_from_entries: CreditBalance = report
        .entries
        .iter()
        .map(|e| e.total_revenue)
        .sum();

    assert_eq!(total_from_entries, report.total_network_revenue);
}

#[tokio::test]
async fn test_export_csv_format() {
    let db_connection = setup_integration_test_db().await;
    seed_realistic_rental_data(&db_connection).await;

    let repo = Arc::new(SqlMinerRevenueRepository::new(Arc::new(db_connection)));
    let service = MinerRevenueService::new(repo);

    let params = ReportParameters {
        start_time: Utc::now() - chrono::Duration::days(7),
        end_time: Utc::now(),
        node_ids: None,
        validator_ids: None,
        min_revenue: None,
        include_failed_rentals: false,
        format: ReportFormat::Csv,
    };

    let report = service.generate_report(params).await.unwrap();
    let csv_data = service.export_report(&report, ReportFormat::Csv).await.unwrap();

    // Validate CSV structure
    let csv_string = String::from_utf8(csv_data).unwrap();
    let lines: Vec<&str> = csv_string.lines().collect();

    assert!(lines.len() > 1); // Header + at least one data row
    assert!(lines[0].contains("node_id"));
    assert!(lines[0].contains("total_revenue"));
}

4.4.3 Financial Accuracy Tests

File: tests/revenue_accuracy_test.rs

#[tokio::test]
async fn test_revenue_matches_credit_transactions() {
    let db = setup_test_db().await;

    // Create test rentals with known costs
    let rental1 = create_test_rental("node-1", 100.0);
    let rental2 = create_test_rental("node-1", 250.50);
    let rental3 = create_test_rental("node-2", 75.25);

    // Create corresponding credit transactions
    create_credit_transaction(&rental1, 100.0);
    create_credit_transaction(&rental2, 250.50);
    create_credit_transaction(&rental3, 75.25);

    // Query revenue report
    let repo = SqlMinerRevenueRepository::new(Arc::new(db));
    let filter = MinerRevenueFilter {
        node_ids: None,
        validator_ids: None,
        start_time: None,
        end_time: None,
        min_revenue: None,
        include_failed: false,
    };

    let records = repo.get_miner_revenue(&filter).await.unwrap();

    // Verify node-1 revenue
    let node1_record = records.iter().find(|r| r.node_id == "node-1").unwrap();
    assert_eq!(
        node1_record.total_revenue,
        CreditBalance::from_f64(350.50).unwrap()
    );

    // Verify node-2 revenue
    let node2_record = records.iter().find(|r| r.node_id == "node-2").unwrap();
    assert_eq!(
        node2_record.total_revenue,
        CreditBalance::from_f64(75.25).unwrap()
    );
}

#[tokio::test]
async fn test_revenue_calculation_with_backfilled_transactions() {
    // Test that backfilled transactions (metadata->>'backfilled' = true)
    // are included in revenue calculations
    let db = setup_test_db().await;

    create_backfilled_rental_and_transaction("node-1", 500.0);
    create_normal_rental_and_transaction("node-1", 200.0);

    let repo = SqlMinerRevenueRepository::new(Arc::new(db));
    let filter = MinerRevenueFilter {
        node_ids: Some(vec!["node-1".to_string()]),
        validator_ids: None,
        start_time: None,
        end_time: None,
        min_revenue: None,
        include_failed: false,
    };

    let records = repo.get_miner_revenue(&filter).await.unwrap();
    let node1 = &records[0];

    // Both backfilled and normal transactions should be counted
    assert_eq!(
        node1.total_revenue,
        CreditBalance::from_f64(700.0).unwrap()
    );
}

4.5 Phase 5: Documentation & Deployment (Day 9)

4.5.1 API Documentation

File: docs/api/miner-revenue-reporting.md

# Miner Revenue Reporting API

## Overview

The Miner Revenue Reporting API provides financial reports aggregating rental revenue per GPU node (miner) for payout calculations.

## Endpoints

### 1. Get Miner Revenue Report

**RPC**: `GetMinerRevenueReport`

**Request**:
```protobuf
message GetMinerRevenueReportRequest {
  google.protobuf.Timestamp start_time = 1;  // Required
  google.protobuf.Timestamp end_time = 2;    // Required
  repeated string node_ids = 3;              // Optional filter
  repeated string validator_ids = 4;         // Optional filter
  string min_revenue = 5;                    // Optional: minimum revenue threshold
  bool include_failed_rentals = 6;          // Default: false
  ReportFormat format = 7;                   // Default: JSON
}

Response:

message MinerRevenueReportResponse {
  string report_id = 1;
  google.protobuf.Timestamp generated_at = 2;
  google.protobuf.Timestamp period_start = 3;
  google.protobuf.Timestamp period_end = 4;
  uint64 total_miners = 5;
  string total_network_revenue = 6;
  repeated MinerRevenueEntry entries = 7;
  ReportMetadata metadata = 8;
}

Example:

grpcurl -d '{
  "start_time": {"seconds": 1704067200},
  "end_time": {"seconds": 1706745600},
  "format": "REPORT_FORMAT_JSON"
}' billing.basilica.ai:50051 billing.BillingService/GetMinerRevenueReport

2. Export Miner Revenue Report

RPC: ExportMinerRevenueReport

Downloads report as CSV/JSON file for offline processing.

Request:

message ExportMinerRevenueReportRequest {
  GetMinerRevenueReportRequest params = 1;
  ReportFormat format = 2;  // CSV, JSON, or XLSX
}

Response:

message ExportMinerRevenueReportResponse {
  bytes file_data = 1;
  string filename = 2;
  string content_type = 3;
}

3. Get Miner Revenue Details

RPC: GetMinerRevenueDetails

Drill-down to individual rental transactions for a specific miner.

Request:

message GetMinerRevenueDetailsRequest {
  string node_id = 1;  // Required
  google.protobuf.Timestamp start_time = 2;  // Optional
  google.protobuf.Timestamp end_time = 3;    // Optional
}

CSV Format Specification

node_id,validator_id,total_rentals,completed_rentals,total_revenue,total_hours,avg_hourly_rate,revenue_share_percentage
node-abc123,validator-xyz789,42,40,1250.50,350.25,3.57,15.23
node-def456,validator-xyz789,28,28,850.75,290.00,2.93,10.35

Column Descriptions:

  • node_id: Unique identifier for GPU node (miner)
  • validator_id: Associated validator (may be empty)
  • total_rentals: Total number of rentals (completed + failed)
  • completed_rentals: Number of successfully completed rentals
  • total_revenue: Sum of total_cost from all completed rentals (in credits)
  • total_hours: Sum of rental durations (in hours)
  • avg_hourly_rate: Average hourly rate across all rentals
  • revenue_share_percentage: Percentage of total network revenue

Data Quality Notes

  1. Historical Data: Transactions before migration 025 (2024-XX-XX) may have incomplete balance snapshots
  2. Active Rentals: Only completed rentals (status = 'completed') contribute to revenue
  3. Failed Rentals: Excluded by default; include with include_failed_rentals = true
  4. Timezone: All timestamps in UTC

Rate Limiting

  • 10 requests per minute per API key
  • Large exports (>10MB) may timeout; use time-range filtering

Error Codes

  • INVALID_ARGUMENT: Missing required parameters or invalid date range
  • NOT_FOUND: No data found for specified filters
  • INTERNAL: Database or computation error

#### 4.5.2 Deployment Checklist

```markdown
# Deployment Checklist: Miner Revenue Reporting

## Pre-Deployment

- [ ] Run all unit tests: `cargo test -p basilica-billing`
- [ ] Run integration tests: `cargo test --test miner_revenue_integration_test`
- [ ] Verify migration 027 applies cleanly on staging database
- [ ] Run `refresh_miner_revenue_facts()` manually and verify results
- [ ] Generate sample CSV report and validate with accounting team
- [ ] Review gRPC API with Postman/grpcurl
- [ ] Load test: 1000 concurrent report requests
- [ ] Security review: SQL injection, input validation

## Deployment Steps

1. **Database Migration**
   ```bash
   cd crates/basilica-billing
   sqlx migrate run
  1. Build Binary

    ./scripts/basilica-billing/build.sh
  2. Deploy to Staging

    kubectl apply -f k8s/billing-service-staging.yaml
  3. Smoke Tests

    grpcurl -d '{"start_time": {...}, "end_time": {...}}' \
      staging-billing.basilica.ai:50051 \
      billing.BillingService/GetMinerRevenueReport
  4. Deploy to Production

    kubectl apply -f k8s/billing-service-production.yaml

Post-Deployment

  • Monitor Prometheus metrics: billing_miner_revenue_report_duration_seconds
  • Check error logs for first 24 hours
  • Generate first production report and validate with finance team
  • Schedule daily automated refresh: refresh_miner_revenue_facts() via cron
  • Update internal documentation wiki
  • Announce feature to miner community

Rollback Plan

If issues arise:

  1. Revert Kubernetes deployment: kubectl rollout undo deployment/billing-service
  2. Migration rollback (if needed): Manually drop miner_revenue_facts table
  3. Disable gRPC endpoints via feature flag

## 5. Risk Assessment & Mitigation

### 5.1 Financial Risks

| Risk | Impact | Probability | Mitigation |
|------|--------|-------------|------------|
| Revenue miscalculation | High | Low | Comprehensive testing, audit trail, cross-validation with credit_transactions |
| Double-counting rentals | High | Low | Use DISTINCT rental_id in aggregations, idempotency checks |
| Missing historical data | Medium | Medium | Document data quality notes in reports, backfill where possible |
| Currency precision errors | High | Low | Use Decimal type (6 decimal places), avoid floating-point arithmetic |
| Incomplete rental records | Medium | Medium | Filter by `status = 'completed'`, validate `end_time IS NOT NULL` |

### 5.2 Technical Risks

| Risk | Impact | Probability | Mitigation |
|------|--------|-------------|------------|
| Query performance degradation | Medium | Medium | Materialized view `miner_revenue_facts`, indexed queries, pagination |
| Database connection pool exhaustion | Medium | Low | Connection pooling with max limits, timeout configuration |
| Large CSV file generation OOM | Low | Low | Stream CSV generation, implement pagination for >10K records |
| gRPC timeout on large reports | Medium | Medium | 30s timeout, recommend time-range filtering for large datasets |

### 5.3 Operational Risks

| Risk | Impact | Probability | Mitigation |
|------|--------|-------------|------------|
| Delayed payout to miners | High | Low | Automated alerts for report generation failures, manual override capability |
| Accounting team workflow disruption | Medium | Low | CSV format validation with team, training session, fallback to manual calculation |
| API abuse / DoS | Medium | Low | Rate limiting (10 req/min), API key authentication |

## 6. Performance Considerations

### 6.1 Query Optimization

**Expected Load**:
- 100 miners × 30 days × 24 hours = ~72,000 rental records per month
- Report generation: 1-2 seconds for monthly reports
- CSV export: 3-5 seconds for 100 miners

**Optimization Strategies**:
1. **Materialized View**: Pre-aggregate common queries
2. **Indexed Columns**: `node_id`, `start_time`, `status`, `validator_id`
3. **Partition by Time**: Consider partitioning `rentals` table by month (future enhancement)
4. **Read Replicas**: Route report queries to read-only replicas

### 6.2 Caching Strategy

- **Report Cache**: Cache generated reports for 1 hour (keyed by parameters)
- **Materialized View Refresh**: Hourly cron job to refresh `miner_revenue_facts`
- **CDN for CSV Files**: Store exported CSVs in S3, serve via CloudFront

## 7. Monitoring & Alerting

### 7.1 Metrics

```rust
// Prometheus metrics to implement
billing_miner_revenue_report_requests_total{status="success|error"}
billing_miner_revenue_report_duration_seconds
billing_miner_revenue_export_size_bytes
billing_miner_revenue_facts_refresh_duration_seconds
billing_miner_revenue_total_miners_gauge
billing_miner_revenue_total_network_revenue_gauge

7.2 Alerts

- alert: MinerRevenueReportHighErrorRate
  expr: rate(billing_miner_revenue_report_requests_total{status="error"}[5m]) > 0.1
  annotations:
    summary: "High error rate in miner revenue reporting"

- alert: MinerRevenueFactsStale
  expr: time() - billing_miner_revenue_facts_last_refresh_timestamp > 7200
  annotations:
    summary: "Miner revenue facts not refreshed in 2 hours"

8. Future Enhancements

8.1 Phase 2 Features (Post-Launch)

  1. Validator Revenue Split: Attribute revenue between miner and validator (commission-based)
  2. Multi-Currency Support: Convert credits to USD/EUR for international payouts
  3. Scheduled Reports: Automated monthly report generation and email delivery
  4. Tax Reporting: Generate IRS 1099 forms for US-based miners
  5. Dashboard UI: Web interface for real-time revenue tracking
  6. Payment Integration: Direct Stripe/bank transfer payout automation

8.2 Scalability Roadmap

  1. Horizontal Sharding: Shard rentals table by node_id hash for >1M records
  2. OLAP Database: Migrate analytics to ClickHouse for sub-second queries
  3. Streaming Aggregation: Real-time revenue updates via Kafka + Flink
  4. GraphQL API: Flexible query interface for custom reporting needs

9. Acceptance Criteria

9.1 Functional Requirements

  • Generate revenue report aggregated by node_id
  • Support custom date ranges (start_time, end_time)
  • Export report in CSV format
  • Export report in JSON format
  • Provide transaction-level drill-down for specific miner
  • Calculate revenue share percentage per miner
  • Filter by minimum revenue threshold
  • Include/exclude failed rentals
  • Support filtering by validator_id

9.2 Non-Functional Requirements

  • Report generation completes in <5 seconds for 30-day period
  • CSV export for 100 miners completes in <10 seconds
  • Revenue calculations accurate to 6 decimal places
  • API supports 10 concurrent requests
  • 99.9% uptime SLA
  • Audit trail for all financial queries
  • Role-based access control (finance team only)

10. Implementation Timeline

Phase Tasks Duration Dependencies
Phase 1: Database Migration 027, repository trait, SQL implementation 2 days None
Phase 2: Domain Service layer, report types, export logic 2 days Phase 1
Phase 3: API gRPC endpoints, protocol buffers 2 days Phase 2
Phase 4: Testing Unit tests, integration tests, accuracy tests 2 days Phase 3
Phase 5: Deployment Documentation, staging deployment, production release 1 day Phase 4
Total 9 days

Estimated Effort: 9 development days (1.8 weeks for single developer)

11. Appendix

11.1 Sample SQL Queries

Get Top 10 Miners by Revenue:

SELECT node_id, total_revenue
FROM billing.miner_revenue_facts
ORDER BY total_revenue DESC
LIMIT 10;

Verify Revenue Matches Credit Transactions:

SELECT
    r.node_id,
    SUM(r.total_cost) AS rental_revenue,
    SUM(ct.amount) AS transaction_revenue
FROM billing.rentals r
LEFT JOIN billing.credit_transactions ct
    ON ct.reference_id = r.rental_id::text
    AND ct.reference_type = 'rental'
WHERE r.status = 'completed'
GROUP BY r.node_id
HAVING ABS(SUM(r.total_cost) - SUM(ct.amount)) > 0.01;

11.2 Code Review Checklist

  • All public functions have docstrings
  • Error handling covers all failure modes
  • No SQL injection vulnerabilities
  • Decimal precision preserved throughout
  • No floating-point arithmetic for financial calculations
  • Repository pattern properly abstracted
  • Unit test coverage >80%
  • Integration tests cover happy path and edge cases
  • No TODOs or placeholder code
  • Follows project code style (rustfmt, clippy)

Document Version: 1.0
Last Updated: 2025-10-29
Author: System Architect
Status: Approved for Implementation

Metadata

Metadata

Assignees

Labels

enhancementNew feature or request

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions