From a4a6ab11c2a41244de79ee240a34e20dc4918585 Mon Sep 17 00:00:00 2001 From: Luke Robison Date: Mon, 9 Sep 2024 18:54:20 +0000 Subject: [PATCH 1/2] coll/han: alltoallv bugfixes Fix several bugs in the original implementation: - Fallback if transform is in-place - Fix bug related to empty messages - Introduce barrier before exit during error - Handle datatypes with negative lower bounds - Fix a completion-ordering bug - Cleanup some unused variables Signed-off-by: Luke Robison --- ompi/mca/coll/han/coll_han_alltoallv.c | 388 ++++++++++++++++--------- 1 file changed, 252 insertions(+), 136 deletions(-) diff --git a/ompi/mca/coll/han/coll_han_alltoallv.c b/ompi/mca/coll/han/coll_han_alltoallv.c index 564cbb16b06..b9a37460b3a 100644 --- a/ompi/mca/coll/han/coll_han_alltoallv.c +++ b/ompi/mca/coll/han/coll_han_alltoallv.c @@ -58,12 +58,10 @@ static inline int ring_partner(int rank, int round, int comm_size) { } struct peer_data { - const void *sbuf; // mmapped: buf1 - void *rbuf; // mmapped: buf2 - struct peer_counts *counts; // mmapped: buf3 - opal_datatype_t *sendtype; // deserialized from buf3, local copy - opal_datatype_t *recvtype; // deserialized from buf3, local copy - void *map_ctx[3]; + const void *sbuf; /* mmapped: ctx in map_ctx[1] */ + struct peer_counts *counts; /* mmapped: ctx in map_ctx[0] */ + opal_datatype_t *sendtype; /* local. deserialized from tailer of map_ctx[0] */ + void *map_ctx[2]; }; struct peer_counts { @@ -75,14 +73,54 @@ struct peer_counts { struct gathered_data { size_t stype_serialized_length; - size_t rtype_serialized_length; - size_t sbuf_length; - size_t rbuf_length; + ssize_t sbuf_span; + ssize_t sbuf_gap; void *sbuf; - void *rbuf; void *serialization_buffer; }; +/** calculate the minumum true lower bound and maximum true upper bound for all + * peers accessing a buffer. + * Outputs: min_lb and max_lb contain signed offsets in bytes from buffer pointer. + */ +static void coll_han_alltoallv_calc_all_span( + int nranks, + opal_datatype_t *dtype, + ompi_count_array_t counts, + ompi_disp_array_t displs, + ssize_t *gap, + ssize_t *span ) { + ptrdiff_t min_lb = PTRDIFF_MAX; + ptrdiff_t max_ub = PTRDIFF_MIN; + *gap = 0; + *span = 0; + + ptrdiff_t dummy_lb; + ptrdiff_t dtype_extent; + opal_datatype_get_extent(dtype, &dummy_lb, &dtype_extent); + + for (int jrankw=0; jrankw 0) { + ptrdiff_t one_gap; + ptrdiff_t one_span = opal_datatype_span(dtype, count, &one_gap); + lb = dtype_extent*displ + one_gap; + ub = lb + one_span; + min_lb = MIN( min_lb, lb ); + max_ub = MAX( max_ub, ub ); + } + } + if (max_ub > min_lb) { + *gap = min_lb; + *span = max_ub - min_lb; + } +} + /* Serialize the datatype into the buffer and return buffer length. If buf is NULL, just return the length of the required buffer. */ static size_t ddt_pack_datatype(opal_datatype_t* type, uint8_t* buf) @@ -142,7 +180,13 @@ static size_t ddt_unpack_datatype(opal_datatype_t* type, uint8_t* buf) return length; } -/* basic implementation: send all buffers without packing keeping a limited number in flight. */ +/* Simple implementation: send all buffers without packing, but still keeping a + limited number in flight. + + Note: CMA on XPMEM-mapped buffers does not work. If the low-level network + provider attempts to use CMA to implement send/recv, then errors will + occur! +*/ static inline int alltoallv_sendrecv_w_direct_for_debugging( void **send_from_addrs, size_t *send_counts, @@ -160,11 +204,12 @@ static inline int alltoallv_sendrecv_w_direct_for_debugging( const int MAX_BUF_COUNT=8; int nreqs = MAX_BUF_COUNT; ompi_request_t *requests[MAX_BUF_COUNT]; + const char* problem_hint; int jfirst_sendreq = nreqs/2 + nreqs%2; int jreq; - ompi_datatype_t *yuck_ompi_dtype_from_opal; + ompi_datatype_t *ompi_dtype_from_opal; int rc = 0; int jloop; @@ -182,9 +227,17 @@ static inline int alltoallv_sendrecv_w_direct_for_debugging( if (jloop < nreqs){ jreq = jloop; have_completion = 0; + requests[jreq] = MPI_REQUEST_NULL; } else { have_completion = 1; - ompi_request_wait_any( nreqs, requests, &jreq, MPI_STATUS_IGNORE ); + jreq = jloop%nreqs; + if (requests[jreq] == MPI_REQUEST_NULL) { + continue; + } + problem_hint = "waiting for request completion"; + rc = ompi_request_wait(&requests[jreq], MPI_STATUS_IGNORE); + if (rc) break; + requests[jreq] = MPI_REQUEST_NULL; } int ii_send_req = jreq >= jfirst_sendreq; if (have_completion) { @@ -194,30 +247,46 @@ static inline int alltoallv_sendrecv_w_direct_for_debugging( jrecvs_completed++; } - requests[jreq] = &ompi_request_null.request; if (ii_send_req && jsends_posted < ntypes_send) { - rc = ompi_datatype_create_contiguous( 1, (ompi_datatype_t*)send_types[jsends_posted], &yuck_ompi_dtype_from_opal ); - ompi_datatype_commit(&yuck_ompi_dtype_from_opal); - MCA_PML_CALL(isend - (send_from_addrs[jsends_posted], (int)send_counts[jsends_posted], yuck_ompi_dtype_from_opal, jrank_sendto, + problem_hint = "creating or committing temporary datatype"; + rc = ompi_datatype_create_contiguous( 1, (ompi_datatype_t*)send_types[jsends_posted], &ompi_dtype_from_opal ); + if (rc) break; + rc = ompi_datatype_commit(&ompi_dtype_from_opal); + if (rc) break; + problem_hint = "posting isend"; + rc = MCA_PML_CALL(isend + (send_from_addrs[jsends_posted], (int)send_counts[jsends_posted], ompi_dtype_from_opal, jrank_sendto, MCA_COLL_BASE_TAG_ALLTOALLV, MCA_PML_BASE_SEND_STANDARD, comm, &requests[jreq])); - ompi_datatype_destroy( &yuck_ompi_dtype_from_opal ); + if (rc) break; + problem_hint = "destroying temporary datatype"; + rc = ompi_datatype_destroy( &ompi_dtype_from_opal ); + if (rc) break; + jsends_posted++; } if (!ii_send_req && jrecvs_posted < ntypes_recv ) { - rc = ompi_datatype_create_contiguous( 1, (ompi_datatype_t*)recv_types[jrecvs_posted], &yuck_ompi_dtype_from_opal ); - ompi_datatype_commit(&yuck_ompi_dtype_from_opal); - MCA_PML_CALL(irecv - (recv_to_addrs[jrecvs_posted], (int)recv_counts[jrecvs_posted], yuck_ompi_dtype_from_opal, jrank_recvfrom, + problem_hint = "creating or committing temporary datatype"; + rc = ompi_datatype_create_contiguous( 1, (ompi_datatype_t*)recv_types[jrecvs_posted], &ompi_dtype_from_opal ); + if (rc) break; + rc = ompi_datatype_commit(&ompi_dtype_from_opal); + if (rc) break; + problem_hint = "posting irecv"; + rc = MCA_PML_CALL(irecv + (recv_to_addrs[jrecvs_posted], (int)recv_counts[jrecvs_posted], ompi_dtype_from_opal, jrank_recvfrom, MCA_COLL_BASE_TAG_ALLTOALLV, comm, &requests[jreq])); - ompi_datatype_destroy( &yuck_ompi_dtype_from_opal ); + if (rc) break; + problem_hint = "destroying temporary datatype"; + rc = ompi_datatype_destroy( &ompi_dtype_from_opal ); + if (rc) break; jrecvs_posted++; } - - - if (rc) { break; }; + } + if (rc) { + opal_output_verbose(1, mca_coll_han_component.han_output, + "Failed in alltoallv_sendrecv_w_direct_for_debugging while %s: jloop=%d, rc=%d\n", + problem_hint, jloop,rc); } return rc; } @@ -250,16 +319,20 @@ static int alltoallv_sendrecv_w( buf_items[jbuf] = opal_free_list_get(&mca_coll_han_component.pack_buffers); if (buf_items[jbuf] == NULL) { nbufs = jbuf - 1; - printf("Uh-oh, not enough buffers: %d\n",nbufs); + opal_output_verbose(20, mca_coll_han_component.han_output, + "Uh-oh, not enough buffers: %d\n",nbufs); break; } } + if (nbufs < 2) { + opal_output_verbose(1, mca_coll_han_component.han_output, + "ERROR: Need at least 2 buffers from mca_coll_han_component.pack_buffers!"); + return MPI_ERR_NO_MEM; + } size_t nreqs = nbufs; int jreq; int jfirst_sendreq = nbufs/2 + nbufs%2; - size_t recv_post_remaining_bytes; - int rc; size_t jloop = 0; size_t send_pack_bytes_remaining = 0; @@ -269,6 +342,7 @@ static int alltoallv_sendrecv_w( int jtype_recv; size_t nbytes_pack; + const int buf_header_len = sizeof(int); int nsend_req_pending = 0; opal_convertor_t send_convertor; opal_convertor_t recv_convertor; @@ -281,13 +355,15 @@ static int alltoallv_sendrecv_w( there is no more data coming, we could cancel the recvs we posted 2) We could make a first-pass and count total bytes to recv, and be careful not to post more than we know is coming. - We take path 2 here, to avoid possible race conditions between the - cancel and the possibility of a matching recv. + + After realizing that the convertor may decide to not fully pack the + buffer, path 1 was the only clear way forward (since it was no longer + clear how many bytes would be arriving.) Note this is not necessary for Sends, because our convertor leads posting the sends rather than trails it. */ - recv_post_remaining_bytes = 0; + int ii_more_recvs_to_post = 0; for (jtype_recv=0; jtype_recv 0; int ii_more_sends_to_complete = nsend_req_pending > 0; - int ii_more_recvs_to_post = recv_post_remaining_bytes > 0; - int ii_more_recvs_to_complete = recv_convertor_bytes_remaining > 0 || jtype_recv < ntypes_recv; - - if ( !( ii_more_sends_to_post || ii_more_sends_to_complete || - ii_more_recvs_to_post || ii_more_recvs_to_complete) ) { + ii_more_recvs_to_post ) ) { /* exit. All done! */ break; } if (jloop >= nreqs) { /* Common Case: */ - /* wait for any send or recv to complete */ - rc = ompi_request_wait_any(nreqs, requests, &jreq, MPI_STATUS_IGNORE); - if (rc != 0 || jreq == MPI_UNDEFINED) { - return 1; + /* wait for the send or recv to complete */ + jreq = jloop%nreqs; + if (requests[jreq] == MPI_REQUEST_NULL) { + if (++sequential_continues > nbufs) { + opal_output_verbose(1, mca_coll_han_component.han_output, + "ERROR: no active requests to wait on! Loop=%ld: %d %d %d\n", + jloop, + ii_more_sends_to_post, ii_more_sends_to_complete, + ii_more_recvs_to_post ); + return MPI_ERR_INTERN; + } + continue; } + sequential_continues = 0; + ompi_request_wait( &requests[jreq], MPI_STATUS_IGNORE ); have_completion = 1; + requests[jreq] = MPI_REQUEST_NULL; } else { /* priming the loop: post sends or recvs while have_completion=0. @@ -358,7 +445,7 @@ static int alltoallv_sendrecv_w( nsend_req_pending--; } - ssize_t buf_remain = buf_len; + int buf_remain = buf_len - buf_header_len; while (buf_remain > 0 && (jtype_send < ntypes_send || send_pack_bytes_remaining > 0) ) { if (jtype_send < ntypes_send && send_pack_bytes_remaining==0) { /* switch to next datatype and prepare convertor: */ @@ -383,15 +470,25 @@ static int alltoallv_sendrecv_w( iov.iov_len = buf_remain; iov_count = 1; opal_convertor_pack(&send_convertor, &iov, &iov_count, &nbytes_pack); + if (!nbytes_pack) { + /* can happen when buffer is only a few bytes from full, + and convertor doesn't want to split a primitive data + type. */ + break; + } buf_remain -= nbytes_pack; send_pack_bytes_remaining -= nbytes_pack; } } + /* start the buffer with an integer describing how many bytes we + packed, *including* the integer's size */ + int pack_bytes_len = buf_len - buf_remain; /* post send */ - if (buf_len - buf_remain > 0) { + if (pack_bytes_len > buf_header_len) { + *((int*)req_buf) = pack_bytes_len; MCA_PML_CALL(isend - (req_buf, (buf_len-buf_remain), MPI_PACKED, jrank_sendto, + (req_buf, pack_bytes_len, MPI_PACKED, jrank_sendto, MCA_COLL_BASE_TAG_ALLTOALLV, MCA_PML_BASE_SEND_STANDARD, comm, &requests[jreq])); nsend_req_pending++; @@ -402,8 +499,8 @@ static int alltoallv_sendrecv_w( } else { /* recv request */ if (have_completion) { /* unpack data */ - ssize_t buf_remain = buf_len; - size_t buf_converted = 0; + ssize_t buf_remain = *((int*)req_buf) - buf_header_len; + size_t buf_converted = buf_header_len; while (true) { if (jtype_recv < ntypes_recv && recv_convertor_bytes_remaining==0) { /* switch to next datatype and prepare convertor: */ @@ -416,11 +513,21 @@ static int alltoallv_sendrecv_w( 0, &recv_convertor); opal_convertor_get_packed_size( &recv_convertor, &recv_convertor_bytes_remaining ); + if (recv_convertor_bytes_remaining == 0) { + continue; + } } } if (jtype_recv == ntypes_recv && recv_convertor_bytes_remaining==0 ) { /* _all_ recving work is done! */ + ii_more_recvs_to_post = false; buf_remain = 0; + for (int kreq=0; kreq 0) { + if (ii_more_recvs_to_post) { /* post a new recv */ MCA_PML_CALL(irecv - (req_buf, bytes_to_post, MPI_PACKED, jrank_recvfrom, + (req_buf, buf_len, MPI_PACKED, jrank_recvfrom, MCA_COLL_BASE_TAG_ALLTOALLV, comm, &requests[jreq])); - - /* update posted_recv_bytes */ - recv_post_remaining_bytes -= bytes_to_post; } else { requests[jreq] = MPI_REQUEST_NULL; } @@ -489,14 +592,20 @@ static int decide_to_use_smsc_alg( /* Perform an allreduce on all ranks to decide if this algorithm is worth - using. There are three important things: - - 1. Device buffers. XPMEM doesn't support GPU buffers, so we cannot proceed + using. There are four important things: + + 1. sbuf == MPI_IN_PLACE. This algorithm does not currently support the + in-place operation. (Future developers may note that the inter-node + communications are ordered such that in-place could be supported, but + additional ordering and/or buffering would be required to ensure local + ranks do not overwrite buffers before sends are posted. However, for now + we just bail out.) + 2. Device buffers. XPMEM doesn't support GPU buffers, so we cannot proceed with this algorithm. - 2. Send size per rank. This algorithm can pack small messages together, + 3. Send size per rank. This algorithm can pack small messages together, but this typically isn't helpful for large messages, and XPMEM-mapped memory cannot be registered with ibv_reg_mr. - 3. Contiguous buffers. The exception to #2 above is if we can't post our + 4. Contiguous buffers. The exception to #2 above is if we can't post our sends/recvs in one large block to the NIC. For these non-contiguous datatypes, our packing algorithm is better because (a) we re-use our buffers from a free-list which can remain registered to the NIC, and (b) @@ -509,6 +618,15 @@ static int decide_to_use_smsc_alg( our execution time, which is <1/10 of the "basic" algorithm. */ + if (sbuf == MPI_IN_PLACE) { + if (comm_rank == 0) { + opal_output_verbose(10, mca_coll_han_component.han_output, "alltoallv: decide_to_use_smsc_alg: " + "MPI_IN_PLACE operation prevents smsc_alg from being used. " + "Continue with SMSC? ==> no.\n"); + } + *use_smsc = 0; + } + /* some magic in the count: if we pick 1, need_buffers() might not be accurate. We could be precisely correct and compute need_buffers for every rank's count, but that could be a lot of iteration. Just use 2 and assume @@ -519,11 +637,14 @@ static int decide_to_use_smsc_alg( OBJ_CONSTRUCT( &convertor, opal_convertor_t ); rc = opal_convertor_copy_and_prepare_for_recv(ompi_mpi_local_convertor, &rdtype->super, count_for_convertor, rbuf, 0, &convertor); + if (rc) goto cleanup1; bufs_on_device = opal_convertor_on_device(&convertor); need_bufs = opal_convertor_need_buffers(&convertor); - rc |= opal_convertor_cleanup(&convertor); - rc |= opal_convertor_copy_and_prepare_for_send(ompi_mpi_local_convertor, + rc = opal_convertor_cleanup(&convertor); + if (rc) goto cleanup1; + rc = opal_convertor_copy_and_prepare_for_send(ompi_mpi_local_convertor, &sdtype->super, count_for_convertor, sbuf, 0, &convertor); + if (rc) goto cleanup1; bufs_on_device |= opal_convertor_on_device(&convertor); opal_convertor_get_packed_size(&convertor, &packed_size_bytes); for (int jrank=0; jrankprevious_alltoallv_module); } - int w_rank = ompi_comm_rank(comm); int w_size = ompi_comm_size(comm); int use_smsc; rc = decide_to_use_smsc_alg(&use_smsc, sbuf, scounts, sdispls, sdtype, rbuf, rcounts, rdispls, rdtype, comm); - if (rc != 0) { return rc; } + if (rc != 0) { + opal_output_verbose(1, mca_coll_han_component.han_output, + "decide_to_use_smsc_alg failed during execution! rc=%d\n", rc); + } if (!use_smsc) { return han_module->previous_alltoallv(sbuf, scounts, sdispls, sdtype, rbuf, rcounts, rdispls, rdtype, comm, han_module->previous_alltoallv_module); @@ -650,11 +779,9 @@ int mca_coll_han_alltoallv_using_smsc( low_gather_in.stype_serialized_length = ddt_pack_datatype(&sdtype->super, NULL); - low_gather_in.rtype_serialized_length = ddt_pack_datatype(&rdtype->super, NULL); uint8_t *serialization_buf; size_t serialization_buf_length = low_gather_in.stype_serialized_length - + low_gather_in.rtype_serialized_length + sizeof(struct peer_counts)*w_size; /* allocate data */ @@ -662,31 +789,16 @@ int mca_coll_han_alltoallv_using_smsc( low_gather_out = malloc(sizeof(*low_gather_out) * low_size); struct peer_data *peers = malloc(sizeof(*peers) * low_size); opal_datatype_t *peer_send_types = malloc(sizeof(*peer_send_types) * low_size); - opal_datatype_t *peer_recv_types = malloc(sizeof(*peer_recv_types) * low_size); low_gather_in.serialization_buffer = serialization_buf; - low_gather_in.sbuf = (void*)sbuf; // discard const - low_gather_in.rbuf = rbuf; + low_gather_in.sbuf = (void*)sbuf; // cast to discard the const - low_gather_in.sbuf_length = 0; - low_gather_in.rbuf_length = 0; - ptrdiff_t sextent; - ptrdiff_t rextent; - rc = ompi_datatype_type_extent( sdtype, &sextent); - rc = ompi_datatype_type_extent( rdtype, &rextent); + ptrdiff_t r_extent, r_lb; + rc = ompi_datatype_get_extent( rdtype, &r_lb, &r_extent); - /* calculate the extent of our buffers so that peers can mmap the whole thing */ - for (int jrankw=0; jrankw low_gather_in.sbuf_length) { - low_gather_in.sbuf_length = sz; - } - sz = (ompi_disp_array_get(rdispls,jrankw) + ompi_count_array_get(rcounts,jrankw))*rextent; - if (sz > low_gather_in.rbuf_length) { - low_gather_in.rbuf_length = sz; - } - } + /* calculate the full gap and span of all accesses to our buffer: */ + coll_han_alltoallv_calc_all_span( w_size, &sdtype->super, scounts, sdispls, + &low_gather_in.sbuf_gap, &low_gather_in.sbuf_span ); /* pack the serialization buffer: first the array of counts */ size_t buf_packed = 0; @@ -701,15 +813,14 @@ int mca_coll_han_alltoallv_using_smsc( } /* pack the serialization buffer: next the send and recv dtypes */ buf_packed += ddt_pack_datatype(&sdtype->super, serialization_buf + buf_packed); - buf_packed += ddt_pack_datatype(&rdtype->super, serialization_buf + buf_packed); assert(buf_packed == serialization_buf_length); rc = low_comm->c_coll->coll_allgather(&low_gather_in, sizeof(low_gather_in), MPI_BYTE, low_gather_out, sizeof(low_gather_in), MPI_BYTE, low_comm, low_comm->c_coll->coll_allgather_module); if (rc != 0) { - OPAL_OUTPUT_VERBOSE((40, mca_coll_han_component.han_output, - "Allgather failed with %d\n",rc)); + opal_output_verbose(1, mca_coll_han_component.han_output, + "During mca_coll_han_alltoallv_using_smsc: Allgather failed with rc=%d\n",rc); goto cleanup; } @@ -722,18 +833,18 @@ int mca_coll_han_alltoallv_using_smsc( */ for (int jrank=0; jranksuper; - peers[jrank].sendtype = &rdtype->super; - peers[jrank].map_ctx[0] = NULL; - peers[jrank].map_ctx[1] = NULL; - peers[jrank].map_ctx[2] = NULL; + peers[jrank].sendtype = &sdtype->super; continue; } + struct gathered_data *gathered = &low_gather_out[jrank]; struct ompi_proc_t* ompi_proc = ompi_comm_peer_lookup(low_comm, jrank); mca_smsc_endpoint_t *smsc_ep; @@ -742,7 +853,6 @@ int mca_coll_han_alltoallv_using_smsc( uint8_t *peer_serialization_buf; size_t peer_serialization_buf_length; peer_serialization_buf_length = w_size * sizeof(struct peer_counts); - peer_serialization_buf_length += gathered->rtype_serialized_length; peer_serialization_buf_length += gathered->stype_serialized_length; /* mmap the buffers */ @@ -752,18 +862,15 @@ int mca_coll_han_alltoallv_using_smsc( gathered->serialization_buffer, peer_serialization_buf_length, (void**) &peer_serialization_buf ); - peers[jrank].map_ctx[1] = mca_smsc->map_peer_region( - smsc_ep, - MCA_RCACHE_FLAGS_PERSIST, - gathered->sbuf, - gathered->sbuf_length, - (void**)&peers[jrank].sbuf ); - peers[jrank].map_ctx[2] = mca_smsc->map_peer_region( - smsc_ep, - MCA_RCACHE_FLAGS_PERSIST, - gathered->rbuf, - gathered->rbuf_length, - &peers[jrank].rbuf ); + if (gathered->sbuf_span > 0) { + peers[jrank].map_ctx[1] = mca_smsc->map_peer_region( + smsc_ep, + MCA_RCACHE_FLAGS_PERSIST, + (char*)gathered->sbuf + gathered->sbuf_gap, + gathered->sbuf_span, + &tmp_ptr ); + peers[jrank].sbuf = (char*)tmp_ptr - gathered->sbuf_gap; + } /* point the counts pointer into the mmapped serialization buffer */ peers[jrank].counts = (struct peer_counts*)peer_serialization_buf; @@ -771,17 +878,15 @@ int mca_coll_han_alltoallv_using_smsc( /* unpack the dtypes */ peer_serialization_buf += ddt_unpack_datatype( &peer_send_types[jrank], peer_serialization_buf); - peer_serialization_buf += ddt_unpack_datatype( &peer_recv_types[jrank], peer_serialization_buf); peers[jrank].sendtype = &peer_send_types[jrank]; - peers[jrank].recvtype = &peer_recv_types[jrank]; } - void **send_from_addrs = malloc(sizeof(*send_from_addrs)*low_size); - void **recv_to_addrs = malloc(sizeof(*recv_to_addrs)*low_size); - size_t *send_counts = malloc(sizeof(*send_counts)*low_size); - size_t *recv_counts = malloc(sizeof(*recv_counts)*low_size); - opal_datatype_t **send_types = malloc(sizeof(*send_types)*low_size); - opal_datatype_t **recv_types = malloc(sizeof(*recv_types)*low_size); + send_from_addrs = malloc(sizeof(*send_from_addrs)*low_size); + recv_to_addrs = malloc(sizeof(*recv_to_addrs)*low_size); + send_counts = malloc(sizeof(*send_counts)*low_size); + recv_counts = malloc(sizeof(*recv_counts)*low_size); + send_types = malloc(sizeof(*send_types)*low_size); + recv_types = malloc(sizeof(*recv_types)*low_size); /**** * Main exchange loop @@ -798,14 +903,19 @@ int mca_coll_han_alltoallv_using_smsc( ptrdiff_t peer_sextent; rc = opal_datatype_type_extent( peers[jlow].sendtype, &peer_sextent); + if (rc != 0) { + opal_output_verbose(1, mca_coll_han_component.han_output, + "opal_datatype_type_extent returned error code = %d during mca_coll_han_alltoallv_using_smsc!\n",rc); + goto cleanup; + } void *from_addr = (uint8_t*)peers[jlow].sbuf + peers[jlow].counts[jrank_sendto].sdispl*peer_sextent; send_from_addrs[jlow] = from_addr; send_counts[jlow] = peers[jlow].counts[jrank_sendto].scount; -send_types[jlow] = peers[jlow].sendtype; -// send_types[jlow] = &(sdtype->super); + send_types[jlow] = peers[jlow].sendtype; - recv_to_addrs[jlow] = (uint8_t*)rbuf + ompi_disp_array_get(rdispls,remote_wrank)*rextent; + + recv_to_addrs[jlow] = (uint8_t*)rbuf + ompi_disp_array_get(rdispls,remote_wrank)*r_extent; recv_counts[jlow] = ompi_count_array_get(rcounts,remote_wrank); recv_types[jlow] = &(rdtype->super); } @@ -817,27 +927,34 @@ send_types[jlow] = peers[jlow].sendtype; send_from_addrs, send_counts, send_types, jrank_sendto, ntypes_send, recv_to_addrs, recv_counts, recv_types, jrank_recvfrom, ntypes_recv, comm); - if (rc != 0) goto cleanup; - } + if (rc != 0) { + opal_output_verbose(1, mca_coll_han_component.han_output, + "alltoallv_sendrecv_w returned error code = %d!\n",rc); + goto cleanup; + } - free(send_from_addrs); - free(recv_to_addrs); - free(send_counts); - free(recv_counts); - free(send_types); - free(recv_types); + } rc=0; + + cleanup: low_comm->c_coll->coll_barrier(low_comm, low_comm->c_coll->coll_barrier_module); -cleanup: + if (send_from_addrs) { + free(send_from_addrs); + free(recv_to_addrs); + free(send_counts); + free(recv_counts); + free(send_types); + free(recv_types); + } + for (int jlow=0; jlowunmap_peer_region(peers[jlow].map_ctx[jbuf]); } @@ -847,7 +964,6 @@ send_types[jlow] = peers[jlow].sendtype; free(low_gather_out); free(peers); free(peer_send_types); - free(peer_recv_types); OPAL_OUTPUT_VERBOSE((40, mca_coll_han_component.han_output, "Alltoall Complete with %d\n",rc)); From 69ac92b0c2cedc73a68e3bf4169b567a5d3a1506 Mon Sep 17 00:00:00 2001 From: Luke Robison Date: Thu, 3 Oct 2024 20:09:17 +0000 Subject: [PATCH 2/2] coll/han: parameterize the packing buf size and count Allow the user to change the packing buffer size and how many maximum buffers will be allocated. Currently only han's alltoallv uses the buffers. Signed-off-by: Luke Robison --- ompi/mca/coll/han/coll_han.h | 5 ++--- ompi/mca/coll/han/coll_han_alltoallv.c | 25 ++++++++++++++++++++----- ompi/mca/coll/han/coll_han_component.c | 18 ++++++++++++++++-- 3 files changed, 38 insertions(+), 10 deletions(-) diff --git a/ompi/mca/coll/han/coll_han.h b/ompi/mca/coll/han/coll_han.h index e8f9db87c0e..c6bf543c7d9 100644 --- a/ompi/mca/coll/han/coll_han.h +++ b/ompi/mca/coll/han/coll_han.h @@ -294,6 +294,8 @@ typedef struct mca_coll_han_component_t { int max_dynamic_errors; opal_free_list_t pack_buffers; + int64_t han_packbuf_max_count; + int64_t han_packbuf_bytes; } mca_coll_han_component_t; /* @@ -576,7 +578,4 @@ static inline struct mca_smsc_endpoint_t *mca_coll_han_get_smsc_endpoint (struct return (struct mca_smsc_endpoint_t *) proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_SMSC]; } -#define COLL_HAN_PACKBUF_PAYLOAD_BYTES (128*1024) - - #endif /* MCA_COLL_HAN_EXPORT_H */ diff --git a/ompi/mca/coll/han/coll_han_alltoallv.c b/ompi/mca/coll/han/coll_han_alltoallv.c index b9a37460b3a..6d99bc8eacf 100644 --- a/ompi/mca/coll/han/coll_han_alltoallv.c +++ b/ompi/mca/coll/han/coll_han_alltoallv.c @@ -313,20 +313,35 @@ static int alltoallv_sendrecv_w( ompi_request_t *requests[MAX_BUF_COUNT]; opal_free_list_item_t *buf_items[MAX_BUF_COUNT]; - size_t buf_len = COLL_HAN_PACKBUF_PAYLOAD_BYTES; + size_t buf_len = mca_coll_han_component.han_packbuf_bytes; int nbufs = MAX_BUF_COUNT; for (int jbuf=0; jbufhan_reproducible); + + cs->han_packbuf_bytes = 128*1024; + (void) mca_base_component_var_register(c, "packbuf_bytes", + "The number of bytes in each HAN packbuf.", + MCA_BASE_VAR_TYPE_INT64_T, NULL, 0, 0, + OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, + &cs->han_packbuf_bytes); + cs->han_packbuf_max_count = 32; + (void) mca_base_component_var_register(c, "packbuf_max_count", + "The maximum number of packbufs that are allowed to be allocated.", + MCA_BASE_VAR_TYPE_INT64_T, NULL, 0, 0, + OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, + &cs->han_packbuf_max_count); + /* * Han algorithms MCA parameters for each collective. * Shows algorithms thanks to enumerator