diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index 7231b15d61..055eca6f5f 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -117,6 +117,7 @@ (update-storm! [this storm-id new-elems]) (remove-storm-base! [this storm-id]) (set-assignment! [this storm-id info]) + (remove-assignment! [this storm-id]) (remove-storm! [this storm-id]) (report-error [this storm-id task-id error]) (errors [this storm-id task-id]) @@ -327,6 +328,9 @@ (set-data cluster-state (assignment-path storm-id) (Utils/serialize info)) ) + (remove-assignment! [this storm-id] + (delete-node cluster-state (assignment-path storm-id))) + (remove-storm! [this storm-id] (delete-node cluster-state (assignment-path storm-id)) (remove-storm-base! this storm-id)) diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index 04731dc8f3..e5b9ac7f88 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -328,6 +328,7 @@ ;; tracked through heartbeat-cache (defn- update-executor-cache [curr hb] (let [reported-time (:time-secs hb) + uptime (:uptime hb) {last-nimbus-time :nimbus-time last-reported-time :executor-reported-time} curr reported-time (cond reported-time reported-time @@ -339,7 +340,8 @@ last-nimbus-time )] {:nimbus-time nimbus-time - :executor-reported-time reported-time})) + :executor-reported-time reported-time + :uptime uptime})) (defn update-heartbeat-cache [cache executor-beats all-executors] (let [cache (select-keys cache all-executors)] @@ -381,20 +383,25 @@ (filter (fn [executor] (let [start-time (get executor-start-times executor) nimbus-time (-> heartbeats-cache (get executor) :nimbus-time)] + (log-debug "Executor " storm-id ":" executor " start-time: " start-time " nimbus-time: " nimbus-time) (if (and start-time - (or - (< (time-delta 
start-time) - (conf NIMBUS-TASK-LAUNCH-SECS)) - (not nimbus-time) - (< (time-delta nimbus-time) - (conf NIMBUS-TASK-TIMEOUT-SECS)) - )) + (if-not nimbus-time + (do + (log-debug "nimbus-time is nil, check nimbus if start time longer than NIMBUS-RECOVER-HEARTBEART-SECS") + (< ((:uptime nimbus)) (conf NIMBUS-RECOVER-HEARTBEART-SECS))) + (do + (log-debug "nimbus-time is " nimbus-time " check executor heartbeat time") + (or + (< (time-delta start-time) + (conf NIMBUS-TASK-LAUNCH-SECS)) + (< (time-delta nimbus-time) + (conf NIMBUS-TASK-TIMEOUT-SECS)))))) true (do - (log-message "Executor " storm-id ":" executor " not alive") + (log-message "Executor " storm-id ":" executor " not alive, start-time is " (if start-time start-time "nil") " nimbus-time " (if nimbus-time nimbus-time "nil")) false)) ))) - doall))) + doall))) (defn- to-executor-id [task-ids] @@ -537,7 +544,7 @@ storm-cluster-state (:storm-cluster-state nimbus) topology->executors (compute-topology->executors nimbus (keys existing-assignments)) ;; update the executors heartbeats first. - _ (update-all-heartbeats! nimbus existing-assignments topology->executors) + ;;_ (update-all-heartbeats! nimbus existing-assignments topology->executors) topology->alive-executors (compute-topology->alive-executors nimbus existing-assignments topologies @@ -642,7 +649,8 @@ ;; for the topology which wants rebalance (specified by the scratch-topology-id) ;; we exclude its assignment, meaning that all the slots occupied by its assignment ;; will be treated as free slot in the scheduler code. - (when (or (nil? scratch-topology-id) (not= tid scratch-topology-id)) + (if (and (not-nil? scratch-topology-id) (= tid scratch-topology-id)) + (.remove-assignment! storm-cluster-state tid) {tid (.assignment-info storm-cluster-state tid nil)}))) ;; make the new assignments for topologies topology->executor->node+port (compute-new-topology->executor->node+port @@ -974,6 +982,27 @@ (transition-name! 
nimbus storm-name [:rebalance wait-amt num-workers executor-overrides] true) )) + (workerHeartBeat [this storm-id work-id port executors uptime hbtime stats] + (let [hb { :storm-id storm-id + :time-secs hbtime + :uptime uptime + } + node-port (str work-id "-" port) + byte-len (.remaining stats) + data (byte-array byte-len) + _ (.get stats data) + executor-stats (Utils/deserialize data) + cache (@(:heartbeats-cache nimbus) storm-id) + cache (if cache cache {}) + newcache (into {} + (for [executor executors :let [curr (cache executor)]] + [executor (merge (update-executor-cache curr hb) {:stats (get executor-stats executor)})])) + ] + (log-debug "worker heartbeat storm-id: " storm-id " worker-id: " work-id " port: " port "executors: " executors " uptime: " uptime " hbtime: " hbtime) + + (swap! (:heartbeats-cache nimbus) assoc storm-id (merge cache newcache)) + )) + (activate [this storm-name] (transition-name! nimbus storm-name :activate true) ) @@ -1087,7 +1116,7 @@ task->component (storm-task-info (try-read-storm-topology conf storm-id) (try-read-storm-conf conf storm-id)) base (.storm-base storm-cluster-state storm-id nil) assignment (.assignment-info storm-cluster-state storm-id nil) - beats (.executor-beats storm-cluster-state storm-id (:executor->node+port assignment)) + beats (@(:heartbeats-cache nimbus) storm-id) all-components (-> task->component reverse-map keys) errors (->> all-components (map (fn [c] [c (get-errors storm-cluster-state storm-id c)])) diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index 5182027c29..75ddaa8985 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -5,6 +5,8 @@ (:import [java.util.concurrent Executors]) (:import [backtype.storm.messaging TransportFactory]) (:import [backtype.storm.messaging IContext IConnection]) + (:import [java.nio ByteBuffer]) + (:import [backtype.storm.utils 
WorkerHbProxy]) (:gen-class)) (bootstrap) @@ -29,13 +31,19 @@ (->> executors (map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)})) (apply merge))) - zk-hb {:storm-id (:storm-id worker) - :executor-stats stats - :uptime ((:uptime worker)) - :time-secs (current-time-secs) - }] + storm-id (:storm-id worker) + hb-proxy (:heartbeat-proxy worker) + assignment-id (:assignment-id worker) + port (:port worker) + uptime ((:uptime worker)) + time-secs (current-time-secs) + executors (:executors worker) + stats-ser (Utils/serialize stats)] + ;;do socket heartbeat + (log-message "stats key type:" (map type executors)) + (.workerHeartBeat hb-proxy storm-id assignment-id port executors uptime time-secs (ByteBuffer/wrap stats-ser)) ;; do the zookeeper heartbeat - (.worker-heartbeat! (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (:port worker) zk-hb) + ;(.worker-heartbeat! (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (:port worker) zk-hb) )) (defn do-heartbeat [worker] @@ -204,6 +212,7 @@ :user-shared-resources (mk-user-resources <>) :transfer-local-fn (mk-transfer-local-fn <>) :transfer-fn (mk-transfer-fn <>) + :heartbeat-proxy (WorkerHbProxy. (conf NIMBUS-HOST) (conf NIMBUS-THRIFT-PORT) (HashMap. 
conf)) ))) (defn- endpoint->string [[node port]] diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index 564ad0d330..3940596b3f 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -208,6 +208,11 @@ public class Config extends HashMap { public static final String NIMBUS_TASK_TIMEOUT_SECS = "nimbus.task.timeout.secs"; public static final Object NIMBUS_TASK_TIMEOUT_SECS_SCHEMA = Number.class; + /** + * How long nimbus start wait for executor heartbeat connect + */ + public static String NIMBUS_RECOVER_HEARTBEART_SECS = "nimbus.recover.heartbeat.secs"; + public static final Object NIMBUS_RECOVER_HEARTBEART_SECS_SCHEMA = Number.class; /** * How often nimbus should wake up to check heartbeats and do reassignments. Note diff --git a/storm-core/src/jvm/backtype/storm/Constants.java b/storm-core/src/jvm/backtype/storm/Constants.java index a8ade3c53a..0a3c53b12e 100644 --- a/storm-core/src/jvm/backtype/storm/Constants.java +++ b/storm-core/src/jvm/backtype/storm/Constants.java @@ -1,14 +1,16 @@ package backtype.storm; import backtype.storm.coordination.CoordinatedBolt; -import clojure.lang.RT; + +import java.util.ArrayList; +import java.util.Arrays; public class Constants { - public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream"; + public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream"; public static final long SYSTEM_TASK_ID = -1; - public static final Object SYSTEM_EXECUTOR_ID = RT.readString("[-1 -1]"); + public static final ArrayList SYSTEM_EXECUTOR_ID = new ArrayList(Arrays.asList(-1,-1)); public static final String SYSTEM_COMPONENT_ID = "__system"; public static final String SYSTEM_TICK_STREAM_ID = "__tick"; public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics"; diff --git a/storm-core/src/jvm/backtype/storm/generated/Nimbus.java 
b/storm-core/src/jvm/backtype/storm/generated/Nimbus.java index 6a8592a84e..1156582d8d 100644 --- a/storm-core/src/jvm/backtype/storm/generated/Nimbus.java +++ b/storm-core/src/jvm/backtype/storm/generated/Nimbus.java @@ -39,6 +39,8 @@ public interface Iface { public void rebalance(String name, RebalanceOptions options) throws NotAliveException, InvalidTopologyException, org.apache.thrift7.TException; + public void workerHeartBeat(String stormId, String workerId, int port, Set> executors, long upTime, long HBTime, ByteBuffer stats) throws org.apache.thrift7.TException; + public String beginFileUpload() throws org.apache.thrift7.TException; public void uploadChunk(String location, ByteBuffer chunk) throws org.apache.thrift7.TException; @@ -79,6 +81,8 @@ public interface AsyncIface { public void rebalance(String name, RebalanceOptions options, org.apache.thrift7.async.AsyncMethodCallback resultHandler) throws org.apache.thrift7.TException; + public void workerHeartBeat(String stormId, String workerId, int port, Set> executors, long upTime, long HBTime, ByteBuffer stats, org.apache.thrift7.async.AsyncMethodCallback resultHandler) throws org.apache.thrift7.TException; + public void beginFileUpload(org.apache.thrift7.async.AsyncMethodCallback resultHandler) throws org.apache.thrift7.TException; public void uploadChunk(String location, ByteBuffer chunk, org.apache.thrift7.async.AsyncMethodCallback resultHandler) throws org.apache.thrift7.TException; @@ -302,6 +306,32 @@ public void recv_rebalance() throws NotAliveException, InvalidTopologyException, return; } + public void workerHeartBeat(String stormId, String workerId, int port, Set> executors, long upTime, long HBTime, ByteBuffer stats) throws org.apache.thrift7.TException + { + send_workerHeartBeat(stormId, workerId, port, executors, upTime, HBTime, stats); + recv_workerHeartBeat(); + } + + public void send_workerHeartBeat(String stormId, String workerId, int port, Set> executors, long upTime, long HBTime, ByteBuffer 
stats) throws org.apache.thrift7.TException + { + workerHeartBeat_args args = new workerHeartBeat_args(); + args.set_stormId(stormId); + args.set_workerId(workerId); + args.set_port(port); + args.set_executors(executors); + args.set_upTime(upTime); + args.set_HBTime(HBTime); + args.set_stats(stats); + sendBase("workerHeartBeat", args); + } + + public void recv_workerHeartBeat() throws org.apache.thrift7.TException + { + workerHeartBeat_result result = new workerHeartBeat_result(); + receiveBase(result, "workerHeartBeat"); + return; + } + public String beginFileUpload() throws org.apache.thrift7.TException { send_beginFileUpload(); @@ -828,6 +858,56 @@ public void getResult() throws NotAliveException, InvalidTopologyException, org. } } + public void workerHeartBeat(String stormId, String workerId, int port, Set> executors, long upTime, long HBTime, ByteBuffer stats, org.apache.thrift7.async.AsyncMethodCallback resultHandler) throws org.apache.thrift7.TException { + checkReady(); + workerHeartBeat_call method_call = new workerHeartBeat_call(stormId, workerId, port, executors, upTime, HBTime, stats, resultHandler, this, ___protocolFactory, ___transport); + this.___currentMethod = method_call; + ___manager.call(method_call); + } + + public static class workerHeartBeat_call extends org.apache.thrift7.async.TAsyncMethodCall { + private String stormId; + private String workerId; + private int port; + private Set> executors; + private long upTime; + private long HBTime; + private ByteBuffer stats; + public workerHeartBeat_call(String stormId, String workerId, int port, Set> executors, long upTime, long HBTime, ByteBuffer stats, org.apache.thrift7.async.AsyncMethodCallback resultHandler, org.apache.thrift7.async.TAsyncClient client, org.apache.thrift7.protocol.TProtocolFactory protocolFactory, org.apache.thrift7.transport.TNonblockingTransport transport) throws org.apache.thrift7.TException { + super(client, protocolFactory, transport, resultHandler, false); + this.stormId 
= stormId; + this.workerId = workerId; + this.port = port; + this.executors = executors; + this.upTime = upTime; + this.HBTime = HBTime; + this.stats = stats; + } + + public void write_args(org.apache.thrift7.protocol.TProtocol prot) throws org.apache.thrift7.TException { + prot.writeMessageBegin(new org.apache.thrift7.protocol.TMessage("workerHeartBeat", org.apache.thrift7.protocol.TMessageType.CALL, 0)); + workerHeartBeat_args args = new workerHeartBeat_args(); + args.set_stormId(stormId); + args.set_workerId(workerId); + args.set_port(port); + args.set_executors(executors); + args.set_upTime(upTime); + args.set_HBTime(HBTime); + args.set_stats(stats); + args.write(prot); + prot.writeMessageEnd(); + } + + public void getResult() throws org.apache.thrift7.TException { + if (getState() != org.apache.thrift7.async.TAsyncMethodCall.State.RESPONSE_READ) { + throw new IllegalStateException("Method call not finished!"); + } + org.apache.thrift7.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift7.transport.TMemoryInputTransport(getFrameBuffer().array()); + org.apache.thrift7.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); + (new Client(prot)).recv_workerHeartBeat(); + } + } + public void beginFileUpload(org.apache.thrift7.async.AsyncMethodCallback resultHandler) throws org.apache.thrift7.TException { checkReady(); beginFileUpload_call method_call = new beginFileUpload_call(resultHandler, this, ___protocolFactory, ___transport); @@ -1194,6 +1274,7 @@ protected Processor(I iface, Map extends org.apache.thrift7.ProcessFunction { + public workerHeartBeat() { + super("workerHeartBeat"); + } + + protected workerHeartBeat_args getEmptyArgsInstance() { + return new workerHeartBeat_args(); + } + + protected workerHeartBeat_result getResult(I iface, workerHeartBeat_args args) throws org.apache.thrift7.TException { + workerHeartBeat_result result = new workerHeartBeat_result(); + iface.workerHeartBeat(args.stormId, 
args.workerId, args.port, args.executors, args.upTime, args.HBTime, args.stats); + return result; + } + } + private static class beginFileUpload extends org.apache.thrift7.ProcessFunction { public beginFileUpload() { super("beginFileUpload"); @@ -6883,6 +6980,1125 @@ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException } + public static class workerHeartBeat_args implements org.apache.thrift7.TBase, java.io.Serializable, Cloneable { + private static final org.apache.thrift7.protocol.TStruct STRUCT_DESC = new org.apache.thrift7.protocol.TStruct("workerHeartBeat_args"); + + private static final org.apache.thrift7.protocol.TField STORM_ID_FIELD_DESC = new org.apache.thrift7.protocol.TField("stormId", org.apache.thrift7.protocol.TType.STRING, (short)1); + private static final org.apache.thrift7.protocol.TField WORKER_ID_FIELD_DESC = new org.apache.thrift7.protocol.TField("workerId", org.apache.thrift7.protocol.TType.STRING, (short)2); + private static final org.apache.thrift7.protocol.TField PORT_FIELD_DESC = new org.apache.thrift7.protocol.TField("port", org.apache.thrift7.protocol.TType.I32, (short)3); + private static final org.apache.thrift7.protocol.TField EXECUTORS_FIELD_DESC = new org.apache.thrift7.protocol.TField("executors", org.apache.thrift7.protocol.TType.SET, (short)4); + private static final org.apache.thrift7.protocol.TField UP_TIME_FIELD_DESC = new org.apache.thrift7.protocol.TField("upTime", org.apache.thrift7.protocol.TType.I64, (short)5); + private static final org.apache.thrift7.protocol.TField HBTIME_FIELD_DESC = new org.apache.thrift7.protocol.TField("HBTime", org.apache.thrift7.protocol.TType.I64, (short)6); + private static final org.apache.thrift7.protocol.TField STATS_FIELD_DESC = new org.apache.thrift7.protocol.TField("stats", org.apache.thrift7.protocol.TType.STRING, (short)7); + + private String stormId; // required + private String workerId; // required + private int port; // required + private Set> executors; 
// required + private long upTime; // required + private long HBTime; // required + private ByteBuffer stats; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift7.TFieldIdEnum { + STORM_ID((short)1, "stormId"), + WORKER_ID((short)2, "workerId"), + PORT((short)3, "port"), + EXECUTORS((short)4, "executors"), + UP_TIME((short)5, "upTime"), + HBTIME((short)6, "HBTime"), + STATS((short)7, "stats"); + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // STORM_ID + return STORM_ID; + case 2: // WORKER_ID + return WORKER_ID; + case 3: // PORT + return PORT; + case 4: // EXECUTORS + return EXECUTORS; + case 5: // UP_TIME + return UP_TIME; + case 6: // HBTIME + return HBTIME; + case 7: // STATS + return STATS; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. 
+ */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + private static final int __PORT_ISSET_ID = 0; + private static final int __UPTIME_ISSET_ID = 1; + private static final int __HBTIME_ISSET_ID = 2; + private BitSet __isset_bit_vector = new BitSet(3); + + public static final Map<_Fields, org.apache.thrift7.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift7.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift7.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.STORM_ID, new org.apache.thrift7.meta_data.FieldMetaData("stormId", org.apache.thrift7.TFieldRequirementType.DEFAULT, + new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.STRING))); + tmpMap.put(_Fields.WORKER_ID, new org.apache.thrift7.meta_data.FieldMetaData("workerId", org.apache.thrift7.TFieldRequirementType.DEFAULT, + new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.STRING))); + tmpMap.put(_Fields.PORT, new org.apache.thrift7.meta_data.FieldMetaData("port", org.apache.thrift7.TFieldRequirementType.DEFAULT, + new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.I32))); + tmpMap.put(_Fields.EXECUTORS, new org.apache.thrift7.meta_data.FieldMetaData("executors", org.apache.thrift7.TFieldRequirementType.DEFAULT, + new org.apache.thrift7.meta_data.SetMetaData(org.apache.thrift7.protocol.TType.SET, + new org.apache.thrift7.meta_data.ListMetaData(org.apache.thrift7.protocol.TType.LIST, + new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.I32))))); + 
tmpMap.put(_Fields.UP_TIME, new org.apache.thrift7.meta_data.FieldMetaData("upTime", org.apache.thrift7.TFieldRequirementType.DEFAULT, + new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.I64))); + tmpMap.put(_Fields.HBTIME, new org.apache.thrift7.meta_data.FieldMetaData("HBTime", org.apache.thrift7.TFieldRequirementType.DEFAULT, + new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.I64))); + tmpMap.put(_Fields.STATS, new org.apache.thrift7.meta_data.FieldMetaData("stats", org.apache.thrift7.TFieldRequirementType.DEFAULT, + new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.STRING , true))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift7.meta_data.FieldMetaData.addStructMetaDataMap(workerHeartBeat_args.class, metaDataMap); + } + + public workerHeartBeat_args() { + } + + public workerHeartBeat_args( + String stormId, + String workerId, + int port, + Set> executors, + long upTime, + long HBTime, + ByteBuffer stats) + { + this(); + this.stormId = stormId; + this.workerId = workerId; + this.port = port; + set_port_isSet(true); + this.executors = executors; + this.upTime = upTime; + set_upTime_isSet(true); + this.HBTime = HBTime; + set_HBTime_isSet(true); + this.stats = stats; + } + + /** + * Performs a deep copy on other. 
+ */ + public workerHeartBeat_args(workerHeartBeat_args other) { + __isset_bit_vector.clear(); + __isset_bit_vector.or(other.__isset_bit_vector); + if (other.is_set_stormId()) { + this.stormId = other.stormId; + } + if (other.is_set_workerId()) { + this.workerId = other.workerId; + } + this.port = other.port; + if (other.is_set_executors()) { + Set> __this__executors = new HashSet>(); + for (List other_element : other.executors) { + List __this__executors_copy = new ArrayList(); + for (Integer other_element_element : other_element) { + __this__executors_copy.add(other_element_element); + } + __this__executors.add(__this__executors_copy); + } + this.executors = __this__executors; + } + this.upTime = other.upTime; + this.HBTime = other.HBTime; + if (other.is_set_stats()) { + this.stats = org.apache.thrift7.TBaseHelper.copyBinary(other.stats); +; + } + } + + public workerHeartBeat_args deepCopy() { + return new workerHeartBeat_args(this); + } + + @Override + public void clear() { + this.stormId = null; + this.workerId = null; + set_port_isSet(false); + this.port = 0; + this.executors = null; + set_upTime_isSet(false); + this.upTime = 0; + set_HBTime_isSet(false); + this.HBTime = 0; + this.stats = null; + } + + public String get_stormId() { + return this.stormId; + } + + public void set_stormId(String stormId) { + this.stormId = stormId; + } + + public void unset_stormId() { + this.stormId = null; + } + + /** Returns true if field stormId is set (has been assigned a value) and false otherwise */ + public boolean is_set_stormId() { + return this.stormId != null; + } + + public void set_stormId_isSet(boolean value) { + if (!value) { + this.stormId = null; + } + } + + public String get_workerId() { + return this.workerId; + } + + public void set_workerId(String workerId) { + this.workerId = workerId; + } + + public void unset_workerId() { + this.workerId = null; + } + + /** Returns true if field workerId is set (has been assigned a value) and false otherwise */ + public 
boolean is_set_workerId() { + return this.workerId != null; + } + + public void set_workerId_isSet(boolean value) { + if (!value) { + this.workerId = null; + } + } + + public int get_port() { + return this.port; + } + + public void set_port(int port) { + this.port = port; + set_port_isSet(true); + } + + public void unset_port() { + __isset_bit_vector.clear(__PORT_ISSET_ID); + } + + /** Returns true if field port is set (has been assigned a value) and false otherwise */ + public boolean is_set_port() { + return __isset_bit_vector.get(__PORT_ISSET_ID); + } + + public void set_port_isSet(boolean value) { + __isset_bit_vector.set(__PORT_ISSET_ID, value); + } + + public int get_executors_size() { + return (this.executors == null) ? 0 : this.executors.size(); + } + + public java.util.Iterator> get_executors_iterator() { + return (this.executors == null) ? null : this.executors.iterator(); + } + + public void add_to_executors(List elem) { + if (this.executors == null) { + this.executors = new HashSet>(); + } + this.executors.add(elem); + } + + public Set> get_executors() { + return this.executors; + } + + public void set_executors(Set> executors) { + this.executors = executors; + } + + public void unset_executors() { + this.executors = null; + } + + /** Returns true if field executors is set (has been assigned a value) and false otherwise */ + public boolean is_set_executors() { + return this.executors != null; + } + + public void set_executors_isSet(boolean value) { + if (!value) { + this.executors = null; + } + } + + public long get_upTime() { + return this.upTime; + } + + public void set_upTime(long upTime) { + this.upTime = upTime; + set_upTime_isSet(true); + } + + public void unset_upTime() { + __isset_bit_vector.clear(__UPTIME_ISSET_ID); + } + + /** Returns true if field upTime is set (has been assigned a value) and false otherwise */ + public boolean is_set_upTime() { + return __isset_bit_vector.get(__UPTIME_ISSET_ID); + } + + public void set_upTime_isSet(boolean 
value) { + __isset_bit_vector.set(__UPTIME_ISSET_ID, value); + } + + public long get_HBTime() { + return this.HBTime; + } + + public void set_HBTime(long HBTime) { + this.HBTime = HBTime; + set_HBTime_isSet(true); + } + + public void unset_HBTime() { + __isset_bit_vector.clear(__HBTIME_ISSET_ID); + } + + /** Returns true if field HBTime is set (has been assigned a value) and false otherwise */ + public boolean is_set_HBTime() { + return __isset_bit_vector.get(__HBTIME_ISSET_ID); + } + + public void set_HBTime_isSet(boolean value) { + __isset_bit_vector.set(__HBTIME_ISSET_ID, value); + } + + public byte[] get_stats() { + set_stats(org.apache.thrift7.TBaseHelper.rightSize(stats)); + return stats == null ? null : stats.array(); + } + + public ByteBuffer buffer_for_stats() { + return stats; + } + + public void set_stats(byte[] stats) { + set_stats(stats == null ? (ByteBuffer)null : ByteBuffer.wrap(stats)); + } + + public void set_stats(ByteBuffer stats) { + this.stats = stats; + } + + public void unset_stats() { + this.stats = null; + } + + /** Returns true if field stats is set (has been assigned a value) and false otherwise */ + public boolean is_set_stats() { + return this.stats != null; + } + + public void set_stats_isSet(boolean value) { + if (!value) { + this.stats = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case STORM_ID: + if (value == null) { + unset_stormId(); + } else { + set_stormId((String)value); + } + break; + + case WORKER_ID: + if (value == null) { + unset_workerId(); + } else { + set_workerId((String)value); + } + break; + + case PORT: + if (value == null) { + unset_port(); + } else { + set_port((Integer)value); + } + break; + + case EXECUTORS: + if (value == null) { + unset_executors(); + } else { + set_executors((Set>)value); + } + break; + + case UP_TIME: + if (value == null) { + unset_upTime(); + } else { + set_upTime((Long)value); + } + break; + + case HBTIME: + if (value == null) { + 
unset_HBTime(); + } else { + set_HBTime((Long)value); + } + break; + + case STATS: + if (value == null) { + unset_stats(); + } else { + set_stats((ByteBuffer)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case STORM_ID: + return get_stormId(); + + case WORKER_ID: + return get_workerId(); + + case PORT: + return Integer.valueOf(get_port()); + + case EXECUTORS: + return get_executors(); + + case UP_TIME: + return Long.valueOf(get_upTime()); + + case HBTIME: + return Long.valueOf(get_HBTime()); + + case STATS: + return get_stats(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case STORM_ID: + return is_set_stormId(); + case WORKER_ID: + return is_set_workerId(); + case PORT: + return is_set_port(); + case EXECUTORS: + return is_set_executors(); + case UP_TIME: + return is_set_upTime(); + case HBTIME: + return is_set_HBTime(); + case STATS: + return is_set_stats(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof workerHeartBeat_args) + return this.equals((workerHeartBeat_args)that); + return false; + } + + public boolean equals(workerHeartBeat_args that) { + if (that == null) + return false; + + boolean this_present_stormId = true && this.is_set_stormId(); + boolean that_present_stormId = true && that.is_set_stormId(); + if (this_present_stormId || that_present_stormId) { + if (!(this_present_stormId && that_present_stormId)) + return false; + if (!this.stormId.equals(that.stormId)) + return false; + } + + boolean this_present_workerId = true && this.is_set_workerId(); + boolean that_present_workerId = true && that.is_set_workerId(); + if (this_present_workerId || 
that_present_workerId) { + if (!(this_present_workerId && that_present_workerId)) + return false; + if (!this.workerId.equals(that.workerId)) + return false; + } + + boolean this_present_port = true; + boolean that_present_port = true; + if (this_present_port || that_present_port) { + if (!(this_present_port && that_present_port)) + return false; + if (this.port != that.port) + return false; + } + + boolean this_present_executors = true && this.is_set_executors(); + boolean that_present_executors = true && that.is_set_executors(); + if (this_present_executors || that_present_executors) { + if (!(this_present_executors && that_present_executors)) + return false; + if (!this.executors.equals(that.executors)) + return false; + } + + boolean this_present_upTime = true; + boolean that_present_upTime = true; + if (this_present_upTime || that_present_upTime) { + if (!(this_present_upTime && that_present_upTime)) + return false; + if (this.upTime != that.upTime) + return false; + } + + boolean this_present_HBTime = true; + boolean that_present_HBTime = true; + if (this_present_HBTime || that_present_HBTime) { + if (!(this_present_HBTime && that_present_HBTime)) + return false; + if (this.HBTime != that.HBTime) + return false; + } + + boolean this_present_stats = true && this.is_set_stats(); + boolean that_present_stats = true && that.is_set_stats(); + if (this_present_stats || that_present_stats) { + if (!(this_present_stats && that_present_stats)) + return false; + if (!this.stats.equals(that.stats)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + HashCodeBuilder builder = new HashCodeBuilder(); + + boolean present_stormId = true && (is_set_stormId()); + builder.append(present_stormId); + if (present_stormId) + builder.append(stormId); + + boolean present_workerId = true && (is_set_workerId()); + builder.append(present_workerId); + if (present_workerId) + builder.append(workerId); + + boolean present_port = true; + 
builder.append(present_port); + if (present_port) + builder.append(port); + + boolean present_executors = true && (is_set_executors()); + builder.append(present_executors); + if (present_executors) + builder.append(executors); + + boolean present_upTime = true; + builder.append(present_upTime); + if (present_upTime) + builder.append(upTime); + + boolean present_HBTime = true; + builder.append(present_HBTime); + if (present_HBTime) + builder.append(HBTime); + + boolean present_stats = true && (is_set_stats()); + builder.append(present_stats); + if (present_stats) + builder.append(stats); + + return builder.toHashCode(); + } + + public int compareTo(workerHeartBeat_args other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + workerHeartBeat_args typedOther = (workerHeartBeat_args)other; + + lastComparison = Boolean.valueOf(is_set_stormId()).compareTo(typedOther.is_set_stormId()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_stormId()) { + lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.stormId, typedOther.stormId); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(is_set_workerId()).compareTo(typedOther.is_set_workerId()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_workerId()) { + lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.workerId, typedOther.workerId); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(is_set_port()).compareTo(typedOther.is_set_port()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_port()) { + lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.port, typedOther.port); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(is_set_executors()).compareTo(typedOther.is_set_executors()); + if 
(lastComparison != 0) { + return lastComparison; + } + if (is_set_executors()) { + lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.executors, typedOther.executors); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(is_set_upTime()).compareTo(typedOther.is_set_upTime()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_upTime()) { + lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.upTime, typedOther.upTime); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(is_set_HBTime()).compareTo(typedOther.is_set_HBTime()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_HBTime()) { + lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.HBTime, typedOther.HBTime); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(is_set_stats()).compareTo(typedOther.is_set_stats()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_stats()) { + lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.stats, typedOther.stats); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift7.protocol.TProtocol iprot) throws org.apache.thrift7.TException { + org.apache.thrift7.protocol.TField field; + iprot.readStructBegin(); + while (true) + { + field = iprot.readFieldBegin(); + if (field.type == org.apache.thrift7.protocol.TType.STOP) { + break; + } + switch (field.id) { + case 1: // STORM_ID + if (field.type == org.apache.thrift7.protocol.TType.STRING) { + this.stormId = iprot.readString(); + } else { + org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + case 2: // WORKER_ID + if (field.type == org.apache.thrift7.protocol.TType.STRING) { + this.workerId = iprot.readString(); + 
} else { + org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + case 3: // PORT + if (field.type == org.apache.thrift7.protocol.TType.I32) { + this.port = iprot.readI32(); + set_port_isSet(true); + } else { + org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + case 4: // EXECUTORS + if (field.type == org.apache.thrift7.protocol.TType.SET) { + { + org.apache.thrift7.protocol.TSet _set163 = iprot.readSetBegin(); + this.executors = new HashSet>(2*_set163.size); + for (int _i164 = 0; _i164 < _set163.size; ++_i164) + { + List _elem165; // required + { + org.apache.thrift7.protocol.TList _list166 = iprot.readListBegin(); + _elem165 = new ArrayList(_list166.size); + for (int _i167 = 0; _i167 < _list166.size; ++_i167) + { + int _elem168; // required + _elem168 = iprot.readI32(); + _elem165.add(_elem168); + } + iprot.readListEnd(); + } + this.executors.add(_elem165); + } + iprot.readSetEnd(); + } + } else { + org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + case 5: // UP_TIME + if (field.type == org.apache.thrift7.protocol.TType.I64) { + this.upTime = iprot.readI64(); + set_upTime_isSet(true); + } else { + org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + case 6: // HBTIME + if (field.type == org.apache.thrift7.protocol.TType.I64) { + this.HBTime = iprot.readI64(); + set_HBTime_isSet(true); + } else { + org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + case 7: // STATS + if (field.type == org.apache.thrift7.protocol.TType.STRING) { + this.stats = iprot.readBinary(); + } else { + org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + default: + org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + validate(); + } + + public void write(org.apache.thrift7.protocol.TProtocol oprot) throws org.apache.thrift7.TException { + 
validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (this.stormId != null) { + oprot.writeFieldBegin(STORM_ID_FIELD_DESC); + oprot.writeString(this.stormId); + oprot.writeFieldEnd(); + } + if (this.workerId != null) { + oprot.writeFieldBegin(WORKER_ID_FIELD_DESC); + oprot.writeString(this.workerId); + oprot.writeFieldEnd(); + } + oprot.writeFieldBegin(PORT_FIELD_DESC); + oprot.writeI32(this.port); + oprot.writeFieldEnd(); + if (this.executors != null) { + oprot.writeFieldBegin(EXECUTORS_FIELD_DESC); + { + oprot.writeSetBegin(new org.apache.thrift7.protocol.TSet(org.apache.thrift7.protocol.TType.LIST, this.executors.size())); + for (List _iter169 : this.executors) + { + { + oprot.writeListBegin(new org.apache.thrift7.protocol.TList(org.apache.thrift7.protocol.TType.I32, _iter169.size())); + for (int _iter170 : _iter169) + { + oprot.writeI32(_iter170); + } + oprot.writeListEnd(); + } + } + oprot.writeSetEnd(); + } + oprot.writeFieldEnd(); + } + oprot.writeFieldBegin(UP_TIME_FIELD_DESC); + oprot.writeI64(this.upTime); + oprot.writeFieldEnd(); + oprot.writeFieldBegin(HBTIME_FIELD_DESC); + oprot.writeI64(this.HBTime); + oprot.writeFieldEnd(); + if (this.stats != null) { + oprot.writeFieldBegin(STATS_FIELD_DESC); + oprot.writeBinary(this.stats); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("workerHeartBeat_args("); + boolean first = true; + + sb.append("stormId:"); + if (this.stormId == null) { + sb.append("null"); + } else { + sb.append(this.stormId); + } + first = false; + if (!first) sb.append(", "); + sb.append("workerId:"); + if (this.workerId == null) { + sb.append("null"); + } else { + sb.append(this.workerId); + } + first = false; + if (!first) sb.append(", "); + sb.append("port:"); + sb.append(this.port); + first = false; + if (!first) sb.append(", "); + sb.append("executors:"); + if (this.executors == null) { + sb.append("null"); + } 
else { + sb.append(this.executors); + } + first = false; + if (!first) sb.append(", "); + sb.append("upTime:"); + sb.append(this.upTime); + first = false; + if (!first) sb.append(", "); + sb.append("HBTime:"); + sb.append(this.HBTime); + first = false; + if (!first) sb.append(", "); + sb.append("stats:"); + if (this.stats == null) { + sb.append("null"); + } else { + org.apache.thrift7.TBaseHelper.toString(this.stats, sb); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift7.TException { + // check for required fields + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift7.protocol.TCompactProtocol(new org.apache.thrift7.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift7.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bit_vector = new BitSet(1); + read(new org.apache.thrift7.protocol.TCompactProtocol(new org.apache.thrift7.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift7.TException te) { + throw new java.io.IOException(te); + } + } + + } + + public static class workerHeartBeat_result implements org.apache.thrift7.TBase, java.io.Serializable, Cloneable { + private static final org.apache.thrift7.protocol.TStruct STRUCT_DESC = new org.apache.thrift7.protocol.TStruct("workerHeartBeat_result"); + + + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. 
*/ + public enum _Fields implements org.apache.thrift7.TFieldIdEnum { +; + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + public static final Map<_Fields, org.apache.thrift7.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift7.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift7.meta_data.FieldMetaData>(_Fields.class); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift7.meta_data.FieldMetaData.addStructMetaDataMap(workerHeartBeat_result.class, metaDataMap); + } + + public workerHeartBeat_result() { + } + + /** + * Performs a deep copy on other. 
+ */ + public workerHeartBeat_result(workerHeartBeat_result other) { + } + + public workerHeartBeat_result deepCopy() { + return new workerHeartBeat_result(this); + } + + @Override + public void clear() { + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof workerHeartBeat_result) + return this.equals((workerHeartBeat_result)that); + return false; + } + + public boolean equals(workerHeartBeat_result that) { + if (that == null) + return false; + + return true; + } + + @Override + public int hashCode() { + HashCodeBuilder builder = new HashCodeBuilder(); + + return builder.toHashCode(); + } + + public int compareTo(workerHeartBeat_result other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + workerHeartBeat_result typedOther = (workerHeartBeat_result)other; + + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift7.protocol.TProtocol iprot) throws org.apache.thrift7.TException { + org.apache.thrift7.protocol.TField field; + iprot.readStructBegin(); + while (true) + { + field = iprot.readFieldBegin(); + if (field.type == org.apache.thrift7.protocol.TType.STOP) { + break; + } + switch (field.id) { + default: + org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + 
validate(); + } + + public void write(org.apache.thrift7.protocol.TProtocol oprot) throws org.apache.thrift7.TException { + oprot.writeStructBegin(STRUCT_DESC); + + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("workerHeartBeat_result("); + boolean first = true; + + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift7.TException { + // check for required fields + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift7.protocol.TCompactProtocol(new org.apache.thrift7.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift7.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift7.protocol.TCompactProtocol(new org.apache.thrift7.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift7.TException te) { + throw new java.io.IOException(te); + } + } + + } + public static class beginFileUpload_args implements org.apache.thrift7.TBase, java.io.Serializable, Cloneable { private static final org.apache.thrift7.protocol.TStruct STRUCT_DESC = new org.apache.thrift7.protocol.TStruct("beginFileUpload_args"); diff --git a/storm-core/src/jvm/backtype/storm/utils/WorkerHbProxy.java b/storm-core/src/jvm/backtype/storm/utils/WorkerHbProxy.java new file mode 100644 index 0000000000..c7837df961 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/utils/WorkerHbProxy.java @@ -0,0 +1,61 @@ +package backtype.storm.utils; + +import backtype.storm.generated.Nimbus; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.thrift7.TException; +import org.apache.thrift7.protocol.TBinaryProtocol; +import org.apache.thrift7.transport.TFramedTransport; +import 
org.apache.thrift7.transport.TSocket; +import org.apache.thrift7.transport.TTransport; + +import java.lang.Integer; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class WorkerHbProxy { + public static final Log LOG = LogFactory.getLog(WorkerHbProxy.class); + private TTransport transport = null; + private Nimbus.Client client = null; + private String host; + private int port; + private Map stormMap; + + public WorkerHbProxy(String host,int port, Map stormMap) { + this.host = host; + this.port = port; + this.stormMap = stormMap; + } + + private void init() throws TException { + if (transport == null) { + LOG.info("Connecting to Nimbus at " + host + ":" + port); + transport = new TFramedTransport(new TSocket(host, port)); + client = new Nimbus.Client(new TBinaryProtocol(transport)); + transport.open(); + } + } + + private void close() { + if (transport != null) { + transport.close(); + transport = null; + client = null; + } + } + + public void workerHeartBeat(String stormId, String workerId, int port, + Set> executors, long upTime, long HBTime,ByteBuffer buf) { + LOG.info("do workerHeartBeat"); + try { + init(); + client.workerHeartBeat(stormId, workerId, port, executors, upTime, + HBTime,buf); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + close(); + } + } +} diff --git a/storm-core/src/py/storm/Nimbus-remote b/storm-core/src/py/storm/Nimbus-remote index 4b2ff041a3..1035ba495f 100755 --- a/storm-core/src/py/storm/Nimbus-remote +++ b/storm-core/src/py/storm/Nimbus-remote @@ -28,6 +28,7 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help': print ' void activate(string name)' print ' void deactivate(string name)' print ' void rebalance(string name, RebalanceOptions options)' + print ' void workerHeartBeat(string stormId, string workerId, i32 port, executors, i64 upTime, i64 HBTime, string stats)' print ' string beginFileUpload()' print ' void uploadChunk(string location, string chunk)' print ' 
void finishFileUpload(string location)' @@ -131,6 +132,12 @@ elif cmd == 'rebalance': sys.exit(1) pp.pprint(client.rebalance(args[0],eval(args[1]),)) +elif cmd == 'workerHeartBeat': + if len(args) != 7: + print 'workerHeartBeat requires 7 args' + sys.exit(1) + pp.pprint(client.workerHeartBeat(args[0],args[1],eval(args[2]),eval(args[3]),eval(args[4]),eval(args[5]),args[6],)) + elif cmd == 'beginFileUpload': if len(args) != 0: print 'beginFileUpload requires 0 args' diff --git a/storm-core/src/py/storm/Nimbus.py b/storm-core/src/py/storm/Nimbus.py index cd535be73a..feaa733c45 100644 --- a/storm-core/src/py/storm/Nimbus.py +++ b/storm-core/src/py/storm/Nimbus.py @@ -74,6 +74,19 @@ def rebalance(self, name, options): """ pass + def workerHeartBeat(self, stormId, workerId, port, executors, upTime, HBTime, stats): + """ + Parameters: + - stormId + - workerId + - port + - executors + - upTime + - HBTime + - stats + """ + pass + def beginFileUpload(self, ): pass @@ -382,6 +395,46 @@ def recv_rebalance(self, ): raise result.ite return + def workerHeartBeat(self, stormId, workerId, port, executors, upTime, HBTime, stats): + """ + Parameters: + - stormId + - workerId + - port + - executors + - upTime + - HBTime + - stats + """ + self.send_workerHeartBeat(stormId, workerId, port, executors, upTime, HBTime, stats) + self.recv_workerHeartBeat() + + def send_workerHeartBeat(self, stormId, workerId, port, executors, upTime, HBTime, stats): + self._oprot.writeMessageBegin('workerHeartBeat', TMessageType.CALL, self._seqid) + args = workerHeartBeat_args() + args.stormId = stormId + args.workerId = workerId + args.port = port + args.executors = executors + args.upTime = upTime + args.HBTime = HBTime + args.stats = stats + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def recv_workerHeartBeat(self, ): + (fname, mtype, rseqid) = self._iprot.readMessageBegin() + if mtype == TMessageType.EXCEPTION: + x = TApplicationException() + 
x.read(self._iprot) + self._iprot.readMessageEnd() + raise x + result = workerHeartBeat_result() + result.read(self._iprot) + self._iprot.readMessageEnd() + return + def beginFileUpload(self, ): self.send_beginFileUpload() return self.recv_beginFileUpload() @@ -715,6 +768,7 @@ def __init__(self, handler): self._processMap["activate"] = Processor.process_activate self._processMap["deactivate"] = Processor.process_deactivate self._processMap["rebalance"] = Processor.process_rebalance + self._processMap["workerHeartBeat"] = Processor.process_workerHeartBeat self._processMap["beginFileUpload"] = Processor.process_beginFileUpload self._processMap["uploadChunk"] = Processor.process_uploadChunk self._processMap["finishFileUpload"] = Processor.process_finishFileUpload @@ -846,6 +900,17 @@ def process_rebalance(self, seqid, iprot, oprot): oprot.writeMessageEnd() oprot.trans.flush() + def process_workerHeartBeat(self, seqid, iprot, oprot): + args = workerHeartBeat_args() + args.read(iprot) + iprot.readMessageEnd() + result = workerHeartBeat_result() + self._handler.workerHeartBeat(args.stormId, args.workerId, args.port, args.executors, args.upTime, args.HBTime, args.stats) + oprot.writeMessageBegin("workerHeartBeat", TMessageType.REPLY, seqid) + result.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + def process_beginFileUpload(self, seqid, iprot, oprot): args = beginFileUpload_args() args.read(iprot) @@ -2023,6 +2088,202 @@ def __eq__(self, other): def __ne__(self, other): return not (self == other) +class workerHeartBeat_args: + """ + Attributes: + - stormId + - workerId + - port + - executors + - upTime + - HBTime + - stats + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'stormId', None, None, ), # 1 + (2, TType.STRING, 'workerId', None, None, ), # 2 + (3, TType.I32, 'port', None, None, ), # 3 + (4, TType.SET, 'executors', (TType.LIST,(TType.I32,None)), None, ), # 4 + (5, TType.I64, 'upTime', None, None, ), # 5 + (6, TType.I64, 'HBTime', None, 
None, ), # 6 + (7, TType.STRING, 'stats', None, None, ), # 7 + ) + + def __hash__(self): + return 0 + hash(self.stormId) + hash(self.workerId) + hash(self.port) + hash(self.executors) + hash(self.upTime) + hash(self.HBTime) + hash(self.stats) + + def __init__(self, stormId=None, workerId=None, port=None, executors=None, upTime=None, HBTime=None, stats=None,): + self.stormId = stormId + self.workerId = workerId + self.port = port + self.executors = executors + self.upTime = upTime + self.HBTime = HBTime + self.stats = stats + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.stormId = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.workerId = iprot.readString().decode('utf-8') + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I32: + self.port = iprot.readI32(); + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.SET: + self.executors = set() + (_etype295, _size292) = iprot.readSetBegin() + for _i296 in xrange(_size292): + _elem297 = [] + (_etype301, _size298) = iprot.readListBegin() + for _i302 in xrange(_size298): + _elem303 = iprot.readI32(); + _elem297.append(_elem303) + iprot.readListEnd() + self.executors.add(_elem297) + iprot.readSetEnd() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.I64: + self.upTime = iprot.readI64(); + else: + iprot.skip(ftype) + elif fid == 6: + if ftype == TType.I64: + self.HBTime = iprot.readI64(); + else: + iprot.skip(ftype) + elif fid == 7: + if ftype == TType.STRING: + self.stats = iprot.readString(); + 
else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('workerHeartBeat_args') + if self.stormId is not None: + oprot.writeFieldBegin('stormId', TType.STRING, 1) + oprot.writeString(self.stormId.encode('utf-8')) + oprot.writeFieldEnd() + if self.workerId is not None: + oprot.writeFieldBegin('workerId', TType.STRING, 2) + oprot.writeString(self.workerId.encode('utf-8')) + oprot.writeFieldEnd() + if self.port is not None: + oprot.writeFieldBegin('port', TType.I32, 3) + oprot.writeI32(self.port) + oprot.writeFieldEnd() + if self.executors is not None: + oprot.writeFieldBegin('executors', TType.SET, 4) + oprot.writeSetBegin(TType.LIST, len(self.executors)) + for iter304 in self.executors: + oprot.writeListBegin(TType.I32, len(iter304)) + for iter305 in iter304: + oprot.writeI32(iter305) + oprot.writeListEnd() + oprot.writeSetEnd() + oprot.writeFieldEnd() + if self.upTime is not None: + oprot.writeFieldBegin('upTime', TType.I64, 5) + oprot.writeI64(self.upTime) + oprot.writeFieldEnd() + if self.HBTime is not None: + oprot.writeFieldBegin('HBTime', TType.I64, 6) + oprot.writeI64(self.HBTime) + oprot.writeFieldEnd() + if self.stats is not None: + oprot.writeFieldBegin('stats', TType.STRING, 7) + oprot.writeString(self.stats) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + 
+class workerHeartBeat_result: + + thrift_spec = ( + ) + + def __hash__(self): + return 0 + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('workerHeartBeat_result') + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + class beginFileUpload_args: thrift_spec = ( diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift index 475acd6acf..9dee821d11 100644 --- a/storm-core/src/storm.thrift +++ b/storm-core/src/storm.thrift @@ -214,6 +214,10 @@ service Nimbus { void deactivate(1: string name) throws (1: NotAliveException e); void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite); + //heartbeat funciton + void workerHeartBeat(1: string stormId, 2: string workerId, 3: i32 port, 4: set> executors 5: i64 upTime, 6: i64 HBTime 7: binary stats) + + // need to add functions for asking about status of storms, what nodes 
they're running on, looking at task logs string beginFileUpload();