-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Fix incorrect metrics getting generated from multiple readers #5900
base: main
Are you sure you want to change the base?
[WIP] Fix incorrect metrics getting generated from multiple readers #5900
Conversation
…w, once the fix goes it will pass
…ipeline and get the measure during collect for the called pipeline
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looking great overall.
switch o := inst.(type) { | ||
case int64Observable: | ||
if e := o.registerable(m); e != nil { | ||
if !errors.Is(e, errEmptyAgg) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this will return the same error multiple times (once for each pipeline). I think we should iterate over instruments once to see if it is registerable, and construct a list of valid instruments. Then iterate over valid instruments once for each pipe and register the valid ones.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense. I will make the change
for i, pipe := range p { | ||
unregs[i] = pipe.addMultiCallback(c) | ||
} | ||
func (p pipelines) registerMultiCallbacks(unregs []func()) metric.Registration { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: consider removing this function, and just returning unregisterFuncs{f: unregs}
from RegisterCallback
} | ||
|
||
type observer struct { | ||
embedded.Observer | ||
|
||
pipe *pipeline | ||
float64 map[observableID[float64]]struct{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you take my advice above to do validation once for all pipelines, you should be able to remove these maps from the observer. IIRC, they are only used for validation, so you could just create these maps at the beginning of RegisterCallback.
@@ -530,7 +546,10 @@ func (r observer) ObserveFloat64(o metric.Float64Observable, v float64, opts ... | |||
return | |||
} | |||
c := metric.NewObserveConfig(opts) | |||
oImpl.observe(v, c.Attributes()) | |||
measures := r.pipe.float64Measures[oImpl.observableID] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to lock around this? It would be good to ensure our parallel tests Invoke a callback in parallel with RegisterCallbacks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will make the change
Will fix #5866