Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@ storm.zookeeper.retry.intervalceiling.millis: 30000
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
storm.principal.tolocal: "backtype.storm.security.auth.DefaultPrincipalToLocal"
storm.messaging.transport: "backtype.storm.messaging.zmq"
storm.nimbus.retry.times: 5
storm.nimbus.retry.interval.millis: 2000
storm.nimbus.retry.intervalceiling.millis: 60000
storm.auth.simple-white-list.users: []

### nimbus.* configs are for the master
nimbus.host: "localhost"
nimbus.thrift.port: 6627
nimbus.thrift.threads: 64
nimbus.childopts: "-Xmx1024m"
nimbus.task.timeout.secs: 30
nimbus.supervisor.timeout.secs: 60
Expand Down
3 changes: 2 additions & 1 deletion storm-core/project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
:target-path "target"

:profiles {:dev {:resource-paths ["src/dev"]
:dependencies [[org.mockito/mockito-all "1.9.5"]]}
:dependencies [[org.clojars.runa/conjure "2.1.1"]
[org.mockito/mockito-all "1.9.5"]]}
:release {}
:lib {}
}
Expand Down
4 changes: 2 additions & 2 deletions storm-core/src/clj/backtype/storm/LocalDRPC.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns backtype.storm.LocalDRPC
(:require [backtype.storm.daemon [drpc :as drpc]])
(:use [backtype.storm util])
(:use [backtype.storm config util])
(:import [backtype.storm.utils InprocMessaging ServiceRegistry])
(:gen-class
:init init
Expand All @@ -9,7 +9,7 @@
:state state ))

(defn -init []
(let [handler (drpc/service-handler)
(let [handler (drpc/service-handler (read-storm-config))
id (ServiceRegistry/registerService handler)
]
[[] {:service-id id :handler handler}]
Expand Down
3 changes: 2 additions & 1 deletion storm-core/src/clj/backtype/storm/bootstrap.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
RegisteredGlobalState ThriftTopologyUtils DisruptorQueue
MutableObject MutableLong]))
(import (quote [backtype.storm.serialization KryoTupleSerializer KryoTupleDeserializer]))
(import (quote [backtype.storm.security.auth ThriftServer ThriftClient ReqContext]))
(import (quote [backtype.storm.spout ISpout SpoutOutputCollector ISpoutOutputCollector ShellSpout]))
(import (quote [backtype.storm.tuple Tuple TupleImpl Fields MessageId]))
(import (quote [backtype.storm.task IBolt IOutputCollector
Expand All @@ -37,7 +38,7 @@
TopologySummary ExecutorSummary ExecutorStats ExecutorSpecificStats
SpoutStats BoltStats ErrorInfo SupervisorSummary ExecutorInfo
KillOptions SubmitOptions RebalanceOptions JavaObject JavaObjectArg
TopologyInitialStatus]))
TopologyInitialStatus AuthorizationException]))
(import (quote [backtype.storm.daemon.common StormBase Assignment
SupervisorInfo WorkerHeartbeat]))
(import (quote [backtype.storm.grouping CustomStreamGrouping]))
Expand Down
12 changes: 12 additions & 0 deletions storm-core/src/clj/backtype/storm/daemon/common.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
(:import [backtype.storm.task WorkerTopologyContext])
(:import [backtype.storm Constants])
(:import [backtype.storm.metric SystemBolt])
(:import [backtype.storm.security.auth IAuthorizer])
(:require [clojure.set :as set])
(:require [backtype.storm.daemon.acker :as acker])
(:require [backtype.storm.thrift :as thrift])
Expand Down Expand Up @@ -332,3 +333,14 @@
(->> executor->node+port
(mapcat (fn [[e node+port]] (for [t (executor-id->tasks e)] [t node+port])))
(into {})))


(defn mk-authorization-handler [klassname conf]
(let [aznClass (if klassname (Class/forName klassname))
aznHandler (if aznClass (.newInstance aznClass))]
(if aznHandler (.prepare ^IAuthorizer aznHandler conf))
(log-debug "authorization class name:" klassname
" class:" aznClass
" handler:" aznHandler)
aznHandler
))
55 changes: 33 additions & 22 deletions storm-core/src/clj/backtype/storm/daemon/drpc.clj
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
(ns backtype.storm.daemon.drpc
(:import [org.apache.thrift7.server THsHaServer THsHaServer$Args])
(:import [org.apache.thrift7.protocol TBinaryProtocol TBinaryProtocol$Factory])
(:import [backtype.storm.security.auth ThriftServer ReqContext])
(:import [backtype.storm.security.auth.authorizer Audit])
(:import [org.apache.thrift7 TException])
(:import [org.apache.thrift7.transport TNonblockingServerTransport TNonblockingServerSocket])
(:import [backtype.storm.generated DistributedRPC DistributedRPC$Iface DistributedRPC$Processor
DRPCRequest DRPCExecutionException DistributedRPCInvocations DistributedRPCInvocations$Iface
DistributedRPCInvocations$Processor])
(:import [java.util.concurrent Semaphore ConcurrentLinkedQueue ThreadPoolExecutor ArrayBlockingQueue TimeUnit])
(:import [backtype.storm.daemon Shutdownable])
(:import [backtype.storm.security.auth IAuthorizer])
(:import [java.net InetAddress])
(:import [backtype.storm.generated AuthorizationException])
(:use [backtype.storm bootstrap config log])
(:use [backtype.storm.daemon common])
(:gen-class))

