# Source code for pyrsistent_extras._pheap

from __future__ import annotations
from typing import Collection, Iterable, Iterator, Hashable, \
	ClassVar, TypeVar, Generic, Optional, Callable, Tuple, Any, Union, cast
from abc import abstractmethod

import itertools
import operator
import builtins

from ._util import Comparable, compare_next, compare_iter, sphinx_build

T = TypeVar('T')
K = TypeVar('K', bound=Comparable)
V = TypeVar('V')

class Tree(Generic[K,V]):
	'''
	Immutable node of a heap-ordered tree.

	Every node stores one ``(key, value)`` pair, a reference to its
	first child and a reference to its next sibling. Nodes are never
	modified after construction; :meth:`merge` builds new nodes instead.
	'''
	__slots__ = ('_key', '_value', '_child', '_sibling')

	_key: K
	_value: V
	_child: Optional[Tree[K,V]]
	_sibling: Optional[Tree[K,V]]

	def __new__(cls, key:K, value:V,
			child:Optional[Tree[K,V]], sibling:Optional[Tree[K,V]]):
		node = super().__new__(cls)
		node._key, node._value = key, value
		node._child, node._sibling = child, sibling
		return node

	def merge(self, other:Optional[Tree[K,V]], down:bool) -> Tree[K,V]:
		'''
		Link two root trees into one and return the new root.

		With ``down=False`` the root holding the smaller key stays on
		top, with ``down=True`` the one holding the larger key does.
		The losing root becomes the first child of the winner, and the
		winner's former children become the loser's siblings.

		Both trees must be roots (no sibling). ``other`` may be ``None``,
		in which case ``self`` is returned unchanged.
		'''
		if other is None:
			return self
		if (self._key < other._key) == down:
			winner, loser = other, self
		else:
			winner, loser = self, other
		assert winner._sibling is None
		assert loser._sibling is None
		# the demoted root is placed in front of the winner's child list
		demoted = Tree(loser._key, loser._value, loser._child, winner._child)
		return Tree(winner._key, winner._value, demoted, None)

