diff --git a/go.mod b/go.mod
index 19b4a75927f..19bb844e87c 100644
--- a/go.mod
+++ b/go.mod
@@ -77,6 +77,7 @@ require (
 
 require (
 	github.com/VictoriaMetrics/fastcache v1.12.1
+	github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3
 	github.com/cespare/xxhash/v2 v2.2.0
 	github.com/google/go-cmp v0.6.0
 	github.com/sercand/kuberesolver/v4 v4.0.0
@@ -111,7 +112,6 @@ require (
 	github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.5 // indirect
 	github.com/aws/aws-sdk-go-v2/service/sts v1.16.19 // indirect
 	github.com/aws/smithy-go v1.13.3 // indirect
-	github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 // indirect
 	github.com/benbjohnson/clock v1.3.5 // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/blang/semver/v4 v4.0.0 // indirect
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 4b4eea0bf91..dcd563ef448 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -64,6 +64,10 @@ const (
 	typeMetadata = "metadata"
 
 	instanceIngestionRateTickInterval = time.Second
+
+	// mergeSlicesParallelism is the number of goroutines used to merge the sorted slices returned by
+	// the ingesters. The value is based on empirical observation; see BenchmarkMergeSlicesParallel.
+	mergeSlicesParallelism = 8
 )
 
 // Distributor is a storage.SampleAppender and a client.Querier which
@@ -959,23 +963,13 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
 	span, _ = opentracing.StartSpanFromContext(ctx, "response_merge")
 	defer span.Finish()
 
-	valueSet := map[string]struct{}{}
-	for _, resp := range resps {
-		for _, v := range resp.([]string) {
-			valueSet[v] = struct{}{}
-		}
+	values := make([][]string, len(resps))
+	for i, resp := range resps {
+		values[i] = resp.([]string)
 	}
-
-	values := make([]string, 0, len(valueSet))
-	for v := range valueSet {
-		values = append(values, v)
-	}
-
-	// We need the values returned to be sorted.
-	sort.Strings(values)
-	span.SetTag("result_length", len(values))
-
-	return values, nil
+	r := util.MergeSlicesParallel(mergeSlicesParallelism, values...)
+	span.SetTag("result_length", len(r))
+	return r, nil
 }
 
 // LabelValuesForLabelName returns all the label values that are associated with a given label name.
@@ -1039,22 +1033,14 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
 	span, _ = opentracing.StartSpanFromContext(ctx, "response_merge")
 	defer span.Finish()
 
-	valueSet := map[string]struct{}{}
-	for _, resp := range resps {
-		for _, v := range resp.([]string) {
-			valueSet[v] = struct{}{}
-		}
+	values := make([][]string, len(resps))
+	for i, resp := range resps {
+		values[i] = resp.([]string)
 	}
+	r := util.MergeSlicesParallel(mergeSlicesParallelism, values...)
+	span.SetTag("result_length", len(r))
 
-	values := make([]string, 0, len(valueSet))
-	for v := range valueSet {
-		values = append(values, v)
-	}
-
-	sort.Strings(values)
-	span.SetTag("result_length", len(values))
-
-	return values, nil
+	return r, nil
 }
 
 func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time) ([]string, error) {
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index 74e9080da21..8b4e933daf7 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io"
 	"math"
+	"math/rand"
 	"net/http"
 	"sort"
 	"strconv"
@@ -1654,6 +1655,66 @@ func TestDistributor_Push_ExemplarValidation(t *testing.T) {
 	}
 }
 
+func BenchmarkDistributor_GetLabelsValues(b *testing.B) {
+	ctx := user.InjectOrgID(context.Background(), "user")
+
+	testCases := []struct {
+		numIngesters            int
+		lblValuesPerIngester    int
+		lblValuesDuplicateRatio float64
+	}{
+		{
+			numIngesters:            16,
+			lblValuesPerIngester:    1000,
+			lblValuesDuplicateRatio: 0.67, // The final result will be 33% of the total size: replication factor of 3 and no duplicates.
+		},
+		{
+			numIngesters:            16,
+			lblValuesPerIngester:    1000,
+			lblValuesDuplicateRatio: 0.98,
+		},
+		{
+			numIngesters:            150,
+			lblValuesPerIngester:    1000,
+			lblValuesDuplicateRatio: 0.67, // The final result will be 33% of the total size: replication factor of 3 and no duplicates.
+		},
+		{
+			numIngesters:            150,
+			lblValuesPerIngester:    1000,
+			lblValuesDuplicateRatio: 0.98,
+		},
+		{
+			numIngesters:            500,
+			lblValuesPerIngester:    1000,
+			lblValuesDuplicateRatio: 0.67, // The final result will be 33% of the total size: replication factor of 3 and no duplicates.
+		},
+		{
+			numIngesters:            500,
+			lblValuesPerIngester:    1000,
+			lblValuesDuplicateRatio: 0.98,
+		},
+	}
+
+	for _, tc := range testCases {
+		name := fmt.Sprintf("numIngesters%v,lblValuesPerIngester%v,lblValuesDuplicateRatio%v", tc.numIngesters, tc.lblValuesPerIngester, tc.lblValuesDuplicateRatio)
+		ds, _, _, _ := prepare(b, prepConfig{
+			numIngesters:            tc.numIngesters,
+			happyIngesters:          tc.numIngesters,
+			numDistributors:         1,
+			lblValuesPerIngester:    tc.lblValuesPerIngester,
+			lblValuesDuplicateRatio: tc.lblValuesDuplicateRatio,
+		})
+		b.Run(name, func(b *testing.B) {
+			b.ResetTimer()
+			b.ReportAllocs()
+			for i := 0; i < b.N; i++ {
+				_, err := ds[0].LabelValuesForLabelName(ctx, model.Time(time.Now().UnixMilli()), model.Time(time.Now().UnixMilli()), "__name__")
+				require.NoError(b, err)
+			}
+		})
+	}
+}
+
 func BenchmarkDistributor_Push(b *testing.B) {
 	const (
 		numSeriesPerRequest = 1000
@@ -1942,7 +2003,6 @@ func BenchmarkDistributor_Push(b *testing.B) {
 
 			for n := 0; n < b.N; n++ {
 				_, err := distributor.Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, nil, cortexpb.API))
-
 				if testData.expectedErr == "" && err != nil {
 					b.Fatalf("no error expected but got %v", err)
 				}
@@ -2392,6 +2452,8 @@ type prepConfig struct {
 	shardByAllLabels        bool
 	shuffleShardEnabled     bool
 	shuffleShardSize        int
+	lblValuesPerIngester    int
+	lblValuesDuplicateRatio float64
 	limits                  *validation.Limits
 	numDistributors         int
 	skipLabelNameValidation bool
@@ -2403,13 +2465,23 @@ type prepConfig struct {
 	tokens                  [][]uint32
 }
 
+type prepState struct {
+	unusedStrings, usedStrings []string
+}
+
 func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []*prometheus.Registry, *ring.Ring) {
+	// Pool of strings used as the label values/names served by the mock ingesters.
+	var unusedStrings []string
+	if cfg.lblValuesPerIngester > 0 {
+		unusedStrings = make([]string, min(len(util.RandomStrings), cfg.numIngesters*cfg.lblValuesPerIngester))
+		copy(unusedStrings, util.RandomStrings)
+	}
+	s := &prepState{
+		unusedStrings: unusedStrings,
+	}
 	ingesters := []*mockIngester{}
 	for i := 0; i < cfg.happyIngesters; i++ {
-		ingesters = append(ingesters, &mockIngester{
-			happy:      *atomic.NewBool(true),
-			queryDelay: cfg.queryDelay,
-		})
+		ingesters = append(ingesters, newMockIngester(i, s, cfg))
 	}
 	for i := cfg.happyIngesters; i < cfg.numIngesters; i++ {
 		miError := errFail
@@ -2679,6 +2751,33 @@ type mockIngester struct {
 	metadata   map[uint32]map[cortexpb.MetricMetadata]struct{}
 	queryDelay time.Duration
 	calls      map[string]int
+	lblsValues []string
+}
+
+func newMockIngester(id int, ps *prepState, cfg prepConfig) *mockIngester {
+	lblsValues := make([]string, 0, cfg.lblValuesPerIngester)
+	usedStrings := make([]string, len(ps.usedStrings))
+	copy(usedStrings, ps.usedStrings)
+
+	for i := 0; i < cfg.lblValuesPerIngester; i++ {
+		var s string
+		if i < int(float64(cfg.lblValuesPerIngester)*cfg.lblValuesDuplicateRatio) && id > 0 {
+			index := rand.Int() % len(usedStrings)
+			s = usedStrings[index]
+			usedStrings = append(usedStrings[:index], usedStrings[index+1:]...)
+		} else {
+			s = ps.unusedStrings[0]
+			ps.usedStrings = append(ps.usedStrings, s)
+			ps.unusedStrings = ps.unusedStrings[1:]
+		}
+		lblsValues = append(lblsValues, s)
+	}
+	sort.Strings(lblsValues)
+	return &mockIngester{
+		happy:      *atomic.NewBool(true),
+		queryDelay: cfg.queryDelay,
+		lblsValues: lblsValues,
+	}
 }
 
 func (i *mockIngester) series() map[uint32]*cortexpb.PreallocTimeseries {
@@ -2705,6 +2804,12 @@ func (i *mockIngester) Close() error {
 	return nil
 }
 
+func (i *mockIngester) LabelValues(_ context.Context, _ *client.LabelValuesRequest, _ ...grpc.CallOption) (*client.LabelValuesResponse, error) {
+	return &client.LabelValuesResponse{
+		LabelValues: i.lblsValues,
+	}, nil
+}
+
 func (i *mockIngester) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
 	return i.Push(ctx, &in.WriteRequest, opts...)
 }
diff --git a/pkg/util/strings.go b/pkg/util/strings.go
index e7f7c3aceb4..30a3283c536 100644
--- a/pkg/util/strings.go
+++ b/pkg/util/strings.go
@@ -1,6 +1,11 @@
 package util
 
-import "unsafe"
+import (
+	"sync"
+	"unsafe"
+
+	"github.com/bboreham/go-loser"
+)
 
 // StringsContain returns true if the search value is within the list of input values.
 func StringsContain(values []string, search string) bool {
@@ -29,3 +34,93 @@ func StringsClone(s string) string {
 	copy(b, s)
 	return *(*string)(unsafe.Pointer(&b))
 }
+
+// MergeSlicesParallel merges sorted slices in parallel,
+// using the MergeSortedSlices function.
+func MergeSlicesParallel(parallelism int, a ...[]string) []string {
+	if parallelism <= 1 {
+		return MergeSortedSlices(a...)
+	}
+	if len(a) == 0 {
+		return nil
+	}
+	if len(a) == 1 {
+		return a[0]
+	}
+	c := make(chan []string, len(a))
+	wg := sync.WaitGroup{}
+	var r [][]string
+	p := min(parallelism, len(a)/2)
+	batchSize := len(a) / p
+
+	for i := 0; i < len(a); i += batchSize {
+		wg.Add(1)
+		go func(i int) {
+			m := min(len(a), i+batchSize)
+			c <- MergeSortedSlices(a[i:m]...)
+			wg.Done()
+		}(i)
+	}
+
+	go func() {
+		wg.Wait()
+		close(c)
+	}()
+
+	for s := range c {
+		r = append(r, s)
+	}
+
+	return MergeSortedSlices(r...)
+}
+
+func NewStringListIter(s []string) *StringListIter {
+	return &StringListIter{l: s}
+}
+
+type StringListIter struct {
+	l   []string
+	cur string
+}
+
+func (s *StringListIter) Next() bool {
+	if len(s.l) == 0 {
+		return false
+	}
+	s.cur = s.l[0]
+	s.l = s.l[1:]
+	return true
+}
+
+func (s *StringListIter) At() string { return s.cur }
+
+var MAX_STRING = string([]byte{0xff})
+
+// MergeSortedSlices merges a set of sorted string slices into a single one
+// while removing all duplicates.
+func MergeSortedSlices(a ...[]string) []string {
+	if len(a) == 1 {
+		return a[0]
+	}
+	its := make([]*StringListIter, 0, len(a))
+	sumLength := 0
+	for _, s := range a {
+		sumLength += len(s)
+		its = append(its, NewStringListIter(s))
+	}
+	lt := loser.New(its, MAX_STRING)
+
+	if sumLength == 0 {
+		return []string{}
+	}
+
+	r := make([]string, 0, sumLength*2/10)
+	var current string
+	for lt.Next() {
+		if lt.At() != current {
+			current = lt.At()
+			r = append(r, current)
+		}
+	}
+	return r
+}
diff --git a/pkg/util/strings_test.go b/pkg/util/strings_test.go
new file mode 100644
index 00000000000..05cf9d20969
--- /dev/null
+++ b/pkg/util/strings_test.go
@@ -0,0 +1,126 @@
+package util
+
+import (
+	"fmt"
+	"math/rand"
+	"sort"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func BenchmarkMergeSlicesParallel(b *testing.B) {
+	testCases := []struct {
+		inputSize       int
+		stringsPerInput int
+		duplicateRatio  float64
+	}{
+		{
+			inputSize:       100,
+			stringsPerInput: 100,
+			duplicateRatio:  0.3, // The deduplicated result will be 70% of the total input size.
+		},
+		{
+			inputSize:       100,
+			stringsPerInput: 100,
+			duplicateRatio:  0.8, // The deduplicated result will be 20% of the total input size.
+		},
+		{
+			inputSize:       100,
+			stringsPerInput: 100,
+			duplicateRatio:  0.95, // The deduplicated result will be 5% of the total input size.
+		},
+		{
+			inputSize:       150,
+			stringsPerInput: 300,
+			duplicateRatio:  0.3,
+		},
+		{
+			inputSize:       150,
+			stringsPerInput: 300,
+			duplicateRatio:  0.8,
+		},
+		{
+			inputSize:       150,
+			stringsPerInput: 300,
+			duplicateRatio:  0.95,
+		},
+	}
+
+	type ParallelismType int
+
+	const (
+		usingMap ParallelismType = iota
+	)
+
+	parallelism := []ParallelismType{usingMap, 1, 8}
+
+	for _, tc := range testCases {
+		input := make([][]string, tc.inputSize)
+		unusedStrings := make([]string, min(len(RandomStrings), tc.inputSize*tc.stringsPerInput))
+		usedStrings := make([]string, 0, len(unusedStrings))
+		copy(unusedStrings, RandomStrings)
+
+		for i := 0; i < tc.inputSize; i++ {
+			stringsToBeReused := make([]string, len(usedStrings))
+			copy(stringsToBeReused, usedStrings)
+			for j := 0; j < tc.stringsPerInput; j++ {
+				// Either reuse a random string that is already in use, or take a fresh one.
+				var s string
+				if j < int(float64(tc.stringsPerInput)*tc.duplicateRatio) && i > 0 {
+					index := rand.Int() % len(stringsToBeReused)
+					s = stringsToBeReused[index]
+					stringsToBeReused = append(stringsToBeReused[:index], stringsToBeReused[index+1:]...)
+				} else {
+					s = unusedStrings[0]
+					usedStrings = append(usedStrings, s)
+					unusedStrings = unusedStrings[1:]
+				}
+				input[i] = append(input[i], s)
+			}
+			sort.Strings(input[i])
+		}
+		for _, p := range parallelism {
+			var name string
+			if p == usingMap {
+				name = fmt.Sprintf("usingMap,inputSize:%v,stringsPerInput:%v,duplicateRatio:%v", tc.inputSize, tc.stringsPerInput, tc.duplicateRatio)
+			} else {
+				name = fmt.Sprintf("parallelism:%v,inputSize:%v,stringsPerInput:%v,duplicateRatio:%v", p, tc.inputSize, tc.stringsPerInput, tc.duplicateRatio)
+			}
+			expected := sortUsingMap(input...)
+			b.Run(name, func(b *testing.B) {
+				// Run the benchmark.
+				b.ReportAllocs()
+				b.ResetTimer()
+				var r []string
+				for i := 0; i < b.N; i++ {
+					if p == usingMap {
+						r = sortUsingMap(input...)
+						require.NotEmpty(b, r)
+					} else {
+						r = MergeSlicesParallel(int(p), input...)
+						require.NotEmpty(b, r)
+					}
+				}
+				require.Equal(b, expected, r)
+			})
+		}
+	}
+}
+
+func sortUsingMap(resps ...[]string) []string {
+	valueSet := map[string]struct{}{}
+	for _, resp := range resps {
+		for _, v := range resp {
+			valueSet[v] = struct{}{}
+		}
+	}
+
+	values := make([]string, 0, len(valueSet))
+	for v := range valueSet {
+		values = append(values, v)
+	}
+
+	sort.Strings(values)
+	return values
+}
diff --git a/pkg/util/test_util.go b/pkg/util/test_util.go
new file mode 100644
index 00000000000..4f9c65f010a
--- /dev/null
+++ b/pkg/util/test_util.go
@@ -0,0 +1,23 @@
+package util
+
+import (
+	"math/rand"
+	"strings"
+)
+
+var (
+	randomChar    = "0123456789abcdef"
+	RandomStrings = []string{}
+)
+
+func init() {
+	sb := strings.Builder{}
+	for i := 0; i < 1000000; i++ {
+		sb.Reset()
+		sb.WriteString("pod://")
+		for j := 0; j < 14; j++ {
+			sb.WriteByte(randomChar[rand.Int()%len(randomChar)])
+		}
+		RandomStrings = append(RandomStrings, sb.String())
+	}
+}
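
For context (not part of the diff above), a minimal usage sketch of the two new helpers. It assumes the Cortex module path github.com/cortexproject/cortex/pkg/util for the import; the inputs must already be sorted, and the result is sorted and de-duplicated:

package main

import (
	"fmt"

	// Assumed import path for the util package touched by this diff.
	"github.com/cortexproject/cortex/pkg/util"
)

func main() {
	// Each input slice must already be sorted, as the per-ingester responses are.
	ingesterA := []string{"prod", "staging", "test"}
	ingesterB := []string{"dev", "prod", "test"}
	ingesterC := []string{"prod", "qa"}

	// Sequential k-way merge with de-duplication (loser tree under the hood).
	fmt.Println(util.MergeSortedSlices(ingesterA, ingesterB, ingesterC))
	// Output: [dev prod qa staging test]

	// Parallel variant: the inputs are split into at most `parallelism` batches,
	// each batch is merged in its own goroutine, and the per-batch results are
	// merged once more at the end.
	fmt.Println(util.MergeSlicesParallel(8, ingesterA, ingesterB, ingesterC))
	// Output: [dev prod qa staging test]
}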