
Commit 5cd489f

Priyanka K U authored and committed

feat: Added external schema support for connector

Contributes to: event-integration/eventstreams-planning/12766
KIP Link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1054%3A+Support+external+schemas+in+JSONConverte
Signed-off-by: Priyanka K U <[email protected]>
1 parent 21a080f commit 5cd489f
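
The feature is exposed as the converter property schema.content (defined in JsonConverterConfig in this commit). As a rough usage sketch of the new behavior — the class name, topic, and payload literals below are illustrative and not part of the commit:

import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class ExternalSchemaUsageSketch {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, true);
        // With schema.content set, records no longer need a schema/payload envelope.
        props.put(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, "{ \"type\": \"string\" }");

        JsonConverter converter = new JsonConverter();
        converter.configure(props, false); // false = configure as a value converter

        byte[] plainJson = "\"foo-bar-baz\"".getBytes(StandardCharsets.UTF_8);
        SchemaAndValue result = converter.toConnectData("my-topic", plainJson);
        System.out.println(result.schema().type() + " -> " + result.value()); // STRING -> foo-bar-baz
    }
}

In a Connect worker the same property would be passed with the usual converter prefix, e.g. value.converter.schema.content alongside value.converter=org.apache.kafka.connect.json.JsonConverter.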

File tree: 3 files changed (+80, -6 lines)

connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java

Lines changed: 24 additions & 6 deletions
@@ -68,6 +68,10 @@ public class JsonConverter implements Converter, HeaderConverter, Versioned {
 
     private static final Map<Schema.Type, JsonToConnectTypeConverter> TO_CONNECT_CONVERTERS = new EnumMap<>(Schema.Type.class);
 
+    // if a schema is provided in config, this schema will
+    // be used for all messages
+    private Schema schema = null;
+
     static {
         TO_CONNECT_CONVERTERS.put(Schema.Type.BOOLEAN, (schema, value, config) -> value.booleanValue());
         TO_CONNECT_CONVERTERS.put(Schema.Type.INT8, (schema, value, config) -> (byte) value.intValue());
@@ -291,6 +295,17 @@ public void configure(Map<String, ?> configs) {
 
         fromConnectSchemaCache = new SynchronizedCache<>(new LRUCache<>(config.schemaCacheSize()));
         toConnectSchemaCache = new SynchronizedCache<>(new LRUCache<>(config.schemaCacheSize()));
+
+        try {
+            final byte[] schemaContent = config.schemaContent();
+            if (schemaContent != null && schemaContent.length > 0) {
+                final JsonNode schemaNode = deserializer.deserialize("", schemaContent);
+                this.schema = asConnectSchema(schemaNode);
+            }
+        } catch (SerializationException e) {
+            throw new DataException("Failed to parse schema in converter config due to serialization error: ", e);
+        }
+
     }
 
     @Override
@@ -345,13 +360,16 @@ public SchemaAndValue toConnectData(String topic, byte[] value) {
             throw new DataException("Converting byte[] to Kafka Connect data failed due to serialization error: ", e);
         }
 
-        if (config.schemasEnabled() && (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)))
-            throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." +
+        if (config.schemasEnabled()) {
+            if (this.schema != null) {
+                return new SchemaAndValue(schema, convertToConnect(schema, jsonValue, config));
+            } else if (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)) {
+                throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." +
                     " If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.");
-
-        // The deserialized data should either be an envelope object containing the schema and the payload or the schema
-        // was stripped during serialization and we need to fill in an all-encompassing schema.
-        if (!config.schemasEnabled()) {
+            }
+        } else {
+            // The deserialized data should either be an envelope object containing the schema and the payload or the schema
+            // was stripped during serialization and we need to fill in an all-encompassing schema.
             ObjectNode envelope = JSON_NODE_FACTORY.objectNode();
             envelope.set(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME, null);
             envelope.set(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME, jsonValue);
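
A side effect of the configure() change above is that a malformed schema.content is rejected when the converter is configured, not when the first record arrives, because the schema is deserialized eagerly and any SerializationException is wrapped in a DataException. A minimal sketch of that error path (class name and the broken JSON literal are made up for illustration):

import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;

import java.util.Collections;

public class SchemaContentFailFastSketch {
    public static void main(String[] args) {
        JsonConverter converter = new JsonConverter();
        try {
            // configure() parses schema.content up front, so broken JSON
            // surfaces here rather than at read time.
            converter.configure(
                Collections.singletonMap(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, "{ not json }"),
                false);
        } catch (DataException e) {
            System.out.println("Rejected at configure(): " + e.getMessage());
        }
    }
}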

connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java

Lines changed: 23 additions & 0 deletions
@@ -22,6 +22,7 @@
 import org.apache.kafka.common.config.ConfigDef.Width;
 import org.apache.kafka.connect.storage.ConverterConfig;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Locale;
 import java.util.Map;
 
@@ -36,6 +37,10 @@ public final class JsonConverterConfig extends ConverterConfig {
     private static final String SCHEMAS_ENABLE_DISPLAY = "Enable Schemas";
 
     public static final String SCHEMAS_CACHE_SIZE_CONFIG = "schemas.cache.size";
+    public static final String SCHEMA_CONTENT_CONFIG = "schema.content";
+    public static final String SCHEMA_CONTENT_DEFAULT = null;
+    private static final String SCHEMA_CONTENT_DOC = "When set, this is used as the schema for all messages. Otherwise, the schema should be in the contents of each message.";
+    private static final String SCHEMA_CONTENT_DISPLAY = "Schema Content";
     public static final int SCHEMAS_CACHE_SIZE_DEFAULT = 1000;
     private static final String SCHEMAS_CACHE_SIZE_DOC = "The maximum number of schemas that can be cached in this converter instance.";
     private static final String SCHEMAS_CACHE_SIZE_DISPLAY = "Schema Cache Size";
@@ -61,6 +66,8 @@ public final class JsonConverterConfig extends ConverterConfig {
             orderInGroup++, Width.MEDIUM, SCHEMAS_ENABLE_DISPLAY);
         CONFIG.define(SCHEMAS_CACHE_SIZE_CONFIG, Type.INT, SCHEMAS_CACHE_SIZE_DEFAULT, Importance.HIGH, SCHEMAS_CACHE_SIZE_DOC, group,
             orderInGroup++, Width.MEDIUM, SCHEMAS_CACHE_SIZE_DISPLAY);
+        CONFIG.define(SCHEMA_CONTENT_CONFIG, Type.STRING, SCHEMA_CONTENT_DEFAULT, Importance.HIGH, SCHEMA_CONTENT_DOC, group,
+            orderInGroup++, Width.MEDIUM, SCHEMA_CONTENT_DISPLAY);
 
         group = "Serialization";
         orderInGroup = 0;
@@ -86,13 +93,16 @@ public static ConfigDef configDef() {
     private final int schemaCacheSize;
     private final DecimalFormat decimalFormat;
     private final boolean replaceNullWithDefault;
+    private final byte[] schemaContent;
 
     public JsonConverterConfig(Map<String, ?> props) {
         super(CONFIG, props);
         this.schemasEnabled = getBoolean(SCHEMAS_ENABLE_CONFIG);
         this.schemaCacheSize = getInt(SCHEMAS_CACHE_SIZE_CONFIG);
         this.decimalFormat = DecimalFormat.valueOf(getString(DECIMAL_FORMAT_CONFIG).toUpperCase(Locale.ROOT));
         this.replaceNullWithDefault = getBoolean(REPLACE_NULL_WITH_DEFAULT_CONFIG);
+        String schemaContentStr = getString(SCHEMA_CONTENT_CONFIG);
+        this.schemaContent = schemaContentStr == null ? null : schemaContentStr.getBytes(StandardCharsets.UTF_8);
     }
 
     /**
@@ -130,4 +140,17 @@ public boolean replaceNullWithDefault() {
         return replaceNullWithDefault;
     }
 
+    /**
+     * If a default schema is provided in the converter config, this will be
+     * used for all messages.
+     *
+     * This is only relevant if schemas are enabled.
+     *
+     * @return Schema Contents, will return null if no value is provided
+     */
+    public byte[] schemaContent() {
+        return schemaContent;
+    }
+
+
 }
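
For completeness, a sketch of the new config surface on its own. This assumes converter.type must be supplied when constructing JsonConverterConfig directly (JsonConverter.configure() normally adds it for you); the int32 schema is just an example value:

import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.storage.ConverterConfig;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class SchemaContentConfigSketch {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConverterConfig.TYPE_CONFIG, "value"); // required when building the config directly
        props.put(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, "{ \"type\": \"int32\" }");

        JsonConverterConfig config = new JsonConverterConfig(props);
        byte[] schemaBytes = config.schemaContent(); // null when schema.content is unset
        System.out.println(new String(schemaBytes, StandardCharsets.UTF_8));
    }
}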

connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java

Lines changed: 33 additions & 0 deletions
@@ -978,6 +978,39 @@ public void testVersionRetrievedFromAppInfoParser() {
         assertEquals(AppInfoParser.getVersion(), converter.version());
     }
 
+    @Test
+    public void testSchemaContentIsNull() {
+        converter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, null), false);
+        assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes()));
+    }
+
+    @Test
+    public void testSchemaContentIsEmptyString() {
+        converter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, ""), false);
+        assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes()));
+    }
+
+    @Test
+    public void testSchemaContentValidSchema() {
+        converter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, "{ \"type\": \"string\" }"), false);
+        assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toConnectData(TOPIC, "\"foo-bar-baz\"".getBytes()));
+    }
+
+    @Test
+    public void testSchemaContentInValidSchema() {
+        assertThrows(
+            DataException.class,
+            () -> converter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, "{ \"string\" }"), false),
+            " Provided schema is invalid , please recheck the schema you have provided");
+    }
+
+    @Test
+    public void testSchemaContentLooksLikeSchema() {
+        converter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, "{ \"type\": \"struct\", \"fields\": [{\"field\": \"schema\", \"type\": \"struct\",\"fields\": [{\"field\": \"type\", \"type\": \"string\" }]}, {\"field\": \"payload\", \"type\": \"string\"}]}"), false);
+        SchemaAndValue connectData = converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes());
+        assertEquals("foo-bar-baz", ((Struct) connectData.value()).getString("payload"));
+    }
+
     private JsonNode parse(byte[] json) {
         try {
             return objectMapper.readTree(json);
