From 8eec2199bf8be4f264e431313052e76cb6c3215e Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Tue, 3 May 2016 15:09:30 -0700 Subject: [PATCH] Ensure msg code 0 is always handled. Fix cover context arg in tsqueryreq --- riak/codecs/__init__.py | 14 +++++++++----- riak/codecs/pbuf.py | 16 ++-------------- riak/codecs/ttb.py | 5 +---- riak/codecs/util.py | 10 ++++++++++ riak/tests/test_timeseries_ttb.py | 27 +++++++++++++++++++++++++-- riak/transports/tcp/transport.py | 1 + 6 files changed, 48 insertions(+), 25 deletions(-) create mode 100644 riak/codecs/util.py diff --git a/riak/codecs/__init__.py b/riak/codecs/__init__.py index e356b5f9..b155bdff 100644 --- a/riak/codecs/__init__.py +++ b/riak/codecs/__init__.py @@ -1,6 +1,10 @@ import collections +import riak.pb.messages + from riak import RiakError +from riak.codecs.util import parse_pbuf_msg +from riak.util import bytes_to_str Msg = collections.namedtuple('Msg', ['msg_code', 'data', 'resp_code'], @@ -16,10 +20,10 @@ def maybe_incorrect_code(self, resp_code, expect=None): raise RiakError("unexpected message code: %d, expected %d" % (resp_code, expect)) - def maybe_riak_error(self, err_code, msg_code, data=None): - if msg_code == err_code: + def maybe_riak_error(self, msg_code, data=None): + if msg_code == riak.pb.messages.MSG_CODE_ERROR_RESP: if data is None: raise RiakError('no error provided!') - return data - else: - return None + else: + err = parse_pbuf_msg(msg_code, data) + raise RiakError(bytes_to_str(err.errmsg)) diff --git a/riak/codecs/pbuf.py b/riak/codecs/pbuf.py index 1d96c831..b78b585e 100644 --- a/riak/codecs/pbuf.py +++ b/riak/codecs/pbuf.py @@ -9,6 +9,7 @@ from riak import RiakError from riak.codecs import Codec, Msg +from riak.codecs.util import parse_pbuf_msg from riak.content import RiakContent from riak.pb.riak_ts_pb2 import TsColumnType from riak.riak_object import VClock @@ -90,20 +91,7 @@ def __init__(self, self._bucket_types = bucket_types def parse_msg(self, msg_code, data): - pbclass = riak.pb.messages.MESSAGE_CLASSES.get(msg_code, None) - if pbclass is None: - return None - pbo = pbclass() - pbo.ParseFromString(data) - return pbo - - def maybe_riak_error(self, msg_code, data=None): - err_code = riak.pb.messages.MSG_CODE_ERROR_RESP - err_data = super(PbufCodec, self).maybe_riak_error( - err_code, msg_code, data) - if err_data: - err = self.parse_msg(msg_code, err_data) - raise RiakError(bytes_to_str(err.errmsg)) + return parse_pbuf_msg(msg_code, data) def encode_auth(self, username, password): req = riak.pb.riak_pb2.RpbAuthReq() diff --git a/riak/codecs/ttb.py b/riak/codecs/ttb.py index 1c0b3bfe..00def900 100644 --- a/riak/codecs/ttb.py +++ b/riak/codecs/ttb.py @@ -50,9 +50,6 @@ def maybe_err_ttb(self, err_ttb): # errcode = err_ttb[2] raise RiakError(bytes_to_str(errmsg)) - def maybe_riak_error(self, msg_code, data=None): - pass - def encode_to_ts_cell(self, cell): if cell is None: return [] @@ -133,7 +130,7 @@ def encode_timeseries_query(self, table, query, interpolations=None): if '{table}' in q: q = q.format(table=table.name) tsi = tsinterpolation_a, q, [] - req = tsqueryreq_a, tsi, False, [] + req = tsqueryreq_a, tsi, False, udef_a mc = MSG_CODE_TS_TTB_MSG rc = MSG_CODE_TS_TTB_MSG return Msg(mc, encode(req), rc) diff --git a/riak/codecs/util.py b/riak/codecs/util.py new file mode 100644 index 00000000..52aecb9f --- /dev/null +++ b/riak/codecs/util.py @@ -0,0 +1,10 @@ +import riak.pb.messages + + +def parse_pbuf_msg(msg_code, data): + pbclass = riak.pb.messages.MESSAGE_CLASSES.get(msg_code, None) + if pbclass is None: + return None + pbo = pbclass() + pbo.ParseFromString(data) + return pbo diff --git a/riak/tests/test_timeseries_ttb.py b/riak/tests/test_timeseries_ttb.py index 45ba6faf..e1bf96a8 100644 --- a/riak/tests/test_timeseries_ttb.py +++ b/riak/tests/test_timeseries_ttb.py @@ -11,7 +11,7 @@ from riak.table import Table from riak.ts_object import TsObject from riak.codecs.ttb import TtbCodec -from riak.util import str_to_bytes, \ +from riak.util import str_to_bytes, bytes_to_str, \ unix_time_millis, is_timeseries_supported from riak.tests import RUN_TIMESERIES from riak.tests.base import IntegrationTestBase @@ -141,7 +141,7 @@ def test_query_that_returns_table_description(self): row = ts_obj.rows[0] self.assertEqual(len(row), 5) - def test_store_and_fetch(self): + def test_store_and_fetch_and_query(self): now = datetime.datetime.utcfromtimestamp(144379690.987000) fiveMinsAgo = now - fiveMins tenMinsAgo = fiveMinsAgo - fiveMins @@ -187,6 +187,29 @@ def test_store_and_fetch(self): self.assertEqual(len(row), 5) self.assertEqual(row, exp) + fmt = """ + select * from {table} where + time > {t1} and time < {t2} and + geohash = 'hash1' and + user = 'user2' + """ + query = fmt.format( + table=table_name, + t1=unix_time_millis(tenMinsAgo), + t2=unix_time_millis(now)) + ts_obj = self.client.ts_query(table_name, query) + if ts_obj.columns is not None: + self.assertEqual(len(ts_obj.columns.names), 5) + self.assertEqual(len(ts_obj.columns.types), 5) + self.assertEqual(len(ts_obj.rows), 1) + row = ts_obj.rows[0] + self.assertEqual(bytes_to_str(row[0]), 'hash1') + self.assertEqual(bytes_to_str(row[1]), 'user2') + self.assertEqual(row[2], fiveMinsAgo) + self.assertEqual(row[2].microsecond, 987000) + self.assertEqual(bytes_to_str(row[3]), 'wind') + self.assertIsNone(row[4]) + def test_create_error_via_put(self): table = Table(self.client, table_name) ts_obj = table.new([]) diff --git a/riak/transports/tcp/transport.py b/riak/transports/tcp/transport.py index 466ac8d8..8b7841b9 100644 --- a/riak/transports/tcp/transport.py +++ b/riak/transports/tcp/transport.py @@ -535,6 +535,7 @@ def _request(self, msg, codec=None): raise ValueError('expected a Codec argument') resp_code, data = self._send_recv(msg_code, data) + # NB: decodes errors with msg code 0 codec.maybe_riak_error(resp_code, data) codec.maybe_incorrect_code(resp_code, expect) if resp_code == MSG_CODE_TS_TTB_MSG or \