[Scipy-svn] r3475 - in trunk/scipy/io: . tests
scipy-svn@scip...
scipy-svn@scip...
Wed Oct 31 13:02:41 CDT 2007
Author: chris.burns
Date: 2007-10-31 13:02:38 -0500 (Wed, 31 Oct 2007)
New Revision: 3475
Added:
trunk/scipy/io/tests/test_datasource.py
Modified:
trunk/scipy/io/datasource.py
Log:
Rewrite datasource, add testing module.
Modified: trunk/scipy/io/datasource.py
===================================================================
--- trunk/scipy/io/datasource.py 2007-10-30 22:43:58 UTC (rev 3474)
+++ trunk/scipy/io/datasource.py 2007-10-31 18:02:38 UTC (rev 3475)
@@ -1,20 +1,34 @@
-"""A generic interface for importing data. Data sources can originate from
-URLs or local paths and can be in compressed or uncompressed form.
+"""A generic file interface for handling data files. The goal of datasource is
+to abstract some of the file system operations when dealing with files,
+specifically acquiring the files. DataSource files can originate locally or
+remotely, for example:
+ local files - '/home/name/blah/blah/blah/data.txt'
+ URLs (http, ftp, ...) - 'http://www.scipy.org/not/real/data.txt'
-"""
+DataSource files can also be compressed or uncompressed. Currently only gzip
+and bz2 are supported.
-# TODO: Make DataSource and Repository the public interface.
-# _Cache will be used internally. Add methods in DataSource to expose
-# some of the functionality that exists only in _Cache currently.
+In a typical use, you would pass a DataSource to a function that would open
+up the DataSource (as it would any file-like object) and read the file.
+Ex:
+ >>> ds = datasource.DataSource()
+ >>> fp = ds.open('http://www.scipy.org/not/real/data.txt.gz')
+ >>> fp.read()
+ >>> fp.close()
+ >>> del ds, fp
+
+"""
+
__docformat__ = "restructuredtext en"
-import os
-import gzip
import bz2
-from urlparse import urlparse
+import gzip
+import os
+import tempfile
+from shutil import rmtree
from urllib2 import urlopen, URLError
-from tempfile import mkstemp
+from urlparse import urlparse
import warnings
@@ -31,367 +45,252 @@
warnings.warn(_api_warning)
-# TODO: Don't check file extensions, just try and open zip files?
-# Follow the, it's easier to get forgiveness than permission?
-# And because this shouldn't succeed:
-# In [134]: datasource._iszip('/my/fake/dir/foobar.gz')
-# Out[134]: True
-
# TODO: .zip support, .tar support?
_file_openers = {".gz":gzip.open, ".bz2":bz2.BZ2File, None:file}
-def _iszip(filename):
- """Test if the given file is a zip file.
- Currently only looks at the file extension, not very robust.
- """
+class DataSource (object):
+ """A generic data source (file, http, ftp, ...).
- fname, ext = os.path.splitext(filename)
- return ext in _file_openers.keys()
+ DataSource could be from a local file or remote file/URL. The file may also
+ be compressed or uncompressed.
-def _unzip(filename):
- """Unzip the given file and return the path object to the new file."""
+ Ex URL DataSources:
+ Initialize DataSource with a local directory. Default is os.curdir
- # This function is not used in datasource. Appears it was created
- # so users could unzip a file without having to import the corresponding
- # compression module. Should this be part of datasource?
- if not _iszip(filename):
- raise ValueError("file %s is not zipped"%filename)
- unzip_name, zipext = _splitzipext(filename)
- opener = _file_openers[zipext]
- outfile = file(unzip_name, 'w')
- outfile.write(opener(filename).read())
- outfile.close()
- return unzip_name
+ >>> ds = DataSource('/home/guido')
+ >>> ds.open('http://fake.xyz.web/site/xyz.txt')
-def _iswritemode(mode):
- """Test if the given mode will open a file for writing."""
+ Opened file exists here: /home/guido/site/xyz.txt
- _writemodes = ("w", "+", "a")
- for c in mode:
- if c in _writemodes: return True
- return False
+ Ex using DataSource for temporary files:
+ Initialize DataSource with 'None' for local directory.
-def _splitzipext(filename):
- """Split the filename into a path object and a zip extension.
+ >>> ds = DataSource(None)
+ >>> ds.open('/home/guido/foobar.txt')
- If the filename does not have a zip extension the zip_ext in the
- return will be None.
+ Opened file exists in tempdir like: /tmp/tmpUnhcvM/foobar.txt
- Parameters:
-
- filename : {string}
- Filename to split.
-
- Returns:
-
- base, zip_ext : {tuple}
- Tuple containing a path object to the file and the zip extension.
-
"""
- if _iszip(filename):
- return os.path.splitext(filename)
- else:
- return filename, None
+ def __init__(self, destpath=os.curdir):
+ if destpath:
+ self._destpath = destpath
+ self._istmpdest = False
+ else:
+ self._destpath = tempfile.mkdtemp()
+ self._istmpdest = True
-def _isurl(pathstr):
- """Test whether a given string can be parsed as a URL.
+ def __del__(self):
+ if self._istmpdest:
+ rmtree(self._destpath)
- A pathstr with a valid network scheme (http, ftp, ...) will return true.
- A pathstr with a 'file' scheme will return false.
+ def _iszip(self, filename):
+ """Test if the filename is a zip file by looking at the file extension.
+ """
+ fname, ext = os.path.splitext(filename)
+ return ext in _file_openers.keys()
- """
+ def _iswritemode(self, mode):
+ """Test if the given mode will open a file for writing."""
- scheme, netloc, upath, uparams, uquery, ufrag = urlparse(pathstr)
- return bool(scheme and netloc)
+ # Currently only used to test the bz2 files. Not thoroughly tested!
+ _writemodes = ("w", "+")
+ for c in mode:
+ if c in _writemodes: return True
+ return False
-def _ensuredirs(directory):
- """Ensure that the given directory path exists. If not, create it."""
+ def _splitzipext(self, filename):
+ """Split zip extension from filename and return filename.
- if not os.path.exists(directory):
- os.makedirs(directory)
-
-class _Cache (object):
- """A local file cache for URL datasources.
-
- The path of the cache can be specified on intialization. The default
- path is ~/.scipy/cache
-
-
- """
-
- def __init__(self, cachepath=None):
- if cachepath is not None:
- self.path = cachepath
- elif os.name == 'posix':
- self.path = os.path.join(os.environ["HOME"], ".scipy", "cache")
- elif os.name == 'nt':
- self.path = os.path.join(os.environ["HOMEPATH"], ".scipy", "cache")
- if not os.path.exists(self.path):
- _ensuredirs(self.path)
-
- def tempfile(self, suffix='', prefix=''):
- """Create and return a temporary file in the cache.
-
- Parameters:
- suffix : {''}, optional
-
- prefix : {''}, optional
-
Returns:
- tmpfile : {string}
- String containing the full path to the temporary file.
+ base, zip_ext : {tuple}
- Examples
-
- >>> mycache = datasource._Cache()
- >>> mytmpfile = mycache.tempfile()
- >>> mytmpfile
- '/home/guido/.scipy/cache/GUPhDv'
-
"""
- _tmp, fname = mkstemp(suffix, prefix, self.path)
- return fname
+ if self._iszip(filename):
+ return os.path.splitext(filename)
+ else:
+ return filename, None
- def filepath(self, uri):
- """Return a path object to the uri in the cache.
+ def _possible_names(self, filename):
+ """Return a list containing filename and its compressed variations."""
+ names = [filename]
+ if not self._iszip(filename):
+ for zipext in _file_openers.keys():
+ if zipext:
+ names.append(filename+zipext)
+ return names
- Parameters:
- uri : {string}
- Filename to use in the returned path object.
+ def _isurl(self, path):
+ """Test if path is a net location. Tests the scheme and netloc."""
+ scheme, netloc, upath, uparams, uquery, ufrag = urlparse(path)
+ return bool(scheme and netloc)
- Returns:
- path : {string}
- Complete path for the given uri.
+ def _cache(self, path):
+ """Cache the file specified by path.
- Examples
+ Creates a copy of the file in the datasource cache.
- >>> mycache = datasource._Cache()
- >>> mycache.filepath('xyzcoords.txt')
- '/home/guido/.scipy/cache/xyzcoords.txt'
-
"""
- scheme, netloc, upath, uparams, uquery, ufrag = urlparse(uri)
- return os.path.join(self.path, netloc, upath.strip('/'))
+ upath = self.abspath(path)
- def cache(self, uri):
- """Copy the file at uri into the cache.
+ # ensure directory exists
+ if not os.path.exists(os.path.dirname(upath)):
+ os.makedirs(os.path.dirname(upath))
- Parameters:
- uri : {string}
- path or url of source file to cache.
-
- Returns:
- None
-
- """
-
- if self.iscached(uri):
- return
-
- upath = self.filepath(uri)
- _ensuredirs(os.path.dirname(upath))
-
- print 'cache - source:', uri
- print 'cache - destination:', upath
-
- if _isurl(uri):
+ # TODO: Doesn't handle compressed files!
+ if self._isurl(path):
try:
- openedurl = urlopen(uri)
+ openedurl = urlopen(path)
file(upath, 'w').write(openedurl.read())
except URLError:
- raise URLError("URL not found: " + str(uri))
+ raise URLError("URL not found: ", path)
else:
try:
- fp = file(uri, 'r')
+ # TODO: Why not just copy the file with shutil.copyfile?
+ fp = file(path, 'r')
file(upath, 'w').write(fp.read())
except IOError:
- raise IOError("File not founcd: " + str(uri))
+ raise IOError("File not found: ", path)
+ return upath
- def clear(self):
- """Delete all files in the cache."""
+ def _findfile(self, path):
+ """Searches for path and returns full path if found.
- # TODO: This deletes all files in the cache directory, regardless
- # of if this instance created them. Too destructive and
- # unexpected behavior.
- #for _file in self.path.files():
- # os.remove(file)
- raise NotImplementedError
+ If path is a URL, _findfile will cache a local copy and return
+ the path to the cached file.
+ If path is a local file, _findfile will return a path to that local
+ file.
- def iscached(self, uri):
- """ Check if a file exists in the cache.
+ The search will include possible compressed versions of the file and
+ return the first occurrence found.
- Returns
- boolean
-
"""
- upath = self.filepath(uri)
- return os.path.exists(upath)
+ # Build list of possible local file paths
+ filelist = self._possible_names(self.abspath(path))
+ if self._isurl(path):
+ # Add list of possible remote urls
+ filelist = filelist + self._possible_names(path)
- def retrieve(self, uri):
- """Retrieve a file from the cache.
- If not already there, create the file and add it to the cache.
+ for name in filelist:
+ if self.exists(name):
+ if self._isurl(name):
+ name = self._cache(name)
+ return name
+ return None
- Returns
- open file object
+ def abspath(self, path):
+ """Return an absolute path in the DataSource destination directory.
"""
- self.cache(uri)
- return file(self.filepath(uri))
+ # handle case where path includes self._destpath
+ splitpath = path.split(self._destpath, 2)
+ if len(splitpath) > 1:
+ path = splitpath[1]
+ scheme, netloc, upath, uparams, uquery, ufrag = urlparse(path)
+ return os.path.join(self._destpath, netloc, upath.strip(os.sep))
+ def exists(self, path):
+ """Test if path exists.
-class DataSource (object):
- """A generic data source class.
+ Tests for local files, locally cached URLs and remote URLs.
- Data sets could be from a file, a URL, or a cached file. They may also
- be compressed or uncompressed.
-
- TODO: Improve DataSource docstring
-
- """
-
- def __init__(self, cachepath=os.curdir):
- self._cache = _Cache(cachepath)
-
- def tempfile(self, suffix='', prefix=''):
- """Create a temporary file in the DataSource cache.
-
- Parameters:
- suffix : {''}, optional
-
- prefix : {''}, optional
-
- Returns:
- tmpfile : {string}
- String containing the full path to the temporary file.
-
- Examples
-
- >>> datasrc = datasource.DataSource()
- >>> tmpfile = datasrc.tempfile()
- >>> tmpfile
- '/home/guido/src/scipy-trunk/scipy/io/PZTuKo'
-
"""
- return self._cache.tempfile(suffix, prefix)
- def _possible_names(self, filename):
- """Return a tuple containing compressed filenames."""
- names = [filename]
- if not _iszip(filename):
- for zipext in _file_openers.keys():
- names.append(filename+zipext)
- return tuple(names)
-
- def cache(self, pathstr):
- """Cache the file specified by pathstr.
-
- Creates a copy of file pathstr in the datasource cache.
-
- """
-
- self._cache.cache(pathstr)
-
- def clear(self):
- # TODO: Implement a clear interface for deleting tempfiles.
- # There's a problem with the way this is handled in the _Cache,
- # All files in the cache directory will be deleted. In the
- # default instance, that's the os.curdir. I doubt this is what
- # people would want. The instance should only delete files that
- # it created!
- raise NotImplementedError
-
- def filename(self, pathstr):
- """Searches for pathstr file and returns full path if found.
-
- If pathstr is an URL, filename will cache a local copy and return
- the path to the cached file.
- If pathstr is a local file, filename will return a path to that local
- file.
- BUG: This should be modified so the behavior is identical for both
- types of files!
-
- The search will include possible compressed versions of the files.
- BUG: Will return the first occurence found, regardless of which
- version is newer.
-
- """
-
- found = None
- for name in self._possible_names(pathstr):
- try:
- if _isurl(name):
- self.cache(name)
- found = self._cache.filepath(name)
- else:
- raise Exception
- except:
- if os.path.exists(name):
- found = name
- if found:
- break
- if found is None:
- raise IOError("%s not found"%pathstr)
- return found
-
- def exists(self, pathstr):
- """Test if pathstr exists in the cache or the current directory.
-
- If pathstr is an URL, it will be fetched and cached.
-
- """
-
- # Is this method doing to much? At very least may want an option to
- # not fetch and cache URLs.
-
- try:
- _datafile = self.filename(pathstr)
+ upath = self.abspath(path)
+ if os.path.exists(upath):
return True
- except IOError:
+ elif self._isurl(path):
+ try:
+ netfile = urlopen(path)
+ # just validate existence, nothing more.
+ del(netfile)
+ return True
+ except URLError:
+ return False
+ else:
return False
- def open(self, pathstr, mode='r'):
- """Open pathstr and return file object.
+ def open(self, path, mode='r'):
+ """Open path and return file object.
- If pathstr is an URL, it will be fetched and cached.
+ If path is a URL, it will be downloaded, stored in the DataSource
+ directory and opened.
+ TODO: Currently only opening for reading has been tested. There is no
+ support for opening a file for writing which doesn't exist yet
+ (creating a file).
+
"""
- # Is this method doing to much? Should it be fetching and caching?
-
- if _isurl(pathstr) and _iswritemode(mode):
+ if self._isurl(path) and self._iswritemode(mode):
raise ValueError("URLs are not writeable")
- found = self.filename(pathstr)
- _fname, ext = _splitzipext(found)
- if ext == 'bz2':
- mode.replace("+", "")
- return _file_openers[ext](found, mode=mode)
+ # NOTE: _findfile will fail on a new file opened for writing.
+ found = self._findfile(path)
+ if found:
+ _fname, ext = self._splitzipext(found)
+ if ext == 'bz2':
+ mode.replace("+", "")
+ return _file_openers[ext](found, mode=mode)
+ else:
+ raise IOError("%s not found." % path)
+
class Repository (DataSource):
- """Multiple DataSource's that share one base url.
+ """A data repository where multiple DataSource's share one base URL.
- TODO: Improve Repository docstring.
+ Use a Repository when you will be working with multiple files from one
+ base URL or directory. Initialize the Repository with the base URL,
+ then refer to each file only by its filename.
+ >>> repos = Repository('/home/user/data/dir/')
+ >>> fp = repos.open('data01.txt')
+ >>> fp.analyze()
+ >>> fp.close()
+
+ Similarly you could use a URL for a repository:
+ >>> repos = Repository('http://www.xyz.edu/data')
+
"""
- def __init__(self, baseurl, cachepath=None):
- DataSource.__init__(self, cachepath=cachepath)
+ def __init__(self, baseurl, destpath=os.curdir):
+ DataSource.__init__(self, destpath=destpath)
self._baseurl = baseurl
- def _fullpath(self, pathstr):
- return os.path.join(self._baseurl, pathstr)
+ def _fullpath(self, path):
+ '''Return complete path for path. Prepends baseurl if necessary.'''
+ #print 'Repository._fullpath:', path
+ #print ' ._baseurl: ', self._baseurl
+ splitpath = path.split(self._baseurl, 2)
+ if len(splitpath) == 1:
+ result = os.path.join(self._baseurl, path)
+ else:
+ result = path # path contains baseurl already
+ return result
- def filename(self, pathstr):
- return DataSource.filename(self, self._fullpath(pathstr))
+ def _findfile(self, path):
+ #print 'Repository._findfile:', path
+ return DataSource._findfile(self, self._fullpath(path))
- def exists(self, pathstr):
- return DataSource.exists(self, self._fullpath(pathstr))
+ def abspath(self, path):
+ return DataSource.abspath(self, self._fullpath(path))
- def open(self, pathstr, mode='r'):
- return DataSource.open(self, self._fullpath(pathstr), mode)
+ def exists(self, path):
+ #print 'Respository.exists:', path
+ return DataSource.exists(self, self._fullpath(path))
+
+ def open(self, path, mode='r'):
+ #print 'Repository.open:', path
+ return DataSource.open(self, self._fullpath(path), mode)
+
+ def listdir(self):
+ '''List files in the source Repository.'''
+ if self._isurl(self._baseurl):
+ raise NotImplementedError
+ else:
+ return os.listdir(self._baseurl)
Added: trunk/scipy/io/tests/test_datasource.py
===================================================================
--- trunk/scipy/io/tests/test_datasource.py 2007-10-30 22:43:58 UTC (rev 3474)
+++ trunk/scipy/io/tests/test_datasource.py 2007-10-31 18:02:38 UTC (rev 3475)
@@ -0,0 +1,207 @@
+
+import bz2
+import gzip
+import os
+import sys
+import struct
+from tempfile import mkdtemp, mkstemp
+from shutil import rmtree
+from urlparse import urlparse
+
+from numpy.testing import *
+
+# HACK: import the src datasource
+# Until datasource is robust enough to include in scipy.io package
+sys.path.insert(0, os.path.abspath('..'))
+import datasource
+del sys.path[0]
+
+#set_package_path()
+#from scipy.io import datasource
+#restore_path()
+
+# Can rebind urlopen for testing, so we don't open real net connection.
+#def urlopen(url, data=None):
+# print 'test_datasource urlopen(', url, ')'
+#datasource.urlopen = urlopen
+
+#http_baseurl = 'http://nifti.nimh.nih.gov/nifti-1/data/'
+#http_filename = 'minimal.nii.gz'
+#http_abspath = os.path.join(http_baseurl, http_filename)
+
+# Temporarily use one of our files so we don't abuse someone else's server.
+http_path = 'https://cirl.berkeley.edu/twiki/pub/BIC/ImagingDocuments/'
+http_file = 'dork.pdf'
+
+http_fakepath = 'http://fake.abc.web/site/'
+http_fakefile = 'fake.txt'
+
+magic_line = 'three is the magic number'
+
+
+# Utility functions used by many TestCases
+def valid_textfile(filedir):
+ # Generate and return a valid temporary file.
+ fd, path = mkstemp(suffix='.txt', dir=filedir, text=True)
+ os.close(fd)
+ return path
+
+def invalid_textfile(filedir):
+ # Generate and return an invalid filename.
+ fd, path = mkstemp(suffix='.txt', dir=filedir)
+ os.close(fd)
+ os.remove(path)
+ return path
+
+def valid_httpurl():
+ return http_path+http_file
+
+def invalid_httpurl():
+ return http_fakepath+http_fakefile
+
+def valid_baseurl():
+ return http_path
+
+def invalid_baseurl():
+ return http_fakepath
+
+def valid_httpfile():
+ return http_file
+
+def invalid_httpfile():
+ return http_fakefile
+
+class TestDataSourceOpen(NumpyTestCase):
+ def setUp(self):
+ self.tmpdir = mkdtemp()
+ self.ds = datasource.DataSource(self.tmpdir)
+
+ def tearDown(self):
+ rmtree(self.tmpdir)
+ del self.ds
+
+ def test_ValidHTTP(self):
+ assert self.ds.open(valid_httpurl())
+
+ def test_InvalidHTTP(self):
+ self.assertRaises(IOError, self.ds.open, invalid_httpurl())
+
+ def test_ValidFile(self):
+ local_file = valid_textfile(self.tmpdir)
+ #print '\nDataSourceOpen test_ValidFile:', local_file
+ assert self.ds.open(local_file)
+
+ def test_InvalidFile(self):
+ invalid_file = invalid_textfile(self.tmpdir)
+ #print '\nDataSourceOpen test_InvalidFile:', invalid_file
+ self.assertRaises(IOError, self.ds.open, invalid_file)
+
+ def test_ValidGzipFile(self):
+ # Test datasource's internal file_opener for Gzip files.
+ filepath = os.path.join(self.tmpdir, 'foobar.txt.gz')
+ #print '\nDataSourceOpen test_ValidGzipFile:', filepath
+ fp = gzip.open(filepath, 'w')
+ fp.write(magic_line)
+ fp.close()
+ fp = self.ds.open(filepath)
+ result = fp.readline()
+ fp.close()
+ self.assertEqual(magic_line, result)
+
+ def test_ValidBz2File(self):
+ # Test datasource's internal file_opener for BZip2 files.
+ filepath = os.path.join(self.tmpdir, 'foobar.txt.bz2')
+ #print '\nDataSourceOpen test_ValidBZ2File:', filepath
+ fp = bz2.BZ2File(filepath, 'w')
+ fp.write(magic_line)
+ fp.close()
+ fp = self.ds.open(filepath)
+ result = fp.readline()
+ fp.close()
+ self.assertEqual(magic_line, result)
+
+
+class TestDataSourceExists(NumpyTestCase):
+ def setUp(self):
+ self.tmpdir = mkdtemp()
+ self.ds = datasource.DataSource(self.tmpdir)
+
+ def tearDown(self):
+ rmtree(self.tmpdir)
+ del self.ds
+
+ def test_ValidHTTP(self):
+ #print 'DataSourceExists test_ValidHTTP'
+ assert self.ds.exists(valid_httpurl())
+
+ def test_InvalidHTTP(self):
+ #print 'DataSourceExists test_InvalidHTTP'
+ self.assertEqual(self.ds.exists(invalid_httpurl()), False)
+
+ def test_ValidFile(self):
+ tmpfile = valid_textfile(self.tmpdir)
+ #print 'DataSourceExists test_ValidFile:', tmpfile
+ assert self.ds.exists(tmpfile)
+
+ def test_InvalidFile(self):
+ tmpfile = invalid_textfile(self.tmpdir)
+ #print 'DataSourceExists test_InvalidFile:', tmpfile
+ self.assertEqual(self.ds.exists(tmpfile), False)
+
+
+class TestDataSourceAbspath(NumpyTestCase):
+ def setUp(self):
+ self.tmpdir = mkdtemp()
+ self.ds = datasource.DataSource(self.tmpdir)
+
+ def tearDown(self):
+ rmtree(self.tmpdir)
+ del self.ds
+
+ def test_ValidHTTP(self):
+ scheme, netloc, upath, pms, qry, frg = urlparse(valid_httpurl())
+ local_path = os.path.join(self.tmpdir, netloc, upath.strip(os.sep))
+ self.assertEqual(local_path, self.ds.abspath(valid_httpurl()))
+
+ def test_ValidFile(self):
+ tmpfile = valid_textfile(self.tmpdir)
+ tmpfilename = os.path.split(tmpfile)[-1]
+ # Test with filename only
+ self.assertEqual(tmpfile, self.ds.abspath(os.path.split(tmpfile)[-1]))
+ # Test filename with complete path
+ self.assertEqual(tmpfile, self.ds.abspath(tmpfile))
+
+ def test_InvalidHTTP(self):
+ scheme, netloc, upath, pms, qry, frg = urlparse(invalid_httpurl())
+ invalidhttp = os.path.join(self.tmpdir, netloc, upath.strip(os.sep))
+ self.assertNotEqual(invalidhttp, self.ds.abspath(valid_httpurl()))
+
+ def test_InvalidFile(self):
+ fd, invalidfile = mkstemp(suffix='.txt')
+ os.close(fd)
+ tmpfile = valid_textfile(self.tmpdir)
+ tmpfilename = os.path.split(tmpfile)[-1]
+ # Test with filename only
+ self.assertNotEqual(invalidfile, self.ds.abspath(tmpfilename))
+ # Test filename with complete path
+ self.assertNotEqual(invalidfile, self.ds.abspath(tmpfile))
+
+
+class TestRespositoryAbspath(NumpyTestCase):
+ def setUp(self):
+ self.repos = datasource.Repository(valid_baseurl(), None)
+
+ def tearDown(self):
+ del self.repos
+
+ def test_ValidHTTP(self):
+ scheme, netloc, upath, pms, qry, frg = urlparse(valid_httpurl())
+ local_path = os.path.join(self.repos._destpath, netloc, \
+ upath.strip(os.sep))
+ filepath = self.repos.abspath(valid_httpfile())
+ self.assertEqual(local_path, filepath)
+
+
+if __name__ == "__main__":
+ NumpyTest().run()
+
More information about the Scipy-svn
mailing list