# Source code for abydos.util._data

# Copyright 2019-2020 by Christopher C. Little.
# This file is part of Abydos.
# Abydos is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# Abydos is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with Abydos. If not, see <https://www.gnu.org/licenses/>.


"""abydos.util._data.

The util._data module manages Abydos datasets, including downloading them,
decompressing them, and locating them once installed.

Much of this is copied from NLTK's similar facility (see nltk.downloader),
because they seem to have the issues figured out, because I don't want to
expend the effort to re-invent a solution, and because their license (Apache)
allows for it.
"""

import os
import re
import sys

# Bind ``urllib`` to the module that provides ``urlopen``/``urlretrieve``:
# ``urllib.request`` where it exists, the legacy top-level module otherwise.
try:
    import urllib.request as urllib
except ImportError:  # pragma: no cover
    import urllib
import zipfile

from xml.etree import ElementTree  # noqa: S405

# Public API of this module.
# NOTE(review): the original entries of this list were lost; it has been
# rebuilt from the public names defined in this file -- confirm against the
# upstream source.
__all__ = [
    'data_path',
    'download_package',
    'list_available_packages',
    'list_installed_packages',
    'package_path',
]

# Sub-directories of every data directory that are searched for packages.
DATA_SUBDIRS = ['corpora']

# NOTE(review): list_available_packages() falls back to a module-level
# INDEX_URL when no url is given, but its definition is not visible in this
# part of the file -- it must be defined here (as an "https://" URL).

data_path = []
"""A list of directories where the Abydos data package might reside.
   These directories will be checked in order when looking for a
   resource in the data package.  Note that this allows users to
   substitute in their own versions of resources, if they have them
   (e.g., in their home directory under ~/abydos_data)."""

# User-specified locations: ABYDOS_DATA holds a list of directories joined by
# the platform's path-list separator; empty entries are dropped.
_paths_from_env = os.environ.get('ABYDOS_DATA', '').split(
    os.pathsep
)  # pragma: no cover
data_path += [d for d in _paths_from_env if d]

# The user's home directory, unless we are on App Engine or "~" cannot be
# resolved (expanduser returns its argument unchanged in that case).
if 'APPENGINE_RUNTIME' not in os.environ and os.path.expanduser('~/') != '~/':
    data_path.append(os.path.expanduser('~/abydos_data'))

if sys.platform.startswith('win'):  # pragma: no cover
    # Common locations on Windows:
    data_path += [
        os.path.join(sys.prefix, 'abydos_data'),
        os.path.join(sys.prefix, 'share', 'abydos_data'),
        os.path.join(sys.prefix, 'lib', 'abydos_data'),
        os.path.join(os.environ.get('APPDATA', 'C:\\'), 'abydos_data'),
        # NOTE(review): further entries may have been lost here -- confirm
        # against the upstream source.
    ]
else:
    # Common locations on UNIX & OS X:
    data_path += [
        os.path.join(sys.prefix, 'abydos_data'),
        os.path.join(sys.prefix, 'share', 'abydos_data'),
        os.path.join(sys.prefix, 'lib', 'abydos_data'),
        # NOTE(review): further entries may have been lost here -- confirm
        # against the upstream source.
    ]
def package_path(resource_name):
    """Locate the installed directory of a data package.

    Every directory in ``data_path`` is searched, in order, under each
    sub-directory named in ``DATA_SUBDIRS``; the first existing directory
    called ``resource_name`` is returned.

    Parameters
    ----------
    resource_name : str
        The name of the package directory to look for

    Returns
    -------
    str
        The path of the first matching package directory

    Raises
    ------
    FileNotFoundError
        If no matching directory exists under any data path

    """
    candidates = (
        os.path.join(base_dir, subdir, resource_name)
        for base_dir in data_path
        for subdir in DATA_SUBDIRS
    )
    for candidate in candidates:
        if os.path.isdir(candidate):
            return candidate
    raise FileNotFoundError(
        'Data package not found. You may need to install or re-install it.'
    )
