
I am new to Python, and even newer to vectorization. I have attempted to vectorize a custom similarity function that should return a matrix of pairwise similarities between the rows of an input array.

IMPORTS:

import numpy as np
from itertools import product
from numpy.lib.stride_tricks import sliding_window_view

INPUT:

np.random.seed(11)

a = np.array([0, 0, 0, 0, 0, 10, 0, 0, 0, 50, 0, 0, 5, 0, 0, 10])
b = np.array([0, 0, 5, 0, 0, 10, 0, 0, 0, 50, 0, 0, 10, 0, 0, 5])
c = np.array([0, 0, 5, 1, 0, 20, 0, 0, 0, 30, 0, 1, 10, 0, 0, 5])

m = np.array((a,b,c))

OUTPUT:

custom_func(m)

array([[   0,  440, 1903],
       [ 440,    0, 1603],
       [1903, 1603,    0]])

FUNCTION:

def custom_func(arr):
    diffs = 0
    max_k = 6

    for n in range(1, max_k):

        arr1 = np.array([np.sum(i, axis = 1) for i in sliding_window_view(arr, window_shape = n, axis = 1)])

        # use np.maximum and np.minimum to subtract the element-wise max and min between two rows, then sum that difference over the whole row
        diffs += np.sum((np.array([np.maximum(arr1[i[0]], arr1[i[1]]) for i in product(np.arange(len(arr1)), np.arange(len(arr1)))]) - np.array([np.minimum(arr1[i[0]], arr1[i[1]]) for i in product(np.arange(len(arr1)), np.arange(len(arr1)))])), axis = 1) * n

    diffs = diffs.reshape(len(arr), -1)

    return diffs

The function is quite simple: it sums up the element-wise differences between the maximum and minimum of row pairs over N sliding-window widths. It is much faster than what I was using before finding out about vectorization today (for loops and pandas DataFrames, yay).

My first thought is to figure out a way to find both the minimum and the maximum of my arrays in a single pass, since I currently THINK it has to do two passes, but I was unable to figure out how. Also, there is a for loop in my current function because I need to do this for multiple sliding-window widths N, and I am not sure how to do this without the loop.
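
For reference, the "single pass" idea can be checked directly: for two arrays, the element-wise maximum minus the element-wise minimum equals the absolute difference, so one subtraction can stand in for both passes. A quick illustration with the arrays from above (just a sketch, not a claim about which variant is faster):

import numpy as np

a = np.array([0, 0, 0, 0, 0, 10, 0, 0, 0, 50, 0, 0, 5, 0, 0, 10])
b = np.array([0, 0, 5, 0, 0, 10, 0, 0, 0, 50, 0, 0, 10, 0, 0, 5])

# two passes: element-wise max, element-wise min, subtract, then sum
print((np.maximum(a, b) - np.minimum(a, b)).sum())  # 15

# one pass: max(x, y) - min(x, y) == |x - y| element-wise
print(np.abs(a - b).sum())  # 15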

Any help is appreciated!

EDIT 1: There was a suggestion to replace the first list comprehension in my function with arr1 = sliding_window_view(arr, window_shape = n, axis = 1).sum(axis = 2), but that actually slowed the function down significantly on larger datasets for some reason.


1 Answer


The comprehension statements are far too long and complicated and need to be broken up (though really they shouldn't exist at all).

The easy vectorisation pass replaces all of the comprehensions with vector calls. Also, don't start diffs as the scalar 0; initialise it as an array of the correct shape. Don't use itertools.product: a Cartesian product can be built as a pure NumPy array (np.meshgrid) and used directly as an index.
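
A minimal sketch of that indexing trick (the array names here are illustrative, not from the code below):

import numpy as np

idx = np.arange(3)
pairs = np.stack(np.meshgrid(idx, idx))  # (2, 3, 3): every ordered pair of indices

rows = np.array([(1, 2), (3, 4), (5, 6)])
both = rows[pairs]         # (2, 3, 3, 2): both members of every row pair at once
print(both.min(axis=0))    # element-wise minimum over each pair, no Python loop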

A more complex vectorisation pass removes all loops by expressing the sliding-window sums as a linear operator. If you know the dimensions of the problem and can reuse them while repeatedly calling the function with different data, you can run a preparatory step that computes the linear operator once.
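
This works because a sliding-window sum is a linear map, so it can be written as multiplication by a banded 0/1 matrix; zeroing the unused right-hand columns lets every window width share one n-by-n operator. A small sketch of the idea (the sizes here are made up for illustration):

import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

n, w = 5, 2                  # sequence length and window width, illustrative only
j = np.arange(n)[:, None]    # input position
k = np.arange(n)[None, :]    # window start position
S = ((j - k >= 0) & (j - k < w) & (k <= n - w)).astype(float)

x = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
print(x @ S)                                    # [3. 5. 7. 9. 0.] -- trailing 0 is padding
print(sliding_window_view(x, w).sum(axis=-1))   # [3. 5. 7. 9.]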

Which method is fastest depends on the size of the input, but in no case is the original method the fastest.

import functools
import itertools
import timeit
import typing

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn
from numpy.lib.stride_tricks import sliding_window_view


def func_op(arr):
    diffs = 0
    max_k = 6

    for n in range(1, max_k):
        arr1 = np.array(
            [np.sum(i, axis=1) for i in sliding_window_view(arr, window_shape=n, axis=1)])

        # use np.maximum and np.minimum to subtract the element-wise max and min between two rows, then sum that difference over the whole row
        diffs += np.sum((np.array([np.maximum(arr1[i[0]], arr1[i[1]]) for i in
                                   itertools.product(np.arange(len(arr1)), np.arange(len(arr1)))]) - np.array(
            [np.minimum(arr1[i[0]], arr1[i[1]]) for i in
             itertools.product(np.arange(len(arr1)), np.arange(len(arr1)))])), axis=1) * n

    diffs = diffs.reshape(len(arr), -1)

    return diffs


def func_semivec(arr: np.ndarray, max_k: int = 6) -> np.ndarray:
    m = arr.shape[0]
    idx = np.arange(m)
    products = np.stack(np.meshgrid(idx, idx))  # (2, m, m): every ordered pair of row indices
    diffs = np.zeros(shape=(m, m), dtype=arr.dtype)

    for n in range(1, max_k):
        slid = sliding_window_view(arr, window_shape=n, axis=1)
        totals = slid.sum(axis=-1)
        arr1 = totals[products]
        amin = arr1.min(axis=0)
        amax = arr1.max(axis=0)
        diffs += n*(amax - amin).sum(axis=-1)

    return diffs


def prepare_fullvec(m: int, n: int, dtype: np.dtype = np.float64, max_k: int = 6) -> tuple[
    np.ndarray,  # slide operator
    np.ndarray,  # cartesian
    np.ndarray,  # i
]:
    from numpy import newaxis as na

    iflat = np.arange(max_k - 1)
    i = iflat[:,na,na]  # formerly "n"
    jk = np.arange(n)
    j = jk[na,:,na]
    k = jk[na,na,:]

    slide_operator = (
          (k < n - i)   # contracting right columns
        & (j - k >= 0)  # zero upper triangle
        & (j - k <= i)  # expanding lower triangle
    ).astype(dtype)
    slide_operator.flags.writeable = False

    cidx = np.arange(m)
    cartesian = np.stack(np.meshgrid(cidx, cidx))
    cartesian.flags.writeable = False

    iout = iflat + 1
    iout.flags.writeable = False

    return slide_operator, cartesian, iout


def func_fullvec(
    arr: np.ndarray,
    slide_operator: np.ndarray,
    cartesian: np.ndarray,
    i: np.ndarray,
) -> np.ndarray:
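    # arr is (m, n) and slide_operator is (max_k-1, n, n), so the matmul
    # broadcasts to (max_k-1, m, n): window sums for every width at once.
    # Indexing axis 1 with cartesian (2, m, m) then gathers both rows of
    # every pair.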
    totals = (arr @ slide_operator)[:, cartesian]
    mins = totals.min(axis=1)
    maxs = totals.max(axis=1)
    diff = (maxs - mins).sum(axis=-1)
    return np.einsum('i,i...->...', i, diff)  # weight each width's contribution by n and sum


def func_fullvec_dry(arr: np.ndarray, max_k: int = 6) -> np.ndarray:
    return func_fullvec(
        arr, *prepare_fullvec(*arr.shape, dtype=arr.dtype, max_k=max_k),
    )


def op_test() -> None:
    m = np.array((
        (0, 0, 0, 0, 0, 10, 0, 0, 0, 50, 0, 0,  5, 0, 0, 10),
        (0, 0, 5, 0, 0, 10, 0, 0, 0, 50, 0, 0, 10, 0, 0,  5),
        (0, 0, 5, 1, 0, 20, 0, 0, 0, 30, 0, 1, 10, 0, 0,  5),
    ))
    m.flags.writeable = False
    expected = np.array((
        (   0,  440, 1903),
        ( 440,    0, 1603),
        (1903, 1603,    0),
    ))
    out = func_semivec(m)
    assert np.array_equal(out, expected)


def regression_test() -> None:
    rand = np.random.default_rng(seed=0)
    maxk = 6
    for m in range(1, 10):
        for n in range(maxk - 1, 10):
            arr = rand.integers(low=1, high=10, size=(m, n))

            slide, cartesian, i = prepare_fullvec(m, n, arr.dtype)
            fullvec = functools.partial(func_fullvec, slide_operator=slide, cartesian=cartesian, i=i)

            res_op, *others = (
                method(arr)
                for method in (func_op, func_semivec, fullvec, func_fullvec_dry)
            )
            for other in others:
                assert np.array_equal(res_op, other)


def benchmark() -> plt.Figure:
    rand = np.random.default_rng(seed=0)

    def one_dimension(orient: typing.Literal[1, -1]) -> pd.DataFrame:
        rows = []
        short = 5
        for long in np.logspace(start=0.7, stop=3, num=50, dtype=np.uint32):
            m, n = (short, long)[::orient]

            slide, cartesian, i = prepare_fullvec(m, n)

            arr = rand.random(size=(m, n))

            for name, method in (
                ('op', func_op),
                ('semivec', func_semivec),
                ('fulldry', func_fullvec_dry),
                (
                    'fullprep',
                    functools.partial(func_fullvec, slide_operator=slide, cartesian=cartesian, i=i),
                ),
            ):
                if (
                    (orient == 1 and name == 'fulldry' and long >= 375) or
                    (orient == -1 and name == 'op' and long >= 100)
                ):
                    continue
                def run() -> np.ndarray:
                    return method(arr)
                print(name, m, n)
                t = timeit.timeit(run, number=1)
                rows.append((long, name, t))
        return pd.DataFrame(
            data=rows, columns=('long_dim', 'name', 'dur'),
        )

    fig, (ax_top, ax_bottom) = plt.subplots(nrows=2)
    for ax, orient in (
        (ax_top, 1),
        (ax_bottom, -1),
    ):
        seaborn.lineplot(
            data=one_dimension(orient),
            x='long_dim', y='dur', hue='name', ax=ax,
        )
        ax.set_xscale('log')
        ax.set_yscale('log')
        ax.set_title('mn'[::orient])
    return fig


if __name__ == '__main__':
    op_test()
    regression_test()
    benchmark()
    plt.show()

Here is a benchmark against matrices growing along each axis in turn:

(plot: benchmark for narrow matrices; log-log runtime vs. the long dimension)

Those are for narrow matrices. For a square matrix, the benchmark looks mostly like the m >> n case:

(plot: benchmark for square matrices)

