2013-05-19 2 views
5

ich brauche die folgende Operation mit doppelter Genauigkeit zu tun:6 Element double precision Vektormatrix Vektor multiply in AVX

enter image description here

Die Zahlen stellen, wie die Werte im Speicher gespeichert sind. Ich möchte dies mit AVX implementieren. Wäre es am besten, wenn ich zuerst die Spalten von [QK] auf 8 Elemente aufgefüllt und dann eine Matrixvektormultiplikation mit [x] und [QK] gefolgt von einem Skalarprodukt durchgeführt hätte?

EDIT: Ok, also habe ich beschlossen, eine FLOAT zu implementieren32-Bit- Version mit gepolstertem Vektoren wie unten dargestellt:

// Perform matrix vector multiplication of QK*x    
// Load first four columns QK into 4 ymm registers  
ymm0 = _mm256_load_ps((float *)(QK));  
ymm1 = _mm256_load_ps((float *)(QK+8));  
ymm2 = _mm256_load_ps((float *)(QK+16));  
ymm3 = _mm256_load_ps((float *)(QK+24)); 

// Load first four values of x 
ymm4 = _mm256_broadcast_ss((float *)(x)); 
ymm5 = _mm256_broadcast_ss((float *)(x+1)); 
ymm6 = _mm256_broadcast_ss((float *)(x+2)); 
ymm7 = _mm256_broadcast_ss((float *)(x+3)); 

// Multiply in place - frees up ymm4,ymm5,ymm6,ymm7 
ymm0 = _mm256_mul_ps(ymm0, ymm4); 
ymm1 = _mm256_mul_ps(ymm1, ymm5); 
ymm2 = _mm256_mul_ps(ymm2, ymm6); 
ymm3 = _mm256_mul_ps(ymm3, ymm7); 

// Add together, this frees up ymm1,ymm2,and ymm3 
ymm0 = _mm256_add_ps(ymm0, ymm1); 
ymm2 = _mm256_add_ps(ymm2, ymm3); 
ymm0 = _mm256_add_ps(ymm0, ymm2); 

// Load next last columns of QK 
ymm1 = _mm256_load_ps((float *)(QK+32));  
ymm2 = _mm256_load_ps((float *)(QK+40)); 

// Load last two values of x 
ymm6 = _mm256_broadcast_ss((float *)(x+4)); 
ymm7 = _mm256_broadcast_ss((float *)(x+5)); 

// Multiply in place 
ymm1 = _mm256_mul_ps(ymm1, ymm6); 
ymm2 = _mm256_mul_ps(ymm2, ymm7); 

// Add together, this frees up every register except for ymm0 
ymm0 = _mm256_add_ps(ymm0, ymm1); 
ymm0 = _mm256_add_ps(ymm0, ymm2); 

// Answer stored in ymm0 and ymm1 
// Calculate dot product of y*(QK*x) 
// Load x 
ymm1 = _mm256_load_ps((float *)(y)); 

// Do dotproduct by using horizontal multiply followed by horizontal add 
// Multiply in place 
ymm0 = _mm256_mul_ps(ymm0, ymm1); 

// Do horizontal sum 
__m128 xmm1 = _mm256_extractf128_ps(ymm0, 1); 
__m128 xmm2 = _mm256_extractf128_ps(ymm0, 0); 
xmm2 = _mm_add_ps(xmm1, xmm2); 
xmm1 = _mm_movehl_ps(xmm1, xmm2); 
xmm2 = _mm_add_ps(xmm1, xmm2); 
xmm1 = _mm_shuffle_ps(xmm2, xmm2, 1); 
xmm2 = _mm_add_ss(xmm1, xmm2); 
ans[0] = _mm_cvtss_f32(xmm2); 

Wie es aussieht, läuft es etwa 3-mal schneller als:

ans[0] = (QK[0]*x[0]+QK[8]*x[1]+QK[16]*x[2]+QK[24]*x[3]+QK[32]*x[4]+QK[40]*x[5])*y[0]+ 
     (QK[1]*x[0]+QK[9]*x[1]+QK[17]*x[2]+QK[25]*x[3]+QK[33]*x[4]+QK[41]*x[5])*y[1]+ 
     (QK[2]*x[0]+QK[10]*x[1]+QK[18]*x[2]+QK[26]*x[3]+QK[34]*x[4]+QK[42]*x[5])*y[2]+ 
     (QK[3]*x[0]+QK[11]*x[1]+QK[19]*x[2]+QK[27]*x[3]+QK[35]*x[4]+QK[43]*x[5])*y[3]+ 
     (QK[4]*x[0]+QK[12]*x[1]+QK[20]*x[2]+QK[28]*x[3]+QK[36]*x[4]+QK[44]*x[5])*y[4]+ 
     (QK[5]*x[0]+QK[13]*x[1]+QK[21]*x[2]+QK[29]*x[3]+QK[37]*x[4]+QK[45]*x[5])*y[5]; 

