Skip to content

Commit f9671e1

Browse files
authored
Introduce DBUtils PooledDB db connection pooling (#3)
* refactored database.db to app.db to simplify project structure * implement db connection pooling with DBUtils PooledDB (pooled_db) * minor fix in Handler: only close db connection if it was initialized * minor fix for previous commit * start explicit transaction with db.begin() in Cleanup, as required by DBUtils PooledDB * moved logger and db cleanup code into destructor of Handler made DBUtils PooledDB params configurable via .env vars * added db connection pooling configuration notes to README getting ready for v0.6.0 release
1 parent b2cb652 commit f9671e1

17 files changed

+138
-88
lines changed

.env.example

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ DB_USER=policyd-rate-guard
77
DB_PASSWORD=Example1234
88
DB_DATABASE=policyd-rate-guard
99

10+
## DB connection pool configuration
11+
## See https://webwareforpython.github.io/DBUtils/main.html#pooleddb-pooled-db
12+
# DB_POOL_MINCACHED=0
13+
# DB_POOL_MAXCACHED=10
14+
# DB_POOL_MAXSHARED=10
15+
# DB_POOL_MAXUSAGE=10000
16+
1017
## PolicydRateGuard configuration
1118
# QUOTA=1000 # The default quota for a user (default: 1000)
1219
# ACTION_TEXT_BLOCKED="Rate limit reached, retry later." # Here you can put a custom message to be shown to a user who is over the ratelimit.

CHANGELOG.md

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,21 @@
11
# CHANGELOG
22

3+
## [v0.6.0](https://github.com/onlime/policyd-rate-guard/releases/tag/v0.6.0) (2023-09-01)
4+
5+
**Improved:**
6+
7+
- Improved performance and stability by introducing database connection pooling, using [DBUtils PooledDB (pooled_db)](https://webwareforpython.github.io/DBUtils/main.html#pooleddb-pooled-db)
8+
- Moved logger and db cleanup code into destructor of `Handler`.
9+
- Refactored `database.db` to `app.db` to simplify project structure.
10+
11+
**Added:**
12+
13+
- Added environment variables `DB_POOL_MINCACHED`, `DB_POOL_MAXCACHED`, `DB_POOL_MAXSHARED`, `DB_POOL_MAXUSAGE` for db connection pooling fine-tuning.
14+
15+
**Fixed:**
16+
17+
- Fix `Lost connection to MySQL server during query ` and ``AttributeError: 'NoneType' object has no attribute 'read'` (on db cursor) connectivity issues by introducing connection pooling.
18+
319
## [v0.5.1](https://github.com/onlime/policyd-rate-guard/releases/tag/v0.5.1) (2023-08-30)
420

521
**Added:**
@@ -9,7 +25,7 @@
925

1026
**Fixed:**
1127

12-
- Fix `AttributeError: 'NoneType' object has no attribute 'read' in Ratelimit.find()` edge case where `sasl_username` was set in Postfix DATA but empty. We now bail out early if `sasl_username` either does not exist or is empty.
28+
- Fix edge case where `sasl_username` was set in Postfix DATA but empty. We now bail out early if `sasl_username` either does not exist or is empty.
1329

1430
## [v0.5.0](https://github.com/onlime/policyd-rate-guard/releases/tag/v0.5.0) (2023-08-29)
1531

README.md

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ Actually, PolicydRateGuard is just a super simple Postfix policy daemon with onl
1717
But let me name some features that make it stand out from other solutions:
1818

1919
- **Super easy Postfix integration** using `check_policy_service` in `smtpd_data_restrictions`
20-
- Set **individual sender (SASL username) quotas**
20+
- **Tuned for high performance**, using network or unix sockets, threading, and db connection pooling.
21+
- Set **individual sender (SASL username) quotas**, which can be both persistent or only for the current time period.
2122
- Limit senders to **number of recipients** per time period
2223
- Automatically fills `ratelimits` table with new senders (SASL username) upon first email sent
23-
- Set your own time period (usually 24hrs) by resetting the counters via Systemd cleanup timer (or cronjob)
24+
- Set your own **time period (usually 24hrs)** by resetting the counters via Systemd cleanup timer (or cronjob)
2425
- Continues to raise counters (`msg_counter`, `rcpt_counter`) even in over quota state, so you know if a sender keeps retrying/spamming.
2526
- Keeps totals of all messages/recipients sent for each sender (SASL username)
2627
- Stores both **message and recipient counters** in database (`ratelimits` table)
@@ -29,9 +30,10 @@ But let me name some features that make it stand out from other solutions:
2930
- **Maximum failure safety:** On any unexpected exception, the daemon still replies with a `DUNNO` action, so that the mail is not getting rejected by Postfix. This is done both on Postfix integration side and application exception handling side.
3031
- **Block action message** `"Rate limit reached, retry later."` can be configured.
3132
- Lots of configuration params via a simple `.env`
32-
- **Tuned for high performance**, using network or unix sockets, and threading.
3333
- **Secure setup**, nothing running under `root`, only on `postfix` user.
34-
- A super slick minimal codebase with **only a few dependencies** ([PyMySQL](https://pypi.org/project/pymysql/), [python-dotenv](https://pypi.org/project/python-dotenv/), [yoyo-migrations](https://pypi.org/project/yoyo-migrations/)), using Python virtual environment for easy `pip` install. PyMySQL is a pure-Python MySQL client library, so you won't have any trouble on any future major system upgrades.
34+
- A multi-threaded app that uses [DBUtils PooledDB (pooled_db)](https://github.com/WebwareForPython/DBUtils) for **robust and efficient DB connection handling**.
35+
- Can be used with any [DB-API 2 (PEP 249)](https://peps.python.org/pep-0249/) conformant database adapter (currently supported: PyMySQL, sqlite3)
36+
- A super slick minimal codebase with **only a few dependencies** ([PyMySQL](https://pypi.org/project/pymysql/), [DBUtils](https://webwareforpython.github.io/DBUtils/), [python-dotenv](https://pypi.org/project/python-dotenv/), [yoyo-migrations](https://pypi.org/project/yoyo-migrations/)), using Python virtual environment for easy `pip` install. PyMySQL is a pure-Python MySQL client library, so you won't have any trouble on any future major system upgrades.
3537
- Provides an Ansible Galaxy role [`onlime.policyd_rate_guard`](https://galaxy.ansible.com/onlime/policyd_rate_guard) for easy installation on a Debian mailserver.
3638
- A **well maintained** project, as it is in active use at [Onlime GmbH](https://www.onlime.ch/), a Swiss webhoster with a rock-solid mailserver architecture.
3739

@@ -145,11 +147,12 @@ smtpd_data_restrictions =
145147
permit
146148
```
147149

148-
> **IMPORTANT:** We strongly recommend the advanced policy client configuration (supported since Postfix 3.0), using above syntax with **default action `DUNNO`**, instead of just using `check_policy_service inet:127.0.0.1:10033`.
149-
>
150-
> It ensures that if PolicydRateGuard becomes unavailable for any reason, Postfix will ignore it and keep accepting mail as if the rule was not there. PolicydRateGuard should be considered a "non-critical" policy service and you should use some monitoring solution to ensure it is always running as expected.
150+
**IMPORTANT:** We strongly recommend the advanced policy client configuration (supported since Postfix 3.0), using above syntax with **default action `DUNNO`**, instead of just using `check_policy_service inet:127.0.0.1:10033`.
151+
152+
It ensures that if PolicydRateGuard becomes unavailable for any reason, Postfix will ignore it and keep accepting mail as if the rule was not there. PolicydRateGuard should be considered a "non-critical" policy service and you should use some monitoring solution to ensure it is always running as expected.
151153

152-
> **NOTE:** You may use `unix:rateguard/policyd` instead of `inet:127.0.0.1:10033` if you have configured PolicydRateGuard to use a unix socket (`SOCKET="/var/spool/postfix/rateguard/policyd"` environment variable).
154+
> [!NOTE]
155+
> You may use `unix:rateguard/policyd` instead of `inet:127.0.0.1:10033` if you have configured PolicydRateGuard to use a unix socket (`SOCKET="/var/spool/postfix/rateguard/policyd"` environment variable).
153156
154157
Make sure to reload Postfix after this change:
155158

@@ -225,13 +228,21 @@ PolicydRateGuard can be fully configured through environment variables in `.env`
225228
- `SENTRY_ENVIRONMENT`
226229
Sentry environment. Suggested values: `dev` or `prod`, but can be any custom string. Defaults to `dev`.
227230

231+
You may also tune the database connection pooling by modifying the following environment variables (defaults are fine for most environments, and you'll find e detailed description in the [DBUtils PooledDB](https://webwareforpython.github.io/DBUtils/main.html#pooleddb-pooled-db-1) usage docs):
232+
233+
- `DB_POOL_MINCACHED` (default: `0`)
234+
- `DB_POOL_MAXCACHED` (default: `10`
235+
- `DB_POOL_MAXSHARED` (default: `10`)
236+
- `DB_POOL_MAXUSAGE` (default: `10000`)
237+
228238
For production, we recommend to start by copying `.env.example` and then fine-tune your `.env`:
229239

230240
```bash
231241
$ cp .env.example .env
232242
```
233243

234-
> **NOTE:** Minimally, you should set `DB_PASSWORD`, and maybe enable `SYSLOG` logging. For all the other config params it's usually fine to stick with the defaults.
244+
> [!NOTE]
245+
> Minimally, you should set `DB_PASSWORD`, and maybe enable `SYSLOG` logging. For all the other config params it's usually fine to stick with the defaults.
235246
236247
## Development 👩‍💻
237248

@@ -337,6 +348,7 @@ $ . venv/bin/activate
337348
(venv)$ ./tests.sh
338349
```
339350

351+
> [!IMPORTANT]
340352
> Make sure to always run the tests inside your venv!
341353
342354
### Configure Sentry SDK
@@ -389,8 +401,8 @@ Planned features (coming soon):
389401
- [x] **Sentry** integration for exception reporting
390402
- [x] **Ansible role** for easy production deployment
391403
- [x] **Github workflow** for CI/testing
392-
- [ ] **Publish package** to [PyPI](https://pypi.org/)
393404
- [ ] Implement a **configurable webhook API** call for notification to sender on reaching quota limit (on first block) to external service.
405+
- [ ] **Publish package** to [PyPI](https://pypi.org/) (Might need some restructuring. Any help greatly appreciated!)
394406

395407
## Credits 🙏
396408

app/db.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from importlib import import_module
2+
from dbutils.pooled_db import PooledDB
3+
4+
class DbConnectionPool:
5+
def __init__(self, conf: object):
6+
self.driver = conf.get('DB_DRIVER', 'pymysql').lower()
7+
self.backend = import_module(self.driver)
8+
db_config = getattr(self, 'get_dbconfig_' + self.driver)(conf)
9+
10+
self.pool = PooledDB(
11+
creator=self.backend, # pymysql or sqlite3
12+
mincached=int(conf.get('DB_POOL_MINCACHED', 0)),
13+
maxcached=int(conf.get('DB_POOL_MAXCACHED', 10)),
14+
maxshared=int(conf.get('DB_POOL_MAXSHARED', 10)),
15+
maxusage=int(conf.get('DB_POOL_MAXUSAGE', 10000)),
16+
# maxconnections=int(conf.get('DB_POOL_MAXCONNECTIONS', 20)),
17+
**db_config
18+
)
19+
20+
def connection(self):
21+
connection = self.pool.connection()
22+
if self.driver == 'sqlite3':
23+
# https://docs.python.org/3/library/sqlite3.html#sqlite3.Row
24+
connection.row_factory = self.backend.Row
25+
return connection
26+
27+
28+
def get_dbconfig_pymysql(self, conf: object) -> dict:
29+
return {
30+
'host': conf.get('DB_HOST', 'localhost'),
31+
'user': conf.get('DB_USER', 'policyd-rate-guard'),
32+
'password': conf.get('DB_PASSWORD', ''),
33+
'database': conf.get('DB_DATABASE', 'policyd-rate-guard'),
34+
'port': int(conf.get('DB_PORT', 3306)),
35+
'cursorclass': self.backend.cursors.DictCursor
36+
}
37+
38+
def get_dbconfig_sqlite3(self, conf: object) -> dict:
39+
return {
40+
'database': conf.get('DB_DATABASE', ':memory:'),
41+
}

app/handler.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,21 @@
33
class Handler:
44
"""Handle request"""
55

6+
db = None
67
data = ''
78

8-
def __init__(self, conn: object, addr: str, conf: object, logger: object, db: object):
9+
def __init__(self, conn: object, addr: str, conf: object, logger: object, db_pool: object):
910
self.conn = conn
1011
self.addr = addr
1112
self.conf = conf
1213
self.logger = logger
13-
self.db = db
14+
self.db_pool = db_pool
1415
try:
1516
self.handle()
1617
except Exception as e: # pragma: no cover
1718
self.logger.exception('Unhandled Exception: %s', e)
1819
self.logger.warning('Received DATA: %s', self.data)
1920
self.send_response('DUNNO') # use DUNNO as accept action, just to distinguish between OK and unhandled exception
20-
self.conn.close()
2121

2222
def handle(self):
2323
"""Handle request"""
@@ -46,16 +46,14 @@ def handle(self):
4646
if not request.get('sasl_username'):
4747
self.logger.debug('sasl_username is empty, accepting message and reply with DUNNO')
4848
self.send_response('DUNNO')
49-
self.conn.close()
5049
return
5150

5251
# Temp debugging of message data without recipient (e.g. on multiple To: addresses)
5352
# if not request.get('recipient'):
5453
# self.logger.warning('Received DATA with no recipient: %s', self.data)
5554

56-
# PyMySQL: Ensure the db connection is alive
57-
if self.conf.get('DB_DRIVER', 'pymysql').lower() == 'pymysql':
58-
self.db.ping(reconnect=True)
55+
# Get database connection from DB pool
56+
self.db = self.db_pool.connection()
5957

6058
# Handle message
6159
message = Message(
@@ -98,21 +96,28 @@ def handle(self):
9896
# Create response
9997
if blocked:
10098
self.logger.warning('Message BLOCKED: %s', message.get_props_description())
101-
self.send_response('DEFER_IF_PERMIT ' + self.conf.get('ACTION_TEXT_BLOCKED', 'Rate limit reached, retry later'))
10299
if not was_blocked: # TODO: Implement webhook API call for notification to sender on quota limit reached (only on first block)
103100
self.logger.debug('Quota limit reached for %s, notifying sender via webhook!', message.sender)
104101
self.logger.warning(log_message)
102+
self.send_response('DEFER_IF_PERMIT ' + self.conf.get('ACTION_TEXT_BLOCKED', 'Rate limit reached, retry later'))
105103
else:
106104
self.logger.debug('Message ACCEPTED: %s', message.get_props_description())
107-
self.send_response('OK')
108105
self.logger.info(log_message)
109-
110-
self.logger.msgid = None # Reset msgid in logger
111-
self.conn.close()
106+
self.send_response('OK')
112107

113108
def send_response(self, message: str = 'OK'):
114109
"""Send response"""
115-
# actions return to postfix, see http://www.postfix.org/access.5.html for a list of actions.
110+
# actions return to Postfix, see http://www.postfix.org/access.5.html for a list of actions.
116111
data = 'action={}\n\n'.format(message)
117112
self.logger.debug('Sending data: %s', data)
118113
self.conn.send(data.encode('utf-8'))
114+
self.conn.close()
115+
116+
def __del__(self):
117+
"""Destructor"""
118+
self.logger.msgid = None # Reset msgid in logger
119+
# TODO: Do we need to close the cursor as well? (prior to closing the connection)
120+
if self.db is not None:
121+
self.db.close() # Close database connection
122+
self.db = None
123+
self.logger.debug('Handler destroyed')

app/ratelimit.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,18 @@ def find(sender: str, db: object, logger: object, conf: object) -> object:
138138
)
139139

140140
@staticmethod
141-
def reset_all_counters(db: object, logger: object):
141+
def reset_all_counters(db_pool: object, logger: object):
142142
"""Reset all ratelimit counters"""
143143
logger.debug('Reset all counters')
144-
cursor = db.cursor()
145-
# reset all counters, but don't change updated_at timestamp
146-
cursor.execute('UPDATE `ratelimits` SET `msg_counter` = 0, `rcpt_counter` = 0, `updated_at` = `updated_at`')
147-
# only reset quota if it is not locked
148-
cursor.execute('UPDATE `ratelimits` SET `quota` = `quota_reset`, `updated_at` = `updated_at` WHERE `quota_locked` = 0')
149-
db.commit()
144+
db = db_pool.connection()
145+
try:
146+
# With DBUtils PooledDB, we need to explicitly start a transaction
147+
db.begin()
148+
cursor = db.cursor()
149+
# reset all counters, but don't change updated_at timestamp
150+
cursor.execute('UPDATE `ratelimits` SET `msg_counter` = 0, `rcpt_counter` = 0, `updated_at` = `updated_at`')
151+
# only reset quota if it is not locked
152+
cursor.execute('UPDATE `ratelimits` SET `quota` = `quota_reset`, `updated_at` = `updated_at` WHERE `quota_locked` = 0')
153+
db.commit()
154+
finally:
155+
db.close()

cleanup.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
11
from app.conf import Config
22
from app.logging import get_logger
3-
from database.db import connect_database
3+
from app.db import DbConnectionPool
44
from app.ratelimit import Ratelimit
55

66
class Cleaner:
77

88
def __init__(self, conf: object = None) -> None:
99
self.conf = conf or Config()
1010
self.logger = get_logger(self.conf)
11-
self.db = connect_database(self.conf)
11+
self.db_pool = DbConnectionPool(self.conf)
1212
self.cleanup()
1313

1414
def cleanup(self) -> None:
1515
"""Cleanup database"""
1616
self.logger.debug('Cleaning up database')
17-
Ratelimit.reset_all_counters(self.db, self.logger)
17+
Ratelimit.reset_all_counters(self.db_pool, self.logger)
1818

1919

2020
if __name__ == '__main__': # pragma: no cover

database/db.py

Lines changed: 0 additions & 27 deletions
This file was deleted.

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
cryptography==41.0.*
2+
DBUtils==3.0.*
23
PyMySQL==1.1.*
34
python-dotenv==1.0.*
45
sentry-sdk==1.29.*

run.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818
from app.conf import Config
1919
from app.handler import Handler
2020
from app.logging import get_logger
21-
from database.db import connect_database
21+
from app.db import DbConnectionPool
2222

2323
class Daemon:
2424

2525
def __init__(self) -> None:
2626
self.conf = Config()
2727
self.logger = get_logger(self.conf)
28-
self.db = connect_database(self.conf)
28+
self.db_pool = DbConnectionPool(self.conf)
2929
# TODO: Improve socket configuration parsing
3030
self.socket_conf = self.conf.get_array('SOCKET', '127.0.0.1,10033')
3131
self.init_sentry()
@@ -49,7 +49,7 @@ def run(self) -> None:
4949
conn, addr = self.socket.accept()
5050
threading.Thread(
5151
target=Handler,
52-
args=(conn, addr, self.conf, self.logger, self.db)
52+
args=(conn, addr, self.conf, self.logger, self.db_pool)
5353
).start()
5454
except KeyboardInterrupt:
5555
break

0 commit comments

Comments
 (0)