Caching integration #72

Open: wants to merge 159 commits into base: master

Commits (159)

9215d64
added storage object to represent caches and associated readout funct…
tfesenbecker Oct 31, 2019
0a46da7
extended CLI to support storage files
tfesenbecker Oct 31, 2019
e727b69
extended simulator to support storage files
tfesenbecker Oct 31, 2019
d64954d
added new drone attribute sitename connecting drones and storage elem…
tfesenbecker Oct 31, 2019
5597434
added file provider object connecting storage objects and jobs
tfesenbecker Oct 31, 2019
bb1dcbe
added different caching/cache cleaning/walltime recalculation algorithms
tfesenbecker Oct 31, 2019
8d6db96
renamed storage readout
tfesenbecker Oct 31, 2019
fb150db
fixed debug output
tfesenbecker Nov 1, 2019
69072ae
renamed storage input reader
tfesenbecker Nov 1, 2019
53ebec5
updated Job class
tfesenbecker Nov 2, 2019
f997223
replaced function modifying walltime by function with property decorator
tfesenbecker Nov 2, 2019
2e2c06f
Revert "replaced function modifying walltime by function with propert…
tfesenbecker Nov 2, 2019
110b3e9
replaced function modifying walltime by function with property decorator
tfesenbecker Nov 2, 2019
b032a0d
resolving PEP8 issues
tfesenbecker Nov 2, 2019
7753d0d
Merge branch 'master' of https://github.com/MatterMiners/lapis into c…
tfesenbecker Nov 2, 2019
5123034
fixed file provider bug (wrong inputfiles dictionary)
tfesenbecker Nov 2, 2019
1c2fe9f
Update lapis/cli/simulate.py
tfesenbecker Nov 4, 2019
8739ce9
renamed function get_used_storage to _calculate_used_storage
tfesenbecker Nov 4, 2019
0b5a922
Merge branch 'cachingextension' of https://github.com/tfesenbecker/la…
tfesenbecker Nov 4, 2019
855242a
attached fileprovider to drone instead of job and passed it via make_…
tfesenbecker Nov 4, 2019
bfadacb
reworked file coverage function to return a score
tfesenbecker Nov 4, 2019
3f30c58
added proper __repr__ function
tfesenbecker Nov 4, 2019
2b214aa
added file classes
tfesenbecker Nov 7, 2019
2bd91d7
moved caching algorithm and associated cache cleanup to it's own class
tfesenbecker Nov 7, 2019
29576eb
Redesign of the storage class and associated changes
tfesenbecker Nov 7, 2019
146fbe3
put walltime getter and walltime recalculation back in seperate methods
tfesenbecker Nov 7, 2019
7ef8dd9
added parallel treatment of jobs input files in file provider
tfesenbecker Nov 7, 2019
b94ab82
fixed failed unit test that were caused by Drone without file provide…
tfesenbecker Nov 7, 2019
1e9e795
Merge branch 'master' of https://github.com/MatterMiners/lapis into c…
tfesenbecker Nov 7, 2019
191df2b
changed scoring to take filesizes into consideration
tfesenbecker Nov 8, 2019
a635318
Merge branch 'cachingextension' into feature/storageimprovement
tfesenbecker Nov 8, 2019
6f7ace1
Merge pull request #1 from tfesenbecker/feature/storageimprovement
tfesenbecker Nov 8, 2019
75165ad
fixed bug from merge
tfesenbecker Nov 8, 2019
d943ed6
Merge branch 'cachingextension' of https://github.com/tfesenbecker/la…
tfesenbecker Nov 8, 2019
9453632
Merge pull request #2 from tfesenbecker/feature/storageimprovement
tfesenbecker Nov 8, 2019
32faa38
removed debug output to fix unit test
tfesenbecker Nov 8, 2019
78a6f18
First steps towards including everything concerning caching into moni…
tfesenbecker Nov 11, 2019
c7c2e03
renamed method
tfesenbecker Nov 11, 2019
73ada77
split processing of job into file transfer and actual calculation
tfesenbecker Nov 11, 2019
02a79bb
refactored storage and file provider objects in order to use Pipe
tfesenbecker Nov 11, 2019
63664e0
added monitoring for remote and storage connections
tfesenbecker Nov 12, 2019
2df3841
small fix in monitoring
tfesenbecker Nov 12, 2019
a6a9783
Merge branch 'master' of https://github.com/MatterMiners/lapis into f…
tfesenbecker Nov 13, 2019
6e5cdd7
Merge branch 'master' of https://github.com/MatterMiners/lapis into f…
tfesenbecker Nov 14, 2019
cf5b3ab
adapted job walltime to new job processing in order to fix job event …
tfesenbecker Nov 17, 2019
3768f5b
minor clean ups
tfesenbecker Nov 17, 2019
df17230
added cache modelation via cachehitrate
tfesenbecker Nov 17, 2019
128425f
Update lapis/cachealgorithm.py
tfesenbecker Nov 17, 2019
04223e4
Update lapis/storage.py
tfesenbecker Nov 17, 2019
59830d7
Update lapis/storage.py
tfesenbecker Nov 17, 2019
a7b3323
Update lapis/storage.py
tfesenbecker Nov 17, 2019
eff97c4
Update lapis/storage.py
tfesenbecker Nov 17, 2019
ffca7a3
Update lapis/storage.py
tfesenbecker Nov 17, 2019
97e3f83
Update lapis/storage.py
tfesenbecker Nov 17, 2019
a4ceec4
resolved PEP8 issue
tfesenbecker Nov 17, 2019
b2cb120
Merge pull request #61 from tfesenbecker/feature/includepipes
tfesenbecker Nov 19, 2019
cdeeea6
minor fix
tfesenbecker Nov 19, 2019
edfc8f1
moved definition of remote throughput to CLI input, storage object th…
tfesenbecker Nov 20, 2019
25a2a23
Extended cache algorithm documentation
tfesenbecker Nov 20, 2019
b374cf6
implemented minor changes requested in PRs
tfesenbecker Nov 20, 2019
4487db6
Update lapis/storage.py
tfesenbecker Nov 20, 2019
521529f
Merge branch 'master' of https://github.com/MatterMiners/lapis into f…
tfesenbecker Nov 22, 2019
5ebf272
updated usim version requirement to 0.4.2
tfesenbecker Nov 26, 2019
1e814ea
completed renaming of file provider to connection
tfesenbecker Nov 26, 2019
43361c9
fixed job and simulator unit tests
tfesenbecker Nov 27, 2019
d47789f
fixed job and simulator unit tests
tfesenbecker Nov 27, 2019
a443618
replaced Storage.__repr__ to match the other classes
tfesenbecker Nov 27, 2019
cca1859
added missing default values for unit test compatibility
tfesenbecker Nov 27, 2019
36f5966
extended monitoring
tfesenbecker Nov 27, 2019
be5482f
fixed PEP8 issue
tfesenbecker Nov 27, 2019
4a358cf
updated via_usim decorator
eileen-kuehn Nov 27, 2019
d0051c2
added statistics about jobs in DummyScheduler
eileen-kuehn Nov 27, 2019
4cae9be
made unit tests succeed again
eileen-kuehn Nov 27, 2019
19ff945
made enabling of monitoring explicit
eileen-kuehn Nov 27, 2019
719fc97
blackened file
eileen-kuehn Nov 27, 2019
b87064d
changed cli to also start without any caching information
eileen-kuehn Nov 27, 2019
c1ae198
changed assignment of connections a bit
eileen-kuehn Nov 27, 2019
e57cd00
removed creation of connection module from test as it is not required
eileen-kuehn Nov 27, 2019
6df1b27
converted storage and file sizes to bytes
eileen-kuehn Nov 27, 2019
aced023
Merge branch 'master' into feature/caching
eileen-kuehn Nov 27, 2019
000be38
corrected access to numberofaccesses
eileen-kuehn Nov 27, 2019
08c6432
changed signature of StoredFile and adapted in IO operations
eileen-kuehn Nov 27, 2019
883b758
improved storage
eileen-kuehn Nov 27, 2019
df362a5
Merge branch 'master' into feature/caching
eileen-kuehn Nov 27, 2019
d5673b4
minimum required usim version set to 0.4.3
eileen-kuehn Nov 27, 2019
65fd2be
renamed remove_from_storage and add_to_storage to remove and add
eileen-kuehn Nov 27, 2019
325194d
made free_space a property of storage
eileen-kuehn Nov 27, 2019
22e4ea7
removed method find_file from storage
eileen-kuehn Nov 27, 2019
7faf2ee
added todo
eileen-kuehn Nov 27, 2019
e7a21e7
ignored B006 for flake8
eileen-kuehn Nov 27, 2019
a06dc62
if file is available on storage, transfer now receives correct size
eileen-kuehn Nov 27, 2019
3e3752b
fixed position of noqa
eileen-kuehn Nov 27, 2019
a570d50
made determine_inputfile_source private to connection
eileen-kuehn Nov 27, 2019
4ce52df
renamed transfer_inputfiles to transfer_files
eileen-kuehn Nov 27, 2019
6eb4615
removed queue from file lookup in storage and improved determine inpu…
eileen-kuehn Nov 27, 2019
7a7492f
improved stream file in connection
eileen-kuehn Nov 27, 2019
6a8801b
introduced HitrateStorage that transfers data based on a cache hitrate
eileen-kuehn Nov 27, 2019
43577ac
fixed position of noqa
eileen-kuehn Nov 27, 2019
19bde1f
removed cachehitrate from connection
eileen-kuehn Nov 27, 2019
0c2bd3d
connection now sets reference to remote_connection for storage
eileen-kuehn Nov 27, 2019
122a351
storage objects are now created based on specified cache hit rate
eileen-kuehn Nov 27, 2019
23a7d7b
adapted usage of caching for jobs
eileen-kuehn Nov 28, 2019
e2e8f57
introduced calculation efficiency to job
eileen-kuehn Nov 28, 2019
b8e51fb
introduced calculation efficiency for jobs to cli
eileen-kuehn Nov 28, 2019
2056b41
added more type hints for job
eileen-kuehn Nov 28, 2019
ddb5727
removed initialisation of connection
eileen-kuehn Nov 28, 2019
f730a51
moved caching related monitoring to extra file
eileen-kuehn Nov 28, 2019
1bb6b13
each simulation run now can be identified
eileen-kuehn Nov 28, 2019
4c771a0
added caching-specific monitoring information to documentation
eileen-kuehn Nov 28, 2019
250ddae
added type hints for simulator
eileen-kuehn Nov 28, 2019
ddba83b
changed sizes for storage to bytes
eileen-kuehn Nov 28, 2019
f49c4ef
fixed bug leading to full RAM
tfesenbecker Nov 28, 2019
7d92a69
Merge branch 'feature/caching' of https://github.com/MatterMiners/lap…
tfesenbecker Nov 28, 2019
0b3b81e
added RemoteStorage
eileen-kuehn Nov 28, 2019
e2650c0
renamed storagesize to size and ensured correct units
eileen-kuehn Nov 28, 2019
0adb6d0
ensured that size is always int
eileen-kuehn Nov 28, 2019
ac73c82
renamed method again
eileen-kuehn Nov 28, 2019
17b23da
fixed semmle issue
eileen-kuehn Nov 28, 2019
09ba7a4
added type hints
eileen-kuehn Nov 28, 2019
eae5555
removed cachealgorithm from storage and moved to connection
eileen-kuehn Nov 28, 2019
2e6c1dd
fixed bug leading to full RAM again
tfesenbecker Nov 29, 2019
0e3c573
fixed bug leading to full RAM again
tfesenbecker Nov 29, 2019
27c1ea7
Merge branch 'feature/caching' of https://github.com/MatterMiners/lap…
tfesenbecker Nov 29, 2019
2a61f00
fix hit rate based caching functionality
tfesenbecker Nov 29, 2019
4a95f3a
added first test for storage io
eileen-kuehn Nov 29, 2019
0e89302
adapted access to connection for RemoteStorage
eileen-kuehn Nov 29, 2019
5fc8b37
added new test for storage input
eileen-kuehn Nov 29, 2019
d3ff0da
fixed assignment of remote storage
eileen-kuehn Nov 29, 2019
cbc1507
reverted change of transfer signature and added typehints
eileen-kuehn Nov 29, 2019
d0124d1
introduced interface for storage
eileen-kuehn Nov 29, 2019
c7af203
added docstrings to storage interface
eileen-kuehn Nov 29, 2019
0019583
extended tests
eileen-kuehn Nov 29, 2019
01efb93
removed public update method from storage and made update private
eileen-kuehn Nov 29, 2019
a7177f7
added unit conversion for storageelement connection and remote connec…
tfesenbecker Nov 29, 2019
d684dc8
reformated debug output
tfesenbecker Nov 29, 2019
328e6dc
added debug output and fixed wrong function call
tfesenbecker Nov 29, 2019
0e49c32
added debug output and fixed wrong function call
tfesenbecker Nov 29, 2019
98dbba1
Merge branch 'feature/caching' of https://github.com/MatterMiners/lap…
tfesenbecker Nov 29, 2019
93589ba
Merge branch 'master' of https://github.com/MatterMiners/lapis into f…
tfesenbecker Nov 30, 2019
2351f51
reduced debug output, added default value for job_repr
tfesenbecker Dec 3, 2019
9e3a446
Added unittests to test functionality of Connection class with Hitrat…
tfesenbecker Dec 3, 2019
5cdcf35
extended type hints
tfesenbecker Dec 3, 2019
2747bce
fixed error in _usedstorage calculation
tfesenbecker Dec 3, 2019
4f6132f
storage.py now catches exception caused by not specified storage cont…
tfesenbecker Dec 3, 2019
fa0e98f
fixed test_storage unit tests
tfesenbecker Dec 3, 2019
d01ba77
added new unit tests for hitrate based caching
tfesenbecker Dec 3, 2019
5477344
removed forgotten debug output
tfesenbecker Dec 3, 2019
2dc77ff
storage_content_reader() handles empty files correctly now
tfesenbecker Dec 3, 2019
c573ac5
fixed line length
tfesenbecker Dec 3, 2019
f6fec85
fixed line length
tfesenbecker Dec 3, 2019
2fb8afe
Monitor.run now properly closes the sampling aiter
maxfischer2781 Dec 3, 2019
a5d13de
made the linter happy
maxfischer2781 Dec 3, 2019
d277354
added default value for calculation_efficiency
tfesenbecker Dec 3, 2019
a165249
remove calculation_efficiency default value
tfesenbecker Dec 3, 2019
691d523
corrected wrong attribute names
tfesenbecker Dec 3, 2019
b8ae864
fixed usage of transfer() interface and debug outputs
tfesenbecker Dec 3, 2019
73d6352
fixed debug output leading to failing unit test
tfesenbecker Dec 3, 2019
3c58797
fixed bug in filesize unit conversion
tfesenbecker Dec 4, 2019
616a7e1
Extended hitrate based caching to support different cache hitrates fo…
tfesenbecker Jan 20, 2020
2 changes: 1 addition & 1 deletion .coveragerc
@@ -18,4 +18,4 @@ exclude_lines =
raise NotImplementedError
return NotImplemented
if __name__ == "__main__"
if __name__ == '__main__'
if __name__ == '__main__'
1 change: 0 additions & 1 deletion .gitignore
@@ -207,4 +207,3 @@ Icon
Network Trash Folder
Temporary Items
.apdisk

2 changes: 1 addition & 1 deletion docs/Makefile
@@ -16,4 +16,4 @@ help:
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
6 changes: 3 additions & 3 deletions docs/source/topics/monitoring.rst
@@ -77,9 +77,9 @@ COBalD-specific Monitoring
Caching-specific Monitoring
~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. TODO::

Will be added as soon as the caching branch is merged.
.. autofunction:: lapis.monitor.caching.storage_status
.. autofunction:: lapis.monitor.caching.storage_connection
.. autofunction:: lapis.monitor.caching.remote_connection

Telegraf
--------
57 changes: 57 additions & 0 deletions lapis/cachealgorithm.py
@@ -0,0 +1,57 @@
from typing import Optional, Callable, Tuple

from lapis.files import RequestedFile, StoredFile
from lapis.storageelement import StorageElement
from lapis.utilities.cache_cleanup_implementations import sort_files_by_cachedsince


def check_size(file: RequestedFile, storage: StorageElement):
return storage.size >= file.filesize


def check_relevance(file: RequestedFile, storage: StorageElement):
return True


def delete_oldest(
file: RequestedFile, storage: StorageElement
) -> Tuple[bool, Tuple[StoredFile]]:
deletable_files = []
currently_free = storage.available
if currently_free < storage.available:
sorted_files = sort_files_by_cachedsince(storage.files.items())
while currently_free < file.filesize:
deletable_files.append(next(sorted_files))
currently_free += deletable_files[-1].filesize
return True, tuple(deletable_files)


def delete_oldest_few_used(
file: RequestedFile, storage: StorageElement
) -> Tuple[bool, Optional[Tuple[StoredFile]]]:
deletable_files = []
currently_free = storage.available
if currently_free < storage.available:
sorted_files = sort_files_by_cachedsince(storage.files.items())
for current_file in sorted_files:
if current_file.numberofaccesses < 3:
deletable_files.append(current_file)
currently_free += deletable_files[-1].filesize
if currently_free >= file.filesize:
return True, tuple(deletable_files)
return False, None


class CacheAlgorithm(object):
def __init__(self, caching_strategy: Callable, deletion_strategy: Callable):
self._caching_strategy = lambda file, storage: check_size(
file, storage
) and check_relevance(file, storage)
self._deletion_strategy = lambda file, storage: delete_oldest(file, storage)

def consider(
self, file: RequestedFile, storage: StorageElement
) -> Tuple[bool, Optional[Tuple[StoredFile]]]:
if self._caching_strategy(file, storage):
return self._deletion_strategy(file, storage)
return False, None
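
Illustrative usage sketch, not taken from the diff: how the predicates and eviction strategies above are meant to be combined through CacheAlgorithm.consider, mirroring the wiring in lapis/connection.py further down. The storage element `cache` and the RequestedFile `requested` are assumed to exist, e.g. produced by the storage reader and job reader below.

from lapis.cachealgorithm import (
    CacheAlgorithm,
    check_size,
    check_relevance,
    delete_oldest_few_used,
)

algorithm = CacheAlgorithm(
    caching_strategy=lambda f, s: check_size(f, s) and check_relevance(f, s),
    deletion_strategy=delete_oldest_few_used,
)


def apply_decision(requested, cache):
    # consider() yields a (should_cache, files_for_deletion) pair; the second
    # entry lists StoredFiles that would have to be evicted to make room.
    should_cache, files_for_deletion = algorithm.consider(file=requested, storage=cache)
    if should_cache:
        for stored in files_for_deletion or ():
            print("would evict", stored.filename)
    return should_cache
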
57 changes: 52 additions & 5 deletions lapis/cli/simulate.py
@@ -1,3 +1,5 @@
from functools import partial

import click
import logging.handlers

@@ -9,6 +11,8 @@
from lapis.pool import StaticPool, Pool
from lapis.pool_io.htcondor import htcondor_pool_reader
from lapis.job_io.swf import swf_job_reader
from lapis.storageelement import StorageElement, HitrateStorage
from lapis.storage_io.storage import storage_reader

from lapis.scheduler import CondorJobScheduler
from lapis.simulator import Simulator
@@ -25,18 +29,22 @@

pool_import_mapper = {"htcondor": htcondor_pool_reader}

storage_import_mapper = {"standard": storage_reader}


@click.group()
@click.option("--seed", type=int, default=1234)
@click.option("--until", type=float)
@click.option("--log-tcp", "log_tcp", is_flag=True)
@click.option("--log-file", "log_file", type=click.File("w"))
@click.option("--log-telegraf", "log_telegraf", is_flag=True)
@click.option("--calculation-efficiency", type=float)
@click.pass_context
def cli(ctx, seed, until, log_tcp, log_file, log_telegraf):
def cli(ctx, seed, until, log_tcp, log_file, log_telegraf, calculation_efficiency):
ctx.ensure_object(dict)
ctx.obj["seed"] = seed
ctx.obj["until"] = until
ctx.obj["calculation_efficiency"] = calculation_efficiency
monitoring_logger = logging.getLogger()
monitoring_logger.setLevel(logging.DEBUG)
time_filter = SimulationTimeFilter()
@@ -71,22 +79,51 @@ def cli(ctx, seed, until, log_tcp, log_file, log_telegraf):
type=(click.File("r"), click.Choice(list(pool_import_mapper.keys()))),
multiple=True,
)
@click.option(
"--storage-files",
"storage_files",
type=(
click.File("r"),
click.File("r"),
click.Choice(list(storage_import_mapper.keys())),
),
default=(None, None, None),
)
@click.option("--remote-throughput", "remote_throughput", type=float, default=10)
@click.option("--cache-hitrate", "cache_hitrate", type=float, default=None)
@click.pass_context
def static(ctx, job_file, pool_file):
def static(ctx, job_file, pool_file, storage_files, remote_throughput, cache_hitrate):
click.echo("starting static environment")
simulator = Simulator(seed=ctx.obj["seed"])
file, file_type = job_file
simulator.create_job_generator(
job_input=file, job_reader=job_import_mapper[file_type]
job_input=file,
job_reader=partial(
job_import_mapper[file_type],
calculation_efficiency=ctx.obj["calculation_efficiency"],
),
)
simulator.create_scheduler(scheduler_type=CondorJobScheduler)

if all(storage_files):
simulator.create_connection_module(remote_throughput * 1024 * 1024 * 1024)
storage_file, storage_content_file, storage_type = storage_files
simulator.create_storage(
storage_input=storage_file,
storage_content_input=storage_content_file,
storage_reader=storage_import_mapper[storage_type],
storage_type=partial(HitrateStorage, cache_hitrate)
if cache_hitrate is not None
else StorageElement,
)
for current_pool in pool_file:
pool_file, pool_file_type = current_pool
simulator.create_pools(
pool_input=pool_file,
pool_reader=pool_import_mapper[pool_file_type],
pool_type=StaticPool,
)
simulator.enable_monitoring()
simulator.run(until=ctx.obj["until"])


@@ -108,7 +145,11 @@ def dynamic(ctx, job_file, pool_file):
simulator = Simulator(seed=ctx.obj["seed"])
file, file_type = job_file
simulator.create_job_generator(
job_input=file, job_reader=job_import_mapper[file_type]
job_input=file,
job_reader=partial(
job_import_mapper[file_type],
calculation_efficiency=ctx.obj["calculation_efficiency"],
),
)
simulator.create_scheduler(scheduler_type=CondorJobScheduler)
for current_pool in pool_file:
@@ -119,6 +160,7 @@ def dynamic(ctx, job_file, pool_file):
pool_type=Pool,
controller=SimulatedLinearController,
)
simulator.enable_monitoring()
simulator.run(until=ctx.obj["until"])


@@ -146,7 +188,11 @@ def hybrid(ctx, job_file, static_pool_file, dynamic_pool_file):
simulator = Simulator(seed=ctx.obj["seed"])
file, file_type = job_file
simulator.create_job_generator(
job_input=file, job_reader=job_import_mapper[file_type]
job_input=file,
job_reader=partial(
job_import_mapper[file_type],
calculation_efficiency=ctx.obj["calculation_efficiency"],
),
)
simulator.create_scheduler(scheduler_type=CondorJobScheduler)
for current_pool in static_pool_file:
@@ -164,6 +210,7 @@ def hybrid(ctx, job_file, static_pool_file, dynamic_pool_file):
pool_type=Pool,
controller=SimulatedLinearController,
)
simulator.enable_monitoring()
simulator.run(until=ctx.obj["until"])
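
Illustrative invocation of the extended CLI via click's test runner, a sketch only: the --job-file and --pool-file options and their "htcondor" readers are assumed from the unchanged parts of simulate.py, and every file path is a placeholder.

from click.testing import CliRunner

from lapis.cli.simulate import cli

result = CliRunner().invoke(
    cli,
    [
        "--until", "1000",
        "--calculation-efficiency", "0.9",
        "static",
        "--job-file", "jobs.json", "htcondor",      # assumed pre-existing option
        "--pool-file", "pools.csv", "htcondor",     # assumed pre-existing option
        # new options from this PR: storage description, storage content, reader type
        "--storage-files", "storages.csv", "storage_content.csv", "standard",
        "--remote-throughput", "1.0",               # GiB/s, converted to bytes in static()
        "--cache-hitrate", "0.8",                   # selects HitrateStorage instead of StorageElement
    ],
)
print(result.exit_code, result.output)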


149 changes: 149 additions & 0 deletions lapis/connection.py
@@ -0,0 +1,149 @@
import random

from typing import Union, Optional
from usim import Scope, time, Pipe

from lapis.cachealgorithm import (
CacheAlgorithm,
check_size,
check_relevance,
delete_oldest_few_used,
)
from lapis.storageelement import StorageElement, RemoteStorage
from lapis.files import RequestedFile, RequestedFile_HitrateBased
from lapis.monitor import sampling_required


class Connection(object):

__slots__ = ("storages", "remote_connection", "caching_algorithm")

def __init__(self, throughput=100):
self.storages = dict()
self.remote_connection = RemoteStorage(Pipe(throughput=throughput))
self.caching_algorithm = CacheAlgorithm(
caching_strategy=lambda file, storage: check_size(file, storage)
and check_relevance(file, storage),
deletion_strategy=lambda file, storage: delete_oldest_few_used(
file, storage
),
)

def add_storage_element(self, storage_element: StorageElement):
"""
Register a storage element with the Connection module, clustering storage
elements by sitename.
:param storage_element:
:return:
"""
storage_element.remote_storage = self.remote_connection
try:
self.storages[storage_element.sitename].append(storage_element)
except KeyError:
self.storages[storage_element.sitename] = [storage_element]

async def _determine_inputfile_source(
self,
requested_file: RequestedFile,
dronesite: Optional[str],
job_repr: Optional[str] = None,
) -> Union[StorageElement, RemoteStorage]:
"""
Collects NamedTuples containing the amount of the requested file's data that is
cached in a storage element, together with that storage element, for all
reachable storage objects on the drone's site. The tuples are sorted by the
amount of cached data, and the storage object holding the biggest part of the
file is returned. If the file is not cached in any storage object, the
connection module's remote connection is returned.
:param requested_file:
:param dronesite:
:param job_repr:
:return:
"""
provided_storages = self.storages.get(dronesite, None)
if provided_storages is not None:
look_up_list = []
for storage in provided_storages:
look_up_list.append(storage.find(requested_file, job_repr))
storage_list = sorted(
[entry for entry in look_up_list], key=lambda x: x[0], reverse=True
)
for entry in storage_list:
# TODO: check should better check that size is bigger than requested
if entry.cached_filesize > 0:
return entry.storage
return self.remote_connection

async def stream_file(
self, requested_file: RequestedFile, dronesite, job_repr=None
):
"""
Determines which storage object is used to provide the requested file and
starts the file transfer. For files transferred via the remote connection, a
potential cache decides whether to cache the file and handles the caching
process.
:param requested_file:
:param dronesite:
:param job_repr:
:return:
"""
used_connection = await self._determine_inputfile_source(
requested_file, dronesite, job_repr
)
await sampling_required.put(used_connection)
if used_connection == self.remote_connection and self.storages.get(
dronesite, None
):
try:
potential_cache = random.choice(self.storages[dronesite])
cache_file, files_for_deletion = self.caching_algorithm.consider(
file=requested_file, storage=potential_cache
)
if cache_file:
for file in files_for_deletion:
await potential_cache.remove(file, job_repr)
await potential_cache.add(requested_file, job_repr)
else:
print(
f"APPLY CACHING DECISION: Job {job_repr}, "
f"File {requested_file.filename}: File wasnt "
f"cached @ {time.now}"
)
except KeyError:
pass
print(f"now transfering {requested_file.filesize} from {used_connection}")
await used_connection.transfer(requested_file, job_repr=job_repr)
print(
"Job {}: finished transfering of file {}: {}GB @ {}".format(
job_repr, requested_file.filename, requested_file.filesize, time.now
)
)

async def transfer_files(self, drone, requested_files: dict, job_repr=None):
"""
Converts dict information about requested files into RequestedFile objects and
launches streaming for all files in parallel.
:param drone:
:param requested_files:
:param job_repr:
:return:
"""
start_time = time.now
async with Scope() as scope:
for inputfilename, inputfilespecs in requested_files.items():
if "hitrates" in inputfilespecs.keys():
requested_file = RequestedFile_HitrateBased(
inputfilename,
inputfilespecs["usedsize"],
inputfilespecs["hitrates"],
)
else:
requested_file = RequestedFile(
inputfilename, inputfilespecs["usedsize"]
)
scope.do(self.stream_file(requested_file, drone.sitename, job_repr))
stream_time = time.now - start_time
print(
"STREAMED files {} in {}".format(list(requested_files.keys()), stream_time)
)
return stream_time
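
Minimal usage sketch of the Connection module in isolation, run under usim; the sizes, the site name and the stand-in drone are made up for illustration and are not part of the diff.

from usim import run

from lapis.connection import Connection
from lapis.storageelement import StorageElement


class _FakeDrone:
    # stand-in: transfer_files() only reads the drone's sitename
    sitename = "site-1"


async def demo():
    connection = Connection(throughput=1024 ** 3)            # 1 GiB/s remote pipe
    cache = StorageElement(
        name="cache-1",
        sitename="site-1",
        size=100 * 1024 ** 3,                                # 100 GiB cache
        throughput_limit=10 * 1024 ** 3,                     # 10 GiB/s cache pipe
        files={},
    )
    connection.add_storage_element(cache)                    # grouped by sitename
    # one 2 GiB input file, in the shape produced by the htcondor job reader
    requested = {"file_A.root": {"usedsize": 2 * 1024 ** 3}}
    stream_time = await connection.transfer_files(
        drone=_FakeDrone(), requested_files=requested
    )
    print("streamed in", stream_time, "time units")


run(demo())
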
15 changes: 13 additions & 2 deletions lapis/drone.py
@@ -1,7 +1,9 @@
from cobald import interfaces
from typing import Optional
from usim import time, Scope, instant, Capacities, ResourcesUnavailable, Queue

from lapis.job import Job
from lapis.connection import Connection


class ResourcesExceeded(Exception):
@@ -12,9 +14,11 @@ class Drone(interfaces.Pool):
def __init__(
self,
scheduler,
pool_resources: dict,
scheduling_duration: float,
pool_resources: Optional[dict] = None,
scheduling_duration: Optional[float] = None,
ignore_resources: list = None,
sitename: str = None,
connection: Connection = None,
):
"""
:param scheduler:
@@ -23,6 +27,8 @@ def __init__(
"""
super(Drone, self).__init__()
self.scheduler = scheduler
self.connection = connection
self.sitename = sitename
self.pool_resources = pool_resources
self.resources = Capacities(**pool_resources)
# shadowing requested resources to determine jobs to be killed
@@ -143,6 +149,11 @@ async def _run_job(self, job: Job, kill: bool):
pass
self.scheduler.update_drone(self)
await job_execution.done
print(
"finished job {} on drone {} @ {}".format(
repr(job), repr(self), time.now
)
)
except ResourcesUnavailable:
await instant
job_execution.cancel()
54 changes: 54 additions & 0 deletions lapis/files.py
@@ -0,0 +1,54 @@
from typing import Optional, NamedTuple


class StoredFile(object):

__slots__ = (
"filename",
"filesize",
"storedsize",
"cachedsince",
"lastaccessed",
"numberofaccesses",
)

def __init__(
self,
filename: str,
filesize: Optional[int] = None,
storedsize: Optional[int] = None,
cachedsince: Optional[int] = None,
lastaccessed: Optional[int] = None,
numberofaccesses: Optional[int] = None,
**filespecs,
):
self.filename = filename
self.filesize = filesize
self.storedsize = storedsize or self.filesize
self.cachedsince = cachedsince
self.lastaccessed = lastaccessed
self.numberofaccesses = numberofaccesses

def increment_accesses(self):
self.numberofaccesses += 1


class RequestedFile(NamedTuple):
filename: str
filesize: Optional[int] = None

def convert_to_stored_file_object(self, currenttime):
print(self.filesize)
return StoredFile(
self.filename,
filesize=self.filesize,
cachedsince=currenttime,
lastaccessed=currenttime,
numberofaccesses=1,
)


class RequestedFile_HitrateBased(NamedTuple):
filename: str
filesize: float
cachehitrate: dict
61 changes: 61 additions & 0 deletions lapis/interfaces/_storage.py
@@ -0,0 +1,61 @@
import abc

from typing import NamedTuple

from lapis.files import RequestedFile, StoredFile


class LookUpInformation(NamedTuple):
cached_filesize: int
storage: "Storage"


class Storage(metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def size(self) -> int:
"""Total size of storage in Bytes"""
raise NotImplementedError

@property
@abc.abstractmethod
def available(self) -> int:
"""Available storage in Bytes"""
raise NotImplementedError

@property
@abc.abstractmethod
def used(self) -> int:
"""Used storage in Bytes"""
raise NotImplementedError

@abc.abstractmethod
async def transfer(self, file: RequestedFile, job_repr):
"""
Transfer the size of the given file via the storage's connection and update file
information. If the file was deleted since it was originally looked up,
the resulting error is not raised.
.. TODO:: What does this mean with the error?
"""
raise NotImplementedError

@abc.abstractmethod
async def add(self, file: RequestedFile, job_repr):
"""
Add file information to storage and transfer the size of the file via
the storage's connection.
"""
raise NotImplementedError

@abc.abstractmethod
async def remove(self, file: StoredFile, job_repr):
"""
Remove all file information and used filesize from the storage.
"""
raise NotImplementedError

@abc.abstractmethod
def find(self, file: RequestedFile, job_repr) -> LookUpInformation:
"""Information if a file is stored in Storage"""
raise NotImplementedError
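
For illustration only, not part of the diff: the interface above can be satisfied by a small in-memory implementation that ignores transfer times and pipes.

from lapis.files import RequestedFile, StoredFile
from lapis.interfaces._storage import LookUpInformation, Storage


class DictStorage(Storage):
    """Toy Storage keeping file records in a plain dict."""

    def __init__(self, size: int):
        self._size = size
        self._files: dict = {}

    @property
    def size(self) -> int:
        return self._size

    @property
    def used(self) -> int:
        return sum(f.storedsize for f in self._files.values())

    @property
    def available(self) -> int:
        return self._size - self.used

    async def transfer(self, file: RequestedFile, job_repr=None):
        # no simulated transfer time; just record the access
        self._files[file.filename].increment_accesses()

    async def add(self, file: RequestedFile, job_repr=None):
        self._files[file.filename] = file.convert_to_stored_file_object(0)

    async def remove(self, file: StoredFile, job_repr=None):
        self._files.pop(file.filename, None)

    def find(self, file: RequestedFile, job_repr=None) -> LookUpInformation:
        stored = self._files.get(file.filename)
        return LookUpInformation(stored.filesize if stored else 0, self)
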
72 changes: 63 additions & 9 deletions lapis/job.py
@@ -1,7 +1,7 @@
import logging
from typing import Optional, TYPE_CHECKING

from usim import time
from usim import time, Scope, instant
from usim import CancelTask

from lapis.monitor import sampling_required
@@ -24,6 +24,7 @@ class Job(object):
"_name",
"drone",
"_success",
"calculation_efficiency",
)

def __init__(
@@ -32,8 +33,9 @@ def __init__(
used_resources: dict,
in_queue_since: float = 0,
queue_date: float = 0,
name: str = None,
drone: "Drone" = None,
name: Optional[str] = None,
drone: "Optional[Drone]" = None,
calculation_efficiency: Optional[float] = None,
):
"""
Definition of a job that uses a specified amount of resources `used_resources`
@@ -59,17 +61,20 @@ def __init__(
self.used_resources[key],
)
self.resources[key] = self.used_resources[key]
self.walltime = used_resources.pop("walltime")
self.requested_walltime = resources.pop("walltime", None)
self.requested_inputfiles = resources.pop("inputfiles", None)
self.used_inputfiles = used_resources.pop("inputfiles", None)
self.walltime: int = used_resources.pop("walltime")
self.requested_walltime: Optional[int] = resources.pop("walltime", None)
self.queue_date = queue_date
assert in_queue_since >= 0, "Queue time cannot be negative"
self.in_queue_since = in_queue_since
self.in_queue_until = None
self.in_queue_until: Optional[float] = None
self.drone = drone
self._name = name
self._success: Optional[bool] = None
self.calculation_efficiency = calculation_efficiency

# caching-related
self.requested_inputfiles = resources.pop("inputfiles", None)
self.used_inputfiles = used_resources.pop("inputfiles", None)

@property
def name(self) -> str:
@@ -91,22 +96,71 @@ def waiting_time(self) -> float:
return self.in_queue_until - self.in_queue_since
return float("Inf")

async def _calculate(self):
"""
Determines a job's calculation time based on the job's CPU time and a
calculation efficiency representing inefficient programming.
:param calculation_efficiency:
:return:
"""
print(
f"WALLTIME: Job {self} @ {time.now}, "
f"{self.used_resources.get('cores', None)}, "
f"{self.calculation_efficiency}"
)
result = self.walltime
try:
result = (
self.used_resources["cores"] / self.calculation_efficiency
) * self.walltime
except (KeyError, TypeError):
pass
start = time.now
await (time + result)
print(f"finished calculation at {time.now - start}")

async def _transfer_inputfiles(self):
try:
start = time.now
print(f"TRANSFERING INPUTFILES: Job {self} @ {start}")
await self.drone.connection.transfer_files(
drone=self.drone,
requested_files=self.used_inputfiles,
job_repr=repr(self),
)
print(
f"streamed inputfiles {self.used_inputfiles.keys()} for job {self} "
f"in {time.now - start} timeunits, finished @ {time.now}"
)
except AttributeError:
pass

async def run(self, drone: "Drone"):
assert drone, "Jobs cannot run without a drone being assigned"
self.drone = drone
self.in_queue_until = time.now
self._success = None
await sampling_required.put(self)
print("running job {} in drone {}".format(repr(self), repr(self.drone)))
try:
await (time + self.walltime)
start = time.now
async with Scope() as scope:
await instant
scope.do(self._transfer_inputfiles())
scope.do(self._calculate())
except CancelTask:
self.drone = None
self._success = False
# TODO: in_queue_until is still set
except BaseException:
self.drone = None
self._success = False
# TODO: in_queue_until is still set
raise
else:
old_walltime = self.walltime
self.walltime = time.now - start
print(f"monitored walltime of {old_walltime} changed to {self.walltime}")
self.drone = None
self._success = True
await sampling_required.put(self)
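
The walltime scaling applied in Job._calculate, restated as a standalone sketch with illustrative numbers; like the method above, it falls back to the unscaled walltime when no efficiency or core count is available.

def effective_calculation_time(walltime, cores, calculation_efficiency):
    # mirrors Job._calculate: stretch the walltime by cores / efficiency,
    # keep the original walltime if either value is missing or unusable
    try:
        return (cores / calculation_efficiency) * walltime
    except (TypeError, ZeroDivisionError):
        return walltime


print(effective_calculation_time(3600, 1, None))   # -> 3600 (efficiency unset)
print(effective_calculation_time(3600, 1, 0.9))    # -> ~4000 (1 core at 90 % efficiency)
print(effective_calculation_time(3600, 8, 0.8))    # -> ~36000 (8 cores stretch further)
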
29 changes: 26 additions & 3 deletions lapis/job_io/htcondor.py
@@ -1,13 +1,15 @@
import csv
import json
import logging
from typing import Optional

from lapis.job import Job
from copy import deepcopy


def htcondor_job_reader(
iterable,
calculation_efficiency: Optional[float] = None,
resource_name_mapping={ # noqa: B006
"cores": "RequestCpus",
"walltime": "RequestWalltime", # s
@@ -29,6 +31,8 @@ def htcondor_job_reader(
"RemoteWallClockTime": 1,
"MemoryUsage": 1000 * 1000,
"DiskUsage_RAW": 1024,
"filesize": 1024 * 1024 * 1024,
"usedsize": 1024 * 1024 * 1024,
},
):
input_file_type = iterable.name.split(".")[-1].lower()
@@ -70,17 +74,34 @@ def htcondor_job_reader(
* unit_conversion_mapping.get(original_key, 1)
)

calculation_efficiency = entry.get(
"calculation_efficiency", calculation_efficiency
)

try:
if not entry["Inputfiles"]:
del entry["Inputfiles"]
resources["inputfiles"] = deepcopy(entry["Inputfiles"])
used_resources["inputfiles"] = deepcopy(entry["Inputfiles"])
for filename, filespecs in entry["Inputfiles"].items():
for key in filespecs.keys():
if key == "hitrates":
continue
resources["inputfiles"][filename][key] = filespecs[
key
] * unit_conversion_mapping.get(key, 1)
used_resources["inputfiles"][filename][key] = filespecs[
key
] * unit_conversion_mapping.get(key, 1)

if "usedsize" in filespecs:
del resources["inputfiles"][filename]["usedsize"]

if "filesize" in filespecs:
if "usedsize" not in filespecs:
used_resources["inputfiles"][filename]["usedsize"] = filespecs[
"filesize"
]
used_resources["inputfiles"][filename]["usedsize"] = resources[
"inputfiles"
][filename]["filesize"]
del used_resources["inputfiles"][filename]["filesize"]

except KeyError:
@@ -89,4 +110,6 @@ def htcondor_job_reader(
resources=resources,
used_resources=used_resources,
queue_date=float(entry[used_resource_name_mapping["queuetime"]]),
calculation_efficiency=calculation_efficiency,
name=entry.get("name", None),
)
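
Hedged example of a job entry carrying the new Inputfiles block as the extended reader expects it; all values are made up. Sizes are given in GB and scaled to bytes through unit_conversion_mapping, while the optional per-cache "hitrates" dict is passed through untouched for the file-based hit rate mode.

job_entry = {
    "RequestCpus": 1,
    "RequestWalltime": 3600,
    "calculation_efficiency": 0.9,         # optional per-job override of the CLI value
    # ... remaining resource fields unchanged from the existing reader ...
    "Inputfiles": {
        "file_A.root": {
            "filesize": 2.5,               # GB, converted to bytes on read
            "usedsize": 1.2,               # GB actually read by the job
        },
        "file_B.root": {
            "filesize": 5.0,
            "usedsize": 5.0,
            "hitrates": {"cache-1": 0.8},  # keyed by cache name, used by FileBasedHitrateStorage
        },
    },
}
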
3 changes: 3 additions & 0 deletions lapis/job_io/swf.py
@@ -4,12 +4,14 @@
[Standard Workload Format](http://www.cs.huji.ac.il/labs/parallel/workload/swf.html).
"""
import csv
from typing import Optional

from lapis.job import Job


def swf_job_reader(
iterable,
calculation_efficiency: Optional[float] = None,
resource_name_mapping={ # noqa: B006
"cores": "Requested Number of Processors",
"walltime": "Requested Time", # s
@@ -90,4 +92,5 @@ def swf_job_reader(
used_resources=used_resources,
queue_date=float(row[header[used_resource_name_mapping["queuetime"]]]),
name=row[header["Job Number"]],
calculation_efficiency=calculation_efficiency,
)
21 changes: 16 additions & 5 deletions lapis/monitor/__init__.py
@@ -7,6 +7,9 @@
from usim import time, Queue


SIMULATION_START = None


class LoggingSocketHandler(logging.handlers.SocketHandler):
def makePickle(self, record):
return self.format(record).encode()
@@ -42,11 +45,19 @@ def __init__(self):
self._statistics = {}

async def run(self):
async for log_object in sampling_required:
for statistic in self._statistics.get(type(log_object), set()):
# do the logging
for record in statistic(log_object):
logging.getLogger(statistic.name).info(statistic.name, record)
# The Queue.__aiter__ cannot safely be finalised unless closed.
# We explicitly create and later on aclose it, to ensure this happens
# when the Scope collects us and the event loop is still around.
log_iter = sampling_required.__aiter__()
try:
async for log_object in log_iter:
for statistic in self._statistics.get(type(log_object), set()):
# do the logging
for record in statistic(log_object):
record["tardis"] = "lapis-%s" % SIMULATION_START
logging.getLogger(statistic.name).info(statistic.name, record)
except GeneratorExit:
await log_iter.aclose()

def register_statistic(self, statistic: Callable) -> None:
"""
91 changes: 91 additions & 0 deletions lapis/monitor/caching.py
@@ -0,0 +1,91 @@
import logging

from cobald.monitor.format_json import JsonFormatter
from cobald.monitor.format_line import LineProtocolFormatter
from usim import Pipe

from lapis.monitor import LoggingSocketHandler, LoggingUDPSocketHandler
from lapis.storageelement import StorageElement


def storage_status(storage: StorageElement) -> list:
"""
Log information about current storage object state
:param storage:
:return: list of records for logging
"""
results = [
{
"storage": repr(storage),
"usedstorage": storage.used,
"storagesize": storage.size,
"numberoffiles": len(storage.files),
}
]
return results


storage_status.name = "storage_status"
storage_status.whitelist = (StorageElement,)
storage_status.logging_formatter = {
LoggingSocketHandler.__name__: JsonFormatter(),
logging.StreamHandler.__name__: JsonFormatter(),
LoggingUDPSocketHandler.__name__: LineProtocolFormatter(
tags={"tardis", "storage"}, resolution=1
),
}


def storage_connection(storage: StorageElement) -> list:
"""
Log information about the storages connection
:param storage:
:return:
"""
results = [
{
"storage": repr(storage),
"throughput": storage.connection.throughput,
"requested_throughput": sum(storage.connection._subscriptions.values()),
"throughput_scale": storage.connection._throughput_scale,
}
]
return results


storage_connection.name = "storage_connection"
storage_connection.whitelist = (StorageElement,)
storage_connection.logging_formatter = {
LoggingSocketHandler.__name__: JsonFormatter(),
logging.StreamHandler.__name__: JsonFormatter(),
LoggingUDPSocketHandler.__name__: LineProtocolFormatter(
tags={"tardis", "storage"}, resolution=1
),
}


def remote_connection(remote: Pipe) -> list:
"""
Log information about the remote connection
:param remote:
:return:
"""
results = [
{
"throughput": remote.throughput,
"requested_throughput": sum(remote._subscriptions.values()),
"throughput_scale": remote._throughput_scale,
}
]
return results


remote_connection.name = "remote_connection"
remote_connection.whitelist = (Pipe,)
remote_connection.logging_formatter = {
LoggingSocketHandler.__name__: JsonFormatter(),
logging.StreamHandler.__name__: JsonFormatter(),
LoggingUDPSocketHandler.__name__: LineProtocolFormatter(
tags={"tardis"}, resolution=1
),
}
18 changes: 15 additions & 3 deletions lapis/pool.py
@@ -1,7 +1,9 @@
from functools import partial
from typing import Generator, Callable
from cobald import interfaces
from usim import eternity, Scope, interval

from lapis.connection import Connection
from .drone import Drone


@@ -24,15 +26,20 @@ def __init__(
capacity: int = float("inf"),
init: int = 0,
name: str = None,
connection: Connection = None,
):
super(Pool, self).__init__()
assert init <= capacity
self.make_drone = make_drone
self._drones = []
self._demand = 1
self._level = init
self._capacity = capacity
self._name = name
# TODO: Should drones have access to the pool or the connection directly?
if connection is not None:
self.make_drone = partial(make_drone, connection=connection)
else:
self.make_drone = make_drone

async def init_pool(self, scope: Scope, init: int = 0):
"""
@@ -136,10 +143,15 @@ class StaticPool(Pool):
instantiated within the pool
"""

def __init__(self, make_drone: Callable, capacity: int = 0):
def __init__(
self, make_drone: Callable, capacity: int = 0, connection: Connection = None
):
assert capacity > 0, "Static pool was initialised without any resources..."
super(StaticPool, self).__init__(
capacity=capacity, init=capacity, make_drone=make_drone
capacity=capacity,
init=capacity,
make_drone=make_drone,
connection=connection,
)
self._demand = capacity

5 changes: 5 additions & 0 deletions lapis/pool_io/htcondor.py
@@ -2,6 +2,8 @@
from functools import partial

from typing import Callable

from lapis.connection import Connection
from ..pool import Pool


@@ -19,6 +21,7 @@ def htcondor_pool_reader(
},
pool_type: Callable = Pool,
make_drone: Callable = None,
connection: Connection = None,
):
"""
Load a pool configuration that was exported via htcondor from files or
@@ -48,5 +51,7 @@ def htcondor_pool_reader(
for key, value in resource_name_mapping.items()
},
ignore_resources=["disk"],
sitename=row.get("sitename", None),
),
connection=connection,
)
15 changes: 14 additions & 1 deletion lapis/scheduler.py
@@ -1,5 +1,5 @@
from typing import Dict
from usim import Scope, interval, Resources
from usim import Scope, interval, Resources, time

from lapis.drone import Drone
from lapis.monitor import sampling_required
@@ -88,9 +88,17 @@ async def run(self):
async with Scope() as scope:
scope.do(self._collect_jobs())
async for _ in interval(self.interval):
print("NEW SCHEDULING INTERVAL @ {}".format(time.now))
print(self.job_queue)
for job in self.job_queue.copy():
print("SCHEDULING {}".format(repr(job)))
best_match = self._schedule_job(job)
if best_match:
print(
"start job {} on drone {} @ {}".format(
repr(job), repr(best_match), time.now
)
)
await best_match.schedule_job(job)
self.job_queue.remove(job)
await sampling_required.put(self.job_queue)
@@ -129,6 +137,11 @@ def _schedule_job(self, job) -> Drone:
drone = cluster[0]
cost = 0
resources = drone.theoretical_available_resources
# print(
# "trying to match Job {} to {}, resources {}".format(
# repr(job), repr(drone), resources
# )
# )
for resource_type in job.resources:
if resources.get(resource_type, 0) < job.resources[resource_type]:
# Inf for all job resources that a drone does not support
51 changes: 38 additions & 13 deletions lapis/simulator.py
@@ -1,11 +1,18 @@
import logging
import random
import time as pytime
from functools import partial

from typing import List

from cobald.interfaces import Controller
from usim import run, time, until, Scope, Queue

import lapis.monitor as monitor
from lapis.drone import Drone
from lapis.job import job_to_queue_scheduler
from lapis.connection import Connection
from lapis.monitor.caching import storage_status, storage_connection, remote_connection
from lapis.monitor.general import (
user_demand,
job_statistics,
@@ -14,29 +21,26 @@
configuration_information,
job_events,
)
from lapis.monitor import Monitoring
from lapis.monitor.cobald import drone_statistics, pool_statistics

from lapis.pool import Pool

logging.getLogger("implementation").propagate = False


class Simulator(object):
def __init__(self, seed=1234):
random.seed(seed)
self.job_queue = Queue()
self.pools = []
self.controllers = []
self.job_queue: Queue = Queue()
self.pools: List[Pool] = []
self.connection: Connection = None
self.controllers: List[Controller] = []
self.job_scheduler = None
self.job_generator = None
self.cost = 0
self._job_generators = []
self.monitoring = None
self.monitoring = monitor.Monitoring()
self.duration = None
self.enable_monitoring()

def enable_monitoring(self):
self.monitoring = Monitoring()
self.monitoring.register_statistic(user_demand)
self.monitoring.register_statistic(job_statistics)
self.monitoring.register_statistic(job_events)
@@ -45,6 +49,9 @@ def enable_monitoring(self):
self.monitoring.register_statistic(resource_statistics)
self.monitoring.register_statistic(pool_status)
self.monitoring.register_statistic(configuration_information)
self.monitoring.register_statistic(storage_status)
self.monitoring.register_statistic(storage_connection)
self.monitoring.register_statistic(remote_connection)

def create_job_generator(self, job_input, job_reader):
self._job_generators.append((job_input, job_reader))
@@ -55,20 +62,36 @@ def create_pools(self, pool_input, pool_reader, pool_type, controller=None):
iterable=pool_input,
pool_type=pool_type,
make_drone=partial(Drone, self.job_scheduler),
connection=self.connection,
):
self.pools.append(pool)
if controller:
self.controllers.append(controller(target=pool, rate=1))

def create_storage(
self, storage_input, storage_reader, storage_type, storage_content_input=None
):
assert self.connection, "Connection module needs to be created before storages"
for storage in storage_reader(
storage=storage_input,
storage_content=storage_content_input,
storage_type=storage_type,
):
self.connection.add_storage_element(storage)

def create_scheduler(self, scheduler_type):
self.job_scheduler = scheduler_type(job_queue=self.job_queue)

def create_connection_module(self, remote_throughput):
self.connection = Connection(remote_throughput)

def run(self, until=None):
print(f"running until {until}")
monitor.SIMULATION_START = pytime.time()
print(f"[lapis-{monitor.SIMULATION_START}] running until {until}")
run(self._simulate(until))

async def _simulate(self, end):
print(f"Starting simulation at {time.now}")
print(f"[lapis-{monitor.SIMULATION_START}] Starting simulation at {time.now}")
async with until(time == end) if end else Scope() as while_running:
for pool in self.pools:
while_running.do(pool.run(), volatile=True)
@@ -79,9 +102,11 @@ async def _simulate(self, end):
while_running.do(controller.run(), volatile=True)
while_running.do(self.monitoring.run(), volatile=True)
self.duration = time.now
print(f"Finished simulation at {self.duration}")
print(
f"[lapis-{monitor.SIMULATION_START}] Finished simulation at {self.duration}"
)

async def _queue_jobs(self, job_input, job_reader):
await job_to_queue_scheduler(
job_generator=job_reader(job_input), job_queue=self.job_queue
job_generator=partial(job_reader, job_input)(), job_queue=self.job_queue
)
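
Sketch of the wiring the static CLI command performs, expressed directly against the new Simulator hooks; the file names, the seed and all numeric values are placeholders.

from functools import partial

from lapis.job_io.htcondor import htcondor_job_reader
from lapis.pool import StaticPool
from lapis.pool_io.htcondor import htcondor_pool_reader
from lapis.scheduler import CondorJobScheduler
from lapis.simulator import Simulator
from lapis.storage_io.storage import storage_reader
from lapis.storageelement import HitrateStorage

simulator = Simulator(seed=1234)
simulator.create_job_generator(
    job_input=open("jobs.json"),
    job_reader=partial(htcondor_job_reader, calculation_efficiency=0.9),
)
simulator.create_scheduler(scheduler_type=CondorJobScheduler)
# the connection module must exist before storages can be attached to it
simulator.create_connection_module(remote_throughput=1024 ** 3)   # 1 GiB/s
simulator.create_storage(
    storage_input=open("storages.csv"),
    storage_content_input=open("storage_content.csv"),
    storage_reader=storage_reader,
    storage_type=partial(HitrateStorage, 0.8),                     # 80 % cache hit rate
)
simulator.create_pools(
    pool_input=open("pools.csv"),
    pool_reader=htcondor_pool_reader,
    pool_type=StaticPool,
)
simulator.enable_monitoring()
simulator.run(until=1000)
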
Empty file added lapis/storage_io/__init__.py
Empty file.
83 changes: 83 additions & 0 deletions lapis/storage_io/storage.py
@@ -0,0 +1,83 @@
import csv
from functools import partial

from lapis.files import StoredFile


def storage_reader(
storage,
storage_content,
storage_type,
unit_conversion_mapping: dict = { # noqa: B006
"cachesizeGB": 1024 * 1024 * 1024,
"throughput_limit": 1024 * 1024 * 1024,
},
):
try:
storage_content = storage_content_reader(storage_content)
except TypeError:
storage_content = dict()
reader = csv.DictReader(storage, delimiter=" ", quotechar="'")
for row in reader:
yield partial(
storage_type,
name=row["name"],
sitename=row["sitename"],
size=int(
float(row["cachesizeGB"])
* unit_conversion_mapping.get("cachesizeGB", 1)
),
throughput_limit=int(
float(row["throughput_limit"])
* unit_conversion_mapping.get("throughput_limit", 1)
),
files=storage_content.get(row["name"], dict()),
)()


def storage_content_reader(
file_name,
unit_conversion_mapping: dict = { # noqa: B006
"filesize": 1024 * 1024 * 1024,
"storedsize": 1024 * 1024 * 1024,
},
):
reader = csv.DictReader(file_name, delimiter=" ", quotechar="'")
cache_information = dict()
for row in reader:
for key in row:
if key not in ["filename", "cachename"]:
row[key] = int(float(row[key]) * unit_conversion_mapping.get(key, 1))
cache_information.setdefault(row["cachename"], {})[
row["filename"]
] = StoredFile(**row)
return cache_information


def storage_reader_filebased_hitrate_caching(
storage,
storage_type,
storage_content=None,
unit_conversion_mapping: dict = { # noqa: B006
"cachesizeGB": 1024 * 1024 * 1024,
"throughput_limit": 1024 * 1024 * 1024,
},
):

reader = csv.DictReader(storage, delimiter=" ", quotechar="'")
for row in reader:
print(row)
yield partial(
storage_type,
name=row["name"],
sitename=row["sitename"],
size=int(
float(row["cachesizeGB"])
* unit_conversion_mapping.get("cachesizeGB", 1)
),
throughput_limit=int(
float(row["throughput_limit"])
* unit_conversion_mapping.get("throughput_limit", 1)
),
files=dict(),
)()
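
Hedged example of the space-delimited description and content files consumed by storage_reader above; the column values are invented, sizes are in GB before conversion, and the extra content columns map directly onto StoredFile attributes.

import io

from lapis.storage_io.storage import storage_reader
from lapis.storageelement import StorageElement

storages_csv = io.StringIO(
    "name sitename cachesizeGB throughput_limit\n"
    "cache-1 site-1 100 10\n"
)
content_csv = io.StringIO(
    "filename cachename filesize storedsize cachedsince lastaccessed numberofaccesses\n"
    "file_A.root cache-1 2.5 2.5 0 0 1\n"
)

for storage in storage_reader(
    storage=storages_csv,
    storage_content=content_csv,
    storage_type=StorageElement,
):
    print(repr(storage), storage.size, len(storage.files))
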
307 changes: 307 additions & 0 deletions lapis/storageelement.py
@@ -0,0 +1,307 @@
from typing import Optional

from usim import time, Resources, Pipe, Scope

from lapis.files import StoredFile, RequestedFile, RequestedFile_HitrateBased
from lapis.interfaces._storage import Storage, LookUpInformation

import logging


class RemoteStorage(Storage):
def __init__(self, pipe: Pipe):
self.connection = pipe

@property
def size(self):
return float("Inf")

@property
def available(self):
return float("Inf")

@property
def used(self):
return 0

async def transfer(self, file: RequestedFile, **kwargs):
await self.connection.transfer(total=file.filesize)

async def add(self, file: StoredFile, **kwargs):
raise NotImplementedError

async def remove(self, file: StoredFile, **kwargs):
raise NotImplementedError

def find(self, file: RequestedFile, **kwargs) -> LookUpInformation:
raise NotImplementedError


class StorageElement(Storage):

__slots__ = (
"name",
"sitename",
"_size",
"deletion_duration",
"update_duration",
"_usedstorage",
"files",
"filenames",
"connection",
"remote_storage",
)

def __init__(
self,
name: Optional[str] = None,
sitename: Optional[str] = None,
size: int = 1000 * 1024 * 1024 * 1024,
throughput_limit: int = 10 * 1024 * 1024 * 1024,
files: Optional[dict] = None,
):
self.name = name
self.sitename = sitename
self.deletion_duration = 5
self.update_duration = 1
self._size = size
self.files = files
self._usedstorage = Resources(
size=sum(file.storedsize for file in files.values())
)
self.connection = Pipe(throughput_limit)
self.remote_storage = None

@property
def size(self):
return self._size

@property
def used(self):
return self._usedstorage.levels.size

@property
def available(self):
return self.size - self.used

async def remove(self, file: StoredFile, job_repr=None):
"""
Deletes a file from the storage object. The time this operation takes is defined
by the storage's deletion_duration attribute.
:param file:
:param job_repr: Needed for debug output, will be replaced
:return:
"""
print(
"REMOVE FROM STORAGE: Job {}, File {} @ {}".format(
job_repr, file.filename, time.now
)
)
await (time + self.deletion_duration)
await self._usedstorage.decrease(size=file.filesize)
self.files.pop(file.filename)

async def add(self, file: RequestedFile, job_repr=None):
"""
Adds a file to the storage object, transferring it through the storage object's
connection. This should be sufficient for now because files are only added
to the storage when they are also transferred through the Connection's remote
connection. If this simulator is extended to include any kind of
direct file placement, this has to be adapted.
:param file:
:param job_repr: Needed for debug output, will be replaced
:return:
"""
print(
"ADD TO STORAGE: Job {}, File {} @ {}".format(
job_repr, file.filename, time.now
)
)
file = file.convert_to_stored_file_object(time.now)
await self._usedstorage.increase(size=file.filesize)
self.files[file.filename] = file
await self.connection.transfer(file.filesize)

async def _update(self, stored_file: StoredFile, job_repr):
"""
Updates a stored file's information upon access.
:param stored_file:
:param job_repr: Needed for debug output, will be replaced
:return:
"""
await (time + self.update_duration)
stored_file.lastaccessed = time.now
stored_file.increment_accesses()
print(
"UPDATE: Job {}, File {} @ {}".format(
job_repr, stored_file.filename, time.now
)
)

async def transfer(self, file: RequestedFile, job_repr=None):
"""
Manages the file transfer via the storage element's connection and updates file
information. If the file has been deleted since it was originally
looked up, the resulting error is not raised.
:param file:
:param job_repr: Needed for debug output, will be replaced
:return:
"""
await self.connection.transfer(file.filesize)
try:
# TODO: needs handling of KeyError
await self._update(self.files[file.filename], job_repr)
except AttributeError:
pass

def find(self, requested_file: RequestedFile, job_repr=None):
"""
Searches the storage object for the requested_file and returns the result
(amount of cached data, storage object).
:param requested_file:
:param job_repr: Needed for debug output, will be replaced
:return: (amount of cached data, storage object)
"""
print(
"LOOK UP FILE: Job {}, File {}, Storage {} @ {}".format(
job_repr, requested_file.filename, repr(self), time.now
)
)
try:
result = LookUpInformation(
self.files[requested_file.filename].filesize, self
)
except KeyError:
print(
"File {} not cached on any reachable storage".format(
requested_file.filename
)
)
result = LookUpInformation(0, self)
return result

def __repr__(self):
return "<%s: %s>" % (self.__class__.__name__, self.name or id(self))


class HitrateStorage(StorageElement):
def __init__(
self,
hitrate,
name: Optional[str] = None,
sitename: Optional[str] = None,
size: int = 1000 * 1024 * 1024 * 1024,
throughput_limit: int = 10 * 1024 * 1024 * 1024,
files: Optional[dict] = None,
):
super(HitrateStorage, self).__init__(
name=name,
sitename=sitename,
size=size,
throughput_limit=throughput_limit,
files=files,
)
self._hitrate = hitrate

@property
def available(self):
return self.size

@property
def used(self):
return 0

async def transfer(self, file: RequestedFile, job_repr=None):
print(
"TRANSFER: {}, filesize {}, remote: {}/{}, cache: {}/{}".format(
self._hitrate,
file.filesize,
(1 - self._hitrate) * file.filesize,
self.remote_storage.connection.throughput,
self._hitrate * file.filesize,
self.connection.throughput,
)
)
async with Scope() as scope:
logging.getLogger("implementation").warning(
"{} {} @ {} in {}".format(
self._hitrate * file.filesize,
(1 - self._hitrate) * file.filesize,
time.now,
file.filename[-30:],
)
)
scope.do(self.connection.transfer(total=self._hitrate * file.filesize))
scope.do(
self.remote_storage.connection.transfer(
total=(1 - self._hitrate) * file.filesize
)
)

def find(self, requested_file: RequestedFile, job_repr=None):
return LookUpInformation(requested_file.filesize, self)

async def add(self, file: RequestedFile, job_repr=None):
pass

async def remove(self, file: StoredFile, job_repr=None):
pass


class FileBasedHitrateStorage(StorageElement):
def __init__(
self,
name: Optional[str] = None,
sitename: Optional[str] = None,
size: int = 1000 * 1024 * 1024 * 1024,
throughput_limit: int = 10 * 1024 * 1024 * 1024,
files: Optional[dict] = None,
):
super(FileBasedHitrateStorage, self).__init__(
name=name,
sitename=sitename,
size=size,
throughput_limit=throughput_limit,
files=files,
)

@property
def available(self):
return self.size

@property
def used(self):
return 0

async def transfer(self, file: RequestedFile_HitrateBased, job_repr=None):
current_cachehitrate = file.cachehitrate.get(self.name, 0)
print(
"TRANSFER: on {} with {}, filesize {}, remote: {}/{}, cache: {}/{}".format(
self.name,
file.cachehitrate.get(self.name, 0),
file.filesize,
(1 - current_cachehitrate) * file.filesize,
self.remote_storage.connection.throughput,
current_cachehitrate * file.filesize,
self.connection.throughput,
)
)
async with Scope() as scope:

scope.do(
self.connection.transfer(total=current_cachehitrate * file.filesize)
)
scope.do(
self.remote_storage.connection.transfer(
total=(1 - current_cachehitrate) * file.filesize
)
)

def find(self, requested_file: RequestedFile, job_repr=None):
return LookUpInformation(requested_file.filesize, self)

async def add(self, file: RequestedFile, job_repr=None):
pass

async def remove(self, file: StoredFile, job_repr=None):
pass
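
Back-of-the-envelope sketch of the transfer split performed by HitrateStorage (and per file by FileBasedHitrateStorage): the cached fraction of a file flows over the cache pipe while the remainder flows over the remote pipe, both concurrently, so ignoring contention from other transfers the transfer lasts as long as the slower share.

GiB = 1024 ** 3


def hitrate_transfer_time(filesize, hitrate, cache_throughput, remote_throughput):
    cache_time = hitrate * filesize / cache_throughput           # cached share, cache pipe
    remote_time = (1 - hitrate) * filesize / remote_throughput   # remainder, remote pipe
    return max(cache_time, remote_time)                          # both run concurrently


# 10 GiB file, 80 % hit rate, 10 GiB/s cache pipe, 1 GiB/s remote pipe
print(hitrate_transfer_time(10 * GiB, 0.8, 10 * GiB, 1 * GiB))   # -> ~2.0, the remote share dominates
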
5 changes: 5 additions & 0 deletions lapis/utilities/cache_algorithm_implementations.py
@@ -0,0 +1,5 @@
def cache_all(*args, **kwargs):
return True


cache_algorithm = {"standard": cache_all}
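
The standard policy admits every file unconditionally. A size-aware admission policy could be registered in the same dictionary; the following is only a sketch and assumes the caller passes the requested file and the target storage element (that calling convention is not fixed by this diff):

    def cache_if_fits(file, storage, *args, **kwargs):
        # Hypothetical policy: admit a file only if it fits into the remaining
        # free space (assuming `available` reports the free space of the cache).
        return file.filesize <= storage.available


    cache_algorithm["fits"] = cache_if_fits
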
42 changes: 42 additions & 0 deletions lapis/utilities/cache_cleanup_implementations.py
@@ -0,0 +1,42 @@
from typing import List

from lapis.files import StoredFile


def sort_files_by_cachedsince(stored_files: set) -> List[StoredFile]:
return sorted(list(stored_files), key=lambda x: x.cachedsince)


# async def fifo(size, storage):
# print("hit fifo")
# print(storage.files.keys())
# # FIFO, test different implementations
# sorted_content = sorted(
# list(storage.files.items()), key=lambda x: x.filespecs.cachedsince
# )
# print("sorted", sorted_content)
# while size < 0:
# print("hit while")
# size += sorted_content[0][1]["cachedsizeMB"]
# storage.files.pop(sorted_content[0][0])
# await sleep(storage.placement_duration)
# await storage._usedstorage.decrease(
# **{"usedsize": sorted_content[0][1]["cachedsizeMB"]})
# print(storage.usedstorage)
# sorted_content.pop(0)
# print("after fifo ", storage.files.keys())
#
#
# def last_accessed(size, storage):
# # FIFO, test different implementations
# sorted_content = sorted(
# list(storage.content.items()), key=lambda x: x[1]["lastaccessed"]
# )
# while size < 0:
# size += sorted_content[0][1]["cachedsizeMB"]
# storage.content.pop(sorted_content[0][0])
# storage.usedstorage -= sorted_content[0][1]["cachedsizeMB"]
# sorted_content.pop(0)
#
#
# cache_cleanup = {"fifo": fifo, "lastaccessed": last_accessed}
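
The commented-out implementations above still operate on the old dict-based layout. Against the new StoredFile objects and the sort helper, a FIFO cleanup might look roughly like this (a sketch only; it assumes the storage element exposes the files mapping, an integer available property and the asynchronous remove() from the storage element diff):

    async def fifo(size: int, storage) -> None:
        # Hedged sketch: evict the oldest cached files until `size` bytes fit.
        for stored_file in sort_files_by_cachedsince(set(storage.files.values())):
            if size <= storage.available:
                break
            await storage.remove(stored_file)


    cache_cleanup = {"fifo": fifo}
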
11 changes: 11 additions & 0 deletions lapis/utilities/walltime_models.py
@@ -0,0 +1,11 @@
# walltime models for caching


def extrapolate_walltime_to_maximal_efficiency(
job, original_walltime, maximal_efficiency: float = 0.8
):
return (job.used_resources["cores"] / maximal_efficiency) * original_walltime


# TODO: add models depending on fraction of cached files etc
walltime_models = {"maxeff": extrapolate_walltime_to_maximal_efficiency}
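
The TODO above asks for models that take the fraction of cached input files into account. One possible shape for such a model, purely as an assumption-labelled sketch (neither the name nor the linear interpolation is part of this PR):

    def extrapolate_walltime_by_cached_fraction(
        job, original_walltime, cached_fraction: float, max_speedup: float = 2.0
    ):
        # Hypothetical: fully cached input runs max_speedup times faster than
        # fully remote input; interpolate linearly in between.
        return original_walltime * (1 - cached_fraction + cached_fraction / max_speedup)
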
48 changes: 40 additions & 8 deletions lapis_tests/__init__.py
@@ -1,9 +1,21 @@
from typing import Callable, Coroutine
from typing import Callable, Coroutine, Optional
from functools import wraps

from usim import run
from usim import run, Resources

from lapis.drone import Drone
from lapis.job import Job
from lapis.connection import Connection


class UnfinishedTest(RuntimeError):
"""A test did never finish"""

def __init__(self, test_case):
self.test_case = test_case
super().__init__(
"Test case %r did not finish" % getattr(test_case, "__name__", test_case)
)


def via_usim(test_case: Callable[..., Coroutine]):
@@ -22,16 +34,24 @@ async def test_sleep():

@wraps(test_case)
def run_test(*args, **kwargs):
# pytest currently ignores __tracebackhide__ if we re-raise
# https://github.com/pytest-dev/pytest/issues/1904
__tracebackhide__ = True
# >>> This is not the frame you are looking for. Do read on. <<<
return run(test_case(*args, **kwargs))
test_completed = False

async def complete_test_case():
nonlocal test_completed
await test_case(*args, **kwargs)
test_completed = True

run(complete_test_case())
if not test_completed:
raise UnfinishedTest(test_case)

return run_test


class DummyScheduler:
def __init__(self):
self.statistics = Resources(job_succeeded=0, job_failed=0)

@staticmethod
def register_drone(drone: Drone):
pass
@@ -44,6 +64,18 @@ def unregister_drone(drone: Drone):
def update_drone(drone: Drone):
pass

async def job_finished(self, job: Job):
if job.successful:
await self.statistics.increase(job_succeeded=1)
else:
await self.statistics.increase(job_failed=1)


class DummyDrone:
pass
sitename = None

def __init__(self, throughput: Optional[float] = None):
if throughput:
self.connection = Connection(throughput)
else:
self.connection = None
2 changes: 1 addition & 1 deletion lapis_tests/data/htcondor_pools.csv
@@ -2,4 +2,4 @@
2 2 224400.0 8000
2 2 223100.0 8000
1 8 196300.0 32200
1 4 29700.0 8000
1 4 29700.0 8000
Empty file.
58 changes: 58 additions & 0 deletions lapis_tests/storage_io/test_storage.py
@@ -0,0 +1,58 @@
from tempfile import NamedTemporaryFile

from lapis.storageelement import StorageElement
from lapis.storage_io.storage import storage_reader


class TestStorageReader(object):
def _create_simple_config(self, to_string=False):
storage_config = NamedTemporaryFile(suffix=".csv")
with open(storage_config.name, "w") as write_stream:
write_stream.write(
f"name sitename cachesizeGB throughput_limit\n"
f"name sitename {str(10) if to_string else 10} "
f"{str(10.1) if to_string else 10.1} {str(1) if to_string else 1}"
)
return storage_config

def _create_simple_files(self, to_string=False):
file_config = NamedTemporaryFile(suffix=".csv")
with open(file_config.name, "w") as write_stream:
write_stream.write(
f"filename cachename filesize storedsize cachedsince lastaccessed "
f"numberofaccesses\n"
f"file name {str(10.1) if to_string else 10.1} "
f"{str(5.0) if to_string else 5.0} "
f"{str(0) if to_string else 0} {str(0) if to_string else 0} "
f"{str(1) if to_string else 1}"
)
return file_config

def test_empty_files(self):
simple_config = self._create_simple_config()
count = 0
for storage in storage_reader(
open(simple_config.name, "r+"), None, StorageElement
):
assert storage is not None
count += 1
assert count == 1

def test_simple_read(self):
for variant in [False, True]:
print(f"starting with {variant}")
simple_config = self._create_simple_config(to_string=variant)
simple_files = self._create_simple_files(to_string=variant)
count = 0
for storage in storage_reader(
open(simple_config.name, "r"),
open(simple_files.name, "r"),
StorageElement,
):
assert storage is not None
assert type(storage.available) == int
assert storage.available == int(5.0 * 1024 * 1024 * 1024)
assert type(storage.size) == int
assert storage.size == int(10.0 * 1024 * 1024 * 1024)
count += 1
assert count == 1
211 changes: 211 additions & 0 deletions lapis_tests/test_caching_hitrate_based.py
@@ -0,0 +1,211 @@
from usim import time
from tempfile import NamedTemporaryFile
import json
from functools import partial

from lapis_tests import via_usim, DummyDrone
from lapis.connection import Connection
from lapis.storageelement import HitrateStorage
from lapis.storage_io.storage import storage_reader
from lapis.files import RequestedFile
from lapis.simulator import Simulator
from lapis.job_io.htcondor import htcondor_job_reader
from lapis.pool import StaticPool
from lapis.pool_io.htcondor import htcondor_pool_reader
from lapis.scheduler import CondorJobScheduler


class TestHitrateCaching(object):
def test_hitratestorage(self):
size = 1000
hitratestorage = HitrateStorage(hitrate=0.5, size=size, files={})
requested_file = RequestedFile(filename="testfile", filesize=100)
looked_up_file = hitratestorage.find(requested_file, job_repr=None)

assert size == hitratestorage.available
assert 0 == hitratestorage.used
assert 100 == looked_up_file.cached_filesize
assert hitratestorage == looked_up_file.storage

@via_usim
async def test_add_storage_to_connection(self):
throughput = 10
size = 1000
hitratestorage = HitrateStorage(hitrate=0.5, size=size, files={})
connection = Connection(throughput=throughput)
connection.add_storage_element(hitratestorage)
assert hitratestorage in connection.storages[hitratestorage.sitename]

@via_usim
async def test_determine_inputfile_source(self):
throughput = 10
size = 1000
requested_file = RequestedFile(filename="testfile", filesize=100)
hitratestorage = HitrateStorage(hitrate=0.5, size=size, files={})
connection = Connection(throughput=throughput)
connection.add_storage_element(hitratestorage)
cache = await connection._determine_inputfile_source(
requested_file=requested_file, dronesite=None
)
assert cache is hitratestorage

@via_usim
async def test_stream_file(self):
throughput = 10
size = 1000
requested_file = RequestedFile(filename="testfile", filesize=100)
hitratestorage = HitrateStorage(hitrate=0.5, size=size, files={})
connection = Connection(throughput=throughput)
connection.add_storage_element(hitratestorage)
assert 0 == time.now
await connection.stream_file(requested_file=requested_file, dronesite=None)
assert 5 == time.now

@via_usim
async def test_single_transfer_files(self):
throughput = 10
size = 1000
drone = DummyDrone(throughput)
requested_files = dict(test=dict(usedsize=100))
hitratestorage = HitrateStorage(hitrate=0.5, size=size, files={})
drone.connection.add_storage_element(hitratestorage)
stream_time = await drone.connection.transfer_files(
drone=drone, requested_files=requested_files, job_repr="test"
)

assert time.now == 5
assert stream_time == 5

@via_usim
async def test_simultaneous_transfer(self):
throughput = 10
size = 1000
drone = DummyDrone(throughput)
requested_files = dict(test1=dict(usedsize=100), test2=dict(usedsize=200))
hitratestorage = HitrateStorage(hitrate=0.5, size=size, files={})
drone.connection.add_storage_element(hitratestorage)
stream_time = await drone.connection.transfer_files(
drone=drone, requested_files=requested_files
)
assert time.now == 15
assert stream_time == 15

@via_usim
async def test_caching_simulation_duration_short_jobs(self):
simulator = Simulator()
with NamedTemporaryFile(suffix=".csv") as machine_config, NamedTemporaryFile(
suffix=".csv"
) as storage_config, NamedTemporaryFile(suffix=".json") as job_config:
with open(machine_config.name, "w") as write_stream:
write_stream.write(
"TotalSlotCPUs TotalSlotDisk TotalSlotMemory Count sitename\n"
"1 44624348.0 8000 1 site1"
)
with open(job_config.name, "w") as write_stream:
job_description = [
{
"QDate": 0,
"RequestCpus": 1,
"RequestWalltime": 60,
"RequestMemory": 1024,
"RequestDisk": 1024,
"RemoteWallClockTime": 1.0,
"MemoryUsage": 1024,
"DiskUsage_RAW": 1024,
"RemoteSysCpu": 1.0,
"RemoteUserCpu": 0.0,
"Inputfiles": dict(
file1=dict(usedsize=10), file2=dict(usedsize=5)
),
}
] * 2
json.dump(job_description, write_stream)
with open(storage_config.name, "w") as write_stream:
write_stream.write(
"name sitename cachesizeGB throughput_limit\n"
"cache1 site1 1000 1.0"
)

job_input = open(job_config.name, "r+")
machine_input = open(machine_config.name, "r+")
storage_input = open(storage_config.name, "r+")
storage_content_input = None
cache_hitrate = 0.5
simulator.create_job_generator(
job_input=job_input, job_reader=htcondor_job_reader
)
simulator.create_scheduler(scheduler_type=CondorJobScheduler)
simulator.create_connection_module(remote_throughput=1.0)
simulator.create_pools(
pool_input=machine_input,
pool_reader=htcondor_pool_reader,
pool_type=StaticPool,
)
simulator.create_storage(
storage_input=storage_input,
storage_content_input=storage_content_input,
storage_reader=storage_reader,
storage_type=partial(HitrateStorage, cache_hitrate),
)
simulator.run()
assert 180 == simulator.duration

@via_usim
async def test_caching_simulation_duration_long_jobs(self):
simulator = Simulator()
with NamedTemporaryFile(suffix=".csv") as machine_config, NamedTemporaryFile(
suffix=".csv"
) as storage_config, NamedTemporaryFile(suffix=".json") as job_config:
with open(machine_config.name, "w") as write_stream:
write_stream.write(
"TotalSlotCPUs TotalSlotDisk TotalSlotMemory Count sitename\n"
"1 44624348.0 8000 1 site1"
)
with open(job_config.name, "w") as write_stream:
job_description = [
{
"QDate": 0,
"RequestCpus": 1,
"RequestWalltime": 60,
"RequestMemory": 1024,
"RequestDisk": 1024,
"RemoteWallClockTime": 1.0,
"MemoryUsage": 1024,
"DiskUsage_RAW": 1024,
"RemoteSysCpu": 1.0,
"RemoteUserCpu": 0.0,
"Inputfiles": dict(
file1=dict(usedsize=60), file2=dict(usedsize=60)
),
}
] * 2
json.dump(job_description, write_stream)
with open(storage_config.name, "w") as write_stream:
write_stream.write(
"name sitename cachesizeGB throughput_limit\n"
"cache1 site1 1000 1.0"
)

job_input = open(job_config.name, "r+")
machine_input = open(machine_config.name, "r+")
storage_input = open(storage_config.name, "r+")
storage_content_input = None
cache_hitrate = 0.5
simulator.create_job_generator(
job_input=job_input, job_reader=htcondor_job_reader
)
simulator.create_scheduler(scheduler_type=CondorJobScheduler)
simulator.create_connection_module(remote_throughput=1.0)
simulator.create_pools(
pool_input=machine_input,
pool_reader=htcondor_pool_reader,
pool_type=StaticPool,
)
simulator.create_storage(
storage_input=storage_input,
storage_content_input=storage_content_input,
storage_reader=storage_reader,
storage_type=partial(HitrateStorage, cache_hitrate),
)
simulator.run()
assert 300 == simulator.duration
26 changes: 22 additions & 4 deletions lapis_tests/test_job.py
@@ -4,6 +4,7 @@
from lapis.drone import Drone
from lapis.job import Job
from lapis_tests import via_usim, DummyScheduler, DummyDrone
from lapis.connection import Connection


class TestJob(object):
@@ -47,10 +48,15 @@ async def test_job_in_drone(self):
scheduler=scheduler,
pool_resources={"cores": 1, "memory": 1},
scheduling_duration=0,
connection=Connection(),
)
await drone.run()
async with Scope() as scope:
scope.do(drone.run(), volatile=True)
scope.do(drone.schedule_job(job=job))
await (
scheduler.statistics._available
== scheduler.statistics.resource_type(job_succeeded=1)
)
assert 10 == time.now
assert 0 == job.waiting_time
assert job.successful
@@ -67,9 +73,13 @@ async def test_nonmatching_job_in_drone(self):
pool_resources={"cores": 1, "memory": 1},
scheduling_duration=0,
)
await drone.run()
async with Scope() as scope:
scope.do(drone.run(), volatile=True)
scope.do(drone.schedule_job(job=job))
await (
scheduler.statistics._available
== scheduler.statistics.resource_type(job_failed=1)
)
assert 0 == time
assert not job.successful
assert 0 == job.waiting_time
@@ -90,10 +100,14 @@ async def test_two_nonmatching_jobs(self):
pool_resources={"cores": 1, "memory": 1},
scheduling_duration=0,
)
await drone.run()
async with Scope() as scope:
scope.do(drone.run(), volatile=True)
scope.do(drone.schedule_job(job=job_one))
scope.do(drone.schedule_job(job=job_two))
await (
scheduler.statistics._available
== scheduler.statistics.resource_type(job_succeeded=1, job_failed=1)
)
assert 10 == time
assert job_one.successful
assert not job_two.successful
@@ -116,10 +130,14 @@ async def test_two_matching_jobs(self):
pool_resources={"cores": 2, "memory": 2},
scheduling_duration=0,
)
await drone.run()
async with Scope() as scope:
scope.do(drone.run(), volatile=True)
scope.do(drone.schedule_job(job=job_one))
scope.do(drone.schedule_job(job=job_two))
await (
scheduler.statistics._available
== scheduler.statistics.resource_type(job_succeeded=2)
)
assert 10 == time
assert job_one.successful
assert job_two.successful
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -24,7 +24,7 @@ classifiers = [
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
]
requires = ["cobald", "usim == 0.4", "click"]
requires = ["cobald", "usim >= 0.4.3", "click"]

[tool.flit.metadata.requires-extra]
test = [