Für 500 Mil-Iterationen dauert die Standard-C-Version etwa 9 Sekunden und die Single-Precision-AVX-Version etwa 3,5 Sekunden. Wenn ich die horizontale Summe am Ende ausdenke, läuft sie in etwa 0,5 Sekunden. Die horizontale Summe tötet wirklich die Leistung ...

+0

für Windows 7 können Sie 8 der ymm Register ---> 6 für Matrixelemente 1 für Vektor verwenden a 1 für Vektor b. Für Linux ist es 16 Ymm Register frei. –

+0

Hoppla, ich hätte angeben sollen, das sind doppelte Genauigkeit. Ich werde die Frage bearbeiten. – Justin

+0

@huseyintugrulbuyukisik Sie erhalten 8 Ymm Register auf x86, 16 Register auf x64. – dsharlet

Antwort

1

Ich schuf Code, um dieses effizient zu tun. Ich bekomme fast 4x-Geschwindigkeit (am besten können Sie mit Doppel auf AVX erwarten) mit AVX für einen einzelnen Thread. Hier sind die Ergebnisse, die über sechs- Komponenten-Vektoren mit 2000, 32000 und 4000000 x und y über eine 6x6-Matrix laufen. Diese entsprechen ungefähr der L2-, L3- und >> L3-Cachegröße auf meinem System (jeder Vektor verwendet 48 Bytes).

Edit: Ich habe am Ende Text (und Code) hinzugefügt, um dies mit Float zu tun. Die Beschleunigung ist fast 8x mit AVX auf einem einzelnen Thread.

Der grundlegende Algorithmus tut SIMD auf den X- und Y-Vektoren, NICHT auf der 6x6-Matrix. Ein wichtiger Punkt ist, dass die x- und y-Vektoren Arrays von Blöcken der Struktur von Arrays sein müssen. Dies wird auch als Array von Array-Strukturen (AoSoA) oder Hybrid-Arrays bezeichnet. Hier können Sie mehr darüber lesen "Erweitern einer C-ähnlichen Sprache für die portable SIMD-Programmierung" http://www.sci.utah.edu/~wald/Publications/2012///ppopp/download//vecimp.pdf

Unten ist der Code. Die Funktion aos2aosoa wandelt Arrays aus sechs Komponentenvektoren in Arrays von SoA um. Ihre Anwendung sollte die Vektoren in dieser Form generieren (nicht die Konvertierung), wenn Sie den vollen Nutzen von SIMD nutzen wollen. Dieser Code verwendet Agner Fogs Vektorklasse http://www.agner.org/optimize/#vectorclass. Es sind nur einige Header-Dateien. Dieser Code funktioniert auch mit SSE (aber der Boost ist nur etwa 2x wie erwartet).

Eine kleine Einschränkung, die Arrays von x und y Vektoren und Ergebnisse müssen Vielfache von 4 sein. Die Anzahl der Vektoren jedoch nicht.

Compile wie diese

g++ m6.cpp -o m6 -O3 -mavx -fopenmp //system with AVX 
g++ m6.cpp -o m6 -O3 -msse4.2 -fopenmp //system with SSE 

in Visual Studio Put/arch: AVX in den Compiler commmand Zeilenoptionen

Der Code:

#include <stdio.h> 
#include <omp.h> 
#include "vectorclass.h" 
#include <stdlib.h> 

double prod_scalar(double *x, double *M, double *y) { 
    double sum = 0.0f; 
    for(int i=0; i<6; i++) { 
     for(int j=0; j<6; j++) { 
      sum += x[i]*M[i*6 + j]*y[j]; 
     } 
    } 
    return sum; 
} 

void prod_block4(double *x, double *M, double *y, double *result) { 
    Vec4d sum4 = 0.0f; 
    for(int i=0; i<6; i++) { 
     Vec4d x4 = Vec4d().load(&x[4*i]); 
     for(int j=0; j<6; j++) { 
      Vec4d y4 = Vec4d().load(&y[4*j]); 
      sum4 += x4*M[i*6 + j]*y4; 
     } 
    } 
    sum4.store(result); 
} 

