The SDP library implements IUDFAgent for communicating with Kapacitor via Kapacitor's UDF RPC protocol. UDFs use the socket-based approach to communicate with Kapacitor, but UDFAgent can also work with the child-process-based approach if needed.
We use the uvw library, a wrapper around libuv, to provide asynchronous I/O. It also lets us write code in an event-based style.
Every UDF requires its own implementation of the RequestHandler interface. The implementation handles incoming RPC calls from Kapacitor and sends responses back via IUDFAgent.
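The division of labor between the handler and the agent can be sketched as follows. This is a simplified illustration, not the SDP library's actual API: `SketchAgent`, `SketchRequestHandler`, `RecordingAgent`, `EchoHandler`, and the string-based messages are hypothetical stand-ins for IUDFAgent, RequestHandler, and the real RPC messages.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-in for IUDFAgent: in this sketch the handler only needs
// a way to send a response back to Kapacitor.
class SketchAgent {
 public:
  virtual ~SketchAgent() = default;
  virtual void writeResponse(const std::string& response) = 0;
};

// Hypothetical stand-in for the RequestHandler interface: one callback per
// incoming RPC call (only two calls are modeled here).
class SketchRequestHandler {
 public:
  virtual ~SketchRequestHandler() = default;
  virtual void init(const std::string& options) = 0;
  virtual void point(const std::string& point) = 0;
};

// Agent that records responses in memory instead of writing to a socket.
class RecordingAgent : public SketchAgent {
 public:
  void writeResponse(const std::string& response) override {
    responses.push_back(response);
  }
  std::vector<std::string> responses;
};

// A trivial UDF: acknowledges init and echoes every point back via the agent.
class EchoHandler : public SketchRequestHandler {
 public:
  explicit EchoHandler(SketchAgent& agent) : agent_(agent) {}
  void init(const std::string& options) override {
    agent_.writeResponse("init ok: " + options);
  }
  void point(const std::string& point) override {
    agent_.writeResponse(point);
  }

 private:
  SketchAgent& agent_;
};
```

Keeping the handler dependent only on an agent interface means the same handler can be exercised in tests with an in-memory agent and run in production with a socket-backed one.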
Because we use the Apache Arrow library for data processing, we need to store and handle data and to convert it between Kapacitor's points and Arrow's RecordBatches. This functionality is provided by the corresponding objects:
- IPointsStorage for storing;
- RecordBatchHandler from the main part of the library for data handling;
- PointsConverter for data conversion.
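How the three roles cooperate can be sketched with plain C++ containers. Everything here is an assumption made for illustration: `SketchPoint`, `SketchBatch`, `SketchConverter`, `SketchHandler`, `SketchStorage`, and `lastRow` are hypothetical names, and a pair of vectors stands in for an Arrow RecordBatch. The real classes have richer interfaces.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-ins: a Kapacitor point and a columnar batch playing the
// role of an Arrow RecordBatch.
struct SketchPoint { std::int64_t time; double value; };
struct SketchBatch { std::vector<std::int64_t> time; std::vector<double> value; };

// Converter role: row-oriented points <-> column-oriented batch.
struct SketchConverter {
  SketchBatch toBatch(const std::vector<SketchPoint>& points) const {
    SketchBatch batch;
    for (const auto& p : points) {
      batch.time.push_back(p.time);
      batch.value.push_back(p.value);
    }
    return batch;
  }
  std::vector<SketchPoint> toPoints(const SketchBatch& batch) const {
    std::vector<SketchPoint> points;
    for (std::size_t i = 0; i < batch.time.size(); ++i) {
      points.push_back({batch.time[i], batch.value[i]});
    }
    return points;
  }
};

// Handler role: any batch-to-batch transformation.
using SketchHandler = std::function<SketchBatch(const SketchBatch&)>;

// Storage role: buffers points; on flush it converts them to a batch, runs
// the handler, and converts the result back to points.
class SketchStorage {
 public:
  SketchStorage(SketchHandler handler, SketchConverter converter)
      : handler_(std::move(handler)), converter_(converter) {}
  void add(SketchPoint point) { points_.push_back(point); }
  std::vector<SketchPoint> flush() {
    SketchBatch result = handler_(converter_.toBatch(points_));
    points_.clear();
    return converter_.toPoints(result);
  }

 private:
  SketchHandler handler_;
  SketchConverter converter_;
  std::vector<SketchPoint> points_;
};

// Example handler: keeps only the last row of the batch.
inline SketchBatch lastRow(const SketchBatch& in) {
  SketchBatch out;
  if (!in.time.empty()) {
    out.time.push_back(in.time.back());
    out.value.push_back(in.value.back());
  }
  return out;
}
```

For example, a storage built as `SketchStorage(lastRow, SketchConverter{})` that receives two points returns only the later one on `flush()`.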
Therefore, the most important part of every UDF is a RecordBatchHandler, which does all the useful work. See the full list of currently implemented RecordBatchHandlers.
If you want to write your own UDF using the SDP library, you can follow the basic steps below. As an example, you can refer to the RequestHandler implementations of streamAggregateUDF and batchAggregateUDF (items 3, 4, 6) and to the udf_agent_client_factory.h file (item 7).
1. Check whether your UDF can be expressed through one of the existing RecordBatchHandlers or their composition. If not, implement a new RecordBatchHandler that provides the minimal functionality your UDF is missing.
2. Decide which type your UDF has: does it consume stream or batch data? Choose an appropriate base class for your UDF's RequestHandler: for wants-batch you should probably use BatchRecordBatchRequestHandlerBase; for wants-stream, either StreamRecordBatchRequestHandlerBase or TimerRecordBatchRequestHandlerBase should be suitable.
3. Decide which parameters your UDF should have in terms of the Kapacitor RPC protocol. Implement the RequestHandler::info method according to the UDF's parameters and type.
4. Create a tool for parsing the UDF's parameters from the init RPC message.
5. If you need additional pre-processing when converting Kapacitor's points to Arrow's RecordBatches, you can implement your own PointsConverter decorator.
6. Implement the RequestHandler::init method. There you should:
   - use your parameter parser from item 4 to parse the agent::InitMessage RPC message;
   - create the RecordBatchHandler from item 1 according to your UDF's functionality;
   - create a PointsConverter, wrapped in the decorator from item 5 if you implemented one;
   - create a PointsStorage, injecting the newly created RecordBatchHandler and PointsConverter, and set it using the protected setPointsStorage method.
7. Implement a UnixSocketClientFactory that creates AgentClients containing a UDFAgent and the newly implemented RequestHandler. Use this factory to generate connections to UnixSocketServer.
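
The parameter parsing, converter decorator, and init wiring described in the steps above can be sketched as follows. This is a minimal illustration under stated assumptions, not the library's implementation: `InitOptions` is a hypothetical stand-in for agent::InitMessage, the option names `field` and `scale` are invented, and `ValueConverter`, `PlainConverter`, `ScalingConverter`, and `makeConverter` are hypothetical names that only mimic the decorator idea behind PointsConverter.

```cpp
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for agent::InitMessage: option name -> raw value.
using InitOptions = std::map<std::string, std::string>;

// Parsed UDF parameters (both option names are invented for this sketch).
struct UdfParams {
  std::string field;  // required
  double scale;       // optional, defaults to 1.0
};

// Parameter parser: validates required options and applies defaults.
inline UdfParams parseParams(const InitOptions& options) {
  auto field = options.find("field");
  if (field == options.end()) {
    throw std::invalid_argument("missing option: field");
  }
  UdfParams params{field->second, 1.0};
  auto scale = options.find("scale");
  if (scale != options.end()) {
    params.scale = std::stod(scale->second);
  }
  return params;
}

// Hypothetical converter interface; a vector of doubles stands in for the
// converted data.
class ValueConverter {
 public:
  virtual ~ValueConverter() = default;
  virtual std::vector<double> convert(const std::vector<double>& values) const = 0;
};

// Base converter: passes values through unchanged.
class PlainConverter : public ValueConverter {
 public:
  std::vector<double> convert(const std::vector<double>& values) const override {
    return values;
  }
};

// Decorator: wraps another converter and adds a pre-processing step.
class ScalingConverter : public ValueConverter {
 public:
  ScalingConverter(std::unique_ptr<ValueConverter> inner, double scale)
      : inner_(std::move(inner)), scale_(scale) {}
  std::vector<double> convert(const std::vector<double>& values) const override {
    std::vector<double> out = inner_->convert(values);
    for (double& v : out) v *= scale_;
    return out;
  }

 private:
  std::unique_ptr<ValueConverter> inner_;
  double scale_;
};

// Init-time wiring: parse the options, then build the decorated converter.
inline std::unique_ptr<ValueConverter> makeConverter(const InitOptions& options) {
  UdfParams params = parseParams(options);
  return std::make_unique<ScalingConverter>(std::make_unique<PlainConverter>(),
                                            params.scale);
}
```

Parsing into a plain struct before constructing anything keeps validation errors separate from object wiring, so a malformed init message can be rejected before any handler or storage is created.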