Skip to content

Commit

Permalink
make channel capacity configurable in poollet (#1061)
Browse files Browse the repository at this point in the history
  • Loading branch information
kasabe28 authored May 6, 2024
1 parent 5a4df91 commit 16ddc6e
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 3 deletions.
8 changes: 7 additions & 1 deletion poollet/bucketpoollet/cmd/bucketpoollet/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type Options struct {
BucketRuntimeSocketDiscoveryTimeout time.Duration
BucketClassMapperSyncTimeout time.Duration

ChannelCapacity int

WatchFilterValue string
}

Expand All @@ -81,6 +83,8 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&o.BucketRuntimeSocketDiscoveryTimeout, "bucket-runtime-discovery-timeout", 20*time.Second, "Timeout for discovering the bucket runtime socket.")
fs.DurationVar(&o.BucketClassMapperSyncTimeout, "bcm-sync-timeout", 10*time.Second, "Timeout waiting for the bucket class mapper to sync.")

fs.IntVar(&o.ChannelCapacity, "channel-capacity", 1024, "channel capacity for the bucket event generator")

fs.StringVar(&o.WatchFilterValue, "watch-filter", "", "Value to filter for while watching.")
}

Expand Down Expand Up @@ -187,7 +191,9 @@ func Run(ctx context.Context, opts Options) error {
return nil, err
}
return res.Buckets, nil
}, irievent.GeneratorOptions{})
}, irievent.GeneratorOptions{
ChannelCapacity: opts.ChannelCapacity,
})
if err := mgr.Add(bucketEvents); err != nil {
return fmt.Errorf("error adding bucket event generator: %w", err)
}
Expand Down
8 changes: 7 additions & 1 deletion poollet/machinepoollet/cmd/machinepoollet/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type Options struct {
DialTimeout time.Duration
MachineClassMapperSyncTimeout time.Duration

ChannelCapacity int

ServerFlags server.Flags

AddressesOptions addresses.GetOptions
Expand All @@ -97,6 +99,8 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&o.DialTimeout, "dial-timeout", 1*time.Second, "Timeout for dialing to the machine runtime endpoint.")
fs.DurationVar(&o.MachineClassMapperSyncTimeout, "mcm-sync-timeout", 10*time.Second, "Timeout waiting for the machine class mapper to sync.")

fs.IntVar(&o.ChannelCapacity, "channel-capacity", 1024, "channel capacity for the machine event generator")

o.ServerFlags.BindFlags(fs)

o.AddressesOptions.BindFlags(fs)
Expand Down Expand Up @@ -261,7 +265,9 @@ func Run(ctx context.Context, opts Options) error {
return nil, err
}
return res.Machines, nil
}, irievent.GeneratorOptions{})
}, irievent.GeneratorOptions{
ChannelCapacity: opts.ChannelCapacity,
})
if err := mgr.Add(machineEvents); err != nil {
return fmt.Errorf("error adding machine event generator: %w", err)
}
Expand Down
9 changes: 8 additions & 1 deletion poollet/volumepoollet/cmd/volumepoollet/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type Options struct {
VolumeRuntimeSocketDiscoveryTimeout time.Duration
VolumeClassMapperSyncTimeout time.Duration

ChannelCapacity int

WatchFilterValue string
}

Expand All @@ -81,6 +83,9 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&o.DialTimeout, "dial-timeout", 1*time.Second, "Timeout for dialing to the volume runtime endpoint.")
fs.DurationVar(&o.VolumeRuntimeSocketDiscoveryTimeout, "volume-runtime-discovery-timeout", 20*time.Second, "Timeout for discovering the volume runtime socket.")
fs.DurationVar(&o.VolumeClassMapperSyncTimeout, "vcm-sync-timeout", 10*time.Second, "Timeout waiting for the volume class mapper to sync.")

fs.IntVar(&o.ChannelCapacity, "channel-capacity", 1024, "channel capacity for the volume event generator")

fs.StringVar(&o.WatchFilterValue, "watch-filter", "", "Value to filter for while watching.")
}

Expand Down Expand Up @@ -187,7 +192,9 @@ func Run(ctx context.Context, opts Options) error {
return nil, err
}
return res.Volumes, nil
}, irievent.GeneratorOptions{})
}, irievent.GeneratorOptions{
ChannelCapacity: opts.ChannelCapacity,
})
if err := mgr.Add(volumeEvents); err != nil {
return fmt.Errorf("error adding volume event generator: %w", err)
}
Expand Down

0 comments on commit 16ddc6e

Please sign in to comment.