diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java index 68c0274..781e62c 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java @@ -21,12 +21,6 @@ import java.util.Map; import java.util.stream.Collectors; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonParser; -import com.google.gson.JsonObject; -import com.google.gson.JsonParseException; - import static java.util.stream.Collectors.toList; /** @@ -84,17 +78,8 @@ public SourceRecord toSourceRecord( // getUnmarshallItems from Dynamo Document //Map unMarshalledItems = ItemUtils.toSimpleMapValue(attributes); - //JSON conversion - String outputJsonString = null; - try { - String jsonString = ItemUtils.toItem(attributes).toJSON(); - JsonObject jsonObject = new JsonParser().parse(jsonString).getAsJsonObject(); - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - outputJsonString = gson.toJson(jsonObject); - } catch (JsonParseException e) { - e.printStackTrace(); - throw new Exception("Error Occured in JSON Parsing " + e.getMessage(), e); - } + //JSON conversion + String outputJsonString = ItemUtils.toItem(attributes).toJSON(); // Leveraging offsets to store shard and sequence number with each item pushed to Kafka. // This info will only be used to update `shardRegister` and won't be used to reset state after restart @@ -123,7 +108,7 @@ public SourceRecord toSourceRecord( Struct valueData = new Struct(valueSchema) .put(Envelope.FieldName.VERSION, sourceInfo.version) - .put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(outputJsonString)) + .put(Envelope.FieldName.DOCUMENT, outputJsonString) // objectMapper.writeValueAsString(outputJsonString)) .put(Envelope.FieldName.SOURCE, SourceInfo.toStruct(sourceInfo)) .put(Envelope.FieldName.OPERATION, op.code()) .put(Envelope.FieldName.TIMESTAMP, arrivalTimestamp.toEpochMilli()); diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index d3ff394..b199e64 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -23,8 +23,8 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; +import com.google.gson.JsonParser; +import com.google.gson.JsonObject; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; @@ -275,21 +275,16 @@ public void onInitSyncRunningPollReturnsScannedItemsBatch() throws InterruptedEx task.start(configs); List response = task.poll(); - String expected = "\"{\\n \\\"col2\\\": \\\"val1\\\",\\n \\\"col3\\\": 1,\\n \\\"col1\\\": \\\"key1\\\"\\n}\""; - String actual = (((Struct) response.get(0).value()).getString("document")); - - // Converting both expected and actual to JSON string - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - expected = gson.toJson(expected); - actual = gson.toJson(actual); - // Assert assertEquals(Instant.parse("2001-01-01T00:00:00.00Z"), task.getSourceInfo().lastInitSyncStart); assertEquals(1, task.getSourceInfo().initSyncCount); + String expected = "{col2:val1,col3:1,col1:key1}"; + JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject(); + assertEquals(1, response.size()); assertEquals("r", ((Struct) response.get(0).value()).getString("op")); - assertEquals(expected , actual); + assertEquals(expectedJson.toString(), ((Struct) response.get(0).value()).getString("document")); assertEquals(InitSyncStatus.RUNNING, task.getSourceInfo().initSyncStatus); assertEquals(exclusiveStartKey, task.getSourceInfo().exclusiveStartKey); } @@ -568,23 +563,16 @@ public void onSyncPollReturnsReceivedRecords() throws InterruptedException { // Act task.start(configs); List response = task.poll(); - - String expected = "\"{\\n \\\"col2\\\": \\\"val1\\\",\\n \\\"col3\\\": 1,\\n \\\"col1\\\": \\\"key1\\\"\\n}\""; - String expected_document_key = "\"{\\n \\\"col1\\\": \\\"key2\\\"\\n}\""; - String actual = (((Struct) response.get(0).value()).getString("document")); - String actual_document_key = ((Struct) response.get(1).value()).getString("document"); - - // Converting both expected and actual to JSON string - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - expected = gson.toJson(expected); - expected_document_key = gson.toJson(expected_document_key); - actual = gson.toJson(actual); - actual_document_key = gson.toJson(actual_document_key); + + String expected = "{col2:val1,col3:1,col1:key1}"; + String expectedKey = "{col1:key2}"; + JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject(); + JsonObject expectedKeyJson = new JsonParser().parse(expectedKey).getAsJsonObject(); // Assert assertEquals(3, response.size()); - assertEquals(expected, actual); - assertEquals(expected_document_key, actual_document_key); + assertEquals(expectedJson.toString(), ((Struct) response.get(0).value()).getString("document")); + assertEquals(expectedKeyJson.toString(), ((Struct) response.get(1).value()).getString("document")); assertNull(response.get(2).value()); // tombstone } @@ -898,4 +886,4 @@ public void onCommitIgnoreRecordsWithoutSequenceNumber() throws InterruptedExcep assertEquals("", shardRegister.get("shard1").getLastCommittedRecordSeqNo()); } -} +} \ No newline at end of file diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java index ffd2caf..ee01a30 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java @@ -19,8 +19,8 @@ import java.util.List; import java.util.Map; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; +import com.google.gson.JsonParser; +import com.google.gson.JsonObject; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -75,7 +75,7 @@ private SourceInfo getSourceInfo(String table) { @Test public void correctTopicNameIsConstructed() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -88,13 +88,13 @@ public void correctTopicNameIsConstructed() throws Exception { ); // Assert - assertEquals("TestTopicPrefix-", record.topic()); + assertEquals("TestTopicPrefix", record.topic()); } @Test public void sourceInfoIsPutToOffset() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -113,7 +113,7 @@ public void sourceInfoIsPutToOffset() throws Exception { @Test public void shardIdAndSequenceNumberIsPutToOffset() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -136,7 +136,7 @@ public void singleItemKeyIsAddedToRecord() throws Exception { List keySchema = new LinkedList<>(); keySchema.add(new KeySchemaElement().withKeyType("S").withAttributeName("testKV1")); - RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -161,7 +161,7 @@ public void multiItemKeyIsAddedToRecord() throws Exception { keySchema.add(new KeySchemaElement().withKeyType("S").withAttributeName("testKV1")); keySchema.add(new KeySchemaElement().withKeyType("N").withAttributeName("testKV2")); - RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -186,7 +186,7 @@ public void multiItemKeyIsAddedToRecord() throws Exception { @Test public void recordAttributesAreAddedToValueData() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -198,16 +198,12 @@ public void recordAttributesAreAddedToValueData() throws Exception { "testSequenceNumberID1" ); - String expected = "\"{\\n \\\"testKV1\\\": \\\"testKV1Value\\\",\\n \\\"testKV2\\\": \\\"2\\\",\\n \\\"testV2\\\": \\\"testStringValue\\\",\\n \\\"testV1\\\": 1\\n}\""; - String actual = ((Struct) record.value()).getString("document"); - - // Converting both expected and actual to JSON string - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - expected = gson.toJson(expected); - actual = gson.toJson(actual); + String expected = "{testKV1:testKV1Value,testKV2:'2',testV2:testStringValue,testV1:1}"; + JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject(); // Assert - assertEquals(expected, actual); + assertEquals(expectedJson.toString(), + ((Struct) record.value()).getString("document")); } @Test @@ -216,7 +212,7 @@ public void singleItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throw List keySchema = new LinkedList<>(); keySchema.add(new KeySchemaElement().withKeyType("S").withAttributeName("test-1234")); - RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -241,7 +237,7 @@ public void multiItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throws keySchema.add(new KeySchemaElement().withKeyType("S").withAttributeName("test-1234")); keySchema.add(new KeySchemaElement().withKeyType("N").withAttributeName("1-starts-with-number")); - RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -266,7 +262,7 @@ public void multiItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throws @Test public void recordAttributesAreAddedToValueDataWhenAttributesContainsInvalidCharacters() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -278,22 +274,18 @@ public void recordAttributesAreAddedToValueDataWhenAttributesContainsInvalidChar "testSequenceNumberID1" ); - String expected = "\"{\\n \\\"test-1234\\\": \\\"testKV1Value\\\",\\n \\\"_starts_with_underscore\\\": 1,\\n \\\"1-starts-with-number\\\": \\\"2\\\",\\n \\\"test!@£$%^\\\": \\\"testStringValue\\\"\\n}\""; - String actual = ((Struct) record.value()).getString("document"); - - // Converting both expected and actual to JSON string - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - expected = gson.toJson(expected); - actual = gson.toJson(actual); + String expected = "{test-1234:testKV1Value,_starts_with_underscore:1,1-starts-with-number:'2',test!@£$%^:testStringValue}"; + JsonObject expectedJson = new JsonParser().parse(expected).getAsJsonObject(); // Assert - assertEquals(expected, actual); + assertEquals(expectedJson.toString(), + ((Struct) record.value()).getString("document")); } @Test public void sourceInfoIsAddedToValueData() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -315,7 +307,7 @@ public void sourceInfoIsAddedToValueData() throws Exception { @Test public void operationIsAddedToValueData() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -334,7 +326,7 @@ public void operationIsAddedToValueData() throws Exception { @Test public void arrivalTimestampIsAddedToValueData() throws Exception { // Arrange - RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-"); + RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix"); // Act SourceRecord record = converter.toSourceRecord( @@ -350,4 +342,4 @@ public void arrivalTimestampIsAddedToValueData() throws Exception { assertEquals(978393600000L, ((Struct) record.value()).getInt64("ts_ms")); } -} +} \ No newline at end of file