feat: support ThroughputLimit in samplers #1300
base: main
Conversation
	return float64(currentEMA) / float64(c.throughputLimit)
}

type throughputReport struct {
This code can be refactored into shared logic used by both the stress relief and throughput calculators.
Yes, they're quite similar.
Would it make sense to go even further and bundle the updates into the same messages? So the system maintains a map of named values that each peer updates internally, and the peers send the map through pubsub?
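To make the bundling idea concrete, here is a minimal sketch of the map-of-named-values message the comment describes. All names (`peerValues`, the `|`-separated wire format) are illustrative assumptions, not Refinery's actual API.

```go
// Hypothetical sketch: each peer maintains a map of named metrics
// (e.g. "throughput", "stress_level") and publishes the whole map as
// one pubsub message instead of one message per metric.
package main

import (
	"fmt"
	"sort"
	"strings"
)

// peerValues holds named metric values for one peer. The struct and
// wire format are invented for illustration.
type peerValues struct {
	PeerID string
	Values map[string]float64
}

// Marshal packs the map into "peerID|name=value|..." with sorted keys
// so the encoding is deterministic regardless of map iteration order.
func (p peerValues) Marshal() string {
	keys := make([]string, 0, len(p.Values))
	for k := range p.Values {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	parts := []string{p.PeerID}
	for _, k := range keys {
		parts = append(parts, fmt.Sprintf("%s=%g", k, p.Values[k]))
	}
	return strings.Join(parts, "|")
}

func main() {
	msg := peerValues{
		PeerID: "peer-1",
		Values: map[string]float64{"throughput": 1200, "stress_level": 0.4},
	}
	fmt.Println(msg.Marshal())
}
```

One message carrying all named values would let stress relief and the throughput calculator share a single publish path, at the cost of coupling their publish cadence.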
config/metadata/rulesMeta.yaml
Outdated
- name: EMAThroughputLimit
  sortorder: 1
  title: EMAThroughput Limit
  description: >
I put this config option in the rules config because it mostly impacts the samplers rather than Refinery operations.
c.Pubsub.Subscribe(context.Background(), stressReliefTopic, c.onThroughputUpdate)

go func() {
	ticker := c.Clock.NewTicker(c.intervalLength)
We should only publish if the throughput is different from the previous calculation
Agreed.
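A minimal sketch of the "publish only when changed" suggestion: remember the last published value and skip the pubsub call when it is unchanged. The `throughputPublisher` type and `publish` callback are illustrative, not the PR's actual code.

```go
// Hypothetical change-detection wrapper around a pubsub publish call.
package main

import (
	"fmt"
	"math"
)

type throughputPublisher struct {
	lastPublished float64
	hasPublished  bool
}

// maybePublish invokes publish only when the value differs from the
// previous publication (beyond a tiny float tolerance). It reports
// whether a publish happened.
func (p *throughputPublisher) maybePublish(current float64, publish func(float64)) bool {
	const epsilon = 1e-9
	if p.hasPublished && math.Abs(current-p.lastPublished) < epsilon {
		return false // unchanged: skip the pubsub traffic
	}
	p.lastPublished = current
	p.hasPublished = true
	publish(current)
	return true
}

func main() {
	var published []float64
	p := &throughputPublisher{}
	for _, v := range []float64{100, 100, 120, 120, 100} {
		p.maybePublish(v, func(f float64) { published = append(published, f) })
	}
	fmt.Println(published)
}
```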
if rule.Drop {
	// If we dropped because of an explicit drop rule, then increment that too.
	s.Metrics.Increment(s.prefix + "num_dropped_by_drop_rule")
	rate = 0
We don't have the rule's config in the MakeSamplingDecision function. I changed the logic to return rate as 0 to signal that it's a drop decision due to the Drop config.
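The convention being described is that a rate of 0 (never a valid sample rate) encodes "dropped by an explicit drop rule", so the caller needs no access to the rule config. A tiny sketch of how a caller might interpret it; `classifyDecision` is an invented illustrative helper, not code from the PR.

```go
// Hypothetical caller-side interpretation of the rate==0 sentinel.
package main

import "fmt"

// classifyDecision labels a computed sample rate. A rate of 0 can never
// mean "sample 1 in 0", so it safely signals a drop-rule decision.
func classifyDecision(rate uint) string {
	if rate == 0 {
		return "dropped_by_drop_rule"
	}
	return fmt.Sprintf("sampled_at_1_in_%d", rate)
}

func main() {
	fmt.Println(classifyDecision(0))
	fmt.Println(classifyDecision(10))
}
```

Using an in-band sentinel keeps the function signature unchanged, though it relies on 0 never being a legitimate rate.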
collect/throughput_calculator.go
Outdated
hostID string

mut         sync.RWMutex
throughputs map[string]throughputReport
How about using a generics.SetWithTTL here, which will avoid the explicit timeout checks?
	return msg.peerID + "|" + fmt.Sprint(msg.throughput)
}

func unmarshalThroughputMessage(msg string) (*throughputMessage, error) {
This gives me an idea. Instead of taking a string for a message type, Pubsub could take a PubsubMessage, which would maybe just embed encoding.TextMarshaler and encoding.TextUnmarshaler.
That would kind of normalize the way we do these pack and unpack things for pubsub.
Or we could build a general-purpose PubsubMessage class that has the ability to add named fields.
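A sketch of the first variant of that idea: a message type implementing `encoding.TextMarshaler` and `encoding.TextUnmarshaler`, so every pubsub message packs and unpacks through the same pair of interfaces. The field names and `|`-delimited wire format here are illustrative assumptions.

```go
// Hypothetical PubsubMessage implemented via the standard encoding
// interfaces, normalizing pack/unpack across message kinds.
package main

import (
	"fmt"
	"strconv"
	"strings"
)

type throughputMessage struct {
	PeerID     string
	Throughput int
}

// MarshalText implements encoding.TextMarshaler.
func (m throughputMessage) MarshalText() ([]byte, error) {
	return []byte(m.PeerID + "|" + strconv.Itoa(m.Throughput)), nil
}

// UnmarshalText implements encoding.TextUnmarshaler.
func (m *throughputMessage) UnmarshalText(data []byte) error {
	parts := strings.SplitN(string(data), "|", 2)
	if len(parts) != 2 {
		return fmt.Errorf("malformed throughput message: %q", data)
	}
	n, err := strconv.Atoi(parts[1])
	if err != nil {
		return err
	}
	m.PeerID, m.Throughput = parts[0], n
	return nil
}

func main() {
	out, _ := throughputMessage{PeerID: "peer-1", Throughput: 1200}.MarshalText()
	var in throughputMessage
	_ = in.UnmarshalText(out)
	fmt.Println(string(out), in.PeerID, in.Throughput)
}
```

A `Pubsub.Publish` that accepted any `encoding.TextMarshaler` could then serialize every message type uniformly instead of taking pre-packed strings.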
config/metadata/rulesMeta.yaml
Outdated
description: >
  The duration after which the EMA Dynamic Sampler should recalculate
  its internal counters. It should be specified as a duration string.
  For example, "30s" or "1m". Defaults to "15s".
Suggested change:
-  For example, "30s" or "1m". Defaults to "15s".
+  For example, `30s` or `1m`. Defaults to `15s`.
config/sampler_config.go
Outdated
@@ -143,8 +143,15 @@ func (v *RulesBasedDownstreamSampler) NameMeaningfulRate() string {
}

type V2SamplerConfig struct {
	RulesVersion int                        `json:"rulesversion" yaml:"RulesVersion" validate:"required,ge=2"`
	Samplers     map[string]*V2SamplerChoice `json:"samplers" yaml:"Samplers,omitempty" validate:"required"`
In my head, this is a configuration option more than a sampler option, because it's global and doesn't depend on the samplers. I also understand doing it this way, though, so I think we should talk it through.
	return rate, "dynamic", key
}

func (d *DynamicSampler) MakeSamplingDecision(rate uint, trace *types.Trace) bool {
I am pretty happy with the removal of the keep parameter, but I thought I understood that we were not going to add a new function to every sampler. I think MakeSamplingDecision is the same for all samplers, except for the metrics. Why is this better than centralizing that into the call from collect()?
Unfortunately, the DeterministicSampler's decision-making logic is quite different from the rest of the samplers.
Ugh. You're right, but deterministic sampling breaks with the throughput throttle anyway, so maybe we should consider treating it differently or exempting it from the throttle.
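One way to act on the exemption idea is to check the concrete sampler type at the point where the throttle is applied. A hedged sketch: the interface, type names, and `applyThrottle` helper are simplified inventions mirroring the discussion, not the PR's code.

```go
// Hypothetical throttle application that exempts deterministic sampling,
// whose fixed-probability semantics a throughput throttle would break.
package main

import "fmt"

type Sampler interface {
	Name() string
}

type DeterministicSampler struct{}

func (DeterministicSampler) Name() string { return "deterministic" }

type DynamicSampler struct{}

func (DynamicSampler) Name() string { return "dynamic" }

// applyThrottle scales a rate by the throughput correction factor,
// except for the deterministic sampler, which keeps its configured rate.
func applyThrottle(s Sampler, rate uint, factor float64) uint {
	if _, ok := s.(DeterministicSampler); ok {
		return rate // exempt from the throttle
	}
	scaled := uint(float64(rate) * factor)
	if scaled < 1 {
		scaled = 1 // never sample less often than 1-in-N with N >= 1
	}
	return scaled
}

func main() {
	fmt.Println(applyThrottle(DeterministicSampler{}, 10, 2.0))
	fmt.Println(applyThrottle(DynamicSampler{}, 10, 2.0))
}
```

A type assertion like this keeps the exemption in one place, though a capability method on the interface (e.g. "supports throttling") would avoid special-casing a concrete type.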
Which problem is this PR solving?
relates to #956
Short description of the changes
- EMAThroughputCalculator to calculate cluster throughputs and publish individual throughput on an interval
- Sampler interface to separate the sample rate calculation logic and sampling decision logic
- EMAThroughputLimit in rules config