diff --git a/kubernetes_asyncio/watch/watch.py b/kubernetes_asyncio/watch/watch.py index ab09f2e3..f8b26a33 100644 --- a/kubernetes_asyncio/watch/watch.py +++ b/kubernetes_asyncio/watch/watch.py @@ -102,15 +102,15 @@ def unmarshal_event(self, data: str, response_type): reason = "{}: {}".format(obj['reason'], obj['message']) raise client.exceptions.ApiException(status=obj['code'], reason=reason) - # If possible, compile the JSON response into a Python native response - # type, eg `V1Namespace` or `V1Pod`,`ExtensionsV1beta1Deployment`, ... - if response_type: - js['object'] = self._api_client.deserialize( - response=SimpleNamespace(data=json.dumps(js['raw_object'])), - response_type=response_type - ) - if js['type'].lower() != 'bookmark': + # If possible, compile the JSON response into a Python native response + # type, eg `V1Namespace` or `V1Pod`,`ExtensionsV1beta1Deployment`, ... + if response_type: + js['object'] = self._api_client.deserialize( + response=SimpleNamespace(data=json.dumps(js['raw_object'])), + response_type=response_type + ) + # decode and save resource_version to continue watching if hasattr(js['object'], 'metadata'): self.resource_version = js['object'].metadata.resource_version @@ -123,7 +123,14 @@ def unmarshal_event(self, data: str, response_type): self.resource_version = js['object']['metadata']['resourceVersion'] elif js['type'].lower() == 'bookmark': - self.resource_version = js['object']['metadata']['resourceVersion'] + if (isinstance(js['raw_object'], dict) + and 'metadata' in js['raw_object'] + and 'resourceVersion' in js['raw_object']['metadata']): + self.resource_version = js['raw_object']['metadata']['resourceVersion'] + else: + raise Exception(("Malformed JSON response for bookmark event, " + "'metadata' or 'resourceVersion' field is missing. " + "JSON: {}").format(js)) return js diff --git a/kubernetes_asyncio/watch/watch_test.py b/kubernetes_asyncio/watch/watch_test.py index 8b513793..7c5aac7d 100644 --- a/kubernetes_asyncio/watch/watch_test.py +++ b/kubernetes_asyncio/watch/watch_test.py @@ -389,6 +389,37 @@ async def test_unmarshal_bookmark_succeeds_and_preserves_resource_version(self): # make sure the resource version is preserved, # and the watcher's resource_version is updated - self.assertTrue(isinstance(event['object'], dict)) - self.assertEqual("1", event['object']['metadata']['resourceVersion']) + self.assertTrue(isinstance(event['raw_object'], dict)) + self.assertEqual("1", event['raw_object']['metadata']['resourceVersion']) + self.assertEqual("1", w.resource_version) + + async def test_unmarshal_job_bookmark_succeeds_and_preserves_resource_version(self): + w = Watch() + event = w.unmarshal_event('{"type": "BOOKMARK", "object": {"apiVersion":' + '"batch/v1","kind":"Job","metadata":' + '{"name": "bar", "resourceVersion": "1"},' + '"spec": {"template": {"metadata": ' + '{"creationTimestamp":null}, "spec": ' + '{"containers":null}}}}}', + 'object') + self.assertEqual("BOOKMARK", event['type']) + + # make sure the resource version is preserved, + # and the watcher's resource_version is updated + self.assertTrue(isinstance(event['raw_object'], dict)) + self.assertEqual("1", event['raw_object']['metadata']['resourceVersion']) self.assertEqual("1", w.resource_version) + + async def test_unmarshall_job_bookmark_malformed_object_fails(self): + # An actual error response sent by K8s during testing. + k8s_err = { + 'type': 'BOOKMARK', + 'object': { + 'kind': 'Job', + 'apiVersion': 'batch/v1', + 'metadata': {}, + } + } + + with self.assertRaises(Exception): + Watch().unmarshal_event(json.dumps(k8s_err), None)