Skip to content

Commit d490e10

Browse files
awlauriarhc54
andcommitted
Refactor show_help() to use the PMIx_Log() API.
- Port over the de-duplication code from prrte. - Add some additional keys to support a request to de-duplicate logging messages. - Remove locks from the high-level PMIx_Log() APIs - they don't seem necessary, and make it very easy to deadlock when the current thread already holds the global lock. - Likewise, use PMIx_Log_nb() instead of PMIx_Log() - the non-blocking counterpart will allow libevent to trigger the callbacks on its own time. Otherwise, in the case of a show_help() message from prted that happens before the main thread hits the event loop, it will hang forever. - We can avoid deadlock in show_help calls made prior to completing client/server/tool "init" by simply leaving the show_help system disabled until we get far enough along, and then explicitly "enabling" it. We also never call a PMIx API from inside the code base as that can lead to loopbacks that deadlock. Signed-off-by: Austen Lauria <[email protected]> Signed-off-by: Ralph Castain <[email protected]> Co-authored-by: Ralph Castain <[email protected]>
1 parent 69e6965 commit d490e10

File tree

12 files changed

+400
-62
lines changed

12 files changed

+400
-62
lines changed

include/pmix_common.h.in

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,10 @@ typedef uint32_t pmix_rank_t;
674674
#define PMIX_LOG_XML_OUTPUT "pmix.log.xml" // (bool) print the output stream in xml format
675675
#define PMIX_LOG_ONCE "pmix.log.once" // (bool) only log this once with whichever channel can first support it
676676
#define PMIX_LOG_MSG "pmix.log.msg" // (pmix_byte_object_t) message blob to be sent somewhere
677+
#define PMIX_LOG_KEY "pmix.log.key" // (char*) key identifying a logging message; paired with PMIX_LOG_VAL to detect duplicates
678+
#define PMIX_LOG_VAL "pmix.log.val" // (char*) value paired with PMIX_LOG_KEY when checking for duplicate messages
679+
#define PMIX_LOG_AGG "pmix.log.agg" // (bool) whether to aggregate logging messages and suppress duplicates
680+
// based on their key/value pairs
677681

678682
#define PMIX_LOG_EMAIL "pmix.log.email" // (pmix_data_array_t*) log via email based on array of pmix_info_t
679683
// containing directives

src/client/pmix_client.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ static pmix_status_t pmix_init_result = PMIX_ERR_INIT;
7878
#include "src/util/pmix_name_fns.h"
7979
#include "src/util/pmix_output.h"
8080
#include "src/util/pmix_printf.h"
81+
#include "src/util/pmix_show_help.h"
8182

8283
#include "pmix_client_ops.h"
8384

@@ -860,6 +861,8 @@ PMIX_EXPORT pmix_status_t PMIx_Init(pmix_proc_t *proc, pmix_info_t info[], size_
860861
PMIX_RELEASE_THREAD(&pmix_global_lock);
861862
return rc;
862863
}
864+
// enable show_help subsystem
865+
pmix_show_help_enabled = true;
863866
PMIX_RELEASE_THREAD(&pmix_global_lock);
864867

865868
/* look for a debugger attach key */

src/common/pmix_log.c

Lines changed: 57 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,9 @@ PMIX_EXPORT pmix_status_t PMIx_Log(const pmix_info_t data[], size_t ndata,
6969
pmix_cb_t cb;
7070
pmix_status_t rc;
7171

72-
PMIX_ACQUIRE_THREAD(&pmix_global_lock);
73-
7472
if (pmix_globals.init_cntr <= 0) {
75-
PMIX_RELEASE_THREAD(&pmix_global_lock);
7673
return PMIX_ERR_INIT;
7774
}
78-
PMIX_RELEASE_THREAD(&pmix_global_lock);
7975

8076
pmix_output_verbose(2, pmix_plog_base_framework.framework_output, "%s pmix:log",
8177
PMIX_NAME_PRINT(&pmix_globals.myid));
@@ -115,37 +111,77 @@ static void localcbfunc(pmix_status_t status, void *cbdata)
115111
PMIX_RELEASE(cd);
116112
}
117113

114+
/*
 * Handler that PMIx_Log_nb passes to PMIX_THREADSHIFT to process a log
 * request locally. cbdata_ is a pmix_shift_caddy_t carrying the request:
 * the data (info/ninfo), the directives (directives/ndirs), the completion
 * callback (cbfunc.opcbfn/cbdata) and the recorded source process (proc).
 *
 * - No recorded source: this process becomes the source. The directives are
 *   copied into a new caddy with a PMIX_LOG_SOURCE entry appended, and
 *   pmix_plog.log() is called with localcbfunc as the completion callback.
 * - Recorded source is this process: rc is set to PMIX_ERR_NOT_SUPPORTED.
 * - Otherwise: the request is handed to pmix_plog.log() unchanged.
 *
 * sd and args are unused.
 *
 * NOTE(review): qd itself is not released anywhere in this function -
 * confirm where the caddy allocated by the thread-shifting caller is freed.
 */
