[Scipy-svn] r3475 - in trunk/scipy/io: . tests

scipy-svn@scip... scipy-svn@scip...
Wed Oct 31 13:02:41 CDT 2007


Author: chris.burns
Date: 2007-10-31 13:02:38 -0500 (Wed, 31 Oct 2007)
New Revision: 3475

Added:
   trunk/scipy/io/tests/test_datasource.py
Modified:
   trunk/scipy/io/datasource.py
Log:
Rewrite datasource, add testing module.

Modified: trunk/scipy/io/datasource.py
===================================================================
--- trunk/scipy/io/datasource.py	2007-10-30 22:43:58 UTC (rev 3474)
+++ trunk/scipy/io/datasource.py	2007-10-31 18:02:38 UTC (rev 3475)
@@ -1,20 +1,34 @@
-"""A generic interface for importing data.  Data sources can originate from
-URLs or local paths and can be in compressed or uncompressed form.
+"""A generic file interface for handling data files.  The goal of datasource is
+to abstract some of the file system operations when dealing with files.
+Specifically, acquiring the files.  DataSource files can originate locally or
+remotely, for example:
+    local files - '/home/name/blah/blah/blah/data.txt'
+    URLs (http, ftp, ...) - 'http://www.scipy.org/not/real/data.txt'
 
-"""
+DataSource files can also be compressed or uncompressed.  Currently only gzip
+and bz2 are supported.
 
-# TODO: Make DataSource and Repository the public interface.
-#       _Cache will be used internally.  Add methods in DataSource to expose
-#       some of the functionality that exists only in _Cache currently.
+In a typical use, you would pass a DataSource to a function that would open
+up the DataSource (as it would any file-like object) and read the file.
 
+Ex:
+    >>> ds = datasource.DataSource()
+    >>> fp = ds.open('http://www.scipy.org/not/real/data.txt.gz')
+    >>> fp.read()
+    >>> fp.close()
+    >>> del ds, fp
+
+"""
+
 __docformat__ = "restructuredtext en"
 
-import os
-import gzip
 import bz2
-from urlparse import urlparse
+import gzip
+import os
+import tempfile
+from shutil import rmtree
 from urllib2 import urlopen, URLError
-from tempfile import mkstemp
+from urlparse import urlparse
 
 import warnings
 
@@ -31,367 +45,252 @@
 
 warnings.warn(_api_warning)
 
-# TODO: Don't check file extensions, just try and open zip files?
-#       Follow the, it's easier to get forgiveness than permission?
-#       And because this shouldn't succeed:
-#           In [134]: datasource._iszip('/my/fake/dir/foobar.gz')
-#           Out[134]: True
-
 # TODO: .zip support, .tar support?
 _file_openers = {".gz":gzip.open, ".bz2":bz2.BZ2File, None:file}
 
-def _iszip(filename):
-    """Test if the given file is a zip file.
 
-    Currently only looks at the file extension, not very robust.
-    """
+class DataSource (object):
+    """A generic data source (file, http, ftp, ...).
 
-    fname, ext = os.path.splitext(filename)
-    return ext in _file_openers.keys()
+    DataSource could be from a local file or remote file/URL.  The file may also
+    be compressed or uncompressed.
 
-def _unzip(filename):
-    """Unzip the given file and return the path object to the new file."""
+    Ex URL DataSources:
+        Initialize DataSource with a local directory.  Default is os.curdir
 
-    # This function is not used in datasource.  Appears it was created
-    # so users could unzip a file without having to import the corresponding
-    # compression module.  Should this be part of datasource?
-    if not _iszip(filename):
-        raise ValueError("file %s is not zipped"%filename)
-    unzip_name, zipext = _splitzipext(filename)
-    opener = _file_openers[zipext]
-    outfile = file(unzip_name, 'w')
-    outfile.write(opener(filename).read())
-    outfile.close()
-    return unzip_name
+        >>> ds = DataSource('/home/guido')
+        >>> ds.open('http://fake.xyz.web/site/xyz.txt')
 
-def _iswritemode(mode):
-    """Test if the given mode will open a file for writing."""
+        Opened file exists here:  /home/guido/site/xyz.txt
 
-    _writemodes = ("w", "+", "a")
-    for c in mode:
-        if c in _writemodes: return True
-    return False
+    Ex using DataSource for temporary files:
+        Initialize DataSource with 'None' for local directory.
 
-def _splitzipext(filename):
-    """Split the filename into a path object and a zip extension.
+        >>> ds = DataSource(None)
+        >>> ds.open('/home/guido/foobar.txt')
 
