Description
Problem
When using PocketCoffea on coffea-casa, the current DaskExecutorFactory.setup() logic repeatedly failed, leading to the following symptoms (a quick way to confirm this state is sketched after the list):
- the Dask scheduler was reachable,
- but all workers disappeared (“Number of Workers: 0”, “Number of Cores: 0”),
- scaling workers from the UI or from the executor had no effect,
- PocketCoffea jobs remained stuck at 0% progress,
- the Dask dashboard showed no active tasks.
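For reference, a minimal way to confirm this state from a notebook or terminal on coffea-casa (assuming the default tls://localhost:8786 scheduler address) is to connect a client and inspect the scheduler info:

```python
from distributed import Client

# Connect to the preconfigured coffea-casa scheduler (assumed default address).
client = Client("tls://localhost:8786")

info = client.scheduler_info()
print("Scheduler address:", info["address"])
print("Number of workers:", len(info["workers"]))
print("Total cores:", sum(w["nthreads"] for w in info["workers"].values()))
# In the failure state the scheduler responds, but the worker count stays at 0.
```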
Potential reasons:
- client.restart() terminates all connected workers and resets the scheduler state, but it apparently does not automatically resubmit new workers under HTCondor, and I think that is the main issue (a small reproduction sketch follows this list).
- The PipInstall plugin is unnecessary.
As PocketCoffea is supposed to be already installed in the coffea-casa environment, PipInstall triggers package downloads on every worker and delays (maybe sometimes prevents) worker startup.
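To illustrate the restart hypothesis (this is only a sketch of what the previous setup() did, using the default coffea-casa scheduler address, and I have not tested it systematically), the sequence below shows where the worker count can drop to zero and stay there, because nothing on the client side resubmits the HTCondor worker jobs:

```python
from distributed import Client
from dask.distributed import PipInstall

client = Client("tls://localhost:8786")
print("Workers before restart:", len(client.scheduler_info()["workers"]))

# Every (re)starting worker will try to pip-install PocketCoffea from GitHub,
# even though it is already available in the coffea-casa image.
plugin = PipInstall(packages=["git+https://github.com/PocketCoffea/PocketCoffea.git@main"])
client.register_plugin(plugin)

# restart() kills all workers and clears the scheduler state; whether new
# workers come back depends on the cluster backend, not on the client.
client.restart()
print("Workers after restart:", len(client.scheduler_info()["workers"]))
```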
Here is the simplified executor code with which I managed to get the Dask executor running:
```python
from pocket_coffea.executors.executors_base import ExecutorFactoryABC
from pocket_coffea.executors.executors_base import IterativeExecutorFactory, FuturesExecutorFactory
from coffea import processor as coffea_processor
import os


class DaskExecutorFactory(ExecutorFactoryABC):

    def __init__(self, run_options, outputdir, **kwargs):
        self.outputdir = outputdir
        super().__init__(run_options, **kwargs)

    def setup(self):
        '''Start the DASK cluster here'''
        # At coffea-casa we have a preconfigured Dask HTCondor cluster for you;
        # please just use it, available at tls://localhost:8786
        from distributed import Client
        addr = os.environ.get("DASK_SCHEDULER_ADDRESS", "tls://localhost:8786")
        print(f"Connecting to Dask scheduler at {addr}")
        self.dask_client = Client(addr)

    # previous implementation
    # def setup(self):
    #     '''Start the DASK cluster here'''
    #     # At coffea-casa we have a preconfigured Dask HTCondor cluster for you;
    #     # please just use it, available at tls://localhost:8786
    #     from distributed import Client
    #     self.dask_client = Client("tls://localhost:8786")
    #     # Install pocket_coffea on the dask workers via the Dask scheduler plugin PipInstall
    #     # PLEASE CHANGE if you are using a fork or a different tag / branch
    #     from dask.distributed import PipInstall
    #     plugin = PipInstall(packages=["git+https://github.com/PocketCoffea/PocketCoffea.git@main"])
    #     self.dask_client.register_plugin(plugin)
    #     # Making sure all workers have an up-to-date configuration
    #     self.dask_client.restart()

    def customized_args(self):
        args = super().customized_args()
        args["client"] = self.dask_client
        return args

    def get(self):
        return coffea_processor.dask_executor(**self.customized_args())

    def close(self):
        self.dask_client.close()


def get_executor_factory(executor_name, **kwargs):
    if executor_name == "iterative":
        return IterativeExecutorFactory(**kwargs)
    elif executor_name == "futures":
        return FuturesExecutorFactory(**kwargs)
    elif executor_name == "dask":
        return DaskExecutorFactory(**kwargs)
```
Maybe even the DASK_SCHEDULER_ADDRESS part is unnecessary; I haven't tested without it yet.
The remaining problem is that I have been running into this “half-dead” cluster state quite often on coffea-casa (no workers ever appear and no tasks are processed). I'm not sure what exactly triggers it, since I also experienced it with this simplified executor: it could be something I did while experimenting (restarting the client too often, spawning workers, etc.), it could be the previous executor logic (client.restart() / PipInstall), or it might be something on the coffea-casa side.
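One possible mitigation, sketched below on top of the simplified setup() above and not something I have validated on coffea-casa, would be to fail fast when no workers show up instead of letting the job hang at 0%. The helper name connect_with_worker_check and the 300 s timeout are my own choices:

```python
import os
from distributed import Client


def connect_with_worker_check(min_workers=1, timeout=300):
    """Connect to the coffea-casa scheduler and fail fast if no workers appear."""
    addr = os.environ.get("DASK_SCHEDULER_ADDRESS", "tls://localhost:8786")
    print(f"Connecting to Dask scheduler at {addr}")
    client = Client(addr)
    # wait_for_workers raises a timeout error if the requested number of
    # workers does not register within `timeout` seconds, so a half-dead
    # cluster is reported immediately instead of the job hanging at 0%.
    client.wait_for_workers(n_workers=min_workers, timeout=timeout)
    return client
```

In setup() this would replace the plain Client(addr) call, e.g. self.dask_client = connect_with_worker_check(), so a misbehaving cluster surfaces as an error before any processing starts.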