A Prometheus fork for cloud scale anomaly detection across metrics and logs

Introduction

At Zebrium, we provide an Autonomous Monitoring service that automatically detects anomalies within logs and metrics. We started by correlating anomalies across log streams to automatically raise incidents that require our user's attention. Now, we have taken the step to augment our incident detection by detecting anomalies within a group of related metrics and correlate those with anomalies found in logs.

Since Prometheus is very popular in Kubernetes environments, we wanted to support discovering and scraping Prometheus targets and send those scraped metrics to our software running in the cloud for anomaly detection. Latency is important to us as we need to receive the metrics in near real time as they get scraped. We also need to preserve labels, types, and full fidelity of time stamps for anomaly detection and log correlation purposes. And, we need to do all this while being as efficient as possible in sending the metrics over the wire as data will be going from a user's Kubernetes cluster to our software which is running in the cloud.

In this blog, I will talk about our journey with using standard Prometheus server for our requirements and explain why we have built and open sourced a forked instance of the Prometheus server/scraper.

In summary, we are able to achieve near real time updates, preserve valuable information such as labels and types, handle out of order samples, and achieve over 500x bandwidth reduction (which translates to under 0.7 bytes per sample vs 391 bytes per sample in the raw form).

Metrics requirements for cloud scale anomaly detection

To achieve the goal of making our autonomous incident detection more accurate by detecting anomalies within a group of related metrics, and correlating those with anomalies found in logs, we came up with the following incremental requirements for Prometheus:

  • RQMT-1 : Ability to send and store time series data efficiently and in near real-time by consuming minimal bandwidth from the client-side to our SaaS back-end time series database.

  • RQMT-2 : Ability to type each metric (e.g. counters, gauge, histogram. etc.) to properly inform our anomaly detection.

  • RQMT-3: Ability to tolerate out-of-order times stamps of each consecutive sample so we don't omit inserting time series samples into our TSDB that may be critical to anomaly detection.

  • RQMT-4: Ability to tolerate differences in times stamps for any given sample so we don't omit inserting time series samples into our TSDB that may be critical to anomaly detection.

  • RQMT-5: Ability to automatically group similar metrics to help improve anomaly detection. For example, if there are different time series for cpu-user-time and cpu-interrupt-time, we want to automatically group by CPU without explicit direction from the end-user.

  • RQMT-6: Ability to correlate metrics with logs that are collected outside of our metrics collector (this is the Kubernetes log collector we use). To achieve this we need to include identical labels that allow us to match metrics and log events that come from the same container/source. 

Why We Chose Prometheus

Prometheus provides a very elegant deployment model for Kubernetes environments with out-of-the-box metrics collection requiring virtually no effort. It's popularity, coupled with a wide range of available exporters made it the obvious choice.

A Little Bit About Prometheus...

Prometheus is perhaps the most popular an open-source systems monitoring and alerting toolkit adopted by many organizations and with a very active developer and user community. 

The Prometheus architecture has two main components of interest for our solution:

  1. Metrics targets: These are the end points that export metrics over HTTP. These are stateless end points export all the metrics as a blob in plain text format that is defined by Prometheus. There is a large number of available exporters covering Databases, Hardware, Storage, APIs, Messaging Systems, HTTP, miscellaneous software products. Prometheus supports four core types of metrics:

    1. Monotonically increasing counters

    2. Gauges with a value that can arbitrarily go down or up

    3. Histograms exposes as buckets of observations

    4. Summary which exposes streaming quantiles

  2. Prometheus server: This discovers all the Prometheus targets and periodically scrapes metrics from them. In each scrape, it collects all the metrics from each target for a particular time stamp and saves them to its local time series database (TSDB). One can query this time series database and graph the metrics through the built-in graphing interface or through Grafana.

Compelling Attributes of Prometheus - Why we chose it!

  1. Flexible metadata layout for the metrics: Each metric can have a set of key/value labels where keys and values are like metadata for that metric that represents various dimensions. Keys and values can have free text and Prometheus server does not impose any hierarchy among those keys or relations among them. It's really very flexible.
  2. Amazing discovery plugins : Prometheus has built-in support for auto discovering targets in quite a few infrastructures like Kubernetes, AWS, Azure, GCP, OpenStack, etc.
  3. Very easy to write metric exporters  as they are stateless and simple HTTP end points. There are various language specific libraries that makes it easy for anyone to export any application level metrics as a Prometheus target with very little effort. And that's why there is a plethora of available exporters already at your disposal.
  4. Kubernetes  exports its cluster wide metrics in Prometheus native format.
  5. Very easy to install  Prometheus server in Kubernetes with Prometheus Operator (creates, configures, and manages Prometheus monitoring instances). We love this!! 

Prometheus Server and The Remote Storage Adapter Interface

Prometheus server provides a remote write feature to transparently send samples to a remote end-point. One could write a remote-storage-adapter for Prometheus to continually ship all the metrics from its local time series database (TSDB) storage to remote storage or another time series database. This is shown in the picture below:

