Skip to content

Request response policies #2996

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions examples/lua-multi-incr.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const client = createClient({
scripts: {
mincr: defineScript({
NUMBER_OF_KEYS: 2,
// TODO add RequestPolicy: ,
SCRIPT:
'return {' +
'redis.pcall("INCRBY", KEYS[1], ARGV[1]),' +
Expand Down
8 changes: 8 additions & 0 deletions packages/client/lib/client/parser.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { RedisArgument } from '../RESP/types';
import { RedisVariadicArgument } from '../commands/generic-transformers';

export type CommandIdentifier = { command: string, subcommand: string };
export interface CommandParser {
redisArgs: ReadonlyArray<RedisArgument>;
keys: ReadonlyArray<RedisArgument>;
firstKey: RedisArgument | undefined;
preserve: unknown;
commandIdentifier: CommandIdentifier;

push: (...arg: Array<RedisArgument>) => unknown;
pushVariadic: (vals: RedisVariadicArgument) => unknown;
Expand Down Expand Up @@ -44,6 +46,12 @@ export class BasicCommandParser implements CommandParser {
return tmp.join('_');
}

get commandIdentifier(): CommandIdentifier {
const command = this.#redisArgs[0] instanceof Buffer ? this.#redisArgs[0].toString() : this.#redisArgs[0];
const subcommand = this.#redisArgs[1] instanceof Buffer ? this.#redisArgs[1].toString() : this.#redisArgs[1];
return { command, subcommand };
}

push(...arg: Array<RedisArgument>) {
this.#redisArgs.push(...arg);
};
Expand Down
14 changes: 14 additions & 0 deletions packages/client/lib/cluster/cluster-slots.ts
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,20 @@ export default class RedisClusterSlots<
await Promise.allSettled(promises);
}

getAllClients() {
return Array.from(this.#clients());
}

getAllMasterClients() {
const result = [];
for (const master of this.masters) {
if (master.client) {
result.push(master.client);
}
}
return result;
}

getClient(
firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined
Expand Down
191 changes: 152 additions & 39 deletions packages/client/lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import { PubSubListener } from '../client/pub-sub';
import { ErrorReply } from '../errors';
import { RedisTcpSocketOptions } from '../client/socket';
import { ClientSideCacheConfig, PooledClientSideCacheProvider } from '../client/cache';
import { BasicCommandParser } from '../client/parser';
import { BasicCommandParser, CommandParser } from '../client/parser';
import { ASKING_CMD } from '../commands/ASKING';
import SingleEntryCache from '../single-entry-cache'
import { POLICIES, PolicyResolver, REQUEST_POLICIES_WITH_DEFAULTS, RESPONSE_POLICIES_WITH_DEFAULTS, StaticPolicyResolver } from './request-response-policies';
import { aggregateLogicalAnd, aggregateLogicalOr, aggregateMax, aggregateMerge, aggregateMin, aggregateSum } from './request-response-policies/generic-aggregators';
interface ClusterCommander<
M extends RedisModules,
F extends RedisFunctions,
Expand Down Expand Up @@ -189,7 +191,7 @@ export default class RedisCluster<
command.parseCommand(parser, ...args);

return this._self._execute(
parser.firstKey,
parser,
command.IS_READ_ONLY,
this._commandOptions,
(client, opts) => client._executeCommand(command, parser, opts, transformReply)
Expand All @@ -205,7 +207,7 @@ export default class RedisCluster<
command.parseCommand(parser, ...args);

return this._self._execute(
parser.firstKey,
parser,
command.IS_READ_ONLY,
this._self._commandOptions,
(client, opts) => client._executeCommand(command, parser, opts, transformReply)
Expand All @@ -223,7 +225,7 @@ export default class RedisCluster<
fn.parseCommand(parser, ...args);

return this._self._execute(
parser.firstKey,
parser,
fn.IS_READ_ONLY,
this._self._commandOptions,
(client, opts) => client._executeCommand(fn, parser, opts, transformReply)
Expand All @@ -241,7 +243,7 @@ export default class RedisCluster<
script.parseCommand(parser, ...args);

return this._self._execute(
parser.firstKey,
parser,
script.IS_READ_ONLY,
this._commandOptions,
(client, opts) => client._executeScript(script, parser, opts, transformReply)
Expand Down Expand Up @@ -299,6 +301,7 @@ export default class RedisCluster<

private _self = this;
private _commandOptions?: ClusterCommandOptions<TYPE_MAPPING/*, POLICIES*/>;
private _policyResolver: PolicyResolver;

/**
* An array of the cluster slots, each slot contain its `master` and `replicas`.
Expand Down Expand Up @@ -356,6 +359,8 @@ export default class RedisCluster<
if (options?.commandOptions) {
this._commandOptions = options.commandOptions;
}

this._policyResolver = new StaticPolicyResolver(POLICIES);
}

duplicate<
Expand Down Expand Up @@ -451,54 +456,157 @@ export default class RedisCluster<
}

async _execute<T>(
firstKey: RedisArgument | undefined,
parser: CommandParser,
isReadonly: boolean | undefined,
options: ClusterCommandOptions | undefined,
fn: (client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>, opts?: ClusterCommandOptions) => Promise<T>
): Promise<T> {

const maxCommandRedirections = this._options.maxCommandRedirections ?? 16;
let client = await this._slots.getClient(firstKey, isReadonly);
let i = 0;

let myFn = fn;
const policyResult = this._policyResolver.resolvePolicy(parser.commandIdentifier);

while (true) {
try {
return await myFn(client, options);
} catch (err) {
myFn = fn;
if(!policyResult.ok) {
throw new Error(`Policy resolution error for ${parser.commandIdentifier}: ${policyResult.error}`);
}

// TODO: error class
if (++i > maxCommandRedirections || !(err instanceof Error)) {
throw err;
}
const requestPolicy = policyResult.value.request
const responsePolicy = policyResult.value.response

let clients: Array<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>;
// https://redis.io/docs/latest/develop/reference/command-tips
switch (requestPolicy) {

case REQUEST_POLICIES_WITH_DEFAULTS.ALL_NODES:
clients = this._slots.getAllClients()
break;

case REQUEST_POLICIES_WITH_DEFAULTS.ALL_SHARDS:
clients = this._slots.getAllMasterClients()
break;

case REQUEST_POLICIES_WITH_DEFAULTS.MULTI_SHARD:
clients = await Promise.all(
parser.keys.map((key) => this._slots.getClient(key, isReadonly))
);
break;

case REQUEST_POLICIES_WITH_DEFAULTS.SPECIAL:
throw new Error(`Special request policy not implemented for ${parser.commandIdentifier}`);

case REQUEST_POLICIES_WITH_DEFAULTS.DEFAULT_KEYLESS:
//TODO handle undefined case?
clients = [this._slots.getRandomNode().client!]
break;

case REQUEST_POLICIES_WITH_DEFAULTS.DEFAULT_KEYED:
clients = [await this._slots.getClient(parser.firstKey, isReadonly)]
break;

default:
throw new Error(`Unknown request policy ${requestPolicy}`);

if (err.message.startsWith('ASK')) {
const address = err.message.substring(err.message.lastIndexOf(' ') + 1);
let redirectTo = await this._slots.getMasterByAddress(address);
if (!redirectTo) {
await this._slots.rediscover(client);
redirectTo = await this._slots.getMasterByAddress(address);
}

const responsePromises = clients.map(async client => {

let i = 0;

let myFn = fn;

while (true) {
try {
return await myFn(client, options);
} catch (err) {
myFn = fn;

// TODO: error class
if (++i > maxCommandRedirections || !(err instanceof Error)) {
throw err;
}

if (!redirectTo) {
throw new Error(`Cannot find node ${address}`);
if (err.message.startsWith('ASK')) {
const address = err.message.substring(err.message.lastIndexOf(' ') + 1);
let redirectTo = await this._slots.getMasterByAddress(address);
if (!redirectTo) {
await this._slots.rediscover(client);
redirectTo = await this._slots.getMasterByAddress(address);
}

if (!redirectTo) {
throw new Error(`Cannot find node ${address}`);
}

client = redirectTo;
myFn = this._handleAsk(fn);
continue;
}

if (err.message.startsWith('MOVED')) {
await this._slots.rediscover(client);
client = await this._slots.getClient(parser.firstKey, isReadonly);
continue;
}

client = redirectTo;
myFn = this._handleAsk(fn);
continue;
}

if (err.message.startsWith('MOVED')) {
await this._slots.rediscover(client);
client = await this._slots.getClient(firstKey, isReadonly);
continue;
}
throw err;
}
}

throw err;
}
})

switch (responsePolicy) {
case RESPONSE_POLICIES_WITH_DEFAULTS.ONE_SUCCEEDED: {
return Promise.any(responsePromises);
}

case RESPONSE_POLICIES_WITH_DEFAULTS.ALL_SUCCEEDED: {
const responses = await Promise.all(responsePromises);
return responses[0]
}

case RESPONSE_POLICIES_WITH_DEFAULTS.AGG_LOGICAL_AND: {
const responses = await Promise.all(responsePromises)
return aggregateLogicalAnd(responses);
}

case RESPONSE_POLICIES_WITH_DEFAULTS.AGG_LOGICAL_OR: {
const responses = await Promise.all(responsePromises)
return aggregateLogicalOr(responses);
}

case RESPONSE_POLICIES_WITH_DEFAULTS.AGG_MIN: {
const responses = await Promise.all(responsePromises);
return aggregateMin(responses);
}

case RESPONSE_POLICIES_WITH_DEFAULTS.AGG_MAX: {
const responses = await Promise.all(responsePromises);
return aggregateMax(responses);
}

case RESPONSE_POLICIES_WITH_DEFAULTS.AGG_SUM: {
const responses = await Promise.all(responsePromises);
return aggregateSum(responses);
}

case RESPONSE_POLICIES_WITH_DEFAULTS.SPECIAL: {
throw new Error(`Special response policy not implemented for ${parser.commandIdentifier}`);
}

case RESPONSE_POLICIES_WITH_DEFAULTS.DEFAULT_KEYLESS: {
const responses = await Promise.all(responsePromises);
return aggregateMerge(responses);
}

case RESPONSE_POLICIES_WITH_DEFAULTS.DEFAULT_KEYED: {
const responses = await Promise.all(responsePromises);
return responses as T;
}

default:
throw new Error(`Unknown response policy ${responsePolicy}`);
}

}

async sendCommand<T = ReplyUnion>(
Expand All @@ -508,8 +616,13 @@ export default class RedisCluster<
options?: ClusterCommandOptions,
// defaultPolicies?: CommandPolicies
): Promise<T> {

const parser = new BasicCommandParser();
firstKey && parser.push(firstKey)
args.forEach(arg => parser.push(arg));

return this._self._execute(
firstKey,
parser,
isReadonly,
options,
(client, opts) => client.sendCommand(args, opts)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// import { RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from "../../RESP/types";
// import { ShardNode } from "../cluster-slots";
// import type { Either } from './types';

// export interface CommandRouter<
// M extends RedisModules,
// F extends RedisFunctions,
// S extends RedisScripts,
// RESP extends RespVersions,
// TYPE_MAPPING extends TypeMapping> {
// routeCommand(
// command: string,
// policy: RequestPolicy,
// ): Either<ShardNode<M, F, S, RESP, TYPE_MAPPING>, 'no-available-nodes' | 'routing-failed'>;
// }
Loading
Loading