void prod_block4_unroll2(double *x, double *M, double *y, double *result) { 
    Vec4d sum4_1 = 0.0f; 
    Vec4d sum4_2 = 0.0f; 
    Vec4d yrow[6]; 
    for(int i=0; i<6; i++) { 
     yrow[i] = Vec4d().load(&y[4*i]); 
    } 
    for(int i=0; i<6; i++) { 
     Vec4d x4 = Vec4d().load(&x[4*i]); 
     for(int j=0; j<6; j+=2) { 
      sum4_1 += x4*M[i*6 + j]*yrow[j]; 
      sum4_2 += x4*M[i*6 + j+1]*yrow[j+1]; 
     } 
    } 
    sum4_1 += sum4_2; 
    sum4_1.store(result); 
} 
void loop_scalar(double *x, double *M, double *y, double *result, const int nvec) { 
// #pragma omp parallel for 
    for(int i=0; i<nvec; i++) { 
     result[i] = prod_scalar(&x[6*i], M, &y[6*i]); 
    } 
} 

void loop_block4(double *x, double *M, double *y, double *result, const int nvec) { 
// #pragma omp parallel for 
    for(int i=0; i<(nvec/4); i++) { 
//  prod_block4(&x[i*24], M, &y[i*24], &result[4*i]); 
     prod_block4_unroll2(&x[i*24], M, &y[i*24], &result[4*i]); 
    } 
} 


void aos2soa(double *in, double *out) { 
    int cnt = 0; 
    for(int i=0; i<6; i++) { 
     for(int j=0; j<4; j++) { 
      out[cnt++] = in[6*j + i]; 
     } 
    } 
} 

void aos2aosoa(double *in, double *out, const int nvec) { 
    for(int i=0; i<(nvec/4); i++) { 
     aos2soa(&in[i*24], &out[i*24]); 
    } 
} 

double compare_results(double *r1, double * r2, const int nvec) { 
    double out = 0.0f; 
    for(int i=0; i<nvec; i++) { 
     out += r1[i] -r2[i]; 
     //if(out!=0) printf("%f %f\n",r1[i], r2[i]); 
    } 
    return out; 
} 

void loop(const int nvec, const int repeat) { 
    double *M = (double*)_mm_malloc(6*6*sizeof(double),32); 
    double *x_aos = (double*)_mm_malloc(nvec*6*sizeof(double),32); 
    double *y_aos = (double*)_mm_malloc(nvec*6*sizeof(double),32); 
    double *x_aosoa = (double*)_mm_malloc(nvec*6*sizeof(double),32); 
    double *y_aosoa = (double*)_mm_malloc(nvec*6*sizeof(double),32); 
    double *results_scalar = (double*)_mm_malloc(nvec*sizeof(double),32); 
    double *results_vector = (double*)_mm_malloc(nvec*sizeof(double),32); 

    for(int i=0; i<6; i++) { 
     for(int j=0; j<6; j++) { 
      M[j*6 + i] = i*6 + j; 
     } 
    } 
    for(int i=0; i<(6*nvec); i++) { 
     double r1 = (double)rand()/RAND_MAX; 
     double r2 = (double)rand()/RAND_MAX; 
     //x_aos[i] = i; 
     x_aos[i] = r1; 
     //y_aos[i] = i; 
     y_aos[i] = r2; 

    } 

    aos2aosoa(x_aos, x_aosoa, nvec); 
    aos2aosoa(y_aos, y_aosoa, nvec); 

    double dtime; 
    dtime = omp_get_wtime(); 
    for(int i=0; i<repeat; i++) { 
     loop_scalar(x_aos, M, y_aos, results_scalar, nvec); 
    } 
    dtime = omp_get_wtime() - dtime; 
    printf("time scalar %f\n", dtime); 

    double dtime_old = dtime; 
    dtime = omp_get_wtime(); 
    for(int i=0; i<repeat; i++) { 
     loop_block4(x_aosoa, M, y_aosoa, results_vector, nvec); 
    } 
    dtime = omp_get_wtime() - dtime; 
    printf("time vector %f\n", dtime); 
    printf("time scalar/vector %f\n", dtime_old/dtime); 

    printf("difference %f\n", compare_results(results_scalar, results_vector, nvec)); 

    _mm_free(M); 
    _mm_free(x_aos); 
    _mm_free(y_aos); 
    _mm_free(x_aosoa); 
    _mm_free(y_aosoa); 
    _mm_free(results_scalar); 
    _mm_free(results_vector); 

} 