This mechanism has been used by various cloud monitoring services to ship metrics from a user's Prometheus server to the cloud. In the Prometheus world, each Prometheus exporter exports a set of metrics. Each metric's value on a time scale is one time series, i.e a time series is a series of values for a given metric on the time axis. Each metric has a metric name, and the associated metadata. This metadata contains a set of labels and values.  To the remote storage, metrics are written one time-series at a time.

We investigated using this remote write feature and discovered the following:

  1. Each time series that is shipped to the remote side includes the metric name and its labels and values followed by the set of sampled values and their time stamps. In order to optimize remote write, Prometheus batches a set of samples for a given metric and sends them to the remote storage after compressing it. It actually gets the samples from the local storage buffer that is ready to be written to disk. What this means is, the samples that are received at the remote side are considered "soft" real time. This interface is designed for shipping metrics to the remote side to overcome the local storage limitations so that metrics can be retained longer. Using this interface for real-time metrics shipping is somewhat limiting and may not meet our near real-time requirement ( RQMT-1 ).
  2. Once the metrics are in the local Prometheus storage, it does not preserve the type of metric (gauge, counter, etc.). Prometheus assumes the metric type is known to the user by the metric name that they are querying. This means, the remote storage interface does not send the metric type information to the remote side. We require the metric type information for our autonomous anomaly detection ( RQMT-2 ).
  3. There could be cases where out-of-order time stamped samples are dropped and won't be shipped to the remote write storage. While we believe this is likely to be rare in the Prometheus architecture, it's not clear that this would be handled ( RQMT-3
  4. Prometheus buffers samples in memory windows of time ranges. If a sample is collected that does not fit in the windowed bucket (say due to time skew) then that sample may be dropped ( RQMT-4 ).
  5. Getting all of the above requires sending more data and forgoing some compression opportunities (by not batching data as much). However, we wanted to meet these requirements in the most bandwidth efficient way possible (to minimize WAN usage). ( RQMT-1 ).

Our Solution

We came up with a few options to address the incremental requirements we needed in the standard Prometheus Remote Storage Adapter

  1. Write our own metrics collector starting from scratch: While this would gives us a lot of flexibility, users would still want our metrics collector to scrape their existing Prometheus targets. This means, we would end up unnecessarily replicating a lot of interfaces and code similar to Prometheus server.
  2. Take the Prometheus server and remove the unneeded modules and add new modules to support our additional requirements. We went with this approach.

What changes did we make to the Prometheus server?

  1. We have kept the scrapper, service discovery and config parser modules and removed all other modules (yes, no local storage!).
  2. We have added three new modules called zpacker , zcacher and zqmgr modules that do all the magic of sending metrics efficiently over the network in real time without losing any information.

This is our customized Prometheus server:


This is a summary of how it works:

  1. As we scrape a target, we want to send that data immediately to the remote side.
  2. The scraped data from a target is like a blob of data that has a set of various metrics and their values with a particular time stamp along with the labels and values.
  3. We want to send this blob as efficiently as possible over the wire. This blob contains a set of samples but only one sample of each metric from the scraped target. The number of metrics can change from one scrape to the next scrape as it is up to the exporter on what it provides in each scrape.
  4. We want to batch as many blobs as possible into a single request to send over the wire without adding too much latency. This reduces the number of requests that we need to handle on our remote server side.
  5. On the remote server side, from the blob(s), we will reconstruct the data as if we scraped the target locally.
  6. Every sample that is scraped reaches our remote server with no drops due to out-of-order samples.

Encoding the blob over the wire

Each scraped blob contains a set of metrics. Each metric contains:

  1. help string
  2. type information ( addresses RQMT-2 )
  3. metric name
  4. metric label/value pairs
  5. one time stamp ( addresses RQMT-3, RQMT-4 )
  6. one value of that sample
  7. Zebrium specific labels which allow us to cross correlate the logs and metrics coming from the same container or same target ( addresses RQMT-6 )

The basic idea is, if we have to send the blob as is, even after compression, it still represents a lot of data going over the wire. But if we do the diff of the blob with the blob that we scraped last time from this target, that diff will be very small. We call this diff an incremental blob . This incremental blob is computed as follows:

  1. Metric's sample value is diff'd from its last sample value. This difference will be very small or zero in most cases and can be encoded efficiently with variable byte encoding.
  2. If the diff value is zero, when the sample value does not change, we skip sending this metric completely in our incremental blob.
  3. We also do the diff of the time stamp and do variable byte encoding.
  4. If the time stamp is the same across all the metrics of a blob, we detect that and send the time stamp once in the incremental blob instead of per sample.
  5. Incremental blob does NOT contain the metric name.
  6. Incremental blob does NOT contain the metric's help string.
  7. Incremental blob does NOT contain any of the labels or any of the values.

One can see that the incremental blob contains the data of the samples that changed from the last time we scraped and on top of that we do all the above mentioned optimizations to reduce its size.

Sending one request to the remote server for each blob is quite expensive in terms of the number of HTTP requests. So, we coalesce blobs, either incremental or full blobs, across all the targets and send them in one request. If the scraping interval is too small, we also coalesce blobs from the same target in one request. Each blob, either incremental or full, is also compressed before sending on the wire.

On the remote server side, given the incremental blob, we should be able to reconstruct the original full blob.

Reconstructing the full blob on the remote server

So far so good, one might think. Reconstructing the full blob from an incremental blob on the remote server side means you have to maintain the state. So how do we synchronize this state with the client side's Zebrium metrics collector? Well, we keep the state between remote server and metrics collector as independent as possible, something like a NFS file handle approach. The state is divided between metrics collector and remote server as follows:

  1. It is the responsibility of the metrics collector to detect that some schema has changed between the last blob and the current blob. This can happen, for example, when we see a new sample this time which did not exist last time or the scraped target came online for the first time. When the metrics collector detects this, it will NEVER send the incremental blob. Instead it will send the full blob with everything.
  2. Metrics collector always creates a full blob in addition to the incremental blob if there is one. However, the metrics collector does not send the full blob over the wire if the incremental blob is available to start with. 
  3. If the remote server for any reason cannot reconstruct the full blob from the incremental blob, as part of HTTP(S) POST response it will ask the metrics collector to send the full blob. This can happen, if the remote server did not find the last blob in its cache either because the remote server process crashed and came back up or it purged the last blob because of memory pressure.

On the remote server side, we reconstruct the metrics data as if it was scraped from the local target. We then perform a learning function to group the related metrics for ingest into our analytics database. Remember this example: if there are different time series for cpu-user-time and cpu-interrupt-time, we need to automatically group by CPU without explicit direction from the end-user ( addresses RQMT-5 )

Zebrium transfers metrics by using only 0.7 bytes per sample in most cases, including all metadata, and achieves near real-time results in lab testing ( addresses RQMT-1 ).

Code Changes

We have added three modules to the Prometheus server: zpacker , zcacher and  zqmgr:

  1. Scraper module scrapes the targets periodically from each target. Each time it scrapes from one target, it sends that scraped data to zapcker module. 
  2. zpacker (Zebrium packer) module is responsible for buffering and creating two blobs (full, incremental) out of the data that scraper sends.
  3. zcacher (Zebrium cacher) module is responsible for caching the last blob.
  4. zqmgr (Zebrium queue manager) module is responsible for connection management and coalescing of blobs and sending the HTTP requests to the remote server.

Test Results

Here is what we have seen in terms of number of bytes sent over the wire from our own Kubernetes test cluster (as mentioned above, we achieved over 500x bandwidth reduction - ~0.7 bytes per sample vs 391 bytes per sample in the raw form):

2020-03-09T06:54:23.454-07:00 15810 INFO: worker.go:192 worker.dumpStats: Account summary stats for portal11, reqs=7660 fbytes=78304197246 bytes=405821031 cbytes=138765890 compr=564 misses=12 errs=0 full_blobs=115 fsamples=127504 labels=0 incr_blobs=47500 isamples=200095802
  1. reqs=7660 --> Total HTTP requests
  2. fbytes=78,304,197,246 --> Total bytes of data (metrics, label names/values, type information, etc.) if we had to transfer as-is (pre blob-optimization). This works out to 391 bytes/sample.
  3. bytes: 405,821,031 --> Actual data (with blob optimization) to be transferred pre-compression.
  4. cbytes=138,765,890 --> Actual compressed data (with blob optimization) sent over the wire.
  5. compr=564 --> Compression ratio (fbytes/cbytes) 564x
  6. misses=12 --> Remote-side cache misses resulting in requests for full blobs.
  7. full_blobs=115 --> Total full blobs sent
  8. fsamples=127,504 --> Total samples received in full blobs.
  9. incr_blobs=47,500 --> Total incremental blobs sent
  10. isamples=200,095,802 --> Total samples received in incremental blobs.

So to transfer a total of 200,223,306 samples, we only sent 138,765,890 bytes (about 132MB). On average, this works out to 0.69 bytes per sample

Summary

Prometheus server proved to be an outstanding choice to use as the platform for our metrics collection. It provides a very elegant deployment model for Kubernetes environments and out-of-the-box metrics collection requiring virtually no effort. This coupled with a wide range of available exporters, allows our users to take full advantage of the Zebrium Autonomous Monitoring solution .

As outlined about, the stringent requirements (RQMTs 1-6) imposed by our anomaly detection meant it was necessary to address them through custom code. The Zebrium Metrics Collector preserves all meta data and time stamps regardless of ordering, with no loss of data and while achieving bandwidth efficiency of 500:1 compression over the wire. Coupling this with with additional labels inserted at the source, allows us to cross correlate logs and metrics coming from the same container or same target. Our back-end machine learning then groups related metrics for ingest into our analytics database.

From this foundation, we are able to use machine learning to perform autonomous anomaly detection and incident creation across logs and metrics . But more on this part in a future blog...  Stay tuned! Or try it by signing up for a free account

Github Repositories

The code we have built for this has been open sourced under the Apache license and can be found in the following Github repositories:

我来评几句
登录后评论

已发表评论数()

相关站点

热门文章