Skip to content

Commit 879f2b5

Browse files
bosilcaThananon Patinyasakdikul
authored andcommitted
Keep the out-of-sequence fragment ordered.
Rework the logic to handle the out-of-sequence fragments on the receiver side. A large number of OOS messages are still arriving even in single threaded scenarios. Signed-off-by: George Bosilca <[email protected]>
1 parent e3b267a commit 879f2b5

File tree

1 file changed

+153
-66
lines changed

1 file changed

+153
-66
lines changed

ompi/mca/pml/ob1/pml_ob1_recvfrag.c

Lines changed: 153 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
44
* University Research and Technology
55
* Corporation. All rights reserved.
6-
* Copyright (c) 2004-2016 The University of Tennessee and The University
6+
* Copyright (c) 2004-2017 The University of Tennessee and The University
77
* of Tennessee Research Foundation. All rights
88
* reserved.
99
* Copyright (c) 2004-2007 High Performance Computing Center Stuttgart,
@@ -66,7 +66,7 @@ OBJ_CLASS_INSTANCE( mca_pml_ob1_recv_frag_t,
6666
*/
6767

6868
/**
69-
* Append a unexpected descriptor to a queue. This function will allocate and
69+
* Append an unexpected descriptor to a queue. This function will allocate and
7070
* initialize the fragment (if necessary) and then will add it to the specified
7171
* queue. The allocated fragment is not returned to the caller.
7272
*/
@@ -82,21 +82,92 @@ append_frag_to_list(opal_list_t *queue, mca_btl_base_module_t *btl,
8282
opal_list_append(queue, (opal_list_item_t*)frag);
8383
}
8484

85+
/**
86+
* Append an unexpected descriptor to an ordered queue. This function will allocate and
87+
* initialize the fragment (if necessary) and then will add it to the specified
88+
* queue respecting the sequence number. The allocated fragment is not returned to the caller.
89+
*/
90+
static void
91+
append_frag_to_ordered_list(opal_list_t* queue, mca_btl_base_module_t *btl,
92+
mca_pml_ob1_match_hdr_t *hdr, mca_btl_base_segment_t* segments,
93+
size_t num_segments, mca_pml_ob1_recv_frag_t* frag)
94+
{
95+
mca_pml_ob1_recv_frag_t* tmpfrag;
96+
mca_pml_ob1_match_hdr_t* tmphdr;
97+
98+
if(NULL == frag) {
99+
MCA_PML_OB1_RECV_FRAG_ALLOC(frag);
100+
MCA_PML_OB1_RECV_FRAG_INIT(frag, hdr, segments, num_segments, btl);
101+
}
102+
103+
if( opal_list_is_empty(queue) ) { /* no pending fragments yet */
104+
opal_list_append(queue, (opal_list_item_t*)frag);
105+
return;
106+
}
107+
/* Shortcut for sequence number earlier than the first fragment in the list */
108+
tmpfrag = (mca_pml_ob1_recv_frag_t*)opal_list_get_first(queue);
109+
tmphdr = &tmpfrag->hdr.hdr_match;
110+
assert(hdr->hdr_seq != tmphdr->hdr_seq);
111+
if( hdr->hdr_seq < tmphdr->hdr_seq ) {
112+
opal_list_prepend(queue, (opal_list_item_t*)frag);
113+
return;
114+
}
115+
/* Shortcut for sequence number later than the last fragment in the list */
116+
tmpfrag = (mca_pml_ob1_recv_frag_t*)opal_list_get_last(queue);
117+
tmphdr = &tmpfrag->hdr.hdr_match;
118+
if( hdr->hdr_seq > tmphdr->hdr_seq ) {
119+
opal_list_append(queue, (opal_list_item_t*)frag);
120+
return;
121+
}
122+
/* For all other cases (sequence number missing in the list) */
123+
OPAL_LIST_FOREACH(tmpfrag, queue, mca_pml_ob1_recv_frag_t) {
124+
tmphdr = &tmpfrag->hdr.hdr_match;
125+
if( hdr->hdr_seq < tmphdr->hdr_seq ) {
126+
opal_list_insert_pos(queue, (opal_list_item_t*)tmpfrag,
127+
(opal_list_item_t*) frag);
128+
return;
129+
}
130+
}
131+
}
132+
85133
/**
86134
* Match incoming recv_frags against posted receives.
87135
* Supports out of order delivery.
88136
*
89-
* @param frag_header (IN) Header of received recv_frag.
90-
* @param frag_desc (IN) Received recv_frag descriptor.
91-
* @param match_made (OUT) Flag indicating wether a match was made.
92-
* @param additional_matches (OUT) List of additional matches
137+
* @param hdr (IN) Header of received recv_frag.
138+
* @param segments (IN) Received recv_frag descriptor.
139+
* @param num_segments (IN) Flag indicating wether a match was made.
140+
* @param type (IN) Type of the message header.
93141
* @return OMPI_SUCCESS or error status on failure.
94142
*/
95143
static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
96144
mca_pml_ob1_match_hdr_t *hdr,
97145
mca_btl_base_segment_t* segments,
98146
size_t num_segments,
99-
int type);
147+
int type );
148+
149+
/**
150+
* Match incoming frags against posted receives. If frag is not NULL then we assume
151+
* it is already local and that it can be released upon completion.
152+
* Supports out of order delivery.
153+
*
154+
* @param comm_ptr (IN) Communicator where the message has been received
155+
* @param proc (IN) Proc for which we have received the message.
156+
* @param hdr (IN) Header of received recv_frag.
157+
* @param segments (IN) Received recv_frag descriptor.
158+
* @param num_segments (IN) Flag indicating wether a match was made.
159+
* @param type (IN) Type of the message header.
160+
* @return OMPI_SUCCESS or error status on failure.
161+
*/
162+
static int
163+
mca_pml_ob1_recv_frag_match_proc( mca_btl_base_module_t *btl,
164+
ompi_communicator_t* comm_ptr,
165+
mca_pml_ob1_comm_proc_t *proc,
166+
mca_pml_ob1_match_hdr_t *hdr,
167+
mca_btl_base_segment_t* segments,
168+
size_t num_segments,
169+
int type,
170+
mca_pml_ob1_recv_frag_t* frag );
100171

