Skip to content
49 changes: 40 additions & 9 deletions storm-core/src/clj/backtype/storm/daemon/drpc.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
(:import [backtype.storm.daemon Shutdownable])
(:import [java.net InetAddress])
(:use [backtype.storm bootstrap config log])
(:use compojure.core)
(:use ring.middleware.reload)
(:use [ring.adapter.jetty :only [run-jetty]])
(:gen-class))

(bootstrap)
Expand Down Expand Up @@ -97,35 +100,63 @@
(.interrupt clear-thread))
)))

(defn handle-request [handler]
(fn [request]
(handler request)))

(defn webapp [handler]
(->(def http-routes
(routes
(GET "/drpc/:func/:args" [func args & m]
(.execute handler func args))
(GET "/drpc/:func/" [func & m]
(.execute handler func ""))
(GET "/drpc/:func" [func & m]
(.execute handler func ""))))
(wrap-reload '[backtype.storm.daemon.drpc])
handle-request))

(defn launch-server!
([]
(let [conf (read-storm-config)
worker-threads (int (conf DRPC-WORKER-THREADS))
worker-threads (int (conf DRPC-WORKER-THREADS))
queue-size (int (conf DRPC-QUEUE-SIZE))
service-handler (service-handler)
drpc-http-port (if (conf DRPC-HTTP-PORT) (int (conf DRPC-HTTP-PORT)) 0)
drpc-port (int (conf DRPC-PORT))
drpc-service-handler (service-handler)
;; requests and returns need to be on separate thread pools, since calls to
;; "execute" don't unblock until other thrift methods are called. So if
;; 64 threads are calling execute, the server won't accept the result
;; invocations that will unblock those threads
handler-server (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-PORT)))
handler-server (when (> drpc-port 0)
(THsHaServer. (-> (TNonblockingServerSocket. drpc-port)
(THsHaServer$Args.)
(.workerThreads 64)
(.executorService (ThreadPoolExecutor. worker-threads worker-threads
(.executorService (ThreadPoolExecutor. worker-threads worker-threads
60 TimeUnit/SECONDS (ArrayBlockingQueue. queue-size)))
(.protocolFactory (TBinaryProtocol$Factory.))
(.processor (DistributedRPC$Processor. service-handler))
(.processor (DistributedRPC$Processor. drpc-service-handler))
))
)
invoke-server (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-INVOCATIONS-PORT)))
(THsHaServer$Args.)
(.workerThreads 64)
(.protocolFactory (TBinaryProtocol$Factory.))
(.processor (DistributedRPCInvocations$Processor. service-handler))
(.processor (DistributedRPCInvocations$Processor. drpc-service-handler))
))]

(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop handler-server) (.stop invoke-server))))
(.addShutdownHook (Runtime/getRuntime) (Thread. (fn []
(if handler-server (.stop handler-server))
(.stop invoke-server))))
(log-message "Starting Distributed RPC servers...")
(future (.serve invoke-server))
(.serve handler-server))))
(when (> drpc-http-port 0)
(run-jetty (webapp drpc-service-handler)
{:port drpc-http-port :join? false})
)
(when handler-server
(.serve handler-server)
)
)))

(defn -main []
(launch-server!))
6 changes: 6 additions & 0 deletions storm-core/src/jvm/backtype/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,12 @@ public class Config extends HashMap<String, Object> {
public static final String DRPC_SERVERS = "drpc.servers";
public static final Object DRPC_SERVERS_SCHEMA = ConfigValidation.StringsValidator;

/**
* This port is used by Storm DRPC for receiving HTTP DPRC requests from clients.
*/
public static final String DRPC_HTTP_PORT = "drpc.http.port";
public static final Object DRPC_HTTP_PORT_SCHEMA = Number.class;

/**
* This port is used by Storm DRPC for receiving DPRC requests from clients.
*/
Expand Down