Source code for hippylibX.algorithms.multivector

# --------------------------------------------------------------------------bc-
# Copyright (C) 2024 The University of Texas at Austin
#
# This file is part of the hIPPYlibx library. For more information and source
# code availability see https://hippylib.github.io.
#
# SPDX-License-Identifier: GPL-2.0-only
# --------------------------------------------------------------------------ec-

import numpy as np
import petsc4py
from typing import Union
from typing import Type
import petsc4py.PETSc


[docs] class MultiVector: """ Multivector object is created as a list of PETSc.petsc4py.Vec objects. """ def __init__(self, example_vec: petsc4py.PETSc.Vec, nvec: int): self.nvec = nvec self.data = [] for i in range(self.nvec): self.data.append(example_vec.duplicate())
[docs] @classmethod def createFromVec( cls, example_vec: petsc4py.PETSc.Vec, nvec: int ) -> Type["MultiVector"]: """ Create multivector from sample petsc4py vector whose parallel distribution is to be replicated. """ return cls(example_vec, nvec)
[docs] @classmethod def createFromMultiVec(cls, mv: Type["MultiVector"]) -> Type["MultiVector"]: """ Create multivector from another MultiVector whose parallel distribution is to be replicated. """ mv_copy = cls(mv[0], mv.nvec) for i in range(mv_copy.nvec): mv_copy.data[i] = mv.data[i].duplicate(mv.data[i].getArray()) return mv_copy
def __del__(self) -> None: for d in self.data: d.destroy() def __getitem__(self, k: int) -> petsc4py.PETSc.Vec: return self.data[k]
[docs] def scale(self, alpha: Union[float, np.ndarray]) -> None: """ Scale each value in the Multivector - either by a single float value or a numpy array of float values. """ if isinstance(alpha, float): for d in self.data: d.scale(alpha) else: for i, d in enumerate(self.data): d.scale(alpha[i])
[docs] def dot(self, v: Union[petsc4py.PETSc.Vec, Type["MultiVector"]]) -> np.array: """ Perform dot product of a MultiVector object and petsc4py Vec object, store result in numpy array. """ if isinstance(v, petsc4py.PETSc.Vec): return_values = np.zeros(self.nvec) for i in range(self.nvec): return_values[i] = self[i].dot(v) elif isinstance(v, MultiVector): return_values = np.zeros((self.nvec, v.nvec)) for i in range(self.nvec): for j in range(v.nvec): return_values[i, j] = self[i].dot(v[j]) return return_values
[docs] def reduce(self, y: petsc4py.PETSc.Vec, alpha: np.array) -> None: """ Reduction of petsc4py Vec using values in a numpy array stored in each data element of MultiVector object. """ for i in range(self.nvec): y.axpy(alpha[i], self[i])
[docs] def axpy(self, alpha: Union[float, np.array], Y: Type["MultiVector"]) -> None: """ Reduction of MultiVector object with a float or values in a numpy array stored in another MultiVector object. """ if isinstance(alpha, float): for i in range(self.nvec): self[i].axpy(alpha, Y[i]) else: for i in range(self.nvec): self[i].axpy(alpha[i], Y[i])
[docs] def norm(self, norm_type: petsc4py.PETSc.NormType) -> np.array: """ Return numpy array containing norm of each data element of MultiVector. """ norm_vals = np.zeros(self.nvec) for i in range(self.nvec): norm_vals[i] = self[i].norm(norm_type) return norm_vals
[docs] def Borthogonalize(self, B: petsc4py.PETSc.Mat): """ Returns :math:`QR` decomposition of self. :math:`Q` and :math:`R` satisfy the following relations in exact arithmetic .. math:: QR \\,= \\,Z, && (1), Q^*BQ\\, = \\, I, && (2), Q^*BZ \\, = \\,R, && (3), ZR^{-1} \\, = \\, Q, && (4). Returns: :code:`Bq` of type :code:`MultiVector` -> :code:`B`:math:`^{-1}`-orthogonal vectors :code:`r` of type :code:`ndarray` -> The :math:`r` of the QR decomposition. .. note:: :code:`self` is overwritten by :math:`Q`. """ return self._mgs_stable(B)
[docs] def _mgs_stable( self, B: petsc4py.PETSc.Mat ) -> tuple[Type["MultiVector"], np.array]: """ Returns :math:`QR` decomposition of self, which satisfies conditions (1)--(4). Uses Modified Gram-Schmidt with re-orthogonalization (Rutishauser variant) for computing the :math:`B`-orthogonal :math:`QR` factorization. References: 1. `A.K. Saibaba, J. Lee and P.K. Kitanidis, Randomized algorithms for Generalized \ Hermitian Eigenvalue Problems with application to computing \ Karhunen-Loe've expansion http://arxiv.org/abs/1307.6885` 2. `W. Gander, Algorithms for the QR decomposition. Res. Rep, 80(02), 1980` https://github.com/arvindks/kle """ n = self.nvec Bq = MultiVector(self[0], n) r = np.zeros((n, n), dtype="d") reorth = np.zeros((n,), dtype="d") eps = np.finfo(np.float64).eps for k in np.arange(n): B.mult(self[k], Bq[k]) t = np.sqrt(Bq[k].dot(self[k])) nach = 1 u = 0 while nach: u += 1 for i in np.arange(k): s = Bq[i].dot(self[k]) r[i, k] += s self[k].axpy(-s, self[i]) B.mult(self[k], Bq[k]) tt = np.sqrt(Bq[k].dot(self[k])) if tt > t * 10.0 * eps and tt < t / 10.0: nach = 1 t = tt else: nach = 0 if tt < 10.0 * eps * t: tt = 0.0 reorth[k] = u r[k, k] = tt if np.abs(tt * eps) > 0.0: tt = 1.0 / tt else: tt = 0.0 self[k].scale(tt) Bq[k].scale(tt) return Bq, r
[docs] def MatMvMult(A: petsc4py.PETSc.Mat, x: MultiVector, y: MultiVector) -> None: assert x.nvec == y.nvec, "x and y have non-matching number of vectors" if hasattr(A, "matMvMult"): A.matMvMult(x, y) else: for i in range(x.nvec): A.mult(x[i], y[i])
[docs] def MatMvTranspmult(A: petsc4py.PETSc.Mat, x: MultiVector, y: MultiVector) -> None: assert x.nvec == y.nvec, "x and y have non-matching number of vectors" assert hasattr(A, "transpmult"), "A does not have transpmult method implemented" if hasattr(A, "matMvTranspmult"): A.matMvTranspmult(x, y) else: for i in range(x.nvec): A.multTranspose(x[i], y[i])
[docs] def MvDSmatMult(X: MultiVector, A: np.array, Y: MultiVector) -> None: assert X.nvec == A.shape[0], ( "X Number of vecs incompatible with number of rows in A" ) assert Y.nvec == A.shape[1], ( "Y Number of vecs incompatible with number of cols in A" ) for j in range(Y.nvec): Y[j].scale(0.0) X.reduce(Y[j], A[:, j].flatten())