diff --git a/examples/riakc_nhs_general.config b/examples/riakc_nhs_general.config index a9bfb02f3..f73a708fc 100644 --- a/examples/riakc_nhs_general.config +++ b/examples/riakc_nhs_general.config @@ -1,6 +1,6 @@ {mode, max}. -{duration, 1440}. +{duration, 2880}. {report_interval, 10}. @@ -18,7 +18,7 @@ %% For alwaysget operations what is: %% - the maximum number of keys per worker (max number of keys = this * concurrent) %% - whether the inserts should be in key_order -{alwaysget, {240000, 100000, skew_order}}. +{alwaysget, {1000000, 300000, skew_order}}. {unique, {8000, skew_order}}. {pb_ips, [{127,0,0,1}]}. @@ -26,10 +26,12 @@ {riakc_pb_replies, 1}. -{operations, [{alwaysget_pb, 651}, {alwaysget_updatewith2i, 120}, - {put_unique, 95}, {get_unique, 130}, +{operations, [{alwaysget_pb, 621}, {alwaysget_updatewith2i, 130}, + {put_unique, 90}, {get_unique, 130}, {delete_unique, 25}, {postcodequery_http, 3}, {dobquery_http, 1}]}. +% + %% Use {auto_reconnect, false} to get "old" behavior (prior to April 2013). %% See deps/riakc/src/riakc_pb_socket.erl for all valid socket options. {pb_connect_options, [{auto_reconnect, true}]}. diff --git a/rebar.config b/rebar.config index 7c2cedb6a..02139899c 100644 --- a/rebar.config +++ b/rebar.config @@ -19,11 +19,11 @@ {folsom, ".*", {git, "git://github.com/basho/folsom.git", {branch, "boundary-0.7.1+basho-bench-float"}}}, {lager, "2.*", {git, "git://github.com/basho/lager", {tag, "2.1.0"}}}, {ibrowse, ".*", - {git, "git://github.com/cmullaparthi/ibrowse.git", {tag, "v4.0.2"}}}, + {git, "git://github.com/basho/ibrowse.git", {branch, "develop-2.9"}}}, {riakc, ".*", - {git, "git://github.com/basho/riak-erlang-client", {branch, "master"}}}, - {mochiweb, "2.9.*", - {git, "git://github.com/basho/mochiweb", {tag, "v2.9.0"}}}, + {git, "git://github.com/basho/riak-erlang-client", {branch, "develop-2.9"}}}, + {mochiweb, ".*", + {git, "git://github.com/basho/mochiweb", {branch, "develop-2.9"}}}, {getopt, ".*", {git, "git://github.com/jcomellas/getopt", {tag, "v0.8.2"}}}, diff --git a/src/basho_bench_driver_nhs.erl b/src/basho_bench_driver_nhs.erl index 9c3aa2c54..1848ffde0 100644 --- a/src/basho_bench_driver_nhs.erl +++ b/src/basho_bench_driver_nhs.erl @@ -32,6 +32,7 @@ -record(state, { pb_pid, + repl_pid, http_host, http_port, recordBucket, @@ -53,6 +54,7 @@ % ID 1 is nominated to do special work singleton_pid :: pid() | undefined, unique_key_count = 1 :: non_neg_integer(), + unique_key_lowcount = 1 :: non_neg_integer(), alwaysget_key_count = 1 :: non_neg_integer(), keyid :: binary(), last_forceaae = os:timestamp() :: erlang:timestamp() @@ -98,6 +100,7 @@ new(Id) -> PBPort = basho_bench_config:get(pb_port, 8087), HTTPIPs = basho_bench_config:get(http_ips, ["127.0.0.1"]), HTTPPort = basho_bench_config:get(http_port, 8098), + ReplPBIPs = basho_bench_config:get(replpb_ips, ["127.0.0.1"]), PBTimeout = basho_bench_config:get(pb_timeout_general, 30*1000), HTTPTimeout = basho_bench_config:get(http_timeout_general, 30*1000), @@ -120,6 +123,12 @@ new(Id) -> ?INFO("Using pb target ~p:~p for worker ~p\n", [PBTargetIp, PBTargetPort, Id]), + ReplTargets = basho_bench_config:normalize_ips(ReplPBIPs, PBPort), + {ReplTargetIp, + ReplTargetPort} = lists:nth((Id rem length(ReplTargets) + 1), + ReplTargets), + ?INFO("Using repl target ~p:~p for worker ~p\n", + [ReplTargetIp, ReplTargetPort, Id]), {AGMaxKC, AGMinKC, AGKeyOrder} = basho_bench_config:get(alwaysget, {1, 1, key_order}), @@ -134,8 +143,17 @@ new(Id) -> case riakc_pb_socket:start_link(PBTargetIp, PBTargetPort) of {ok, Pid} -> NominatedID = Id == 7, + ReplPid = + case riakc_pb_socket:start_link(ReplTargetIp, ReplTargetPort) of + {ok, RP} -> + RP; + _ -> + lager:info("Starting with no repl check"), + no_repl_check + end, {ok, #state { pb_pid = Pid, + repl_pid = ReplPid, http_host = HTTPTargetIp, http_port = HTTPTargetPort, recordBucket = <<"domainRecord">>, @@ -276,6 +294,58 @@ run(alwaysget_updatewith2i, _KeyGen, ValueGen, State) -> {error, Reason, State} end; +run(alwaysget_updatewithout2i, _KeyGen, ValueGen, State) -> + Pid = State#state.pb_pid, + Bucket = State#state.recordBucket, + AGKC = State#state.alwaysget_key_count, + Value = ValueGen(), + KeyInt = eightytwenty_keycount(AGKC), + ToExtend = + random:uniform(State#state.alwaysget_perworker_maxkeycount) > AGKC, + + {Robj0, NewAGKC} = + case ToExtend of + true -> + % Expand the key count + ExpansionKey = + generate_uniquekey(AGKC + 1, State#state.keyid, + State#state.alwaysget_keyorder), + case {AGKC rem 1000, State#state.nominated_id} of + {0, true} -> + lager:info("Always grow key count passing ~w " + ++ "for nominated worker", + [AGKC]); + _ -> + ok + end, + {riakc_obj:new(Bucket, ExpansionKey), + AGKC + 1}; + false -> + % update an existing key + ExistingKey = + generate_uniquekey(KeyInt, State#state.keyid, + State#state.alwaysget_keyorder), + {ok, Robj} = + riakc_pb_socket:get(Pid, + Bucket, ExistingKey, + State#state.pb_timeout), + {Robj, AGKC} + end, + + % MD0 = riakc_obj:get_update_metadata(Robj0), + % MD1 = riakc_obj:clear_secondary_indexes(MD0), + % MD2 = riakc_obj:set_secondary_index(MD1, generate_binary_indexes()), + Robj2 = riakc_obj:update_value(Robj0, Value), + % Robj2 = riakc_obj:update_metadata(Robj1, MD2), + + %% Write the object... + case riakc_pb_socket:put(Pid, Robj2, State#state.pb_timeout) of + ok -> + {ok, State#state{alwaysget_key_count = NewAGKC}}; + {error, Reason} -> + {error, Reason, State} + end; + %% Update an object with secondary indexes. run(update_with2i, KeyGen, ValueGen, State) -> @@ -306,9 +376,16 @@ run(update_with2i, KeyGen, ValueGen, State) -> {error, Reason, State} end; %% Put an object with a unique key and a non-compressable value -run(put_unique, _KeyGen, _ValueGen, State) -> +run(put_unique_bet365, _KeyGen, _ValueGen, State) -> Pid = State#state.pb_pid, - Bucket = State#state.documentBucket, + + Bucket = + case erlang:phash2(Pid) rem 2 of + 0 -> + <<"abcdefghijklmnopqrstuvwxyz_1">>; + 1 -> + <<"abcdefghijklmnopqrstuvwxyz_2">> + end, UKC = State#state.unique_key_count, Key = @@ -319,8 +396,8 @@ run(put_unique, _KeyGen, _ValueGen, State) -> Value = non_compressible_value(State#state.unique_size), Robj0 = riakc_obj:new(Bucket, to_binary(Key)), - MD1 = riakc_obj:get_update_metadata(Robj0), - MD2 = riakc_obj:set_secondary_index(MD1, generate_binary_indexes()), + MD2 = riakc_obj:get_update_metadata(Robj0), + % MD2 = riakc_obj:set_secondary_index(MD1, generate_binary_indexes()), Robj1 = riakc_obj:update_value(Robj0, Value), Robj2 = riakc_obj:update_metadata(Robj1, MD2), @@ -331,12 +408,33 @@ run(put_unique, _KeyGen, _ValueGen, State) -> {error, Reason} -> {error, Reason, State} end; +%% Put an object with a unique key and a non-compressable value +run(put_unique, _KeyGen, _ValueGen, State) -> + {Pid, _Bucket, _Key, Robj, UKC} = prepare_unique_put(State), + %% Write the object... + case riakc_pb_socket:put(Pid, Robj, State#state.pb_timeout) of + ok -> + {ok, State#state{unique_key_count = UKC + 1}}; + {error, Reason} -> + {error, Reason, State} + end; +run(put_unique_checkrepl, _KeyGen, _ValueGen, State) -> + {Pid, Bucket, Key, Robj, UKC} = prepare_unique_put(State), + %% Write the object... + case riakc_pb_socket:put(Pid, Robj, State#state.pb_timeout) of + ok -> + check_repl(State#state.repl_pid, Bucket, to_binary(Key), State#state.pb_timeout), + {ok, State#state{unique_key_count = UKC + 1}}; + {error, Reason} -> + {error, Reason, State} + end; run(get_unique, _KeyGen, _ValueGen, State) -> % Get one of the objects with unique keys Pid = State#state.pb_pid, Bucket = State#state.documentBucket, UKC = State#state.unique_key_count, - Key = generate_uniquekey(random:uniform(UKC), + LKC = State#state.unique_key_lowcount, + Key = generate_uniquekey(LKC + random:uniform(UKC - LKC), State#state.keyid, State#state.unique_keyorder), case riakc_pb_socket:get(Pid, Bucket, Key, State#state.pb_timeout) of @@ -347,6 +445,28 @@ run(get_unique, _KeyGen, _ValueGen, State) -> {error, Reason} -> {error, Reason, State} end; +run(delete_unique, _KeyGen, _ValueGen, State) -> + %% Delete one of the unique keys, assuming that the deletions have not + %% caught up with the PUTs + Pid = State#state.pb_pid, + B = State#state.documentBucket, + UKC = State#state.unique_key_count, + LKC = State#state.unique_key_lowcount, + case LKC < UKC of + true -> + Key = generate_uniquekey(LKC, + State#state.keyid, + State#state.unique_keyorder), + R = riakc_pb_socket:delete(Pid, B, Key, State#state.pb_timeout), + case R of + ok -> + {ok, State#state{unique_key_lowcount = LKC + 1}}; + {error, Reason} -> + {error, Reason, State#state{unique_key_lowcount = LKC + 1}} + end; + false -> + {ok, State} + end; %% Query results via the HTTP interface. run(postcodequery_http, _KeyGen, _ValueGen, State) -> @@ -514,6 +634,24 @@ run(Other, _, _, _) -> %% ==================================================================== +prepare_unique_put(State) -> + Pid = State#state.pb_pid, + Bucket = State#state.documentBucket, + + UKC = State#state.unique_key_count, + Key = + generate_uniquekey(UKC, + State#state.keyid, + State#state.unique_keyorder), + + Value = non_compressible_value(State#state.unique_size), + + Robj0 = riakc_obj:new(Bucket, to_binary(Key)), + MD1 = riakc_obj:get_update_metadata(Robj0), + MD2 = riakc_obj:set_secondary_index(MD1, generate_binary_indexes()), + Robj1 = riakc_obj:update_value(Robj0, Value), + Robj2 = riakc_obj:update_metadata(Robj1, MD2), + {Pid, Bucket, Key, Robj2, UKC}. json_get(Url, Timeout) -> json_get(Url, Timeout, true). @@ -561,6 +699,15 @@ ensure_module(Module) -> ok end. +check_repl(no_repl_check, _B, _K, _TO) -> + ok; +check_repl(ReplPid, Bucket, Key, Timeout) -> + case riakc_pb_socket:get(ReplPid, Bucket, Key, Timeout) of + {ok, _Obj} -> + ok; + {error, _} -> + check_repl(ReplPid, Bucket, Key, Timeout) + end. %% ==================================================================== %% Spawned Runners @@ -727,4 +874,4 @@ convert_tolist(I) when is_integer(I) -> list_to_binary(lists:flatten(io_lib:format("~9..0B", [I]))); convert_tolist(Bin) -> <> = Bin, - convert_tolist(I). \ No newline at end of file + convert_tolist(I).