First steps with Streammachine.io
Bart van Deenen
Posted on October 28, 2020
This post shows some first steps with the Stream Machine platform. The target audience is developers, data engineers and possibly data scientists. This post uses Python, but the same can be done with Stream Machine's other language drivers.
About me: I'm Bart van Deenen, lead engineer of Stream Machine, so I should sort of know what I'm talking about :-). This post is meant to be unbiased, and from the perspective of a third-party developer.
Stream Machine
Stream Machine promises to provide lightning fast, privacy secured customer data you can actually use.
So what does this actually mean?
- Stream Machine accepts events with a strictly defined serialization schema (currently Apache Avro and JSON Schema are supported). Any valid schema can be used, but it needs to be registered with Stream Machine.
- Stream Machine handles events that are subject to an event-contract that defines which fields in the event schema (the serialization schema) contain Personally Identifiable Information (referred to as PII or PII data).
- Stream Machine event contracts may contain customizable validation rules that define field value validity.
- Stream Machine processes events with a highly available fault tolerant stream processing system that encrypts all PII field values. The encryption keys are rotated every 24 hours, and this leads to a GDPR compliant stream of event data that can be used by everyone in your company. During the 24 hours, the encrypted values remain static.
- Stream Machine events contain consent-level information, and only those events that allow decryption of PII data for certain purposes can be decrypted into decrypted stream(s), which can only be used by those inside your company that are allowed to.
This post uses a debugging output of the stream data that uses a websocket. Production level output streams require hooking up our internal Apache Kafka streams. AWS S3 and Google Cloud Storage buckets can be used for batch processing. This will be explored in a next blog post.
The plan
I'm going to build a Python application that mimics users clicking around on a dummy web-shop and sends a click stream to Stream Machine. I want to retrieve the anonymized data from a Google Cloud bucket, and show them in a Jupyter notebook. I also want to verify that I only retrieve click stream events for those simulated users that have given full personalized marketing permissions. This first post just gets the basics working, i.e. sending events to Stream Machine, and retrieving them.
The steps
An account
I went to streammachine.io to register an account and confirmed my email.
Get the CLI
The CLI is the most mature way to interact with Stream Machine. Install the most recent version and don't forget to install the shell auto-completion.
Let's create a new stream
strm auth login your-email@wherever
strm create stream demo --save
The --save option causes the credentials to be saved in ~/.config/stream-machine/Stream/demo.json.
Let's send an event!
I'm following along with the Python example in the documentation. I'm going to use the syncsender because I want to play with it in ipython.
The demo example uses the streammachine/demo/1.0.2 schema. You can see its full definition via strm get schema streammachine/demo/1.0.2. The corresponding event-contract that defines validation rules and such is streammachine/example/1.3.0.
For convenience I've copied the saved credentials file (see above) into the working directory.
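As a minimal sketch, the copied file can be read with a small helper. The helper name `load_credentials` is hypothetical and not part of the Stream Machine driver, and the JSON layout is assumed from the keys used in the ipython session further down in this post.

```python
import json
from pathlib import Path


def load_credentials(path="demo.json"):
    """Return (billing_id, client_id, client_secret) from a saved stream file.

    Assumed layout (taken from the keys used later in this post):
    {"ref": {"billingId": ...},
     "credentials": [{"clientId": ..., "clientSecret": ...}]}
    """
    # Use a context manager so the file handle is closed after reading.
    with Path(path).open() as f:
        creds = json.load(f)
    first = creds["credentials"][0]
    return creds["ref"]["billingId"], first["clientId"], first["clientSecret"]
```

The three returned values are in the same order as the arguments passed to SyncSender later on, so they could be unpacked straight into that call.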
Keep an eye on the stream
strm egress demo
This starts a websocket connection where you'll see a JSON serialization of the messages on the stream.
python3 -m venv .venv
. .venv/bin/activate
make
# observe the installed streammachine stuff.
pip freeze | grep streammachine
streammachine-driver==1.0.0
streammachine-schemas-common==1.0.0
streammachine-schemas-demo-avro==1.0.2
Ok, let's start ipython
python3 -m pip install ipython
$> ipython
# import a class that matches the structure of the demo schema
# note that this package name is so long because it's derived from the Stream
# Machine schema ref streammachine/demo and also the `namespace` inside the Avro
# schema (which is io.streammachine.schemas.demo.v1)
from streammachine_io_streammachine_schemas_demo_v1.io.streammachine.schemas.demo.v1 import DemoEvent
from streammachine.driver import StreamMachineEvent, current_time_millis
from streammachine.driver.client.syncsender import SyncSender
event = DemoEvent()
import json
creds = json.load(open("demo.json"))
sender = SyncSender(creds['ref']['billingId'], creds['credentials'][0]['clientId'],
creds['credentials'][0]['clientSecret'])
sender.start()
sender.wait_ready()
sender.send_event(event)
After the send_event(event) call I get the following error:
Error while sending event to Stream Machine (https://in.strm.services/event),
response status = 400, response: Invalid event contract: . Not supported.
We need to set strmMeta.eventContractRef to a valid contract reference (list them with strm list event-contracts). While a schema defines the shape/structure of an event, an event-contract defines the rules that apply to an event, i.e. which validations apply, which fields contain personally identifiable data, which field ties the events into a sequence, etc. We'll use streammachine/example/1.3.0 (inspect with strm get event-contract ..).
event.strmMeta.eventContractRef="streammachine/example/1.3.0"
sender.send_event(event)
Error while sending event to Stream Machine (https://in.strm.services/event),
response status = 400, response: Field: 'consistentValue' in event with schema: 'streammachine/demo/1.0.2'
with value: '' doesn't match regex: '^.+$'
So the field consistentValue needs at least one character?
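The regex quoted in the 400 response can be tried locally. This is only a sketch using Python's `re` module; the server may use a different regex engine, and the function name `is_valid_consistent_value` is made up for illustration.

```python
import re

# Pattern copied verbatim from the error response above.
CONSISTENT_VALUE_PATTERN = re.compile(r"^.+$")


def is_valid_consistent_value(value):
    """Return True if the value matches the pattern from the error message."""
    return CONSISTENT_VALUE_PATTERN.match(value) is not None


print(is_valid_consistent_value(""))          # False: '.+' needs at least one character
print(is_valid_consistent_value("hi there"))  # True
```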
event.consistentValue="hi there"
sender.send_event(event)
# nothing ....
A whole lot of nothing is returned. Stream Machine is meant for very high throughput, and logging anything at thousands of events per second will quickly break the bank if you're using StackDriver. So None and HTTP status 204 are the indicators that everything went fine.
But if we have strm egress demo running, we see something like:
strm egress demo
{
"strmMeta": {
"eventContractRef": "streammachine/example/1.3.0",
"nonce": -1515353731,
"timestamp": 1628688372401,
"keyLink": "cd181172-4ec7-4f0d-86bb-662fc0ee854b",
"billingId": "strmbart5986941267",
"consentLevels": []
},
"uniqueIdentifier": null,
"consistentValue": "AWpvnLU8hBPWRfYrAjWPs0wWt6vBMMXXEnSqGTw=",
"someSensitiveValue": null,
"notSensitiveValue": null
}
We can observe that the consistentValue field has some data in it (it's actually base64 encoded encrypted "hi there"). The various fields in strmMeta are explained in the documentation.
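To see that the value really is base64 wrapped binary data rather than readable text, it can be decoded locally. This sketch only inspects the string from the egress output above; it does not decrypt anything, and the encryption scheme itself is not described in this post.

```python
import base64

# consistentValue exactly as it appeared in the egress output above.
encrypted = "AWpvnLU8hBPWRfYrAjWPs0wWt6vBMMXXEnSqGTw="
raw = base64.b64decode(encrypted)

print(len("hi there"))  # 8  (length of the plaintext we sent)
print(len(raw))         # 29 (length of the decoded ciphertext)
```

The extra bytes are presumably overhead added by the encryption, but that is an assumption based on the sizes alone.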
Let's fill in some more fields:
import random, uuid
def create_avro_event():
    event = DemoEvent()
    event.strmMeta.eventContractRef = "streammachine/example/1.3.0"
    event.strmMeta.consentLevels = [random.randint(0, 3)]
    event.uniqueIdentifier = str(uuid.uuid4())
    event.someSensitiveValue = "A value that should be encrypted"
    event.consistentValue = "a-user-session"
    event.notSensitiveValue = "Anyone is free to see this text."
    return event
event = create_avro_event()
r = sender.send_event(event)
print(r)
None
In the strm egress demo pane we see:
{
"strmMeta": {
"eventContractRef": "streammachine/example/1.3.0",
"nonce": 1820364498,
"timestamp": 1628689078226,
"keyLink": "5074c4de-c51b-4321-a70b-c87db4c79bde",
"billingId": "strmbart5986941267",
"consentLevels": [ 0 ]
},
"uniqueIdentifier": "AUvu95+NUDFf9krvvVUSU+pJsRBl9XahrMVCTpjqHDa9lTHBTzbRdjazyyMVi3xDy2Ps7HDxJHWA",
"consistentValue": "AUvu958J4Lf8JWlxwEfdMXXSZpjxdkBSL4hl8Tk5MVHp3L4=",
"someSensitiveValue": "AUvu95+bq6bw4Z1l9pTYLNQwd/ecdtntrH5mcBJNWv8n6n9jzYxKwEuSDUjig5lPNYqpZpU=",
"notSensitiveValue": "Anyone is free to see this text."
}
Just looking at it, I can see that all the PII fields in the event-contract have been encrypted.
Decrypting
We can create a decrypted stream that does contain personal data.
strm create stream --derived-from demo --levels 2 --save
Start sending data in a loop
import time
while True:
    time.sleep(0.1)
    sender.send_event(create_avro_event())
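The loop above runs until it is interrupted. In an interactive session a bounded variant can be handier. The following is a sketch only: `send_events` and its parameters are hypothetical names, `send` stands in for sender.send_event and `make_event` for create_avro_event.

```python
import time


def send_events(send, make_event, count, interval=0.1):
    """Call send(make_event()) `count` times, sleeping `interval` seconds after each send."""
    for _ in range(count):
        send(make_event())
        time.sleep(interval)
```

With the objects from this post it would be called as send_events(sender.send_event, create_avro_event, count=100).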
And observe in the egress:
strm egress demo-2
{
"strmMeta": {
"eventContractRef": "streammachine/example/1.3.0",
"nonce": 1165221388,
"timestamp": 1628689364936,
"keyLink": "5074c4de-c51b-4321-a70b-c87db4c79bde",
"billingId": "strmbart5986941267",
"consentLevels": [
3
]
},
"uniqueIdentifier": "279e9dcb-a9a5-497b-8d46-b213106f7fab",
"consistentValue": "a-user-session",
"someSensitiveValue": "AUvu95+bq6bw4Z1l9pTYLNQwd/ecdtntrH5mcBJNWv8n6n9jzYxKwEuSDUjig5lPNYqpZpU=",
"notSensitiveValue": "Anyone is free to see this text."
}
Note that someSensitiveValue is still encrypted (observe the base64 encoding).
This is because
- we're asking for a stream with consent-levels up to 2.
- the data owner has given consent level 3, but we don't need it, didn't ask for it and so we don't get it. If we'd asked for level 3 this would also have been decrypted.
If we observe the consent levels in the decrypted stream:
strm egress demo-2 | jq -c .strmMeta.consentLevels
[3]
[3]
[2]
[2]
we only see [3] and [2]. Events with consent levels lower than these don't get exported to this decrypted stream.
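The filtering seen in this output can be modelled locally. This is a sketch of the observed behaviour, not Stream Machine's implementation: it assumes an event reaches the derived stream when its highest consent level is at least the level the stream was created with, and the function name `visible_in_derived_stream` is made up.

```python
def visible_in_derived_stream(event, requested_level):
    """Local model only: keep events whose highest consent level
    is at least the level requested for the derived stream."""
    levels = event["strmMeta"]["consentLevels"]
    return bool(levels) and max(levels) >= requested_level


# One simulated event per consent level 0..3, like create_avro_event produces.
events = [{"strmMeta": {"consentLevels": [n]}} for n in range(4)]
kept = [e["strmMeta"]["consentLevels"]
        for e in events
        if visible_in_derived_stream(e, 2)]
print(kept)  # [[2], [3]]
```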
Conclusions
We can send data into Stream Machine, and receive it privacy safe and of guaranteed quality.
The next post in this series will show how to get all these data into a Google Cloud bucket, and how to use those data in a Jupyter notebook.