class Forest(Generic[K,V]):
	'''
	Immutable linked list of trees, kept in increasing ``_order``.

	Each cell stores the order of its tree, the tree itself and the
	rest of the list. :meth:`push` and :meth:`merge` accept ``None`` as
	the empty forest and are therefore always invoked through the
	class, e.g. ``Forest.push(forest, ...)``.
	'''
	__slots__ = ('_order', '_tree', '_next')

	_order: int
	_tree: Tree[K,V]
	_next: Optional[Forest[K,V]]

	def __new__(cls, order:int, tree:Tree[K,V], next:Optional[Forest[K,V]]):
		cell = super().__new__(cls)
		cell._order, cell._tree, cell._next = order, tree, next
		return cell

	def push(self:Optional[Forest[K,V]], down:bool,
			order:int, tree:Tree[K,V]) -> Forest[K,V]:
		'''
		Return a forest that additionally contains ``tree``.

		A tree whose order is already present is merged with the
		existing one and the result is carried on as a tree of the next
		order.
		'''
		if self is None:
			return Forest(order, tree, None)
		head = self._order
		if order < head:
			# new tree goes in front of the whole list
			return Forest(order, tree, self)
		if order > head:
			# keep this cell, insert somewhere further down
			rest = Forest.push(self._next, down, order, tree)
			return Forest(head, self._tree, rest)
		# same order: link both trees and carry the result upwards
		carry = self._tree.merge(tree, down)
		return Forest.push(self._next, down, order + 1, carry)

	def popbranch(self:Forest[K,V], down:bool) \
			-> Tuple[int,Optional[Tree[K,V]],K,V,Optional[Forest[K,V]]]:
		'''
		Detach the tree whose root is the next item to pop.

		Returns ``(order, branch, key, value, forest)``: ``key`` and
		``value`` come from the detached root, ``branch`` is the head of
		the part of its child list that still has to be re-inserted
		(``order`` being the order of that head), and ``forest`` is the
		remaining forest, which already contains the children that were
		re-inserted on the way back up.
		'''
		root = self._tree
		assert root._sibling is None
		if self._next is None:
			return self._order - 1, root._child, \
				root._key, root._value, None
		order, branch, key, value, forest = self._next.popbranch(down)
		if (root._key < key) != down:
			# this cell's root beats the best one found further down
			return self._order - 1, root._child, \
				root._key, root._value, self._next
		# re-insert the detached children that are not smaller than this
		# cell's tree before putting this cell's tree back
		while order >= self._order:
			assert branch is not None
			child = Tree(branch._key, branch._value, branch._child, None)
			forest = Forest.push(forest, down, order, child)
			branch = branch._sibling
			order -= 1
		return order, branch, key, value, \
			Forest.push(forest, down, self._order, root)

	def pop(self:Forest[K,V], down:bool) -> Tuple[K,V,Optional[Forest[K,V]]]:
		'''
		Remove the best root and return ``(key, value, forest)``.

		``forest`` is ``None`` when nothing is left.
		'''
		order, branch, key, value, forest = self.popbranch(down)
		# push back whatever children popbranch has not re-inserted yet
		while branch is not None:
			child = Tree(branch._key, branch._value, branch._child, None)
			forest = Forest.push(forest, down, order, child)
			branch = branch._sibling
			order -= 1
		return key, value, forest

	def merge(self:Optional[Forest[K,V]], other:Optional[Forest[K,V]],
			down:bool) -> Optional[Forest[K,V]]:
		'''
		Return the union of two forests; either may be ``None``.
		'''
		if self is None:
			return other
		if other is None:
			return self
		if self._order < other._order:
			low, high = self, other
		elif self._order > other._order:
			low, high = other, self
		else:
			# equal orders: link the two trees and carry into the merged rest
			rest = Forest.merge(self._next, other._next, down)
			linked = self._tree.merge(other._tree, down)
			return Forest.push(rest, down, self._order + 1, linked)
		return Forest(low._order, low._tree,
			Forest.merge(low._next, high, down))

class PHeapView(Generic[K,V,T], Collection[T]):
	'''
	Base class of the views over a heap's keys, values or items.

	A view wraps a heap (``_queue``) together with a flag (``_sorted``)
	selecting whether iteration pops the items in heap order or walks
	the internal trees in arbitrary order. Subclasses implement
	:meth:`__iter__` on top of :meth:`_iter`.
	'''
	__slots__ = ('_queue', '_sorted')

	if not sphinx_build:
		_queue: PHeap[K,V]
		_sorted: bool

	def __new__(cls, _queue, _sorted):
		self = super().__new__(cls)
		self._queue = _queue
		self._sorted = _sorted
		return self

	def __contains__(self, item):
		r'''
		Check if the item is in the heap

		:math:`O(n)`

		>>> 1 in pminheap([(1,'a'), (2,'b'), (3,'c')]).keys()
		True
		>>> 'a' in pminheap([(1,'a'), (2,'b'), (3,'c')]).values()
		True
		>>> (1, 'a') in pminheap([(1,'a'), (2,'b'), (3,'c')]).items()
		True
		'''
		# Membership does not depend on iteration order, so always scan
		# with the cheaper unsorted traversal instead of popping the
		# whole heap when this view happens to be a sorted one.
		view = type(self)(self._queue, False) if self._sorted else self
		return any(item == i for i in view)

	def __len__(self):
		r'''
		Get the size of the heap

		:math:`O(1)`

		>>> len(pminheap([(1,'a'), (2,'b'), (3,'c')]).keys())
		3
		>>> len(pminheap([(1,'a'), (2,'b'), (3,'c')]).values())
		3
		>>> len(pminheap([(1,'a'), (2,'b'), (3,'c')]).items())
		3
		'''
		return self._queue._size

	def _iter_unsorted(self) -> Iterator[Tuple[K,V]]:
		'''
		Yield every ``(key, value)`` pair once, in no particular order.

		The heap's top item is yielded first, then every tree of the
		forest is walked depth-first using an explicit stack.
		'''
		if self._queue._size == 0:
			return
		yield self._queue._key, self._queue._value
		forest = self._queue._forest
		stack:list[Tree[K,V]] = []
		while forest is not None:
			stack.append(forest._tree)
			while stack:
				item = stack.pop()
				yield item._key, item._value
				if item._sibling is not None:
					stack.append(item._sibling)
				if item._child is not None:
					stack.append(item._child)
			forest = forest._next

	def _iter_sorted(self) -> Iterator[Tuple[K,V]]:
		'''
		Yield the ``(key, value)`` pairs in pop order by repeatedly
		popping from the (persistent) heap.
		'''
		queue = self._queue
		while queue._size > 0:
			key, value, queue = queue.pop()
			yield key, value

	def _iter(self) -> Iterator[Tuple[K,V]]:
		'''
		Iterate over the ``(key, value)`` pairs, honouring ``_sorted``.
		'''
		if self._sorted:
			return self._iter_sorted()
		return self._iter_unsorted()

	@abstractmethod
	def __iter__(self) -> Iterator[T]:
		r'''
		Iterate through the heap view

		:math:`O(n)` or :math:`O(n\log{n})`

		>>> list(pminheap([(1,'a'), (2,'b'), (3,'c')]).keys())
		[1, 2, 3]
		>>> list(pminheap([(1,'a'), (2,'b'), (3,'c')]).values())
		['a', 'b', 'c']
		>>> list(pminheap([(1,'a'), (2,'b'), (3,'c')]).items())
		[(1, 'a'), (2, 'b'), (3, 'c')]
		'''
		# NotImplemented is reserved for binary special methods; an
		# abstract method that is reached by mistake should fail loudly.
		raise NotImplementedError
