Skip to content

Commit d4d88bd

Browse files
committed
Switch over to remainder of the code
Signed-off-by: Gavin Halliday <[email protected]>
1 parent 2d60327 commit d4d88bd

File tree

3 files changed

+105
-68
lines changed

3 files changed

+105
-68
lines changed

dali/base/daqueue.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#define THOR_QUEUE_EXT ".thor"
3131
#define THORLINGER_QUEUE_EXT ".lingerthor"
3232
#define ECLCCSERVER_QUEUE_EXT ".eclserver"
33+
#define ECLCCSERVER_COMPILE_QUEUE_EXT ".eclserver.compile"
3334
#define ECLSERVER_QUEUE_EXT ECLCCSERVER_QUEUE_EXT
3435
#define ECLSCHEDULER_QUEUE_EXT ".eclscheduler"
3536
#define ECLAGENT_QUEUE_EXT ".agent"
@@ -48,6 +49,11 @@ inline StringBuffer &getClusterEclCCServerQueueName(StringBuffer &ret, const cha
4849
return ret.append(cluster).append(ECLCCSERVER_QUEUE_EXT);
4950
}
5051

52+
// Appends the name of the compile queue for the given cluster/instance to ret:
// the supplied name followed by ECLCCSERVER_COMPILE_QUEUE_EXT.
// Returns ret so that calls can be chained.
inline StringBuffer &getClusterEclCCServerCompileQueueName(StringBuffer &ret, const char *cluster)
{
    ret.append(cluster);
    ret.append(ECLCCSERVER_COMPILE_QUEUE_EXT);
    return ret;
}
56+
5157
inline StringBuffer &getClusterEclServerQueueName(StringBuffer &ret, const char *cluster)
5258
{
5359
return ret.append(cluster).append(ECLSERVER_QUEUE_EXT);

ecl/eclccserver/eclccserver.cpp

Lines changed: 96 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -265,13 +265,59 @@ static void configGitLock()
265265

266266
//---------------------------------------------------------------------------------------------------------------------
267267

268+
/**
 * Build the comma-separated list of job queue names this eclccserver instance listens on.
 *
 * Containerized builds: every "queues" entry in the component configuration contributes
 * its eclccserver queue name, unless at least one "listen" entry with a value is present,
 * in which case only the queues named by those entries are included.  When
 * isLingeringServer is true, the compile queue named after the component's "@name" is
 * appended as well.
 * Other builds: the names come from getEclCCServerQueueNames() for this process, and
 * isLingeringServer is not consulted.
 *
 * @param queueNames         Buffer the names are appended to.
 * @param isLingeringServer  True to also include the per-instance compile queue.
 * @return                   queueNames, to allow chaining.
 */
static StringBuffer &getQueues(StringBuffer &queueNames, bool isLingeringServer)
{
    Owned<IPropertyTree> config = getComponentConfig();
    const char * processName = config->queryProp("@name");
#ifdef _CONTAINERIZED
    bool filtered = false;
    std::unordered_map<std::string, bool> listenQueues;
    Owned<IPTreeIterator> listening = config->getElements("listen");
    ForEach (*listening)
    {
        const char *lq = listening->query().queryProp(".");
        if (lq)
        {
            listenQueues[lq] = true;
            filtered = true;
        }
    }
    Owned<IPTreeIterator> queues = config->getElements("queues");
    ForEach(*queues)
    {
        IPTree &queue = queues->query();
        const char *qname = queue.queryProp("@name");
        // Guard the lookup: constructing the std::string key from a null pointer is undefined behaviour.
        if (!filtered || (qname && listenQueues.count(qname)))
        {
            if (queueNames.length())
                queueNames.append(",");
            getClusterEclCCServerQueueName(queueNames, qname);
        }
    }
    if (isLingeringServer)
    {
        // Only add a separator when something precedes it, matching the loop above,
        // so the list never starts with an empty queue name.
        if (queueNames.length())
            queueNames.append(",");
        getClusterEclCCServerCompileQueueName(queueNames, processName);
    }
#else
    SCMStringBuffer scmQueueNames;
    getEclCCServerQueueNames(scmQueueNames, processName);
    queueNames.append(scmQueueNames.str());
#endif
    return queueNames;
}
309+
310+
//---------------------------------------------------------------------------------------------------------------------
311+
268312
class EclccCompiler : implements IErrorReporter
269313
{
314+
StringAttr instanceName;
270315
StringAttr wuid;
271316
Owned<IWorkUnit> workunit;
272317
StringBuffer idxStr;
273318
StringArray filesSeen;
274319
StringBuffer repoRootPath;
320+
unsigned instanceNumber;
275321
unsigned defaultMaxCompileThreads = 1;
276322
bool saveTemps = false;
277323

@@ -857,8 +903,9 @@ class EclccCompiler : implements IErrorReporter
857903
}
858904

859905
public:
860-
EclccCompiler(unsigned _idx)
906+
EclccCompiler(const char * _instanceName, unsigned _idx) : instanceName(_instanceName)
861907
{
908+
instanceNumber = _idx+1;
862909
idxStr.append(_idx);
863910

864911
const char * repoPathOption = getenv("ECLCC_ECLREPO_PATH");
@@ -887,25 +934,40 @@ class EclccCompiler : implements IErrorReporter
887934
OWARNLOG("Could not deduce the directory to store cached git repositories");
888935
}
889936

890-
void compileViaK8sJob(bool noteDequeued)
937+
void compileViaK8sJob(const char * wuid)
891938
{
892939
#ifdef _CONTAINERIZED
893940
Owned<IException> error;
894941
try
895942
{
943+
// Add the item onto a queue that only lingering eclccservers listen to.
944+
// This prevents other threads picking up the item if it was placed back
945+
// onto one of the other queues.
946+
// Push it before starting the eclserver job - so that completing servers can
947+
// pick it up early
948+
StringBuffer internalQueueName;
949+
getClusterEclCCServerCompileQueueName(internalQueueName, instanceName);
950+
951+
Owned<IJobQueue> queue = createJobQueue(internalQueueName);
952+
Owned<IJobQueueItem> item = createJobQueueItem(wuid);
953+
queue->enqueue(item.getClear());
954+
896955
SCMStringBuffer optPlatformVersion;
897956
{
898957
Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
899-
Owned<IWorkUnit> wu = factory->updateWorkUnit(wuid.get());
958+
Owned<IWorkUnit> wu = factory->updateWorkUnit(wuid);
900959
wu->getDebugValue("platformVersion", optPlatformVersion);
901-
if (noteDequeued)
902-
addTimeStamp(wu, SSToperation, ">compile", StWhenDequeued, 0);
960+
addTimeStamp(wu, SSToperation, ">compile", StWhenDequeued, 0);
903961
addTimeStamp(wu, SSToperation, ">compile", StWhenK8sLaunched, 0);
904962
}
905963
std::list<std::pair<std::string, std::string>> params = { };
906964
if (optPlatformVersion.length())
907965
params.push_back({ "_HPCC_JOB_VERSION_", optPlatformVersion.str() });
908-
k8s::runJob("compile", wuid, wuid, params);
966+
967+
StringBuffer jobName;
968+
jobName.append("eclcc-").append(instanceName).append("-").append(instanceNumber);
969+
970+
k8s::runJob("compile", nullptr, jobName, params);
909971
}
910972
catch (IException *E)
911973
{
@@ -914,7 +976,7 @@ class EclccCompiler : implements IErrorReporter
914976
if (error)
915977
{
916978
Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
917-
workunit.setown(factory->updateWorkUnit(wuid.get()));
979+
workunit.setown(factory->updateWorkUnit(wuid));
918980
if (workunit)
919981
{
920982
if (workunit->aborting())
@@ -956,7 +1018,7 @@ class EclccCompiler : implements IErrorReporter
9561018
//NOTE: This call does not modify the workunit itself, so no need to commit afterwards
9571019
workunit->setContainerizedProcessInfo("EclCCServer", getComponentConfigSP()->queryProp("@name"), k8s::queryMyPodName(), k8s::queryMyContainerName(), nullptr, nullptr);
9581020
workunit.clear();
959-
compileViaK8sJob(true);
1021+
compileViaK8sJob(wuid);
9601022
return WUStateUnknown;
9611023
}
9621024
}
@@ -1101,7 +1163,7 @@ class EclccCompileThread : implements IPooledThread, public CInterface
11011163

11021164
public:
11031165
IMPLEMENT_IINTERFACE;
1104-
EclccCompileThread(unsigned _idx) : compiler(_idx)
1166+
EclccCompileThread(const char * _instanceName, unsigned _idx) : compiler(_instanceName, _idx)
11051167
{
11061168
}
11071169

@@ -1130,27 +1192,32 @@ class EclccCompileThread : implements IPooledThread, public CInterface
11301192

11311193
class EclccLingeringCompiler
11321194
{
1195+
const char * instanceName;
11331196
EclccCompiler compiler;
11341197
Owned<IJobQueue> queue;
1135-
const char * name;
11361198
unsigned lingerTimeMs;
11371199
double costPerHour;
11381200

11391201
public:
1140-
EclccLingeringCompiler(IPropertyTree * globals, unsigned _idx) : compiler(_idx)
1202+
EclccLingeringCompiler(IPropertyTree * globals, unsigned _idx)
1203+
: instanceName(globals->queryProp("@name")), compiler(instanceName, _idx)
11411204
{
11421205
lingerTimeMs = globals->getPropInt("@lingerPeriod") * 1000;
1143-
name = globals->queryProp("@name");
11441206
costPerHour = getMachineCostRate();
1145-
//MORE: Set up queue
1207+
1208+
//This could spot updates to the queues, but if they change, the lingering compile servers
1209+
//should exit and then eclccserver will start new instances with the updated queues.
1210+
StringBuffer queueNames;
1211+
getQueues(queueNames, true);
1212+
queue.setown(createJobQueue(queueNames));
11461213
}
11471214

11481215
void run()
11491216
{
11501217
__uint64 startupElapsedTimeNs = 0; // MORE
11511218
cost_type costStart = money2cost_type(calcCostNs(costPerHour, startupElapsedTimeNs));
11521219

1153-
recordGlobalMetrics("Queue", { {"component", "eclccserver" }, { "name", name } }, { StNumStarts, StTimeStart, StCostStart }, { 1ULL, startupElapsedTimeNs, costStart });
1220+
recordGlobalMetrics("Queue", { {"component", "eclccserver" }, { "name", instanceName } }, { StNumStarts, StTimeStart, StCostStart }, { 1ULL, startupElapsedTimeNs, costStart });
11541221
for (;;)
11551222
{
11561223
__uint64 priority = getTimeStampNowValue();
@@ -1161,11 +1228,11 @@ class EclccLingeringCompiler
11611228

11621229
if (!item)
11631230
{
1164-
recordGlobalMetrics("Queue", { {"component", "eclccserver" }, { "name", name } }, { StNumWaits, StTimeWaitFailure, StCostWait }, { 1, waitTimeNs, costWait });
1231+
recordGlobalMetrics("Queue", { {"component", "eclccserver" }, { "name", instanceName } }, { StNumWaits, StTimeWaitFailure, StCostWait }, { 1, waitTimeNs, costWait });
11651232
break;
11661233
}
11671234

1168-
recordGlobalMetrics("Queue", { {"component", "eclccserver" }, { "name", name } }, { StNumAccepts, StNumWaits, StTimeWaitSuccess, StCostWait }, { 1, 1, waitTimeNs, costWait });
1235+
recordGlobalMetrics("Queue", { {"component", "eclccserver" }, { "name", instanceName } }, { StNumAccepts, StNumWaits, StTimeWaitSuccess, StCostWait }, { 1, 1, waitTimeNs, costWait });
11691236

11701237
WUState state = compiler.compileWorkunit(item->queryWUID(), true);
11711238

@@ -1174,11 +1241,11 @@ class EclccLingeringCompiler
11741241
const char * username = "MORE";
11751242

11761243
if (state == WUStateAborted)
1177-
recordGlobalMetrics("Queue", { {"component", "eclccserver" }, { "name", name }, { "user", username } }, { StNumAborts, StTimeLocalExecute, StCostAbort }, { 1, executeTimeNs, costExecute });
1244+
recordGlobalMetrics("Queue", { {"component", "eclccserver" }, { "name", instanceName }, { "user", username } }, { StNumAborts, StTimeLocalExecute, StCostAbort }, { 1, executeTimeNs, costExecute });
11781245
else if (state == WUStateFailed)
1179-
recordGlobalMetrics("Queue", { {"component", "eclccserver" }, { "name", name }, { "user", username } }, { StNumFailures, StTimeLocalExecute, StCostExecute }, { 1, executeTimeNs, costExecute });
1246+
recordGlobalMetrics("Queue", { {"component", "eclccserver" }, { "name", instanceName }, { "user", username } }, { StNumFailures, StTimeLocalExecute, StCostExecute }, { 1, executeTimeNs, costExecute });
11801247
else if (state != WUStateUnknown)
1181-
recordGlobalMetrics("Queue", { {"component", "eclccserver" }, { "name", name }, { "user", username } }, { StTimeLocalExecute, StCostExecute }, { executeTimeNs, costExecute });
1248+
recordGlobalMetrics("Queue", { {"component", "eclccserver" }, { "name", instanceName }, { "user", username } }, { StTimeLocalExecute, StCostExecute }, { executeTimeNs, costExecute });
11821249
}
11831250
}
11841251
};
@@ -1223,45 +1290,9 @@ static void removePrecompiledHeader()
12231290
// Class EclccServer manages a pool of compile threads
12241291
//------------------------------------------------------------------------------------------------------------------
12251292

1226-
static StringBuffer &getQueues(StringBuffer &queueNames)
1227-
{
1228-
Owned<IPropertyTree> config = getComponentConfig();
1229-
#ifdef _CONTAINERIZED
1230-
bool filtered = false;
1231-
std::unordered_map<std::string, bool> listenQueues;
1232-
Owned<IPTreeIterator> listening = config->getElements("listen");
1233-
ForEach (*listening)
1234-
{
1235-
const char *lq = listening->query().queryProp(".");
1236-
if (lq)
1237-
{
1238-
listenQueues[lq] = true;
1239-
filtered = true;
1240-
}
1241-
}
1242-
Owned<IPTreeIterator> queues = config->getElements("queues");
1243-
ForEach(*queues)
1244-
{
1245-
IPTree &queue = queues->query();
1246-
const char *qname = queue.queryProp("@name");
1247-
if (!filtered || listenQueues.count(qname))
1248-
{
1249-
if (queueNames.length())
1250-
queueNames.append(",");
1251-
getClusterEclCCServerQueueName(queueNames, qname);
1252-
}
1253-
}
1254-
#else
1255-
const char * processName = config->queryProp("@name");
1256-
SCMStringBuffer scmQueueNames;
1257-
getEclCCServerQueueNames(scmQueueNames, processName);
1258-
queueNames.append(scmQueueNames.str());
1259-
#endif
1260-
return queueNames;
1261-
}
1262-
12631293
class EclccServer : public CInterface, implements IThreadFactory, implements IAbortHandler
12641294
{
1295+
StringAttr instanceName;
12651296
StringAttr queueNames;
12661297
unsigned poolSize;
12671298
Owned<IThreadPool> pool;
@@ -1279,7 +1310,7 @@ class EclccServer : public CInterface, implements IThreadFactory, implements IAb
12791310
void configUpdate()
12801311
{
12811312
StringBuffer newQueueNames;
1282-
getQueues(newQueueNames);
1313+
getQueues(newQueueNames, false);
12831314
if (!newQueueNames.length())
12841315
ERRLOG("No queues found to listen on");
12851316
Linked<IJobQueue> currentQueue;
@@ -1296,8 +1327,8 @@ class EclccServer : public CInterface, implements IThreadFactory, implements IAb
12961327
}
12971328
public:
12981329
IMPLEMENT_IINTERFACE;
1299-
EclccServer(const char *_queueName, unsigned _poolSize)
1300-
: poolSize(_poolSize), serverstatus("ECLCCserver"), updatedQueueNames(_queueName)
1330+
EclccServer(const char * _instanceName, const char *_queueName, unsigned _poolSize)
1331+
: instanceName(_instanceName), poolSize(_poolSize), serverstatus("ECLCCserver"), updatedQueueNames(_queueName)
13011332
{
13021333
threadsActive = 0;
13031334
running = false;
@@ -1389,7 +1420,7 @@ class EclccServer : public CInterface, implements IThreadFactory, implements IAb
13891420
virtual IPooledThread *createNew()
13901421
{
13911422
CriticalBlock b(threadActiveCrit);
1392-
return new EclccCompileThread(threadsActive++);
1423+
return new EclccCompileThread(instanceName, threadsActive++);
13931424
}
13941425

13951426
virtual bool onAbort()
@@ -1541,13 +1572,11 @@ int main(int argc, const char *argv[])
15411572
initClientProcess(serverGroup, DCR_EclCCServer);
15421573
openLogFile();
15431574
configGitLock();
1544-
const char *wuid = globals->queryProp("@workunit");
1545-
if (wuid)
1575+
if (globals->getPropBool("@k8sJob", false))
15461576
{
15471577
// One shot mode
1548-
EclccCompileThread compiler(0);
1549-
compiler.init(const_cast<char *>(wuid));
1550-
compiler.threadmain();
1578+
EclccLingeringCompiler compiler(globals, 0);
1579+
compiler.run();
15511580
}
15521581
else
15531582
{
@@ -1559,7 +1588,7 @@ int main(int argc, const char *argv[])
15591588
}
15601589

15611590
StringBuffer queueNames;
1562-
getQueues(queueNames);
1591+
getQueues(queueNames, false);
15631592
if (!queueNames.length())
15641593
throw MakeStringException(0, "No queues found to listen on");
15651594

@@ -1573,7 +1602,7 @@ int main(int argc, const char *argv[])
15731602
// still accept the old name if the new one is not present.
15741603
unsigned maxThreads = globals->getPropInt("@maxEclccProcesses", globals->getPropInt("@maxCompileThreads", 4));
15751604
#endif
1576-
EclccServer server(queueNames.str(), maxThreads);
1605+
EclccServer server(globals->queryProp("@name"), queueNames.str(), maxThreads);
15771606
// if we got here, eclserver is successfully started and all options are good, so create the "sentinel file" for re-runs from the script
15781607
// put in its own "scope" to force the flush
15791608
writeSentinelFile(sentinelFile);

system/jlib/jcontainerized.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,9 @@ bool applyYaml(const char *componentName, const char *wuid, const char *job, con
225225
}
226226
jobYaml.replaceString("_HPCC_JOBNAME_", jobName.str());
227227

228-
VStringBuffer args("\"--workunit=%s\"", wuid);
228+
StringBuffer args;
229+
if (wuid)
230+
args.appendf("\"--workunit=%s\"", wuid);
229231
args.append(" \"--k8sJob=true\"");
230232
const char *baseImageVersion = getenv("baseImageVersion");
231233
const char *runtimeImageVersion = baseImageVersion; // runtime image version will equal base version unless changed dynamically below

0 commit comments

Comments
 (0)