Skip to content

Commit d0143fc

Browse files
committed
Merging sorted slices using more than 1 cpu and k-way merge with loser tree
1 parent 73c8e5a commit d0143fc

File tree

5 files changed

+364
-36
lines changed

5 files changed

+364
-36
lines changed

pkg/distributor/distributor.go

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ const (
6464
typeMetadata = "metadata"
6565

6666
instanceIngestionRateTickInterval = time.Second
67+
68+
// mergeSlicesParallelism is a constant of how many goroutines we should use to merge slices, and
69+
// it was based on empirical observation: See BenchmarkMergeSlicesParallel
70+
mergeSlicesParallelism = 8
6771
)
6872

6973
// Distributor is a storage.SampleAppender and a client.Querier which
@@ -959,23 +963,13 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
959963

960964
span, _ = opentracing.StartSpanFromContext(ctx, "response_merge")
961965
defer span.Finish()
962-
valueSet := map[string]struct{}{}
963-
for _, resp := range resps {
964-
for _, v := range resp.([]string) {
965-
valueSet[v] = struct{}{}
966-
}
966+
values := make([][]string, len(resps))
967+
for i, resp := range resps {
968+
values[i] = resp.([]string)
967969
}
968-
969-
values := make([]string, 0, len(valueSet))
970-
for v := range valueSet {
971-
values = append(values, v)
972-
}
973-
974-
// We need the values returned to be sorted.
975-
sort.Strings(values)
976-
span.SetTag("result_length", len(values))
977-
978-
return values, nil
970+
r := util.MergeSlicesParallel(mergeSlicesParallelism, values...)
971+
span.SetTag("result_length", len(r))
972+
return r, nil
979973
}
980974

981975
// LabelValuesForLabelName returns all the label values that are associated with a given label name.
@@ -1039,22 +1033,14 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
10391033

10401034
span, _ = opentracing.StartSpanFromContext(ctx, "response_merge")
10411035
defer span.Finish()
1042-
valueSet := map[string]struct{}{}
1043-
for _, resp := range resps {
1044-
for _, v := range resp.([]string) {
1045-
valueSet[v] = struct{}{}
1046-
}
1036+
values := make([][]string, len(resps))
1037+
for i, resp := range resps {
1038+
values[i] = resp.([]string)
10471039
}
1040+
r := util.MergeSlicesParallel(mergeSlicesParallelism, values...)
1041+
span.SetTag("result_length", len(r))
10481042

1049-
values := make([]string, 0, len(valueSet))
1050-
for v := range valueSet {
1051-
values = append(values, v)
1052-
}
1053-
1054-
sort.Strings(values)
1055-
span.SetTag("result_length", len(values))
1056-
1057-
return values, nil
1043+
return r, nil
10581044
}
10591045

10601046
func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time) ([]string, error) {

pkg/distributor/distributor_test.go

Lines changed: 100 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"io"
88
"math"
9+
"math/rand"
910
"net/http"
1011
"sort"
1112
"strconv"
@@ -1654,6 +1655,56 @@ func TestDistributor_Push_ExemplarValidation(t *testing.T) {
16541655
}
16551656
}
16561657

