gRPC Streaming: Best Practices and Performance Insights

ramonberrutti

Ramón Berrutti

Posted on November 30, 2024

gRPC Streaming: Best Practices and Performance Insights

Introduction

gRPC streaming allows protobuf messages to be streamed from client to server, server to client, or bidirectionally.
This powerful feature can be used to build real-time applications such as chat applications, real-time monitoring dashboards, and more.

In this article, we will explore how to use gRPC streaming correctly.

Prerequisites

  • Basic knowledge of gRPC
  • Basic knowledge of Go programming language (The sample code is written in Go, but the concept can be applied to other languages as well)
  • The code examples are available on GitHub

Good Practices

Let's check the good practices for using gRPC streaming:

Use unary request for unary request

One common mistake is to use streaming for unary requests.
For example, consider the following gRPC service definition:

service MyService {
  rpc GetSomething (SomethingRequest) returns (stream SomethingResponse) {}
}
Enter fullscreen mode Exit fullscreen mode

If the client only needs to send one request and receive one response,
You don't need to use streaming. Instead, we can define the service as follows:

service MyService {
  rpc GetSomething (SomethingRequest) returns (SomethingResponse) {}
}
Enter fullscreen mode Exit fullscreen mode

By using streaming for unary requests, we are adding unnecessary complexity
to the code, which can make it harder to understand and maintain and not
gaining any benefits from using streaming.

Golang code example comparing unary request and streaming request:

Unary request:

type somethingUnary struct {
    pb.UnimplementedSomethingUnaryServer
}

func (s *somethingUnary) GetSomething(ctx context.Context, req *pb.SomethingRequest) (*pb.SomethingResponse, error) {
    return &pb.SomethingResponse{
        Message: "Hello " + req.Name,
    }, nil
}

func TestSomethingUnary(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSomethingUnaryServer(s, &somethingUnary{})
    })

    client := pb.NewSomethingUnaryClient(conn)

    response, err := client.GetSomething(
        context.Background(),
        &pb.SomethingRequest{
            Name: "test",
        },
    )
    if err != nil {
        t.Fatalf("failed to get something: %v", err)
    }

    if response.Message != "Hello test" {
        t.Errorf("unexpected response: %v", response.Message)
    }
}
Enter fullscreen mode Exit fullscreen mode

Streaming unary request:

type somethingStream struct {
    pb.UnimplementedSomethingStreamServer
}

func (s *somethingStream) GetSomething(req *pb.SomethingRequest, stream pb.SomethingStream_GetSomethingServer) error {
    if err := stream.Send(&pb.SomethingResponse{
        Message: "Hello " + req.Name,
    }); err != nil {
        return err
    }

    return nil
}

func TestSomethingStream(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSomethingStreamServer(s, &somethingStream{})
    })

    client := pb.NewSomethingStreamClient(conn)

    stream, err := client.GetSomething(
        context.Background(),
        &pb.SomethingRequest{
            Name: "test",
        },
    )
    if err != nil {
        t.Fatalf("failed to get something stream: %v", err)
    }

    response, err := stream.Recv()
    if err != nil {
        t.Fatalf("failed to receive response: %v", err)
    }

    if response.Message != "Hello test" {
        t.Errorf("unexpected response: %v", response.Message)
    }
}
Enter fullscreen mode Exit fullscreen mode

As we can see, the code for unary requests is simpler and easier to understand
than the code for streaming requests.

Sending multiple documents at once if we can

Let's compare these two service definitions:

service BookStore {
  rpc ListBooks(ListBooksRequest) returns (stream Book) {}
}

service BookStoreBatch {
  rpc ListBooks(ListBooksRequest) returns (stream ListBooksResponse) {}
}

message ListBooksResponse {
  repeated Book books = 1;
}
Enter fullscreen mode Exit fullscreen mode

BookStore streams one book at a time, while BookStoreBatch streams multiple books simultaneously.

If the client needs to list all books, it is more efficient to use BookStoreBatch
because it reduces the number of round trips between the client and the server.

Let's see the Golang code example for BookStore and BookStoreBatch:

BookStore:

type bookStore struct {
    pb.UnimplementedBookStoreServer
}

