Skip to content

Commit 6bb5e91

Browse files
author
KJ Tsanaktsidis
committed
Implement RedisClient::Cluster#with
This implements support for "pinning" a cluster client to a particular keyslot. This allows the use of WATCH/MULTI/EXEC transactions on that node without any ambiguity. Because RedisClient also implements watch as "yield self" and ignores all arguments, this means that you can easily write transactions that are compatible with both cluster and non-clustered redis (provided you use hashtags for your keys).
1 parent b6da37f commit 6bb5e91

File tree

6 files changed

+374
-0
lines changed

6 files changed

+374
-0
lines changed

.rubocop.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ Metrics/ClassLength:
1919

2020
Metrics/ModuleLength:
2121
Max: 500
22+
Exclude:
23+
- 'test/**/*'
2224

2325
Metrics/MethodLength:
2426
Max: 50

README.md

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,94 @@ cli.call('MGET', '{key}1', '{key}2', '{key}3')
168168
#=> [nil, nil, nil]
169169
```
170170

171+
## Transactions
172+
This gem supports [Redis transactions](https://redis.io/topics/transactions), including atomicity with `MULTI`/`EXEC`,
173+
and conditional execution with `WATCH`. Redis does not support cross-slot transactions, so all keys used within a
174+
transaction must live in the same key slot. To use transactions, you must thus "pin" your client to a single slot using
175+
`#with`. Pass a key to `#with`, and the yielded connection object will enforce that all keys used in the block must
176+
belong to the same slot.
177+
178+
```ruby
179+
cli.with(key: '{key}1') do |conn|
180+
conn.call('SET', '{key}1', 'This is OK')
181+
#=> "OK"
182+
conn.call('SET', '{key}2', 'This is also OK; it has the same hashtag, and so the same slot')
183+
#=> "OK"
184+
conn.call('SET', '{otherslot}3', 'This will raise an exception; this key is not in the same slot as {key}1')
185+
#=> Connection pinned to slot 12539 but command ["SET", "{otherslot}3", "..."] includes key {otherslot}3 in slot 10271 (RedisClient::Cluster::Transaction::ConsistencyError)
186+
end
187+
```
188+
189+
When using hash tags to enforce same-slot key hashing, it's often neat to simply pass the hashtag _only_ to `#with`:
190+
191+
```ruby
192+
cli.with(key: '{myslot}') do |conn|
193+
# You can use any key starting with {myslot} in this block
194+
end
195+
```
196+
197+
Once you have pinned a client to a particular slot, you can use the same transaction APIs as the
198+
[redis-client](https://github.com/redis-rb/redis-client#usage) gem allows.
199+
200+
```ruby
201+
# No concurrent client will ever see the value 1 in 'mykey'; it will see either zero or two.
202+
cli.call('SET', 'key', 0)
203+
cli.with(key: 'key') do |conn|
204+
conn.multi do |txn|
205+
txn.call('INCR', 'key')
206+
txn.call('INCR', 'key')
207+
end
208+
#=> ['OK', 'OK']
209+
end
210+
211+
# Conditional execution with WATCH can be used to e.g. atomically swap two keys
212+
cli.call('MSET', '{key}1', 'v1', '{key}2', 'v2')
213+
cli.with(key: '{key}') do |conn|
214+
conn.call('WATCH', '{key}1', '{key}2')
215+
conn.multi do |txn|
216+
old_key1 = conn.call('GET', '{key}1')
217+
old_key2 = conn.call('GET', '{key}2')
218+
txn.call('SET', '{key}1', old_key2)
219+
txn.call('SET', '{key}2', old_key1)
220+
end
221+
# This transaction will swap the values of {key}1 and {key}2 only if no concurrent connection modified
222+
# either of the values
223+
end
224+
225+
# You can also pass watch: to #multi as a shortcut
226+
cli.call('MSET', '{key}1', 'v1', '{key}2', 'v2')
227+
cli.with(key: '{key}') do |conn|
228+
conn.multi(watch: ['{key}1', '{key}2']) do |txn|
229+
old_key1, old_key2 = conn.call('MGET', '{key}1', '{key}2')
230+
txn.call('MSET', '{key}1', old_key2, '{key}2', old_key1)
231+
end
232+
end
233+
```
234+
235+
Pinned connections are aware of redirections and node failures like ordinary calls to `RedisClient::Cluster`, but because
236+
you may have written non-idempotent code inside your block, the block is not automatically retried if e.g. the slot
237+
it is operating on moves to a different node. If you want this, you can opt-in to retries by passing nonzero
238+
`retry_count` to `#with`.
239+
240+
```ruby
241+
cli.with(key: '{key}', retry_count: 1) do |conn|
242+
conn.call('GET', '{key}1')
243+
#=> "value1"
244+
245+
# Now, some changes in cluster topology mean that {key} is moved to a different node!
246+
247+
conn.call('GET', '{key}2')
248+
#=> MOVED 9039 127.0.0.1:16381 (RedisClient::CommandError)
249+
250+
# Luckily, the block will get retried (once) and so both GETs will be re-executed on the newly-discovered
251+
# correct node.
252+
end
253+
```
254+
255+
Because `RedisClient` from the redis-client gem implements `#with` as simply `yield self` and ignores all of its
256+
arguments, it's possible to write code which is compatible with both redis-client and redis-cluster-client; the `#with`
257+
call will pin the connection to a slot when using clustering, or be a no-op when not.
258+
171259
## ACL
172260
The cluster client internally calls [COMMAND](https://redis.io/commands/command/) and [CLUSTER NODES](https://redis.io/commands/cluster-nodes/) commands to operate correctly.
173261
So please permit it like the followings.

lib/redis_client/cluster.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# frozen_string_literal: true
22

33
require 'redis_client/cluster/concurrent_worker'
4+
require 'redis_client/cluster/pinning'
45
require 'redis_client/cluster/pipeline'
56
require 'redis_client/cluster/pub_sub'
67
require 'redis_client/cluster/router'
@@ -97,6 +98,12 @@ def pubsub
9798
::RedisClient::Cluster::PubSub.new(@router, @command_builder)
9899
end
99100

101+
def with(key:, write: true, retry_count: 0)
102+
@router.with(key: key, write: write, retry_count: retry_count) do |conn, slot|
103+
yield ::RedisClient::Cluster::Pinning::ConnectionProxy.new(conn, slot, @router, @command_builder)
104+
end
105+
end
106+
100107
def close
101108
@concurrent_worker.close
102109
@router.close

lib/redis_client/cluster/router.rb

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,10 @@ def command_exists?(name)
199199
@command.exists?(name)
200200
end
201201

202+
def command_extract_all_keys(command)
203+
@command.extract_all_keys(command)
204+
end
205+
202206
def assign_redirection_node(err_msg)
203207
_, slot, node_key = err_msg.split
204208
slot = slot.to_i
@@ -219,6 +223,26 @@ def close
219223
@node.each(&:close)
220224
end
221225

226+
def with(key:, write: true, retry_count: 0)
227+
raise ArgumentError, 'key must be provided' if key.nil?
228+
229+
slot = ::RedisClient::Cluster::KeySlotConverter.convert_with_hashtag(key)
230+
node_key = if write
231+
@node.find_node_key_of_primary(slot)
232+
else
233+
@node.find_node_key_of_replica(slot)
234+
end
235+
node = find_node(node_key)
236+
# Calling #with checks out the underlying connection if this is a pooled connection
237+
# Calling it through #try_delegate ensures we handle any redirections and retry the entire
238+
# transaction if so.
239+
try_delegate(node, :with, retry_count: retry_count) do |conn|
240+
conn.disable_reconnection do
241+
yield conn, slot
242+
end
243+
end
244+
end
245+
222246
private
223247

224248
def send_wait_command(method, command, args, retry_count: 3, &block) # rubocop:disable Metrics/AbcSize

test/redis_client/test_cluster.rb

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,213 @@ def test_circuit_breakers
480480
cli&.close
481481
end
482482

483+
def test_pinning_single_key
484+
got = @client.with(key: 'key1') do |conn|
485+
conn.call('SET', 'key1', 'hello')
486+
conn.call('GET', 'key1')
487+
end
488+
assert_equal('hello', got)
489+
end
490+
491+
def test_pinning_no_key
492+
assert_raises(ArgumentError) do
493+
@client.with(key: nil) {}
494+
end
495+
end
496+
497+
def test_pinning_two_keys
498+
got = @client.with(key: '{slot}') do |conn|
499+
conn.call('SET', '{slot}key1', 'v1')
500+
conn.call('SET', '{slot}key2', 'v2')
501+
conn.call('MGET', '{slot}key1', '{slot}key2')
502+
end
503+
assert_equal(%w[v1 v2], got)
504+
end
505+
506+
def test_pinning_cross_slot
507+
assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
508+
@client.with(key: '{slot1}') do |conn|
509+
conn.call('GET', '{slot2}')
510+
end
511+
end
512+
end
513+
514+
def test_pinning_pipeline
515+
got = @client.with(key: '{slot}') do |conn|
516+
conn.call_v(['SET', '{slot}counter', 0])
517+
conn.pipelined do |pipe|
518+
pipe.call_v(['INCR', '{slot}counter'])
519+
pipe.call_v(['INCR', '{slot}counter'])
520+
pipe.call_v(['INCR', '{slot}counter'])
521+
end
522+
conn.call_v(['GET', '{slot}counter']).to_i
523+
end
524+
525+
assert_equal(3, got)
526+
end
527+
528+
def test_pinning_pipeline_with_error
529+
assert_raises(RedisClient::CommandError) do
530+
@client.with(key: '{slot}') do |conn|
531+
conn.pipelined do |pipeline|
532+
pipeline.call('SET', '{slot}key', 'first')
533+
pipeline.call('SET', '{slot}key', 'second', 'too many args')
534+
pipeline.call('SET', '{slot}key', 'third')
535+
end
536+
end
537+
end
538+
539+
wait_for_replication
540+
assert_equal('third', @client.call('GET', '{slot}key'))
541+
end
542+
543+
def test_pinning_transaction
544+
got = @client.with(key: '{slot}') do |conn|
545+
conn.multi do |txn|
546+
txn.call('SET', '{slot}key1', 'value1')
547+
txn.call('SET', '{slot}key2', 'value2')
548+
end
549+
end
550+
551+
assert_equal(%w[OK OK], got)
552+
end
553+
554+
def test_pinning_transaction_watch_arg
555+
@client.call('MSET', '{slot}key1', 'val1', '{slot}key2', 'val2')
556+
@captured_commands.clear
557+
558+
got = @client.with(key: '{slot}') do |conn|
559+
conn.multi(watch: ['{slot}key1', '{slot}key2']) do |txn|
560+
old_key1, old_key2 = conn.call('MGET', '{slot}key1', '{slot}key2')
561+
txn.call('MSET', '{slot}key1', old_key2, '{slot}key2', old_key1)
562+
end
563+
end
564+
565+
assert_equal([
566+
%w[WATCH {slot}key1 {slot}key2],
567+
%w[MGET {slot}key1 {slot}key2],
568+
%w[MULTI],
569+
%w[MSET {slot}key1 val2 {slot}key2 val1],
570+
%w[EXEC]
571+
], @captured_commands.map(&:command))
572+
573+
wait_for_replication
574+
assert_equal(['OK'], got)
575+
assert_equal(%w[val2 val1], @client.call('MGET', '{slot}key1', '{slot}key2'))
576+
end
577+
578+
def test_pinning_transaction_watch_arg_unwatches_on_raise
579+
ex = Class.new(StandardError)
580+
@captured_commands.clear
581+
582+
assert_raises(ex) do
583+
@client.with(key: '{slot}') do |conn|
584+
conn.multi(watch: ['{slot}key1']) do |_txn|
585+
conn.call('GET', '{slot}key1')
586+
raise ex, 'boom'
587+
end
588+
end
589+
end
590+
591+
assert_equal([
592+
%w[WATCH {slot}key1],
593+
%w[GET {slot}key1],
594+
%w[UNWATCH]
595+
], @captured_commands.map(&:command))
596+
end
597+
598+
def test_pinning_transaction_can_watch_manually
599+
@client.call('MSET', '{slot}key1', 'val1', '{slot}key2', 'val2')
600+
@captured_commands.clear
601+
602+
got = @client.with(key: '{slot}') do |conn|
603+
conn.call('WATCH', '{slot}key1', '{slot}key2')
604+
old_key1, old_key2 = conn.call('MGET', '{slot}key1', '{slot}key2')
605+
conn.multi do |txn|
606+
txn.call('MSET', '{slot}key1', old_key2, '{slot}key2', old_key1)
607+
end
608+
end
609+
610+
assert_equal([
611+
%w[WATCH {slot}key1 {slot}key2],
612+
%w[MGET {slot}key1 {slot}key2],
613+
%w[MULTI],
614+
%w[MSET {slot}key1 val2 {slot}key2 val1],
615+
%w[EXEC]
616+
], @captured_commands.map(&:command))
617+
618+
wait_for_replication
619+
assert_equal(['OK'], got)
620+
assert_equal(%w[val2 val1], @client.call('MGET', '{slot}key1', '{slot}key2'))
621+
end
622+
623+
def test_pinning_transaction_can_unwatch_manually
624+
got = @client.with(key: '{slot}') do |conn|
625+
conn.call('WATCH', '{slot}key1')
626+
conn.call('UNWATCH')
627+
end
628+
629+
assert_equal('OK', got)
630+
end
631+
632+
def test_pinning_nonblocking_timeouts_update_topology
633+
@client.call('DEL', '{slot}list')
634+
@captured_commands.clear
635+
636+
assert_raises(::RedisClient::ReadTimeoutError) do
637+
@client.with(key: '{slot}') do |conn|
638+
conn.call_v(['BLPOP', '{slot}list', 0])
639+
end
640+
end
641+
assert_includes(@captured_commands.map(&:command), ['CLUSTER', 'NODES'])
642+
end
643+
644+
def test_pinning_blocking_timeouts_do_not_update_topology
645+
@client.call('DEL', '{slot}list')
646+
@captured_commands.clear
647+
648+
assert_raises(::RedisClient::ReadTimeoutError) do
649+
@client.with(key: '{slot}') do |conn|
650+
conn.blocking_call_v(1, ['BLPOP', '{slot}list', 0])
651+
end
652+
end
653+
refute_includes(@captured_commands.map(&:command), ['CLUSTER', 'NODES'])
654+
end
655+
656+
def test_pinning_sscan
657+
@client.call('DEL', '{slot}set')
658+
expected_set = Set.new
659+
scanned_set = Set.new
660+
1000.times do |i|
661+
expected_set << i
662+
@client.call('SADD', '{slot}set', i)
663+
end
664+
@client.with(key: '{slot}') do |conn|
665+
conn.sscan('{slot}set') do |i|
666+
scanned_set << i.to_i
667+
end
668+
end
669+
670+
assert_equal(expected_set, scanned_set)
671+
end
672+
673+
def test_pinning_zscan
674+
@client.call('DEL', '{slot}set')
675+
expected_set = Set.new
676+
scanned_set = Set.new
677+
1000.times do |i|
678+
expected_set << "member#{i}"
679+
@client.call('ZADD', '{slot}set', i, "member#{i}")
680+
end
681+
@client.with(key: '{slot}') do |conn|
682+
conn.zscan('{slot}set') do |i|
683+
scanned_set << i
684+
end
685+
end
686+
687+
assert_equal(expected_set, scanned_set)
688+
end
689+
483690
private
484691

485692
def wait_for_replication

0 commit comments

Comments
 (0)