int main() { 
    int nveca[3]; 
    nveca[0] = 2000; // 2000*2*6*8 = 192kb //L2 
    nveca[1] = 32000; // 32000*2*6*8 = 3Mb //L3 
    nveca[2] = 4*1000000; //366Mb 
    int nrepeat[3]; 
    nrepeat[0] = 10000; 
    nrepeat[1] = 1000; 
    nrepeat[2] = 10; 
    for(int i=0; i<3; i++) { 
     printf("nvec %d, repeat %d\n", nveca[i], nrepeat[i]); 
     loop(nveca[i], nrepeat[i]); 
     printf("\n"); 
    } 
} 

Hier sind die Ergebnisse für Schwimmer.Der Boost ist etwa 8x

nvec 2000, repeat 10000, 1 thread: time scalar/SIMD 7.86 
nvec 32000, repeat 1000, 1 thread: time scalar/SIMD 7.63 
nvec 5000000, repeat 100, 1 thread: time scalar/SIMD 6.63 

Hier ist der Code für Schwimmer statt Doppel

#include <stdio.h> 
#include <omp.h> 
#include "vectorclass.h" 
#include <stdlib.h> 

#define ROUND_DOWN(x, s) ((x) & ~((s)-1)) 

float prod_scalar(float *x, float *M, float *y) { 
    float sum = 0.0f; 
    for(int i=0; i<6; i++) { 
     for(int j=0; j<6; j++) { 
      sum += x[i]*M[i*6 + j]*y[j]; 
     } 
    } 
    return sum; 
} 

float prod_scalar_unroll2(float *x, float *M, float *y) { 
    float sum_1 = 0.0f; 
    float sum_2 = 0.0f; 
    for(int i=0; i<6; i++) { 
     for(int j=0; j<6; j+=2) { 
      sum_1 += x[i]*M[i*6 + j]*y[j]; 
      sum_2 += x[i]*M[i*6 + j+1]*y[j+1]; 
     } 
    } 
    return sum_1 + sum_2; 
} 

void prod_block4(float *x, float *M, float *y, float *result) { 
    Vec8f sum4 = 0.0f; 
    for(int i=0; i<6; i++) { 
     Vec8f x4 = Vec8f().load(&x[8*i]); 
     for(int j=0; j<6; j++) { 
      Vec8f y4 = Vec8f().load(&y[8*j]); 
      sum4 += x4*M[i*6 + j]*y4; 
     } 
    } 
    sum4.store(result); 
} 

void prod_block4_unroll2(float *x, float *M, float *y, float *result) { 
    Vec8f sum4_1 = 0.0f; 
    Vec8f sum4_2 = 0.0f; 
    Vec8f yrow[6]; 
    for(int i=0; i<6; i++) { 
     yrow[i] = Vec8f().load(&y[8*i]); 
    } 
    for(int i=0; i<6; i++) { 
     Vec8f x4 = Vec8f().load(&x[8*i]); 
     for(int j=0; j<6; j+=2) { 
      sum4_1 += x4*M[i*6 + j]*yrow[j]; 
      sum4_2 += x4*M[i*6 + j+1]*yrow[j+1]; 
     } 
    } 
    sum4_1 += sum4_2; 
    sum4_1.store(result); 
} 

void loop_scalar(float *x, float *M, float *y, float *result, const int nvec) { 
// #pragma omp parallel for 
    for(int i=0; i<nvec; i++) { 
     result[i] = prod_scalar(&x[6*i], M, &y[6*i]); 
     //result[i] = prod_scalar_unroll2(&x[6*i], M, &y[6*i]); 
    } 
} 

void loop_SIMD(float *x, float *M, float *y, float *result, const int nvec) { 
    //#pragma omp parallel for schedule(static,256) 
    //#pragma omp parallel for schedule(static) 
    const int N = nvec/8; 
    //printf("chuck %d\n", N/2); 
    //omp_set_num_threads(2); 

    //#pragma omp parallel 
    { 
     //int nthreads = omp_get_num_threads(); 
     //int ithread = omp_get_thread_num(); 

     //int start = (ithread*N)/nthreads; 
     //int end = ((ithread+1)*N)/nthreads; 
     //printf("ithread, start %d, end %d, chunk %d\n",start, end, end-start); 
     //#pragma omp for 
     for(int i=0; i<(nvec/8); i++) { 
     //for(int i=start; i<end; i++) { 
    //  prod_block4(&x[i*24], M, &y[i*24], &result[4*i]); 
     prod_block4_unroll2(&x[i*48], M, &y[i*48], &result[8*i]); 
     } 
    } 
} 

