capstomp is a MySQL user-defined function (udf) library for sending messages to message brokers like ActiveMQ Artemis or RabbitMQ using the STOMP protocol (v1.2). Module has very high performance based on pool of persistent tcp connections. This library is similar to lib_mysqludf_amqp which is used to publish messages via AMQP directly from MySQL.
To use with the RabbitMQ, the STOMP plugin is required.
- C++ compiler with C++17 support (tested with gcc and clang) and cmake build tool.
- MySQL or forks
- stomptalk STOMP protocol parser library
- stompconn simple STOMP connector
- libevent event notification library
- ActiveMQ Artemis with native STOMP protocol support
- [OR] RabbitMQ with STOMP plugin for protocol support
Publishes a json ['Hello, World!'] to the udf exchange on localhost:61613 with a routing key of test as the user guest with the password guest and vhost 123. Upon success, the message size is returned.
mysql> SELECT capstomp_json('stomp://guest:guest@localhost/123#/exchange/udf', 'test', json_array('Hello, World!'));
this example works with MySQL 8 json functions
You may use any json generator for MySQL. I use my own :)
mysql> SELECT capstomp_json('stomp://guest:guest@localhost:61613/123#/exchange/udf', 'test', jsarr('Hello, World!'));
Publish a string 'text' with custom headers
mysql> SELECT capstomp('stomp://guest:guest@localhost:61613/123#/exchange/udf', '', 'text', 'somekey1=1&some-key2=value');
Table event's
The following publishes JSON objects representing table rows whenever a row is inserted, updated, or deleted.
DROP TABLE IF EXISTS `country`
;
CREATE TABLE `country` (
`id` int NOT NULL,
`name` varchar(100) COLLATE utf8_bin DEFAULT NULL,
`inhabitants` int DEFAULT NULL,
`continent` varchar(20) COLLATE utf8_bin DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin
;
DELIMITER ;;
DROP TRIGGER IF EXISTS `country_AFTER_INSERT`;
CREATE DEFINER=`root`@`localhost` TRIGGER `country_AFTER_INSERT` AFTER INSERT ON `country` FOR EACH ROW BEGIN
SET @message_size = (
capstomp_json('stomp://stompdemo:123@localhost:61613/stompdemo#/queue/a1', '',
JSON_OBJECT('method', 'create', 'payload', JSON_OBJECT('id', NEW.`id`, 'name', NEW.`name`, 'inhabitants', NEW.`inhabitants`, 'continent', NEW.`continent`))));
END ;;
DROP TRIGGER IF EXISTS `country_AFTER_UPDATE`;
CREATE DEFINER=`root`@`localhost` TRIGGER `country_AFTER_UPDATE` AFTER UPDATE ON `country` FOR EACH ROW BEGIN
SET @message_size = (
capstomp_json('stomp://stompdemo:123@localhost:61613/stompdemo#/queue/a1', '',
JSON_OBJECT('method', 'modify', 'payload', JSON_OBJECT('id', NEW.`id`, 'name', NEW.`name`, 'inhabitants', NEW.`inhabitants`, 'continent', NEW.`continent`))));
END ;;
DROP TRIGGER IF EXISTS `country_AFTER_DELETE`;
CREATE DEFINER=`root`@`localhost` TRIGGER `country_AFTER_DELETE` AFTER DELETE ON `country` FOR EACH ROW BEGIN
SET @message_size = (
capstomp_json('stomp://stompdemo:123@localhost:61613/stompdemo#/queue/a1', '',
JSON_OBJECT('method', 'remove', 'payload', JSON_OBJECT('id', OLD.`id`))));
END ;;
DELIMITER ;
Sends a json-data to the given destination on the provided uri.
uri(string). "stomp://guest:guest@localhost/vhost#/stomp_destination/name" STOMP destination.routing-key(string). routing key for exchanges, or empty '' STOMP exchange destination.json-data(string). The body of the message (typically json but it may any string).stomp-header-pairs(query pairs like key1=val1&key2=val2 etc).CONCAT('my_timestamp=', round(unix_timestamp(now(4))*1000), '&some_id=', 42)will add to STOMP headersmy_timestamp=1599081164296andsome_id=42, but you have to be careful with it.
Upon succes, this function returns a number containing the size of sending data.
Same as capstomp but it add content-type=application/json header to each message.
Build with cmake and system libevent
git clone --recurse-submodules https://github.com/ikonopistsev/capstomp.git
$ cd capstomp
$ mkdir b && cd b
$ cmake -DCMAKE_BUILD_TYPE=Release ..
# ccmake .. (if needed)
$ make
...
$ cmake -DCMAKE_BUILD_TYPE=Release -DCAPSTOMP_STATIC_LIBEVENT=ON ..
...
Add -DCAPSTOMP_HAVE_MY_BOOL=ON if my_bool type is present in mysql.h
copy libcapstomp.so to mysql pugins directory (usually to /usr/lib/mysql/plugin or same) then import methods
CREATE FUNCTION capstomp RETURNS INTEGER SONAME 'libcapstomp.so';
CREATE FUNCTION capstomp_json RETURNS INTEGER SONAME 'libcapstomp.so';
CREATE FUNCTION capstomp_status RETURNS STRING SONAME 'libcapstomp.so';
CREATE FUNCTION capstomp_store_erase RETURNS INTEGER SONAME 'libcapstomp.so';
CREATE FUNCTION capstomp_store_clear RETURNS INTEGER SONAME 'libcapstomp.so';
CREATE FUNCTION capstomp_timeout RETURNS integer SONAME 'libcapstomp.so';
CREATE FUNCTION capstomp_max_pool_count RETURNS integer SONAME 'libcapstomp.so';
CREATE FUNCTION capstomp_max_pool_sockets RETURNS integer SONAME 'libcapstomp.so';
CREATE FUNCTION capstomp_pool_sockets RETURNS integer SONAME 'libcapstomp.so';
CREATE FUNCTION capstomp_verbose RETURNS integer SONAME 'libcapstomp.so';
Discription based on lib_mysqludf_amqp