diff --git a/ompi/communicator/comm.c b/ompi/communicator/comm.c index c1997d3841f..7126ca486b9 100644 --- a/ompi/communicator/comm.c +++ b/ompi/communicator/comm.c @@ -16,7 +16,7 @@ * Copyright (c) 2011-2013 Inria. All rights reserved. * Copyright (c) 2011-2013 Universite Bordeaux 1 * Copyright (c) 2012 Oak Ridge National Labs. All rights reserved. - * Copyright (c) 2012-2015 Los Alamos National Security, LLC. + * Copyright (c) 2012-2016 Los Alamos National Security, LLC. * All rights reserved. * Copyright (c) 2014-2016 Research Organization for Information Science * and Technology (RIST). All rights reserved. @@ -358,13 +358,7 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group, } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send first */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -374,13 +368,7 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group, newcomp->c_contextid, comm->c_contextid ); /* Activate the communicator and init coll-component */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - NULL, - NULL, - NULL, - mode, - -1 ); + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -609,13 +597,7 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key, } /* Determine context id. 
It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send first, doesn't matter */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -634,36 +616,15 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key, /* Activate the communicator and init coll-component */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - NULL, - NULL, - NULL, - mode, - -1 ); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); exit: - if ( NULL != results ) { - free ( results ); - } - if ( NULL != sorted ) { - free ( sorted ); - } - if ( NULL != rresults) { - free ( rresults ); - } - if ( NULL != rsorted ) { - free ( rsorted ); - } - if ( NULL != lranks ) { - free ( lranks ); - } - if ( NULL != rranks ) { - free ( rranks ); - } + free ( results ); + free ( sorted ); + free ( rresults ); + free ( rsorted ); + free ( lranks ); + free ( rranks ); /* Step 4: if we are not part of the comm, free the struct */ /* --------------------------------------------------------- */ @@ -675,7 +636,7 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key, } *newcomm = newcomp; - return ( rc ); + return rc; } @@ -686,21 +647,32 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key, * Produces an array of ranks that will be part of the local/remote group in the * new communicator. The results array will be modified by this call. 
*/ -static int ompi_comm_split_type_get_part (ompi_group_t *group, int *results, int **ranks_out, int *rank_size) { +static int ompi_comm_split_type_get_part (ompi_group_t *group, const int split_type, int **ranks_out, int *rank_size) { int size = ompi_group_size (group); int my_size = 0; int *ranks; int ret; + ranks = malloc (size * sizeof (int)); + if (OPAL_UNLIKELY(NULL == ranks)) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + for (int i = 0 ; i < size ; ++i) { ompi_proc_t *proc = ompi_group_get_proc_ptr_raw (group, i); uint16_t locality, *u16ptr; - int split_type = results[i * 2]; int include = false; if (ompi_proc_is_sentinel (proc)) { opal_process_name_t proc_name = ompi_proc_sentinel_to_name ((uintptr_t) proc); + if (split_type <= OMPI_COMM_TYPE_HOST) { + /* local ranks should never be represented by sentinel procs. ideally we + * should be able to use OPAL_MODEX_RECV_VALUE_OPTIONAL but it does have + * some overhead. update this to use the optional recv if that is ever fixed. */ + continue; + } + u16ptr = &locality; OPAL_MODEX_RECV_VALUE(ret, OPAL_PMIX_LOCALITY, &proc_name, &u16ptr, OPAL_UINT16); @@ -751,32 +723,22 @@ static int ompi_comm_split_type_get_part (ompi_group_t *group, int *results, int } if (include) { - /* copy data in place */ - results[2*my_size] = i; /* copy rank to break ties */ - results[2*my_size+1] = results[2*i+1]; /* copy key */ - ++my_size; + ranks[my_size++] = i; } } *rank_size = my_size; - /* silence a clang warning about a 0-byte malloc. my_size can not be 0 here */ + /* silence a clang warning about a 0-byte malloc. my_size will never be 0 here */ if (OPAL_UNLIKELY(0 == my_size)) { + free (ranks); return OMPI_SUCCESS; } - /* the new array needs to be sorted so that it is in 'key' order - * if two keys are equal then it is sorted in original rank order! 
*/ - qsort (results, my_size, sizeof(int) * 2, rankkeycompare); - - /* put group elements in a list */ - ranks = (int *) malloc ( my_size * sizeof(int)); - if (NULL == ranks) { - return OMPI_ERR_OUT_OF_RESOURCE; - } - - for (int i = 0 ; i < my_size ; ++i) { - ranks[i] = results[i*2]; + /* shrink the rank array */ + int *tmp = realloc (ranks, my_size * sizeof (int)); + if (OPAL_LIKELY(NULL != tmp)) { + ranks = tmp; } *ranks_out = ranks; @@ -784,208 +746,220 @@ static int ompi_comm_split_type_get_part (ompi_group_t *group, int *results, int return OMPI_SUCCESS; } -int -ompi_comm_split_type(ompi_communicator_t *comm, - int split_type, int key, - ompi_info_t *info, - ompi_communicator_t** newcomm) +static int ompi_comm_split_verify (ompi_communicator_t *comm, int split_type, int key, bool *need_split) { - int myinfo[2]; - int size, my_size; - int my_rsize; - int mode; - int rsize; - int inter; - int *results=NULL; - int *rresults=NULL; - int rc=OMPI_SUCCESS; - ompi_communicator_t *newcomp = NULL; - int *lranks=NULL, *rranks=NULL; - - ompi_comm_allgatherfct *allgatherfct=NULL; - - /* silence clang warning. newcomm should never be NULL */ - if (OPAL_UNLIKELY(NULL == newcomm)) { - return OMPI_ERR_BAD_PARAM; - } - - /* Step 1: determine all the information for the local group */ - /* --------------------------------------------------------- */ - - /* sort according to participation and rank. Gather information from everyone */ - /* allowed splitting types: - CLUSTER - CU - HOST - BOARD - NODE - NUMA - SOCKET - L3CACHE - L2CACHE - L1CACHE - CORE - HWTHREAD - Even though HWTHREAD/CORE etc. is overkill they are here for consistency. - They will most likely return a communicator which is equal to MPI_COMM_SELF - Unless oversubscribing. 
- */ - myinfo[0] = split_type; - myinfo[1] = key; + int rank = ompi_comm_rank (comm); + int size = ompi_comm_size (comm); + int *results; + int rc; - size = ompi_comm_size ( comm ); - inter = OMPI_COMM_IS_INTER(comm); - if ( inter ) { - allgatherfct = (ompi_comm_allgatherfct *)ompi_comm_allgather_emulate_intra; - } else { - allgatherfct = (ompi_comm_allgatherfct *)comm->c_coll.coll_allgather; + if (*need_split) { + return OMPI_SUCCESS; } - results = (int*) malloc ( 2 * size * sizeof(int)); - if ( NULL == results ) { + results = malloc (2 * sizeof (int) * size); + if (OPAL_UNLIKELY(NULL == results)) { return OMPI_ERR_OUT_OF_RESOURCE; } - rc = allgatherfct( myinfo, 2, MPI_INT, results, 2, MPI_INT, comm, comm->c_coll.coll_allgather_module ); - if ( OMPI_SUCCESS != rc ) { - goto exit; + *need_split = false; + + results[rank * 2] = split_type; + results[rank * 2 + 1] = key; + + rc = comm->c_coll.coll_allgather (MPI_IN_PLACE, 2, MPI_INT, results, 2, MPI_INT, comm, + comm->c_coll.coll_allgather_module); + if (OMPI_SUCCESS != rc) { + free (results); + return rc; } - /* check that all processors have been called with the same value */ for (int i = 0 ; i < size ; ++i) { - if ( results[2*i] != split_type && MPI_UNDEFINED != results[2*i] && MPI_UNDEFINED != split_type) { - rc = OMPI_ERR_BAD_PARAM; - goto exit; + if (MPI_UNDEFINED == results[i * 2] || (i > 0 && results[i * 2 + 1] < results[i * 2 - 1])) { + *need_split = true; + break; } } - /* how many are participating and on my node? 
*/ - rc = ompi_comm_split_type_get_part (comm->c_local_group, results, &lranks, &my_size); - if (0 == my_size && MPI_UNDEFINED != split_type) { - /* should never happen */ - rc = OMPI_ERR_BAD_PARAM; + free (results); + + return OMPI_SUCCESS; +} + +int ompi_comm_split_type (ompi_communicator_t *comm, int split_type, int key, + ompi_info_t *info, ompi_communicator_t **newcomm) +{ + bool need_split = false, no_reorder = false, no_undefined = false; + ompi_communicator_t *newcomp = MPI_COMM_NULL; + int my_size, my_rsize = 0, mode, inter; + int *lranks = NULL, *rranks = NULL; + int global_split_type, ok, tmp[4]; + int rc; + + /* silence clang warning. newcomm should never be NULL */ + if (OPAL_UNLIKELY(NULL == newcomm)) { + return OMPI_ERR_BAD_PARAM; } - if (OMPI_SUCCESS != rc) { - goto exit; + + inter = OMPI_COMM_IS_INTER(comm); + + /* Step 1: verify all ranks have supplied the same value for split type. All split types + * must be the same or MPI_UNDEFINED (which is negative). */ + tmp[0] = split_type; + tmp[1] = -split_type; + tmp[2] = key; + tmp[3] = -key; + + rc = comm->c_coll.coll_allreduce (MPI_IN_PLACE, &tmp, 4, MPI_INT, MPI_MAX, comm, + comm->c_coll.coll_allreduce_module); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; } + global_split_type = tmp[0]; - /* Step 2: determine all the information for the remote group */ - /* --------------------------------------------------------- */ - if ( inter ) { - rsize = ompi_group_size (comm->c_remote_group); - rresults = (int *) malloc ( rsize * 2 * sizeof(int)); - if ( NULL == rresults ) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; - } + if (tmp[0] != -tmp[1] || inter) { + /* at least one rank supplied a different split type check if our split_type is ok */ + ok = (MPI_UNDEFINED == split_type) || global_split_type == split_type; - /* this is an allgather on an inter-communicator */ - rc = comm->c_coll.coll_allgather( myinfo, 2, MPI_INT, rresults, 2, - MPI_INT, comm, + rc = comm->c_coll.coll_allreduce 
(MPI_IN_PLACE, &ok, 1, MPI_INT, MPI_MIN, comm, - comm->c_coll.coll_allgather_module); + comm->c_coll.coll_allreduce_module); - if ( OMPI_SUCCESS != rc ) { - goto exit; + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; } - rc = ompi_comm_split_type_get_part (comm->c_remote_group, rresults, &rranks, &my_rsize); - if (OMPI_SUCCESS != rc) { - goto exit; + if (inter) { + /* need an extra allreduce to ensure that all ranks have the same result */ + rc = comm->c_coll.coll_allreduce (MPI_IN_PLACE, &ok, 1, MPI_INT, MPI_MIN, comm, + comm->c_coll.coll_allreduce_module); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } } - mode = OMPI_COMM_CID_INTER; + if (OPAL_UNLIKELY(!ok)) { + return OMPI_ERR_BAD_PARAM; + } + + need_split = tmp[0] == -tmp[1]; } else { - my_rsize = 0; - mode = OMPI_COMM_CID_INTRA; + /* intracommunicator and all ranks specified the same split type */ + no_undefined = true; + /* check if all ranks specified the same key */ + no_reorder = tmp[2] == -tmp[3]; } + if (MPI_UNDEFINED == global_split_type) { + /* short-circuit. every rank provided MPI_UNDEFINED */ + *newcomm = MPI_COMM_NULL; + return OMPI_SUCCESS; + } - /* Step 3: set up the communicator */ + /* Step 2: Build potential communicator groups. If any ranks will not be part of + * the ultimate communicator we will drop them later. This saves doing an extra + * allgather on the whole communicator. By using ompi_comm_split() later only + * if needed we 1) optimize the common case (no MPI_UNDEFINED and no reorder), + * and 2) limit the allgather to a smaller set of peers in the uncommon case. 
*/ /* --------------------------------------------------------- */ - /* Create the communicator finally */ - rc = ompi_comm_set ( &newcomp, /* new comm */ - comm, /* old comm */ - my_size, /* local_size */ - lranks, /* local_ranks */ - my_rsize, /* remote_size */ - rranks, /* remote_ranks */ - NULL, /* attrs */ - comm->error_handler,/* error handler */ - false, /* don't copy the topo */ - NULL, /* local group */ - NULL ); /* remote group */ + /* allowed splitting types: + CLUSTER + CU + HOST + BOARD + NODE + NUMA + SOCKET + L3CACHE + L2CACHE + L1CACHE + CORE + HWTHREAD + Even though HWTHREAD/CORE etc. is overkill they are here for consistency. + They will most likely return a communicator which is equal to MPI_COMM_SELF + Unless oversubscribing. + */ - if ( NULL == newcomp ) { - rc = MPI_ERR_INTERN; - goto exit; - } - if ( OMPI_SUCCESS != rc ) { - goto exit; + /* how many ranks are potentially participating and on my node? */ + rc = ompi_comm_split_type_get_part (comm->c_local_group, global_split_type, &lranks, &my_size); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; } - /* Determine context id. 
It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send first, doesn't matter */ - if ( OMPI_SUCCESS != rc ) { - goto exit; + /* Step 3: determine all the information for the remote group */ + /* --------------------------------------------------------- */ + if (inter) { + rc = ompi_comm_split_type_get_part (comm->c_remote_group, global_split_type, &rranks, &my_rsize); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + free (lranks); + return rc; + } } - /* Set name for debugging purposes */ - snprintf(newcomp->c_name, MPI_MAX_OBJECT_NAME, "MPI COMMUNICATOR %d SPLIT_TYPE FROM %d", - newcomp->c_contextid, comm->c_contextid ); + /* set the CID allgather mode to the appropriate one for the communicator */ + mode = inter ? OMPI_COMM_CID_INTER : OMPI_COMM_CID_INTRA; - /* set the rank to MPI_UNDEFINED. This prevents in comm_activate - * the collective module selection for a communicator that will - * be freed anyway. - */ - if ( MPI_UNDEFINED == split_type ) { - newcomp->c_local_group->grp_my_rank = MPI_UNDEFINED; - } + /* Step 4: set up the communicator */ + /* --------------------------------------------------------- */ + /* Create the communicator finally */ + do { + rc = ompi_comm_set (&newcomp, comm, my_size, lranks, my_rsize, + rranks, NULL, comm->error_handler, false, + NULL, NULL); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + break; + } - /* Activate the communicator and init coll-component */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - NULL, - NULL, - NULL, - mode, - -1 ); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } + /* Determine context id. 
It is identical to f_2_c_handle */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + break; + } - exit: - if ( NULL != results ) { - free ( results ); - } - if ( NULL != rresults) { - free ( rresults ); - } - if ( NULL != lranks ) { - free ( lranks ); - } - if ( NULL != rranks ) { - free ( rranks ); - } + /* Activate the communicator and init coll-component */ + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + break; + } - /* Step 4: if we are not part of the comm, free the struct */ - /* --------------------------------------------------------- */ - if ( NULL != newcomp && MPI_UNDEFINED == split_type ) { - ompi_comm_free ( &newcomp ); - } + /* Step 5: Check if we need to remove or reorder ranks in the communicator */ + if (!(no_reorder && no_undefined)) { + rc = ompi_comm_split_verify (newcomp, split_type, key, &need_split); - *newcomm = newcomp; - return ( rc ); -} + if (inter) { + /* verify that no local ranks need to be removed or reordered */ + rc = ompi_comm_split_verify (newcomp->c_local_comm, split_type, key, &need_split); + } + } + + if (!need_split) { + /* common case. no reordering and no MPI_UNDEFINED */ + *newcomm = newcomp; + /* Set name for debugging purposes */ + snprintf(newcomp->c_name, MPI_MAX_OBJECT_NAME, "MPI COMMUNICATOR %d SPLIT_TYPE FROM %d", + newcomp->c_contextid, comm->c_contextid ); + break; + } + + /* TODO: there is probably a better way to handle this case without throwing away the + * intermediate communicator. 
*/ + rc = ompi_comm_split (newcomp, split_type, key, newcomm, false); + /* get rid of the intermediate communicator */ + ompi_comm_free (&newcomp); + } while (0); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc && MPI_COMM_NULL != newcomp)) { + ompi_comm_free (&newcomp); + *newcomm = MPI_COMM_NULL; + } + + free (lranks); + free (rranks); + + return rc; +} /**********************************************************************/ /**********************************************************************/ @@ -1031,13 +1005,7 @@ int ompi_comm_dup_with_info ( ompi_communicator_t * comm, ompi_info_t *info, omp } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send_first */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1047,13 +1015,7 @@ int ompi_comm_dup_with_info ( ompi_communicator_t * comm, ompi_info_t *info, omp newcomp->c_contextid, comm->c_contextid ); /* activate communicator and init coll-module */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - NULL, - NULL, - NULL, - mode, - -1 ); + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1062,11 +1024,15 @@ int ompi_comm_dup_with_info ( ompi_communicator_t * comm, ompi_info_t *info, omp return MPI_SUCCESS; } -struct ompi_comm_idup_with_info_context { +struct ompi_comm_idup_with_info_context_t { + opal_object_t super; ompi_communicator_t *comm; ompi_communicator_t *newcomp; }; +typedef struct ompi_comm_idup_with_info_context_t ompi_comm_idup_with_info_context_t; +OBJ_CLASS_INSTANCE(ompi_comm_idup_with_info_context_t, opal_object_t, NULL, NULL); + static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request); static int ompi_comm_idup_with_info_finish 
(ompi_comm_request_t *request); static int ompi_comm_idup_getcid (ompi_comm_request_t *request); @@ -1085,7 +1051,7 @@ int ompi_comm_idup_with_info (ompi_communicator_t *comm, ompi_info_t *info, ompi static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *group, ompi_group_t *remote_group, ompi_info_t *info, ompi_communicator_t **newcomm, ompi_request_t **req) { - struct ompi_comm_idup_with_info_context *context; + ompi_comm_idup_with_info_context_t *context; ompi_comm_request_t *request; ompi_request_t *subreq[1]; int rc; @@ -1101,7 +1067,7 @@ static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *gro return OMPI_ERR_OUT_OF_RESOURCE; } - context = calloc (1, sizeof (*context)); + context = OBJ_NEW(ompi_comm_idup_with_info_context_t); if (NULL == context) { ompi_comm_request_return (request); return OMPI_ERR_OUT_OF_RESOURCE; @@ -1109,7 +1075,7 @@ static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *gro context->comm = comm; - request->context = context; + request->context = &context->super; rc = ompi_comm_set_nb (&context->newcomp, /* new comm */ comm, /* old comm */ @@ -1142,8 +1108,8 @@ static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *gro static int ompi_comm_idup_getcid (ompi_comm_request_t *request) { - struct ompi_comm_idup_with_info_context *context = - (struct ompi_comm_idup_with_info_context *) request->context; + ompi_comm_idup_with_info_context_t *context = + (ompi_comm_idup_with_info_context_t *) request->context; ompi_request_t *subreq[1]; int rc, mode; @@ -1154,11 +1120,8 @@ static int ompi_comm_idup_getcid (ompi_comm_request_t *request) } /* Determine context id. 
It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid_nb (context->newcomp, /* new communicator */ - context->comm, /* old comm */ - NULL, /* bridge comm */ - mode, /* mode */ - subreq); /* new subrequest */ + rc = ompi_comm_nextcid_nb (context->newcomp, context->comm, NULL, NULL, + NULL, false, mode, subreq); if (OMPI_SUCCESS != rc) { ompi_comm_request_return (request); return rc; @@ -1171,8 +1134,8 @@ static int ompi_comm_idup_getcid (ompi_comm_request_t *request) static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request) { - struct ompi_comm_idup_with_info_context *context = - (struct ompi_comm_idup_with_info_context *) request->context; + ompi_comm_idup_with_info_context_t *context = + (ompi_comm_idup_with_info_context_t *) request->context; ompi_request_t *subreq[1]; int rc, mode; @@ -1187,7 +1150,7 @@ static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request) context->newcomp->c_contextid, context->comm->c_contextid ); /* activate communicator and init coll-module */ - rc = ompi_comm_activate_nb (&context->newcomp, context->comm, NULL, mode, subreq); + rc = ompi_comm_activate_nb (&context->newcomp, context->comm, NULL, NULL, NULL, false, mode, subreq); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1233,13 +1196,7 @@ int ompi_comm_create_group (ompi_communicator_t *comm, ompi_group_t *group, int } /* Determine context id. 
It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - newcomp, /* bridge comm (used to pass the group into the group allreduce) */ - &tag, /* user defined tag */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send_first */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, &tag, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1249,13 +1206,7 @@ int ompi_comm_create_group (ompi_communicator_t *comm, ompi_group_t *group, int newcomp->c_contextid, comm->c_contextid ); /* activate communicator and init coll-module */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - newcomp, - &tag, - NULL, - mode, - -1 ); + rc = ompi_comm_activate (&newcomp, comm, NULL, &tag, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1924,13 +1875,8 @@ int ompi_comm_enable(ompi_communicator_t *old_comm, int ret = OMPI_SUCCESS; /* Determine context id. It is identical to f_2_c_handle */ - ret = ompi_comm_nextcid ( new_comm, /* new communicator */ - old_comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - OMPI_COMM_CID_INTRA, /* mode */ - -1 ); /* send first, doesn't matter */ + ret = ompi_comm_nextcid (new_comm, old_comm, NULL, NULL, NULL, false, + OMPI_COMM_CID_INTRA); if (OMPI_SUCCESS != ret) { /* something wrong happened while setting the communicator */ goto complete_and_return; @@ -1953,15 +1899,8 @@ int ompi_comm_enable(ompi_communicator_t *old_comm, goto complete_and_return; } - ret = ompi_comm_activate( &new_comm, /* new communicator */ - old_comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - OMPI_COMM_CID_INTRA, /* mode */ - -1 ); /* send first, doesn't matter */ - - + ret = ompi_comm_activate (&new_comm, old_comm, NULL, NULL, NULL, false, + OMPI_COMM_CID_INTRA); if (OMPI_SUCCESS != ret) { /* something wrong happened while setting the communicator */ goto 
complete_and_return; diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c old mode 100644 new mode 100755 index f0c332b5dc1..d91a427c93b --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -14,11 +14,11 @@ * Copyright (c) 2007 Voltaire All rights reserved. * Copyright (c) 2006-2010 University of Houston. All rights reserved. * Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved. - * Copyright (c) 2012-2014 Los Alamos National Security, LLC. All rights + * Copyright (c) 2012-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2012 Oak Ridge National Labs. All rights reserved. - * Copyright (c) 2013-2015 Intel, Inc. All rights reserved. - * Copyright (c) 2014 Research Organization for Information Science + * Copyright (c) 2013-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2016 Research Organization for Information Science * and Technology (RIST). All rights reserved. * $COPYRIGHT$ * @@ -44,7 +44,84 @@ #include "ompi/request/request.h" #include "ompi/runtime/mpiruntime.h" -BEGIN_C_DECLS +struct ompi_comm_cid_context_t; + +typedef int (*ompi_comm_allreduce_impl_fn_t) (int *inbuf, int *outbuf, int count, struct ompi_op_t *op, + struct ompi_comm_cid_context_t *cid_context, + ompi_request_t **req); + + +struct ompi_comm_cid_context_t { + opal_object_t super; + + ompi_communicator_t *newcomm; + ompi_communicator_t **newcommp; + ompi_communicator_t *comm; + ompi_communicator_t *bridgecomm; + + ompi_comm_allreduce_impl_fn_t allreduce_fn; + + int nextcid; + int nextlocal_cid; + int start; + int flag, rflag; + int local_leader; + int remote_leader; + int iter; + /** storage for activate barrier */ + int ok; + char *port_string; + bool send_first; + int pml_tag; + char *pmix_tag; +}; + +typedef struct ompi_comm_cid_context_t ompi_comm_cid_context_t; + +static void mca_comm_cid_context_construct (ompi_comm_cid_context_t *context) +{ + memset ((void *) ((intptr_t) context + sizeof 
(context->super)), 0, sizeof (*context) - sizeof (context->super)); +} + +static void mca_comm_cid_context_destruct (ompi_comm_cid_context_t *context) +{ + free (context->port_string); + free (context->pmix_tag); +} + +OBJ_CLASS_INSTANCE (ompi_comm_cid_context_t, opal_object_t, + mca_comm_cid_context_construct, + mca_comm_cid_context_destruct); + +struct ompi_comm_allreduce_context_t { + opal_object_t super; + + int *inbuf; + int *outbuf; + int count; + struct ompi_op_t *op; + ompi_comm_cid_context_t *cid_context; + int *tmpbuf; + + /* for group allreduce */ + int peers_comm[3]; +}; + +typedef struct ompi_comm_allreduce_context_t ompi_comm_allreduce_context_t; + +static void ompi_comm_allreduce_context_construct (ompi_comm_allreduce_context_t *context) +{ + memset ((void *) ((intptr_t) context + sizeof (context->super)), 0, sizeof (*context) - sizeof (context->super)); +} + +static void ompi_comm_allreduce_context_destruct (ompi_comm_allreduce_context_t *context) +{ + free (context->tmpbuf); +} + +OBJ_CLASS_INSTANCE (ompi_comm_allreduce_context_t, opal_object_t, + ompi_comm_allreduce_context_construct, + ompi_comm_allreduce_context_destruct); /** * These functions make sure, that we determine the global result over @@ -53,90 +130,29 @@ BEGIN_C_DECLS * and a bridge-comm (intercomm-create scenario). 
*/ - -typedef int ompi_comm_cid_allredfct (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - void* lleader, void* rleader, - int send_first, char *tag, int iter ); - -static int ompi_comm_allreduce_intra (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_ledaer, - int send_first, char *tag, int iter ); - -static int ompi_comm_allreduce_inter (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter ); - -static int ompi_comm_allreduce_intra_bridge(int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter); - -static int ompi_comm_allreduce_intra_pmix (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter ); - -static int ompi_comm_allreduce_group (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter); - /* non-blocking intracommunicator allreduce */ -static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, +static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, ompi_request_t **req); /* non-blocking intercommunicator allreduce */ -static int ompi_comm_allreduce_inter_nb (int 
*inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, +static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, ompi_request_t **req); +static int ompi_comm_allreduce_group_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, + ompi_request_t **req); -static int ompi_comm_register_cid (uint32_t contextid); -static int ompi_comm_unregister_cid (uint32_t contextid); -static uint32_t ompi_comm_lowest_cid ( void ); - -struct ompi_comm_reg_t{ - opal_list_item_t super; - uint32_t cid; -}; -typedef struct ompi_comm_reg_t ompi_comm_reg_t; -OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_comm_reg_t); - -static void ompi_comm_reg_constructor(ompi_comm_reg_t *regcom); -static void ompi_comm_reg_destructor(ompi_comm_reg_t *regcom); +static int ompi_comm_allreduce_intra_pmix_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, + ompi_request_t **req); -OBJ_CLASS_INSTANCE (ompi_comm_reg_t, - opal_list_item_t, - ompi_comm_reg_constructor, - ompi_comm_reg_destructor ); +static int ompi_comm_allreduce_intra_bridge_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, + ompi_request_t **req); -static opal_mutex_t ompi_cid_lock; -static opal_list_t ompi_registered_comms; +static opal_mutex_t ompi_cid_lock = OPAL_MUTEX_STATIC_INIT; int ompi_comm_cid_init (void) @@ -144,157 +160,80 @@ int ompi_comm_cid_init (void) return OMPI_SUCCESS; } -int ompi_comm_nextcid ( ompi_communicator_t* newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, int send_first ) +static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + 
const void *arg1, const char *pmix_tag, bool send_first, + int mode) { - int ret; - int nextcid; - bool flag; - int nextlocal_cid; - int done=0; - int response, glresponse=0; - int start; - unsigned int i; - int iter=0; - ompi_comm_cid_allredfct* allredfnct; - - /** - * Determine which implementation of allreduce we have to use - * for the current scenario - */ - - switch (mode) - { - case OMPI_COMM_CID_INTRA: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra; - break; - case OMPI_COMM_CID_INTER: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter; - break; - case OMPI_COMM_CID_INTRA_BRIDGE: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge; - break; - case OMPI_COMM_CID_INTRA_PMIX: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_pmix; - break; - case OMPI_COMM_CID_GROUP: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_group; - break; - default: - return MPI_UNDEFINED; - break; - } + ompi_comm_cid_context_t *context; - ret = ompi_comm_register_cid (comm->c_contextid); - if (OMPI_SUCCESS != ret) { - return ret; + context = OBJ_NEW(ompi_comm_cid_context_t); + if (OPAL_UNLIKELY(NULL == context)) { + return NULL; } - start = ompi_mpi_communicators.lowest_free; - - while (!done) { - /** - * This is the real algorithm described in the doc - */ - OPAL_THREAD_LOCK(&ompi_cid_lock); - if (comm->c_contextid != ompi_comm_lowest_cid() ) { - /* if not lowest cid, we do not continue, but sleep and try again */ - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - continue; - } - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - nextlocal_cid = mca_pml.pml_max_contextid; - flag = false; - for (i=start; i < mca_pml.pml_max_contextid ; i++) { - flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, - i, comm); - if (true == flag) { - nextlocal_cid = i; - break; - } - } - - ret = (allredfnct)(&nextlocal_cid, &nextcid, 1, MPI_MAX, comm, bridgecomm, - local_leader, remote_leader, send_first, "nextcid", iter ); - ++iter; 
- if( OMPI_SUCCESS != ret ) { - opal_pointer_array_set_item(&ompi_mpi_communicators, nextlocal_cid, NULL); - goto release_and_return; - } - - if (mca_pml.pml_max_contextid == (unsigned int) nextcid) { - /* at least one peer ran out of CIDs */ - if (flag) { - opal_pointer_array_set_item(&ompi_mpi_communicators, nextlocal_cid, NULL); - ret = OMPI_ERR_OUT_OF_RESOURCE; - goto release_and_return; - } + context->newcomm = newcomm; + context->comm = comm; + context->bridgecomm = bridgecomm; + context->pml_tag = 0; + + /* Determine which implementation of allreduce we have to use + * for the current mode. */ + switch (mode) { + case OMPI_COMM_CID_INTRA: + context->allreduce_fn = ompi_comm_allreduce_intra_nb; + break; + case OMPI_COMM_CID_INTER: + context->allreduce_fn = ompi_comm_allreduce_inter_nb; + break; + case OMPI_COMM_CID_GROUP: + context->allreduce_fn = ompi_comm_allreduce_group_nb; + context->pml_tag = ((int *) arg0)[0]; + break; + case OMPI_COMM_CID_INTRA_PMIX: + context->allreduce_fn = ompi_comm_allreduce_intra_pmix_nb; + context->local_leader = ((int *) arg0)[0]; + if (arg1) { + context->port_string = strdup ((char *) arg1); } + context->pmix_tag = strdup ((char *) pmix_tag); + break; + case OMPI_COMM_CID_INTRA_BRIDGE: + context->allreduce_fn = ompi_comm_allreduce_intra_bridge_nb; + context->local_leader = ((int *) arg0)[0]; + context->remote_leader = ((int *) arg1)[0]; + break; + default: + OBJ_RELEASE(context); + return NULL; + } + + context->send_first = send_first; + context->iter = 0; + + return context; +} - if (nextcid == nextlocal_cid) { - response = 1; /* fine with me */ - } - else { - opal_pointer_array_set_item(&ompi_mpi_communicators, - nextlocal_cid, NULL); - - flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, - nextcid, comm ); - if (true == flag) { - response = 1; /* works as well */ - } - else { - response = 0; /* nope, not acceptable */ - } - } +static ompi_comm_allreduce_context_t *ompi_comm_allreduce_context_alloc (int 
*inbuf, int *outbuf, + int count, struct ompi_op_t *op, + ompi_comm_cid_context_t *cid_context) +{ + ompi_comm_allreduce_context_t *context; - ret = (allredfnct)(&response, &glresponse, 1, MPI_MIN, comm, bridgecomm, - local_leader, remote_leader, send_first, "nextcid", iter ); - ++iter; - if( OMPI_SUCCESS != ret ) { - opal_pointer_array_set_item(&ompi_mpi_communicators, nextcid, NULL); - goto release_and_return; - } - if (1 == glresponse) { - done = 1; /* we are done */ - break; - } - else if ( 0 == glresponse ) { - if ( 1 == response ) { - /* we could use that, but other don't agree */ - opal_pointer_array_set_item(&ompi_mpi_communicators, - nextcid, NULL); - } - start = nextcid+1; /* that's where we can start the next round */ - } + context = OBJ_NEW(ompi_comm_allreduce_context_t); + if (OPAL_UNLIKELY(NULL == context)) { + return NULL; } - /* set the according values to the newcomm */ - newcomm->c_contextid = nextcid; - opal_pointer_array_set_item (&ompi_mpi_communicators, nextcid, newcomm); - - release_and_return: - ompi_comm_unregister_cid (comm->c_contextid); + context->inbuf = inbuf; + context->outbuf = outbuf; + context->count = count; + context->op = op; + context->cid_context = cid_context; - return ret; + return context; } -/* Non-blocking version of ompi_comm_nextcid */ -struct mca_comm_nextcid_context { - ompi_communicator_t* newcomm; - ompi_communicator_t* comm; - ompi_communicator_t* bridgecomm; - int mode; - int nextcid; - int nextlocal_cid; - int start; - int flag, rflag; -}; - /* find the next available local cid and start an allreduce */ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request); /* verify that the maximum cid is locally available and start an allreduce */ @@ -302,98 +241,97 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request); /* verify that the cid was available globally */ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request); -int ompi_comm_nextcid_nb (ompi_communicator_t* newcomm, - 
ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - int mode, ompi_request_t **req) +static volatile int64_t ompi_comm_cid_lowest_id = INT64_MAX; + +int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, + bool send_first, int mode, ompi_request_t **req) { - struct mca_comm_nextcid_context *context; + ompi_comm_cid_context_t *context; ompi_comm_request_t *request; - int ret; - - /** - * Determine which implementation of allreduce we have to use - * for the current scenario - */ - if (OMPI_COMM_CID_INTRA != mode && OMPI_COMM_CID_INTER != mode) { - return MPI_UNDEFINED; - } - - ret = ompi_comm_register_cid (comm->c_contextid); - if (OMPI_SUCCESS != ret) { - return ret; - } - context = calloc (1, sizeof (*context)); + context = mca_comm_cid_context_alloc (newcomm, comm, bridgecomm, arg0, arg1, + "nextcid", send_first, mode); if (NULL == context) { - ompi_comm_unregister_cid (comm->c_contextid); return OMPI_ERR_OUT_OF_RESOURCE; } + context->start = ompi_mpi_communicators.lowest_free; + request = ompi_comm_request_get (); if (NULL == request) { - ompi_comm_unregister_cid (comm->c_contextid); - free (context); + OBJ_RELEASE(context); return OMPI_ERR_OUT_OF_RESOURCE; } - context->newcomm = newcomm; - context->comm = comm; - context->bridgecomm = bridgecomm; - context->mode = mode; - context->start = ompi_mpi_communicators.lowest_free; - - request->context = context; + request->context = &context->super; ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); ompi_comm_request_start (request); *req = &request->super; + return OMPI_SUCCESS; } +int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, + bool send_first, int mode) +{ + ompi_request_t *req; + int rc; + + rc = ompi_comm_nextcid_nb (newcomm, comm, bridgecomm, arg0, arg1, send_first, 
mode, &req); + if (OMPI_SUCCESS != rc) { + return rc; + } + + ompi_request_wait_completion (req); + rc = req->req_status.MPI_ERROR; + ompi_comm_request_return ((ompi_comm_request_t *) req); + + return rc; +} + static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) { - struct mca_comm_nextcid_context *context = request->context; + ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; + int64_t my_id = ((int64_t) ompi_comm_get_cid (context->comm) << 32 | context->pml_tag); ompi_request_t *subreq; - unsigned int i; bool flag; int ret; - /** - * This is the real algorithm described in the doc - */ - OPAL_THREAD_LOCK(&ompi_cid_lock); - if (context->comm->c_contextid != ompi_comm_lowest_cid() ) { - /* if not lowest cid, we do not continue, but sleep and try again */ - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); + if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) { + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); + } - return OMPI_SUCCESS; + if (ompi_comm_cid_lowest_id < my_id) { + OPAL_THREAD_UNLOCK(&ompi_cid_lock); + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); } - OPAL_THREAD_UNLOCK(&ompi_cid_lock); + ompi_comm_cid_lowest_id = my_id; + + /** + * This is the real algorithm described in the doc + */ flag = false; context->nextlocal_cid = mca_pml.pml_max_contextid; - for (i = context->start ; i < mca_pml.pml_max_contextid ; ++i) { - flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, - i, context->comm); + for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) { + flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i, + context->comm); if (true == flag) { context->nextlocal_cid = i; break; } } - if (context->mode == OMPI_COMM_CID_INTRA) { - ret = ompi_comm_allreduce_intra_nb (&context->nextlocal_cid, 
&context->nextcid, 1, MPI_MAX, - context->comm, context->bridgecomm, &subreq); - } else { - ret = ompi_comm_allreduce_inter_nb (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX, - context->comm, context->bridgecomm, &subreq); - } - + ret = context->allreduce_fn (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX, + context, &subreq); if (OMPI_SUCCESS != ret) { + ompi_comm_cid_lowest_id = INT64_MAX; + OPAL_THREAD_UNLOCK(&ompi_cid_lock); return ret; } @@ -402,58 +340,64 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) if (flag) { opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL); } - + + ompi_comm_cid_lowest_id = INT64_MAX; + OPAL_THREAD_UNLOCK(&ompi_cid_lock); return OMPI_ERR_OUT_OF_RESOURCE; } + OPAL_THREAD_UNLOCK(&ompi_cid_lock); /* next we want to verify that the resulting commid is ok */ - ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1); - - return OMPI_SUCCESS; + return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1); } static int ompi_comm_checkcid (ompi_comm_request_t *request) { - struct mca_comm_nextcid_context *context = request->context; + ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; ompi_request_t *subreq; int ret; + if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) { + return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, NULL, 0); + } + context->flag = (context->nextcid == context->nextlocal_cid); if (!context->flag) { opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL); - context->flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, - context->nextcid, context->comm); + context->flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, + context->nextcid, context->comm); } - if (context->mode == OMPI_COMM_CID_INTRA) { - ret = ompi_comm_allreduce_intra_nb (&context->flag, &context->rflag, 1, MPI_MIN, context->comm, - 
context->bridgecomm, &subreq); - } else { - ret = ompi_comm_allreduce_inter_nb (&context->flag, &context->rflag, 1, MPI_MIN, context->comm, - context->bridgecomm, &subreq); - } + ++context->iter; - if (OMPI_SUCCESS != ret) { - return ret; + ret = context->allreduce_fn (&context->flag, &context->rflag, 1, MPI_MIN, context, &subreq); + if (OMPI_SUCCESS == ret) { + ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1); } - ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1); + OPAL_THREAD_UNLOCK(&ompi_cid_lock); - return OMPI_SUCCESS; + return ret; } static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request) { - struct mca_comm_nextcid_context *context = request->context; + ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; + + if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) { + return ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, NULL, 0); + } if (1 == context->rflag) { /* set the according values to the newcomm */ context->newcomm->c_contextid = context->nextcid; opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, context->newcomm); - ompi_comm_unregister_cid (context->comm->c_contextid); + /* unlock the cid generator */ + ompi_comm_cid_lowest_id = INT64_MAX; + OPAL_THREAD_UNLOCK(&ompi_cid_lock); /* done! 
*/ return OMPI_SUCCESS; @@ -461,118 +405,18 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request) if (1 == context->flag) { /* we could use this cid, but other don't agree */ - opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextcid, NULL); + opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, NULL); context->start = context->nextcid + 1; /* that's where we can start the next round */ } - /* try again */ - return ompi_comm_allreduce_getnextcid (request); -} - -/**************************************************************************/ -/**************************************************************************/ -/**************************************************************************/ -static void ompi_comm_reg_constructor (ompi_comm_reg_t *regcom) -{ - regcom->cid=MPI_UNDEFINED; -} - -static void ompi_comm_reg_destructor (ompi_comm_reg_t *regcom) -{ -} - -void ompi_comm_reg_init (void) -{ - OBJ_CONSTRUCT(&ompi_registered_comms, opal_list_t); - OBJ_CONSTRUCT(&ompi_cid_lock, opal_mutex_t); -} - -void ompi_comm_reg_finalize (void) -{ - OBJ_DESTRUCT(&ompi_registered_comms); - OBJ_DESTRUCT(&ompi_cid_lock); -} - - -static int ompi_comm_register_cid (uint32_t cid) -{ - ompi_comm_reg_t *regcom; - ompi_comm_reg_t *newentry = OBJ_NEW(ompi_comm_reg_t); - bool registered = false; - - do { - /* Only one communicator function allowed in same time on the - * same communicator. - */ - OPAL_THREAD_LOCK(&ompi_cid_lock); - - newentry->cid = cid; - if ( !(opal_list_is_empty (&ompi_registered_comms)) ) { - bool ok = true; - - OPAL_LIST_FOREACH(regcom, &ompi_registered_comms, ompi_comm_reg_t) { - if ( regcom->cid > cid ) { - break; - } -#if OMPI_ENABLE_THREAD_MULTIPLE - if( regcom->cid == cid ) { - /** - * The MPI standard state that is the user responsability to - * schedule the global communications in order to avoid any - * kind of troubles. 
As, managing communicators involve several - * collective communications, we should enforce a sequential - * execution order. This test only allow one communicator - * creation function based on the same communicator. - */ - ok = false; - break; - } -#endif /* OMPI_ENABLE_THREAD_MULTIPLE */ - } - if (ok) { - opal_list_insert_pos (&ompi_registered_comms, (opal_list_item_t *) regcom, - (opal_list_item_t *)newentry); - registered = true; - } - } else { - opal_list_append (&ompi_registered_comms, (opal_list_item_t *)newentry); - registered = true; - } - - /* drop the lock before trying again */ - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - } while (!registered); - - return OMPI_SUCCESS; -} - -static int ompi_comm_unregister_cid (uint32_t cid) -{ - ompi_comm_reg_t *regcom; - - OPAL_THREAD_LOCK(&ompi_cid_lock); - - OPAL_LIST_FOREACH(regcom, &ompi_registered_comms, ompi_comm_reg_t) { - if(regcom->cid == cid) { - opal_list_remove_item(&ompi_registered_comms, (opal_list_item_t *) regcom); - OBJ_RELEASE(regcom); - break; - } - } + ++context->iter; OPAL_THREAD_UNLOCK(&ompi_cid_lock); - return OMPI_SUCCESS; + /* try again */ + return ompi_comm_allreduce_getnextcid (request); } -static uint32_t ompi_comm_lowest_cid (void) -{ - ompi_comm_reg_t *regcom=NULL; - opal_list_item_t *item=opal_list_get_first (&ompi_registered_comms); - - regcom = (ompi_comm_reg_t *)item; - return regcom->cid; -} /**************************************************************************/ /**************************************************************************/ /**************************************************************************/ @@ -590,174 +434,41 @@ static uint32_t ompi_comm_lowest_cid (void) * comm.c is, that this file contains the allreduce implementations * which are required, and thus we avoid having duplicate code... 
*/ -int ompi_comm_activate ( ompi_communicator_t** newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, - int send_first ) -{ - int ret = 0; - - int ok=0, gok=0; - ompi_comm_cid_allredfct* allredfnct; - - /* Step 1: the barrier, after which it is allowed to - * send messages over the new communicator - */ - switch (mode) - { - case OMPI_COMM_CID_INTRA: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra; - break; - case OMPI_COMM_CID_INTER: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter; - break; - case OMPI_COMM_CID_INTRA_BRIDGE: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge; - break; - case OMPI_COMM_CID_INTRA_PMIX: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_pmix; - break; - case OMPI_COMM_CID_GROUP: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_group; - break; - default: - return MPI_UNDEFINED; - break; - } - - if (MPI_UNDEFINED != (*newcomm)->c_local_group->grp_my_rank) { - /* Initialize the PML stuff in the newcomm */ - if ( OMPI_SUCCESS != (ret = MCA_PML_CALL(add_comm(*newcomm))) ) { - goto bail_on_error; - } - - OMPI_COMM_SET_PML_ADDED(*newcomm); - } - - - ret = (allredfnct)(&ok, &gok, 1, MPI_MIN, comm, bridgecomm, - local_leader, remote_leader, send_first, "activate", 0 ); - if( OMPI_SUCCESS != ret ) { - goto bail_on_error; - } - - - - /** - * Check to see if this process is in the new communicator. - * - * Specifically, this function is invoked by all proceses in the - * old communicator, regardless of whether they are in the new - * communicator or not. This is because it is far simpler to use - * MPI collective functions on the old communicator to determine - * some data for the new communicator (e.g., remote_leader) than - * to kludge up our own pseudo-collective routines over just the - * processes in the new communicator. 
Hence, *all* processes in - * the old communicator need to invoke this function. - * - * That being said, only processes in the new communicator need to - * select a coll module for the new communicator. More - * specifically, proceses who are not in the new communicator - * should *not* select a coll module -- for example, - * ompi_comm_rank(newcomm) returns MPI_UNDEFINED for processes who - * are not in the new communicator. This can cause errors in the - * selection / initialization of a coll module. Plus, it's - * wasteful -- processes in the new communicator will end up - * freeing the new communicator anyway, so we might as well leave - * the coll selection as NULL (the coll base comm unselect code - * handles that case properly). - */ - if (MPI_UNDEFINED == (*newcomm)->c_local_group->grp_my_rank) { - return OMPI_SUCCESS; - } - - /* Let the collectives components fight over who will do - collective on this new comm. */ - if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(*newcomm))) { - goto bail_on_error; - } - - /* For an inter communicator, we have to deal with the potential - * problem of what is happening if the local_comm that we created - * has a lower CID than the parent comm. This is not a problem - * as long as the user calls MPI_Comm_free on the inter communicator. - * However, if the communicators are not freed by the user but released - * by Open MPI in MPI_Finalize, we walk through the list of still available - * communicators and free them one by one. Thus, local_comm is freed before - * the actual inter-communicator. However, the local_comm pointer in the - * inter communicator will still contain the 'previous' address of the local_comm - * and thus this will lead to a segmentation violation. In order to prevent - * that from happening, we increase the reference counter local_comm - * by one if its CID is lower than the parent. 
We cannot increase however - * its reference counter if the CID of local_comm is larger than - * the CID of the inter communicators, since a regular MPI_Comm_free would - * leave in that the case the local_comm hanging around and thus we would not - * recycle CID's properly, which was the reason and the cause for this trouble. - */ - if ( OMPI_COMM_IS_INTER(*newcomm)) { - if ( OMPI_COMM_CID_IS_LOWER(*newcomm, comm)) { - OMPI_COMM_SET_EXTRA_RETAIN (*newcomm); - OBJ_RETAIN (*newcomm); - } - } - - - return OMPI_SUCCESS; - - bail_on_error: - OBJ_RELEASE(*newcomm); - *newcomm = MPI_COMM_NULL; - return ret; -} /* Non-blocking version of ompi_comm_activate */ -struct ompi_comm_activate_nb_context { - ompi_communicator_t **newcomm; - ompi_communicator_t *comm; - - /* storage for activate barrier */ - int ok; -}; - static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request); -int ompi_comm_activate_nb (ompi_communicator_t **newcomm, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - int mode, ompi_request_t **req) +int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, bool send_first, int mode, ompi_request_t **req) { - struct ompi_comm_activate_nb_context *context; + ompi_comm_cid_context_t *context; ompi_comm_request_t *request; ompi_request_t *subreq; int ret = 0; - request = ompi_comm_request_get (); - if (NULL == request) { - return OMPI_ERR_OUT_OF_RESOURCE; - } - - context = calloc (1, sizeof (*context)); + context = mca_comm_cid_context_alloc (*newcomm, comm, bridgecomm, arg0, arg1, "activate", + send_first, mode); if (NULL == context) { - ompi_comm_request_return (request); return OMPI_ERR_OUT_OF_RESOURCE; } - context->newcomm = newcomm; - context->comm = comm; + /* keep track of the pointer so it can be set to MPI_COMM_NULL on failure */ + context->newcommp = newcomm; - request->context = context; - - if (OMPI_COMM_CID_INTRA != mode && 
OMPI_COMM_CID_INTER != mode) { - return MPI_UNDEFINED; + request = ompi_comm_request_get (); + if (NULL == request) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; } + request->context = &context->super; + if (MPI_UNDEFINED != (*newcomm)->c_local_group->grp_my_rank) { /* Initialize the PML stuff in the newcomm */ if ( OMPI_SUCCESS != (ret = MCA_PML_CALL(add_comm(*newcomm))) ) { OBJ_RELEASE(newcomm); + OBJ_RELEASE(context); *newcomm = MPI_COMM_NULL; return ret; } @@ -767,14 +478,8 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, /* Step 1: the barrier, after which it is allowed to * send messages over the new communicator */ - if (mode == OMPI_COMM_CID_INTRA) { - ret = ompi_comm_allreduce_intra_nb (&context->ok, &context->ok, 1, MPI_MIN, - context->comm, bridgecomm, &subreq); - } else { - ret = ompi_comm_allreduce_inter_nb (&context->ok, &context->ok, 1, MPI_MIN, - context->comm, bridgecomm, &subreq); - } - + ret = context->allreduce_fn (&context->ok, &context->ok, 1, MPI_MIN, context, + &subreq); if (OMPI_SUCCESS != ret) { ompi_comm_request_return (request); return ret; @@ -788,10 +493,28 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, return OMPI_SUCCESS; } +int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, bool send_first, int mode) +{ + ompi_request_t *req; + int rc; + + rc = ompi_comm_activate_nb (newcomm, comm, bridgecomm, arg0, arg1, send_first, mode, &req); + if (OMPI_SUCCESS != rc) { + return rc; + } + + ompi_request_wait_completion (req); + rc = req->req_status.MPI_ERROR; + ompi_comm_request_return ((ompi_comm_request_t *) req); + + return rc; +} + static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) { - struct ompi_comm_activate_nb_context *context = - (struct ompi_comm_activate_nb_context *) request->context; + ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; int 
ret; /** @@ -818,15 +541,15 @@ static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) * the coll selection as NULL (the coll base comm unselect code * handles that case properly). */ - if (MPI_UNDEFINED == (*context->newcomm)->c_local_group->grp_my_rank) { + if (MPI_UNDEFINED == (context->newcomm)->c_local_group->grp_my_rank) { return OMPI_SUCCESS; } /* Let the collectives components fight over who will do collective on this new comm. */ - if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(*context->newcomm))) { - OBJ_RELEASE(*context->newcomm); - *context->newcomm = MPI_COMM_NULL; + if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(context->newcomm))) { + OBJ_RELEASE(context->newcomm); + *context->newcommp = MPI_COMM_NULL; return ret; } @@ -847,10 +570,10 @@ static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) * leave in that the case the local_comm hanging around and thus we would not * recycle CID's properly, which was the reason and the cause for this trouble. 
*/ - if (OMPI_COMM_IS_INTER(*context->newcomm)) { - if (OMPI_COMM_CID_IS_LOWER(*context->newcomm, context->comm)) { - OMPI_COMM_SET_EXTRA_RETAIN (*context->newcomm); - OBJ_RETAIN (*context->newcomm); + if (OMPI_COMM_IS_INTER(context->newcomm)) { + if (OMPI_COMM_CID_IS_LOWER(context->newcomm, context->comm)) { + OMPI_COMM_SET_EXTRA_RETAIN (context->newcomm); + OBJ_RETAIN (context->newcomm); } } @@ -861,551 +584,472 @@ static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) /**************************************************************************/ /**************************************************************************/ /**************************************************************************/ -/* Arguments not used in this implementation: - * - bridgecomm - * - local_leader - * - remote_leader - * - send_first - */ -static int ompi_comm_allreduce_intra ( int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter ) +static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count, struct ompi_op_t *op, + ompi_comm_cid_context_t *context, ompi_request_t **req) { - return comm->c_coll.coll_allreduce ( inbuf, outbuf, count, MPI_INT, op, comm, - comm->c_coll.coll_allreduce_module ); -} + ompi_communicator_t *comm = context->comm; -static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - ompi_request_t **req) -{ return comm->c_coll.coll_iallreduce (inbuf, outbuf, count, MPI_INT, op, comm, req, comm->c_coll.coll_iallreduce_module); } - -/* Arguments not used in this implementation: - * - bridgecomm - * - local_leader - * - remote_leader - * - send_first - */ -static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t 
*intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter ) -{ - int local_rank, rsize; - int rc; - int *sbuf; - int *tmpbuf=NULL; - int *rcounts=NULL, scount=0; - int *rdisps=NULL; - - if ( !OMPI_COMM_IS_INTER (intercomm)) { - return MPI_ERR_COMM; - } - - /* Allocate temporary arrays */ - rsize = ompi_comm_remote_size (intercomm); - local_rank = ompi_comm_rank ( intercomm ); - - tmpbuf = (int *) malloc ( count * sizeof(int)); - rdisps = (int *) calloc ( rsize, sizeof(int)); - rcounts = (int *) calloc ( rsize, sizeof(int) ); - if ( OPAL_UNLIKELY (NULL == tmpbuf || NULL == rdisps || NULL == rcounts)) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; - } - - /* Execute the inter-allreduce: the result of our group will - be in the buffer of the remote group */ - rc = intercomm->c_coll.coll_allreduce ( inbuf, tmpbuf, count, MPI_INT, - op, intercomm, - intercomm->c_coll.coll_allreduce_module); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - - if ( 0 == local_rank ) { - MPI_Request req; - - /* for the allgatherv later */ - scount = count; - - /* local leader exchange their data and determine the overall result - for both groups */ - rc = MCA_PML_CALL(irecv (outbuf, count, MPI_INT, 0, - OMPI_COMM_ALLREDUCE_TAG, - intercomm, &req)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - rc = MCA_PML_CALL(send (tmpbuf, count, MPI_INT, 0, - OMPI_COMM_ALLREDUCE_TAG, - MCA_PML_BASE_SEND_STANDARD, - intercomm)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - rc = ompi_request_wait ( &req, MPI_STATUS_IGNORE ); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - - ompi_op_reduce (op, tmpbuf, outbuf, count, MPI_INT); - } - - /* distribute the overall result to all processes in the other group. - Instead of using bcast, we are using here allgatherv, to avoid the - possible deadlock. Else, we need an algorithm to determine, - which group sends first in the inter-bcast and which receives - the result first. 
- */ - rcounts[0] = count; - sbuf = outbuf; - rc = intercomm->c_coll.coll_allgatherv (sbuf, scount, MPI_INT, outbuf, - rcounts, rdisps, MPI_INT, - intercomm, - intercomm->c_coll.coll_allgatherv_module); - - exit: - if ( NULL != tmpbuf ) { - free ( tmpbuf ); - } - if ( NULL != rcounts ) { - free ( rcounts ); - } - if ( NULL != rdisps ) { - free ( rdisps ); - } - - return (rc); -} - /* Non-blocking version of ompi_comm_allreduce_inter */ -struct ompi_comm_allreduce_inter_context { - int *inbuf; - int *outbuf; - int count; - struct ompi_op_t *op; - ompi_communicator_t *intercomm; - ompi_communicator_t *bridgecomm; - int *tmpbuf; - int *rcounts; - int *rdisps; -}; - -static void ompi_comm_allreduce_inter_context_free (struct ompi_comm_allreduce_inter_context *context) -{ - if (context->tmpbuf) { - free (context->tmpbuf); - } - - if (context->rdisps) { - free (context->rdisps); - } - - if (context->rcounts) { - free (context->rcounts); - } - - free (context); -} - static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request); static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request); -static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request); -static int ompi_comm_allreduce_inter_allgather_complete (ompi_comm_request_t *request); +static int ompi_comm_allreduce_inter_bcast (ompi_comm_request_t *request); -/* Arguments not used in this implementation: - * - bridgecomm - */ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, + ompi_comm_cid_context_t *cid_context, ompi_request_t **req) { - struct ompi_comm_allreduce_inter_context *context = NULL; - ompi_comm_request_t *request = NULL; + ompi_communicator_t *intercomm = cid_context->comm; + ompi_comm_allreduce_context_t *context; + ompi_comm_request_t *request; ompi_request_t *subreq; - int local_rank, rsize, rc; + int local_rank, rc; - if 
(!OMPI_COMM_IS_INTER (intercomm)) { + if (!OMPI_COMM_IS_INTER (cid_context->comm)) { return MPI_ERR_COMM; } request = ompi_comm_request_get (); - if (NULL == request) { + if (OPAL_UNLIKELY(NULL == request)) { return OMPI_ERR_OUT_OF_RESOURCE; } - context = calloc (1, sizeof (*context)); - if (NULL == context) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); + if (OPAL_UNLIKELY(NULL == context)) { + ompi_comm_request_return (request); + return OMPI_ERR_OUT_OF_RESOURCE; } - context->inbuf = inbuf; - context->outbuf = outbuf; - context->count = count; - context->op = op; - context->intercomm = intercomm; - context->bridgecomm = bridgecomm; + request->context = &context->super; /* Allocate temporary arrays */ - rsize = ompi_comm_remote_size (intercomm); local_rank = ompi_comm_rank (intercomm); - context->tmpbuf = (int *) calloc (count, sizeof(int)); - context->rdisps = (int *) calloc (rsize, sizeof(int)); - context->rcounts = (int *) calloc (rsize, sizeof(int)); - if (OPAL_UNLIKELY (NULL == context->tmpbuf || NULL == context->rdisps || NULL == context->rcounts)) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + if (0 == local_rank) { + context->tmpbuf = (int *) calloc (count, sizeof(int)); + if (OPAL_UNLIKELY (NULL == context->tmpbuf)) { + ompi_comm_request_return (request); + return OMPI_ERR_OUT_OF_RESOURCE; + } } - request->context = context; - /* Execute the inter-allreduce: the result from the local will be in the buffer of the remote group * and vise-versa. 
*/ - rc = intercomm->c_coll.coll_iallreduce (inbuf, context->tmpbuf, count, MPI_INT, op, intercomm, - &subreq, intercomm->c_coll.coll_iallreduce_module); - if (OMPI_SUCCESS != rc) { - goto exit; + rc = intercomm->c_local_comm->c_coll.coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op, 0, + intercomm->c_local_comm, &subreq, + intercomm->c_local_comm->c_coll.coll_ireduce_module); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + ompi_comm_request_return (request); + return rc; } if (0 == local_rank) { ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_exchange, &subreq, 1); } else { - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_allgather, &subreq, 1); + ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_bcast, &subreq, 1); } ompi_comm_request_start (request); *req = &request->super; -exit: - if (OMPI_SUCCESS != rc) { - if (context) { - ompi_comm_allreduce_inter_context_free (context); - } - - if (request) { - request->context = NULL; - ompi_comm_request_return (request); - } - } - - return rc; + return OMPI_SUCCESS; } static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request) { - struct ompi_comm_allreduce_inter_context *context = - (struct ompi_comm_allreduce_inter_context *) request->context; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_communicator_t *intercomm = context->cid_context->comm; ompi_request_t *subreqs[2]; int rc; /* local leader exchange their data and determine the overall result for both groups */ rc = MCA_PML_CALL(irecv (context->outbuf, context->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG, - context->intercomm, subreqs)); - if ( OMPI_SUCCESS != rc ) { - goto exit; + intercomm, subreqs)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; } rc = MCA_PML_CALL(isend (context->tmpbuf, context->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG, - MCA_PML_BASE_SEND_STANDARD, context->intercomm, subreqs + 
1)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_reduce, subreqs, 2); - -exit: - if (OMPI_SUCCESS != rc) { - ompi_comm_allreduce_inter_context_free (context); - request->context = NULL; + MCA_PML_BASE_SEND_STANDARD, intercomm, subreqs + 1)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; } - return rc; + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_reduce, subreqs, 2); } static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request) { - struct ompi_comm_allreduce_inter_context *context = - (struct ompi_comm_allreduce_inter_context *) request->context; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, context->count, MPI_INT); - return ompi_comm_allreduce_inter_allgather (request); + return ompi_comm_allreduce_inter_bcast (request); } -static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request) +static int ompi_comm_allreduce_inter_bcast (ompi_comm_request_t *request) { - struct ompi_comm_allreduce_inter_context *context = - (struct ompi_comm_allreduce_inter_context *) request->context; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_communicator_t *comm = context->cid_context->comm->c_local_comm; ompi_request_t *subreq; - int scount = 0, rc; - - /* distribute the overall result to all processes in the other group. - Instead of using bcast, we are using here allgatherv, to avoid the - possible deadlock. Else, we need an algorithm to determine, - which group sends first in the inter-bcast and which receives - the result first. - */ + int rc; - if (0 != ompi_comm_rank (context->intercomm)) { - context->rcounts[0] = context->count; - } else { - scount = context->count; + /* both roots have the same result. 
broadcast to the local group */ + rc = comm->c_coll.coll_ibcast (context->outbuf, context->count, MPI_INT, 0, comm, + &subreq, comm->c_coll.coll_ibcast_module); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; } - rc = context->intercomm->c_coll.coll_iallgatherv (context->outbuf, scount, MPI_INT, context->outbuf, - context->rcounts, context->rdisps, MPI_INT, - context->intercomm, &subreq, - context->intercomm->c_coll.coll_iallgatherv_module); - if (OMPI_SUCCESS != rc) { - ompi_comm_allreduce_inter_context_free (context); - request->context = NULL; + return ompi_comm_request_schedule_append (request, NULL, &subreq, 1); +} + +static int ompi_comm_allreduce_bridged_schedule_bcast (ompi_comm_request_t *request) +{ + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_communicator_t *comm = context->cid_context->comm; + ompi_request_t *subreq; + int rc; + + rc = comm->c_coll.coll_ibcast (context->outbuf, context->count, MPI_INT, + context->cid_context->local_leader, comm, + &subreq, comm->c_coll.coll_ibcast_module); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { return rc; } - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_allgather_complete, &subreq, 1); + return ompi_comm_request_schedule_append (request, NULL, &subreq, 1); +} - return OMPI_SUCCESS; +static int ompi_comm_allreduce_bridged_xchng_complete (ompi_comm_request_t *request) +{ + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + + /* step 3: reduce leader data */ + ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, context->count, MPI_INT); + + /* schedule the broadcast to local peers */ + return ompi_comm_allreduce_bridged_schedule_bcast (request); } -static int ompi_comm_allreduce_inter_allgather_complete (ompi_comm_request_t *request) +static int ompi_comm_allreduce_bridged_reduce_complete (ompi_comm_request_t *request) { - /* free this request's context */ - 
ompi_comm_allreduce_inter_context_free (request->context); - /* prevent a double-free from the progress engine */ - request->context = NULL; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_communicator_t *bridgecomm = context->cid_context->bridgecomm; + ompi_request_t *subreq[2]; + int rc; - /* done */ - return OMPI_SUCCESS; + /* step 2: leader exchange */ + rc = MCA_PML_CALL(irecv (context->outbuf, context->count, MPI_INT, context->cid_context->remote_leader, + OMPI_COMM_ALLREDUCE_TAG, bridgecomm, subreq + 1)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + + rc = MCA_PML_CALL(isend (context->tmpbuf, context->count, MPI_INT, context->cid_context->remote_leader, + OMPI_COMM_ALLREDUCE_TAG, MCA_PML_BASE_SEND_STANDARD, bridgecomm, + subreq)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_bridged_xchng_complete, subreq, 2); } -/* Arguments not used in this implementation: - * - send_first - */ -static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bcomm, - void* lleader, void* rleader, - int send_first, char *tag, int iter ) +static int ompi_comm_allreduce_intra_bridge_nb (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, + ompi_comm_cid_context_t *cid_context, + ompi_request_t **req) { - int *tmpbuf=NULL; - int local_rank; - int i; + ompi_communicator_t *comm = cid_context->comm; + ompi_comm_allreduce_context_t *context; + int local_rank = ompi_comm_rank (comm); + ompi_comm_request_t *request; + ompi_request_t *subreq; int rc; - int local_leader, remote_leader; - local_leader = (*((int*)lleader)); - remote_leader = (*((int*)rleader)); + context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); + if (OPAL_UNLIKELY(NULL == context)) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + 
if (local_rank == cid_context->local_leader) { + context->tmpbuf = (int *) calloc (count, sizeof (int)); + if (OPAL_UNLIKELY(NULL == context->tmpbuf)) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + } - if ( &ompi_mpi_op_sum.op != op && &ompi_mpi_op_prod.op != op && - &ompi_mpi_op_max.op != op && &ompi_mpi_op_min.op != op ) { - return MPI_ERR_OP; + request = ompi_comm_request_get (); + if (OPAL_UNLIKELY(NULL == request)) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; } - local_rank = ompi_comm_rank ( comm ); - tmpbuf = (int *) malloc ( count * sizeof(int)); - if ( NULL == tmpbuf ) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + request->context = &context->super; + + if (cid_context->local_leader == local_rank) { + memcpy (context->tmpbuf, inbuf, count * sizeof (int)); } - /* Intercomm_create */ - rc = comm->c_coll.coll_allreduce ( inbuf, tmpbuf, count, MPI_INT, - op, comm, comm->c_coll.coll_allreduce_module ); + /* step 1: reduce to the local leader */ + rc = comm->c_coll.coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op, + cid_context->local_leader, comm, &subreq, + comm->c_coll.coll_ireduce_module); if ( OMPI_SUCCESS != rc ) { - goto exit; + ompi_comm_request_return (request); + return rc; } - if (local_rank == local_leader ) { - MPI_Request req; + if (cid_context->local_leader == local_rank) { + rc = ompi_comm_request_schedule_append (request, ompi_comm_allreduce_bridged_reduce_complete, + &subreq, 1); + } else { + /* go ahead and schedule the broadcast */ + ompi_comm_request_schedule_append (request, NULL, &subreq, 1); - rc = MCA_PML_CALL(irecv ( outbuf, count, MPI_INT, remote_leader, - OMPI_COMM_ALLREDUCE_TAG, - bcomm, &req)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - rc = MCA_PML_CALL(send (tmpbuf, count, MPI_INT, remote_leader, - OMPI_COMM_ALLREDUCE_TAG, - MCA_PML_BASE_SEND_STANDARD, bcomm)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - rc = ompi_request_wait( &req, MPI_STATUS_IGNORE); - if ( OMPI_SUCCESS 
!= rc ) { - goto exit; - } + rc = ompi_comm_allreduce_bridged_schedule_bcast (request); + } - if ( &ompi_mpi_op_max.op == op ) { - for ( i = 0 ; i < count; i++ ) { - if (tmpbuf[i] > outbuf[i]) { - outbuf[i] = tmpbuf[i]; - } - } - } - else if ( &ompi_mpi_op_min.op == op ) { - for ( i = 0 ; i < count; i++ ) { - if (tmpbuf[i] < outbuf[i]) { - outbuf[i] = tmpbuf[i]; - } - } - } - else if ( &ompi_mpi_op_sum.op == op ) { - for ( i = 0 ; i < count; i++ ) { - outbuf[i] += tmpbuf[i]; - } - } - else if ( &ompi_mpi_op_prod.op == op ) { - for ( i = 0 ; i < count; i++ ) { - outbuf[i] *= tmpbuf[i]; - } - } + if (OMPI_SUCCESS != rc) { + ompi_comm_request_return (request); + return rc; } - rc = comm->c_coll.coll_bcast ( outbuf, count, MPI_INT, local_leader, - comm, comm->c_coll.coll_bcast_module ); + ompi_comm_request_start (request); - exit: - if (NULL != tmpbuf ) { - free (tmpbuf); - } + *req = &request->super; - return (rc); + return OMPI_SUCCESS; } -/* Arguments not used in this implementation: - * - bridgecomm - * - * lleader is the local rank of root in comm - * rleader is the port_string - */ -static int ompi_comm_allreduce_intra_pmix (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - void* lleader, void* rleader, - int send_first, char *tag, int iter ) +static int ompi_comm_allreduce_pmix_reduce_complete (ompi_comm_request_t *request) { - int *tmpbuf=NULL; - int rc; - int local_leader, local_rank; - char *port_string; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_comm_cid_context_t *cid_context = context->cid_context; + int32_t size_count = context->count; opal_value_t info; opal_pmix_pdata_t pdat; opal_buffer_t sbuf; - int32_t size_count; + int rc; + + OBJ_CONSTRUCT(&sbuf, opal_buffer_t); + + if (OPAL_SUCCESS != (rc = opal_dss.pack(&sbuf, context->tmpbuf, (int32_t)context->count, OPAL_INT))) { + OBJ_DESTRUCT(&sbuf); + fprintf (stderr, "pack 
failed. rc %d\n", rc); + return rc; + } + + OBJ_CONSTRUCT(&info, opal_value_t); + OBJ_CONSTRUCT(&pdat, opal_pmix_pdata_t); + + info.type = OPAL_BYTE_OBJECT; + pdat.value.type = OPAL_BYTE_OBJECT; + + opal_dss.unload(&sbuf, (void**)&info.data.bo.bytes, &info.data.bo.size); + OBJ_DESTRUCT(&sbuf); + + if (cid_context->send_first) { + (void)asprintf(&info.key, "%s:%s:send:%d", cid_context->port_string, cid_context->pmix_tag, + cid_context->iter); + (void)asprintf(&pdat.value.key, "%s:%s:recv:%d", cid_context->port_string, cid_context->pmix_tag, + cid_context->iter); + } else { + (void)asprintf(&info.key, "%s:%s:recv:%d", cid_context->port_string, cid_context->pmix_tag, + cid_context->iter); + (void)asprintf(&pdat.value.key, "%s:%s:send:%d", cid_context->port_string, cid_context->pmix_tag, + cid_context->iter); + } + + /* this macro is not actually non-blocking. if a non-blocking version becomes available this function + * needs to be reworked to take advantage of it. */ + OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60); + OBJ_DESTRUCT(&info); + if (OPAL_SUCCESS != rc) { + OBJ_DESTRUCT(&pdat); + return rc; + } + + OBJ_CONSTRUCT(&sbuf, opal_buffer_t); + opal_dss.load(&sbuf, pdat.value.data.bo.bytes, pdat.value.data.bo.size); + pdat.value.data.bo.bytes = NULL; + pdat.value.data.bo.size = 0; + OBJ_DESTRUCT(&pdat); + + rc = opal_dss.unpack (&sbuf, context->outbuf, &size_count, OPAL_INT); + OBJ_DESTRUCT(&sbuf); + if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) { + return rc; + } + + ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, size_count, MPI_INT); + + return ompi_comm_allreduce_bridged_schedule_bcast (request); +} + +static int ompi_comm_allreduce_intra_pmix_nb (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, + ompi_comm_cid_context_t *cid_context, + ompi_request_t **req) +{ + ompi_communicator_t *comm = cid_context->comm; + ompi_comm_allreduce_context_t *context; + int local_rank = ompi_comm_rank (comm); + ompi_comm_request_t *request; + ompi_request_t 
*subreq; + int rc; + + context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); + if (OPAL_UNLIKELY(NULL == context)) { + return OMPI_ERR_OUT_OF_RESOURCE; + } - local_leader = (*((int*)lleader)); - port_string = (char*)rleader; - size_count = count; + if (cid_context->local_leader == local_rank) { + context->tmpbuf = (int *) calloc (count, sizeof(int)); + if (OPAL_UNLIKELY(NULL == context->tmpbuf)) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + } - local_rank = ompi_comm_rank ( comm ); - tmpbuf = (int *) malloc ( count * sizeof(int)); - if ( NULL == tmpbuf ) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + request = ompi_comm_request_get (); + if (NULL == request) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; } + request->context = &context->super; + /* comm is an intra-communicator */ - rc = comm->c_coll.coll_allreduce(inbuf,tmpbuf,count,MPI_INT,op, comm, - comm->c_coll.coll_allreduce_module); + rc = comm->c_coll.coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op, + cid_context->local_leader, comm, + &subreq, comm->c_coll.coll_ireduce_module); if ( OMPI_SUCCESS != rc ) { - goto exit; + ompi_comm_request_return (request); + return rc; } - if (local_rank == local_leader ) { - OBJ_CONSTRUCT(&sbuf, opal_buffer_t); + if (cid_context->local_leader == local_rank) { + rc = ompi_comm_request_schedule_append (request, ompi_comm_allreduce_pmix_reduce_complete, + &subreq, 1); + } else { + /* go ahead and schedule the broadcast */ + rc = ompi_comm_request_schedule_append (request, NULL, &subreq, 1); - if (OPAL_SUCCESS != (rc = opal_dss.pack(&sbuf, tmpbuf, (int32_t)count, OPAL_INT))) { - goto exit; - } - OBJ_CONSTRUCT(&info, opal_value_t); - OBJ_CONSTRUCT(&pdat, opal_pmix_pdata_t); + rc = ompi_comm_allreduce_bridged_schedule_bcast (request); + } - info.type = OPAL_BYTE_OBJECT; - pdat.value.type = OPAL_BYTE_OBJECT; + if (OMPI_SUCCESS != rc) { + ompi_comm_request_return (request); + return rc; + } - 
opal_dss.unload(&sbuf, (void**)&info.data.bo.bytes, &info.data.bo.size); - OBJ_DESTRUCT(&sbuf); + ompi_comm_request_start (request); + *req = (ompi_request_t *) request; - if (send_first) { - (void)asprintf(&info.key, "%s:%s:send:%d", port_string, tag, iter); - (void)asprintf(&pdat.value.key, "%s:%s:recv:%d", port_string, tag, iter); - } else { - (void)asprintf(&info.key, "%s:%s:recv:%d", port_string, tag, iter); - (void)asprintf(&pdat.value.key, "%s:%s:send:%d", port_string, tag, iter); - } + /* use the same function as bridged to schedule the broadcast */ + return OMPI_SUCCESS; +} - OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60); - OBJ_DESTRUCT(&info); +static int ompi_comm_allreduce_group_broadcast (ompi_comm_request_t *request) +{ + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_comm_cid_context_t *cid_context = context->cid_context; + ompi_request_t *subreq[2]; + int subreq_count = 0; + int rc; - if (OPAL_SUCCESS != rc) { - OBJ_DESTRUCT(&pdat); - goto exit; + for (int i = 0 ; i < 2 ; ++i) { + if (MPI_PROC_NULL != context->peers_comm[i + 1]) { + rc = MCA_PML_CALL(isend(context->outbuf, context->count, MPI_INT, context->peers_comm[i+1], + cid_context->pml_tag, MCA_PML_BASE_SEND_STANDARD, + cid_context->comm, subreq + subreq_count++)); + if (OMPI_SUCCESS != rc) { + return rc; + } } - OBJ_CONSTRUCT(&sbuf, opal_buffer_t); - opal_dss.load(&sbuf, pdat.value.data.bo.bytes, pdat.value.data.bo.size); - pdat.value.data.bo.bytes = NULL; - pdat.value.data.bo.size = 0; - OBJ_DESTRUCT(&pdat); + } + + return ompi_comm_request_schedule_append (request, NULL, subreq, subreq_count); +} - if (OPAL_SUCCESS != (rc = opal_dss.unpack(&sbuf, outbuf, &size_count, OPAL_INT))) { - OBJ_DESTRUCT(&sbuf); - goto exit; +static int ompi_comm_allreduce_group_recv_complete (ompi_comm_request_t *request) +{ + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_comm_cid_context_t *cid_context = 
context->cid_context; + int *tmp = context->tmpbuf; + ompi_request_t *subreq[2]; + int rc; + + for (int i = 0 ; i < 2 ; ++i) { + if (MPI_PROC_NULL != context->peers_comm[i + 1]) { + ompi_op_reduce (context->op, tmp, context->outbuf, context->count, MPI_INT); + tmp += context->count; } - OBJ_DESTRUCT(&sbuf); - count = (int)size_count; - ompi_op_reduce (op, tmpbuf, outbuf, count, MPI_INT); } - rc = comm->c_coll.coll_bcast (outbuf, count, MPI_INT, - local_leader, comm, - comm->c_coll.coll_bcast_module); + if (MPI_PROC_NULL != context->peers_comm[0]) { + /* interior node */ + rc = MCA_PML_CALL(isend(context->outbuf, context->count, MPI_INT, context->peers_comm[0], + cid_context->pml_tag, MCA_PML_BASE_SEND_STANDARD, + cid_context->comm, subreq)); + if (OMPI_SUCCESS != rc) { + return rc; + } + + rc = MCA_PML_CALL(irecv(context->outbuf, context->count, MPI_INT, context->peers_comm[0], + cid_context->pml_tag, cid_context->comm, subreq + 1)); + if (OMPI_SUCCESS != rc) { + return rc; + } - exit: - if (NULL != tmpbuf ) { - free (tmpbuf); + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_group_broadcast, subreq, 2); } - return (rc); + /* root */ + return ompi_comm_allreduce_group_broadcast (request); } -static int ompi_comm_allreduce_group (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *newcomm, - void* local_leader, - void* remote_leader, - int send_first, char *intag, int iter) +static int ompi_comm_allreduce_group_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, + ompi_request_t **req) { - ompi_group_t *group = newcomm->c_local_group; - int peers_group[3], peers_comm[3]; + ompi_group_t *group = cid_context->newcomm->c_local_group; const int group_size = ompi_group_size (group); const int group_rank = ompi_group_rank (group); - int tag = *((int *) local_leader); - int *tmp1; - int i, rc=OMPI_SUCCESS; + ompi_communicator_t *comm = 
cid_context->comm; + int peers_group[3], *tmp, subreq_count = 0; + ompi_comm_allreduce_context_t *context; + ompi_comm_request_t *request; + ompi_request_t *subreq[3]; + + context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); + if (NULL == context) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + tmp = context->tmpbuf = calloc (sizeof (int), count * 3); + if (NULL == context->tmpbuf) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + request = ompi_comm_request_get (); + if (NULL == request) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + request->context = &context->super; /* basic recursive doubling allreduce on the group */ peers_group[0] = group_rank ? ((group_rank - 1) >> 1) : MPI_PROC_NULL; @@ -1413,54 +1057,28 @@ static int ompi_comm_allreduce_group (int *inbuf, int* outbuf, peers_group[2] = (group_rank * 2 + 2) < group_size ? group_rank * 2 + 2 : MPI_PROC_NULL; /* translate the ranks into the ranks of the parent communicator */ - ompi_group_translate_ranks (group, 3, peers_group, comm->c_local_group, peers_comm); - - tmp1 = malloc (sizeof (int) * count); + ompi_group_translate_ranks (group, 3, peers_group, comm->c_local_group, context->peers_comm); /* reduce */ memmove (outbuf, inbuf, sizeof (int) * count); - for (i = 1 ; i < 3 ; ++i) { - if (MPI_PROC_NULL != peers_comm[i]) { - rc = MCA_PML_CALL(recv(tmp1, count, MPI_INT, peers_comm[i], tag, comm, - MPI_STATUS_IGNORE)); + for (int i = 0 ; i < 2 ; ++i) { + if (MPI_PROC_NULL != context->peers_comm[i + 1]) { + int rc = MCA_PML_CALL(irecv(tmp, count, MPI_INT, context->peers_comm[i + 1], + cid_context->pml_tag, comm, subreq + subreq_count++)); if (OMPI_SUCCESS != rc) { - goto out; + ompi_comm_request_return (request); + return rc; } - /* this is integer reduction so we do not care about ordering */ - ompi_op_reduce (op, tmp1, outbuf, count, MPI_INT); - } - } - - if (MPI_PROC_NULL != peers_comm[0]) { - rc = MCA_PML_CALL(send(outbuf, count, MPI_INT, 
peers_comm[0], - tag, MCA_PML_BASE_SEND_STANDARD, comm)); - if (OMPI_SUCCESS != rc) { - goto out; - } - rc = MCA_PML_CALL(recv(outbuf, count, MPI_INT, peers_comm[0], - tag, comm, MPI_STATUS_IGNORE)); - if (OMPI_SUCCESS != rc) { - goto out; + tmp += count; } } - /* broadcast */ - for (i = 1 ; i < 3 ; ++i) { - if (MPI_PROC_NULL != peers_comm[i]) { - rc = MCA_PML_CALL(send(outbuf, count, MPI_INT, peers_comm[i], tag, - MCA_PML_BASE_SEND_STANDARD, comm)); - if (OMPI_SUCCESS != rc) { - goto out; - } - } - } + ompi_comm_request_schedule_append (request, ompi_comm_allreduce_group_recv_complete, subreq, subreq_count); - out: - free (tmp1); + ompi_comm_request_start (request); + *req = &request->super; - return rc; + return OMPI_SUCCESS; } - -END_C_DECLS diff --git a/ompi/communicator/comm_init.c b/ompi/communicator/comm_init.c index b2200bdb71e..f453ca1e8e1 100644 --- a/ompi/communicator/comm_init.c +++ b/ompi/communicator/comm_init.c @@ -206,10 +206,6 @@ int ompi_comm_init(void) OBJ_RETAIN(&ompi_mpi_group_null.group); OBJ_RETAIN(&ompi_mpi_errors_are_fatal.eh); - /* initialize the comm_reg stuff for multi-threaded comm_cid - allocation */ - ompi_comm_reg_init(); - /* initialize communicator requests (for ompi_comm_idup) */ ompi_comm_request_init (); @@ -328,27 +324,15 @@ int ompi_comm_finalize(void) } } - OBJ_DESTRUCT (&ompi_mpi_communicators); OBJ_DESTRUCT (&ompi_comm_f_to_c_table); - /* finalize the comm_reg stuff */ - ompi_comm_reg_finalize(); - /* finalize communicator requests */ ompi_comm_request_fini (); return OMPI_SUCCESS; } -/* - * For linking only. To be checked. 
- */ -int ompi_comm_link_function(void) -{ - return OMPI_SUCCESS; -} - /********************************************************************************/ /********************************************************************************/ /********************************************************************************/ diff --git a/ompi/communicator/comm_request.c b/ompi/communicator/comm_request.c index 496de0319aa..f7372e27452 100644 --- a/ompi/communicator/comm_request.c +++ b/ompi/communicator/comm_request.c @@ -234,6 +234,7 @@ static void ompi_comm_request_destruct (ompi_comm_request_t *request) { OBJ_DESTRUCT(&request->schedule); } + OBJ_CLASS_INSTANCE(ompi_comm_request_t, ompi_request_t, ompi_comm_request_construct, ompi_comm_request_destruct); @@ -257,10 +258,10 @@ ompi_comm_request_t *ompi_comm_request_get (void) void ompi_comm_request_return (ompi_comm_request_t *request) { if (request->context) { - free (request->context); - request->context = NULL; + OBJ_RELEASE (request->context); } + OMPI_REQUEST_FINI(&request->super); opal_free_list_return (&ompi_comm_requests, (opal_free_list_item_t *) request); } diff --git a/ompi/communicator/comm_request.h b/ompi/communicator/comm_request.h index 246a3010b0e..65af613f95f 100644 --- a/ompi/communicator/comm_request.h +++ b/ompi/communicator/comm_request.h @@ -22,7 +22,7 @@ typedef struct ompi_comm_request_t { ompi_request_t super; - void *context; + opal_object_t *context; opal_list_t schedule; } ompi_comm_request_t; OBJ_CLASS_DECLARATION(ompi_comm_request_t); diff --git a/ompi/communicator/communicator.h b/ompi/communicator/communicator.h index 57ef9977fce..a8d4756cffa 100644 --- a/ompi/communicator/communicator.h +++ b/ompi/communicator/communicator.h @@ -351,7 +351,6 @@ static inline bool ompi_comm_peer_invalid(ompi_communicator_t* comm, int peer_id * Initialise MPI_COMM_WORLD and MPI_COMM_SELF */ int ompi_comm_init(void); -OMPI_DECLSPEC int ompi_comm_link_function(void); /** * extract the local group from a 
communicator @@ -493,24 +492,27 @@ ompi_communicator_t* ompi_comm_allocate (int local_group_size, * @param mode: combination of input * OMPI_COMM_CID_INTRA: intra-comm * OMPI_COMM_CID_INTER: inter-comm + * OMPI_COMM_CID_GROUP: only decide CID within the ompi_group_t + * associated with the communicator. arg0 + * must point to an int which will be used + * as the pml tag for communication. * OMPI_COMM_CID_INTRA_BRIDGE: 2 intracomms connected by - * a bridge comm. local_leader - * and remote leader are in this - * case an int (rank in bridge-comm). + * a bridge comm. arg0 and arg1 must point + * to integers representing the local and + * remote leader ranks. the remote leader rank + * is a rank in the bridgecomm. * OMPI_COMM_CID_INTRA_PMIX: 2 intracomms, leaders talk - * through PMIx. lleader and rleader - * are the required contact information. + * through PMIx. arg0 must point to an integer + * representing the local leader rank. arg1 + * must point to a string representing the + * port of the remote leader. * @param send_first: to avoid a potential deadlock for * the OOB version. * This routine has to be thread safe in the final version. */ -OMPI_DECLSPEC int ompi_comm_nextcid ( ompi_communicator_t* newcomm, - ompi_communicator_t* oldcomm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, - int send_first); +OMPI_DECLSPEC int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, + bool send_first, int mode); /** * allocate new communicator ID (non-blocking) @@ -522,10 +524,9 @@ OMPI_DECLSPEC int ompi_comm_nextcid ( ompi_communicator_t* newcomm, * OMPI_COMM_CID_INTER: inter-comm * This routine has to be thread safe in the final version. 
*/ -OMPI_DECLSPEC int ompi_comm_nextcid_nb (ompi_communicator_t* newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - int mode, ompi_request_t **req); +OMPI_DECLSPEC int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, + bool send_first, int mode, ompi_request_t **req); /** * shut down the communicator infrastructure. @@ -618,18 +619,25 @@ int ompi_comm_determine_first ( ompi_communicator_t *intercomm, int high ); -OMPI_DECLSPEC int ompi_comm_activate ( ompi_communicator_t** newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, - int send_first ); +OMPI_DECLSPEC int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, bool send_first, int mode); -OMPI_DECLSPEC int ompi_comm_activate_nb (ompi_communicator_t **newcomm, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - int mode, ompi_request_t **req); +/** + * Non-blocking variant of comm_activate. 
+ * + * @param[inout] newcomm New communicator + * @param[in] comm Parent communicator + * @param[in] bridgecomm Bridge communicator (used for PMIX and bridge modes) + * @param[in] arg0 Mode argument 0 + * @param[in] arg1 Mode argument 1 + * @param[in] send_first Send first from this process (PMIX mode only) + * @param[in] mode Collective mode + * @param[out] req New request object to track this operation + */ +OMPI_DECLSPEC int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, bool send_first, int mode, ompi_request_t **req); /** * a simple function to dump the structure @@ -639,14 +647,6 @@ int ompi_comm_dump ( ompi_communicator_t *comm ); /* setting name */ int ompi_comm_set_name (ompi_communicator_t *comm, const char *name ); -/* - * these are the init and finalize functions for the comm_reg - * stuff. These routines are necessary for handling multi-threading - * scenarious in the communicator_cid allocation - */ -void ompi_comm_reg_init(void); -void ompi_comm_reg_finalize(void); - /* global variable to save the number od dynamic communicators */ extern int ompi_comm_num_dyncomm; diff --git a/ompi/dpm/dpm.c b/ompi/dpm/dpm.c index 2da39d920a4..7619e8a219f 100644 --- a/ompi/dpm/dpm.c +++ b/ompi/dpm/dpm.c @@ -469,25 +469,25 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, new_group_pointer = MPI_GROUP_NULL; /* allocate comm_cid */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old communicator */ - NULL, /* bridge comm */ - &root, /* local leader */ - (void*)port_string, /* rendezvous point */ - OMPI_COMM_CID_INTRA_PMIX, /* mode */ - send_first ); /* send or recv first */ + rc = ompi_comm_nextcid ( newcomp, /* new communicator */ + comm, /* old communicator */ + NULL, /* bridge comm */ + &root, /* local leader */ + (void*)port_string, /* rendezvous point */ + send_first, /* send or recv first */ + 
OMPI_COMM_CID_INTRA_PMIX); /* mode */ if (OMPI_SUCCESS != rc) { goto exit; } /* activate comm and init coll-component */ - rc = ompi_comm_activate ( &newcomp, /* new communicator */ - comm, /* old communicator */ - NULL, /* bridge comm */ - &root, /* local leader */ - (void*)port_string, /* rendezvous point */ - OMPI_COMM_CID_INTRA_PMIX, /* mode */ - send_first ); /* send or recv first */ + rc = ompi_comm_activate ( &newcomp, /* new communicator */ + comm, /* old communicator */ + NULL, /* bridge comm */ + &root, /* local leader */ + (void*)port_string, /* rendezvous point */ + send_first, /* send or recv first */ + OMPI_COMM_CID_INTRA_PMIX); /* mode */ if (OMPI_SUCCESS != rc) { goto exit; } @@ -500,7 +500,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, exit: if (OMPI_SUCCESS != rc) { if (MPI_COMM_NULL != newcomp && NULL != newcomp) { - OBJ_RETAIN(newcomp); + OBJ_RELEASE(newcomp); newcomp = MPI_COMM_NULL; } } diff --git a/ompi/mpi/c/intercomm_create.c b/ompi/mpi/c/intercomm_create.c index b7e948cd624..10f0e8ad2ca 100644 --- a/ompi/mpi/c/intercomm_create.c +++ b/ompi/mpi/c/intercomm_create.c @@ -1,3 +1,4 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana * University Research and Technology @@ -14,6 +15,8 @@ * Copyright (c) 2012-2013 Inria. All rights reserved. * Copyright (c) 2014-2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2016 Los Alamos National Security, LLC. All rights + * reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -196,26 +199,15 @@ int MPI_Intercomm_create(MPI_Comm local_comm, int local_leader, new_group_pointer = MPI_GROUP_NULL; /* Determine context id. 
It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new comm */ - local_comm, /* old comm */ - bridge_comm, /* bridge comm */ - &lleader, /* local leader */ - &rleader, /* remote_leader */ - OMPI_COMM_CID_INTRA_BRIDGE, /* mode */ - -1 ); /* send_first */ - + rc = ompi_comm_nextcid (newcomp, local_comm, bridge_comm, &lleader, + &rleader, false, OMPI_COMM_CID_INTRA_BRIDGE); if ( MPI_SUCCESS != rc ) { goto err_exit; } /* activate comm and init coll-module */ - rc = ompi_comm_activate ( &newcomp, - local_comm, /* old comm */ - bridge_comm, /* bridge comm */ - &lleader, /* local leader */ - &rleader, /* remote_leader */ - OMPI_COMM_CID_INTRA_BRIDGE, /* mode */ - -1 ); /* send_first */ + rc = ompi_comm_activate (&newcomp, local_comm, bridge_comm, &lleader, &rleader, + false, OMPI_COMM_CID_INTRA_BRIDGE); if ( MPI_SUCCESS != rc ) { goto err_exit; } diff --git a/ompi/mpi/c/intercomm_merge.c b/ompi/mpi/c/intercomm_merge.c index bcf8f9fec73..55258b637ef 100644 --- a/ompi/mpi/c/intercomm_merge.c +++ b/ompi/mpi/c/intercomm_merge.c @@ -1,3 +1,4 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana * University Research and Technology @@ -14,6 +15,8 @@ * Copyright (c) 2012-2013 Inria. All rights reserved. * Copyright (c) 2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2016 Los Alamos National Security, LLC. All rights + * reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -115,26 +118,16 @@ int MPI_Intercomm_merge(MPI_Comm intercomm, int high, OBJ_RELEASE(new_group_pointer); new_group_pointer = MPI_GROUP_NULL; - /* Determine context id. 
It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new comm */ - intercomm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - OMPI_COMM_CID_INTER, /* mode */ - -1 ); /* send_first */ + /* Determine context id */ + rc = ompi_comm_nextcid (newcomp, intercomm, NULL, NULL, NULL, false, + OMPI_COMM_CID_INTER); if ( OMPI_SUCCESS != rc ) { goto exit; } /* activate communicator and init coll-module */ - rc = ompi_comm_activate( &newcomp, /* new comm */ - intercomm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - OMPI_COMM_CID_INTER, /* mode */ - -1 ); /* send_first */ + rc = ompi_comm_activate (&newcomp, intercomm, NULL, NULL, NULL, false, + OMPI_COMM_CID_INTER); if ( OMPI_SUCCESS != rc ) { goto exit; } diff --git a/ompi/request/req_wait.c b/ompi/request/req_wait.c index 4a76bd8ab6d..21afa95f348 100644 --- a/ompi/request/req_wait.c +++ b/ompi/request/req_wait.c @@ -196,7 +196,7 @@ int ompi_request_default_wait_all( size_t count, rptr = requests; for (i = 0; i < count; i++) { request = *rptr++; - + if( request->req_state == OMPI_REQUEST_INACTIVE ) { completed++; continue;