(bootstrap)
Expand All @@ -25,9 +27,19 @@
))
(@queues-atom function))

(defn check-authorization! [aclHandler storm-conf operation]
(let [ctxt (ReqContext/context)]
(log-debug "DRPC check-authorization with handler: " aclHandler)
(Audit/log ctxt operation storm-conf)
(if aclHandler
(if-not (.permit aclHandler ctxt operation storm-conf)
(throw (AuthorizationException. (str "DRPC request " operation " is not authorized")))
))))

;; TODO: change this to use TimeCacheMap
(defn service-handler []
(let [conf (read-storm-config)
(defn service-handler [conf]
(let [drpc-acl-handler (mk-authorization-handler (conf DRPC-AUTHORIZER) conf)
invocations-acl-handler (mk-authorization-handler (conf DRPC-INVOCATIONS-AUTHORIZER) conf)
ctr (atom 0)
id->sem (atom {})
id->result (atom {})
Expand All @@ -52,6 +64,7 @@
(reify DistributedRPC$Iface
(^String execute [this ^String function ^String args]
(log-debug "Received DRPC request for " function " " args " at " (System/currentTimeMillis))
(check-authorization! drpc-acl-handler conf "execute")
(let [id (str (swap! ctr (fn [v] (mod (inc v) 1000000000))))
^Semaphore sem (Semaphore. 0)
req (DRPCRequest. args id)
Expand All @@ -72,19 +85,22 @@
))))
DistributedRPCInvocations$Iface
(^void result [this ^String id ^String result]
(check-authorization! invocations-acl-handler conf "result")
(let [^Semaphore sem (@id->sem id)]
(log-debug "Received result " result " for " id " at " (System/currentTimeMillis))
(when sem
(swap! id->result assoc id result)
(.release sem)
)))
(^void failRequest [this ^String id]
(check-authorization! invocations-acl-handler conf "failRequest")
(let [^Semaphore sem (@id->sem id)]
(when sem
(swap! id->result assoc id (DRPCExecutionException. "Request failed"))
(.release sem)
)))
(^DRPCRequest fetchRequest [this ^String func]
(check-authorization! invocations-acl-handler conf "fetchRequest")
(let [^ConcurrentLinkedQueue queue (acquire-queue request-queues func)
ret (.poll queue)]
(if ret
Expand All @@ -100,28 +116,23 @@
(defn launch-server!
([]
(let [conf (read-storm-config)
worker-threads (int (conf DRPC-WORKER-THREADS))
worker-threads (int (conf DRPC-WORKER-THREADS))
queue-size (int (conf DRPC-QUEUE-SIZE))
service-handler (service-handler)
service-handler (service-handler conf)
;; requests and returns need to be on separate thread pools, since calls to
;; "execute" don't unblock until other thrift methods are called. So if
;; 64 threads are calling execute, the server won't accept the result
;; invocations that will unblock those threads
handler-server (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-PORT)))
(THsHaServer$Args.)
(.workerThreads 64)
(.executorService (ThreadPoolExecutor. worker-threads worker-threads
60 TimeUnit/SECONDS (ArrayBlockingQueue. queue-size)))
(.protocolFactory (TBinaryProtocol$Factory.))
(.processor (DistributedRPC$Processor. service-handler))
))
invoke-server (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-INVOCATIONS-PORT)))
(THsHaServer$Args.)
(.workerThreads 64)
(.protocolFactory (TBinaryProtocol$Factory.))
(.processor (DistributedRPCInvocations$Processor. service-handler))
))]

handler-server (ThriftServer. conf
(DistributedRPC$Processor. service-handler)
(int (conf DRPC-PORT))
backtype.storm.Config$ThriftServerPurpose/DRPC
(ThreadPoolExecutor. worker-threads worker-threads
60 TimeUnit/SECONDS (ArrayBlockingQueue. queue-size)))
invoke-server (ThriftServer. conf
(DistributedRPCInvocations$Processor. service-handler)
(int (conf DRPC-INVOCATIONS-PORT))
backtype.storm.Config$ThriftServerPurpose/DRPC)]
(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop handler-server) (.stop invoke-server))))
(log-message "Starting Distributed RPC servers...")
(future (.serve invoke-server))
Expand Down
Loading