Adam Cowley
Posted on February 28, 2020
In this post, I will demonstrate how you can build a real-time application on top of Neo4j using a few open source plugins. Due to brevity, this will only be a basic application but you can take the principles and make it as complicated as you need.
For this example, I will also be using Vue.js on top of an express server communicating through websockets. If you prefer something like React then it will work just as well.
Also, I'm using neode - a nodejs based OGM that I wrote for Neo4j. Again, you could quite easily swap this out for another OGM or just run cypher queries against Neo4j using the official driver.
With the disclaimers out of the way, let's get going.
Overview
An idea that I've had at the back of my mind for a while is to build a collaborative Graph Database modelling tool. For this, real-time information will have to be passed through to all users that are logged in so a user doesn't overwrite what another user is doing. For this, I will need to lock and unlock nodes in real-time and pass the message to a user.
- When a user selects a Node in the UI, a
lock
event should be sent to the API via websockets. This status update should be distributed to the other users, and that node locked in their front end - As the user drags the Node, a
drag
event should be sent with the x/y coordinates of the node so this can be reflected in what each other user sees. - When the Node is dropped, the node should be unlocked for all users by a
lock
event
Creating a basic express API
Firstly, we need an API that we can send these messages. The API will be in charge of saving the updates to Neo4j and redistributing the message to all of the other clients. The first step is to create a new directory which will hold the code, initiate the project and install the required dependencies.
mkdir api && cd $_
npm init -y
npm i -S express neode kafka-node socket.io
- express - This is an unopinionated framework for building REST APIs
- neode - An OGM that I'll use to take care of the legwork when communicating with Neo4j.
- kafka-node - A library for interacting with Kafka. This package allows you to create Producers and Consumers in node.
- socket.io - A library that enables communication over websockets
For now, I'll create a server that socket.io and the kafka consumer will bind to.
api/src/index.js
const express = require('express')
const app = express()
app.get('/', (req, res) => res.send('Hello!'))
const server = app.listen(3000, () => console.log('Listening on http://localhost:3000'))
Running node index.js
and heading to http://localhost:3000, you should now see a page saying Hello!
.
Next step, setting up neode. For this, I will use the fromEnv
function included with neode. This makes life a little easier by allowing you to define the neo4j connection details in a .env
file. We can also use this for the kafka consumer later on.
api/.env
NEO4J_PROTOCOL=bolt
NEO4J_HOST=localhost
NEO4J_USERNAME=neo4j
NEO4J_PASSWORD=mysecretpassword
NEO4J_PORT=7687
To load these settings into process.env
, you call the .config()
function supplied by dotenv
.
// Load the .env variables into process.env
require('dotenv').config()
// Accessing the variable
console.log(process.env.NEO4J_HOST) // "localhost"
Data Persistence with Neode
Next, I'll need a neode model. At the risk of causing confusion, I'm going to call the nodes we save in the database Node
. I'll want a UUID to serve as a unique ID, a status and x and y coordinates.
api/models/Node.js
module.exports = {
id: {
type: 'uuid',
primary: true,
},
status: {
type: 'string',
valid: [ 'locked', 'unlocked', ], // I want two statuses
},
x: 'float', // These are a simple shorthand for {type: 'float'}
y: 'float',
}
Then, an instance of neode. For clarity, I will create this in a new file called neode.js
but this could quite easily just be written in index.js.
api/neode.js
const neode = require('neode')
// Load using the config in .env
.fromEnv()
// Use the models defined in api/models/*.js
.withDirectory(__dirname + '/models')
module.exports = neode
You can then use neode to create some data and place them at a random x,y coordinate.
const neode = require('./neode')
const ids = [
'61ff4876-e488-4a91-94a6-f8d6bdd35273',
'c98e1475-1421-41f9-9f2f-551ce1b1bf5a',
'642f2518-b1b8-4405-9dd8-bd19e6a0f487',
'be7d130c-a922-4e80-9a84-c0723a8d4710'
];
ids.map(id => neode.create('Node', { id, x: Math.random()*1000, y: Math.random()*1000 }))
Communicating with Socket.io
Next, for the communication between the front end and API. For this, I've gone for
socket.io because I've used this on a few projects in the past and it is relatively easy to set up with express. But in reality, any websockets implementation will do.
Again, in a new file, I'm creating a function that can be called to create a socket.io instance. I've done this because the socket needs to be bound to some sort of http server, and at some point will need to communicate with neo4j and the kafka consumer too.
api/io.jsconst createSocket = (server, neode, consumer) => {
const io = require('socket.io')(server)
// Messages will be sent and received here...
return io
}
module.exports = createSocket
Then the function can be imported and called inside index.js
.
const io = require('./io')(server, neode)
The socket.io API is essentially an event emitted and therefore has an extremely simple interface.
On connection, a connection
event is fired which provides a socket
object. This contains an
on(event, callback)
function that allows you to execute a callback function when an event is received.
io.on('connection', socket => {
console.log(`--> JOINED: ${socket.id}`) // --> JOINED: Am-spDXUXvNHhuLNAAAD
// Listen for events from that socket
socket.on('disconnect', () => {
console.log(`<-- LEFT : ${socket.id}`) // <-- LEFT : sbIpyb0yM64cYcX_AAAA
})
})
To send a message, you can either use socket.emit(message, payload)
to send a message to that socket only, or use io.emit(message, payload)
to send a message to all sockets. Socket.io also has the concept of Rooms and Namespaces, which could be useful in a multitenancy environment but that is beyond the scope of this article.
When a user joins for the first time, we might want to do some authentication but for berevity I will skip that step. When a user joins, I will use neode to query the database and send the current state of the graph to the UI in the form of a welcome
message.
io.on('connection', socket => {
console.log(`--> JOINED: ${socket.id}`) // --> JOINED: Am-spDXUXvNHhuLNAAAD
neode.all('Node')
.then(res => res.toJson())
.then(data => socket.emit('welcome', data))
// ...
})
In order to handle communications from the client, I'll need to listen for 2 events; lock
and unlock
. When the lock event is received with an id in the payload, I will need to use that to update the status of a node in neo4j. If an unlock event is received, I will need to set the unlocked status should be saved along with the new x and y coordinates.
For now, when a message is received I will simply send out the past tense version to all other connected sockets. Later on, these messages will be handled by Kafka instead.
// Handle Lock Event
socket.on('lock', data => {
neode.find('Node', data.id)
.then(node => node.update({
status: 'locked',
// So we have some context of who locked it, we can use the socket ID
by: socket.id,
}))
.then(node => node.toJson())
.then(json => io.emit('locked', json))
})
// Handle Unlock Event
socket.on('lock', data => {
neode.find('Node', data.id)
.then(node => node.update({
status: 'unlocked',
by: null,
x: data.x,
y: data.y,
}))
.then(node => node.toJson())
.then(json => io.emit('unlocked', json))
})
Great. That should be enough to get the front end working.
Building a Front End with Vue.js
Essentially, we want a framework that allows us to build an application quickly and that can consume messages via websockets using jabascript. My personal preference is Vue.js but you could te easily swap out Vue.js for React, Angular, Ember, jQuery, Backbone (is that still a thing?!), YourFavouriteFrontEndFramework and the result would be the same.
The @vue/cli
tools allow you to quickly generate a project template and get started in minutes. The following command creates a new project in the ui
folder.
vue create ui
There are several ways to include the Socket.io code in a vue project, but because the implementation is relatively simple I'll just use the socket.io-client
package.
cd ui
npm i --save socket.io-client
# serve the project in dev mode
npm run serve
The easiest way to create draggable elements is through SVG. So inside the App.vue
component, I'll just add an svg element and use a custom Vue component to represent the nodes - each generating a draggable <circle>
element.
When the UI receives the welcome message, it should populate a nodes
array. Each time a locked or unlocked message is received, the corresponding item in that array will be updated and that status reflected in the UI.
App.vue
<template>
<div id="app">
<svg height="100%" width="100%" style="background: #f2f2f2">
<!-- Node circles will go here -->
</svg>
</div>
</template>
<script>
import Node from '@/components/Node'
import io from 'socket.io-client';
// There is probably a more sophisticated way of doing this...
const socket = io('http://localhost:3000');
export default {
name: 'app',
components: {
Node,
},
data: () => ({
loading: true,
selected: null,
nodes: [],
}),
}
</script>
For the draggable nodes, I'll simply wrap a Vue component <circle>
component and apply some handlers. This way, I can encapsulate the locking and dragging logic and then send back events to the parent component when something interesting happens. I've defined these
Node.vue
<template>
<circle
:cx="x" :cy="y" :r="r" :style="style"
:title=" JSON.stringify(node) "
/>
</template>
<script>
export default {
name: 'Node',
props: {
node: Object,
selected: String,
lock: Function,
move: Function,
unlock: Function,
r: {
type: Number,
default: 50,
}
},
// Start with default values for x and y that will be changed based
// on the values of the node object passed as a prop
data: () => ({ x: 0, y: 0, }),
}
</script>
Because the x and y coordinates can be changed, I can't use these directly from the node object. So the data()
function will return default values. Then as the component is mounted, or the node object updates this position should be overwritten.
Node.vue
export default {
name: 'Node',
// ...
data: () => ({ x: 0, y: 0, }),
methods: {
reposition() {
// Set x and y based on the Node object
this.x = this.node.x;
this.y = this.node.y;
},
},
mounted() {
// Run the function when the component is first mounted
this.reposition()
},
watch: {
// ... and for any subsequent change to the node object
node() {
this.reposition()
},
},
}
Locking the Node
On mouse down, I want to send a message to the server to say that the node has been locked. This will be taken care of by the lock
function passed as a prop, but I will also need to check that another user hasn't locked the nodeso I need to wrap it in a handler function. First I need to define a function in the component, then bind that to the onmousedown event.
// ...
methods: {
handleLock(e) {
// Don't do anything if it's already locked
if ( this.node.status === "locked" ) {
return;
}
this.lock(this.node)
},
},
// ...
To bind events to components, I prefer the shorthand @
syntax - so for example, I can bind the function on mouse down, and then use .prevent
to prevent any default behaviour (the equivalent of calling event.preventDefault()
).
<template>
<circle
:cx="x" :cy="y" :r="r" :style="style"
:title=" JSON.stringify(node) "
@mousedown.prevent="handleLock"
@mousemove.prevent="handleMove"
@mouseup.prevent="handleUnlock"
@mouseout.prevent="handleUnlock"
/>
</template>
Handling the Drag
Handling the dragging of a component in an SVG is fairly trivial - and is defined in great detail here. On mouse move, if this node is selected by the current user, I want to set the cx
and cy
properties to the position of the mouse. This information is available from the event. Because the SVG element is the parent element of the component, it can be accessed by using this.$el
.
handleMove(e) {
if ( this.selected === this.node.id ) {
this.$el.setAttributeNS(null, "cx", e.clientX)
this.$el.setAttributeNS(null, "cy", e.clientY)
}
},
Then, once the Node has been dropped, I want to unlock the node and send the new position back to the API.
handleUnlock(e) {
if ( this.selected === this.node.id ) {
this.unlock(this.node, {x: e.clientX, y: e.clientY})
}
},
Client to Server Communication
Sending and receiving messages from the Socket.io client are similar to the code above. The socket
object has an emit(message, body)
function which. As this will be identical for all messages, I'll implement it as a method on the App
component. For a production ready app you might want to move this into it's own service.
Each of the messages then has a similar signature, just sending a different event name and payload.
App.vue
export default {
name: 'app',
// ...
methods: {
send(message, body) {
console.log('sending', message, body)
return socket.emit(message, body)
},
lock(node) {
const { id, } = node;
// Set selected node in component
this.selected = node.id
this.send('lock', { id, })
},
move(node, data) {
const { id, } = node;
this.send('move', { id, ...data })
},
unlock(node, data) {
// Clear selected node in component
this.selected = null
const { id, } = node;
this.send('unlock', { id, ...data })
},
},
}
Now the server can send events, all that is needed is to receive any events that come back. The created()
function is fired when the component is created - this is a perfect place to bind some event listeners for the socket. As a message comes in, the nodes prop for the component should be updated to reflect the change - whether that is a node locked or unlocked.
export default {
name: 'app',
// ...
created() {
// Add listeners to set the initial app state on connection
socket.on('welcome', nodes => {
this.nodes = nodes
this.loading = false
})
// Handle another user locking a node
socket.on('locked', e => {
const index = this.locked.indexOf(e.id);
if ( index > -1 ) {
this.locked.splice(index, 1)
}
// Update status
const nodeIndex = this.nodes.findIndex(n => n.id == e.id)
this.nodes[ nodeIndex ].status = "locked"
console.log('LOCKED!', e)
})
// Handle a node becoming unlocked
socket.on('unlocked', e => {
// Remove from locked
const index = this.locked.indexOf(e.id);
if ( index > -1 ) {
this.locked.splice(index, 1)
}
// Update status and position
const nodeIndex = this.nodes.findIndex(n => n.id == e.id)
this.nodes[ nodeIndex ].x = e.x
this.nodes[ nodeIndex ].y = e.y
this.nodes[ nodeIndex ].status = "unlocked"
console.log('UNLOCKED!', e)
})
},
}
A quick test shows that any changes made in one browser window are reflected in other browser windows.
Publishing Updates with Neo4j Streams
This is fine for a small application, but wouldn't work at scale. If you try to run the API in a load balanced environment only the other users that happen to be connected to the server that received the update would be updated. This is where Kafka comes in, when data is updated in Neo4j we can push it out to a Kafka topic. Then, we can create a Consumer in api/src/index.js
which will consume the topic and then relay the changes out to the front end via websockets.
There are two ways of achieving this. The more complex option would be to publish the changes from the application server, or use the Neo4j Streams plugin to turn Neo4j into a Publisher.
Neo4j Streams is a neo4j plugin that allows you to create Producers and/or Consumers directly inside neo4j by editing a few lines of neo4j.conf
. By installing the plugin, a set of procedures are made available in neo4j to publish. You could CALL streams.publish(topic, payload)
in cypher but that still seems like too much work.
Neo4j Streams also allows you to publish to or consume from a Kafka topic with a few lines of config.
Installing Neo4j Streams
Full installation instructions are available here but in short, you download the *.jar file for the latest release and copy it to the plugins
folder of your neo4j instance. Then, in neo4j.conf
add in a couple of items with references to the zookeeper server and any kafka servers to bootstrap to (separated by a comma).
neo4j.conf
kafka.zookeeper.connect=zookeeper.url:2181
kafka.bootstrap.servers=kafka.url:9092
Configuring the Sink
By configuring the neo4j server as a Sink, you can publish updates to a Kafka Topic at the end of every transaction. Neo4j Streams allows you to set this up by adding a few more lines to the neo4j.conf
file. After configuring the server as a source, there are options to define patterns, each of which will be checked when a transaction is committed.
streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>
I have a queue set up in kafka called locks
, so I can configure neo4j to update kafka when the status, x or y properties of a Node
node are updated. You can either supply a wildcard {*}
to include all properties or provide a list inside braces separated by a comma ({prop1, prop2}
). Prefixing a property with a minus sign {-prop3}
will exclude it from the filter. You can apply many patterns to this config by splitting with a semicolon.
# enable it
streams.source.enabled=true
streams.source.topic.nodes.locks=Node{status,x,y}
After restarting the database to apply the settings and running a cypher query - you can see that
MATCH (n:Node)
SET n.status = 'locked', n.x = rand()*100, n.y = rand()*100
Listening for Changes in the API
Now, the only thing left to do is to messages published to the Kafka topic in the API and push those out to the front end. For this, the API will need a Kafka consumer. This is done with the kafka-node
package installed earlier.
The first step is to create a client to connect to the Kafka host. Because .env
is already setup and working with neode, I'll add a the configuration options for the kafka server(s), topic and consumer group to .env
.env
KAFKA_SERVER=kafka.url:9092
KAFKA_TOPIC=locks
KAFKA_GROUP=LocksGroup1
Then the next step is to create a client to connect to Kafka, then a consumer that will listen to the locks topic. By specifying a group, you can either split the consumption of the topic over a number of servers, or in this case by supplying different groups for each application server - each application server and subsequent user will receive the update.
api/src/consumer.js
const kafka = require('kafka-node')
// Client
const client = new kafka.KafkaClient({ kafkaHost: process.env.KAFKA_SERVER });
// Consumer
const consumer = new kafka.Consumer(
client,
// You can supply more than one topic here
[ { topic: process.env.KAFKA_TOPIC, partition: 0 } ],
{
groupId: process.env.KAFKA_GROUP,
autoCommit: true,
fetchMaxWaitMs: 1000,
fetchMaxBytes: 1024 * 1024,
encoding: 'utf8',
fromOffset: false
}
);
// Export it so we can use it later
module.exports = consumer;
Then the only thing left to do is consume the messages as the come in from the topic. Because the consumer is only listening on a single topic there's no need to check the message when it comes in - just the type.
The interface for listening is the same as socket.io, when a message comes in we want to emit it to each connected socket.
api/src/io.js
// Now with added consumer parameter
const createSocket = (server, neode, consumer) => {
// ...
// Listen for Kafka
consumer.on('message', ({ value, }) => {
// Parse the JSON value into an object
const { payload, } = JSON.parse(value)
// Get the properties from the update
const { properties, } = payload.after
// ... and the status
const { status, } = properties
console.log('\n\nemitting from kafka:', status, properties)
// Emit the message through all connected sockets
io.emit(status, properties)
})
}
That's it! When a message comes in, the payload will include a value that includes before
and after
objects that show the state of the node before and after the transaction. Because the status properties are the same as the messages, this can be used as the message name, and the entire property object can be sent as the payload.
Next Steps
This is quite a basic example, but you could use this to do some pretty clever things. For example, neo4j could push the information to another topic, which another topic listens to, runs some aggregation and then passes the new value to another queue.
Neo4j could be used as a Consumer as well as a Producer and process messages directly from a Kafka topic.
On the websockets side, Socket.io has Rooms and Namespaces functionality that could be used for a multi-tenancy system. As a connection is received, some authentication could take place that resulted in the user joining a room belonging to it's organisation. That way only the relevant messages are sent to each user.
The code is up on github, feel free to clone and play for yourself.
If you have any questions or comments, I'm @adamcowley or feel free to continue the conversation on the Neo4j Community site.
Posted on February 28, 2020
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.