# Copyright (C) 2012 Anaconda, Inc
# SPDX-License-Identifier: BSD-3-Clause
import re
import sys
from logging import getLogger
from os.path import basename, dirname, isdir, isfile, join, normcase
from ..auxlib.ish import dals
from ..base.constants import ROOT_ENV_NAME
from ..base.context import context, env_name
from ..common.constants import NULL
from ..common.io import swallow_broken_pipe
from ..common.path import paths_equal
from ..common.serialize import json_dump
from ..deprecations import deprecated
from ..exceptions import DirectoryNotACondaEnvironmentError, EnvironmentLocationNotFound
from ..models.match_spec import MatchSpec
def confirm(message="Proceed", choices=("yes", "no"), default="yes", dry_run=NULL):
assert default in choices, default
if (dry_run is NULL and context.dry_run) or dry_run:
from ..exceptions import DryRunExit
raise DryRunExit()
options = []
for option in choices:
if option == default:
options.append("[%s]" % option[0])
else:
options.append(option[0])
message = "{} ({})? ".format(message, "/".join(options))
choices = {alt: choice for choice in choices for alt in [choice, choice[0]]}
choices[""] = default
while True:
# raw_input has a bug and prints to stderr, not desirable
sys.stdout.write(message)
sys.stdout.flush()
user_choice = sys.stdin.readline().strip().lower()
if user_choice not in choices:
print("Invalid choice: %s" % user_choice)
else:
sys.stdout.write("\n")
sys.stdout.flush()
return choices[user_choice]
def confirm_yn(message="Proceed", default="yes", dry_run=NULL):
if (dry_run is NULL and context.dry_run) or dry_run:
from ..exceptions import DryRunExit
raise DryRunExit()
if context.always_yes:
return True
try:
choice = confirm(
message=message, choices=("yes", "no"), default=default, dry_run=dry_run
)
except KeyboardInterrupt: # pragma: no cover
from ..exceptions import CondaSystemExit
raise CondaSystemExit("\nOperation aborted. Exiting.")
if choice == "no":
from ..exceptions import CondaSystemExit
raise CondaSystemExit("Exiting.")
return True
@deprecated("23.3", "23.9")
def ensure_name_or_prefix(args, command):
if not (args.name or args.prefix):
from ..exceptions import CondaValueError
raise CondaValueError(
"either -n NAME or -p PREFIX option required,\n"
'try "conda %s -h" for more details' % command
)
def is_active_prefix(prefix: str) -> bool:
"""
Determines whether the args we pass in are pointing to the active prefix.
Can be used a validation step to make sure operations are not being
performed on the active prefix.
"""
if context.active_prefix is None:
return False
return (
paths_equal(prefix, context.active_prefix)
# normcasing our prefix check for Windows, for case insensitivity
or normcase(prefix) == normcase(env_name(context.active_prefix))
)
def arg2spec(arg, json=False, update=False):
try:
spec = MatchSpec(arg)
except:
from ..exceptions import CondaValueError
raise CondaValueError("invalid package specification: %s" % arg)
name = spec.name
if not spec._is_simple() and update:
from ..exceptions import CondaValueError
raise CondaValueError(
"""version specifications not allowed with 'update'; use
conda update {}{} or
conda install {}""".format(
name, " " * (len(arg) - len(name)), arg
)
)
return str(spec)
def specs_from_args(args, json=False):
return [arg2spec(arg, json=json) for arg in args]
spec_pat = re.compile(
r"""
(?P<name>[^=<>!\s]+) # package name
\s* # ignore spaces
(
(?P<cc>=[^=]+(=[^=]+)?) # conda constraint
|
(?P<pc>(?:[=!]=|[><]=?|~=).+) # new pip-style constraints
)?$
""",
re.VERBOSE,
)
def strip_comment(line):
return line.split("#")[0].rstrip()
def spec_from_line(line):
m = spec_pat.match(strip_comment(line))
if m is None:
return None
name, cc, pc = (m.group("name").lower(), m.group("cc"), m.group("pc"))
if cc:
return name + cc.replace("=", " ")
elif pc:
if pc.startswith("~= "):
assert (
pc.count("~=") == 1
), f"Overly complex 'Compatible release' spec not handled {line}"
assert pc.count("."), f"No '.' in 'Compatible release' version {line}"
ver = pc.replace("~= ", "")
ver2 = ".".join(ver.split(".")[:-1]) + ".*"
return name + " >=" + ver + ",==" + ver2
else:
return name + " " + pc.replace(" ", "")
else:
return name
def specs_from_url(url, json=False):
from ..gateways.connection.download import TmpDownload
explicit = False
with TmpDownload(url, verbose=False) as path:
specs = []
try:
for line in open(path):
line = line.strip()
if not line or line.startswith("#"):
continue
if line == "@EXPLICIT":
explicit = True
if explicit:
specs.append(line)
continue
spec = spec_from_line(line)
if spec is None:
from ..exceptions import CondaValueError
raise CondaValueError(f"could not parse '{line}' in: {url}")
specs.append(spec)
except OSError as e:
from ..exceptions import CondaFileIOError
raise CondaFileIOError(path, e)
return specs
def names_in_specs(names, specs):
return any(spec.split()[0] in names for spec in specs)
def disp_features(features):
if features:
return "[%s]" % " ".join(features)
else:
return ""
@swallow_broken_pipe
def stdout_json(d):
getLogger("conda.stdout").info(json_dump(d))
def stdout_json_success(success=True, **kwargs):
result = {"success": success}
actions = kwargs.pop("actions", None)
if actions:
if "LINK" in actions:
actions["LINK"] = [prec.dist_fields_dump() for prec in actions["LINK"]]
if "UNLINK" in actions:
actions["UNLINK"] = [prec.dist_fields_dump() for prec in actions["UNLINK"]]
result["actions"] = actions
result.update(kwargs)
stdout_json(result)
def print_envs_list(known_conda_prefixes, output=True):
if output:
print("# conda environments:")
print("#")
def disp_env(prefix):
fmt = "%-20s %s %s"
active = "*" if prefix == context.active_prefix else " "
if prefix == context.root_prefix:
name = ROOT_ENV_NAME
elif any(
paths_equal(envs_dir, dirname(prefix)) for envs_dir in context.envs_dirs
):
name = basename(prefix)
else:
name = ""
if output:
print(fmt % (name, active, prefix))
for prefix in known_conda_prefixes:
disp_env(prefix)
if output:
print()
def check_non_admin():
from ..common._os import is_admin
if not context.non_admin_enabled and not is_admin():
from ..exceptions import OperationNotAllowed
raise OperationNotAllowed(
dals(
"""
The create, install, update, and remove operations have been disabled
on your system for non-privileged users.
"""
)
)
def validate_prefix(prefix):
"""Verifies the prefix is a valid conda environment.
:raises EnvironmentLocationNotFound: Non-existent path or not a directory.
:raises DirectoryNotACondaEnvironmentError: Directory is not a conda environment.
:returns: Valid prefix.
:rtype: str
"""
if isdir(prefix):
if not isfile(join(prefix, "conda-meta", "history")):
raise DirectoryNotACondaEnvironmentError(prefix)
else:
raise EnvironmentLocationNotFound(prefix)
return prefix