Skip to content

Commit 22ff621

Browse files
Merge pull request #1038 from Aiven-Open/keejon/backport-forwarding-fix
fix: no forwarding if primary url own
2 parents 74e8ea9 + 4ca6727 commit 22ff621

11 files changed

Lines changed: 343 additions & 69 deletions

src/karapace/coordinator/master_coordinator.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from karapace.config import Config
1414
from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus
1515
from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS
16-
from karapace.typing import SchemaReaderStoppper
16+
from karapace.typing import PrimaryInfo, SchemaReaderStoppper
1717
from threading import Thread
1818
from typing import Final
1919

@@ -157,18 +157,26 @@ def get_coordinator_status(self) -> SchemaCoordinatorStatus:
157157
group_generation_id=generation if generation is not None else -1,
158158
)
159159

160-
def get_master_info(self) -> tuple[bool | None, str | None]:
160+
def get_master_info(self) -> PrimaryInfo:
161161
"""Return whether we're the master, and the actual master url that can be used if we're not"""
162162
if not self._sc:
163-
return False, None
163+
return PrimaryInfo(False, None)
164164

165165
if not self._sc.ready():
166166
# we should wait for a while after we have been elected master, we should also consume
167167
# all the messages in the log before proceeding, check the doc of `self._sc.are_we_master`
168168
# for more details
169-
return False, None
169+
return PrimaryInfo(False, None)
170170

171-
return self._sc.are_we_master(), self._sc.master_url
171+
url: str | None = None
172+
if (
173+
self._sc.master_url is not None
174+
and f"{self.config['host']}:{self.config['port']}" not in self._sc.master_url
175+
and f"{self.config['advertised_hostname']}:{self.config['advertised_port']}" not in self._sc.master_url
176+
):
177+
url = self._sc.master_url
178+
179+
return PrimaryInfo(self._sc.are_we_master(), url)
172180

173181
def __send_close_event(self) -> None:
174182
self._closing.set()

src/karapace/coordinator/schema_coordinator.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ def __init__(
197197
def is_master_assigned_to_myself(self) -> bool:
198198
return self._are_we_master or False
199199

200-
def are_we_master(self) -> bool | None:
200+
def are_we_master(self) -> bool:
201201
"""
202202
After a new election is made we should wait for a while since the previous master could have produced
203203
a new message shortly before being disconnected from the cluster.
@@ -211,7 +211,7 @@ def are_we_master(self) -> bool | None:
211211
# `self._are_we_master` is `None` only while the assignment is being performed,
212212
# where we don't know if we are master yet
213213
LOG.warning("No new elections performed yet.")
214-
return None
214+
return False
215215

216216
if not self._ready or not self._schema_reader_stopper.ready():
217217
return False
@@ -522,6 +522,7 @@ def coordinator_dead(self) -> None:
522522
self._are_we_master = False
523523
self.coordinator_id = None
524524
self._coordinator_dead_fut.set_result(None)
525+
self.request_rejoin()
525526

526527
def reset_generation(self) -> None:
527528
"""Coordinator did not recognize either generation or member_id. Will

src/karapace/schema_reader.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -393,12 +393,12 @@ def handle_messages(self) -> None:
393393

394394
watch_offsets = False
395395
if self.master_coordinator is not None:
396-
are_we_master, _ = self.master_coordinator.get_master_info()
396+
primary_info = self.master_coordinator.get_master_info()
397397
# keep old behavior for True. When primary_info.primary is False, this node is either a follower or is still
397398
# waiting for a stable election result, so it should not accept direct writes anyway and any
398399
# messages off the topic are writes performed by another node
400400
# Also if master_eligibility is disabled by configuration, disable writes too
401-
if are_we_master is True:
401+
if primary_info.primary:
402402
watch_offsets = True
403403

404404
self.consume_messages(msgs, watch_offsets)

src/karapace/schema_registry.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Versioner
3232
from karapace.schema_reader import KafkaSchemaReader
3333
from karapace.schema_references import LatestVersionReference, Reference
34-
from karapace.typing import JsonObject, Mode, SchemaId, Subject, Version
34+
from karapace.typing import JsonObject, Mode, PrimaryInfo, SchemaId, Subject, Version
3535

3636
import asyncio
3737
import logging
@@ -85,23 +85,23 @@ async def close(self) -> None:
8585
stack.enter_context(closing(self.schema_reader))
8686
stack.enter_context(closing(self.producer))
8787

88-
async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str | None]:
88+
async def get_master(self) -> PrimaryInfo:
8989
"""Resolve if current node is the primary and the primary node address.
9090
91-
:param bool ignore_readiness: Ignore waiting to become ready and return
92-
follower/primary state and primary url.
93-
:return (bool, Optional[str]): returns the primary/follower state and primary url
91+
:return PrimaryInfo: returns the PrimaryInfo object with primary state and primary url.
9492
"""
9593
async with self._master_lock:
96-
while True:
97-
are_we_master, master_url = self.mc.get_master_info()
98-
if are_we_master is None:
99-
LOG.info("No master set: %r, url: %r", are_we_master, master_url)
100-
elif not ignore_readiness and self.schema_reader.ready() is False:
101-
LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready)
102-
else:
103-
return are_we_master, master_url
104-
await asyncio.sleep(1.0)
94+
primary_info = self.mc.get_master_info()
95+
if (
96+
# If node is not primary and no known primary url
97+
not primary_info.primary
98+
and primary_info.primary_url is None
99+
):
100+
LOG.warning("No master set: %r", primary_info)
101+
if self.schema_reader.ready() is False:
102+
LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready)
103+
return PrimaryInfo(False, primary_url=primary_info.primary_url)
104+
return primary_info
105105

