Familiar Day #4 - Getting somewhere

NickBlow

Posted on June 21, 2019

If you're new here

I'm currently building an open source tabletop roleplaying assistant with the latest technologies. I'm starting with infrastructure and build tools, to ensure I've got a solid foundation. I've set up a decent starter Terraform config, and decided to move my JavaScript transpilation from ParcelJS to WebPack, as I ended up having to set a whole bunch of configuration on ParcelJS, which defeats the point of a 'no config' library. I said last time I'd work on WebPack, but I ended up working on the GraphQL Subscription Service instead, as I need it for something I'm working on professionally as well...

And a small correction for yesterday

I mentioned that lack of Server Side Rendering for Lit-Element was no big deal, because Lit-Element is fast. However, I realised this morning that this may cause issues with SEO: even though Bing and Google claim to index JavaScript, this is quite sporadic and unreliable. I will have a think about how to handle this; it would be nice to have the option of a public campaign journal for people to follow along with, and making it indexable would be a good start.

As I mentioned yesterday, there's nothing stopping us mixing and matching technologies with Lit-Element. So I think I'll create most of the campaign journal in Lit-HTML, using Web Components for progressive enhancement of interactive elements. That sounds like a fruitful avenue to explore. The only issue is that I need to be careful about my CSS scoping and naming for the parts that are not in the ShadowDOM. Rehydration is still an open problem, but I might be able to treat the Lit-HTML content as opaque HTML and work around it - I could potentially even use Lit-HTML only on the server, but I'll need to work out what to do. One option I didn't mention yesterday is to exploit the fact that the browser treats unknown custom tags as HTMLUnknownElement, and do something like:

<my-tag>
  <span slot="ssr">hello world</span>
</my-tag>

This will render 'hello world' even without any JS on the page, in a sort-of standards-compliant way. There's also a proposal for declarative ShadowDOM that will solve this for good.

A quick note on project structure

I'm using a sort of monorepo approach, but each folder is treated as a fully isolated service. It's not going to get to millions of lines of code (well, who knows, but probably not). I'm not going to use code sharing between modules, aside from the client-side JS and the server that will serve that JS & HTML, as that makes isomorphic apps the easiest. If I want a module that is shared between the two, I won't do import myFile from '../../../../../../otherFolder/file.ts' or the equivalent in Go. I will publish it to Git as an entirely separate repository (although, that said, with the new Go Modules I might be able to do something fancy that still maintains isolation between services).
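
As a sketch of what that could look like on the Go side (the module paths here are hypothetical, just for illustration), each service's go.mod would pull shared code in as a normal versioned dependency instead of a relative import:

// go.mod for one service folder; all module paths are hypothetical.
module github.com/NickBlow/familiar-journal-service

go 1.12

// Shared code lives in its own repository and is versioned independently,
// which keeps services isolated even inside the monorepo.
require github.com/NickBlow/familiar-shared v0.1.0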

Webpack & our project

We'll probably need multiple webpack configurations: one for the server and one for the web. I've also kept a top-level package.json with project info, but will put all the config for the frontend in the frontend folder.

How I'm approaching Git

Right now I've pushed everything to master. My philosophy is that master should always be deployable, and represent the current state of the system. If I'm building a breaking change or new feature, I'll build it on a branch, and only merge it in if I'd be happy deploying it live. It's quite difficult at the start because we don't have anything built. I'll probably start building the frontend on a new branch now, and merge it in once I'm happy for that code to go live on the web.

What I'm doing today

I'm going to be using GraphQL subscriptions for something I'm doing with work, so I decided to work on that instead of fiddling with WebPack. I updated something I had working on AWS to be a bit more pluggable. Eventually I'll get round to having end-to-end examples running in AWS and GCP.

Here's the progress I made https://github.com/NickBlow/gqlssehandlers/commit/ab1333eb6fffb92f8522cbe73ef90952a8c1b932

It works locally: go run examples/example-server.go, then curl 0.0.0.0:8080 to see it working. Right now it just multicasts to all clients, sending a "Hello" message every 10 seconds, and a keep-alive if nothing's been sent for 3 seconds.
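
The output from curl looks roughly like this - a keep-alive comment line every 3 seconds, with a data line whenever the example event fires:

:KEEPALIVE

:KEEPALIVE

:KEEPALIVE

data:Hello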

I want this to be somewhat pluggable, but unfortunately it does look like a lot of the work will be in writing the interface for the configuration - you need to handle storing the subscriptions, populating the GraphQL parameters, and also listening to the event stream. However, this should make it reasonably flexible.

Here's what a basic server setup looks like:

package main

import (
    "log"
    "net/http"
    "time"

    "github.com/NickBlow/gqlssehandlers"
    "github.com/NickBlow/gqlssehandlers/examples/adapters"

    gorillaHandlers "github.com/gorilla/handlers"
    "github.com/gorilla/mux"
)

func main() {

    router := mux.NewRouter()
    eventStream := &adapters.InMemoryAdapter{}

    subscriptionServerConfig := &gqlssehandlers.HandlerConfig{
        Adapter:         eventStream,
        EventBufferSize: 100,
    }

    handlers := gqlssehandlers.GetHandlers(subscriptionServerConfig)
    router.Handle("/", handlers.PublishStreamHandler).Methods("GET")
    router.Handle("/subscriptions", handlers.SubscribeHandler).Methods("POST")
    router.Handle("/subscriptions", handlers.UnsubscribeHandler).Methods("DELETE")

    originsOk := gorillaHandlers.AllowedOrigins([]string{"https://example.com"})
    methodsOk := gorillaHandlers.AllowedMethods([]string{"GET", "POST", "OPTIONS", "DELETE"})

    // Server has long write timeout because we're supporting a streaming response.
    srv := http.Server{
        Addr:         ":8080",
        Handler:      gorillaHandlers.CORS(originsOk, methodsOk)(router),
        ReadTimeout:  15 * time.Second,
        WriteTimeout: 15 * time.Minute,
    }

    log.Printf("Server starting on port 8080")
    log.Fatal(srv.ListenAndServe())

}

Right now, the library returns a subscription handler, an unsubscribe handler and a streaming handler. Because Server-Sent Events only work over GET requests, I'll need to figure out some way of adding authentication to the streaming handler. The two options are setting a cookie during a handshake, or creating some kind of short-lived one-time password passed in a query string. Ideally I don't want to do anything like validating a JWT, but maybe we can make this configurable.
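
As a rough sketch of the one-time-token option (everything below is hypothetical, not part of the library), the streaming handler could be wrapped in middleware that consumes a short-lived token from the query string:

package middleware

import (
    "net/http"
    "sync"
)

// tokenStore is a hypothetical in-memory store of short-lived one-time tokens,
// issued to the client by some authenticated handshake endpoint.
type tokenStore struct {
    mu     sync.Mutex
    tokens map[string]struct{}
}

// Consume deletes the token if it exists, so each token only opens one stream.
func (t *tokenStore) Consume(token string) bool {
    t.mu.Lock()
    defer t.mu.Unlock()
    if _, ok := t.tokens[token]; !ok {
        return false
    }
    delete(t.tokens, token)
    return true
}

// RequireStreamToken rejects GET requests that don't carry a valid token
// in the query string before they reach the streaming handler.
func RequireStreamToken(store *tokenStore, next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        if !store.Consume(r.URL.Query().Get("token")) {
            http.Error(w, "Unauthorized", http.StatusUnauthorized)
            return
        }
        next.ServeHTTP(w, r)
    })
}

The nice property of this shape is that the handshake endpoint issuing the token can do its authentication (cookie, JWT, session) once, rather than on every stream connection.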

I definitely need to work on the Developer Experience (DX) of the library. Right now, the configuration looks like this:

// HandlerConfig represents the configuration options for GQLSSEHandlers
// If the EventBuffer size has been set, the server will store events in memory,
// and re-send them if a client reconnects with a Last-Event-ID Header. This may result in duplicate messages being sent,
// so ensure your client is idempotent.
// This will also only send events that the server itself received.
// If you want to ensure a client always gets buffered messages, you can either use sticky sessions, route based on some hash, or multicast events to all servers.
type HandlerConfig struct {
    Adapter         SubscriptionAdapter
    EventBufferSize int64
}
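
The buffering and replay described in that comment would look something like this on the wire (a sketch only - the Event type and helper below are mine, not the library's internals):

package sketch

import (
    "fmt"
    "net/http"
)

// Event is a hypothetical buffered event; the ID is what an EventSource
// client echoes back in the Last-Event-ID header when it reconnects.
type Event struct {
    ID   string
    Data string
}

// replayMissedEvents re-sends everything in the buffer that arrived after the
// client's Last-Event-ID. Duplicates are possible, hence the idempotency note.
func replayMissedEvents(w http.ResponseWriter, r *http.Request, buffer []Event) {
    lastSeen := r.Header.Get("Last-Event-ID")
    if lastSeen == "" {
        return // fresh connection, nothing to replay
    }
    replay := false
    for _, ev := range buffer {
        if replay {
            // Writing an id: field is what makes the browser send
            // Last-Event-ID on its next automatic reconnect.
            fmt.Fprintf(w, "id:%s\ndata:%s\n\n", ev.ID, ev.Data)
        }
        if ev.ID == lastSeen {
            replay = true
        }
    }
}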

And the SubscriptionAdapter looks like:

// SubscriptionAdapter represents an interface that begins listening to a stream of arbitrary events (e.g. a queue or a pub/sub service),
// and calls the callback with an array of interested clients whenever it receives an event.
type SubscriptionAdapter interface {
    StartListening(cb orchestration.NewEventCallback)
    NotifyNewSubscription(subscriber orchestration.SubscriptionData) error
    NotifyUnsubscribe(subscriptionID string) error
}

There's a lot of complexity hidden in this adapter. For example, in the AWS version I have working (but haven't released the source of yet), StartListening uses SQS + SNS, and the NotifyNewSubscription/NotifyUnsubscribe methods use DynamoDB, which is a lot of services to cram into one adapter. I'm also not convinced about the orchestration package's name.

SubscriptionData is also quite overloaded, as it contains all the information needed to perform the GraphQL query. On the one hand, this shifts a lot of responsibility to the integrator of the library; on the other hand, it means you can do things like stick a DataLoader onto the context, and have much more control over what's executed.

// SubscriptionData encompasses a particular subscription and the parameters for the GraphQL Query that should be performed.
type SubscriptionData struct {
    ID            string
    GraphQLParams *graphql.Params
}
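
For example, building those params on the integrator's side could look roughly like this (a sketch using graphql-go; the helper function and loader key are hypothetical):

package sketch

import (
    "context"

    "github.com/graphql-go/graphql"
)

type ctxKey string

// loaderKey is a hypothetical context key the integrator's resolvers would read.
const loaderKey ctxKey = "loader"

// buildSubscriptionParams attaches a DataLoader (or anything else the
// resolvers need) to the context before handing the params to the library.
func buildSubscriptionParams(schema graphql.Schema, query string, loader interface{}) *graphql.Params {
    return &graphql.Params{
        Schema:        schema,
        RequestString: query,
        Context:       context.WithValue(context.Background(), loaderKey, loader),
    }
}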

An example adapter

I still need to create a mock GraphQL schema for this - for now the streaming server just multicasts the subscription ID to everyone.

package adapters

import (
    "time"

    "github.com/NickBlow/gqlssehandlers/internal/orchestration"
)

type subscriptionData = orchestration.SubscriptionData

var subscribersMap = make(map[string]subscriptionData)

// InMemoryAdapter stores subscribers in memory, and triggers events at regular intervals
type InMemoryAdapter struct{}

// StartListening calls the callback every 10 seconds
func (a *InMemoryAdapter) StartListening(cb orchestration.NewEventCallback) {

    go func() {
        for {
            <-time.After(time.Second * 10)
            cb([]subscriptionData{{ID: "Hello"}})
        }
    }()

}

// NotifyNewSubscription adds a subscriber to the map
func (a *InMemoryAdapter) NotifyNewSubscription(subscriber subscriptionData) error {
    subscribersMap[subscriber.ID] = subscriber
    return nil
}

// NotifyUnsubscribe removes a subscriber from the map
func (a *InMemoryAdapter) NotifyUnsubscribe(subscriptionID string) error {
    delete(subscribersMap, subscriptionID)
    return nil
}

How it works:

Server-Sent Events themselves are really easy: you need a writer that implements the Flusher interface. This blog post helped point me in the right direction. You then need to set a few headers:

Cache-Control: no-cache
Connection: keep-alive
Content-Type: text/event-stream
Transfer-Encoding: chunked

Make sure your content handles new lines appropriately; see here and here.
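
The reason newlines matter: in the SSE format a blank line terminates an event, so multi-line payloads have to be split across multiple data: fields. A minimal sketch of a safe writer (the helper is mine, not part of the library):

package sketch

import (
    "fmt"
    "io"
    "strings"
)

// writeSSEData writes a payload as a single SSE event, splitting on newlines
// so an embedded "\n\n" can't terminate the event early. The client's
// EventSource joins consecutive data: lines back together with newlines.
func writeSSEData(w io.Writer, payload string) {
    for _, line := range strings.Split(payload, "\n") {
        fmt.Fprintf(w, "data:%s\n", line)
    }
    fmt.Fprint(w, "\n") // blank line marks the end of the event
}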

This is my entire code for the streaming handler:

func (s *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // Streaming requires a ResponseWriter that can flush partial responses.
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
        return
    }
    // CloseNotify fires when the client disconnects.
    closed := w.(http.CloseNotifier).CloseNotify()
    clientID, err := gonanoid.Nanoid()
    if err != nil {
        fmt.Println("Couldn't generate client ID")
        http.Error(w, "Error", http.StatusInternalServerError)
        return
    }
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    messageChan := make(chan string)

    // Register this client with the broker so it receives published events.
    s.Broker.NewClients <- orchestration.ClientInfo{
        ClientID:             clientID,
        CommunicationChannel: messageChan,
    }

Loop:
    for {
        select {
        case <-closed:
            // The client went away: tell the broker to clean up, then stop streaming.
            s.Broker.ClosedClients <- clientID
            break Loop
        case <-time.After(time.Second * 3):
            // An SSE comment line (starting with ':') acts as a keep-alive.
            fmt.Fprint(w, ":KEEPALIVE \n\n")
            flusher.Flush()
        case val := <-messageChan:
            fmt.Fprintf(w, "data:%v \n\n", val)
            flusher.Flush()
        }
    }
    fmt.Println("stopped main thread")
}

Just in case there's a lack of messages, the server sends a keepalive every 3 seconds, though I will increase this later. The example server has a write timeout, which is good practice, but as we're sending a streaming response I set it to 15 minutes. EventSource automatically reconnects, and we want to be able to drain connections on old servers, so that when we update the software there aren't clients connected forever, and so that we can automatically clean up badly behaved or missing clients.

The referenced Broker is in charge of keeping track of connected clients, and sending messages to them from the above NewEventCallback. These messages will come down the client's messageChan.

Here's what the Broker looks like.

// ClientInfo contains information about a connected client
type ClientInfo struct {
    ClientID             string
    CommunicationChannel chan string
    LastSeenEventID      string
    subscriptions        []string
}

var subscriptionLookupTable = make(map[string]ClientInfo)

// Broker contains all the details to manage state of connected clients. 
type Broker struct {
    NewClients           chan ClientInfo
    ClosedClients        chan string
    NewSubscriptions     chan SubscriptionData
    executeSubscriptions chan SubscriptionData
    bufferedEvents       []interface{}
    clients              map[string]ClientInfo
}

// InitializeBroker creates a broker and starts listening to events
func InitializeBroker() *Broker {
    b := &Broker{
        NewClients:           make(chan ClientInfo),
        ClosedClients:        make(chan string),
        NewSubscriptions:     make(chan SubscriptionData),
        executeSubscriptions: make(chan SubscriptionData),
        bufferedEvents:       make([]interface{}, 0),
        clients:              map[string]ClientInfo{},
    }
    go b.listen()
    return b
}

// NewEventCallback is a function that should be called every time an event happens,
// with details of the subscriptions that should be executed in response to that event
type NewEventCallback func(subscriptions []SubscriptionData)

// ExecuteQueriesAndPublish triggers the broker to perform the GraphQL Query specified in the SubscriptionData, and propagate it to the clients
func (b *Broker) ExecuteQueriesAndPublish(subscriptions []SubscriptionData) {
    for _, val := range subscriptions {
        b.executeSubscriptions <- val
    }
}

// listen serializes all state changes through one goroutine, so the clients map never needs a mutex.
func (b *Broker) listen() {
    for {
        select {
        case client := <-b.NewClients:
            b.clients[client.ClientID] = client
        case client := <-b.ClosedClients:
            delete(b.clients, client)
        case event := <-b.executeSubscriptions:
            // For now, multicast the event's subscription ID to every connected client.
            for _, client := range b.clients {
                client.CommunicationChannel <- event.ID
            }
        }
    }
}

Obviously it's still a work in progress, as ExecuteQueriesAndPublish doesn't do what it says on the tin!
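
When it does, the per-subscription step will presumably look something like this (a sketch using graphql-go; the helper is hypothetical):

package sketch

import (
    "encoding/json"

    "github.com/graphql-go/graphql"
)

// SubscriptionData mirrors the struct shown earlier.
type SubscriptionData struct {
    ID            string
    GraphQLParams *graphql.Params
}

// executeAndSerialize runs the subscription's stored query and serializes the
// result so it can be pushed down a client's CommunicationChannel.
func executeAndSerialize(sub SubscriptionData) (string, error) {
    result := graphql.Do(*sub.GraphQLParams)
    payload, err := json.Marshal(result)
    if err != nil {
        return "", err
    }
    return string(payload), nil
}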

Alrighty

Even though this is a 'GraphQL' server, I've yet to do anything with the subscriptions or create an example schema... I'll get to that. I might also want to do an Apollo Link integration for this as well.

I still have plenty to do, but we made good progress today. Let's see how we get on over the weekend! I'm trying to do a little every day for as long as I can...