def list_installed_packages(path=None):
    """List all installed data packages.

    A package counts as installed when a directory named after it exists in
    one of the ``DATA_SUBDIRS`` of a data directory and a ``<package>.xml``
    manifest carrying ``name="..."`` and ``version="..."`` attributes sits
    next to that directory. Directories without a usable manifest (for
    example, the remains of an interrupted install) are skipped.

    Parameters
    ----------
    path : str or None
        A single data directory to inspect. If omitted (or empty), every
        directory in ``data_path`` is inspected.

    Returns
    -------
    list of tuple
        One ``(package, name, version)`` tuple per installed package, where
        ``package`` is the directory name and ``version`` is a float

    Raises
    ------
    ValueError
        If a manifest's version attribute is not a valid number

    """
    search_paths = [path] if path else data_path
    packages = []
    for base_dir in search_paths:
        for subdir in DATA_SUBDIRS:
            check_path = os.path.join(base_dir, subdir)
            if not os.path.isdir(check_path):
                continue
            for package in os.listdir(check_path):
                if not os.path.isdir(os.path.join(check_path, package)):
                    continue
                manifest_path = os.path.join(check_path, package + '.xml')
                if not os.path.isfile(manifest_path):
                    # No manifest: not a (completely) installed package.
                    continue
                with open(manifest_path, encoding='utf-8') as xml:
                    manifest = xml.read()
                name = re.search(r'name="([^"]+)"', manifest)
                version = re.search(r'version="([^"]+)"', manifest)
                if name is None or version is None:
                    # Manifest lacks the attributes we need; ignore it.
                    continue
                packages.append(
                    (package, name.group(1), float(version.group(1)))
                )
    return packages
def list_available_packages(url=None):
    """List all data packages available for install.

    The package index is downloaded from ``url`` and parsed as XML; each
    package is annotated with its installation status relative to the
    packages reported by ``list_installed_packages()``.

    Parameters
    ----------
    url : str or None
        The URL of the package index. Defaults to the module-level
        ``INDEX_URL``. Must begin with ``https://``.

    Returns
    -------
    tuple
        ``(packages, collections)``. ``packages`` is a list of
        ``(id, name, version, url, subdir, status)`` tuples, where ``version``
        is a float and ``status`` is one of ``'not-installed'``,
        ``'up-to-date'`` or ``'update available'``. ``collections`` is a list
        of ``(id, name, member_ids)`` tuples.

    Raises
    ------
    ValueError
        If ``url`` does not begin with ``https://``

    """
    installed_versions = {
        installed[0]: installed[2] for installed in list_installed_packages()
    }

    if url is None:
        url = INDEX_URL
    if url[:8] != 'https://':
        raise ValueError('url should begin with "https://"')

    with urllib.urlopen(url) as ix:  # noqa: S310
        xml = ElementTree.fromstring(ix.read())  # noqa: S314

    packages = []
    for pkg in xml.findall('packages/package'):
        pkg_id = pkg.attrib['id']
        version = float(pkg.attrib['version'])
        if pkg_id not in installed_versions:
            status = 'not-installed'
        elif installed_versions[pkg_id] >= version:
            status = 'up-to-date'
        else:
            status = 'update available'
        packages.append(
            (
                pkg_id,
                pkg.attrib['name'],
                version,
                pkg.attrib['url'],
                pkg.attrib['subdir'],
                status,
            )
        )

    collections = [
        (
            coll.attrib['id'],
            coll.attrib['name'],
            [item.attrib['ref'] for item in coll.findall('item')],
        )
        for coll in xml.findall('collections/collection')
    ]
    return packages, collections