106106
def get_compatibility_mode(self, subject: Subject) -> CompatibilityModes:
107107
compatibility = self.database.get_subject_compatibility(subject=subject)

src/karapace/schema_registry_apis.py

Lines changed: 39 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -145,20 +145,12 @@ async def _forward_if_not_ready_to_serve(self, request: HTTPRequest, content_typ
145145
pass
146146
else:
147147
# Not ready, still loading the state.
148-
# Needs only the master_url
149-
_, master_url = await self.schema_registry.get_master(ignore_readiness=True)
148+
primary_info = await self.schema_registry.get_master()
150149
returned_content_type = request.get_header("Content-Type") if content_type is None else content_type
151-
if not master_url:
150+
if not primary_info.primary_url:
152151
self.no_master_error(request.content_type)
153-
elif f"{self.config['advertised_hostname']}:{self.config['advertised_port']}" in master_url:
154-
# If master url is the same as the url of this Karapace respond 503.
155-
self.r(
156-
body="",
157-
content_type=returned_content_type,
158-
status=HTTPStatus.SERVICE_UNAVAILABLE,
159-
)
160152
else:
161-
url = f"{master_url}{request.url.path}"
153+
url = f"{primary_info.primary_url}{request.url.path}"
162154
await self._forward_request_remote(
163155
request=request,
164156
body=request.json,
@@ -574,13 +566,13 @@ async def config_set(self, content_type: str, *, request: HTTPRequest, user: Use
574566
status=HTTPStatus.UNPROCESSABLE_ENTITY,
575567
)
576568

577-
are_we_master, master_url = await self.schema_registry.get_master()
578-
if are_we_master:
569+
primary_info = await self.schema_registry.get_master()
570+
if primary_info.primary:
579571
self.schema_registry.send_config_message(compatibility_level=compatibility_level, subject=None)
580-
elif not master_url:
572+
elif not primary_info.primary_url:
581573
self.no_master_error(content_type)
582574
else:
583-
url = f"{master_url}/config"
575+
url = f"{primary_info.primary_url}/config"
584576
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="PUT")
585577

586578
self.r({"compatibility": self.schema_registry.schema_reader.config["compatibility"]}, content_type)
@@ -645,13 +637,13 @@ async def config_subject_set(
645637
status=HTTPStatus.UNPROCESSABLE_ENTITY,
646638
)
647639

648-
are_we_master, master_url = await self.schema_registry.get_master()
649-
if are_we_master:
640+
primary_info = await self.schema_registry.get_master()
641+
if primary_info.primary:
650642
self.schema_registry.send_config_message(compatibility_level=compatibility_level, subject=subject)
651-
elif not master_url:
643+
elif not primary_info.primary_url:
652644
self.no_master_error(content_type)
653645
else:
654-
url = f"{master_url}/config/{subject}"
646+
url = f"{primary_info.primary_url}/config/{subject}"
655647
await self._forward_request_remote(
656648
request=request, body=request.json, url=url, content_type=content_type, method="PUT"
657649
)
@@ -670,13 +662,13 @@ async def config_subject_delete(
670662
if not self._auth.check_authorization(user, Operation.Write, f"Subject:{subject}"):
671663
self.r(body={"message": "Forbidden"}, content_type=JSON_CONTENT_TYPE, status=HTTPStatus.FORBIDDEN)
672664

673-
are_we_master, master_url = await self.schema_registry.get_master()
674-
if are_we_master:
665+
primary_info = await self.schema_registry.get_master()
666+
if primary_info.primary:
675667
self.schema_registry.send_config_subject_delete_message(subject=subject)
676-
elif not master_url:
668+
elif not primary_info.primary_url:
677669
self.no_master_error(content_type)
678670
else:
679-
url = f"{master_url}/config/{subject}"
671+
url = f"{primary_info.primary_url}/config/{subject}"
680672
await self._forward_request_remote(
681673
request=request, body=request.json, url=url, content_type=content_type, method="PUT"
682674
)
@@ -685,21 +677,24 @@ async def config_subject_delete(
685677

686678
async def master_available(self, *, request: HTTPRequest) -> None:
687679
no_cache_header = {"Cache-Control": "no-store, no-cache, must-revalidate"}
688-
are_we_master, master_url = await self.schema_registry.get_master()
689-
self.log.info("are master %s, master url %s", are_we_master, master_url)
680+
primary_info = await self.schema_registry.get_master()
681+
self.log.info("are master %s, master url %s", primary_info.primary, primary_info.primary_url)
690682

691683
if (
692684
self.schema_registry.schema_reader.master_coordinator._sc is not None # pylint: disable=protected-access
693685
and self.schema_registry.schema_reader.master_coordinator._sc.is_master_assigned_to_myself() # pylint: disable=protected-access
694686
):
695687
raise HTTPResponse(
696-
body={"master_available": are_we_master},
688+
body={"master_available": primary_info.primary},
697689
status=HTTPStatus.OK,
698690
content_type=JSON_CONTENT_TYPE,
699691
headers=no_cache_header,
700692
)
701693

702-
if master_url is None or f"{self.config['advertised_hostname']}:{self.config['advertised_port']}" in master_url:
694+
if (
695+
primary_info.primary_url is None
696+
or f"{self.config['advertised_hostname']}:{self.config['advertised_port']}" in primary_info.primary_url
697+
):
703698
raise HTTPResponse(
704699
body={"master_available": False},
705700
status=HTTPStatus.OK,
@@ -708,7 +703,11 @@ async def master_available(self, *, request: HTTPRequest) -> None:
708703
)
709704

710705
await self._forward_request_remote(
711-
request=request, body={}, url=f"{master_url}/master_available", content_type=JSON_CONTENT_TYPE, method="GET"
706+
request=request,
707+
body={},
708+
url=f"{primary_info.primary_url}/master_available",
709+
content_type=JSON_CONTENT_TYPE,
710+
method="GET",
712711
)
713712

714713
async def subjects_list(self, content_type: str, *, request: HTTPRequest, user: User | None = None) -> None:
@@ -730,8 +729,8 @@ async def subject_delete(
730729

731730
permanent = request.query.get("permanent", "false").lower() == "true"
732731

733-
are_we_master, master_url = await self.schema_registry.get_master()
734-
if are_we_master:
732+
primary_info = await self.schema_registry.get_master()
733+
if primary_info.primary:
735734
try:
736735
version_list = await self.schema_registry.subject_delete_local(subject=subject, permanent=permanent)
737736
self.r([version.value for version in version_list], content_type, status=HTTPStatus.OK)
@@ -775,10 +774,10 @@ async def subject_delete(
775774
content_type=content_type,
776775
status=HTTPStatus.UNPROCESSABLE_ENTITY,
777776
)
778-
elif not master_url:
777+
elif not primary_info.primary_url:
779778
self.no_master_error(content_type)
780779
else:
781-
url = f"{master_url}/subjects/{subject}?permanent={permanent}"
780+
url = f"{primary_info.primary_url}/subjects/{subject}?permanent={permanent}"
782781
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE")
783782

784783
async def subject_version_get(
@@ -819,8 +818,8 @@ async def subject_version_delete(
819818
self._check_authorization(user, Operation.Write, f"Subject:{subject}")
820819
permanent = request.query.get("permanent", "false").lower() == "true"
821820

822-
are_we_master, master_url = await self.schema_registry.get_master()
823-
if are_we_master:
821+
primary_info = await self.schema_registry.get_master()
822+
if primary_info.primary:
824823
try:
825824
resolved_version = await self.schema_registry.subject_version_delete_local(
826825
subject, Versioner.V(version), permanent
@@ -882,10 +881,10 @@ async def subject_version_delete(
882881
)
883882
except InvalidVersion:
884883
self._invalid_version(content_type, version)
885-
elif not master_url:
884+
elif not primary_info.primary_url:
886885
self.no_master_error(content_type)
887886
else:
888-
url = f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}"
887+
url = f"{primary_info.primary_url}/subjects/{subject}/versions/{version}?permanent={permanent}"
889888
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE")
890889

891890
async def subject_version_schema_get(
@@ -1241,8 +1240,8 @@ async def subject_post(
12411240
if schema_id is not None:
12421241
self.r({"id": schema_id}, content_type)
12431242

1244-
are_we_master, master_url = await self.schema_registry.get_master()
1245-
if are_we_master:
1243+
primary_info = await self.schema_registry.get_master()
1244+
if primary_info.primary:
12461245
try:
12471246
schema_id = await self.schema_registry.write_new_schema_local(subject, new_schema, references)
12481247
self.r(
@@ -1279,10 +1278,10 @@ async def subject_post(
12791278
except Exception as xx:
12801279
raise xx
12811280

1282-
elif not master_url:
1281+
elif not primary_info.primary_url:
12831282
self.no_master_error(content_type)
12841283
else:
1285-
url = f"{master_url}/subjects/{subject}/versions"
1284+
url = f"{primary_info.primary_url}/subjects/{subject}/versions"
12861285
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="POST")
12871286

12881287
async def get_global_mode(

src/karapace/typing.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from abc import ABC, abstractmethod
88
from collections.abc import Mapping, Sequence
9+
from dataclasses import dataclass
910
from enum import Enum, unique
1011
from karapace.errors import InvalidVersion
1112
from typing import Any, ClassVar, NewType, Union
@@ -113,3 +114,9 @@ def ready(self) -> bool:
113114
@abstractmethod
114115
def set_not_ready(self) -> None:
115116
pass
117+
118+
119+
@dataclass(frozen=True)
120+
class PrimaryInfo:
121+
primary: bool
122+
primary_url: str | None

0 commit comments

Comments
 (0)