Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 51 additions & 35 deletions pika_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,9 @@
import pika.exceptions


__version__ = '0.1.2'
__version__ = "0.1.3"

__all__ = [
'Error'
'Timeout'
'Overflow'
'Connection',
'Pool',
'NullPool',
'QueuedPool',
]
__all__ = ["Error" "Timeout" "Overflow" "Connection", "Pool", "NullPool", "QueuedPool"]


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -132,9 +124,7 @@ def is_connection_invalidated(cls, exc):

:return: True if connection has been invalidted, otherwise False.
"""
return any(
isinstance(exc, error)for error in cls.connectivity_errors
)
return any(isinstance(exc, error) for error in cls.connectivity_errors)

def __init__(self, pool, fairy):
self.pool = pool
Expand Down Expand Up @@ -242,13 +232,25 @@ def cxn_params(self):
def cxn_str(self):
params = self.cxn_params
if params:
return '{0}:{1}/{2}'.format(params.host, params.port, params.virtual_host)
return "{0}:{1}/{2}".format(
params.host, params.port, params.virtual_host
)

def __str__(self):
return ', '.join('{0}={1}'.format(k, v) for k, v in [
('cxn', self.cxn_str),
('channel', '{0}'.format(int(self.channel) if self.channel is not None else self.channel)),
])
return ", ".join(
"{0}={1}".format(k, v)
for k, v in [
("cxn", self.cxn_str),
(
"channel",
"{0}".format(
int(self.channel)
if self.channel is not None
else self.channel
),
),
]
)

def _create(self):
"""
Expand All @@ -274,14 +276,9 @@ class QueuedPool(Pool):
Queue backed pool.
"""

def __init__(self,
create,
max_size=10,
max_overflow=10,
timeout=30,
recycle=None,
stale=None,
):
def __init__(
self, create, max_size=10, max_overflow=10, timeout=30, recycle=None, stale=None
):
"""
:param max_size:
Maximum number of connections to keep queued.
Expand Down Expand Up @@ -327,11 +324,11 @@ def acquire(self, timeout=None):
except Overflow:
raise Timeout()
if self.is_expired(fairy):
logger.info('closing expired connection - %s', fairy)
logger.info("closing expired connection - %s", fairy)
self.close(fairy)
return self.acquire(timeout=timeout)
if self.is_stale(fairy):
logger.info('closing stale connection - %s', fairy)
logger.info("closing stale connection - %s", fairy)
self.close(fairy)
return self.acquire(timeout=timeout)
return self.Connection(self, fairy)
Expand Down Expand Up @@ -364,18 +361,37 @@ def _create(self):
raise

class Fairy(Pool.Fairy):

def __init__(self, cxn):
super(QueuedPool.Fairy, self).__init__(cxn)
self.released_at = self.created_at = time.time()

def __str__(self):
return ', '.join('{0}={1}'.format(k, v) for k, v in [
('cxn', self.cxn_str),
('channel', '{0}'.format(int(self.channel) if self.channel is not None else self.channel)),
('created_at', '{0}'.format(datetime.fromtimestamp(self.created_at).isoformat())),
('released_at', '{0}'.format(datetime.fromtimestamp(self.released_at).isoformat())),
])
return ", ".join(
"{0}={1}".format(k, v)
for k, v in [
("cxn", self.cxn_str),
(
"channel",
"{0}".format(
int(self.channel)
if self.channel is not None
else self.channel
),
),
(
"created_at",
"{0}".format(
datetime.fromtimestamp(self.created_at).isoformat()
),
),
(
"released_at",
"{0}".format(
datetime.fromtimestamp(self.released_at).isoformat()
),
),
]
)

def is_stale(self, fairy):
if not self.stale:
Expand Down
61 changes: 29 additions & 32 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,36 @@
import setuptools


with open("pika_pool.py") as version_file:
CODE = version_file.read()

with open("README.rst") as readme_file:
LONG_DESCRIPTION = readme_file.read()


setuptools.setup(
name='pika-pool',
version=(
re
.compile(r".*__version__ = '(.*?)'", re.S)
.match(open('pika_pool.py').read())
.group(1)
),
url='https://github.com/bninja/pika-pool',
license='BSD',
author='egon',
author_email='egon@gb.com',
description='Pools for pikas.',
long_description=open('README.rst').read(),
py_modules=['pika_pool'],
name="pika-pool",
version=(re.compile(r".*__version__ = \"(.*?)\"", re.S).match(CODE).group(1)),
url="https://github.com/bninja/pika-pool",
license="BSD",
author="egon",
author_email="egon@gb.com",
description="Pools for pikas.",
long_description=LONG_DESCRIPTION,
py_modules=["pika_pool"],
include_package_data=True,
platforms='any',
install_requires=[
'pika >=0.9,<0.11',
],
extras_require={
'tests': [
'pytest >=2.5.2,<3',
'pytest-cov >=1.7,<2',
],
},
platforms="any",
install_requires=["pika >=0.9,<0.11"],
extras_require={"tests": ["pytest >=2.5.2,<3", "pytest-cov >=1.7,<2"]},
classifiers=[
'Environment :: Web Environment',
'Intended Audience :: Developers',
'License :: OSI Approved :: BSD License',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
'Topic :: Software Development :: Libraries :: Python Modules'
]
"Environment :: Web Environment",
"Intended Audience :: Developers",
"License :: OSI Approved :: BSD License",
"Operating System :: OS Independent",
"Programming Language :: Python",
"Topic :: Internet :: WWW/HTTP :: Dynamic Content",
"Topic :: Software Development :: Libraries :: Python Modules",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
],
)
64 changes: 25 additions & 39 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,23 @@
import pika_pool


