12 changes: 12 additions & 0 deletions config.go
@@ -43,6 +43,7 @@ type Config struct {
ValidateConfig bool // Validate configuration and error out for invalid configs
channelBuffer int
srand int
learner string
// derived
LogFileOrig string
}
@@ -69,6 +70,7 @@ var config = Config{

channelBuffer: 16,
srand: 1,
learner: "simple-learner",
}

//
@@ -196,6 +198,7 @@ func PreConfig() {
flag.BoolVar(&config.DEBUG, "d", config.DEBUG, "debug=true|false")
flag.BoolVar(&config.ValidateConfig, "validateconfig", config.ValidateConfig, "true|false. Error out on invalid config if true, else Reset to default sane values.")
flag.IntVar(&config.srand, "srand", config.srand, "random seed, use 0 (zero) for random seed selection")
flag.StringVar(&config.learner, "learner", config.learner, "Macine learning module to be used for the model")

Review comment (Member):
s/Macine learning module/ML algorithm/

flag.IntVar(&configStorage.numReplicas, "replicas", configStorage.numReplicas, "number of replicas")
flag.IntVar(&configStorage.sizeDataChunk, "chunksize", configStorage.sizeDataChunk, "chunk size (KB)")
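For context, a hypothetical invocation selecting the new learner module could look as follows; the -learner flag and the registered learner names come from this diff, while the binary name is an assumption:

./surge -learner lr-learner -srand 0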
@@ -343,6 +346,15 @@ func configureReplicast(unicastBidMultiplier bool) {
}

// Wrapper methods for the flag package
func RegisterCmdlineBoolVar(dest *bool, opt string, def bool, desc string) bool {
if flag.Parsed() {
return false
}
flag.BoolVar(dest, opt, def, desc)

return true
}

func RegisterCmdlineIntVar(dest *int, opt string, def int, desc string) bool {
if flag.Parsed() {
return false
}
flag.IntVar(dest, opt, def, desc)

return true
}
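A brief sketch of how a module might use these wrappers to register its own flag safely before flag.Parse() runs; the function, flag name, and default below are illustrative, not part of this change:

// Hypothetical usage of the RegisterCmdline*Var wrappers above.
func registerLearnerFlags(minTrain *int) {
	if !RegisterCmdlineIntVar(minTrain, "lr-mintrain", 32, "minimum training samples") {
		*minTrain = 32 // flags were already parsed; keep the default
	}
}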
29 changes: 21 additions & 8 deletions disk.go
@@ -68,19 +68,28 @@ func (d *Disk) Run() {
// returns true always for now.
//
func (d *Disk) NowIsDone() bool {
-if d.nextWrDone.After(Now) {
+if d.writes <= 1 || d.nextWrDone.After(Now) {
return true
}

// Simulate the write of one chunk
d.qMutex.Lock()
pendingwbytes := d.pendingwbytes
pendingwbytes -= int64(configStorage.sizeDataChunk * 1024)
-if pendingwbytes >= 0 {
+if pendingwbytes > 0 {
d.pendingwbytes = pendingwbytes
} else {
d.pendingwbytes = 0
}
qSz := float64(d.pendingwbytes) / float64(configStorage.sizeDataChunk * 1024)
d.qMutex.Unlock()
-d.nextWrDone = Now.Add(configStorage.dskdurationDataChunk)
log(LogVVV, fmt.Sprintf("one-chunk write complete, new QDepth :%v", qSz))

// TODO: The delay is not taken into account here. We need to account for
// the delay as well. Temporary workaround is to add a 12 microsecond delay,
// which is an average delay for window sizes between 1 and 32.
+d.nextWrDone = Now.Add(configStorage.dskdurationDataChunk + (12 * time.Microsecond))

return true
}
@@ -175,7 +184,8 @@ func (l *LatencyParabola) Latency(params DiskLatencyParams) time.Duration {

func (d *Disk) scheduleWrite(sizebytes int) time.Duration {
at := sizeToDuration(sizebytes, "B", int64(d.MBps), "MB")
Review comment (@alex-aizman, Member, Nov 29, 2016):
Probably makes sense to subclass the Disk type for the mb and future models. Older models could then keep using the basic Disk with its basic lastIOdone. This would be safer, regression-wise. The changes in scheduleWrite() and the need to maintain an array of latencies would warrant this.

The only additional change that would probably still make sense to do on top of the above is to convert the disk.lastIOdone member variable into a disk.lastIOdone() method, overloaded in the disk implementation that we want for the mb model. This method would always return the properly recomputed latency based on the disk's current state. And then you may not need to do any disk-related logic inside NowIsDone(), which would be much cleaner.
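A rough sketch of the refactoring suggested above, under assumed names (the stored timestamp would need a rename so the method name is free, and the package's global Now and the time import are assumed); older models keep the stored value while the mb-model disk recomputes it from queue state:

// Sketch only; types, fields, and names are assumptions, not part of this diff.
type diskModel interface {
	lastIOdone() time.Time
}

type basicDisk struct {
	lastIOdoneTs time.Time // the existing stored timestamp, renamed
}

// Older models: return the stored value, exactly as today.
func (d *basicDisk) lastIOdone() time.Time {
	return d.lastIOdoneTs
}

type mbDisk struct {
	basicDisk
	pendingwbytes int64
	chunkBytes    int64
	chunkDuration time.Duration
}

// mb model: recompute the completion time from the current queue state,
// so that NowIsDone() needs no disk-specific logic.
func (d *mbDisk) lastIOdone() time.Time {
	pendingChunks := d.pendingwbytes / d.chunkBytes
	return Now.Add(time.Duration(pendingChunks) * d.chunkDuration)
}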

+w := d.writes
+d.writes++
+d.writebytes += int64(sizebytes)

// Simulate adding i/o to the queue.
d.qMutex.Lock()
@@ -184,23 +194,26 @@ func (d *Disk) scheduleWrite(sizebytes int) time.Duration {
// In the current implementation we don't differentiate i/o, and all of the
// existing models do one chunk per i/o, so we assume queue size in units of chunks.
// FIXME: queue size should be in units of i/o
-qSz := float64(d.pendingwbytes) / float64(configStorage.sizeDataChunk * 1024)
d.pendingwbytes += int64(sizebytes)
+qSz := float64(d.pendingwbytes) / float64(configStorage.sizeDataChunk * 1024)
d.qMutex.Unlock()

latency := d.latencySim.Latency(DiskLatencyParams{qSize: qSz})
log(LogVV, fmt.Sprintf("Scheduling chunk write current QDepth:%v, Induced delay: %v", qSz, latency))
at += latency

-d.writes++
-d.writebytes += int64(sizebytes)
-if w > 0 && Now.Before(d.lastIOdone) {
+if d.writes == 1 {
+d.nextWrDone = Now.Add(at)
+}
+if Now.Before(d.lastIOdone) {
d.lastIOdone = d.lastIOdone.Add(at)
log(LogVVV, fmt.Sprintf("next-chunk complete in :%v", d.lastIOdone.Sub(time.Time{})))
at1 := d.lastIOdone.Sub(Now)
return at1
}

d.lastIOdone = Now.Add(at)
log(LogVVV, fmt.Sprintf("next-chunk complete in :%v", d.lastIOdone.Sub(time.Time{})))
return at
}

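The queue-depth feedback above feeds qSz into LatencyParabola.Latency(). The parabola's actual fields and coefficients are not part of this diff; purely as an illustration of the shape of such a model:

// Illustrative only; field names and coefficients are assumptions.
type latencyParabolaSketch struct {
	base time.Duration // latency at an empty queue
	coef time.Duration // quadratic growth factor
}

// Latency grows with the square of the queue depth, modeling a disk whose
// response time degrades sharply as the write queue builds up.
func (l *latencyParabolaSketch) Latency(qSize float64) time.Duration {
	return l.base + time.Duration(float64(l.coef)*qSize*qSize)
}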
123 changes: 123 additions & 0 deletions learn.go
@@ -0,0 +1,123 @@
// Basic machine learning framework for surge. The framework provides
// APIs to train a learner module and then predict bandwidth given a
// command window size.
package surge

import (
"errors"
)

const (
minCmdWinsize = 1
maxCmdWinsize = 256
)

var learners map[string]LearnerInterface

var (
NotEnoughDataError = errors.New("not enough training data to predict")
NoTrainingDataError = errors.New("you need to Fit() before you can Predict()")
NotImplementedError = errors.New("method not implemented")
)

type LearnerInterface interface {
GetName() string
Initialize()
Train(cmdWinSize int32, tput int64)
Predict(cmdWinSize int32) (int64, error)
GetOptimalWinsize() (int32, error)
}

// Learner Template
type LearnerTemplate struct {
name string
minTrainValues int // Minimum number of training values before the learner can predict
}

func (l *LearnerTemplate) GetName() string {
return l.name
}

func (l *LearnerTemplate) Initialize() {
l.minTrainValues = 16
}

// A simple learner which remembers the optimal window size
// observed across all previously trained window sizes

type SimpleLearner struct {
LearnerTemplate
first int // minimal cmdwinsize value for this learner
last int // max cmdwinsize value for this learner
tputs map[int32]int64
fitCount int
optimalWinSize int32
maxTput int64
}

func (sl *SimpleLearner) Initialize() {

sl.first = minCmdWinsize
sl.last = maxCmdWinsize
sl.minTrainValues = 8

sl.tputs = make(map[int32]int64, sl.last)
sl.fitCount = 0
sl.optimalWinSize = 0

sl.maxTput = 0
for i := 0; i < sl.last; i++ {
sl.tputs[int32(i)] = 0
}

}

func (sl *SimpleLearner) Train(cmdWinSize int32, tput int64) {
sl.fitCount++
sl.tputs[cmdWinSize] = tput
if tput >= sl.maxTput {
sl.maxTput = tput
sl.optimalWinSize = cmdWinSize
} else if sl.optimalWinSize == cmdWinSize {
// The previous high value for this cmdWinSize changed.
// Need to rescan for the next largest tput.
sl.maxTput = 0
for i := 0; i < sl.last; i++ {
if sl.tputs[int32(i)] > sl.maxTput {
sl.maxTput = sl.tputs[int32(i)]
sl.optimalWinSize = int32(i)
}
}
}
}

// This learner cannot predict. It only recalls optima from previously
// observed data.
func (sl *SimpleLearner) Predict(cmdWinSize int32) (int64, error) {
return 0, NotImplementedError
}

func (sl *SimpleLearner) GetOptimalWinsize() (int32, error) {
if sl.fitCount < sl.minTrainValues {
return 0, NotEnoughDataError
}

return sl.optimalWinSize, nil
}

// Public functions

func RegisterLearner(name string, learner LearnerInterface) {
assert(learners[name] == nil)
learners[name] = learner
}

func GetLearner(name string) LearnerInterface {
return learners[name]
}

var simpleLearner = SimpleLearner{}

func init() {
learners = make(map[string]LearnerInterface, 4)
RegisterLearner("simple-learner", &simpleLearner)
}
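To make the intended flow concrete, a hedged caller-side sketch; only the API calls come from this file, while the sample collection and the function itself are assumptions:

// Sketch: feed observed (window size, throughput) samples to the configured
// learner, then ask it for the best command window size.
func exampleLearnerFlow(wins []int32, tputs []int64) int32 {
	learner := GetLearner(config.learner)
	learner.Initialize()
	for i := range wins {
		learner.Train(wins[i], tputs[i])
	}
	win, err := learner.GetOptimalWinsize()
	if err != nil {
		// e.g. NotEnoughDataError: keep exploring window sizes for now
		return minCmdWinsize
	}
	return win
}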
161 changes: 161 additions & 0 deletions linear_regression_learner.go
@@ -0,0 +1,161 @@
// Implementation of a linear regression based learner
package surge

import (
"fmt"
"github.com/sjwhitworth/golearn/base"
"github.com/sjwhitworth/golearn/linear_models"
)

const (
maxRows = 256 // Maximum number of training rows (command window sizes)
)

type LRLearner struct {
LearnerTemplate
inputSz int // count of distinct training data points
data []int64 // Training data set
specs []base.AttributeSpec
attrs []base.Attribute
instances *base.DenseInstances
predictions *base.DenseInstances
}

func (ll *LRLearner) Initialize() {
ll.minTrainValues = 32

// Initialize the DenseInstance
// Two attributes:
// - CommandWindowSz: Independent variable
// - Throughput: Dependent variable (ClassAttribute)
ll.attrs = make([]base.Attribute, 2)
ll.attrs[0] = base.NewFloatAttribute("CommandWindowSz")
ll.attrs[1] = base.NewFloatAttribute("Throughput")
ll.specs = make([]base.AttributeSpec, len(ll.attrs))
ll.instances = base.NewDenseInstances()
for i, a := range ll.attrs {
spec := ll.instances.AddAttribute(a)
ll.specs[i] = spec
}
ll.instances.Extend(maxRows)
ll.instances.AddClassAttribute(ll.attrs[1])

// Make a copy in ll.predictions
ll.predictions = base.NewDenseCopy(ll.instances)
attrs := ll.predictions.AllAttributes()
ll.predictions.AddClassAttribute(attrs[1])

// Initialize the data set to -1 to distinguish updated training data from
// uninitialized data.
ll.data = make([]int64, maxRows)
for i := 0; i < maxRows; i++ {
ll.data[i] = -1
}
}

// Train() just stores the sample in the data array; the regression itself
// is run later, by the Predict() API.
func (ll *LRLearner) Train(cmdWinSize int32, tput int64) {
if ll.data[cmdWinSize] < 0 {
ll.inputSz++
}
ll.data[cmdWinSize] = tput
}

// This internal method populates the DenseInstances (golearn data structures) from
// the training dataset. There are two DenseInstances:
// - ll.instances : training dataset for the regression module; attributes are
// initialized only for those rows which have training data
// - ll.predictions : the output prediction dataset; the attribute is initialized for
// all rows, so that we get predictions for all rows.
//
func (ll *LRLearner) populateTrainingData() {
instance_specs := base.ResolveAllAttributes(ll.instances)
prediction_specs := base.ResolveAllAttributes(ll.predictions)
row := 0
for i := 1; i < maxRows; i++ {
tput := ll.data[i]
// The prediction set gets the window-size attribute for every row
// (row i-1 holds window size i), so we get a prediction for every row.
ll.predictions.Set(prediction_specs[0], i-1, base.PackFloatToBytes(float64(i)))
if tput == -1 {
continue
}

ll.predictions.Set(prediction_specs[1], i-1, base.PackFloatToBytes(float64(tput)))

ll.instances.Set(instance_specs[0], row, base.PackFloatToBytes(float64(i)))
ll.instances.Set(instance_specs[1], row, base.PackFloatToBytes(float64(tput)))
row++
}
}

// Internal method which performs the linear regression.
// Returns: DenseInstance predictions. This DenseInstance has only one attribute,
// and that is the ClassAttribute - Throughput.
//
func (ll *LRLearner) predict() (base.FixedDataGrid, error) {
if ll.inputSz < ll.minTrainValues {
return nil, NotEnoughDataError
}
ll.populateTrainingData()

lr := linear_models.NewLinearRegression()
err := lr.Fit(ll.instances)
if err != nil {
log(LogVVV, fmt.Sprintf("LinearRegression.Fit() failed: %v", err))
return nil, err
}

Review comment (Member):
Once fitting is done (via lr.Fit() above), we effectively have an a*x^2 + b*x + c polynomial expression, with a, b, and c computed by the golearn framework. And so there are two types of errors to look at: the delta between the computed polynomial expression and the real throughput values for the training set, and the same delta for a testing set that must be kept separate from the training set.

Both deltas must be fairly close to zero for us to believe that we can safely use this a*x^2 + b*x + c expression to calculate the maximum throughput.
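As a minimal sketch of that check, assuming plain maps of observed and predicted throughputs keyed by window size (the train/test split itself is not shown):

// meanAbsDelta returns the average |predicted - observed| throughput.
// Compute it once over the training points and once over a held-out test
// set; both should be close to zero before the fit is trusted.
func meanAbsDelta(observed, predicted map[int32]int64) float64 {
	if len(observed) == 0 {
		return 0
	}
	sum := 0.0
	for w, o := range observed {
		d := float64(predicted[w] - o)
		if d < 0 {
			d = -d
		}
		sum += d
	}
	return sum / float64(len(observed))
}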

predictions, err := lr.Predict(ll.predictions)
if err != nil {
log(LogVVV, fmt.Sprintf("LinearRegression.Predict() failed: %v", err))
return nil, err
}

return predictions, nil
}

func (ll *LRLearner) Predict(cmdWinSize int32) (int64, error) {
predictions, err := ll.predict()
if err != nil {
return 0, err
}

// The prediction grid carries a single attribute, the Throughput
// ClassAttribute; row i-1 corresponds to window size i.
specs := base.ResolveAllAttributes(predictions)
tput := int64(base.UnpackBytesToFloat(predictions.Get(specs[0], int(cmdWinSize)-1)))

return tput, nil
}

func (ll *LRLearner) GetOptimalWinsize() (int32, error) {
if ll.inputSz < ll.minTrainValues {
return 0, NotEnoughDataError
}

predictions, err := ll.predict()
if err != nil {
return 0, err
}

_, rows := predictions.Size()

specs := base.ResolveAllAttributes(predictions)
max_tput := int64(0)
opt_winsz := int32(0)
// Row i of the prediction grid corresponds to window size i+1.
for i := 0; i < rows; i++ {
tput := int64(base.UnpackBytesToFloat(predictions.Get(specs[0], i)))
if max_tput < tput {
max_tput = tput
opt_winsz = int32(i + 1)
}
}

if opt_winsz == 0 || max_tput == 0 {
return 0, NoTrainingDataError
}

return opt_winsz, nil
}

// Instantiation
var lrLearner = LRLearner{}

func init() {
RegisterLearner("lr-learner", &lrLearner)
}
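One closing observation: if the fit really were the concave quadratic mentioned in the review above, the optimal window size could be taken in closed form at the vertex x = -b/(2a) instead of scanning prediction rows. A hedged sketch; the coefficients would need to be exposed by the fitting step, so that accessor is an assumption, not something golearn's LinearRegression provides:

// Sketch: vertex of a concave parabola a*x^2 + b*x + c.
func optimalWinsizeQuadratic(a, b float64) int32 {
	if a >= 0 {
		return 0 // not concave: no interior maximum
	}
	x := -b / (2 * a)
	if x < minCmdWinsize {
		return minCmdWinsize
	}
	if x > maxCmdWinsize {
		return maxCmdWinsize
	}
	return int32(x + 0.5) // nearest integral window size
}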