Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 64 additions & 16 deletions scripts/radossim.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,23 @@
import random
import math

# global variable for state monitoring
blockNext = 0

# collecting internal states
blocking_dur_vec = []
osd_queue_size = []
kv_queue_size = []
min_lat_vec = []


# Predict request process time in micro seconds (roughly based on spinning media model
# kaldewey:rtas08, Fig 2)
def latModel(
reqSize, lgMult=820.28, lgAdd=-1114.3, smMult=62.36, smAdd=8.33, mu=6.0, sigma=2.0
# below is the default settings
#reqSize, lgMult=820.28, lgAdd=-1114.3, smMult=62.36, smAdd=8.33, mu=6.0, sigma=2.0
# below is the customized settings for 4k rand write
reqSize, lgMult=820.28, lgAdd=-1114.3, smMult=62.36, smAdd=5764.36, mu=5.0, sigma=0.5
):
# Request size-dependent component
runLen = reqSize / 4096.0
Expand Down Expand Up @@ -42,12 +55,15 @@ def osdThread(env, srcQ, dstQ):
with srcQ.get() as get:
osdRequest = yield get
# Timestamp transaction (after this time request cannot be prioritized)
bsTxn = (osdRequest, env.now)
curTime = env.now
if curTime < blockNext:
yield env.timeout(blockNext-curTime) # this is wait_until
bsTxn = (osdRequest, curTime)
# Submit BlueStore transaction
with dstQ.put(bsTxn) as put:
yield put


# Batch incoming requests and process
def kvThread(env, srcQ, targetLat=5000, measInterval=100000):
bm = BatchManagement(srcQ, targetLat, measInterval)
Expand Down Expand Up @@ -81,11 +97,11 @@ def kvThread(env, srcQ, targetLat=5000, measInterval=100000):
# Process batch
kvQDispatch = env.now
yield env.timeout(latModel(batchReqSize))
kvCommit = env.now
kvCommit = env.now - kvQDispatch
# Diagnose and manage batching
bm.manageBatch(batch, batchReqSize, kvQDispatch, kvCommit)


# Manage batch sizing
class BatchManagement:
def __init__(self, queue, minLatTarget=5000, initInterval=100000):
Expand All @@ -102,29 +118,55 @@ def __init__(self, queue, minLatTarget=5000, initInterval=100000):
self.minLatViolationCnt = 0
self.intervalAdj = lambda x: math.sqrt(x)
self.minLat = None
self.preBlockingDur = 0
self.curBlockingDur = 0
self.constLat = 500
# Batch sizing state
self.batchSize = self.queue.capacity
self.batchSizeInit = 100
self.batchDownSize = lambda x: int(x / 2)
self.batchUpSize = lambda x: int(x + 10)

def manageBatch(self, batch, batchSize, dispatchTime, commitTime):
systemNow = env.now
self.minLat = 0
for txn in batch:
((priority, reqSize, arrivalOSD), arrivalKV) = txn
# Account latencies
osdQLat = arrivalKV - arrivalOSD
kvQLat = dispatchTime - arrivalKV
osdQLat = arrivalKV - arrivalOSD # op_queue latency
kvQLat = dispatchTime - arrivalKV # kv_queue latency
bs_kvq_lat = kvQLat + commitTime # kv_queue queueing time plus commit time
#if not self.minLat or kvQLat < self.minLat:
if not self.minLat or bs_kvq_lat < self.minLat:
self.minLat = bs_kvq_lat # get the min lat in a batch
self.count += 1
self.lat += osdQLat + kvQLat
#self.lat += osdQLat + kvQLat # total latency
self.lat += osdQLat + bs_kvq_lat # total latency
if priority in self.latMap:
self.latMap[priority] += osdQLat + kvQLat
#self.latMap[priority] += osdQLat + kvQLat
self.latMap[priority] += osdQLat + bs_kvq_lat
self.cntMap[priority] += 1
else:
self.latMap[priority] = osdQLat + kvQLat
#self.latMap[priority] = osdQLat + kvQLat
self.latMap[priority] = osdQLat + bs_kvq_lat
self.cntMap[priority] = 1
self.fightBufferbloat(kvQLat, dispatchTime)
self.printLats()

#self.fightBufferbloat(kvQLat, dispatchTime)
#self.printLats()
self.compareLatency(systemNow)

# CoDel with blocking duration/timestamp
def compareLatency(self, currentTime):
global blockNext
min_lat_vec.append(self.minLat)
if self.minLat <= self.minLatTarget:
self.curBlockingDur = self.preBlockingDur / 2
# violation
else:
self.curBlockingDur = self.preBlockingDur + self.constLat
blocking_dur_vec.append(self.curBlockingDur)
self.preBlockingDur = self.curBlockingDur
blockNext = currentTime + self.curBlockingDur

# Implement CoDel algorithm and call batchSizing
def fightBufferbloat(self, currQLat, currentTime):
if not self.minLat or currQLat < self.minLat:
Expand Down Expand Up @@ -169,7 +211,6 @@ def printLats(self, freq=1000):
print(priority, self.latMap[priority] / self.cntMap[priority] / 1000000)
print("total", self.lat / self.count / 1000000)


if __name__ == "__main__":

env = simpy.Environment()
Expand Down Expand Up @@ -199,7 +240,14 @@ def printLats(self, freq=1000):
env.process(osdThread(env, osdQ2, kvQ))

# KV thread in BlueStore with targetMinLat and measurement interval (in usec)
env.process(kvThread(env, kvQ, 80000, 1600000))

#env.process(kvThread(env, kvQ, 80000, 1600000))
env.process(kvThread(env, kvQ, 339, 1600000))

# Run simulation
env.run(120 * 60 * 1000000)