Skip to content

Commit 4557d66

Browse files
authored
Supporting batching resources scaling (#32)
1 parent 7a0c5c4 commit 4557d66

File tree

8 files changed

+37
-24
lines changed

8 files changed

+37
-24
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ and k8s service/s that can be used to route incoming requests.
1515
For example, when the autoscaler decides it needs to scale some resource to zero, it executes the resource-scaler's
1616
`SetScale` function which has the knowledge how to scale to zero its specific resource.
1717

18-
**The autoscaler** - Responsible for periodically checking whether some resources should be scaled to zero. this is
18+
**The autoscaler** - Responsible for periodically checking whether some resources should be scaled to zero. This is
1919
performed by by querying the custom metrics API. Upon deciding a resource should be scaled to zero, it uses the internal
2020
resource-scaler module to scale the resource to zero.
2121
The resource-scaler will first route all incoming traffic to the DLX, which in terms of K8s is done by changing a

cmd/autoscaler/app/autoscaler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func Run(kubeconfigPath string,
6565
func createAutoScaler(restConfig *rest.Config,
6666
resourceScaler scaler_types.ResourceScaler,
6767
options scaler_types.AutoScalerOptions) (*autoscaler.Autoscaler, error) {
68-
rootLogger, err := nucliozap.NewNuclioZap("autoscaler", "console", os.Stdout, os.Stderr, nucliozap.DebugLevel)
68+
rootLogger, err := nucliozap.NewNuclioZap("scaler", "console", os.Stdout, os.Stderr, nucliozap.DebugLevel)
6969
if err != nil {
7070
return nil, errors.Wrap(err, "Failed to initialize root logger")
7171
}

cmd/dlx/app/dlx.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func Run(kubeconfigPath string,
6767
}
6868

6969
func createDLX(resourceScaler scaler_types.ResourceScaler, options scaler_types.DLXOptions) (*dlx.DLX, error) {
70-
rootLogger, err := nucliozap.NewNuclioZap("dlx", "console", os.Stdout, os.Stderr, nucliozap.DebugLevel)
70+
rootLogger, err := nucliozap.NewNuclioZap("scaler", "console", os.Stdout, os.Stderr, nucliozap.DebugLevel)
7171
if err != nil {
7272
return nil, errors.Wrap(err, "Failed to initialize root logger")
7373
}

pkg/autoscaler/autoscaler.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func NewAutoScaler(parentLogger logger.Logger,
2929
customMetricsClientSet custommetricsv1.CustomMetricsClient,
3030
options scaler_types.AutoScalerOptions) (*Autoscaler, error) {
3131
childLogger := parentLogger.GetChild("autoscaler")
32-
childLogger.DebugWith("Creating Autoscaler",
32+
childLogger.InfoWith("Creating Autoscaler",
3333
"options", options)
3434

3535
return &Autoscaler{
@@ -44,6 +44,8 @@ func NewAutoScaler(parentLogger logger.Logger,
4444
}
4545

4646
func (as *Autoscaler) Start() error {
47+
as.logger.DebugWith("Starting",
48+
"scaleInterval", as.scaleInterval)
4749
ticker := time.NewTicker(as.scaleInterval)
4850

4951
go func() {
@@ -145,7 +147,7 @@ func (as *Autoscaler) checkResourceToScale(resource scaler_types.Resource, resou
145147
"value", value)
146148
}
147149

148-
as.logger.InfoWith("All metric values below threshold, should scale to zero", "resourceName", resource.Name)
150+
as.logger.DebugWith("All metric values below threshold, should scale to zero", "resourceName", resource.Name)
149151
return true
150152
}
151153

@@ -175,6 +177,7 @@ func (as *Autoscaler) checkResourcesToScale() error {
175177
return errors.Wrap(err, "Failed to get resources metrics")
176178
}
177179

180+
resourcesToScale := make([]scaler_types.Resource, 0)
178181
for idx, resource := range activeResources {
179182
inScaleToZeroProcess, found := as.inScaleToZeroProcessMap[resource.Name]
180183
if found && inScaleToZeroProcess {
@@ -207,19 +210,28 @@ func (as *Autoscaler) checkResourcesToScale() error {
207210
}
208211

209212
as.inScaleToZeroProcessMap[resource.Name] = true
210-
go func(resource scaler_types.Resource) {
211-
err := as.scaleResourceToZero(resource)
213+
resourcesToScale = append(resourcesToScale, activeResources[idx])
214+
}
215+
216+
if len(resourcesToScale) > 0 {
217+
go func(resources []scaler_types.Resource) {
218+
as.logger.InfoWith("Scaling resources to zero", "resources", resources)
219+
err := as.scaleResourcesToZero(resources)
212220
if err != nil {
213-
as.logger.WarnWith("Failed to scale resource to zero", "resource", resource, "err", errors.GetErrorStackString(err, 10))
221+
as.logger.WarnWith("Failed to scale resources to zero", "resources", resources, "err", errors.GetErrorStackString(err, 10))
214222
}
215-
delete(as.inScaleToZeroProcessMap, resource.Name)
216-
}(activeResources[idx])
223+
as.logger.InfoWith("Successfully scaled resources to zero", "resources", resources)
224+
for _, resource := range resources {
225+
delete(as.inScaleToZeroProcessMap, resource.Name)
226+
}
227+
}(resourcesToScale)
217228
}
229+
218230
return nil
219231
}
220232

221-
func (as *Autoscaler) scaleResourceToZero(resource scaler_types.Resource) error {
222-
if err := as.resourceScaler.SetScale(resource, 0); err != nil {
233+
func (as *Autoscaler) scaleResourcesToZero(resources []scaler_types.Resource) error {
234+
if err := as.resourceScaler.SetScale(resources, 0); err != nil {
223235
return errors.Wrap(err, "Failed to set scale")
224236
}
225237

pkg/dlx/dlx.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,17 @@ type DLX struct {
1414
handler Handler
1515
}
1616

17-
func NewDLX(logger logger.Logger,
17+
func NewDLX(parentLogger logger.Logger,
1818
resourceScaler scaler_types.ResourceScaler,
1919
options scaler_types.DLXOptions) (*DLX, error) {
20-
resourceStarter, err := NewResourceStarter(logger, resourceScaler, options.Namespace, options.ResourceReadinessTimeout)
20+
childLogger := parentLogger.GetChild("dlx")
21+
childLogger.InfoWith("Creating DLX", "options", options)
22+
resourceStarter, err := NewResourceStarter(childLogger, resourceScaler, options.Namespace, options.ResourceReadinessTimeout)
2123
if err != nil {
2224
return nil, errors.Wrap(err, "Failed to create function starter")
2325
}
2426

25-
handler, err := NewHandler(logger,
27+
handler, err := NewHandler(childLogger,
2628
resourceStarter,
2729
options.TargetNameHeader,
2830
options.TargetPathHeader,
@@ -32,15 +34,14 @@ func NewDLX(logger logger.Logger,
3234
}
3335

3436
return &DLX{
35-
logger: logger,
37+
logger: childLogger,
3638
listenAddress: options.ListenAddress,
3739
handler: handler,
3840
}, nil
3941
}
4042

4143
func (d *DLX) Start() error {
42-
d.logger.InfoWith("Starting",
43-
"listenAddress", d.listenAddress)
44+
d.logger.DebugWith("Starting", "listenAddress", d.listenAddress)
4445

4546
http.HandleFunc("/", d.handler.HandleFunc)
4647
if err := http.ListenAndServe(d.listenAddress, nil); err != nil {

pkg/dlx/handler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ type Handler struct {
1919
targetPort int
2020
}
2121

22-
func NewHandler(logger logger.Logger,
22+
func NewHandler(parentLogger logger.Logger,
2323
resourceStarter *ResourceStarter,
2424
targetNameHeader string,
2525
targetPathHeader string,
2626
targetPort int) (Handler, error) {
2727
h := Handler{
28-
logger: logger,
28+
logger: parentLogger.GetChild("handler"),
2929
resourceStarter: resourceStarter,
3030
targetNameHeader: targetNameHeader,
3131
targetPathHeader: targetPathHeader,

pkg/dlx/resourcestarter.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (r *ResourceStarter) startResource(resourceSinkChannel chan responseChannel
7575
// simple for now
7676
resourceName := target
7777

78-
r.logger.DebugWith("Starting resource", "resource", resourceName)
78+
r.logger.InfoWith("Starting resource", "resource", resourceName)
7979
resourceReadyChannel := make(chan error, 1)
8080
defer close(resourceReadyChannel)
8181

@@ -91,7 +91,7 @@ func (r *ResourceStarter) startResource(resourceSinkChannel chan responseChannel
9191
ResourceName: resourceName,
9292
}
9393
case err := <-resourceReadyChannel:
94-
r.logger.DebugWith("Resource ready", "target", target, "err", errors.GetErrorStackString(err, 10))
94+
r.logger.InfoWith("Resource ready", "target", target, "err", errors.GetErrorStackString(err, 10))
9595

9696
if err == nil {
9797
resultStatus = ResourceStatusResult{
@@ -123,7 +123,7 @@ func (r *ResourceStarter) startResource(resourceSinkChannel chan responseChannel
123123
}
124124

125125
func (r *ResourceStarter) waitResourceReadiness(resource scaler_types.Resource, resourceReadyChannel chan error) {
126-
err := r.scaler.SetScale(resource, 1)
126+
err := r.scaler.SetScale([]scaler_types.Resource{resource}, 1)
127127
resourceReadyChannel <- err
128128
}
129129

pkg/resourcescaler/resourcescaler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ func New(kubeconfigPath string, namespace string) (scaler_types.ResourceScaler,
1010
return &NopResourceScaler{}, nil
1111
}
1212

13-
func (r *NopResourceScaler) SetScale(resource scaler_types.Resource, scale int) error {
13+
func (r *NopResourceScaler) SetScale(resources []scaler_types.Resource, scale int) error {
1414
return nil
1515
}
1616

0 commit comments

Comments
 (0)