From af0dcd7047954ecdb64739bd61f8e060e276ea66 Mon Sep 17 00:00:00 2001 From: Nikita Semenov Date: Wed, 11 Jun 2025 20:40:12 +0700 Subject: [PATCH 1/2] [ISSUE-3402]: Ring.Pipelined return dial timeout error --- ring.go | 17 +++++++++++++---- ring_test.go | 15 +++++++++++++++ 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/ring.go b/ring.go index 4da0b21a6..fd169e2b7 100644 --- a/ring.go +++ b/ring.go @@ -784,6 +784,8 @@ func (c *Ring) generalProcessPipeline( } var wg sync.WaitGroup + errs := make(chan error, len(cmdsMap)) + for hash, cmds := range cmdsMap { wg.Add(1) go func(hash string, cmds []Cmder) { @@ -796,16 +798,23 @@ func (c *Ring) generalProcessPipeline( return } + hook := shard.Client.processPipelineHook if tx { - cmds = wrapMultiExec(ctx, cmds) - _ = shard.Client.processTxPipelineHook(ctx, cmds) - } else { - _ = shard.Client.processPipelineHook(ctx, cmds) + cmds, hook = wrapMultiExec(ctx, cmds), shard.Client.processTxPipelineHook + } + + if err = hook(ctx, cmds); err != nil { + errs <- err } }(hash, cmds) } wg.Wait() + close(errs) + + if err := <-errs; err != nil { + return err + } return cmdsFirstErr(cmds) } diff --git a/ring_test.go b/ring_test.go index aaac74dc9..908807ac3 100644 --- a/ring_test.go +++ b/ring_test.go @@ -271,6 +271,21 @@ var _ = Describe("Redis Ring", func() { Expect(ringShard1.Info(ctx).Val()).ToNot(ContainSubstring("keys=")) Expect(ringShard2.Info(ctx).Val()).To(ContainSubstring("keys=100")) }) + + It("return dial timeout error", func() { + opt := redisRingOptions() + opt.DialTimeout = 250 * time.Millisecond + opt.Addrs = map[string]string{"ringShardNotExist": ":1997"} + ring = redis.NewRing(opt) + + _, err := ring.Pipelined(ctx, func(pipe redis.Pipeliner) error { + pipe.HSet(ctx, "key", "value") + pipe.Expire(ctx, "key", time.Minute) + return nil + }) + + Expect(err).To(HaveOccurred()) + }) }) Describe("new client callback", func() { From d8bfadd8fe0fca8e75d0285b1c1be4a1deff0255 Mon Sep 17 00:00:00 2001 From: Nikita Semenov Date: Mon, 30 Jun 2025 01:15:12 +0700 Subject: [PATCH 2/2] review fixes --- ring.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ring.go b/ring.go index 542497318..0c1566019 100644 --- a/ring.go +++ b/ring.go @@ -814,7 +814,8 @@ func (c *Ring) generalProcessPipeline( hook := shard.Client.processPipelineHook if tx { - cmds, hook = wrapMultiExec(ctx, cmds), shard.Client.processTxPipelineHook + cmds = wrapMultiExec(ctx, cmds) + hook = shard.Client.processTxPipelineHook } if err = hook(ctx, cmds); err != nil {