@pytest.fixture(scope='session')
@pytest.fixture(scope="session")
def params():
return pika.URLParameters('amqp://guest:guest@localhost:5672/')
return pika.URLParameters("amqp://guest:guest@localhost:5672/")


@pytest.fixture(scope='session', autouse=True)
@pytest.fixture(scope="session", autouse=True)
def schema(request, params):
cxn = pika.BlockingConnection(params)
channel = cxn.channel()
channel.queue_declare(queue='pika_pool_test')
channel.queue_declare(queue="pika_pool_test")


consumed = {
}
consumed = {}


@pytest.fixture(scope='session', autouse=True)
@pytest.fixture(scope="session", autouse=True)
def consume(params):

def _callback(ch, method, properties, body):
msg = Message.from_json(body)
consumed[msg.id] = msg
Expand All @@ -40,8 +38,8 @@ def _forever():

cxn = pika.BlockingConnection(params)
channel = cxn.channel()
channel.queue_declare(queue='pika_pool_test')
channel.basic_consume(_callback, queue='pika_pool_test', no_ack=True)
channel.queue_declare(queue="pika_pool_test")
channel.basic_consume(_callback, queue="pika_pool_test", no_ack=True)

thd = threading.Thread(target=_forever)
thd.daemon = True
Expand All @@ -50,39 +48,33 @@ def _forever():

@pytest.fixture
def null_pool(params):
return pika_pool.NullPool(
create=lambda: pika.BlockingConnection(params),
)
return pika_pool.NullPool(create=lambda: pika.BlockingConnection(params))


class Message(dict):

@classmethod
def generate(cls, **kwargs):
id = kwargs.pop('id', uuid.uuid4().hex)
id = kwargs.pop("id", uuid.uuid4().hex)
return cls(id=id, **kwargs)

@property
def id(self):
return self['id']
return self["id"]

def to_json(self):
return json.dumps(self)

@classmethod
def from_json(cls, raw):
return cls(json.loads(raw.decode('utf-8')))
return cls(json.loads(raw.decode("utf-8")))


class TestNullPool(object):

def test_pub(self, null_pool):
msg = Message.generate()
with null_pool.acquire() as cxn:
cxn.channel.basic_publish(
exchange='',
routing_key='pika_pool_test',
body=msg.to_json()
exchange="", routing_key="pika_pool_test", body=msg.to_json()
)
time.sleep(0.1)
assert msg.id in consumed
Expand Down Expand Up @@ -111,9 +103,9 @@ def empty_queued_pool(request, queued_pool):

def test_use_it():
params = pika.URLParameters(
'amqp://guest:guest@localhost:5672/?'
'socket_timeout=10&'
'connection_attempts=2'
"amqp://guest:guest@localhost:5672/?"
"socket_timeout=10&"
"connection_attempts=2"
)

pool = pika_pool.QueuedPool(
Expand All @@ -127,38 +119,32 @@ def test_use_it():

with pool.acquire() as cxn:
cxn.channel.basic_publish(
body=json.dumps({
'type': 'banana',
'description': 'they are yellow'
}),
exchange='',
routing_key='fruits',
body=json.dumps({"type": "banana", "description": "they are yellow"}),
exchange="",
routing_key="fruits",
properties=pika.BasicProperties(
content_type='application/json',
content_encoding='utf-8',
content_type="application/json",
content_encoding="utf-8",
delivery_mode=2,
)
),
)
assert 'cxn=localhost:5672//' in str(cxn.fairy)
assert "cxn=localhost:5672//" in str(cxn.fairy)


class TestQueuedPool(object):

def test_invalidate_connection(slef, queued_pool):
msg = Message.generate()
with pytest.raises(select.error):
with queued_pool.acquire() as cxn:
fairy = cxn.fairy
raise select.error(9, 'Bad file descriptor')
raise select.error(9, "Bad file descriptor")
assert fairy.cxn.is_closed

def test_pub(self, queued_pool):
msg = Message.generate()
with queued_pool.acquire() as cxn:
cxn.channel.basic_publish(
exchange='',
routing_key='pika_pool_test',
body=msg.to_json()
exchange="", routing_key="pika_pool_test", body=msg.to_json()
)
time.sleep(0.1)
assert msg.id in consumed
Expand Down