Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,7 @@
### 0.4.4
- removed unwanted 'continue' from koi-worker; added explicit cast for wait time
### 0.4.5
- force a reconnect even if the status variable indicates we are online
- force a reconnect even if the status variable indicates we are online
### 0.4.6
- optionally skip the caching of new samples; useful if we just want to send them and never consume or read them back.
- added an optional sleep time for instances that just changed; this prevents premature training of instances while someone else is still writing to them
2 changes: 1 addition & 1 deletion cpu-tf.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# convenience, we added tensorflow-hub to the image, so you can use pretrained
# models from TensorFlow Hub.

FROM tensorflow/tensorflow:2.11.0
FROM tensorflow/tensorflow:2.13.0

COPY ./dist/ /wheels/

Expand Down
4 changes: 1 addition & 3 deletions koi_core/api/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"finalized": "finalized",
"could_train": "could_train",
"last_modified": "last_modified",
"sample_last_modified": "sample_last_modified",
}


Expand Down Expand Up @@ -96,9 +97,6 @@ def get_instance_training_data(self, id: InstanceId, meta: CachingMeta):
# Upload the raw training data of an instance: POSTs the bytes in `data`
# to the instance's path (built from `id`) with "/training" appended.
def set_instance_training_data(self, id: InstanceId, data: bytes):
self.base._POST_raw(self.base._build_path(id) + "/training", data=data)

# endregion

# region descriptor
def get_descriptors(self, id: InstanceId, meta: CachingMeta = None):
data, meta = self.base._GET(self.base._build_path(id) + "/descriptor")
return (
Expand Down
32 changes: 30 additions & 2 deletions koi_core/control/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,25 @@ def _set_instance(instance: Instance, max_instances: int = 1):


def train(instance: Instance, batch_iterable=None, dev=False, max_instances=1):
global _runable_instance

if dev:
temp_dir = TemporaryDirectory()
model = instance.load_code(temp_dir.name)
actions.train(model, instance, batch_iterable)
temp_dir.cleanup()
else:
_set_instance(instance, max_instances)
_runable_instance.train(batch_iterable)
try:
_runable_instance.train(batch_iterable)
except:
kill_runable_instance()
raise


def infer(instance: Instance, data, dev=False, model=None, max_instances=1) -> List[Any]:
global _runable_instance

if dev:
temp_dir = None

Expand All @@ -86,7 +94,27 @@ def infer(instance: Instance, data, dev=False, model=None, max_instances=1) -> L

else:
_set_instance(instance, max_instances)
return _runable_instance.infer(data)
try:
return _runable_instance.infer(data)
except:
kill_runable_instance()
raise


def kill_runable_instance():
    """Terminate the current runable instance and forget it.

    Does nothing when no runable instance is set. Otherwise the runable
    instance's ``terminate()`` is called, the module-level
    ``_runable_instance`` is reset to ``None`` and its ``instance`` is
    removed from ``_active_instances`` (if present).

    The bookkeeping is reset in a ``finally`` clause so the module state is
    cleaned up even when ``terminate()`` itself raises; that exception still
    propagates to the caller.
    """
    global _runable_instance
    global _active_instances

    if _runable_instance is None:
        return

    runable = _runable_instance
    instance = runable.instance

    try:
        runable.terminate()
    finally:
        # Always drop the references, otherwise a failed terminate() would
        # leave a broken runable instance registered for the next call.
        _runable_instance = None
        if instance in _active_instances:
            del _active_instances[instance]


def terminate():
Expand Down
4 changes: 2 additions & 2 deletions koi_core/data/simple_accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
@cache
def get_np(self: Union[SampleDatum, SampleLabel], meta) -> np.ndarray:
f = BytesIO(self.raw)
return np.load(f), meta
return np.load(f, allow_pickle=False), meta


def set_np(self: Union[SampleDatum, SampleLabel], value: np.ndarray) -> None:
f = BytesIO()
np.save(f, value)
np.save(f, value, allow_pickle=False)
self.raw = f.getvalue()
setCache(self, 'np', None)

Expand Down
8 changes: 4 additions & 4 deletions koi_core/resources/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# GNU Lesser General Public License is distributed along with this
# software and can be found at http://www.gnu.org/licenses/lgpl.html

from datetime import datetime
from koi_core.caching import cache, offlineFeature
from koi_core.resources.model import Model
from koi_core.resources.ids import InstanceId, ModelId, SampleId, DescriptorId
Expand Down Expand Up @@ -183,7 +182,8 @@ class InstanceBasicFields:
description: str
finalized: bool
could_train: bool
last_modified: datetime
last_modified: str
sample_last_modified: str


class InstanceProxy(Instance):
Expand Down Expand Up @@ -285,8 +285,8 @@ def __init__(self, pool: "APIObjectPool", id: Union[InstanceId, ModelId]) -> Non
self.descriptors = InstanceDescriptorAccessor(self)
self.parameter = InstanceParameterAccessor(self)

def new_sample(self):
sample = self.pool.new_sample(self.id)
def new_sample(self, remote_only:bool=False):
sample = self.pool.new_sample(self.id, not remote_only)
return sample

def merge(self, instances: Iterable):
Expand Down
5 changes: 3 additions & 2 deletions koi_core/resources/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ def new_instance(self, id: ModelId) -> Instance:
# Build a SampleProxy for the given sample id on this pool and return it
# together with the `meta` value that was passed in, unchanged.
def sample(self, id: SampleId, meta) -> Sample:
return SampleProxy(self, id), meta

def new_sample(self, id: InstanceId) -> Sample:
def new_sample(self, id: InstanceId, cached:bool=True) -> Sample:
sample = SampleProxy(self, id)
setIndexedCache(self, "sample", sample.id, sample)
if cached:
setIndexedCache(self, "sample", sample.id, sample)
return sample
30 changes: 26 additions & 4 deletions koi_core/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import koi_core as koi
from koi_core.exceptions import KoiApiOfflineException
from time import sleep
from datetime import datetime


signal_interrupt = False

Expand Down Expand Up @@ -114,6 +116,13 @@ def main():
default=60,
help="Number of seconds to wait before a retry",
)
p.add(
"-w",
"--wait-training",
type=int,
default=10,
help="Number of seconds a instance has to be untouched before atempting to train it",
)