class PHeapKeys(PHeapView[K,V,K]):
	'''View over the keys of a heap.'''
	def __iter__(self) -> Iterator[K]:
		return (k for k, v in super()._iter())

class PHeapValues(PHeapView[K,V,V]):
	'''View over the values of a heap.'''
	def __iter__(self) -> Iterator[V]:
		return (v for k, v in super()._iter())

class PHeapItems(PHeapView[K,V,Tuple[K,V]]):
	'''View over the ``(key, value)`` pairs of a heap.'''
	def __iter__(self) -> Iterator[Tuple[K,V]]:
		return super()._iter()

class PHeap(Generic[K,V], Collection[K], Hashable):
	r'''
	Persistent heap

	Meant for cases where a priority queue is needed.

	Do not instantiate directly, instead use the factory
	functions :func:`hl` or :func:`pminheap` to create a min-heap,
	and :func:`hg` or :func:`pmaxheap` to create a max-heap.

	The :class:`PMinHeap` and :class:`PMaxHeap` classes implement the
	:class:`python:typing.Container` protocol and are
	:class:`python:typing.Hashable`.

	The implementation is a binomial heap. Merge/pop operations
	are :math:`O(\log{n})`, and peek/push operations are :math:`O(1)`.

	Operations are not stable: values pushed with the same key may be
	popped in a different order.

	The following are examples of some common operations on persistent heaps:

	>>> heap1 = pminheap([(1,'a'), (2,'b')])
	>>> heap1
	pminheap([(1, 'a'), (2, 'b')])
	>>> heap2 = heap1.push(3,'c')
	>>> heap2
	pminheap([(1, 'a'), (2, 'b'), (3, 'c')])
	>>> heap3 = heap1 + heap2
	>>> heap3
	pminheap([(1, 'a'), (1, 'a'), (2, 'b'), (2, 'b'), (3, 'c')])
	>>> key, value, heap4 = heap3.pop()
	>>> (key, value)
	(1, 'a')
	>>> heap4
	pminheap([(1, 'a'), (2, 'b'), (2, 'b'), (3, 'c')])
	'''
	__slots__ = ('_size', '_key', '_value', '_forest')

	if not sphinx_build:
		# _key/_value hold the current top item; every other item lives
		# in _forest. An empty heap has _size == 0 and _forest None.
		_size: int
		_key: K
		_value: V
		_forest: Optional[Forest[K,V]]

		# Filled in by the concrete subclasses.
		_name: ClassVar[str] = cast(Any, None)
		_down: ClassVar[bool] = cast(Any, None)
		_empty: ClassVar[PHeap[Any,Any]] = cast(Any, None)

	def __new__(cls, _size, _key, _value, _forest):
		self = super().__new__(cls)
		self._size = _size
		self._key = _key
		self._value = _value
		self._forest = _forest
		return self

	def push(self, key:K, value:V=cast(V,None)) -> PHeap[K,V]:
		r'''
		Inserts a value with the specified key

		amortized :math:`O(1)`, worst case :math:`O(\log{n})`

		>>> pminheap([(1,'a'), (2,'b')]).push(3,'c')
		pminheap([(1, 'a'), (2, 'b'), (3, 'c')])
		>>> pmaxheap([(1,'a'), (2,'b')]).push(3,'c')
		pmaxheap([(3, 'c'), (2, 'b'), (1, 'a')])
		'''
		if self._size == 0:
			return type(self)(1, key, value, None)
		# (k1, v1) becomes the new top item, (k2, v2) goes into the forest
		if (key < self._key) != self._down:
			k1, v1, k2, v2 = key, value, self._key, self._value
		else:
			k2, v2, k1, v1 = key, value, self._key, self._value
		return type(self)(self._size + 1, k1, v1,
			Forest.push(self._forest, self._down, 0, Tree(k2, v2, None, None)))

	def pop(self) -> Tuple[K,V,PHeap[K,V]]:
		r'''
		Remove and return the min/max item

		:math:`O(\log{n})`

		:raises IndexError: if the heap is empty

		>>> pminheap([(1,'a'), (2,'b'), (3,'c')]).pop()
		(1, 'a', pminheap([(2, 'b'), (3, 'c')]))
		>>> pmaxheap([(1,'a'), (2,'b'), (3,'c')]).pop()
		(3, 'c', pmaxheap([(2, 'b'), (1, 'a')]))
		>>> pminheap([]).pop()
		Traceback (most recent call last):
		...
		IndexError: ...
		'''
		if self._size == 0:
			raise IndexError('pop from empty heap')
		if self._forest is None:
			# the top item was the only item
			return self._key, self._value, self._empty
		# the best root of the forest becomes the new top item
		key, value, forest = self._forest.pop(self._down)
		return self._key, self._value, \
			type(self)(self._size - 1, key, value, forest)

	def merge(self, other:PHeapLike[K,V]) -> PHeap[K,V]:
		r'''
		Return the union of the two heaps

		amortized :math:`O(\log(\min(n,m)))`,
		worst-case :math:`O(\log(\max(n,m)))`

		>>> pminheap([(1,'a'), (3,'c')]) + pminheap([(2,'b'), (4,'d')])
		pminheap([(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')])
		>>> pmaxheap([(1,'a'), (3,'c')]) + pmaxheap([(2,'b'), (4,'d')])
		pmaxheap([(4, 'd'), (3, 'c'), (2, 'b'), (1, 'a')])
		'''
		other = self._fromitems(other)
		if self._size == 0:
			return other
		if other._size == 0:
			return self
		# make ``self`` the heap whose top item stays on top
		if (self._key < other._key) == self._down:
			self, other = other, self
		forest = Forest.merge(self._forest, other._forest, self._down)
		forest = Forest.push(forest, self._down, 0,
			Tree(other._key, other._value, None, None))
		return type(self)(self._size + other._size,
			self._key, self._value, forest)

	__add__ = merge

	def peek(self) -> Tuple[K,V]:
		r'''
		Find the min/max element

		:math:`O(1)`

		:raises IndexError: if the heap is empty

		>>> pminheap([(1,'a'), (2,'b'), (3,'c')]).peek()
		(1, 'a')
		>>> pmaxheap([(1,'a'), (2,'b'), (3,'c')]).peek()
		(3, 'c')
		>>> pminheap().peek()
		Traceback (most recent call last):
		...
		IndexError: ...
		'''
		if self._size == 0:
			# direction-neutral wording: this is shared by min- and max-heaps
			raise IndexError('peek from empty heap')
		return self._key, self._value

	def __len__(self) -> int:
		r'''
		Get the size of the heap

		:math:`O(1)`

		>>> len(pminheap([(1,'a'), (2,'b'), (3,'c')]))
		3
		'''
		return self._size

	def __bool__(self) -> bool:
		return self._size != 0

	def __contains__(self, key) -> bool:
		# order is irrelevant for membership, use the O(n) traversal
		return key in PHeapKeys(self, False)

	def __repr__(self) -> str:
		return '{}({})'.format(self._name, list(self.items()))

	def items(self, sorted:bool=True) -> PHeapItems[K,V]:
		r'''
		Create a view of the heap's items

		:math:`O(1)`

		Iterating over the entire heap is :math:`O(n\log{n})`
		for ``sorted=True`` and :math:`O(n)` for ``sorted=False``.

		Similar to :meth:`python:dict.items`. See :class:`PHeapView`.

		>>> list(pminheap([(1,'a'), (2,'b'), (3,'c')]).items())
		[(1, 'a'), (2, 'b'), (3, 'c')]
		>>> list(pmaxheap([(1,'a'), (2,'b'), (3,'c')]).items())
		[(3, 'c'), (2, 'b'), (1, 'a')]
		'''
		return PHeapItems(self, sorted)

	def keys(self, sorted:bool=True) -> PHeapKeys[K,V]:
		r'''
		Create a view of the heap's keys

		:math:`O(1)`

		Iterating over the entire heap is :math:`O(n\log{n})`
		for ``sorted=True`` and :math:`O(n)` for ``sorted=False``.

		Similar to :meth:`python:dict.keys`. See :class:`PHeapView`.

		>>> list(pminheap([(1,'a'), (2,'b'), (3,'c')]).keys())
		[1, 2, 3]
		>>> list(pmaxheap([(1,'a'), (2,'b'), (3,'c')]).keys())
		[3, 2, 1]
		'''
		return PHeapKeys(self, sorted)

	def values(self, sorted:bool=True) -> PHeapValues[K,V]:
		r'''
		Create a view of the heap's values

		:math:`O(1)`

		Iterating over the entire heap is :math:`O(n\log{n})`
		for ``sorted=True`` and :math:`O(n)` for ``sorted=False``.

		Similar to :meth:`python:dict.values`. See :class:`PHeapView`.

		>>> list(pminheap([(1,'a'), (2,'b'), (3,'c')]).values())
		['a', 'b', 'c']
		>>> list(pmaxheap([(1,'a'), (2,'b'), (3,'c')]).values())
		['c', 'b', 'a']
		'''
		return PHeapValues(self, sorted)

	def __iter__(self) -> Iterator[K]:
		r'''
		Iterate through the heap's keys

		:math:`O(1)`

		Iterating over the entire heap is :math:`O(n\log{n})`

		See :meth:`keys`.

		>>> list(pminheap([(1,'a'), (2,'b'), (3,'c')]).keys())
		[1, 2, 3]
		>>> list(pmaxheap([(1,'a'), (2,'b'), (3,'c')]).keys())
		[3, 2, 1]
		'''
		return iter(PHeapKeys(self, True))

	@classmethod
	def _fromitems(cls, items:PHeapLike[K,V]) -> PHeap[K,V]:
		'''
		Build a heap of this class from a heap or from ``(key, value)``
		pairs. A heap with the same direction is returned as is.
		'''
		if isinstance(items, PHeap):
			if items._down == cls._down:
				return items
			# opposite direction: rebuild from the unordered items
			return cls._fromitems(items.items(sorted=False))
		size, forest = 0, None
		for key, value in items:
			forest = Forest.push(forest, cls._down, 0,
				Tree(key, value, None, None))
			size += 1
		if forest is None:
			return cls._empty
		# pull the best item out of the forest to serve as the top item
		key, value, forest = forest.pop(cls._down)
		return cls(size, key, value, forest)

	def _iter_sort_values(self) -> Iterator[Tuple[K,V]]:
		'''
		Iterate the items in key order, with items of equal key
		ordered by value.
		'''
		for k, vs in itertools.groupby(self.items(), key=operator.itemgetter(0)):
			yield from sorted(vs, key=operator.itemgetter(1))

	def _iter_group_values(self) -> Iterator[Tuple[K,Iterator[Tuple[K,V]]]]:
		'''
		Iterate ``(key, items with that key)`` groups in key order.
		'''
		return itertools.groupby(self.items(), key=operator.itemgetter(0))

	def _compare(self, other:PHeap[K,V], equality:bool) -> int:
		'''
		Shared implementation of the rich comparisons.

		Returns ``NotImplemented`` when ``other`` is not a heap of the
		same direction, otherwise an integer that the callers compare
		against zero. With ``equality=True`` values that cannot be
		ordered are still compared, group by group, using ``==`` only.
		'''
		if not isinstance(other, PHeap):
			return NotImplemented
		if self._down != other._down:
			return NotImplemented
		if equality:
			if self is other:
				return 0
			if len(self) != len(other):
				return 1
		try:
			return compare_iter(self._iter_sort_values(),
				other._iter_sort_values(), equality)
		except TypeError:
			# values are not orderable; only equality can fall back
			if not equality:
				raise
		xiter = self._iter_group_values()
		yiter = other._iter_group_values()
		while True:
			n = compare_next(xiter, yiter)
			if isinstance(n, int):
				return n
			if n[0][0] != n[1][0]:
				return 1
			xvalues = [v for _, v in n[0][1]]
			yvalues = [v for _, v in n[1][1]]
			if len(xvalues) != len(yvalues):
				return 1
			# the two groups must be equal as multisets
			for x in xvalues:
				try:
					yvalues.remove(x)
				except ValueError:
					return 1
			assert not yvalues

	def __eq__(self, other) -> bool:
		r'''
		Return self == other

		:math:`O(n\log{n})` if values are comparable

		:math:`O(n\log{n}+g^2)` if values are not comparable,
		where `g` is the number of values with the same key

		>>> xs = pminheap([(1,'a'), (2,'b'), (3,'c')])
		>>> xs == pminheap([(1,'a'), (2,'b'), (3,'c')])
		True
		>>> xs == pminheap([(2,'b'), (3,'c'), (4,'d')])
		False
		'''
		result = self._compare(other, True)
		if result is NotImplemented:
			return NotImplemented
		return result == 0

	def __ne__(self, other) -> bool:
		r'''
		Return self != other

		:math:`O(n\log{n})` if values are comparable

		:math:`O(n\log{n}+g^2)` if values are not comparable,
		where `g` is the number of values with the same key

		>>> xs = pminheap([(1,'a'), (2,'b'), (3,'c')])
		>>> xs != pminheap([(1,'a'), (2,'b'), (3,'c')])
		False
		>>> xs != pminheap([(2,'b'), (3,'c'), (4,'d')])
		True
		'''
		result = self._compare(other, True)
		if result is NotImplemented:
			return NotImplemented
		return result != 0

	def __gt__(self, other) -> bool:
		r'''
		Return self > other

		:math:`O(n\log{n})`

		:raises TypeError: if values are not comparable

		>>> xs = pminheap([(1,'a'), (2,'b'), (3,'c')])
		>>> xs > pminheap([(1,'a'), (2,'b'), (3,'c')])
		False
		>>> xs > pminheap([(2,'b'), (3,'c'), (4,'d')])
		False
		>>> xs > pminheap([(0,'z'), (1,'a'), (2,'b')])
		True
		'''
		result = self._compare(other, False)
		if result is NotImplemented:
			return NotImplemented
		return result > 0

	def __ge__(self, other) -> bool:
		r'''
		Return self >= other

		:math:`O(n\log{n})`

		:raises TypeError: if values are not comparable

		>>> xs = pminheap([(1,'a'), (2,'b'), (3,'c')])
		>>> xs >= pminheap([(1,'a'), (2,'b'), (3,'c')])
		True
		>>> xs >= pminheap([(2,'b'), (3,'c'), (4,'d')])
		False
		>>> xs >= pminheap([(0,'z'), (1,'a'), (2,'b')])
		True
		'''
		result = self._compare(other, False)
		if result is NotImplemented:
			return NotImplemented
		return result >= 0

	def __lt__(self, other) -> bool:
		r'''
		Return self < other

		:math:`O(n\log{n})`

		:raises TypeError: if values are not comparable

		>>> xs = pminheap([(1,'a'), (2,'b'), (3,'c')])
		>>> xs < pminheap([(1,'a'), (2,'b'), (3,'c')])
		False
		>>> xs < pminheap([(2,'b'), (3,'c'), (4,'d')])
		True
		>>> xs < pminheap([(0,'z'), (1,'a'), (2,'b')])
		False
		'''
		result = self._compare(other, False)
		if result is NotImplemented:
			return NotImplemented
		return result < 0

	def __le__(self, other) -> bool:
		r'''
		Return self <= other

		:math:`O(n\log{n})`

		:raises TypeError: if values are not comparable

		>>> xs = pminheap([(1,'a'), (2,'b'), (3,'c')])
		>>> xs <= pminheap([(1,'a'), (2,'b'), (3,'c')])
		True
		>>> xs <= pminheap([(2,'b'), (3,'c'), (4,'d')])
		True
		>>> xs <= pminheap([(0,'z'), (1,'a'), (2,'b')])
		False
		'''
		result = self._compare(other, False)
		if result is NotImplemented:
			return NotImplemented
		return result <= 0

	def __hash__(self) -> int:
		r'''
		Calculate the hash of the heap.

		:math:`O(n\log{n})`

		:raises TypeError: if values are not comparable

		>>> x1 = pminheap([(1,'a'), (2,'b'), (3,'c')])
		>>> x2 = pminheap([(1,'a'), (2,'b'), (3,'c')])
		>>> hash(x1) == hash(x2)
		True
		'''
		h = hash(self._name)
		# items sharing a key are sorted so that the hash does not depend
		# on the (unstable) order in which they are popped
		for k, vs in itertools.groupby(self.items(), key=operator.itemgetter(0)):
			h = hash((h, k, tuple(sorted(vs))))
		return h

	def __reduce__(self):
		r'''
		Support method for :mod:`python:pickle`

		:math:`O(n)`

		>>> import pickle
		>>> pickle.loads(pickle.dumps(pminheap([(1,'a'), (2,'b'), (3,'c')])))
		pminheap([(1, 'a'), (2, 'b'), (3, 'c')])
		'''
		# _name is the name of the module-level factory function
		return globals()[self._name], (tuple(self.items(False)),)

