diff --git a/ompi/mca/coll/han/coll_han.h b/ompi/mca/coll/han/coll_han.h
index e8f9db87c0e..c6bf543c7d9 100644
--- a/ompi/mca/coll/han/coll_han.h
+++ b/ompi/mca/coll/han/coll_han.h
@@ -294,6 +294,8 @@ typedef struct mca_coll_han_component_t {
     int max_dynamic_errors;
 
     opal_free_list_t pack_buffers;
+    int64_t han_packbuf_max_count;
+    int64_t han_packbuf_bytes;
 } mca_coll_han_component_t;
 
 /*
@@ -576,7 +578,4 @@ static inline struct mca_smsc_endpoint_t *mca_coll_han_get_smsc_endpoint (struct
     return (struct mca_smsc_endpoint_t *) proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_SMSC];
 }
 
-#define COLL_HAN_PACKBUF_PAYLOAD_BYTES (128*1024)
-
-
 #endif /* MCA_COLL_HAN_EXPORT_H */
diff --git a/ompi/mca/coll/han/coll_han_alltoallv.c b/ompi/mca/coll/han/coll_han_alltoallv.c
index 564cbb16b06..6d99bc8eacf 100644
--- a/ompi/mca/coll/han/coll_han_alltoallv.c
+++ b/ompi/mca/coll/han/coll_han_alltoallv.c
@@ -58,12 +58,10 @@ static inline int ring_partner(int rank, int round, int comm_size) {
 }
 
 struct peer_data {
-    const void *sbuf;            // mmapped: buf1
-    void *rbuf;                  // mmapped: buf2
-    struct peer_counts *counts;  // mmapped: buf3
-    opal_datatype_t *sendtype;   // deserialized from buf3, local copy
-    opal_datatype_t *recvtype;   // deserialized from buf3, local copy
-    void *map_ctx[3];
+    const void *sbuf;            /* mmapped: ctx in map_ctx[1] */
+    struct peer_counts *counts;  /* mmapped: ctx in map_ctx[0] */
+    opal_datatype_t *sendtype;   /* local; deserialized from the trailer of map_ctx[0] */
+    void *map_ctx[2];
 };
 
 struct peer_counts {
@@ -75,14 +73,54 @@ struct peer_counts {
 
 struct gathered_data {
     size_t stype_serialized_length;
-    size_t rtype_serialized_length;
-    size_t sbuf_length;
-    size_t rbuf_length;
+    ssize_t sbuf_span;
+    ssize_t sbuf_gap;
     void *sbuf;
-    void *rbuf;
     void *serialization_buffer;
 };
 
+/** Calculate the minimum true lower bound and maximum true upper bound over
+ * all peers accessing a buffer.
+ *
+ * Outputs: *gap is the signed offset in bytes from the buffer pointer to the
+ * lowest byte any peer accesses; *span is the number of bytes from that
+ * lowest byte to the highest byte accessed.
+ */
+static void coll_han_alltoallv_calc_all_span(
+    int nranks,
+    opal_datatype_t *dtype,
+    ompi_count_array_t counts,
+    ompi_disp_array_t displs,
+    ssize_t *gap,
+    ssize_t *span ) {
+    ptrdiff_t min_lb = PTRDIFF_MAX;
+    ptrdiff_t max_ub = PTRDIFF_MIN;
+    *gap = 0;
+    *span = 0;
+
+    ptrdiff_t dummy_lb;
+    ptrdiff_t dtype_extent;
+    opal_datatype_get_extent(dtype, &dummy_lb, &dtype_extent);
+
+    for (int jrankw=0; jrankw<nranks; jrankw++) {
+        ptrdiff_t lb, ub;
+        size_t count = ompi_count_array_get(counts, jrankw);
+        ptrdiff_t displ = ompi_disp_array_get(displs, jrankw);
+        if (count > 0) {
+            ptrdiff_t one_gap;
+            ptrdiff_t one_span = opal_datatype_span(dtype, count, &one_gap);
+            lb = dtype_extent*displ + one_gap;
+            ub = lb + one_span;
+            min_lb = MIN( min_lb, lb );
+            max_ub = MAX( max_ub, ub );
+        }
+    }
+    if (max_ub > min_lb) {
+        *gap = min_lb;
+        *span = max_ub - min_lb;
+    }
+}
+
 /* Serialize the datatype into the buffer and return buffer length.
    If buf is NULL, just return the length of the required buffer. */
 static size_t ddt_pack_datatype(opal_datatype_t* type, uint8_t* buf)
@@ -142,7 +180,13 @@ static size_t ddt_unpack_datatype(opal_datatype_t* type, uint8_t* buf)
     return length;
 }
 
-/* basic implementation: send all buffers without packing keeping a limited number in flight. */
+/* Simple implementation: send all buffers without packing, but still keep a
+   limited number of requests in flight.
+
+   Note: CMA does not work on XPMEM-mapped buffers.  If the low-level network
+   provider attempts to use CMA to implement send/recv, errors will occur!
+*/
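
The sbuf_gap/sbuf_span pair produced by coll_han_alltoallv_calc_all_span() above is the
opal-level analogue of MPI's true lower bound and true extent. A minimal standalone
demonstration of those quantities (illustrative only, not part of the patch; plain MPI
instead of the opal internals):

```c
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);

    /* Two blocks of 2 ints at int-offsets 2 and 6: the type touches only
     * bytes [8,16) and [24,32), even though its extent starts at byte 0. */
    int blocklens[2] = {2, 2};
    int displs[2]    = {2, 6};
    MPI_Datatype ty;
    MPI_Aint true_lb, true_extent;

    MPI_Type_indexed(2, blocklens, displs, MPI_INT, &ty);
    MPI_Type_commit(&ty);
    MPI_Type_get_true_extent(ty, &true_lb, &true_extent);

    printf("gap analogue  (true_lb)     = %ld\n", (long)true_lb);      /* 8  */
    printf("span analogue (true_extent) = %ld\n", (long)true_extent);  /* 24 */

    MPI_Type_free(&ty);
    MPI_Finalize();
    return 0;
}
```

Mapping only [sbuf+gap, sbuf+gap+span) rather than a [0, length) prefix is what lets the
patch drop the old sbuf_length/rbuf_length fields and avoid mapping bytes no peer will
ever touch.
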
 static inline int alltoallv_sendrecv_w_direct_for_debugging(
     void **send_from_addrs,
     size_t *send_counts,
@@ -160,11 +204,12 @@ static inline int alltoallv_sendrecv_w_direct_for_debugging(
     const int MAX_BUF_COUNT=8;
     int nreqs = MAX_BUF_COUNT;
     ompi_request_t *requests[MAX_BUF_COUNT];
+    const char* problem_hint;
 
     int jfirst_sendreq = nreqs/2 + nreqs%2;
 
     int jreq;
-    ompi_datatype_t *yuck_ompi_dtype_from_opal;
+    ompi_datatype_t *ompi_dtype_from_opal;
     int rc = 0;
     int jloop;
@@ -182,9 +227,17 @@ static inline int alltoallv_sendrecv_w_direct_for_debugging(
         if (jloop < nreqs){
             jreq = jloop;
             have_completion = 0;
+            requests[jreq] = MPI_REQUEST_NULL;
         } else {
             have_completion = 1;
-            ompi_request_wait_any( nreqs, requests, &jreq, MPI_STATUS_IGNORE );
+            jreq = jloop%nreqs;
+            if (requests[jreq] == MPI_REQUEST_NULL) {
+                continue;
+            }
+            problem_hint = "waiting for request completion";
+            rc = ompi_request_wait(&requests[jreq], MPI_STATUS_IGNORE);
+            if (rc) break;
+            requests[jreq] = MPI_REQUEST_NULL;
         }
         int ii_send_req = jreq >= jfirst_sendreq;
         if (have_completion) {
@@ -194,30 +247,46 @@ static inline int alltoallv_sendrecv_w_direct_for_debugging(
                 jrecvs_completed++;
             }
 
-        requests[jreq] = &ompi_request_null.request;
         if (ii_send_req && jsends_posted < ntypes_send) {
-            rc = ompi_datatype_create_contiguous( 1, (ompi_datatype_t*)send_types[jsends_posted], &yuck_ompi_dtype_from_opal );
-            ompi_datatype_commit(&yuck_ompi_dtype_from_opal);
-            MCA_PML_CALL(isend
-                (send_from_addrs[jsends_posted], (int)send_counts[jsends_posted], yuck_ompi_dtype_from_opal, jrank_sendto,
+            problem_hint = "creating or committing temporary datatype";
+            rc = ompi_datatype_create_contiguous( 1, (ompi_datatype_t*)send_types[jsends_posted], &ompi_dtype_from_opal );
+            if (rc) break;
+            rc = ompi_datatype_commit(&ompi_dtype_from_opal);
+            if (rc) break;
+            problem_hint = "posting isend";
+            rc = MCA_PML_CALL(isend
+                (send_from_addrs[jsends_posted], (int)send_counts[jsends_posted], ompi_dtype_from_opal, jrank_sendto,
                 MCA_COLL_BASE_TAG_ALLTOALLV, MCA_PML_BASE_SEND_STANDARD, comm, &requests[jreq]));
-            ompi_datatype_destroy( &yuck_ompi_dtype_from_opal );
+            if (rc) break;
+            problem_hint = "destroying temporary datatype";
+            rc = ompi_datatype_destroy( &ompi_dtype_from_opal );
+            if (rc) break;
+
             jsends_posted++;
         }
         if (!ii_send_req && jrecvs_posted < ntypes_recv ) {
-            rc = ompi_datatype_create_contiguous( 1, (ompi_datatype_t*)recv_types[jrecvs_posted], &yuck_ompi_dtype_from_opal );
-            ompi_datatype_commit(&yuck_ompi_dtype_from_opal);
-            MCA_PML_CALL(irecv
-                (recv_to_addrs[jrecvs_posted], (int)recv_counts[jrecvs_posted], yuck_ompi_dtype_from_opal, jrank_recvfrom,
+            problem_hint = "creating or committing temporary datatype";
+            rc = ompi_datatype_create_contiguous( 1, (ompi_datatype_t*)recv_types[jrecvs_posted], &ompi_dtype_from_opal );
+            if (rc) break;
+            rc = ompi_datatype_commit(&ompi_dtype_from_opal);
+            if (rc) break;
+            problem_hint = "posting irecv";
+            rc = MCA_PML_CALL(irecv
+                (recv_to_addrs[jrecvs_posted], (int)recv_counts[jrecvs_posted], ompi_dtype_from_opal, jrank_recvfrom,
                 MCA_COLL_BASE_TAG_ALLTOALLV, comm, &requests[jreq]));
+            if (rc) break;
+            problem_hint = "destroying temporary datatype";
+            rc = ompi_datatype_destroy( &ompi_dtype_from_opal );
+            if (rc) break;
             jrecvs_posted++;
         }
-
-
-        if (rc) { break; };
+    }
+    if (rc) {
+        opal_output_verbose(1, mca_coll_han_component.han_output,
+                            "Failed in alltoallv_sendrecv_w_direct_for_debugging while %s: jloop=%d, rc=%d\n",
+                            problem_hint, jloop, rc);
+    }
     return rc;
 }
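
Both send/recv loops in this file now use the same slot-reuse discipline that replaces
ompi_request_wait_any() above: slot jloop % nreqs is waited on before it is reposted,
with MPI_REQUEST_NULL marking idle slots. A standalone sketch of that pattern using
plain MPI point-to-point calls (names and sizes are hypothetical; both ranks of a pair
call it symmetrically):

```c
#include <mpi.h>

#define NREQ 8      /* max requests in flight, like MAX_BUF_COUNT */
#define NMSG 32
#define LEN  1024

static void bounded_exchange(int peer, MPI_Comm comm)
{
    static char sbuf[NMSG][LEN], rbuf[NMSG][LEN];
    MPI_Request reqs[NREQ];
    int js = 0, jr = 0;                  /* next send/recv message to post */

    for (int j = 0; j < NREQ; j++) reqs[j] = MPI_REQUEST_NULL;

    for (long jloop = 0; js < NMSG || jr < NMSG; jloop++) {
        int j = (int)(jloop % NREQ);     /* round-robin slot reuse */
        /* Waiting on MPI_REQUEST_NULL returns immediately, so idle slots are
         * harmless; MPI_Wait resets completed requests back to NULL. */
        MPI_Wait(&reqs[j], MPI_STATUS_IGNORE);

        int is_send_slot = j >= NREQ / 2;   /* like jfirst_sendreq above */
        if (is_send_slot && js < NMSG) {
            MPI_Isend(sbuf[js], LEN, MPI_CHAR, peer, 0, comm, &reqs[j]);
            js++;
        } else if (!is_send_slot && jr < NMSG) {
            MPI_Irecv(rbuf[jr], LEN, MPI_CHAR, peer, 0, comm, &reqs[j]);
            jr++;
        }
    }
    MPI_Waitall(NREQ, reqs, MPI_STATUSES_IGNORE);  /* drain the tail */
}
```
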
@@ -244,22 +313,41 @@
     ompi_request_t *requests[MAX_BUF_COUNT];
     opal_free_list_item_t *buf_items[MAX_BUF_COUNT];
 
-    size_t buf_len = COLL_HAN_PACKBUF_PAYLOAD_BYTES;
+    size_t buf_len = mca_coll_han_component.han_packbuf_bytes;
     int nbufs = MAX_BUF_COUNT;
     for (int jbuf=0; jbuf<nbufs; jbuf++) {
@@ ... @@
         int ii_more_sends_to_post = jtype_send < ntypes_send || send_pack_bytes_remaining > 0;
         int ii_more_sends_to_complete = nsend_req_pending > 0;
-        int ii_more_recvs_to_post = recv_post_remaining_bytes > 0;
-        int ii_more_recvs_to_complete = recv_convertor_bytes_remaining > 0 || jtype_recv < ntypes_recv;
-
         if ( !( ii_more_sends_to_post || ii_more_sends_to_complete ||
-                ii_more_recvs_to_post || ii_more_recvs_to_complete) ) {
+                ii_more_recvs_to_post ) ) {
             /* exit.  All done! */
             break;
         }
 
         if (jloop >= nreqs) {
             /* Common Case: */
-            /* wait for any send or recv to complete */
-            rc = ompi_request_wait_any(nreqs, requests, &jreq, MPI_STATUS_IGNORE);
-            if (rc != 0 || jreq == MPI_UNDEFINED) {
-                return 1;
+            /* wait for the send or recv to complete */
+            jreq = jloop%nreqs;
+            if (requests[jreq] == MPI_REQUEST_NULL) {
+                if (++sequential_continues > nbufs) {
+                    opal_output_verbose(1, mca_coll_han_component.han_output,
+                                        "ERROR: no active requests to wait on! Loop=%ld: %d %d %d\n",
+                                        jloop,
+                                        ii_more_sends_to_post, ii_more_sends_to_complete,
+                                        ii_more_recvs_to_post );
+                    return MPI_ERR_INTERN;
+                }
+                continue;
             }
+            sequential_continues = 0;
+            ompi_request_wait( &requests[jreq], MPI_STATUS_IGNORE );
             have_completion = 1;
+            requests[jreq] = MPI_REQUEST_NULL;
         } else {
             /* priming the loop: post sends or recvs while have_completion=0.
@@ -358,7 +460,7 @@
                 nsend_req_pending--;
             }
 
-            ssize_t buf_remain = buf_len;
+            int buf_remain = buf_len - buf_header_len;
             while (buf_remain > 0 && (jtype_send < ntypes_send || send_pack_bytes_remaining > 0) ) {
                 if (jtype_send < ntypes_send && send_pack_bytes_remaining==0) {
                     /* switch to next datatype and prepare convertor: */
@@ -383,15 +485,25 @@
                     iov.iov_len = buf_remain;
                     iov_count = 1;
                     opal_convertor_pack(&send_convertor, &iov, &iov_count, &nbytes_pack);
+                    if (!nbytes_pack) {
+                        /* This can happen when the buffer is only a few bytes
+                           from full, and the convertor doesn't want to split a
+                           primitive datatype. */
+                        break;
+                    }
                     buf_remain -= nbytes_pack;
                     send_pack_bytes_remaining -= nbytes_pack;
                 }
             }
 
+            /* start the buffer with an integer describing how many bytes we
+               packed, *including* the integer's own size */
+            int pack_bytes_len = buf_len - buf_remain;
+
             /* post send */
-            if (buf_len - buf_remain > 0) {
+            if (pack_bytes_len > buf_header_len) {
+                *((int*)req_buf) = pack_bytes_len;
                 MCA_PML_CALL(isend
-                    (req_buf, (buf_len-buf_remain), MPI_PACKED, jrank_sendto,
+                    (req_buf, pack_bytes_len, MPI_PACKED, jrank_sendto,
                      MCA_COLL_BASE_TAG_ALLTOALLV, MCA_PML_BASE_SEND_STANDARD, comm, &requests[jreq]));
                 nsend_req_pending++;
@@ -402,8 +514,8 @@
         } else { /* recv request */
             if (have_completion) {
                 /* unpack data */
-                ssize_t buf_remain = buf_len;
-                size_t buf_converted = 0;
+                ssize_t buf_remain = *((int*)req_buf) - buf_header_len;
+                size_t buf_converted = buf_header_len;
                 while (true) {
                     if (jtype_recv < ntypes_recv && recv_convertor_bytes_remaining==0) {
                         /* switch to next datatype and prepare convertor: */
@@ -416,11 +528,21 @@
                             0, &recv_convertor);
                         opal_convertor_get_packed_size( &recv_convertor, &recv_convertor_bytes_remaining );
+                        if (recv_convertor_bytes_remaining == 0) {
+                            continue;
+                        }
                     }
                 }
                 if (jtype_recv == ntypes_recv && recv_convertor_bytes_remaining==0 ) {
                     /* _all_ recving work is done! */
+                    ii_more_recvs_to_post = false;
                     buf_remain = 0;
+                    for (int kreq=0; kreq<nreqs; kreq++) {
@@ ... @@
-            if (recv_post_remaining_bytes > 0) {
+            if (ii_more_recvs_to_post) {
                 /* post a new recv */
                 MCA_PML_CALL(irecv
-                    (req_buf, bytes_to_post, MPI_PACKED, jrank_recvfrom,
+                    (req_buf, buf_len, MPI_PACKED, jrank_recvfrom,
                      MCA_COLL_BASE_TAG_ALLTOALLV, comm, &requests[jreq]));
-
-                /* update posted_recv_bytes */
-                recv_post_remaining_bytes -= bytes_to_post;
             } else {
                 requests[jreq] = MPI_REQUEST_NULL;
             }
@@ -489,14 +607,20 @@ static int decide_to_use_smsc_alg(
 
     /* Perform an allreduce on all ranks to decide if this algorithm is worth
-       using.  There are three important things:
-
-       1. Device buffers.  XPMEM doesn't support GPU buffers, so we cannot proceed
+       using.  There are four important things:
+
+       1. sbuf == MPI_IN_PLACE.  This algorithm does not currently support the
+          in-place operation.  (Future developers may note that the inter-node
+          communications are ordered such that in-place could be supported, but
+          additional ordering and/or buffering would be required to ensure local
+          ranks do not overwrite buffers before sends are posted.  However, for
+          now we just bail out.)
+       2. Device buffers.  XPMEM doesn't support GPU buffers, so we cannot proceed
           with this algorithm.
-       2. Send size per rank.  This algorithm can pack small messages together,
+       3. Send size per rank.  This algorithm can pack small messages together,
           but this typically isn't helpful for large messages, and XPMEM-mapped
           memory cannot be registered with ibv_reg_mr.
-       3. Contiguous buffers.  The exception to #2 above is if we can't post our
+       4. Contiguous buffers.  The exception to #3 above is if we can't post our
           sends/recvs in one large block to the NIC.  For these non-contiguous
           datatypes, our packing algorithm is better because (a) we re-use our
           buffers from a free-list which can remain registered to the NIC, and (b)
@@ -509,6 +633,15 @@
        our execution time, which is <1/10 of the "basic" algorithm.
     */
 
+    if (sbuf == MPI_IN_PLACE) {
+        if (comm_rank == 0) {
+            opal_output_verbose(10, mca_coll_han_component.han_output, "alltoallv: decide_to_use_smsc_alg: "
+                                "MPI_IN_PLACE operation prevents smsc_alg from being used. "
+                                "Continue with SMSC? ==> no.\n");
+        }
+        *use_smsc = 0;
+    }
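
Returning to the header convention introduced for the packbufs above (hunks @@ -383 and
@@ -402): it is a plain length-prefix frame, where the first sizeof(int) bytes of each
buffer carry the total number of valid bytes, header included, so the receiver trusts
the header rather than the posted receive size. A standalone sketch with hypothetical
helper names:

```c
#include <assert.h>
#include <string.h>

#define HDR ((int)sizeof(int))

/* Writer side: returns the byte count to pass to isend(). */
static int packbuf_frame(char *buf, int buf_len, const char *payload, int n)
{
    int total = HDR + n;               /* valid bytes, *including* the header */
    assert(total <= buf_len);
    memcpy(buf, &total, sizeof(int));
    memcpy(buf + HDR, payload, (size_t)n);
    return total;
}

/* Reader side: the irecv was posted for the full buf_len, but only the
 * header says how many payload bytes were actually packed. */
static int packbuf_parse(const char *buf, const char **payload)
{
    int total;
    memcpy(&total, buf, sizeof(int));
    *payload = buf + HDR;
    return total - HDR;
}
```
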
+
     /* some magic in the count: if we pick 1, need_buffers() might not be
        accurate.  We could be precisely correct and compute need_buffers for
        every rank's count, but that could be a lot of iteration.  Just use 2
        and assume most ranks have similar counts. */
@@ -519,11 +652,14 @@
     OBJ_CONSTRUCT( &convertor, opal_convertor_t );
     rc = opal_convertor_copy_and_prepare_for_recv(ompi_mpi_local_convertor,
                 &rdtype->super, count_for_convertor, rbuf, 0, &convertor);
+    if (rc) goto cleanup1;
     bufs_on_device = opal_convertor_on_device(&convertor);
     need_bufs = opal_convertor_need_buffers(&convertor);
-    rc |= opal_convertor_cleanup(&convertor);
-    rc |= opal_convertor_copy_and_prepare_for_send(ompi_mpi_local_convertor,
+    rc = opal_convertor_cleanup(&convertor);
+    if (rc) goto cleanup1;
+    rc = opal_convertor_copy_and_prepare_for_send(ompi_mpi_local_convertor,
                 &sdtype->super, count_for_convertor, sbuf, 0, &convertor);
+    if (rc) goto cleanup1;
     bufs_on_device |= opal_convertor_on_device(&convertor);
     opal_convertor_get_packed_size(&convertor, &packed_size_bytes);
     for (int jrank=0; jrank<w_size; jrank++) {
@@ ... @@ int mca_coll_han_alltoallv_using_smsc(
         return han_module->previous_alltoallv(sbuf, scounts, sdispls, sdtype,
                                               rbuf, rcounts, rdispls, rdtype,
                                               comm, han_module->previous_alltoallv_module);
     }
 
-    int w_rank = ompi_comm_rank(comm);
     int w_size = ompi_comm_size(comm);
 
     int use_smsc;
     rc = decide_to_use_smsc_alg(&use_smsc,
             sbuf, scounts, sdispls, sdtype,
             rbuf, rcounts, rdispls, rdtype, comm);
-    if (rc != 0) { return rc; }
+    if (rc != 0) {
+        opal_output_verbose(1, mca_coll_han_component.han_output,
+                            "decide_to_use_smsc_alg failed during execution! rc=%d\n", rc);
+    }
 
     if (!use_smsc) {
         return han_module->previous_alltoallv(sbuf, scounts, sdispls, sdtype,
                                               rbuf, rcounts, rdispls, rdtype,
                                               comm, han_module->previous_alltoallv_module);
     }
 
@@ -650,11 +794,9 @@
 
     low_gather_in.stype_serialized_length = ddt_pack_datatype(&sdtype->super, NULL);
-    low_gather_in.rtype_serialized_length = ddt_pack_datatype(&rdtype->super, NULL);
 
     uint8_t *serialization_buf;
     size_t serialization_buf_length = low_gather_in.stype_serialized_length
-        + low_gather_in.rtype_serialized_length
         + sizeof(struct peer_counts)*w_size;
 
     /* allocate data */
@@ -662,31 +804,16 @@
     low_gather_out = malloc(sizeof(*low_gather_out) * low_size);
     struct peer_data *peers = malloc(sizeof(*peers) * low_size);
     opal_datatype_t *peer_send_types = malloc(sizeof(*peer_send_types) * low_size);
-    opal_datatype_t *peer_recv_types = malloc(sizeof(*peer_recv_types) * low_size);
 
     low_gather_in.serialization_buffer = serialization_buf;
-    low_gather_in.sbuf = (void*)sbuf;   // discard const
-    low_gather_in.rbuf = rbuf;
+    low_gather_in.sbuf = (void*)sbuf;   // cast to discard the const
 
-    low_gather_in.sbuf_length = 0;
-    low_gather_in.rbuf_length = 0;
-    ptrdiff_t sextent;
-    ptrdiff_t rextent;
-    rc = ompi_datatype_type_extent( sdtype, &sextent);
-    rc = ompi_datatype_type_extent( rdtype, &rextent);
+    ptrdiff_t r_extent, r_lb;
+    rc = ompi_datatype_get_extent( rdtype, &r_lb, &r_extent);
 
-    /* calculate the extent of our buffers so that peers can mmap the whole thing */
-    for (int jrankw=0; jrankw<w_size; jrankw++) {
-        size_t sz = (ompi_disp_array_get(sdispls,jrankw) + ompi_count_array_get(scounts,jrankw))*sextent;
-        if (sz > low_gather_in.sbuf_length) {
-            low_gather_in.sbuf_length = sz;
-        }
-        sz = (ompi_disp_array_get(rdispls,jrankw) + ompi_count_array_get(rcounts,jrankw))*rextent;
-        if (sz > low_gather_in.rbuf_length) {
-            low_gather_in.rbuf_length = sz;
-        }
-    }
+    /* calculate the full gap and span of all accesses to our buffer: */
+    coll_han_alltoallv_calc_all_span( w_size, &sdtype->super, scounts, sdispls,
+                                      &low_gather_in.sbuf_gap, &low_gather_in.sbuf_span );
 
     /* pack the serialization buffer: first the array of counts */
     size_t buf_packed = 0;
@@ -701,15 +828,14 @@
     }
     /* pack the serialization buffer: next the send and recv dtypes */
     buf_packed += ddt_pack_datatype(&sdtype->super, serialization_buf + buf_packed);
-    buf_packed += ddt_pack_datatype(&rdtype->super, serialization_buf + buf_packed);
     assert(buf_packed == serialization_buf_length);
 
     rc = low_comm->c_coll->coll_allgather(&low_gather_in, sizeof(low_gather_in), MPI_BYTE,
             low_gather_out, sizeof(low_gather_in), MPI_BYTE, low_comm,
             low_comm->c_coll->coll_allgather_module);
     if (rc != 0) {
-        OPAL_OUTPUT_VERBOSE((40, mca_coll_han_component.han_output,
-                             "Allgather failed with %d\n",rc));
+        opal_output_verbose(1, mca_coll_han_component.han_output,
+                            "During mca_coll_han_alltoallv_using_smsc: Allgather failed with rc=%d\n",rc);
         goto cleanup;
     }
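
The coll_allgather call above ships one fixed-size struct gathered_data per rank as raw
MPI_BYTEs; that is safe here because low_comm only spans ranks on the same node, so
pointer widths and struct layout agree between peers. A standalone sketch of the
pattern (struct and names are hypothetical):

```c
#include <mpi.h>
#include <stdlib.h>

struct meta {
    void  *base;            /* only meaningful to same-node peers */
    long   gap, span;
    size_t dtype_len;
};

static struct meta *exchange_meta(const struct meta *mine, MPI_Comm node_comm)
{
    int nranks;
    MPI_Comm_size(node_comm, &nranks);
    struct meta *all = malloc(sizeof(*all) * (size_t)nranks);

    /* same call shape as low_comm->c_coll->coll_allgather(...) above */
    MPI_Allgather(mine, sizeof(*mine), MPI_BYTE,
                  all, sizeof(*mine), MPI_BYTE, node_comm);
    return all;             /* caller frees */
}
```
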
@@ -722,18 +848,18 @@
      */
     for (int jrank=0; jrank<low_size; jrank++) {
+        void *tmp_ptr;
@@ ... @@
-            peers[jrank].recvtype = &sdtype->super;
-            peers[jrank].sendtype = &rdtype->super;
-            peers[jrank].map_ctx[0] = NULL;
-            peers[jrank].map_ctx[1] = NULL;
-            peers[jrank].map_ctx[2] = NULL;
+            peers[jrank].sendtype = &sdtype->super;
             continue;
         }
+
         struct gathered_data *gathered = &low_gather_out[jrank];
         struct ompi_proc_t* ompi_proc = ompi_comm_peer_lookup(low_comm, jrank);
         mca_smsc_endpoint_t *smsc_ep;
@@ -742,7 +868,6 @@
         uint8_t *peer_serialization_buf;
         size_t peer_serialization_buf_length;
         peer_serialization_buf_length = w_size * sizeof(struct peer_counts);
-        peer_serialization_buf_length += gathered->rtype_serialized_length;
         peer_serialization_buf_length += gathered->stype_serialized_length;
 
         /* mmap the buffers */
@@ -752,18 +877,15 @@
                 gathered->serialization_buffer,
                 peer_serialization_buf_length,
                 (void**) &peer_serialization_buf );
-        peers[jrank].map_ctx[1] = mca_smsc->map_peer_region(
-                smsc_ep,
-                MCA_RCACHE_FLAGS_PERSIST,
-                gathered->sbuf,
-                gathered->sbuf_length,
-                (void**)&peers[jrank].sbuf );
-        peers[jrank].map_ctx[2] = mca_smsc->map_peer_region(
-                smsc_ep,
-                MCA_RCACHE_FLAGS_PERSIST,
-                gathered->rbuf,
-                gathered->rbuf_length,
-                &peers[jrank].rbuf );
+        if (gathered->sbuf_span > 0) {
+            peers[jrank].map_ctx[1] = mca_smsc->map_peer_region(
+                    smsc_ep,
+                    MCA_RCACHE_FLAGS_PERSIST,
+                    (char*)gathered->sbuf + gathered->sbuf_gap,
+                    gathered->sbuf_span,
+                    &tmp_ptr );
+            peers[jrank].sbuf = (char*)tmp_ptr - gathered->sbuf_gap;
+        }
 
         /* point the counts pointer into the mmapped serialization buffer */
         peers[jrank].counts = (struct peer_counts*)peer_serialization_buf;
@@ -771,17 +893,15 @@
         /* unpack the dtypes */
         peer_serialization_buf += ddt_unpack_datatype( &peer_send_types[jrank], peer_serialization_buf);
-        peer_serialization_buf += ddt_unpack_datatype( &peer_recv_types[jrank], peer_serialization_buf);
         peers[jrank].sendtype = &peer_send_types[jrank];
-        peers[jrank].recvtype = &peer_recv_types[jrank];
     }
 
-    void **send_from_addrs = malloc(sizeof(*send_from_addrs)*low_size);
-    void **recv_to_addrs = malloc(sizeof(*recv_to_addrs)*low_size);
-    size_t *send_counts = malloc(sizeof(*send_counts)*low_size);
-    size_t *recv_counts = malloc(sizeof(*recv_counts)*low_size);
-    opal_datatype_t **send_types = malloc(sizeof(*send_types)*low_size);
-    opal_datatype_t **recv_types = malloc(sizeof(*recv_types)*low_size);
+    send_from_addrs = malloc(sizeof(*send_from_addrs)*low_size);
+    recv_to_addrs = malloc(sizeof(*recv_to_addrs)*low_size);
+    send_counts = malloc(sizeof(*send_counts)*low_size);
+    recv_counts = malloc(sizeof(*recv_counts)*low_size);
+    send_types = malloc(sizeof(*send_types)*low_size);
+    recv_types = malloc(sizeof(*recv_types)*low_size);
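
The two-step pointer arithmetic above is easy to misread, so here is the same logic
isolated into a hypothetical helper (a sketch only; it reuses the exact
map_peer_region() arguments from the patch and compiles only inside the OMPI tree):

```c
/* Map only the [sbuf+gap, sbuf+gap+span) window of a peer's send buffer,
 * then subtract the gap back out so the returned pointer can be indexed
 * with the peer's own displacements, exactly like peers[jrank].sbuf. */
static const void *map_peer_sbuf(mca_smsc_endpoint_t *smsc_ep,
                                 const struct gathered_data *gathered,
                                 void **map_ctx_out)
{
    void *tmp_ptr = NULL;
    *map_ctx_out = NULL;
    if (gathered->sbuf_span <= 0) {
        return NULL;                    /* peer exposes no bytes: skip mapping */
    }
    *map_ctx_out = mca_smsc->map_peer_region(
            smsc_ep,
            MCA_RCACHE_FLAGS_PERSIST,
            (char*)gathered->sbuf + gathered->sbuf_gap,  /* lowest touched byte */
            gathered->sbuf_span,                         /* window length */
            &tmp_ptr );
    return (const char*)tmp_ptr - gathered->sbuf_gap;    /* undo the gap */
}
```
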
 
     /****
      * Main exchange loop
      */
@@ -798,14 +918,19 @@
 
             ptrdiff_t peer_sextent;
             rc = opal_datatype_type_extent( peers[jlow].sendtype, &peer_sextent);
+            if (rc != 0) {
+                opal_output_verbose(1, mca_coll_han_component.han_output,
+                                    "opal_datatype_type_extent returned error code = %d during mca_coll_han_alltoallv_using_smsc!\n",rc);
+                goto cleanup;
+            }
             void *from_addr = (uint8_t*)peers[jlow].sbuf + peers[jlow].counts[jrank_sendto].sdispl*peer_sextent;
             send_from_addrs[jlow] = from_addr;
             send_counts[jlow] = peers[jlow].counts[jrank_sendto].scount;
-send_types[jlow] = peers[jlow].sendtype;
-// send_types[jlow] = &(sdtype->super);
+            send_types[jlow] = peers[jlow].sendtype;
 
-            recv_to_addrs[jlow] = (uint8_t*)rbuf + ompi_disp_array_get(rdispls,remote_wrank)*rextent;
+
+            recv_to_addrs[jlow] = (uint8_t*)rbuf + ompi_disp_array_get(rdispls,remote_wrank)*r_extent;
             recv_counts[jlow] = ompi_count_array_get(rcounts,remote_wrank);
             recv_types[jlow] = &(rdtype->super);
         }
@@ -817,27 +942,34 @@ send_types[jlow] = peers[jlow].sendtype;
             send_from_addrs, send_counts, send_types, jrank_sendto, ntypes_send,
             recv_to_addrs, recv_counts, recv_types, jrank_recvfrom, ntypes_recv,
             comm);
-        if (rc != 0) goto cleanup;
-    }
+        if (rc != 0) {
+            opal_output_verbose(1, mca_coll_han_component.han_output,
+                                "alltoallv_sendrecv_w returned error code = %d!\n",rc);
+            goto cleanup;
+        }
 
-    free(send_from_addrs);
-    free(recv_to_addrs);
-    free(send_counts);
-    free(recv_counts);
-    free(send_types);
-    free(recv_types);
+    }
     rc=0;
+
+cleanup:
     low_comm->c_coll->coll_barrier(low_comm, low_comm->c_coll->coll_barrier_module);
-cleanup:
+    if (send_from_addrs) {
+        free(send_from_addrs);
+        free(recv_to_addrs);
+        free(send_counts);
+        free(recv_counts);
+        free(send_types);
+        free(recv_types);
+    }
+
+    for (int jlow=0; jlow<low_size; jlow++) {
+        for (int jbuf=0; jbuf<2; jbuf++) {
+            if (peers[jlow].map_ctx[jbuf]) {
+                mca_smsc->unmap_peer_region(peers[jlow].map_ctx[jbuf]);
+            }
@@ -847,7 +979,6 @@
     free(low_gather_out);
     free(peers);
     free(peer_send_types);
-    free(peer_recv_types);
 
     OPAL_OUTPUT_VERBOSE((40, mca_coll_han_component.han_output,
                          "Alltoall Complete with %d\n",rc));
diff --git a/ompi/mca/coll/han/coll_han_component.c b/ompi/mca/coll/han/coll_han_component.c
index b3362314602..3926faaaac3 100644
--- a/ompi/mca/coll/han/coll_han_component.c
+++ b/ompi/mca/coll/han/coll_han_component.c
@@ -126,9 +126,9 @@ static int han_open(void)
             /* opal_class_t *frag_class */
             OBJ_CLASS(opal_free_list_item_t),
             /* payload_buffer_size, payload_buffer_alignment */
-            COLL_HAN_PACKBUF_PAYLOAD_BYTES, 8,
+            mca_coll_han_component.han_packbuf_bytes, 8,
             /* num_elements_to_alloc, max_elements_to_alloc, num_elements_per_alloc */
-            0, 32, 8,
+            0, mca_coll_han_component.han_packbuf_max_count, 8,
             /* *mpool, rcache_reg_flags, *rcache, */
             NULL, 0, NULL,
             /* fn_t item_init, void *ctx */
@@ -456,6 +456,20 @@ static int han_register(void)
                                            MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
                                            OPAL_INFO_LVL_3, MCA_BASE_VAR_SCOPE_READONLY,
                                            &cs->han_reproducible);
+
+    cs->han_packbuf_bytes = 128*1024;
+    (void) mca_base_component_var_register(c, "packbuf_bytes",
+                                           "The number of bytes in each HAN packbuf.",
+                                           MCA_BASE_VAR_TYPE_INT64_T, NULL, 0, 0,
+                                           OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
+                                           &cs->han_packbuf_bytes);
+    cs->han_packbuf_max_count = 32;
+    (void) mca_base_component_var_register(c, "packbuf_max_count",
+                                           "The maximum number of packbufs that are allowed to be allocated.",
+                                           MCA_BASE_VAR_TYPE_INT64_T, NULL, 0, 0,
+                                           OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
+                                           &cs->han_packbuf_max_count);
+
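With these registrations, the packbuf geometry that used to be hard-coded as
COLL_HAN_PACKBUF_PAYLOAD_BYTES becomes tunable through the usual MCA mechanisms, e.g.
`mpirun --mca coll_han_packbuf_bytes 262144 --mca coll_han_packbuf_max_count 64 ...`
(the values shown are illustrative only). Since han_open() feeds both values into the
pack_buffers free list, they take effect when the component opens and are read-only
afterwards, matching MCA_BASE_VAR_SCOPE_READONLY.
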
     /*
      * Han algorithms MCA parameters for each collective.
      * Shows algorithms thanks to enumerator