Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions pdns/recursordist/rec-keepwarm.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* This file is part of PowerDNS or dnsdist.
* Copyright -- PowerDNS.COM B.V. and its contributors
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of version 2 of the GNU General Public License as
* published by the Free Software Foundation.
*
* In addition, for the avoidance of any doubt, permission is granted to
* link this program with OpenSSL and to (re)distribute the binaries
* produced as the result of such linking.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#pragma once

#include <ctime>

#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/key_extractors.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <boost/multi_index/tag.hpp>
#include <utility>

#include "dnsname.hh"
#include "qtype.hh"

namespace rec
{
using namespace ::boost::multi_index;

struct KeepWarmEntry
{
KeepWarmEntry(DNSName name, QType qtype, time_t ttd = 0) : d_qname(std::move(name)), d_ttd(ttd), d_qtype(qtype) {}
DNSName d_qname;
time_t d_ttd;
uint16_t d_qtype;
};

class KeepWarm
{
public:
struct QNameQTypeTag
{
};

struct TTDTag
{
};

using Queue = multi_index_container<
KeepWarmEntry,
indexed_by<ordered_unique<tag<QNameQTypeTag>,
composite_key<KeepWarmEntry,
member<KeepWarmEntry, DNSName, &KeepWarmEntry::d_qname>,
member<KeepWarmEntry, uint16_t, &KeepWarmEntry::d_qtype>>>,
ordered_non_unique<tag<TTDTag>, member<KeepWarmEntry, time_t, &KeepWarmEntry::d_ttd>, std::less<>>>>;

[[nodiscard]] const Queue& get() const
{
return d_queue;
}
void modifyTTD(KeepWarmEntry& entry, uint32_t ttd)
{
auto item = d_queue.find(std::tie(entry.d_qname, entry.d_qtype));
if (item != d_queue.end()) {
d_queue.modify(item, [ttd](rec::KeepWarmEntry& entry) { entry.d_ttd = ttd; });
}
entry.d_ttd = ttd;
}
void emplace(const DNSName& name, uint16_t qtype)
{
d_queue.emplace(name, qtype);
}
Queue::iterator erase(Queue::iterator iter)
{
return d_queue.erase(iter);
}
Queue::iterator begin()
{
return d_queue.begin();
}
Queue::iterator end()
{
return d_queue.end();
}
[[nodiscard]] size_t size() const
{
return d_queue.size();
}

private:
Queue d_queue;
};
}
1 change: 1 addition & 0 deletions pdns/recursordist/rec-lua-conf.hh
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public:
ProtobufExportConfig outgoingProtobufExportConfig;
FrameStreamExportConfig frameStreamExportConfig;
FrameStreamExportConfig nodFrameStreamExportConfig;
std::vector<std::pair<DNSName, QType>> keepWarm;
std::shared_ptr<Logr::Logger> d_slog;
/* we need to increment this every time the configuration
is reloaded, so we know if we need to reload the protobuf
Expand Down
143 changes: 136 additions & 7 deletions pdns/recursordist/rec-main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "threadname.hh"
#include "version.hh"
#include "ws-recursor.hh"
#include "rec-keepwarm.hh"

#ifdef NOD_ENABLED
#include "nod.hh"
Expand Down Expand Up @@ -99,7 +100,6 @@ LockGuarded<std::shared_ptr<NetmaskGroup>> g_initialAllowNotifyFrom; // new thre
LockGuarded<std::shared_ptr<notifyset_t>> g_initialAllowNotifyFor; // new threads need this to be setup
LockGuarded<std::shared_ptr<OpenTelemetryTraceConditions>> g_initialOpenTelemetryConditions; // new threads need this to be setup
static time_t s_statisticsInterval;
static std::atomic<uint32_t> s_counter;
int g_argc;
char** g_argv;
static string s_structured_logger_backend;
Expand Down Expand Up @@ -127,6 +127,7 @@ bool RecThreadInfo::s_weDistributeQueries; // if true, 1 or more threads listen
unsigned int RecThreadInfo::s_numDistributorThreads;
unsigned int RecThreadInfo::s_numUDPWorkerThreads;
unsigned int RecThreadInfo::s_numTCPWorkerThreads;
unsigned int RecThreadInfo::s_numTaskThreads;
thread_local unsigned int RecThreadInfo::t_id{RecThreadInfo::TID_NOT_INITED};

pdns::RateLimitedLog g_rateLimitedLogger;
Expand Down Expand Up @@ -2254,6 +2255,7 @@ static int serviceMain(Logr::log_t log)

RecThreadInfo::setNumDistributorThreads(::arg().asNum("distributor-threads"));
RecThreadInfo::setNumUDPWorkerThreads(::arg().asNum("threads"));
RecThreadInfo::setNumTaskThreads(::arg().asNum("taskthreads"));
if (RecThreadInfo::numUDPWorkers() < 1) {
log->info(Logr::Warning, "Asked to run with 0 threads, raising to 1 instead");
RecThreadInfo::setNumUDPWorkerThreads(1);
Expand Down Expand Up @@ -2452,6 +2454,119 @@ static void handleRCC(int fileDesc, FDMultiplexer::funcparam_t& /* var */)
}
}