PHeapLike = Union[PHeap[K,V],Iterable[Tuple[K,V]]]
class PMinHeap(PHeap[K,V]):
	__doc__ = PHeap.__doc__

	# name of the factory function, used by repr() and pickling
	_name: ClassVar[str] = 'pminheap'
	# False selects smallest-key-first ordering
	_down: ClassVar[bool] = False
	# placeholder, replaced by the shared empty heap right below
	_empty: ClassVar[PMinHeap[Any,Any]] = cast(Any, None)

# The empty heap can only be created once the class itself exists.
PMinHeap._empty = PMinHeap(0, None, None, None) # type: ignore
def pminheap(items:PHeapLike[K,V]=PMinHeap._empty) -> PMinHeap[K,V]:
	r'''
	Create a :class:`PMinHeap` from the given items

	:math:`O(n)`

	>>> pminheap()
	pminheap([])
	>>> pminheap([(1,'a'), (2,'b'), (3,'c')])
	pminheap([(1, 'a'), (2, 'b'), (3, 'c')])
	'''
	heap = PMinHeap._fromitems(items)
	return cast(PMinHeap, heap)
def pminheap_fromkeys(items:Iterable[K], value:V=None) -> PMinHeap[K,V]:
	r'''
	Create a :class:`PMinHeap` using a default value

	:math:`O(n)`

	>>> pminheap.fromkeys([1, 2, 3])
	pminheap([(1, None), (2, None), (3, None)])
	>>> pminheap.fromkeys([1, 2, 3], value='a')
	pminheap([(1, 'a'), (2, 'a'), (3, 'a')])
	'''
	shared: V = cast(V, value)
	# every key is paired with the same value object
	pairs = ((key, shared) for key in items)
	return pminheap(pairs)

