From 68912d04a876bc29644f30f9a916e001a2fba92f Mon Sep 17 00:00:00 2001
From: Ralph Castain
Date: Sun, 31 Jan 2016 20:29:55 -0800
Subject: [PATCH] Fix the grpcomm operations at scale.

Restore the direct component to be the default, and to execute a rollup
collective. This may in fact be faster than the alternatives, and something
appears broken at scale when using brks in particular. Turn off the rcd and
brks components as they don't work at scale right now - they can be restored
at some future point when someone can debug them.

Adjust to Jeff's quibbles

Fixes open-mpi/ompi#1215
---
 orte/mca/grpcomm/base/grpcomm_base_frame.c     |   3 +-
 orte/mca/grpcomm/base/grpcomm_base_stubs.c     |  50 +++++++-
 orte/mca/grpcomm/brks/.opal_ignore             |   0
 orte/mca/grpcomm/direct/grpcomm_direct.c       | 108 +++++++++---------
 .../grpcomm/direct/grpcomm_direct_component.c  |   4 +-
 orte/mca/grpcomm/grpcomm.h                     |   2 +
 orte/mca/grpcomm/rcd/.opal_ignore              |   0
 orte/mca/odls/base/odls_base_default_fns.c     |   8 +-
 8 files changed, 116 insertions(+), 59 deletions(-)
 create mode 100644 orte/mca/grpcomm/brks/.opal_ignore
 create mode 100644 orte/mca/grpcomm/rcd/.opal_ignore

diff --git a/orte/mca/grpcomm/base/grpcomm_base_frame.c b/orte/mca/grpcomm/base/grpcomm_base_frame.c
index c6362c71622..242e4410f0a 100644
--- a/orte/mca/grpcomm/base/grpcomm_base_frame.c
+++ b/orte/mca/grpcomm/base/grpcomm_base_frame.c
@@ -12,7 +12,7 @@
  * All rights reserved.
  * Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights
  * reserved.
- * Copyright (c) 2014 Intel, Inc. All rights reserved.
+ * Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
  * Copyright (c) 2015 Research Organization for Information Science
  * and Technology (RIST). All rights reserved.
  * $COPYRIGHT$
@@ -122,6 +122,7 @@ static void ccon(orte_grpcomm_coll_t *p)
     OBJ_CONSTRUCT(&p->distance_mask_recv, opal_bitmap_t);
     p->dmns = NULL;
     p->ndmns = 0;
+    p->nexpected = 0;
     p->nreported = 0;
     p->cbfunc = NULL;
     p->cbdata = NULL;
diff --git a/orte/mca/grpcomm/base/grpcomm_base_stubs.c b/orte/mca/grpcomm/base/grpcomm_base_stubs.c
index 9e2b6572878..621b645da2b 100644
--- a/orte/mca/grpcomm/base/grpcomm_base_stubs.c
+++ b/orte/mca/grpcomm/base/grpcomm_base_stubs.c
@@ -157,7 +157,7 @@ static void allgather_stub(int fd, short args, void *cbdata)
         ret = opal_hash_table_set_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), (void *)&cd->sig->seq_num);
         if (OPAL_SUCCESS != ret) {
             OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
-                         "%s rpcomm:base:allgather can't not add new signature to hash table",
+                         "%s rpcomm:base:allgather cannot add new signature to hash table",
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
             ORTE_ERROR_LOG(ret);
             OBJ_RELEASE(cd);
@@ -208,6 +208,9 @@ orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig
 {
     orte_grpcomm_coll_t *coll;
     int rc;
+    orte_namelist_t *nm;
+    opal_list_t children;
+    size_t n;
 
     /* search the existing tracker list to see if this already exists */
     OPAL_LIST_FOREACH(coll, &orte_grpcomm_base.ongoing, orte_grpcomm_coll_t) {
@@ -254,6 +257,30 @@ orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig
         ORTE_ERROR_LOG(rc);
         return NULL;
     }
+    /* cycle thru the array of daemons and compare them to our
+     * children in the routing tree, counting the ones that match
+     * so we know how many daemons we should receive contributions from */
+    OBJ_CONSTRUCT(&children, opal_list_t);
+    orte_routed.get_routing_list(&children);
+    while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&children))) {
+        for (n=0; n < coll->ndmns; n++) {
+            if (nm->name.vpid == coll->dmns[n]) {
+                coll->nexpected++;
+                break;
+            }
+        }
+        OBJ_RELEASE(nm);
+    }
+    OPAL_LIST_DESTRUCT(&children);
+    /* see if I am in the array of participants - note that I may
+     * be in the rollup tree even though I'm not participating
+     * in the collective itself */
+    for (n=0; n < coll->ndmns; n++) {
+        if (coll->dmns[n] == ORTE_PROC_MY_NAME->vpid) {
+            coll->nexpected++;
+            break;
+        }
+    }
     return coll;
 }
 
@@ -292,6 +319,9 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
         /* all daemons hosting this jobid are participating */
         if (NULL == (jdata = orte_get_job_data_object(sig->signature[0].jobid))) {
             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
+            ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
+            *ndmns = 0;
+            *dmns = NULL;
             return ORTE_ERR_NOT_FOUND;
         }
         if (NULL == jdata->map) {
@@ -326,7 +356,10 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
                 /* should never happen */
                 ORTE_ERROR_LOG(ORTE_ERROR);
                 free(dns);
-                return ORTE_ERROR;
+                ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
+                *ndmns = 0;
+                *dmns = NULL;
+                return ORTE_ERR_NOT_FOUND;
             }
             OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
                                  "%s grpcomm:base:create_dmns adding daemon %s to array",
@@ -343,6 +376,9 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
             if (NULL == (jdata = orte_get_job_data_object(sig->signature[n].jobid))) {
                 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
                 OPAL_LIST_DESTRUCT(&ds);
+                ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
+                *ndmns = 0;
+                *dmns = NULL;
                 return ORTE_ERR_NOT_FOUND;
             }
             opal_output_verbose(5, orte_grpcomm_base_framework.framework_output,
@@ -352,12 +388,17 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
             if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, sig->signature[n].vpid))) {
                 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
                 OPAL_LIST_DESTRUCT(&ds);
+                ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
+                *ndmns = 0;
+                *dmns = NULL;
                 return ORTE_ERR_NOT_FOUND;
             }
             if (NULL == proc->node || NULL == proc->node->daemon) {
                 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
                 OPAL_LIST_DESTRUCT(&ds);
                 ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
+                *ndmns = 0;
+                *dmns = NULL;
                 return ORTE_ERR_NOT_FOUND;
             }
             vpid = proc->node->daemon->name.vpid;
@@ -377,7 +418,10 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
         if (0 == opal_list_get_size(&ds)) {
             ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
             OPAL_LIST_DESTRUCT(&ds);
-            return ORTE_ERR_BAD_PARAM;
+            ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
+            *ndmns = 0;
+            *dmns = NULL;
+            return ORTE_ERR_NOT_FOUND;
         }
         dns = (orte_vpid_t*)malloc(opal_list_get_size(&ds) * sizeof(orte_vpid_t));
         nds = 0;
diff --git a/orte/mca/grpcomm/brks/.opal_ignore b/orte/mca/grpcomm/brks/.opal_ignore
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/orte/mca/grpcomm/direct/grpcomm_direct.c b/orte/mca/grpcomm/direct/grpcomm_direct.c
index efc46712307..4fc737865c2 100644
--- a/orte/mca/grpcomm/direct/grpcomm_direct.c
+++ b/orte/mca/grpcomm/direct/grpcomm_direct.c
@@ -5,7 +5,7 @@
  * Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
  * Copyright (c) 2011-2013 Los Alamos National Security, LLC. All
  * rights reserved.
- * Copyright (c) 2014-2015 Intel, Inc. All rights reserved.
+ * Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
  * Copyright (c) 2014 Research Organization for Information Science
  * and Technology (RIST). All rights reserved.
  * $COPYRIGHT$
@@ -124,7 +124,7 @@ static int xcast(orte_vpid_t *vpids,
 static int allgather(orte_grpcomm_coll_t *coll,
                      opal_buffer_t *buf)
 {
-    int rc, ret;
+    int rc;
     opal_buffer_t *relay;
 
     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
@@ -143,35 +143,16 @@ static int allgather(orte_grpcomm_coll_t *coll,
         return rc;
     }
 
-    /* if we are the HNP and nobody else is participating,
-     * then just execute the xcast */
-    if (ORTE_PROC_IS_HNP && 1 == coll->ndmns) {
-        /* pack the status - success since the allgather completed. This
-         * would be an error if we timeout instead */
-        ret = ORTE_SUCCESS;
-        if (OPAL_SUCCESS != (rc = opal_dss.pack(relay, &ret, 1, OPAL_INT))) {
-            ORTE_ERROR_LOG(rc);
-            OBJ_RELEASE(relay);
-            return rc;
-        }
-        /* pass along the payload */
-        opal_dss.copy_payload(relay, buf);
-        orte_grpcomm.xcast(coll->sig, ORTE_RML_TAG_COLL_RELEASE, relay);
-        OBJ_RELEASE(relay);
-        return ORTE_SUCCESS;
-    }
-
     /* pass along the payload */
     opal_dss.copy_payload(relay, buf);
 
-    /* otherwise, we need to send this to the HNP for
-     * processing */
+    /* send this to ourselves for processing */
     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
-                         "%s grpcomm:direct:allgather sending to HNP",
+                         "%s grpcomm:direct:allgather sending to ourself",
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 
-    /* send the info to the HNP for tracking */
-    rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, relay,
+    /* send the info to ourselves for tracking */
+    rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_NAME, relay,
                                  ORTE_RML_TAG_ALLGATHER_DIRECT,
                                  orte_rml_send_callback, NULL);
     return rc;
@@ -212,35 +193,60 @@ static void allgather_recv(int status, orte_process_name_t* sender,
     opal_dss.copy_payload(&coll->bucket, buffer);
 
     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
-                         "%s grpcomm:direct allgather recv ndmns %d nrep %d",
+                         "%s grpcomm:direct allgather recv nexpected %d nrep %d",
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
-                         (int)coll->ndmns, (int)coll->nreported));
-
-    /* if all participating daemons have reported */
-    if (coll->ndmns == coll->nreported) {
-        reply = OBJ_NEW(opal_buffer_t);
-        /* pack the signature */
-        if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
-            ORTE_ERROR_LOG(rc);
-            OBJ_RELEASE(reply);
-            OBJ_RELEASE(sig);
-            return;
-        }
-        /* pack the status - success since the allgather completed. This
-         * would be an error if we timeout instead */
-        ret = ORTE_SUCCESS;
-        if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
-            ORTE_ERROR_LOG(rc);
+                         (int)coll->nexpected, (int)coll->nreported));
+
+    /* see if everyone has reported */
+    if (coll->nreported == coll->nexpected) {
+        if (ORTE_PROC_IS_HNP) {
+            OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
+                                 "%s grpcomm:direct allgather HNP reports complete",
+                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+            /* the allgather is complete - send the xcast */
+            reply = OBJ_NEW(opal_buffer_t);
+            /* pack the signature */
+            if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
+                ORTE_ERROR_LOG(rc);
+                OBJ_RELEASE(reply);
+                OBJ_RELEASE(sig);
+                return;
+            }
+            /* pack the status - success since the allgather completed. This
+             * would be an error if we timeout instead */
+            ret = ORTE_SUCCESS;
+            if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
+                ORTE_ERROR_LOG(rc);
+                OBJ_RELEASE(reply);
+                OBJ_RELEASE(sig);
+                return;
+            }
+            /* transfer the collected bucket */
+            opal_dss.copy_payload(reply, &coll->bucket);
+            /* send the release via xcast */
+            (void)orte_grpcomm.xcast(sig, ORTE_RML_TAG_COLL_RELEASE, reply);
             OBJ_RELEASE(reply);
-            OBJ_RELEASE(sig);
-            return;
+        } else {
+            OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
+                                 "%s grpcomm:direct allgather rollup complete - sending to %s",
+                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
+                                 ORTE_NAME_PRINT(ORTE_PROC_MY_PARENT)));
+            /* relay the bucket upward */
+            reply = OBJ_NEW(opal_buffer_t);
+            /* pack the signature */
+            if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
+                ORTE_ERROR_LOG(rc);
+                OBJ_RELEASE(reply);
+                OBJ_RELEASE(sig);
+                return;
+            }
+            /* transfer the collected bucket */
+            opal_dss.copy_payload(reply, &coll->bucket);
+            /* send the info to our parent */
+            rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_PARENT, reply,
+                                         ORTE_RML_TAG_ALLGATHER_DIRECT,
+                                         orte_rml_send_callback, NULL);
         }
-        /* transfer the collected bucket */
-        opal_dss.copy_payload(reply, &coll->bucket);
-
-        /* send the release via xcast */
-        (void)orte_grpcomm.xcast(sig, ORTE_RML_TAG_COLL_RELEASE, reply);
-        OBJ_RELEASE(reply);
     }
     OBJ_RELEASE(sig);
 }
diff --git a/orte/mca/grpcomm/direct/grpcomm_direct_component.c b/orte/mca/grpcomm/direct/grpcomm_direct_component.c
index ac4b6e693f3..3c6cad000d4 100644
--- a/orte/mca/grpcomm/direct/grpcomm_direct_component.c
+++ b/orte/mca/grpcomm/direct/grpcomm_direct_component.c
@@ -1,7 +1,7 @@
 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
 /*
  * Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
- * Copyright (c) 2011-2015 Los Alamos National Security, LLC. All rights
+ * Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights
  * reserved.
  * Copyright (c) 2014 Intel, Inc. All rights reserved.
  * $COPYRIGHT$
@@ -55,7 +55,7 @@ static int direct_register(void)
 
     /* make the priority adjustable so users can select
      * direct for use by apps without affecting daemons */
-    my_priority = 1;
+    my_priority = 85;
     (void) mca_base_component_var_register(c, "priority",
                                            "Priority of the grpcomm direct component",
                                            MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
diff --git a/orte/mca/grpcomm/grpcomm.h b/orte/mca/grpcomm/grpcomm.h
index f3be029c0e2..00ddccacc42 100644
--- a/orte/mca/grpcomm/grpcomm.h
+++ b/orte/mca/grpcomm/grpcomm.h
@@ -77,6 +77,8 @@ typedef struct {
     size_t ndmns;
     /** my index in the dmns array */
     unsigned long my_rank;
+    /* number of buckets expected */
+    size_t nexpected;
     /* number reported in */
     size_t nreported;
     /* distance masks for receive */
diff --git a/orte/mca/grpcomm/rcd/.opal_ignore b/orte/mca/grpcomm/rcd/.opal_ignore
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/orte/mca/odls/base/odls_base_default_fns.c b/orte/mca/odls/base/odls_base_default_fns.c
index bed85583ff5..fc1c758d4b6 100644
--- a/orte/mca/odls/base/odls_base_default_fns.c
+++ b/orte/mca/odls/base/odls_base_default_fns.c
@@ -14,7 +14,7 @@
  * Copyright (c) 2011-2013 Los Alamos National Security, LLC.
  * All rights reserved.
  * Copyright (c) 2011-2013 Cisco Systems, Inc. All rights reserved.
- * Copyright (c) 2013-2015 Intel, Inc. All rights reserved.
+ * Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
  * Copyright (c) 2014 Research Organization for Information Science
  * and Technology (RIST). All rights reserved.
  * $COPYRIGHT$
@@ -249,6 +249,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
     orte_app_context_t *app;
     bool found;
     orte_node_t *node;
+    bool newmap = false;
 
     OPAL_OUTPUT_VERBOSE((5, orte_odls_base_framework.framework_output,
                          "%s odls:constructing child list",
@@ -398,6 +399,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
     /* ensure the map object is present */
     if (NULL == jdata->map) {
         jdata->map = OBJ_NEW(orte_job_map_t);
+        newmap = true;
     }
 
     /* if we have a file map, then we need to load it */
@@ -455,7 +457,9 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
             if (!found) {
                 OBJ_RETAIN(dmn->node);
                 opal_pointer_array_add(jdata->map->nodes, dmn->node);
-                jdata->map->num_nodes++;
+                if (newmap) {
+                    jdata->map->num_nodes++;
+                }
             }
             /* see if it belongs to us */
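
Illustration (not part of the patch): the rollup collective above sizes each daemon's expected
contribution count from its routing-tree children plus itself. Below is a minimal, standalone C
sketch of just that counting step; the plain arrays and the helper name count_expected are
stand-ins for the ORTE routing list and the coll->dmns participant array, so treat it as an
assumption-laden sketch rather than ORTE code.

#include <stdio.h>
#include <stddef.h>

/* Sketch: a daemon expects one bucket from each routing-tree child that is
 * also a participant in the collective, plus one from itself if it
 * participates. Plain vpid arrays stand in for the ORTE structures. */
static size_t count_expected(const unsigned *participants, size_t nparts,
                             const unsigned *children, size_t nchildren,
                             unsigned my_vpid)
{
    size_t expected = 0;

    /* children that are also participants */
    for (size_t c = 0; c < nchildren; c++) {
        for (size_t n = 0; n < nparts; n++) {
            if (children[c] == participants[n]) {
                expected++;
                break;
            }
        }
    }
    /* my own contribution, if I am a participant */
    for (size_t n = 0; n < nparts; n++) {
        if (participants[n] == my_vpid) {
            expected++;
            break;
        }
    }
    return expected;
}

int main(void)
{
    /* daemons 0-4 participate; daemon 1 has routing children 3 and 4 */
    const unsigned participants[] = { 0, 1, 2, 3, 4 };
    const unsigned children[] = { 3, 4 };

    printf("daemon 1 expects %zu buckets\n",
           count_expected(participants, 5, children, 2, 1));   /* prints 3 */
    return 0;
}

Once nreported reaches this count, allgather_recv() in the diff either xcasts the release
(on the HNP) or relays its bucket to ORTE_PROC_MY_PARENT.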