Skip to content

mca/fcoll: read_all Receive data without temp buff by using MPI datatypes #8433

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2020 University of Houston. All rights reserved.
* Copyright (c) 2008-2021 University of Houston. All rights reserved.
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
Expand All @@ -30,6 +30,9 @@
#include "ompi/mca/fcoll/base/base.h"
#include "ompi/mca/common/ompio/common_ompio.h"

#define FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG 123
#define INIT_LEN 10

BEGIN_C_DECLS

/* Globally exported variables */
Expand Down
223 changes: 111 additions & 112 deletions ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_read_all.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2015 University of Houston. All rights reserved.
* Copyright (c) 2008-2021 University of Houston. All rights reserved.
* Copyright (c) 2017-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2017 IBM Corporation. All rights reserved.
Expand Down Expand Up @@ -56,7 +56,6 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
struct ompi_datatype_t *datatype,
ompi_status_public_t *status)
{
MPI_Aint position = 0;
MPI_Aint total_bytes = 0; /* total bytes to be read */
MPI_Aint bytes_to_read_in_cycle = 0; /* left to be read in a cycle*/
MPI_Aint bytes_per_cycle = 0; /* total read in each cycle by each process*/
Expand All @@ -75,7 +74,6 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
int iov_index = 0;
size_t current_position = 0;
struct iovec *local_iov_array=NULL, *global_iov_array=NULL;
char *receive_buf = NULL;
MPI_Aint *memory_displacements=NULL;
/* global iovec at the readers that contain the iovecs created from
file_set_view */
Expand All @@ -96,12 +94,12 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
size_t max_data = 0;
MPI_Aint *total_bytes_per_process = NULL;
ompi_datatype_t **sendtype = NULL;
MPI_Request *send_req=NULL, recv_req=NULL;
MPI_Request *send_req = NULL;
MPI_Request recv_req = MPI_REQUEST_NULL;
int my_aggregator =-1;
bool recvbuf_is_contiguous=false;
size_t ftype_size;
ptrdiff_t ftype_extent, lb;

int* blocklength_proc = NULL;
ptrdiff_t* displs_proc = NULL;

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
double read_time = 0.0, start_read_time = 0.0, end_read_time = 0.0;
Expand All @@ -113,32 +111,16 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
/**************************************************************************
** 1. In case the data is not contigous in memory, decode it into an iovec
**************************************************************************/

opal_datatype_type_size ( &datatype->super, &ftype_size );
opal_datatype_get_extent ( &datatype->super, &lb, &ftype_extent );

if ( (ftype_extent == (ptrdiff_t) ftype_size) &&
opal_datatype_is_contiguous_memory_layout(&datatype->super,1) &&
0 == lb ) {
recvbuf_is_contiguous = true;
}


if (! recvbuf_is_contiguous ) {
ret = mca_common_ompio_decode_datatype ((struct ompio_file_t *)fh,
datatype,
count,
buf,
&max_data,
fh->f_mem_convertor,
&decoded_iov,
&iov_count);
if (OMPI_SUCCESS != ret){
goto exit;
}
}
else {
max_data = count * datatype->super.size;
ret = mca_common_ompio_decode_datatype ((struct ompio_file_t *)fh,
datatype,
count,
buf,
&max_data,
fh->f_mem_convertor,
&decoded_iov,
&iov_count);
if (OMPI_SUCCESS != ret){
goto exit;
}

if ( MPI_STATUS_IGNORE != status ) {
Expand Down Expand Up @@ -743,6 +725,7 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
start_rcomm_time = MPI_Wtime();
#endif
for (i=0;i<fh->f_procs_per_group;i++){
size_t datatype_size;
send_req[i] = MPI_REQUEST_NULL;
if ( 0 < disp_index[i] ) {
ompi_datatype_create_hindexed(disp_index[i],
Expand All @@ -751,16 +734,20 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
MPI_BYTE,
&sendtype[i]);
ompi_datatype_commit(&sendtype[i]);
ret = MCA_PML_CALL (isend(global_buf,
1,
sendtype[i],
fh->f_procs_in_group[i],
123,
MCA_PML_BASE_SEND_STANDARD,
fh->f_comm,
&send_req[i]));
if(OMPI_SUCCESS != ret){
goto exit;
opal_datatype_type_size(&sendtype[i]->super, &datatype_size);

if(datatype_size) {
ret = MCA_PML_CALL (isend(global_buf,
1,
sendtype[i],
fh->f_procs_in_group[i],
FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG,
MCA_PML_BASE_SEND_STANDARD,
fh->f_comm,
&send_req[i]));
if(OMPI_SUCCESS != ret){
goto exit;
}
}
}
}
Expand All @@ -773,35 +760,80 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
/**********************************************************
*** 7f. Scatter the Data from the readers
*********************************************************/
if ( recvbuf_is_contiguous ) {
receive_buf = &((char*)buf)[position];
}
else if (bytes_received) {
/* allocate a receive buffer and copy the data that needs
to be received into it in case the data is non-contigous
in memory */
receive_buf = malloc (bytes_received);
if (NULL == receive_buf) {
if(bytes_received) {
size_t remaining = bytes_received;
int block_index = -1;
int blocklength_size = INIT_LEN;

ptrdiff_t recv_mem_address = 0;
ompi_datatype_t *newType = MPI_DATATYPE_NULL;

blocklength_proc = (int *) calloc (blocklength_size, sizeof (int));
displs_proc = (ptrdiff_t *) calloc (blocklength_size, sizeof (ptrdiff_t));

if (NULL == blocklength_proc || NULL == displs_proc ) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
}

while (remaining) {
block_index++;

if(0 == block_index) {
recv_mem_address = (ptrdiff_t) (decoded_iov[iov_index].iov_base) + current_position;
}
else {
// Reallocate more memory if blocklength_size is not enough
if(0 == block_index % INIT_LEN) {
blocklength_size += INIT_LEN;
blocklength_proc = (int *) realloc(blocklength_proc, blocklength_size * sizeof(int));
displs_proc = (ptrdiff_t *) realloc(displs_proc, blocklength_size * sizeof(ptrdiff_t));
}
displs_proc[block_index] = (ptrdiff_t) (decoded_iov[iov_index].iov_base) +
current_position - recv_mem_address;
}

if (remaining >= (decoded_iov[iov_index].iov_len - current_position)) {
blocklength_proc[block_index] = decoded_iov[iov_index].iov_len - current_position;

remaining = remaining - blocklength_proc[block_index];
iov_index = iov_index + 1;
current_position = 0;
}
else {
blocklength_proc[block_index] = remaining;
current_position += remaining;
remaining = 0;
}
}

ompi_datatype_create_hindexed(block_index+1,
blocklength_proc,
displs_proc,
MPI_BYTE,
&newType);
ompi_datatype_commit(&newType);

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_rcomm_time = MPI_Wtime();
start_rcomm_time = MPI_Wtime();
#endif
ret = MCA_PML_CALL(irecv(receive_buf,
bytes_received,
MPI_BYTE,
my_aggregator,
123,
fh->f_comm,
&recv_req));
if (OMPI_SUCCESS != ret){
goto exit;
}
ret = MCA_PML_CALL(irecv((char *)recv_mem_address,
1,
newType,
my_aggregator,
FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG,
fh->f_comm,
&recv_req));

if ( MPI_DATATYPE_NULL != newType ) {
ompi_datatype_destroy(&newType);
}

if (OMPI_SUCCESS != ret){
goto exit;
}
}

if (my_aggregator == fh->f_rank){
ret = ompi_request_wait_all (fh->f_procs_per_group,
Expand All @@ -816,50 +848,12 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
if (OMPI_SUCCESS != ret){
goto exit;
}
position += bytes_received;

/* If data is not contigous in memory, copy the data from the
receive buffer into the buffer passed in */
if (!recvbuf_is_contiguous ) {
ptrdiff_t mem_address;
size_t remaining = 0;
size_t temp_position = 0;

remaining = bytes_received;

while (remaining) {
mem_address = (ptrdiff_t)
(decoded_iov[iov_index].iov_base) + current_position;

if (remaining >=
(decoded_iov[iov_index].iov_len - current_position)) {
memcpy ((IOVBASE_TYPE *) mem_address,
receive_buf+temp_position,
decoded_iov[iov_index].iov_len - current_position);
remaining = remaining -
(decoded_iov[iov_index].iov_len - current_position);
temp_position = temp_position +
(decoded_iov[iov_index].iov_len - current_position);
iov_index = iov_index + 1;
current_position = 0;
}
else {
memcpy ((IOVBASE_TYPE *) mem_address,
receive_buf+temp_position,
remaining);
current_position = current_position + remaining;
remaining = 0;
}
}

if (NULL != receive_buf) {
free (receive_buf);
receive_buf = NULL;
}
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_rcomm_time = MPI_Wtime();
rcomm_time += end_rcomm_time - start_rcomm_time;
if(bytes_received) {
end_rcomm_time = MPI_Wtime();
rcomm_time += end_rcomm_time - start_rcomm_time;
}
#endif
} /* end for (index=0; index < cycles; index ++) */

Expand All @@ -881,12 +875,6 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
#endif

exit:
if (!recvbuf_is_contiguous) {
if (NULL != receive_buf) {
free (receive_buf);
receive_buf = NULL;
}
}
if (NULL != global_buf) {
free (global_buf);
global_buf = NULL;
Expand Down Expand Up @@ -916,6 +904,17 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
free (displs);
displs = NULL;
}

if (NULL != blocklength_proc) {
free (blocklength_proc);
blocklength_proc = NULL;
}

if (NULL != displs_proc) {
free (displs_proc);
displs_proc = NULL;
}

if (my_aggregator == fh->f_rank) {

if (NULL != sorted_file_offsets){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2020 University of Houston. All rights reserved.
* Copyright (c) 2008-2021 University of Houston. All rights reserved.
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2017 IBM Corporation. All rights reserved.
Expand All @@ -36,8 +36,6 @@


#define DEBUG_ON 0
#define FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG 123
#define INIT_LEN 10

/*Used for loading file-offsets per aggregator*/
typedef struct mca_io_ompio_local_io_array{
Expand Down
5 changes: 4 additions & 1 deletion ompi/mca/fcoll/vulcan/fcoll_vulcan.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2017 University of Houston. All rights reserved.
* Copyright (c) 2008-2021 University of Houston. All rights reserved.
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
Expand All @@ -30,6 +30,9 @@
#include "ompi/mca/fcoll/base/base.h"
#include "ompi/mca/common/ompio/common_ompio.h"

#define FCOLL_VULCAN_SHUFFLE_TAG 123
#define INIT_LEN 10

BEGIN_C_DECLS

/* Globally exported variables */
Expand Down
Loading