1658+
func BenchmarkDistributor_GetLabelsValues(b *testing.B) {
1659+
ctx := user.InjectOrgID(context.Background(), "user")
1660+
1661+
testCases := []struct {
1662+
numIngesters int
1663+
lblValuesPerIngester int
1664+
lblValuesDuplicateRatio float64
1665+
}{
1666+
{
1667+
numIngesters: 150,
1668+
lblValuesPerIngester: 1000,
1669+
lblValuesDuplicateRatio: 0.67, // Final Result will have 33% of the total size - replication factor of 3 and no duplicates
1670+
},
1671+
{
1672+
numIngesters: 150,
1673+
lblValuesPerIngester: 1000,
1674+
lblValuesDuplicateRatio: 0.98,
1675+
},
1676+
{
1677+
numIngesters: 500,
1678+
lblValuesPerIngester: 1000,
1679+
lblValuesDuplicateRatio: 0.67, // Final Result will have 33% of the total size - replication factor of 3 and no duplicates
1680+
},
1681+
{
1682+
numIngesters: 500,
1683+
lblValuesPerIngester: 1000,
1684+
lblValuesDuplicateRatio: 0.98,
1685+
},
1686+
}
1687+
1688+
for _, tc := range testCases {
1689+
name := fmt.Sprintf("numIngesters%v,lblValuesPerIngester%v,lblValuesDuplicateRatio%v", tc.numIngesters, tc.lblValuesPerIngester, tc.lblValuesDuplicateRatio)
1690+
ds, _, _, _ := prepare(b, prepConfig{
1691+
numIngesters: tc.numIngesters,
1692+
happyIngesters: tc.numIngesters,
1693+
numDistributors: 1,
1694+
lblValuesPerIngester: tc.lblValuesPerIngester,
1695+
lblValuesDuplicateRatio: tc.lblValuesDuplicateRatio,
1696+
})
1697+
b.Run(name, func(b *testing.B) {
1698+
b.ResetTimer()
1699+
b.ReportAllocs()
1700+
for i := 0; i < b.N; i++ {
1701+
_, err := ds[0].LabelValuesForLabelName(ctx, model.Time(time.Now().UnixMilli()), model.Time(time.Now().UnixMilli()), "__name__")
1702+
require.NoError(b, err)
1703+
}
1704+
})
1705+
}
1706+
}
1707+
16571708
func BenchmarkDistributor_Push(b *testing.B) {
16581709
const (
16591710
numSeriesPerRequest = 1000
@@ -1942,7 +1993,6 @@ func BenchmarkDistributor_Push(b *testing.B) {
19421993

19431994
for n := 0; n < b.N; n++ {
19441995
_, err := distributor.Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, nil, cortexpb.API))
1945-
19461996
if testData.expectedErr == "" && err != nil {
19471997
b.Fatalf("no error expected but got %v", err)
19481998
}
@@ -2392,6 +2442,8 @@ type prepConfig struct {
23922442
shardByAllLabels bool
23932443
shuffleShardEnabled bool
23942444
shuffleShardSize int
2445+
lblValuesPerIngester int
2446+
lblValuesDuplicateRatio float64
23952447
limits *validation.Limits
23962448
numDistributors int
23972449
skipLabelNameValidation bool
@@ -2403,13 +2455,23 @@ type prepConfig struct {
24032455
tokens [][]uint32
24042456
}
24052457

2458+
// prepState carries the string pools shared across the mock ingesters built
// by prepare.
type prepState struct {
	// unusedStrings holds values that have not been handed to any ingester yet.
	unusedStrings []string
	// usedStrings holds values that have already been handed to an ingester.
	usedStrings []string
}
2461+
24062462
func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []*prometheus.Registry, *ring.Ring) {
2463+
// Strings used to build the label values/names served by the mock ingesters
2464+
var unusedStrings []string
2465+
if cfg.lblValuesPerIngester > 0 {
2466+
unusedStrings = make([]string, min(len(util.RandomStrings), cfg.numIngesters*cfg.lblValuesPerIngester))
2467+
copy(unusedStrings, util.RandomStrings)
2468+
}
2469+
s := &prepState{
2470+
unusedStrings: unusedStrings,
2471+
}
24072472
ingesters := []*mockIngester{}
24082473
for i := 0; i < cfg.happyIngesters; i++ {
2409-
ingesters = append(ingesters, &mockIngester{
2410-
happy: *atomic.NewBool(true),
2411-
queryDelay: cfg.queryDelay,
2412-
})
2474+
ingesters = append(ingesters, newMockIngester(i, s, cfg))
24132475
}
24142476
for i := cfg.happyIngesters; i < cfg.numIngesters; i++ {
24152477
miError := errFail
@@ -2679,6 +2741,33 @@ type mockIngester struct {
26792741
metadata map[uint32]map[cortexpb.MetricMetadata]struct{}
26802742
queryDelay time.Duration
26812743
calls map[string]int
2744+
lblsValues []string
2745+
}
2746+
2747+
func newMockIngester(id int, ps *prepState, cfg prepConfig) *mockIngester {
2748+
lblsValues := make([]string, 0, cfg.lblValuesPerIngester)
2749+
usedStrings := make([]string, len(ps.usedStrings))
2750+
copy(usedStrings, ps.usedStrings)
2751+
2752+
for i := 0; i < cfg.lblValuesPerIngester; i++ {
2753+
var s string
2754+
if i < int(float64(cfg.lblValuesPerIngester)*cfg.lblValuesDuplicateRatio) && id > 0 {
2755+
index := rand.Int() % len(usedStrings)
2756+
s = usedStrings[index]
2757+
usedStrings = append(usedStrings[:index], usedStrings[index+1:]...)
2758+
} else {
2759+
s = ps.unusedStrings[0]
2760+
ps.usedStrings = append(ps.usedStrings, s)
2761+
ps.unusedStrings = ps.unusedStrings[1:]
2762+
}
2763+
lblsValues = append(lblsValues, s)
2764+
}
2765+
sort.Strings(lblsValues)
2766+
return &mockIngester{
2767+
happy: *atomic.NewBool(true),
2768+
queryDelay: cfg.queryDelay,
2769+
lblsValues: lblsValues,
2770+
}
26822771
}
26832772

26842773
func (i *mockIngester) series() map[uint32]*cortexpb.PreallocTimeseries {
@@ -2705,6 +2794,12 @@ func (i *mockIngester) Close() error {
27052794
return nil
27062795
}
27072796

2797+
func (i *mockIngester) LabelValues(_ context.Context, _ *client.LabelValuesRequest, _ ...grpc.CallOption) (*client.LabelValuesResponse, error) {
2798+
return &client.LabelValuesResponse{
2799+
LabelValues: i.lblsValues,
2800+
}, nil
2801+
}
2802+
27082803
func (i *mockIngester) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
27092804
return i.Push(ctx, &in.WriteRequest, opts...)
27102805
}

pkg/util/strings.go

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package util
22

3-
import "unsafe"
3+
import (
4+
"sync"
5+
"unsafe"
6+
7+
"github.com/bboreham/go-loser"
8+
)
49

510
// StringsContain returns true if the search value is within the list of input values.
611
func StringsContain(values []string, search string) bool {
@@ -29,3 +34,93 @@ func StringsClone(s string) string {
2934
copy(b, s)
3035
return *(*string)(unsafe.Pointer(&b))
3136
}
37+
38+
// MergeSlicesParallel merge sorted slices in parallel
39+
// using the MergeSlicesV2 function
40+
func MergeSlicesParallel(parallelism int, a ...[]string) []string {
41+
if parallelism <= 1 {
42+
return MergeSlicesV2(a...)
43+
}
44+
if len(a) == 0 {
45+
return nil
46+
}
47+
if len(a) == 1 {
48+
return a[0]
49+
}
50+
c := make(chan []string, len(a))
51+
wg := sync.WaitGroup{}
52+
var r [][]string
53+
p := min(parallelism, len(a)/2)
54+
batchSize := len(a) / p
55+
56+
for i := 0; i < len(a); i += batchSize {
57+
wg.Add(1)
58+
go func(i int) {
59+
m := min(len(a), i+batchSize)
60+
c <- MergeSlicesV2(a[i:m]...)
61+
wg.Done()
62+
}(i)
63+
}
64+
65+
go func() {
66+
wg.Wait()
67+
close(c)
68+
}()
69+
70+
for s := range c {
71+
r = append(r, s)
72+
}
73+
74+
return MergeSlicesV2(r...)
75+
}
76+
77+
// NewStringListIter returns an iterator over s positioned before its first
// element. The slice is not copied.
func NewStringListIter(s []string) *StringListIter {
	return &StringListIter{l: s}
}

// StringListIter iterates over a string slice one element at a time through
// Next and At.
type StringListIter struct {
	l   []string // elements not yet consumed
	cur string   // element selected by the latest successful Next
}

// Next advances to the following element and reports whether one was
// available. Once the slice is exhausted it returns false and leaves the
// current element unchanged.
func (s *StringListIter) Next() bool {
	if len(s.l) == 0 {
		return false
	}
	s.cur, s.l = s.l[0], s.l[1:]
	return true
}

// At returns the element selected by the most recent successful Next, or ""
// if Next has not succeeded yet.
func (s *StringListIter) At() string { return s.cur }
96+
97+
// MAX_STRING is the single-byte string 0xff. That byte never occurs in valid
// UTF-8, so this value compares greater than any valid UTF-8 string.
// MergeSlicesV2 passes it as the second argument to loser.New.
// NOTE(review): presumably the loser tree's upper-bound sentinel — confirm
// against the go-loser documentation.
var MAX_STRING = "\xff"
98+
99+
// MergeSlicesV2 merges a set of sorted string slices into a single ones
100+
// while removing all duplicates.
101+
func MergeSlicesV2(a ...[]string) []string {
102+
if len(a) == 1 {
103+
return a[0]
104+
}
105+
its := make([]*StringListIter, 0, len(a))
106+
sumLengh := 0
107+
for _, s := range a {
108+
sumLengh += len(s)
109+
its = append(its, NewStringListIter(s))
110+
}
111+
lt := loser.New(its, MAX_STRING)
112+
113+
if sumLengh == 0 {
114+
return []string{}
115+
}
116+
117+
r := make([]string, 0, sumLengh*2/10)
118+
var current string
119+
for lt.Next() {
120+
if lt.At() != current {
121+
current = lt.At()
122+
r = append(r, current)
123+
}
124+
}
125+
return r
126+
}

0 commit comments

Comments
 (0)