
Commit 6a80f32

feat: add a view for failed replication items
1 parent c93fa9e commit 6a80f32

File tree

5 files changed: +231 -120 lines changed

+141 -94
@@ -1,126 +1,173 @@
-import { Table, Button, notification, Typography, Tooltip, Spin } from 'antd'
+import { Table, Button, notification, Typography, Tooltip, Spin, Select, Space } from 'antd'
 import { usePollingEffect } from '../../utils/usePollingEffect'
-import React, { useState } from 'react'
+import React, { useState, useEffect } from 'react'
 import { ColumnType } from 'antd/es/table'

 const { Paragraph } = Typography

-interface RunningQueryData {
-  query: string
-  read_rows: number
-  read_rows_readable: string
-  query_id: string
-  total_rows_approx: number
-  total_rows_approx_readable: string
-  elapsed: number
-  memory_usage: string
+interface ClusterNode {
+  cluster: string
+  shard_num: number
+  shard_weight: number
+  replica_num: number
+  host_name: string
+  host_address: string
+  port: number
+  is_local: number
+  user: string
+  default_database: string
+  errors_count: number
+  slowdowns_count: number
+  estimated_recovery_time: number
 }

-function KillQueryButton({ queryId }: any) {
-  const [isLoading, setIsLoading] = useState(false)
-  const [isKilled, setIsKilled] = useState(false)
+interface Cluster {
+  cluster: string
+  nodes: ClusterNode[]
+}

-  const killQuery = async () => {
-    setIsLoading(true)
-    try {
-      const res = await fetch(`/api/analyze/${queryId}/kill_query`, {
-        method: 'POST',
-        headers: {
-          'Content-Type': 'application/x-www-form-urlencoded',
-        },
-        body: new URLSearchParams({
-          query_id: queryId,
-        }),
-      })
-      setIsKilled(true)
-      setIsLoading(false)
-      return await res.json()
-    } catch (err) {
-      setIsLoading(false)
-      notification.error({
-        message: 'Killing query failed',
-      })
-    }
-  }
-  return (
-    <>
-      {isKilled ? (
-        <Button disabled>Query killed</Button>
-      ) : (
-        <Button danger onClick={killQuery} loading={isLoading}>
-          Kill query
-        </Button>
-      )}
-    </>
-  )
+interface ReplicationQueueItem {
+  host_name: string
+  database: string
+  table: string
+  position: number
+  error: string
+  last_attempt_time: string
+  num_attempts: number
+  type: string
 }

 export default function Replication() {
-  const [runningQueries, setRunningQueries] = useState([])
-  const [loadingRunningQueries, setLoadingRunningQueries] = useState(false)
+  const [replicationQueue, setReplicationQueue] = useState<ReplicationQueueItem[]>([])
+  const [loadingReplication, setLoadingReplication] = useState(false)
+  const [selectedCluster, setSelectedCluster] = useState<string>('')
+  const [clusters, setClusters] = useState<Cluster[]>([])
+  const [loadingClusters, setLoadingClusters] = useState(false)
+
+  useEffect(() => {
+    const fetchClusters = async () => {
+      setLoadingClusters(true)
+      try {
+        const res = await fetch('/api/clusters')
+        const resJson: Cluster[] = await res.json()
+        setClusters(resJson)
+        if (resJson.length > 0) {
+          setSelectedCluster(resJson[0].cluster)
+        }
+      } catch (err) {
+        notification.error({
+          message: 'Failed to fetch clusters',
+          description: 'Please try again later',
+        })
+      }
+      setLoadingClusters(false)
+    }
+    fetchClusters()
+  }, [])

-  const columns: ColumnType<RunningQueryData>[] = [
+  const columns: ColumnType<ReplicationQueueItem>[] = [
+    {
+      title: 'Host',
+      dataIndex: 'host_name',
+      key: 'host_name',
+    },
     {
-      title: 'Query',
-      dataIndex: 'normalized_query',
-      key: 'query',
-      render: (_: any, item) => {
-        let index = 0
-        return (
-          <Paragraph
-            style={{ maxWidth: '100%', fontFamily: 'monospace' }}
-            ellipsis={{
-              rows: 2,
-              expandable: true,
-            }}
-          >
-            {item.query.replace(/(\?)/g, () => {
-              index = index + 1
-              return '$' + index
-            })}
-          </Paragraph>
-        )
-      },
+      title: 'Database',
+      dataIndex: 'database',
+      key: 'database',
     },
-    { title: 'User', dataIndex: 'user' },
-    { title: 'Elapsed time', dataIndex: 'elapsed' },
     {
-      title: 'Rows read',
-      dataIndex: 'read_rows',
-      render: (_: any, item) => (
-        <Tooltip title={`~${item.read_rows}/${item.total_rows_approx}`}>
-          ~{item.read_rows_readable}/{item.total_rows_approx_readable}
-        </Tooltip>
+      title: 'Table',
+      dataIndex: 'table',
+      key: 'table',
+    },
+    {
+      title: 'Error',
+      dataIndex: 'error',
+      key: 'error',
+      render: (error: string) => (
+        <Paragraph
+          style={{ maxWidth: '400px', color: 'red' }}
+          ellipsis={{
+            rows: 2,
+            expandable: true,
+          }}
+        >
+          {error}
+        </Paragraph>
       ),
     },
-    { title: 'Memory Usage', dataIndex: 'memory_usage' },
     {
-      title: 'Actions',
-      render: (_: any, item) => <KillQueryButton queryId={item.query_id} />,
+      title: 'Last Attempt',
+      dataIndex: 'last_attempt_time',
+      key: 'last_attempt_time',
+    },
+    {
+      title: 'Attempts',
+      dataIndex: 'num_attempts',
+      key: 'num_attempts',
+    },
+    {
+      title: 'Type',
+      dataIndex: 'type',
+      key: 'type',
     },
   ]

   usePollingEffect(
     async () => {
-      setLoadingRunningQueries(true)
-      const res = await fetch('/api/analyze/running_queries')
-      const resJson = await res.json()
-      setRunningQueries(resJson)
-      setLoadingRunningQueries(false)
+      if (!selectedCluster) return
+
+      setLoadingReplication(true)
+      try {
+        const res = await fetch(`/api/replication/?cluster=${selectedCluster}`)
+        const resJson = await res.json()
+        // Filter for failed items only
+        const failedItems = resJson.filter((item: ReplicationQueueItem) => item.error)
+        setReplicationQueue(failedItems)
+      } catch (err) {
+        notification.error({
+          message: 'Failed to fetch replication queue',
+          description: 'Please try again later',
+        })
+      }
+      setLoadingReplication(false)
     },
-    [],
+    [selectedCluster],
     { interval: 5000 }
   )

   return (
     <>
-      <h1 style={{ textAlign: 'left' }}>Running queries {loadingRunningQueries ? <Spin /> : null}</h1>
-      <br />
-      <Table
-        columns={columns}
-        dataSource={runningQueries}
-        loading={runningQueries.length == 0 && loadingRunningQueries}
-      />
+      <Space direction="vertical" size="large" style={{ width: '100%' }}>
+        <Space>
+          <h1 style={{ margin: 0 }}>
+            {`${replicationQueue.length}`} Failed Replication Queue Items
+          </h1>
+          {loadingReplication && <Spin />}
+        </Space>
+
+        <Select
+          style={{ width: 200 }}
+          value={selectedCluster}
+          onChange={setSelectedCluster}
+          loading={loadingClusters}
+          placeholder="Select a cluster"
+        >
+          {clusters.map((cluster) => (
+            <Select.Option key={cluster.cluster} value={cluster.cluster}>
+              {cluster.cluster}
+            </Select.Option>
+          ))}
+        </Select>
+
+        <Table
+          columns={columns}
+          dataSource={replicationQueue}
+          loading={replicationQueue.length === 0 && loadingReplication}
+          rowKey={(record) => `${record.host_name}-${record.table}-${record.position}`}
+        />
+      </Space>
     </>
   )
 }
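The new Replication component polls /api/replication/?cluster=<name> every 5 seconds and keeps only queue entries whose error field is non-empty. The backend contract is not spelled out beyond that fetch, so the following is only a sketch: it assumes the endpoint returns JSON objects with the same keys as the ReplicationQueueItem interface above and reproduces the frontend's "failed items only" filter from a script. The requests dependency, base URL, and cluster name are illustrative, not part of this commit.

import requests  # assumed dependency, used only for this illustration

BASE_URL = "http://localhost:8000"  # hypothetical HouseWatch host

# Fetch the replication queue for one cluster; the endpoint and query parameter
# come from the fetch call in the component above.
items = requests.get(f"{BASE_URL}/api/replication/", params={"cluster": "example_cluster"}).json()

# Same predicate as the frontend: an item counts as failed when `error` is non-empty.
failed = [item for item in items if item.get("error")]
for item in failed:
    print(item["host_name"], item["database"], item["table"], item["num_attempts"], item["error"])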

housewatch/api/cluster.py

+8 -1
@@ -1,5 +1,4 @@
 import structlog
-from rest_framework.decorators import action
 from rest_framework.request import Request
 from rest_framework.response import Response
 from rest_framework.viewsets import GenericViewSet
@@ -15,3 +14,11 @@ def list(self, request: Request) -> Response:

     def retrieve(self, request: Request, pk: str) -> Response:
         return Response(clusters.get_cluster(pk))
+
+
+class ReplicationViewset(GenericViewSet):
+    def list(self, request: Request) -> Response:
+        cluster = request.query_params.get("cluster")
+        if not cluster:
+            return Response({"error": "cluster parameter is required"}, status=400)
+        return Response(list(clusters.get_replication_queues(cluster)))
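ReplicationViewset.list delegates to clusters.get_replication_queues, which is not included in the hunks shown in this commit view. Below is a minimal sketch of what such a helper could look like, assuming it reads ClickHouse's system.replication_queue across all replicas and aliases columns to the field names the frontend expects; the SQL and module layout are assumptions, not the actual implementation.

# Hypothetical sketch; the real clusters.get_replication_queues is not shown in this diff.
from housewatch.clickhouse.client import run_query

REPLICATION_QUEUE_SQL = """
SELECT
    hostName() AS host_name,
    database,
    table,
    position,
    last_exception AS error,
    last_attempt_time,
    num_tries AS num_attempts,
    type
FROM clusterAllReplicas('%(cluster)s', system.replication_queue)
"""


def get_replication_queues(cluster: str):
    # system.replication_queue is ClickHouse's per-replica queue of pending and
    # failed replication tasks; the aliases match the ReplicationQueueItem fields.
    return run_query(REPLICATION_QUEUE_SQL, params={"cluster": cluster}, use_cache=False)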

housewatch/clickhouse/client.py

+36 -24
@@ -1,4 +1,3 @@
-import os
 from typing import Dict, Optional
 from clickhouse_pool import ChPool
 from clickhouse_driver import Client
@@ -23,6 +22,24 @@
 )


+def get_client(node: Optional[Dict] = None):
+    if node:
+        client = Client(
+            host=node["host_name"],
+            database=settings.CLICKHOUSE_DATABASE,
+            user=settings.CLICKHOUSE_USER,
+            secure=settings.CLICKHOUSE_SECURE,
+            ca_certs=settings.CLICKHOUSE_CA,
+            verify=settings.CLICKHOUSE_VERIFY,
+            settings={"max_result_rows": "2000"},
+            send_receive_timeout=30,
+            password=settings.CLICKHOUSE_PASSWORD,
+        )
+    else:
+        client = pool.get_client()
+    return client
+
+
 def run_query_on_shards(
     query: str,
     params: Dict[str, str | int] = {},
@@ -38,24 +55,13 @@ def run_query_on_shards(
     for shard, node in nodes:
         params["shard"] = shard
         final_query = query % (params or {}) if substitute_params else query
-        client = Client(
-            host=node["host_address"],
-            database=settings.CLICKHOUSE_DATABASE,
-            user=settings.CLICKHOUSE_USER,
-            secure=settings.CLICKHOUSE_SECURE,
-            ca_certs=settings.CLICKHOUSE_CA,
-            verify=settings.CLICKHOUSE_VERIFY,
-            settings={"max_result_rows": "2000"},
-            send_receive_timeout=30,
-            password=settings.CLICKHOUSE_PASSWORD,
-        )
+        client = get_client(node)
         result = client.execute(final_query, settings=query_settings, with_column_types=True, query_id=query_id)
         response = []
         for res in result[0]:
             item = {}
             for index, key in enumerate(result[1]):
                 item[key[0]] = res[index]
-
             response.append(item)
         responses.append((shard, response))
     return response
@@ -68,7 +74,7 @@ def run_query(
     query_id: Optional[str] = None,
     use_cache: bool = True,  # defaulting to True for now for simplicity, but ideally we should default this to False
     substitute_params: bool = True,
-    cluster: Optional[str] = None,
+    node: Optional[Dict] = None,
 ):
     final_query = query % (params or {}) if substitute_params else query
     query_hash = ""
@@ -79,18 +85,24 @@ def run_query(
     if cached_result:
         return json.loads(cached_result)

-    with pool.get_client() as client:
+    response = []
+    if node:
+        client = get_client(node)
         result = client.execute(final_query, settings=settings, with_column_types=True, query_id=query_id)
-        response = []
-        for res in result[0]:
-            item = {}
-            for index, key in enumerate(result[1]):
-                item[key[0]] = res[index]
+    else:
+        with pool.get_client() as client:
+            result = client.execute(final_query, settings=settings, with_column_types=True, query_id=query_id)

-            response.append(item)
-        if use_cache:
-            cache.set(query_hash, json.dumps(response, default=str), timeout=60 * 5)
-        return response
+    for res in result[0]:
+        item = {}
+        for index, key in enumerate(result[1]):
+            item[key[0]] = res[index]
+        response.append(item)
+
+    if use_cache:
+        cache.set(query_hash, json.dumps(response, default=str), timeout=60 * 5)
+
+    return response


 existing_system_tables = [row["name"] for row in run_query(EXISTING_TABLES_SQL, use_cache=False)]
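With this change, run_query accepts a node dict instead of a cluster name, and the new get_client helper either opens a direct clickhouse_driver connection to that node's host_name or falls back to the shared pool. A minimal usage sketch, assuming a node dict shaped like the replica rows the clusters API returns; the hostname below is made up.

from housewatch.clickhouse.client import run_query

# Hypothetical replica entry; get_client only reads host_name from it.
node = {"host_name": "clickhouse-replica-1.internal"}

# With node set, the query bypasses the pool and runs on that specific replica.
rows = run_query("SELECT hostName() AS host, uptime() AS uptime_seconds", node=node, use_cache=False)
print(rows)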

0 commit comments