opt = p.parse_args()

Expand Down Expand Up @@ -193,9 +202,12 @@ def main():
instance.name,
)
continue


# seconds since last instance update
last_modified = datetime.fromisoformat(instance.sample_last_modified)
sec_since_last_change = (datetime.utcnow() - last_modified).total_seconds()
# train the instance if its ready to train or the user forces it
if opt.force or instance.could_train:
if opt.force or (instance.could_train and sec_since_last_change > opt.wait_training):
try:
logging.info(
"start to train instance %s/%s", model.name, instance.name
Expand All @@ -207,6 +219,13 @@ def main():
logging.exception(
"instance %s/%s had an exception", model.name, instance.name
)
else:
if (instance.could_train and sec_since_last_change < opt.wait_training):
logging.info(
"skipping instance %s/%s, as data was not present or still changing.",
model.name,
instance.name
)

# break here if the user selected to run once
if opt.once:
Expand All @@ -225,14 +244,17 @@ def main():
if retries == 0:
# if the retry counter is initialized with a negative value, we will retry forever
logging.error("koi api is offline, giving up")
raise ex
sys.exit(-1)

retries -= 1
logging.error("koi api is offline, retrying in %d seconds", opt.sleep_retry)
sleep((float)(opt.sleep_retry))

# set the online flag and try to authenticate
pool.api.reconnect()
try:
pool.api.reconnect()
except:
logging.error("could not reconnect to koi api")

logging.info("stopped")
sys.exit(0)
Expand Down
13 changes: 11 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,21 @@
requires = ["setuptools>=61.0.0"]
build-backend = "setuptools.build_meta"

# Explicit package list for the setuptools build backend.
# The table must be named [tool.setuptools]; setuptools does not read a
# [tools.setuptools] table, so the list would otherwise be ignored.
[tool.setuptools]
packages = [
"koi_core",
"koi_core.api",
"koi_core.control",
"koi_core.data",
"koi_core.resources"
]

[project]
name = "koi-core"
version = "0.4.5"
version = "0.4.6"
description = "Runtime for the KOI-System including koi-worker"
readme = "README.md"
requires-python = ">=3.8,<3.11"
requires-python = ">=3.8,<3.12"
classifiers = ["Programming Language :: Python :: 3", "Operating System :: OS Independent", "License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)"]

dependencies = [
Expand Down