-    If the filename does not have a zip extension the zip_ext in the
-    return will be None.
+        Opened file exists in tempdir like: /tmp/tmpUnhcvM/foobar.txt
 
-    Parameters:
-
-        filename : {string}
-            Filename to split.
-
-    Returns:
-
-        base, zip_ext : {tuple}
-            Tuple containing a path object to the file and the zip extension.
-
     """
 
-    if _iszip(filename):
-        return os.path.splitext(filename)
-    else:
-        return filename, None
+    def __init__(self, destpath=os.curdir):
+        if destpath:
+            self._destpath = destpath
+            self._istmpdest = False
+        else:
+            self._destpath = tempfile.mkdtemp()
+            self._istmpdest = True
 
-def _isurl(pathstr):
-    """Test whether a given string can be parsed as a URL.
+    def __del__(self):
+        if self._istmpdest:
+            rmtree(self._destpath)
 
-    A pathstr with a valid network scheme (http, ftp, ...) will return true.
-    A pathstr with a 'file' scheme will return false.
+    def _iszip(self, filename):
+        """Test if the filename is a zip file by looking at the file extension.
+        """
+        fname, ext = os.path.splitext(filename)
+        return ext in _file_openers.keys()
 
-    """
+    def _iswritemode(self, mode):
+        """Test if the given mode will open a file for writing."""
 
-    scheme, netloc, upath, uparams, uquery, ufrag = urlparse(pathstr)
-    return bool(scheme and netloc)
+        # Currently only used to test the bz2 files.  Not thoroughly tested!
+        _writemodes = ("w", "+")
+        for c in mode:
+            if c in _writemodes: return True
+        return False
 
-def _ensuredirs(directory):
-    """Ensure that the given directory path exists.  If not, create it."""
+    def _splitzipext(self, filename):
+        """Split zip extension from filename and return filename.
 
-    if not os.path.exists(directory):
-        os.makedirs(directory)
-
-class _Cache (object):
-    """A local file cache for URL datasources.
-
-    The path of the cache can be specified on intialization.  The default
-    path is ~/.scipy/cache
-
-
-    """
-
-    def __init__(self, cachepath=None):
-        if cachepath is not None:
-            self.path = cachepath
-        elif os.name == 'posix':
-            self.path = os.path.join(os.environ["HOME"], ".scipy", "cache")
-        elif os.name == 'nt':
-            self.path = os.path.join(os.environ["HOMEPATH"], ".scipy", "cache")
-        if not os.path.exists(self.path):
-            _ensuredirs(self.path)
-
-    def tempfile(self, suffix='', prefix=''):
-        """Create and return a temporary file in the cache.
-
-        Parameters:
-            suffix : {''}, optional
-
-            prefix : {''}, optional
-
         Returns:
-            tmpfile : {string}
-                String containing the full path to the temporary file.
+            base, zip_ext : {tuple}
 
-        Examples
-
-            >>> mycache = datasource._Cache()
-            >>> mytmpfile = mycache.tempfile()
-            >>> mytmpfile
-            '/home/guido/.scipy/cache/GUPhDv'
-
         """
 
-        _tmp, fname = mkstemp(suffix, prefix, self.path)
-        return fname
+        if self._iszip(filename):
+            return os.path.splitext(filename)
+        else:
+            return filename, None
 
