Skip to content

Commit

Permalink
Workaround for memory leak in deserializer (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck authored Jul 12, 2023
1 parent e1bb0e5 commit f5e7296
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 61 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ subprojects {
apply plugin: "maven-publish"

group "io.tabular.connect"
version "0.4.4-SNAPSHOT"
version "0.4.4"

repositories {
mavenCentral()
Expand Down
1 change: 1 addition & 0 deletions kafka-connect-events/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ plugins {

dependencies {
implementation libs.iceberg.core
implementation libs.iceberg.common
implementation libs.avro

testImplementation libs.junit.api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,30 @@ public class CommitCompletePayload implements Payload {
private Long vtts;
private Schema avroSchema;

private static final Schema AVRO_SCHEMA =
SchemaBuilder.builder()
.record(CommitCompletePayload.class.getName())
.fields()
.name("commitId")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.type(UUID_SCHEMA)
.noDefault()
.name("vtts")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.type()
.nullable()
.longType()
.noDefault()
.endRecord();

public CommitCompletePayload(Schema avroSchema) {
this.avroSchema = avroSchema;
}

public CommitCompletePayload(UUID commitId, Long vtts) {
this.commitId = commitId;
this.vtts = vtts;

this.avroSchema =
SchemaBuilder.builder()
.record(getClass().getName())
.fields()
.name("commitId")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.type(UUID_SCHEMA)
.noDefault()
.name("vtts")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.type()
.nullable()
.longType()
.noDefault()
.endRecord();
this.avroSchema = AVRO_SCHEMA;
}

public UUID getCommitId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,31 @@ public class CommitReadyPayload implements Payload {
private List<TopicPartitionOffset> assignments;
private Schema avroSchema;

private static final Schema AVRO_SCHEMA =
SchemaBuilder.builder()
.record(CommitReadyPayload.class.getName())
.fields()
.name("commitId")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.type(UUID_SCHEMA)
.noDefault()
.name("assignments")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.type()
.nullable()
.array()
.items(TopicPartitionOffset.AVRO_SCHEMA)
.noDefault()
.endRecord();

public CommitReadyPayload(Schema avroSchema) {
this.avroSchema = avroSchema;
}

public CommitReadyPayload(UUID commitId, List<TopicPartitionOffset> assignments) {
this.commitId = commitId;
this.assignments = assignments;

this.avroSchema =
SchemaBuilder.builder()
.record(getClass().getName())
.fields()
.name("commitId")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.type(UUID_SCHEMA)
.noDefault()
.name("assignments")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.type()
.nullable()
.array()
.items(TopicPartitionOffset.AVRO_SCHEMA)
.noDefault()
.endRecord();
this.avroSchema = AVRO_SCHEMA;
}

public UUID getCommitId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,32 @@ public class CommitTablePayload implements Payload {
private Long vtts;
private Schema avroSchema;

private static final Schema AVRO_SCHEMA =
SchemaBuilder.builder()
.record(CommitTablePayload.class.getName())
.fields()
.name("commitId")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.type(UUID_SCHEMA)
.noDefault()
.name("tableName")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.type(TableName.AVRO_SCHEMA)
.noDefault()
.name("snapshotId")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.type()
.nullable()
.longType()
.noDefault()
.name("vtts")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.type()
.nullable()
.longType()
.noDefault()
.endRecord();

public CommitTablePayload(Schema avroSchema) {
this.avroSchema = avroSchema;
}
Expand All @@ -41,32 +67,7 @@ public CommitTablePayload(UUID commitId, TableName tableName, Long snapshotId, L
this.tableName = tableName;
this.snapshotId = snapshotId;
this.vtts = vtts;

this.avroSchema =
SchemaBuilder.builder()
.record(getClass().getName())
.fields()
.name("commitId")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.type(UUID_SCHEMA)
.noDefault()
.name("tableName")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.type(TableName.AVRO_SCHEMA)
.noDefault()
.name("snapshotId")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.type()
.nullable()
.longType()
.noDefault()
.name("vtts")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.type()
.nullable()
.longType()
.noDefault()
.endRecord();
this.avroSchema = AVRO_SCHEMA;
}

public UUID getCommitId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.iceberg.avro.AvroEncoderUtil;
import org.apache.iceberg.common.DynFields;
import org.apache.iceberg.data.avro.DecoderResolver;

public class Event implements Element {

Expand All @@ -36,6 +39,8 @@ public class Event implements Element {
private Payload payload;
private Schema avroSchema;

private static final ThreadLocal<Map<?, ?>> DECODER_CACHES = getDecoderCaches();

public static byte[] encode(Event event) {
try {
return AvroEncoderUtil.encode(event, event.getSchema());
Expand All @@ -46,7 +51,10 @@ public static byte[] encode(Event event) {

public static Event decode(byte[] bytes) {
try {
return AvroEncoderUtil.decode(bytes);
Event event = AvroEncoderUtil.decode(bytes);
// workaround for memory leak, until this is addressed upstream
DECODER_CACHES.get().clear();
return event;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down Expand Up @@ -158,4 +166,10 @@ public Object get(int i) {
throw new UnsupportedOperationException("Unknown field ordinal: " + i);
}
}

@SuppressWarnings("unchecked")
private static ThreadLocal<Map<?, ?>> getDecoderCaches() {
return (ThreadLocal<Map<?, ?>>)
DynFields.builder().hiddenImpl(DecoderResolver.class, "DECODER_CACHES").buildStatic().get();
}
}

0 comments on commit f5e7296

Please sign in to comment.