void pmix_log_local_op(int sd, short args, void *cbdata_)
115+
{
116+
pmix_shift_caddy_t *qd = (pmix_shift_caddy_t *) cbdata_;
117+
pmix_info_t *data = qd->info;
118+
size_t ndata = qd->ninfo;
119+
pmix_info_t *directives = qd->directives;
120+
size_t ndirs = qd->ndirs;
121+
pmix_op_cbfunc_t cbfunc = qd->cbfunc.opcbfn;
122+
void *cbdata = qd->cbdata;
123+
pmix_proc_t *source = qd->proc;
124+
pmix_status_t rc;
125+
pmix_shift_caddy_t *cd;
126+
size_t n;
127+
PMIX_HIDE_UNUSED_PARAMS(sd, args);
128+
129+
/* if no recorded source was found, then we must be it */
130+
if (NULL == source) {
131+
source = &pmix_globals.myid;
132+
cd = PMIX_NEW(pmix_shift_caddy_t);
133+
cd->cbfunc.opcbfn = cbfunc;
134+
cd->cbdata = cbdata;
/* one extra slot for the PMIX_LOG_SOURCE directive appended below */
135+
cd->ndirs = ndirs + 1;
136+
PMIX_INFO_CREATE(cd->directives, cd->ndirs);
137+
for (n = 0; n < ndirs; n++) {
138+
PMIX_INFO_XFER(&cd->directives[n], (pmix_info_t *) &directives[n]);
139+
}
140+
PMIX_INFO_LOAD(&cd->directives[ndirs], PMIX_LOG_SOURCE, source, PMIX_PROC);
141+
rc = pmix_plog.log(source, data, ndata, cd->directives, cd->ndirs, localcbfunc, cd);
/* NOTE(review): on any non-PMIX_SUCCESS status the copied directives and
 * the new caddy are cleaned up here, but the caller's cbfunc is not invoked
 * in this function - confirm the requester is notified some other way. */
142+
if (PMIX_SUCCESS != rc) {
143+
PMIX_INFO_FREE(cd->directives, cd->ndirs);
144+
PMIX_RELEASE(cd);
145+
}
146+
} else if (PMIX_CHECK_PROCID(source, &pmix_globals.myid)) {
147+
/* if I am the recorded source, then this is a re-submission of
148+
* something that got "upcalled" by a prior call. In this case,
149+
* we return a "not supported" error as clearly we couldn't
150+
* handle it, and neither could our host */
/* NOTE(review): this function returns void and rc is not read after this
 * assignment, so the "not supported" status is not reported from here and
 * cbfunc is not invoked on this path - confirm this is intended. */
151+
rc = PMIX_ERR_NOT_SUPPORTED;
152+
} else {
153+
/* call down to process the request - the various components
154+
* will thread shift as required */
/* NOTE(review): the status returned by pmix_plog.log() is stored in rc but
 * not examined on this path. */
155+
rc = pmix_plog.log(source, data, ndata, directives, ndirs, cbfunc, cbdata);
156+
}
157+
}
158+
118159
PMIX_EXPORT pmix_status_t PMIx_Log_nb(const pmix_info_t data[], size_t ndata,
119160
const pmix_info_t directives[], size_t ndirs,
120161
pmix_op_cbfunc_t cbfunc, void *cbdata)
121162

