Source code for emlib.misc

"""

Miscellaneous functionality

* **Search**: `nearest_element`, `nearest_unsorted`, `nearest_index`
* **Sort**: `sort_natural`, `zipsort`, `issorted`
* **Namedtuples**: `namedtupled_addcolumn`, `namedtuple_extend`, etc.
* **Open files**: `open_with_standard_app`, `wait_for_file_modified`, `open_with`
* **Unit conversions**: `cm_to_pixels`, `page_dinsize_to_mm`, etc.
* **Other**: `singleton`

"""
# -*- coding: utf-8 -*-
from __future__ import annotations
import os as _os
import sys as _sys
from bisect import bisect as _bisect
import re as _re

import numpy as np


from typing import TYPE_CHECKING
if TYPE_CHECKING or 'sphinx' in _sys.modules:
    from typing import TypeVar, Sequence, Union, Callable, Any, Iterable
    T = TypeVar("T")
    T2 = TypeVar("T2")
    from fractions import Fraction
    import numbers
    num_t =  TypeVar("num_t", int, float, numbers.Rational)
    number_t = Union[float, numbers.Rational]
    import subprocess



# ------------------------------------------------------------
#     CHUNKS
# ------------------------------------------------------------


[docs] def reverse_recursive(seq: list): """ Reverse seq recursively Args: seq: a (possibly nested) list of elements Returns: a reversed version of `seq` where all sub-lists are also reversed. Example ~~~~~~~ >>> reverse_recursive([1, 2, [3, 4], [5, [6, 7]]]) [[[7, 6], 5], [4, 3], 2, 1] .. note:: only lists will be reversed, other iterable collection remain untouched """ out = [] for x in seq: if isinstance(x, list): x = reverse_recursive(x) out.append(x) out.reverse() return out
def _partialsum(seq: Sequence[num_t], init: num_t) -> list[num_t]: accum: num_t = init out = [] for i in seq: accum += i out.append(accum) return out
[docs] def wrap_by_sizes(flatseq: list, packsizes: Sequence[int]) -> list[list]: """ Wrap a flat seq using the given sizes Args: flatseq: a flat sequence of items packsizes: a list of sizes Returns: a list of groups, where each group is of size as given by packsizes Example ~~~~~~~ >>> flatseq = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> wrap_by_sizes(flatseq, [3, 5, 2]) [[1, 2, 3], [4, 5, 6, 7, 8], [9, 10]] """ offsets = [0] + _partialsum(packsizes, 0) start = offsets[0] out = [] for i in range(1, len(offsets)): end = offsets[i] out.append(flatseq[start:end]) start = end return out
# ------------------------------------------------------------ # - # SEARCH - # - # ------------------------------------------------------------
[docs] def nearest_element(item: float, seq: list[float] | np.ndarray) -> float: """ Find the nearest element (the element, not the index) in seq **NB**: assumes that seq is sorted (this is not checked). seq can also be a numpy array, in which case searchsorted is used instead of bisect Args: item: the item to search seq: either a list of numbers, or a numpy array Returns: the value of the nearest element of seq Example ~~~~~~~ >>> seq = list(range(10)) >>> nearest_element(4.1, seq) 4 >>> nearest_element(3.6, [1,2,3,4,5]) 4 >>> nearest_element(200, np.array([3,5,20])) 20 >>> nearest_element(0.5, [0, 1]) 0 >>> nearest_element(1, [1, 1, 1]) 1 """ # check boundaries seq0 = seq[0] if item <= seq0: return seq0 seq1 = seq[-1] if item >= seq1: return seq1 if isinstance(seq, np.ndarray): ir = int(seq.searchsorted(item, 'right')) else: ir = _bisect(seq, item) element_r = seq[ir] element_l = seq[ir - 1] if abs(element_r - item) < abs(element_l - item): return element_r return element_l
[docs] def nearest_unsorted(x: num_t, seq: list[num_t]) -> num_t: """ Find nearest item in an unsorted sequence Args: x: a number seq: a seq. of numbers (assumes it is not sorted) Returns: the item in seq. which is nearest to x .. note:: for sorted seq. use :func:`nearest_index` Example ~~~~~~~ >>> assert nearest_unsorted(3.6, (1,2,3,4,5)) == 4 >>> assert nearest_unsorted(4, (2,3,4)) == 4 >>> assert nearest_unsorted(200, (3,5,20)) == 20 """ return min((abs(x - y), y) for y in seq)[1]
[docs] def nearest_index(item: num_t, seq: Sequence[num_t]) -> int: """ Return the index of the nearest element in seq to item Args: item: a number seq: a sorted sequence of numbers Returns: the index of the nearest item .. note:: Assumes that seq is sorted Example ~~~~~~~ >>> seq = [0, 3, 4, 8] >>> nearest_index(3.1, seq) 1 >>> nearest_index(6.5, seq) 3 .. seealso:: :func:`nearest_unsorted` """ ir = _bisect(seq, item) seqlen = len(seq) if ir == seqlen: return seqlen - 1 if ir == 0: return ir il = ir - 1 return ir if seq[ir] - item < item - seq[il] else il
# ------------------------------------------------------------ # # SORTING # # ------------------------------------------------------------
[docs] def sort_natural(seq: list, key: Callable[[Any], str]=None) -> list: """ sort a string sequence naturally Sorts the sequence so that 'item1' and 'item2' are before 'item10' Args: seq: the sequence to sort key: a function to convert an item in seq to a string Examples ~~~~~~~~ >>> seq = ["e10", "e2", "f", "e1"] >>> sorted(seq) ['e1', 'e10', 'e2', 'f'] >>> sort_natural(seq) ['e1', 'e2', 'e10', 'f'] >>> seq = [(2, "e10"), (10, "e2")] >>> sort_natural(seq, key=lambda tup:tup[1]) [(10, 'e2'), (2, 'e10')] """ def convert(text: str): return int(text) if text.isdigit() else text.lower() def alphanum_key(key:str): return [convert(c) for c in _re.split('([0-9]+)', key)] if key is not None: return sorted(seq, key=lambda x: alphanum_key(key(x))) return sorted(seq, key=alphanum_key)
[docs] def sort_natural_dict(d: dict[str, Any], recursive=True) -> dict: """ sort dict d naturally and recursively """ rows: list[tuple[str, Any]] = [] if recursive: for key, value in d.items(): if isinstance(value, dict): value = sort_natural_dict(value, recursive=recursive) rows.append((key, value)) sorted_rows = sort_natural(rows, key=lambda row: row[0]) else: keys = list(d.keys()) sorted_rows = [(key, d[key]) for key in sort_natural(keys)] return dict(sorted_rows)
[docs] def issorted(seq: Sequence, key=None) -> bool: """ Returns True if seq is sorted Args: seq: the seq. to query key: an optional key to use Example ~~~~~~~ >>> seq = [(10, "a"), (0, "b"), (45, None)] >>> issorted(seq, key=lambda item:item[0]) False """ lastx = -float('inf') if key is not None: for x in seq: x = key(x) if x < lastx: return False lastx = x return True else: for x in seq: if x < lastx: return False lastx = x return True
[docs] def some(x, otherwise=False): """ Returns ``x`` if it is not None, else ``otherwise`` This allows code like:: myvar = some(myvar) or default # If default does not need to be shortcircuited, then simply: myvar = some(myvar, default) instead of:: myvar = myvar if myvar is not None else default """ return x if x is not None else otherwise
[docs] def firstval(*values, sentinel=None): """ Get the first value in values which is not sentinel. At least one of the values should differ from sentinel, otherwise an exception is raised. To allow short-cirtcuit lazy evaluation, a callable can be given as value, in which case the function will only be evaluated if the previous values where `sentinel` .. seealso:: :func:`some` Example ~~~~~~~ .. code-block:: python config = {'a': 10, 'b': 20} def func(a=None, b=None): a = firstval(a, lambda: computation(), config['a']) """ for value in values: if callable(value): value2 = value() if value2 is not sentinel: return value2 elif value is not sentinel: return value raise ValueError(f"All values are {sentinel}")
[docs] def zipsort(a: Sequence[T], b: Sequence[T2], key: Callable | None = None, reverse=False ) -> tuple[list[T], list[T2]]: """ Sort a and keep b in sync It is the equivalent of:: a, b = unzip(sorted(zip(a, b), key=key)) Example ~~~~~~~ >>> names = ['John', 'Mary', 'Nick'] >>> ages = [20, 10, 34] >>> ages, names = zipsort(ages, names) >>> names ('Mary', 'John', 'Nick') """ zipped = sorted(zip(a, b), key=key, reverse=reverse) a, b = zip(*zipped) return (list(a), list(b))
[docs] def duplicates(seq: Sequence[T], mincount=2) -> list[T]: """ Find all elements in seq which are present at least `mincount` times """ if mincount == 2: from . import iterlib return list(iterlib.duplicates(seq)) from collections import Counter counter = Counter(seq).items() return [item for item, count in counter if count >= mincount]
[docs] def remove_duplicates(seq: Sequence[T]) -> list[T]: """ Remove all duplicates in seq while keeping its order If order is not important, use list(set(seq)) Args: seq: a list of elements (elements must be hashable) Returns: a new list with all the unique elements of seq in its original order .. note:: list(set(...)) does not keep order """ # we use the fact that dicts keep order: return list(dict.fromkeys(seq))
[docs] def remove_last_matching(seq: list[T], func: Callable[[T], bool]) -> T | None: """ Remove last element of *seq* matching the given condition, **in place** Args: seq: the list to modify func: a function taking an element of *seq*, should return True if this is the element to remove Returns: the removed element, or None if the condition was never met Example ~~~~~~~ >>> a = [0, 1, 2, 3, 4, 5, 6] >>> remove_last_matching(a, lambda item: item % 2 == 1) >>> a [0, 1, 2, 3, 4, 6] """ seqlen = len(seq) for i, x in enumerate(reversed(seq)): if func(x): return seq.pop(seqlen - i - 1) return None
[docs] def fractional_slice(seq: Sequence[T], step:float, start=0, end=-1) -> list[T]: """ Take a slice similar to seq[start:end:step] with fractional step Args: seq: the sequence of elements step: the step size start: start index end: end index Returns: the resulting list Example ~~~~~~~ >>> fractional_slice(range(10), 1.5) >>> [0, 2, 3, 5, 6, 8] """ if step < 1: raise ValueError("step should be >= 1 (for now)") accum = 0. out = [seq[start]] for elem in seq[start+1:end]: accum += 1 if accum >= step: out.append(elem) accum -= step return out
[docs] def sec2str(seconds:float, msdigits=3) -> str: """ Convert seconds to a suitable string representation Args: seconds: time in seconds Returns: the equivalent time as string """ h = int(seconds // 3600) m = int((seconds - h * 3600) // 60) s = seconds % 60 sint = int(s) sfrac = round((s - sint), msdigits) fmt = f"%.{msdigits}g" msstr = (fmt % sfrac)[1:2+msdigits] return f"{h}:{m:02}:{sint:02}{msstr}" if h > 0 else f"{m}:{sint:02}{msstr}"
[docs] def parse_time(t: str) -> float: """ Parse a time string ``HH:MM:SS.mmm`` and convert it to seconds Given a time in the format HH:MM:SS.mmm or any sub-form of it (SS.mmm, MM:SS, etc), return the time in seconds. This is the inverse of sec2str Args: t: the time as string Returns: seconds """ parts = t.split(":") if len(parts) == 1: # only seconds return float(parts[0]) elif len(parts) == 2: return float(parts[1]) + float(parts[0])*60 elif len(parts) == 3: return float(parts[2]) + float(parts[1])*60 + float(parts[0])*3600 else: raise ValueError("Format not understood")
# ------------------------------------------------------------ # # namedtuple utilities # # ------------------------------------------------------------
[docs] def namedtuple_addcolumn(namedtuples, seq, column_name: str, classname=""): # type: ignore """ Add a column to a sequence of named tuples Args: namedtuples: a list of namedtuples seq: the new column column_name: name of the column classname: nane of the new namedtuple Returns: a list of namedtuples with the added column """ from collections import namedtuple t0 = namedtuples[0] assert isinstance(t0, tuple) and hasattr(t0, "_fields"), "namedtuples should be a seq. of namedtuples" name = classname or namedtuples[0].__class__.__name__ + '_' + column_name # type: str NewTup = namedtuple(name, t0._fields + (column_name,)) # type: ignore newtuples = [NewTup(*(t + (value,))) for t, value in zip(namedtuples, seq)] return newtuples
[docs] def namedtuples_renamecolumn(namedtuples: list, oldname: str, newname: str, classname='' ) -> list: # type: ignore """ Rename the column of a seq of namedtuples Args: namedtuples: a list of namedtuples oldname: the name of the column which will be modified newname: the new name of the column classname: the name of the new namedtuple class Returns: the new namedtuples with the renamed column >>> from collections import namedtuple >>> Person = namedtuple("Person", "firstname familyname") >>> people = [Person("John", "Smith"), Person("Amy", "Adams")] >>> people2 = namedtuples_renamecolumn(people, "firstname", "first_name") >>> people2[0]._fields ('first_name', 'familyname') """ if classname is None: classname = "%s_R" % namedtuples[0].__class__.__name__ newfields = [field if field != oldname else newname for field in namedtuples[0]._fields] from collections import namedtuple NewTup = namedtuple(classname, newfields) newtuples = [NewTup(*t) for t in namedtuples] return newtuples
[docs] def namedtuple_extend(name: str, orig, columns: str | Sequence[str]): """ Create a new namedtuple constructor with the added columns It returns the class constructor and an ad-hoc constructor which takes as arguments an instance of the original namedtuple and the additional args Args: name: new name for the type orig: an instance of the original namedtuple or the constructor itself columns : the columns to add Returns: a tuple (newtype, newtype_from_old) Example ~~~~~~~ >>> from collections import namedtuple >>> Point = namedtuple("Point", "x y") >>> p = Point(10, 20) >>> Vec3, fromPoint = namedtuple_extend("Vec3", Point, "z") >>> Vec3(1, 2, 3) Vec3(x=1, y=2, z=3) >>> fromPoint(p, 30) Vec3(x=10, y=20, z=30) """ from collections import namedtuple if isinstance(columns, str): columns = columns.split() fields = orig._fields + tuple(columns) N = namedtuple(name, fields) def new_from_orig(orig, *args, **kws): """ orig: the original namedtuple from which to construct an extended version args, kws: the missing columns """ return N(*(orig + args), **kws) return N, new_from_orig
# ------------------------------------------------------------ # # MISCELLANEOUS # # ------------------------------------------------------------
[docs] def isiterable(obj, exceptions: tuple[type, ...]=(str, bytes)) -> bool: """ Is `obj` iterable? Example ~~~~~~~ >>> isiterable([1, 2, 3]) True >>> isiterable("test") False """ return hasattr(obj, '__iter__') and not isinstance(obj, exceptions)
[docs] def isgeneratorlike(obj): "Does ``obj`` behave like a generator? (it can be iterated but has no length)" return hasattr(obj, '__iter__') and not hasattr(obj, '__len__')
[docs] def asnumber(obj, accept_fractions=True, accept_expon=False ) -> int | float | Fraction | None: """ Convert ``obj`` to a number or None if it cannot be converted Example ~~~~~~~ >>> asnumber(1) 1 >>> asnumber("3.4") 3.4 >>> asnumber("1/3", accept_fractions=True) Fraction(1, 3) >>> asnumber("hello") is None True """ if hasattr(obj, '__float__'): return obj elif isinstance(obj, str): if accept_fractions and "/" in obj: from fractions import Fraction return Fraction(obj) try: asint = int(obj) return asint except ValueError: pass if not accept_expon and _re.search(r"[eE][+-]", obj): return None try: asfloat = float(obj) return asfloat except ValueError: return None else: return None
[docs] def astype(type_, obj=None, factory=None): """ Return obj as type. If obj is already of said type, obj itself is returned Otherwise, obj is converted to type. If a special contructor is needed, it can be given as `construct`. If no obj is passed, a partial function is returned which can check for that particular type Args: type_: the type the object should have obj: the object to be checkec/converted factory: if given, a function ``(obj) -> obj`` of type ``type_`` Example ~~~~~~~ >>> astype(list, (3, 4)) [3, 4] >>> l = [3, 4] >>> astype(list, l) is l True >>> aslist = astype(list) """ factory = factory or type_ if obj is None: return lambda obj: obj if isinstance(obj, type_) else factory(obj) return obj if isinstance(obj, type_) else factory(obj)
[docs] def str_is_number(s: str, accept_exp=False, accept_fractions=False) -> bool: """ Returns True if the given string represents a number Args: s: the string to inspect accept_exp: accept exponential notation accept_fractions: accept numbers of the form "3/4" Returns: True if s represents a number, False otherwise .. note:: fractions should have the form num/den, like 3/4, with no spaces in between """ if accept_exp and accept_fractions: return asnumber(s) is not None import re if accept_exp: return re.fullmatch(r"[-+]?[0-9]*\.?[0-9]+([eE][-+]?[0-9]+)?", s) is not None elif accept_fractions: return bool(re.fullmatch(r"[-+]?[0-9]+/[0-9]+", s) or re.fullmatch(r"[-+]?[0-9]*\.?[0-9]+", s)) else: return re.fullmatch(r"[-+]?[0-9]*\.?[0-9]+", s) is not None
[docs] def dictmerge(dict1: dict, dict2: dict) -> dict: """ Merge the contents of the two dicts. If they have keys in common, the value in dict1 is overwritten by the value in dict2 Example ~~~~~~~ >>> a, b = {'A': 1, 'B': 2}, {'B': 20, 'C': 30} >>> dictmerge(a, b) == {'A': 1, 'B': 20, 'C': 30} True """ import warnings warnings.warn("Deprecated, use dict1 | dict2") return dict1 | dict2
[docs] def moses(pred: Callable[[T], bool], seq: Iterable[T] ) -> tuple[list[T], list[T]]: """ Divides *seq* into two lists: filter(pred, seq), filter(not pred, seq) Args: pred: a function predicate seq: the seq. to divide Returns: a tuple ``(true_elements, false_elements)``, where true_elements contains the items in *seq* for which *pred* evaluates to true, and *false_elements* contains the rest Example:: >>> moses(lambda x:x > 5, range(10)) ([6, 7, 8, 9], [0, 1, 2, 3, 4, 5]) """ trueseq, falseseq = [], [] for x in seq: (trueseq if pred(x) else falseseq).append(x) return trueseq, falseseq
[docs] def allequal(xs: Sequence) -> bool: """ Return True if all elements in xs are equal Args: xs: the seq. to query Returns: True if all elements in xs are equal """ x0 = xs[0] return all(x==x0 for x in xs)
[docs] def dumpobj(obj) -> list[tuple[str, Any]]: """ Return all 'public' attributes of this object """ return [(item, getattr(obj, item)) for item in dir(obj) if not item.startswith('__')]
[docs] def can_be_pickled(obj) -> bool: """ Return True if obj can be pickled """ import pickle try: obj2 = pickle.loads(pickle.dumps(obj)) except pickle.PicklingError: return False return obj == obj2
[docs] def snap_to_grid(x: num_t, tick: num_t, offset: num_t = 0, nearest=True ) -> num_t: """ Find the nearest slot in a grid Given a grid defined by offset + tick * N, find the nearest element of that grid to a given x Args: x: the number to snap to the grid tick: distance between ticks of the grid offset: offset of the grid nearest: if True, snap to the nearest tick (the nearest of the next floor or ceil tick), otherwise to the floor tick Returns: the tick to which to snap x to .. note:: the result will have the same type as *x*, so if *x* is float, the result will be float, if it is a Fraction, then the result will be a fraction Example ~~~~~~~ >>> snap_to_grid(1.6, 0.5) 1.5 >>> from fractions import Fraction >>> snap_to_grid(Fraction(2, 3), Fraction(1, 5)) Fraction(3, 5) """ t = x.__class__ if nearest: return t(round((x - offset) / tick)) * tick + offset else: return t(int((x - offset) / tick)) * tick + offset
[docs] def snap_array(X: np.ndarray, tick: float, offset: float = 0., out: np.ndarray | None = None, nearest=True ) -> np.ndarray: """ Snap the values of X to the nearest slot in a grid Assuming a grid t defined by ``t(n) = offset + tick*n``, snap the values of X to the nearest value of t Args: X: an array tick: the step value of the grid offset: the offset of the grid out: if given, snapped values are placed in this array nearest: if True, the nearest slot is selected. Otherwise the next lower (floor) Returns: an array containing the snapped values. This array will be *out* if it was given """ if tick <= 0: raise ValueError("tick should be > 0") if nearest: return _snap_array_nearest(X, tick, offset=float(offset), out=out) return _snap_array_floor(X, tick, offset=float(offset), out=out)
def _snap_array_nearest(X: np.ndarray, tick: number_t, offset: number_t = 0., out: np.ndarray | None = None ) -> np.ndarray: if out is None: out = X.copy() if offset != 0: out -= offset out /= tick out = np.round(out, out=out) out *= tick out += offset else: out /= tick out = np.round(out, out=out) out *= tick return out def _snap_array_floor(X: np.ndarray, tick:float, offset=0., out: np.ndarray=None ) -> np.ndarray: arr = out if out is not None else X.copy() if offset != 0: arr -= offset arr /= tick arr = np.floor(arr, out=arr) arr *= tick arr += offset else: arr /= tick arr = np.floor(arr, out=arr) arr *= tick return arr
[docs] def distribute_in_zones(x: num_t, split_points: Sequence[num_t], side="left") -> int: """ Returns the index of a "zone" where to place x. A zone is a numeric range defined by an inclusive lower boundary and a non-inclusive higher boundary **NB**: see :func:`distribute_in_zones_right` for a non-inclusive lower and inclusive upper boundary. The edge zones extend to inf. Args: x: the number to assign a zone to split_points: the split points which define the zones side: if "left", a zone has an inclusive lower bound and a non-inclusive upper bound. "right" is the opposite Returns: the index of the zone Example:: # 1 and 5 define three zones: (-inf, 1], (1, 5], (5, inf) >>> distribute_in_zones(2, [1, 5]) 1 >>> distribute_in_zones(5, [1, 5]) 2 """ if side == "right": return _distribute_in_zones_right(x, split_points) imin = 0 imax = len(split_points) while imin < imax: imid = imin + ((imax - imin) >> 2) if split_points[imid] <= x: imin = imid + 1 else: imax = imid return imin
def _distribute_in_zones_right(x: num_t, split_points: Sequence[num_t]) -> int: """ the same as distribute_in_zones, but with right inclusive zones """ imin = 0 imax = len(split_points) while imin < imax: imid = imin + ((imax - imin) >> 2) if split_points[imid] < x: imin = imid + 1 else: imax = imid return imin
[docs] def seq_contains(seq: Sequence[T], subseq: Sequence[T]) -> tuple[int, int] | None: """ Returns the (start, end) indexes if seq contains subseq, or None Example:: >>> seq, subseq = range(10), [3, 4, 5] >>> indices = seq_contains(seq, subseq) >>> assert seq[indices[0]:indices[1]] == subseq """ for i in range(len(seq)-len(subseq)+1): for j in range(len(subseq)): if seq[i+j] != subseq[j]: break else: return i, i+len(subseq) return None
[docs] def deepupdate(orig: dict, updatewith: dict) -> dict: """ recursively update orig with updatewith """ for key, value in updatewith.items(): if not isinstance(value, dict) or key not in orig: orig[key] = value else: orig[key] = deepupdate(orig[key], value) return orig
# ------------------------------------------------------------ # # Image and Pixels # # ------------------------------------------------------------
[docs] def pixels_to_cm(pixels: int, dpi=300) -> float: """ Convert a distance in pixels to cm Args: pixels: number of pixels dpi: dots (pixels) per inch Returns: the corresponding value in cm """ inches = pixels / dpi cm = inches * 2.54 return cm
[docs] def cm_to_pixels(cm: float, dpi=300) -> float: """ convert a distance in cm to pixels Args: cm: a value in cm dpi: dots-per-inch Returns: the corresponding value in pixels """ inches = cm * 0.3937008 pixels = inches * dpi return pixels
[docs] def inches_to_pixels(inches: float, dpi=300) -> float: """ Convert inches to pixels """ return inches * dpi
[docs] def pixels_to_inches(pixels: int, dpi=300) -> float: """Convert pixels to inches""" return pixels / dpi
[docs] def page_dinsize_to_mm(pagesize: str, pagelayout: str) -> tuple[float, float]: """ Return the (height, width) for a given DIN size and page orientation Args: pagesize: size as DIN string (a3, a4, etc) pagelayout: portrait or landscape Returns: a tuple (height, width) in mm ========== ================================= Format Width x Heigh (mm) ========== ================================= A0 841 x 1189 A1 594 x 841 A2 420 x 594 A3 297 x 420 A4 210 x 297 A5 148 x 210 A6 105 x 148 A7 74 x 105 ========== ================================= """ pagesizes = { 'a0': (1189, 841), 'a1': (841, 594), 'a2': (594, 420), 'a3': (420, 297), 'a4': (297, 210), 'a5': (210, 148), 'a6': (148, 105), 'a7': (105, 74) } w, h = pagesizes.get(pagesize.lower(), (0, 0)) if not w: raise ValueError(f"pagesize {pagesize} not known. Supported sizes: {pagesizes.keys()}") if pagelayout == 'portrait': w, h = h, w return h, w
# ------------------------------------------------------------ # # Decorators # # ------------------------------------------------------------ #
[docs] def public(f): """ decorator - keeps __all__ updated **NB**: it has no performance penalty at runtime since the decorator just returns the passed function * Based on an idea by Duncan Booth: http://groups.google.com/group/comp.lang.python/msg/11cbb03e09611b8a * Improved via a suggestion by Dave Angel: http://groups.google.com/group/comp.lang.python/msg/3d400fb22d8a42e1 """ publicapi = _sys.modules[f.__module__].__dict__.setdefault('__all__', []) if f.__name__ not in publicapi: # Prevent duplicates if run from an IDE. publicapi.append(f.__name__) return f
[docs] def singleton(cls: type): """ A class decorator to create a singleton class Example ------- :: @singleton class Logger(object): pass l = Logger() m = Logger() assert m is l """ instances = {} def get_instance(): if cls not in instances: instances[cls] = cls() return instances[cls] return get_instance()
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def type_error_msg(x, *expected_types): """ To be used when raising a TypeError Example:: if isinstance(x, int): ... else: raise TypeError(type_error_msg(x, int)) # This will raise a TypeError with the message # 'Expected type (int,), got str: "foo"' """ return f"Expected type {expected_types}, got {type(x).__name__}: {x}"
# --- crossplatform --- def _open_with_standard_app(path: str, wait: str | bool = False, min_wait=0.5, timeout=0., capture_stdout=True, capture_stderr=True ) -> None: """ Open path with the app defined to handle it at the os level Uses *xdg-open* in linux, *start* in win and *open* in osx. Args: path: the file to open wait: if True, we wait until the app has returned. This is in many cases not possible. If the app returns right away a dialog is created to make waiting explicit until the user confirms this dialog. Alternatively wait can be passed the string "modified", in which case we wait until the given file is modified min_wait: min. wait time. when waiting on app being closed. If the app closes before this time, a dialog appears asking for confirmation. timeout: a timeout for waiting on modified """ import subprocess import time proc = None if _sys.platform == 'linux': proc = subprocess.Popen( ["xdg-open", path], stdout=subprocess.PIPE if capture_stdout else None, stderr=subprocess.PIPE if capture_stderr else None) elif _sys.platform == "win32": # this function exists only in windows _os.startfile(path) # type: ignore elif _sys.platform == "darwin": proc = subprocess.Popen( ["open", path], stdout=subprocess.PIPE if capture_stdout else None, stderr=subprocess.PIPE if capture_stderr else None) min_wait = max(min_wait, 1) else: raise RuntimeError(f"platform {_sys.platform} not supported") if wait == "modified": wait_for_file_modified(path, timeout=timeout or 36000) elif wait: from emlib import dialogs if _sys.platform == "win32": dialogs.showInfo("Close this dialog when finished") elif proc is not None: t0 = time.time() proc.wait() if time.time() - t0 < min_wait: dialogs.showInfo("Close this dialog when finished") def _split_command(s:str) -> list[str]: parts = s.split() parts = [p.replace('"', '') for p in parts] return parts
[docs] def open_with_app(path: str, app: str | list[str] | None = None, wait: bool | str = False, shell=False, min_wait=0.5, timeout=0., capture_stdout=True, capture_stderr=True) -> subprocess.Popen | None: """ Open a document with a given app. It can either wait on the app to exit or wait until the file was modified. The app can be either a command as a string or a list of string arguments passed to *subprocess.Popen* Args: path: the path to the file to open app: a command-line string or a list of string arguments. If no app is given, we ask the os to open this file with its standard app wait: if True, wait until the app stops. If the app is a daemon app (it returns immediately), this situation is detected and a dialog is created which needs to be clicked in order for the function to return. Alternatively, wait can be "modified", in which case we wait until ``path`` has been modified; or "dialog", where a confirmation dialog is open for the user to signal when the editing is done shell: should app be started from a shell? min_wait: if the application returns before this time a wait dialog is created timeout: a timeout for wait_on_modified """ if not app: assert not shell _open_with_standard_app(path, wait=wait, min_wait=min_wait, timeout=timeout, capture_stderr=capture_stderr, capture_stdout=capture_stdout) return import subprocess import time if shell: assert isinstance(app, str), "shell needs a command-line as string" proc = subprocess.Popen(f'{app} "{path}"', shell=True) else: args = app if isinstance(app, list) else app.split() args.append(path) proc = subprocess.Popen(args) t0 = time.time() if wait == "modified": wait_for_file_modified(path, timeout=timeout) elif wait == 'dialog': proc.wait() from emlib import dialogs dialogs.showInfo("Close this dialog when finished") elif wait: proc.wait() if time.time() - t0 < min_wait: from emlib import dialogs dialogs.showInfo("Close this dialog when finished")
[docs] def wait_for_file_modified(path: str, timeout: int | float = 0.) -> bool: """ Wait until file is modified. This is useful when editing a file on an external application which runs in a daemon mode, meaning that opening a file in it might return immediately. Args: path: the path of the file to monitor timeout: how long should we wait for, in seconds Returns: True if the file was modified, False if it wasn't or if the operation timed-out """ from watchdog.observers import Observer from watchdog.events import PatternMatchingEventHandler directory, base = _os.path.split(path) if not directory: directory = "." handler = PatternMatchingEventHandler(patterns=[base], ignore_directories=True, case_sensitive=True) observer = Observer() modified = False def on_modified(event): nonlocal modified modified = True observer.stop() handler.on_modified = on_modified observer.schedule(handler, path=directory, recursive=False) observer.start() if timeout is None: timeout = 360000 # 100 hours observer.join(timeout) return modified
[docs] def first_existing_path(*paths: str) -> str | None: """ Returns the first path in paths which exists Args: *paths: the paths to test default: a default path returned when all other paths do not exist. It is not checked that this default path exists. Returns: the first existing path within the values given, None if no match was found """ for p in paths: p = _os.path.expanduser(p) if _os.path.exists(p): return p return None
[docs] def html_table(rows: list, headers: list[str], maxwidths: list[int] | None = None, rowstyles: list[str] | None = None, tablestyle='', headerstyle='' ) -> str: """ Create a html table Args: rows: the rows of the table, where each row is a sequence of cells headers: a list of column names maxwidths: if given, a list of max widths for each column rowstyles: if given, a list of styles, one for each column tablestyle: a style applied to the entire table headerstyle: a style applied to the table header Returns: a string with the generated HTML """ parts = [] _ = parts.append if tablestyle: _(f'<table style="{tablestyle}"') else: _("<table>") if headerstyle: _(f'<thead style="{headerstyle}"') else: _("<thead>") _("<tr>") if maxwidths is None: maxwidths = [0] * len(headers) if rowstyles is None: rowstyles = [''] * len(headers) for colname in headers: _(f'<th style="text-align:left">{colname}</th>') _("</tr></thead><tbody>") for row in rows: _("<tr>") for cell, maxwidth, rowstyle in zip(row, maxwidths, rowstyles): if rowstyle: cell = f'<span style="{rowstyle}">{cell}</span>' if maxwidth > 0: _(f'<td style="text-align:left;max-width:{maxwidth}px;">{cell}</td>') else: _(f'<td style="text-align:left">{cell}</td>') _("</tr>") _("</tbody></table>") return "".join(parts)
[docs] def replace_sigint_handler(handler: Callable[[None], None]): """ Replace current SIGINT hanler with the given one, return the old one Args: handler: the new handler. A handler is a function taking no parameters and returning nothing Returns: the old handler """ import signal original_handler = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, handler) return original_handler
[docs] class temporary_sigint_handler: """ Context manager to install a temporary sigint handler Example:: >>> def handler(): ... print("sigint detected!") >>> with teporary_sigint_handler(handler): ... # Do something here, handler will be called if SIGINT (ctrl-c) is received """ def __init__(self, handler): self.handler = handler self.original_handler = None def __enter__(self): self.original_handler = replace_sigint_handler(self.handler) def __exit__(self, type, value, traceback): replace_sigint_handler(self.original_handler) return True
[docs] def simplify_breakpoints(bps: list[T], coordsfunc: Callable, tolerance= 0.01 ) -> list[T]: """ Simplify breakpoints in a breakpoint function Assuming a list of some objects building a multisegmented line in 2D, simplify this line by eliminating superfluous breakpoints which don't contribute (enough) to the resolution of this line Args: bps: a list of breakpoints coordsfunc: a function of the form (breakpoint) -> (x, y) tolerance: if the difference between two consecutive slopes is below this threshold we assume that the two lines are colinear and we don't need the middle point Returns: the list of simplified breakpoints. The first and last breakpoints of the original will always be part of the result Example:: >>> @dataclasses.dataclass ... class Point: ... name: str ... x: float ... y: float >>> points = [Point("A", 0, 0), ... Point("B", 2, 0), ... Point("C", 3, 0), ... Point("D", 4, 1), ... Point("E", 5, 2)] >>> simplify_breakpoints(points, coordsfunc=(lambda p: p.x, p.y)) [Point(name="A", x=0, y=0), Point(name="C", x=3, y=0), Point(name="E", x=5, y=2)] """ if len(bps) <= 3: return bps def colinear(A, B, C, tolerance=0.01): Ax, Ay = coordsfunc(A) Bx, By = coordsfunc(B) Cx, Cy = coordsfunc(C) slopeAB = (By - Ay) / (Bx - Ax) slopeBC = (Cy - By) / (Cx - Bx) return abs(slopeAB-slopeBC) < tolerance A = bps[0] B = bps[1] simplified = [A] for C in bps[2:]: if not colinear(A, B, C, tolerance=tolerance): simplified.append(B) A = B B = C simplified.append(bps[-1]) return simplified
[docs] def rgb_to_hex(r: int, g: int, b: int) -> str: "Convert a color in rgb to its hex representation" return '#%02x%02x%02x'% (r, g, b)
_attrs_by_class: dict[type, list[str]] = {}
[docs] def find_attrs(obj, excludeprefix='_') -> list[str]: """ Iterate over all attributes of objects. Args: obj: the object to query excludeprefix: attributes starting with this prefix will be excluded Returns: a list of all the attibutes (instance variables) of this object. Notice that results are cached by class so if an object has dynamic attributes these will not be detected .. note:: This function will only return attributes, no methods, class variables, staticmethods, etc. Example ------- >>> class Foo: ... def __init__(self, a, b): ... self.a = a ... self.b = b ... >>> class Bar(Foo): ... def __init__(self, c): ... super().__init__(1, 2) ... self.c = c ... >>> bar = Bar(3) >>> find_attrs(bar) ['a', 'b', 'c'] """ cls = type(obj) if attrs := _attrs_by_class.get(cls): return attrs attrs = _find_attrs(obj, excludeprefix=excludeprefix) _attrs_by_class[cls] = attrs return attrs
def _find_attrs(obj, excludeprefix='_') -> list[str]: import inspect visited = set() out = [] if hasattr(obj, "__dict__"): for attr in sorted(obj.__dict__): if attr not in visited: if not attr.startswith(excludeprefix): out.append(attr) visited.add(attr) for cls in reversed(inspect.getmro(obj.__class__)): if hasattr(cls, "__slots__"): for attr in cls.__slots__: if hasattr(obj, attr) and attr not in visited: if not attr.startswith(excludeprefix): out.append(attr) visited.add(attr) return out
[docs] class ReprMixin: """ Mixin class to provide automatic __repr__ """ __slots__ = [] def __repr__(self): attrs = find_attrs(self) reprstr = ", ".join(f"{attr}={repr(getattr(self, attr))}" for attr in attrs) return f"{type(self).__name__}({reprstr})"
# –––––––––––––––––––––––––––––––––––––––––––––––––––––––––– # END # –––––––––––––––––––––––––––––––––––––––––––––––––––––––––– if __name__ == '__main__': import doctest doctest.testmod()