Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions quesma/telemetry/phone_home.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package telemetry
import (
"bytes"
"context"
"crypto/sha256"
"crypto/tls"
"database/sql"
"encoding/json"
Expand All @@ -19,6 +20,7 @@ import (
"quesma/elasticsearch"
"quesma/health"
telemetry_headers "quesma/telemetry/headers"
"sort"

"quesma/logger"
"quesma/quesma/config"
Expand Down Expand Up @@ -46,6 +48,7 @@ const (

// for local debugging purposes
phoneHomeLocalEnabled = false // used initially for testing
tablesInUsageReport = 10
)

type ClickHouseStats struct {
Expand All @@ -55,6 +58,9 @@ type ClickHouseStats struct {
OpenConnection int `json:"open_connection"`
MaxOpenConnection int `json:"max_open_connection"`
ServerVersion string `json:"server_version"`
DbInfoHash string `json:"db_info_hash"`
BillableSize int64 `json:"billable_size"`
TopTablesSizeInfo string `json:"top_tables_size_info"`
}

type ElasticStats struct {
Expand Down Expand Up @@ -296,6 +302,88 @@ where active
return nil
}

// collectClickHouseTableSizes fetches table size information via getTableSizes.
//
// It returns the total size and the per-table sizes exactly as getTableSizes
// reports them. On failure a warning is logged and (0, nil, err) is returned.
func (a *agent) collectClickHouseTableSizes(ctx context.Context) (int64, map[string]int64, error) {
	// Forward the caller's ctx rather than a.ctx so that any deadline or
	// cancellation set by the caller also bounds the query.
	totalSize, tablesWithSizes, err := a.getTableSizes(ctx)
	if err != nil {
		logger.WarnWithCtx(ctx).Msgf("Error getting table sizes from clickhouse: %v", err)
		return 0, nil, err
	}
	return totalSize, tablesWithSizes, nil
}

// getDbInfoHash returns a short hexadecimal fingerprint of the configured
// ClickHouse connection: the first 8 bytes of the SHA-256 digest of
// "<user>@<url>/<database>".
//
// An empty user is replaced by "<no-user>", an empty database by "default",
// and a nil URL by the empty string. Only the hash is returned, so the raw
// connection details are not exposed to the consumer of this value.
func (a *agent) getDbInfoHash() string {
	user := a.config.ClickHouse.User
	if user == "" {
		user = "<no-user>"
	}

	database := a.config.ClickHouse.Database
	if database == "" {
		database = "default"
	}

	var url string
	if u := a.config.ClickHouse.Url; u != nil {
		url = u.String()
	}

	digest := sha256.Sum256([]byte(user + "@" + url + "/" + database))
	return fmt.Sprintf("%x", digest[:8])
}

// getTableSizes reads the on-disk size of every table in the configured
// database (falling back to "default" when none is configured) from the
// active rows of system.parts.
//
// Sizes are converted to megabytes with integer division by 1,000,000.
// It returns:
//   - the sum of the per-table megabyte values (a table whose size truncates
//     to 0 adds nothing to the sum),
//   - a map of table name to megabytes, restricted to tables of at least 1 MB
//     and trimmed to the tablesInUsageReport largest entries,
//   - an error if the query, a row scan, or the row iteration fails, in which
//     case the other results are 0 and nil.
func (a *agent) getTableSizes(ctx context.Context) (int64, map[string]int64, error) {
	const bytesPerMegabyte = 1000000

	database := a.config.ClickHouse.Database
	if database == "" {
		database = "default"
	}

	rows, err := a.clickHouseDb.QueryContext(ctx, `SELECT table, sum(bytes_on_disk) AS total_size
FROM system.parts
WHERE active = 1 AND database = ?
GROUP BY table
ORDER BY total_size DESC;`, database)
	if err != nil {
		return 0, nil, fmt.Errorf("failed to execute query: %w", err)
	}
	defer rows.Close()

	var sumMB int64
	perTableMB := make(map[string]int64)
	for rows.Next() {
		var (
			name      string
			sizeBytes int64
		)
		if scanErr := rows.Scan(&name, &sizeBytes); scanErr != nil {
			return 0, nil, fmt.Errorf("failed to scan row: %w", scanErr)
		}

		sizeMB := sizeBytes / bytesPerMegabyte
		sumMB += sizeMB
		if sizeMB < 1 {
			// Tables below 1MB are not reported individually.
			continue
		}
		perTableMB[name] = sizeMB
	}
	if iterErr := rows.Err(); iterErr != nil {
		return 0, nil, fmt.Errorf("error iterating over rows: %w", iterErr)
	}

	return sumMB, getTopNValues(perTableMB, tablesInUsageReport), nil
}

// getTopNValues returns a new map holding at most n entries of in, chosen by
// largest value.
//
// Entries with equal values are ordered by ascending key, so the selection is
// deterministic even when several entries tie at the cut-off. The input map is
// not modified. A non-positive n or an empty input yields an empty, non-nil
// map.
func getTopNValues(in map[string]int64, n int) map[string]int64 {
	type kv struct {
		Key   string
		Value int64
	}

	entries := make([]kv, 0, len(in))
	for k, v := range in {
		entries = append(entries, kv{Key: k, Value: v})
	}

	// sort.Slice is not stable and map iteration order is random, so ties on
	// Value must be broken explicitly to keep the result reproducible.
	sort.Slice(entries, func(i, j int) bool {
		if entries[i].Value != entries[j].Value {
			return entries[i].Value > entries[j].Value
		}
		return entries[i].Key < entries[j].Key
	})

	// Clamp the number of entries to keep into [0, len(entries)]; a negative
	// n must not reach make() or the slice expression below.
	limit := n
	if limit > len(entries) {
		limit = len(entries)
	}
	if limit < 0 {
		limit = 0
	}

	result := make(map[string]int64, limit)
	for _, e := range entries[:limit] {
		result[e.Key] = e.Value
	}
	return result
}

func (a *agent) collectClickHouseVersion(ctx context.Context, stats *ClickHouseStats) error {

// https://clickhouse.com/docs/en/sql-reference/functions/other-functions#version
Expand Down Expand Up @@ -539,6 +627,14 @@ func (a *agent) collect(ctx context.Context, reportType string) (stats PhoneHome
} else {
stats.ClickHouse = ClickHouseStats{Status: "paused"}
}
if !strings.HasPrefix(a.config.ClickHouse.ConnectorType, "hydrolix") { // we only check table sizes for ClickHouse
if totalSize, topTableSizes, err := a.collectClickHouseTableSizes(ctx); err == nil {
stats.ClickHouse.DbInfoHash = a.getDbInfoHash()
stats.ClickHouse.BillableSize = totalSize
stats.ClickHouse.TopTablesSizeInfo = fmt.Sprintf("%v", topTableSizes)
logger.Info().Msgf("[USAGE REPORT] dababase=[%s] billable_size_in_Mbs=[%d] top_table_sizes=%v", a.getDbInfoHash(), totalSize, topTableSizes)
}
}

stats.IngestCounters = a.ingestCounters.AggregateAndReset()

Expand Down
73 changes: 73 additions & 0 deletions quesma/telemetry/phone_home_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,3 +352,76 @@ func TestAgent_CollectElastic_Version(t *testing.T) {
assert.Equal(t, "8.11.1", response.Version.Number)

}

func TestGetTopNValues(t *testing.T) {
tests := []struct {
name string
input map[string]int64
n int
expected map[string]int64
}{
{
name: "LessThanN",
input: map[string]int64{
"table2": 300,
"table1": 500,
},
n: 5,
expected: map[string]int64{
"table1": 500,
"table2": 300,
},
},
{
name: "EqualToN",
input: map[string]int64{
"table1": 200,
"table3": 500,
"table2": 300,
},
n: 3,
expected: map[string]int64{
"table3": 500,
"table2": 300,
"table1": 200,
},
},
{
name: "MoreThanN",
input: map[string]int64{
"table2": 300,
"table4": 100,
"table1": 500,
"table3": 200,
},
n: 3,
expected: map[string]int64{
"table1": 500,
"table2": 300,
"table3": 200,
},
},
{
name: "EmptyMap",
input: map[string]int64{},
n: 3,
expected: map[string]int64{},
},
{
name: "NegativeN",
input: map[string]int64{
"table1": 500,
"table2": 300,
},
n: -1,
expected: map[string]int64{},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := getTopNValues(tt.input, tt.n)
assert.Equal(t, tt.expected, result)
})
}
}
Loading