-    def filepath(self, uri):
-        """Return a path object to the uri in the cache.
+    def _possible_names(self, filename):
+        """Return a tuple containing compressed filename variations."""
+        names = [filename]
+        if not self._iszip(filename):
+            for zipext in _file_openers.keys():
+                if zipext:
+                    names.append(filename+zipext)
+        return names
 
-        Parameters:
-            uri : {string}
-                Filename to use in the returned path object.
+    def _isurl(self, path):
+        """Test if path is a net location.  Tests the scheme and netloc."""
+        scheme, netloc, upath, uparams, uquery, ufrag = urlparse(path)
+        return bool(scheme and netloc)
 
-        Returns:
-            path : {string}
-                Complete path for the given uri.
+    def _cache(self, path):
+        """Cache the file specified by path.
 
-        Examples
+        Creates a copy of the file in the datasource cache.
 
-            >>> mycache = datasource._Cache()
-            >>> mycache.filepath('xyzcoords.txt')
-            '/home/guido/.scipy/cache/xyzcoords.txt'
-
         """
 
-        scheme, netloc, upath, uparams, uquery, ufrag = urlparse(uri)
-        return os.path.join(self.path, netloc, upath.strip('/'))
+        upath = self.abspath(path)
 
-    def cache(self, uri):
-        """Copy the file at uri into the cache.
+        # ensure directory exists
+        if not os.path.exists(os.path.dirname(upath)):
+            os.makedirs(os.path.dirname(upath))
 
-        Parameters:
-            uri : {string}
-                path or url of source file to cache.
-
-        Returns:
-            None
-
-        """
-
-        if self.iscached(uri):
-            return
-
-        upath = self.filepath(uri)
-        _ensuredirs(os.path.dirname(upath))
-
-        print 'cache - source:', uri
-        print 'cache - destination:', upath
-
-        if _isurl(uri):
+        # TODO: Doesn't handle compressed files!
+        if self._isurl(path):
             try:
-                openedurl = urlopen(uri)
+                openedurl = urlopen(path)
                 file(upath, 'w').write(openedurl.read())
             except URLError:
-                raise URLError("URL not found: " + str(uri))
+                raise URLError("URL not found: ", path)
         else:
             try:
-                fp = file(uri, 'r')
+                # TODO: Why not just copy the file with shutils.copyfile?
+                fp = file(path, 'r')
                 file(upath, 'w').write(fp.read())
             except IOError:
-                raise IOError("File not founcd: " + str(uri))
+                raise IOError("File not found: ", path)
+        return upath
 
-    def clear(self):
-        """Delete all files in the cache."""
+    def _findfile(self, path):
+        """Searches for path and returns full path if found.
 
-        # TODO: This deletes all files in the cache directory, regardless
-        #       of if this instance created them.  Too destructive and
-        #       unexpected behavior.
-        #for _file in self.path.files():
-        #    os.remove(file)
-        raise NotImplementedError
+        If path is an URL, _findfile will cache a local copy and return
+        the path to the cached file.
+        If path is a local file, _findfile will return a path to that local
+        file.
 
-    def iscached(self, uri):
-        """ Check if a file exists in the cache.
+        The search will include possible compressed versions of the file and
+        return the first occurrence found.
 
-        Returns
-            boolean
-
         """
 
-        upath = self.filepath(uri)
-        return os.path.exists(upath)
+        # Build list of possible local file paths
+        filelist = self._possible_names(self.abspath(path))
+        if self._isurl(path):
+            # Add list of possible remote urls
+            filelist = filelist + self._possible_names(path)
 