101172
static mca_pml_ob1_recv_request_t*
102173
match_one(mca_btl_base_module_t *btl,
@@ -105,6 +176,19 @@ match_one(mca_btl_base_module_t *btl,
105176
mca_pml_ob1_comm_proc_t *proc,
106177
mca_pml_ob1_recv_frag_t* frag);
107178

179+
static mca_pml_ob1_recv_frag_t* check_cantmatch_for_match(mca_pml_ob1_comm_proc_t *proc)
180+
{
181+
mca_pml_ob1_recv_frag_t *frag = NULL;
182+
183+
frag = (mca_pml_ob1_recv_frag_t*)opal_list_get_first(&proc->frags_cant_match);
184+
if( (opal_list_get_end(&proc->frags_cant_match) != (opal_list_item_t*)frag) &&
185+
(frag->hdr.hdr_match.hdr_seq == proc->expected_sequence) ) {
186+
opal_list_remove_item(&proc->frags_cant_match, (opal_list_item_t*)frag);
187+
return frag;
188+
}
189+
return NULL;
190+
}
191+
108192
void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
109193
mca_btl_base_tag_t tag,
110194
mca_btl_base_descriptor_t* des,
@@ -177,7 +261,7 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
177261
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_MSG_ARRIVED, comm_ptr,
178262
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
179263

180-
append_frag_to_list(&proc->frags_cant_match, btl,
264+
append_frag_to_ordered_list(&proc->frags_cant_match, btl,
181265
hdr, segments, num_segments, NULL);
182266
OB1_MATCHING_UNLOCK(&comm->matching_lock);
183267
return;
@@ -198,7 +282,7 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
198282
* generation until we reach the correct sequence number.
199283
*/
200284
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_SEARCH_POSTED_Q_BEGIN, comm_ptr,
201-
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
285+
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
202286

203287
match = match_one(btl, hdr, segments, num_segments, comm_ptr, proc, NULL);
204288

@@ -261,8 +345,18 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
261345
/* don't need a rmb as that is for checking */
262346
recv_request_pml_complete(match);
263347
}
348+
if(0 != opal_list_get_size(&proc->frags_cant_match)) {
349+
mca_pml_ob1_recv_frag_t* frag;
350+
OB1_MATCHING_LOCK(&comm->matching_lock);
351+
if((frag = check_cantmatch_for_match(proc))) {
352+
mca_pml_ob1_recv_frag_match_proc(frag->btl, comm_ptr, proc, &frag->hdr.hdr_match,
353+
frag->segments, frag->num_segments,
354+
hdr->hdr_common.hdr_type, frag);
355+
} else {
356+
OB1_MATCHING_UNLOCK(&comm->matching_lock);
357+
}
358+
}
264359
return;
265-
266360
slow_path:
267361
OB1_MATCHING_UNLOCK(&comm->matching_lock);
268362
mca_pml_ob1_recv_frag_match(btl, hdr, segments,
@@ -605,31 +699,6 @@ match_one(mca_btl_base_module_t *btl,
605699
} while(true);
606700
}
607701

608-
static mca_pml_ob1_recv_frag_t* check_cantmatch_for_match(mca_pml_ob1_comm_proc_t *proc)
609-
{
610-
mca_pml_ob1_recv_frag_t *frag;
611-
612-
/* search the list for a fragment from the send with sequence
613-
* number next_msg_seq_expected
614-
*/
615-
for(frag = (mca_pml_ob1_recv_frag_t*)opal_list_get_first(&proc->frags_cant_match);
616-
frag != (mca_pml_ob1_recv_frag_t*)opal_list_get_end(&proc->frags_cant_match);
617-
frag = (mca_pml_ob1_recv_frag_t*)opal_list_get_next(frag))
618-
{
619-
mca_pml_ob1_match_hdr_t* hdr = &frag->hdr.hdr_match;
620-
/*
621-
* If the message has the next expected seq from that proc...
622-
*/
623-
if(hdr->hdr_seq != proc->expected_sequence)
624-
continue;
625-
626-
opal_list_remove_item(&proc->frags_cant_match, (opal_list_item_t*)frag);
627-
return frag;
628-
}
629-
630-
return NULL;
631-
}
632-
633702
/**
634703
* RCS/CTS receive side matching
635704
*
@@ -669,10 +738,8 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
669738
/* local variables */
670739
uint16_t next_msg_seq_expected, frag_msg_seq;
671740
ompi_communicator_t *comm_ptr;
672-
mca_pml_ob1_recv_request_t *match = NULL;
673741
mca_pml_ob1_comm_t *comm;
674742
mca_pml_ob1_comm_proc_t *proc;
675-
mca_pml_ob1_recv_frag_t* frag = NULL;
676743

677744
/* communicator pointer */
678745
comm_ptr = ompi_comm_lookup(hdr->hdr_ctx);
@@ -691,14 +758,13 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
691758
comm = (mca_pml_ob1_comm_t *)comm_ptr->c_pml_comm;
692759

693760
/* source sequence number */
694-
frag_msg_seq = hdr->hdr_seq;
695761
proc = mca_pml_ob1_peer_lookup (comm_ptr, hdr->hdr_src);
696762

697-
/**
698-
* We generate the MSG_ARRIVED event as soon as the PML is aware of a matching
699-
* fragment arrival. Independing if it is received on the correct order or not.
700-
* This will allow the tools to figure out if the messages are not received in the
701-
* correct order (if multiple network interfaces).
763+
/* We generate the MSG_ARRIVED event as soon as the PML is aware
764+
* of a matching fragment arrival. Independing if it is received
765+
* on the correct order or not. This will allow the tools to
766+
* figure out if the messages are not received in the correct
767+
* order (if multiple network interfaces).
702768
*/
703769
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_MSG_ARRIVED, comm_ptr,
704770
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
@@ -712,38 +778,68 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
712778
*/
713779
OB1_MATCHING_LOCK(&comm->matching_lock);
714780

781+
frag_msg_seq = hdr->hdr_seq;
715782
/* get sequence number of next message that can be processed */
716783
next_msg_seq_expected = (uint16_t)proc->expected_sequence;
717784
if(OPAL_UNLIKELY(frag_msg_seq != next_msg_seq_expected))
718785
goto wrong_seq;
719786

787+
return mca_pml_ob1_recv_frag_match_proc(btl, comm_ptr, proc, hdr, segments, num_segments,
788+
type, NULL);
789+
wrong_seq:
790+
/*
791+
* This message comes after the next expected, so it
792+
* is ahead of sequence. Save it for later.
793+
*/
794+
append_frag_to_ordered_list(&proc->frags_cant_match, btl, hdr, segments,
795+
num_segments, NULL);
796+
OB1_MATCHING_UNLOCK(&comm->matching_lock);
797+
return OMPI_SUCCESS;
798+
}
799+
800+
/**
801+
* This function should be called with the comm lock help, but upon it's return
802+
* the mutex will be released.
803+
*/
804+
static int
805+
mca_pml_ob1_recv_frag_match_proc( mca_btl_base_module_t *btl,
806+
ompi_communicator_t* comm_ptr,
807+
mca_pml_ob1_comm_proc_t *proc,
808+
mca_pml_ob1_match_hdr_t *hdr,
809+
mca_btl_base_segment_t* segments,
810+
size_t num_segments,
811+
int type,
812+
mca_pml_ob1_recv_frag_t* frag )
813+
{
814+
/* local variables */
815+
mca_pml_ob1_comm_t* comm = (mca_pml_ob1_comm_t *)comm_ptr->c_pml_comm;
816+
mca_pml_ob1_recv_request_t *match = NULL;
817+
720818
/*
721819
* This is the sequence number we were expecting,
722-
* so we can try matching it to already posted
723-
* receives.
820+
* so we can try matching it to already posted receives.
724821
*/
725822

726-
out_of_order_match:
823+
out_of_order_match:
727824
/* We're now expecting the next sequence number. */
728825
proc->expected_sequence++;
729826

730-
/**
731-
* We generate the SEARCH_POSTED_QUEUE only when the message is received
732-
* in the correct sequence. Otherwise, we delay the event generation until
733-
* we reach the correct sequence number.
827+
/* We generate the SEARCH_POSTED_QUEUE only when the message is
828+
* received in the correct sequence. Otherwise, we delay the event
829+
* generation until we reach the correct sequence number.
734830
*/
735831
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_SEARCH_POSTED_Q_BEGIN, comm_ptr,
736-
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
832+
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
737833

738834
match = match_one(btl, hdr, segments, num_segments, comm_ptr, proc, frag);
739835

740-
/**
741-
* The match is over. We generate the SEARCH_POSTED_Q_END here, before going
742-
* into the mca_pml_ob1_check_cantmatch_for_match so we can make a difference
743-
* for the searching time for all messages.
836+
/* The match is over. We generate the SEARCH_POSTED_Q_END here,
837+
* before going into the mca_pml_ob1_check_cantmatch_for_match so
838+
* we can make a difference for the searching time for all
839+
* messages.
744840
*/
745841
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_SEARCH_POSTED_Q_END, comm_ptr,
746-
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
842+
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
747843

748844
/* release matching lock before processing fragment */
749845
OB1_MATCHING_UNLOCK(&comm->matching_lock);
@@ -767,7 +863,7 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
767863

768864
/*
769865
* Now that new message has arrived, check to see if
770-
* any fragments on the c_c_frags_cant_match list
866+
* any fragments on the frags_cant_match list
771867
* may now be used to form new matchs
772868
*/
773869
if(OPAL_UNLIKELY(opal_list_get_size(&proc->frags_cant_match) > 0)) {
@@ -783,15 +879,6 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
783879
OB1_MATCHING_UNLOCK(&comm->matching_lock);
784880
}
785881

786-
return OMPI_SUCCESS;
787-
wrong_seq:
788-
/*
789-
* This message comes after the next expected, so it
790-
* is ahead of sequence. Save it for later.
791-
*/
792-
append_frag_to_list(&proc->frags_cant_match, btl, hdr, segments,
793-
num_segments, NULL);
794-
OB1_MATCHING_UNLOCK(&comm->matching_lock);
795882
return OMPI_SUCCESS;
796883
}
797884

0 commit comments

Comments
 (0)