static time_t keepCacheWarm(const timeval& now, LocalStateHolder<LuaConfigItems>& luaconfsLocal)
{
auto log = g_slog->withName("cachewarmer");

static LockGuarded<rec::KeepWarm> s_keepwarm;
static uint64_t lastgeneration = 0;

auto lock = s_keepwarm.lock();

if (lastgeneration != luaconfsLocal->generation) {
lastgeneration = luaconfsLocal->generation;
for (const auto& [qname, qtype] : luaconfsLocal->keepWarm) {
lock->emplace(qname, qtype);
}
std::set<std::pair<DNSName, QType>> all;
std::copy(luaconfsLocal->keepWarm.begin(), luaconfsLocal->keepWarm.end(), std::inserter(all, all.end()));
for (auto iter = lock->begin(); iter != lock->end();) {
if (all.count({iter->d_qname, QType(iter->d_qtype)}) == 0) {
iter = lock->erase(iter);
}
else {
++iter;
}
}
}

std::vector<rec::KeepWarmEntry> toBeHandled;

const auto& sidx = lock->get().template get<rec::KeepWarm::TTDTag>();
auto siter = sidx.begin();

const auto batchSize = std::min(static_cast<size_t>(1000), lock->size());
const time_t specialTime = 1;
const time_t cooldown = 60;
const time_t almost = 5;
for (size_t i = 0; i < batchSize && siter != sidx.end(); i++, siter++) {

if (siter->d_ttd > now.tv_sec + almost) {
break;
}
toBeHandled.emplace_back(*siter);
}

for (auto& element : toBeHandled) {
if (element.d_ttd == specialTime) {
SyncRes resolver(now);
resolver.setQNameMinimization(true);
resolver.setCacheOnly(true);
resolver.setDoDNSSEC(g_dnssecmode != DNSSECMode::Off);
resolver.setDNSSECValidationRequested(g_dnssecmode != DNSSECMode::Off && g_dnssecmode != DNSSECMode::ProcessNoValidate);
std::vector<DNSRecord> ret;
int res = -1;
const std::string msg = "Exception while resolving";
try {
res = resolver.beginResolve(element.d_qname, element.d_qtype, QClass::IN, ret, 0);
}
catch (const PDNSException& e) {
log->error(Logr::Warning, e.reason, msg, "exception", Logging::Loggable("PDNSException"));
ret.clear();
}
catch (const ImmediateServFailException& e) {
log->error(Logr::Warning, e.reason, msg, "exception", Logging::Loggable("ImmediateServFailException"));
ret.clear();
}
catch (const PolicyHitException& e) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
catch (const PolicyHitException& e) {
catch (const PolicyHitException&) {

log->info(Logr::Warning, msg, "exception", Logging::Loggable("PolicyHitException"));
ret.clear();
}
catch (const std::exception& e) {
log->error(Logr::Warning, e.what(), msg, "exception", Logging::Loggable("std::exception"));
ret.clear();
}
catch (...) {
log->info(Logr::Warning, msg);
ret.clear();
}

uint32_t minttl = cooldown; // If no records found, either it did not resolve at all, or it did
// not resolve yet. In both cases, pace the work.
if (ret.size() > 0) {
minttl = std::numeric_limits<uint32_t>::max();
bool haveAnswerRecord = false;
for (const auto& record : ret) {
if (record.d_place == DNSResourceRecord::ANSWER) {
haveAnswerRecord = true;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tab crept in?

}
minttl = std::min(minttl, record.d_ttl);
}
if (haveAnswerRecord && !haveFinalAnswer(element.d_qname, element.d_qtype, res, ret)) {
// Common cause: a record in the CNAME chain expired, setting the minttl will trigger a task push below
minttl = 0;
}
}
lock->modifyTTD(element, now.tv_sec + minttl);
}
if (element.d_ttd <= now.tv_sec + almost) { // include non-initialized (0) case
pushAlmostExpiredTask(element.d_qname, element.d_qtype, now.tv_sec + cooldown, ComboAddress("255.255.255.255"), true);
lock->modifyTTD(element, specialTime);
}
}

time_t wait = cooldown;
siter = sidx.begin();
if (siter != sidx.end()) {
wait = siter->d_ttd - now.tv_sec - 1;
wait = std::max(static_cast<time_t>(1), wait);
wait = std::min(static_cast<time_t>(cooldown), wait);
}

log->info(Logr::Debug, "Wait", "interval", Logging::Loggable(wait));
return wait;
}

class PeriodicTask
{
public:
Expand Down Expand Up @@ -2530,8 +2645,14 @@ static void houseKeepingWork(Logr::log_t log)
// Below are the thread specific tasks for the handler and the taskThread
// Likley a few handler tasks could be moved to the taskThread
if (info.isTaskThread()) {
static PeriodicTask keepWarmTask{"KeepWarmTask", 1};
keepWarmTask.runIfDue(now, [now, &luaconfsLocal] {
auto actionRequired = keepCacheWarm(now, luaconfsLocal);
keepWarmTask.setPeriod(actionRequired);
});

// TaskQueue is run always
runTasks(10, g_logCommonErrors);
runTasks(100, g_logCommonErrors);

static PeriodicTask ztcTask{"ZTC", 60};
static map<DNSName, RecZoneToCache::State> ztcStates;
Expand Down Expand Up @@ -2608,17 +2729,18 @@ static void houseKeepingWork(Logr::log_t log)
pruneCookies(now.tv_sec - 3000);
});

// By default, refresh at 80% of max-cache-ttl with a minimum period of 10s
// By default, refresh at 80% of lowest TTL seen in the result with a minimum period of 10s
const unsigned int minRootRefreshInterval = 10;
static PeriodicTask rootUpdateTask{"rootUpdateTask", std::max(SyncRes::s_maxcachettl * 8 / 10, minRootRefreshInterval)};
rootUpdateTask.runIfDue(now, [now, &log, minRootRefreshInterval]() {
int res = 0;
uint32_t minttl = SyncRes::s_maxcachettl;
if (!g_regressionTestMode) {
res = SyncRes::getRootNS(now, nullptr, 0, log);
res = SyncRes::getRootNS(now, nullptr, 0, log, minttl);
}
if (res == 0) {
// Success, go back to the default period
rootUpdateTask.setPeriod(std::max(SyncRes::s_maxcachettl * 8 / 10, minRootRefreshInterval));
rootUpdateTask.setPeriod(std::max(minttl * 8 / 10, minRootRefreshInterval));
}
else {
// On failure, go to the middle of the remaining period (initially 80% / 8 = 10%) and shorten the interval on each
Expand Down Expand Up @@ -2734,6 +2856,13 @@ static void recLoop()

auto& threadInfo = RecThreadInfo::self();

static std::atomic<uint32_t> s_counter;

// Use primes, it avoid not being scheduled in cases where the counter has a regular pattern.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Use primes, it avoid not being scheduled in cases where the counter has a regular pattern.
// Use primes, they avoid not being scheduled in cases where the counter shows a regular pattern.

// We want to call handler thread often, it gets scheduled about 2 times per second on an idle recursor
constexpr uint32_t handlerAndTaskInterval = 11;
constexpr uint32_t otherInterval = 499;

while (!RecursorControlChannel::stop) {
try {
while (g_multiTasker->schedule(g_now)) {
Expand All @@ -2742,7 +2871,7 @@ static void recLoop()

// Use primes, it avoid not being scheduled in cases where the counter has a regular pattern.
// We want to call handler thread often, it gets scheduled about 2 times per second
if (((threadInfo.isHandler() || threadInfo.isTaskThread()) && s_counter % 11 == 0) || s_counter % 499 == 0) {
if (((threadInfo.isHandler() || threadInfo.isTaskThread()) && s_counter % handlerAndTaskInterval == 0) || s_counter % otherInterval == 0) {
timeval start{};
Utility::gettimeofday(&start);
g_multiTasker->makeThread(houseKeeping, nullptr);
Expand Down Expand Up @@ -2783,7 +2912,7 @@ static void recLoop()
}
runLuaMaintenance(threadInfo, last_lua_maintenance, luaMaintenanceInterval);

auto timeoutUsec = g_multiTasker->nextWaiterDelayUsec(500000);
auto timeoutUsec = g_multiTasker->nextWaiterDelayUsec(1000000U / handlerAndTaskInterval / 2);
t_fdm->run(&g_now, static_cast<int>(timeoutUsec / 1000));
// 'run' updates g_now for us
}
Expand Down
8 changes: 7 additions & 1 deletion pdns/recursordist/rec-main.hh
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ public:

static unsigned int numTaskThreads()
{
return 1;
return s_numTaskThreads;
}

static unsigned int numUDPWorkers()
Expand Down Expand Up @@ -482,6 +482,11 @@ public:
s_numDistributorThreads = n;
}

static void setNumTaskThreads(unsigned int n)
{
s_numTaskThreads = n;
}

static unsigned int numRecursorThreads()
{
return numHandlers() + numDistributors() + numUDPWorkers() + numTCPWorkers() + numTaskThreads();
Expand Down Expand Up @@ -589,6 +594,7 @@ private:
static unsigned int s_numDistributorThreads;
static unsigned int s_numUDPWorkerThreads;
static unsigned int s_numTCPWorkerThreads;
static unsigned int s_numTaskThreads;
};

struct ThreadMSG
Expand Down
Loading
Loading