Skip to content
Draft
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions framework/py/flwr/common/constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,18 @@ class EventLogWriterType:
def __new__(cls) -> EventLogWriterType:
"""Prevent instantiation."""
raise TypeError(f"{cls.__name__} cannot be instantiated.")


class SchedulerPluginType:
"""Scheduler plugin types."""

CLIENT_APP = "clientapp"

def __new__(cls) -> SchedulerPluginType:
"""Prevent instantiation."""
raise TypeError(f"{cls.__name__} cannot be instantiated.")

@staticmethod
def all() -> list[str]:
"""Return all scheduler plugin types."""
return [SchedulerPluginType.CLIENT_APP]
4 changes: 4 additions & 0 deletions framework/py/flwr/common/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ def _generate_next_value_(name: str, start: int, count: int, last_values: list[A
FLWR_SERVERAPP_RUN_ENTER = auto()
FLWR_SERVERAPP_RUN_LEAVE = auto()

# CLI: flwr-app-scheduler
FLWR_APP_SCHEDULER_RUN_ENTER = auto()
FLWR_APP_SCHEDULER_RUN_LEAVE = auto()

# --- Simulation Engine ------------------------------------------------------------

# CLI: flower-simulation
Expand Down
22 changes: 22 additions & 0 deletions framework/py/flwr/supercore/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright 2025 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Flower command line interface for shared infrastructure components."""


from .flwr_app_scheduler import flwr_app_scheduler

__all__ = [
"flwr_app_scheduler",
]
85 changes: 85 additions & 0 deletions framework/py/flwr/supercore/cli/flwr_app_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Copyright 2025 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""`flwr-app-scheduler` command."""


import argparse
from logging import INFO

from flwr.common import EventType, event
from flwr.common.constant import SchedulerPluginType
from flwr.common.logger import log
from flwr.supercore.scheduler import run_app_scheduler
from flwr.supernode.scheduler import SimpleClientAppSchedulerPlugin


def flwr_app_scheduler() -> None:
"""Run `flwr-app-scheduler` command."""
args = _parse_args().parse_args()

# Log the first message after parsing arguments in case of `--help`
log(INFO, "Starting Flower App Scheduler")

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add logs similar to those in the flwr-*** commands mentioning about TLS and the address used? For example this is how it's done with flwr-clientapp atm:
https://github.com/adap/flower/blob/ce17e4e2573bb53a94a6a879e871f38a4394ffb2/framework/py/flwr/supernode/cli/flwr_clientapp.py#L32-L45

# Trigger telemetry event
event(EventType.FLWR_APP_SCHEDULER_RUN_ENTER)

run_app_scheduler(
plugin_class=_get_plugin_class(args.plugin_type),
appio_api_address=args.appio_api_address,
flwr_dir=args.flwr_dir,
)


def _parse_args() -> argparse.ArgumentParser:
"""Parse `flwr-app-scheduler` command line arguments."""
parser = argparse.ArgumentParser(
description="Run a Flower App Scheduler",
)
parser.add_argument(
"--appio-api-address", type=str, required=True, help="Address of the AppIO API"
)
parser.add_argument(
"--plugin-type",
type=str,
choices=SchedulerPluginType.all(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

required=True,
help="The type of plugin to use.",
)
parser.add_argument(
"--insecure",
action="store_true",
help="Connect to the AppIO API without TLS. "
"Data transmitted between the client and server is not encrypted. "
"Use this flag only if you understand the risks.",
)
parser.add_argument(
"--flwr-dir",
default=None,
help="""The path containing installed Flower Apps.
By default, this value is equal to:

- `$FLWR_HOME/` if `$FLWR_HOME` is defined
- `$XDG_DATA_HOME/.flwr/` if `$XDG_DATA_HOME` is defined
- `$HOME/.flwr/` in all other cases
""",
)
Comment on lines +60 to +77
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we inject these aregs via add_args_flwr_app_common as we do in the flwr-*** commands? For example in flwr-clientapp: https://github.com/adap/flower/blob/ce17e4e2573bb53a94a6a879e871f38a4394ffb2/framework/py/flwr/supernode/cli/flwr_clientapp.py#L87

return parser


def _get_plugin_class(plugin_type: str) -> type[SimpleClientAppSchedulerPlugin]:
"""Get the plugin class based on the plugin type."""
if plugin_type == SchedulerPluginType.CLIENT_APP:
return SimpleClientAppSchedulerPlugin
raise ValueError(f"Unknown plugin type: {plugin_type}")
2 changes: 2 additions & 0 deletions framework/py/flwr/supercore/scheduler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@


from .plugin import SchedulerPlugin
from .run_scheduler import run_app_scheduler

__all__ = [
"SchedulerPlugin",
"run_app_scheduler",
]
110 changes: 110 additions & 0 deletions framework/py/flwr/supercore/scheduler/run_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Copyright 2025 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Flower app scheduler process."""


import time
from typing import Optional

from flwr.common.exit_handlers import register_exit_handlers
from flwr.common.grpc import create_channel, on_channel_state_change
from flwr.common.serde import run_from_proto
from flwr.common.telemetry import EventType
from flwr.common.typing import Run
from flwr.proto.clientappio_pb2 import ( # pylint: disable=E0611
GetRunIdsWithPendingMessagesRequest,
RequestTokenRequest,
)
from flwr.proto.clientappio_pb2_grpc import ClientAppIoStub
from flwr.proto.run_pb2 import GetRunRequest # pylint: disable=E0611

from .plugin import SchedulerPlugin


def run_app_scheduler(
plugin_class: type[SchedulerPlugin],
appio_api_address: str,
flwr_dir: Optional[str] = None,
) -> None:
"""Run the Flower app scheduler.

Parameters
----------
plugin_class : type[SchedulerPlugin]
The class of the scheduler plugin to use.
appio_api_address : str
The address of the AppIO API.
flwr_dir : Optional[str] (default: None)
The Flower directory.
"""
# Create the channel to the AppIO API
# No TLS support for now, so insecure connection
channel = create_channel(
server_address=appio_api_address,
insecure=True,
root_certificates=None,
)
channel.subscribe(on_channel_state_change)

# Register exit handlers to close the channel on exit
register_exit_handlers(
event_type=EventType.FLWR_APP_SCHEDULER_RUN_LEAVE,
exit_message="Flower app scheduler terminated gracefully.",
exit_handlers=[lambda: channel.close()],
)

# Create the gRPC stub for the AppIO API
# We shall merge the ClientAppIo and ServerAppIo in the future
# so we can use the same stub for both.
# For now, we use ClientAppIoStub.
stub = ClientAppIoStub(channel)

def get_run(run_id: int) -> Run:
_req = GetRunRequest(run_id=run_id)
_res = stub.GetRun(_req)
return run_from_proto(_res.run)

# Create the scheduler plugin instance
plugin = plugin_class(
appio_api_address=appio_api_address,
flwr_dir=flwr_dir,
get_run=get_run,
)

# Start the scheduler loop
try:
while True:
# Fetch suitable run IDs
get_runs_req = GetRunIdsWithPendingMessagesRequest()
get_runs_res = stub.GetRunIdsWithPendingMessages(get_runs_req)

# Allow the plugin to select a run ID
run_id = None
if get_runs_res.run_ids:
run_id = plugin.select_run_id(candidate_run_ids=get_runs_res.run_ids)

# Apply for a token if a run ID was selected
if run_id is not None:
tk_req = RequestTokenRequest(run_id=run_id)
tk_res = stub.RequestToken(tk_req)

# Launch the app if a token was granted; do nothing if not
if tk_res.token:
plugin.launch_app(token=tk_res.token, run_id=run_id)

# Sleep for a while before checking again
time.sleep(1)
finally:
channel.close()
Loading
Loading