func (s *bookStore) ListBooks(req *pb.ListBooksRequest, stream pb.BookStore_ListBooksServer) error {
    for _, b := range bookStoreData {
        if b.Author == req.Author {
            if err := stream.Send(&pb.Book{
                Title:           b.Title,
                Author:          b.Author,
                PublicationYear: int32(b.PublicationYear),
                Genre:           b.Genre,
            }); err != nil {
                return err
            }
        }
    }
    return nil
}

func TestBookStore_ListBooks(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterBookStoreServer(s, &bookStore{})
    })

    client := pb.NewBookStoreClient(conn)

    stream, err := client.ListBooks(
        context.Background(),
        &pb.ListBooksRequest{
            Author: charlesDickens,
        },
    )
    if err != nil {
        t.Fatalf("failed to list books: %v", err)
    }

    books := []*pb.Book{}
    for {
        book, err := stream.Recv()
        if err != nil {
            break
        }
        books = append(books, book)
    }

    if len(books) != charlesDickensBooks {
        t.Errorf("unexpected number of books: %d", len(books))
    }
}
Enter fullscreen mode Exit fullscreen mode

BookStoreBatch:

type bookStoreBatch struct {
    pb.UnimplementedBookStoreBatchServer
}

func (s *bookStoreBatch) ListBooks(req *pb.ListBooksRequest, stream pb.BookStoreBatch_ListBooksServer) error {
    const batchSize = 10
    books := make([]*pb.Book, 0, batchSize)
    for _, b := range bookStoreData {
        if b.Author == req.Author {
            books = append(books, &pb.Book{
                Title:           b.Title,
                Author:          b.Author,
                PublicationYear: int32(b.PublicationYear),
                Genre:           b.Genre,
            })

            if len(books) == batchSize {
                if err := stream.Send(&pb.ListBooksResponse{
                    Books: books,
                }); err != nil {
                    return err
                }
                books = books[:0]
            }
        }
    }

    if len(books) > 0 {
        if err := stream.Send(&pb.ListBooksResponse{
            Books: books,
        }); err != nil {
            return nil
        }
    }

    return nil
}

func TestBookStoreBatch_ListBooks(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterBookStoreBatchServer(s, &bookStoreBatch{})
    })

    client := pb.NewBookStoreBatchClient(conn)

    stream, err := client.ListBooks(
        context.Background(),
        &pb.ListBooksRequest{
            Author: charlesDickens,
        },
    )
    if err != nil {
        t.Fatalf("failed to list books: %v", err)
    }

    books := []*pb.Book{}
    for {
        response, err := stream.Recv()
        if err != nil {
            break
        }
        books = append(books, response.Books...)
    }

    if len(books) != charlesDickensBooks {
        t.Errorf("unexpected number of books: %d", len(books))
    }
}
Enter fullscreen mode Exit fullscreen mode

From the code above, it needs to be clarified which one is better.
Let's run a benchmark to see the difference:

BookStore benchmark:

func BenchmarkBookStore_ListBooks(b *testing.B) {
    conn := newServer(b, func(s grpc.ServiceRegistrar) {
        pb.RegisterBookStoreServer(s, &bookStore{})
    })

    client := pb.NewBookStoreClient(conn)

    var benchInnerBooks []*pb.Book
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        stream, err := client.ListBooks(
            context.Background(),
            &pb.ListBooksRequest{
                Author: charlesDickens,
            },
        )
        if err != nil {
            b.Fatalf("failed to list books: %v", err)
        }

        books := []*pb.Book{}
        for {
            book, err := stream.Recv()
            if err != nil {
                break
            }
            books = append(books, book)
        }

        benchInnerBooks = books
    }

    benchBooks = benchInnerBooks
}
Enter fullscreen mode Exit fullscreen mode

BookStoreBatch benchmark:

func BenchmarkBookStoreBatch_ListBooks(b *testing.B) {
    conn := newServer(b, func(s grpc.ServiceRegistrar) {
        pb.RegisterBookStoreBatchServer(s, &bookStoreBatch{})
    })

    client := pb.NewBookStoreBatchClient(conn)

    var benchInnerBooks []*pb.Book
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        stream, err := client.ListBooks(
            context.Background(),
            &pb.ListBooksRequest{
                Author: charlesDickens,
            },
        )
        if err != nil {
            b.Fatalf("failed to list books: %v", err)
        }

        books := []*pb.Book{}
        for {
            response, err := stream.Recv()
            if err != nil {
                break
            }
            books = append(books, response.Books...)
        }

        benchInnerBooks = books
    }

    benchBooks = benchInnerBooks
}
Enter fullscreen mode Exit fullscreen mode

