The comprehension statements are far too long and complicated and need to be broken up; better still, they shouldn't exist at all.
The easy vectorisation pass replaces all of the comprehensions with vectorised calls. Also, don't start diffs at the scalar 0; start it as an array of the correct shape. Don't use itertools.product; a Cartesian product of the row indices can be built as a pure NumPy array and used directly as an index.
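A minimal sketch of that indexing idea (the array names and shapes here are made up for illustration):

import numpy as np

totals = np.arange(12.).reshape(4, 3)    # stand-in for the per-row window sums
idx = np.arange(len(totals))
pairs = np.stack(np.meshgrid(idx, idx))  # shape (2, 4, 4): every (row_i, row_j) pair

# totals[pairs] has shape (2, 4, 4, 3); reducing over axis 0 compares each pair
gaps = totals[pairs].max(axis=0) - totals[pairs].min(axis=0)
print(gaps.sum(axis=-1))                 # (4, 4) matrix of summed pairwise differences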
A more complex vectorisation pass removes all loops and expresses the windowed sums as a linear operator. If you know the dimensions of the problem and can reuse them while repeatedly calling the function with different data, you can run a preparatory step that computes the operator once.
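To illustrate the operator idea for a single window width (this is only a sketch with made-up names; prepare_fullvec below builds one zero-padded square operator per width and stacks them):

import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

arr = np.arange(10.).reshape(2, 5)
width = 3
j = np.arange(arr.shape[1])[:, np.newaxis]   # input column index
k = np.arange(arr.shape[1] - width + 1)      # output (window) index

# 0/1 matrix whose k-th column selects the k-th length-`width` window,
# so a single matrix product reproduces the sliding-window sums.
op = ((j >= k) & (j < k + width)).astype(arr.dtype)
assert np.array_equal(arr @ op, sliding_window_view(arr, width, axis=1).sum(axis=-1))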
Which method is fastest depends on the size of the input, but in no case is the original method the fastest.
import functools
import itertools
import timeit
import typing
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn
from numpy.lib.stride_tricks import sliding_window_view
def func_op(arr):
    diffs = 0
    max_k = 6
    for n in range(1, max_k):
        arr1 = np.array(
            [np.sum(i, axis=1) for i in sliding_window_view(arr, window_shape=n, axis=1)])
        # this function uses np.maximum and np.minimum to subtract the max and min elements
        # (element-wise) between two rows and then sum up the entire of that subtraction
        diffs += np.sum((np.array([np.maximum(arr1[i[0]], arr1[i[1]]) for i in
                                   itertools.product(np.arange(len(arr1)), np.arange(len(arr1)))]) - np.array(
            [np.minimum(arr1[i[0]], arr1[i[1]]) for i in
             itertools.product(np.arange(len(arr1)), np.arange(len(arr1)))])), axis=1) * n
    diffs = diffs.reshape(len(arr), -1)
    return diffs
def func_semivec(arr: np.ndarray, max_k: int = 6) -> np.ndarray:
    m = arr.shape[0]
    idx = np.arange(m)
    # every (row_i, row_j) pair, usable directly as a fancy index; shape (2, m, m)
    products = np.stack(np.meshgrid(idx, idx))
    diffs = np.zeros(shape=(m, m), dtype=arr.dtype)

    for n in range(1, max_k):
        slid = sliding_window_view(arr, window_shape=n, axis=1)
        totals = slid.sum(axis=-1)   # per-row sums of every length-n window
        arr1 = totals[products]      # shape (2, m, m, windows)
        amin = arr1.min(axis=0)
        amax = arr1.max(axis=0)
        diffs += n*(amax - amin).sum(axis=-1)

    return diffs
def prepare_fullvec(m: int, n: int, dtype: np.dtype = np.float64, max_k: int = 6) -> tuple[
    np.ndarray,  # slide operator
    np.ndarray,  # cartesian
    np.ndarray,  # i
]:
    from numpy import newaxis as na

    iflat = np.arange(max_k - 1)
    i = iflat[:, na, na]  # formerly "n"
    jk = np.arange(n)
    j = jk[na, :, na]
    k = jk[na, na, :]
    slide_operator = (
        (k < n - i)      # contracting right columns
        & (j - k >= 0)   # zero upper triangle
        & (j - k <= i)   # expanding lower triangle
    ).astype(dtype)
    slide_operator.flags.writeable = False

    cidx = np.arange(m)
    cartesian = np.stack(np.meshgrid(cidx, cidx))
    cartesian.flags.writeable = False

    iout = iflat + 1
    iout.flags.writeable = False

    return slide_operator, cartesian, iout
def func_fullvec(
    arr: np.ndarray,
    slide_operator: np.ndarray,
    cartesian: np.ndarray,
    i: np.ndarray,
) -> np.ndarray:
    # arr @ slide_operator computes the window sums for every width in one matmul;
    # indexing with the Cartesian pairs expands to every (row_i, row_j) combination
    totals = (arr @ slide_operator)[:, cartesian]
    mins = totals.min(axis=1)
    maxs = totals.max(axis=1)
    diff = (maxs - mins).sum(axis=-1)
    # weight each window width by its n and reduce over widths
    return np.einsum('i,i...->...', i, diff)
def func_fullvec_dry(arr: np.ndarray, max_k: int = 6) -> np.ndarray:
    return func_fullvec(
        arr, *prepare_fullvec(*arr.shape, dtype=arr.dtype, max_k=max_k),
    )
def op_test() -> None:
    m = np.array((
        (0, 0, 0, 0, 0, 10, 0, 0, 0, 50, 0, 0, 5, 0, 0, 10),
        (0, 0, 5, 0, 0, 10, 0, 0, 0, 50, 0, 0, 10, 0, 0, 5),
        (0, 0, 5, 1, 0, 20, 0, 0, 0, 30, 0, 1, 10, 0, 0, 5),
    ))
    m.flags.writeable = False

    expected = np.array((
        (   0,  440, 1903),
        ( 440,    0, 1603),
        (1903, 1603,    0),
    ))
    out = func_semivec(m)
    assert np.array_equal(out, expected)
def regression_test() -> None:
    rand = np.random.default_rng(seed=0)
    maxk = 6
    for m in range(1, 10):
        for n in range(maxk - 1, 10):
            arr = rand.integers(low=1, high=10, size=(m, n))
            slide, cartesian, i = prepare_fullvec(m, n, arr.dtype)
            fullvec = functools.partial(func_fullvec, slide_operator=slide, cartesian=cartesian, i=i)
            res_op, *others = (
                method(arr)
                for method in (func_op, func_semivec, fullvec, func_fullvec_dry)
            )
            for other in others:
                assert np.array_equal(res_op, other)
def benchmark() -> plt.Figure:
    rand = np.random.default_rng(seed=0)

    def one_dimension(orient: typing.Literal[1, -1]) -> pd.DataFrame:
        rows = []
        short = 5
        for long in np.logspace(start=0.7, stop=3, num=50, dtype=np.uint32):
            m, n = (short, long)[::orient]
            slide, cartesian, i = prepare_fullvec(m, n)
            arr = rand.random(size=(m, n))

            for name, method in (
                ('op', func_op),
                ('semivec', func_semivec),
                ('fulldry', func_fullvec_dry),
                (
                    'fullprep',
                    functools.partial(func_fullvec, slide_operator=slide, cartesian=cartesian, i=i),
                ),
            ):
                if (
                    (orient == 1 and name == 'fulldry' and long >= 375) or
                    (orient == -1 and name == 'op' and long >= 100)
                ):
                    continue

                def run() -> np.ndarray:
                    return method(arr)

                print(name, m, n)
                t = timeit.timeit(run, number=1)
                rows.append((long, name, t))

        return pd.DataFrame(
            data=rows, columns=('long_dim', 'name', 'dur'),
        )

    fig, (ax_top, ax_bottom) = plt.subplots(nrows=2)
    for ax, orient in (
        (ax_top, 1),
        (ax_bottom, -1),
    ):
        seaborn.lineplot(
            data=one_dimension(orient),
            x='long_dim', y='dur', hue='name', ax=ax,
        )
        ax.set_xscale('log')
        ax.set_yscale('log')
        ax.set_title('mn'[::orient])

    return fig
if __name__ == '__main__':
    op_test()
    regression_test()
    benchmark()
    plt.show()
This shows a benchmark where the matrix grows along each axis in turn:
[plot: runtime vs. long dimension for each method, log-log, one panel per orientation]
Those are for narrow matrices. For a square matrix, the benchmark looks mostly like the m >> n case:
[plot: runtime for each method on square matrices, log-log]