EVE's control plane is composed of several independent processes: zedmanager, zedagent, downloader, etc. These services often have two
requirements:
- State storage: store state, normally as Go objects, in memory, and possibly persisted across service restarts or eve-os reboots.
- Communications: be notified that some of the state of other services has changed.
For example, if the config of eve-os has changed, multiple services may need to know about that change. The volumemgr may need to know about additions, removals or changes of volumes; downloader may need to know to create or download volumes; networkmgr may need to know to change the state of various networks or interfaces.
Each process could simply make a local RPC call of some kind to the others, but synchronous calls would tie up many threads in a backlog for long periods and remove much flexibility. It would also require each process to know all of its downstream clients.
EVE uses a custom library called pubsub - for "publish subscribe" - to solve all of these problems:
- communicate changes in current or desired state with other processes
- keep track of current state
- persist state to survive the loss of a process
Note that "persistence" here means surviving the loss of a process, not a node. The desired state of a node is handled via the node configuration received from the controller.
PubSub is a library that implements a simple in-memory key-value store, with notifications for changes.
Each process that wants to store state, and possibly share it with other processes, includes the library. It then creates a publishing "table", which is simply a named space for records to be stored. It then "publishes" updates to the table using the library.
A publishing process can create as many tables as it wants. Each table is uniquely identified using the following:
- AgentName: The name of the publishing process. This makes it possible for other processes to know whose publications to follow.
- AgentScope: (optional) A unique string that lets the publisher further create scope around the table.
- TopicType: The type of object that is published in this table.
Once the table has been created, the publishing process can Publish() as many records as it wants, using a unique string key. It then
can get the records using Get() by unique key, GetAll() to get all records, Iterate() to iterate over the keys,
or Unpublish() to remove a record by key.
From the publishing process's perspective, this is nothing more than:
- a well-scoped
- in-memory
- key-value database
- with optional persistence
- that other processes can read
- or subscribe to change notifications
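From the publisher's side, a table behaves like an ordinary in-memory map keyed by string. The following Go sketch illustrates only that key-value behavior; the `Table` type, `NewTable`, and the method signatures are hypothetical and are not the real pubsub API, and persistence and notification are deliberately left out. Sorting keys in `Iterate` is a choice made here for deterministic output.

```go
package main

import (
	"fmt"
	"sort"
)

// Table is a hypothetical, minimal stand-in for one published table:
// a named, in-memory key-value store. It is NOT the real pubsub API.
type Table struct {
	name    string
	records map[string]interface{}
}

func NewTable(name string) *Table {
	return &Table{name: name, records: make(map[string]interface{})}
}

// Publish creates or replaces the record stored under key.
func (t *Table) Publish(key string, item interface{}) {
	t.records[key] = item
}

// Unpublish removes the record stored under key, if any.
func (t *Table) Unpublish(key string) {
	delete(t.records, key)
}

// Get returns the record for key and whether it exists.
func (t *Table) Get(key string) (interface{}, bool) {
	item, ok := t.records[key]
	return item, ok
}

// GetAll returns a shallow copy of all records.
func (t *Table) GetAll() map[string]interface{} {
	out := make(map[string]interface{}, len(t.records))
	for k, v := range t.records {
		out[k] = v
	}
	return out
}

// Iterate calls fn for each record in sorted key order; returning false stops iteration.
func (t *Table) Iterate(fn func(key string, item interface{}) bool) {
	keys := make([]string, 0, len(t.records))
	for k := range t.records {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		if !fn(k, t.records[k]) {
			return
		}
	}
}

func main() {
	t := NewTable("volumemgr/VolumeStatus") // hypothetical table name
	t.Publish("vol1", "ready")
	t.Publish("vol2", "downloading")
	t.Unpublish("vol2")
	t.Iterate(func(k string, v interface{}) bool {
		fmt.Println(k, v) // prints "vol1 ready"
		return true
	})
}
```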
Each process that wants to consume the shared state also includes the library. It then "subscribes" to the desired table, identifying it by:
- AgentName: The name of the publishing process.
- AgentScope: (optional) The unique scope of the table within the publishing process's tables.
- TopicType: The type of object expected to be received from the table.
It then can get the data from the table in the same ways as the publisher, using Get() by unique key, GetAll() to get all records,
Iterate() to iterate over the keys.
Additionally, it can subscribe to changes by calling MsgChan(), which will deliver a message on the channel for each changed object.
There are no ACLs or other security controls; any process can subscribe to any publisher's tables.
When the publisher saves an update (creating a new record, changing an existing record, or deleting a record) with a single call to
Publish() or Unpublish(), that update is:
- saved to the in-memory version of the table in the publisher's process
- persisted, allowing a replay if needed
- replicated to all subscribers
Each subscriber's library:
- receives the updates
- updates the replicated copy of the state in its own in-memory version of the table, synchronizing it with the publisher's version
- triggers any registered handlers on that table
Thus, with a single call to "save updates" on one process (publisher), one or more other processes (subscribers) automatically receive updates, synchronize their in-memory copy, and trigger event handlers.
The following diagram describes the structure
TODO: diagram here
pubsub is composed of several layers and components.
- PubSub: a high-level structure, initialized with a Driver, that is used as a factory to create Publication and Subscription instances.
- Publication: an interface for an implementation, returned by the PubSub factory, that enables a process to "publish", or store and announce, state information.
- Subscription: an interface for an implementation, returned by the PubSub factory, that enables a process to "subscribe" to another process, receiving its published state and all updates to that state.
- Driver: an interface for a specific implementation, passed to the PubSub factory, that is capable of handling persistence of data and notification of updates to subscribers.
Each eve-os process should have one PubSub, and all of them must use the same Driver if they are to communicate with each other.
When a process wants to store information, it does the following:
- Use the PubSub factory to create a new Publication, passing it the AgentName, AgentScope, TopicType, and whether the data should be persisted.
- Use the returned Publication to save (Publish), delete (Unpublish), and retrieve objects based on keys.
Upon publishing changes, the Publication is responsible for:
- Validating that the topic and types fit the Publication
- Saving the data in memory
- Marshaling the Go object into JSON bytes
- Updating the driver with the key and updated bytes
Note that there is nothing here about notifications, subscriptions, or persistence. The Publication is solely responsible for
validation, storing in memory, marshaling to JSON, and updating the Driver with the JSON bytes.
It is the Driver that handles notification and persistence.
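A minimal sketch of the four Publication steps follows, assuming a hypothetical `DriverPublisher` interface reduced to a single `Update(key, data)` method (a real driver interface has more methods). The type check uses `reflect`, the in-memory table is a plain map, and `recordingDriver` and `VolumeStatus` are toy types invented for the example. Nothing in `Publish` knows how, or whether, the bytes are stored or delivered.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// DriverPublisher is a hypothetical, trimmed-down driver interface:
// the Publication only hands it a key and the marshaled bytes.
type DriverPublisher interface {
	Update(key string, data []byte) error
}

// Publication validates, stores in memory, marshals, then calls the driver.
type Publication struct {
	topicType reflect.Type
	km        map[string]interface{}
	driver    DriverPublisher
}

func NewPublication(topic interface{}, driver DriverPublisher) *Publication {
	return &Publication{topicType: reflect.TypeOf(topic), km: map[string]interface{}{}, driver: driver}
}

func (p *Publication) Publish(key string, item interface{}) error {
	// 1. Validate that the item matches the table's topic type.
	if got := reflect.TypeOf(item); got != p.topicType {
		return fmt.Errorf("publish %q: got type %v, want %v", key, got, p.topicType)
	}
	// 2. Save the item in the in-memory table.
	p.km[key] = item
	// 3. Marshal the Go object to JSON.
	b, err := json.Marshal(item)
	if err != nil {
		return fmt.Errorf("publish %q: marshal: %w", key, err)
	}
	// 4. Hand key and bytes to the driver; persistence and notification are its job.
	return p.driver.Update(key, b)
}

// recordingDriver is a toy driver that just remembers what it was given.
type recordingDriver struct{ last map[string][]byte }

func (d *recordingDriver) Update(key string, data []byte) error {
	d.last[key] = data
	return nil
}

// VolumeStatus is a hypothetical topic type.
type VolumeStatus struct {
	Name  string
	Ready bool
}

func main() {
	drv := &recordingDriver{last: map[string][]byte{}}
	pub := NewPublication(VolumeStatus{}, drv)
	fmt.Println(pub.Publish("vol1", VolumeStatus{Name: "vol1", Ready: true})) // <nil>
	fmt.Println(string(drv.last["vol1"]))                                      // {"Name":"vol1","Ready":true}
	fmt.Println(pub.Publish("bad", "not a VolumeStatus") != nil)               // true
}
```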
When the Driver receives the update, it:
- If the table is marked as persistent, stores the data using whatever storage is appropriate for the driver.
- Notifies any subscribers of changes, using whatever notification mechanism is appropriate for the driver.
All persistence and notification - the publish-subscribe part of pubsub - happens entirely within the driver.
The driver could be based on memory, network communication, sockets, polled files, or anything else. The
Publication does not care.
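To illustrate that division of labor, here is a toy driver whose "storage" is a map and whose "notification" is a list of Go channels. `memDriverPublisher` and `Change` are hypothetical. A real driver would typically avoid blocking the publisher on a slow subscriber, which this sketch does not attempt.

```go
package main

import "fmt"

// Change is a hypothetical notification sent to subscribers.
type Change struct {
	Key  string
	Data []byte
}

// memDriverPublisher is a toy driver: "storage" is a map and
// "notification" is a set of Go channels.
type memDriverPublisher struct {
	persistent  bool
	storage     map[string][]byte // stands in for durable storage
	subscribers []chan Change
}

// Update is what the Publication calls after marshaling an item.
func (d *memDriverPublisher) Update(key string, data []byte) error {
	if d.persistent {
		// Keep a private copy so a restarted process could reload it.
		d.storage[key] = append([]byte(nil), data...)
	}
	for _, ch := range d.subscribers {
		ch <- Change{Key: key, Data: data} // blocks if a subscriber's buffer is full
	}
	return nil
}

func main() {
	sub := make(chan Change, 1)
	drv := &memDriverPublisher{persistent: true, storage: map[string][]byte{}, subscribers: []chan Change{sub}}
	_ = drv.Update("global", []byte(`{"Debug":true}`))
	c := <-sub
	fmt.Println(c.Key, string(c.Data))         // global {"Debug":true}
	fmt.Println(string(drv.storage["global"])) // {"Debug":true}
}
```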
The publishing process can retrieve data using Get(), GetAll() and Iterate(). These work entirely on the local
in-memory copy of the Publisher.
When a process wants to retrieve information stored by a publishing process, it does the following:
- Use the PubSub factory to create a new Subscription, passing it the AgentName, AgentScope, TopicType, as well as whether the table is persistent. In addition, pass it handlers that should be called for modifications of data, such as creating a new entry, updating an entry, or deleting an entry.
- Activate the returned Subscription.
- Use the returned Subscription to retrieve data:
- get all objects
- get specific objects based on keys
- asynchronously invoke handlers that were registered
The Subscription is responsible for:
- Being informed of updates from the driver.
- Unmarshaling the raw json bytes into the correct object type.
- Saving the data to its own in-memory copy of the table.
- Calling appropriate handlers for state changes.
Note that there is nothing about notifications, publications, or persistence. The Subscription is solely responsible for
receiving updates from the Driver, validation, storing in memory, and calling handlers.
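The subscriber-side steps can be sketched with a generic type, so that raw JSON is unmarshaled into the table's Go type before a handler runs. The `Change` struct, the `Subscription[T]` type, and the create/modify/delete handler signatures (including passing the old value to the modify handler) are assumptions for illustration, not the library's actual API. Go 1.18 or later is required for generics.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Change is a hypothetical driver notification: raw JSON for a key,
// or Delete=true when the key was unpublished.
type Change struct {
	Key    string
	Data   []byte
	Delete bool
}

// Subscription keeps a typed local copy of the table and dispatches handlers.
type Subscription[T any] struct {
	table    map[string]T
	OnCreate func(key string, item T)
	OnModify func(key string, item, old T)
	OnDelete func(key string, old T)
}

func NewSubscription[T any]() *Subscription[T] {
	return &Subscription[T]{table: map[string]T{}}
}

// ProcessChange unmarshals, updates the local copy, and calls the matching handler.
func (s *Subscription[T]) ProcessChange(c Change) error {
	old, existed := s.table[c.Key]
	if c.Delete {
		if !existed {
			return nil // nothing to delete
		}
		delete(s.table, c.Key)
		if s.OnDelete != nil {
			s.OnDelete(c.Key, old)
		}
		return nil
	}
	var item T
	if err := json.Unmarshal(c.Data, &item); err != nil {
		return fmt.Errorf("key %q: unmarshal: %w", c.Key, err)
	}
	s.table[c.Key] = item
	if existed {
		if s.OnModify != nil {
			s.OnModify(c.Key, item, old)
		}
	} else if s.OnCreate != nil {
		s.OnCreate(c.Key, item)
	}
	return nil
}

// NetworkStatus is a hypothetical topic type.
type NetworkStatus struct {
	Ifname string
	Up     bool
}

func main() {
	sub := NewSubscription[NetworkStatus]()
	sub.OnCreate = func(k string, n NetworkStatus) { fmt.Println("create", k, n.Up) }
	sub.OnModify = func(k string, n, old NetworkStatus) { fmt.Println("modify", k, old.Up, "->", n.Up) }
	sub.OnDelete = func(k string, old NetworkStatus) { fmt.Println("delete", k) }

	_ = sub.ProcessChange(Change{Key: "eth0", Data: []byte(`{"Ifname":"eth0","Up":false}`)})
	_ = sub.ProcessChange(Change{Key: "eth0", Data: []byte(`{"Ifname":"eth0","Up":true}`)})
	_ = sub.ProcessChange(Change{Key: "eth0", Delete: true})
}
```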
The Subscription also knows whether the specific table is "persistent". It does not engage with persistence directly, as that is the responsibility
of the driver; it uses this flag primarily for initial loading.
When the Subscription is activated:
- Start the DriverSubscriber, which will cause it to get information from the DriverPublisher.
- If the table is "persistent", populate its table. It does so by calling DriverSubscriber.Load(), which loads the entire set of data from persistence. The specific implementation of Load() is the driver's responsibility.
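The activation order above can be sketched as follows, assuming a hypothetical `DriverSubscriber` interface with only `Start()` and `Load()`, where `Load()` returns persisted records as raw JSON keyed by record key. Records are decoded into generic maps to keep the example self-contained; a real subscription would decode into its topic type. `fakeDriver` is a toy stand-in.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// DriverSubscriber is a hypothetical, trimmed-down driver-side subscriber.
type DriverSubscriber interface {
	Start() error
	Load() (map[string][]byte, error)
}

type Subscription struct {
	persistent bool
	driver     DriverSubscriber
	table      map[string]map[string]interface{}
}

// Activate starts the driver and, for persistent tables, pre-populates the local copy.
func (s *Subscription) Activate() error {
	if err := s.driver.Start(); err != nil {
		return fmt.Errorf("activate: start: %w", err)
	}
	if !s.persistent {
		return nil
	}
	items, err := s.driver.Load()
	if err != nil {
		return fmt.Errorf("activate: load: %w", err)
	}
	for key, raw := range items {
		var v map[string]interface{}
		if err := json.Unmarshal(raw, &v); err != nil {
			return fmt.Errorf("activate: key %q: %w", key, err)
		}
		s.table[key] = v
	}
	return nil
}

// fakeDriver records whether Start was called and serves canned persisted data.
type fakeDriver struct {
	started bool
	stored  map[string][]byte
}

func (f *fakeDriver) Start() error                      { f.started = true; return nil }
func (f *fakeDriver) Load() (map[string][]byte, error) { return f.stored, nil }

func main() {
	drv := &fakeDriver{stored: map[string][]byte{"vol1": []byte(`{"Ready":true}`)}}
	sub := &Subscription{persistent: true, driver: drv, table: map[string]map[string]interface{}{}}
	if err := sub.Activate(); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(drv.started, sub.table["vol1"]["Ready"]) // true true
}
```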
The Driver handles persistence and notification. It has the following key structures:
- DriverPublisher: an interface that all Drivers must implement. It provides the methods that will be called by the pubsub.Publication to persist data and notify of state changes.
- DriverSubscriber: an interface that all Drivers must implement. It provides the methods that will be called by the pubsub.Subscription to be notified of state changes.
The DriverPublisher is the part of the Driver that is responsible for handling publisher events, specifically
persisting what should be persisted and notifying subscribers.
When creating a pubsub.Publication instance, the pubsub.Publication creates a DriverPublisher.
The DriverPublisher is expected to retrieve any persisted state for the calling pubsub.Publication, store any future updates from the calling
pubsub.Publication, and inform any subscribers of the current state and any changes.
The DriverSubscriber is the part of the Driver that is responsible for subscribing to events for a specific table.
It maintains a copy in memory of the publisher's table, listens for updates, and then updates its local-copy table. Finally, it calls any handlers registered for changes.
When a pubsub.Subscription wants to receive the state of, and notifications for changes to, another process's table,
it creates a DriverSubscriber, passing it sufficient information to identify the table. It also passes it
a channel of Change values.
Each change in the table is expected to be delivered as an update on that channel.
The DriverSubscriber is expected to retrieve the current state of the table, and update the calling
pubsub.Subscription of all changes by sending updates on a channel passed during the DriverSubscriber
initialization.
eve-os currently has one primary driver, socketdriver. In addition, emptydriver provides a zero-functionality implementation, which is useful for services that require a pubsub but will not exercise it at all.
Additional implementations may exist in testing, e.g. in-memory drivers.
The primary Driver implemented in eve-os is the socketdriver, which uses:
- data storage via files, one directory per table, one file per key-value entry; it does not use in-memory storage.
- notification via Unix-domain sockets, one socket per table.
Note that in eve, /var/run and /run are the same. For simplicity's sake,
we use /run exclusively here.
The socketdriver stores all data for one table in a specific directory. It does not use in-memory storage. This file mechanism is used both for persistent and for non-persistent data. For persistent data, it stores files in a directory that will be persisted beyond reboot, whereas for non-persistent data, it stores files in an ephemeral directory that will be deleted on reboot.
The root directory upon which socketdriver operates is passed to the socketdriver
instance upon creation. In the case of normal eve-os operation, that defaults to /, but it can be changed upon initialization.
All other directories are subsidiary to the root directory.
The specific subdirectory used inside the root directory depends on two options:
- persistent: whether or not this table should persist, and therefore whether its files should be in a directory that is persisted beyond reboot.
- publishToDir: whether or not this table should publish its data to a directory.
It also includes the "name" of the publication, where <name> is calculated as:
- global table: global
- agentScope == "": <agentName>/<topic>
- otherwise: <agentName>/<agentScope>/<topic>
The above combine to create the location:
| persistent? | publishToDir? | directory |
|---|---|---|
| Y | Y | /persist/config/<name> |
| Y | N | /persist/status/<name> |
| N | Y | /run/global/<name> |
| N | N | /run/<name> |
Examples:
| persistent | publishToDir | agentScope | agentName | topic | directory |
|---|---|---|---|---|---|
| Y | Y | tester | configmgr | inputs | /persist/config/tester/configmgr/inputs/ |
| Y | N | tester | configmgr | inputs | /persist/tester/configmgr/inputs/ |
| Y | N | | configmgr | inputs | /persist/configmgr/inputs/ |
| N | N | | | inputs | /run/global/inputs |
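The <name> rule and the four-row directory table can be expressed as two small functions. This sketch covers only the scoped and unscoped cases (not the global-table case), uses /run as described above, and `tableName`, `tableDir`, and the `myagent`/`MyTopic` names are hypothetical.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// tableName sketches the <name> rule for scoped and unscoped tables.
// The "global table" case is intentionally left out of this sketch.
func tableName(agentName, agentScope, topic string) string {
	if agentScope == "" {
		return agentName + "/" + topic
	}
	return agentName + "/" + agentScope + "/" + topic
}

// tableDir sketches the directory table: rootDir is the driver's root
// directory ("/" in normal operation) and name comes from tableName.
func tableDir(rootDir string, persistent, publishToDir bool, name string) string {
	var base string
	switch {
	case persistent && publishToDir:
		base = "persist/config"
	case persistent:
		base = "persist/status"
	case publishToDir:
		base = "run/global"
	default:
		base = "run"
	}
	return filepath.Join(rootDir, base, name)
}

func main() {
	name := tableName("myagent", "", "MyTopic") // hypothetical agent and topic
	fmt.Println(tableDir("/", true, false, name))              // /persist/status/myagent/MyTopic
	fmt.Println(tableDir("/tmp/eve-test", false, false, name)) // /tmp/eve-test/run/myagent/MyTopic
}
```

Passing a different rootDir, as in the second call, is how a test can redirect all tables away from the real system directories.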
The specific implementations of the DriverPublisher interface and DriverSubscriber interface, for socketdriver, are, respectively,
socketdriver.Publisher and socketdriver.Subscriber.
When socketdriver.Publisher receives updates from its calling pubsub.Publication, it saves the data in a file named
<dirName>/<key>.json. The key must not contain slashes and must fit within the maximum filename length (no more than 255 characters).
For example, if the <dirName> from above was /persist/tester/configmgr/inputs/, and the Publish() used the key important, then
the filename is /persist/tester/configmgr/inputs/important.json.
This is completely independent of whether or not it is persistent. socketdriver always writes its information to files.
Where those files are, i.e. which directory, is determined by whether or not the table is persistent.
- persistent: write to a directory that persists past reboots.
- not persistent: write to an ephemeral directory.
As described above, socketdriver.Publisher always writes to a file named <dirName>/<key>.json, where <dirName> is determined by the
algorithm above; persistent determines where that directory will be placed.
When socketdriver.Publisher receives a Publish() call, it determines the file name
and then saves the raw data in the file.
When socketdriver.Publisher receives an Unpublish() call, it determines the file name
and then removes the file.
socketdriver uses Unix-domain sockets and a server on that connection to publish notifications:
- socketdriver.Publisher creates the socket file, listens on the socket, and handles requests.
- socketdriver.Subscriber connects to the socket file and makes a connection request.
socketdriver.Publisher has one listener on the socket file; for each connection, representing one socketdriver.Subscriber,
it starts a new goroutine to handle the requests. Each subscriber has a long-running connection over the socket to the
publisher, served by a dedicated long-running goroutine.
The socketdriver.Subscriber sends requests to the publisher, primarily to do one of:
- send the entire data set
- send updates
Note that this is completely distinct from the part of the process that receives updates, i.e. socketdriver.Publisher.Update().
Writing of persistent files and notifying of updates happen in two different paths:
- persisting, including the actual data: DriverPublisher.Update()
- notifications of changed data: the set of Updaters passed to the Driver on initialization of the DriverPublisher.
The notifications do not contain the raw data; they only inform the recipients that changes occurred.
The actual name for the socket file is /run/<name>.sock. This uses the same <name> algorithm as above.
This socket is unique to this table; each table has its own socket.
Upon receiving a request to subscribe to a table, socketdriver.Subscriber determines the filename for the Unix domain socket using the same
algorithm as socketdriver.Publisher.
When the pubsub.Subscription calls Start() on socketdriver.Subscriber, it:
- Opens a connection to the socket.
- Gets a download of the entire current state of the table, which it returns to pubsub.Subscription.
- Waits for any further updates, which it sends to the channel of Change.