Benchmark results:

BenchmarkBookStore_ListBooks
BenchmarkBookStore_ListBooks-12                      732           1647454 ns/op           85974 B/op       1989 allocs/op
BenchmarkBookStoreBatch_ListBooks
BenchmarkBookStoreBatch_ListBooks-12                1202            937491 ns/op           61098 B/op        853 allocs/op
Enter fullscreen mode Exit fullscreen mode

What an improvement! BookStoreBatch is faster than BookStore by a factor of 1.75x.

But why is BookStoreBatch faster than BookStore?

Each time that the server sends a message stream.Send() to the client, needs to
encode the message and send it over the network. By sending multiple documents
at once, we reduce the number of times that the server needs to encode and send
the message, which improves the performance not only for the server but also
for the client that needs to decode the message.

In the above example, the batch size is set to 10, but the client can adjust it based on the network conditions and the size of the documents.

Use bidirectional streaming to control the flow

The bookstore example returns all the books and finishes the stream, but if the client
needs to watch for events in real-time (e.g., sensors), the use of bidirectional
streaming is the right choice.

Bidirectional streams are a bit tricky because both the client and the server
can send and receive messages at the same time. Hopefully, golang will make it easy
to work with concurrency like this.

As mentioned, a sensor can be an excellent example of bidirectional streaming.
The watch function allows the client to decide which sensors to watch and request
the current value if needed.

Let's take a look at the following protobuf definition:

service Sensor {
  rpc Watch(stream WatchRequest) returns (stream WatchResponse) {}
}

message WatchRequest {
  oneof request {
    WatchCreateRequest create_request = 1;
    WatchCancelRequest cancel_request = 2;
    WatchNowRequest now_request = 3;
  }
}

message WatchCreateRequest {
  // sensor_id contains the sensor id to watch.
  string sensor_id = 1;
}

message WatchCancelRequest {
  // sensor_id contains the sensor id to cancel.
  string sensor_id = 1;
}

message WatchNowRequest {
  // sensor_id contains the sensor id to get the current value.
  string sensor_id = 1;
}

message WatchResponse {
  // sensor_id contains the sensor id for the current response.
  string sensor_id = 1;
  // created is true if the watch was created successfully.
  bool created = 2;
  // canceleted is true if the watch was canceled successfully or if the creation failed.
  bool canceleted = 3;
  // error contains the error message if something went wrong.
  string error = 4;
  // timestamp contains the timestamp of the value.
  google.protobuf.Timestamp timestamp = 5;
  // value contains the value of the sensor.
  int32 value = 6;
}
Enter fullscreen mode Exit fullscreen mode

The request message is not only a stream of messages but also a message that can
contain different types of requests. The oneof directive allows us to define a
field that can contain only one of the specified types.

The golang code for the sensor will ignore, but you can found it here

serverStream wraps the stream and the sensor data to make it easier to work with.

type serverStream struct {
    s           *sensorService         // Service
    stream      pb.Sensor_WatchServer  // Stream
    sendCh      chan *pb.WatchResponse // Control channel
    sensorCh    chan sensorData        // Data channel
    sensorWatch map[string]int         // Map of sensor id to watch id
}
Enter fullscreen mode Exit fullscreen mode

As noted before, the server can send and receive messages at the same time, one
function will handle the incoming messages and another function will handle the
outgoing messages.

Receiving messages:

