-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathblockvec.py
239 lines (180 loc) · 6.3 KB
/
blockvec.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
"""
This module contains the block vector definition
"""
from typing import TypeVar, List
import functools as ftls
import numpy as np
from . import subops as gops
from .blockarray import BlockArray
from .blockmat import BlockMatrix
from . import require_petsc, _HAS_PETSC
if _HAS_PETSC:
from petsc4py import PETSc
## pylint: disable=no-member
# Type variable for a 'sub'vector
T = TypeVar('T')
class BlockVector(BlockArray[T]):
"""
Represents a block vector with blocks indexed by labels
"""
def __init__(
self, subarrays, shape=None, labels=None, wrap=gops.wrap, check_bshape=True
):
super().__init__(subarrays, shape, labels, wrap=wrap, check_bshape=check_bshape)
if len(self.shape) > 1:
raise ValueError(
f"BlockVector must have dimension == 1, not {len(self.shape)}"
)
## Conversion to monolithic formats
def to_mono_ndarray(self):
ndarray_vecs = [np.array(vec) for vec in self.sub_blocks]
return np.concatenate(ndarray_vecs, axis=0)
@require_petsc
def to_mono_petsc_seq(self, comm=None):
total_size = np.sum(self.mshape)
vec = PETSc.Vec().createSeq(total_size, comm=comm)
vec.setUp()
vec.setArray(self.to_mono_ndarray())
vec.assemble()
return vec
@require_petsc
def to_mono_petsc(self, comm=None):
total_size = np.sum(self.mshape)
vec = PETSc.Vec().create(comm=comm)
vec.setSizes(total_size)
vec.setUp()
vec.setArray(self.to_mono_ndarray())
vec.assemble()
return vec
def set_mono(self, vec: T):
"""
Set values from an equivalent monolithic vector
"""
# Check sizes are compatible
assert vec.size == np.sum(self.bshape[0])
# indices of the boundaries of each block
n_blocks = np.concatenate(([0], np.cumsum(self.bshape[0])))
for i, (n_start, n_stop) in enumerate(zip(n_blocks[:-1], n_blocks[1:])):
self[i][:] = vec[n_start:n_stop]
## Special vector operations
def norm(self):
return dot(self, self) ** 0.5
def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
# Import this here to avoid ciruclar import errors
# TODO: Fix bad module layout?
from . import ufunc as _ufunc
return _ufunc.apply_ufunc_mat_vec(ufunc, method, *inputs, **kwargs)
# TODO: Remove this unused code?
def validate_blockvec_size(*args):
"""
Check if a collection of BlockVecs have compatible block sizes
"""
ref_bsize = args[0].bshape[0]
valid_bsizes = [arg.bshape[0] == ref_bsize for arg in args]
return all(valid_bsizes)
# Utilities
def chunk(bvec, block_sizes):
"""
Splits a block vector into multiple block vectors
"""
block_cumul_sizes = [0] + np.cumsum(block_sizes).tolist()
split_bvecs = [
bvec[ii:jj] for ii, jj in zip(block_cumul_sizes[:-1], block_cumul_sizes[1:])
]
return tuple(split_bvecs)
def concatenate(args, labels=None):
"""
Concatenate a series of BlockVecs into a single BlockVector
Parameters
----------
args : List of BlockVector
"""
if labels is None:
labels = [ftls.reduce(lambda a, b: a + b, [bvec.labels[0] for bvec in args])]
vecs = np.concatenate([bvec.blocks for bvec in args])
return BlockVector(vecs, labels=labels)
def concatenate_with_prefix(bvecs: List[BlockVector], prefix=''):
"""
Concatenate a series of block vectors with a numbered prefix
"""
labels = tuple(
f'{prefix}{n}.{label}'
for n, bvec in enumerate(bvecs)
for label in bvec.labels[0]
)
return concatenate(bvecs, (labels,))
# Converting subtypes
@require_petsc
def convert_subtype_to_petsc(bvec):
"""
Converts a block matrix from one submatrix type to the PETSc submatrix type
Parameters
----------
bmat: BlockMatrix
"""
vecs = [gops.convert_vec_to_petsc(subvec) for subvec in bvec.blocks]
return BlockVector(vecs, labels=bvec.labels)
def convert_subtype_to_numpy(bvec):
"""
Converts a block matrix from one submatrix type to the PETSc submatrix type
Parameters
----------
bmat: BlockMatrix
"""
vecs = [gops.convert_vec_to_numpy(subvec) for subvec in bvec.blocks]
return BlockVector(vecs, labels=bvec.labels)
# Converting to monolithic vectors
@require_petsc
def to_mono_petsc(bvec, comm=None, finalize=True):
raise NotImplementedError()
# Converting to block matrix formats
@require_petsc
def to_block_rowmat(bvec):
mats = tuple(gops.convert_vec_to_rowmat(subvec) for subvec in bvec.sub_blocks)
return BlockMatrix(mats, shape=(1, len(mats)))
@require_petsc
def to_block_colmat(bvec):
mats = tuple(gops.convert_vec_to_colmat(subvec) for subvec in bvec.sub_blocks)
return BlockMatrix(mats, shape=(len(mats), 1))
# Basic operations
def dot(a, b):
"""
Return the dot product of a and b
"""
c = a * b
ret = 0
for subvec in c.sub_blocks:
# using the [:] indexing notation makes sum interpret the different data types as np arrays
# which can improve performance a lot
ret += sum(subvec[:])
return ret
def norm(a):
"""Return the 2-norm of a vector"""
return dot(a, a) ** 0.5
class MonotoBlock:
def __init__(self, bvec):
self.bvec = bvec
def __setitem__(self, key, value):
assert isinstance(key, slice)
total_size = np.sum(self.bvec.size)
# Let n refer to a monolithic index
# and m refer to a block index (from 0 to # of blocks-1)
nstart, nstop, nstep = self.slice_to_numeric(key, total_size)
# Get the monolithic ending indices of each block
# nblock = np.concatenate(np.cumsum(self.bvec.size))
# istart = np.where()
# istop = 0, 0
raise NotImplementedError("do this later I guess")
def slice_to_numeric(self, slice_obj, array_len):
nstart, nstop, nstep = 0, 0, 1
if slice_obj.start is not None:
nstart = slice_obj.start
if slice_obj.stop is None:
nstop = array_len + 1
elif slice_obj.stop < 0:
nstop = array_len + slice_obj.stop
else:
nstop = slice_obj.stop
if slice_obj.step is not None:
nstep = slice_obj.step
return (nstart, nstop, nstep)