Associate with parent Cloud Workflows logs and child APIs logs using structured logs

koshilife

Kenji Koshikawa

Posted on March 4, 2022

Associate with parent Cloud Workflows logs and child APIs logs using structured logs

Lately, I build a system using Cloud Workflows which can combine Google Cloud Services such as Cloud Functions and Cloud Run. Sometimes, I was in a situation where I want to examine more efficiently using logs on Cloud Logging when debugging or daily monitoring.

In a workflow, You can easily save logs to Cloud Logging using sys.log. Also, you can see API logs on Cloud Logging which be called by the inside of Workflow. But these logs don't associate with each other, so it is hard to trace across parent workflow logs and child API logs from top to bottom orderly. It was tough when I met in high traffic cases. I associated manually using each log's timestamp as a clue.

In this article, I introduce to associate parent workflow logs and child API logs using a workflow execution id and structured logs.

An example app

I created a simple workflow which calls Cloud Functions and Cloud Run each one time. These APIs used Flask.

All source code which I wrote about in this article has been uploaded below for your reference.
https://github.com/koshilife/cloud-workflows-logging-example

Cloud Workflows

The following code is the workflow.

workflows/v1.yml

main:
    params: [input]
    steps:
    - set_trace_id:
        assign:
            - execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - api_url_function: "https://asia-northeast1-xxx.cloudfunctions.net/logging-example-v1" # TODO set your environment
            - api_url_cloudrun: "https://logging-example-v1-xxx-an.a.run.app" # TODO set your environment
    - call_function:
        call: http.post
        args:
            url: ${api_url_function}
            body:
                workflow_execution_id: ${execution_id}
            auth:
                type: OIDC
        result: result_function
    - call_cloudrun:
        call: http.post
        args:
            url: ${api_url_cloudrun}
            body:
                workflow_execution_id: ${execution_id}
            auth:
                type: OIDC
        result: result_cloudrun
    - set_result:
        assign:
            - result_map:
                message: ${execution_id}
                function_execution_id: ${result_function.headers["Function-Execution-Id"]}
                function_trace_id: ${text.split(result_function.headers["X-Cloud-Trace-Context"], ";o=")[0]}
                cloudrun_trace_id: ${text.split(result_cloudrun.headers["X-Cloud-Trace-Context"], ";o=")[0]}
    - log_result:
        call: sys.log
        args:
            json: ${result_map}
    - return_output:
        return: ${result_map}
Enter fullscreen mode Exit fullscreen mode

${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")} gets a workflow execution ID and then the ID is set into each API request params.

In addition, I included each API Trace ID and Functions execution ID that hope to be a clue for examining.

Cloud Functions

The following code is the Cloud Functions.

functions/v1/main.py

import json

from flask import jsonify, request

GCP_PROJECT = os.environ.get('GCP_PROJECT')

def main(request):
    log('Cloud Functions msg1')
    log('Cloud Functions msg1', 'WARNING')
    return jsonify(dict(result='ok'))


def log(message, severity='INFO'):
    log_fields = {}

    # refs: https://cloud.google.com/functions/docs/monitoring/logging#writing_structured_logs
    if request and request.headers.get("X-Cloud-Trace-Context") and GCP_PROJECT:
        trace = request.headers["X-Cloud-Trace-Context"].split("/")
        log_fields["logging.googleapis.com/trace"] = f"projects/{GCP_PROJECT}/traces/{trace[0]}"

    request_json = request.get_json()
    if request_json and request_json.get("workflow_execution_id"):
        log_fields["logging.googleapis.com/labels"] = {'workflows.googleapis.com/execution_id': request_json["workflow_execution_id"]}

    payload = dict(message=message, severity=severity, **log_fields)
    print(json.dumps(payload))

Enter fullscreen mode Exit fullscreen mode

The key point is using structured logs.
I set a field logging.googleapis.com/labels in each log entry to the workflow execution ID from the parent workflow.

refs: Special fields in structured payloads

Also, a reason why to specify workflows.googleapis.com/execution_id as a key of the labels field is that workflow execution logs use the same key.

Cloud Run

The following code is the Cloud Run.

cloudruns/v1/main.py

import os
import json

from flask import Flask, jsonify, request

GCP_PROJECT = os.environ.get('GCP_PROJECT')

app = Flask(__name__)


@app.route("/", methods=["POST"])
def main():
    log('Cloud Run msg1')
    log('Cloud Run msg1', 'WARNING')
    return jsonify(dict(result='ok'))


def log(message, severity='INFO'):
    log_fields = {}

    # refs: https://cloud.google.com/functions/docs/monitoring/logging#writing_structured_logs
    if request and request.headers.get("X-Cloud-Trace-Context") and GCP_PROJECT:
        trace = request.headers["X-Cloud-Trace-Context"].split("/")
        log_fields["logging.googleapis.com/trace"] = f"projects/{GCP_PROJECT}/traces/{trace[0]}"

    request_json = request.get_json()
    if request_json and request_json.get("workflow_execution_id"):
        log_fields["logging.googleapis.com/labels"] = {'workflows.googleapis.com/execution_id': request_json["workflow_execution_id"]}

    payload = dict(message=message, severity=severity, **log_fields)
    print(json.dumps(payload))


if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))
Enter fullscreen mode Exit fullscreen mode

The point is the same as mentioned above at Cloud Functions.
I used structured logs and put the parent workflow execution ID.

Result

After executing the workflow, I could retrieve across the parent workflow logs and child APIs logs by the following query on Cloud Logging Console.

labels."workflows.googleapis.com/execution_id"="<Workflow Execution ID>" OR
labels.execution_id="<Workflow Execution ID>"
Enter fullscreen mode Exit fullscreen mode

Image description

I tried an another example app which is more logs than above. (refs: The app name is v2. The source code is here)

I could see that all logs as below.

Image description

(Addition)

I wanted to simplify the query on Cloud Logging like one-liner labels."workflows.googleapis.com/execution_id"="<Workflow Execution ID>", so I tried to change sys.log params to use structured logs in the workflow. But It didn't work. I guess Cloud Workflows may not support structured logs yet.

Following code is sub-workflow which tries to use structured logs. (all source code is here)

log:
    params: [message]
    steps:
    - set_log_data:
        assign:
            - execution_id_map: {}
            - execution_id_map["workflows.googleapis.com/execution_id"]: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            - data: {}
            - data["message"]: ${message}
            - data["logging.googleapis.com/labels"]: ${execution_id_map}
    - call_sys_log:
        call: sys.log
        args:
            json: ${data}
Enter fullscreen mode Exit fullscreen mode

LogEntry
Image description

💖 💪 🙅 🚩
koshilife
Kenji Koshikawa

Posted on March 4, 2022

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related