Mas nhsload allfound #224

Merged 3 commits on Apr 19, 2020
10 changes: 6 additions & 4 deletions examples/riakc_nhs_general.config
@@ -1,6 +1,6 @@
{mode, max}.

{duration, 1440}.
{duration, 2880}.

{report_interval, 10}.

@@ -18,18 +18,20 @@
%% For alwaysget operations what is:
%% - the maximum number of keys per worker (max number of keys = this * concurrent)
%% - whether the inserts should be in key_order
{alwaysget, {240000, 100000, skew_order}}.
{alwaysget, {1000000, 300000, skew_order}}.
{unique, {8000, skew_order}}.
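A note for readers, not part of the change itself: the driver below matches this tuple as {AGMaxKC, AGMinKC, AGKeyOrder}, i.e. a per-worker maximum key count, a minimum key count (going by the variable name), and the key ordering. Following the comment above, the theoretical keyspace is the first element multiplied by the concurrent worker count, so, for example, a hypothetical 100 workers with {alwaysget, {1000000, 300000, skew_order}} could reach roughly 100,000,000 keys.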

{pb_ips, [{127,0,0,1}]}.
{http_ips, [{127,0,0,1}]}.

{riakc_pb_replies, 1}.

{operations, [{alwaysget_pb, 651}, {alwaysget_updatewith2i, 120},
{put_unique, 95}, {get_unique, 130},
{operations, [{alwaysget_pb, 621}, {alwaysget_updatewith2i, 130},
{put_unique, 90}, {get_unique, 130}, {delete_unique, 25},
{postcodequery_http, 3}, {dobquery_http, 1}]}.
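For context (editorial note, not part of the diff): basho_bench applies these weights as relative frequencies, and the new list sums to 1000 (621 + 130 + 90 + 130 + 25 + 3 + 1), so alwaysget_pb makes up roughly 62% of the operation mix and the newly added delete_unique about 2.5%.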

%

%% Use {auto_reconnect, false} to get "old" behavior (prior to April 2013).
%% See deps/riakc/src/riakc_pb_socket.erl for all valid socket options.
{pb_connect_options, [{auto_reconnect, true}]}.
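Editorial note on the new repl-check path added in the driver below: it reads a replpb_ips list (defaulting to ["127.0.0.1"] and sharing pb_port), and the put_unique_checkrepl operation re-reads every unique PUT from that second target; if no connection to the repl target can be opened at startup, the check is skipped. A minimal config sketch, where the second-cluster address and the weights are assumptions rather than part of this change:

{replpb_ips, [{127,0,0,2}]}.

{operations, [{put_unique_checkrepl, 90}, {get_unique, 130}, {delete_unique, 25}]}.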
8 changes: 4 additions & 4 deletions rebar.config
@@ -19,11 +19,11 @@
{folsom, ".*", {git, "git://github.com/basho/folsom.git", {branch, "boundary-0.7.1+basho-bench-float"}}},
{lager, "2.*", {git, "git://github.com/basho/lager", {tag, "2.1.0"}}},
{ibrowse, ".*",
{git, "git://github.com/cmullaparthi/ibrowse.git", {tag, "v4.0.2"}}},
{git, "git://github.com/basho/ibrowse.git", {branch, "develop-2.9"}}},
{riakc, ".*",
{git, "git://github.com/basho/riak-erlang-client", {branch, "master"}}},
{mochiweb, "2.9.*",
{git, "git://github.com/basho/mochiweb", {tag, "v2.9.0"}}},
{git, "git://github.com/basho/riak-erlang-client", {branch, "develop-2.9"}}},
{mochiweb, ".*",
{git, "git://github.com/basho/mochiweb", {branch, "develop-2.9"}}},
{getopt, ".*",
{git, "git://github.com/jcomellas/getopt", {tag, "v0.8.2"}}},

159 changes: 153 additions & 6 deletions src/basho_bench_driver_nhs.erl
@@ -32,6 +32,7 @@

-record(state, {
pb_pid,
repl_pid,
http_host,
http_port,
recordBucket,
@@ -53,6 +54,7 @@
% ID 1 is nominated to do special work
singleton_pid :: pid() | undefined,
unique_key_count = 1 :: non_neg_integer(),
unique_key_lowcount = 1 :: non_neg_integer(),
alwaysget_key_count = 1 :: non_neg_integer(),
keyid :: binary(),
last_forceaae = os:timestamp() :: erlang:timestamp()
@@ -98,6 +100,7 @@ new(Id) ->
PBPort = basho_bench_config:get(pb_port, 8087),
HTTPIPs = basho_bench_config:get(http_ips, ["127.0.0.1"]),
HTTPPort = basho_bench_config:get(http_port, 8098),
ReplPBIPs = basho_bench_config:get(replpb_ips, ["127.0.0.1"]),

PBTimeout = basho_bench_config:get(pb_timeout_general, 30*1000),
HTTPTimeout = basho_bench_config:get(http_timeout_general, 30*1000),
@@ -120,6 +123,12 @@
?INFO("Using pb target ~p:~p for worker ~p\n", [PBTargetIp,
PBTargetPort,
Id]),
ReplTargets = basho_bench_config:normalize_ips(ReplPBIPs, PBPort),
{ReplTargetIp,
ReplTargetPort} = lists:nth((Id rem length(ReplTargets) + 1),
ReplTargets),
?INFO("Using repl target ~p:~p for worker ~p\n",
[ReplTargetIp, ReplTargetPort, Id]),

{AGMaxKC, AGMinKC, AGKeyOrder} =
basho_bench_config:get(alwaysget, {1, 1, key_order}),
@@ -134,8 +143,17 @@
case riakc_pb_socket:start_link(PBTargetIp, PBTargetPort) of
{ok, Pid} ->
NominatedID = Id == 7,
ReplPid =
case riakc_pb_socket:start_link(ReplTargetIp, ReplTargetPort) of
{ok, RP} ->
RP;
_ ->
lager:info("Starting with no repl check"),
no_repl_check
end,
{ok, #state {
pb_pid = Pid,
repl_pid = ReplPid,
http_host = HTTPTargetIp,
http_port = HTTPTargetPort,
recordBucket = <<"domainRecord">>,
@@ -276,6 +294,58 @@ run(alwaysget_updatewith2i, _KeyGen, ValueGen, State) ->
{error, Reason, State}
end;

run(alwaysget_updatewithout2i, _KeyGen, ValueGen, State) ->
Pid = State#state.pb_pid,
Bucket = State#state.recordBucket,
AGKC = State#state.alwaysget_key_count,
Value = ValueGen(),
KeyInt = eightytwenty_keycount(AGKC),
ToExtend =
random:uniform(State#state.alwaysget_perworker_maxkeycount) > AGKC,

{Robj0, NewAGKC} =
case ToExtend of
true ->
% Expand the key count
ExpansionKey =
generate_uniquekey(AGKC + 1, State#state.keyid,
State#state.alwaysget_keyorder),
case {AGKC rem 1000, State#state.nominated_id} of
{0, true} ->
lager:info("Always grow key count passing ~w "
++ "for nominated worker",
[AGKC]);
_ ->
ok
end,
{riakc_obj:new(Bucket, ExpansionKey),
AGKC + 1};
false ->
% update an existing key
ExistingKey =
generate_uniquekey(KeyInt, State#state.keyid,
State#state.alwaysget_keyorder),
{ok, Robj} =
riakc_pb_socket:get(Pid,
Bucket, ExistingKey,
State#state.pb_timeout),
{Robj, AGKC}
end,

% MD0 = riakc_obj:get_update_metadata(Robj0),
% MD1 = riakc_obj:clear_secondary_indexes(MD0),
% MD2 = riakc_obj:set_secondary_index(MD1, generate_binary_indexes()),
Robj2 = riakc_obj:update_value(Robj0, Value),
% Robj2 = riakc_obj:update_metadata(Robj1, MD2),

%% Write the object...
case riakc_pb_socket:put(Pid, Robj2, State#state.pb_timeout) of
ok ->
{ok, State#state{alwaysget_key_count = NewAGKC}};
{error, Reason} ->
{error, Reason, State}
end;


%% Update an object with secondary indexes.
run(update_with2i, KeyGen, ValueGen, State) ->
@@ -306,9 +376,16 @@ run(update_with2i, KeyGen, ValueGen, State) ->
{error, Reason, State}
end;
%% Put an object with a unique key and a non-compressible value
run(put_unique, _KeyGen, _ValueGen, State) ->
run(put_unique_bet365, _KeyGen, _ValueGen, State) ->
Pid = State#state.pb_pid,
Bucket = State#state.documentBucket,

Bucket =
case erlang:phash2(Pid) rem 2 of
0 ->
<<"abcdefghijklmnopqrstuvwxyz_1">>;
1 ->
<<"abcdefghijklmnopqrstuvwxyz_2">>
end,

UKC = State#state.unique_key_count,
Key =
@@ -319,8 +396,8 @@ run(put_unique, _KeyGen, _ValueGen, State) ->
Value = non_compressible_value(State#state.unique_size),

Robj0 = riakc_obj:new(Bucket, to_binary(Key)),
MD1 = riakc_obj:get_update_metadata(Robj0),
MD2 = riakc_obj:set_secondary_index(MD1, generate_binary_indexes()),
MD2 = riakc_obj:get_update_metadata(Robj0),
% MD2 = riakc_obj:set_secondary_index(MD1, generate_binary_indexes()),
Robj1 = riakc_obj:update_value(Robj0, Value),
Robj2 = riakc_obj:update_metadata(Robj1, MD2),

@@ -331,12 +408,33 @@
{error, Reason} ->
{error, Reason, State}
end;
%% Put an object with a unique key and a non-compressible value
run(put_unique, _KeyGen, _ValueGen, State) ->
{Pid, _Bucket, _Key, Robj, UKC} = prepare_unique_put(State),
%% Write the object...
case riakc_pb_socket:put(Pid, Robj, State#state.pb_timeout) of
ok ->
{ok, State#state{unique_key_count = UKC + 1}};
{error, Reason} ->
{error, Reason, State}
end;
run(put_unique_checkrepl, _KeyGen, _ValueGen, State) ->
{Pid, Bucket, Key, Robj, UKC} = prepare_unique_put(State),
%% Write the object...
case riakc_pb_socket:put(Pid, Robj, State#state.pb_timeout) of
ok ->
check_repl(State#state.repl_pid, Bucket, to_binary(Key), State#state.pb_timeout),
{ok, State#state{unique_key_count = UKC + 1}};
{error, Reason} ->
{error, Reason, State}
end;
run(get_unique, _KeyGen, _ValueGen, State) ->
% Get one of the objects with unique keys
Pid = State#state.pb_pid,
Bucket = State#state.documentBucket,
UKC = State#state.unique_key_count,
Key = generate_uniquekey(random:uniform(UKC),
LKC = State#state.unique_key_lowcount,
Key = generate_uniquekey(LKC + random:uniform(UKC - LKC),
State#state.keyid,
State#state.unique_keyorder),
case riakc_pb_socket:get(Pid, Bucket, Key, State#state.pb_timeout) of
@@ -347,6 +445,28 @@ run(get_unique, _KeyGen, _ValueGen, State) ->
{error, Reason} ->
{error, Reason, State}
end;
run(delete_unique, _KeyGen, _ValueGen, State) ->
%% Delete one of the unique keys, assuming that the deletions have not
%% caught up with the PUTs
Pid = State#state.pb_pid,
B = State#state.documentBucket,
UKC = State#state.unique_key_count,
LKC = State#state.unique_key_lowcount,
case LKC < UKC of
true ->
Key = generate_uniquekey(LKC,
State#state.keyid,
State#state.unique_keyorder),
R = riakc_pb_socket:delete(Pid, B, Key, State#state.pb_timeout),
case R of
ok ->
{ok, State#state{unique_key_lowcount = LKC + 1}};
{error, Reason} ->
{error, Reason, State#state{unique_key_lowcount = LKC + 1}}
end;
false ->
{ok, State}
end;

%% Query results via the HTTP interface.
run(postcodequery_http, _KeyGen, _ValueGen, State) ->
@@ -514,6 +634,24 @@ run(Other, _, _, _) ->
%% ====================================================================


prepare_unique_put(State) ->
Pid = State#state.pb_pid,
Bucket = State#state.documentBucket,

UKC = State#state.unique_key_count,
Key =
generate_uniquekey(UKC,
State#state.keyid,
State#state.unique_keyorder),

Value = non_compressible_value(State#state.unique_size),

Robj0 = riakc_obj:new(Bucket, to_binary(Key)),
MD1 = riakc_obj:get_update_metadata(Robj0),
MD2 = riakc_obj:set_secondary_index(MD1, generate_binary_indexes()),
Robj1 = riakc_obj:update_value(Robj0, Value),
Robj2 = riakc_obj:update_metadata(Robj1, MD2),
{Pid, Bucket, Key, Robj2, UKC}.

json_get(Url, Timeout) ->
json_get(Url, Timeout, true).
@@ -561,6 +699,15 @@ ensure_module(Module) ->
ok
end.

check_repl(no_repl_check, _B, _K, _TO) ->
ok;
check_repl(ReplPid, Bucket, Key, Timeout) ->
case riakc_pb_socket:get(ReplPid, Bucket, Key, Timeout) of
{ok, _Obj} ->
ok;
{error, _} ->
check_repl(ReplPid, Bucket, Key, Timeout)
end.
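
The check_repl/4 above retries the GET against the repl target until it succeeds, so a key that never replicates would loop indefinitely. A bounded variant is sketched here purely as an illustration; the retry cap, the sleep interval and the helper name are assumptions and this code is not part of the PR:

%% Illustrative sketch only: capped retries with a short pause between attempts.
check_repl_bounded(_ReplPid, _B, _K, _TO, 0) ->
    {error, repl_check_exhausted};
check_repl_bounded(ReplPid, Bucket, Key, Timeout, Retries) when Retries > 0 ->
    case riakc_pb_socket:get(ReplPid, Bucket, Key, Timeout) of
        {ok, _Obj} ->
            ok;
        {error, _} ->
            timer:sleep(100),
            check_repl_bounded(ReplPid, Bucket, Key, Timeout, Retries - 1)
    end.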

%% ====================================================================
%% Spawned Runners
@@ -727,4 +874,4 @@ convert_tolist(I) when is_integer(I) ->
list_to_binary(lists:flatten(io_lib:format("~9..0B", [I])));
convert_tolist(Bin) ->
<<I:26/integer, _Tail:6/bitstring>> = Bin,
convert_tolist(I).
convert_tolist(I).