OpenTelemetry Distributed Tracing with ZIO
Linh Nguyen
Posted on November 1, 2021
Introduction
This post is some quick notes on using ZIO and zio-telemetry to implement OpenTelemetry distributed tracing for Scala applications.
The source code is available here.
This is not an introduction to any of these technologies, but here are a few good reads:
- NewRelic's introduction to Distributed Tracing.
- Lightstep's concise summary for OpenTelemetry.
Initial implementation
For demonstration purposes, we will perform manual instrumentation on a modified version of zio-grpc's helloworld example, in which we incorporate both gRPC and HTTP communication:
- Original: hello-client sends a HelloRequest with name x and hello-server returns a HelloReply with message "Hello, x".
- Modified: in addition to the original behavior, the client sends an optional integer field guess and the server performs an HTTP request to HTTPBin based on its value.
Add the new field guess:
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
google.protobuf.Int32Value guess = 2;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
Add zio-grpc dependency
resolvers += Resolver.sonatypeRepo("snapshots")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.3")
addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.4")
val zioGrpcVersion = "0.5.1+12-93cdbe22-SNAPSHOT"
libraryDependencies ++= Seq(
"com.thesamet.scalapb.zio-grpc" %% "zio-grpc-codegen" % zioGrpcVersion,
"com.thesamet.scalapb" %% "compilerplugin" % "0.11.5"
)
Set up build.sbt
- Generate Scala code from helloworld.proto.
- Depend on sttp for the HTTP client.
val grpcVersion = "1.41.0"
val sttpVersion = "3.3.15"
val scalaPBRuntime = "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion
val grpcRuntimeDeps = Seq(
"io.grpc" % "grpc-netty" % grpcVersion,
scalaPBRuntime,
scalaPBRuntime % "protobuf"
)
val sttpZioDeps = Seq(
"com.softwaremill.sttp.client3" %% "async-http-client-backend-zio" % sttpVersion
)
lazy val root = Project("opentelemetry-distributed-tracing-zio", file(".")).aggregate(zio)
lazy val zio = commonProject("zio").settings(
Compile / PB.targets := Seq(
scalapb.gen(grpc = true) -> (Compile / sourceManaged).value,
scalapb.zio_grpc.ZioCodeGenerator -> (Compile / sourceManaged).value
),
libraryDependencies ++= grpcRuntimeDeps ++ sttpZioDeps
)
Client implementation
- Create a gRPC client pointing to localhost:9000.
- Send a single HelloRequest.
- Send 5 HelloRequests in parallel.
- Send a single HelloRequest with an invalid guess.
- Print "Done" and exit.
object ZClient extends zio.App {
private val clientLayer = GreeterClient.live(
ZManagedChannel(
ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext()
)
)
private val singleHello = GreeterClient.sayHello(HelloRequest("World"))
private val multipleHellos = ZIO.collectAllParN(5)(
List(
GreeterClient.sayHello(HelloRequest("1", Some(1))),
GreeterClient.sayHello(HelloRequest("2", Some(2))),
GreeterClient.sayHello(HelloRequest("3", Some(3))),
GreeterClient.sayHello(HelloRequest("4", Some(4))),
GreeterClient.sayHello(HelloRequest("5", Some(5)))
)
)
private val invalidHello = GreeterClient.sayHello(HelloRequest("Invalid", Some(-1))).ignore
private def myAppLogic =
singleHello *> multipleHellos *> invalidHello *> putStrLn("Done")
def run(args: List[String]): URIO[ZEnv, ExitCode] =
myAppLogic.provideCustomLayer(clientLayer).exitCode
}
Server implementation
- Fail the request if guess is less than 0.
- Based on the value of guess, delay for some time and then send a request to HTTPBin (one possible derivation is sketched after the code below).
type ZGreeterEnv = Clock with Random with SttpClient
object ZGreeterImpl extends RGreeter[ZGreeterEnv] {
def sayHello(request: HelloRequest): ZIO[ZGreeterEnv, Status, HelloReply] = {
val guess = request.guess.getOrElse(0)
for {
_ <- ZIO.fail(Status.INVALID_ARGUMENT).when(guess < 0)
code <- ???
delayMs = ???
_ <- httpRequest(code)
.delay(delayMs.millis)
.mapError(ex => Status.INTERNAL.withCause(ex))
} yield HelloReply(s"Hello, ${request.name}")
}
def httpRequest(code: Int): RIO[SttpClient, Unit] =
send(basicRequest.get(uri"https://httpbin.org/status/$code")).unit
}
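The derivation of code and delayMs is elided above. Here is a minimal sketch of one possible mapping from guess to an HTTP status code and a delay; this mapping is an assumption for illustration, the actual logic lives in the linked source code.
import zio.UIO

// Hypothetical mapping from `guess` to an HTTP status code and a delay in milliseconds.
// This is an illustrative assumption, not necessarily what the repository does.
def deriveCodeAndDelay(guess: Int): UIO[(Int, Long)] =
  UIO {
    val code    = if (guess % 3 == 0) 500 else 200 // multiples of 3 hit HTTPBin's /status/500
    val delayMs = guess * 200L                     // larger guesses wait longer before the HTTP call
    (code, delayMs)
  }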
Run it
- To run the server:
$ sbt "zio/runMain com.github.tuleism.ZServer"
[info] running (fork) com.github.tuleism.ZServer
[info] Server is running. Press Ctrl-C to stop.
- To run the client:
$ sbt "zio/runMain com.github.tuleism.ZClient"
[info] running (fork) com.github.tuleism.ZClient
[info] Done
[success] Total time: 12 s
At this point, we only know that it takes roughly 12 seconds for the client to initialize and finish its work.
Let's add distributed tracing to gain more insights into this.
Common tracing requirements
- For both client and server, we need to acquire a Tracer, an object responsible for creating and managing Spans.
- Tracing data is sent to Jaeger, which acts as a standalone collector.
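As a quick orientation, this is roughly what creating and managing a Span with a Tracer looks like in the plain OpenTelemetry Java API (a minimal sketch; in the rest of this post zio-telemetry wraps this for us):
import io.opentelemetry.api.trace.Tracer

// A Tracer creates Spans; each Span must be ended once the traced work completes.
// Shown only for orientation; zio-telemetry exposes this through ZIO effects instead.
def traced[A](tracer: Tracer, name: String)(work: => A): A = {
  val span = tracer.spanBuilder(name).startSpan()
  try work
  finally span.end()
}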
Add new dependencies
- zio-telemetry's OpenTelemetry module.
- We also depend on zio-config to read the tracing config from a file and zio-magic to ease ZLayer wiring.
val openTelemetryVersion = "1.6.0"
val zioConfigVersion = "1.0.10"
val zioMagicVersion = "0.3.9"
val zioTelemetryVersion = "0.8.2"
val openTelemetryDeps = Seq(
"io.opentelemetry" % "opentelemetry-exporter-jaeger" % openTelemetryVersion,
"io.opentelemetry" % "opentelemetry-sdk" % openTelemetryVersion,
"io.opentelemetry" % "opentelemetry-extension-noop-api" % s"$openTelemetryVersion-alpha"
)
val zioConfigDeps = Seq(
"dev.zio" %% "zio-config" % zioConfigVersion,
"dev.zio" %% "zio-config-magnolia" % zioConfigVersion,
"dev.zio" %% "zio-config-typesafe" % zioConfigVersion
)
val zioMagicDeps = Seq(
"io.github.kitlangton" %% "zio-magic" % zioMagicVersion
)
val zioTelemetryDeps = Seq(
"dev.zio" %% "zio-opentelemetry" % zioTelemetryVersion,
"com.softwaremill.sttp.client3" %% "zio-telemetry-opentelemetry-backend" % sttpVersion
)
Add a config layer
tracing {
enable = false
enable = ${?TRACING_ENABLE}
endpoint = "http://127.0.0.1:14250"
endpoint = ${?JAEGER_ENDPOINT}
}
case class AppConfig(tracing: TracingConfig)
case class TracingConfig(enable: Boolean, endpoint: String)
object AppConfig {
private val configDescriptor = descriptor[AppConfig]
val live: Layer[ReadError[String], Has[AppConfig]] = TypesafeConfig.fromDefaultLoader(configDescriptor)
}
Add a Tracer layer
- Depending on the configuration, we either create a noop Tracer or one that sends data to Jaeger.
- Once we have it, we can construct a Tracing layer, which gives us access to many useful operations in zio-telemetry.
object ZTracer {
private val InstrumentationName = "com.github.tuleism"
private def managed(serviceName: String, endpoint: String) = {
val resource = Resource.builder().put(ResourceAttributes.SERVICE_NAME, serviceName).build()
for {
spanExporter <- ZManaged.fromAutoCloseable(
Task(JaegerGrpcSpanExporter.builder().setEndpoint(endpoint).build())
)
spanProcessor <- ZManaged.fromAutoCloseable(UIO(SimpleSpanProcessor.create(spanExporter)))
tracerProvider <- UIO(
SdkTracerProvider.builder().addSpanProcessor(spanProcessor).setResource(resource).build()
).toManaged_
openTelemetry <- UIO(OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build()).toManaged_
tracer <- UIO(openTelemetry.getTracer(InstrumentationName)).toManaged_
} yield tracer
}
def live(serviceName: String): RLayer[Has[TracingConfig], Has[Tracer]] =
(
for {
config <- ZIO.service[TracingConfig].toManaged_
tracer <- if (!config.enable) {
Task(NoopOpenTelemetry.getInstance().getTracer(InstrumentationName)).toManaged_
} else {
managed(serviceName, config.endpoint)
}
} yield tracer
).toLayer
}
New server
Instrument the HTTP client
- Use the out-of-the-box sttp backend.
- We also add HTTP-specific attributes according to OpenTelemetry's semantic conventions.
object SttpTracing {
private val wrapper = new ZioTelemetryOpenTelemetryTracer {
def before[T](request: Request[T, Nothing]): RIO[Tracing, Unit] =
Tracing.setAttribute(SemanticAttributes.HTTP_METHOD.getKey, request.method.method) *>
Tracing.setAttribute(SemanticAttributes.HTTP_URL.getKey, request.uri.toString()) *>
ZIO.unit
def after[T](response: Response[T]): RIO[Tracing, Unit] =
Tracing.setAttribute(SemanticAttributes.HTTP_STATUS_CODE.getKey, response.code.code) *>
ZIO.unit
}
val live = AsyncHttpClientZioBackend.layer().flatMap { hasBackend =>
ZIO
.service[Tracing.Service]
.map { tracing =>
ZioTelemetryOpenTelemetryBackend(hasBackend.get, tracing, wrapper)
}
.toLayer
}
}
Instrument the gRPC server
We can add Tracing without changing our server implementation with a ZTransform. For each request:
- We use zio-telemetry's spanFrom, which extracts the propagated context (through gRPC Metadata, using the W3C Trace Context format) and starts a new child Span right after.
- We have access to a RequestContext and thus the full method name to use as the Span's name.
- We also add gRPC-specific attributes according to OpenTelemetry's semantic conventions.
object GrpcTracing {
private val propagator: TextMapPropagator = W3CTraceContextPropagator.getInstance()
private val metadataGetter: TextMapGetter[Metadata] = new TextMapGetter[Metadata] {
override def keys(carrier: Metadata): java.lang.Iterable[String] =
carrier.keys()
override def get(carrier: Metadata, key: String): String =
carrier.get(
Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)
)
}
private def withSemanticAttributes[R, A](effect: ZIO[R, Status, A]): ZIO[Tracing with R, Status, A] =
Tracing.setAttribute(SemanticAttributes.RPC_SYSTEM.getKey, "grpc") *>
effect
.tapBoth(
status =>
Tracing.setAttribute(
SemanticAttributes.RPC_GRPC_STATUS_CODE.getKey,
status.getCode.value()
),
_ =>
Tracing.setAttribute(SemanticAttributes.RPC_GRPC_STATUS_CODE.getKey, Status.OK.getCode.value())
)
def serverTracingTransform[R]: ZTransform[R, Status, R with Tracing with Has[RequestContext]] =
new ZTransform[R, Status, R with Tracing with Has[RequestContext]] {
def effect[A](io: ZIO[R, Status, A]): ZIO[R with Tracing with Has[RequestContext], Status, A] =
for {
rc <- ZIO.service[RequestContext]
metadata <- rc.metadata.wrap(identity)
result <- withSemanticAttributes(io)
.spanFrom(
propagator,
metadata,
metadataGetter,
rc.methodDescriptor.getFullMethodName,
SpanKind.SERVER,
{ case _ => StatusCode.ERROR }
)
} yield result
def stream[A](io: ZStream[R, Status, A]): ZStream[R with Tracing with Has[RequestContext], Status, A] =
???
}
}
Update Server Main
- Add required layers for Tracing.
- Transform the original ZGreeterImpl.
import zio.magic._
object ZServer extends ServerMain {
private val requirements =
ZLayer
.wire[ZEnv with ZGreeterEnv](
ZEnv.live,
AppConfig.live.narrow(_.tracing),
ZTracer.live("hello-server"),
Tracing.live,
SttpTracing.live
)
.orDie
def services: ServiceList[Any] =
ServiceList
.add(ZGreeterImpl.transform[ZGreeterEnv, Has[RequestContext]](GrpcTracing.serverTracingTransform))
.provideLayer(requirements)
}
New client
Inject current context into gRPC Metadata for context propagation
object GrpcTracing {
...
private val metadataSetter: TextMapSetter[Metadata] = (carrier, key, value) =>
carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value)
val contextPropagationClientInterceptor: ZClientInterceptor[Tracing] = ZClientInterceptor.headersUpdater {
(_, _, metadata) =>
metadata.wrapM(Tracing.inject(propagator, _, metadataSetter))
}
...
}
object ZClient extends zio.App {
private val clientLayer = GreeterClient.live(
ZManagedChannel(
ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext(),
Seq(GrpcTracing.contextPropagationClientInterceptor)
)
)
...
}
Start a Span for each request
- Use ZTransform to record the relevant gRPC attributes.
object GrpcTracing {
...
def clientTracingTransform[R]: ZTransform[R, Status, R with Tracing] =
new ZTransform[R, Status, R with Tracing] {
def effect[A](io: ZIO[R, Status, A]): ZIO[R with Tracing, Status, A] = withSemanticAttributes(io)
def stream[A](io: ZStream[R, Status, A]): ZStream[R with Tracing, Status, A] = ???
}
}
- Unlike the server, we don't have access to a RequestContext object, so we have to set the method name manually.
- We also start additional Spans.
object ZClient extends zio.App {
...
private def errorToStatusCode[E]: PartialFunction[E, StatusCode] = { case _ => StatusCode.ERROR }
private def sayHello(request: HelloRequest) =
GreeterClient
.sayHello(request)
.span(
GreeterGrpc.METHOD_SAY_HELLO.getFullMethodName,
SpanKind.CLIENT,
errorToStatusCode
)
private val singleHello = sayHello(HelloRequest("World"))
.span("singleHello", toErrorStatus = errorToStatusCode)
private val multipleHellos = ZIO
.collectAllParN(5)(
List(
sayHello(HelloRequest("1", Some(1))),
sayHello(HelloRequest("2", Some(2))),
sayHello(HelloRequest("3", Some(3))),
sayHello(HelloRequest("4", Some(4))),
sayHello(HelloRequest("5", Some(5)))
)
)
.span("multipleHellos", toErrorStatus = errorToStatusCode)
private val invalidHello = sayHello(HelloRequest("Invalid", Some(-1))).ignore
.span("invalidHello", toErrorStatus = errorToStatusCode)
}
Add required layers
object ZClient extends zio.App {
...
private val requirements = ZLayer
.wire[ZEnv with Tracing](
ZEnv.live,
AppConfig.live.narrow(_.tracing),
ZTracer.live("hello-client"),
Tracing.live
) >+> clientLayer
def run(args: List[String]): URIO[ZEnv, ExitCode] =
myAppLogic.provideCustomLayer(requirements).exitCode
}
Showtime
Run Jaeger through Docker
- Tracing data can be sent to port 14250.
- We can view Jaeger UI at http://localhost:16686.
$ docker run --rm --name jaeger \
-p 16686:16686 \
-p 14250:14250 \
jaegertracing/all-in-one:1.25
Start the server
$ TRACING_ENABLE=true sbt "zio/runMain com.github.tuleism.ZServer"
[info] running (fork) com.github.tuleism.ZServer
[info] Server is running. Press Ctrl-C to stop.
Start the client
$ TRACING_ENABLE=true sbt "zio/runMain com.github.tuleism.ZClient"
[info] running (fork) com.github.tuleism.ZClient
[info] Done
[success] Total time: 12 s
Distributed Tracing in action!
- Now we can see the details for multipleHellos in the Jaeger UI:
- And which guess is causing the longest delay.
Integration with Logging
- Let's add tracing context into log messages following the specification.
- We're going to use izumi logstage, our favorite logging library.
- See the diffs.
Add logging dependency
val izumiVersion = "1.0.8"
val loggingDeps = Seq(
"io.7mind.izumi" %% "logstage-core" % izumiVersion,
"io.7mind.izumi" %% "logstage-adapter-slf4j" % izumiVersion
)
Setup logging
- Add trace_id and span_id to the logging context if the current trace context is valid.
object Logging {
private def baseLogger = IzLogger()
val live: ZLayer[Has[Tracing.Service], Nothing, Has[LogZIO.Service]] =
(
for {
tracing <- ZIO.service[Tracing.Service]
} yield LogZIO.withDynamicContext(baseLogger)(
Tracing.getCurrentSpanContext
.map(spanContext =>
if (spanContext.isValid)
CustomContext(
"trace_id" -> spanContext.getTraceId,
"span_id" -> spanContext.getSpanId,
"trace_flags" -> spanContext.getTraceFlags.asHex()
)
else
CustomContext.empty
)
.provide(Has(tracing))
)
).toLayer
}
Add a few log messages
- E.g. for singleHello.
object ZClient extends zio.App {
...
private val singleHello = (
for {
_ <- log.info("singleHello")
_ <- sayHello(HelloRequest("World"))
} yield ()
).span("singleHello", toErrorStatus = errorToStatusCode)
}
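For the log.info calls above to work, a LogZIO.Service must be in the environment, so Logging.live has to be wired in alongside the tracing layers. A minimal sketch of how the client requirements from earlier might be extended (an assumption based on the previous zio-magic setup; the exact wiring is in the linked diffs):
import zio.magic._

object ZClient extends zio.App {
  ...
  private val requirements = ZLayer
    .wire[ZEnv with Tracing with Has[LogZIO.Service]](
      ZEnv.live,
      AppConfig.live.narrow(_.tracing),
      ZTracer.live("hello-client"),
      Tracing.live,
      Logging.live // provides Has[LogZIO.Service], built on top of Tracing
    ) >+> clientLayer
}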
Sample Logs
[info] running (fork) com.github.tuleism.ZClient
[info] I 2021-11-01T22:59:10.881 (ZClient.scala:37) …tuleism.ZClient.singleHello [24:zio-default-async-11] trace_id=9c8a7ebb87381293bc8937a5f7673cb9, span_id=cb7c9a440472e1be, trace_flags=01 singleHello
[info] I 2021-11-01T22:59:14.064 (ZClient.scala:44) …eism.ZClient.multipleHellos [21:zio-default-async-8 ] trace_id=fe405246fbaa5f876c19f14fa649a99f, span_id=bef19494bef4106e, trace_flags=01 multipleHellos
[info] I 2021-11-01T22:59:18.171 (ZClient.scala:60) …uleism.ZClient.invalidHello [26:zio-default-async-13] trace_id=be5ccd425e0cfb01fd97274abd0c4d72, span_id=ea6499fb9a7c8d28, trace_flags=01 invalidHello
[info] I 2021-11-01T22:59:18.272 (ZClient.scala:66) ….tuleism.ZClient.myAppLogic [15:zio-default-async-2 ] Done
[success] Total time: 12 s
Extra notes
- If we receive an HTTP 5xx response, we should set the Span status to error according to the semantic convention. However, this is currently not possible with zio-telemetry.
- We need a better way to implement tracing for the zio-grpc client.