Skip to content

READY: Ensure msg code 0 is always handled. #468

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions riak/codecs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import collections

import riak.pb.messages

from riak import RiakError
from riak.codecs.util import parse_pbuf_msg
from riak.util import bytes_to_str

Msg = collections.namedtuple('Msg',
['msg_code', 'data', 'resp_code'],
Expand All @@ -16,10 +20,10 @@ def maybe_incorrect_code(self, resp_code, expect=None):
raise RiakError("unexpected message code: %d, expected %d"
% (resp_code, expect))

def maybe_riak_error(self, err_code, msg_code, data=None):
if msg_code == err_code:
def maybe_riak_error(self, msg_code, data=None):
if msg_code == riak.pb.messages.MSG_CODE_ERROR_RESP:
if data is None:
raise RiakError('no error provided!')
return data
else:
return None
else:
err = parse_pbuf_msg(msg_code, data)
raise RiakError(bytes_to_str(err.errmsg))
16 changes: 2 additions & 14 deletions riak/codecs/pbuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from riak import RiakError
from riak.codecs import Codec, Msg
from riak.codecs.util import parse_pbuf_msg
from riak.content import RiakContent
from riak.pb.riak_ts_pb2 import TsColumnType
from riak.riak_object import VClock
Expand Down Expand Up @@ -90,20 +91,7 @@ def __init__(self,
self._bucket_types = bucket_types

def parse_msg(self, msg_code, data):
pbclass = riak.pb.messages.MESSAGE_CLASSES.get(msg_code, None)
if pbclass is None:
return None
pbo = pbclass()
pbo.ParseFromString(data)
return pbo

def maybe_riak_error(self, msg_code, data=None):
err_code = riak.pb.messages.MSG_CODE_ERROR_RESP
err_data = super(PbufCodec, self).maybe_riak_error(
err_code, msg_code, data)
if err_data:
err = self.parse_msg(msg_code, err_data)
raise RiakError(bytes_to_str(err.errmsg))
return parse_pbuf_msg(msg_code, data)

def encode_auth(self, username, password):
req = riak.pb.riak_pb2.RpbAuthReq()
Expand Down
5 changes: 1 addition & 4 deletions riak/codecs/ttb.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ def maybe_err_ttb(self, err_ttb):
# errcode = err_ttb[2]
raise RiakError(bytes_to_str(errmsg))

def maybe_riak_error(self, msg_code, data=None):
pass

def encode_to_ts_cell(self, cell):
if cell is None:
return []
Expand Down Expand Up @@ -133,7 +130,7 @@ def encode_timeseries_query(self, table, query, interpolations=None):
if '{table}' in q:
q = q.format(table=table.name)
tsi = tsinterpolation_a, q, []
req = tsqueryreq_a, tsi, False, []
req = tsqueryreq_a, tsi, False, udef_a
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mc = MSG_CODE_TS_TTB_MSG
rc = MSG_CODE_TS_TTB_MSG
return Msg(mc, encode(req), rc)
Expand Down
10 changes: 10 additions & 0 deletions riak/codecs/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import riak.pb.messages


def parse_pbuf_msg(msg_code, data):
pbclass = riak.pb.messages.MESSAGE_CLASSES.get(msg_code, None)
if pbclass is None:
return None
pbo = pbclass()
pbo.ParseFromString(data)
return pbo
27 changes: 25 additions & 2 deletions riak/tests/test_timeseries_ttb.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from riak.table import Table
from riak.ts_object import TsObject
from riak.codecs.ttb import TtbCodec
from riak.util import str_to_bytes, \
from riak.util import str_to_bytes, bytes_to_str, \
unix_time_millis, is_timeseries_supported
from riak.tests import RUN_TIMESERIES
from riak.tests.base import IntegrationTestBase
Expand Down Expand Up @@ -141,7 +141,7 @@ def test_query_that_returns_table_description(self):
row = ts_obj.rows[0]
self.assertEqual(len(row), 5)

def test_store_and_fetch(self):
def test_store_and_fetch_and_query(self):
now = datetime.datetime.utcfromtimestamp(144379690.987000)
fiveMinsAgo = now - fiveMins
tenMinsAgo = fiveMinsAgo - fiveMins
Expand Down Expand Up @@ -187,6 +187,29 @@ def test_store_and_fetch(self):
self.assertEqual(len(row), 5)
self.assertEqual(row, exp)

fmt = """
select * from {table} where
time > {t1} and time < {t2} and
geohash = 'hash1' and
user = 'user2'
"""
query = fmt.format(
table=table_name,
t1=unix_time_millis(tenMinsAgo),
t2=unix_time_millis(now))
ts_obj = self.client.ts_query(table_name, query)
if ts_obj.columns is not None:
self.assertEqual(len(ts_obj.columns.names), 5)
self.assertEqual(len(ts_obj.columns.types), 5)
self.assertEqual(len(ts_obj.rows), 1)
row = ts_obj.rows[0]
self.assertEqual(bytes_to_str(row[0]), 'hash1')
self.assertEqual(bytes_to_str(row[1]), 'user2')
self.assertEqual(row[2], fiveMinsAgo)
self.assertEqual(row[2].microsecond, 987000)
self.assertEqual(bytes_to_str(row[3]), 'wind')
self.assertIsNone(row[4])

def test_create_error_via_put(self):
table = Table(self.client, table_name)
ts_obj = table.new([])
Expand Down
1 change: 1 addition & 0 deletions riak/transports/tcp/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ def _request(self, msg, codec=None):
raise ValueError('expected a Codec argument')

resp_code, data = self._send_recv(msg_code, data)
# NB: decodes errors with msg code 0
codec.maybe_riak_error(resp_code, data)
codec.maybe_incorrect_code(resp_code, expect)
if resp_code == MSG_CODE_TS_TTB_MSG or \
Expand Down