Skip to content

Commit c1dbc59

Browse files
authored
Network Flush Reactors embedded in FederatedOutputConnection (#330)
* reactor-uc: reworking init * initial commit of rework * small fix * small fix * add commets * runtime path * removing test-case
1 parent abef927 commit c1dbc59

19 files changed

Lines changed: 133 additions & 93 deletions

File tree

examples/posix/federated/receiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ typedef struct {
7878
Reactor super;
7979
LF_CHILD_REACTOR_INSTANCE(Receiver, receiver, 1);
8080
LF_FEDERATED_CONNECTION_BUNDLE_INSTANCE(Receiver, Sender);
81-
LF_FEDERATE_BOOKKEEPING_INSTANCES(1);
81+
LF_FEDERATE_BOOKKEEPING_INSTANCES(1, 0);
8282
LF_CHILD_INPUT_SOURCES(receiver, in, 1, 1, 0);
8383
LF_DEFINE_STARTUP_COORDINATOR(Federate);
8484
LF_DEFINE_CLOCK_SYNC(Federate);

examples/posix/federated/sender.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ int serialize_msg_t(const void *user_struct, size_t user_struct_size, unsigned c
1717
const msg_t *msg = user_struct;
1818

1919
memcpy(msg_buf, &msg->size, sizeof(msg->size));
20-
memcpy(msg_buf + sizeof(msg->size), msg->msg, msg->size);
20+
memcpy(msg_buf + sizeof(msg->size), msg->msg, user_struct_size);
2121

2222
return sizeof(msg->size) + msg->size;
2323
}
@@ -90,7 +90,7 @@ typedef struct {
9090
Reactor super;
9191
LF_CHILD_REACTOR_INSTANCE(Sender, sender, 1);
9292
LF_FEDERATED_CONNECTION_BUNDLE_INSTANCE(Sender, Receiver);
93-
LF_FEDERATE_BOOKKEEPING_INSTANCES(1);
93+
LF_FEDERATE_BOOKKEEPING_INSTANCES(1, 1);
9494
LF_CHILD_OUTPUT_CONNECTIONS(sender, out, 1, 1, 1);
9595
LF_CHILD_OUTPUT_EFFECTS(sender, out, 1, 1, 0);
9696
LF_CHILD_OUTPUT_OBSERVERS(sender, out, 1, 1, 0);

examples/riot/coap_federated/receiver/main.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ typedef struct {
7878
Reactor super;
7979
LF_CHILD_REACTOR_INSTANCE(Receiver, receiver, 1);
8080
LF_FEDERATED_CONNECTION_BUNDLE_INSTANCE(Receiver, Sender);
81-
LF_FEDERATE_BOOKKEEPING_INSTANCES(1);
81+
LF_FEDERATE_BOOKKEEPING_INSTANCES(1, 0);
8282
LF_CHILD_INPUT_SOURCES(receiver, in, 1, 1, 0);
8383
LF_DEFINE_STARTUP_COORDINATOR(Federate);
8484
LF_DEFINE_CLOCK_SYNC(Federate);

examples/riot/coap_federated/sender/main.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ typedef struct {
8585
Reactor super;
8686
LF_CHILD_REACTOR_INSTANCE(Sender, sender, 1);
8787
LF_FEDERATED_CONNECTION_BUNDLE_INSTANCE(Sender, Receiver);
88-
LF_FEDERATE_BOOKKEEPING_INSTANCES(1);
88+
LF_FEDERATE_BOOKKEEPING_INSTANCES(1, 1);
8989
LF_CHILD_OUTPUT_CONNECTIONS(sender, out, 1, 1, 1);
9090
LF_CHILD_OUTPUT_EFFECTS(sender, out, 1, 1, 0);
9191
LF_CHILD_OUTPUT_OBSERVERS(sender, out, 1, 1, 0);

examples/zephyr/basic_federated/common/receiver.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ typedef struct {
9191
Reactor super;
9292
LF_CHILD_REACTOR_INSTANCE(Receiver, receiver, 1);
9393
LF_FEDERATED_CONNECTION_BUNDLE_INSTANCE(Receiver, Sender);
94-
LF_FEDERATE_BOOKKEEPING_INSTANCES(1);
94+
LF_FEDERATE_BOOKKEEPING_INSTANCES(1, 0);
9595
LF_CHILD_INPUT_SOURCES(receiver, in, 1, 1, 0);
9696
LF_DEFINE_STARTUP_COORDINATOR(Federate);
9797
LF_DEFINE_CLOCK_SYNC(Federate);

examples/zephyr/basic_federated/federated_sender/src/sender.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ typedef struct {
148148
LF_CHILD_OUTPUT_CONNECTIONS(sender, out, 1, 1, 2);
149149
LF_CHILD_OUTPUT_EFFECTS(sender, out, 1, 1, 0);
150150
LF_CHILD_OUTPUT_OBSERVERS(sender, out,1, 1, 0);
151-
LF_FEDERATE_BOOKKEEPING_INSTANCES(2);
151+
LF_FEDERATE_BOOKKEEPING_INSTANCES(2, 2);
152152
LF_DEFINE_STARTUP_COORDINATOR(Federate);
153153
LF_DEFINE_CLOCK_SYNC(Federate);
154154
} MainSender;

include/reactor-uc/federated.h

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
typedef struct FederatedConnectionBundle FederatedConnectionBundle;
99
typedef struct FederatedOutputConnection FederatedOutputConnection;
1010
typedef struct FederatedInputConnection FederatedInputConnection;
11+
typedef struct FederatedFlushReactor FederatedFlushReactor;
1112
typedef struct NetworkChannel NetworkChannel;
1213

1314
/**
@@ -63,6 +64,23 @@ void FederatedConnectionBundle_ctor(FederatedConnectionBundle* self, Reactor* pa
6364
FederatedInputConnection** inputs, deserialize_hook* deserialize_hooks,
6465
size_t inputs_size, FederatedOutputConnection** outputs,
6566
serialize_hook* serialize_hooks, size_t outputs_size, size_t index);
67+
/**
68+
* @brief This reactor is part of the FederatedOutputConnection and has the purpose of flushing and sending
69+
* the value transmitted by the last downstream port/reaction.
70+
*/
71+
struct FederatedFlushReactor {
72+
Reactor super;
73+
Port input_port;
74+
Reaction flush_reaction;
75+
Trigger* flush_triggers;
76+
Reaction* reaction_array;
77+
};
78+
79+
/**
80+
* @brief Instantiates the FederatedFlushReactor object by initializing the reaction, input port, and reactor.
81+
*/
82+
void FederatedFlushReactor_ctor(FederatedFlushReactor* self, Reactor* parent, void* payload_buf, size_t payload_size,
83+
FederatedOutputConnection* connection);
6684

6785
/**
6886
* @brief A single output connection from this federate to another federate.
@@ -74,16 +92,14 @@ void FederatedConnectionBundle_ctor(FederatedConnectionBundle* self, Reactor* pa
7492
struct FederatedOutputConnection {
7593
Connection super; // Inherits from Connection, it wastes some memory but makes for a nicer architecture.
7694
FederatedConnectionBundle* bundle; // A pointer to the super it is within
77-
EventPayloadPool payload_pool; // Output buffer
78-
void* staged_payload_ptr;
7995
int conn_id;
96+
FederatedFlushReactor flush_reactor;
8097
};
8198

8299
void FederatedConnectionBundle_validate(FederatedConnectionBundle* bundle);
83100

84101
void FederatedOutputConnection_ctor(FederatedOutputConnection* self, Reactor* parent, FederatedConnectionBundle* bundle,
85-
int conn_id, void* payload_buf, bool* payload_used_buf, size_t payload_size,
86-
size_t payload_buf_capacity);
102+
int conn_id, void* payload_buf, size_t payload_size);
87103

88104
/**
89105
* @brief A single input connection coming from another federate.

include/reactor-uc/macros_internal.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,8 @@
118118
#define LF_FEDERATE_CONNECTION_BUNDLE_INSTANCE(ReactorName, OtherName) \
119119
ReactorName##_##OtherName##_Bundle ReactorName##_##OtherName_bundle
120120

121-
#define LF_FEDERATE_BOOKKEEPING_INSTANCES(NumBundles) \
122-
LF_REACTOR_BOOKKEEPING_INSTANCES(0, 0, 1); \
121+
#define LF_FEDERATE_BOOKKEEPING_INSTANCES(NumBundles, NumOutputs) \
122+
LF_REACTOR_BOOKKEEPING_INSTANCES(0, 0, 1 + NumOutputs); \
123123
FederatedConnectionBundle* _bundles[NumBundles];
124124

125125
#define LF_DEFINE_OUTPUT_STRUCT(ReactorName, PortName, SourceSize, BufferType) \
@@ -472,21 +472,19 @@ typedef struct FederatedOutputConnection FederatedOutputConnection;
472472
typedef struct { \
473473
FederatedOutputConnection super; \
474474
BufferType payload_buf[1]; \
475-
bool payload_used_buf[1]; \
476475
} ReactorName##_##OutputName##_conn;
477476

478477
#define LF_DEFINE_FEDERATED_OUTPUT_CONNECTION_STRUCT_ARRAY(ReactorName, OutputName, BufferType, ArrayLength) \
479478
typedef struct { \
480479
FederatedOutputConnection super; \
481480
BufferType payload_buf[1][(ArrayLength)]; \
482-
bool payload_used_buf[1]; \
483481
} ReactorName##_##OutputName##_conn;
484482

485483
#define LF_DEFINE_FEDERATED_OUTPUT_CONNECTION_CTOR(ReactorName, OutputName, BufferType, DestinationConnId) \
486484
void ReactorName##_##OutputName##_conn_ctor(ReactorName##_##OutputName##_conn* self, Reactor* parent, \
487485
FederatedConnectionBundle* bundle) { \
488486
FederatedOutputConnection_ctor(&self->super, parent, bundle, DestinationConnId, (void*)&self->payload_buf, \
489-
(bool*)&self->payload_used_buf, sizeof(self->payload_buf[0]), 1); \
487+
sizeof(self->payload_buf[0])); \
490488
}
491489

492490
#define LF_FEDERATED_OUTPUT_CONNECTION_INSTANCE(ReactorName, OutputName) ReactorName##_##OutputName##_conn OutputName
@@ -516,7 +514,11 @@ typedef struct FederatedOutputConnection FederatedOutputConnection;
516514

517515
#define LF_INITIALIZE_FEDERATED_CONNECTION_BUNDLE(ReactorName, OtherName) \
518516
ReactorName##_##OtherName##_Bundle_ctor(&self->ReactorName##_##OtherName##_bundle, &self->super, _bundle_idx); \
519-
self->_bundles[_bundle_idx++] = &self->ReactorName##_##OtherName##_bundle.super;
517+
self->_bundles[_bundle_idx] = &self->ReactorName##_##OtherName##_bundle.super; \
518+
for (int j = 0; j < self->_bundles[_bundle_idx]->outputs_size; j++) { \
519+
self->_children[_child_idx++] = &self->_bundles[_bundle_idx]->outputs[j]->flush_reactor.super; \
520+
} \
521+
_bundle_idx++;
520522

521523
#define LF_INITIALIZE_FEDERATED_OUTPUT_CONNECTION(ReactorName, OutputName, SerializeFunc) \
522524
ReactorName##_##OutputName##_conn_ctor(&self->OutputName, self->super.parent, &self->super); \

include/reactor-uc/trigger.h

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,16 @@ typedef struct Trigger Trigger;
1414
* and output ports.
1515
*/
1616
typedef enum {
17-
TRIG_TIMER,
18-
TRIG_ACTION,
19-
TRIG_INPUT,
20-
TRIG_OUTPUT,
21-
TRIG_CONN,
22-
TRIG_CONN_DELAYED,
23-
TRIG_CONN_FEDERATED_INPUT,
24-
TRIG_CONN_FEDERATED_OUTPUT,
25-
TRIG_STARTUP,
26-
TRIG_SHUTDOWN
17+
TRIG_TIMER = 0,
18+
TRIG_ACTION = 1,
19+
TRIG_INPUT = 2,
20+
TRIG_OUTPUT = 3,
21+
TRIG_CONN = 4,
22+
TRIG_CONN_DELAYED = 5,
23+
TRIG_CONN_FEDERATED_INPUT = 6,
24+
TRIG_CONN_FEDERATED_OUTPUT = 7,
25+
TRIG_STARTUP = 8,
26+
TRIG_SHUTDOWN = 9
2727
} TriggerType;
2828

2929
/**

lfc/core/src/main/kotlin/org/lflang/generator/uc/UcConnectionGenerator.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,9 @@ class UcConnectionGenerator(
259259
allGroupedConnections.forEachIndexed { idx, el -> el.assignUid(idx) }
260260
}
261261

262+
fun getNumOutputs(federate: UcFederate) =
263+
federatedConnectionBundles.sumOf { it.numOutputs(federate) }
264+
262265
fun getNumFederatedConnectionBundles() = federatedConnectionBundles.size
263266

264267
fun getNumConnectionsFromPort(instantiation: Instantiation?, port: Port): Int {

0 commit comments

Comments
 (0)