# Copyright (C) 2012 Anaconda, Inc
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import abc
import json
import os
import re
import sys
from collections.abc import Callable, Iterable
from os.path import (
abspath,
basename,
dirname,
exists,
expanduser,
expandvars,
isdir,
join,
)
from pathlib import Path
from textwrap import dedent
# Since we have to have configuration context here, anything imported by
# conda.base.context is fair game, but nothing more.
from . import CONDA_PACKAGE_ROOT, CONDA_SOURCE_ROOT
from .auxlib.compat import Utf8NamedTemporaryFile
from .base.constants import (
CONDA_ENV_VARS_UNSET_VAR,
PACKAGE_ENV_VARS_DIR,
PREFIX_STATE_FILE,
)
from .base.context import ROOT_ENV_NAME, context, locate_prefix_by_name
from .common.compat import FILESYSTEM_ENCODING, on_win
from .common.path import paths_equal
class _Activator(metaclass=abc.ABCMeta):
# Activate and deactivate have three tasks
# 1. Set and unset environment variables
# 2. Execute/source activate.d/deactivate.d scripts
# 3. Update the command prompt
#
# Shells should also use 'reactivate' following conda's install, update, and
# remove/uninstall commands.
#
# All core logic is in build_activate() or build_deactivate(), and is independent of
# shell type. Each returns a map containing the keys:
# export_vars
# unset_var
# activate_scripts
# deactivate_scripts
#
# The value of the CONDA_PROMPT_MODIFIER environment variable holds conda's contribution
# to the command prompt.
#
# To implement support for a new shell, ideally one would only need to add shell-specific
# information to the __init__ method of this class.
# The following instance variables must be defined by each implementation.
pathsep_join: str
sep: str
path_conversion: Callable[
(str | Iterable[str] | None), str | tuple[str, ...] | None
]
script_extension: str
#: temporary file's extension, None writes to stdout instead
tempfile_extension: str | None
command_join: str
unset_var_tmpl: str
export_var_tmpl: str
set_var_tmpl: str
run_script_tmpl: str
hook_source_path: str
def __init__(self, arguments=None):
self._raw_arguments = arguments
self.environ = os.environ.copy()
def get_export_unset_vars(self, export_metavars=True, **kwargs):
"""
:param export_metavars: whether to export `conda_exe_vars` meta variables.
:param kwargs: environment variables to export.
.. if you pass and set any other variable to None, then it
emits it to the dict with a value of None.
:return: A dict of env vars to export ordered the same way as kwargs.
And a list of env vars to unset.
"""
unset_vars = []
export_vars = {}
# split provided environment variables into exports vs unsets
for name, value in kwargs.items():
if value is None:
unset_vars.append(name.upper())
else:
export_vars[name.upper()] = value
if export_metavars:
# split meta variables into exports vs unsets
for name, value in context.conda_exe_vars_dict.items():
if value is None:
unset_vars.append(name.upper())
elif "/" in value or "\\" in value:
export_vars[name.upper()] = self.path_conversion(value)
else:
export_vars[name.upper()] = value
else:
# unset all meta variables
unset_vars.extend(context.conda_exe_vars_dict)
return export_vars, unset_vars
# Used in tests only.
def add_export_unset_vars(self, export_vars, unset_vars, **kwargs):
new_export_vars, new_unset_vars = self.get_export_unset_vars(**kwargs)
if export_vars is not None:
export_vars = {**export_vars, **new_export_vars}
if unset_vars is not None:
unset_vars = [*unset_vars, *new_unset_vars]
return export_vars, unset_vars
# Used in tests only.
def get_scripts_export_unset_vars(self, **kwargs):
export_vars, unset_vars = self.get_export_unset_vars(**kwargs)
script_export_vars = script_unset_vars = None
if export_vars:
script_export_vars = self.command_join.join(
[self.export_var_tmpl % (k, v) for k, v in export_vars.items()]
)
if unset_vars:
script_unset_vars = self.command_join.join(
[self.unset_var_tmpl % (k) for k in unset_vars]
)
return script_export_vars or "", script_unset_vars or ""
def _finalize(self, commands, ext):
commands = (*commands, "") # add terminating newline
if ext is None:
return self.command_join.join(commands)
elif ext:
with Utf8NamedTemporaryFile("w+", suffix=ext, delete=False) as tf:
# the default mode is 'w+b', and universal new lines don't work in that mode
# command_join should account for that
tf.write(self.command_join.join(commands))
return tf.name
else:
raise NotImplementedError()
def activate(self):
if self.stack:
builder_result = self.build_stack(self.env_name_or_prefix)
else:
builder_result = self.build_activate(self.env_name_or_prefix)
return self._finalize(
self._yield_commands(builder_result), self.tempfile_extension
)
def deactivate(self):
return self._finalize(
self._yield_commands(self.build_deactivate()), self.tempfile_extension
)
def reactivate(self):
return self._finalize(
self._yield_commands(self.build_reactivate()), self.tempfile_extension
)
def hook(self, auto_activate_base=None):
builder = []
builder.append(self._hook_preamble())
with open(self.hook_source_path) as fsrc:
builder.append(fsrc.read())
if (
auto_activate_base is None
and context.auto_activate_base
or auto_activate_base
):
builder.append("conda activate base\n")
postamble = self._hook_postamble()
if postamble is not None:
builder.append(postamble)
return "\n".join(builder)
def execute(self):
# return value meant to be written to stdout
self._parse_and_set_args(self._raw_arguments)
return getattr(self, self.command)()
def commands(self):
"""
Returns a list of possible subcommands that are valid
immediately following `conda` at the command line.
This method is generally only used by tab-completion.
"""
# Import locally to reduce impact on initialization time.
from .cli.conda_argparse import find_builtin_commands, generate_parser
from .cli.find_commands import find_commands
# return value meant to be written to stdout
# Hidden commands to provide metadata to shells.
return "\n".join(
sorted(
find_builtin_commands(generate_parser()) + tuple(find_commands(True))
)
)
@abc.abstractmethod
def _hook_preamble(self) -> str | None:
# must be implemented in subclass
raise NotImplementedError
def _hook_postamble(self) -> str | None:
return None
def _parse_and_set_args(self, arguments):
def raise_invalid_command_error(actual_command=None):
from .exceptions import ArgumentError
message = (
"'activate', 'deactivate', 'hook', 'commands', or 'reactivate' "
"command must be given"
)
if actual_command:
message += ". Instead got '%s'." % actual_command
raise ArgumentError(message)
if arguments is None or len(arguments) < 1:
raise_invalid_command_error()
command, *arguments = arguments
help_flags = ("-h", "--help", "/?")
non_help_args = tuple(arg for arg in arguments if arg not in help_flags)
help_requested = len(arguments) != len(non_help_args)
remainder_args = list(arg for arg in non_help_args if arg and arg != command)
if not command:
raise_invalid_command_error()
elif help_requested:
from .exceptions import ActivateHelp, DeactivateHelp, GenericHelp
help_classes = {
"activate": ActivateHelp(),
"deactivate": DeactivateHelp(),
"hook": GenericHelp("hook"),
"commands": GenericHelp("commands"),
"reactivate": GenericHelp("reactivate"),
}
raise help_classes[command]
elif command not in (
"activate",
"deactivate",
"reactivate",
"hook",
"commands",
):
raise_invalid_command_error(actual_command=command)
if command.endswith("activate") or command == "hook":
try:
dev_idx = remainder_args.index("--dev")
except ValueError:
context.dev = False
else:
del remainder_args[dev_idx]
context.dev = True
if command == "activate":
self.stack = context.auto_stack and context.shlvl <= context.auto_stack
try:
stack_idx = remainder_args.index("--stack")
except ValueError:
stack_idx = -1
try:
no_stack_idx = remainder_args.index("--no-stack")
except ValueError:
no_stack_idx = -1
if stack_idx >= 0 and no_stack_idx >= 0:
from .exceptions import ArgumentError
raise ArgumentError(
"cannot specify both --stack and --no-stack to " + command
)
if stack_idx >= 0:
self.stack = True
del remainder_args[stack_idx]
if no_stack_idx >= 0:
self.stack = False
del remainder_args[no_stack_idx]
if len(remainder_args) > 1:
from .exceptions import ArgumentError
raise ArgumentError(
command
+ " does not accept more than one argument:\n"
+ str(remainder_args)
+ "\n"
)
self.env_name_or_prefix = remainder_args and remainder_args[0] or "base"
else:
if remainder_args:
from .exceptions import ArgumentError
raise ArgumentError(
"%s does not accept arguments\nremainder_args: %s\n"
% (command, remainder_args)
)
self.command = command
def _yield_commands(self, cmds_dict):
for key, value in sorted(cmds_dict.get("export_path", {}).items()):
yield self.export_var_tmpl % (key, value)
for script in cmds_dict.get("deactivate_scripts", ()):
yield self.run_script_tmpl % script
for key in cmds_dict.get("unset_vars", ()):
yield self.unset_var_tmpl % key
for key, value in cmds_dict.get("set_vars", {}).items():
yield self.set_var_tmpl % (key, value)
for key, value in cmds_dict.get("export_vars", {}).items():
yield self.export_var_tmpl % (key, value)
for script in cmds_dict.get("activate_scripts", ()):
yield self.run_script_tmpl % script
def build_activate(self, env_name_or_prefix):
return self._build_activate_stack(env_name_or_prefix, False)
def build_stack(self, env_name_or_prefix):
return self._build_activate_stack(env_name_or_prefix, True)
def _build_activate_stack(self, env_name_or_prefix, stack):
# get environment prefix
if re.search(r"\\|/", env_name_or_prefix):
prefix = expand(env_name_or_prefix)
if not isdir(join(prefix, "conda-meta")):
from .exceptions import EnvironmentLocationNotFound
raise EnvironmentLocationNotFound(prefix)
elif env_name_or_prefix in (ROOT_ENV_NAME, "root"):
prefix = context.root_prefix
else:
prefix = locate_prefix_by_name(env_name_or_prefix)
# get prior shlvl and prefix
old_conda_shlvl = int(self.environ.get("CONDA_SHLVL", "").strip() or 0)
old_conda_prefix = self.environ.get("CONDA_PREFIX")
# if the prior active prefix is this prefix we are actually doing a reactivate
if old_conda_prefix == prefix and old_conda_shlvl > 0:
return self.build_reactivate()
activate_scripts = self._get_activate_scripts(prefix)
conda_shlvl = old_conda_shlvl + 1
conda_default_env = self._default_env(prefix)
conda_prompt_modifier = self._prompt_modifier(prefix, conda_default_env)
env_vars = {
name: value
for name, value in self._get_environment_env_vars(prefix).items()
if value != CONDA_ENV_VARS_UNSET_VAR
}
# get clobbered environment variables
clobber_vars = set(env_vars.keys()).intersection(os.environ.keys())
clobber_vars = set(
filter(lambda var: env_vars[var] != os.environ[var], clobber_vars)
)
if clobber_vars:
print(
"WARNING: overwriting environment variables set in the machine",
file=sys.stderr,
)
print(f"overwriting variable {clobber_vars}", file=sys.stderr)
for name in clobber_vars:
env_vars[f"__CONDA_SHLVL_{old_conda_shlvl}_{name}"] = os.environ.get(name)
if old_conda_shlvl == 0:
export_vars, unset_vars = self.get_export_unset_vars(
path=self.pathsep_join(self._add_prefix_to_path(prefix)),
conda_prefix=prefix,
conda_shlvl=conda_shlvl,
conda_default_env=conda_default_env,
conda_prompt_modifier=conda_prompt_modifier,
**env_vars,
)
deactivate_scripts = ()
elif stack:
export_vars, unset_vars = self.get_export_unset_vars(
path=self.pathsep_join(self._add_prefix_to_path(prefix)),
conda_prefix=prefix,
conda_shlvl=conda_shlvl,
conda_default_env=conda_default_env,
conda_prompt_modifier=conda_prompt_modifier,
**env_vars,
**{
f"CONDA_PREFIX_{old_conda_shlvl}": old_conda_prefix,
f"CONDA_STACKED_{conda_shlvl}": "true",
},
)
deactivate_scripts = ()
else:
export_vars, unset_vars = self.get_export_unset_vars(
path=self.pathsep_join(
self._replace_prefix_in_path(old_conda_prefix, prefix)
),
conda_prefix=prefix,
conda_shlvl=conda_shlvl,
conda_default_env=conda_default_env,
conda_prompt_modifier=conda_prompt_modifier,
**env_vars,
**{
f"CONDA_PREFIX_{old_conda_shlvl}": old_conda_prefix,
},
)
deactivate_scripts = self._get_deactivate_scripts(old_conda_prefix)
set_vars = {}
if context.changeps1:
self._update_prompt(set_vars, conda_prompt_modifier)
return {
"unset_vars": unset_vars,
"set_vars": set_vars,
"export_vars": export_vars,
"deactivate_scripts": deactivate_scripts,
"activate_scripts": activate_scripts,
}
def build_deactivate(self):
self._deactivate = True
# query environment
old_conda_prefix = self.environ.get("CONDA_PREFIX")
old_conda_shlvl = int(self.environ.get("CONDA_SHLVL", "").strip() or 0)
if not old_conda_prefix or old_conda_shlvl < 1:
# no active environment, so cannot deactivate; do nothing
return {
"unset_vars": (),
"set_vars": {},
"export_vars": {},
"deactivate_scripts": (),
"activate_scripts": (),
}
deactivate_scripts = self._get_deactivate_scripts(old_conda_prefix)
old_conda_environment_env_vars = self._get_environment_env_vars(
old_conda_prefix
)
new_conda_shlvl = old_conda_shlvl - 1
set_vars = {}
if old_conda_shlvl == 1:
new_path = self.pathsep_join(
self._remove_prefix_from_path(old_conda_prefix)
)
# You might think that you can remove the CONDA_EXE vars with export_metavars=False
# here so that "deactivate means deactivate" but you cannot since the conda shell
# scripts still refer to them and they only set them once at the top. We could change
# that though, the conda() shell function could set them instead of doing it at the
# top. This would be *much* cleaner. I personally cannot abide that I have
# deactivated conda and anything at all in my env still references it (apart from the
# shell script, we need something I suppose!)
export_vars, unset_vars = self.get_export_unset_vars(
conda_prefix=None,
conda_shlvl=new_conda_shlvl,
conda_default_env=None,
conda_prompt_modifier=None,
)
conda_prompt_modifier = ""
activate_scripts = ()
export_path = {
"PATH": new_path,
}
else:
assert old_conda_shlvl > 1
new_prefix = self.environ.get("CONDA_PREFIX_%d" % new_conda_shlvl)
conda_default_env = self._default_env(new_prefix)
conda_prompt_modifier = self._prompt_modifier(new_prefix, conda_default_env)
new_conda_environment_env_vars = self._get_environment_env_vars(new_prefix)
old_prefix_stacked = "CONDA_STACKED_%d" % old_conda_shlvl in self.environ
new_path = ""
unset_vars = ["CONDA_PREFIX_%d" % new_conda_shlvl]
if old_prefix_stacked:
new_path = self.pathsep_join(
self._remove_prefix_from_path(old_conda_prefix)
)
unset_vars.append("CONDA_STACKED_%d" % old_conda_shlvl)
else:
new_path = self.pathsep_join(
self._replace_prefix_in_path(old_conda_prefix, new_prefix)
)
export_vars, unset_vars2 = self.get_export_unset_vars(
conda_prefix=new_prefix,
conda_shlvl=new_conda_shlvl,
conda_default_env=conda_default_env,
conda_prompt_modifier=conda_prompt_modifier,
**new_conda_environment_env_vars,
)
unset_vars += unset_vars2
export_path = {
"PATH": new_path,
}
activate_scripts = self._get_activate_scripts(new_prefix)
if context.changeps1:
self._update_prompt(set_vars, conda_prompt_modifier)
for env_var in old_conda_environment_env_vars.keys():
unset_vars.append(env_var)
save_var = f"__CONDA_SHLVL_{new_conda_shlvl}_{env_var}"
if save_var in os.environ.keys():
export_vars[env_var] = os.environ[save_var]
return {
"unset_vars": unset_vars,
"set_vars": set_vars,
"export_vars": export_vars,
"export_path": export_path,
"deactivate_scripts": deactivate_scripts,
"activate_scripts": activate_scripts,
}
def build_reactivate(self):
self._reactivate = True
conda_prefix = self.environ.get("CONDA_PREFIX")
conda_shlvl = int(self.environ.get("CONDA_SHLVL", "").strip() or 0)
if not conda_prefix or conda_shlvl < 1:
# no active environment, so cannot reactivate; do nothing
return {
"unset_vars": (),
"set_vars": {},
"export_vars": {},
"deactivate_scripts": (),
"activate_scripts": (),
}
conda_default_env = self.environ.get(
"CONDA_DEFAULT_ENV", self._default_env(conda_prefix)
)
new_path = self.pathsep_join(
self._replace_prefix_in_path(conda_prefix, conda_prefix)
)
set_vars = {}
conda_prompt_modifier = self._prompt_modifier(conda_prefix, conda_default_env)
if context.changeps1:
self._update_prompt(set_vars, conda_prompt_modifier)
env_vars_to_unset = ()
env_vars_to_export = {
"PATH": new_path,
"CONDA_SHLVL": conda_shlvl,
"CONDA_PROMPT_MODIFIER": self._prompt_modifier(
conda_prefix, conda_default_env
),
}
conda_environment_env_vars = self._get_environment_env_vars(conda_prefix)
for k, v in conda_environment_env_vars.items():
if v == CONDA_ENV_VARS_UNSET_VAR:
env_vars_to_unset = env_vars_to_unset + (k,)
else:
env_vars_to_export[k] = v
# environment variables are set only to aid transition from conda 4.3 to conda 4.4
return {
"unset_vars": env_vars_to_unset,
"set_vars": set_vars,
"export_vars": env_vars_to_export,
"deactivate_scripts": self._get_deactivate_scripts(conda_prefix),
"activate_scripts": self._get_activate_scripts(conda_prefix),
}
def _get_starting_path_list(self):
# For isolation, running the conda test suite *without* env. var. inheritance
# every so often is a good idea. We should probably make this a pytest fixture
# along with one that tests both hardlink-only and copy-only, but before that
# conda's testsuite needs to be a lot faster!
clean_paths = {
"darwin": "/usr/bin:/bin:/usr/sbin:/sbin",
# You may think 'let us do something more clever here and interpolate
# `%windir%`' but the point here is the the whole env. is cleaned out
"win32": "C:\\Windows\\system32;"
"C:\\Windows;"
"C:\\Windows\\System32\\Wbem;"
"C:\\Windows\\System32\\WindowsPowerShell\\v1.0\\",
}
path = self.environ.get(
"PATH",
clean_paths[sys.platform] if sys.platform in clean_paths else "/usr/bin",
)
path_split = path.split(os.pathsep)
return path_split
def _get_path_dirs(self, prefix, extra_library_bin=False):
if on_win: # pragma: unix no cover
yield prefix.rstrip("\\")
yield self.sep.join((prefix, "Library", "mingw-w64", "bin"))
yield self.sep.join((prefix, "Library", "usr", "bin"))
yield self.sep.join((prefix, "Library", "bin"))
yield self.sep.join((prefix, "Scripts"))
yield self.sep.join((prefix, "bin"))
else:
yield self.sep.join((prefix, "bin"))
def _add_prefix_to_path(self, prefix, starting_path_dirs=None):
prefix = self.path_conversion(prefix)
if starting_path_dirs is None:
path_list = list(self.path_conversion(self._get_starting_path_list()))
else:
path_list = list(self.path_conversion(starting_path_dirs))
# If this is the first time we're activating an environment, we need to ensure that
# the condabin directory is included in the path list.
# Under normal conditions, if the shell hook is working correctly, this should
# never trigger.
old_conda_shlvl = int(self.environ.get("CONDA_SHLVL", "").strip() or 0)
if not old_conda_shlvl and not any(p.endswith("condabin") for p in path_list):
condabin_dir = self.path_conversion(join(context.conda_prefix, "condabin"))
path_list.insert(0, condabin_dir)
path_list[0:0] = list(self.path_conversion(self._get_path_dirs(prefix)))
return tuple(path_list)
def _remove_prefix_from_path(self, prefix, starting_path_dirs=None):
return self._replace_prefix_in_path(prefix, None, starting_path_dirs)
def _replace_prefix_in_path(self, old_prefix, new_prefix, starting_path_dirs=None):
old_prefix = self.path_conversion(old_prefix)
new_prefix = self.path_conversion(new_prefix)
if starting_path_dirs is None:
path_list = list(self.path_conversion(self._get_starting_path_list()))
else:
path_list = list(self.path_conversion(starting_path_dirs))
def index_of_path(paths, test_path):
for q, path in enumerate(paths):
if paths_equal(path, test_path):
return q
return None
if old_prefix is not None:
prefix_dirs = tuple(self._get_path_dirs(old_prefix))
first_idx = index_of_path(path_list, prefix_dirs[0])
if first_idx is None:
first_idx = 0
else:
prefix_dirs_idx = len(prefix_dirs) - 1
last_idx = None
while last_idx is None and prefix_dirs_idx > -1:
last_idx = index_of_path(path_list, prefix_dirs[prefix_dirs_idx])
if last_idx is None:
print(
f"Did not find path entry {prefix_dirs[prefix_dirs_idx]}",
file=sys.stderr,
)
prefix_dirs_idx = prefix_dirs_idx - 1
# this compensates for an extra Library/bin dir entry from the interpreter on
# windows. If that entry isn't being added, it should have no effect.
library_bin_dir = self.path_conversion(
self.sep.join((sys.prefix, "Library", "bin"))
)
if path_list[last_idx + 1] == library_bin_dir:
last_idx += 1
del path_list[first_idx : last_idx + 1]
else:
first_idx = 0
if new_prefix is not None:
path_list[first_idx:first_idx] = list(self._get_path_dirs(new_prefix))
return tuple(path_list)
def _update_prompt(self, set_vars, conda_prompt_modifier):
pass
def _default_env(self, prefix):
if paths_equal(prefix, context.root_prefix):
return "base"
return basename(prefix) if basename(dirname(prefix)) == "envs" else prefix
def _prompt_modifier(self, prefix, conda_default_env):
if context.changeps1:
# Get current environment and prompt stack
env_stack = []
prompt_stack = []
old_shlvl = int(self.environ.get("CONDA_SHLVL", "0").rstrip())
for i in range(1, old_shlvl + 1):
if i == old_shlvl:
env_i = self._default_env(self.environ.get("CONDA_PREFIX", ""))
else:
env_i = self._default_env(
self.environ.get(f"CONDA_PREFIX_{i}", "").rstrip()
)
stacked_i = bool(self.environ.get(f"CONDA_STACKED_{i}", "").rstrip())
env_stack.append(env_i)
if not stacked_i:
prompt_stack = prompt_stack[0:-1]
prompt_stack.append(env_i)
# Modify prompt stack according to pending operation
deactivate = getattr(self, "_deactivate", False)
reactivate = getattr(self, "_reactivate", False)
if deactivate:
prompt_stack = prompt_stack[0:-1]
env_stack = env_stack[0:-1]
stacked = bool(
self.environ.get(f"CONDA_STACKED_{old_shlvl}", "").rstrip()
)
if not stacked and env_stack:
prompt_stack.append(env_stack[-1])
elif reactivate:
pass
else:
stack = getattr(self, "stack", False)
if not stack:
prompt_stack = prompt_stack[0:-1]
prompt_stack.append(conda_default_env)
conda_stacked_env = ",".join(prompt_stack[::-1])
return context.env_prompt.format(
default_env=conda_default_env,
stacked_env=conda_stacked_env,
prefix=prefix,
name=basename(prefix),
)
else:
return ""
def _get_activate_scripts(self, prefix):
_script_extension = self.script_extension
se_len = -len(_script_extension)
try:
paths = (
entry.path
for entry in os.scandir(join(prefix, "etc", "conda", "activate.d"))
)
except OSError:
return ()
return self.path_conversion(
sorted(p for p in paths if p[se_len:] == _script_extension)
)
def _get_deactivate_scripts(self, prefix):
_script_extension = self.script_extension
se_len = -len(_script_extension)
try:
paths = (
entry.path
for entry in os.scandir(join(prefix, "etc", "conda", "deactivate.d"))
)
except OSError:
return ()
return self.path_conversion(
sorted((p for p in paths if p[se_len:] == _script_extension), reverse=True)
)
def _get_environment_env_vars(self, prefix):
env_vars_file = join(prefix, PREFIX_STATE_FILE)
pkg_env_var_dir = join(prefix, PACKAGE_ENV_VARS_DIR)
env_vars = {}
# First get env vars from packages
if exists(pkg_env_var_dir):
for pkg_env_var_path in sorted(
entry.path for entry in os.scandir(pkg_env_var_dir)
):
with open(pkg_env_var_path) as f:
env_vars.update(json.loads(f.read()))
# Then get env vars from environment specification
if exists(env_vars_file):
with open(env_vars_file) as f:
prefix_state = json.loads(f.read())
prefix_state_env_vars = prefix_state.get("env_vars", {})
dup_vars = [
ev for ev in env_vars.keys() if ev in prefix_state_env_vars.keys()
]
for dup in dup_vars:
print(
"WARNING: duplicate env vars detected. Vars from the environment "
"will overwrite those from packages",
file=sys.stderr,
)
print("variable %s duplicated" % dup, file=sys.stderr)
env_vars.update(prefix_state_env_vars)
return env_vars
def expand(path):
return abspath(expanduser(expandvars(path)))
def ensure_binary(value):
try:
return value.encode("utf-8")
except AttributeError: # pragma: no cover
# AttributeError: '<>' object has no attribute 'encode'
# In this case assume already binary type and do nothing
return value
def ensure_fs_path_encoding(value):
try:
return value.decode(FILESYSTEM_ENCODING)
except AttributeError:
return value
def native_path_to_unix(
paths: str | Iterable[str] | None,
) -> str | tuple[str, ...] | None:
if paths is None:
return None
elif not on_win:
return path_identity(paths)
# short-circuit if we don't get any paths
paths = paths if isinstance(paths, str) else tuple(paths)
if not paths:
return "." if isinstance(paths, str) else ()
# on windows, uses cygpath to convert windows native paths to posix paths
from shutil import which
from subprocess import run
# It is very easy to end up with a bash in one place and a cygpath in another due to e.g.
# using upstream MSYS2 bash, but with a conda env that does not have bash but does have
# cygpath. When this happens, we have two different virtual POSIX machines, rooted at
# different points in the Windows filesystem. We do our path conversions with one and
# expect the results to work with the other. It does not.
bash = which("bash")
cygpath = (Path(bash).parent / "cygpath") if bash else "cygpath"
joined = paths if isinstance(paths, str) else os.pathsep.join(paths)
try:
# if present, use cygpath to convert paths since its more reliable
unix_path = run(
[cygpath, "--path", joined],
text=True,
capture_output=True,
check=True,
).stdout.strip()
except FileNotFoundError:
# fallback logic when cygpath is not available
# i.e. conda without anything else installed
def _translation(found_path):
found = (
found_path.group(1)
.replace("\\", "/")
.replace(":", "")
.replace("//", "/")
)
return "/" + found.rstrip("/")
unix_path = (
re.sub(
r"([a-zA-Z]:[\/\\\\]+(?:[^:*?\"<>|;]+[\/\\\\]*)*)",
_translation,
joined,
)
.replace(";/", ":/")
.rstrip(";")
)
unix_path = unix_path.split(":") if unix_path else ()
return unix_path[0] if isinstance(paths, str) else tuple(unix_path)
def path_identity(paths: str | Iterable[str] | None) -> str | tuple[str, ...] | None:
if paths is None:
return None
elif isinstance(paths, str):
return os.path.normpath(paths)
else:
return tuple(os.path.normpath(path) for path in paths)
def backslash_to_forwardslash(
paths: str | Iterable[str] | None,
) -> str | tuple[str, ...] | None:
if paths is None:
return None
elif isinstance(paths, str):
return paths.replace("\\", "/")
else:
return tuple([path.replace("\\", "/") for path in paths])
class PosixActivator(_Activator):
pathsep_join = ":".join
sep = "/"
path_conversion = staticmethod(native_path_to_unix)
script_extension = ".sh"
tempfile_extension = None # output to stdout
command_join = "\n"
unset_var_tmpl = "unset %s"
export_var_tmpl = "export %s='%s'"
set_var_tmpl = "%s='%s'"
run_script_tmpl = '. "%s"'
hook_source_path = join(
CONDA_PACKAGE_ROOT,
"shell",
"etc",
"profile.d",
"conda.sh",
)
def _update_prompt(self, set_vars, conda_prompt_modifier):
ps1 = self.environ.get("PS1", "")
if "POWERLINE_COMMAND" in ps1:
# Defer to powerline (https://github.com/powerline/powerline) if it's in use.
return
current_prompt_modifier = self.environ.get("CONDA_PROMPT_MODIFIER")
if current_prompt_modifier:
ps1 = re.sub(re.escape(current_prompt_modifier), r"", ps1)
# Because we're using single-quotes to set shell variables, we need to handle the
# proper escaping of single quotes that are already part of the string.
# Best solution appears to be https://stackoverflow.com/a/1250279
ps1 = ps1.replace("'", "'\"'\"'")
set_vars.update(
{
"PS1": conda_prompt_modifier + ps1,
}
)
def _hook_preamble(self) -> str:
result = []
for key, value in context.conda_exe_vars_dict.items():
if value is None:
# Using `unset_var_tmpl` would cause issues for people running
# with shell flag -u set (error on unset).
result.append(self.export_var_tmpl % (key, ""))
elif on_win and ("/" in value or "\\" in value):
result.append(f'''export {key}="$(cygpath '{value}')"''')
else:
result.append(self.export_var_tmpl % (key, value))
return "\n".join(result) + "\n"
class CshActivator(_Activator):
pathsep_join = ":".join
sep = "/"
path_conversion = staticmethod(native_path_to_unix)
script_extension = ".csh"
tempfile_extension = None # output to stdout
command_join = ";\n"
unset_var_tmpl = "unsetenv %s"
export_var_tmpl = 'setenv %s "%s"'
set_var_tmpl = "set %s='%s'"
run_script_tmpl = 'source "%s"'
hook_source_path = join(
CONDA_PACKAGE_ROOT,
"shell",
"etc",
"profile.d",
"conda.csh",
)
def _update_prompt(self, set_vars, conda_prompt_modifier):
prompt = self.environ.get("prompt", "")
current_prompt_modifier = self.environ.get("CONDA_PROMPT_MODIFIER")
if current_prompt_modifier:
prompt = re.sub(re.escape(current_prompt_modifier), r"", prompt)
set_vars.update(
{
"prompt": conda_prompt_modifier + prompt,
}
)
def _hook_preamble(self) -> str:
if on_win:
return dedent(
f"""
setenv CONDA_EXE `cygpath {context.conda_exe}`
setenv _CONDA_ROOT `cygpath {context.conda_prefix}`
setenv _CONDA_EXE `cygpath {context.conda_exe}`
setenv CONDA_PYTHON_EXE `cygpath {sys.executable}`
"""
).strip()
else:
return dedent(
f"""
setenv CONDA_EXE "{context.conda_exe}"
setenv _CONDA_ROOT "{context.conda_prefix}"
setenv _CONDA_EXE "{context.conda_exe}"
setenv CONDA_PYTHON_EXE "{sys.executable}"
"""
).strip()
class XonshActivator(_Activator):
pathsep_join = ";".join if on_win else ":".join
sep = "/"
path_conversion = staticmethod(
backslash_to_forwardslash if on_win else path_identity
)
# 'scripts' really refer to de/activation scripts, not scripts in the language per se
# xonsh can piggy-back activation scripts from other languages depending on the platform
script_extension = ".bat" if on_win else ".sh"
tempfile_extension = None # output to stdout
command_join = "\n"
unset_var_tmpl = "del $%s"
export_var_tmpl = "$%s = '%s'"
# TODO: determine if different than export_var_tmpl
set_var_tmpl = "$%s = '%s'"
run_script_tmpl = (
'source-cmd --suppress-skip-message "%s"'
if on_win
else 'source-bash --suppress-skip-message "%s"'
)
hook_source_path = join(CONDA_PACKAGE_ROOT, "shell", "conda.xsh")
def _hook_preamble(self) -> str:
return '$CONDA_EXE = "%s"' % self.path_conversion(context.conda_exe)
class CmdExeActivator(_Activator):
pathsep_join = ";".join
sep = "\\"
path_conversion = staticmethod(path_identity)
script_extension = ".bat"
tempfile_extension = ".bat"
command_join = "\n"
unset_var_tmpl = "@SET %s="
export_var_tmpl = '@SET "%s=%s"'
# TODO: determine if different than export_var_tmpl
set_var_tmpl = '@SET "%s=%s"'
run_script_tmpl = '@CALL "%s"'
hook_source_path = None
def _hook_preamble(self) -> None:
# TODO: cmd.exe doesn't get a hook function? Or do we need to do something different?
# Like, for cmd.exe only, put a special directory containing only conda.bat on PATH?
pass
class FishActivator(_Activator):
pathsep_join = '" "'.join
sep = "/"
path_conversion = staticmethod(native_path_to_unix)
script_extension = ".fish"
tempfile_extension = None # output to stdout
command_join = ";\n"
unset_var_tmpl = "set -e %s"
export_var_tmpl = 'set -gx %s "%s"'
set_var_tmpl = 'set -g %s "%s"'
run_script_tmpl = 'source "%s"'
hook_source_path = join(
CONDA_PACKAGE_ROOT,
"shell",
"etc",
"fish",
"conf.d",
"conda.fish",
)
def _hook_preamble(self) -> str:
if on_win:
return dedent(
f"""
set -gx CONDA_EXE (cygpath "{context.conda_exe}")
set _CONDA_ROOT (cygpath "{context.conda_prefix}")
set _CONDA_EXE (cygpath "{context.conda_exe}")
set -gx CONDA_PYTHON_EXE (cygpath "{sys.executable}")
"""
).strip()
else:
return dedent(
f"""
set -gx CONDA_EXE "{context.conda_exe}"
set _CONDA_ROOT "{context.conda_prefix}"
set _CONDA_EXE "{context.conda_exe}"
set -gx CONDA_PYTHON_EXE "{sys.executable}"
"""
).strip()
class PowerShellActivator(_Activator):
pathsep_join = ";".join if on_win else ":".join
sep = "\\" if on_win else "/"
path_conversion = staticmethod(path_identity)
script_extension = ".ps1"
tempfile_extension = None # output to stdout
command_join = "\n"
unset_var_tmpl = '$Env:%s = ""'
export_var_tmpl = '$Env:%s = "%s"'
set_var_tmpl = '$Env:%s = "%s"'
run_script_tmpl = '. "%s"'
hook_source_path = join(
CONDA_PACKAGE_ROOT,
"shell",
"condabin",
"conda-hook.ps1",
)
def _hook_preamble(self) -> str:
if context.dev:
return dedent(
f"""
$Env:PYTHONPATH = "{CONDA_SOURCE_ROOT}"
$Env:CONDA_EXE = "{sys.executable}"
$Env:_CE_M = "-m"
$Env:_CE_CONDA = "conda"
$Env:_CONDA_ROOT = "{CONDA_PACKAGE_ROOT}"
$Env:_CONDA_EXE = "{context.conda_exe}"
$CondaModuleArgs = @{{ChangePs1 = ${context.changeps1}}}
"""
).strip()
else:
return dedent(
f"""
$Env:CONDA_EXE = "{context.conda_exe}"
$Env:_CE_M = ""
$Env:_CE_CONDA = ""
$Env:_CONDA_ROOT = "{context.conda_prefix}"
$Env:_CONDA_EXE = "{context.conda_exe}"
$CondaModuleArgs = @{{ChangePs1 = ${context.changeps1}}}
"""
).strip()
def _hook_postamble(self) -> str:
return "Remove-Variable CondaModuleArgs"
class JSONFormatMixin(_Activator):
"""Returns the necessary values for activation as JSON, so that tools can use them."""
pathsep_join = list
tempfile_extension = None # output to stdout
command_join = list
def _hook_preamble(self):
if context.dev:
return {
"PYTHONPATH": CONDA_SOURCE_ROOT,
"CONDA_EXE": sys.executable,
"_CE_M": "-m",
"_CE_CONDA": "conda",
"_CONDA_ROOT": CONDA_PACKAGE_ROOT,
"_CONDA_EXE": context.conda_exe,
}
else:
return {
"CONDA_EXE": context.conda_exe,
"_CE_M": "",
"_CE_CONDA": "",
"_CONDA_ROOT": context.conda_prefix,
"_CONDA_EXE": context.conda_exe,
}
def get_scripts_export_unset_vars(self, **kwargs):
export_vars, unset_vars = self.get_export_unset_vars(**kwargs)
script_export_vars = script_unset_vars = None
if export_vars:
script_export_vars = dict(export_vars.items())
if unset_vars:
script_unset_vars = unset_vars
return script_export_vars or {}, script_unset_vars or []
def _finalize(self, commands, ext):
merged = {}
for _cmds in commands:
merged.update(_cmds)
commands = merged
if ext is None:
return json.dumps(commands, indent=2)
elif ext:
with Utf8NamedTemporaryFile("w+", suffix=ext, delete=False) as tf:
# the default mode is 'w+b', and universal new lines don't work in that mode
# command_join should account for that
json.dump(commands, tf, indent=2)
return tf.name
else:
raise NotImplementedError()
def _yield_commands(self, cmds_dict):
# TODO: _Is_ defining our own object shape here any better than
# just dumping the `cmds_dict`?
path = cmds_dict.get("export_path", {})
export_vars = cmds_dict.get("export_vars", {})
# treat PATH specially
if "PATH" in export_vars:
new_path = path.get("PATH", [])
new_path.extend(export_vars.pop("PATH"))
path["PATH"] = new_path
yield {
"path": path,
"vars": {
"export": export_vars,
"unset": cmds_dict.get("unset_vars", ()),
"set": cmds_dict.get("set_vars", {}),
},
"scripts": {
"activate": cmds_dict.get("activate_scripts", ()),
"deactivate": cmds_dict.get("deactivate_scripts", ()),
},
}
activator_map: dict[str, type[_Activator]] = {
"posix": PosixActivator,
"ash": PosixActivator,
"bash": PosixActivator,
"dash": PosixActivator,
"zsh": PosixActivator,
"csh": CshActivator,
"tcsh": CshActivator,
"xonsh": XonshActivator,
"cmd.exe": CmdExeActivator,
"fish": FishActivator,
"powershell": PowerShellActivator,
}
formatter_map = {
"json": JSONFormatMixin,
}
def _build_activator_cls(shell):
"""Dynamically construct the activator class.
Detect the base activator and any number of formatters (appended using '+' to the base name).
For example, `posix+json` (as in `conda shell.posix+json activate`) would use the
`PosixActivator` base class and add the `JSONFormatMixin`.
"""
shell_etc = shell.split("+")
activator, formatters = shell_etc[0], shell_etc[1:]
bases = [activator_map[activator]]
for f in formatters:
bases.append(formatter_map[f])
cls = type("Activator", tuple(reversed(bases)), {})
return cls