Skip to content

Add socket options #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 57 additions & 2 deletions kinetic/baseclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ class BaseClient(object):
# drive default
USER_ID = 1
CLIENT_SECRET = 'asdfasdf'
# socket options
TCP_NODELAY_VALUE = 1
OPT_TCP_NODELAY = 'TCP_NODELAY'
OPT_SO_SNDBUF = 'SO_SNDBUF'
OPT_SO_RCVBUF = 'SO_RCVBUF'
OPT_IP_TOS = 'IP_TOS'


def __init__(self, hostname=HOSTNAME, port=PORT, identity=USER_ID,
cluster_version=None, secret=CLIENT_SECRET,
Expand All @@ -67,7 +74,7 @@ def __init__(self, hostname=HOSTNAME, port=PORT, identity=USER_ID,
socket_timeout=common.DEFAULT_SOCKET_TIMEOUT,
socket_address=None, socket_port=0,
defer_read=False,
use_ssl=False, pin=None):
use_ssl=False, pin=None, **kwargs):
self.hostname = hostname
self.port = port
self.identity = identity
Expand All @@ -88,6 +95,11 @@ def __init__(self, hostname=HOSTNAME, port=PORT, identity=USER_ID,
self.pin = pin
self.on_unsolicited = None

if len(kwargs) > 0 and 'socket_options' in kwargs:
self.socket_options = kwargs['socket_options']
else:
self.socket_options = None

@property
def socket(self):
if not self._socket:
Expand All @@ -112,13 +124,41 @@ def connect(self):
if self.use_ssl:
s = ssl.wrap_socket(s)

# set up our defaults
tcp_nodelay_value = BaseClient.TCP_NODELAY_VALUE

if self.socket_options is not None:
dict_options = self.socket_options
if BaseClient.OPT_TCP_NODELAY in dict_options:
tcp_nodelay_value = dict_options[BaseClient.OPT_TCP_NODELAY]
if tcp_nodelay_value != BaseClient.TCP_NODELAY_VALUE:
LOG.info("Setting socket TCP NODELAY to " + str(tcp_nodelay_value))

if BaseClient.OPT_SO_SNDBUF in dict_options:
so_sndbuf_value = dict_options[BaseClient.OPT_SO_SNDBUF]
if so_sndbuf_value > 0:
LOG.info("Setting socket send buffer size to " + str(so_sndbuf_value))
s.setsockopt(ss.SOL_SOCKET, ss.SO_SNDBUF, so_sndbuf_value)

if BaseClient.OPT_SO_RCVBUF in dict_options:
so_rcvbuf_value = dict_options[BaseClient.OPT_SO_RCVBUF]
if so_rcvbuf_value > 0:
LOG.info("Setting socket receive buffer size to " + str(so_rcvbuf_value))
s.setsockopt(ss.SOL_SOCKET, ss.SO_RCVBUF, so_rcvbuf_value)

if BaseClient.OPT_IP_TOS in dict_options:
ip_tos_value = dict_options[BaseClient.OPT_IP_TOS]
if ip_tos_value >= 0:
LOG.info("Setting IP TOS to " + str(ip_tos_value))
s.setsockopt(ss.IPPROTO_IP, ss.IP_TOS, ip_tos_value)

s.settimeout(self.connect_timeout)
if self.socket_address:
LOG.debug("Client local port address bound to " + self.socket_address)
s.bind((self.socket_address, self.socket_port))
# if connect fails, there is nothing to clean up
s.connect(sockaddr) # use first
s.setsockopt(ss.IPPROTO_TCP, ss.TCP_NODELAY, 1)
s.setsockopt(ss.IPPROTO_TCP, ss.TCP_NODELAY, tcp_nodelay_value)

# We are connected now, update attributes
self._socket = s
Expand Down Expand Up @@ -360,6 +400,21 @@ def send(self, header, value):
else: done = True
return m,cmd,value

def get_socket_options(self):
"""
Retrieves current socket options.
:return: dictionary of options for socket connection, or empty dictionary if no socket connection exists.
"""
dict_socket_options = {}

if self._socket is not None:
dict_socket_options[BaseClient.OPT_TCP_NODELAY] = self._socket.getsockopt(ss.IPPROTO_TCP, ss.TCP_NODELAY)
dict_socket_options[BaseClient.OPT_SO_SNDBUF] = self._socket.getsockopt(ss.SOL_SOCKET, ss.SO_SNDBUF)
dict_socket_options[BaseClient.OPT_SO_RCVBUF] = self._socket.getsockopt(ss.SOL_SOCKET, ss.SO_RCVBUF)
dict_socket_options[BaseClient.OPT_IP_TOS] = self._socket.getsockopt(ss.IPPROTO_IP, ss.IP_TOS)

return dict_socket_options

### with statement support ###

def __enter__(self):
Expand Down
24 changes: 24 additions & 0 deletions kinetic/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,30 @@ def parse(self, m, value):
return [op for op in m.body.p2pOperation.operation]


class PushKeys(object):

@staticmethod
def build(keys, hostname='localhost', port=8123, **kwargs):
m = messages.Command()
m.header.messageType = messages.Command.PEER2PEERPUSH
m.body.p2pOperation.peer.hostname = hostname
m.body.p2pOperation.peer.port = port

m.body.p2pOperation.operation.extend([
messages.Command.P2POperation.Operation(key=key) for key in keys
])

return (m, None)

@staticmethod
def parse(m, value):
return [op for op in m.body.p2pOperation.operation]

@staticmethod
def onError(e):
raise e


class Flush(BaseOperation):

def _build(self):
Expand Down
234 changes: 226 additions & 8 deletions test/test_adminclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,23 @@

#@author: Robert Cope

import unittest

from kinetic import Client
from kinetic import AdminClient
from kinetic import KineticMessageException
from base import BaseTestCase
from kinetic import common
from kinetic.common import KineticException
import kinetic.kinetic_pb2 as messages


class AdminClientTestCase(BaseTestCase):

DEFAULT_CLUSTER_VERSION = 0
MAX_KEY_SIZE = 4096
MAX_VALUE_SIZE = 1024 * 1024
MAX_VERSION_SIZE = 2048
MAX_KEY_RANGE_COUNT = 200

def setUp(self):
super(AdminClientTestCase, self).setUp()
self.adminClient = AdminClient(self.host, self.port)
Expand All @@ -42,10 +49,221 @@ def test_setSecurity(self):
domain = common.Domain(roles=[common.Roles.READ])
acl.domains = [domain]

self.adminClient.setSecurity([acl])
if self.adminClient.use_ssl:
self.adminClient.setSecurity([acl])

# Verify user 100 can only read
read_only_client = Client(self.host, self.port, identity=100)
read_only_client.get(self.buildKey(1)) # Should be OK.
args = (self.buildKey(2), 'test_value_2')
self.assertRaises(KineticMessageException, read_only_client.put, *args)
else:
try:
#TODO: change this to self.assertRaises
self.adminClient.setSecurity([acl])
except KineticException:
pass
else:
self.fail('Exception should be thrown if not using SSL')

def test_get_capacity(self):
log = self.adminClient.getLog([messages.Command.GetLog.CAPACITIES])
self.assertIsNotNone(log)

capacity = log.capacity
self.assertIsNotNone(capacity)

self.assertTrue(capacity.portionFull >= 0)
self.assertTrue(capacity.nominalCapacityInBytes >= 0)

def test_get_capacity_and_utilization(self):
log = self.adminClient.getLog([messages.Command.GetLog.CAPACITIES, messages.Command.GetLog.UTILIZATIONS])
self.assertIsNotNone(log)

capacity = log.capacity
self.assertIsNotNone(capacity)

self.assertTrue(capacity.portionFull >= 0)
self.assertTrue(capacity.nominalCapacityInBytes >= 0)

util_list = log.utilizations

for util in util_list:
self.assertTrue(util.value >= 0)

def test_get_configuration(self):
log = self.adminClient.getLog([messages.Command.GetLog.CONFIGURATION])
self.assertIsNotNone(log)

configuration = log.configuration
self.assertIsNotNone(configuration)

self.assertTrue(len(configuration.compilationDate) > 0)
self.assertTrue(len(configuration.model) > 0)
self.assertTrue(configuration.port >= 0)
self.assertTrue(configuration.tlsPort >= 0)
self.assertTrue(len(configuration.serialNumber) > 0)
self.assertTrue(len(configuration.sourceHash) > 0)
self.assertTrue(len(configuration.vendor) > 0)
self.assertTrue(len(configuration.version) > 0)

for interface in configuration.interface:
self.assertTrue(len(interface.name) > 0)

def test_get_limits(self):
log = self.adminClient.getLog([messages.Command.GetLog.LIMITS])
self.assertIsNotNone(log)

limits = log.limits
self.assertIsNotNone(limits)

self.assertTrue(limits.maxKeySize == AdminClientTestCase.MAX_KEY_SIZE)
self.assertTrue(limits.maxValueSize == AdminClientTestCase.MAX_VALUE_SIZE)
self.assertTrue(limits.maxVersionSize == AdminClientTestCase.MAX_VERSION_SIZE)
self.assertTrue(limits.maxKeyRangeCount == AdminClientTestCase.MAX_KEY_RANGE_COUNT)

def test_get_log(self):
#TODO: is there a way to specify all types without explicitly enumerating them all?
log = self.adminClient.getLog([messages.Command.GetLog.TEMPERATURES, messages.Command.GetLog.UTILIZATIONS,
messages.Command.GetLog.STATISTICS, messages.Command.GetLog.MESSAGES,
messages.Command.GetLog.CAPACITIES, messages.Command.GetLog.LIMITS])
self.assertIsNotNone(log)

self.assertTrue(len(log.temperatures) > 0)
self.assertTrue(len(log.utilizations) > 0)
self.assertTrue(len(log.statistics) > 0)
self.assertTrue(log.messages > 0)
self.assertTrue(log.capacity.portionFull >= 0)
self.assertTrue(log.capacity.nominalCapacityInBytes >= 0)
self.assertTrue(log.limits.maxKeySize == AdminClientTestCase.MAX_KEY_SIZE)
self.assertTrue(log.limits.maxValueSize == AdminClientTestCase.MAX_VALUE_SIZE)
self.assertTrue(log.limits.maxVersionSize == AdminClientTestCase.MAX_VERSION_SIZE)
self.assertTrue(log.limits.maxKeyRangeCount == AdminClientTestCase.MAX_KEY_RANGE_COUNT)

def test_get_temperature(self):
log = self.adminClient.getLog([messages.Command.GetLog.TEMPERATURES])
self.assertIsNotNone(log)

for temperature in log.temperatures:
self.assertTrue(temperature.current >= 0)
self.assertTrue(temperature.maximum >= 0)

def test_get_temperature_and_capacity(self):
log = self.adminClient.getLog([messages.Command.GetLog.TEMPERATURES, messages.Command.GetLog.CAPACITIES])
self.assertIsNotNone(log)

for temperature in log.temperatures:
self.assertTrue(temperature.current >= 0)
self.assertTrue(temperature.maximum >= 0)

capacity = log.capacity
self.assertIsNotNone(capacity)

self.assertTrue(capacity.portionFull >= 0)
self.assertTrue(capacity.nominalCapacityInBytes >= 0)

def test_get_temperature_and_capacity_and_utilization(self):
log = self.adminClient.getLog([messages.Command.GetLog.TEMPERATURES, messages.Command.GetLog.CAPACITIES,
messages.Command.GetLog.UTILIZATIONS])
self.assertIsNotNone(log)

for temperature in log.temperatures:
self.assertTrue(temperature.current >= 0)
self.assertTrue(temperature.maximum >= 0)

capacity = log.capacity
self.assertIsNotNone(capacity)

self.assertTrue(capacity.portionFull >= 0)
self.assertTrue(capacity.nominalCapacityInBytes >= 0)

util_list = log.utilizations

for util in util_list:
self.assertTrue(util.value >= 0)

def test_get_temperature_and_utilization(self):
log = self.adminClient.getLog([messages.Command.GetLog.TEMPERATURES, messages.Command.GetLog.UTILIZATIONS])
self.assertIsNotNone(log)

for temperature in log.temperatures:
self.assertTrue(temperature.current >= 0)
self.assertTrue(temperature.maximum >= 0)

util_list = log.utilizations

for util in util_list:
self.assertTrue(util.value >= 0)

def test_get_utilization(self):
log = self.adminClient.getLog([messages.Command.GetLog.UTILIZATIONS])
self.assertIsNotNone(log)

util_list = log.utilizations

for util in util_list:
self.assertTrue(util.value >= 0)

def reset_cluster_version_to_default(self, current_cluster_version):
self.adminClient = AdminClient(cluster_version=current_cluster_version)
self.adminClient.setClusterVersion(AdminClientTestCase.DEFAULT_CLUSTER_VERSION)
self.adminClient.close()

def test_set_cluster_version(self):
new_cluster_version = AdminClientTestCase.DEFAULT_CLUSTER_VERSION + 1
self.adminClient.setClusterVersion(new_cluster_version)
self.reset_cluster_version_to_default(new_cluster_version)

def test_update_firmware(self):
#TODO: implement test_update_firmware
pass

def test_unlock(self):
#TODO: implement test_unlock
if self.adminClient.use_ssl:
pass
else:
self.assertRaises(KineticException)

def test_lock(self):
#TODO: implement test_lock
if self.adminClient.use_ssl:
pass
else:
self.assertRaises(KineticException)

def test_erase(self):
#TODO: implement test_erase
if self.adminClient.use_ssl:
pass
else:
self.assertRaises(KineticException)

def test_instance_secure_erase(self):
#TODO: implement test_instance_secure_erase
if self.adminClient.use_ssl:
pass
else:
self.assertRaises(KineticException)

def test_set_erase_pin(self):
#TODO: implement test_set_erase_pin
if self.adminClient.use_ssl:
pass
else:
self.assertRaises(KineticException)

def test_set_lock_pin(self):
#TODO: implement test_set_lock_pin
if self.adminClient.use_ssl:
pass
else:
self.assertRaises(KineticException)

def test_set_acl(self):
#TODO: implement test_set_acl
if self.adminClient.use_ssl:
pass
else:
self.assertRaises(KineticException)

# Verify user 100 can only read
read_only_client = Client(self.host, self.port, identity=100)
read_only_client.get(self.buildKey(1)) # Should be OK.
args = (self.buildKey(2), 'test_value_2')
self.assertRaises(KineticMessageException, read_only_client.put, *args)
Loading