23 changes: 23 additions & 0 deletions README.md
@@ -3,3 +3,26 @@ Druid Benchmark

Code for our benchmarking blog post:
[http://druid.io/blog/2014/03/17/benchmarking-druid.html](http://druid.io/blog/2014/03/17/benchmarking-druid.html)


Prerequisites
=============

- R (http://www.r-project.org/)

Installation Steps
==================

- Install R on Ubuntu (http://cran.r-project.org/bin/linux/ubuntu/README). Add the CRAN repository to your sources list, then install:

      sudo vi /etc/apt/sources.list
      # add this line:
      deb http://cran.rstudio.com/bin/linux/ubuntu precise/

      sudo apt-get update
      sudo apt-get install r-base-core

- Fix the missing curl-config issue:

      sudo apt-get install libcurl4-gnutls-dev librtmp-dev

- Install devtools, RDruid, and microbenchmark from an R session:

      install.packages("devtools")
      devtools::install_github("RDruid", "metamx")
      install.packages("microbenchmark")

Add new tests
=============

Add test case files under the testcases/ directory; the framework will execute every test function defined in each file sequentially. A sketch of a test case file follows.
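
For example, a minimal test case file (a hypothetical testcases/example.R) could define one function per query; each function receives the datasource and is timed by the harness with microbenchmark. The url and i (query interval) variables are assumed to be provided by the benchmark script's environment:

      # testcases/example.R -- hypothetical example test case
      # count the rows in the datasource over the benchmark interval
      count_rows <- function(datasource) {
        druid.query.timeseries(
          url          = url,
          dataSource   = datasource,
          intervals    = i,
          aggregations = list(druid.count()),
          granularity  = granularity("all"),
          context      = list(useCache = FALSE, populateCache = FALSE)
        )
      }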
235 changes: 45 additions & 190 deletions benchmark-druid.R
@@ -31,188 +31,55 @@ cat(sprintf("Running benchmarks against [%s] on [%s], running each query [%d] ti

i <- interval(fromISO("1992-01-01T00:00:00"), fromISO("1999-01-01T00:00:00"))

total_size <- function(datasource) {
  # fetch segment metadata from the coordinator and sum the segment sizes
  segments <- fromJSON(as.character(
    httr::GET(
      paste("http://10.151.79.16:8080/druid/coordinator/v1/datasources/", datasource, "/segments?full", sep="")
    )
  ))
  sum(laply(segments, function(x) x$size))
}

###############################################


runBenchmark <- function(absTestFileName, testFuncRegexp)
{
  # source each test case file into a sandbox so it cannot clobber globals
  sandbox <- new.env(parent=.GlobalEnv)

  if (!file.exists(absTestFileName)) {
    cat(sprintf("Test case file %s not found.\n", absTestFileName))
    return(NULL)
  }

  ## catch syntax errors in the test case file
  res <- try(sys.source(absTestFileName, envir=sandbox))
  if (inherits(res, "try-error")) {
    cat(sprintf("Error while sourcing %s: %s\n", absTestFileName, geterrmessage()))
    return(NULL)
  }

  results <- list()
  testFunctions <- ls(pattern=testFuncRegexp, envir=sandbox)
  for (funcName in testFunctions) {
    cat(sprintf("Executing Test: %s Case: %s.\n", absTestFileName, funcName))
    testcase <- get(funcName, envir=sandbox)
    ## anything other than a function is ignored
    if (mode(testcase) != "function") {
      next
    }
    benchmarkResult <- microbenchmark(testcase(datasource), times=n)
    benchmarkResult$expr <- paste(absTestFileName, funcName, sep="_")
    results <- rbind(results, benchmarkResult)
  }

  return(results)
}

countrows <- function(datasource, interval, filter) {
  druid.query.timeseries(
    url = url,
    dataSource = datasource,
    intervals = interval,
    aggregations = list(druid.count()),
    filter = filter,
    granularity = granularity("all"),
    context = list(useCache=F, populateCache=F)
  )$count
}

# run every test case file under testcases/ and collect the timings
resultsList <- list()
testFiles <- list.files("testcases", pattern = ".+\\.[rR]$", full.names=TRUE)
for (file in testFiles) {
  case <- runBenchmark(file, ".+")
  resultsList <- rbind(resultsList, case)
}

count_star_interval <- function(datasource) {
  druid.query.timeseries(
    url = url,
    dataSource = datasource,
    intervals = interval(ymd(19920103), ymd(19981130)),
    aggregations = list(druid.count()),
    granularity = granularity("all"),
    context = list(useCache=F, populateCache=F)
  )
}

sum_price <- function(datasource) {
  druid.query.timeseries(
    url = url,
    dataSource = datasource,
    intervals = i,
    aggregations = list(
      sum(metric("l_extendedprice"))
    ),
    granularity = granularity("all"),
    context = list(useCache=F, populateCache=F)
  )
}

sum_all <- function(datasource) {
  druid.query.timeseries(
    url = url,
    dataSource = datasource,
    intervals = i,
    aggregations = list(
      sum(metric("l_extendedprice")),
      sum(metric("l_discount")),
      sum(metric("l_quantity")),
      sum(metric("l_tax"))
    ),
    filter = NULL,
    granularity = granularity("all"),
    context = list(useCache=F, populateCache=F)
  )
}

sum_all_year <- function(datasource) {
  druid.query.timeseries(
    url = url,
    dataSource = datasource,
    intervals = i,
    aggregations = list(
      sum(metric("l_extendedprice")),
      sum(metric("l_discount")),
      sum(metric("l_quantity")),
      sum(metric("l_tax"))
    ),
    filter = NULL,
    granularity = granularity("P1Y"),
    context = list(useCache=F, populateCache=F)
  )
}


sum_all_filter <- function(datasource) {
  druid.query.timeseries(
    url = url,
    dataSource = datasource,
    intervals = i,
    aggregations = list(
      sum(metric("l_extendedprice")),
      sum(metric("l_discount")),
      sum(metric("l_quantity")),
      sum(metric("l_tax"))
    ),
    filter = dimension("l_shipmode") %~% ".*AIR.*",
    granularity = granularity("all"),
    context = list(useCache=F, populateCache=F)
  )
}
#########

top_100_parts <- function(datasource) {
  druid.query.topN(
    url = url,
    dataSource = datasource,
    intervals = i,
    metric = "l_quantity",
    dimension = "l_partkey",
    n = 100,
    aggregations = list(
      sum(metric("l_quantity"))
    ),
    filter = NULL,
    granularity = granularity("all"),
    context = list(useCache=F, populateCache=F)
  )
}

top_100_parts_details <- function(datasource) {
  druid.query.topN(
    url = url,
    dataSource = datasource,
    intervals = i,
    metric = "l_quantity",
    dimension = "l_partkey",
    n = 100,
    aggregations = list(
      sum(metric("l_quantity")),
      sum(metric("l_extendedprice")),
      min(metric("l_discount")),
      max(metric("l_discount"))
    ),
    filter = NULL,
    granularity = granularity("all"),
    context = list(useCache=F, populateCache=F)
  )
}


top_100_parts_filter <- function(datasource) {
  druid.query.topN(
    url = url,
    dataSource = datasource,
    intervals = interval(ymd(19960115), ymd(19980315)),
    metric = "l_quantity",
    dimension = "l_partkey",
    n = 100,
    aggregations = list(
      sum(metric("l_quantity")),
      sum(metric("l_extendedprice")),
      min(metric("l_discount")),
      max(metric("l_discount"))
    ),
    granularity = granularity("all"),
    context = list(useCache=F, populateCache=F)
  )
}

top_100_commitdate <- function(datasource) {
  druid.query.topN(
    url = url,
    dataSource = datasource,
    intervals = i,
    metric = "l_quantity",
    dimension = "l_commitdate",
    n = 100,
    aggregations = list(
      sum(metric("l_quantity"))
    ),
    filter = NULL,
    granularity = granularity("all"),
    context = list(useCache=F, populateCache=F)
  )
}
results <- as.data.frame(rbind(resultsList))
cat(sprintf("Result List %s.\n", results$time))

res1 <- microbenchmark(count_star_interval(datasource), times=n)
res2 <- microbenchmark(sum_price(datasource), times=n)
res3 <- microbenchmark(sum_all(datasource), times=n)
res4 <- microbenchmark(sum_all_year(datasource), times=n)
res5 <- microbenchmark(sum_all_filter(datasource), times=n)
res6 <- microbenchmark(top_100_parts(datasource), times=n)
res7 <- microbenchmark(top_100_parts_details(datasource), times=n)
res8 <- microbenchmark(top_100_parts_filter(datasource), times=n)
res9 <- microbenchmark(top_100_commitdate(datasource), times=n)

results <- as.data.frame(rbind(res1, res2, res3, res4, res5, res6, res7, res8, res9))
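# microbenchmark reports timings in nanoseconds; convert to seconds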
results$time <- results$time / 1e9
results$query <- as.character(sub("\\(.*\\)", replacement="", results$expr))
druid <- results[c("query", "time")]
@@ -221,17 +88,5 @@ filename <- paste(engine, ".tsv", sep="")
cat(sprintf("Writing results to %s.\n", filename))
write.table(druid, filename, quote=F, sep="\t", col.names=F, row.names=F)

# rowcounts <- c(
#   count_star_interval = countrows(datasource, interval(ymd(19920103), ymd(19981130)), NULL),
#   sum_price = countrows(datasource, i, NULL),
#   sum_all = countrows(datasource, i, NULL),
#   sum_all_year = countrows(datasource, i, NULL),
#   sum_all_filter = countrows(datasource, i, dimension("l_shipmode") %~% ".*AIR.*"),
#   top_100_parts = countrows(datasource, i, NULL),
#   top_100_parts_details = countrows(datasource, i, NULL),
#   top_100_parts_filter = countrows(datasource, interval(ymd(19960115), ymd(19980315)), NULL),
#   top_100_commitdate = countrows(datasource, i, NULL)
# )
#
# rowcounts <- as.data.frame(cbind(query=names(rowcounts), rows=rowcounts))
# write.table(rowcounts, paste(datasource, "-rowcounts.tsv", sep=""), quote=F, sep="\t", col.names=F, row.names=F)


2 changes: 1 addition & 1 deletion benchmark-mysql.sh
@@ -16,4 +16,4 @@ do
t=$(( time -p ( echo $sql | mysql --host="$host" --user $user --password=$passwd --database tpch ) 2>&1 ) | grep real | sed -e 's/real *//')
echo "$name $t"
done
done < queries-mysql.sql
done < sqls/queries-mysql.sql
36 changes: 36 additions & 0 deletions config/coordinator-runtime.properties
@@ -0,0 +1,36 @@
druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/benchmark/bard

druid.extensions.remoteRepositories=[]
druid.extensions.localRepository=lib
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.62"]

druid.zk.service.host=<x.x.x.x>,<x.x.x.x>,<x.x.x.x>
druid.zk.paths.base=/druid/benchmark

druid.discovery.curator.path=/discovery

druid.broker.cache.type=memcached
druid.broker.cache.hosts=<memcached>:11211
druid.broker.cache.memcachedPrefix=benchmark
druid.broker.cache.expiration=2147483647

druid.broker.http.numConnections=300
druid.broker.http.readTimeout=PT5M

druid.query.chunkPeriod=P10Y
druid.query.topN.chunkPeriod=P10Y

druid.server.http.numThreads=50

druid.request.logging.type=emitter
druid.request.logging.feed=druid_requests

druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor", "io.druid.client.cache.CacheMonitor"]

druid.emitter=http
druid.emitter.http.recipientBaseUrl=http://<metrics>/

druid.announcer.type=batch
druid.curator.compress=true
File renamed without changes.
File renamed without changes.
plot-results.R: file mode changed 100644 → 100755 (contents unchanged)
File renamed without changes.
File renamed without changes.
File renamed without changes.