feat(BA-75): Introduce Raftify and refactor DistributedGlobalTimer with Raft #2105
src/ai/backend/common/distributed.py
Outdated
    async def join(self) -> None:
        self._tick_task = asyncio.create_task(self.generate_tick())

    async def leave(self) -> None:
        self._stopped = True
        await asyncio.sleep(0)
        if not self._tick_task.done():
            try:
                self._tick_task.cancel()
                await self._tick_task
            except asyncio.CancelledError:
                pass
The definitions here are copied and pasted from DistributedLockGlobalTimer. I suggest inheriting directly from DistributedLockGlobalTimer instead of AbstractGlobalTimer so that RaftGlobalTimer can reuse the join() and leave() functions that are already defined.
For RaftGlobalTimer to inherit from DistributedLockGlobalTimer instead of AbstractGlobalTimer, we would need to define a RaftDistributedLock. That seems to conflict with the intent of replacing the existing DistributedLock, and implementing it would first require deciding how RaftDistributedLock should work.
IMHO, if the purpose here is simply to reuse join() and leave(), it would be more appropriate to move the implementation of these two functions into AbstractGlobalTimer.
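A minimal sketch of the alternative proposed in this reply, assuming the base class only needs generate_tick() from its subclasses. DemoTimer and demo() are hypothetical names used for illustration and are not part of the PR.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Optional


class AbstractGlobalTimer(ABC):
    """Sketch: the base class owns the tick-task lifecycle (join/leave)."""

    _tick_task: Optional[asyncio.Task] = None
    _stopped: bool = False

    @abstractmethod
    async def generate_tick(self) -> None:
        """Backend-specific tick loop (lock-based, Raft-based, ...)."""

    async def join(self) -> None:
        self._tick_task = asyncio.create_task(self.generate_tick())

    async def leave(self) -> None:
        self._stopped = True
        await asyncio.sleep(0)  # give the tick loop a chance to observe the flag
        task = self._tick_task
        if task is not None and not task.done():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass


class DemoTimer(AbstractGlobalTimer):
    """Hypothetical subclass: it only implements the tick loop."""

    def __init__(self, interval: float) -> None:
        self.interval = interval
        self.ticks = 0

    async def generate_tick(self) -> None:
        while not self._stopped:
            self.ticks += 1
            await asyncio.sleep(self.interval)


async def demo() -> DemoTimer:
    timer = DemoTimer(0.01)
    await timer.join()
    await asyncio.sleep(0.03)
    await timer.leave()
    return timer
```

With this layout, both the lock-based and the Raft-based timers would inherit join() and leave() without either one depending on the other's lock type.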
src/ai/backend/manager/api/logs.py
Outdated
    if root_ctx.raft_ctx.use_raft():
        app_ctx.log_cleanup_timer = RaftGlobalTimer(
            root_ctx.raft_ctx.raft_node,
            root_ctx.event_producer,
            lambda: DoLogCleanupEvent(),
            20.0,
            initial_delay=17.0,
        )
    else:
        app_ctx.log_cleanup_timer = DistributedLockGlobalTimer(
            root_ctx.distributed_lock_factory(LockID.LOCKID_LOG_CLEANUP_TIMER, 20.0),
            root_ctx.event_producer,
            lambda: DoLogCleanupEvent(),
            20.0,
            initial_delay=17.0,
            task_name="log_cleanup_task",
        )
This kind of flag-like approach can become burdensome when adopting a third lock backend. Please refactor this to match our match-case selection convention (ref).
I refactored the related code in 5eb1e9f to reflect this feedback, which resolves the issue.
In that commit, the use_raft() checks are replaced with match-case statements.
[raft]
heartbeat-tick = 3
election-tick = 10
log-dir = "./logs"
From my understanding, the Raft backend is enabled only when this directive is specified in the configuration file. This approach can be too implicit from a system manager's perspective. Please refactor the activation mechanism so that the config writer can explicitly state which timer backend to use.
I refactored the related code in 5eb1e9f to reflect this feedback, which resolves the issue.
In that commit, I added a global-timer option to manager.toml (the manager's local config).
I think the global-timer option makes it possible to specify more explicitly whether to use raft or distributed-lock.
May I ask for a review of the commit?
from typing import Any


class Logger:
Is there any reason to retain a separate Logger class for the Raft APIs only?
The logs printed on the Rust side should be passed to the Python logger instead of being written directly to stdout or a file.
In other words, the methods of this class are called from Rust. This is why the logs output by raft-rs are formatted to match the Backend.AI logger format.
See the code linked below for details.
Ref: https://github.com/lablup/raftify/blob/main/binding/python/src/bindings/logger.rs
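A small sketch of such a bridge class, assuming the Rust side calls one method per log level with a single message string. The method names here (debug, info, warn, error) are assumptions; the methods the binding actually expects are defined in the logger.rs file linked above.

```python
import logging


class Logger:
    """Sketch of a bridge object whose methods forward to a Python logger."""

    def __init__(self, logger: logging.Logger) -> None:
        self._logger = logger

    # Assumed method names; one per log level, each taking a message string.
    def debug(self, message: str) -> None:
        self._logger.debug(message)

    def info(self, message: str) -> None:
        self._logger.info(message)

    def warn(self, message: str) -> None:
        self._logger.warning(message)

    def error(self, message: str) -> None:
        self._logger.error(message)
```

Because every message goes through the standard logging module, the handlers and formatters already configured for the application apply to the Raft logs as well.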
class SetCommand:
    """
    Represent simple key-value command.
    Use pickle to serialize the data.
    """

    def __init__(self, key: str, value: str) -> None:
        self.key = key
        self.value = value

    def encode(self) -> bytes:
        return pickle.dumps(self.__dict__)

    @classmethod
    def decode(cls, packed: bytes) -> "SetCommand":
        unpacked = pickle.loads(packed)
        return cls(unpacked["key"], unpacked["value"])
Most of the general concepts of this class can be achieved by just using the dataclasses module.
Although I haven't yet found a proper way to express the association with the Rust types, this class is used on the Rust side as the LogEntry type. (Refer to the PyLogEntry implementation below.)
https://github.com/lablup/raftify/blob/main/binding/python/src/bindings/state_machine.rs#L27-L31
This means that the encode and decode methods must be callable from Rust.
If we refactor SetCommand as below,
@dataclass
class SetCommand:
    key: str
    value: str
we encounter the following error when they are called from Rust.
2024-05-10 11:48:16,101 - ERROR - Error handling request
Traceback (most recent call last):
  File "/Users/jopemachine/.pyenv/versions/3.12.2/lib/python3.12/site-packages/aiohttp/web_protocol.py", line 452, in _handle_request
    resp = await request_handler(request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jopemachine/.pyenv/versions/3.12.2/lib/python3.12/site-packages/aiohttp/web_app.py", line 543, in _handle
    resp = await handler(request)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jopemachine/Desktop/raftify/binding/python/examples/web_server_api.py", line 52, in put
    await raft_node.propose(message.encode())
                            ^^^^^^^^^^^^^^
AttributeError: 'SetCommand' object has no attribute 'encode'
If there is still a way to improve SetCommand using dataclass, I would appreciate your advice. If there is room for improvement, I will apply it immediately.
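One possible direction, sketched under the assumption that the Rust side only needs encode and decode to exist as ordinary attributes: a dataclass can still define methods, so the two serialization methods can be kept while the generated __init__, __repr__, and __eq__ replace the hand-written boilerplate. Whether the binding accepts this class has not been verified here.

```python
import pickle
from dataclasses import asdict, dataclass


@dataclass
class SetCommand:
    """Simple key-value command, serialized with pickle."""

    key: str
    value: str

    def encode(self) -> bytes:
        # asdict() yields {"key": ..., "value": ...}, matching the payload
        # produced by pickling self.__dict__ in the original class.
        return pickle.dumps(asdict(self))

    @classmethod
    def decode(cls, packed: bytes) -> "SetCommand":
        return cls(**pickle.loads(packed))
```

The AttributeError in the traceback above is consistent with the refactored class having dropped encode, so keeping the methods on the dataclass should avoid at least that failure.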
    set_confchange_context_deserializer,
    set_confchangev2_context_deserializer,
    set_entry_context_deserializer,
    set_entry_data_deserializer,
    set_fsm_deserializer,
    set_log_entry_deserializer,
    set_message_context_deserializer,
    set_snapshot_data_deserializer,
This is more of an upstream problem, but since we also hold the keys to the raftify project: why does every context have its own _deserializer() API function? Wouldn't it be better to merge these into one gateway function?
I agree with your point. Registering the callbacks all at once would be better for code readability than the current method.
I will make this improvement in the PR below.
lablup/raftify#101
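To make the "one gateway function" idea concrete, here is a purely illustrative sketch in plain Python. It is not raftify's API: set_deserializers, deserialize, and the registry are hypothetical, and the kind names are simply derived from the eight function names imported above.

```python
from typing import Callable, Dict

Deserializer = Callable[[bytes], str]

# Kind names derived from the set_*_deserializer imports quoted above.
KNOWN_KINDS = frozenset({
    "confchange_context",
    "confchangev2_context",
    "entry_context",
    "entry_data",
    "fsm",
    "log_entry",
    "message_context",
    "snapshot_data",
})

_REGISTRY: Dict[str, Deserializer] = {}


def set_deserializers(**callbacks: Deserializer) -> None:
    """Hypothetical gateway: register any subset of deserializers in one call."""
    unknown = set(callbacks) - KNOWN_KINDS
    if unknown:
        raise KeyError(f"unknown deserializer kinds: {sorted(unknown)}")
    _REGISTRY.update(callbacks)


def deserialize(kind: str, payload: bytes) -> str:
    """Use the registered callback, falling back to repr() when none is set."""
    callback = _REGISTRY.get(kind)
    return callback(payload) if callback is not None else repr(payload)
```

A caller would then write a single set_deserializers(log_entry=..., fsm=...) call instead of importing and invoking eight separate setters.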
src/ai/backend/manager/types.py
Outdated
class RaftNodeInitialRole(str, enum.Enum):
    LEADER = "leader"
    VOTER = "voter"
    LEARNER = "learner"
If raftify or raft-rs defines the behavior of each role, how about keeping those roles in raftify?
In fact, Raftify already defines an InitialRole type on the Rust side, and this type exists in the Python bindings as well.
The reason I define and use a separate Enum with the same contents here is that the binding type is not an actual Enum in Python.
For instance, the type-validation code below produces the following error.
from raftify import InitialRole
...
t.Key("role", default=InitialRole.VOTER): tx.Enum(InitialRole),
  File "/home/jopemachine/backend.ai/dist/export/python/virtualenvs/python-default/3.12.2/lib/python3.12/site-packages/trafaret/base.py", line 1143, in transform
    for k, v, names in key(value, context=context):
  File "/home/jopemachine/backend.ai/dist/export/python/virtualenvs/python-default/3.12.2/lib/python3.12/site-packages/trafaret/base.py", line 972, in __call__
    result = self.trafaret(self.get_data(data, default), context=context)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jopemachine/backend.ai/dist/export/python/virtualenvs/python-default/3.12.2/lib/python3.12/site-packages/trafaret/base.py", line 152, in __call__
    return self.check(val, context=context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jopemachine/backend.ai/dist/export/python/virtualenvs/python-default/3.12.2/lib/python3.12/site-packages/trafaret/base.py", line 115, in check
    return self.check_and_return(value)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jopemachine/backend.ai/src/ai/backend/common/validators.py", line 216, in check_and_return
    return self.enum_cls(value)
           ^^^^^^^^^^^^^^^^^^^^
TypeError: No constructor defined
There may be a better way to handle Enum types when writing PyO3 bindings.
The Python bindings of Raftify currently have several areas for improvement like this one.
Ref: lablup/raftify#91
The CI test is failing while downloading the raftify package. This should be fixed once the raftify PyPI package deployment workflow is set up.
Resolves #2969 (BA-75).
Prerequisite of #415 (BA-269).
This PR aims to resolve the distributed locking issues by integrating Raftify with the Backend.AI manager, using a GlobalTimer driven by the Raft algorithm.
Any kind of feedback is welcome.
How to set up the test environment
- Set num-proc of manager.toml to an arbitrary number other than 1, and set the raft section when running Backend.AI manager.
- Create raft-cluster-config.toml and set initial_peers there. Below is an example.
Testing and debugging
For example, you can use the command below to put a new log entry into the cluster.
To print all persisted logs, you can use the command below.
For more details, please refer to the Raftify documentation.
Checklist: (if applicable)
📚 Documentation preview 📚: https://sorna--2105.org.readthedocs.build/en/2105/
📚 Documentation preview 📚: https://sorna-ko--2105.org.readthedocs.build/ko/2105/