2 changes: 2 additions & 0 deletions nameko/constants.py
@@ -5,13 +5,15 @@
HEARTBEAT_CONFIG_KEY = 'HEARTBEAT'

MAX_WORKERS_CONFIG_KEY = 'max_workers'
MAX_WORKERS_PER_GROUP_CONFIG_KEY = 'MAX_WORKERS_PER_GROUP'
PARENT_CALLS_CONFIG_KEY = 'parent_calls_tracked'

DEFAULT_MAX_WORKERS = 10
DEFAULT_PARENT_CALLS_TRACKED = 10
DEFAULT_SERIALIZER = 'json'
DEFAULT_RETRY_POLICY = {'max_retries': 3}
DEFAULT_HEARTBEAT = 60
DEFAULT_WORKER_GROUP = 'DEFAULT'

CALL_ID_STACK_CONTEXT_KEY = 'call_id_stack'
AUTH_TOKEN_CONTEXT_KEY = 'auth_token'
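The new MAX_WORKERS_PER_GROUP key is read by the container below as a mapping from group name to pool size, with DEFAULT_WORKER_GROUP naming the pool used by entrypoints that declare no group. A minimal sketch of a config using these keys, assuming that mapping shape and a hypothetical 'slow' group (neither appears in this PR):

# Hypothetical config values; only the key names come from constants.py.
config = {
    'AMQP_URI': 'pyamqp://guest:guest@localhost',
    'max_workers': 10,            # MAX_WORKERS_CONFIG_KEY: fallback pool size
    'MAX_WORKERS_PER_GROUP': {    # MAX_WORKERS_PER_GROUP_CONFIG_KEY (new)
        'slow': 2,                # hypothetical group capped at 2 concurrent workers
    },
}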
39 changes: 34 additions & 5 deletions nameko/containers.py
@@ -5,7 +5,7 @@
import uuid
import warnings
from collections import deque
from logging import getLogger
from logging import getLogger, DEBUG

import eventlet
import six
@@ -14,7 +14,8 @@
from greenlet import GreenletExit # pylint: disable=E0611
from nameko.constants import (
CALL_ID_STACK_CONTEXT_KEY, DEFAULT_MAX_WORKERS,
DEFAULT_PARENT_CALLS_TRACKED, DEFAULT_SERIALIZER, MAX_WORKERS_CONFIG_KEY,
DEFAULT_PARENT_CALLS_TRACKED, DEFAULT_SERIALIZER, DEFAULT_WORKER_GROUP,
MAX_WORKERS_CONFIG_KEY, MAX_WORKERS_PER_GROUP_CONFIG_KEY,
PARENT_CALLS_CONFIG_KEY, SERIALIZER_CONFIG_KEY)
from nameko.exceptions import ConfigurationError, ContainerBeingKilled
from nameko.extensions import (
@@ -142,6 +143,9 @@ def __init__(self, service_cls, config, worker_ctx_cls=None):
self.max_workers = (
config.get(MAX_WORKERS_CONFIG_KEY) or DEFAULT_MAX_WORKERS)

self.max_workers_per_group = config.get(
MAX_WORKERS_PER_GROUP_CONFIG_KEY) or {}

self.serializer = config.get(
SERIALIZER_CONFIG_KEY, DEFAULT_SERIALIZER)

@@ -157,6 +161,8 @@ def __init__(self, service_cls, config, worker_ctx_cls=None):
self.dependencies.add(bound)
self.subextensions.update(iter_extensions(bound))

self._worker_groups = {DEFAULT_WORKER_GROUP}

for method_name, method in inspect.getmembers(service_cls, is_method):
entrypoints = getattr(method, ENTRYPOINT_EXTENSIONS_ATTR, [])
for entrypoint in entrypoints:
@@ -165,7 +171,11 @@ def __init__(self, service_cls, config, worker_ctx_cls=None):
self.subextensions.update(iter_extensions(bound))

self.started = False
self._worker_pool = GreenPool(size=self.max_workers)

self._worker_pools = {}
for group in self._worker_groups:
size = self.max_workers_per_group.get(group) or self.max_workers
self._worker_pools[group] = GreenPool(size=size)

self._worker_threads = {}
self._managed_threads = {}
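A standalone sketch of the pool-sizing rule introduced in this constructor, using hypothetical numbers: one GreenPool per registered group, sized from the per-group mapping when an entry exists and falling back to max_workers otherwise.

from eventlet.greenpool import GreenPool

max_workers = 10
max_workers_per_group = {'slow': 2}    # assumed shape: group name -> pool size
worker_groups = {'DEFAULT', 'slow'}    # DEFAULT_WORKER_GROUP plus registered groups

pools = {
    group: GreenPool(size=max_workers_per_group.get(group) or max_workers)
    for group in worker_groups
}
assert pools['slow'].size == 2      # capped by the per-group entry
assert pools['DEFAULT'].size == 10  # falls back to max_workers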
@@ -238,7 +248,7 @@ def stop(self):

# there might still be some running workers, which we have to
# wait for to complete before we can stop dependencies
self._worker_pool.waitall()
self._wait_for_worker_pools()

# it should be safe now to stop any dependency as there is no
# active worker which could be using it
@@ -320,6 +330,9 @@ def wait(self):
"""
return self._died.wait()

def register_worker_group(self, worker_group):
self._worker_groups.add(worker_group)

def spawn_worker(self, entrypoint, args, kwargs,
context_data=None, handle_result=None):
""" Spawn a worker thread for running the service method decorated
@@ -345,12 +358,20 @@ def spawn_worker(self, entrypoint, args, kwargs,
self, service, entrypoint, args, kwargs, data=context_data)

_log.debug('spawning %s', worker_ctx)
gt = self._worker_pool.spawn(
worker_pool = self._get_worker_pool_for_entrypoint(entrypoint)
gt = worker_pool.spawn(
self._run_worker, worker_ctx, handle_result
)
gt.link(self._handle_worker_thread_exited, worker_ctx)

self._worker_threads[worker_ctx] = gt

if _log.isEnabledFor(DEBUG):
for key, pool in self._worker_pools.items():
_log.debug(
'worker-pool:%s has %s free of %s',
key, pool.free(), pool.size)

return worker_ctx

def spawn_managed_thread(self, fn, protected=None, identifier=None):
@@ -383,6 +404,14 @@ def spawn_managed_thread(self, fn, protected=None, identifier=None):
gt.link(self._handle_managed_thread_exited, identifier)
return gt

def _get_worker_pool_for_entrypoint(self, entrypoint):
key = entrypoint.worker_group or DEFAULT_WORKER_GROUP
return self._worker_pools[key]

def _wait_for_worker_pools(self):
for pool in self._worker_pools.values():
pool.waitall()

def _run_worker(self, worker_ctx, handle_result):
_log.debug('setting up %s', worker_ctx)

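The point of the per-group pools is isolation: exhausting one group's pool should not block workers spawned into another group's pool. A minimal sketch of that behaviour using eventlet directly; nothing below is nameko API, and the group names and sizes are hypothetical.

import eventlet
from eventlet.greenpool import GreenPool

pools = {'DEFAULT': GreenPool(size=10), 'slow': GreenPool(size=1)}

def slow_worker():
    eventlet.sleep(0.5)   # stands in for a long-running entrypoint

def fast_worker(results):
    results.append('done')

pools['slow'].spawn(slow_worker)       # fills the only 'slow' slot
results = []
pools['DEFAULT'].spawn(fast_worker, results)
pools['DEFAULT'].waitall()             # the DEFAULT pool is unaffected

assert results == ['done']
assert pools['slow'].free() == 0       # 'slow' is still saturated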
4 changes: 4 additions & 0 deletions nameko/extensions.py
@@ -263,13 +263,17 @@ def register_entrypoint(fn, entrypoint):
class Entrypoint(Extension):

method_name = None
worker_group = None

def bind(self, container, method_name):
""" Get an instance of this Entrypoint to bind to `container` with
`method_name`.
"""
instance = super(Entrypoint, self).bind(container)
instance.method_name = method_name
if instance.worker_group:
container.register_worker_group(instance.worker_group)

return instance

def check_signature(self, args, kwargs):
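Since bind() now registers the group on the container, any entrypoint can opt in simply by exposing a worker_group attribute before binding. A minimal sketch of a hypothetical custom entrypoint doing so; the class and decorator names are illustrative, not part of this PR.

from nameko.extensions import Entrypoint

class MyEntrypoint(Entrypoint):
    """Hypothetical entrypoint that routes its workers to a named group."""

    def __init__(self, worker_group=None):
        self.worker_group = worker_group

    def fire(self, args, kwargs):
        # a real entrypoint triggers workers like this; with this PR the
        # worker runs on the pool belonging to self.worker_group
        return self.container.spawn_worker(self, args, kwargs)

my_entrypoint = MyEntrypoint.decorator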
3 changes: 2 additions & 1 deletion nameko/web/handlers.py
@@ -17,10 +17,11 @@
class HttpRequestHandler(Entrypoint):
server = WebServer()

def __init__(self, method, url, expected_exceptions=()):
def __init__(self, method, url, expected_exceptions=(), worker_group=None):
self.method = method
self.url = url
self.expected_exceptions = expected_exceptions
self.worker_group = worker_group

def get_url_rule(self):
return Rule(self.url, methods=[self.method])
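With the new keyword argument threaded through, an HTTP entrypoint can be pinned to its own pool from the service definition. A minimal usage sketch, assuming a group named 'reports' with a matching MAX_WORKERS_PER_GROUP entry in the config; both names are hypothetical.

from nameko.web.handlers import http

class ReportService(object):
    name = 'reports'

    @http('GET', '/report', worker_group='reports')
    def build_report(self, request):
        # long-running work now consumes the 'reports' pool instead of
        # starving the DEFAULT pool shared by other entrypoints
        return 'ok'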