Persisting event data to Postgres using GenStage and EventBus

mustafaturan

Mustafa Turan

Posted on July 31, 2018

Persisting event data to Postgres using GenStage and EventBus

One of the ways to consume EventBus events is implementing GenStage consumers. GenStage handles backpressure easily with configurable workers. event_bus_postgres library uses GenStage to persist event_bus events to postgres DB with batch insert.

How it works

+-----+
|     |                                         GEN STAGE
|     |        EVENTBUS      +------------------------------------------+
|     |        CONSUMER      |                   +---+                  |
|     |        +-----+       |                   |   |                  |
|     |        |     |       |                   |   |          +---+   |
|     |        |  E  |       |                   |   |          |   |   |
|     |        |  v  |       |                   |   |          |   |   |
|     |        |  e  |       |                   |   |          |   |   |
|  E  |        |  n  |       |                   | E |          |   |   |
|  l  |        |  t  |       |  +-------+        | v |          |   |   |
|  i  | topic  |  B  |  topic   |       |        | e |          |   |
|  x  |   +    |  u  |    +     |   Q   |        | n |          | B |       +--+
|  i  |event_id|  s  | event_id |   u   |   ask  | t |    ask   | u |       |  |
|  r  |------->|  .  |--------->|   e   |<-------|   | <--------| c | BATCH |  |
|     |        |  P  |          |   u   |------->| M | -------->| k |------>|DB|
|  E  |        |  o  |          |   e   |   pull | a |    pull  | e | INSERT|  |
|  v  |        |  s  |          |       |        | p |          | t |       |  |
|  e  |        |  t  |       |  +-------+        | p |          |   |   |   +--+
|  n  |        |  g  |       |  GENSTAGE         | e |          |   |   |
|  t  |        |  r  |       |  PRODUCER         | r |          |   |   |
|  B  |        |  e  |       |                   |   |          |   |   |
|  u  |        |  s  |       |                   |   |          |   |   |
|  s  |        +-----+       |                   |   |          |   |   |
|     |<-----------------------------------------|   |          +---+   |
|     |                      |    fetch_event/1  |   |         CONSUMER |
|     |                      |                   |   |                  |
+-----+                      |                   +---+                  |
                             |                  CONSUMER                |
                             |                  PRODUCER                |
                             +------------------------------------------+
Enter fullscreen mode Exit fullscreen mode

Components

EventBus
Message bus for Elixir; it publishes event_id and topic data to topic subscribers.

EventBus.Postgres
Message bus event consumer; it pushes event_id and topic to the EventBus.Postgres.Queue

Queue
GenStage producer; it is a simple queue implementaion

EventMapper
GenStage producer-consumer; it pulls/dequeues from EventBus.Postgres.Queue, and fetch original event from EventBus and then convert data into Ecto model.

Bucket
GenStage consumer; it pulls/dequeues from EventBus.Postgres.EventMapper and batch insert data to Postgres DB.

Source code: https://github.com/otobus/event_bus_postgres

💖 💪 🙅 🚩
mustafaturan
Mustafa Turan

Posted on July 31, 2018

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related