-
Notifications
You must be signed in to change notification settings - Fork 8
[DRAFT] Priority-based scheduler for federated applications #328
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
92f8ada
c4c2b9e
adacaf8
121f299
a2b4048
72b84ba
5ecfb3e
427b617
cb7775a
520b3f1
07da797
5f2822f
cf6629a
e85d92f
23b92d5
0691dbd
27e6d72
6df041c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,13 +18,25 @@ struct FederatedEnvironment { | |
| // communicate with other federates running in different environments. | ||
| size_t net_bundles_size; // The number of NetworkChannels in the net_channels array. | ||
| size_t federation_longest_path; // The longest path in the federation. | ||
| StartupCoordinator* startup_coordinator; // A pointer to the startup coordinator, if the program has one. | ||
| ClockSynchronization* clock_sync; // A pointer to the clock synchronization module, if the program has one. | ||
|
|
||
| StartupCoordinator* startup_coordinator; // A pointer to the startup coordinator, if the program has one. | ||
| ClockSynchronization* clock_sync; // A pointer to the clock synchronization module, if the program has one. | ||
| interval_t global_max_wait; // The global maximum wait time for remote input ports for this federate. | ||
|
|
||
| /** | ||
| * @brief Set the global maximum wait time for this federate to assume remote input ports are absent. | ||
| * @param super The environment. | ||
| * @param max_wait The maximum wait time to be set. | ||
| */ | ||
| void (*set_maxwait)(Environment *super, interval_t max_wait); | ||
| }; | ||
|
|
||
| void FederatedEnvironment_ctor(FederatedEnvironment* self, Reactor* main, Scheduler* scheduler, bool fast_mode, | ||
| FederatedConnectionBundle** net_bundles, size_t net_bundles_size, | ||
| StartupCoordinator* startup_coordinator, ClockSynchronization* clock_sync); | ||
| void FederatedEnvironment_free(FederatedEnvironment* self); | ||
|
|
||
| /** Set the global maximum wait time for this federate to assume remote input ports are absent. */ | ||
| #define lf_set_fed_maxwait(time) ((FederatedEnvironment *)env)->set_maxwait(env, time) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is the user-facing function, perhaps it needs documentation? |
||
|
|
||
| #endif | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,6 +7,7 @@ | |
|
|
||
| typedef struct Platform Platform; | ||
| typedef struct Mutex Mutex; | ||
| typedef struct ThreadedPlatform ThreadedPlatform; | ||
|
|
||
| /** | ||
| * @brief Each supported platform must provide a mutex, this is used by the runtime | ||
|
|
@@ -60,6 +61,85 @@ Platform* Platform_new(); | |
| // Allow each platform to provide its own implementation for printing. | ||
| void Platform_vprintf(const char* fmt, va_list args); | ||
|
|
||
| /** | ||
| * @brief ThreadedPlatform is a subclass of Platform that provides additional | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sed /subclass/extension of/ |
||
| * functionality for setting the priority, scheduling policy, and core affinity | ||
| * of the current thread. | ||
| */ | ||
| struct ThreadedPlatform { | ||
| Platform super; // Platform is the base class (this allows to cast Platform to ThreadedPlatform). | ||
| /** | ||
| * @brief Set the priority of the current thread to schedule the current | ||
| * reaction with deadline monotonic. The main thread before sleeping waiting | ||
| * for the physical time to catch up with the logical time should invoke | ||
| * this with LF_SLEEP_PRIORITY. The TCP thread should invoke it with LF_TCP_THREAD_PRIORITY. | ||
| */ | ||
| lf_ret_t (*set_thread_priority)(interval_t rel_deadline); | ||
| /** | ||
| * @brief Get the priority of the current thread. | ||
| */ | ||
| int (*get_thread_priority)(); | ||
| /** | ||
| * @brief Set the scheduling policy for the current thread. The policy is | ||
| * specified by the LF_THREAD_POLICY macro. | ||
| */ | ||
| lf_ret_t (*set_scheduling_policy)(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this function not take the policy as an argument? |
||
| /** | ||
| * @brief Set the core affinity for the current thread. The number of cores | ||
| * to use is specified by the LF_NUMBER_OF_CORES macro. | ||
| */ | ||
| lf_ret_t (*set_core_affinity)(); | ||
| }; | ||
|
|
||
| /** | ||
| * @brief The thread scheduling policies. | ||
| */ | ||
| typedef enum { | ||
| /** Non real-time scheduling policy. Corresponds to SCHED_OTHER in Linux. */ | ||
| LF_SCHED_FAIR, | ||
| /** Real-time, time-slicing priority-based policy. Corresponds to SCHED_RR in Linux. */ | ||
| LF_SCHED_TIMESLICE, | ||
| /** Real-time, priority-only based scheduling. Corresponds to SCHED_FIFO in Linux. */ | ||
| LF_SCHED_PRIORITY, | ||
| } lf_scheduling_policy_type_t; | ||
|
|
||
|
|
||
| /** | ||
| * @brief The thread scheduling policy to use. | ||
| * | ||
| * This should be one of LF_SCHED_FAIR, LF_SCHED_TIMESLICE, or LF_SCHED_PRIORITY. | ||
| * The default is LF_SCHED_FAIR, which corresponds to the Linux SCHED_OTHER. | ||
| * LF_SCHED_TIMESLICE corresponds to Linux SCHED_RR, and LF_SCHED_PRIORITY corresponds | ||
| * to SCHED_FIFO. | ||
| */ | ||
| #ifndef LF_THREAD_POLICY | ||
| #define LF_THREAD_POLICY LF_SCHED_FAIR | ||
| #endif | ||
|
|
||
|
|
||
| /** | ||
| * @brief The number of cores of the hardware platform to use. The default is 0, which means all cores are used. | ||
| */ | ||
| #ifndef LF_NUMBER_OF_CORES | ||
| #define LF_NUMBER_OF_CORES 0 | ||
| #endif | ||
|
|
||
| /** | ||
| * @brief The constant used to set the priority of the sleeping main thread | ||
| * (the second highest priority). | ||
| */ | ||
| #ifndef LF_SLEEP_PRIORITY | ||
| #define LF_SLEEP_PRIORITY -1 | ||
| #endif | ||
|
|
||
| /** | ||
| * @brief The constant used to set the priority of the TCP thread | ||
| * (the highest priority). | ||
| */ | ||
| #ifndef LF_TCP_THREAD_PRIORITY | ||
| #define LF_TCP_THREAD_PRIORITY -2 | ||
| #endif | ||
|
|
||
| #if defined(PLATFORM_POSIX) | ||
| #include "reactor-uc/platform/posix/posix.h" | ||
| #elif defined(PLATFORM_RIOT) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,7 +11,7 @@ typedef struct { | |
| } MutexPosix; | ||
|
|
||
| typedef struct { | ||
| Platform super; | ||
| ThreadedPlatform super; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would require that all POSIX platforms are RT. I don't think this is wise, for example this won't work on MacOS. I would say we create posix and a posixRT platform. |
||
| pthread_cond_t cond; | ||
| MutexPosix mutex; | ||
| bool new_async_event; | ||
|
|
@@ -20,4 +20,7 @@ typedef struct { | |
| #define PLATFORM_T PlatformPosix | ||
| #define MUTEX_T MutexPosix | ||
|
|
||
| lf_ret_t PlatformPosix_set_thread_priority(interval_t rel_deadline); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You dont need to declare the functions here, because the functions are only called through the function pointers stored in the objects. (Declaration is not needed) |
||
| lf_ret_t PlatformPosix_set_scheduling_policy(); | ||
|
|
||
| #endif | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,4 +43,9 @@ void DynamicScheduler_ctor(DynamicScheduler* self, Environment* env, EventQueue* | |
| EventQueue* system_event_queue, ReactionQueue* reaction_queue, interval_t duration, | ||
| bool keep_alive); | ||
|
|
||
| bool Scheduler_check_and_handle_stp_violations(DynamicScheduler *self, Reaction *reaction); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here you can remove this. |
||
| bool Scheduler_check_and_handle_deadline_violations(DynamicScheduler *self, Reaction *reaction); | ||
| void Scheduler_pop_system_events_and_handle(Scheduler *untyped_self, tag_t next_tag); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the coding convention in reactor-uc? Do function names have leading capital letters? How about struct definitions? Whatever it is, be sure to be uniform.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes the convention is |
||
| void Scheduler_pop_events_and_prepare(Scheduler *untyped_self, tag_t next_tag); | ||
|
|
||
| #endif // SCHEDULER_H | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| // | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove |
||
| // Created by francesco on 10/01/25. | ||
| // | ||
|
|
||
| #ifndef PRIORITY_SCHEDULER_H | ||
| #define PRIORITY_SCHEDULER_H | ||
|
|
||
| #include "reactor-uc/schedulers/dynamic/scheduler.h" | ||
|
|
||
| typedef struct PriorityScheduler PriorityScheduler; | ||
|
|
||
| struct PriorityScheduler { | ||
| DynamicScheduler super; | ||
| }; | ||
|
|
||
| void PriorityScheduler_ctor(PriorityScheduler *self, Environment *env, EventQueue *event_queue, | ||
| EventQueue *system_event_queue, ReactionQueue *reaction_queue, interval_t duration, | ||
| bool keep_alive); | ||
|
|
||
| #endif // PRIORITY_SCHEDULER_H | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1387,6 +1387,46 @@ public static Long getDelay(Expression delay) { | |
| return ret; | ||
| } | ||
|
|
||
| /** | ||
| * Given a parameter reference and a chain of instantiations, resolve the parameter to its time | ||
| * value (in nanoseconds). This method handles nested parameter references by recursively | ||
| * resolving them through the instantiation chain. | ||
| * | ||
| * <p>For example, if reactor B has parameter {@code d2} and instantiates reactor A with {@code a | ||
| * = new A(d = d2)}, then when resolving parameter {@code d} of A with instantiation {@code a}, | ||
| * this method will resolve {@code d2} from B's context. | ||
| * | ||
| * <p>The instantiations list should represent the chain from the innermost (the instantiation | ||
| * containing the parameter reference) to the outermost. If the list is null or empty, the | ||
| * parameter's default value is used. | ||
| * | ||
| * @param paramRef The parameter reference to resolve. | ||
| * @param instantiations The chain of instantiations, or null/empty to use default value. | ||
| * @return The time value in nanoseconds, or null if it cannot be determined. | ||
| * @throws IllegalArgumentException If an instantiation provided is not an instantiation of the | ||
| * reactor class that is parameterized by the respective parameter or if the chain of | ||
| * instantiations is not nested. | ||
| */ | ||
| public static Long getTimeValueFromParameterReference( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This for passing time values as arguments into reactors? |
||
| ParameterReference paramRef, List<Instantiation> instantiations) { | ||
| if (paramRef == null) { | ||
| return null; | ||
| } | ||
| Parameter parameter = paramRef.getParameter(); | ||
| if (!isOfTimeType(parameter)) { | ||
| return null; | ||
| } | ||
| try { | ||
| Expression resolvedExpr = initialValue(parameter, instantiations); | ||
| TimeValue tv = getLiteralTimeValue(resolvedExpr); | ||
| return tv == null ? null : tv.toNanoSeconds(); | ||
| } catch (IllegalArgumentException e) { | ||
| // If we can't resolve through instantiations, fall back to default value | ||
| TimeValue tv = getDefaultAsTimeValue(parameter); | ||
| return tv == null ? null : tv.toNanoSeconds(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Given the width specification of port or instantiation and an (optional) list of nested | ||
| * instantiations, return the width if it can be determined and -1 if not. It will not be able to | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -585,7 +585,9 @@ public void initialize(TargetConfig config) { | |
| LoggingProperty.INSTANCE, | ||
| CmakeIncludeProperty.INSTANCE, | ||
| ClockSyncModeProperty.INSTANCE, | ||
| FilesProperty.INSTANCE); | ||
| FilesProperty.INSTANCE, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am against adding target properties => annotations on federate instantiations. |
||
| ThreadPolicyProperty.INSTANCE, | ||
| CoresProperty.INSTANCE); | ||
|
|
||
| // case CPP -> | ||
| // config.register( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| package org.lflang.target.property; | ||
|
|
||
| import org.lflang.MessageReporter; | ||
| import org.lflang.ast.ASTUtils; | ||
| import org.lflang.lf.Element; | ||
| import org.lflang.target.property.type.PrimitiveType; | ||
|
|
||
| /** The number of cores to utilize. The default is to use all available cores. */ | ||
| public final class CoresProperty extends TargetProperty<Integer, PrimitiveType> { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you mean with number of cores to utilize? |
||
|
|
||
| /** Singleton target property instance. */ | ||
| public static final CoresProperty INSTANCE = new CoresProperty(); | ||
|
|
||
| private CoresProperty() { | ||
| super(PrimitiveType.NON_NEGATIVE_INTEGER); | ||
| } | ||
|
|
||
| @Override | ||
| public Integer initialValue() { | ||
| return 0; | ||
| } | ||
|
|
||
| @Override | ||
| protected Integer fromString(String string, MessageReporter reporter) { | ||
| try { | ||
| return Integer.parseInt(string); | ||
| } catch (NumberFormatException e) { | ||
| reporter.nowhere().error("Invalid number of cores: " + string); | ||
| return 0; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| protected Integer fromAst(Element node, MessageReporter reporter) { | ||
| return ASTUtils.toInteger(node); | ||
| } | ||
|
|
||
| @Override | ||
| public Element toAstElement(Integer value) { | ||
| return ASTUtils.toElement(value); | ||
| } | ||
|
|
||
| @Override | ||
| public String name() { | ||
| return "cores"; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| package org.lflang.target.property; | ||
|
|
||
| import org.lflang.MessageReporter; | ||
| import org.lflang.ast.ASTUtils; | ||
| import org.lflang.lf.Element; | ||
| import org.lflang.target.property.type.ThreadPolicyType; | ||
| import org.lflang.target.property.type.ThreadPolicyType.ThreadPolicy; | ||
|
|
||
| /** | ||
| * Directive to specify the target thread policy type. | ||
| */ | ||
| public final class ThreadPolicyProperty extends TargetProperty<ThreadPolicy, ThreadPolicyType> { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the target property using the |
||
|
|
||
| /** Singleton target property instance. */ | ||
| public static final ThreadPolicyProperty INSTANCE = new ThreadPolicyProperty(); | ||
|
|
||
| private ThreadPolicyProperty() { | ||
| super(new ThreadPolicyType()); | ||
| } | ||
|
|
||
| @Override | ||
| public Element toAstElement(ThreadPolicy value) { | ||
| return ASTUtils.toElement(value.toString()); | ||
| } | ||
|
|
||
| @Override | ||
| public ThreadPolicy initialValue() { | ||
| return ThreadPolicy.LF_SCHED_FAIR; | ||
| } | ||
|
|
||
| @Override | ||
| public ThreadPolicy fromAst(Element node, MessageReporter reporter) { | ||
| return fromString(ASTUtils.elementToSingleString(node), reporter); | ||
| } | ||
|
|
||
| @Override | ||
| protected ThreadPolicy fromString(String string, MessageReporter reporter) { | ||
| ThreadPolicy result = this.type.forName(string); | ||
| if (result == null) { | ||
| reporter.nowhere().error( | ||
| "Invalid thread policy: '" + string + "'. " | ||
| + "Allowed values are: " + this.type.optionsString()); | ||
| return initialValue(); | ||
| } | ||
| return result; | ||
| } | ||
|
|
||
| @Override | ||
| public String name() { | ||
| return "thread-policy"; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| package org.lflang.target.property.type; | ||
|
|
||
| import org.lflang.target.property.type.ThreadPolicyType.ThreadPolicy; | ||
|
|
||
| /** Enumeration of supported thread policies */ | ||
| public class ThreadPolicyType extends OptionsType<ThreadPolicy> { | ||
|
|
||
| @Override | ||
| protected Class<ThreadPolicy> enumClass() { | ||
| return ThreadPolicy.class; | ||
| } | ||
|
|
||
| /** | ||
| * Enumeration of thread policies to be used to parameterize the thread scheduler. | ||
| * | ||
| * @author Shaokai Lin | ||
| * @author Marten Lohstroh | ||
| */ | ||
| public enum ThreadPolicy { | ||
| LF_SCHED_FAIR("normal", "LF_SCHED_FAIR"), | ||
| LF_SCHED_TIMESLICE("rt-rr", "LF_SCHED_TIMESLICE"), | ||
| LF_SCHED_PRIORITY("rt-fifo", "LF_SCHED_PRIORITY"); | ||
|
|
||
| /** Alias used in toString method. */ | ||
| private final String alias; | ||
|
|
||
| public final String define; | ||
|
|
||
| /** Private constructor for Cmake build types. */ | ||
| ThreadPolicy(String alias, String define) { | ||
| this.define = define; | ||
| this.alias = alias; | ||
| } | ||
|
|
||
| /** Return the alias. */ | ||
| @Override | ||
| public String toString() { | ||
| return this.alias; | ||
| } | ||
|
|
||
| public static ThreadPolicy getDefault() { | ||
| return ThreadPolicy.LF_SCHED_FAIR; | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need the max_wait here? Because the
max_waitis normally stored inside FederatedInputConnections and can be configured per (Bundle/Connection)