A Python client library for consuming messages from NATS JetStream in the SMILE ecosystem. This client provides both a standalone CLI tool and Django management commands for processing NATS messages with robust error handling, automatic reconnection, and graceful shutdown capabilities.
- Python 3.6+
# Clone the repository
git clone https://github.com/mskcc/smile-client.git
cd smile-client
# Install in development mode
pip install -e .
pip install -r requirements.txt
Create a config.json
file with your NATS connection parameters:
{
"NATS_URL": "nats://localhost:4222",
"NATS_USERNAME": "your_username",
"NATS_PASSWORD": "your_password",
"NATS_SSL_CERTFILE": "/path/to/cert.pem",
"NATS_SSL_KEYFILE": "/path/to/key.pem",
"NATS_ROOT_CA": "/path/to/rootCA.pem",
"NATS_FILTER_SUBJECT": "STREAM.consumers.*",
"NATS_DURABLE": "your_durable_name",
"CLIENT_TIMEOUT": 3600.0,
"CALLBACK": "your_module.your_callback_function"
}
For Django projects, add the configuration to your settings.py
:
SMILE_SETTINGS = {
"NATS_URL": "nats://localhost:4222",
"NATS_USERNAME": "your_username",
"NATS_PASSWORD": "your_password",
"NATS_SSL_CERTFILE": "/path/to/cert.pem",
"NATS_SSL_KEYFILE": "/path/to/key.pem",
"NATS_ROOT_CA": "/path/to/rootCA.pem",
"NATS_FILTER_SUBJECT": "STREAM.consumers.*",
"NATS_DURABLE": "your_durable_name",
"CLIENT_TIMEOUT": 3600.0,
"CALLBACK": "your_module.your_callback_function"
}
# Using the installed CLI command
smile-client start_listener --config=config.json --subject=STREAM.consumers.new-requests
# Using Python module
python -m smile_client.cli start_listener --config=config.json --subject=STREAM.consumers.new-requests
# Start consuming from a specific date
smile-client start_listener --config=config.json --subject=STREAM.consumers.new-requests --start-date=2024-01-15
# Enable debug logging
smile-client start_listener --config=config.json --subject=STREAM.consumers.new-requests --debug
Usage:
smile-client start_listener --config=<config_file> --subject=<subject> [--start-date=<date>] [--debug]
smile-client (-h | --help)
smile-client --version
Options:
-h --help Show this screen.
--version Show version.
--config=<config_file> Configuration file path [required].
--subject=<subject> NATS subject to consume from [required].
--start-date=<date> Start date in YYYY-MM-DD format [optional].
--debug Set logging level to DEBUG [optional].
# Run the consumer (uses Django settings)
python manage.py run_smile_consumer --subject=STREAM.consumers.new-requests
# Start from specific date
python manage.py run_smile_consumer --subject=STREAM.consumers.new-requests --start-date=2024-01-15
Create a custom message handler function to process incoming messages:
from smile_client.messages.smile_message import SmileMessage
import logging
logger = logging.getLogger("smile_client")
def my_message_handler(msg: SmileMessage):
"""
Process incoming SMILE messages
Args:
msg (SmileMessage): Message object containing subject and data
"""
try:
logger.info(f"Received message on '{msg.subject}': {msg.data}")
# Your message processing logic here
if msg.subject == "STREAM.consumers.new-requests":
process_new_request(msg.data)
elif msg.subject == "STREAM.consumers.updates":
process_update(msg.data)
except Exception as e:
logger.error(f"Error processing message: {e}")
raise
Then reference it in your configuration:
{
"CALLBACK": "my_module.my_message_handler"
}
The client supports graceful shutdown via:
- Ctrl+C (SIGINT): Interactive interrupt
- SIGTERM: Termination signal
- Connection errors: Automatic cleanup on connection failures
When a shutdown signal is received:
- Current message processing completes
- NATS subscription is unsubscribed
- Connection is cleanly closed
- Process exits with proper cleanup
The client automatically handles:
- Connection drops: Reconnects with exponential backoff
- Network timeouts: Retries with configurable delays
- Server unavailability: Continues retrying until successful
- Invalid JSON messages are logged and skipped
- Handler exceptions are caught and logged
- Failed messages are NAK'd for redelivery
# In your SmileClient initialization
client = SmileClient({
# ... other config ...
"max_reconnect_attempts": 10, # -1 for infinite
"reconnect_delay": 5, # seconds between retries
"client_timeout": 3600.0 # connection timeout
})
smile_client/
├── __init__.py
├── cli.py # Command-line interface
├── smile_client.py # Main client class
├── default_callback.py # Default message handler
├── messages/
│ ├── __init__.py
│ └── smile_message.py # Message data class
└── management/
└── commands/
└── run_smile_consumer.py # Django management command
TODO: Add this
TODO: Add this
The project uses:
- Black for code formatting
- flake8 for linting
If you encounter import errors, ensure the package is properly installed:
pip install -e .
Check your NATS server configuration and network connectivity:
# Test NATS connection
nats-cli server check --server=your-nats-server:4222
Verify your certificate paths and permissions:
# Check certificate files exist and are readable
ls -la /path/to/cert.pem /path/to/key.pem /path/to/rootCA.pem
Enable debug logging to troubleshoot issues:
smile-client start_listener --config=config.json --subject=your-subject --debug
Or in your Python code:
import logging
logging.basicConfig(level=logging.DEBUG)
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature
) - Commit your changes (
git commit -m 'Add some amazing feature'
) - Push to the branch (
git push origin feature/amazing-feature
) - Open a Pull Request
TODO: Decide on License
For questions or issues:
- Check the troubleshooting section
- Open an issue on GitHub
- Contact the development team
- Initial release
- NATS JetStream integration
- CLI interface with docopt
- Django management commands
- Graceful shutdown handling # TODO: Fix this
- Automatic reconnection logic
- SSL/TLS support