-
Notifications
You must be signed in to change notification settings - Fork 160
Open
Description
We want to implement multi-queue scheduling based on the Queue() module(core/modules/queue.cc), so we created multiple llring(core/kmod/llring.h) type queues. We want a specific number of packets from the queue to be dequeued and added to processbatch. Doing the following in runtask() will throw an error:
The places I modified are marked
struct task_result Queue::RunTask(Context *ctx, bess::PacketBatch *batch,
void *) {
if (children_overload_ > 0) {
return {
.block = true,
.packets = 0,
.bits = 0,
};
}
// const int burst = ACCESS_ONCE(burst_);
const int pkt_overhead = 24;
uint64_t total_bytes = 0;
int ret;
uint32_t cnt=0;
// My Modification
// uint32_t cnt = llring_sc_dequeue_burst(queue_, (void **)batch->pkts(), burst);
bess::Packet *pkt;
ret = llring_sc_dequeue(queue_, (void **)&pkt); //queue type: struct llring
if(!ret){
batch->add(pkt);
cnt++;
}
if (cnt == 0) {
return {.block = true, .packets = 0, .bits = 0};
}
stats_.dequeued += cnt;
batch->set_cnt(cnt);
if (prefetch_) {
for (uint32_t i = 0; i < cnt; i++) {
total_bytes += batch->pkts()[i]->total_len();
rte_prefetch0(batch->pkts()[i]->head_data());
}
} else {
for (uint32_t i = 0; i < cnt; i++) {
total_bytes += batch->pkts()[i]->total_len();
}
}
RunNextModule(ctx, batch);
if (backpressure_ && llring_count(queue_) < low_water_) {
SignalUnderload();
}
return {.block = false,
.packets = cnt,
.bits = (total_bytes + cnt * pkt_overhead) * 8};
}
CommandResponse Queue::CommandSetBurst(
const bess::pb::QueueCommandSetBurstArg &arg) {
uint64_t burst = arg.burst();
if (burst > bess::PacketBatch::kMaxBurst) {
return CommandFailure(EINVAL, "burst size must be [0,%zu]",
bess::PacketBatch::kMaxBurst);
}
burst_ = burst;
return CommandSuccess();
}
Modified as follows
// my modification
// uint32_t cnt = llring_sc_dequeue_burst(queue_, (void **)batch->pkts(), burst);
bess::Packet *pkt;
ret = llring_sc_dequeue(queue_, (void **)&pkt); //queue type: struct llring
if(!ret){
batch->add(pkt);
cnt++;
}
The error displayed by bessed.ERROR is as follows:
Log file created at: 2022/09/20 01:09:45
Running on machine: ubuntu
Log line format: [IWEF]mmdd hh:mm:ss.uuuuuu threadid file:line] msg
F0920 01:09:45.085120 9286 debug.cc:405] A critical error has occured. Aborting...
Signal: 11 (Segmentation fault), si_code: 1 (SEGV_MAPERR: address not mapped to object)
pid: 9270, tid: 9286, address: 0x7f16c9225838, IP: 0x557a0a5494a7
Backtrace (recent calls first) ---
(0): /home/bess/bess/core/bessd(_ZN5Queue7RunTaskEP7ContextPN4bess11PacketBatchEPv+0xc7) [0x557a0a5494a7]
Queue::RunTask(Context*, bess::PacketBatch*, void*) at /home/bess/bess/core/modules/../kmod/../pktbatch.h:54
51: // WARNING: this function has no bounds checks and so it's possible to
52: // overrun the buffer by calling this. We are not adding bounds check because
53: // we want maximum GOFAST.
-> 54: void add(Packet *pkt) { pkts_[cnt_++] = pkt; }
55: void add(PacketBatch *batch) {
56: bess::utils::CopyInlined(pkts_ + cnt_, batch->pkts(),
57: batch->cnt() * sizeof(Packet *));
(inlined by) Queue::RunTask(Context*, bess::PacketBatch*, void*) at /home/bess/bess/core/modules/queue.cc:222
219: bess::Packet *pkt;
220: ret = llring_sc_dequeue(queue_, (void **)&pkt); //queue type: struct llring
221: if(!ret){
-> 222: batch->add(pkt);
223: cnt++;
224: }
225:
(1): /home/bess/bess/core/bessd(_ZNK4TaskclEP7Context+0x67) [0x557a0a416457]
Task::operator()(Context*) const at /home/bess/bess/core/task.cc:53
-> 53: struct task_result result = module_->RunTask(ctx, &init_batch, arg_);
(2): /home/bess/bess/core/bessd(_ZN4bess16DefaultScheduler12ScheduleLoopEv+0x1d0) [0x557a0a3d33d0]
bess::DefaultScheduler::ScheduleLoop() at /home/bess/bess/core/scheduler.h:272
-> 272: auto ret = (*ctx->task)(ctx);
(inlined by) bess::DefaultScheduler::ScheduleLoop() at /home/bess/bess/core/scheduler.h:250
-> 250: ScheduleOnce(&ctx);
(3): /home/bess/bess/core/bessd(_ZN6Worker3RunEPv+0x201) [0x557a0a3cfc11]
Worker::Run(void*) at /home/bess/bess/core/worker.cc:323
-> 323: scheduler_->ScheduleLoop();
(4): /home/bess/bess/core/bessd(_Z10run_workerPv+0x7c) [0x557a0a3cfeec]
run_worker(void*) at /home/bess/bess/core/worker.cc:337
-> 337: return current_worker.Run(_arg);
(5): /home/bess/bess/core/bessd(+0x1191a6e) [0x557a0b0aea6e]
execute_native_thread_routine at thread.o:?
(6): /lib/x86_64-linux-gnu/libpthread.so.0(+0x76da) [0x7f162bd9e6da]
start_thread at /build/glibc-uZu3wS/glibc-2.27/nptl/pthread_create.c:463
(file/line not available)
(7): /lib/x86_64-linux-gnu/libc.so.6(clone+0x3e) [0x7f162b30d61e]
clone at /build/glibc-uZu3wS/glibc-2.27/misc/../sysdeps/unix/sysv/linux/x86_64/clone.S:95
(file/line not available)
My test demo is:
import scapy.all as scapy
eth = scapy.Ether(dst='00:02:15:37:a2:44', src='00:ae:f3:52:aa:d1')
ip = scapy.IP()
udp = scapy.UDP()
payload = 'Hello World'
test_packet = bytes(eth/ip/udp/payload)
bess.add_worker(0, 0)
bess.add_worker(1, 1)
src::Source() \
-> Rewrite(templates=[test_packet]) \
-> VLANPush(tci=2) \
-> queue::Queue() \
-> Sink()
#queue.set_burst(burst=4)
bess.add_tc('fast', policy='rate_limit',wid=0, resource='bit', limit={'bit': 1000000})
src.attach_task('fast')
bess.add_tc('slow', policy='rate_limit', wid=1, resource='bit', limit={'bit': 1000000})
queue.attach_task('slow')
# To get queue occupancy, issue this command in bessctl once this script is loaded:
# command module queue get_status QueueCommandGetStatusArg
How can I solve this problem, looking forward to your reply :)
Metadata
Metadata
Assignees
Labels
No labels