-    def retrieve(self, uri):
-        """Retrieve a file from the cache.
-        If not already there, create the file and add it to the cache.
+        for name in filelist:
+            if self.exists(name):
+                if self._isurl(name):
+                    name = self._cache(name)
+                return name
+        return None
 
-        Returns
-            open file object
+    def abspath(self, path):
+        """Return an absolute path in the DataSource destination directory.
 
         """
 
-        self.cache(uri)
-        return file(self.filepath(uri))
+        # handle case where path includes self._destpath
+        splitpath = path.split(self._destpath, 2)
+        if len(splitpath) > 1:
+            path = splitpath[1]
+        scheme, netloc, upath, uparams, uquery, ufrag = urlparse(path)
+        return os.path.join(self._destpath, netloc, upath.strip(os.sep))
 
+    def exists(self, path):
+        """Test if path exists.
 
-class DataSource (object):
-    """A generic data source class.
+        Tests for local files, locally cached URLs and remote URLs.
 
-    Data sets could be from a file, a URL, or a cached file.  They may also
-    be compressed or uncompressed.
-
-    TODO: Improve DataSource docstring
-
-    """
-
-    def __init__(self, cachepath=os.curdir):
-        self._cache = _Cache(cachepath)
-
-    def tempfile(self, suffix='', prefix=''):
-        """Create a temporary file in the DataSource cache.
-
-        Parameters:
-            suffix : {''}, optional
-
-            prefix : {''}, optional
-
-        Returns:
-            tmpfile : {string}
-                String containing the full path to the temporary file.
-
-        Examples
-
-            >>> datasrc = datasource.DataSource()
-            >>> tmpfile = datasrc.tempfile()
-            >>> tmpfile
-            '/home/guido/src/scipy-trunk/scipy/io/PZTuKo'
-
         """
-        return self._cache.tempfile(suffix, prefix)
 
-    def _possible_names(self, filename):
-        """Return a tuple containing compressed filenames."""
-        names = [filename]
-        if not _iszip(filename):
-            for zipext in _file_openers.keys():
-                names.append(filename+zipext)
-        return tuple(names)
-
-    def cache(self, pathstr):
-        """Cache the file specified by pathstr.
-
-        Creates a copy of file pathstr in the datasource cache.
-
-        """
-
-        self._cache.cache(pathstr)
-
-    def clear(self):
-        # TODO: Implement a clear interface for deleting tempfiles.
-        #       There's a problem with the way this is handled in the _Cache,
-        #       All files in the cache directory will be deleted.  In the
-        #       default instance, that's the os.curdir.  I doubt this is what
-        #       people would want.  The instance should only delete files that
-        #       it created!
-        raise NotImplementedError
-
-    def filename(self, pathstr):
-        """Searches for pathstr file and returns full path if found.
-
-        If pathstr is an URL, filename will cache a local copy and return
-        the path to the cached file.
-        If pathstr is a local file, filename will return a path to that local
-        file.
-        BUG:  This should be modified so the behavior is identical for both
-              types of files!
-
-        The search will include possible compressed versions of the files.
-        BUG:  Will return the first occurence found, regardless of which
-              version is newer.
-
-        """
-
-        found = None
-        for name in self._possible_names(pathstr):
-            try:
-                if _isurl(name):
-                    self.cache(name)
-                    found = self._cache.filepath(name)
-                else:
-                    raise Exception
-            except:
-                if os.path.exists(name):
-                    found = name
-            if found:
-                break
-        if found is None:
-            raise IOError("%s not found"%pathstr)
-        return found
-
-    def exists(self, pathstr):
-        """Test if pathstr exists in the cache or the current directory.
-
-        If pathstr is an URL, it will be fetched and cached.
-
-        """
-
-        # Is this method doing to much?  At very least may want an option to
-        # not fetch and cache URLs.
-
-        try:
-            _datafile = self.filename(pathstr)
+        upath = self.abspath(path)
+        if os.path.exists(upath):
             return True
-        except IOError:
+        elif self._isurl(path):
+            try:
+                netfile = urlopen(path)
+                # just validate existence, nothing more.
+                del(netfile)
+                return True
+            except URLError:
+                return False
+        else:
             return False
 
