Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion harmonicPE/batch_error_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ class BatchErrorCode:
SUCCESS = 0
INVALID_PARAMETERS = 1
OPEN_SOCKET_ERROR = 2
DATA_SOCKET_ERROR = 3
DATA_SOCKET_ERROR = 3
IDLE_TIMEOUT = 4
37 changes: 35 additions & 2 deletions harmonicPE/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
#import urllib3
import sys
import time
import requests
import os

#import zlib
#from pyurdme import *
#import struct
Expand Down Expand Up @@ -67,6 +70,9 @@ def listen_for_tasks(fn_process_message):
af, socktype, proto, canonname, sa = res
try:
listening_socket = socket.socket(af, socktype, proto)
if not Setting.get_idle_timeout() == None:
listening_socket.settimeout(Setting.get_idle_timeout()) # set the socket timeout to specified value from Settings

except socket.error as msg:
print(msg)
listening_socket = None
Expand All @@ -85,16 +91,42 @@ def listen_for_tasks(fn_process_message):
print('Open Socket Error')
sys.exit(BatchErrorCode.OPEN_SOCKET_ERROR)

restarted = False
try:
while True:
# Send a stream request to server
time1 = time.time()
data = bytearray()
Services.send_stream_request_data(data)
if not restarted:
Services.send_stream_request_data(data)
time2 = time3 = time.time()
if len(data) == 0:
# No data return from the system, waiting for stream.
conn, addr = listening_socket.accept()
try:
conn, addr = listening_socket.accept()
restarted = False
except socket.timeout as t:
# graceful container exit - notify master I want to quit because I didn't get data within timeout
url = "http://{}:{}/docker?token=None&command=finished&c_name={}&short_id={}".format(
Setting.get_node_addr(),
Setting.get_worker_port(),
Setting.get_node_name(),
os.getenv('HOSTNAME')
)
content = "I am exiting with timeout exception: {}\n".format(t)
req = requests.get(url)

if req.status_code == 200:
# master has acknowledged termination
listening_socket.shutdown(socket.SHUT_RDWR)
listening_socket.close()
sys.exit(BatchErrorCode.IDLE_TIMEOUT)

# master did not allow termination, move to next iteration of while True
restarted = True
continue


print('Streaming from ', addr[0], ":", addr[1])

# Extracting object id
Expand Down Expand Up @@ -132,6 +164,7 @@ def listen_for_tasks(fn_process_message):

except IOError as e:
print(str(e))
listening_socket.shutdown(socket.SHUT_RDWR)
listening_socket.close()
sys.exit(BatchErrorCode.DATA_SOCKET_ERROR)

Expand Down
23 changes: 20 additions & 3 deletions harmonicPE/setting.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ def __get_str_pull_req():
+ "/streamRequest?token=" + Setting.get_token() + \
"&batch_addr=" + Setting.get_node_addr() \
+ "&batch_port=" + str(Setting.get_node_port()) \
+ "&batch_status=0&c_name=" + Setting.get_node_name()
+ "&batch_status=0&c_name=" + Setting.get_node_name() \
+ "&short_id=" + os.getenv('HOSTNAME') # add the container short id

@staticmethod
def send_stream_request():
Expand Down Expand Up @@ -139,6 +140,8 @@ class Setting(object):
__master_port = None
__std_idle_time = None
__token = "None"
__idle_timeout = None
__worker_port = None

@staticmethod
def set_params_from_env():
Expand All @@ -151,11 +154,12 @@ def set_params_from_env():
Setting.__master_port = int(os.environ.get("HDE_MASTER_PORT"))
Setting.__std_idle_time = int(os.environ.get("HDE_STD_IDLE_TIME"))
Setting.__token = os.environ.get("HDE_TOKEN")

Setting.__idle_timeout = os.environ.get("HDE_IDLE_TIMEOUT")
Setting.__worker_port = int(os.environ.get('HDE_NODE_REST_PORT'))


@staticmethod
def set_params(node_name, node_data_port, master_addr, master_port, std_idle_time, repo_addr, repo_port,
def set_params(node_name, node_data_port, master_addr, master_port, std_idle_time, repo_addr, repo_port, container_idle_timeout=None, worker_port=None,
node_addr=None):
Setting.__node_name = node_name
Setting.__node_data_port = node_data_port
Expand All @@ -166,6 +170,8 @@ def set_params(node_name, node_data_port, master_addr, master_port, std_idle_tim
Setting.__repo_port = repo_port
# Get node container address from the environment
#Setting.__node_container_addr = os.environ.get("CONTAINER_ADDR")
Setting.__idle_timeout = container_idle_timeout
Setting.__worker_port = worker_port

# Set node addr
if node_addr:
Expand Down Expand Up @@ -221,3 +227,14 @@ def get_std_idle_time():
def get_token():
return Setting.__token

@staticmethod
def get_worker_port():
return Setting.__worker_port

@staticmethod
def get_idle_timeout():
t = Setting.__idle_timeout
if t:
t = int(t)
return t