func (ss *serverStream) recvLoop() error {
    defer ss.close()
    for {
        req, err := ss.stream.Recv()
        if errors.Is(err, io.EOF) {
            return nil
        }
        if err != nil {
            return err
        }

        switch req := req.Request.(type) {
        case *pb.WatchRequest_CreateRequest:
            // IGNORE VALIDATION (check the full code)

            // create a channel to send data to the client
            id := sensor.watch(ss.sensorCh)
            ss.sensorWatch[sensorId] = id

            // send created message
            ss.sendCh <- &pb.WatchResponse{
                SensorId: sensorId,
                Created:  true,
            }

        case *pb.WatchRequest_CancelRequest:
            // IGNORE VALIDATION (check the full code)

            // cancel the watch
            ss.s.sensors[sensorId].cancel(id)
            delete(ss.sensorWatch, sensorId)

            ss.sendCh <- &pb.WatchResponse{
                SensorId:   sensorId,
                Canceleted: true,
            }

        case *pb.WatchRequest_NowRequest:
            // IGNORE VALIDATION (check the full code)

            // send current value
            ss.sendCh <- &pb.WatchResponse{
                SensorId:  sensorId,
                Timestamp: timestamppb.Now(),
                Value:     int32(sensor.read()),
            }
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

The switch statement is used to handle the different types of requests and decide
what to do with each request. It's important to leave the recvLoop function only
to read and don't send messages to the client for this reason we have the sendLoop
that will read the messages from the control channel and send it to the client.

Sending messages:

func (ss *serverStream) sendLoop() {
    for {
        select {
        case m, ok := <-ss.sendCh:
            if !ok {
                return
            }

            // send message
            if err := ss.stream.Send(m); err != nil {
                return
            }

        case data, ok := <-ss.sensorCh:
            if !ok {
                return
            }

            // send data
            if err := ss.stream.Send(&pb.WatchResponse{
                SensorId:  data.id,
                Timestamp: timestamppb.New(data.time),
                Value:     int32(data.val),
            }); err != nil {
                return
            }

        case <-ss.stream.Context().Done():
            return
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

The sendLoop function reads both the control channel and the data channel and sends
the messages to the client. If the stream is closed, the function will return.

Finally, a happy path test for the sensor service:

func TestSensor(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSensorServer(s, &sensorService{
            sensors: newSensors(),
        })
    })

    client := pb.NewSensorClient(conn)

    stream, err := client.Watch(context.Background())
    if err != nil {
        t.Fatalf("failed to watch: %v", err)
    }

    response := make(chan *pb.WatchResponse)
    // Go routine to read from the stream
    go func() {
        defer close(response)
        for {
            resp, err := stream.Recv()
            if errors.Is(err, io.EOF) {
                return
            }
            if err != nil {
                return
            }
            response <- resp
        }
    }()

    createRequest(t, stream, "temp")
    waitUntilCreated(t, response, "temp")
    waitForSensorData(t, response, "temp")

    createRequest(t, stream, "pres")
    waitUntilCreated(t, response, "pres")
    waitForSensorData(t, response, "pres")

    waitForSensorData(t, response, "temp")
    waitForSensorData(t, response, "pres")

    // invalid sensor
    createRequest(t, stream, "invalid")
    waitUntilCanceled(t, response, "invalid")

    nowRequest(t, stream, "light")
    waitForSensorData(t, response, "light")
    // Wait for 2 seconds to make sure we don't receive any data for light
    waitForNoSensorData(t, response, "light", 2*time.Second)

    cancelRequest(t, stream, "temp")
    waitUntilCanceled(t, response, "temp")

    waitForSensorData(t, response, "pres")
    // Wait for 2 seconds to make sure we don't receive any data for temp
    waitForNoSensorData(t, response, "temp", 2*time.Second)

    err = stream.CloseSend()
    if err != nil {
        t.Fatalf("failed to close send: %v", err)
    }
}
Enter fullscreen mode Exit fullscreen mode

From the test above, we can see that the client can create, cancel, and get the current
value of a sensor. The client can also watch multiple sensors at the same time.

Challenge Yourself

  • Implement a chat application using gRPC streaming.
  • Modify the sensor service to send multiple values at once to save round trips.
  • Sniff the network traffic to see the difference between unary request and streaming request.

Conclusion

gRPC streaming is a versatile and powerful tool for building real-time applications.
By following best practices like using streaming only when necessary, batching data efficiently, and leveraging bidirectional streaming wisely, developers can maximize performance
and maintain code simplicity.
While gRPC streaming introduces complexity, its benefits far outweigh the challenges
when applied thoughtfully.

Stay in touch

If you have any questions or feedback, feel free to reach out to me on LinkedIn.

💖 💪 🙅 🚩
ramonberrutti
Ramón Berrutti

Posted on November 30, 2024

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related