-    def open(self, pathstr, mode='r'):
-        """Open pathstr and return file object.
+    def open(self, path, mode='r'):
+        """Open path and return file object.
 
-        If pathstr is an URL, it will be fetched and cached.
+        If path is an URL, it will be downloaded, stored in the DataSource
+        directory and opened.
 
+        TODO: Currently only opening for reading has been tested.  There is no
+              support for opening a file for writing which doesn't exist yet
+              (creating a file).
+
         """
 
-        # Is this method doing to much?  Should it be fetching and caching?
-
-        if _isurl(pathstr) and _iswritemode(mode):
+        if self._isurl(path) and self._iswritemode(mode):
             raise ValueError("URLs are not writeable")
-        found = self.filename(pathstr)
-        _fname, ext = _splitzipext(found)
-        if ext == 'bz2':
-            mode.replace("+", "")
-        return _file_openers[ext](found, mode=mode)
 
+        # NOTE: _findfile will fail on a new file opened for writing.
+        found = self._findfile(path)
+        if found:
+            _fname, ext = self._splitzipext(found)
+            if ext == 'bz2':
+                mode.replace("+", "")
+            return _file_openers[ext](found, mode=mode)
+        else:
+            raise IOError("%s not found." % path)
 
+
 class Repository (DataSource):
-    """Multiple DataSource's that share one base url.
+    """A data repository where multiple DataSource's share one base URL.
 
-    TODO: Improve Repository docstring.
+    Use a Repository when you will be working with multiple files from one
+    base URL or directory.  Initialize the Repository with the base URL,
+    then refer to each file only by its filename.
 
+    >>> repos = Repository('/home/user/data/dir/')
+    >>> fp = repos.open('data01.txt')
+    >>> fp.analyze()
+    >>> fp.close()
+
+    Similarly you could use a URL for a repository:
+    >>> repos = Repository('http://www.xyz.edu/data')
+
     """
 
-    def __init__(self, baseurl, cachepath=None):
-        DataSource.__init__(self, cachepath=cachepath)
+    def __init__(self, baseurl, destpath=os.curdir):
+        DataSource.__init__(self, destpath=destpath)
         self._baseurl = baseurl
 
-    def _fullpath(self, pathstr):
-        return os.path.join(self._baseurl, pathstr)
+    def _fullpath(self, path):
+        '''Return complete path for path.  Prepends baseurl if necessary.'''
+        #print 'Repository._fullpath:', path
+        #print '          ._baseurl: ', self._baseurl
+        splitpath = path.split(self._baseurl, 2)
+        if len(splitpath) == 1:
+            result = os.path.join(self._baseurl, path)
+        else:
+            result = path    # path contains baseurl already
+        return result
 
-    def filename(self, pathstr):
-        return DataSource.filename(self, self._fullpath(pathstr))
+    def _findfile(self, path):
+        #print 'Repository._findfile:', path
+        return DataSource._findfile(self, self._fullpath(path))
 
-    def exists(self, pathstr):
-        return DataSource.exists(self, self._fullpath(pathstr))
+    def abspath(self, path):
+        return DataSource.abspath(self, self._fullpath(path))
 
-    def open(self, pathstr, mode='r'):
-        return DataSource.open(self, self._fullpath(pathstr), mode)
+    def exists(self, path):
+        #print 'Respository.exists:', path
+        return DataSource.exists(self, self._fullpath(path))
+
+    def open(self, path, mode='r'):
+        #print 'Repository.open:', path
+        return DataSource.open(self, self._fullpath(path), mode)
+
+    def listdir(self):
+        '''List files in the source Repository.'''
+        if self._isurl(self._baseurl):
+            raise NotImplementedError
+        else:
+            return os.listdir(self._baseurl)

