diff --git a/examples/riakc_nhs.config b/examples/riakc_nhs.config new file mode 100644 index 000000000..4a78dd7d9 --- /dev/null +++ b/examples/riakc_nhs.config @@ -0,0 +1,46 @@ +{mode, max}. + +{duration, 1440}. +{report_interval, 10}. + +{node_name, testnode1}. + +{concurrent, 100}. + +{driver, basho_bench_driver_nhs}. + +%% Ignored by alwaysget and unique operations +{key_generator, {eightytwenty_int, 100000000}}. + +{value_generator, {semi_compressible, 5000, 3000, 5, 0.2}}. + +%% For alwaysget operations: {max keys per worker, min keys per worker, key order} +%% - the maximum number of keys per worker (max number of keys = this * concurrent) +%% - whether the inserts should be in key_order, or whether the order should be skewed by a 1-byte hash at the head of the key +{alwaysget, {600000, 1, skew_order}}. + +{pb_ips, [{127,0,0,1}]}. +{http_ips, [{127,0,0,1}]}. + +{riakc_pb_replies, 1}. + +{operations, [{alwaysget_pb, 612}, {alwaysget_updatewith2i, 192}, + {put_unique, 64}, {get_unique, 128}, + {postcodequery_http, 3}, {dobquery_http, 1}]}. + +%% Use {auto_reconnect, false} to get "old" behavior (prior to April 2013). +%% See deps/riakc/src/riakc_pb_socket.erl for all valid socket options. +{pb_connect_options, [{auto_reconnect, true}]}. + +%% Overrides for the PB client's default 60 second timeout, on a +%% per-type-of-operation basis. All timeout units are specified in +%% milliseconds. The pb_timeout_general config item provides a +%% default timeout if the read/write/listkeys/mapreduce timeout is not +%% specified. + +{pb_timeout_general, 60000}. +{pb_timeout_read, 10000}. +{pb_timeout_write, 10000}. +{pb_timeout_listkeys, 60000}. +%% The general timeout will be used because this specific item is commented: +%% {pb_timeout_mapreduce, 50000}. diff --git a/examples/riakc_nhs_clinicals.config b/examples/riakc_nhs_clinicals.config new file mode 100644 index 000000000..075553798 --- /dev/null +++ b/examples/riakc_nhs_clinicals.config @@ -0,0 +1,46 @@ +{mode, max}. + +{duration, 1440}. +{report_interval, 10}. + +{node_name, testnode1}. + +{concurrent, 100}. + +{driver, basho_bench_driver_nhs}. + +%% Ignored by alwaysget and unique operations +{key_generator, {eightytwenty_int, 100000000}}. + +{value_generator, {semi_compressible, 80000, 20000, 2, 0.1}}. + +%% For alwaysget operations: {max keys per worker, min keys per worker, key order} +%% - the maximum number of keys per worker (max number of keys = this * concurrent) +%% - whether the inserts should be in key_order or skew_order +{alwaysget, {400000, 120000, skew_order}}. +{unique, {7000, skew_order}}. + +{pb_ips, [{127,0,0,1}]}. +{http_ips, [{127,0,0,1}]}. + +{riakc_pb_replies, 1}. + +{operations, [{alwaysget_pb, 700}, {alwaysget_updatewith2i, 120}, + {put_unique, 70}, {get_unique, 110}]}. + +%% Use {auto_reconnect, false} to get "old" behavior (prior to April 2013). +%% See deps/riakc/src/riakc_pb_socket.erl for all valid socket options. +{pb_connect_options, [{auto_reconnect, true}]}. + +%% Overrides for the PB client's default 60 second timeout, on a +%% per-type-of-operation basis. All timeout units are specified in +%% milliseconds. The pb_timeout_general config item provides a +%% default timeout if the read/write/listkeys/mapreduce timeout is not +%% specified. + +{pb_timeout_general, 60000}. +{pb_timeout_read, 10000}. +{pb_timeout_write, 10000}. +{pb_timeout_listkeys, 60000}. +%% The general timeout will be used because this specific item is commented:
%% {pb_timeout_mapreduce, 50000}.
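For reference, {eightytwenty_int, MaxKey} is the key generator added to basho_bench_keygen.erl later in this diff: roughly 80% of draws land in the bottom fifth of the keyspace. A minimal standalone sketch of the same distribution (illustrative only; it uses the newer rand module in place of the legacy random module the diff targets):

%% Sketch of the eightytwenty_int distribution added in this diff.
eightytwenty_int(MaxKey) ->
    Step = MaxKey div 5,
    case rand:uniform() < 0.8 of
        true  -> rand:uniform(Step);             % hot bottom 20% of the keyspace
        false -> Step + rand:uniform(Step * 4)   % cold upper 80% of the keyspace
    end.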
diff --git a/examples/riakc_nhs_general.config b/examples/riakc_nhs_general.config new file mode 100644 index 000000000..a9bfb02f3 --- /dev/null +++ b/examples/riakc_nhs_general.config @@ -0,0 +1,48 @@ +{mode, max}. + +{duration, 1440}. + +{report_interval, 10}. + +{node_name, testnode1}. + +{concurrent, 100}. + +{driver, basho_bench_driver_nhs}. + +%% Ignored by alwaysget and unique operations +{key_generator, {eightytwenty_int, 100000000}}. + +{value_generator, {semi_compressible, 8000, 2000, 10, 0.2}}. + +%% For alwaysget operations: {max keys per worker, min keys per worker, key order} +%% - the maximum number of keys per worker (max number of keys = this * concurrent) +%% - whether the inserts should be in key_order or skew_order +{alwaysget, {240000, 100000, skew_order}}. +{unique, {8000, skew_order}}. + +{pb_ips, [{127,0,0,1}]}. +{http_ips, [{127,0,0,1}]}. + +{riakc_pb_replies, 1}. + +{operations, [{alwaysget_pb, 651}, {alwaysget_updatewith2i, 120}, + {put_unique, 95}, {get_unique, 130}, + {postcodequery_http, 3}, {dobquery_http, 1}]}. + +%% Use {auto_reconnect, false} to get "old" behavior (prior to April 2013). +%% See deps/riakc/src/riakc_pb_socket.erl for all valid socket options. +{pb_connect_options, [{auto_reconnect, true}]}. + +%% Overrides for the PB client's default 60 second timeout, on a +%% per-type-of-operation basis. All timeout units are specified in +%% milliseconds. The pb_timeout_general config item provides a +%% default timeout if the read/write/listkeys/mapreduce timeout is not +%% specified. + +{pb_timeout_general, 60000}. +{pb_timeout_read, 10000}. +{pb_timeout_write, 10000}. +{pb_timeout_listkeys, 60000}. +%% The general timeout will be used because this specific item is commented: +%% {pb_timeout_mapreduce, 50000}. \ No newline at end of file diff --git a/examples/riakc_nhs_passive.config b/examples/riakc_nhs_passive.config new file mode 100644 index 000000000..469898be8 --- /dev/null +++ b/examples/riakc_nhs_passive.config @@ -0,0 +1,48 @@ +{mode, {rate, 10}}. + +{duration, 1440}. + +{report_interval, 10}. + +{node_name, testnode1}. + +{concurrent, 10}. + +{driver, basho_bench_driver_nhs}. + +%% Ignored by alwaysget and unique operations +{key_generator, {eightytwenty_int, 100000000}}. + +{value_generator, {semi_compressible, 8000, 2000, 10, 0.2}}. + +%% For alwaysget operations: {max keys per worker, min keys per worker, key order} +%% - the maximum number of keys per worker (max number of keys = this * concurrent) +%% - whether the inserts should be in key_order or skew_order +{alwaysget, {240000, 100000, skew_order}}. +{unique, {8000, skew_order}}. + +{pb_ips, [{127,0,0,1}]}. +{http_ips, [{127,0,0,1}]}. + +{riakc_pb_replies, 1}. + +{operations, [{alwaysget_pb, 4}, {alwaysget_updatewith2i, 600}, + {put_unique, 390}, {get_unique, 4}, + {postcodequery_http, 1}, {dobquery_http, 1}]}. + +%% Use {auto_reconnect, false} to get "old" behavior (prior to April 2013). +%% See deps/riakc/src/riakc_pb_socket.erl for all valid socket options. +{pb_connect_options, [{auto_reconnect, true}]}. + +%% Overrides for the PB client's default 60 second timeout, on a +%% per-type-of-operation basis. All timeout units are specified in +%% milliseconds. The pb_timeout_general config item provides a +%% default timeout if the read/write/listkeys/mapreduce timeout is not +%% specified. + +{pb_timeout_general, 60000}. +{pb_timeout_read, 10000}. +{pb_timeout_write, 10000}. +{pb_timeout_listkeys, 60000}. +%% The general timeout will be used because this specific item is commented: +%% {pb_timeout_mapreduce, 50000}. 
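Unlike the max-mode configs, the passive config above runs at a fixed rate: under basho_bench, {mode, {rate, 10}} asks each worker to issue roughly 10 operations per second, so with {concurrent, 10} the aggregate target load is about 100 ops/sec. A one-line sketch of that arithmetic (hedged: assumes the usual per-worker rate semantics):

%% Aggregate target throughput under {mode, {rate, N}}.
aggregate_rate(RatePerWorker, Concurrent) ->
    RatePerWorker * Concurrent.   % aggregate_rate(10, 10) -> 100 ops/sec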
\ No newline at end of file diff --git a/examples/riakc_pb.config b/examples/riakc_pb.config index 02ec6ad75..f4673821e 100644 --- a/examples/riakc_pb.config +++ b/examples/riakc_pb.config @@ -1,22 +1,30 @@ {mode, max}. {duration, 10}. -{report_interval,1}. +{report_interval, 1}. {concurrent, 10}. {driver, basho_bench_driver_riakc_pb}. -{key_generator, {int_to_bin_bigendian, {uniform_int, 10000}}}. +{key_generator, + {concat_binary, + {base64, + {int_to_bin_bigendian, + {pareto_int, 200000000} + } + }, + <<>>} +}. -{value_generator, {fixed_bin, 10000}}. +{value_generator, {semi_compressible, 4000, 4000, 10, 0.001}}. {riakc_pb_ips, [{127,0,0,1}]}. {riakc_pb_replies, 1}. %%% {operations, [{get, 1}]}. -{operations, [{get, 1}, {update, 1}]}. +{operations, [{get, 5}, {update, 1}]}. %% Use {auto_reconnect, false} to get "old" behavior (prior to April 2013). %% See deps/riakc/src/riakc_pb_socket.erl for all valid socket options. @@ -29,8 +37,8 @@ %% specified. {pb_timeout_general, 30000}. -{pb_timeout_read, 5000}. -{pb_timeout_write, 5000}. -{pb_timeout_listkeys, 50000}. +{pb_timeout_read, 10000}. +{pb_timeout_write, 10000}. +{pb_timeout_listkeys, 3600000}. %% The general timeout will be used because this specific item is commented: %% {pb_timeout_mapreduce, 50000}. diff --git a/examples/riakc_pb_listkeys.config b/examples/riakc_pb_listkeys.config new file mode 100644 index 000000000..89b22ff89 --- /dev/null +++ b/examples/riakc_pb_listkeys.config @@ -0,0 +1,44 @@ +{mode, max}. + +{duration, 300}. +{report_interval, 300}. + +{concurrent, 1}. + +{driver, basho_bench_driver_riakc_pb}. + +{key_generator, + {concat_binary, + {base64, + {int_to_bin_bigendian, + {pareto_int, 200000000} + } + }, + <<>>} +}. + +%% The semi_compressible generator takes five elements; XLMult of 1 and +%% XLProb of 0.0 leave the plain {min, mean} behaviour unchanged. +{value_generator, {semi_compressible, 4000, 8000, 1, 0.0}}. + +{riakc_pb_ips, [{127,0,0,1}]}. + +{riakc_pb_replies, 1}. + +%%% {operations, [{get, 1}]}. +{operations, [{pause_minute, 4}, {listkeys, 1}]}. + +%% Use {auto_reconnect, false} to get "old" behavior (prior to April 2013). +%% See deps/riakc/src/riakc_pb_socket.erl for all valid socket options. +{pb_connect_options, [{auto_reconnect, true}]}. + +%% Overrides for the PB client's default 60 second timeout, on a +%% per-type-of-operation basis. All timeout units are specified in +%% milliseconds. The pb_timeout_general config item provides a +%% default timeout if the read/write/listkeys/mapreduce timeout is not +%% specified. + +{pb_timeout_general, 30000}. +{pb_timeout_read, 10000}. +{pb_timeout_write, 10000}. +{pb_timeout_listkeys, 3600000}. +%% The general timeout will be used because this specific item is commented: +%% {pb_timeout_mapreduce, 50000}. 
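The composed key_generator in the two configs above chains three steps: draw a Pareto-distributed integer, pack it big-endian, base64-encode it, then concatenate a static binary (here empty). A sketch of the key shape this should yield, assuming int_to_bin_bigendian packs to 32 bits and concat_binary appends its static argument:

%% Illustrative mirror of concat_binary(base64(int_to_bin_bigendian(N))).
example_key(Int) when is_integer(Int), Int >= 0 ->
    Packed = <<Int:32/big-unsigned-integer>>,  % int_to_bin_bigendian step
    Encoded = base64:encode(Packed),           % base64 step
    Static = <<>>,                             % concat_binary static part
    <<Encoded/binary, Static/binary>>.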
diff --git a/priv/summary_ii.r b/priv/summary_ii.r new file mode 100644 index 000000000..0d54cf147 --- /dev/null +++ b/priv/summary_ii.r @@ -0,0 +1,129 @@ +#!/usr/bin/env Rscript + +# Parse the --file= argument out of command line args and +# determine where base directory is so that we can source +# our common sub-routines +arg0 <- sub("--file=(.*)", "\\1", grep("--file=", commandArgs(), value = TRUE)) +dir0 <- dirname(arg0) +source(file.path(dir0, "common.r")) + +theme_set(theme_grey(base_size = 17)) + +# Setup parameters for the script +params = matrix(c( + 'help', 'h', 0, "logical", + 'width', 'x', 2, "integer", + 'height', 'y', 2, "integer", + 'outfile', 'o', 2, "character", + 'indir', 'i', 2, "character", + 'tstart', '1', 2, "integer", + 'tend', '2', 2, "integer", + 'ylabel1stgraph', 'Y', 2, "character", + 'title', 't', 2, "character" + ), ncol=4, byrow=TRUE) + +# Parse the parameters +opt = getopt(params) + +if (!is.null(opt$help)) + { + cat(paste(getopt(params, command = basename(arg0), usage = TRUE))) + q(status=1) + } + +# Initialize defaults for opt +if (is.null(opt$width)) { opt$width = 1280 } +if (is.null(opt$height)) { opt$height = 960 } +if (is.null(opt$indir)) { opt$indir = "current"} +if (is.null(opt$outfile)) { opt$outfile = file.path(opt$indir, "summary.png") } +if (is.null(opt$ylabel1stgraph)) { opt$ylabel1stgraph = "Ops/sec" } +if (is.null(opt$title)) { opt$title = "Throughput" } + +# Load the benchmark data, passing the time-index range we're interested in +b = load_benchmark(opt$indir, opt$tstart, opt$tend) + +# If there is no actual data available, bail +if (nrow(b$latencies) == 0) +{ + stop("No latency information available to analyze in ", opt$indir) +} + +png(file = opt$outfile, width = opt$width, height = opt$height) + +# First plot req/sec from summary +plot1 <- qplot(elapsed, successful / window, data = b$summary, + geom = c("smooth", "point"), + xlab = "Elapsed Secs", ylab = opt$ylabel1stgraph, + main = opt$title) + + + ylim(0, 25000) + + + geom_smooth(aes(y = successful / window, colour = "ok"), size=0.5) + + geom_point(aes(y = successful / window, colour = "ok"), size=2.0) + + + scale_colour_manual("Response", values = c("#188125")) + + +# Setup common elements of the latency plots +latency_plot <- ggplot(b$latencies, aes(x = elapsed)) + + facet_grid(. 
~ op) + labs(x = "Elapsed Secs", y = "Latency (ms)") + +# Plot median, mean and 95th percentiles +plot2 <- latency_plot + labs(title = "Mean and Median Latency") + + + ylim(0, 100) + + + geom_smooth(aes(y = median, color = "median"), size=0.5) + + + geom_smooth(aes(y = mean, color = "mean"), size=0.5) + + + + scale_colour_manual("Percentile", values = c("#009D91", "#FFA700")) + # scale_color_hue("Percentile", + # breaks = c("X95th", "mean", "median"), + # labels = c("95th", "Mean", "Median")) + +# Plot 99th percentile +plot3 <- latency_plot + labs(title = "99th Percentile Latency") + + + ylim(0, 5000) + + + geom_smooth(aes(y = X99th, color = "99th"), size=0.5) + + geom_point(aes(y = X99th, color = "99th"), size=2.0) + + scale_colour_manual("Percentile", values = c("#FF665F", "#009D91")) + # scale_color_hue("Percentile", + # breaks = c("X99_9th","X99th" ), + # labels = c("99.9th", "99th")) + +# Plot 99.9th percentile +# plot4 <- latency_plot + labs(title = "99.9th Percentile Latency") + +# +# ylim(0, 2000) + +# +# geom_smooth(aes(y = X99_9th, color = "99.9th"), size=0.5) + +# geom_point(aes(y = X99_9th, color = "99.9th"), size=2.0) + +# scale_colour_manual("Percentile", values = c("#FF665F", "#009D91", "#FFA700")) + +# Plot 100th percentile +plot5 <- latency_plot + labs(title = "Maximum Latency") + + + ylim(0, 10000) + + + geom_smooth(aes(y = max, color = "max"), size=0.5) + + geom_point(aes(y = max, color = "max"), size=2.0) + + scale_colour_manual("Percentile", values = c("#FF665F", "#009D91", "#FFA700")) + +grid.newpage() + +pushViewport(viewport(layout = grid.layout(4, 1))) + +vplayout <- function(x,y) viewport(layout.pos.row = x, layout.pos.col = y) + +print(plot1, vp = vplayout(1,1)) +print(plot2, vp = vplayout(2,1)) +print(plot3, vp = vplayout(3,1)) +# print(plot4, vp = vplayout(4,1)) +print(plot5, vp = vplayout(4,1)) + +dev.off() diff --git a/src/basho_bench_driver_nhs.erl b/src/basho_bench_driver_nhs.erl new file mode 100644 index 000000000..9c3aa2c54 --- /dev/null +++ b/src/basho_bench_driver_nhs.erl @@ -0,0 +1,730 @@ +%% ------------------------------------------------------------------- +%% +%% basho_bench_driver_nhs: Driver for NHS-like workloads +%% +%% Copyright (c) 2009 Basho Technologies +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(basho_bench_driver_nhs). + +-export([new/1, + run/4]). + +-export([run_aaequery/1, + run_listkeys/1, + run_segmentfold/1]). + +-include("basho_bench.hrl"). 
+ +-record(state, { + pb_pid, + http_host, + http_port, + recordBucket, + documentBucket, + pb_timeout, + http_timeout, + fold_timeout, + alwaysget_perworker_maxkeycount = 1 :: integer(), + alwaysget_perworker_minkeycount = 1 :: integer(), + alwaysget_keyorder :: key_order|skew_order, + unique_size :: integer(), + unique_keyorder :: key_order|skew_order, + postcodeq_count = 0 :: integer(), + postcodeq_sum = 0 :: integer(), + dobq_count = 0 :: integer(), + dobq_sum = 0 :: integer(), + query_logfreq :: integer(), + nominated_id :: boolean(), + % ID 7 is nominated to do special work (see new/1) + singleton_pid :: pid() | undefined, + unique_key_count = 1 :: non_neg_integer(), + alwaysget_key_count = 1 :: non_neg_integer(), + keyid :: binary(), + last_forceaae = os:timestamp() :: erlang:timestamp() + }). + +-define(QUERYLOG_FREQ, 1000). +-define(FORCEAAE_FREQ, 10). % Every 10 seconds + +-define(POSTCODE_AREAS, + [{1, "AB"}, {2, "AL"}, {3, "B"}, {4, "BA"}, {5, "BB"}, + {6, "BD"}, {7, "BH"}, {8, "BL"}, {9, "BN"}, {10, "BR"}, + {11, "BS"}, {12, "BT"}, {13, "CA"}, {14, "CB"}, {15, "CF"}, + {16, "CH"}, {17, "CM"}, {18, "CO"}, {19, "CR"}, {20, "CT"}, + {21, "CV"}, {22, "CW"}, {23, "DA"}, {24, "DD"}, {25, "DE"}, + {26, "DG"}, {27, "DH"}, {28, "DL"}, {29, "DN"}, {30, "DT"}, + {31, "DU"}, {32, "E"}, {33, "EC"}, {34, "EH"}, {35, "EN"}, + {36, "EX"}, {37, "FK"}, {38, "FY"}, {39, "G"}, {40, "GL"}, + {41, "GU"}, {42, "HA"}, {43, "HD"}, {44, "HG"}, {45, "HP"}, + {46, "HR"}, {47, "HS"}, {48, "HU"}, {49, "HX"}, {50, "IG"}, + {51, "IP"}, {52, "IV"}, {53, "KA"}, {54, "KT"}, {55, "KW"}, + {56, "KY"}, {57, "L"}, {58, "LA"}, {59, "LD"}, {60, "LE"}, + {61, "LL"}, {62, "LS"}, {63, "LU"}, {64, "M"}, {65, "ME"}, + {66, "MK"}, {67, "ML"}, {68, "N"}, {69, "NE"}, {70, "NG"}, + {71, "MM"}, {72, "NP"}, {73, "NR"}, {74, "NW"}, {75, "OL"}, + {76, "OX"}]). +-define(DATETIME_FORMAT, "~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w"). +-define(DATE_FORMAT, "~b~2..0b~2..0b"). + +%% ==================================================================== +%% API +%% ==================================================================== + +new(Id) -> + %% Ensure that ibrowse is started... + application:start(ibrowse), + + %% Ensure that riakc library is in the path... + ensure_module(riakc_pb_socket), + ensure_module(mochijson2), + + %% Read config settings... 
+ PBIPs = basho_bench_config:get(pb_ips, ["127.0.0.1"]), + PBPort = basho_bench_config:get(pb_port, 8087), + HTTPIPs = basho_bench_config:get(http_ips, ["127.0.0.1"]), + HTTPPort = basho_bench_config:get(http_port, 8098), + + PBTimeout = basho_bench_config:get(pb_timeout_general, 30*1000), + HTTPTimeout = basho_bench_config:get(http_timeout_general, 30*1000), + FoldTimeout = basho_bench_config:get(fold_timeout_general, 60*60*1000), + + %% Choose the target node using our ID as a modulus + HTTPTargets = basho_bench_config:normalize_ips(HTTPIPs, HTTPPort), + {HTTPTargetIp, + HTTPTargetPort} = lists:nth((Id rem length(HTTPTargets) + 1), + HTTPTargets), + ?INFO("Using http target ~p:~p for worker ~p\n", [HTTPTargetIp, + HTTPTargetPort, + Id]), + + %% Choose the target node using our ID as a modulus + PBTargets = basho_bench_config:normalize_ips(PBIPs, PBPort), + {PBTargetIp, + PBTargetPort} = lists:nth((Id rem length(PBTargets) + 1), + PBTargets), + ?INFO("Using pb target ~p:~p for worker ~p\n", [PBTargetIp, + PBTargetPort, + Id]), + + {AGMaxKC, AGMinKC, AGKeyOrder} = + basho_bench_config:get(alwaysget, {1, 1, key_order}), + {DocSize, DocKeyOrder} = + basho_bench_config:get(unique, {8000, key_order}), + + NodeID = basho_bench_config:get(node_name, node()), + + KeyIDint = erlang:phash2(Id) bxor erlang:phash2(NodeID), + ?INFO("Using Node ID ~w to generate ID ~w\n", [NodeID, KeyIDint]), + + case riakc_pb_socket:start_link(PBTargetIp, PBTargetPort) of + {ok, Pid} -> + NominatedID = Id == 7, + {ok, #state { + pb_pid = Pid, + http_host = HTTPTargetIp, + http_port = HTTPTargetPort, + recordBucket = <<"domainRecord">>, + documentBucket = <<"domainDocument">>, + pb_timeout = PBTimeout, + http_timeout = HTTPTimeout, + fold_timeout = FoldTimeout, + query_logfreq = random:uniform(?QUERYLOG_FREQ), + nominated_id = NominatedID, + unique_key_count = 1, + alwaysget_key_count = 0, + alwaysget_perworker_maxkeycount = AGMaxKC, + alwaysget_perworker_minkeycount = AGMinKC, + alwaysget_keyorder = AGKeyOrder, + unique_size = DocSize, + unique_keyorder = DocKeyOrder, + keyid = <<KeyIDint:32/integer>> + }}; + {error, Reason2} -> + ?FAIL_MSG("Failed to connect riakc_pb_socket to ~p port ~p: ~p\n", + [PBTargetIp, PBTargetPort, Reason2]) + end. + +%% Get a single object. 
+run(get_pb, KeyGen, _ValueGen, State) -> + Pid = State#state.pb_pid, + Bucket = State#state.recordBucket, + Key = to_binary(KeyGen()), + case riakc_pb_socket:get(Pid, Bucket, Key, State#state.pb_timeout) of + {ok, _Obj} -> + {ok, State}; + {error, notfound} -> + {ok, State}; + {error, Reason} -> + {error, Reason, State} + end; + +run(alwaysget_http, _KeyGen, _ValueGen, State) -> + Host = inet_parse:ntoa(State#state.http_host), + Port = State#state.http_port, + Bucket = State#state.recordBucket, + AGKC = State#state.alwaysget_key_count, + case AGKC > State#state.alwaysget_perworker_minkeycount of + true -> + KeyInt = eightytwenty_keycount(AGKC), + Key = generate_uniquekey(KeyInt, State#state.keyid, + State#state.alwaysget_keyorder), + URL = + io_lib:format("http://~s:~p/buckets/~s/keys/~s", + [Host, Port, Bucket, Key]), + + case get_existing(URL, State#state.http_timeout) of + ok -> + {ok, State}; + {error, Reason} -> + % not_found is not OK + {error, Reason, State} + end; + false -> + {silent, State} + end; + +run(alwaysget_pb, _KeyGen, _ValueGen, State) -> + % Get one of the objects with unique keys + Pid = State#state.pb_pid, + Bucket = State#state.recordBucket, + AGKC = State#state.alwaysget_key_count, + case AGKC > State#state.alwaysget_perworker_minkeycount of + true -> + KeyInt = eightytwenty_keycount(AGKC), + Key = generate_uniquekey(KeyInt, State#state.keyid, + State#state.alwaysget_keyorder), + + case riakc_pb_socket:get(Pid, + Bucket, Key, + State#state.pb_timeout) of + {ok, _Obj} -> + {ok, State}; + {error, Reason} -> + % not_found is not OK + {error, Reason, State} + end; + + false -> + {silent, State} + + end; + +run(alwaysget_updatewith2i, _KeyGen, ValueGen, State) -> + Pid = State#state.pb_pid, + Bucket = State#state.recordBucket, + AGKC = State#state.alwaysget_key_count, + Value = ValueGen(), + KeyInt = eightytwenty_keycount(AGKC), + ToExtend = + random:uniform(State#state.alwaysget_perworker_maxkeycount) > AGKC, + + {Robj0, NewAGKC} = + case ToExtend of + true -> + % Expand the key count + ExpansionKey = + generate_uniquekey(AGKC + 1, State#state.keyid, + State#state.alwaysget_keyorder), + case {AGKC rem 1000, State#state.nominated_id} of + {0, true} -> + lager:info("Always grow key count passing ~w " + ++ "for nominated worker", + [AGKC]); + _ -> + ok + end, + {riakc_obj:new(Bucket, ExpansionKey), + AGKC + 1}; + false -> + % update an existing key + ExistingKey = + generate_uniquekey(KeyInt, State#state.keyid, + State#state.alwaysget_keyorder), + {ok, Robj} = + riakc_pb_socket:get(Pid, + Bucket, ExistingKey, + State#state.pb_timeout), + {Robj, AGKC} + end, + + MD0 = riakc_obj:get_update_metadata(Robj0), + MD1 = riakc_obj:clear_secondary_indexes(MD0), + MD2 = riakc_obj:set_secondary_index(MD1, generate_binary_indexes()), + Robj1 = riakc_obj:update_value(Robj0, Value), + Robj2 = riakc_obj:update_metadata(Robj1, MD2), + + %% Write the object... + case riakc_pb_socket:put(Pid, Robj2, State#state.pb_timeout) of + ok -> + {ok, State#state{alwaysget_key_count = NewAGKC}}; + {error, Reason} -> + {error, Reason, State} + end; + + +%% Update an object with secondary indexes. 
+run(update_with2i, KeyGen, ValueGen, State) -> + Pid = State#state.pb_pid, + Bucket = State#state.recordBucket, + Key = to_binary(KeyGen()), + Value = ValueGen(), + + Robj0 = + case riakc_pb_socket:get(Pid, Bucket, Key, State#state.pb_timeout) of + {ok, Robj} -> + Robj; + {error, notfound} -> + riakc_obj:new(Bucket, to_binary(Key)) + end, + + MD0 = riakc_obj:get_update_metadata(Robj0), + MD1 = riakc_obj:clear_secondary_indexes(MD0), + MD2 = riakc_obj:set_secondary_index(MD1, generate_binary_indexes()), + Robj1 = riakc_obj:update_value(Robj0, Value), + Robj2 = riakc_obj:update_metadata(Robj1, MD2), + + %% Write the object... + case riakc_pb_socket:put(Pid, Robj2, State#state.pb_timeout) of + ok -> + {ok, State}; + {error, Reason} -> + {error, Reason, State} + end; +%% Put an object with a unique key and a non-compressable value +run(put_unique, _KeyGen, _ValueGen, State) -> + Pid = State#state.pb_pid, + Bucket = State#state.documentBucket, + + UKC = State#state.unique_key_count, + Key = + generate_uniquekey(UKC, + State#state.keyid, + State#state.unique_keyorder), + + Value = non_compressible_value(State#state.unique_size), + + Robj0 = riakc_obj:new(Bucket, to_binary(Key)), + MD1 = riakc_obj:get_update_metadata(Robj0), + MD2 = riakc_obj:set_secondary_index(MD1, generate_binary_indexes()), + Robj1 = riakc_obj:update_value(Robj0, Value), + Robj2 = riakc_obj:update_metadata(Robj1, MD2), + + %% Write the object... + case riakc_pb_socket:put(Pid, Robj2, State#state.pb_timeout) of + ok -> + {ok, State#state{unique_key_count = UKC + 1}}; + {error, Reason} -> + {error, Reason, State} + end; +run(get_unique, _KeyGen, _ValueGen, State) -> + % Get one of the objects with unique keys + Pid = State#state.pb_pid, + Bucket = State#state.documentBucket, + UKC = State#state.unique_key_count, + Key = generate_uniquekey(random:uniform(UKC), + State#state.keyid, + State#state.unique_keyorder), + case riakc_pb_socket:get(Pid, Bucket, Key, State#state.pb_timeout) of + {ok, _Obj} -> + {ok, State}; + {error, notfound} -> + {ok, State}; + {error, Reason} -> + {error, Reason, State} + end; + +%% Query results via the HTTP interface. +run(postcodequery_http, _KeyGen, _ValueGen, State) -> + Host = inet_parse:ntoa(State#state.http_host), + Port = State#state.http_port, + Bucket = State#state.recordBucket, + + L = length(?POSTCODE_AREAS), + {_, Area} = lists:keyfind(random:uniform(L), 1, ?POSTCODE_AREAS), + District = Area ++ integer_to_list(random:uniform(26)), + StartKey = District ++ "|" ++ "a", + EndKey = District ++ "|" ++ "h", + URL = io_lib:format("http://~s:~p/buckets/~s/index/postcode_bin/~s/~s", + [Host, Port, Bucket, StartKey, EndKey]), + + case json_get(URL, State#state.http_timeout) of + {ok, {struct, Proplist}} -> + Results = proplists:get_value(<<"keys">>, Proplist), + C0 = State#state.postcodeq_count, + S0 = State#state.postcodeq_sum, + {C1, S1} = + case {C0, C0 rem State#state.query_logfreq} of + {C0, 0} when C0 > 0 -> + Avg = float_to_list(S0 / C0, [{decimals, 3}]), + lager:info("Average postcode query result size of ~s", + [Avg]), + {1, length(Results)}; + _ -> + {C0 + 1, S0 + length(Results)} + end, + {ok, State#state{postcodeq_count = C1, postcodeq_sum = S1}}; + {error, Reason} -> + io:format("[~s:~p] ERROR - Reason: ~p~n", + [?MODULE, ?LINE, Reason]), + {error, Reason, State} + end; + +%% Query results via the HTTP interface. 
+run(dobquery_http, _KeyGen, _ValueGen, State) -> + Host = inet_parse:ntoa(State#state.http_host), + Port = State#state.http_port, + Bucket = State#state.recordBucket, + + RandYear = random:uniform(70) + 1950, + DoBStart = integer_to_list(RandYear) ++ "0101", + DoBEnd = integer_to_list(RandYear) ++ "0110", + + URLSrc = + "http://~s:~p/buckets/~s/index/dateofbirth_bin/~s/~s?term_regex=~s", + RE= "[0-9]{8}...[a-d]", + URL = io_lib:format(URLSrc, + [Host, Port, Bucket, DoBStart, DoBEnd, RE]), + + case json_get(URL, State#state.http_timeout) of + {ok, {struct, Proplist}} -> + Results = proplists:get_value(<<"keys">>, Proplist), + C0 = State#state.dobq_count, + S0 = State#state.dobq_sum, + {C1, S1} = + case {C0, C0 rem State#state.query_logfreq} of + {C0, 0} when C0 > 0 -> + Avg = float_to_list(S0 / C0, [{decimals, 3}]), + lager:info("Average dob query result size of ~s", + [Avg]), + {1, length(Results)}; + _ -> + {C0 + 1, S0 + length(Results)} + end, + {ok, State#state{dobq_count = C1, dobq_sum = S1}}; + {error, Reason} -> + io:format("[~s:~p] ERROR - Reason: ~p~n", + [?MODULE, ?LINE, Reason]), + {error, Reason, State} + end; + +run(aae_query, _KeyGen, _ValueGen, State) -> + IsAlive = + case State#state.singleton_pid of + undefined -> + false; + LastPid -> + is_process_alive(LastPid) + end, + case {State#state.nominated_id, IsAlive} of + {true, true} -> + lager:info("Skipping aae query for overlap"), + {ok, State}; + {true, false} -> + Pid = spawn(?MODULE, run_aaequery, [State]), + {ok, State#state{singleton_pid = Pid}}; + _ -> + {ok, State} + end; + +run(list_keys, _KeyGen, _ValueGen, State) -> + IsAlive = + case State#state.singleton_pid of + undefined -> + false; + LastPid -> + is_process_alive(LastPid) + end, + case {State#state.nominated_id, IsAlive} of + {true, true} -> + lager:info("Skipping listkeys for overlap"), + {ok, State}; + {true, false} -> + Pid = spawn(?MODULE, run_listkeys, [State]), + {ok, State#state{singleton_pid = Pid}}; + _ -> + {ok, State} + end; + +run(segment_fold, _KeyGen, _ValueGen, State) -> + IsAlive = + case State#state.singleton_pid of + undefined -> + false; + LastPid -> + is_process_alive(LastPid) + end, + case {State#state.nominated_id, IsAlive} of + {true, true} -> + lager:info("Skipping segment fold for overlap"), + {ok, State}; + {true, false} -> + Pid = spawn(?MODULE, run_segmentfold, [State]), + {ok, State#state{singleton_pid = Pid}}; + _ -> + {ok, State} + end; + +run(force_aae, KeyGen, ValueGen, State) -> + SinceLastForceSec = + timer:now_diff(os:timestamp(), State#state.last_forceaae)/1000000, + case {State#state.nominated_id, SinceLastForceSec > ?FORCEAAE_FREQ} of + {true, true} -> + Host = inet_parse:ntoa(State#state.http_host), + Port = State#state.http_port, + Bucket = binary_to_list(State#state.recordBucket), + Key = KeyGen(), + Timeout = State#state.http_timeout, + + URLSrc = "http://~s:~p/buckets/~s/keys/~p?force_aae=true", + URL = io_lib:format(URLSrc, [Host, Port, Bucket, Key]), + Target = lists:flatten(URL), + + case ibrowse:send_req(Target, [], get, [], [], Timeout) of + {ok, "200", _, _Body} -> + {ok, State#state{last_forceaae = os:timestamp()}}; + {ok, "404", _, _NotFound} -> + {ok, State#state{last_forceaae = os:timestamp()}}; + Other -> + {error, Other, State} + end; + _ -> + run(get_pb, KeyGen, ValueGen, State) + end; + +run(Other, _, _, _) -> + throw({unknown_operation, Other}). 
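%% The run/4 clauses above are dispatched in proportion to the
%% {operations, ...} weights in the example configs: each operation is
%% issued with probability Weight / TotalWeight. In riakc_nhs.config the
%% weights sum to 1000, so alwaysget_pb accounts for ~61.2% of requests
%% and the two 2i HTTP queries for ~0.4% combined. A hedged helper (not
%% part of this module) for that arithmetic:
op_share(Op, Ops) ->
    TotalWeight = lists:sum([W || {_O, W} <- Ops]),
    {Op, Weight} = lists:keyfind(Op, 1, Ops),
    Weight / TotalWeight.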
+ +%% ==================================================================== +%% Internal functions +%% ==================================================================== + + + +json_get(Url, Timeout) -> + json_get(Url, Timeout, true). + +json_get(Url, Timeout, UsePool) -> + Target = lists:flatten(Url), + Response = + case UsePool of + true -> + ibrowse:send_req(Target, [], get, [], [], Timeout); + false -> + {ok, C} = ibrowse:spawn_worker_process(Target), + ibrowse:send_req_direct(C, Target, [], get, [], [], Timeout) + end, + case Response of + {ok, "200", _, Body} -> + {ok, mochijson2:decode(Body)}; + Other -> + {error, Other} + end. + +get_existing(Url, Timeout) -> + case ibrowse:send_req(lists:flatten(Url), [], get, [], [], Timeout) of + {ok, "200", _, _Body} -> + ok; + Other -> + {error, Other} + end. + + +to_binary(B) when is_binary(B) -> + B; +to_binary(I) when is_integer(I) -> + list_to_binary(integer_to_list(I)); +to_binary(L) when is_list(L) -> + list_to_binary(L). + +ensure_module(Module) -> + case code:which(Module) of + non_existing -> + ?FAIL_MSG("~s requires " ++ atom_to_list(Module) ++ + " module to be available on code path.\n", + [?MODULE]); + _ -> + ok + end. + + +%% ==================================================================== +%% Spawned Runners +%% ==================================================================== + + +run_aaequery(State) -> + SW = os:timestamp(), + lager:info("Commencing aaequery request"), + + Host = inet_parse:ntoa(State#state.http_host), + Port = State#state.http_port, + Bucket = State#state.recordBucket, + + KeyStart = "0", + KeyEnd = "z", + + MapFoldMod = "riak_kv_tictac_folder", + + URLSrc = + "http://~s:~p/buckets/~s/index/$key/~s/~s?mapfold=true&mapfoldmod=~s", + URL = io_lib:format(URLSrc, + [Host, Port, Bucket, KeyStart, KeyEnd, MapFoldMod]), + + case json_get(URL, State#state.fold_timeout, false) of + {ok, {struct, TreeL}} -> + {<<"count">>, Count} = lists:keyfind(<<"count">>, 1, TreeL), + lager:info("AAE query returned in ~w seconds covering ~s keys", + [timer:now_diff(os:timestamp(), SW)/1000000, Count]), + + {ok, State}; + {error, Reason} -> + io:format("[~s:~p] ERROR - Reason: ~p~n", + [?MODULE, ?LINE, Reason]), + {error, Reason, State} + end. + +run_listkeys(State) -> + SW = os:timestamp(), + lager:info("Commencing list keys request"), + + Host = inet_parse:ntoa(State#state.http_host), + Port = State#state.http_port, + Bucket = State#state.recordBucket, + + URLSrc = + "http://~s:~p/buckets/~s/keys?keys=true", + URL = io_lib:format(URLSrc, + [Host, Port, Bucket]), + + case json_get(URL, State#state.fold_timeout, false) of + {ok, {struct, [{<<"keys">>, KeyList}]}} -> + lager:info("List keys returned ~w keys in ~w seconds", + [length(KeyList), + timer:now_diff(os:timestamp(), SW)/1000000]), + + {ok, State}; + {error, Reason} -> + io:format("[~s:~p] ERROR - Reason: ~p~n", + [?MODULE, ?LINE, Reason]), + {error, Reason, State} + end. 
+ + +run_segmentfold(State) -> + SW = os:timestamp(), + lager:info("Commencing segment fold request"), + + Host = inet_parse:ntoa(State#state.http_host), + Port = State#state.http_port, + Bucket = State#state.recordBucket, + + KeyStart = "0", + KeyEnd = "z", + + MapFoldMod = "riak_kv_segment_folder", + + % '{"check_presence": "false", + % "tree_size": "small", + % "segment_list": [1, 10001]}' + MapFoldOpts = + "eyJjaGVja19wcmVzZW5jZSI6ICJmYWxzZSIsICJ0cmVlX3NpemUiOiAic21hbGwiLCA" + ++ "ic2VnbWVudF9saXN0IjogWzEsIDEwMDAxXX0=", + + URLSrc = + "http://~s:~p/buckets/~s/index/$key/~s/~s?mapfold=true&mapfoldmod=~s" + ++ "&mapfoldoptions=~s", + URL = io_lib:format(URLSrc, + [Host, Port, Bucket, KeyStart, KeyEnd, + MapFoldMod, MapFoldOpts]), + + case json_get(URL, State#state.fold_timeout, false) of + {ok, {struct, [{<<"deltas">>, SegL}]}} -> + lager:info("Segment fold returned in ~w seconds finding ~w keys", + [timer:now_diff(os:timestamp(), SW)/1000000, length(SegL)]), + {ok, State}; + {error, Reason} -> + io:format("[~s:~p] ERROR - Reason: ~p~n", + [?MODULE, ?LINE, Reason]), + {error, Reason, State} + end. + +%% ==================================================================== +%% Index seeds +%% ==================================================================== + +generate_binary_indexes() -> + [{{binary_index, "postcode"}, postcode_index()}, + {{binary_index, "dateofbirth"}, dateofbirth_index()}, + {{binary_index, "lastmodified"}, lastmodified_index()}]. + +postcode_index() -> + NotVeryNameLikeThing = base64:encode_to_string(crypto:rand_bytes(4)), + lists:map(fun(_X) -> + L = length(?POSTCODE_AREAS), + {_, Area} = lists:keyfind(random:uniform(L), 1, ?POSTCODE_AREAS), + District = Area ++ integer_to_list(random:uniform(26)), + F = District ++ "|" ++ NotVeryNameLikeThing, + list_to_binary(F) end, + lists:seq(1, random:uniform(3))). + +dateofbirth_index() -> + Delta = random:uniform(2500000000), + {{Y, M, D}, + _} = calendar:gregorian_seconds_to_datetime(Delta + 61000000000), + F = lists:flatten(io_lib:format(?DATE_FORMAT, [Y, M, D])) ++ "|" ++ base64:encode_to_string(crypto:rand_bytes(4)), + [list_to_binary(F)]. + +lastmodified_index() -> + {{Year, Month, Day}, + {Hr, Min, Sec}} = calendar:now_to_datetime(os:timestamp()), + F = lists:flatten(io_lib:format(?DATETIME_FORMAT, + [Year, Month, Day, Hr, Min, Sec])), + [list_to_binary(F)]. + + +generate_uniquekey(C, RandBytes, skew_order) -> + H0 = convert_tolist(erlang:phash2(C)), + RB = convert_tolist(RandBytes), + <<H0/binary, RB/binary>>; +generate_uniquekey(C, RandBytes, key_order) -> + B0 = convert_tolist(C), + RB = convert_tolist(RandBytes), + <<B0/binary, RB/binary>>. + + +non_compressible_value(Size) -> + crypto:rand_bytes(Size). + + +eightytwenty_keycount(UKC) -> + % 80% of the time choose a key in the bottom 20% of the + % result range, and 20% of the time in the upper 80% of the range + TwentyPoint = random:uniform(max(1, UKC div 5)), + case random:uniform(max(1, UKC)) < TwentyPoint of + true -> + random:uniform(UKC - TwentyPoint) + max(1, TwentyPoint); + false -> + random:uniform(max(1, TwentyPoint)) + end. + + +convert_tolist(I) when is_integer(I) -> + list_to_binary(lists:flatten(io_lib:format("~9..0B", [I]))); +convert_tolist(Bin) -> + <<I:32/integer>> = Bin, + convert_tolist(I). \ No newline at end of file diff --git a/src/basho_bench_driver_riakc_pb.erl b/src/basho_bench_driver_riakc_pb.erl index 7338dd2af..96bfb3fc0 100644 --- a/src/basho_bench_driver_riakc_pb.erl +++ b/src/basho_bench_driver_riakc_pb.erl @@ -26,6 +26,8 @@ mapred_valgen/2, mapred_ordered_valgen/1]). 
+-export([run_listkeys/1]). + -include("basho_bench.hrl"). -record(state, { pid, @@ -46,11 +48,21 @@ timeout_read, timeout_write, timeout_listkeys, - timeout_mapreduce + timeout_mapreduce, + twoi_qcount = 0 :: integer(), + twoi_rcount = 0 :: integer(), + nominated_id = false ::boolean(), + % ID 1 is nominated to do special work + singleton_targets :: list(), + % List of targets to be used for singleton async pid + singleton_pid :: pid() | undefined }). -define(TIMEOUT_GENERAL, 62*1000). % Riak PB default + 2 sec +% the bigger the number the less frequent the logs of 2i query results +-define(RANDOMLOG_FREQ, 50000). + -define(ERLANG_MR, [{map, {modfun, riak_kv_mapreduce, map_object_value}, none, false}, {reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs}, none, true}]). @@ -58,6 +70,26 @@ [{map, {jsfun, <<"Riak.mapValuesJson">>}, none, false}, {reduce, {jsfun, <<"Riak.reduceSum">>}, none, true}]). +-define(POSTCODE_AREAS, + [{1, "AB"}, {2, "AL"}, {3, "B"}, {4, "BA"}, {5, "BB"}, + {6, "BD"}, {7, "BH"}, {8, "BL"}, {9, "BN"}, {10, "BR"}, + {11, "BS"}, {12, "BT"}, {13, "CA"}, {14, "CB"}, {15, "CF"}, + {16, "CH"}, {17, "CM"}, {18, "CO"}, {19, "CR"}, {20, "CT"}, + {21, "CV"}, {22, "CW"}, {23, "DA"}, {24, "DD"}, {25, "DE"}, + {26, "DG"}, {27, "DH"}, {28, "DL"}, {29, "DN"}, {30, "DT"}, + {31, "DU"}, {32, "E"}, {33, "EC"}, {34, "EH"}, {35, "EN"}, + {36, "EX"}, {37, "FK"}, {38, "FY"}, {39, "G"}, {40, "GL"}, + {41, "GU"}, {42, "HA"}, {43, "HD"}, {44, "HG"}, {45, "HP"}, + {46, "HR"}, {47, "HS"}, {48, "HU"}, {49, "HX"}, {50, "IG"}, + {51, "IP"}, {52, "IV"}, {53, "KA"}, {54, "KT"}, {55, "KW"}, + {56, "KY"}, {57, "L"}, {58, "LA"}, {59, "LD"}, {60, "LE"}, + {61, "LL"}, {62, "LS"}, {63, "LU"}, {64, "M"}, {65, "ME"}, + {66, "MK"}, {67, "ML"}, {68, "N"}, {69, "NE"}, {70, "NG"}, + {71, "MM"}, {72, "NP"}, {73, "NR"}, {74, "NW"}, {75, "OL"}, + {76, "OX"}]). +-define(DATETIME_FORMAT, "~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w"). +-define(DATE_FORMAT, "~b-~2..0b-~2..0b"). + %% ==================================================================== %% API %% ==================================================================== @@ -98,6 +130,7 @@ new(Id) -> ?INFO("Using target ~p:~p for worker ~p\n", [TargetIp, TargetPort, Id]), case riakc_pb_socket:start_link(TargetIp, TargetPort, get_connect_options()) of {ok, Pid} -> + NominatedID = Id == 1, {ok, #state { pid = Pid, bucket = Bucket, r = R, @@ -109,14 +142,16 @@ new(Id) -> content_type = CT, search_queries = SearchQs, query_step_interval = SearchQStepIval, - start_time = erlang:now(), + start_time = os:timestamp(), keylist_length = KeylistLength, preloaded_keys = PreloadedKeys, timeout_general = get_timeout_general(), timeout_read = get_timeout(pb_timeout_read), timeout_write = get_timeout(pb_timeout_write), timeout_listkeys = get_timeout(pb_timeout_listkeys), - timeout_mapreduce = get_timeout(pb_timeout_mapreduce) + timeout_mapreduce = get_timeout(pb_timeout_mapreduce), + nominated_id = NominatedID, + singleton_targets = Targets }}; {error, Reason2} -> ?FAIL_MSG("Failed to connect riakc_pb_socket to ~p:~p: ~p\n", @@ -346,6 +381,79 @@ run(update, KeyGen, ValueGen, State) -> {error, Reason} -> {error, Reason, State} end; + +%% Update an object with secondary indexes. 
+run(update_with2i, KeyGen, ValueGen, State) -> + Pid = State#state.pid, + Key = KeyGen(), + Value = ValueGen(), + + Robj0 = + case riakc_pb_socket:get(Pid, + State#state.bucket, + Key, + State#state.timeout_read) of + {ok, Robj} -> + Robj; + {error, notfound} -> + riakc_obj:new(State#state.bucket, Key) + end, + + MD0 = riakc_obj:get_update_metadata(Robj0), + MD1 = riakc_obj:clear_secondary_indexes(MD0), + MD2 = riakc_obj:set_secondary_index(MD1, generate_binary_indexes()), + Robj1 = riakc_obj:update_value(Robj0, Value), + Robj2 = riakc_obj:update_metadata(Robj1, MD2), + + %% Write the object... + case riakc_pb_socket:put(Pid, Robj2, State#state.timeout_write) of + ok -> + {ok, State}; + {error, Reason} -> + {error, Reason, State} + end; + +run(query_postcode, _KeyGen, _ValueGen, State) -> + Pid = State#state.pid, + Bucket = State#state.bucket, + L = length(?POSTCODE_AREAS), + {_R, Area} = lists:keyfind(random:uniform(L), 1, ?POSTCODE_AREAS), + District = Area ++ integer_to_list(random:uniform(26)), + StartKey = District ++ "|" ++ "a", + EndKey = District ++ "|" ++ "b", + case riakc_pb_socket:get_index_range(Pid, + Bucket, + <<"postcode_bin">>, + list_to_binary(StartKey), + list_to_binary(EndKey), + [{timeout, State#state.timeout_general}, + {return_terms, true}]) of + {ok, Results} -> + record_2i_results(Results, State); + {error, Reason} -> + io:format("[~s:~p] ERROR - Reason: ~p~n", [?MODULE, ?LINE, Reason]), + {error, Reason, State} + end; +run(query_dob, _KeyGen, _ValueGen, State) -> + Pid = State#state.pid, + Bucket = State#state.bucket, + R = random:uniform(2500000000), + DOB_SK = pick_dateofbirth(R), + DOB_EK = pick_dateofbirth(R + random:uniform(86400 * 3)), + case riakc_pb_socket:get_index_range(Pid, + Bucket, + <<"dateofbirth_bin">>, + list_to_binary(DOB_SK), + list_to_binary(DOB_EK ++ "|"), + [{timeout, State#state.timeout_general}, + {return_terms, true}]) of + {ok, Results} -> + record_2i_results(Results, State); + {error, Reason} -> + io:format("[~s:~p] ERROR - Reason: ~p~n", [?MODULE, ?LINE, Reason]), + {error, Reason, State} + end; + run(update_existing, KeyGen, ValueGen, State) -> Key = KeyGen(), case riakc_pb_socket:get(State#state.pid, State#state.bucket, @@ -384,15 +492,26 @@ run(delete, KeyGen, _ValueGen, State) -> {error, Reason, State} end; run(listkeys, _KeyGen, _ValueGen, State) -> - %% Pass on rw - case riakc_pb_socket:list_keys(State#state.pid, State#state.bucket, State#state.timeout_listkeys) of - {ok, _Keys} -> + IsAlive = + case State#state.singleton_pid of + undefined -> + false; + LastPid -> + is_process_alive(LastPid) + end, + case {State#state.nominated_id, IsAlive} of + {true, true} -> + lager:info("Skipping listkeys for overlap"), {ok, State}; - {error, disconnected} -> - run(listkeys, _KeyGen, _ValueGen, State); - {error, Reason} -> - {error, Reason, State} + {true, false} -> + Pid = spawn(?MODULE, run_listkeys, [State]), + {ok, State#state{singleton_pid = Pid}}; + _ -> + {ok, State} end; +run(pause_minute, _KeyGen, _ValueGen, State) -> + timer:sleep(60000), + {ok, State}; run(search, _KeyGen, _ValueGen, #state{search_queries=SearchQs}=State) -> [{Index, Query, Options}|_] = SearchQs, @@ -409,11 +528,11 @@ run(search, _KeyGen, _ValueGen, #state{search_queries=SearchQs}=State) -> run(search_interval, _KeyGen, _ValueGen, #state{search_queries=SearchQs, start_time=StartTime, query_step_interval=Interval}=State) -> [{Index, Query, Options}|_] = SearchQs, - Now = erlang:now(), + Now = os:timestamp(), case timer:now_diff(Now, StartTime) of _MicroSec when 
_MicroSec > (Interval * 1000000) -> NewState = State#state{search_queries=roll_list(SearchQs),start_time=Now}; - _MicroSec -> + _MicroSec -> NewState = State end, @@ -609,3 +728,93 @@ get_timeout(Name) when Name == pb_timeout_read; get_connect_options() -> basho_bench_config:get(pb_connect_options, [{auto_reconnect, true}]). + + +%% ==================================================================== +%% Index seeds +%% ==================================================================== + + +generate_binary_indexes() -> + [{{binary_index, "postcode"}, postcode_index()}, + {{binary_index, "dateofbirth"}, dateofbirth_index()}, + {{binary_index, "lastmodified"}, lastmodified_index()}]. + +postcode_index() -> + NotVeryNameLikeThing = base64:encode_to_string(crypto:rand_bytes(4)), + lists:map(fun(_X) -> + L = length(?POSTCODE_AREAS), + {_R, Area} = lists:keyfind(random:uniform(L), 1, ?POSTCODE_AREAS), + District = Area ++ integer_to_list(random:uniform(26)), + F = District ++ "|" ++ NotVeryNameLikeThing, + list_to_binary(F) end, + lists:seq(1, random:uniform(3))). + +dateofbirth_index() -> + F = pick_dateofbirth() ++ "|" ++ + base64:encode_to_string(crypto:rand_bytes(4)), + [list_to_binary(F)]. + +pick_dateofbirth() -> + pick_dateofbirth(random:uniform(2500000000)). + +pick_dateofbirth(Delta) -> + {{Y, M, D}, + _} = calendar:gregorian_seconds_to_datetime(Delta + 61000000000), + lists:flatten(io_lib:format(?DATE_FORMAT, [Y, M, D])). + +lastmodified_index() -> + {{Year, Month, Day}, + {Hr, Min, Sec}} = calendar:now_to_datetime(os:timestamp()), + F = lists:flatten(io_lib:format(?DATETIME_FORMAT, + [Year, Month, Day, Hr, Min, Sec])), + [list_to_binary(F)]. + + +record_2i_results(Results, State) -> + RCount_ThisQuery = + case Results of + {index_results_v1, undefined, ResultList, undefined} -> + length(ResultList); + _ -> + 0 + end, + QCount = State#state.twoi_qcount + 1, + RCount = State#state.twoi_rcount + RCount_ThisQuery, + case random:uniform(?RANDOMLOG_FREQ) < QCount of + true -> + AvgRSize = RCount / QCount, + TS = timer:now_diff(os:timestamp(), + State#state.start_time) / 1000000, + io:format("After ~w seconds average result size of ~.2f~n", + [TS, AvgRSize]), + {ok, State#state{twoi_qcount = 0, twoi_rcount = 0}}; + false -> + {ok, State#state{twoi_qcount = QCount, twoi_rcount = RCount}} + end. + +run_listkeys(State) -> + SW = os:timestamp(), + lager:info("Commencing listkeys request"), + + Targets = State#state.singleton_targets, + {TargetIp, TargetPort} = lists:nth(random:uniform(length(Targets)), + Targets), + ?INFO("Using target ~p:~p for new singleton async worker\n", + [TargetIp, TargetPort]), + {ok, Pid} = riakc_pb_socket:start_link(TargetIp, + TargetPort, + get_connect_options()), + case riakc_pb_socket:list_keys(Pid, + State#state.bucket, + State#state.timeout_listkeys) of + {ok, Keys} -> + lager:info("listkeys request returned ~w keys" ++ + " in ~w seconds", + [length(Keys), + timer:now_diff(os:timestamp(), SW)/1000000]), + ok; + {error, Reason} -> + lager:info("listkeys failed due to reason ~w", [Reason]), + ok + end. 
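The pick_dateofbirth/1 arithmetic above maps a uniform offset onto the Gregorian calendar: 61000000000 seconds from year 0 lands on 5 January 1933, and adding the maximum delta of 2500000000 seconds reaches 26 March 2012, so the generated dates of birth span roughly 1933 to 2012. A quick shell check:

%% In an Erlang shell:
%% 1> calendar:gregorian_seconds_to_datetime(61000000000).
%% {{1933,1,5},{12,26,40}}
%% 2> calendar:gregorian_seconds_to_datetime(61000000000 + 2500000000).
%% {{2012,3,26},{16,53,20}}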
diff --git a/src/basho_bench_keygen.erl b/src/basho_bench_keygen.erl index 943705437..6ff1c3bf4 100644 --- a/src/basho_bench_keygen.erl +++ b/src/basho_bench_keygen.erl @@ -110,6 +110,16 @@ new({uniform_int, MaxKey}, _Id) new({uniform_int, StartKey, NumKeys}, _Id) when is_integer(StartKey), is_integer(NumKeys), NumKeys > 0 -> fun() -> random:uniform(NumKeys) + StartKey - 1 end; +new({eightytwenty_int, MaxKey}, _Id) when is_integer(MaxKey), MaxKey > 0 -> + fun() -> + Step = MaxKey div 5, + case random:uniform() < 0.8 of + true -> + random:uniform(Step); + false -> + Step + random:uniform(Step * 4) + end + end; new({pareto_int, MaxKey}, _Id) when is_integer(MaxKey), MaxKey > 0 -> pareto(trunc(MaxKey * 0.2), ?PARETO_SHAPE); diff --git a/src/basho_bench_valgen.erl b/src/basho_bench_valgen.erl index a211701a5..5656ca306 100644 --- a/src/basho_bench_valgen.erl +++ b/src/basho_bench_valgen.erl @@ -64,6 +64,21 @@ new({uniform_int, MaxVal}, _Id) new({uniform_int, MinVal, MaxVal}, _Id) when is_integer(MinVal), is_integer(MaxVal), MaxVal > MinVal -> fun() -> random:uniform(MinVal, MaxVal) end; +new({semi_compressible, MinSize, Mean, XLMult, XLProb}, Id) + when is_integer(MinSize), MinSize >= 0, is_number(Mean), Mean > 0 -> + Source = init_altsource(Id), + fun() -> + R = random:uniform(), + {ModMin, ModMean} = + case R < XLProb of + true -> + {XLMult * MinSize, XLMult * Mean}; + false -> + {MinSize, Mean} + end, + data_block(Source, + ModMin + trunc(basho_bench_stats:exponential(1 / ModMean))) + end; new(Other, _Id) -> ?FAIL_MSG("Invalid value generator requested: ~p\n", [Other]). @@ -104,6 +119,54 @@ init_source(Id, Path) -> end, {?VAL_GEN_BLOB_CFG, size(Bin), Bin}. +init_altsource(Id) -> + init_altsource(Id, basho_bench_config:get(?VAL_GEN_BLOB_CFG, undefined)). + +init_altsource(1, undefined) -> + GenRandStrFun = fun(_X) -> random:uniform(95) + 31 end, + RandomStrs = + lists:map(fun(X) -> + SL = lists:map(GenRandStrFun, lists:seq(1, 128)), + {X, list_to_binary(SL)} + end, + lists:seq(1, 16)), + ComboBlockFun = + fun(_X, Acc) -> + Bin1 = crypto:rand_bytes(4096), + Bin2 = create_random_textblock(32, RandomStrs), + % Both the compressible and incompressible parts will be + % 4096 bytes in size. zlib will compress the compressible + % part down 3:1 + <<Acc/binary, Bin1/binary, Bin2/binary>> + end, + Bytes = lists:foldl(ComboBlockFun, <<>>, lists:seq(1, 8192)), + SourceSz = byte_size(Bytes), + try + ?TAB = ets:new(?TAB, [public, named_table]), + true = ets:insert(?TAB, {x, Bytes}) + catch _:_ -> rerunning_id_1_init_source_table_already_exists + end, + ?INFO("Finished generating random source size (~w)\n", [SourceSz]), + {?VAL_GEN_SRC_SIZE, SourceSz, Bytes}; +init_altsource(_Id, undefined) -> + [{_, Bytes}] = ets:lookup(?TAB, x), + {?VAL_GEN_SRC_SIZE, size(Bytes), Bytes}; +init_altsource(Id, Path) -> + {Path, {ok, Bin}} = {Path, file:read_file(Path)}, + if Id == 1 -> ?DEBUG("path source ~p ~p\n", [size(Bin), Path]); + true -> ok + end, + {?VAL_GEN_BLOB_CFG, size(Bin), Bin}. + +create_random_textblock(BlockLength, RandomStrs) -> + GetRandomBlockFun = + fun(X, Acc) -> + Rand = random:uniform(min(X, 16)), + {Rand, Block} = lists:keyfind(Rand, 1, RandomStrs), + <<Acc/binary, Block/binary>> + end, + lists:foldl(GetRandomBlockFun, <<>>, lists:seq(1, BlockLength)). + data_block({SourceCfg, SourceSz, Source}, BlockSize) -> case SourceSz - BlockSize > 0 of true -> @@ -115,3 +178,5 @@ data_block({SourceCfg, SourceSz, Source}, BlockSize) -> [SourceCfg, SourceSz, BlockSize]), Source end. + +
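Given the data_block/2 path above, a {semi_compressible, MinSize, Mean, XLMult, XLProb} value is MinSize plus an exponential draw with mean Mean, with both parameters scaled by XLMult with probability XLProb. The expected value size therefore follows directly; a small sketch (assuming basho_bench_stats:exponential(1/Mean) draws with mean Mean):

%% Expected value size for {semi_compressible, Min, Mean, XLMult, XLProb}.
expected_size(Min, Mean, XLMult, XLProb) ->
    Normal = Min + Mean,
    (1 - XLProb) * Normal + XLProb * (XLMult * Normal).
%% e.g. expected_size(4000, 4000, 10, 0.001) -> 8072.0 bytes, the mean
%% object size produced by the riakc_pb.config generator above.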