
Multiple periodic readers increment the counter abnormally #5866

Open
dashpole opened this issue Oct 2, 2024 · 17 comments · May be fixed by #5900
Labels: area:metrics (Part of OpenTelemetry Metrics), bug (Something isn't working)

@dashpole
Contributor

dashpole commented Oct 2, 2024

Description

When multiple (periodic) readers are present, metrics increase abnormally.

Originally reported here: open-telemetry/opentelemetry-collector#11327

Steps To Reproduce

Run the OpenTelemetry collector with the configuration provided in the bug above.

Expected behavior

Metrics should not increase abnormally.

@dashpole
Contributor Author

dashpole commented Oct 2, 2024

We should try to create a simpler reproduction (not using the collector) to make this easier to root-cause

@pree-dew
Contributor

pree-dew commented Oct 3, 2024

@dashpole Can I pick this task?

@dashpole
Contributor Author

dashpole commented Oct 3, 2024

@pree-dew feel free to work on this. It is likely going to be tricky to root-cause, but any help is appreciated

@pree-dew
Contributor

pree-dew commented Oct 4, 2024

Able to reproduce the issue by providing multiple periodic readers in the collector's telemetry configuration.

[screenshot: collector self-metrics showing the abnormal increase]

Will try to find a way to reproduce this outside of the otel-collector.

@pree-dew
Contributor

pree-dew commented Oct 7, 2024

Able to reproduce this with the SDK:

========== Reader 1 ==========
{
        "Resource": [
                {
                        "Key": "service.name",
                        "Value": {
                                "Type": "STRING",
                                "Value": "myapp"
                        }
                }
        ],
        "ScopeMetrics": [
                {
                        "Scope": {
                                "Name": "http",
                                "Version": "",
                                "SchemaURL": ""
                        },
                        "Metrics": [
                                {
                                        "Name": "http_requests_total",
                                        "Description": "TODO",
                                        "Unit": "count",
                                        "Data": {
                                                "DataPoints": [
                                                        {
                                                                "Attributes": [],
                                                                "StartTime": "2024-10-07T16:10:11.277202+05:30",
                                                                "Time": "2024-10-07T16:19:41.280994+05:30",
                                                                "Value": 4
                                                        }
                                                ],
                                                "Temporality": "CumulativeTemporality",
                                                "IsMonotonic": true
                                        }
                                }
                        ]
                }
        ]
}
========== Reader 2 ==========
{
	"Resource": [
		{
			"Key": "service.name",
			"Value": {
				"Type": "STRING",
				"Value": "myapp"
			}
		}
	],
	"ScopeMetrics": [
		{
			"Scope": {
				"Name": "http",
				"Version": "",
				"SchemaURL": ""
			},
			"Metrics": [
				{
					"Name": "http_requests_total",
					"Description": "TODO",
					"Unit": "count",
					"Data": {
						"DataPoints": [
							{
								"Attributes": [],
								"StartTime": "2024-10-07T16:10:11.277205+05:30",
								"Time": "2024-10-07T16:19:41.281038+05:30",
								"Value": 12
							}
						],
						"Temporality": "CumulativeTemporality",
						"IsMonotonic": true
					}
				}
			]
		}
	]
}

At the same time, the same metric shows values of 4 and 12, while the correct value is 3. The same setup gives the correct value with a single reader.

So far the observation is that this happens with ObservableCounter, but I will give it one more shot to be sure.
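For reference, a minimal SDK-only program along these lines exhibits the problem (a sketch, not necessarily the exact program used above; stdout exporters, the instrument name, and the interval are arbitrary choices):

package main

import (
	"context"
	"time"

	"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	ctx := context.Background()

	exp1, _ := stdoutmetric.New()
	exp2, _ := stdoutmetric.New()

	// Two periodic readers on the same MeterProvider, mirroring the
	// multi-reader collector telemetry configuration.
	mp := sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exp1, sdkmetric.WithInterval(10*time.Second))),
		sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exp2, sdkmetric.WithInterval(10*time.Second))),
	)
	defer func() { _ = mp.Shutdown(ctx) }()

	meter := mp.Meter("http")

	// The callback always reports the same cumulative value, so every
	// export should show 3.
	reqCount := int64(3)
	counter, _ := meter.Int64ObservableCounter("http_requests_total")
	_, _ = meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
		o.ObserveInt64(counter, reqCount)
		return nil
	}, counter)

	time.Sleep(time.Minute) // let both readers export a few times
}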

@pree-dew
Contributor

pree-dew commented Oct 19, 2024

@dashpole I have added a test case to reproduce the behaviour; it will fail as of now but will pass once the fix goes out.

I debugged the issue; this is my understanding of it:

Step 1
Say the value of the variable (var reqCount = 1) used for the Int64ObservableCounter instrument is 1.

Step 2
Pipeline 1 starts -> calls the registered callback -> calls the measure function -> sets the value to 1, as the previous value is initially 0 (ref).

Step 3
Pipeline 2 starts -> calls the registered callback -> calls the measure function -> picks up the value set by pipeline 1 (1) and then adds the value (reqCount) to it, making it 2.

Step 4
When .Collect is called, it gets 2 instead of 1. Since this is a precomputed sum, the expected value (as per reqCount) is 1, not 2 [Bug!!!].

This happens because the instrument is the same for both pipelines: before the previous value gets cleared here, the next pipeline picks up the previous value set by pipeline 1 and adds to it, making the value incorrect.

With the above understanding, I am able to reproduce the issue.
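A self-contained toy model of this sequence (deliberately simplified, not the SDK's actual types) shows the same arithmetic:

package main

import "fmt"

// Toy precomputed-sum aggregation: measure adds the reported value and
// collect reads the value, then clears it (the clearing step referenced above).
type precomputedSum struct{ n int64 }

func (s *precomputedSum) measure(v int64) { s.n += v }
func (s *precomputedSum) collect() int64  { v := s.n; s.n = 0; return v }

func main() {
	reqCount := int64(1)

	agg1 := &precomputedSum{} // aggregation owned by pipeline 1 (reader 1)
	agg2 := &precomputedSum{} // aggregation owned by pipeline 2 (reader 2)

	// The observable instrument holds the measure functions of *all*
	// pipelines, so one callback invocation updates both aggregations.
	observe := func(v int64) {
		agg1.measure(v)
		agg2.measure(v)
	}

	// Pipeline 1 collects: its callback runs, but agg2 is updated too.
	observe(reqCount)
	fmt.Println("reader 1 sees:", agg1.collect()) // 1, as expected

	// Pipeline 2 collects: its callback runs again, agg2 now holds 1 + 1 = 2.
	observe(reqCount)
	fmt.Println("reader 2 sees:", agg2.collect()) // 2, expected 1 [Bug]
}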

Let me know what you think about this.

@dashpole
Contributor Author

dashpole commented Oct 22, 2024

Aggregations for each reader should be isolated from each other. A meter has a single resolver for int64:

int64Resolver: newResolver[int64](p, &viewCache),

The resolver has a list of pipelines:

in := make([]*inserter[N], len(p))
for i := range in {
	in[i] = newInserter[N](p[i], vc)
}
return resolver[N]{in}

There is one pipeline for each reader:

func newPipelines(res *resource.Resource, readers []Reader, views []View, exemplarFilter exemplar.Filter) pipelines {
	pipes := make([]*pipeline, 0, len(readers))
	for _, r := range readers {
		p := newPipeline(res, r, views, exemplarFilter)
		r.register(p)
		pipes = append(pipes, p)
	}
	return pipes
}

The callback is registered separately with each pipeline:

func (p pipelines) registerMultiCallback(c multiCallback) metric.Registration {
	unregs := make([]func(), len(p))
	for i, pipe := range p {
		unregs[i] = pipe.addMultiCallback(c)
	}
	return unregisterFuncs{f: unregs}
}

During Collect, produce() should iterate over each callback on the pipeline:

for e := p.multiCallbacks.Front(); e != nil; e = e.Next() {
	// TODO make the callbacks parallel. ( #3034 )
	f := e.Value.(multiCallback)
	if err := f(ctx); err != nil {
		errs.append(err)
	}
}

Some things to check:

  • Are aggregations being shared between readers when they shouldn't be?
  • Are callbacks being registered multiple times with the same reader?
  • Is Collect() for one reader invoking callbacks on other readers? (See the sketch below.)
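The third point, at least, can be probed from the public API with two ManualReaders, roughly like this (a sketch; meter and instrument names are arbitrary):

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	ctx := context.Background()

	r1 := sdkmetric.NewManualReader()
	r2 := sdkmetric.NewManualReader()
	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(r1), sdkmetric.WithReader(r2))
	meter := mp.Meter("check")

	// Precomputed cumulative value: every collection should report exactly 1.
	counter, _ := meter.Int64ObservableCounter("requests")
	_, _ = meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
		o.ObserveInt64(counter, 1)
		return nil
	}, counter)

	var rm metricdata.ResourceMetrics
	_ = r1.Collect(ctx, &rm) // collect reader 1 twice before touching reader 2
	_ = r1.Collect(ctx, &rm)
	_ = r2.Collect(ctx, &rm)

	sum := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64])
	// Expected 1; an inflated value means reader 1's collections also fed
	// reader 2's aggregation.
	fmt.Println("reader 2 value:", sum.DataPoints[0].Value)
}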

@pree-dew
Contributor

pree-dew commented Oct 22, 2024

@dashpole Sure, I will check these points. So far, what I have observed is that the instance (in) returned at this line

in, out, err := i.aggregateFunc(b, stream.Aggregation, kind)

is the same for both readers, even though there are 2 separate pipelines; internally, this ends up calling the measure function for both pipelines.

@dashpole
Contributor Author

Added some printing in the sum aggregation:

measure s: 0xc0000aa740, v.n += value: 0 += 1
measure s: 0xc0000aa760, v.n += value: 0 += 1
cumulative s: 0xc0000cb830, i: 1/1, val: 1
measure s: 0xc0000aa740, v.n += value: 0 += 1
measure s: 0xc0000aa760, v.n += value: 1 += 1
cumulative s: 0xc0000cb8f0, i: 1/1, val: 2
--- FAIL: TestPipelineWithMultipleReaders (0.00s)

measure is being called for both pipelines when only one pipeline is collected.

@dashpole
Contributor Author

I did some more digging. The issue is that the Instrument we return to users from Int64ObservableCounter contains the measure functions for that instrument from all pipelines (readers). Here, we get the measure functions, in, for each inserter (which is 1:1 with a reader), and call inst.appendMeasures(in) for each of them:

inst := newInt64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
for _, insert := range m.int64Resolver.inserters {
	// Connect the measure functions for instruments in this pipeline with the
	// callbacks for this pipeline.
	in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind))
	if err != nil {
		return inst, err
	}
	// Drop aggregation
	if len(in) == 0 {
		inst.dropAggregation = true
		continue
	}
	inst.appendMeasures(in)
	for _, cback := range callbacks {
		inst := int64Observer{measures: in}
		fn := cback
		insert.addCallback(func(ctx context.Context) error { return fn(ctx, inst) })
	}
}
return inst, validateInstrumentName(id.Name)
})

Then, each time ObserveInt64 is called from our callback, it calls observe on the instrument provided by the user (which is our Int64ObservableCounter):

oImpl.observe(v, c.Attributes())

Observe iterates over all of the measure functions:

func (m measures[N]) observe(val N, s attribute.Set) {
	for _, in := range m {
		in(context.Background(), val, s)
	}
}

@dashpole
Contributor Author

I actually fixed a very similar bug, for callbacks registered using WithCallback(), in #4742.

@pree-dew
Contributor

This issue #4742 looks similar. So, since we are appending the measure functions for all pipelines, they get called even if only 1 pipeline's collect is invoked.

So, as a solution, we essentially have to append only the measure function corresponding to the pipeline's reader. Is my understanding correct?

@dashpole
Contributor Author

dashpole commented Oct 23, 2024

To fix it, we need to ensure that only callbacks associated with the pipeline are called during the pipeline's Collect() function. In #4742 this was done by passing an instrument to the callback that contained only the measure functions for that pipeline: https://github.com/open-telemetry/opentelemetry-go/pull/4742/files#diff-e6ec718e2bb8f9b29cbab6b14b62ddae15faa6192d82b927b0e7b7f1b4167cc7R124

But here, the instrument comes from the user and must be usable by callbacks for any pipeline, so we can't do that.

The only other option, which is accessible from ObserveInt64, is to have a different observer for each pipeline; the observer is currently created here:

reg := newObserver()

We might need to split the observer so that we can validate instruments once without using an observer, and then create an observer for each pipeline.

The observer will need to be able to look up the aggregate.Measure function for a given instrument (probably by observableID). That map[observableID]aggregate.Measure probably needs to be stored on the pipeline, and recorded here:

inst.appendMeasures(in)

And the observer needs to keep a reference to the pipeline.

@pree-dew
Contributor

Got it, so basically:

  1. Create a separate observer instance for each pipeline and register (reference) it with respect to a pipeline.
  2. Maintain a map from observableID to measure functions.
  3. During appendMeasures, look up by observableID and append only that measure.

@dashpole
Contributor Author

dashpole commented Oct 23, 2024

It is probably easiest if observer has a reference to its pipeline, and the map[observableID]measure is on the pipeline.

We probably need to leave appendMeasures alone for now, since the measures on the instrument are used for validation. But right next to where we call appendMeasures, you should update the map[observableID]measure on the pipeline.

During ObserveInt64, we need to stop calling observe() on the observer provided by the user. Instead, we need to look up the measure function that we stored on the pipeline using the observableID from the user, and use that measure function.
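For illustration, a rough self-contained sketch of that shape, using toy stand-ins rather than the SDK's actual internal types:

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

// Toy stand-ins for the SDK's internal types, for illustration only.
type observableID struct{ name string }
type measure func(context.Context, int64, attribute.Set)

// Each pipeline would own the measures recorded for its observables,
// keyed by observableID.
type pipeline struct {
	int64Measures map[observableID][]measure
}

// The observer built for a pipeline's Collect() keeps a reference to that
// pipeline and resolves measures from it, so observing no longer fans out
// to every reader's aggregations.
type observer struct {
	pipe *pipeline
}

func (o observer) observeInt64(id observableID, v int64) {
	for _, m := range o.pipe.int64Measures[id] {
		m(context.Background(), v, *attribute.EmptySet())
	}
}

func main() {
	id := observableID{name: "http_requests_total"}
	p1 := &pipeline{int64Measures: map[observableID][]measure{
		id: {func(_ context.Context, v int64, _ attribute.Set) { fmt.Println("reader 1 measured", v) }},
	}}
	p2 := &pipeline{int64Measures: map[observableID][]measure{
		id: {func(_ context.Context, v int64, _ attribute.Set) { fmt.Println("reader 2 measured", v) }},
	}}

	// Only the collecting pipeline's measures run.
	observer{pipe: p1}.observeInt64(id, 3)
	_ = p2 // reader 2's aggregation is untouched
}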

@pree-dew
Contributor

Got it and makes sense. I will make changes accordingly.

dashpole added this to the v1.32.0 milestone Oct 24, 2024
@pree-dew
Contributor

pree-dew commented Oct 28, 2024

@dashpole I have taken a first attempt at making the changes and pushed the code in #5900. Let me know your review on that; happy to accommodate feedback.