Added: trunk/scipy/io/tests/test_datasource.py
===================================================================
--- trunk/scipy/io/tests/test_datasource.py	2007-10-30 22:43:58 UTC (rev 3474)
+++ trunk/scipy/io/tests/test_datasource.py	2007-10-31 18:02:38 UTC (rev 3475)
@@ -0,0 +1,207 @@
+
+import bz2
+import gzip
+import os
+import sys
+import struct
+from tempfile import mkdtemp, mkstemp
+from shutil import rmtree
+from urlparse import urlparse
+
+from numpy.testing import *
+
+# HACK: import the src datasource
+#       Until datasource is robust enough to include in scipy.io package
+sys.path.insert(0, os.path.abspath('..'))
+import datasource
+del sys.path[0]
+
+#set_package_path()
+#from scipy.io import datasource
+#restore_path()
+
+# Can rebind urlopen for testing, so we don't open real net connection.
+#def urlopen(url, data=None):
+#    print 'test_datasource urlopen(', url, ')'
+#datasource.urlopen = urlopen
+
+#http_baseurl = 'http://nifti.nimh.nih.gov/nifti-1/data/'
+#http_filename = 'minimal.nii.gz'
+#http_abspath = os.path.join(http_baseurl, http_filename)
+
+# Temporarily use one of our files so we don't abuse someone elses server.
+http_path = 'https://cirl.berkeley.edu/twiki/pub/BIC/ImagingDocuments/'
+http_file = 'dork.pdf'
+
+http_fakepath = 'http://fake.abc.web/site/'
+http_fakefile = 'fake.txt'
+
+magic_line = 'three is the magic number'
+
+
+# Utility functions used by many TestCases
+def valid_textfile(filedir):
+    # Generate and return a valid temporary file.
+    fd, path = mkstemp(suffix='.txt', dir=filedir, text=True)
+    os.close(fd)
+    return path
+
+def invalid_textfile(filedir):
+    # Generate and return an invalid filename.
+    fd, path = mkstemp(suffix='.txt', dir=filedir)
+    os.close(fd)
+    os.remove(path)
+    return path
+
+def valid_httpurl():
+    return http_path+http_file
+
+def invalid_httpurl():
+    return http_fakepath+http_fakefile
+
+def valid_baseurl():
+    return http_path
+
+def invalid_baseurl():
+    return http_fakepath
+
+def valid_httpfile():
+    return http_file
+
+def invalid_httpfile():
+    return http_fakefile
+
+class TestDataSourceOpen(NumpyTestCase):
+    def setUp(self):
+        self.tmpdir = mkdtemp()
+        self.ds = datasource.DataSource(self.tmpdir)
+
+    def tearDown(self):
+        rmtree(self.tmpdir)
+        del self.ds
+
+    def test_ValidHTTP(self):
+        assert self.ds.open(valid_httpurl())
+    
+    def test_InvalidHTTP(self):
+        self.assertRaises(IOError, self.ds.open, invalid_httpurl())
+
+    def test_ValidFile(self):
+        local_file = valid_textfile(self.tmpdir)
+        #print '\nDataSourceOpen test_ValidFile:', local_file
+        assert self.ds.open(local_file)
+
+    def test_InvalidFile(self):
+        invalid_file = invalid_textfile(self.tmpdir)
+        #print '\nDataSourceOpen test_InvalidFile:', invalid_file
+        self.assertRaises(IOError, self.ds.open, invalid_file)
+
+    def test_ValidGzipFile(self):
+        # Test datasource's internal file_opener for Gzip files.
+        filepath = os.path.join(self.tmpdir, 'foobar.txt.gz')
+        #print '\nDataSourceOpen test_ValidGzipFile:', filepath
+        fp = gzip.open(filepath, 'w')
+        fp.write(magic_line)
+        fp.close()
+        fp = self.ds.open(filepath)
+        result = fp.readline()
+        fp.close()
+        self.assertEqual(magic_line, result)
+
+    def test_ValidBz2File(self):
+        # Test datasource's internal file_opener for BZip2 files.
+        filepath = os.path.join(self.tmpdir, 'foobar.txt.bz2')
+        #print '\nDataSourceOpen test_ValidBZ2File:', filepath
+        fp = bz2.BZ2File(filepath, 'w')
+        fp.write(magic_line)
+        fp.close()
+        fp = self.ds.open(filepath)
+        result = fp.readline()
+        fp.close()
+        self.assertEqual(magic_line, result)
+
+
+class TestDataSourceExists(NumpyTestCase):
+    def setUp(self):
+        self.tmpdir = mkdtemp()
+        self.ds = datasource.DataSource(self.tmpdir)
+
+    def tearDown(self):
+        rmtree(self.tmpdir)
+        del self.ds
+
+    def test_ValidHTTP(self):
+        #print 'DataSourceExists test_ValidHTTP'
+        assert self.ds.exists(valid_httpurl())
+    
+    def test_InvalidHTTP(self):
+        #print 'DataSourceExists test_InvalidHTTP'
+        self.assertEqual(self.ds.exists(invalid_httpurl()), False)
+
+    def test_ValidFile(self):
+        tmpfile = valid_textfile(self.tmpdir)
+        #print 'DataSourceExists test_ValidFile:', tmpfile
+        assert self.ds.exists(tmpfile)
+
+    def test_InvalidFile(self):
+        tmpfile = invalid_textfile(self.tmpdir)
+        #print 'DataSourceExists test_InvalidFile:', tmpfile
+        self.assertEqual(self.ds.exists(tmpfile), False)
+
+
+class TestDataSourceAbspath(NumpyTestCase):
+    def setUp(self):
+        self.tmpdir = mkdtemp()
+        self.ds = datasource.DataSource(self.tmpdir)
+
+    def tearDown(self):
+        rmtree(self.tmpdir)
+        del self.ds
+
+    def test_ValidHTTP(self):
+        scheme, netloc, upath, pms, qry, frg = urlparse(valid_httpurl())
+        local_path = os.path.join(self.tmpdir, netloc, upath.strip(os.sep))
+        self.assertEqual(local_path, self.ds.abspath(valid_httpurl()))
+
+    def test_ValidFile(self):
+        tmpfile = valid_textfile(self.tmpdir)
+        tmpfilename = os.path.split(tmpfile)[-1]
+        # Test with filename only
+        self.assertEqual(tmpfile, self.ds.abspath(os.path.split(tmpfile)[-1]))
+        # Test filename with complete path
+        self.assertEqual(tmpfile, self.ds.abspath(tmpfile))
+
+    def test_InvalidHTTP(self):
+        scheme, netloc, upath, pms, qry, frg = urlparse(invalid_httpurl())
+        invalidhttp = os.path.join(self.tmpdir, netloc, upath.strip(os.sep))
+        self.assertNotEqual(invalidhttp, self.ds.abspath(valid_httpurl()))
+
+    def test_InvalidFile(self):
+        fd, invalidfile = mkstemp(suffix='.txt')
+        os.close(fd)
+        tmpfile = valid_textfile(self.tmpdir)
+        tmpfilename = os.path.split(tmpfile)[-1]
+        # Test with filename only
+        self.assertNotEqual(invalidfile, self.ds.abspath(tmpfilename))
+        # Test filename with complete path
+        self.assertNotEqual(invalidfile, self.ds.abspath(tmpfile))
+
+
+class TestRespositoryAbspath(NumpyTestCase):
+    def setUp(self):
+        self.repos = datasource.Repository(valid_baseurl(), None)
+
+    def tearDown(self):
+        del self.repos
+
+    def test_ValidHTTP(self):
+        scheme, netloc, upath, pms, qry, frg = urlparse(valid_httpurl())
+        local_path = os.path.join(self.repos._destpath, netloc, \
+                                  upath.strip(os.sep))
+        filepath = self.repos.abspath(valid_httpfile())
+        self.assertEqual(local_path, filepath)
+
+
+if __name__ == "__main__":
+    NumpyTest().run()
+



More information about the Scipy-svn mailing list