Adding support for HDFS in Image Reader #297

Open · wants to merge 5 commits into master

181 changes: 175 additions & 6 deletions thunder/readers.py
@@ -3,6 +3,12 @@
import glob
import os
import logging
try:
import pydoop
import pydoop.hdfs as hdfs
except ImportError:
# pydoop is optional; if this import fails, HDFSParallelReader and
# HDFSFileReader will raise a NameError the first time they touch hdfs.
pass

# library reorganization between Python 2 and 3
try:
@@ -69,6 +75,92 @@ def readlocal(path, offset=None, size=-1):
raise
return buf

def _hdfsRead(filePath, startOffset=None, size=-1):
"""Read `size` bytes from the HDFS file at `filePath`, seeking to `startOffset` first if given."""
try:
# hdfs.open defaults to read-only mode, so no mode argument is passed
with hdfs.open(filePath) as f:
if startOffset:
f.seek(startOffset)
buf = f.read(size)
except IOError as e:
if e.errno == errno.ENOENT:
raise FileNotFoundError(e)
else:
raise
return buf
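
# A minimal usage sketch (not part of the original patch; the HDFS path below is
# illustrative only and assumes a reachable cluster with pydoop configured):
#
#     header = _hdfsRead('/user/someuser/images/img000.bin', size=1024)
#     remainder = _hdfsRead('/user/someuser/images/img000.bin', startOffset=1024)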

class HDFSParallelReader(object):

def __init__(self, sparkContext, **kwargs):
# kwargs allow AWS credentials to be passed into generic Readers w/o exceptions being raised
# in this case kwargs are just ignored
self.sc = sparkContext
self.lastNRecs = None

@staticmethod
def uriToPath(uri):
# thanks stack overflow:
# http://stackoverflow.com/questions/5977576/is-there-a-convenient-way-to-map-a-file-uri-to-os-path
path = urllib.url2pathname(urlparse.urlparse(uri).path)
if uri and (not path):
# passed a nonempty uri, got an empty path back
# this happens when given a file uri that starts with "file://" instead of "file:///"
# error here to prevent unexpected behavior of looking at current working directory
raise ValueError(("Could not interpret %s as URI. " +
"Note absolute paths in URIs should start with 'file:///', not 'file://'") % uri)
return path
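
# For example (illustrative values): uriToPath('file:///tmp/images') returns '/tmp/images',
# while a plain local path such as '/tmp/images' passes through unchanged.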

@staticmethod
def _listFilesRecursive(absPath, ext=None):
# note: hdfs.ls does not descend into subdirectories, so for now this
# behaves the same as the non-recursive listing
if ext:
files = fnmatch.filter(hdfs.ls(absPath), '*.' + ext)
else:
files = fnmatch.filter(hdfs.ls(absPath), '*')
return sorted(files)

@staticmethod
def _listFilesNonRecursive(absPath, ext=None):
if ext:
files = fnmatch.filter(hdfs.ls(absPath), '*.' + ext)
else:
files = fnmatch.filter(hdfs.ls(absPath), '*')
return sorted(files)


def listFiles(self, absPath, ext=None, startIdx=None, stopIdx=None, recursive=False):
"""Get sorted list of file paths matching passed `absPath` path and `ext` filename extension
"""
files = HDFSParallelReader._listFilesNonRecursive(absPath, ext) if not recursive else \
HDFSParallelReader._listFilesRecursive(absPath, ext)
if len(files) < 1:
raise FileNotFoundError('cannot find files of type "%s" in %s' % (ext if ext else '*', absPath))
files = selectByStartAndStopIndices(files, startIdx, stopIdx)

return files

def read(self, dataPath, ext=None, startIdx=None, stopIdx=None, recursive=False, npartitions=None):
"""Sets up a Spark RDD across files specified by dataPath on HDFS.

Returns RDD of <integer file index, string buffer> k/v pairs.
"""
if not hasattr(dataPath, '__iter__'):
filePaths = self.listFiles(dataPath, ext=ext, startIdx=startIdx, stopIdx=stopIdx, recursive=recursive)
else:
filePaths = list(dataPath)
nfiles = len(filePaths)
self.lastNRecs = nfiles
npartitions = min(npartitions, nfiles) if npartitions else nfiles
# note: RDD.map passes each (index, path) pair as a single tuple argument
return self.sc.parallelize(enumerate(filePaths), npartitions).map(lambda kv: (kv[0], _hdfsRead(kv[1])))

# side note: the use of parallelize on the line above is a little suspect. If the data is small enough to be
# read into a file stream and then sent to Spark to be parallelized, do we really need to use Spark
# in the first place? This points toward reading from HDFS directly into an RDD, rather than
# something we have to collect (via an HDFS file read) and then parallelize again.
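
# A possible alternative (sketch only, not part of this patch): Spark can build the RDD
# directly from HDFS with SparkContext.binaryFiles, which yields <file path, byte string>
# pairs without listing and re-reading files through pydoop first. The glob below is
# illustrative only.
#
#     rdd = sc.binaryFiles('hdfs:///user/someuser/images/*.bin', minPartitions=npartitions)
#     indexed = rdd.sortByKey().zipWithIndex().map(lambda kv_i: (kv_i[1], kv_i[0][1]))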


def listrecursive(path, ext=None):
"""
List files recursively
@@ -140,8 +232,11 @@ def read(self, path, ext=None, start=None, stop=None, recursive=False, npartitio

Returns RDD of <integer file index, string buffer> k/v pairs.
"""
path = uri_to_path(path)
files = self.list(path, ext=ext, start=start, stop=stop, recursive=recursive)
if not hasattr(path, '__iter__'):
path = uri_to_path(path)
files = self.list(path, ext=ext, start=start, stop=stop, recursive=recursive)
else:
files = [filePath for filePath in path]

nfiles = len(files)
self.nfiles = nfiles
@@ -213,6 +308,77 @@ def open(self, path, filename=None):

return open(filenames[0], 'rb')


class HDFSFileReader(object):
"""File reader for individual files on HDFS, backed by pydoop's hdfs module.
"""
def __init__(self, **kwargs):
# kwargs are accepted and ignored so this reader can be constructed like the other FileReaders
pass

def __listRecursive(self, dataPath):
if hdfs.path.isdir(dataPath):
dirname = dataPath
matchpattern = None
else:
dirname, matchpattern = hdfs.path.split(dataPath)

filenames = set()
for root, dirs, files in hdfs.walk(dirname):
if matchpattern:
files = fnmatch.filter(files, matchpattern)
for filename in files:
filenames.add(hdfs.path.join(root, filename))
filenames = list(filenames)
filenames.sort()
return filenames

def list(self, dataPath, filename=None, startIdx=None, stopIdx=None, recursive=False,
includeDirectories=False):
"""List files specified by dataPath.

`dataPath` may include a single wildcard ('*') in the filename specifier.

Returns sorted list of absolute path strings.
"""
absPath = dataPath

if (not filename) and recursive:
return self.__listRecursive(absPath)

if filename:
wildCard = '*' + filename + '*'
files = fnmatch.filter(hdfs.ls(absPath), wildCard)
else:
files = fnmatch.filter(hdfs.ls(absPath), '*')
# filter out directories
if not includeDirectories:
files = [fpath for fpath in files if not hdfs.path.isdir(fpath)]
files.sort()
files = selectByStartAndStopIndices(files, startIdx, stopIdx)

return files

def read(self, dataPath, filename=None, startOffset=None, size=-1):
filenames = self.list(dataPath, filename=filename)
if not filenames:
raise FileNotFoundError("No file found matching: '%s'" % dataPath)
if len(filenames) > 1:
raise ValueError("Found multiple files matching: '%s'" % dataPath)

return _hdfsRead(filenames[0], startOffset=startOffset, size=size)

def open(self, dataPath, filename=None):
filenames = self.list(dataPath, filename=filename)

if not filenames:
raise FileNotFoundError("No file found matching: '%s'" % dataPath)
if len(filenames) > 1:
raise ValueError("Found multiple files matching: '%s'" % dataPath)
hdfs_obj = hdfs.hdfs()

return hdfs_obj.open_file(filenames[0], 'r')
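
# A minimal usage sketch (not part of the original patch; paths are illustrative and assume
# a reachable HDFS cluster):
#
#     reader = HDFSFileReader()
#     paths = reader.list('/user/someuser/images', filename='img000')
#     buf = reader.read('/user/someuser/images', filename='img000', size=512)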


class BotoClient(object):
"""
@@ -586,7 +752,7 @@ def mode(self):
'gs': BotoParallelReader,
's3': BotoParallelReader,
's3n': BotoParallelReader,
'hdfs': None,
'hdfs': HDFSParallelReader,
'http': None,
'https': None,
'ftp': None
@@ -598,7 +764,7 @@ def mode(self):
'gs': BotoFileReader,
's3': BotoFileReader,
's3n': BotoFileReader,
'hdfs': None,
'hdfs': HDFSFileReader,
'http': None,
'https': None,
'ftp': None
@@ -628,8 +794,11 @@ def get_by_scheme(path, lookup, default):
"""
Helper function used by get*ForPath().
"""
parsed = urlparse(path)
class_name = lookup.get(parsed.scheme, default)
if hasattr(path, '__iter__'):
# an explicit list of paths falls back to the local parallel reader
class_name = LocalParallelReader
else:
parsed = urlparse(path)
class_name = lookup.get(parsed.scheme, default)
if class_name is None:
raise NotImplementedError("No implementation for scheme " + parsed.scheme)
return class_name
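
# Sketch of how a scheme resolves to a reader class (not part of the original patch;
# 'parallel_reader_lookup' is a placeholder for whichever dict above maps schemes to
# parallel reader classes):
#
#     cls = get_by_scheme('hdfs:///user/someuser/images', parallel_reader_lookup, LocalParallelReader)
#     # cls is HDFSParallelReader; a path with no recognized scheme falls back to the default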