122163
{
123-
pmix_shift_caddy_t *cd;
124164
pmix_cmd_t cmd = PMIX_LOG_CMD;
125165
pmix_buffer_t *msg;
126-
pmix_status_t rc;
127-
size_t n;
166+
pmix_status_t rc = PMIX_SUCCESS;
128167
time_t timestamp = 0;
129168
pmix_proc_t *source = NULL;
130-
131-
PMIX_ACQUIRE_THREAD(&pmix_global_lock);
169+
pmix_shift_caddy_t *cd;
132170

133171
pmix_output_verbose(2, pmix_globals.debug_output, "pmix:log non-blocking");
134172

135173
if (pmix_globals.init_cntr <= 0) {
136-
PMIX_RELEASE_THREAD(&pmix_global_lock);
137174
return PMIX_ERR_INIT;
138175
}
139176

140177
if (0 == ndata || NULL == data) {
141-
PMIX_RELEASE_THREAD(&pmix_global_lock);
142178
return PMIX_ERR_BAD_PARAM;
143179
}
144180

145181
/* check the directives - if they requested a timestamp, then
146182
* get the time, also look for a source */
147183
if (NULL != directives) {
148-
for (n = 0; n < ndirs; n++) {
184+
for (size_t n = 0; n < ndirs; n++) {
149185
if (0 == strncmp(directives[n].key, PMIX_LOG_GENERATE_TIMESTAMP, PMIX_MAX_KEYLEN)) {
150186
if (PMIX_INFO_TRUE(&directives[n])) {
151187
/* pickup the timestamp */
@@ -162,10 +198,8 @@ PMIX_EXPORT pmix_status_t PMIx_Log_nb(const pmix_info_t data[], size_t ndata,
162198
if (!PMIX_PEER_IS_SERVER(pmix_globals.mypeer) && !PMIX_PEER_IS_LAUNCHER(pmix_globals.mypeer)) {
163199
/* if we aren't connected, don't attempt to send */
164200
if (!pmix_globals.connected) {
165-
PMIX_RELEASE_THREAD(&pmix_global_lock);
166201
return PMIX_ERR_UNREACH;
167202
}
168-
PMIX_RELEASE_THREAD(&pmix_global_lock);
169203

170204
/* if we are not a server, then relay this request to the server */
171205
cd = PMIX_NEW(pmix_shift_caddy_t);
@@ -234,38 +268,16 @@ PMIX_EXPORT pmix_status_t PMIx_Log_nb(const pmix_info_t data[], size_t ndata,
234268
}
235269
return rc;
236270
}
237-
PMIX_RELEASE_THREAD(&pmix_global_lock);
238271

239-
/* if no recorded source was found, then we must be it */
240-
if (NULL == source) {
241-
source = &pmix_globals.myid;
242-
cd = PMIX_NEW(pmix_shift_caddy_t);
243-
cd->cbfunc.opcbfn = cbfunc;
244-
cd->cbdata = cbdata;
245-
cd->ndirs = ndirs + 1;
246-
PMIX_INFO_CREATE(cd->directives, cd->ndirs);
247-
for (n = 0; n < ndirs; n++) {
248-
PMIX_INFO_XFER(&cd->directives[n], (pmix_info_t *) &directives[n]);
249-
}
250-
PMIX_INFO_LOAD(&cd->directives[ndirs], PMIX_LOG_SOURCE, source, PMIX_PROC);
251-
/* call down to process the request - the various components
252-
* will thread shift as required */
253-
rc = pmix_plog.log(source, data, ndata, cd->directives, cd->ndirs, localcbfunc, cd);
254-
if (PMIX_SUCCESS != rc) {
255-
PMIX_INFO_FREE(cd->directives, cd->ndirs);
256-
PMIX_RELEASE(cd);
257-
}
258-
} else if (PMIX_CHECK_PROCID(source, &pmix_globals.myid)) {
259-
/* if I am the recorded source, then this is a re-submission of
260-
* something that got "upcalled" by a prior call. In this case,
261-
* we return a "not supported" error as clearly we couldn't
262-
* handle it, and neither could our host */
263-
rc = PMIX_ERR_NOT_SUPPORTED;
264-
} else {
265-
/* call down to process the request - the various components
266-
* will thread shift as required */
267-
rc = pmix_plog.log(source, data, ndata, directives, ndirs, cbfunc, cbdata);
268-
}
272+
cd = PMIX_NEW(pmix_shift_caddy_t);
273+
cd->info = (pmix_info_t *) data;
274+
cd->ninfo = ndata;
275+
cd->directives = (pmix_info_t *) directives;
276+
cd->ndirs = ndirs;
277+
cd->cbfunc.opcbfn = cbfunc;
278+
cd->cbdata = cbdata;
279+
cd->proc = source;
280+
PMIX_THREADSHIFT(cd, pmix_log_local_op);
269281

270282
return rc;
271283
}

src/include/pmix_globals.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ static void scon(pmix_shift_caddy_t *p)
295295
p->codes = NULL;
296296
p->ncodes = 0;
297297
p->peer = NULL;
298+
p->proc = NULL;
298299
p->pname.nspace = NULL;
299300
p->pname.rank = PMIX_RANK_UNDEF;
300301
p->data = NULL;

src/include/pmix_globals.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,9 @@ typedef struct {
369369
pmix_proc_t *targets;
370370
size_t ntargets;
371371
pmix_info_t *info;
372+
pmix_info_t *dirs;
372373
size_t ninfo;
374+
size_t ndirs;
373375
pmix_list_t results;
374376
size_t nreplies;
375377
size_t nrequests;
@@ -442,6 +444,7 @@ typedef struct {
442444
pmix_status_t *codes;
443445
size_t ncodes;
444446
pmix_name_t pname;
447+
pmix_proc_t *proc;
445448
pmix_peer_t *peer;
446449
const char *data;
447450
size_t ndata;
@@ -638,6 +641,8 @@ PMIX_EXPORT extern pmix_lock_t pmix_global_lock;
638641
PMIX_EXPORT extern const char* PMIX_PROXY_VERSION;
639642
PMIX_EXPORT extern const char* PMIX_PROXY_BUGREPORT;
640643

644+
PMIX_EXPORT void pmix_log_local_op(int sd, short args, void *cbdata_);
645+
641646
static inline bool pmix_check_node_info(const char *key)
642647
{
643648
char *keys[] = {PMIX_HOSTNAME,

src/mca/plog/base/plog_base_stubs.c

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "src/class/pmix_list.h"
1919
#include "src/server/pmix_server_ops.h"
2020
#include "src/util/pmix_error.h"
21+
#include "src/util/pmix_show_help.h"
2122

2223
#include "src/mca/plog/base/base.h"
2324

@@ -77,6 +78,8 @@ pmix_status_t pmix_plog_base_log(const pmix_proc_t *source, const pmix_info_t da
7778
pmix_mycount_t *mycount;
7879
pmix_list_t channels;
7980
bool all_complete = true;
81+
char *key = NULL, *val = NULL;
82+
bool agg = true; // default to aggregating show_help messages
8083

8184
if (!pmix_plog_globals.initialized) {
8285
return PMIX_ERR_INIT;
@@ -108,7 +111,24 @@ pmix_status_t pmix_plog_base_log(const pmix_proc_t *source, const pmix_info_t da
108111
for (n = 0; n < ndirs; n++) {
109112
if (PMIX_CHECK_KEY(&directives[n], PMIX_LOG_ONCE)) {
110113
logonce = PMIX_INFO_TRUE(&directives[n]);
111-
break;
114+
}
115+
else if (PMIX_CHECK_KEY(&directives[n], PMIX_LOG_AGG)) {
116+
agg = PMIX_INFO_TRUE(&directives[n]);
117+
}
118+
else if (PMIX_CHECK_KEY(&directives[n], PMIX_LOG_KEY)) {
119+
key = directives[n].value.data.string;
120+
}
121+
else if (PMIX_CHECK_KEY(&directives[n], PMIX_LOG_VAL)) {
122+
val = directives[n].value.data.string;
123+
}
124+
}
125+
if (agg && NULL != key && NULL != val) {
126+
if (PMIX_SUCCESS == pmix_help_check_dups(key, val)) {
127+
for (k = 0; k < ndata; k++) {
128+
// This is a dup and has been tracked as such,
129+
// mark this as complete so we don't log it again.
130+
PMIX_INFO_OP_COMPLETED(&data[k]);
131+
}
112132
}
113133
}
114134
}
@@ -162,7 +182,10 @@ pmix_status_t pmix_plog_base_log(const pmix_proc_t *source, const pmix_info_t da
162182
PMIX_DESTRUCT(&channels);
163183
PMIX_RELEASE(mycount);
164184
PMIX_RELEASE_THREAD(&pmix_plog_globals.lock);
165-
return PMIX_SUCCESS;
185+
186+
// Don't return PMIX_SUCCESS here, or else the caller
187+
// will expect the cbfunc to be executed (which it won't be).
188+
return PMIX_OPERATION_SUCCEEDED;
166189
}
167190
PMIX_ACQUIRE_THREAD(&mycount->lock);
168191
PMIX_LIST_FOREACH (active, &channels, pmix_plog_base_active_module_t) {

src/runtime/pmix_params.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ pmix_status_t pmix_register_params(void)
114114
PMIX_MCA_BASE_VAR_TYPE_BOOL,
115115
&pmix_suppress_missing_data_warning);
116116

117+
117118
/**** CLIENT: VERBOSE OUTPUT PARAMS ****/
118119
(void) pmix_mca_base_var_register("pmix", "pmix", "client", "get_verbose",
119120
"Verbosity for client get operations",

src/server/pmix_server.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -837,6 +837,8 @@ PMIX_EXPORT pmix_status_t PMIx_server_init(pmix_server_module_t *module, pmix_in
837837
}
838838

839839
++pmix_globals.init_cntr;
840+
// enable show_help subsystem
841+
pmix_show_help_enabled = true;
840842
PMIX_RELEASE_THREAD(&pmix_global_lock);
841843

842844
/* register a handler to catch/aggregate PMIX_EVENT_WAITING_FOR_NOTIFY

src/tool/pmix_tool.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -943,6 +943,8 @@ PMIX_EXPORT int PMIx_tool_init(pmix_proc_t *proc, pmix_info_t info[], size_t nin
943943
}
944944
PMIX_DESTRUCT(&cb);
945945
}
946+
// enable show_help subsystem
947+
pmix_show_help_enabled = true;
946948
PMIX_RELEASE_THREAD(&pmix_global_lock);
947949

948950
/* if we are acting as a server, then start listening */

0 commit comments

Comments
 (0)