# expose the helper as ``pminheap.fromkeys``
setattr(pminheap, 'fromkeys', pminheap_fromkeys)
def hl(*items:Tuple[K,V]) -> PMinHeap[K,V]:
	'''
	Shorthand for :func:`pminheap`

	Mnemonic: Heap Less-than

	>>> hl((1,'a'), (2,'b'), (3,'c'))
	pminheap([(1, 'a'), (2, 'b'), (3, 'c')])
	'''
	# the positional (key, value) pairs arrive as a tuple, which is
	# handed to the factory as its iterable of items
	return pminheap(items)
class PMaxHeap(PHeap[K,V]):
	__doc__ = PHeap.__doc__

	# name of the factory function, used by repr() and pickling
	_name: ClassVar[str] = 'pmaxheap'
	# True selects largest-key-first ordering
	_down: ClassVar[bool] = True
	# placeholder, replaced by the shared empty heap right below
	_empty: ClassVar[PMaxHeap[Any,Any]] = cast(Any, None)

# The empty heap can only be created once the class itself exists.
PMaxHeap._empty = PMaxHeap(0, None, None, None) # type: ignore
def pmaxheap(items:PHeapLike[K,V]=PMaxHeap._empty) -> PMaxHeap[K,V]:
	r'''
	Create a :class:`PMaxHeap` from the given items

	:math:`O(n)`

	>>> pmaxheap()
	pmaxheap([])
	>>> pmaxheap([(1,'a'), (2,'b'), (3,'c')])
	pmaxheap([(3, 'c'), (2, 'b'), (1, 'a')])
	'''
	heap = PMaxHeap._fromitems(items)
	return cast(PMaxHeap, heap)
