Skip to content

Commit 6790ea1

Browse files
authored
feat: tiered storage support ssd/nvme as first layer (#30)
1 parent 7994dcf commit 6790ea1

15 files changed

Lines changed: 604 additions & 203 deletions

File tree

README.md

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ For the complete documentation of Kafka, refer to [here](README.kafka.md).
3333
The easiest way to try Pafka is to use the docker image: https://hub.docker.com/r/4pdopensource/pafka-dev
3434

3535
```
36-
docker run -it -v $YOUR_PMEM_PATH:/mnt/mem 4pdopensource/pafka-dev bash
36+
docker run -it 4pdopensource/pafka-dev bash
3737
```
3838

3939
where $YOUR_PMEM_PATH is the mount point of PMem (DAX file system) in the host system.
@@ -78,31 +78,49 @@ To verify the correctness, you can use any file systems with normal hard disks.
7878

7979
#### 3.3.2. Config
8080

81-
In order to support PMem storage, we add some more config fields to the Kafka [server config](config/server.properties).
81+
In order to support tiered storage, we add some more config fields to the Kafka [server config](config/server.properties).
8282

8383
|Config|Default Value|Note|
8484
|------|-------------|----|
85-
|storage.pmem.paths|/pmem|pmem mount paths (separated by ,). first-layer storage <br /> (Only applicable if log.channel.type=mix or pmem)|
86-
|storage.pmem.sizes|-1|pmem capacities in bytes (separated by ,); -1 means use all the space <br />(Only applicable if log.channel.type=mix or pmem)|
87-
|storage.hdd.paths|/hdd|hdd mount paths (separated by ,). second-layer storage <br />(Only applicable if log.channel.type=mix)|
88-
|storage.migrate.threads|1|the number of threads used for migration <br />(Only applicable if log.channel.type=mix)|
89-
|storage.migrate.threshold|0.5|the threshold used to control when to start the migration. <br /> -1 means no migration. <br />(Only applicable if log.channel.type=mix)|
90-
|log.channel.type|file|log file channel type. <br /> Options: "file", "pmem", "mix".<br />"file": use normal FileChannel as vanilla Kafka does <br />"pmem": use PMemChannel, which will use pmem as the log storage<br />"mix": use MixChannel, which will use pmem as the first-layer storage and hdd as the second-layer storage|
91-
|log.pmem.pool.ratio|0.8|A pool of log segments will be pre-allocated. This is the proportion of total pmem size. Pre-allocation will increase the first startup time, but can eliminate the dynamic allocation cost when serving requests.<br />(Only applicable if log.channel.type=mix or pmem)|
85+
|log.channel.type|file|log file channel type. <br /> Options: "file", "pmem", "tiered".<br />"file": use normal file as vanilla Kafka does <br />"pmem": use pmem as the log storage<br />"tiered": use tiered storage|
86+
|storage.tiers.types|PMEM,HDD|the storage types for each layers (separated by ,). <br /> Available types are PMEM, NVME, SSD, HDD.|
87+
|storage.tiers.first.paths|/pmem|first-layer storage paths (separated by ,). <br /> (Only applicable if log.channel.type=tiered or pmem)|
88+
|storage.tiers.first.sizes|-1|first-layer storage capacities in bytes (separated by ,); -1 means use all the space <br />(Only applicable if log.channel.type=tiered or pmem)|
89+
|storage.tiers.second.paths|/hdd|second-layer storage paths (separated by ,) <br />(Only applicable if log.channel.type=tiered)|
90+
|storage.migrate.threshold|0.5|the threshold used to control when to start the migration. <br /> -1 means no migration. <br />(Only applicable if log.channel.type=tiered)|
91+
|storage.migrate.threads|1|the number of threads used for migration <br />(Only applicable if log.channel.type=tiered)|
92+
|log.pmem.pool.ratio|0.8|A pool of log segments will be pre-allocated. This is the proportion of total pmem size. Pre-allocation will increase the first startup time, but can eliminate the dynamic allocation cost when serving requests.<br />(Only applicable if we are using pmem as the first-layer storage)|
9293

9394
> :warning: **`log.preallocate` has to be set to `true` if pmem is used, as PMem MemoryBlock does not support `append`-like operations.**
9495
9596
Sample config in config/server.properties is as follows:
9697

97-
log.dirs=/pmem/pafka
98-
storage.pmem.paths=/pmem/pafka-pool
99-
storage.pmem.sizes=600000000000
100-
storage.hdd.paths=/hdd/pafka-pool
98+
######## start of tiered storage config ########
99+
100+
# log file channel type; Options: "file", "pmem", "tiered".
101+
# if "file": use normal file as vanilla Kafka does. Following configs are not applicable.
102+
log.channel.type=file
103+
# the storage types for each layers (separated by ,)
104+
storage.tiers.types=PMEM,HDD
105+
# first-layer storage paths (separated by ,)
106+
storage.tiers.first.paths=/pmem
107+
# first-layer storage capacities in bytes (separated by ,); -1 means use all the space
108+
storage.tiers.first.sizes=-1
109+
# second-layer storage paths (separated by ,)
110+
storage.tiers.second.paths=/hdd
111+
# threshold to control when to start the migration; -1 means no migration.
112+
storage.migrate.threshold=0.5
113+
# migration threads
114+
storage.migrate.threads=1
115+
116+
# pmem-specific config
117+
# pre-allocated pool ratio
101118
log.pmem.pool.ratio=0.8
102-
log.channel.type=mix
103119
# log.preallocate have to set to true if pmem is used
104120
log.preallocate=true
105121

122+
######## end of tiered storage config ########
123+
106124
#### 3.3.3. Start Pafka
107125
Follow instructions in https://kafka.apache.org/quickstart. Basically:
108126

@@ -160,8 +178,7 @@ We've tested on Java 8, Java 11 and Java 15.
160178
> WARNING: All illegal access operations will be denied in a future release
161179
162180

163-
- Currently, only the log files are stored in PMem, while the indexes are still kept as normal files, as we do not see much performance gain if we move the indexes to PMem.
164-
- Release `v0.1.x` uses PMem as the only storage device, which may limit the use for some scenarios that require a large capacity for log storage. Release `v0.2.0` addresses this issue by introducing a tiered storage strategy.
181+
- Currently, only the log files are stored in the tiered storage, while the indexes are still kept as normal files, as we do not see much performance gain if we move the indexes to fast storage.
165182

166183

167184
## 5. Roadmap
@@ -170,7 +187,7 @@ We've tested on Java 8, Java 11 and Java 15.
170187
|---|---|---|
171188
|v0.1.1|Released|- Use PMem for data storage <br /> - Significant performance boost compared with Kafka |
172189
|v0.2.0|Released|- A two-layered storage strategy to utilize the total capacity of all storage devices while maintaining the efficiency by our cold-hot data migration algorithms<br /> - Further PMem performance improvement by using `libpmem` |
173-
|v0.3.0|Q4 2021|- Configurable storage devices for both 1st and 2nd layers, to support using large SSD as the first layer |
190+
|v0.3.0|Released|- Configurable storage devices for both 1st and 2nd layers, to support using large SSD/NVMe as the first layer |
174191

175192
## 6. Community
176193

bin/bench.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -149,16 +149,16 @@ def config_topic(topic_num, brokers, topic_config):
149149
# print("cmd2:", out, err)
150150

151151

152-
def control_clients(hosts, threads, throughput=0, init=False):
152+
def control_clients(hosts, threads, throughput=0, init=False, config="/tmp/producer-throttler"):
153153
print("Throttle throughput = {} records/s".format(throughput))
154154

155155
bin = PAFKA_HOME
156156
throughput_per_thread = int(throughput / threads / len(hosts))
157157
for host in hosts:
158158
if init:
159-
cmd='ssh {} "cd {}; python3 ./bin/throttler.py --action init"'.format(host, bin)
159+
cmd='ssh {} "cd {}; python3 ./bin/throttler.py --action init --config {}"'.format(host, bin, config)
160160
else:
161-
cmd='ssh {} "cd {}; python3 ./bin/throttler.py --action set --throughput {}"'.format(host, bin, throughput_per_thread)
161+
cmd='ssh {} "cd {}; python3 ./bin/throttler.py --action set --throughput {} --config {}"'.format(host, bin, throughput_per_thread, config)
162162

163163
out, err = execute_cmd(cmd)
164164

@@ -244,7 +244,7 @@ def parse_consumer(line):
244244

245245

246246
class Throttler (threading.Thread):
247-
def __init__(self, hosts: list, threads: int, dynamic_thr: str, sleept=10, steps=5, control_type="sleep", name="Throttler", only_min_max=False):
247+
def __init__(self, hosts: list, threads: int, dynamic_thr: str, sleept=10, steps=5, control_type="sleep", name="Throttler", only_min_max=False, throttler_config="/tmp/producer-throttler"):
248248
threading.Thread.__init__(self)
249249
self.name = name
250250
toks = dynamic_thr.split(":")
@@ -259,17 +259,20 @@ def __init__(self, hosts: list, threads: int, dynamic_thr: str, sleept=10, steps
259259
self.control_type = control_type
260260
self.stopped = False
261261
self.only_min_max = only_min_max
262+
self.throttler_config = throttler_config
262263

263264
def run(self):
264265
print("Starting " + self.name)
265266
factor = float(self.max_thr + self.avg_thr) / (self.avg_thr + self.min_thr)
266267
run_steps = []
268+
delay_high = 20
269+
delay_low = 20
267270

268271
if self.only_min_max:
269-
run_steps = [self.min_thr, self.max_thr]
272+
run_steps = [self.max_thr, self.min_thr]
270273
factor = float(self.max_thr - self.avg_thr) / (self.avg_thr - self.min_thr)
271-
sleep_low = factor * self.sleept
272-
sleep_high = self.sleept
274+
sleep_low = factor * self.sleept - delay_low
275+
sleep_high = self.sleept - delay_high
273276
else:
274277
if self.control_type == "step":
275278
steps_high = self.steps
@@ -321,8 +324,9 @@ def run(self):
321324
# random.shuffle(run_steps)
322325

323326
print("Run steps: {}".format(run_steps))
327+
print("Sleep windows: low: {}, high: {}".format(sleep_low, sleep_high))
324328

325-
control_clients(self.hosts, self.threads, self.min_thr)
329+
control_clients(self.hosts, self.threads, self.min_thr, config=self.throttler_config)
326330
time.sleep(10)
327331

328332
count = 0
@@ -333,7 +337,7 @@ def run(self):
333337
else:
334338
sleep_curr = sleep_high
335339

336-
control_clients(self.hosts, self.threads, thr)
340+
control_clients(self.hosts, self.threads, thr, config=self.throttler_config)
337341
time.sleep(sleep_curr)
338342

339343
print("Exiting " + self.name)
@@ -373,8 +377,8 @@ def print_res(cum_records, total_rec_per_s, total_thr_per_s, thr_unit, total_avg
373377
cum_records = 0
374378

375379
if args.use_dynamic and args.type == "producer":
376-
control_clients(hosts=hosts, threads=threads, init=True)
377-
throttler = Throttler(hosts=hosts, threads=threads, dynamic_thr=args.dynamic, sleept=args.sleept, steps=args.steps, control_type=args.control_type, only_min_max=args.only_min_max)
380+
control_clients(hosts=hosts, threads=threads, init=True, config=args.throttler_config)
381+
throttler = Throttler(hosts=hosts, threads=threads, dynamic_thr=args.dynamic, sleept=args.sleept, steps=args.steps, control_type=args.control_type, only_min_max=args.only_min_max, throttler_config=args.throttler_config)
378382
throttler.daemon = True
379383
throttler.start()
380384

0 commit comments

Comments
 (0)