Files
openstacksdk/openstack/utils.py
Stephen Finucane e0f57ad81f Bump Python version used for linters to 3.10
This looks very large, but the only manual change is in pyproject.toml
and the bump of the ruff pre-commit hook: the rest is entirely ruff
converting our use of e.g. 'typing.Union[X, Y]' to 'X | Y', as added by
PEP-604 [1].

[1] https://peps.python.org/pep-0604/

Change-Id: I3ed176018cf78c417e751834e57412d72884a69b
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
2025-05-12 12:04:42 +01:00

678 lines
22 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections.abc
import hashlib
import io
import queue
import string
import threading
import time
import typing as ty
import keystoneauth1
from keystoneauth1 import adapter as ks_adapter
from keystoneauth1 import discover
from openstack import _log
from openstack import exceptions
def urljoin(*args: str | None) -> str:
"""A custom version of urljoin that simply joins strings into a path.
The real urljoin takes into account web semantics like when joining a url
like /path this should be joined to http://host/path as it is an anchored
link. We generally won't care about that in client.
"""
return '/'.join(str(a or '').strip('/') for a in args)
def iterate_timeout(
timeout: int | None,
message: str,
wait: int | float | None = 2,
) -> ty.Generator[int, None, None]:
"""Iterate and raise an exception on timeout.
This is a generator that will continually yield and sleep for
wait seconds, and if the timeout is reached, will raise an exception
with <message>.
:param timeout: Maximum number of seconds to wait for transition. Set to
``None`` to wait forever.
:param message: The message to use for the exception if the timeout is
reached.
:param wait: Number of seconds to wait between checks. Set to ``None``
to use the default interval.
:returns: None
:raises: :class:`~openstack.exceptions.ResourceTimeout` transition
:raises: :class:`~openstack.exceptions.SDKException` if ``wait`` is not a
valid float, integer or None.
"""
log = _log.setup_logging('openstack.iterate_timeout')
try:
# None as a wait winds up flowing well in the per-resource cache
# flow. We could spread this logic around to all of the calling
# points, but just having this treat None as "I don't have a value"
# seems friendlier
if wait is None:
wait = 2
elif wait == 0:
# wait should be < timeout, unless timeout is None
wait = 0.1 if timeout is None else min(0.1, timeout)
wait = float(wait)
except ValueError:
raise exceptions.SDKException(
f"Wait value must be an int or float value. {wait!r} given instead"
)
start = time.time()
count = 0
while (timeout is None) or (time.time() < start + timeout):
count += 1
yield count
log.debug('Waiting %s seconds', wait)
time.sleep(wait)
raise exceptions.ResourceTimeout(message)
class _AccessSaver:
__slots__ = ('keys',)
def __init__(self) -> None:
self.keys: list[str] = []
def __getitem__(self, key: str) -> None:
self.keys.append(key)
def get_string_format_keys(
fmt_string: str, old_style: bool = True
) -> list[str]:
"""Gets a list of required keys from a format string
Required mostly for parsing base_path urls for required keys, which
use the old style string formatting.
"""
if old_style:
a = _AccessSaver()
fmt_string % a
return a.keys
else:
keys = []
for t in string.Formatter().parse(fmt_string):
if t[1] is not None:
keys.append(t[1])
return keys
def supports_version(
adapter: ks_adapter.Adapter,
version: str,
raise_exception: bool = False,
) -> bool:
"""Determine if the given adapter supports the given version.
Checks the version asserted by the service and ensures this matches the
provided version. ``version`` can be a major version or a major-minor
version
:param adapter: :class:`~keystoneauth1.adapter.Adapter` instance.
:param version: String containing the desired version.
:param raise_exception: Raise exception when requested version
is not supported by the server.
:returns: ``True`` if the service supports the version, else ``False``.
:raises: :class:`~openstack.exceptions.SDKException` when
``raise_exception`` is ``True`` and requested version is not supported.
"""
def _supports_version() -> bool:
required = discover.normalize_version_number(version)
major_version = adapter.get_api_major_version()
if not major_version:
return False
if not discover.version_match(required, major_version):
return False
return True
supported = _supports_version()
if not supported and raise_exception:
raise exceptions.SDKException(
f'Required version {version} is not supported by the server'
)
return supported
def supports_microversion(
adapter: ks_adapter.Adapter,
microversion: str | int | float | ty.Iterable[str | int | float],
raise_exception: bool = False,
) -> bool:
"""Determine if the given adapter supports the given microversion.
Checks the min and max microversion asserted by the service and ensures
``min <= microversion <= max``. If set, the current default microversion is
taken into consideration to ensure ``microversion <= default``.
:param adapter: :class:`~keystoneauth1.adapter.Adapter` instance.
:param microversion: String containing the desired microversion.
:param raise_exception: Raise exception when requested microversion
is not supported by the server or is higher than the current default
microversion.
:returns: True if the service supports the microversion, else False.
:raises: :class:`~openstack.exceptions.SDKException` when
``raise_exception`` is ``True`` and requested microversion is not
supported.
"""
endpoint_data = adapter.get_endpoint_data()
if endpoint_data is None:
if raise_exception:
raise exceptions.SDKException('Could not retrieve endpoint data')
return False
if (
endpoint_data.min_microversion
and endpoint_data.max_microversion
and discover.version_between(
endpoint_data.min_microversion,
endpoint_data.max_microversion,
microversion,
)
):
if adapter.default_microversion is not None:
# If default_microversion is set - evaluate
# whether it match the expectation
candidate = discover.normalize_version_number(
adapter.default_microversion
)
required = discover.normalize_version_number(microversion)
supports = discover.version_match(required, candidate)
if raise_exception and not supports:
raise exceptions.SDKException(
f'Required microversion {microversion} is higher than '
f'currently selected {adapter.default_microversion}'
)
return supports
return True
if raise_exception:
raise exceptions.SDKException(
f'Required microversion {microversion} is not supported '
f'by the server side'
)
return False
def require_microversion(adapter: ks_adapter.Adapter, required: str) -> None:
"""Require microversion.
:param adapter: :class:`~keystoneauth1.adapter.Adapter` instance.
:param str microversion: String containing the desired microversion.
:raises: :class:`~openstack.exceptions.SDKException` when requested
microversion is not supported
"""
supports_microversion(adapter, required, raise_exception=True)
def pick_microversion(
session: ks_adapter.Adapter, required: str
) -> str | None:
"""Get a new microversion if it is higher than session's default.
:param session: The session to use for making this request.
:param required: Minimum version that is required for an action.
:return: ``required`` as a string if the ``session``'s default is too low,
otherwise the ``session``'s default. Returns ``None`` if both
are ``None``.
:raises: TypeError if ``required`` is invalid.
:raises: :class:`~openstack.exceptions.SDKException` if requested
microversion is not supported.
"""
required_normalized = None
if required is not None:
required_normalized = discover.normalize_version_number(required)
if session.default_microversion is not None:
default = discover.normalize_version_number(
session.default_microversion
)
if required_normalized is None:
required_normalized = default
else:
required_normalized = (
default
if discover.version_match(required_normalized, default)
else required_normalized
)
if required_normalized is None:
return None
if not supports_microversion(session, required_normalized):
raise exceptions.SDKException(
'Requested microversion is not supported by the server side '
'or the default microversion is too low'
)
return discover.version_to_string(required_normalized)
def maximum_supported_microversion(
adapter: ks_adapter.Adapter,
client_maximum: str | None,
) -> str | None:
"""Determine the maximum microversion supported by both client and server.
:param adapter: :class:`~keystoneauth1.adapter.Adapter` instance.
:param client_maximum: Maximum microversion supported by the client.
If ``None``, ``None`` is returned.
:returns: the maximum supported microversion as string or ``None``.
"""
if client_maximum is None:
return None
# NOTE(dtantsur): if we cannot determine supported microversions, fall back
# to the default one.
try:
endpoint_data = adapter.get_endpoint_data()
except keystoneauth1.exceptions.discovery.DiscoveryFailure:
endpoint_data = None
if endpoint_data is None:
log = _log.setup_logging('openstack')
log.warning(
'Cannot determine endpoint data for service %s',
adapter.service_type or adapter.service_name,
)
return None
if not endpoint_data.max_microversion:
return None
client_max = discover.normalize_version_number(client_maximum)
server_max = discover.normalize_version_number(
endpoint_data.max_microversion
)
if endpoint_data.min_microversion:
server_min = discover.normalize_version_number(
endpoint_data.min_microversion
)
if client_max < server_min:
# NOTE(dtantsur): we may want to raise in this case, but this keeps
# the current behavior intact.
return None
result = min(client_max, server_max)
return discover.version_to_string(result)
def _hashes_up_to_date(
md5: str | None,
sha256: str | None,
md5_key: str,
sha256_key: str,
) -> bool:
"""Compare md5 and sha256 hashes for being up to date
md5 and sha256 are the current values.
md5_key and sha256_key are the previous values.
"""
up_to_date = False
if md5 and md5_key == md5:
up_to_date = True
if sha256 and sha256_key == sha256:
up_to_date = True
if md5 and md5_key != md5:
up_to_date = False
if sha256 and sha256_key != sha256:
up_to_date = False
return up_to_date
def _calculate_data_hashes(
data: io.BufferedReader | bytes,
) -> tuple[str, str]:
_md5 = hashlib.md5(usedforsecurity=False)
_sha256 = hashlib.sha256()
if isinstance(data, io.BufferedIOBase):
for chunk in iter(lambda: data.read(8192), b''):
_md5.update(chunk)
_sha256.update(chunk)
elif isinstance(data, bytes):
_md5.update(data)
_sha256.update(data)
else:
raise TypeError(
'unsupported type for data; expected IO stream or bytes; got '
'{type(data)}'
)
return _md5.hexdigest(), _sha256.hexdigest()
def _get_file_hashes(filename: str) -> tuple[str, str]:
_md5, _sha256 = (None, None)
with open(filename, 'rb') as file_obj:
_md5, _sha256 = _calculate_data_hashes(file_obj)
return _md5, _sha256
class TinyDAG:
"""Tiny DAG
Bases on the Kahn's algorithm, and enables parallel visiting of the nodes
(parallel execution of the workflow items).
"""
def __init__(self) -> None:
self._reset()
self._lock = threading.Lock()
def _reset(self) -> None:
self._graph: dict[str, set[str]] = {}
self._wait_timeout = 120
@property
def graph(self) -> dict[str, set[str]]:
"""Get graph as adjacency dict"""
return self._graph
def add_node(self, node: str) -> None:
self._graph.setdefault(node, set())
def add_edge(self, u: str, v: str) -> None:
self._graph[u].add(v)
def walk(self, timeout: int | None = None) -> 'TinyDAG':
"""Start the walking from the beginning."""
if timeout:
self._wait_timeout = timeout
return self
def __iter__(self) -> 'TinyDAG':
self._start_traverse()
return self
def __next__(self) -> str:
# Start waiting if it is expected to get something
# (counting down from graph length to 0).
if self._it_cnt > 0:
self._it_cnt -= 1
try:
res = self._queue.get(block=True, timeout=self._wait_timeout)
return res
except queue.Empty:
raise exceptions.SDKException(
'Timeout waiting for cleanup task to complete'
)
else:
raise StopIteration
def node_done(self, node: str) -> None:
"""Mark node as "processed" and put following items into the queue"""
self._done.add(node)
for v in self._graph[node]:
self._run_in_degree[v] -= 1
if self._run_in_degree[v] == 0:
self._queue.put(v)
def _start_traverse(self) -> None:
"""Initialize graph traversing"""
self._run_in_degree = self._get_in_degree()
self._queue: queue.Queue[str] = queue.Queue()
self._done: set[str] = set()
self._it_cnt = len(self._graph)
for k, v in self._run_in_degree.items():
if v == 0:
self._queue.put(k)
def _get_in_degree(self) -> dict[str, int]:
"""Calculate the in_degree (count incoming) for nodes"""
_in_degree: dict[str, int] = {u: 0 for u in self._graph.keys()}
for u in self._graph:
for v in self._graph[u]:
_in_degree[v] += 1
return _in_degree
def topological_sort(self) -> list[str]:
"""Return the graph nodes in the topological order"""
result = []
for node in self:
result.append(node)
self.node_done(node)
return result
def size(self) -> int:
return len(self._graph.keys())
def is_complete(self) -> bool:
return len(self._done) == self.size()
# Importing Munch is a relatively expensive operation (0.3s) while we do not
# really even need much of it. Before we can rework all places where we rely on
# it we can have a reduced version.
class Munch(dict[str, ty.Any]):
"""A slightly stripped version of munch.Munch class"""
def __init__(self, *args: ty.Any, **kwargs: ty.Any):
self.update(*args, **kwargs)
# only called if k not found in normal places
def __getattr__(self, k: str) -> ty.Any:
"""Gets key if it exists, otherwise throws AttributeError."""
try:
return object.__getattribute__(self, k)
except AttributeError:
try:
return self[k]
except KeyError:
raise AttributeError(k)
def __setattr__(self, k: str, v: ty.Any) -> None:
"""Sets attribute k if it exists, otherwise sets key k. A KeyError
raised by set-item (only likely if you subclass Munch) will
propagate as an AttributeError instead.
"""
try:
# Throws exception if not in prototype chain
object.__getattribute__(self, k)
except AttributeError:
try:
self[k] = v
except Exception:
raise AttributeError(k)
else:
object.__setattr__(self, k, v)
def __delattr__(self, k: str) -> None:
"""Deletes attribute k if it exists, otherwise deletes key k.
A KeyError raised by deleting the key - such as when the key is missing
- will propagate as an AttributeError instead.
"""
try:
# Throws exception if not in prototype chain
object.__getattribute__(self, k)
except AttributeError:
try:
del self[k]
except KeyError:
raise AttributeError(k)
else:
object.__delattr__(self, k)
def toDict(self) -> dict[str, ty.Any]:
"""Recursively converts a munch back into a dictionary."""
return unmunchify(self)
@property
def __dict__(self) -> dict[str, ty.Any]: # type: ignore[override]
return self.toDict()
def __repr__(self) -> str:
"""Invertible* string-form of a Munch."""
return f'{self.__class__.__name__}({dict.__repr__(self)})'
def __dir__(self) -> list[str]:
return list(self.keys())
def __getstate__(self) -> dict[str, ty.Any]:
"""Implement a serializable interface used for pickling.
See https://docs.python.org/3.6/library/pickle.html.
"""
return {k: v for k, v in self.items()}
def __setstate__(self, state: dict[str, ty.Any]) -> None:
"""Implement a serializable interface used for pickling.
See https://docs.python.org/3.6/library/pickle.html.
"""
self.clear()
self.update(state)
# TODO(stephenfin): This needs to be stricter in the types that it will
# accept. By limiting it to the primitive types (or subclasses of same) we
# should cover everything we (sdk) care about and will be able to type the
# results.
@classmethod
def fromDict(cls, d: dict[str, ty.Any]) -> 'Munch':
"""Recursively transforms a dictionary into a Munch via copy."""
# Munchify x, using `seen` to track object cycles
seen: dict[int, ty.Any] = dict()
def munchify_cycles(obj: ty.Any) -> ty.Any:
try:
return seen[id(obj)]
except KeyError:
pass
seen[id(obj)] = partial = pre_munchify(obj)
return post_munchify(partial, obj)
def pre_munchify(obj: ty.Any) -> ty.Any:
if isinstance(obj, collections.abc.Mapping):
return cls({})
elif isinstance(obj, list):
return type(obj)()
elif isinstance(obj, tuple):
type_factory = getattr(obj, "_make", type(obj))
return type_factory(munchify_cycles(item) for item in obj)
else:
return obj
def post_munchify(partial: ty.Any, obj: ty.Any) -> ty.Any:
if isinstance(obj, collections.abc.Mapping):
partial.update(
(k, munchify_cycles(obj[k])) for k in obj.keys()
)
elif isinstance(obj, list):
partial.extend(munchify_cycles(item) for item in obj)
elif isinstance(obj, tuple):
for item_partial, item in zip(partial, obj):
post_munchify(item_partial, item)
return partial
return ty.cast('Munch', munchify_cycles(d))
def copy(self) -> 'Munch':
return self.fromDict(self)
def update(self, *args: ty.Any, **kwargs: ty.Any) -> None:
"""
Override built-in method to call custom __setitem__ method that may
be defined in subclasses.
"""
for k, v in dict(*args, **kwargs).items():
self[k] = v
def get(self, k: str, d: ty.Any = None) -> ty.Any:
"""
D.get(k[,d]) -> D[k] if k in D, else d. d defaults to None.
"""
if k not in self:
return d
return self[k]
def setdefault(self, k: str, d: ty.Any = None) -> ty.Any:
"""
D.setdefault(k[,d]) -> D.get(k,d), also set D[k]=d if k not in D
"""
if k not in self:
self[k] = d
return self[k]
def munchify(x: dict[str, ty.Any], factory: type[Munch] = Munch) -> Munch:
"""Recursively transforms a dictionary into a Munch via copy."""
return Munch.fromDict(x)
def unmunchify(x: Munch) -> dict[str, ty.Any]:
"""Recursively converts a Munch into a dictionary."""
# Munchify x, using `seen` to track object cycles
seen: dict[int, ty.Any] = dict()
def unmunchify_cycles(obj: ty.Any) -> ty.Any:
try:
return seen[id(obj)]
except KeyError:
pass
seen[id(obj)] = partial = pre_unmunchify(obj)
return post_unmunchify(partial, obj)
def pre_unmunchify(obj: ty.Any) -> ty.Any:
if isinstance(obj, collections.abc.Mapping):
return dict()
elif isinstance(obj, list):
return type(obj)()
elif isinstance(obj, tuple):
type_factory = getattr(obj, "_make", type(obj))
return type_factory(unmunchify_cycles(item) for item in obj)
else:
return obj
def post_unmunchify(partial: ty.Any, obj: ty.Any) -> ty.Any:
if isinstance(obj, collections.abc.Mapping):
partial.update((k, unmunchify_cycles(obj[k])) for k in obj.keys())
elif isinstance(obj, list):
partial.extend(unmunchify_cycles(v) for v in obj)
elif isinstance(obj, tuple):
for value_partial, value in zip(partial, obj):
post_unmunchify(value_partial, value)
return partial
return ty.cast(dict[str, ty.Any], unmunchify_cycles(x))