diff --git a/pika_pool.py b/pika_pool.py index 0488c3e..c765a00 100644 --- a/pika_pool.py +++ b/pika_pool.py @@ -67,17 +67,9 @@ import pika.exceptions -__version__ = '0.1.2' +__version__ = "0.1.3" -__all__ = [ - 'Error' - 'Timeout' - 'Overflow' - 'Connection', - 'Pool', - 'NullPool', - 'QueuedPool', -] +__all__ = ["Error" "Timeout" "Overflow" "Connection", "Pool", "NullPool", "QueuedPool"] logger = logging.getLogger(__name__) @@ -132,9 +124,7 @@ def is_connection_invalidated(cls, exc): :return: True if connection has been invalidted, otherwise False. """ - return any( - isinstance(exc, error)for error in cls.connectivity_errors - ) + return any(isinstance(exc, error) for error in cls.connectivity_errors) def __init__(self, pool, fairy): self.pool = pool @@ -242,13 +232,25 @@ def cxn_params(self): def cxn_str(self): params = self.cxn_params if params: - return '{0}:{1}/{2}'.format(params.host, params.port, params.virtual_host) + return "{0}:{1}/{2}".format( + params.host, params.port, params.virtual_host + ) def __str__(self): - return ', '.join('{0}={1}'.format(k, v) for k, v in [ - ('cxn', self.cxn_str), - ('channel', '{0}'.format(int(self.channel) if self.channel is not None else self.channel)), - ]) + return ", ".join( + "{0}={1}".format(k, v) + for k, v in [ + ("cxn", self.cxn_str), + ( + "channel", + "{0}".format( + int(self.channel) + if self.channel is not None + else self.channel + ), + ), + ] + ) def _create(self): """ @@ -274,14 +276,9 @@ class QueuedPool(Pool): Queue backed pool. """ - def __init__(self, - create, - max_size=10, - max_overflow=10, - timeout=30, - recycle=None, - stale=None, - ): + def __init__( + self, create, max_size=10, max_overflow=10, timeout=30, recycle=None, stale=None + ): """ :param max_size: Maximum number of connections to keep queued. @@ -327,11 +324,11 @@ def acquire(self, timeout=None): except Overflow: raise Timeout() if self.is_expired(fairy): - logger.info('closing expired connection - %s', fairy) + logger.info("closing expired connection - %s", fairy) self.close(fairy) return self.acquire(timeout=timeout) if self.is_stale(fairy): - logger.info('closing stale connection - %s', fairy) + logger.info("closing stale connection - %s", fairy) self.close(fairy) return self.acquire(timeout=timeout) return self.Connection(self, fairy) @@ -364,18 +361,37 @@ def _create(self): raise class Fairy(Pool.Fairy): - def __init__(self, cxn): super(QueuedPool.Fairy, self).__init__(cxn) self.released_at = self.created_at = time.time() def __str__(self): - return ', '.join('{0}={1}'.format(k, v) for k, v in [ - ('cxn', self.cxn_str), - ('channel', '{0}'.format(int(self.channel) if self.channel is not None else self.channel)), - ('created_at', '{0}'.format(datetime.fromtimestamp(self.created_at).isoformat())), - ('released_at', '{0}'.format(datetime.fromtimestamp(self.released_at).isoformat())), - ]) + return ", ".join( + "{0}={1}".format(k, v) + for k, v in [ + ("cxn", self.cxn_str), + ( + "channel", + "{0}".format( + int(self.channel) + if self.channel is not None + else self.channel + ), + ), + ( + "created_at", + "{0}".format( + datetime.fromtimestamp(self.created_at).isoformat() + ), + ), + ( + "released_at", + "{0}".format( + datetime.fromtimestamp(self.released_at).isoformat() + ), + ), + ] + ) def is_stale(self, fairy): if not self.stale: diff --git a/setup.py b/setup.py index ad0c6ef..f62259f 100644 --- a/setup.py +++ b/setup.py @@ -2,39 +2,36 @@ import setuptools +with open("pika_pool.py") as version_file: + CODE = version_file.read() + +with open("README.rst") as readme_file: + LONG_DESCRIPTION = readme_file.read() + + setuptools.setup( - name='pika-pool', - version=( - re - .compile(r".*__version__ = '(.*?)'", re.S) - .match(open('pika_pool.py').read()) - .group(1) - ), - url='https://github.com/bninja/pika-pool', - license='BSD', - author='egon', - author_email='egon@gb.com', - description='Pools for pikas.', - long_description=open('README.rst').read(), - py_modules=['pika_pool'], + name="pika-pool", + version=(re.compile(r".*__version__ = \"(.*?)\"", re.S).match(CODE).group(1)), + url="https://github.com/bninja/pika-pool", + license="BSD", + author="egon", + author_email="egon@gb.com", + description="Pools for pikas.", + long_description=LONG_DESCRIPTION, + py_modules=["pika_pool"], include_package_data=True, - platforms='any', - install_requires=[ - 'pika >=0.9,<0.11', - ], - extras_require={ - 'tests': [ - 'pytest >=2.5.2,<3', - 'pytest-cov >=1.7,<2', - ], - }, + platforms="any", + install_requires=["pika >=0.9,<0.11"], + extras_require={"tests": ["pytest >=2.5.2,<3", "pytest-cov >=1.7,<2"]}, classifiers=[ - 'Environment :: Web Environment', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: BSD License', - 'Operating System :: OS Independent', - 'Programming Language :: Python', - 'Topic :: Internet :: WWW/HTTP :: Dynamic Content', - 'Topic :: Software Development :: Libraries :: Python Modules' - ] + "Environment :: Web Environment", + "Intended Audience :: Developers", + "License :: OSI Approved :: BSD License", + "Operating System :: OS Independent", + "Programming Language :: Python", + "Topic :: Internet :: WWW/HTTP :: Dynamic Content", + "Topic :: Software Development :: Libraries :: Python Modules", + "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3", + ], ) diff --git a/test.py b/test.py index eb1a8f9..76ca32f 100644 --- a/test.py +++ b/test.py @@ -12,25 +12,23 @@ import pika_pool -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def params(): - return pika.URLParameters('amqp://guest:guest@localhost:5672/') + return pika.URLParameters("amqp://guest:guest@localhost:5672/") -@pytest.fixture(scope='session', autouse=True) +@pytest.fixture(scope="session", autouse=True) def schema(request, params): cxn = pika.BlockingConnection(params) channel = cxn.channel() - channel.queue_declare(queue='pika_pool_test') + channel.queue_declare(queue="pika_pool_test") -consumed = { -} +consumed = {} -@pytest.fixture(scope='session', autouse=True) +@pytest.fixture(scope="session", autouse=True) def consume(params): - def _callback(ch, method, properties, body): msg = Message.from_json(body) consumed[msg.id] = msg @@ -40,8 +38,8 @@ def _forever(): cxn = pika.BlockingConnection(params) channel = cxn.channel() - channel.queue_declare(queue='pika_pool_test') - channel.basic_consume(_callback, queue='pika_pool_test', no_ack=True) + channel.queue_declare(queue="pika_pool_test") + channel.basic_consume(_callback, queue="pika_pool_test", no_ack=True) thd = threading.Thread(target=_forever) thd.daemon = True @@ -50,39 +48,33 @@ def _forever(): @pytest.fixture def null_pool(params): - return pika_pool.NullPool( - create=lambda: pika.BlockingConnection(params), - ) + return pika_pool.NullPool(create=lambda: pika.BlockingConnection(params)) class Message(dict): - @classmethod def generate(cls, **kwargs): - id = kwargs.pop('id', uuid.uuid4().hex) + id = kwargs.pop("id", uuid.uuid4().hex) return cls(id=id, **kwargs) @property def id(self): - return self['id'] + return self["id"] def to_json(self): return json.dumps(self) @classmethod def from_json(cls, raw): - return cls(json.loads(raw.decode('utf-8'))) + return cls(json.loads(raw.decode("utf-8"))) class TestNullPool(object): - def test_pub(self, null_pool): msg = Message.generate() with null_pool.acquire() as cxn: cxn.channel.basic_publish( - exchange='', - routing_key='pika_pool_test', - body=msg.to_json() + exchange="", routing_key="pika_pool_test", body=msg.to_json() ) time.sleep(0.1) assert msg.id in consumed @@ -111,9 +103,9 @@ def empty_queued_pool(request, queued_pool): def test_use_it(): params = pika.URLParameters( - 'amqp://guest:guest@localhost:5672/?' - 'socket_timeout=10&' - 'connection_attempts=2' + "amqp://guest:guest@localhost:5672/?" + "socket_timeout=10&" + "connection_attempts=2" ) pool = pika_pool.QueuedPool( @@ -127,38 +119,32 @@ def test_use_it(): with pool.acquire() as cxn: cxn.channel.basic_publish( - body=json.dumps({ - 'type': 'banana', - 'description': 'they are yellow' - }), - exchange='', - routing_key='fruits', + body=json.dumps({"type": "banana", "description": "they are yellow"}), + exchange="", + routing_key="fruits", properties=pika.BasicProperties( - content_type='application/json', - content_encoding='utf-8', + content_type="application/json", + content_encoding="utf-8", delivery_mode=2, - ) + ), ) - assert 'cxn=localhost:5672//' in str(cxn.fairy) + assert "cxn=localhost:5672//" in str(cxn.fairy) class TestQueuedPool(object): - def test_invalidate_connection(slef, queued_pool): msg = Message.generate() with pytest.raises(select.error): with queued_pool.acquire() as cxn: fairy = cxn.fairy - raise select.error(9, 'Bad file descriptor') + raise select.error(9, "Bad file descriptor") assert fairy.cxn.is_closed def test_pub(self, queued_pool): msg = Message.generate() with queued_pool.acquire() as cxn: cxn.channel.basic_publish( - exchange='', - routing_key='pika_pool_test', - body=msg.to_json() + exchange="", routing_key="pika_pool_test", body=msg.to_json() ) time.sleep(0.1) assert msg.id in consumed