Skip to content

Latest commit

 

History

History
102 lines (67 loc) · 2.3 KB

index.adoc

File metadata and controls

102 lines (67 loc) · 2.3 KB

Procedures

The Streams project comes out with a list of procedures.

General configuration

You can enable/disable the procedures by changing this variable inside the neo4j.conf

neo4j.conf
streams.procedures.enable=<true/false, default=true>

streams.publish

This procedure allows custom message streaming from Neo4j to the configured environment by using the underlying configured Producer.

Uses:

CALL streams.publish('my-topic', 'Hello World from Neo4j!')

The message retrieved from the Consumer is the following:

{"payload":"Hello world from Neo4j!"}

If you use a local docker (compose) setup, you can check for these messages with:

docker exec -it kafka kafka-console-consumer --topic my-topic --bootstrap-server kafka:9092

Input Parameters:

Variable Name Type Description

topic

String

The topic where you want to publish the data

payload

Object

The data that you want to stream

You can send any kind of data in the payload, nodes, relationships, paths, lists, maps, scalar values and nested versions thereof.

In case of nodes or relationships if the topic is defined in the patterns provided by the configuration their properties will be filtered in according with the configuration.

streams.consume

This procedure allows to consume messages from a given topic.

Uses:

CALL streams.consume('my-topic', {<config>}) YIELD event RETURN event

Example: Imagine you have a producer that publish events like this {"name": "Andrea", "surname": "Santurbano"}, we can create user nodes in this way:

CALL streams.consume('my-topic', {<config>}) YIELD event
CREATE (p:Person{firstName: event.data.name, lastName: event.data.surname})

Input Parameters:

Variable Name Type Description

topic

String

The topic where you want to publish the data

config

Object

The configuration parameters

Available configuration parameters

Variable Name Type Description

timeout

Long

It’s the value passed to Kafka Consumer#poll method (milliseconds). Default 1000

from

String

It’s the Kafka configuration parameter auto.offset.reset