Prometheus: How Is the Remote Write API Code Path Different from the Scrape Code Path?

mikkergimenez


Posted on January 17, 2023


In my last post, I dug into the remote write code to learn more about how it works and to see whether you could use it to simulate a push-based agent. This is all just an experiment to learn how Prometheus works. I think you can, and I'm working on testing it out, but before I get there, I wanted to explore the other side: the Prometheus remote write receiver.

I came up with a few questions I want to answer while exploring the remote write API:

  1. Where does the code path between scraping a Prometheus endpoint and remote write differ?
  2. In what format does Prometheus write data to disk?
  3. What exactly in the code makes Prometheus pull based rather than push based? Can I identify the specific code blocks where algorithms are implemented that optimize for one over the other?

Web API write endpoint

web/api/v1/api.go

// Line 364
r.Post("/write", api.ready(api.remoteWrite))

The remote write handler (api.remoteWrite) is created by the following code block.

storage/remote/write_handler.go


// NewWriteHandler creates a http.Handler that accepts remote write requests and
// writes them to the provided appendable.
func NewWriteHandler(logger log.Logger, appendable storage.Appendable) http.Handler {
    return &writeHandler{
        logger: logger,
        appendable: appendable,
    }
}

I'm surprised by how quickly we get from the remote write endpoint to the appender. There doesn't seem to be any special buffering on remote write that makes it different from the scraper. In the previous article, we traced remote write back to an appender, so the write endpoint writes to an appender, which may then fairly directly re-write to another upstream. I don't know why writing to Prometheus would need to be more complicated than this, but as an SRE who doesn't write low-level code, I keep coming back to the question: what makes us say that Prometheus isn't push based? It seems like you could write a push agent pretty easily.
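To make the short distance from endpoint to appender concrete, here is a minimal sketch of a handler with the same shape: decode the request, append each sample, then commit. It is not Prometheus's handler. The real endpoint accepts snappy-compressed protobuf; this sketch assumes plain JSON so it can stay within the standard library, and the `Sample`, `memStore`, `newWriteHandler`, and `pushSamples` names are all hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// Sample is a simplified, hypothetical stand-in for one pushed sample.
type Sample struct {
	Labels    map[string]string `json:"labels"`
	Timestamp int64             `json:"timestamp"`
	Value     float64           `json:"value"`
}

// Appender and Appendable are trimmed-down versions of the storage interfaces.
type Appender interface {
	Append(s Sample) error
	Commit() error
	Rollback() error
}

type Appendable interface {
	Appender() Appender
}

// memStore is a toy Appendable that keeps committed samples in memory.
type memStore struct{ committed []Sample }

func (m *memStore) Appender() Appender { return &memAppender{store: m} }

type memAppender struct {
	store   *memStore
	pending []Sample
}

func (a *memAppender) Append(s Sample) error {
	if len(s.Labels) == 0 {
		return fmt.Errorf("sample has no labels")
	}
	a.pending = append(a.pending, s)
	return nil
}

func (a *memAppender) Commit() error {
	a.store.committed = append(a.store.committed, a.pending...)
	a.pending = nil
	return nil
}

func (a *memAppender) Rollback() error {
	a.pending = nil
	return nil
}

// newWriteHandler decodes the body, appends every sample, and commits.
// Any append error rolls the whole batch back.
func newWriteHandler(appendable Appendable) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		var samples []Sample
		if err := json.NewDecoder(r.Body).Decode(&samples); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		app := appendable.Appender()
		for _, s := range samples {
			if err := app.Append(s); err != nil {
				_ = app.Rollback()
				http.Error(w, err.Error(), http.StatusBadRequest)
				return
			}
		}
		if err := app.Commit(); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusNoContent)
	})
}

// pushSamples posts a JSON body to the handler and returns the HTTP status.
func pushSamples(h http.Handler, body string) int {
	req := httptest.NewRequest(http.MethodPost, "/write", strings.NewReader(body))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	store := &memStore{}
	h := newWriteHandler(store)
	code := pushSamples(h, `[{"labels":{"__name__":"up"},"timestamp":1000,"value":1}]`)
	fmt.Println("status:", code, "committed:", len(store.committed))
}
```

The handler itself holds no queue or buffer: whatever batching exists lives behind the appender, which is the point the paragraph above makes.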

I traced this back quite a bit. The appendable is injected from the prometheus command code (cfg.web.Storage is passed), and it is a fanout storage, which has primary and secondary storages. The local storage configured below is the 'primary' storage and the remote storage is the 'secondary' storage. readyStorage is a struct.

var (
    localStorage = &readyStorage{stats: tsdb.NewDBStats()}
    scraper = &readyScrapeManager{}

    remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper)

    fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)

)


// Line 1287
type readyStorage struct {
    mtx sync.RWMutex
    db storage.Storage
    startTimeMargin int64
    stats *tsdb.DBStats
}

https://github.com/prometheus/prometheus/blob/main/cmd/prometheus/main.go#L566
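The primary/secondary idea can be sketched in a few lines. This is an illustration of the fanout pattern only, assuming a much smaller `Appender` interface than the real one. `countingAppender`, `fanoutAppender`, and `newFanout` are hypothetical names, and the error handling is deliberately simpler than anything a production fanout would need.

```go
package main

import "fmt"

// Appender is a trimmed-down stand-in for storage.Appender.
type Appender interface {
	Append(series string, t int64, v float64) error
	Commit() error
}

// countingAppender is a toy appender that counts committed samples.
type countingAppender struct {
	pending   int
	committed int
}

func (c *countingAppender) Append(string, int64, float64) error {
	c.pending++
	return nil
}

func (c *countingAppender) Commit() error {
	c.committed += c.pending
	c.pending = 0
	return nil
}

// fanoutAppender forwards every call to the primary first, then to each
// secondary, and stops at the first error.
type fanoutAppender struct {
	primary     Appender
	secondaries []Appender
}

func (f *fanoutAppender) Append(series string, t int64, v float64) error {
	if err := f.primary.Append(series, t, v); err != nil {
		return err
	}
	for _, s := range f.secondaries {
		if err := s.Append(series, t, v); err != nil {
			return err
		}
	}
	return nil
}

func (f *fanoutAppender) Commit() error {
	if err := f.primary.Commit(); err != nil {
		return err
	}
	for _, s := range f.secondaries {
		if err := s.Commit(); err != nil {
			return err
		}
	}
	return nil
}

// newFanout builds an appender that writes to primary and all secondaries.
func newFanout(primary Appender, secondaries ...Appender) Appender {
	return &fanoutAppender{primary: primary, secondaries: secondaries}
}

func main() {
	local, remote := &countingAppender{}, &countingAppender{}
	app := newFanout(local, remote)
	_ = app.Append("up", 1000, 1)
	_ = app.Append("up", 2000, 1)
	_ = app.Commit()
	fmt.Println("local:", local.committed, "remote:", remote.committed)
}
```

In this sketch the caller only ever sees one appender, which is why the write handler does not need to know whether remote storage is configured.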

storage/interface.go


type Appendable interface {
    // Appender returns a new appender for the storage. The implementation
    // can choose whether or not to use the context, for deadlines or to check
    // for errors.
    Appender(ctx context.Context) Appender
}

https://github.com/prometheus/prometheus/blob/main/storage/remote/write_handler.go#L86

https://github.com/prometheus/prometheus/blob/136956cca40b7bf29dc303ad497d01c30534e373/storage/interface.go#L219

type Appender interface {
    // Append adds a sample pair for the given series.
    // An optional series reference can be provided to accelerate calls.
    // A series reference number is returned which can be used to add further
    // samples to the given series in the same or later transactions.
    // Returned reference numbers are ephemeral and may be rejected in calls
    // to Append() at any point. Adding the sample via Append() returns a new
    // reference number.
    // If the reference is 0 it must not be used for caching.
    Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error)

    // Commit submits the collected samples and purges the batch.  If Commit
    // returns a non-nil error, it also rolls back all modifications made in
    // the appender so far, as Rollback would do. In any case, an Appender
    // must not be used anymore after Commit has been called.
    Commit() error

    // Rollback rolls back all modifications made in the appender so far.
    // Appender has to be discarded after rollback.
    Rollback() error

    ExemplarAppender
    HistogramAppender
    MetadataUpdater
}
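The comment on Append describes a series reference that callers can cache to speed up later calls. Here is a rough sketch of how a caller might use it, assuming a toy in-memory appender. The label set is flattened to a plain string for brevity, and `memAppender`, `lookups`, and `appendAll` are hypothetical names rather than anything in Prometheus.

```go
package main

import "fmt"

// SeriesRef mirrors the idea of storage.SeriesRef: an opaque series handle.
type SeriesRef uint64

// memAppender is a toy appender that hands out series references.
type memAppender struct {
	byLabels map[string]SeriesRef
	byRef    map[SeriesRef]string
	samples  map[SeriesRef][]float64
	next     SeriesRef
	lookups  int // number of slow-path label lookups performed
}

func newMemAppender() *memAppender {
	return &memAppender{
		byLabels: map[string]SeriesRef{},
		byRef:    map[SeriesRef]string{},
		samples:  map[SeriesRef][]float64{},
	}
}

// Append uses ref when it is non-zero and known; otherwise it falls back to
// resolving (or creating) the series from its labels.
func (a *memAppender) Append(ref SeriesRef, lset string, t int64, v float64) (SeriesRef, error) {
	if _, known := a.byRef[ref]; ref == 0 || !known {
		a.lookups++
		r, ok := a.byLabels[lset]
		if !ok {
			a.next++
			r = a.next
			a.byLabels[lset] = r
			a.byRef[r] = lset
		}
		ref = r
	}
	a.samples[ref] = append(a.samples[ref], v)
	return ref, nil
}

// appendAll appends values for one series, caching the returned reference so
// that only the first call has to resolve the labels.
func appendAll(a *memAppender, cache map[string]SeriesRef, lset string, values []float64) error {
	for i, v := range values {
		ref, err := a.Append(cache[lset], lset, int64(i), v)
		if err != nil {
			return err
		}
		if ref != 0 { // a zero reference must not be cached
			cache[lset] = ref
		}
	}
	return nil
}

func main() {
	app := newMemAppender()
	cache := map[string]SeriesRef{}
	_ = appendAll(app, cache, `up{job="demo"}`, []float64{1, 1, 0})
	fmt.Println("label lookups:", app.lookups)
}
```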

Scrape Code Path:

Prometheus works by scraping endpoints periodically according to a scrape interval. This process is a loop, marked very clearly by the label "mainLoop":

scrape/scrape.go

mainLoop:
    for {
        ...
    }
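For readers who haven't seen a labeled Go loop, here is a minimal sketch of the pattern: a `for`/`select` that scrapes on every tick and uses `break mainLoop` to leave the loop from inside the `select`. It is only a sketch. The real loop does more work per iteration, and `runLoop` and `countScrapes` are hypothetical names. The tick channel is passed in so the example runs deterministically; in practice it could come from a `time.Ticker`.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runLoop calls scrape once per tick until ctx is cancelled or ticks closes.
func runLoop(ctx context.Context, ticks <-chan time.Time, scrape func(time.Time)) {
mainLoop:
	for {
		select {
		case <-ctx.Done():
			// A plain break would only leave the select, so the label is needed.
			break mainLoop
		case t, ok := <-ticks:
			if !ok {
				break mainLoop
			}
			scrape(t)
		}
	}
}

// countScrapes feeds n synthetic ticks into runLoop and reports how many
// scrapes ran before the loop exited.
func countScrapes(n int) int {
	ticks := make(chan time.Time)
	done := make(chan struct{})
	count := 0
	go func() {
		defer close(done)
		runLoop(context.Background(), ticks, func(time.Time) { count++ })
	}()
	for i := 0; i < n; i++ {
		ticks <- time.Unix(int64(i), 0)
	}
	close(ticks)
	<-done // wait for the loop to finish before reading count
	return count
}

func main() {
	fmt.Println("scrapes:", countScrapes(3))

	// With a real interval, the same loop could be driven by a ticker:
	//   ticker := time.NewTicker(15 * time.Second)
	//   defer ticker.Stop()
	//   runLoop(ctx, ticker.C, scrapeFn)
}
```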

A lot of these are really long functions, so I'm going to pull out the key pieces.

Calls "Scrape and Report":
https://github.com/prometheus/prometheus/blob/6fc5305ce904a75f56ca762281c7a1b052f19092/scrape/scrape.go#L1264

last = sl.scrapeAndReport(last, scrapeTime, errc)

Calls "Scrape":
https://github.com/prometheus/prometheus/blob/6fc5305ce904a75f56ca762281c7a1b052f19092/scrape/scrape.go#L1340

scrapeCtx, cancel := context.WithTimeout(sl.parentCtx, sl.timeout)
contentType, scrapeErr = sl.scraper.scrape(scrapeCtx, buf)

// ... Line 1364
total, added, seriesAdded, appErr = sl.append(app, b, contentType, appendTime)

This function is what actually makes the HTTP request to scrape a target:
https://github.com/prometheus/prometheus/blob/6fc5305ce904a75f56ca762281c7a1b052f19092/scrape/scrape.go#L792

func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
    if s.req == nil {
        req, err := http.NewRequest("GET", s.URL().String(), nil)
        if err != nil {
            return "", err
        }
        req.Header.Add("Accept", s.acceptHeader)
        req.Header.Add("Accept-Encoding", "gzip")
        req.Header.Set("User-Agent", UserAgent)
        req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", strconv.FormatFloat(s.timeout.Seconds(), 'f', -1, 64))

        s.req = req
    }

    resp, err := s.client.Do(s.req.WithContext(ctx))
    if err != nil {
        return "", err
    }
    defer func() {
        io.Copy(io.Discard, resp.Body)
        resp.Body.Close()
    }()

    if resp.StatusCode != http.StatusOK {
        return "", errors.Errorf("server returned HTTP status %s", resp.Status)
    }

    if s.bodySizeLimit <= 0 {
        s.bodySizeLimit = math.MaxInt64
    }
    if resp.Header.Get("Content-Encoding") != "gzip" {
        n, err := io.Copy(w, io.LimitReader(resp.Body, s.bodySizeLimit))
        if err != nil {
            return "", err
        }
        if n >= s.bodySizeLimit {
            targetScrapeExceededBodySizeLimit.Inc()
            return "", errBodySizeLimit
        }
        return resp.Header.Get("Content-Type"), nil
    }

    if s.gzipr == nil {
        s.buf = bufio.NewReader(resp.Body)
        s.gzipr, err = gzip.NewReader(s.buf)
        if err != nil {
            return "", err
        }
    } else {
        s.buf.Reset(resp.Body)
        if err = s.gzipr.Reset(s.buf); err != nil {
            return "", err
        }
    }

    n, err := io.Copy(w, io.LimitReader(s.gzipr, s.bodySizeLimit))
    s.gzipr.Close()
    if err != nil {
        return "", err
    }
    if n >= s.bodySizeLimit {
        targetScrapeExceededBodySizeLimit.Inc()
        return "", errBodySizeLimit
    }
    return resp.Header.Get("Content-Type"), nil
}
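The part of this function I found most interesting is the body size limit, so here is a stripped-down sketch of just that step. It assumes the body is already in hand as an `io.Reader` and leaves out the HTTP request and the reader reuse. `readBody`, `gzipBytes`, `roundTrip`, and `errBodyTooLarge` are hypothetical names. As in the function above, the limit wraps the decompressed stream, and because the check is `n >= limit`, a body of exactly `limit` bytes is rejected too.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"errors"
	"fmt"
	"io"
)

var errBodyTooLarge = errors.New("body size limit exceeded")

// readBody copies at most limit bytes of the (optionally gzipped) body into w
// and reports an error when the limit was reached.
func readBody(w io.Writer, body io.Reader, gzipped bool, limit int64) error {
	src := body
	if gzipped {
		zr, err := gzip.NewReader(body)
		if err != nil {
			return err
		}
		defer zr.Close()
		src = zr // the limit below applies to decompressed bytes
	}
	n, err := io.Copy(w, io.LimitReader(src, limit))
	if err != nil {
		return err
	}
	if n >= limit {
		return errBodyTooLarge
	}
	return nil
}

// gzipBytes compresses data so the example can exercise the gzip path.
func gzipBytes(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(data); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// roundTrip optionally compresses payload, reads it back through readBody,
// and returns what the reader produced.
func roundTrip(payload string, gzipped bool, limit int64) (string, error) {
	data := []byte(payload)
	if gzipped {
		var err error
		if data, err = gzipBytes(data); err != nil {
			return "", err
		}
	}
	var out bytes.Buffer
	if err := readBody(&out, bytes.NewReader(data), gzipped, limit); err != nil {
		return "", err
	}
	return out.String(), nil
}

func main() {
	body, err := roundTrip("up 1\n", true, 1024)
	fmt.Printf("%q %v\n", body, err)

	_, err = roundTrip("up 1\n", false, 3)
	fmt.Println(err)
}
```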

Sets s.buf and s.gzipr:
https://github.com/prometheus/prometheus/blob/6fc5305ce904a75f56ca762281c7a1b052f19092/scrape/scrape.go#L835

Loop through and switch based on datatype:
https://github.com/prometheus/prometheus/blob/6fc5305ce904a75f56ca762281c7a1b052f19092/scrape/scrape.go#L1535

// For example, this switches on whether to append a histogram or a plain sample:
if isHistogram {
    if h != nil {
        ref, err = app.AppendHistogram(ref, lset, t, h)
    }
} else {
    ref, err = app.Append(ref, lset, t, val)
}


This is where it gets back to the same 'Append' function as before. There are a few different types of appenders; the Appender shown above is the base interface that they all implement. The appender is assigned by the following function in scrape.go.

// appender returns an appender for ingested samples from the target.
func appender(app storage.Appender, limit int) storage.Appender {
    app = &timeLimitAppender{
        Appender: app,
        maxTime:  timestamp.FromTime(time.Now().Add(maxAheadTime)),
    }

    // The limit is applied after metrics are potentially dropped via relabeling.
    if limit > 0 {
        app = &limitAppender{
            Appender: app,
            limit:    limit,
        }
    }
    return app
}
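The function above stacks appenders on top of each other: each wrapper embeds an Appender, checks one condition, and passes the call down. Here is a small sketch of that wrapping pattern, assuming a one-method `Appender` interface. The checks are my own simplified guesses at what a time limit and a sample limit would do, not copies of the Prometheus implementations, and `sliceAppender`, `wrap`, and both error values are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Appender is a one-method stand-in for storage.Appender.
type Appender interface {
	Append(series string, t int64, v float64) error
}

// sliceAppender is the innermost toy appender; it records timestamps.
type sliceAppender struct{ timestamps []int64 }

func (s *sliceAppender) Append(_ string, t int64, _ float64) error {
	s.timestamps = append(s.timestamps, t)
	return nil
}

var (
	errSampleLimit = errors.New("sample limit exceeded")
	errTooFarAhead = errors.New("timestamp too far in the future")
)

// timeLimitAppender rejects samples whose timestamp is after maxTime.
type timeLimitAppender struct {
	Appender
	maxTime int64
}

func (a *timeLimitAppender) Append(series string, t int64, v float64) error {
	if t > a.maxTime {
		return errTooFarAhead
	}
	return a.Appender.Append(series, t, v)
}

// limitAppender rejects every sample after the first limit samples.
type limitAppender struct {
	Appender
	limit int
	seen  int
}

func (a *limitAppender) Append(series string, t int64, v float64) error {
	a.seen++
	if a.seen > a.limit {
		return errSampleLimit
	}
	return a.Appender.Append(series, t, v)
}

// wrap layers the two checks around app; the sample limit is optional.
func wrap(app Appender, maxTime int64, limit int) Appender {
	app = &timeLimitAppender{Appender: app, maxTime: maxTime}
	if limit > 0 {
		app = &limitAppender{Appender: app, limit: limit}
	}
	return app
}

func main() {
	base := &sliceAppender{}
	app := wrap(base, 100, 2)
	fmt.Println(app.Append("up", 10, 1))
	fmt.Println(app.Append("up", 20, 1))
	fmt.Println(app.Append("up", 30, 1))
	fmt.Println("stored:", len(base.timestamps))
}
```

Because every layer satisfies the same interface, the scrape loop can call Append without knowing how many wrappers sit between it and storage.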

So, where does it go from here?

The /tsdb folder contains the code that Prometheus uses to write data to disk. Specifically, blockwriter.go has a method called Flush:

tsdb/blockwriter.go


// Flush implements the Writer interface. This is where actual block writing
// happens. After flush completes, no writes can be done.
func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
    mint := w.head.MinTime()
    // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
    // Because of this block intervals are always +1 than the total samples it includes.
    maxt := w.head.MaxTime() + 1
    level.Info(w.logger).Log("msg", "flushing", "series_count", w.head.NumSeries(), "mint", timestamp.Time(mint), "maxt", timestamp.Time(maxt))

    compactor, err := NewLeveledCompactor(ctx, nil, w.logger, []int64{w.blockSize}, chunkenc.NewPool(), nil)
    if err != nil {
        return ulid.ULID{}, errors.Wrap(err, "create leveled compactor")
    }

    id, err := compactor.Write(w.destinationDir, w.head, mint, maxt, nil)
    if err != nil {
        return ulid.ULID{}, errors.Wrap(err, "compactor write")
    }

    return id, nil
}
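The `+ 1` on maxt is easy to skim past, so here is a tiny sketch of what a half-open interval means in practice. The `block` type and both functions are hypothetical. The only thing taken from the code above is the rule that the range is [mint, maxt), with maxt one millisecond past the last sample.

```go
package main

import "fmt"

// block is a hypothetical time range in milliseconds, half-open: [mint, maxt).
type block struct{ mint, maxt int64 }

// blockRange returns the smallest half-open range covering all timestamps.
// It returns the zero block when there are no timestamps.
func blockRange(timestamps []int64) block {
	if len(timestamps) == 0 {
		return block{}
	}
	mint, maxt := timestamps[0], timestamps[0]
	for _, t := range timestamps[1:] {
		if t < mint {
			mint = t
		}
		if t > maxt {
			maxt = t
		}
	}
	// The upper bound is exclusive, so add 1 to keep the last sample inside.
	return block{mint: mint, maxt: maxt + 1}
}

// contains reports whether t falls inside the half-open range.
func (b block) contains(t int64) bool {
	return t >= b.mint && t < b.maxt
}

func main() {
	b := blockRange([]int64{1000, 3000, 2000})
	fmt.Println(b.mint, b.maxt, b.contains(3000), b.contains(3001))
}
```

Without the `+ 1`, the newest sample would sit exactly on the exclusive upper bound and fall outside its own block.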

This process takes us to the 'LeveledCompactor', which has a Write method:

tsdb/compact.go

I think I'm close now, but oddly I can't find the specific method that writes the data files. The compactor's write code writes the metadata file, the tombstones, and I assume the samples as well. The code in compact.go that does this is very long, but for example, here is where it writes the metadata and tombstones:

if _, err = writeMetaFile(c.logger, tmp, meta); err != nil {
    return errors.Wrap(err, "write merged meta")
}

// Create an empty tombstones file.
if _, err := tombstones.WriteFile(c.logger, tmp, tombstones.NewMemTombstones()); err != nil {
    return errors.Wrap(err, "write new tombstones file")
}
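Both calls write into a directory called `tmp`, which suggests the files are staged somewhere temporary before the block becomes visible. Here is a sketch of that general pattern, assuming a reduced metadata struct and a JSON file named meta.json. `blockMeta`, `writeBlock`, `readMeta`, and `roundTripMeta` are hypothetical names, and this is not the compactor's actual code.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// blockMeta is a reduced, hypothetical stand-in for a block's metadata.
type blockMeta struct {
	ULID    string `json:"ulid"`
	MinTime int64  `json:"minTime"`
	MaxTime int64  `json:"maxTime"`
	Version int    `json:"version"`
}

// writeBlock writes meta.json into a temporary directory and then renames the
// directory into place, so readers never see a half-written block.
func writeBlock(dest string, meta blockMeta) (string, error) {
	final := filepath.Join(dest, meta.ULID)
	tmp := final + ".tmp"
	if err := os.RemoveAll(tmp); err != nil {
		return "", err
	}
	if err := os.MkdirAll(tmp, 0o777); err != nil {
		return "", err
	}
	data, err := json.MarshalIndent(meta, "", "\t")
	if err != nil {
		return "", err
	}
	if err := os.WriteFile(filepath.Join(tmp, "meta.json"), data, 0o666); err != nil {
		return "", err
	}
	if err := os.Rename(tmp, final); err != nil {
		return "", err
	}
	return final, nil
}

// readMeta loads meta.json back from a block directory.
func readMeta(dir string) (blockMeta, error) {
	var meta blockMeta
	data, err := os.ReadFile(filepath.Join(dir, "meta.json"))
	if err != nil {
		return meta, err
	}
	err = json.Unmarshal(data, &meta)
	return meta, err
}

// roundTripMeta writes a block into a scratch directory and reads it back.
func roundTripMeta(meta blockMeta) (blockMeta, error) {
	dest, err := os.MkdirTemp("", "blocks")
	if err != nil {
		return blockMeta{}, err
	}
	defer os.RemoveAll(dest)
	dir, err := writeBlock(dest, meta)
	if err != nil {
		return blockMeta{}, err
	}
	return readMeta(dir)
}

func main() {
	meta, err := roundTripMeta(blockMeta{ULID: "01EXAMPLE", MinTime: 0, MaxTime: 7200001, Version: 1})
	fmt.Println(meta, err)
}
```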

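And that's about it for our overview of the two code paths. Let's look back at the questions we had before exploring this API:

Where does the code path between scraping a Prometheus endpoint and remote write differ?

They differ in how samples arrive: the remote write handler receives an HTTP POST and takes an appender straight from the injected appendable, while the scrape loop issues an HTTP GET on each interval and feeds the response into its own wrapped appender. Both end at the same Append call.

In what format does Prometheus write data to disk?

I haven't fully answered this one yet: I found where the metadata and tombstones files are written, but not the method that writes the sample data itself.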

In the next blog post, I'll show my demo of a push-based agent, and summarize my findings from all the posts.
