# Software License Agreement (BSD License)
#
# Copyright (c) 2013, Open Source Robotics Foundation, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Open Source Robotics Foundation, Inc. nor
# the names of its contributors may be used to endorse or promote
# products derived from this software without specific prior
# written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from __future__ import print_function
import gzip
import logging
import os
try:
from cStringIO import StringIO
except ImportError:
from io import BytesIO as StringIO
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
import yaml
logger = logging.getLogger('rosdistro')
from .distribution import Distribution # noqa
from .distribution_cache import DistributionCache # noqa
from .distribution_file import DistributionFile # noqa
from .distribution_file import create_distribution_file # noqa
from .external.appdirs import site_config_dir, user_config_dir # noqa
from .index import Index # noqa
from .loader import load_url # noqa
from .manifest_provider.cache import CachedManifestProvider, CachedSourceManifestProvider # noqa
# same version as in:
# - setup.py
# - stdeb.cfg
__version__ = '0.8.3'
# index information
DEFAULT_INDEX_URL = 'https://raw.githubusercontent.com/ros/rosdistro/master/index-v4.yaml'
[docs]def get_index_url():
# environment variable has precedence over configuration files
if 'ROSDISTRO_INDEX_URL' in os.environ:
return os.environ['ROSDISTRO_INDEX_URL']
def read_cfg_index_url(fname):
try:
with open(fname) as f:
return yaml.safe_load(f.read())['index_url']
except (IOError, KeyError, yaml.YAMLError):
return None
cfg_file = 'config.yaml'
# first, look for the user configuration (usually ~/.config/rosdistro)
user_cfg_path = os.path.join(user_config_dir('rosdistro'), cfg_file)
index_url = read_cfg_index_url(user_cfg_path)
if index_url is not None:
return index_url
# if not found, look for the global configuration *usually /etc/xdg/rosdistro)
site_cfg_paths = os.path.join(site_config_dir('rosdistro', multipath=True), cfg_file).split(os.pathsep)
for site_cfg_path in site_cfg_paths:
index_url = read_cfg_index_url(site_cfg_path)
if index_url is not None:
return index_url
# if nothing is found, use the default
return DEFAULT_INDEX_URL
[docs]def get_index(url):
logger.debug('Load index from "%s"' % url)
yaml_str = load_url(url)
data = yaml.safe_load(yaml_str)
base_url = os.path.dirname(url)
url_parts = urlparse(url)
return Index(data, base_url, url_query=url_parts.query)
# distribution information
[docs]def get_distribution(index, dist_name):
dist_file = get_distribution_file(index, dist_name)
return Distribution(dist_file)
[docs]def get_distribution_file(index, dist_name):
data = _get_dist_file_data(index, dist_name, 'distribution')
return create_distribution_file(dist_name, data)
[docs]def get_distribution_files(index, dist_name):
data = _get_dist_file_data(index, dist_name, 'distribution')
if not isinstance(data, list):
data = [data]
dist_files = []
for d in data:
dist_file = DistributionFile(dist_name, d)
dist_files.append(dist_file)
return dist_files
[docs]def get_cached_distribution(index, dist_name, cache=None, allow_lazy_load=False):
if cache is None:
try:
cache = get_distribution_cache(index, dist_name)
except Exception:
if not allow_lazy_load:
raise
# create empty cache instance
dist_file_data = _get_dist_file_data(index, dist_name, 'distribution')
cache = DistributionCache(dist_name, distribution_file_data=dist_file_data)
dist = Distribution(
cache.distribution_file,
[CachedManifestProvider(cache, Distribution.default_manifest_providers if allow_lazy_load else None)],
[CachedSourceManifestProvider(cache, Distribution.default_source_manifest_providers if allow_lazy_load else None)])
assert cache.distribution_file.name == dist_name
return dist
[docs]def get_distribution_cache_string(index, dist_name):
if dist_name not in index.distributions.keys():
raise RuntimeError("Unknown distribution: '{0}'. Valid distribution names are: {1}".format(dist_name, ', '.join(sorted(index.distributions.keys()))))
dist = index.distributions[dist_name]
if 'distribution_cache' not in dist.keys():
raise RuntimeError("Distribution has no cache: '{0}'".format(dist_name))
url = dist['distribution_cache']
logger.debug('Load cache from "%s"' % url)
if url.endswith('.yaml'):
yaml_str = load_url(url)
elif url.endswith('.yaml.gz'):
yaml_gz_str = load_url(url, skip_decode=True)
yaml_gz_stream = StringIO(yaml_gz_str)
f = gzip.GzipFile(fileobj=yaml_gz_stream, mode='rb')
yaml_str = f.read()
f.close()
if not isinstance(yaml_str, str):
yaml_str = yaml_str.decode('utf-8')
else:
raise NotImplementedError('The url of the cache must end with either ".yaml" or ".yaml.gz"')
return yaml_str
[docs]def get_distribution_cache(index, dist_name):
yaml_str = get_distribution_cache_string(index, dist_name)
data = yaml.safe_load(yaml_str)
return DistributionCache(dist_name, data)
# internal
def _get_dist_file_data(index, dist_name, type_):
if dist_name not in index.distributions.keys():
raise RuntimeError("Unknown release: '{0}'. Valid release names are: {1}".format(dist_name, ', '.join(sorted(index.distributions.keys()))))
dist = index.distributions[dist_name]
if type_ not in dist.keys():
raise RuntimeError('unknown release type "%s"' % type_)
url = dist[type_]
def _load_yaml_data(url):
logger.debug('Load file from "%s"' % url)
yaml_str = load_url(url)
return yaml.safe_load(yaml_str)
if not isinstance(url, list):
data = _load_yaml_data(url)
else:
data = []
for u in url:
data.append(_load_yaml_data(u))
return data
from .legacy import * # noqa