Below you’ll find a lightweight Docker Compose file that allows you to test the application in your local environment.
Prerequisites:
- Docker
- Docker Compose
Here are the instructions on how to configure Docker and Docker Compose.
Below is a Compose file that allows you to spin up Neo4j, Kafka and ZooKeeper in order to test the application.
Before starting, please change the volume directory to match your own setup; you must put the Streams jar inside the <plugins> directory.
volumes:
- $HOME/neo4j/3.4/plugins:/plugins
From the same directory where the compose file is, you can launch this command:
$ docker-compose up -d
Please note that the Neo4j Docker image uses a naming convention: you can override any neo4j.conf property by prefixing it with NEO4J_
and applying the following transformations:
- a single underscore is converted to a double underscore: _ → __
- a period is converted to a single underscore: . → _
Example:
- dbms.memory.heap.max_size=8G → NEO4J_dbms_memory_heap_max__size: 8G
- dbms.logs.debug.level=DEBUG → NEO4J_dbms_logs_debug_level: DEBUG
If you are testing the producer, you can run a consumer that subscribes to the topic neo4j
by executing this command:
$ docker exec kafka kafka-console-consumer --bootstrap-server kafka:19092 --topic neo4j --from-beginning
Then directly from the Neo4j browser you can generate some random data with this query:
UNWIND range(1,100) as id
CREATE (p:Person {id:id, name: "Name " + id, age: id % 3}) WITH collect(p) as people
UNWIND people as p1
UNWIND range(1,10) as friend
WITH p1, people[(p1.id + friend) % size(people)] as p2
CREATE (p1)-[:KNOWS {years: abs(p2.id - p1.id)}]->(p2)
And if you go back to your consumer you’ll see something like this:
{"key":"neo4j","value":{"meta":{"timestamp":1542047038549,"username":"neo4j","txId":12,"txEventId":107,"txEventsCount":110,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"99","start":{"id":"9","labels":["Person"]},"end":{"id":"0","labels":["Person"]},"before":null,"after":{"properties":{"years":9}},"label":"KNOWS","type":"relationship"},"schema":{"properties":[],"constraints":null}}}
{"key":"neo4j","value":{"meta":{"timestamp":1542047038549,"username":"neo4j","txId":12,"txEventId":108,"txEventsCount":110,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"96","start":{"id":"9","labels":["Person"]},"end":{"id":"7","labels":["Person"]},"before":null,"after":{"properties":{"years":2}},"label":"KNOWS","type":"relationship"},"schema":{"properties":[],"constraints":null}}}
{"key":"neo4j","value":{"meta":{"timestamp":1542047038549,"username":"neo4j","txId":12,"txEventId":109,"txEventsCount":110,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"97","start":{"id":"9","labels":["Person"]},"end":{"id":"8","labels":["Person"]},"before":null,"after":{"properties":{"years":1}},"label":"KNOWS","type":"relationship"},"schema":{"properties":[],"constraints":null}}}
If you are using the Sink, you can define your topic/Cypher-query combination as follows:
environment:
NEO4J_streams_sink_topic_neo4j:
"WITH event.value.payload AS payload, event.value.meta AS meta
FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Question' THEN [1] ELSE [] END |
MERGE (n:Question{neo_id: toInteger(payload.id)}) ON CREATE
SET n += payload.after.properties
)
FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Answer' THEN [1] ELSE [] END |
MERGE (n:Answer{neo_id: toInteger(payload.id)}) ON CREATE
SET n += payload.after.properties
)
FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'User' THEN [1] ELSE [] END |
MERGE (n:User{neo_id: toInteger(payload.id)}) ON CREATE
SET n += payload.after.properties
)
FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Tag' THEN [1] ELSE [] END |
MERGE (n:Tag{neo_id: toInteger(payload.id)}) ON CREATE
SET n += payload.after.properties
)
FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ANSWERS' THEN [1] ELSE [] END |
MERGE (s:Answer{neo_id: toInteger(payload.start.id)})
MERGE (e:Question{neo_id: toInteger(payload.end.id)})
CREATE (s)-[:ANSWERS{neo_id: toInteger(payload.id)}]->(e)
)
FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'TAGGED' THEN [1] ELSE [] END |
MERGE (s:Question{neo_id: toInteger(payload.start.id)})
MERGE (e:Tag{neo_id: toInteger(payload.end.id)})
CREATE (s)-[:TAGGED{neo_id: toInteger(payload.id)}]->(e)
)
FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'PROVIDED' THEN [1] ELSE [] END |
MERGE (s:User{neo_id: toInteger(payload.start.id)})
MERGE (e:Answer{neo_id: toInteger(payload.end.id)})
CREATE (s)-[:PROVIDED{neo_id: toInteger(payload.id)}]->(e)
)
FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ASKED' THEN [1] ELSE [] END |
MERGE (s:User{neo_id: toInteger(payload.start.id)})
MERGE (e:Question{neo_id: toInteger(payload.end.id)})
CREATE (s)-[:ASKED{neo_id: toInteger(payload.id)}]->(e)
)"
link:data/docker-compose.yml[role=include]