Skip to content

Commit

Permalink
Clear commit state after commit error (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck committed Jun 9, 2023
1 parent 61bf45d commit ad49e52
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 7 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ subprojects {
apply plugin: "maven-publish"

group "io.tabular.connect"
version "0.3.2-SNAPSHOT"
version "0.3.2"

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.Tasks;
Expand Down Expand Up @@ -181,6 +182,9 @@ private void commit(boolean partialCommit) {
doCommit(partialCommit);
} catch (Exception e) {
LOG.warn("Commit failed, will try again next cycle", e);
} finally {
readyBuffer.clear();
currentCommitId = null;
}
}

Expand Down Expand Up @@ -208,15 +212,16 @@ private void doCommit(boolean partialCommit) {
// we should only get here if all tables committed successfully...
commitConsumerOffsets();
commitBuffer.clear();
readyBuffer.clear();
UUID commitId = currentCommitId;
currentCommitId = null;

Event event = new Event(EventType.COMMIT_COMPLETE, new CommitCompletePayload(commitId, vtts));
Event event =
new Event(EventType.COMMIT_COMPLETE, new CommitCompletePayload(currentCommitId, vtts));
send(event);

LOG.info(
"Commit {} complete, commited to {} table(s), vtts {}", commitId, commitMap.size(), vtts);
"Commit {} complete, commited to {} table(s), vtts {}",
currentCommitId,
commitMap.size(),
vtts);
}

private String getOffsetsJson() {
Expand Down Expand Up @@ -251,7 +256,14 @@ static Long getVtts(boolean partialCommit, List<CommitReadyPayload> buffer) {

private void commitToTable(
TableIdentifier tableIdentifier, List<Envelope> envelopeList, String offsetsJson, Long vtts) {
Table table = catalog.loadTable(tableIdentifier);
Table table;
try {
table = catalog.loadTable(tableIdentifier);
} catch (NoSuchTableException e) {
LOG.warn("Table not found, skipping commit: {}", tableIdentifier);
return;
}

Map<Integer, Long> commitedOffsets = getLastCommittedOffsetsForTable(table);

List<CommitResponsePayload> payloads =
Expand Down

0 comments on commit ad49e52

Please sign in to comment.