diff --git a/thunder/readers.py b/thunder/readers.py
index 552f008e..09236744 100644
--- a/thunder/readers.py
+++ b/thunder/readers.py
@@ -3,6 +3,12 @@
 import glob
 import os
 import logging
+try:
+    import pydoop
+    import pydoop.hdfs as hdfs
+except ImportError:
+    # If the pydoop import fails, the HDFS readers below will error out when used.
+    pass
 
 # library reorganization between Python 2 and 3
 try:
@@ -69,6 +75,92 @@ def readlocal(path, offset=None, size=-1):
             raise
     return buf
 
+def _hdfsRead(filePath, startOffset=None, size=-1):
+    try:
+        # hdfs.open defaults to read-only mode, so no mode argument is needed
+        with hdfs.open(filePath) as f:
+            if startOffset:
+                f.seek(startOffset)
+            buf = f.read(size)
+    except IOError as e:
+        if e.errno == errno.ENOENT:
+            raise FileNotFoundError(e)
+        else:
+            raise
+    return buf
+
+class HDFSParallelReader(object):
+
+    def __init__(self, sparkContext, **kwargs):
+        # kwargs allow AWS credentials to be passed into generic readers without
+        # exceptions being raised; here they are simply ignored
+        self.sc = sparkContext
+        self.lastNRecs = None
+
+    @staticmethod
+    def uriToPath(uri):
+        # thanks stack overflow:
+        # http://stackoverflow.com/questions/5977576/is-there-a-convenient-way-to-map-a-file-uri-to-os-path
+        # urlparse and url2pathname come from the Python 2/3 compatibility imports at the top of the module
+        path = url2pathname(urlparse(uri).path)
+        if uri and (not path):
+            # passed a nonempty uri, got an empty path back
+            # this happens when given a file uri that starts with "file://" instead of "file:///"
+            # error here to prevent the unexpected behavior of falling back to the current working directory
+            raise ValueError("Could not interpret %s as URI. "
+                             "Note absolute paths in URIs should start with 'file:///', not 'file://'" % uri)
+        return path
+
+    @staticmethod
+    def _listFilesRecursive(absPath, ext=None):
+        # walk the tree so that files in subdirectories are included,
+        # mirroring HDFSFileReader.__listRecursive below
+        filenames = set()
+        for root, dirs, files in hdfs.walk(absPath):
+            if ext:
+                files = fnmatch.filter(files, '*.' + ext)
+            for filename in files:
+                filenames.add(hdfs.path.join(root, filename))
+        return sorted(filenames)
+
+    @staticmethod
+    def _listFilesNonRecursive(absPath, ext=None):
+        if ext:
+            files = fnmatch.filter(hdfs.ls(absPath), '*.' + ext)
+        else:
+            files = hdfs.ls(absPath)
+        return sorted(files)
+
+    def listFiles(self, absPath, ext=None, startIdx=None, stopIdx=None, recursive=False):
+        """Get a sorted list of file paths matching the passed `absPath` path and `ext` filename extension.
+        """
+        files = HDFSParallelReader._listFilesNonRecursive(absPath, ext) if not recursive else \
+            HDFSParallelReader._listFilesRecursive(absPath, ext)
+        if len(files) < 1:
+            raise FileNotFoundError('cannot find files of type "%s" in %s' % (ext if ext else '*', absPath))
+        files = selectByStartAndStopIndices(files, startIdx, stopIdx)
+
+        return files
+
+    def read(self, dataPath, ext=None, startIdx=None, stopIdx=None, recursive=False, npartitions=None):
+        """Sets up a Spark RDD across files specified by dataPath on HDFS.
+
+        Returns an RDD of k/v pairs, keyed by file index.
+        """
+        if isinstance(dataPath, str) or not hasattr(dataPath, '__iter__'):
+            filePaths = self.listFiles(dataPath, ext=ext, startIdx=startIdx, stopIdx=stopIdx, recursive=recursive)
+        else:
+            # a non-string iterable is taken to be an explicit list of file paths
+            filePaths = list(dataPath)
+        nfiles = len(filePaths)
+        self.lastNRecs = nfiles
+        npartitions = min(npartitions, nfiles) if npartitions else nfiles
+        # note: RDD.map passes each (index, path) pair as a single tuple argument
+        return self.sc.parallelize(enumerate(filePaths), npartitions).map(lambda kv: (kv[0], _hdfsRead(kv[1])))
+
+    # side note: the use of parallelize on the line above is a little suspect.
+    # Only the file paths are parallelized (the reads themselves happen inside the map on
+    # the executors), but this bypasses Spark's own HDFS input machinery and its
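+    # data-locality scheduling. That implies we should be reading from HDFS directly
+    # into an RDD, rather than listing files on the driver and parallelizing the list.
+    #
+    # For reference, PySpark already offers such a direct route: SparkContext.binaryFiles
+    # (available since Spark 1.3) builds an RDD of (fileName, content) pairs with the
+    # reads performed on the executors. A minimal sketch (the path is illustrative):
+    #
+    #     rdd = sc.binaryFiles('hdfs:///user/me/data')
+    #     # rdd contains one (fileName, bytes) pair per file under the path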
+
+
 def listrecursive(path, ext=None):
     """
     List files recursively
@@ -140,8 +232,11 @@ def read(self, path, ext=None, start=None, stop=None, recursive=False, npartitio
         Returns RDD of k/v pairs.
         """
-        path = uri_to_path(path)
-        files = self.list(path, ext=ext, start=start, stop=stop, recursive=recursive)
+        if isinstance(path, str) or not hasattr(path, '__iter__'):
+            path = uri_to_path(path)
+            files = self.list(path, ext=ext, start=start, stop=stop, recursive=recursive)
+        else:
+            # a non-string iterable is taken to be an explicit list of file paths
+            files = list(path)
         nfiles = len(files)
         self.nfiles = nfiles
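
For context, `read` now accepts either a path/URI or an explicit sequence of file
paths. A minimal sketch of the two call forms, assuming `reader` is an
already-constructed LocalParallelReader (the paths are illustrative):

    rdd1 = reader.read('file:///data/images', ext='bin')    # path/URI form
    rdd2 = reader.read(['/data/a.bin', '/data/b.bin'])      # explicit list form
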
@@ -213,6 +308,77 @@ def open(self, path, filename=None):
 
         return open(filenames[0], 'rb')
 
+
+class HDFSFileReader(object):
+    """File reader backed by Pydoop HDFS file objects.
+    """
+    def __init__(self, **kwargs):
+        # kwargs tolerated for interface compatibility with the other readers; ignored here
+        pass
+
+    def __listRecursive(self, dataPath):
+        if hdfs.path.isdir(dataPath):
+            dirname = dataPath
+            matchpattern = None
+        else:
+            dirname, matchpattern = hdfs.path.split(dataPath)
+
+        filenames = set()
+        for root, dirs, files in hdfs.walk(dirname):
+            if matchpattern:
+                files = fnmatch.filter(files, matchpattern)
+            for filename in files:
+                filenames.add(hdfs.path.join(root, filename))
+        return sorted(filenames)
+
+    def list(self, dataPath, filename=None, startIdx=None, stopIdx=None, recursive=False,
+             includeDirectories=False):
+        """List files specified by dataPath.
+
+        dataPath may include a single wildcard ('*') in the filename specifier.
+
+        Returns a sorted list of absolute path strings.
+        """
+        absPath = dataPath
+
+        if (not filename) and recursive:
+            return self.__listRecursive(absPath)
+
+        if filename:
+            wildCard = '*' + filename + '*'
+            files = fnmatch.filter(hdfs.ls(absPath), wildCard)
+        else:
+            files = hdfs.ls(absPath)
+        # filter out directories
+        if not includeDirectories:
+            files = [fpath for fpath in files if not hdfs.path.isdir(fpath)]
+        files.sort()
+        files = selectByStartAndStopIndices(files, startIdx, stopIdx)
+
+        return files
+
+    def read(self, dataPath, filename=None, startOffset=None, size=-1):
+        filenames = self.list(dataPath, filename=filename)
+        if not filenames:
+            raise FileNotFoundError("No file found matching: '%s'" % dataPath)
+        if len(filenames) > 1:
+            raise ValueError("Found multiple files matching: '%s'" % dataPath)
+
+        return _hdfsRead(filenames[0], startOffset=startOffset, size=size)
+
+    def open(self, dataPath, filename=None):
+        filenames = self.list(dataPath, filename=filename)
+
+        if not filenames:
+            raise FileNotFoundError("No file found matching: '%s'" % dataPath)
+        if len(filenames) > 1:
+            raise ValueError("Found multiple files matching: '%s'" % dataPath)
+        hdfs_obj = hdfs.hdfs()
+
+        return hdfs_obj.open_file(filenames[0], 'r')
+
 
 class BotoClient(object):
     """
@@ -586,7 +752,7 @@ def mode(self):
     'gs': BotoParallelReader,
     's3': BotoParallelReader,
     's3n': BotoParallelReader,
-    'hdfs': None,
+    'hdfs': HDFSParallelReader,
     'http': None,
     'https': None,
     'ftp': None
@@ -598,7 +764,7 @@ def mode(self):
     'gs': BotoFileReader,
     's3': BotoFileReader,
     's3n': BotoFileReader,
-    'hdfs': None,
+    'hdfs': HDFSFileReader,
     'http': None,
     'https': None,
     'ftp': None
@@ -628,8 +794,11 @@ def get_by_scheme(path, lookup, default):
     """
     Helper function used by get*ForPath().
     """
-    parsed = urlparse(path)
-    class_name = lookup.get(parsed.scheme, default)
+    if hasattr(path, '__iter__') and not isinstance(path, str):
+        # a non-string iterable is assumed to be an explicit list of local file paths
+        class_name = LocalParallelReader
+    else:
+        parsed = urlparse(path)
+        class_name = lookup.get(parsed.scheme, default)
     if class_name is None:
         raise NotImplementedError("No implementation for scheme " + parsed.scheme)
     return class_name
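
With these registrations, an 'hdfs://' URI should dispatch to the new readers through
the scheme lookup. A minimal sketch of the expected flow, assuming the first lookup
table above is bound to a name like `parallel_readers` (that name and the path are
illustrative; only `get_by_scheme` itself appears in this patch):

    reader_cls = get_by_scheme('hdfs:///user/me/data', parallel_readers, LocalParallelReader)
    reader = reader_cls(sc)   # -> HDFSParallelReader(sparkContext)
    rdd = reader.read('hdfs:///user/me/data', ext='bin')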