diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea5cf20 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +env +logs diff --git a/README.md b/README.md index f0b486f..24e2a64 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,35 @@ openshift-celery-cartridge ========================== -Cartridge to Expose Celery as a Daemon on OpenShift +Cartridge to Expose Celery as a Daemon on OpenShift. -Environment Variables +Status --------------------- +This work on unscaled applications but only as a plugin cartridge on scaled applications. This means that it's can't scale independently of your main gear. At this point having a celery cartridge is no better than just starting celery in your app.y or action hooks. Further development can combine this with a python cartridge so it can operate in it's own gears. + +Instead of using this I recommend starting celery in your action hooks or as part of you app.py instead. This will give more flexibility, and example is here https://github.com/appsembler/appsembler-launch-openshift + +Configuration +--------------------- + +* OPENSHIFT_CELERY_CONFIG + This is the name of your config file, which by default is `$OPENSHIFT_CELERY_DIR/conf.d/celeryconfig.py` but you can copy this to `$OPENSHIFT_DATA_DIR/config/celeryconfig.py` and edit your own copy. + +To install +--------------------- + + rhc cartridge-add https://raw.github.com/wassname/openshift-celery-cartridge/master/metadata/manifest.yml -a "appname" + +Any log output will be generated to `${OPENSHIFT_HOMEDIR}logs/celery_log.txt` and will be viewable with the rhc tail "appname" command + +This was tested using: + + celery==3.1.11 + redis==2.10.3 + +To manage +--------------------- + + $ rhc cartridge-status celeryd -a "yourapp" -- OPENSHIFT_CELERY_BROKER_TRANS : Defines the broker type that celery will use. Current available options are: - -- mongodb : DEFAULT - -- amqp - -- redis -- OPENSHIFT_CELERY_BROKER_URL : The connection URL for the broker, omitting the transport. For example, an amqp value may look like: guest:guest@localhost:5672// -- -- OPENSHIFT_CELERY_IMPORTS : Defines the modules that celery should import. Currently only supports 1 module import path -Currently only supports MongoDB as backend for queues, but in progress to support more environment variables to allow vairous backends and configurations. diff --git a/bin/control b/bin/control index 9681467..011b369 100755 --- a/bin/control +++ b/bin/control @@ -1,18 +1,31 @@ #!/bin/bash -e -PATH=/bin/:/usr/bin:$PATH - -#source $OPENSHIFT_CARTRIDGE_SDK_BASH +source $OPENSHIFT_CARTRIDGE_SDK_BASH function start { - export PYTHONPATH=$OPENSHIFT_REPO_DIR/.openshift:$OPENSHIFT_REPO_DIR/wsgi::$OPENSHIFT_CELERY_DIR/conf.d:$OPENSHIFT_REPO_DIR:$PYTHONPATH - usr/celeryd-multi start --cmd=celeryd --pidfile=etc/celeryd.pid --hostname=$OPENSHIFT_INTERNAL_IP --loglevel=DEBUG + LOGPIPE=${OPENSHIFT_HOMEDIR}/app-root/runtime/logshifter-python + rm -f $LOGPIPE && mkfifo $LOGPIPE + /usr/bin/logshifter -tag python < $LOGPIPE & + + PYTHONPATH=$OPENSHIFT_DATA_DIR:$OPENSHIFT_DATA_DIR/config/:$OPENSHIFT_REPO_DIR/.openshift:$OPENSHIFT_REPO_DIR/config:$OPENSHIFT_REPO_DIR/EXT:$OPENSHIFT_REPO_DIR/wsgi::$OPENSHIFT_CELERY_DIR/conf.d:$OPENSHIFT_REPO_DIR:$PYTHONPATH + + echo "Starting Celery" + #echo "nohup nice -n 10 ${OPENSHIFT_CELERY_DIR}usr/celery multi start worker --config=${OPENSHIFT_CELERY_CONFIG=celeryconfig} --loglevel=DEBUG --hostname=$OPENSHIFT_APP_DNS --pidfile=${OPENSHIFT_CELERY_DIR}etc/celeryd.pid --logfile=${OPENSHIFT_HOMEDIR}app-root/logs/celery_log.txt &> $LOGPIPE &" > ${OPENSHIFT_LOG_DIR}celery_log.txt + nohup nice -n 10 ${OPENSHIFT_CELERY_DIR}usr/celery multi start worker --config=${OPENSHIFT_CELERY_CONFIG=celeryconfig} --loglevel=DEBUG --hostname=$OPENSHIFT_APP_DNS --pidfile=${OPENSHIFT_CELERY_DIR}etc/celeryd.pid --logfile=${OPENSHIFT_LOG_DIR}celery_log.txt &> $LOGPIPE & + if ps -p `cat ${OPENSHIFT_CELERY_DIR}etc/celeryd.pid` > /dev/null; + then + echo "Starting celery worked" + else + echo "Starting celery failed" + fi } function stop { - if ps -p `cat etc/celeryd.pid` > /dev/null 2>$1 + celery multi stop worker --pidfile=${OPENSHIFT_CELERY_DIR}etc/celeryd.pid + if ps -p `cat ${OPENSHIFT_CELERY_DIR}etc/celeryd.pid` > /dev/null; then - kill -9 `cat etc/celeryd.pid` + kill -9 `cat ${OPENSHIFT_CELERY_DIR}etc/celeryd.pid` + rm -f ${OPENSHIFT_CELERY_DIR}etc/celeryd.pid else echo "nothing to kill" fi @@ -20,13 +33,36 @@ function stop { } function restart { - echo "restarted" + celery multi restart worker --config=${OPENSHIFT_CELERY_CONFIG=celeryconfig} --loglevel=DEBUG --hostname=$OPENSHIFT_APP_DNS --pidfile=${OPENSHIFT_CELERY_DIR}etc/celeryd.pid --logfile=${OPENSHIFT_LOG_DIR}celery_log.txt + echo "Restarted celery" + } +function status() { + pid=`cat ${OPENSHIFT_CELERY_DIR}etc/celeryd.pid` + PROCESS_COUNT=$(ps -ef | grep "${PROCESS_NAME}" | grep -v "grep" | wc -l) + client_result $pwd + if [ -f $pid ] && ( kill -0 $(cat $pid) ); then + client_result "Celery is running at pid:${pid}, $PROCESS_COUNT instances of celery" + else + client_result "Celery is not running" + fi +} + + function catchall { echo "not yet implemented" } +# Ensure arguments. +if ! [ $# -gt 0 ]; then + echo "Usage: $0 [start|restart|stop|status]" + exit 1 +fi + +# Source utility functions. +source $OPENSHIFT_CARTRIDGE_SDK_BASH + case "$1" in start) start ;; stop) stop ;; diff --git a/bin/mock_server.rb b/bin/mock_server.rb deleted file mode 100755 index e9ef115..0000000 --- a/bin/mock_server.rb +++ /dev/null @@ -1,14 +0,0 @@ -#!/usr/bin/env ruby -require 'webrick' -include WEBrick - -config = {} -config.update(:Port => 8080) -config.update(:BindAddress => ARGV[0]) -config.update(:DocumentRoot => ARGV[1]) -server = HTTPServer.new(config) -['INT', 'TERM'].each {|signal| - trap(signal) {server.shutdown} -} - -server.start diff --git a/bin/setup b/bin/setup index 8b13789..66df359 100755 --- a/bin/setup +++ b/bin/setup @@ -1 +1,9 @@ +#!/bin/bash -e +source $OPENSHIFT_CARTRIDGE_SDK_BASH + +for dir in logs pid tmp env; do + mkdir -p $dir +done + +mkdir -p $OPENSHIFT_DATA_DIR/.celery diff --git a/conf.d/celeryconfig.py b/conf.d/celeryconfig.py new file mode 100644 index 0000000..ac04e6d --- /dev/null +++ b/conf.d/celeryconfig.py @@ -0,0 +1,20 @@ +# Modify this file to configure celery. +# You can move it to your data directory and reference it in enviromental variable $OPENSHIFT_CELERY_CONFIG. +# You could also insert variables at build using an acton hook. +# config options here http://docs.celeryproject.org/en/2.5/configuration.html#worker-celeryd + +import os +import sys + +sys.path.append('.') + +BROKER_HOST = "localhost" +BROKER_PORT = 5672 +BROKER_USER = "celeryuser" +BROKER_PASSWORD = "celery" +BROKER_VHOST = "celeryvhost" +CELERYD_OPTS="--autoscale=3,1 --time-limit=300 --concurrency=2 " +CELERY_RESULT_BACKEND = "amqp" + +# Here we check the env vars first, otherwise go with "tasks" +CELERY_IMPORTS = ( os.getenv("OPENSHIFT_CELERY_IMPORTS","tasks") ,) diff --git a/conf.d/celeryconfig.py.erb b/conf.d/celeryconfig.py.erb deleted file mode 100644 index 5e5cfc3..0000000 --- a/conf.d/celeryconfig.py.erb +++ /dev/null @@ -1,66 +0,0 @@ -print 'Configuring celeryd with default cartridge config file' -<% -broker_type = ENV.fetch('OPENSHIFT_CELERY_BROKER_TRANS', 'mongodb') -if broker_type == "mongodb" %> -## Broker settings. -<% default_url = "localhost:27017/celery_tasks" %> -BROKER_URL = 'mongodb://<%= ENV.fetch('OPENSHIFT_CELERY_BROKER_URL', default_url) %>' -## Using the database to store task state and results. -CELERY_RESULT_BACKEND = "mongodb" -CELERY_RESULT_DBURI = 'mongodb://<%= ENV.fetch('OPENSHIFT_CELERY_RESULT_BACKEND', default_url) %>' - -<% elsif broker_type == "amqp" %> -## Broker settings -BROKER_URL = 'amqp://<%= ENV.fetch('OPENSHIFT_CELERY_BROKER_URL', 'guest:guest@localhost:5672//') %>' -<% elsif broker_type == "redis" %> -## Broker settings -BROKER_URL = 'redis://<%= ENV.fetch('OPENSHIFT_CELERY_BROKER_URL', 'localhost:6379/0') %>' -BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': <%= ENV.fetch('OPENSHIFT_CELERY_VISIBILITY_TIME', 3600) %> } -CELERY_RESULT_BACKEND = 'redis://<%= ENV.fetch('OPENSHIFT_CELERY_RESULT_BACKEND', 'localhost:6379/0') %>' -<% end %> -# List of modules to import when celery starts. -<% - celery_imports = ENV.fetch('OPENSHIFT_CELERY_IMPORTS', '').split(',') - celery_imports = celery_imports.map { |a| "'" + a + "'" } - celery_imports = celery_imports.join(",") -%> -CELERY_IMPORTS = (<%= celery_imports %>, ) - - -CELERY_ANNOTATIONS = {"tasks.add": {"rate_limit": <%= ENV.fetch('OPENSHIFT_CELERY_RATE_LIM', '10/s') %>}} - -""" -BROKER_BACKEND = 'mongodb' -CELERY_RESULT_BACKEND = 'mongodb' -BROKER_HOST = os.environ.get('OPENSHIFT_MONGODB_DB_HOST') -BROKER_PORT = os.environ.get('OPENSHIFT_MONGODB_DB_PORT') -BROKER_USER = os.environ.get('OPENSHIFT_MONGODB_DB_USERNAME') -BROKER_PASSWORD = os.environ.get('OPENSHIFT_MONGODB_DB_PASSWORD') -BROKER_VHOST = 'celery_tasks' -BROKER_URL = 'mongodb://admin:XCf67tq_BqKU@writeown-tresback.rhcloud.com/celery_tasks' -CELERY_RESULT_BACKEND = 'mongodb://admin:XCf67tq_BqKU@writeown-tresback.rhcloud.com/celery_tasks' -BROKER_URL = 'mongodb://admin:XCf67tq_BqKU@127.8.167.130/celery_tasks' -#"%scelery_tasks" % os.environ.get('OPENSHIFT_MONGODB_DB_URL') -BROKER_BACKEND = 'mongodb' -CELERY_RESULT_BACKEND = 'mongodb' -BROKER_HOST = os.environ.get('OPENSHIFT_MONGODB_DB_HOST') -BROKER_PORT = os.environ.get('OPENSHIFT_MONGODB_DB_PORT') -BROKER_TRANSPORT = 'mongodb' -BROKER_VHOST = 'celery_tasks' -CELERY_MONGODB_BACKEND_SETTINGS = { - "host": os.environ.get('OPENSHIFT_MONGODB_DB_HOST'), - "port": os.environ.get('OPENSHIFT_MONGODB_DB_PORT'), - "database": 'celery_tasks', -} -CELERY_MONGODB_BACKEND_SETTINGS = { - "host": os.environ.get('OPENSHIFT_MONGODB_DB_HOST'), - "port": os.environ.get('OPENSHIFT_MONGODB_DB_PORT'), - "database": 'celery_tasks', - "user": os.environ.get('OPENSHIFT_MONGODB_DB_USERNAME'), - "password": os.environ.get('OPENSHIFT_MONGODB_DB_PASSWORD') -} - -BROKER_BACKEND = 'redis' -BROKER_URL = "redis://127.8.167.131:16379/0" -CELERY_RESULT_BACKEND = "redis://127.8.167.131:16379/0" -""" diff --git a/hooks/publish-celery-connection-info b/hooks/publish-celery-connection-info new file mode 100755 index 0000000..28ce2bb --- /dev/null +++ b/hooks/publish-celery-connection-info @@ -0,0 +1,5 @@ +#!/bin/bash + +echo "S_CELERY_MASTER=$CELERY_MASTER" +echo "S_CELERY_HOST=$OPENSHIFT_GEAR_DNS" +echo "S_CELERY_PORT=$OPENSHIFT_GEAR_PORT" diff --git a/hooks/publish-db-connection-info b/hooks/publish-db-connection-info new file mode 100755 index 0000000..dc458e8 --- /dev/null +++ b/hooks/publish-db-connection-info @@ -0,0 +1,4 @@ +#!/bin/bash + +echo "OPENSHIFT_CELERY_HOST=$OPENSHIFT_GEAR_DNS" +echo "OPENSHIFT_CELERY_PORT=$OPENSHIFT_GEAR_PORT" diff --git a/hooks/set-celery-connection-info b/hooks/set-celery-connection-info new file mode 100755 index 0000000..4498507 --- /dev/null +++ b/hooks/set-celery-connection-info @@ -0,0 +1,87 @@ +#!/usr/bin/ruby + +def gear_info(tokens, gear_id, &block) + gears = {} + if tokens.length == 1 + gears[gear_id] = tokens.first + else + while not tokens.empty? + gear, delim, data = tokens.shift(3) + data = yield data if block_given? + raise "Invalid data" unless delim == '=' + gears[gear] = data + tokens.shift if tokens.first == ' ' + end + end + gears +end + +def tokenize(s) + tokens = [] + a = "" + state = :start + s.scan(/([ \t]+)|(\\')|(')|([^ \t']+)/) do |args| + space, escaped_delim, delim, text = args + case state + when :start + case + when space then " " + when escaped_delim then raise "Unexpected delimiter" + when delim then state = :within_delim + when text then tokens << text + else raise "error" + end + when :within_delim + case + when space then a << space + when escaped_delim then a << '"' + when delim then tokens << a; a = ""; state = :start + when text then a << text + end + end + end + tokens << a if a.length > 0 + tokens +end + +gear_id = ARGV.shift +domain = ARGV.shift +tokens = tokenize(ARGV.shift) + +# better if this is in creation order (oldest first) +gears = gear_info(tokens, gear_id) do |d| + d.split(' ').map{ |s| s.scan(/\A(.+?)=(.*?);?\Z/).first }.inject({}){ |h, (k,v)| h[k] = v if v != ''; h } +end +gears.each_pair{ |k,v| puts "Found gear #{k}#{k == gear_id ? '* ' : ''} with data #{v.inspect}" } +gear_ids = gears.keys.sort.uniq + +puts "-------" + +was_master = ENV['CELERY_MASTER'] == '1' +masters = gears.map{ |k,v| v['S_CELERY_MASTER'] == '1' ? k : nil }.compact.uniq.sort + +mode = + case ENV['CELERY_MODE'] + when 'read_replica' + if masters.length > 1 + masters = masters[0,1] + elsif masters.length == 0 + masters = gears.keys.uniq.sort[0,1] + end + puts "Running in read replica mode with master #{masters}" + :read_replica + else + masters = gears.keys.uniq.sort + puts "Running sharded with masters #{masters.inspect}" + :sharded + end + +# An array of all of the host:port pairs for the cluster +hosts = gears.map{ |k,v| "#{v['S_CELERY_HOST']}:#{v['S_CELERY_PORT']}" }.compact.uniq.sort +# A list of key value pairs in => for the cluster +members = gears.map{ |k,v| "#{k}=#{v['S_CELERY_HOST']}:#{v['S_CELERY_PORT']}" }.compact.uniq.sort + +File.open('env/CELERY_CLUSTER_MEMBERS', 'w'){ |f| f.puts members.join("\n") } +File.open('env/CELERY_CLUSTER', 'w'){ |f| f.puts hosts.join(",") } +File.open('env/CELERY_CLUSTER_MASTERS', 'w'){ |f| f.puts masters.join(",") } +File.open('env/CELERY_MASTER', 'w'){ |f| f.puts masters.include?(gear_id) ? "1" : "0" } diff --git a/metadata/managed_files.yml b/metadata/managed_files.yml index 24d0b94..be02f60 100644 --- a/metadata/managed_files.yml +++ b/metadata/managed_files.yml @@ -3,5 +3,3 @@ locked_files: - env/ - env/* - conf.d/* -processed_templates: -- '**/*.erb' diff --git a/metadata/manifest.yml b/metadata/manifest.yml index d0ba915..aab7c58 100644 --- a/metadata/manifest.yml +++ b/metadata/manifest.yml @@ -1,21 +1,49 @@ Name: celeryd Cartridge-Short-Name: CELERY Display-Name: Celery Cartridge -Description: "A cartridge for running the celeryd daemon to allow asynchronous tasks in your application." -Version: '0.1' -License: "None" -Vendor: Custom Cartridges Inc -Cartridge-Version: 0.0.1 -Cartridge-Vendor: customcarts -Source-Url: https://github.com/tresbailey/openshift-celery-cartridge.git +Description: A cartridge for running the celeryd daemon to allow asynchronous tasks in your application. +Version: '0.16' +License: ASL 2.0 +Vendor: Matti HEIKKILA +Cartridge-Version: + - '1.0.2' +Cartridge-Vendor: wassname +Website: https://github.com/wassname/openshift-celery-cartridge +Source-Url: https://github.com/wassname/openshift-celery-cartridge.git Categories: - - service + - plugin + - embedded +Requires: python +Scaling: + Min: 1 + Max: -1 Provides: - celery +Endpoints: + - Private-IP-Name: HOST + Private-Port-Name: PORT + Private-Port: 16388 + Public-Port-Name: PROXY_PORT Cart-Data: - Key: OPENSHIFT_CELERY_HOME Type: environment - Description: "An environment variable for the Home dir of the binary" + Description: An environment variable for the Home dir of the binary + - Key: OPENSHIFT_CELERY_BROKER_URL + Type: environment + Description: An environment variable for the celery broker + - Key: OPENSHIFT_CELERY_IMPORTS + Type: environment + Description: An environment variable for the python modules for celery to import Group-Overrides: - components: - celery + - python +Subscribes: + set-rcelery-connection-info: + Type: "ENV:NET_TCP:db:celery:connection-info-v1" + Required: false +Publishes: + publish-db-connection-info: + Type: "ENV:NET_TCP:db:connection-info" + publish-celery-connection-info: + Type: "ENV:NET_TCP:db:celery:connection-info-v1" diff --git a/usr/celery b/usr/celery index 10ec98c..de3cef9 100755 --- a/usr/celery +++ b/usr/celery @@ -1,13 +1,16 @@ #!/usr/bin/env python2.7 -import os; activate_this=os.path.join(os.path.dirname('%svirtenv/bin/' % os.getenv('OPENSHIFT_PYTHON_DIR')), 'activate_this.py'); execfile(activate_this, dict(__file__=activate_this)); del os, activate_this +import os +activate_this=os.path.join(os.path.dirname('%svirtenv/bin/' % os.getenv('OPENSHIFT_PYTHON_DIR')), 'activate_this.py') +execfile(activate_this, dict(__file__=activate_this)) +del os, activate_this -# EASY-INSTALL-ENTRY-SCRIPT: 'celery==3.0.24','console_scripts','celery' -__requires__ = 'celery==3.0.24' +# EASY-INSTALL-ENTRY-SCRIPT: 'celery','console_scripts','celery' +__requires__ = 'celery' import sys from pkg_resources import load_entry_point if __name__ == '__main__': sys.exit( - load_entry_point('celery==3.0.24', 'console_scripts', 'celery')() + load_entry_point('celery', 'console_scripts', 'celery')() ) diff --git a/usr/celerybeat b/usr/celerybeat index aeafaf0..2f7fd10 100755 --- a/usr/celerybeat +++ b/usr/celerybeat @@ -1,9 +1,9 @@ #!/home/tres/Code/Scribble/screnv/bin/python # EASY-INSTALL-ENTRY-SCRIPT: 'celery==3.0.21','console_scripts','celerybeat' -__requires__ = 'celery==3.0.21' +__requires__ = 'celery' import sys from pkg_resources import load_entry_point sys.exit( - load_entry_point('celery==3.0.21', 'console_scripts', 'celerybeat')() + load_entry_point('celery', 'console_scripts', 'celerybeat')() ) diff --git a/usr/celeryctl b/usr/celeryctl index 703c735..3ce9912 100755 --- a/usr/celeryctl +++ b/usr/celeryctl @@ -1,9 +1,9 @@ #!/home/tres/Code/Scribble/screnv/bin/python # EASY-INSTALL-ENTRY-SCRIPT: 'celery==3.0.24','console_scripts','celeryctl' -__requires__ = 'celery==3.0.24' +__requires__ = 'celery' import sys from pkg_resources import load_entry_point sys.exit( - load_entry_point('celery==3.0.24', 'console_scripts', 'celeryctl')() + load_entry_point('celery', 'console_scripts', 'celeryctl')() ) diff --git a/usr/celeryd b/usr/celeryd deleted file mode 100755 index af87eb7..0000000 --- a/usr/celeryd +++ /dev/null @@ -1,13 +0,0 @@ -#!/usr/bin/env python2.7 - -import os; activate_this=os.path.join(os.path.dirname('%svirtenv/bin/' % os.getenv('OPENSHIFT_PYTHON_DIR')), 'activate_this.py'); execfile(activate_this, dict(__file__=activate_this)); del os, activate_this - -# EASY-INSTALL-ENTRY-SCRIPT: 'celery==3.0.24','console_scripts','celeryd' -__requires__ = 'celery==3.0.24' -import sys -from pkg_resources import load_entry_point - -if __name__ == '__main__': - sys.exit( - load_entry_point('celery==3.0.24', 'console_scripts', 'celeryd')() - ) diff --git a/usr/celeryd-multi b/usr/celeryd-multi deleted file mode 100755 index 12adf31..0000000 --- a/usr/celeryd-multi +++ /dev/null @@ -1,12 +0,0 @@ -#!/usr/bin/env python2.7 - -import os; activate_this=os.path.join(os.path.dirname('%svirtenv/bin/' % os.getenv('OPENSHIFT_PYTHON_DIR')), 'activate_this.py'); execfile(activate_this, dict(__file__=activate_this)); del os, activate_this - -# EASY-INSTALL-ENTRY-SCRIPT: 'celery==3.0.24','console_scripts','celeryd-multi' -__requires__ = 'celery==3.0.24' -import sys -from pkg_resources import load_entry_point - -sys.exit( - load_entry_point('celery==3.0.24', 'console_scripts', 'celeryd-multi')() -) diff --git a/usr/celeryev b/usr/celeryev index ac9af3a..ead9de0 100755 --- a/usr/celeryev +++ b/usr/celeryev @@ -1,9 +1,9 @@ #!/home/tres/Code/Scribble/screnv/bin/python # EASY-INSTALL-ENTRY-SCRIPT: 'celery==3.0.21','console_scripts','celeryev' -__requires__ = 'celery==3.0.21' +__requires__ = 'celery' import sys from pkg_resources import load_entry_point sys.exit( - load_entry_point('celery==3.0.21', 'console_scripts', 'celeryev')() + load_entry_point('celery', 'console_scripts', 'celeryev')() )