diff --git a/redis/client.py b/redis/client.py index 8b5a3faed2..0b9837a0f7 100755 --- a/redis/client.py +++ b/redis/client.py @@ -278,6 +278,87 @@ def parse_slowlog_get(response, **options): } for item in response] +def parse_cluster_slots(resp, **options): + current_host = options.get('current_host', '') + fix_server = lambda (host, port): (host or current_host, port) + + slots = {} + for slot in resp: + start, end, master = slot[:3] + slaves = slot[3:] + slots[start, end] = { + 'master': fix_server(master), + 'slaves': [fix_server(slave) for slave in slaves], + } + + return slots + + +def parse_cluster_nodes(resp, **options): + """ + @see: http://redis.io/commands/cluster-nodes # string + @see: http://redis.io/commands/cluster-slaves # list of string + """ + current_host = options.get('current_host', '') + + def parse_slots(s): + slots, migrations = [], [] + for r in s.split(' '): + if '->-' in r: + slot_id, dst_node_id = r[1:-1].split('->-', 1) + migrations.append({ + 'slot': int(slot_id), + 'node_id': dst_node_id, + 'state': 'migrating' + }) + elif '-<-' in r: + slot_id, src_node_id = r[1:-1].split('-<-', 1) + migrations.append({ + 'slot': int(slot_id), + 'node_id': src_node_id, + 'state': 'importing' + }) + elif '-' in r: + start, end = r.split('-') + slots.extend(range(int(start), int(end) + 1)) + else: + slots.append(int(r)) + + return slots, migrations + + if isinstance(resp, basestring): + resp = resp.splitlines() + + nodes = [] + for line in resp: + parts = line.split(' ', 8) + self_id, addr, flags, master_id, ping_sent, \ + pong_recv, config_epoch, link_state = parts[:8] + + host, port = addr.rsplit(':', 1) + + node = { + 'id': self_id, + 'host': host or current_host, + 'port': int(port), + 'flags': tuple(flags.split(',')), + 'master': master_id if master_id != '-' else None, + 'ping-sent': int(ping_sent), + 'pong-recv': int(pong_recv), + 'link-state': link_state, + 'slots': [], + 'migrations': [], + } + + if len(parts) >= 9: + slots, migrations = parse_slots(parts[8]) + node['slots'], node['migrations'] = tuple(slots), migrations + + nodes.append(node) + + return nodes + + class StrictRedis(object): """ Implementation of the Redis protocol. @@ -361,6 +442,30 @@ class StrictRedis(object): 'SSCAN': parse_scan, 'TIME': lambda x: (int(x[0]), int(x[1])), 'ZSCAN': parse_zscan + }, + # cluster + { + 'CLUSTER ADDSLOTS': bool_ok, + 'CLUSTER COUNT-FAILURE-REPORTS': int, + 'CLUSTER COUNTKEYSINSLOT': int, + 'CLUSTER DELSLOTS': bool_ok, + 'CLUSTER FAILOVER': bool_ok, + 'CLUSTER FORGET': bool_ok, + 'CLUSTER GETKEYSINSLOT': int, + 'CLUSTER INFO': parse_info, + 'CLUSTER KEYSLOT': int, + 'CLUSTER MEET': bool_ok, + 'CLUSTER NODES': parse_cluster_nodes, + 'CLUSTER REPLICATE': bool_ok, + 'CLUSTER RESET': bool_ok, + 'CLUSTER SAVECONFIG': bool_ok, + 'CLUSTER SET-CONFIG-EPOCH': bool_ok, + 'CLUSTER SETSLOT': bool_ok, + 'CLUSTER SLAVES': parse_cluster_nodes, + 'CLUSTER SLOTS': parse_cluster_slots, + 'ASKING': bool_ok, + 'READONLY': bool_ok, + 'READWRITE': bool_ok, } ) @@ -444,6 +549,7 @@ def __init__(self, host='localhost', port=6379, 'ssl_ca_certs': ssl_ca_certs, }) connection_pool = ConnectionPool(**kwargs) + self.host = host self.connection_pool = connection_pool self._use_lua_lock = None @@ -612,8 +718,80 @@ def client_setname(self, name): "Sets the current connection name" return self.execute_command('CLIENT SETNAME', name) + def cluster_addslots(self, *slots): + """Assign new hash slots to receiving node""" + return self.execute_command('CLUSTER ADDSLOTS', *slots) + + def cluster_countkeysinslot(self, slot_id): + """Return the number of local keys in the specified hash slot""" + return self.execute_command('CLUSTER COUNTKEYSINSLOT', slot_id) + + def cluster_count_failure_report(self, node_id): + """Return the number of failure reports active for a given node""" + return self.execute_command('CLUSTER COUNT-FAILURE-REPORTS', node_id) + + def cluster_delslots(self, *slots): + """Set hash slots as unbound in receiving node""" + return self.execute_command('CLUSTER DELSLOTS', *slots) + + def cluster_failover(self, option): + """Forces a slave to perform a manual failover of its master.""" + assert option.upper() in ('FORCE', 'TAKEOVER') + return self.execute_command('CLUSTER FAILOVER', Token(option)) + + def cluster_info(self): + """Provides info about Redis Cluster node state""" + return self.execute_command('CLUSTER INFO') + + def cluster_keyslot(self, name): + """Returns the hash slot of the specified key""" + return self.execute_command('CLUSTER KEYSLOT', name) + + def cluster_meet(self, host, port): + """Force a node cluster to handshake with another node""" + return self.execute_command('CLUSTER MEET', host, port) + + def cluster_nodes(self): + """Force a node cluster to handshake with another node""" + return self.execute_command('CLUSTER NODES', current_host=self.host) + + def cluster_replicate(self, node_id): + """Reconfigure a node as a slave of the specified master node""" + return self.execute_command('CLUSTER REPLICATE', node_id) + + def cluster_reset(self, option='SOFT'): + """Reset a Redis Cluster node""" + assert option.upper() in ('SOFT', 'HARD') + return self.execute_command('CLUSTER RESET', Token(option)) + + def cluster_save_config(self): + """Forces the node to save cluster state on disk""" + return self.execute_command('CLUSTER SAVECONFIG') + + def cluster_set_config_epoch(self, epoch): + """Set the configuration epoch in a new node""" + return self.execute_command('CLUSTER SET-CONFIG-EPOCH', epoch) + + def cluster_setslot(self, slot_id, state, node_id=None): + """Bind an hash slot to a specific node""" + if state.upper() in ('IMPORTING', 'MIGRATING', 'NODE'): + if node_id is not None: + return self.execute_command('CLUSTER SETSLOT', slot_id, Token(state), node_id) + elif state.upper() == 'STABLE': + return self.execute_command('CLUSTER SETSLOT', slot_id, Token('STABLE')) + else: + raise RedisError('Invalid slot state: %s' % state) + + def cluster_slaves(self, node_id): + """Force a node cluster to handshake with another node""" + return self.execute_command('CLUSTER SLAVES', node_id) + + def cluster_slots(self): + """Get array of Cluster slot to node mappings""" + return self.execute_command('CLUSTER SLOTS', current_host=self.host) + def config_get(self, pattern="*"): - "Return a dictionary of configuration based on the ``pattern``" + """Return a dictionary of configuration based on the ``pattern``""" return self.execute_command('CONFIG GET', pattern) def config_set(self, name, value): diff --git a/redis/cluster.py b/redis/cluster.py new file mode 100644 index 0000000000..3b10c97176 --- /dev/null +++ b/redis/cluster.py @@ -0,0 +1,726 @@ +"""cluster support ported from https://github.com/Grokzen/redis-py-cluster + +MY GOALS: + +1. expose abilities which redis cluster provided + - high availability, endure partially slot coverage + - scaling up read operations using slave nodes + - interactive load balance interface via python generator +2. Strict interface provides + - commands without ambiguity + - multiple-key operations MUST located in single slot +3. Loose interface provides + - cross slot helper methods +4. ability to adapt tornado's Future, via some external component(not-included) +5. let exception just raise to user if it's not in redis protocol. + + +REFERENCES: + +about pipeline: +@see: https://groups.google.com/forum/#!topic/jedis_redis/u6j8slokO3E +@see: https://groups.google.com/forum/#!msg/redis-db/4I0ELYnf3bk/Lrctk0ULm6AJ + +about readonly slaves: +@see: https://github.com/antirez/redis/issues/2216 +@see: https://github.com/xetorthio/jedis/issues/790 + +""" +import time +import logging + +from redis.crc import crc16 +from redis._compat import iteritems, nativestr +from redis.client import StrictRedis, dict_merge, list_or_args +from redis.connection import Connection, ConnectionPool, DefaultParser +from redis.exceptions import ( + ConnectionError, ClusterPartitionError, ClusterError, + TimeoutError, ResponseError, BusyLoadingError, ClusterCrossSlotError, + ClusterSlotNotServedError, ClusterDownError, +) + +# TODO: partially update cluster slot info +# TODO: every possible situation in cluster +# TODO: master slave changed +# TODO: master timed out +# TODO: slave timed out +# TODO: READWRITE/READONLY switching +# TODO: connection_pool (partially) rebuild +# TODO: migrate test cases from redis-py-cluster +# TODO: pipeline +# TODO: generator as interactive load balancer + +# TODO: read from slave, but slave changed to master +# TODO: pubsub +# TODO: lock +# TODO: advanced balancer +# TODO: loose redis interface(cross slot ops) +# TODO: script +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) +LOGGER.addHandler(logging.StreamHandler()) + + +class ClusterBalancer(object): + def get_node_for_key(self, key_name, readonly): + return self.get_node_for_slot(Cluster.keyslot(key_name), readonly=readonly) + + def get_node_for_slot(self, slot_id, readonly): + raise NotImplementedError() + + def get_random_node(self, readonly): + raise NotImplementedError() + + def get_shards_nodes(self, readonly): + """one each shard""" + raise NotImplementedError() + + +class RoundRobinClusterNodeBalancer(ClusterBalancer): + RR_COUNTER = 0 + + def __init__(self, manager): + self.manager = manager + + def _incr(self, by=1): + counter = self.__class__.RR_COUNTER = (self.__class__.RR_COUNTER + by) % Cluster.KEY_SLOTS + return counter + + def get_node_for_slot(self, slot_id, readonly): + self.manager.discover_cluster() + + if not readonly: + return self.manager.get_master_node(slot_id) + else: + nodes = self.manager.get_slave_nodes(slot_id, slave_only=False) + return list(nodes)[self._incr() % len(nodes)] + + def get_random_node(self, readonly): + self.manager.discover_cluster() + + if readonly: + nodes = self.manager.nodes.keys() + else: + nodes = self.manager.shards.keys() + + return nodes[self._incr() % len(nodes)] + + def get_shards_nodes(self, readonly): + self.manager.discover_cluster() + + for shard in self.manager.shards.values(): + if readonly: + nodes = list(shard['slaves']) + [shard['master']] + yield list(nodes)[self._incr() % len(nodes)] + else: + yield shard['master'] + + +class ClusterParser(DefaultParser): + class AskError(ResponseError): + """ + src node: MIGRATING to dst node + get > ASK error + ask dst node > ASKING command + dst node: IMPORTING from src node + asking command only affects next command + any op will be allowed after asking command + """ + def __init__(self, resp): + """should only redirect to master node""" + slot_id, new_node = resp.split(' ') + host, port = new_node.rsplit(':', 1) + self.slot_id = int(slot_id) + self.node_addr = self.host, self.port = host, int(port) + + class TryAgainError(ResponseError): + def __init__(self, resp): + pass + + class MovedError(AskError): + pass + + EXCEPTION_CLASSES = dict_merge( + DefaultParser.EXCEPTION_CLASSES, { + 'ASK': AskError, + 'TRYAGAIN': TryAgainError, + 'MOVED': MovedError, + 'CLUSTERDOWN': ClusterDownError, + 'CROSSSLOT': ClusterCrossSlotError, + }) + + +class ClusterConnection(Connection): + description_format = "ClusterConnection" + + def __init__(self, *args, **kwargs): + self.readonly = kwargs.pop('readonly', False) + kwargs['parser_class'] = ClusterParser + super(ClusterConnection, self).__init__(*args, **kwargs) + + def on_connect(self): + """Initialize the connection, set readonly is required""" + super(ClusterConnection, self).on_connect() + if self.readonly: + self.send_command('READONLY') + if nativestr(self.read_response()) != 'OK': + raise ResponseError('Cannot set READONLY flag') + + +class Cluster(object): + """keep knowledge of cluster""" + + KEY_SLOTS = 16384 + # timeout for collecting from startup nodes + DEFAULT_TIMEOUT = 0.5 + + def __init__(self, startup_nodes=None, allow_partition=False, **cluster_kwargs): + """allow_partition: raise Exception when partition appears or not.""" + + if not startup_nodes: + raise ValueError('No startup nodes provided') + + self.cluster_kwargs = dict([ + (k, v) for k, v in iteritems(cluster_kwargs) + if k.startswith('socket_') + ]) + + self.cluster_kwargs['decode_responses'] = True + self.cluster_kwargs['password'] = cluster_kwargs.get('password') + self.cluster_kwargs.setdefault('socket_timeout', self.DEFAULT_TIMEOUT) + self.allow_partition = allow_partition + + self.shards = {} # master_id -> shard_info + self.nodes = {} + self.startup_nodes = set(startup_nodes) # [(host, port), ...] + self.slots = {} + self.pubsub_node = None + + # version for keep generators work + self.slots_epoch = 0 + + @classmethod + def keyslot(cls, key): + """Calculate keyslot for a given key. + + This also works for binary keys that is used in python 3. + """ + k = unicode(key) + start = k.find('{') + if start > -1: + end = k.find('}', start + 1) + if end > -1 and end != start + 1: + k = k[start + 1:end] + return crc16(k) % cls.KEY_SLOTS + + def discover_cluster(self, force=False, check_all=False): + """ + check_all: read from each startup nodes for detect cluster partition + force: do the discover action even cluster slot info is complete + """ + if not force and len(self.slots) == self.KEY_SLOTS: + return + + self.nodes, self.shards = {}, {} + # TODO: discover more node dynamically + for startup_node in self.startup_nodes: + try: + nodes = StrictRedis(*startup_node, **self.cluster_kwargs).cluster_nodes() + except ConnectionError: + LOGGER.warning('Startup node: %s:%s not responding in time.' % startup_node) + continue + + # build shards + for node in nodes: + node_id = node['id'] + self.nodes[node_id] = { + 'connected': node['link-state'] == 'connected', + 'id': node['id'], + 'host': node['host'], + 'port': node['port'], + 'addr': (node['host'], node['port']), + 'flags': node['flags'], + 'master': node['master'], + 'is_master': not node['master'], + 'slots': node['slots'], + } + + if 'master' not in node['flags']: + continue + + self.shards[node_id] = { + 'master': node_id, + 'slaves': set(), + 'slots': node['slots'], + } + + # fill slaves & slots + for node in nodes: + shard_id = node['master'] or node['id'] + + if 'slave' in node['flags']: + self.shards[shard_id]['slaves'].add(node['id']) + + for slot_id in node['slots']: + old_master = self.slots.setdefault(slot_id, shard_id) + if old_master != shard_id: + raise ClusterPartitionError( + 'Cluster partition appears: slot #%s, node: %s:[%s] and %s:[%s]' % ( + slot_id, old_master, self.node_addr(old_master), + shard_id, self.node_addr(shard_id))) + else: + self.slots[slot_id] = shard_id + + if not check_all and len(self.slots) == self.KEY_SLOTS: + break + + if not self.nodes: + raise ClusterDownError('No startup node can be reached. [\n%s\n]' % self.startup_nodes) + + self.startup_nodes = set([node['addr'] for node in self.nodes.values()]) + self.pubsub_node = self.determine_pubsub_node() + self.slots_epoch += 1 + + def get_master_node(self, slot_id): + self.discover_cluster() + try: + shard_id = self.slots[slot_id] + except KeyError: + raise ClusterSlotNotServedError(slot_id) + else: + return self.shards[shard_id]['master'] + + def get_slave_nodes(self, slot_id, slave_only=True): + self.discover_cluster() + try: + shard_id = self.slots[slot_id] + except KeyError: + raise ClusterSlotNotServedError(slot_id) + else: + shard = self.shards[shard_id] + if slave_only: + return list(shard['slaves']) + else: + return list(shard['slaves']) + [shard['master']] + + def determine_pubsub_node(self): + """ + Determine what node object should be used for pubsub commands. + + All clients in the cluster will talk to the same pubsub node to ensure + all code stay compatible. See pubsub doc for more details why. + + Always use the server with highest port number + """ + highest = -1 + node = None + for node in self.nodes.values(): + host, port = node['addr'] + if port > highest: + highest = port + + return node + + def all_nodes(self): + return self.nodes.values() + + def node_addr(self, node_id): + return self.nodes[node_id]['addr'] + + def slot_moved(self, slot_id, addr): + """signal from response, target node should be master""" + # XXX: maybe no rebuild cluster? only current slot? + self.startup_nodes.add(addr) + self.discover_cluster(force=True) + + def ask_node(self, slot_id, addr): + """signal from response, target node should be master""" + self.startup_nodes.add(addr) + + +class ClusterConnectionPool(object): + """connection pool for redis cluster + collection of pools + """ + DEFAULT_TIMEOUT = 0.1 + DEFAULT_MAX_CONN = 32 + + def __init__(self, manager, connection_class=ClusterConnection, + max_connections=None, **connection_kwargs): + + max_connections = max_connections or self.DEFAULT_MAX_CONN + if not isinstance(max_connections, (int, long)) or max_connections < 0: + raise ValueError('"max_connections" must be a positive integer') + + self.manager = manager + self.connection_class = connection_class + self.connection_kwargs = connection_kwargs + self.connection_kwargs.setdefault('socket_timeout', self.DEFAULT_TIMEOUT) + self.max_connections = max_connections + + # (host, port) -> pool + self.pools = {} + self.reset() + + def reset(self, force=False): + self.manager.discover_cluster(force=force) + self.pools = dict([ + (node['addr'], self.make_connection_pool(node['addr'], not node['is_master'])) + for node in self.manager.all_nodes() + ]) + + def get_connection(self, addr, command_name=None, *keys, **options): + """Get a connection from the pool""" + return self.pools[addr].get_connection(command_name, *keys, **options) + + def make_connection_pool(self, (host, port), readonly): + """Create a new connection pool""" + return ConnectionPool(host=host, port=port, + connection_class=self.connection_class, + max_connections=self.max_connections, + readonly=readonly, + **self.connection_kwargs) + + def release(self, connection): + """Releases the connection back to the pool""" + self.pools[connection.host, connection.port].release(connection) + + def disconnect(self): + """Disconnects all connections in the pool""" + for pool in self.pools.values(): + pool.disconnect() + + +class StrictClusterRedis(StrictRedis): + """ + If a command is implemented over the one in StrictRedis then it requires some changes compared to + the regular implementation of the method. + """ + COMMAND_TTL = 16 + COMMAND_FLAGS = dict_merge( + dict.fromkeys([ + 'RANDOMKEY', 'CLUSTER KEYSLOT', 'ECHO', + ], 'random'), + dict.fromkeys([ + 'CLUSTER COUNTKEYSINSLOT', + ], 'slot-id'), + dict.fromkeys([ + # impossible in cluster mode + 'SELECT', 'MOVE', 'SLAVEOF', + + # sentinels + 'SENTINEL GET-MASTER-ADDR-BY-NAME', 'SENTINEL MASTER', 'SENTINEL MASTERS', + 'SENTINEL MONITOR', 'SENTINEL REMOVE', 'SENTINEL SENTINELS', 'SENTINEL SET', + 'SENTINEL SLAVES', + + # admin commands + 'BGREWRITEAOF', 'BGSAVE', 'SAVE', 'INFO', 'LASTSAVE', + 'CONFIG GET', 'CONFIG SET', 'CONFIG RESETSTAT', 'CONFIG REWRITE', + 'CLIENT KILL', 'CLIENT LIST', 'CLIENT GETNAME', 'CLIENT SETNAME', + 'SLOWLOG GET', 'SLOWLOG RESET', 'SLOWLOG LEN', 'SHUTDOWN', + + # lua script + 'EVALSHA', 'SCRIPT EXISTS', 'SCRIPT KILL', 'SCRIPT LOAD', 'SCRIPT FLUSH', + + # unknown behaviors in cluster + 'PING', 'MONITOR', 'TIME', 'READONLY', 'READWRITE', + 'OBJECT REFCOUNT', 'OBJECT ENCODING', 'OBJECT IDLETIME', + + # test/doc command + 'PFSELFTEST', 'COMMAND', 'COMMAND COUNT', 'COMMAND GETKEYS', 'COMMAND INFO', + + # pipeline related + 'DISCARD', 'MULTI', 'WATCH', 'UNWATCH', + + # latency monitor + 'LATENCY LATEST', 'LATENCY HISTORY', 'LATENCY RESET', 'LATENCY GRAPH', 'LATENCY DOCTOR', + + # unknown commands + 'REPLCONF', 'SYNC', 'PSYNC', + + # all_master for loose + 'FLUSHALL', 'FLUSHDB', 'SCAN', + + # pubsub_node for loose + 'PUBLISH', 'SUBSCRIBE', 'PSUBSCRIBE', 'UNSUBSCRIBE', 'PUNSUBSCRIBE', + 'PUBSUB CHANNELS', 'PUBSUB NUMSUB', 'PUBSUB NUMPAT', + ], 'blocked'), # block for strict client + ) + COMMAND_PARSE_KEYS = dict_merge( + dict.fromkeys([ + 'BRPOPLPUSH', 'RPOPLPUSH', 'RENAME', 'SMOVE', + ], lambda args: [args[1], args[2]]), + dict.fromkeys([ + 'BLPOP', 'BRPOP', + ], lambda args: args[2:-1]), + dict.fromkeys([ + 'ZINTERSTORE', 'ZUNIONSTORE', + ], lambda args: [args[1]] + args[3:3+args[2]]), + dict.fromkeys([ + 'MSET', 'MSETNX', + ], lambda args: args[1::2]), + dict.fromkeys([ + 'DEL', 'RPOPLPUSH', 'RENAME', 'RENAMENX', 'SMOVE', 'SDIFF', 'SDIFFSTORE', + 'SINTER', 'SINTERSTORE', 'SUNION', 'SUNIONSTORE', 'PFMERGE', 'MGET', 'PFCOUNT', + ], lambda args: args[1:]), + { + 'BITOP': lambda args: args[2:], + }, + ) + READONLY_COMMANDS = { + # single key ops + # - bits + 'BITCOUNT', 'BITPOS', 'GETBIT', + + # - string + 'GET', 'MGET', 'TTL', 'PTTL', 'EXISTS', 'GETRANGE', 'SUBSTR', 'STRLEN', + 'DUMP', 'TYPE', 'RANDOMKEY', + + # - hash + 'HEXISTS', 'HGET', 'HGETALL', 'HKEYS', 'HLEN', 'HMGET', 'HSCAN', 'HVALS', + + # - list + 'LINDEX', 'LLEN', 'LRANGE', + + # - SET + 'SISMEMBER', 'SMEMBERS', 'SRANDMEMBER', 'SCARD', 'SSCAN', 'SDIFF', 'SINTER', 'SUNION', + + # - ZSET + 'ZCARD', 'ZCOUNT', 'ZLEXCOUNT', 'ZRANGE', 'ZRANGEBYLEX', 'ZRANGEBYSCORE', 'ZSCAN', + 'ZRANK', 'ZREVRANGE', 'ZREVRANGEBYLEX', 'ZREVRANGEBYSCORE', 'ZREVRANK', 'ZSCORE', + + # - hyper loglog + 'PFCOUNT', + } + + def __init__(self, startup_nodes, max_connections=32, discover_cluster=True, + node_balancer=None, packer_kwargs=None, **kwargs): + """ + startup_nodes --> List of nodes that initial bootstrapping can be done from + max_connections --> Maximum number of connections that should be kept open at one time + pipeline_use_threads -> By default, use threads in pipeline if this flag is set to True + **kwargs --> Extra arguments that will be sent into StrictRedis instance when created + (See Official redis-py doc for supported kwargs + [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py]) + Some kwargs is not supported and will raise RedisClusterException + - db (Redis do not support database SELECT in cluster mode) + """ + super(StrictClusterRedis, self).__init__(**kwargs) + + if 'db' in kwargs: + raise ClusterError("Argument 'db' is not possible to use in cluster mode") + + self.manager = Cluster(startup_nodes=startup_nodes, **kwargs) + self.connection_pool = ClusterConnectionPool(manager=self.manager, max_connections=max_connections, **kwargs) + self.node_balancer = node_balancer or RoundRobinClusterNodeBalancer(self.manager) + + if discover_cluster: + self.manager.discover_cluster() + + self.packer_conn = Connection(**packer_kwargs or {}) + self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy() + + def mget(self, keys, *args): + """collects from slots + """ + slot_keys = {} + origin_keys = list_or_args(keys, args) + for key in origin_keys: + slot_keys.setdefault(Cluster.keyslot(key), set()).add(key) + + results = {} + for slot_id, keys in slot_keys.iteritems(): + keys = list(keys) + values = super(StrictClusterRedis, self).mget(keys) + results.update(dict(zip(keys, values))) + + return [results[key] for key in origin_keys] + + def delete(self, *names): + """collects from slots + """ + slot_keys = {} + for key in names: + slot_keys.setdefault(Cluster.keyslot(key), set()).add(key) + + result = 0 + for slot_id, keys in slot_keys.iteritems(): + result += super(StrictClusterRedis, self).delete(*keys) + + return result + + def dbsize(self): + """collects from all shards + """ + result = 0 + for node in self.node_balancer.get_shards_nodes(readonly=True): + connection = self.connection_pool.get_connection(node) + try: + result += self.execute_connection_command(connection, ('DBSIZE', )) + finally: + self.connection_pool.release(connection) + + return result + + def keys(self, pattern='*'): + """collects from all shards + """ + result = [] + for node in self.node_balancer.get_shards_nodes(readonly=True): + connection = self.connection_pool.get_connection(node) + try: + result += self.execute_connection_command(connection, ('KEYS', pattern)) + finally: + self.connection_pool.release(connection) + + return result + + def determine_node(self, command_args): + command = command_args[0] + readonly = command in self.READONLY_COMMANDS + + node_flag = self.COMMAND_FLAGS.get(command) + if node_flag == 'blocked': + raise ClusterError('Blocked command: %s' % command) + elif node_flag == 'random': + node_id = self.node_balancer.get_random_node(readonly=readonly) + elif node_flag == 'slot-id': + node_id = self.node_balancer.get_node_for_slot(slot_id=int(command_args[1]), readonly=readonly) + elif command in self.COMMAND_PARSE_KEYS: + slot_ids = set() + for key_name in self.COMMAND_PARSE_KEYS[command](command_args): + slot_ids.add(Cluster.keyslot(key_name)) + + if len(slot_ids) != 1: + raise ClusterCrossSlotError() + + node_id = self.node_balancer.get_node_for_slot(slot_id=slot_ids.pop(), readonly=readonly) + else: + key_name = command_args[1] + node_id = self.node_balancer.get_node_for_key(key_name=key_name, readonly=readonly) + + return node_id + + @classmethod + def _desc_node(cls, node_or_conn): + if isinstance(node_or_conn, Connection): + node_or_conn = node_or_conn.host, node_or_conn.port + + return '%s:%s' % node_or_conn + + def execute_connection_command(self, connection, command_args, parser_args=None): + command = command_args[0] + connection.send_command(*command_args) + return self.parse_response(connection, command, **parser_args or {}) + + def stack_commands(self, connection): + """like a pipeline, collect and execute and parse + """ + + # TODO: debug code + # collect commands + stack = [] + while True: + command = yield len(stack) + if command is None: + break + + if not isinstance(command, tuple) or len(command) != 2: + raise ValueError('command should be 2-tuple (command_args, parser_args)') + + stack.append(command) + + packed_command = connection.pack_commands(cmd_args for cmd_args, _ in stack) + connection.send_packed_command(packed_command) + for command_args, parse_args in stack: + # ensure all responses are consumed + try: + resp = self.parse_response(connection, command_args[0], **parse_args) + except Exception as e: + resp = e + + yield resp + + def execute_command(self, *command_args, **parser_args): + """Send a command to a node in the cluster + SINGLE & SIMPLE MODE + 1. single slot command [v] + 2. random node command [v] + 3. multiple slot command [loose] + 3.1. *cross slot command [loose] + + 1. single key [v] + 2. no key [v] + 3. multiple key in single slot [v] + """ + command = command_args[0] + packed_command = self.packer_conn.pack_command(*command_args) + + ttl = self.COMMAND_TTL + redirect_addr = None + asking = False + while ttl > 0: + ttl -= 1 + + if not redirect_addr: + node_id = self.determine_node(command_args) + node_addr = self.manager.node_addr(node_id) + else: + node_addr, redirect_addr = redirect_addr, None + + connection = self.connection_pool.get_connection(node_addr) + try: + if asking: + asking = False + multi_command = connection.pack_commands((('ASKING', ), command_args)) + connection.send_packed_command(multi_command) + if not self.parse_response(connection, 'ASKING'): + raise ResponseError('ASKING %s is not OK' % (self._desc_node(connection))) + else: + connection.send_packed_command(packed_command) + return self.parse_response(connection, command, **parser_args) + except BusyLoadingError: + raise + except (ConnectionError, TimeoutError) as e: + LOGGER.warning('Node %s: %s' % (e.__class__.__name__, self._desc_node(connection))) + if ttl < self.COMMAND_TTL / 2: + time.sleep(0.05) + except ClusterParser.TryAgainError: + LOGGER.warning('Cluster in unstable state.') + if ttl < self.COMMAND_TTL / 2: + time.sleep(0.05) + + # prevent re-determine node + redirect_addr = node_addr + except ClusterParser.MovedError as e: + LOGGER.warning('MOVED: %s [%s] -> [%s]' % ( + e.slot_id, self._desc_node(connection), self._desc_node(e.node_addr))) + self.manager.slot_moved(e.slot_id, e.node_addr) + redirect_addr = e.node_addr + except ClusterParser.AskError as e: + LOGGER.warning('ASK redirect: %s [%s] -> [%s]' % ( + e.slot_id, self._desc_node(connection), self._desc_node(e.node_addr))) + self.manager.ask_node(e.slot_id, e.node_addr) + redirect_addr, asking = e.node_addr, True + finally: + self.connection_pool.release(connection) + + raise ClusterError('TTL exhausted.') + + +class ClusterRedis(StrictClusterRedis): + """ + KEYS pattern + Find all keys matching the given pattern + MIGRATE host port key destination-db timeout [COPY] [REPLACE] + Atomically transfer a key from a Redis instance to another one. + + implements cross-slot/all-nodes operations + """ + + COMMAND_FLAGS = dict_merge( + StrictClusterRedis.COMMAND_FLAGS, + dict.fromkeys([ + 'FLUSHALL', 'FLUSHDB', + ], 'all-master'), + ) diff --git a/redis/crc.py b/redis/crc.py new file mode 100644 index 0000000000..91b29fb407 --- /dev/null +++ b/redis/crc.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- + +# python std lib +import sys + + +XMODEMCRC16Lookup = [ + 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7, + 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef, + 0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6, + 0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de, + 0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485, + 0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d, + 0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4, + 0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc, + 0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823, + 0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b, + 0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12, + 0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a, + 0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41, + 0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49, + 0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70, + 0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78, + 0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f, + 0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067, + 0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e, + 0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256, + 0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d, + 0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405, + 0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c, + 0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634, + 0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab, + 0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3, + 0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a, + 0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92, + 0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9, + 0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1, + 0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8, + 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0 +] + + +def _crc16_py3(data): + crc = 0 + for byte in data.encode("utf-8"): + crc = ((crc << 8) & 0xff00) ^ XMODEMCRC16Lookup[((crc >> 8) & 0xff) ^ byte] + return crc & 0xffff + + +def _crc16_py2(data): + crc = 0 + for byte in data.encode("utf-8"): + crc = ((crc << 8) & 0xff00) ^ XMODEMCRC16Lookup[((crc >> 8) & 0xff) ^ ord(byte)] + return crc & 0xffff + + +if sys.version_info >= (3, 0, 0): + crc16 = _crc16_py3 +else: + crc16 = _crc16_py2 diff --git a/redis/exceptions.py b/redis/exceptions.py index a8518c708a..7467204c75 100644 --- a/redis/exceptions.py +++ b/redis/exceptions.py @@ -69,3 +69,24 @@ class LockError(RedisError, ValueError): # NOTE: For backwards compatability, this class derives from ValueError. # This was originally chosen to behave like threading.Lock. pass + + +class ClusterError(RedisError): + pass + + +class ClusterCrossSlotError(ResponseError): + message = "Keys in request don't hash to the same slot" + + +class ClusterSlotNotServedError(ResponseError): + pass + + +class ClusterPartitionError(RedisError): + pass + + +class ClusterDownError(ClusterError, ResponseError): + def __init__(self, resp): + print resp