diff --git a/CHANGES b/CHANGES index b46839c..974ccbd 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,11 @@ +blueox (0.12.0) + * Move recorders to separate module + * Add pycernan recorder + * Update the way blueox is configured to allow desired + recorder from imported constant + +-- Aaron Biller Tue Sep 4 12:40:45 2018 -0400 + blueox (0.11.6.4) * Fix encoding of unknown types diff --git a/blueox/__init__.py b/blueox/__init__.py index 2f3fc36..69d63c3 100644 --- a/blueox/__init__.py +++ b/blueox/__init__.py @@ -9,7 +9,7 @@ """ __title__ = 'blueox' -__version__ = '0.11.6.4' +__version__ = '0.12.0' __author__ = 'Rhett Garber' __author_email__ = 'rhettg@gmail.com' __license__ = 'ISC' @@ -18,10 +18,8 @@ __url__ = 'https://github.com/rhettg/BlueOx' import logging -import os from . import utils -from . import network from . import ports from .context import ( Context, set, append, add, context_wrap, current_context, find_context, @@ -30,36 +28,53 @@ from .errors import Error from .logger import LogHandler from .timer import timeit +from .recorders import pycernan, zmq log = logging.getLogger(__name__) +ZMQ_RECORDER = 'zmq' +PYCERNAN_RECORDER = 'pycernan' +RECORDERS = { + ZMQ_RECORDER: zmq, + PYCERNAN_RECORDER: pycernan, +} +DEFAULT_RECORDER = ZMQ_RECORDER + def configure(host, port, recorder=None): """Initialize blueox - This instructs the blueox system where to send it's logging data. If blueox is not configured, log data will - be silently dropped. + This instructs the blueox system where to send its logging data. + If blueox is not configured, log data will be silently dropped. - Currently we support logging through the network (and the configured host and port) to a blueoxd instances, or - to the specified recorder function + Currently we support logging through the network (and the configured host + and port) to a blueoxd instances, or to the specified recorder function. """ - if recorder: + if callable(recorder): _context_mod._recorder_function = recorder - elif host and port: - network.init(host, port) - _context_mod._recorder_function = network.send + else: - log.info("Empty blueox configuration") - _context_mod._recorder_function = None + _rec = RECORDERS.get(recorder, None) + if _rec is not None: + _rec.init(host, port) + _context_mod._recorder_function = _rec.send + else: + log.info("Empty blueox configuration") + _context_mod._recorder_function = None -def default_configure(host=None): + +def default_configure(host=None, recorder=DEFAULT_RECORDER): """Configure BlueOx based on defaults Accepts a connection string override in the form `localhost:3514`. Respects environment variable BLUEOX_HOST """ - host = ports.default_collect_host(host) + _rec = RECORDERS.get(recorder, None) + if _rec is None: + _rec = RECORDERS.get(DEFAULT_RECORDER) + + host = _rec.default_host(host) hostname, port = host.split(':') try: @@ -67,8 +82,9 @@ def default_configure(host=None): except ValueError: raise Error("Invalid value for port") - configure(hostname, int_port) + configure(hostname, int_port, recorder=recorder) def shutdown(): - network.close() + zmq.close() + pycernan.close() diff --git a/blueox/client.py b/blueox/client.py index 8062c64..ec2f420 100644 --- a/blueox/client.py +++ b/blueox/client.py @@ -3,7 +3,8 @@ blueox.client ~~~~~~~~ -This module provides utilities for writing client applications which connect or use blueox data. +This module provides utilities for writing client applications +which connector use blueox data. :copyright: (c) 2012 by Rhett Garber :license: ISC, see LICENSE for more details. @@ -35,7 +36,8 @@ def default_host(host=None): def decode_stream(stream): - """A generator which reads data out of the buffered file stream, unpacks and decodes the blueox events + """A generator which reads data out of the buffered file stream, + unpacks and decodes the blueox events This is useful for parsing on disk log files generated by blueoxd """ @@ -97,8 +99,8 @@ def subscribe_stream(control_host, subscribe): sock.connect("tcp://%s" % (stream_host,)) # Now that we are connected, loop almost forever emiting events. - # If we fail to receive any events within the specified timeout, we'll quit - # and verify that we are connected to a valid stream. + # If we fail to receive any events within the specified timeout, + # we'll quit and verify that we are connected to a valid stream. poller = zmq.Poller() poller.register(sock, zmq.POLLIN) while True: @@ -113,7 +115,7 @@ def subscribe_stream(control_host, subscribe): if not prefix and subscription and channel != subscription: continue - yield msgpack.unpackb(data,encoding='utf8') + yield msgpack.unpackb(data, encoding='utf8') else: break @@ -137,10 +139,10 @@ def stdin_stream(): class Grouper(object): """Utility for grouping events and sub-events together. - + Events fed into a Grouper are joined by their common 'id'. Encountering the parent event type will trigger emitting a list of all events and sub events - for that single id. + for that single id. This assumes that the parent event will be the last encountered. diff --git a/blueox/context.py b/blueox/context.py index c23fcb7..fc117a9 100644 --- a/blueox/context.py +++ b/blueox/context.py @@ -19,7 +19,6 @@ import logging from . import utils -from . import network log = logging.getLogger(__name__) @@ -41,8 +40,10 @@ def __init__(self, type_name, id=None, sample=None): heirarchy of parent requests. Examples: '.foo' - Will generate a name like '.foo' - '.foo.bar' - If the parent ends in '.foo', the final name will be '.bar' - '^.foo' - Will use the top-most context, generating '.foo' + '.foo.bar' - If the parent ends in '.foo', the final name + will be '.bar' + '^.foo' - Will use the top-most context, generating + '.foo' 'top.foo.bar' - The name will be based on the longest matched parent context. If there is a parent context named 'top' and a parent context named 'top.foo', the new context will be named @@ -111,11 +112,13 @@ def __init__(self, type_name, id=None, sample=None): elif parent_ctx: self.id = parent_ctx.id else: - # Generate an id if one wasn't provided and we don't have any parents - # We're going to encode the time as the front 4 bytes so we have some order to the ids - # that could prove useful later on by making sorting a little easier. - self.id = (struct.pack(">L", int(time.time())) + os.urandom(12)).encode( - 'hex') + # Generate an id if one wasn't provided and we don't have any + # parents. We're going to encode the time as the front 4 bytes + # so we have some order to the ids that could prove useful + # later on by making sorting a little easier. + self.id = ( + struct.pack(">L", int(time.time())) + + os.urandom(12)).encode('hex') if parent_ctx and not parent_ctx.enabled: self.enabled = False diff --git a/blueox/contrib/__init__.py b/blueox/contrib/__init__.py index 8b13789..e69de29 100644 --- a/blueox/contrib/__init__.py +++ b/blueox/contrib/__init__.py @@ -1 +0,0 @@ - diff --git a/blueox/contrib/celery/__init__.py b/blueox/contrib/celery/__init__.py index 8b13789..e69de29 100644 --- a/blueox/contrib/celery/__init__.py +++ b/blueox/contrib/celery/__init__.py @@ -1 +0,0 @@ - diff --git a/blueox/contrib/celery/celery_signals.py b/blueox/contrib/celery/celery_signals.py index 3c20092..c37dba6 100644 --- a/blueox/contrib/celery/celery_signals.py +++ b/blueox/contrib/celery/celery_signals.py @@ -1,8 +1,7 @@ """Hooks for gathering celery task data into blueox. -Importing this module will register signal handlers into Celery worker's runtime. - -We also will track creation of tasks on the client side. +Importing this module will register signal handlers into Celery +worker's runtime. We also will track creation of tasks on the client side. """ import traceback @@ -33,9 +32,9 @@ def on_task_sent(sender=None, body=None, **kwargs): @signals.task_sent.connect def on_task_sent(**kwargs): with blueox.Context('.celery.task_sent'): - # Arguments for this signal are different than the worker signals. Sometimes - # they are even different than what the documentation says. See also - # https://github.com/celery/celery/issues/1606 + # Arguments for this signal are different than the worker signals. + # Sometimes they are even different than what the documentation + # says. See also https://github.com/celery/celery/issues/1606 blueox.set('task_id', kwargs.get('task_id', kwargs.get('id'))) blueox.set('task', str(kwargs['task'])) blueox.set('eta', kwargs['eta']) @@ -43,7 +42,14 @@ def on_task_sent(**kwargs): @signals.worker_process_init.connect def on_worker_process_init(**kwargs): - if hasattr(settings, 'BLUEOX_HOST'): + if hasattr(settings, 'BLUEOX_PYCERNAN_HOST'): + if settings.BLUEOX_PYCERNAN_HOST: + rec = blueox.PYCERNAN_RECORDER + blueox.default_configure( + settings.BLUEOX_PYCERNAN_HOST, recorder=rec) + else: + blueox.configure(None, None) + elif hasattr(settings, 'BLUEOX_HOST'): if settings.BLUEOX_HOST: blueox.default_configure(settings.BLUEOX_HOST) else: diff --git a/blueox/contrib/django/__init__.py b/blueox/contrib/django/__init__.py index 8b13789..e69de29 100644 --- a/blueox/contrib/django/__init__.py +++ b/blueox/contrib/django/__init__.py @@ -1 +0,0 @@ - diff --git a/blueox/contrib/django/middleware.py b/blueox/contrib/django/middleware.py index b16f486..1471ac2 100644 --- a/blueox/contrib/django/middleware.py +++ b/blueox/contrib/django/middleware.py @@ -1,6 +1,5 @@ import sys import traceback -import logging import blueox @@ -10,7 +9,14 @@ class Middleware(object): def __init__(self): - if hasattr(settings, 'BLUEOX_HOST'): + if hasattr(settings, 'BLUEOX_PYCERNAN_HOST'): + if settings.BLUEOX_PYCERNAN_HOST: + rec = blueox.PYCERNAN_RECORDER + blueox.default_configure( + settings.BLUEOX_PYCERNAN_HOST, recorder=rec) + else: + blueox.configure(None, None) + elif hasattr(settings, 'BLUEOX_HOST'): if settings.BLUEOX_HOST: blueox.default_configure(settings.BLUEOX_HOST) else: @@ -28,7 +34,9 @@ def process_request(self, request): headers = {} for k, v in request.META.iteritems(): - if k.startswith('HTTP_') or k in ('CONTENT_LENGTH', 'CONTENT_TYPE'): + if ( + k.startswith('HTTP_') or + k in ('CONTENT_LENGTH', 'CONTENT_TYPE')): headers[k] = v blueox.set('headers', headers) diff --git a/blueox/contrib/flask/__init__.py b/blueox/contrib/flask/__init__.py index 56fb178..57de85d 100644 --- a/blueox/contrib/flask/__init__.py +++ b/blueox/contrib/flask/__init__.py @@ -23,7 +23,13 @@ class BlueOxMiddleware(object): def __init__(self, app): self.app = app - if 'BLUEOX_HOST' in app.config: + if 'BLUEOX_PYCERNAN_HOST' in app.config: + self.blueox_pycernan_host = app.config['BLUEOX_PYCERNAN_HOST'] + if self.blueox_pycernan_host: + rec = blueox.PYCERNAN_RECORDER + blueox.default_configure( + self.blueox_pycernan_host, recorder=rec) + elif 'BLUEOX_HOST' in app.config: self.blueox_host = app.config['BLUEOX_HOST'] if self.blueox_host: blueox.default_configure(self.blueox_host) @@ -45,8 +51,8 @@ def before_request(self, *args, **kwargs): headers = {} for k, v in request.environ.iteritems(): if ( - k.startswith('HTTP_') or k in - ('CONTENT_LENGTH', 'CONTENT_TYPE')): + k.startswith('HTTP_') or + k in ('CONTENT_LENGTH', 'CONTENT_TYPE')): headers[k] = v blueox.set('headers', headers) diff --git a/blueox/logger.py b/blueox/logger.py index 37d2180..6a7c025 100644 --- a/blueox/logger.py +++ b/blueox/logger.py @@ -3,7 +3,9 @@ blueox.logger ~~~~~~~~ -This module provides integration with blueox and standard python logging module. +This module provides integration with blueox and standard +python logging module. + :copyright: (c) 2012 by Rhett Garber :license: ISC, see LICENSE for more details. @@ -20,7 +22,8 @@ class LogHandler(logging.Handler): Records standard fields such as logger name, level the message and if an exception was provided, the string formatted exception. - The type name, if not specified will be something like '.log' + The type name, if not specified will be something like + '.log' """ def __init__(self, type_name=None): diff --git a/blueox/ports.py b/blueox/ports.py index 5b1ca2f..d977847 100644 --- a/blueox/ports.py +++ b/blueox/ports.py @@ -28,7 +28,7 @@ def _default_host(host, default_host, default_port): if not host: host = default_host if ':' not in host: - host = "{}:{}".format(host, default_port) + host = '{}:{}'.format(host, default_port) return host @@ -41,3 +41,13 @@ def default_control_host(host=None): def default_collect_host(host=None): default_host = os.environ.get(ENV_VAR_COLLECT_HOST, DEFAULT_HOST) return _default_host(host, default_host, DEFAULT_COLLECT_PORT) + + +# For consistency, we'll abstract pycernan connections in the same way +ENV_VAR_PYCERNAN_HOST = 'BLUEOX_PYCERNAN_HOST' +DEFAULT_PYCERNAN_PORT = 2003 + + +def default_pycernan_host(host=None): + default_host = os.environ.get(ENV_VAR_PYCERNAN_HOST, DEFAULT_HOST) + return _default_host(host, default_host, DEFAULT_PYCERNAN_PORT) diff --git a/blueox/recorders/__init__.py b/blueox/recorders/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/blueox/recorders/pycernan.py b/blueox/recorders/pycernan.py new file mode 100644 index 0000000..1edfd5b --- /dev/null +++ b/blueox/recorders/pycernan.py @@ -0,0 +1,139 @@ +# -*- coding: utf-8 -*- +""" +blueox.recorders.pycernan +~~~~~~~~ + +This module provides the interface into pycernan + +:copyright: (c) 2018 by Aaron Biller?? +:license: ISC, see LICENSE for more details. + +""" +from __future__ import absolute_import + +import atexit +import datetime +import decimal +import json +import logging +import os +import threading + +from pycernan.avro import Client + +from blueox import ports + +log = logging.getLogger(__name__) + +_uname = os.uname()[1] + +# Global blueox avro schema definition +BLUEOX_AVRO_RECORD = { + "doc": "A BlueOx event", + "name": "blueox_event", + "namespace": "blueox.{}".format(_uname), + "type": "record", + "fields": [ + {"name": "id", "type": "string"}, + {"name": "type", "type": "string"}, + {"name": "host", "type": "string"}, + {"name": "pid", "type": "long"}, + {"name": "start", "type": "double"}, + {"name": "end", "type": "double"}, + {"name": "body", "type": ["null", "string"], "default": "null"} + ] +} + + +def default_host(host=None): + """Build a default host string for pycernan + """ + return ports.default_pycernan_host(host) + + +def _serializer(obj): + """Serialize native python objects + """ + if isinstance(obj, (datetime.datetime, datetime.date)): + return obj.isoformat() + elif isinstance(obj, decimal.Decimal): + return float(obj) + try: + obj = str(obj) + except Exception: + raise TypeError(repr(obj) + ' is not JSON serializable') + return obj + + +threadLocal = threading.local() + +# Context can be shared between threads +_client = None + + +def init(host, port): + global _client + + _client = Client(host=host, port=port) + + +def _thread_connect(): + if _client and not getattr(threadLocal, 'client', None): + threadLocal.client = _client + + +def _serialize_context(context): + context_dict = context.to_dict() + for key in ('host', 'type'): + if len(context_dict.get(key, '')) > 64: + raise ValueError('Value too long: %r' % key) + + context_dict['id'] = str(context_dict['id']) + + body = context_dict.get('body', None) + if body is not None: + try: + context_dict['body'] = json.dumps(body, default=_serializer) + except (TypeError, ValueError): + try: + context_dict['body'] = unicode(body) + except Exception: + log.exception( + 'Serialization failure (not fatal, dropping data)') + context_dict['body'] = None + + context_dict = { + k: v.encode('utf-8') if isinstance(v, unicode) + else v for k, v in context_dict.items() + } + + return context_dict + + +def send(context): + _thread_connect() + + try: + context_data = [_serialize_context(context)] + except Exception: + log.exception('Failed to serialize context') + return + + if _client and threadLocal.client is not None: + try: + log.debug('Sending msg') + threadLocal.client.publish( + BLUEOX_AVRO_RECORD, context_data, sync=False) + except Exception: + log.exception('Failed during publish to pycernan.') + else: + log.info('Skipping sending event %s', context.name) + + +def close(): + if getattr(threadLocal, 'client', None): + threadLocal.client.close() + threadLocal.client = None + + +atexit.register(close) diff --git a/blueox/network.py b/blueox/recorders/zmq.py similarity index 94% rename from blueox/network.py rename to blueox/recorders/zmq.py index ede1a1e..fa02207 100644 --- a/blueox/network.py +++ b/blueox/recorders/zmq.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- """ -blueox.network +blueox.recorders.zmq ~~~~~~~~ This module provides our interface into ZeroMQ @@ -9,6 +9,8 @@ :license: ISC, see LICENSE for more details. """ +from __future__ import absolute_import + import atexit import logging import msgpack @@ -16,7 +18,8 @@ import threading import zmq -from . import utils +from blueox import ports +from blueox import utils log = logging.getLogger(__name__) @@ -44,6 +47,12 @@ def check_meta_version(meta): raise ValueError(value) +def default_host(host=None): + """Build a default host string for the blueox collector + """ + return ports.default_collect_host(host) + + threadLocal = threading.local() # Context can be shared between threads diff --git a/blueox/store.py b/blueox/store.py index 66f4f19..afeaa2f 100644 --- a/blueox/store.py +++ b/blueox/store.py @@ -226,7 +226,8 @@ def list_log_files(log_path): def filter_log_files_for_active(log_files): - """Filter our list of log files to remove those we expect might be active.""" + """Filter our list of log files to remove those we expect might be active. + """ out_log_files = [] files_by_type = collections.defaultdict(list) @@ -242,11 +243,11 @@ def filter_log_files_for_active(log_files): out_log_files += type_files - # If that last log file is old, then it's probably not being used either. - # We add a buffer of an hour just to make sure everything has rotated - # away safely when this is run close to midnight. - cutoff_date = (datetime.datetime.utcnow() - datetime.timedelta(hours=1) - ).date() + # If that last log file is old, then it's probably not being used + # either. We add a buffer of an hour just to make sure everything has + # rotated away safely when this is run close to midnight. + cutoff_date = ( + datetime.datetime.utcnow() - datetime.timedelta(hours=1)).date() if last_lf.date < cutoff_date: out_log_files.append(last_lf) diff --git a/blueox/timer.py b/blueox/timer.py index dc0cf18..77a22ff 100644 --- a/blueox/timer.py +++ b/blueox/timer.py @@ -3,7 +3,9 @@ blueox.timer ~~~~~~~~ -This module has a timer context manager for easily tracking wall-clock time for some execution +This module has a timer context manager for easily tracking wall-clock +time for some execution + :copyright: (c) 2012 by Rhett Garber :license: ISC, see LICENSE for more details. diff --git a/blueox/tornado_utils.py b/blueox/tornado_utils.py index ffbfa9b..af3e2c4 100644 --- a/blueox/tornado_utils.py +++ b/blueox/tornado_utils.py @@ -5,8 +5,8 @@ This module provides hooks for using blueox with the Tornado async web server. Making blueox useful inside tornado is a challenge since you'll likely want a -blueox context per request, but multiple requests can be going on at once inside -tornado. +blueox context per request, but multiple requests can be going on at once +inside tornado. :copyright: (c) 2012 by Rhett Garber :license: ISC, see LICENSE for more details. @@ -19,8 +19,6 @@ import sys import time -log = logging.getLogger(__name__) - import tornado.web import tornado.gen import tornado.httpclient @@ -29,6 +27,8 @@ import blueox +log = logging.getLogger(__name__) + def _gen_wrapper(ctx, generator): """Generator Wrapper that starts/stops our context @@ -112,7 +112,8 @@ def on_finish(self): class SampleRequestHandler(BlueOxRequestHandlerMixin, tornado.web.RequestHandler): - """Sample base request handler that provides basic information about the request. + """Sample base request handler that provides basic + information about the request. """ def prepare(self): @@ -123,8 +124,8 @@ def prepare(self): def write_error(self, status_code, **kwargs): if 'exc_info' in kwargs: - blueox.set('exception', - ''.join(traceback.format_exception(*kwargs["exc_info"]))) + blueox.set('exception', ''.join( + traceback.format_exception(*kwargs["exc_info"]))) return super(SampleRequestHandler, self).write_error(status_code, **kwargs) @@ -159,15 +160,16 @@ def fetch(self, request, callback=None, **kwargs): ctx.stop() # I'd love to use the future to handle the completion step, BUT, we - # need this to happen first. If the caller has provided a callback, we don't want them - # to get called before we do. Rather than poke into the internal datastructures, we'll just - # handle the callback explicitly + # need this to happen first. If the caller has provided a callback, we + # don't want them to get called before we do. Rather than poke into the + # internal datastructures, we'll just handle the callback explicitly def complete_context(response): ctx.start() ctx.set('response.code', response.code) - ctx.set('response.size', len(response.body) if response.body else 0) + ctx.set('response.size', + len(response.body) if response.body else 0) ctx.done() @@ -175,12 +177,12 @@ def complete_context(response): def fetch_complete(future): # This error handling is just copied from tornado.httpclient as - # we need to record a real HTTPError. httpclient might do the same thing - # again if needs to deal with the caller's callbacks. + # we need to record a real HTTPError. httpclient might do the + # same thing again if needs to deal with the caller's callbacks exc = future.exception() - if isinstance( - exc, - tornado.httpclient.HTTPError) and exc.response is not None: + if ( + isinstance(exc, tornado.httpclient.HTTPError) and + exc.response is not None): response = exc.response elif exc is not None: response = tornado.httpclient.HTTPResponse( diff --git a/pycernan.patch b/pycernan.patch new file mode 100644 index 0000000..91783f3 --- /dev/null +++ b/pycernan.patch @@ -0,0 +1,1273 @@ +diff --git a/CHANGES b/CHANGES +index b46839c..974ccbd 100644 +--- a/CHANGES ++++ b/CHANGES +@@ -1,3 +1,11 @@ ++blueox (0.12.0) ++ * Move recorders to separate module ++ * Add pycernan recorder ++ * Update the way blueox is configured to allow desired ++ recorder from imported constant ++ ++-- Aaron Biller Tue Sep 4 12:40:45 2018 -0400 ++ + blueox (0.11.6.4) + * Fix encoding of unknown types + +diff --git a/blueox/__init__.py b/blueox/__init__.py +index 2f3fc36..69d63c3 100644 +--- a/blueox/__init__.py ++++ b/blueox/__init__.py +@@ -9,7 +9,7 @@ blueox + """ + + __title__ = 'blueox' +-__version__ = '0.11.6.4' ++__version__ = '0.12.0' + __author__ = 'Rhett Garber' + __author_email__ = 'rhettg@gmail.com' + __license__ = 'ISC' +@@ -18,10 +18,8 @@ __description__ = 'A library for python-based application logging and data colle + __url__ = 'https://github.com/rhettg/BlueOx' + + import logging +-import os + + from . import utils +-from . import network + from . import ports + from .context import ( + Context, set, append, add, context_wrap, current_context, find_context, +@@ -30,36 +28,53 @@ from . import context as _context_mod + from .errors import Error + from .logger import LogHandler + from .timer import timeit ++from .recorders import pycernan, zmq + + log = logging.getLogger(__name__) + ++ZMQ_RECORDER = 'zmq' ++PYCERNAN_RECORDER = 'pycernan' ++RECORDERS = { ++ ZMQ_RECORDER: zmq, ++ PYCERNAN_RECORDER: pycernan, ++} ++DEFAULT_RECORDER = ZMQ_RECORDER ++ + + def configure(host, port, recorder=None): + """Initialize blueox + +- This instructs the blueox system where to send it's logging data. If blueox is not configured, log data will +- be silently dropped. ++ This instructs the blueox system where to send its logging data. ++ If blueox is not configured, log data will be silently dropped. + +- Currently we support logging through the network (and the configured host and port) to a blueoxd instances, or +- to the specified recorder function ++ Currently we support logging through the network (and the configured host ++ and port) to a blueoxd instances, or to the specified recorder function. + """ +- if recorder: ++ if callable(recorder): + _context_mod._recorder_function = recorder +- elif host and port: +- network.init(host, port) +- _context_mod._recorder_function = network.send ++ + else: +- log.info("Empty blueox configuration") +- _context_mod._recorder_function = None ++ _rec = RECORDERS.get(recorder, None) + ++ if _rec is not None: ++ _rec.init(host, port) ++ _context_mod._recorder_function = _rec.send ++ else: ++ log.info("Empty blueox configuration") ++ _context_mod._recorder_function = None + +-def default_configure(host=None): ++ ++def default_configure(host=None, recorder=DEFAULT_RECORDER): + """Configure BlueOx based on defaults + + Accepts a connection string override in the form `localhost:3514`. Respects + environment variable BLUEOX_HOST + """ +- host = ports.default_collect_host(host) ++ _rec = RECORDERS.get(recorder, None) ++ if _rec is None: ++ _rec = RECORDERS.get(DEFAULT_RECORDER) ++ ++ host = _rec.default_host(host) + hostname, port = host.split(':') + + try: +@@ -67,8 +82,9 @@ def default_configure(host=None): + except ValueError: + raise Error("Invalid value for port") + +- configure(hostname, int_port) ++ configure(hostname, int_port, recorder=recorder) + + + def shutdown(): +- network.close() ++ zmq.close() ++ pycernan.close() +diff --git a/blueox/client.py b/blueox/client.py +index 8062c64..ec2f420 100644 +--- a/blueox/client.py ++++ b/blueox/client.py +@@ -3,7 +3,8 @@ + blueox.client + ~~~~~~~~ + +-This module provides utilities for writing client applications which connect or use blueox data. ++This module provides utilities for writing client applications ++which connector use blueox data. + + :copyright: (c) 2012 by Rhett Garber + :license: ISC, see LICENSE for more details. +@@ -35,7 +36,8 @@ def default_host(host=None): + + + def decode_stream(stream): +- """A generator which reads data out of the buffered file stream, unpacks and decodes the blueox events ++ """A generator which reads data out of the buffered file stream, ++ unpacks and decodes the blueox events + + This is useful for parsing on disk log files generated by blueoxd + """ +@@ -97,8 +99,8 @@ def subscribe_stream(control_host, subscribe): + sock.connect("tcp://%s" % (stream_host,)) + + # Now that we are connected, loop almost forever emiting events. +- # If we fail to receive any events within the specified timeout, we'll quit +- # and verify that we are connected to a valid stream. ++ # If we fail to receive any events within the specified timeout, ++ # we'll quit and verify that we are connected to a valid stream. + poller = zmq.Poller() + poller.register(sock, zmq.POLLIN) + while True: +@@ -113,7 +115,7 @@ def subscribe_stream(control_host, subscribe): + if not prefix and subscription and channel != subscription: + continue + +- yield msgpack.unpackb(data,encoding='utf8') ++ yield msgpack.unpackb(data, encoding='utf8') + else: + break + +@@ -137,10 +139,10 @@ def stdin_stream(): + + class Grouper(object): + """Utility for grouping events and sub-events together. +- ++ + Events fed into a Grouper are joined by their common 'id'. Encountering the + parent event type will trigger emitting a list of all events and sub events +- for that single id. ++ for that single id. + + This assumes that the parent event will be the last encountered. + +diff --git a/blueox/context.py b/blueox/context.py +index c23fcb7..fc117a9 100644 +--- a/blueox/context.py ++++ b/blueox/context.py +@@ -19,7 +19,6 @@ import functools + import logging + + from . import utils +-from . import network + + log = logging.getLogger(__name__) + +@@ -41,8 +40,10 @@ class Context(object): + heirarchy of parent requests. Examples: + + '.foo' - Will generate a name like '.foo' +- '.foo.bar' - If the parent ends in '.foo', the final name will be '.bar' +- '^.foo' - Will use the top-most context, generating '.foo' ++ '.foo.bar' - If the parent ends in '.foo', the final name ++ will be '.bar' ++ '^.foo' - Will use the top-most context, generating ++ '.foo' + 'top.foo.bar' - The name will be based on the longest matched + parent context. If there is a parent context named 'top' and a + parent context named 'top.foo', the new context will be named +@@ -111,11 +112,13 @@ class Context(object): + elif parent_ctx: + self.id = parent_ctx.id + else: +- # Generate an id if one wasn't provided and we don't have any parents +- # We're going to encode the time as the front 4 bytes so we have some order to the ids +- # that could prove useful later on by making sorting a little easier. +- self.id = (struct.pack(">L", int(time.time())) + os.urandom(12)).encode( +- 'hex') ++ # Generate an id if one wasn't provided and we don't have any ++ # parents. We're going to encode the time as the front 4 bytes ++ # so we have some order to the ids that could prove useful ++ # later on by making sorting a little easier. ++ self.id = ( ++ struct.pack(">L", int(time.time())) ++ + os.urandom(12)).encode('hex') + + if parent_ctx and not parent_ctx.enabled: + self.enabled = False +diff --git a/blueox/contrib/__init__.py b/blueox/contrib/__init__.py +index 8b13789..e69de29 100644 +--- a/blueox/contrib/__init__.py ++++ b/blueox/contrib/__init__.py +@@ -1 +0,0 @@ +- +diff --git a/blueox/contrib/celery/__init__.py b/blueox/contrib/celery/__init__.py +index 8b13789..e69de29 100644 +--- a/blueox/contrib/celery/__init__.py ++++ b/blueox/contrib/celery/__init__.py +@@ -1 +0,0 @@ +- +diff --git a/blueox/contrib/celery/celery_signals.py b/blueox/contrib/celery/celery_signals.py +index 3c20092..c37dba6 100644 +--- a/blueox/contrib/celery/celery_signals.py ++++ b/blueox/contrib/celery/celery_signals.py +@@ -1,8 +1,7 @@ + """Hooks for gathering celery task data into blueox. + +-Importing this module will register signal handlers into Celery worker's runtime. +- +-We also will track creation of tasks on the client side. ++Importing this module will register signal handlers into Celery ++worker's runtime. We also will track creation of tasks on the client side. + """ + import traceback + +@@ -33,9 +32,9 @@ else: + @signals.task_sent.connect + def on_task_sent(**kwargs): + with blueox.Context('.celery.task_sent'): +- # Arguments for this signal are different than the worker signals. Sometimes +- # they are even different than what the documentation says. See also +- # https://github.com/celery/celery/issues/1606 ++ # Arguments for this signal are different than the worker signals. ++ # Sometimes they are even different than what the documentation ++ # says. See also https://github.com/celery/celery/issues/1606 + blueox.set('task_id', kwargs.get('task_id', kwargs.get('id'))) + blueox.set('task', str(kwargs['task'])) + blueox.set('eta', kwargs['eta']) +@@ -43,7 +42,14 @@ else: + + @signals.worker_process_init.connect + def on_worker_process_init(**kwargs): +- if hasattr(settings, 'BLUEOX_HOST'): ++ if hasattr(settings, 'BLUEOX_PYCERNAN_HOST'): ++ if settings.BLUEOX_PYCERNAN_HOST: ++ rec = blueox.PYCERNAN_RECORDER ++ blueox.default_configure( ++ settings.BLUEOX_PYCERNAN_HOST, recorder=rec) ++ else: ++ blueox.configure(None, None) ++ elif hasattr(settings, 'BLUEOX_HOST'): + if settings.BLUEOX_HOST: + blueox.default_configure(settings.BLUEOX_HOST) + else: +diff --git a/blueox/contrib/django/__init__.py b/blueox/contrib/django/__init__.py +index 8b13789..e69de29 100644 +--- a/blueox/contrib/django/__init__.py ++++ b/blueox/contrib/django/__init__.py +@@ -1 +0,0 @@ +- +diff --git a/blueox/contrib/django/middleware.py b/blueox/contrib/django/middleware.py +index b16f486..1471ac2 100644 +--- a/blueox/contrib/django/middleware.py ++++ b/blueox/contrib/django/middleware.py +@@ -1,6 +1,5 @@ + import sys + import traceback +-import logging + + import blueox + +@@ -10,7 +9,14 @@ from django.conf import settings + class Middleware(object): + + def __init__(self): +- if hasattr(settings, 'BLUEOX_HOST'): ++ if hasattr(settings, 'BLUEOX_PYCERNAN_HOST'): ++ if settings.BLUEOX_PYCERNAN_HOST: ++ rec = blueox.PYCERNAN_RECORDER ++ blueox.default_configure( ++ settings.BLUEOX_PYCERNAN_HOST, recorder=rec) ++ else: ++ blueox.configure(None, None) ++ elif hasattr(settings, 'BLUEOX_HOST'): + if settings.BLUEOX_HOST: + blueox.default_configure(settings.BLUEOX_HOST) + else: +@@ -28,7 +34,9 @@ class Middleware(object): + + headers = {} + for k, v in request.META.iteritems(): +- if k.startswith('HTTP_') or k in ('CONTENT_LENGTH', 'CONTENT_TYPE'): ++ if ( ++ k.startswith('HTTP_') or ++ k in ('CONTENT_LENGTH', 'CONTENT_TYPE')): + headers[k] = v + blueox.set('headers', headers) + +diff --git a/blueox/contrib/flask/__init__.py b/blueox/contrib/flask/__init__.py +index 56fb178..57de85d 100644 +--- a/blueox/contrib/flask/__init__.py ++++ b/blueox/contrib/flask/__init__.py +@@ -23,7 +23,13 @@ class BlueOxMiddleware(object): + def __init__(self, app): + self.app = app + +- if 'BLUEOX_HOST' in app.config: ++ if 'BLUEOX_PYCERNAN_HOST' in app.config: ++ self.blueox_pycernan_host = app.config['BLUEOX_PYCERNAN_HOST'] ++ if self.blueox_pycernan_host: ++ rec = blueox.PYCERNAN_RECORDER ++ blueox.default_configure( ++ self.blueox_pycernan_host, recorder=rec) ++ elif 'BLUEOX_HOST' in app.config: + self.blueox_host = app.config['BLUEOX_HOST'] + if self.blueox_host: + blueox.default_configure(self.blueox_host) +@@ -45,8 +51,8 @@ class BlueOxMiddleware(object): + headers = {} + for k, v in request.environ.iteritems(): + if ( +- k.startswith('HTTP_') or k in +- ('CONTENT_LENGTH', 'CONTENT_TYPE')): ++ k.startswith('HTTP_') or ++ k in ('CONTENT_LENGTH', 'CONTENT_TYPE')): + headers[k] = v + + blueox.set('headers', headers) +diff --git a/blueox/logger.py b/blueox/logger.py +index 37d2180..6a7c025 100644 +--- a/blueox/logger.py ++++ b/blueox/logger.py +@@ -3,7 +3,9 @@ + blueox.logger + ~~~~~~~~ + +-This module provides integration with blueox and standard python logging module. ++This module provides integration with blueox and standard ++python logging module. ++ + :copyright: (c) 2012 by Rhett Garber + :license: ISC, see LICENSE for more details. + +@@ -20,7 +22,8 @@ class LogHandler(logging.Handler): + Records standard fields such as logger name, level the message and if an + exception was provided, the string formatted exception. + +- The type name, if not specified will be something like '.log' ++ The type name, if not specified will be something like ++ '.log' + """ + + def __init__(self, type_name=None): +diff --git a/blueox/ports.py b/blueox/ports.py +index 5b1ca2f..d977847 100644 +--- a/blueox/ports.py ++++ b/blueox/ports.py +@@ -28,7 +28,7 @@ def _default_host(host, default_host, default_port): + if not host: + host = default_host + if ':' not in host: +- host = "{}:{}".format(host, default_port) ++ host = '{}:{}'.format(host, default_port) + + return host + +@@ -41,3 +41,13 @@ def default_control_host(host=None): + def default_collect_host(host=None): + default_host = os.environ.get(ENV_VAR_COLLECT_HOST, DEFAULT_HOST) + return _default_host(host, default_host, DEFAULT_COLLECT_PORT) ++ ++ ++# For consistency, we'll abstract pycernan connections in the same way ++ENV_VAR_PYCERNAN_HOST = 'BLUEOX_PYCERNAN_HOST' ++DEFAULT_PYCERNAN_PORT = 2003 ++ ++ ++def default_pycernan_host(host=None): ++ default_host = os.environ.get(ENV_VAR_PYCERNAN_HOST, DEFAULT_HOST) ++ return _default_host(host, default_host, DEFAULT_PYCERNAN_PORT) +diff --git a/blueox/recorders/__init__.py b/blueox/recorders/__init__.py +new file mode 100644 +index 0000000..e69de29 +diff --git a/blueox/recorders/pycernan.py b/blueox/recorders/pycernan.py +new file mode 100644 +index 0000000..1edfd5b +--- /dev/null ++++ b/blueox/recorders/pycernan.py +@@ -0,0 +1,139 @@ ++# -*- coding: utf-8 -*- ++""" ++blueox.recorders.pycernan ++~~~~~~~~ ++ ++This module provides the interface into pycernan ++ ++:copyright: (c) 2018 by Aaron Biller?? ++:license: ISC, see LICENSE for more details. ++ ++""" ++from __future__ import absolute_import ++ ++import atexit ++import datetime ++import decimal ++import json ++import logging ++import os ++import threading ++ ++from pycernan.avro import Client ++ ++from blueox import ports ++ ++log = logging.getLogger(__name__) ++ ++_uname = os.uname()[1] ++ ++# Global blueox avro schema definition ++BLUEOX_AVRO_RECORD = { ++ "doc": "A BlueOx event", ++ "name": "blueox_event", ++ "namespace": "blueox.{}".format(_uname), ++ "type": "record", ++ "fields": [ ++ {"name": "id", "type": "string"}, ++ {"name": "type", "type": "string"}, ++ {"name": "host", "type": "string"}, ++ {"name": "pid", "type": "long"}, ++ {"name": "start", "type": "double"}, ++ {"name": "end", "type": "double"}, ++ {"name": "body", "type": ["null", "string"], "default": "null"} ++ ] ++} ++ ++ ++def default_host(host=None): ++ """Build a default host string for pycernan ++ """ ++ return ports.default_pycernan_host(host) ++ ++ ++def _serializer(obj): ++ """Serialize native python objects ++ """ ++ if isinstance(obj, (datetime.datetime, datetime.date)): ++ return obj.isoformat() ++ elif isinstance(obj, decimal.Decimal): ++ return float(obj) ++ try: ++ obj = str(obj) ++ except Exception: ++ raise TypeError(repr(obj) + ' is not JSON serializable') ++ return obj ++ ++ ++threadLocal = threading.local() ++ ++# Context can be shared between threads ++_client = None ++ ++ ++def init(host, port): ++ global _client ++ ++ _client = Client(host=host, port=port) ++ ++ ++def _thread_connect(): ++ if _client and not getattr(threadLocal, 'client', None): ++ threadLocal.client = _client ++ ++ ++def _serialize_context(context): ++ context_dict = context.to_dict() ++ for key in ('host', 'type'): ++ if len(context_dict.get(key, '')) > 64: ++ raise ValueError('Value too long: %r' % key) ++ ++ context_dict['id'] = str(context_dict['id']) ++ ++ body = context_dict.get('body', None) ++ if body is not None: ++ try: ++ context_dict['body'] = json.dumps(body, default=_serializer) ++ except (TypeError, ValueError): ++ try: ++ context_dict['body'] = unicode(body) ++ except Exception: ++ log.exception( ++ 'Serialization failure (not fatal, dropping data)') ++ context_dict['body'] = None ++ ++ context_dict = { ++ k: v.encode('utf-8') if isinstance(v, unicode) ++ else v for k, v in context_dict.items() ++ } ++ ++ return context_dict ++ ++ ++def send(context): ++ _thread_connect() ++ ++ try: ++ context_data = [_serialize_context(context)] ++ except Exception: ++ log.exception('Failed to serialize context') ++ return ++ ++ if _client and threadLocal.client is not None: ++ try: ++ log.debug('Sending msg') ++ threadLocal.client.publish( ++ BLUEOX_AVRO_RECORD, context_data, sync=False) ++ except Exception: ++ log.exception('Failed during publish to pycernan.') ++ else: ++ log.info('Skipping sending event %s', context.name) ++ ++ ++def close(): ++ if getattr(threadLocal, 'client', None): ++ threadLocal.client.close() ++ threadLocal.client = None ++ ++ ++atexit.register(close) +diff --git a/blueox/network.py b/blueox/recorders/zmq.py +similarity index 94% +rename from blueox/network.py +rename to blueox/recorders/zmq.py +index ede1a1e..fa02207 100644 +--- a/blueox/network.py ++++ b/blueox/recorders/zmq.py +@@ -1,6 +1,6 @@ + # -*- coding: utf-8 -*- + """ +-blueox.network ++blueox.recorders.zmq + ~~~~~~~~ + + This module provides our interface into ZeroMQ +@@ -9,6 +9,8 @@ This module provides our interface into ZeroMQ + :license: ISC, see LICENSE for more details. + + """ ++from __future__ import absolute_import ++ + import atexit + import logging + import msgpack +@@ -16,7 +18,8 @@ import struct + import threading + import zmq + +-from . import utils ++from blueox import ports ++from blueox import utils + + log = logging.getLogger(__name__) + +@@ -44,6 +47,12 @@ def check_meta_version(meta): + raise ValueError(value) + + ++def default_host(host=None): ++ """Build a default host string for the blueox collector ++ """ ++ return ports.default_collect_host(host) ++ ++ + threadLocal = threading.local() + + # Context can be shared between threads +diff --git a/blueox/store.py b/blueox/store.py +index 66f4f19..afeaa2f 100644 +--- a/blueox/store.py ++++ b/blueox/store.py +@@ -226,7 +226,8 @@ def list_log_files(log_path): + + + def filter_log_files_for_active(log_files): +- """Filter our list of log files to remove those we expect might be active.""" ++ """Filter our list of log files to remove those we expect might be active. ++ """ + out_log_files = [] + + files_by_type = collections.defaultdict(list) +@@ -242,11 +243,11 @@ def filter_log_files_for_active(log_files): + + out_log_files += type_files + +- # If that last log file is old, then it's probably not being used either. +- # We add a buffer of an hour just to make sure everything has rotated +- # away safely when this is run close to midnight. +- cutoff_date = (datetime.datetime.utcnow() - datetime.timedelta(hours=1) +- ).date() ++ # If that last log file is old, then it's probably not being used ++ # either. We add a buffer of an hour just to make sure everything has ++ # rotated away safely when this is run close to midnight. ++ cutoff_date = ( ++ datetime.datetime.utcnow() - datetime.timedelta(hours=1)).date() + if last_lf.date < cutoff_date: + out_log_files.append(last_lf) + +diff --git a/blueox/timer.py b/blueox/timer.py +index dc0cf18..77a22ff 100644 +--- a/blueox/timer.py ++++ b/blueox/timer.py +@@ -3,7 +3,9 @@ + blueox.timer + ~~~~~~~~ + +-This module has a timer context manager for easily tracking wall-clock time for some execution ++This module has a timer context manager for easily tracking wall-clock ++time for some execution ++ + :copyright: (c) 2012 by Rhett Garber + :license: ISC, see LICENSE for more details. + +diff --git a/blueox/tornado_utils.py b/blueox/tornado_utils.py +index ffbfa9b..af3e2c4 100644 +--- a/blueox/tornado_utils.py ++++ b/blueox/tornado_utils.py +@@ -5,8 +5,8 @@ blueox.tornado + + This module provides hooks for using blueox with the Tornado async web server. + Making blueox useful inside tornado is a challenge since you'll likely want a +-blueox context per request, but multiple requests can be going on at once inside +-tornado. ++blueox context per request, but multiple requests can be going on at once ++inside tornado. + + :copyright: (c) 2012 by Rhett Garber + :license: ISC, see LICENSE for more details. +@@ -19,8 +19,6 @@ import types + import sys + import time + +-log = logging.getLogger(__name__) +- + import tornado.web + import tornado.gen + import tornado.httpclient +@@ -29,6 +27,8 @@ import tornado.stack_context + + import blueox + ++log = logging.getLogger(__name__) ++ + + def _gen_wrapper(ctx, generator): + """Generator Wrapper that starts/stops our context +@@ -112,7 +112,8 @@ class BlueOxRequestHandlerMixin(object): + + class SampleRequestHandler(BlueOxRequestHandlerMixin, + tornado.web.RequestHandler): +- """Sample base request handler that provides basic information about the request. ++ """Sample base request handler that provides basic ++ information about the request. + """ + + def prepare(self): +@@ -123,8 +124,8 @@ class SampleRequestHandler(BlueOxRequestHandlerMixin, + + def write_error(self, status_code, **kwargs): + if 'exc_info' in kwargs: +- blueox.set('exception', +- ''.join(traceback.format_exception(*kwargs["exc_info"]))) ++ blueox.set('exception', ''.join( ++ traceback.format_exception(*kwargs["exc_info"]))) + + return super(SampleRequestHandler, self).write_error(status_code, + **kwargs) +@@ -159,15 +160,16 @@ class AsyncHTTPClient(tornado.simple_httpclient.SimpleAsyncHTTPClient): + ctx.stop() + + # I'd love to use the future to handle the completion step, BUT, we +- # need this to happen first. If the caller has provided a callback, we don't want them +- # to get called before we do. Rather than poke into the internal datastructures, we'll just +- # handle the callback explicitly ++ # need this to happen first. If the caller has provided a callback, we ++ # don't want them to get called before we do. Rather than poke into the ++ # internal datastructures, we'll just handle the callback explicitly + + def complete_context(response): + ctx.start() + + ctx.set('response.code', response.code) +- ctx.set('response.size', len(response.body) if response.body else 0) ++ ctx.set('response.size', ++ len(response.body) if response.body else 0) + + ctx.done() + +@@ -175,12 +177,12 @@ class AsyncHTTPClient(tornado.simple_httpclient.SimpleAsyncHTTPClient): + + def fetch_complete(future): + # This error handling is just copied from tornado.httpclient as +- # we need to record a real HTTPError. httpclient might do the same thing +- # again if needs to deal with the caller's callbacks. ++ # we need to record a real HTTPError. httpclient might do the ++ # same thing again if needs to deal with the caller's callbacks + exc = future.exception() +- if isinstance( +- exc, +- tornado.httpclient.HTTPError) and exc.response is not None: ++ if ( ++ isinstance(exc, tornado.httpclient.HTTPError) and ++ exc.response is not None): + response = exc.response + elif exc is not None: + response = tornado.httpclient.HTTPResponse( +diff --git a/requirements.txt b/requirements.txt +index dfdd0e7..4e7e345 100644 +--- a/requirements.txt ++++ b/requirements.txt +@@ -4,3 +4,4 @@ pyflakes + tornado==3.2 + boto + yapf ++./vendor/pycernan-0.0.10.zip +diff --git a/tests/ports_test.py b/tests/ports_test.py +index c7d278c..0c5aeb5 100644 +--- a/tests/ports_test.py ++++ b/tests/ports_test.py +@@ -1,5 +1,8 @@ + import os +-from testify import * ++from testify import ( ++ TestCase, ++ assert_equal, ++ teardown) + + from blueox import ports + +@@ -71,3 +74,31 @@ class DefaultCollectHost(TestCase): + os.environ['BLUEOX_HOST'] = 'master:123' + host = ports.default_collect_host() + assert_equal(host, "master:123") ++ ++ ++class DefaultPycernanHost(TestCase): ++ @teardown ++ def clear_env(self): ++ try: ++ del os.environ['BLUEOX_PYCERNAN_HOST'] ++ except KeyError: ++ pass ++ ++ def test_emtpy(self): ++ host = ports.default_pycernan_host() ++ assert_equal(host, '127.0.0.1:2003') ++ ++ def test_env(self): ++ os.environ['BLUEOX_PYCERNAN_HOST'] = 'local.svc.team-me.aws.jk8s' ++ host = ports.default_pycernan_host() ++ assert_equal(host, 'local.svc.team-me.aws.jk8s:2003') ++ ++ def test_env_port(self): ++ os.environ['BLUEOX_PYCERNAN_HOST'] = 'local.svc.team-me.aws.jk8s:2003' ++ host = ports.default_pycernan_host() ++ assert_equal(host, 'local.svc.team-me.aws.jk8s:2003') ++ ++ def test_passed(self): ++ _host = 'my.wish.is.your.command' ++ host = ports.default_pycernan_host(_host) ++ assert_equal(host, 'my.wish.is.your.command:2003') +diff --git a/tests/recorders/__init__.py b/tests/recorders/__init__.py +new file mode 100644 +index 0000000..e69de29 +diff --git a/tests/recorders/pycernan_test.py b/tests/recorders/pycernan_test.py +new file mode 100644 +index 0000000..56ef550 +--- /dev/null ++++ b/tests/recorders/pycernan_test.py +@@ -0,0 +1,164 @@ ++import datetime ++import decimal ++import json ++import random ++ ++from testify import ( ++ TestCase, ++ setup, ++ teardown, ++ assert_equal, ++ assert_raises) ++ ++from pycernan.avro.serde import serialize ++from pycernan.avro.exceptions import DatumTypeException ++ ++from blueox import default_configure, PYCERNAN_RECORDER ++from blueox import utils ++from blueox import context ++from blueox.recorders import pycernan as pycernan_rec ++from blueox.recorders import zmq ++ ++ ++class MockPycernanClient(object): ++ last_schema = None ++ last_batch = None ++ last_sync = None ++ ++ def __call__(self, host=None, port=None): ++ self.host = host ++ self.port = port ++ return self ++ ++ def publish(self, schema, batch, sync=None): ++ self.last_schema = schema ++ self.last_batch = batch ++ self.last_sync = sync ++ ++ def close(self): ++ pass ++ ++ ++class CantSerializeMe(object): ++ def __repr__(self): ++ return chr(167) ++ ++ ++class PycernanOverrideTestCase(TestCase): ++ def test_configure_no_override(self): ++ default_configure() ++ assert_equal(context._recorder_function, zmq.send) ++ ++ def test_configure_override(self): ++ pycernan_rec.Client = MockPycernanClient() ++ default_configure(recorder=PYCERNAN_RECORDER) ++ assert_equal(context._recorder_function, pycernan_rec.send) ++ ++ ++class PycernanSendTestCase(TestCase): ++ @setup ++ def build_context(self): ++ self.context = context.Context('test', 1) ++ ++ @setup ++ def init_pycernan(self): ++ self.port = random.randint(30000, 40000) ++ self.client = MockPycernanClient() ++ pycernan_rec.Client = self.client ++ pycernan_rec.init('127.0.0.1', self.port) ++ ++ @setup ++ def configure_pycernan(self): ++ context._recorder_function = pycernan_rec.send ++ ++ @teardown ++ def unconfigure_pycernan(self): ++ context._recorder_function = None ++ ++ @teardown ++ def destroy_recorder(self): ++ pycernan_rec.close() ++ ++ def test(self): ++ with self.context: ++ self.context.set('foo', True) ++ self.context.set('bar.baz', 10.0) ++ ++ data = self.client.last_batch[0] ++ data['body'] = json.loads(data['body']) ++ assert_equal(self.client.last_schema, pycernan_rec.BLUEOX_AVRO_RECORD) ++ assert_equal(self.client.last_sync, False) ++ assert_equal(data['id'], '1') ++ assert_equal(data['type'], 'test') ++ assert_equal(utils.get_deep(data['body'], 'bar.baz'), 10.0) ++ ++ assert_equal(self.client.host, '127.0.0.1') ++ assert_equal(self.client.port, self.port) ++ ++ ++class SerializeContextTestCase(TestCase): ++ @setup ++ def build_context(self): ++ self.context = context.Context('test', 1) ++ ++ def test_types(self): ++ with self.context: ++ self.context.set('decimal_value', decimal.Decimal('6.66')) ++ self.context.set('date_value', datetime.date(2013, 12, 10)) ++ self.context.set( ++ 'datetime_value', datetime.datetime(2013, 12, 10, 12, 12, 12)) ++ ++ data = pycernan_rec._serialize_context(self.context) ++ data['body'] = json.loads(data['body']) ++ assert_equal(data['body']['decimal_value'], 6.66) ++ assert_equal(data['body']['date_value'], '2013-12-10') ++ assert_equal( ++ datetime.datetime.strptime( ++ data['body']['datetime_value'], '%Y-%m-%dT%H:%M:%S'), ++ datetime.datetime(2013, 12, 10, 12, 12, 12)) ++ ++ def test_exception(self): ++ with self.context: ++ self.context.set('value', CantSerializeMe()) ++ ++ data = pycernan_rec._serialize_context(self.context) ++ ++ # The serialization should fail, but that just ++ # means we don't have any data. ++ assert_equal(data['body'], None) ++ ++ ++class EncodeAvroTestCase(TestCase): ++ @setup ++ def build_context(self): ++ self.context = context.Context('test', 1) ++ ++ def test_success(self): ++ with self.context: ++ self.context.set('foo', True) ++ self.context.set('bar.baz', 10.0) ++ ++ data = pycernan_rec._serialize_context(self.context) ++ serialize(pycernan_rec.BLUEOX_AVRO_RECORD, [data]) ++ ++ def test_failure(self): ++ with self.context: ++ self.context.set('foo', True) ++ self.context.set('bar.baz', 10.0) ++ self.context.set('decimal_value', decimal.Decimal('6.66')) ++ self.context.set('date_value', datetime.date(2013, 12, 10)) ++ self.context.set( ++ 'datetime_value', datetime.datetime(2013, 12, 10, 12, 12, 12)) ++ ++ data = pycernan_rec._serialize_context(self.context) ++ data['host'] = None ++ with assert_raises(DatumTypeException): ++ serialize(pycernan_rec.BLUEOX_AVRO_RECORD, [data]) ++ ++ def test_none_body(self): ++ with self.context: ++ self.context.set('bad_char', CantSerializeMe()) ++ ++ data = pycernan_rec._serialize_context(self.context) ++ assert_equal(data['body'], None) ++ serialize(pycernan_rec.BLUEOX_AVRO_RECORD, [data]) +diff --git a/tests/network_test.py b/tests/recorders/zmq_test.py +similarity index 72% +rename from tests/network_test.py +rename to tests/recorders/zmq_test.py +index dbfa4c2..c6ae28a 100644 +--- a/tests/network_test.py ++++ b/tests/recorders/zmq_test.py +@@ -3,18 +3,24 @@ import struct + import decimal + import datetime + +-from testify import * ++from testify import ( ++ TestCase, ++ setup, ++ teardown, ++ assert_equal) + import zmq + import msgpack + + from blueox import utils +-from blueox import network + from blueox import context ++from blueox.recorders import zmq as zmq_rec ++ + + class NoNetworkSendTestCase(TestCase): + def test(self): + """Verify that if network isn't setup, send just does nothing""" +- network.send(context.Context('test', 1)) ++ zmq_rec.send(context.Context('test', 1)) ++ + + class NetworkSendTestCase(TestCase): + @setup +@@ -24,11 +30,11 @@ class NetworkSendTestCase(TestCase): + @setup + def init_network(self): + self.port = random.randint(30000, 40000) +- network.init("127.0.0.1", self.port) ++ zmq_rec.init("127.0.0.1", self.port) + + @setup + def configure_network(self): +- context._recorder_function = network.send ++ context._recorder_function = zmq_rec.send + + @teardown + def unconfigure_network(self): +@@ -36,7 +42,7 @@ class NetworkSendTestCase(TestCase): + + @setup + def build_server_socket(self): +- self.server = network._zmq_context.socket(zmq.PULL) ++ self.server = zmq_rec._zmq_context.socket(zmq.PULL) + self.server.bind("tcp://127.0.0.1:%d" % self.port) + + @teardown +@@ -45,7 +51,7 @@ class NetworkSendTestCase(TestCase): + + @teardown + def destory_network(self): +- network.close() ++ zmq_rec.close() + + def test(self): + with self.context: +@@ -53,8 +59,9 @@ class NetworkSendTestCase(TestCase): + self.context.set('bar.baz', 10.0) + + event_meta, raw_data = self.server.recv_multipart() +- network.check_meta_version(event_meta) +- _, event_time, event_host, event_type = struct.unpack(network.META_STRUCT_FMT, event_meta) ++ zmq_rec.check_meta_version(event_meta) ++ _, event_time, event_host, event_type = struct.unpack( ++ zmq_rec.META_STRUCT_FMT, event_meta) + assert_equal(event_type, 'test') + + data = msgpack.unpackb(raw_data) +@@ -72,26 +79,25 @@ class SerializeContextTestCase(TestCase): + with self.context: + self.context.set('decimal_value', decimal.Decimal("6.66")) + self.context.set('date_value', datetime.date(2013, 12, 10)) +- self.context.set('datetime_value', datetime.datetime(2013, 12, 10, 12, 12, 12)) ++ self.context.set( ++ 'datetime_value', datetime.datetime(2013, 12, 10, 12, 12, 12)) + +- meta_data, context_data = network._serialize_context(self.context) ++ meta_data, context_data = zmq_rec._serialize_context(self.context) + data = msgpack.unpackb(context_data) + assert_equal(data['body']['decimal_value'], "6.66") + assert_equal(data['body']['date_value'], "2013-12-10") + assert_equal( +- datetime.datetime.fromtimestamp(float(data['body']['datetime_value'])), ++ datetime.datetime.fromtimestamp( ++ float(data['body']['datetime_value'])), + datetime.datetime(2013, 12, 10, 12, 12, 12)) + + def test_exception(self): + with self.context: + self.context.set('value', Exception('hello')) + +- meta_data, context_data = network._serialize_context(self.context) ++ meta_data, context_data = zmq_rec._serialize_context(self.context) + data = msgpack.unpackb(context_data) + + # The serialization should fail, but that just means we don't have any + # data. + assert_equal(data['body'], None) +- +- +- +diff --git a/tests/tornado_utils_test.py b/tests/tornado_utils_test.py +index e6aedd4..085da93 100644 +--- a/tests/tornado_utils_test.py ++++ b/tests/tornado_utils_test.py +@@ -1,9 +1,8 @@ + import time +-import pprint + import random + import collections + import traceback +-from testify import * ++from testify import assert_equal, setup + + import tornado.ioloop + import tornado.gen +@@ -14,7 +13,10 @@ from blueox.utils import get_deep + # vendor module. Tornado testing in Testify + import tornado_test + +-class AsyncHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler): ++ ++class AsyncHandler( ++ blueox.tornado_utils.BlueOxRequestHandlerMixin, ++ tornado.web.RequestHandler): + @blueox.tornado_utils.coroutine + def get(self): + loop = self.request.connection.stream.io_loop +@@ -22,7 +24,8 @@ class AsyncHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.R + req_id = self.blueox_ctx.id + blueox.set('async', True) + +- result = yield blueox.tornado_utils.AsyncHTTPClient(loop).fetch(self.application.test_url) ++ result = yield blueox.tornado_utils.AsyncHTTPClient(loop).fetch( ++ self.application.test_url) + assert result.code == 200 + + with blueox.Context('.extra'): +@@ -32,31 +35,40 @@ class AsyncHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.R + self.finish() + + +-class AsyncErrorHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler): ++class AsyncErrorHandler( ++ blueox.tornado_utils.BlueOxRequestHandlerMixin, ++ tornado.web.RequestHandler): + @blueox.tornado_utils.coroutine + def get(self): + loop = self.request.connection.stream.io_loop + +- called = yield tornado.gen.Task(loop.add_timeout, time.time() + random.randint(1, 2)) ++ _ = yield tornado.gen.Task(loop.add_timeout, time.time() ++ + random.randint(1, 2)) + + raise Exception('hi') + + def write_error(self, status_code, **kwargs): + if 'exc_info' in kwargs: +- blueox.set('exception', ''.join(traceback.format_exception(*kwargs["exc_info"]))) ++ blueox.set('exception', ''.join( ++ traceback.format_exception(*kwargs["exc_info"]))) + +- return super(AsyncErrorHandler, self).write_error(status_code, **kwargs) ++ return super(AsyncErrorHandler, self).write_error(status_code, ++ **kwargs) + + +-class AsyncTimeoutHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler): ++class AsyncTimeoutHandler( ++ blueox.tornado_utils.BlueOxRequestHandlerMixin, ++ tornado.web.RequestHandler): + @blueox.tornado_utils.coroutine + def get(self): + loop = self.request.connection.stream.io_loop + +- called = yield tornado.gen.Task(loop.add_timeout, time.time() + 1.0) ++ _ = yield tornado.gen.Task(loop.add_timeout, time.time() + 1.0) + + +-class AsyncRecurseTimeoutHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler): ++class AsyncRecurseTimeoutHandler( ++ blueox.tornado_utils.BlueOxRequestHandlerMixin, ++ tornado.web.RequestHandler): + @blueox.tornado_utils.coroutine + def post(self): + loop = self.request.connection.stream.io_loop +@@ -64,8 +76,8 @@ class AsyncRecurseTimeoutHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, + + blueox.set("start", True) + try: +- f = yield http_client.fetch(self.request.body, request_timeout=0.5) +- except tornado.httpclient.HTTPError, e: ++ _ = yield http_client.fetch(self.request.body, request_timeout=0.5) ++ except tornado.httpclient.HTTPError: + self.write("got it") + else: + self.write("nope") +@@ -73,13 +85,14 @@ class AsyncRecurseTimeoutHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, + blueox.set("end", True) + + +-class MainHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler): ++class MainHandler( ++ blueox.tornado_utils.BlueOxRequestHandlerMixin, ++ tornado.web.RequestHandler): + def get(self): + blueox.set('async', False) + self.write("Hello, world") + + +- + class SimpleTestCase(tornado_test.AsyncHTTPTestCase): + @setup + def setup_bluox(self): +@@ -112,11 +125,6 @@ class SimpleTestCase(tornado_test.AsyncHTTPTestCase): + f = self.http_client.fetch(self.get_url("/error"), self.stop) + resp = self.wait() + +- #for ctx_id in self.log_ctx: +- #print ctx_id +- #for ctx in self.log_ctx[ctx_id]: +- #pprint.pprint(ctx.to_dict()) +- + assert_equal(len(self.log_ctx), 2) + + found_exception = False +@@ -128,31 +136,22 @@ class SimpleTestCase(tornado_test.AsyncHTTPTestCase): + assert found_exception + + def test_timeout_error(self): +- f = self.http_client.fetch(self.get_url("/timeout"), self.stop, request_timeout=0.5) ++ f = self.http_client.fetch( ++ self.get_url("/timeout"), self.stop, request_timeout=0.5) + resp = self.wait() + +- #for ctx_id in self.log_ctx: +- #print ctx_id +- #for ctx in self.log_ctx[ctx_id]: +- #pprint.pprint(ctx.to_dict()) +- + assert_equal(len(self.log_ctx), 1) + ctx = self.log_ctx[self.log_ctx.keys()[0]][0] + assert_equal(get_deep(ctx.to_dict(), 'body.response.code'), 599) + + def test_recurse_timeout_error(self): + url = self.get_url("/timeout") +- f = self.http_client.fetch(self.get_url("/recurse_timeout"), self.stop, ++ _ = self.http_client.fetch(self.get_url("/recurse_timeout"), self.stop, + body=url, + method="POST", + request_timeout=1.5) + resp = self.wait() + +- #for ctx_id in self.log_ctx: +- #print ctx_id +- #for ctx in self.log_ctx[ctx_id]: +- #pprint.pprint(ctx.to_dict()) +- + assert_equal(resp.code, 200) + assert_equal(resp.body, "got it") + +@@ -161,7 +160,9 @@ class SimpleTestCase(tornado_test.AsyncHTTPTestCase): + for ctx_list in self.log_ctx.values(): + for ctx in ctx_list: + c = ctx.to_dict() +- if c['type'] == 'request.httpclient' and c['body']['response']['code'] == 599: ++ if ( ++ c['type'] == 'request.httpclient' and ++ c['body']['response']['code'] == 599): + found_timeout = True + + if c['type'] == 'request' and get_deep(c, 'body.start'): +@@ -175,13 +176,8 @@ class SimpleTestCase(tornado_test.AsyncHTTPTestCase): + self.http_client.fetch(self.get_url("/async"), self.stop) + resp = self.wait() + +- #for ctx_id in self.log_ctx: +- #print +- #print ctx_id +- #for ctx in self.log_ctx[ctx_id]: +- #pprint.pprint(ctx.to_dict()) +- +- # If everything worked properly, we should have two separate ids, one will have two contexts associated with it. ++ # If everything worked properly, we should have two separate ids, ++ # one will have two contexts associated with it. + # Hopefully it's the right one. + found_sync = None + found_async = None +@@ -191,7 +187,9 @@ class SimpleTestCase(tornado_test.AsyncHTTPTestCase): + if ctx.name == "request" and ctx.to_dict()['body']['async']: + assert_equal(len(ctx_list), 3) + found_async = ctx +- if ctx.name == "request" and not ctx.to_dict()['body']['async']: ++ if ( ++ ctx.name == "request" and ++ not ctx.to_dict()['body']['async']): + assert_equal(len(ctx_list), 1) + found_sync = ctx + if ctx.name.endswith("httpclient"): diff --git a/requirements.txt b/requirements.txt index dfdd0e7..4e7e345 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ pyflakes tornado==3.2 boto yapf +./vendor/pycernan-0.0.10.zip diff --git a/tests/ports_test.py b/tests/ports_test.py index c7d278c..0c5aeb5 100644 --- a/tests/ports_test.py +++ b/tests/ports_test.py @@ -1,5 +1,8 @@ import os -from testify import * +from testify import ( + TestCase, + assert_equal, + teardown) from blueox import ports @@ -71,3 +74,31 @@ def test_env_port(self): os.environ['BLUEOX_HOST'] = 'master:123' host = ports.default_collect_host() assert_equal(host, "master:123") + + +class DefaultPycernanHost(TestCase): + @teardown + def clear_env(self): + try: + del os.environ['BLUEOX_PYCERNAN_HOST'] + except KeyError: + pass + + def test_emtpy(self): + host = ports.default_pycernan_host() + assert_equal(host, '127.0.0.1:2003') + + def test_env(self): + os.environ['BLUEOX_PYCERNAN_HOST'] = 'local.svc.team-me.aws.jk8s' + host = ports.default_pycernan_host() + assert_equal(host, 'local.svc.team-me.aws.jk8s:2003') + + def test_env_port(self): + os.environ['BLUEOX_PYCERNAN_HOST'] = 'local.svc.team-me.aws.jk8s:2003' + host = ports.default_pycernan_host() + assert_equal(host, 'local.svc.team-me.aws.jk8s:2003') + + def test_passed(self): + _host = 'my.wish.is.your.command' + host = ports.default_pycernan_host(_host) + assert_equal(host, 'my.wish.is.your.command:2003') diff --git a/tests/recorders/__init__.py b/tests/recorders/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/recorders/pycernan_test.py b/tests/recorders/pycernan_test.py new file mode 100644 index 0000000..56ef550 --- /dev/null +++ b/tests/recorders/pycernan_test.py @@ -0,0 +1,164 @@ +import datetime +import decimal +import json +import random + +from testify import ( + TestCase, + setup, + teardown, + assert_equal, + assert_raises) + +from pycernan.avro.serde import serialize +from pycernan.avro.exceptions import DatumTypeException + +from blueox import default_configure, PYCERNAN_RECORDER +from blueox import utils +from blueox import context +from blueox.recorders import pycernan as pycernan_rec +from blueox.recorders import zmq + + +class MockPycernanClient(object): + last_schema = None + last_batch = None + last_sync = None + + def __call__(self, host=None, port=None): + self.host = host + self.port = port + return self + + def publish(self, schema, batch, sync=None): + self.last_schema = schema + self.last_batch = batch + self.last_sync = sync + + def close(self): + pass + + +class CantSerializeMe(object): + def __repr__(self): + return chr(167) + + +class PycernanOverrideTestCase(TestCase): + def test_configure_no_override(self): + default_configure() + assert_equal(context._recorder_function, zmq.send) + + def test_configure_override(self): + pycernan_rec.Client = MockPycernanClient() + default_configure(recorder=PYCERNAN_RECORDER) + assert_equal(context._recorder_function, pycernan_rec.send) + + +class PycernanSendTestCase(TestCase): + @setup + def build_context(self): + self.context = context.Context('test', 1) + + @setup + def init_pycernan(self): + self.port = random.randint(30000, 40000) + self.client = MockPycernanClient() + pycernan_rec.Client = self.client + pycernan_rec.init('127.0.0.1', self.port) + + @setup + def configure_pycernan(self): + context._recorder_function = pycernan_rec.send + + @teardown + def unconfigure_pycernan(self): + context._recorder_function = None + + @teardown + def destroy_recorder(self): + pycernan_rec.close() + + def test(self): + with self.context: + self.context.set('foo', True) + self.context.set('bar.baz', 10.0) + + data = self.client.last_batch[0] + data['body'] = json.loads(data['body']) + assert_equal(self.client.last_schema, pycernan_rec.BLUEOX_AVRO_RECORD) + assert_equal(self.client.last_sync, False) + assert_equal(data['id'], '1') + assert_equal(data['type'], 'test') + assert_equal(utils.get_deep(data['body'], 'bar.baz'), 10.0) + + assert_equal(self.client.host, '127.0.0.1') + assert_equal(self.client.port, self.port) + + +class SerializeContextTestCase(TestCase): + @setup + def build_context(self): + self.context = context.Context('test', 1) + + def test_types(self): + with self.context: + self.context.set('decimal_value', decimal.Decimal('6.66')) + self.context.set('date_value', datetime.date(2013, 12, 10)) + self.context.set( + 'datetime_value', datetime.datetime(2013, 12, 10, 12, 12, 12)) + + data = pycernan_rec._serialize_context(self.context) + data['body'] = json.loads(data['body']) + assert_equal(data['body']['decimal_value'], 6.66) + assert_equal(data['body']['date_value'], '2013-12-10') + assert_equal( + datetime.datetime.strptime( + data['body']['datetime_value'], '%Y-%m-%dT%H:%M:%S'), + datetime.datetime(2013, 12, 10, 12, 12, 12)) + + def test_exception(self): + with self.context: + self.context.set('value', CantSerializeMe()) + + data = pycernan_rec._serialize_context(self.context) + + # The serialization should fail, but that just + # means we don't have any data. + assert_equal(data['body'], None) + + +class EncodeAvroTestCase(TestCase): + @setup + def build_context(self): + self.context = context.Context('test', 1) + + def test_success(self): + with self.context: + self.context.set('foo', True) + self.context.set('bar.baz', 10.0) + + data = pycernan_rec._serialize_context(self.context) + serialize(pycernan_rec.BLUEOX_AVRO_RECORD, [data]) + + def test_failure(self): + with self.context: + self.context.set('foo', True) + self.context.set('bar.baz', 10.0) + self.context.set('decimal_value', decimal.Decimal('6.66')) + self.context.set('date_value', datetime.date(2013, 12, 10)) + self.context.set( + 'datetime_value', datetime.datetime(2013, 12, 10, 12, 12, 12)) + + data = pycernan_rec._serialize_context(self.context) + data['host'] = None + with assert_raises(DatumTypeException): + serialize(pycernan_rec.BLUEOX_AVRO_RECORD, [data]) + + def test_none_body(self): + with self.context: + self.context.set('bad_char', CantSerializeMe()) + + data = pycernan_rec._serialize_context(self.context) + assert_equal(data['body'], None) + serialize(pycernan_rec.BLUEOX_AVRO_RECORD, [data]) diff --git a/tests/network_test.py b/tests/recorders/zmq_test.py similarity index 72% rename from tests/network_test.py rename to tests/recorders/zmq_test.py index dbfa4c2..c6ae28a 100644 --- a/tests/network_test.py +++ b/tests/recorders/zmq_test.py @@ -3,18 +3,24 @@ import decimal import datetime -from testify import * +from testify import ( + TestCase, + setup, + teardown, + assert_equal) import zmq import msgpack from blueox import utils -from blueox import network from blueox import context +from blueox.recorders import zmq as zmq_rec + class NoNetworkSendTestCase(TestCase): def test(self): """Verify that if network isn't setup, send just does nothing""" - network.send(context.Context('test', 1)) + zmq_rec.send(context.Context('test', 1)) + class NetworkSendTestCase(TestCase): @setup @@ -24,11 +30,11 @@ def build_context(self): @setup def init_network(self): self.port = random.randint(30000, 40000) - network.init("127.0.0.1", self.port) + zmq_rec.init("127.0.0.1", self.port) @setup def configure_network(self): - context._recorder_function = network.send + context._recorder_function = zmq_rec.send @teardown def unconfigure_network(self): @@ -36,7 +42,7 @@ def unconfigure_network(self): @setup def build_server_socket(self): - self.server = network._zmq_context.socket(zmq.PULL) + self.server = zmq_rec._zmq_context.socket(zmq.PULL) self.server.bind("tcp://127.0.0.1:%d" % self.port) @teardown @@ -45,7 +51,7 @@ def destroy_server(self): @teardown def destory_network(self): - network.close() + zmq_rec.close() def test(self): with self.context: @@ -53,8 +59,9 @@ def test(self): self.context.set('bar.baz', 10.0) event_meta, raw_data = self.server.recv_multipart() - network.check_meta_version(event_meta) - _, event_time, event_host, event_type = struct.unpack(network.META_STRUCT_FMT, event_meta) + zmq_rec.check_meta_version(event_meta) + _, event_time, event_host, event_type = struct.unpack( + zmq_rec.META_STRUCT_FMT, event_meta) assert_equal(event_type, 'test') data = msgpack.unpackb(raw_data) @@ -72,26 +79,25 @@ def test_types(self): with self.context: self.context.set('decimal_value', decimal.Decimal("6.66")) self.context.set('date_value', datetime.date(2013, 12, 10)) - self.context.set('datetime_value', datetime.datetime(2013, 12, 10, 12, 12, 12)) + self.context.set( + 'datetime_value', datetime.datetime(2013, 12, 10, 12, 12, 12)) - meta_data, context_data = network._serialize_context(self.context) + meta_data, context_data = zmq_rec._serialize_context(self.context) data = msgpack.unpackb(context_data) assert_equal(data['body']['decimal_value'], "6.66") assert_equal(data['body']['date_value'], "2013-12-10") assert_equal( - datetime.datetime.fromtimestamp(float(data['body']['datetime_value'])), + datetime.datetime.fromtimestamp( + float(data['body']['datetime_value'])), datetime.datetime(2013, 12, 10, 12, 12, 12)) def test_exception(self): with self.context: self.context.set('value', Exception('hello')) - meta_data, context_data = network._serialize_context(self.context) + meta_data, context_data = zmq_rec._serialize_context(self.context) data = msgpack.unpackb(context_data) # The serialization should fail, but that just means we don't have any # data. assert_equal(data['body'], None) - - - diff --git a/tests/tornado_utils_test.py b/tests/tornado_utils_test.py index e6aedd4..085da93 100644 --- a/tests/tornado_utils_test.py +++ b/tests/tornado_utils_test.py @@ -1,9 +1,8 @@ import time -import pprint import random import collections import traceback -from testify import * +from testify import assert_equal, setup import tornado.ioloop import tornado.gen @@ -14,7 +13,10 @@ # vendor module. Tornado testing in Testify import tornado_test -class AsyncHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler): + +class AsyncHandler( + blueox.tornado_utils.BlueOxRequestHandlerMixin, + tornado.web.RequestHandler): @blueox.tornado_utils.coroutine def get(self): loop = self.request.connection.stream.io_loop @@ -22,7 +24,8 @@ def get(self): req_id = self.blueox_ctx.id blueox.set('async', True) - result = yield blueox.tornado_utils.AsyncHTTPClient(loop).fetch(self.application.test_url) + result = yield blueox.tornado_utils.AsyncHTTPClient(loop).fetch( + self.application.test_url) assert result.code == 200 with blueox.Context('.extra'): @@ -32,31 +35,40 @@ def get(self): self.finish() -class AsyncErrorHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler): +class AsyncErrorHandler( + blueox.tornado_utils.BlueOxRequestHandlerMixin, + tornado.web.RequestHandler): @blueox.tornado_utils.coroutine def get(self): loop = self.request.connection.stream.io_loop - called = yield tornado.gen.Task(loop.add_timeout, time.time() + random.randint(1, 2)) + _ = yield tornado.gen.Task(loop.add_timeout, time.time() + + random.randint(1, 2)) raise Exception('hi') def write_error(self, status_code, **kwargs): if 'exc_info' in kwargs: - blueox.set('exception', ''.join(traceback.format_exception(*kwargs["exc_info"]))) + blueox.set('exception', ''.join( + traceback.format_exception(*kwargs["exc_info"]))) - return super(AsyncErrorHandler, self).write_error(status_code, **kwargs) + return super(AsyncErrorHandler, self).write_error(status_code, + **kwargs) -class AsyncTimeoutHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler): +class AsyncTimeoutHandler( + blueox.tornado_utils.BlueOxRequestHandlerMixin, + tornado.web.RequestHandler): @blueox.tornado_utils.coroutine def get(self): loop = self.request.connection.stream.io_loop - called = yield tornado.gen.Task(loop.add_timeout, time.time() + 1.0) + _ = yield tornado.gen.Task(loop.add_timeout, time.time() + 1.0) -class AsyncRecurseTimeoutHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler): +class AsyncRecurseTimeoutHandler( + blueox.tornado_utils.BlueOxRequestHandlerMixin, + tornado.web.RequestHandler): @blueox.tornado_utils.coroutine def post(self): loop = self.request.connection.stream.io_loop @@ -64,8 +76,8 @@ def post(self): blueox.set("start", True) try: - f = yield http_client.fetch(self.request.body, request_timeout=0.5) - except tornado.httpclient.HTTPError, e: + _ = yield http_client.fetch(self.request.body, request_timeout=0.5) + except tornado.httpclient.HTTPError: self.write("got it") else: self.write("nope") @@ -73,13 +85,14 @@ def post(self): blueox.set("end", True) -class MainHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler): +class MainHandler( + blueox.tornado_utils.BlueOxRequestHandlerMixin, + tornado.web.RequestHandler): def get(self): blueox.set('async', False) self.write("Hello, world") - class SimpleTestCase(tornado_test.AsyncHTTPTestCase): @setup def setup_bluox(self): @@ -112,11 +125,6 @@ def test_error(self): f = self.http_client.fetch(self.get_url("/error"), self.stop) resp = self.wait() - #for ctx_id in self.log_ctx: - #print ctx_id - #for ctx in self.log_ctx[ctx_id]: - #pprint.pprint(ctx.to_dict()) - assert_equal(len(self.log_ctx), 2) found_exception = False @@ -128,31 +136,22 @@ def test_error(self): assert found_exception def test_timeout_error(self): - f = self.http_client.fetch(self.get_url("/timeout"), self.stop, request_timeout=0.5) + f = self.http_client.fetch( + self.get_url("/timeout"), self.stop, request_timeout=0.5) resp = self.wait() - #for ctx_id in self.log_ctx: - #print ctx_id - #for ctx in self.log_ctx[ctx_id]: - #pprint.pprint(ctx.to_dict()) - assert_equal(len(self.log_ctx), 1) ctx = self.log_ctx[self.log_ctx.keys()[0]][0] assert_equal(get_deep(ctx.to_dict(), 'body.response.code'), 599) def test_recurse_timeout_error(self): url = self.get_url("/timeout") - f = self.http_client.fetch(self.get_url("/recurse_timeout"), self.stop, + _ = self.http_client.fetch(self.get_url("/recurse_timeout"), self.stop, body=url, method="POST", request_timeout=1.5) resp = self.wait() - #for ctx_id in self.log_ctx: - #print ctx_id - #for ctx in self.log_ctx[ctx_id]: - #pprint.pprint(ctx.to_dict()) - assert_equal(resp.code, 200) assert_equal(resp.body, "got it") @@ -161,7 +160,9 @@ def test_recurse_timeout_error(self): for ctx_list in self.log_ctx.values(): for ctx in ctx_list: c = ctx.to_dict() - if c['type'] == 'request.httpclient' and c['body']['response']['code'] == 599: + if ( + c['type'] == 'request.httpclient' and + c['body']['response']['code'] == 599): found_timeout = True if c['type'] == 'request' and get_deep(c, 'body.start'): @@ -175,13 +176,8 @@ def test_context(self): self.http_client.fetch(self.get_url("/async"), self.stop) resp = self.wait() - #for ctx_id in self.log_ctx: - #print - #print ctx_id - #for ctx in self.log_ctx[ctx_id]: - #pprint.pprint(ctx.to_dict()) - - # If everything worked properly, we should have two separate ids, one will have two contexts associated with it. + # If everything worked properly, we should have two separate ids, + # one will have two contexts associated with it. # Hopefully it's the right one. found_sync = None found_async = None @@ -191,7 +187,9 @@ def test_context(self): if ctx.name == "request" and ctx.to_dict()['body']['async']: assert_equal(len(ctx_list), 3) found_async = ctx - if ctx.name == "request" and not ctx.to_dict()['body']['async']: + if ( + ctx.name == "request" and + not ctx.to_dict()['body']['async']): assert_equal(len(ctx_list), 1) found_sync = ctx if ctx.name.endswith("httpclient"): diff --git a/vendor/pycernan-0.0.10.zip b/vendor/pycernan-0.0.10.zip new file mode 100644 index 0000000..c47a78e Binary files /dev/null and b/vendor/pycernan-0.0.10.zip differ