Skip to content

"last" aggregation in Multiprocess_mode (Gauge) #847

@singerjess

Description

@singerjess

Related to #154.
I'm working with Fast API + Gunicorn, and I have several cases where I need to display a gauge metric, but only the "last" value received. Not the sum, or the maximum value (for example, the CPU usage of another device that reports that metric to my API).

Currently, the Multiprocess_mode doesn't have a way to provide the "last" value. I think that would be a valuable feature to add.

What I did locally, was add another metric with the timestamp of the last metric added by each node, and then with a custom collector select the process with the higher timestamp, and return the metric associated with that process id.
This is a workaround, a cleaner way would be to add a new multiprocess mode to Gauges ("last"), and set the timestamp to the Gauge (currently I see it's null) when that mode is used. Want to check what you think of a potential solution like this before doing a PR.

I include here the code of the "accumulate_metrics" method from my custom collector. (There are a couple of technical details, since I can't add the "last" mode, I use the "liveall" mode and I specify the metric name starts with "last"):

@staticmethod
    def _accumulate_metrics(metrics, accumulate):

        ### Custom code to get last value
        last_pid = -1
        last_timestamp = 0.0
        for index, metric in enumerate(metrics.values()):
            for s in metric.samples:
                name, labels, value, timestamp, exemplar = s
                current_pid = -1
                if name == "process_heartbeat_timestamp":
                    for l in labels:
                        if l[0] == 'pid':
                            current_pid = l[1]
                    if float(value) > float(last_timestamp):
                        last_timestamp = value
                        last_pid = int(current_pid)

        ### End custom code
        for index, metric in enumerate(metrics.values()):
            samples = defaultdict(float)
            buckets = defaultdict(lambda: defaultdict(float))
            samples_setdefault = samples.setdefault
            for s in metric.samples:
                name, labels, value, timestamp, exemplar = s

                if metric.type == 'gauge':
                    without_pid_key = (name, tuple(l for l in labels if l[0] != 'pid'))
                    ### Custom code to get last value
                    if metric._multiprocess_mode == 'liveall' and name.startswith("last"):
                        current_pid = -1
                        for l in labels:
                            if l[0] == 'pid':
                                current_pid = int(l[1])
                        current = samples_setdefault(without_pid_key, value)
                        if current_pid == last_pid:
                            samples[without_pid_key] = value
                    ### End custom code
                    elif metric._multiprocess_mode == 'min':
                        current = samples_setdefault(without_pid_key, value)
                        if value < current:
                            samples[without_pid_key] = value
                    elif metric._multiprocess_mode == 'max':
                        current = samples_setdefault(without_pid_key, value)
                        if value > current:
                            samples[without_pid_key] = value
                    elif metric._multiprocess_mode == 'livesum':
                        samples[without_pid_key] += value
                    else:  # all/liveall
                        samples[(name, labels)] = value


                elif metric.type == 'histogram':
                    # A for loop with early exit is faster than a genexpr
                    # or a listcomp that ends up building unnecessary things
                    for l in labels:
                        if l[0] == 'le':
                            bucket_value = float(l[1])
                            # _bucket
                            without_le = tuple(l for l in labels if l[0] != 'le')
                            buckets[without_le][bucket_value] += value
                            break
                    else:  # did not find the `le` key
                        # _sum/_count
                        samples[(name, labels)] += value
                else:
                    # Counter and Summary.
                    samples[(name, labels)] += value

            # Accumulate bucket values.
            if metric.type == 'histogram':
                for labels, values in buckets.items():
                    acc = 0.0
                    for bucket, value in sorted(values.items()):
                        sample_key = (
                            metric.name + '_bucket', labels + (('le', floatToGoString(bucket)),),)
                        if accumulate:
                            acc += value
                            samples[sample_key] = acc
                        else:
                            samples[sample_key] = value
                    if accumulate:
                        samples[(metric.name + '_count', labels)] = acc

            # Convert to correct sample format.
            metric.samples = [Sample(name_, dict(labels), value) for (name_, labels), value in
                              samples.items()]
        return metrics.values()

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions