Skip to content

Commit 36dcc2d

Browse files
author
Luke Robison
committed
coll/han: Implement MPI_Alltoallv in han using SMSC and XPMEM
Extension of the previous han MPI_Alltoall algorithm, this change adds MPI_Alltoallv to han for a hierarchy-aware algorithm which uses XPMEM via the SMSC module in order to directly read data from ranks on the same host. The provides significant speed-up over the basic implementation when small messages are used, as many messages can be coalesced and packed into fewer sends. Introduces MCA parameters: - coll_han_alltoallv_smsc_avg_send_limit - coll_han_alltoallv_smsc_noncontig_limit Signed-off-by: Luke Robison <[email protected]>
1 parent c3bebd8 commit 36dcc2d

10 files changed

+1107
-3
lines changed

ompi/mca/coll/han/Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ coll_han.h \
1616
coll_han_trigger.h \
1717
coll_han_algorithms.h \
1818
coll_han_alltoall.c \
19+
coll_han_alltoallv.c \
1920
coll_han_dynamic.h \
2021
coll_han_dynamic_file.h \
2122
coll_han_barrier.c \

ompi/mca/coll/han/coll_han.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ typedef struct mca_coll_han_op_module_name_t {
199199
mca_coll_han_op_up_low_module_name_t scatter;
200200
mca_coll_han_op_up_low_module_name_t scatterv;
201201
mca_coll_han_op_up_low_module_name_t alltoall;
202+
mca_coll_han_op_up_low_module_name_t alltoallv;
202203
} mca_coll_han_op_module_name_t;
203204

204205
/**
@@ -260,6 +261,11 @@ typedef struct mca_coll_han_component_t {
260261
/* alltoall: parallel stages */
261262
int32_t han_alltoall_pstages;
262263

264+
/* low level module for alltoallv */
265+
uint32_t han_alltoallv_low_module;
266+
int64_t han_alltoallv_smsc_avg_send_limit;
267+
double han_alltoallv_smsc_noncontig_activation_limit;
268+
263269

264270
/* name of the modules */
265271
mca_coll_han_op_module_name_t han_op_module_name;
@@ -286,6 +292,8 @@ typedef struct mca_coll_han_component_t {
286292

287293
/* Define maximum dynamic errors printed by rank 0 with a 0 verbosity level */
288294
int max_dynamic_errors;
295+
296+
opal_free_list_t pack_buffers;
289297
} mca_coll_han_component_t;
290298

291299
/*
@@ -297,6 +305,7 @@ typedef struct mca_coll_han_single_collective_fallback_s
297305
union
298306
{
299307
mca_coll_base_module_alltoall_fn_t alltoall;
308+
mca_coll_base_module_alltoallv_fn_t alltoallv;
300309
mca_coll_base_module_allgather_fn_t allgather;
301310
mca_coll_base_module_allgatherv_fn_t allgatherv;
302311
mca_coll_base_module_allreduce_fn_t allreduce;
@@ -319,6 +328,7 @@ typedef struct mca_coll_han_single_collective_fallback_s
319328
typedef struct mca_coll_han_collectives_fallback_s
320329
{
321330
mca_coll_han_single_collective_fallback_t alltoall;
331+
mca_coll_han_single_collective_fallback_t alltoallv;
322332
mca_coll_han_single_collective_fallback_t allgather;
323333
mca_coll_han_single_collective_fallback_t allgatherv;
324334
mca_coll_han_single_collective_fallback_t allreduce;
@@ -384,6 +394,9 @@ OBJ_CLASS_DECLARATION(mca_coll_han_module_t);
384394
#define previous_alltoall fallback.alltoall.alltoall
385395
#define previous_alltoall_module fallback.alltoall.module
386396

397+
#define previous_alltoallv fallback.alltoallv.alltoallv
398+
#define previous_alltoallv_module fallback.alltoallv.module
399+
387400
#define previous_allgather fallback.allgather.allgather
388401
#define previous_allgather_module fallback.allgather.module
389402

@@ -440,6 +453,7 @@ OBJ_CLASS_DECLARATION(mca_coll_han_module_t);
440453
HAN_UNINSTALL_COLL_API(COMM, HANM, allgather); \
441454
HAN_UNINSTALL_COLL_API(COMM, HANM, allgatherv); \
442455
HAN_UNINSTALL_COLL_API(COMM, HANM, alltoall); \
456+
HAN_UNINSTALL_COLL_API(COMM, HANM, alltoallv); \
443457
han_module->enabled = false; /* entire module set to pass-through from now on */ \
444458
} while(0)
445459

@@ -503,6 +517,9 @@ int
503517
mca_coll_han_alltoall_intra_dynamic(ALLTOALL_BASE_ARGS,
504518
mca_coll_base_module_t *module);
505519
int
520+
mca_coll_han_alltoallv_intra_dynamic(ALLTOALLV_BASE_ARGS,
521+
mca_coll_base_module_t *module);
522+
int
506523
mca_coll_han_allgather_intra_dynamic(ALLGATHER_BASE_ARGS,
507524
mca_coll_base_module_t *module);
508525
int
@@ -566,4 +583,7 @@ static inline struct mca_smsc_endpoint_t *mca_coll_han_get_smsc_endpoint (struct
566583
return (struct mca_smsc_endpoint_t *) proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_SMSC];
567584
}
568585

586+
#define COLL_HAN_PACKBUF_PAYLOAD_BYTES (128*1024)
587+
588+
569589
#endif /* MCA_COLL_HAN_EXPORT_H */

ompi/mca/coll/han/coll_han_algorithms.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ mca_coll_han_algorithm_value_t* mca_coll_han_available_algorithms[COLLCOUNT] =
8282
{"smsc", (fnptr_t)&mca_coll_han_alltoall_using_smsc}, // 2-level
8383
{ 0 }
8484
},
85+
[ALLTOALLV] = (mca_coll_han_algorithm_value_t[]){
86+
{"smsc", (fnptr_t)&mca_coll_han_alltoallv_using_smsc}, // 2-level
87+
{ 0 }
88+
},
8589
};
8690

8791
int

ompi/mca/coll/han/coll_han_algorithms.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,5 +214,10 @@ int
214214
mca_coll_han_alltoall_using_smsc(ALLTOALL_BASE_ARGS,
215215
mca_coll_base_module_t *module);
216216

217+
/* Alltoallv */
218+
int
219+
mca_coll_han_alltoallv_using_smsc(ALLTOALLV_BASE_ARGS,
220+
mca_coll_base_module_t *module);
221+
217222

218223
#endif

ompi/mca/coll/han/coll_han_alltoall.c

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,11 @@
3131
#include "opal/mca/rcache/rcache.h"
3232
#include "ompi/mca/osc/base/base.h"
3333

34-
34+
#if 1
35+
#define DEBUG_PRINT(...) do{ fprintf( stdout, __VA_ARGS__ ); } while( false )
36+
#else
37+
#define DEBUG_PRINT(...) do{ } while ( false )
38+
#endif
3539

3640
/* Who is the given ranks partner during the exchange?
3741
This function will require rounds comm_size-many rounds, and your partner
@@ -331,6 +335,22 @@ int mca_coll_han_alltoall_using_smsc(
331335
/* pack the data directly into local leader's sendbuf */
332336
packed_size_tmp = packed_size;
333337
rc = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size_tmp);
338+
339+
{
340+
char tmpstr[16*4+1];
341+
char leadstr[256];
342+
char *cur = tmpstr;
343+
if (ii_push_data) {
344+
sprintf(leadstr,"push: my([%d]) data to [%d] for remote [%d]",w_rank,jlow+up_rank*low_size,remote_wrank);
345+
} else {
346+
sprintf(leadstr,"pull: from [%d] to me([%d]) for remote [%d]",jlow+up_rank*low_size,w_rank,low_size*up_partner+low_rank);
347+
}
348+
for (int jbyte=0; jbyte<(int)packed_size_tmp && jbyte<16; jbyte++) {
349+
cur += sprintf(cur,"%3d ",((char*)(to_addr))[jbyte]);
350+
}
351+
DEBUG_PRINT("%s: %s\n",leadstr, tmpstr);
352+
}
353+
334354
opal_convertor_cleanup(&convertor);
335355

336356
if (1 != rc) {
@@ -348,6 +368,24 @@ int mca_coll_han_alltoall_using_smsc(
348368
low_comm->c_coll->coll_barrier(low_comm, low_comm->c_coll->coll_barrier_module);
349369
}
350370

371+
{
372+
char *sendbuftmp;
373+
char tmpstr[16*4+1];
374+
char leadstr[256];
375+
char *cur = tmpstr;
376+
if (use_isend) {
377+
sprintf(leadstr,"isend slot %d: from [%d] to [%d]",jfan_slot,w_rank,first_remote_wrank+low_rank);
378+
sendbuftmp = &send_bounce[send_bytes_per_fan*jfan_slot];
379+
} else {
380+
sprintf(leadstr,"send: from [%d] to [%d]",w_rank,first_remote_wrank+low_rank);
381+
sendbuftmp = send_bounce;
382+
}
383+
for (int jbyte=0; jbyte<(int)send_bytes_per_fan && jbyte<16; jbyte++) {
384+
cur += sprintf(cur,"%3d ",sendbuftmp[jbyte]);
385+
}
386+
DEBUG_PRINT("%s: %s\n",leadstr, tmpstr);
387+
}
388+
351389
if (use_isend == 0) {
352390
MCA_PML_CALL(send
353391
(send_bounce,

0 commit comments

Comments
 (0)