void aos2soa(float *in, float *out) { 
    int cnt = 0; 
    for(int i=0; i<6; i++) { 
     for(int j=0; j<8; j++) { 
      out[cnt++] = in[6*j + i]; 
     } 
    } 
} 

void aos2aosoa(float *in, float *out, const int nvec) { 
    for(int i=0; i<(nvec/8); i++) { 
     aos2soa(&in[i*48], &out[i*48]); 
    } 
} 

float compare_results(float *r1, float * r2, const int nvec) { 
    float out = 0.0f; 
    for(int i=0; i<nvec; i++) { 
     out += r1[i] -r2[i]; 
     //if(out!=0) printf("%f %f\n",r1[i], r2[i]); 
    } 
    return out; 
} 

void loop(const int nvec, const int repeat) { 
    float *M = (float*)_mm_malloc(6*6*sizeof(float),64); 
    float *x_aos = (float*)_mm_malloc(nvec*6*sizeof(float),64); 
    float *y_aos = (float*)_mm_malloc(nvec*6*sizeof(float),64); 
    float *x_aosoa = (float*)_mm_malloc(nvec*6*sizeof(float),64); 
    float *y_aosoa = (float*)_mm_malloc(nvec*6*sizeof(float),64); 
    float *results_scalar = (float*)_mm_malloc(nvec*sizeof(float),64); 
    float *results_SIMD = (float*)_mm_malloc(nvec*sizeof(float),64); 

    for(int i=0; i<6; i++) { 
     for(int j=0; j<6; j++) { 
      M[j*6 + i] = i*6 + j; 
     } 
    } 
    for(int i=0; i<(6*nvec); i++) { 
     float r1 = (float)rand()/RAND_MAX; 
     float r2 = (float)rand()/RAND_MAX; 
     //x_aos[i] = i; 
     x_aos[i] = r1; 
     //y_aos[i] = i; 
     y_aos[i] = r2; 

    } 

    aos2aosoa(x_aos, x_aosoa, nvec); 
    aos2aosoa(y_aos, y_aosoa, nvec); 

    float dtime; 
    dtime = omp_get_wtime(); 
    for(int i=0; i<repeat; i++) { 
     loop_scalar(x_aos, M, y_aos, results_scalar, nvec); 
    } 
    dtime = omp_get_wtime() - dtime; 
    printf("time scalar %f\n", dtime); 

    float dtime_old = dtime; 
    dtime = omp_get_wtime(); 
    for(int i=0; i<repeat; i++) { 
     loop_SIMD(x_aosoa, M, y_aosoa, results_SIMD, nvec); 
    } 
    dtime = omp_get_wtime() - dtime; 
    printf("time SIMD %f\n", dtime); 
    printf("time scalar/SIMD %f\n", dtime_old/dtime); 

    printf("difference %f\n", compare_results(results_scalar, results_SIMD, nvec)); 

    _mm_free(M); 
    _mm_free(x_aos); 
    _mm_free(y_aos); 
    _mm_free(x_aosoa); 
    _mm_free(y_aosoa); 
    _mm_free(results_scalar); 
    _mm_free(results_SIMD); 

} 

int main() { 
    int nveca[3]; 
    nveca[0] = 2000; // 2000*2*6*8 = 192kb //L2 
    nveca[1] = 32000; // 32000*2*6*8 = 3Mb //L3 
    nveca[2] = 5*1000000; //366Mb 
    int nrepeat[3]; 
    nrepeat[0] = 10000; 
    nrepeat[1] = 1000; 
    nrepeat[2] = 100; 
    for(int i=0; i<3; i++) { 
     printf("nvec %d, repeat %d\n", nveca[i], nrepeat[i]); 
     loop(nveca[i], nrepeat[i]); 
     printf("\n"); 
    } 
} 
+0

Interessanter Ansatz. Aber es gibt immer noch Probleme wie Sie erwähnt haben (nur die diagonalen Komponenten der resultierenden Matrix benötigen). Denken Sie auch, es wird immer noch das Problem der Ausrichtung beim Laden der 6x6 QK-Matrix geben ... Ich werde weiter darauf eingehen. Ich poste, was ich gestern an die Frage angehängt habe, um die Frage heute Nacht – Justin

+0

Ich weiß, um jetzt nur die diagonalen Komponenten zu bekommen. Ich werde versuchen, die Lösung bald oder morgen zu veröffentlichen. –

+0

Okay, ich habe meine Antwort bearbeitet, um den grundlegenden Algorithmus zu zeigen. Ich kann versuchen, einen Beispielcode zu posten und den Teil über die Anordnung des Speichers (falls es nicht klar ist) in den nächsten Tagen aufzuräumen. –