def pmaxheap_fromkeys(items:Iterable[K], value:V=None) -> PMaxHeap[K,V]:
	r'''
	Create a :class:`PMaxHeap` using a default value

	:math:`O(n)`

	>>> pmaxheap.fromkeys([1, 2, 3])
	pmaxheap([(3, None), (2, None), (1, None)])
	>>> pmaxheap.fromkeys([1, 2, 3], value='a')
	pmaxheap([(3, 'a'), (2, 'a'), (1, 'a')])
	'''
	shared: V = cast(V, value)
	# every key is paired with the same value object
	pairs = ((key, shared) for key in items)
	return pmaxheap(pairs)

# expose the helper as ``pmaxheap.fromkeys``
setattr(pmaxheap, 'fromkeys', pmaxheap_fromkeys)
def hg(*items:Tuple[K,V]) -> PMaxHeap[K,V]:
	'''
	Shorthand for :func:`pmaxheap`

	Mnemonic: Heap Greater-than

	>>> hg((1,'a'), (2,'b'), (3,'c'))
	pmaxheap([(3, 'c'), (2, 'b'), (1, 'a')])
	'''
	# the positional (key, value) pairs arrive as a tuple, which is
	# handed to the factory as its iterable of items
	return pmaxheap(items)
# Public names of the module.
__all__: Tuple[str, ...] = (
	'hl', 'pminheap', 'PMinHeap',
	'hg', 'pmaxheap', 'PMaxHeap',
)
# The view base class is exported only for documentation builds.
if sphinx_build:
	__all__ += ('PHeapView',)