@@ -265,13 +265,59 @@ static void configGitLock()
265
265
266
266
// ---------------------------------------------------------------------------------------------------------------------
267
267
268
+ static StringBuffer &getQueues (StringBuffer &queueNames, bool isLingeringServer)
269
+ {
270
+ Owned<IPropertyTree> config = getComponentConfig ();
271
+ const char * processName = config->queryProp (" @name" );
272
+ #ifdef _CONTAINERIZED
273
+ bool filtered = false ;
274
+ std::unordered_map<std::string, bool > listenQueues;
275
+ Owned<IPTreeIterator> listening = config->getElements (" listen" );
276
+ ForEach (*listening)
277
+ {
278
+ const char *lq = listening->query ().queryProp (" ." );
279
+ if (lq)
280
+ {
281
+ listenQueues[lq] = true ;
282
+ filtered = true ;
283
+ }
284
+ }
285
+ Owned<IPTreeIterator> queues = config->getElements (" queues" );
286
+ ForEach (*queues)
287
+ {
288
+ IPTree &queue = queues->query ();
289
+ const char *qname = queue.queryProp (" @name" );
290
+ if (!filtered || listenQueues.count (qname))
291
+ {
292
+ if (queueNames.length ())
293
+ queueNames.append (" ," );
294
+ getClusterEclCCServerQueueName (queueNames, qname);
295
+ }
296
+ }
297
+ if (isLingeringServer)
298
+ {
299
+ queueNames.append (" ," );
300
+ getClusterEclCCServerCompileQueueName (queueNames, processName);
301
+ }
302
+ #else
303
+ SCMStringBuffer scmQueueNames;
304
+ getEclCCServerQueueNames (scmQueueNames, processName);
305
+ queueNames.append (scmQueueNames.str ());
306
+ #endif
307
+ return queueNames;
308
+ }
309
+
310
+ // ---------------------------------------------------------------------------------------------------------------------
311
+
268
312
class EclccCompiler : implements IErrorReporter
269
313
{
314
+ StringAttr instanceName;
270
315
StringAttr wuid;
271
316
Owned<IWorkUnit> workunit;
272
317
StringBuffer idxStr;
273
318
StringArray filesSeen;
274
319
StringBuffer repoRootPath;
320
+ unsigned instanceNumber;
275
321
unsigned defaultMaxCompileThreads = 1 ;
276
322
bool saveTemps = false ;
277
323
@@ -857,8 +903,9 @@ class EclccCompiler : implements IErrorReporter
857
903
}
858
904
859
905
public:
860
- EclccCompiler (unsigned _idx)
906
+ EclccCompiler (const char * _instanceName, unsigned _idx) : instanceName(_instanceName )
861
907
{
908
+ instanceNumber = _idx+1 ;
862
909
idxStr.append (_idx);
863
910
864
911
const char * repoPathOption = getenv (" ECLCC_ECLREPO_PATH" );
@@ -887,25 +934,40 @@ class EclccCompiler : implements IErrorReporter
887
934
OWARNLOG (" Could not deduce the directory to store cached git repositories" );
888
935
}
889
936
890
- void compileViaK8sJob (bool noteDequeued )
937
+ void compileViaK8sJob (const char * wuid )
891
938
{
892
939
#ifdef _CONTAINERIZED
893
940
Owned<IException> error;
894
941
try
895
942
{
943
+ // Add the item onto a queue that only lingering eclccservers listen to.
944
+ // This prevents other threads picking up the item if it was placed back
945
+ // onto one of the other queues.
946
+ // Push it before starting the eclserver job - so that completing servers can
947
+ // pick it up early
948
+ StringBuffer internalQueueName;
949
+ getClusterEclCCServerCompileQueueName (internalQueueName, instanceName);
950
+
951
+ Owned<IJobQueue> queue = createJobQueue (internalQueueName);
952
+ Owned<IJobQueueItem> item = createJobQueueItem (wuid);
953
+ queue->enqueue (item.getClear ());
954
+
896
955
SCMStringBuffer optPlatformVersion;
897
956
{
898
957
Owned<IWorkUnitFactory> factory = getWorkUnitFactory ();
899
- Owned<IWorkUnit> wu = factory->updateWorkUnit (wuid. get () );
958
+ Owned<IWorkUnit> wu = factory->updateWorkUnit (wuid);
900
959
wu->getDebugValue (" platformVersion" , optPlatformVersion);
901
- if (noteDequeued)
902
- addTimeStamp (wu, SSToperation, " >compile" , StWhenDequeued, 0 );
960
+ addTimeStamp (wu, SSToperation, " >compile" , StWhenDequeued, 0 );
903
961
addTimeStamp (wu, SSToperation, " >compile" , StWhenK8sLaunched, 0 );
904
962
}
905
963
std::list<std::pair<std::string, std::string>> params = { };
906
964
if (optPlatformVersion.length ())
907
965
params.push_back ({ " _HPCC_JOB_VERSION_" , optPlatformVersion.str () });
908
- k8s::runJob (" compile" , wuid, wuid, params);
966
+
967
+ StringBuffer jobName;
968
+ jobName.append (" eclcc-" ).append (instanceName).append (" -" ).append (instanceNumber);
969
+
970
+ k8s::runJob (" compile" , nullptr , jobName, params);
909
971
}
910
972
catch (IException *E)
911
973
{
@@ -956,7 +1018,7 @@ class EclccCompiler : implements IErrorReporter
956
1018
// NOTE: This call does not modify the workunit itself, so no need to commit afterwards
957
1019
workunit->setContainerizedProcessInfo (" EclCCServer" , getComponentConfigSP ()->queryProp (" @name" ), k8s::queryMyPodName (), k8s::queryMyContainerName (), nullptr , nullptr );
958
1020
workunit.clear ();
959
- compileViaK8sJob (true );
1021
+ compileViaK8sJob (wuid );
960
1022
return WUStateUnknown;
961
1023
}
962
1024
}
@@ -1101,7 +1163,7 @@ class EclccCompileThread : implements IPooledThread, public CInterface
1101
1163
1102
1164
public:
1103
1165
IMPLEMENT_IINTERFACE;
1104
- EclccCompileThread (unsigned _idx) : compiler(_idx)
1166
+ EclccCompileThread (const char * _instanceName, unsigned _idx) : compiler(_instanceName, _idx)
1105
1167
{
1106
1168
}
1107
1169
@@ -1130,27 +1192,32 @@ class EclccCompileThread : implements IPooledThread, public CInterface
1130
1192
1131
1193
class EclccLingeringCompiler
1132
1194
{
1195
+ const char * instanceName;
1133
1196
EclccCompiler compiler;
1134
1197
Owned<IJobQueue> queue;
1135
- const char * name;
1136
1198
unsigned lingerTimeMs;
1137
1199
double costPerHour;
1138
1200
1139
1201
public:
1140
- EclccLingeringCompiler (IPropertyTree * globals, unsigned _idx) : compiler(_idx)
1202
+ EclccLingeringCompiler (IPropertyTree * globals, unsigned _idx)
1203
+ : instanceName(globals->queryProp (" @name" )), compiler(instanceName, _idx)
1141
1204
{
1142
1205
lingerTimeMs = globals->getPropInt (" @lingerPeriod" ) * 1000 ;
1143
- name = globals->queryProp (" @name" );
1144
1206
costPerHour = getMachineCostRate ();
1145
- // MORE: Set up queue
1207
+
1208
+ // This could spot updates to the queues, but if they change, the lingering compile servers
1209
+ // should exit and then eclccseverver will start new instances with the updated queues.
1210
+ StringBuffer queueNames;
1211
+ getQueues (queueNames, true );
1212
+ queue.setown (createJobQueue (queueNames));
1146
1213
}
1147
1214
1148
1215
void run ()
1149
1216
{
1150
1217
__uint64 startupElapsedTimeNs = 0 ; // MORE
1151
1218
cost_type costStart = money2cost_type (calcCostNs (costPerHour, startupElapsedTimeNs));
1152
1219
1153
- recordGlobalMetrics (" Queue" , { {" component" , " eclccserver" }, { " name" , name } }, { StNumStarts, StTimeStart, StCostStart }, { 1ULL , startupElapsedTimeNs, costStart });
1220
+ recordGlobalMetrics (" Queue" , { {" component" , " eclccserver" }, { " name" , instanceName } }, { StNumStarts, StTimeStart, StCostStart }, { 1ULL , startupElapsedTimeNs, costStart });
1154
1221
for (;;)
1155
1222
{
1156
1223
__uint64 priority = getTimeStampNowValue ();
@@ -1161,11 +1228,11 @@ class EclccLingeringCompiler
1161
1228
1162
1229
if (!item)
1163
1230
{
1164
- recordGlobalMetrics (" Queue" , { {" component" , " eclccserver" }, { " name" , name } }, { StNumWaits, StTimeWaitFailure, StCostWait }, { 1 , waitTimeNs, costWait });
1231
+ recordGlobalMetrics (" Queue" , { {" component" , " eclccserver" }, { " name" , instanceName } }, { StNumWaits, StTimeWaitFailure, StCostWait }, { 1 , waitTimeNs, costWait });
1165
1232
break ;
1166
1233
}
1167
1234
1168
- recordGlobalMetrics (" Queue" , { {" component" , " eclccserver" }, { " name" , name } }, { StNumAccepts, StNumWaits, StTimeWaitSuccess, StCostWait }, { 1 , 1 , waitTimeNs, costWait });
1235
+ recordGlobalMetrics (" Queue" , { {" component" , " eclccserver" }, { " name" , instanceName } }, { StNumAccepts, StNumWaits, StTimeWaitSuccess, StCostWait }, { 1 , 1 , waitTimeNs, costWait });
1169
1236
1170
1237
WUState state = compiler.compileWorkunit (item->queryWUID (), true );
1171
1238
@@ -1174,11 +1241,11 @@ class EclccLingeringCompiler
1174
1241
const char * username = " MORE" ;
1175
1242
1176
1243
if (state == WUStateAborted)
1177
- recordGlobalMetrics (" Queue" , { {" component" , " eclccserver" }, { " name" , name }, { " user" , username } }, { StNumAborts, StTimeLocalExecute, StCostAbort }, { 1 , executeTimeNs, costExecute });
1244
+ recordGlobalMetrics (" Queue" , { {" component" , " eclccserver" }, { " name" , instanceName }, { " user" , username } }, { StNumAborts, StTimeLocalExecute, StCostAbort }, { 1 , executeTimeNs, costExecute });
1178
1245
else if (state == WUStateFailed)
1179
- recordGlobalMetrics (" Queue" , { {" component" , " eclccserver" }, { " name" , name }, { " user" , username } }, { StNumFailures, StTimeLocalExecute, StCostExecute }, { 1 , executeTimeNs, costExecute });
1246
+ recordGlobalMetrics (" Queue" , { {" component" , " eclccserver" }, { " name" , instanceName }, { " user" , username } }, { StNumFailures, StTimeLocalExecute, StCostExecute }, { 1 , executeTimeNs, costExecute });
1180
1247
else if (state != WUStateUnknown)
1181
- recordGlobalMetrics (" Queue" , { {" component" , " eclccserver" }, { " name" , name }, { " user" , username } }, { StTimeLocalExecute, StCostExecute }, { executeTimeNs, costExecute });
1248
+ recordGlobalMetrics (" Queue" , { {" component" , " eclccserver" }, { " name" , instanceName }, { " user" , username } }, { StTimeLocalExecute, StCostExecute }, { executeTimeNs, costExecute });
1182
1249
}
1183
1250
}
1184
1251
};
@@ -1223,45 +1290,9 @@ static void removePrecompiledHeader()
1223
1290
// Class EclccServer manages a pool of compile threads
1224
1291
// ------------------------------------------------------------------------------------------------------------------
1225
1292
1226
- static StringBuffer &getQueues (StringBuffer &queueNames)
1227
- {
1228
- Owned<IPropertyTree> config = getComponentConfig ();
1229
- #ifdef _CONTAINERIZED
1230
- bool filtered = false ;
1231
- std::unordered_map<std::string, bool > listenQueues;
1232
- Owned<IPTreeIterator> listening = config->getElements (" listen" );
1233
- ForEach (*listening)
1234
- {
1235
- const char *lq = listening->query ().queryProp (" ." );
1236
- if (lq)
1237
- {
1238
- listenQueues[lq] = true ;
1239
- filtered = true ;
1240
- }
1241
- }
1242
- Owned<IPTreeIterator> queues = config->getElements (" queues" );
1243
- ForEach (*queues)
1244
- {
1245
- IPTree &queue = queues->query ();
1246
- const char *qname = queue.queryProp (" @name" );
1247
- if (!filtered || listenQueues.count (qname))
1248
- {
1249
- if (queueNames.length ())
1250
- queueNames.append (" ," );
1251
- getClusterEclCCServerQueueName (queueNames, qname);
1252
- }
1253
- }
1254
- #else
1255
- const char * processName = config->queryProp (" @name" );
1256
- SCMStringBuffer scmQueueNames;
1257
- getEclCCServerQueueNames (scmQueueNames, processName);
1258
- queueNames.append (scmQueueNames.str ());
1259
- #endif
1260
- return queueNames;
1261
- }
1262
-
1263
1293
class EclccServer : public CInterface , implements IThreadFactory, implements IAbortHandler
1264
1294
{
1295
+ StringAttr instanceName;
1265
1296
StringAttr queueNames;
1266
1297
unsigned poolSize;
1267
1298
Owned<IThreadPool> pool;
@@ -1279,7 +1310,7 @@ class EclccServer : public CInterface, implements IThreadFactory, implements IAb
1279
1310
void configUpdate ()
1280
1311
{
1281
1312
StringBuffer newQueueNames;
1282
- getQueues (newQueueNames);
1313
+ getQueues (newQueueNames, false );
1283
1314
if (!newQueueNames.length ())
1284
1315
ERRLOG (" No queues found to listen on" );
1285
1316
Linked<IJobQueue> currentQueue;
@@ -1296,8 +1327,8 @@ class EclccServer : public CInterface, implements IThreadFactory, implements IAb
1296
1327
}
1297
1328
public:
1298
1329
IMPLEMENT_IINTERFACE;
1299
- EclccServer (const char *_queueName, unsigned _poolSize)
1300
- : poolSize(_poolSize), serverstatus(" ECLCCserver" ), updatedQueueNames(_queueName)
1330
+ EclccServer (const char * _instanceName, const char * _queueName, unsigned _poolSize)
1331
+ : instanceName(_instanceName), poolSize(_poolSize), serverstatus(" ECLCCserver" ), updatedQueueNames(_queueName)
1301
1332
{
1302
1333
threadsActive = 0 ;
1303
1334
running = false ;
@@ -1389,7 +1420,7 @@ class EclccServer : public CInterface, implements IThreadFactory, implements IAb
1389
1420
virtual IPooledThread *createNew ()
1390
1421
{
1391
1422
CriticalBlock b (threadActiveCrit);
1392
- return new EclccCompileThread (threadsActive++);
1423
+ return new EclccCompileThread (instanceName, threadsActive++);
1393
1424
}
1394
1425
1395
1426
virtual bool onAbort ()
@@ -1541,13 +1572,11 @@ int main(int argc, const char *argv[])
1541
1572
initClientProcess (serverGroup, DCR_EclCCServer);
1542
1573
openLogFile ();
1543
1574
configGitLock ();
1544
- const char *wuid = globals->queryProp (" @workunit" );
1545
- if (wuid)
1575
+ if (globals->getPropBool (" @k8sJob" , false ))
1546
1576
{
1547
1577
// One shot mode
1548
- EclccCompileThread compiler (0 );
1549
- compiler.init (const_cast <char *>(wuid));
1550
- compiler.threadmain ();
1578
+ EclccLingeringCompiler compiler (globals, 0 );
1579
+ compiler.run ();
1551
1580
}
1552
1581
else
1553
1582
{
@@ -1559,7 +1588,7 @@ int main(int argc, const char *argv[])
1559
1588
}
1560
1589
1561
1590
StringBuffer queueNames;
1562
- getQueues (queueNames);
1591
+ getQueues (queueNames, false );
1563
1592
if (!queueNames.length ())
1564
1593
throw MakeStringException (0 , " No queues found to listen on" );
1565
1594
@@ -1573,7 +1602,7 @@ int main(int argc, const char *argv[])
1573
1602
// still accept the old name if the new one is not present.
1574
1603
unsigned maxThreads = globals->getPropInt (" @maxEclccProcesses" , globals->getPropInt (" @maxCompileThreads" , 4 ));
1575
1604
#endif
1576
- EclccServer server (queueNames.str (), maxThreads);
1605
+ EclccServer server (globals-> queryProp ( " @name " ), queueNames.str (), maxThreads);
1577
1606
// if we got here, eclserver is successfully started and all options are good, so create the "sentinel file" for re-runs from the script
1578
1607
// put in its own "scope" to force the flush
1579
1608
writeSentinelFile (sentinelFile);
0 commit comments