def _default_download_dir():
    """Pick the directory into which packages are downloaded by default.

    Adapted from NLTK's nltk.downloader.Downloader.default_download_dir.

    Returns
    -------
    str or None
        ``None`` when running on Google App Engine (no writable filesystem);
        otherwise the first existing, writable entry of ``data_path``, or
        else an ``abydos_data`` directory under ``%APPDATA%`` (Windows) or
        the user's home directory.

    Raises
    ------
    ValueError
        If the home directory is needed but cannot be resolved

    """
    # No filesystem writes are possible on GAE.
    if 'APPENGINE_RUNTIME' in os.environ:  # pragma: no cover
        return None

    # Prefer a location that already exists and that we may write to.
    writable = next(
        (
            candidate
            for candidate in data_path
            if os.path.exists(candidate) and os.access(candidate, os.W_OK)
        ),
        None,
    )
    if writable is not None:  # pragma: no cover
        return writable

    on_windows = sys.platform == 'win32' and 'APPDATA' in os.environ
    if on_windows:  # pragma: no cover
        base_dir = os.environ['APPDATA']
    else:  # pragma: no cover
        base_dir = os.path.expanduser('~/')
        # expanduser hands back its argument when "~" cannot be resolved.
        if base_dir == '~/':
            raise ValueError('Could not find a default download directory')

    return os.path.join(base_dir, 'abydos_data')  # pragma: no cover
def _install_package(pack, data_path):
    """Download one package's manifest and archive, then unpack the archive.

    ``pack`` is a package tuple as produced by ``list_available_packages()``:
    index 0 is the id, 3 the archive URL and 4 the target sub-directory.
    """
    target_dir = os.path.join(data_path, pack[4])
    zip_fn = os.path.join(target_dir, pack[0] + '.zip')
    os.makedirs(target_dir, mode=0o775, exist_ok=True)

    # NOTE(review): the package URL comes straight from the index and is not
    # checked for an "https://" scheme the way the index URL itself is.
    # The manifest lives next to the archive: same URL/path with the last
    # three characters ("zip") replaced by "xml".
    urllib.urlretrieve(  # noqa: S310
        pack[3][:-3] + 'xml', zip_fn[:-3] + 'xml'
    )
    urllib.urlretrieve(pack[3], zip_fn)  # noqa: S310
    try:
        with zipfile.ZipFile(zip_fn) as zip_pkg:
            zip_pkg.extractall(target_dir)
    finally:
        # Never leave the downloaded archive behind, even if unpacking fails.
        os.remove(zip_fn)


def download_package(
    resource_name, url=None, data_path=None, force=False, silent=False
):
    """Download and install a package or collection.

    If ``resource_name`` names a collection, every member package of that
    collection is installed with the same ``force`` and ``silent`` settings.
    If it names neither a collection nor a package, nothing happens.

    Parameters
    ----------
    resource_name : str
        The id of the package or collection to install
    url : str or None
        The URL of the package index (see ``list_available_packages``)
    data_path : str or None
        The data directory to install into; defaults to the result of
        ``_default_download_dir()``. It is created if it does not exist.
    force : bool
        Install even if an equal or newer version is already installed
    silent : bool
        Suppress the progress messages printed to stdout

    """
    packages, collections = list_available_packages(url)
    installed = list_installed_packages(data_path)
    if data_path is None:
        data_path = _default_download_dir()
    os.makedirs(data_path, mode=0o775, exist_ok=True)

    for coll in collections:
        if resource_name == coll[0]:
            if not silent:  # pragma: no branch
                print('Installing {} collection'.format(coll[1]))  # noqa: T001
            for member in coll[2]:
                download_package(
                    member, url, data_path, force=force, silent=silent
                )
            return

    for pack in packages:
        if resource_name != pack[0]:
            continue
        up_to_date = any(
            pack[0] == inst[0] and pack[2] <= inst[2] for inst in installed
        )
        if not force and up_to_date:
            if not silent:
                print(  # pragma: no cover  # noqa: T001
                    '{} package already up-to-date'.format(pack[1])
                )
            return
        if not silent:  # pragma: no branch
            print('Installing {} package'.format(pack[1]))  # noqa: T001
        _install_package(pack, data_path)
if __name__ == '__main__':
    # Run this module's doctests when it is executed as a script.
    import doctest

    doctest.testmod()