Sunday, March 10, 2013

Efficient Processing of Arrays using SSE/SIMD and C++ Functors

This post shows how to write clean SIMD/SSE code that is easy to debug and maintain, yet produces optimal machine code with zero overhead. It builds upon existing posts such as the Template based vector library and How to unroll a loop in C++. Here we present a pattern that is fully based on functors and similar to array processing in the Standard Template Library (STL).
Given one or more arrays of values (such as float, double, or int), we want to process the arrays element-wise. For example, we want to scale the values of an input array and add them to an output array. This is exactly the behavior of the ?AXPY routine of the BLAS linear algebra library (where ? stands for the data-type prefix):
Y = A*X + Y
Note that this special function is just a running example (we will use the single precision datatype float, thus our function of interest is called saxpy). The underlying concept is true for any element-wise array function.
In plain C++, we would write saxpy like this:
void saxpy(float* y, const float* x, size_t length, float a)
{
  for (size_t n = 0; n < length; n++)
    y[n] += a * x[n];
}
The STL concept of functors allows us to separate the loop kernel (the actual computation) from the loop itself:
struct saxpy_kernel
{
public:
  saxpy_kernel(float a) : a(a) {}
  inline void operator()(float* y, const float* x) const
  {
    (*y) += a * (*x);
  }
protected:
  float a;
};
// ....
void saxpy(float* y, const float* x, size_t length, float a)
{
  saxpy_kernel op(a);
  for (size_t n = 0; n < length; n++) op(y + n, x + n);
}
So far, the code above offers no advantage; it even requires the programmer to write more code. But thanks to inlining, there is no performance penalty after compilation either. The real benefit becomes apparent when we replace our saxpy function with a generic, templated loop function. We'll call it transform_n, and it behaves similarly to the std::transform function from the STL.
template<class OutputPointer, class InputPointer, class Operation>
void transform_n(OutputPointer dst, InputPointer src, size_t length, Operation op)
{
  for (size_t n = 0; n < length; n++) op(dst + n, src + n);
}
We can call transform_n using any kernel that has one output and one input parameter. It also works with arbitrary pointers. In our case, we will call it like this:
float* my_x = /* data of X */ ;
float* my_y = /* data of Y */ ;
transform_n(my_y, my_x, num_elements, saxpy_kernel(a));
Finally, we will introduce SSE code to boost performance. We will do so without modifying the call to transform_n, so the programmer calling this function will not even notice any difference between SSE and non-SSE code. This is also a big advantage when debugging the SSE code, since it allows us to compare the output of the SSE and non-SSE code paths easily.
The main idea of using SIMD instructions is that we can process multiple array elements at the same time. From a loop's point of view, this is very similar to unrolling loops. We add a new method to our kernel that processes multiple array elements at the same time. Let's call this method block:
struct saxpy_kernel
{
public:
  enum { BLOCK_SIZE = 4 }; // this defines how many elements of the array are processed by a single call to block()
  saxpy_kernel(float a) : a(a) {}
  inline void operator()(float* y, const float* x) const {
    (*y) += a * (*x); 
  }
  inline void block(float* y, const float* x) const {
    // load 4 data elements at a time
    __m128 X = _mm_loadu_ps(x);
    __m128 Y = _mm_loadu_ps(y);
    // do the computations
    __m128 result = _mm_add_ps(Y, _mm_mul_ps(X, _mm_set1_ps(a)));
    // store the results
    _mm_storeu_ps(y, result);
  }
protected:
  float a;
};
Note that in addition to the new method block, which does the computations using SSE intrinsics (declared in the header xmmintrin.h), we added a constant BLOCK_SIZE which we can later use to determine the number of elements processed by each call of block(). In addition, this code does not assume aligned memory access. If you can guarantee that your memory is properly aligned, you can use the aligned load and store instructions (_mm_load_ps and _mm_store_ps) instead.
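As a sketch of that aligned variant (the name saxpy_kernel_aligned and the _mm_malloc-based demo helper are illustrative, not part of the original post), the kernel might look like this:

```cpp
#include <xmmintrin.h>

// Sketch: an aligned variant of the saxpy block() method. It assumes x and y
// are 16-byte aligned (e.g. allocated with _mm_malloc); otherwise _mm_load_ps
// will fault.
struct saxpy_kernel_aligned
{
    enum { BLOCK_SIZE = 4 };
    explicit saxpy_kernel_aligned(float a) : a(a) {}
    inline void block(float* y, const float* x) const
    {
        __m128 X = _mm_load_ps(x);  // aligned load instead of _mm_loadu_ps
        __m128 Y = _mm_load_ps(y);
        _mm_store_ps(y, _mm_add_ps(Y, _mm_mul_ps(X, _mm_set1_ps(a))));
    }
protected:
    float a;
};

// Small demo helper: runs one aligned block of y = a*x + y with
// x = {0,1,2,3} and y = {1,1,1,1}, and returns y[3].
inline float saxpy_aligned_demo(float a)
{
    float* x = static_cast<float*>(_mm_malloc(4 * sizeof(float), 16));
    float* y = static_cast<float*>(_mm_malloc(4 * sizeof(float), 16));
    for (int i = 0; i < 4; ++i) { x[i] = static_cast<float>(i); y[i] = 1.0f; }
    saxpy_kernel_aligned(a).block(y, x);
    float last = y[3];
    _mm_free(x);
    _mm_free(y);
    return last;
}
```

On most CPUs the aligned variants avoid the penalty of cache-line-crossing loads; the trade-off is that the caller must control allocation.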
So far, a call to transform_n does not execute any SSE code; we need to call block() for every valid block of consecutive array elements. Similar to loop unrolling, we use two loops: the first processes blocks of BLOCK_SIZE elements, while the second handles the remaining array elements one by one:
#define ROUND_DOWN(x, s) ((x) & ~((s)-1)) // rounds down x to a multiple of s; s must be a power of two (e.g. ROUND_DOWN(5, 4) yields 4)
template<class OutputPointer, class InputPointer, class Operation>
void transform_n(OutputPointer dst, InputPointer src, size_t length, Operation op)
{
  size_t n;
  // execute op on array elements block-wise
  for(n = 0; n < ROUND_DOWN(length, op.BLOCK_SIZE); n += op.BLOCK_SIZE) 
    op.block(dst + n, src + n);
  // execute the remaining array elements one by one
  for(; n < length; n++) op(dst + n, src + n);
}
Since calls to block() are inlined by the compiler, there is no call overhead. In fact, a smart compiler will even hoist the broadcast _mm_set1_ps(a) out of the loop.
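The earlier claim that the SSE and non-SSE paths are easy to compare can be demonstrated with a small test harness. This is a sketch (the helper verify_saxpy is hypothetical, not from the post) that repeats the 4-element kernel and transform_n so it compiles on its own, then checks the SSE result against a plain scalar loop:

```cpp
#include <xmmintrin.h>
#include <cmath>
#include <cstddef>
#include <vector>

#define ROUND_DOWN(x, s) ((x) & ~((s)-1))

// The 4-element kernel and transform_n from above, repeated here so this
// sketch is self-contained.
struct saxpy_kernel
{
public:
    enum { BLOCK_SIZE = 4 };
    explicit saxpy_kernel(float a) : a(a) {}
    inline void operator()(float* y, const float* x) const { (*y) += a * (*x); }
    inline void block(float* y, const float* x) const
    {
        __m128 X = _mm_loadu_ps(x);
        __m128 Y = _mm_loadu_ps(y);
        _mm_storeu_ps(y, _mm_add_ps(Y, _mm_mul_ps(X, _mm_set1_ps(a))));
    }
protected:
    float a;
};

template<class OutputPointer, class InputPointer, class Operation>
void transform_n(OutputPointer dst, InputPointer src, size_t length, Operation op)
{
    size_t n;
    for (n = 0; n < ROUND_DOWN(length, op.BLOCK_SIZE); n += op.BLOCK_SIZE)
        op.block(dst + n, src + n);
    for (; n < length; n++)
        op(dst + n, src + n);
}

// Hypothetical helper: run the SSE path and a plain scalar loop on identical
// input data and report whether the outputs agree.
inline bool verify_saxpy(size_t length, float a)
{
    std::vector<float> x(length), y_sse(length), y_ref(length);
    for (size_t i = 0; i < length; ++i) {
        x[i] = 0.5f * static_cast<float>(i);
        y_sse[i] = y_ref[i] = static_cast<float>(length - i);
    }
    transform_n(y_sse.data(), x.data(), length, saxpy_kernel(a));
    for (size_t i = 0; i < length; ++i)
        y_ref[i] += a * x[i];
    for (size_t i = 0; i < length; ++i)
        if (std::fabs(y_sse[i] - y_ref[i]) > 1e-5f)
            return false;
    return true;
}
```

Running such a check with a length that is not a multiple of BLOCK_SIZE also exercises the scalar remainder loop.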
There remains one additional optimization that can boost execution performance: hiding load latencies by interleaving multiple loop bodies. It is a good practice to not access an SSE register immediately after loading from memory since loading operations take more than one CPU cycle. Instead of waiting for the load operation to finish for a set of 4 array elements, we fill in the load operation for the next 4 array elements. In practice, we simply copy the kernel code and interleave both copies such that all load operations are at the beginning of the kernel, followed by computations and stores:
struct saxpy_kernel
{
public:
  enum { BLOCK_SIZE = 8 }; // note that instead of 4, we now process 8 elements
  saxpy_kernel(float a) : a(a) {}
  inline void operator()(float* y, const float* x) const {
    (*y) += a * (*x); 
  }
  inline void block(float* y, const float* x) const {
    // load 8 data elements at a time
    __m128 X1 = _mm_loadu_ps(x + 0);
    __m128 X2 = _mm_loadu_ps(x + 4);
    __m128 Y1 = _mm_loadu_ps(y + 0);
    __m128 Y2 = _mm_loadu_ps(y + 4);
    // do the computations
    __m128 result0 = _mm_add_ps(Y1, _mm_mul_ps(X1, _mm_set1_ps(a)));
    __m128 result1 = _mm_add_ps(Y2, _mm_mul_ps(X2, _mm_set1_ps(a)));
    // store the results
    _mm_storeu_ps(y + 0, result0);
    _mm_storeu_ps(y + 4, result1);
  }
protected:
  float a;
};
The two differences are the modified block() method as well as the adapted BLOCK_SIZE, which now tells the loop that one call of block() will process 8 float elements. The function transform_n does not need to be changed at all.
A performance benchmark of the above code gives the following results for arrays larger than the CPU cache size (ca. 4M floats): the pure C++ code processing one element per loop iteration is the baseline at 100% of the runtime. The 4-element SSE kernel is 3.6% faster, and the 8-element kernel runs about 5% faster than the C++ version; here the main performance bottleneck is memory bandwidth. When the input arrays are small enough to fit in cache (4K float elements), the 4-element SSE kernel is about 300% faster and the 8-element kernel about 312% faster.
In summary, this article showed how to split a loop into loop management and computation code. Then, we added SSE intrinsics to process multiple elements of the array at the same time. Last, we further unrolled the loop kernel to hide load latencies. This pattern is valid for a variety of loop kernels and can be extended to accumulation (i.e. sum, dot product) and multiple inputs.
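As a sketch of the accumulation extension mentioned above (the names dot_kernel, accumulate_n, and the demo helper are illustrative, not from the post), a dot product can keep a running __m128 accumulator inside the kernel and reduce it once at the end:

```cpp
#include <xmmintrin.h>
#include <cstddef>
#include <vector>

#define ROUND_DOWN(x, s) ((x) & ~((s)-1))

// Illustrative dot-product kernel: block() accumulates 4 products at a time
// into an SSE register, operator() handles leftover elements in a scalar
// accumulator, and result() combines both.
struct dot_kernel
{
    enum { BLOCK_SIZE = 4 };
    dot_kernel() : acc(_mm_setzero_ps()), scalar_acc(0.0f) {}
    inline void operator()(const float* x, const float* y)
    {
        scalar_acc += (*x) * (*y);
    }
    inline void block(const float* x, const float* y)
    {
        acc = _mm_add_ps(acc, _mm_mul_ps(_mm_loadu_ps(x), _mm_loadu_ps(y)));
    }
    float result() const
    {
        float tmp[4];
        _mm_storeu_ps(tmp, acc);  // horizontal reduction of the SSE accumulator
        return tmp[0] + tmp[1] + tmp[2] + tmp[3] + scalar_acc;
    }
    __m128 acc;
    float scalar_acc;
};

// Accumulating counterpart of transform_n: returns the kernel by value so the
// caller can read out the reduced result.
template<class InputPointer, class Operation>
Operation accumulate_n(InputPointer x, InputPointer y, size_t length, Operation op)
{
    size_t n;
    for (n = 0; n < ROUND_DOWN(length, op.BLOCK_SIZE); n += op.BLOCK_SIZE)
        op.block(x + n, y + n);
    for (; n < length; n++)
        op(x + n, y + n);
    return op;
}

// Small demo: dot product of x[i] = 1 and y[i] = 2 over n elements, which
// should equal 2*n.
inline float dot_of_ones_and_twos(size_t n)
{
    std::vector<float> x(n, 1.0f), y(n, 2.0f);
    return accumulate_n(x.data(), y.data(), n, dot_kernel()).result();
}
```

Note that the kernel is now stateful, so accumulate_n must return it (or take it by reference) rather than discard it as transform_n does.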

Comments:

  1. hm ... messing around with this and I'm quite impressed. On VS2012 I'm finding that some of my SSE code that I was fighting the compiler on suddenly compiles much more efficiently when done using this method.

    Also been programming a fluid simulator and extracting the boundary loops from the actual calculation makes the code look much much cleaner.

  2. Here's a modified version with a slightly more general version of transform_n. It only passes the iteration index to the kernel functions, so it will work with a kernel that takes any number of inputs and outputs.

    class saxpy_kernel
    {
    public:
        enum { BLOCK_SIZE = 4 };

        saxpy_kernel(float aa, float *xx, float *yy)
        {
            a = aa;
            a_v = _mm_set1_ps(aa);
            x = xx;
            y = yy;
        }

        inline void scalar_op(size_t i) const
        {
            y[i] += a * x[i];
        }

        inline void vector_op(size_t i) const
        {
            __m128 X = _mm_loadu_ps(x + i);
            __m128 Y = _mm_loadu_ps(y + i);
            __m128 result = _mm_add_ps(Y, _mm_mul_ps(X, a_v));
            _mm_storeu_ps(y + i, result);
        }

    protected:
        float a;
        __m128 a_v;
        float *x, *y;
    };

    #define ROUND_DOWN(x, s) ((x) & ~((s)-1))

    template<class Operation>
    inline static void transform_n(size_t length, Operation op)
    {
        size_t n;
        // execute op on array elements block-wise
        for (n = 0; n < ROUND_DOWN(length, op.BLOCK_SIZE); n += op.BLOCK_SIZE)
            op.vector_op(n);
        // execute the remaining array elements one by one
        for (; n < length; n++)
            op.scalar_op(n);
    }

    // use:
    // transform_n(length, saxpy_kernel(a, x, y))

  3. Hey,
    After reading the article http://fastcpp.blogspot.fr/2011/04/how-to-process-stl-vector-using-sse.html first, and then this one, I'm a bit confused about the last part (hiding latency):
    Which is better, 1) or 2)?

    1) load X1 load Y1 calculate result0 and store then load X2 load Y2 calculate result1 and store
    2) load X1 X2 Y1 Y2 calculate result0 and result1 and store

    Thanks for this blog
    Vincent
