feat(core, jdbc): introduce a JDBC indexer
loicmathieu committed Sep 20, 2024
1 parent 7320397 commit a822926
Showing 15 changed files with 229 additions and 177 deletions.
@@ -1,5 +1,11 @@
package io.kestra.core.repositories;

public interface SaveRepositoryInterface <T> {
T save(T flow);
import java.util.List;

public interface SaveRepositoryInterface<T> {
T save(T item);

default void saveBatch(List<T> items) {
throw new UnsupportedOperationException();
}
}
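The new saveBatch default keeps existing single-item repositories source-compatible: implementations that support batching override it, while the others keep throwing UnsupportedOperationException. A minimal, hypothetical consumer sketch under that assumption (the BufferingConsumer class is illustrative and not part of this commit):

import io.kestra.core.repositories.SaveRepositoryInterface;

import java.util.List;

// Hypothetical consumer of the new API: try the batch path first, then fall
// back to per-item save() for repositories that do not override saveBatch()
// and therefore throw UnsupportedOperationException.
class BufferingConsumer<T> {
    private final SaveRepositoryInterface<T> repository;

    BufferingConsumer(SaveRepositoryInterface<T> repository) {
        this.repository = repository;
    }

    void flush(List<T> buffer) {
        try {
            repository.saveBatch(buffer);
        } catch (UnsupportedOperationException e) {
            buffer.forEach(repository::save);
        }
    }
}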
133 changes: 2 additions & 131 deletions core/src/main/java/io/kestra/core/runners/Indexer.java
@@ -1,133 +1,4 @@
package io.kestra.core.runners;

import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.repositories.SaveRepositoryInterface;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.utils.IdUtils;
import io.micronaut.context.annotation.Requires;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import io.micronaut.context.event.ApplicationEventPublisher;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

@SuppressWarnings("this-escape")
@Slf4j
@Singleton
@Requires(beans = {ExecutionRepositoryInterface.class, LogRepositoryInterface.class, TriggerRepositoryInterface.class})
public class Indexer implements IndexerInterface {
private final ExecutionRepositoryInterface executionRepository;
private final QueueInterface<Execution> executionQueue;
private final LogRepositoryInterface logRepository;
private final QueueInterface<LogEntry> logQueue;

private final MetricRepositoryInterface metricRepository;
private final QueueInterface<MetricEntry> metricQueue;
private final MetricRegistry metricRegistry;
private final List<Runnable> receiveCancellations = new ArrayList<>();

private final String id = IdUtils.create();
private final AtomicReference<ServiceState> state = new AtomicReference<>();
private final ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher;

@Inject
public Indexer(
ExecutionRepositoryInterface executionRepository,
@Named(QueueFactoryInterface.EXECUTION_NAMED) QueueInterface<Execution> executionQueue,
LogRepositoryInterface logRepository,
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED) QueueInterface<LogEntry> logQueue,
MetricRepositoryInterface metricRepositor,
@Named(QueueFactoryInterface.METRIC_QUEUE) QueueInterface<MetricEntry> metricQueue,
MetricRegistry metricRegistry,
ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher
) {
this.executionRepository = executionRepository;
this.executionQueue = executionQueue;
this.logRepository = logRepository;
this.logQueue = logQueue;
this.metricRepository = metricRepositor;
this.metricQueue = metricQueue;
this.metricRegistry = metricRegistry;
this.eventPublisher = eventPublisher;
setState(ServiceState.CREATED);
}

@Override
public void run() {
this.send(executionQueue, executionRepository);
this.send(logQueue, logRepository);
this.send(metricQueue, metricRepository);
setState(ServiceState.RUNNING);
}

protected <T> void send(QueueInterface<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
this.receiveCancellations.addFirst(queueInterface.receive(Indexer.class, either -> {
if (either.isRight()) {
log.error("unable to deserialize an item: {}", either.getRight().getMessage());
return;
}

T item = either.getLeft();
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_REQUEST_COUNT, "type", item.getClass().getName()).increment();
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_MESSAGE_IN_COUNT, "type", item.getClass().getName()).increment();

this.metricRegistry.timer(MetricRegistry.METRIC_INDEXER_REQUEST_DURATION, "type", item.getClass().getName()).record(() -> {
saveRepositoryInterface.save(item);
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_MESSAGE_OUT_COUNT, "type", item.getClass().getName()).increment();
});
}));
}

protected void setState(final ServiceState state) {
this.state.set(state);
this.eventPublisher.publishEvent(new ServiceStateChangeEvent(this));
}

/** {@inheritDoc} **/
@Override
public String getId() {
return id;
}
/** {@inheritDoc} **/
@Override
public ServiceType getType() {
return ServiceType.INDEXER;
}
/** {@inheritDoc} **/
@Override
public ServiceState getState() {
return state.get();
}

@PreDestroy
@Override
public void close() {
setState(ServiceState.TERMINATING);
this.receiveCancellations.forEach(Runnable::run);
try {
this.executionQueue.close();
this.logQueue.close();
this.metricQueue.close();
setState(ServiceState.TERMINATED_GRACEFULLY);
} catch (IOException e) {
log.error("Failed to close the queue", e);
setState(ServiceState.TERMINATED_FORCED);
}
}
// NOTE: this class is not used anymore but must be kept, as it is still used as a queue consumer type in both JDBC and Kafka
public class Indexer {
}
@@ -75,6 +75,27 @@ public void persist(T entity, DSLContext context, @Nullable Map<Field<Object>,
.execute();
}

@Override
public void persistBatch(List<T> items) {
dslContextWrapper.transaction(configuration -> {
DSLContext dslContext = DSL.using(configuration);
var inserts = items.stream().map(item -> {
Map<Field<Object>, Object> finalFields = this.persistFields(item);

return dslContext
.insertInto(table)
.set(AbstractJdbcRepository.field("key"), key(item))
.set(finalFields)
.onConflict(AbstractJdbcRepository.field("key"))
.doUpdate()
.set(finalFields);
})
.toList();

dslContext.batch(inserts).execute();
});
}

@SuppressWarnings("unchecked")
@Override
public <R extends Record, E> ArrayListTotal<E> fetchPage(DSLContext context, SelectConditionStep<R> select, Pageable pageable, RecordMapper<R, E> mapper) {
19 changes: 19 additions & 0 deletions jdbc/src/main/java/io/kestra/jdbc/AbstractJdbcRepository.java
@@ -96,6 +96,25 @@ public void persist(T entity, DSLContext dslContext, Map<Field<Object>, Object>
.execute();
}

public void persistBatch(List<T> items) {
dslContextWrapper.transaction(configuration -> {
DSLContext dslContext = DSL.using(configuration);
var inserts = items.stream().map(item -> {
Map<Field<Object>, Object> finalFields = this.persistFields(item);

return dslContext
.insertInto(table)
.set(io.kestra.jdbc.repository.AbstractJdbcRepository.field("key"), key(item))
.set(finalFields)
.onDuplicateKeyUpdate()
.set(finalFields);
})
.toList();

dslContext.batch(inserts).execute();
});
}

public int delete(T entity) {
return dslContextWrapper.transactionResult(configuration -> {
return this.delete(DSL.using(configuration), entity);
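Both persistBatch additions build one upsert query per item and execute them as a single jOOQ batch inside one transaction; the only difference is the conflict clause (onConflict("key").doUpdate() in the first variant, onDuplicateKeyUpdate() in this one), presumably reflecting the SQL dialects each base repository targets. A minimal, hypothetical caller sketch, mirroring the saveBatch overrides added further down (the LogBatchWriter class and its wrapped repository field are illustrative only):

import io.kestra.core.models.executions.LogEntry;

import java.util.List;

// Hypothetical caller: persists a page of log entries as one JDBC batch of
// upserts instead of one INSERT per entry. The wrapped repository is any
// concrete io.kestra.jdbc.AbstractJdbcRepository<LogEntry> instance.
class LogBatchWriter {
    private final io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository;

    LogBatchWriter(io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository) {
        this.jdbcRepository = jdbcRepository;
    }

    void write(List<LogEntry> entries) {
        // One transaction, one batched statement execution.
        this.jdbcRepository.persistBatch(entries);
    }
}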
@@ -20,7 +20,7 @@
import io.kestra.core.utils.DateUtils;
import io.kestra.core.utils.NamespaceUtils;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.runner.JdbcIndexerInterface;
import io.kestra.jdbc.runner.JdbcQueueIndexerInterface;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.data.model.Pageable;
@@ -57,7 +57,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcRepository implements ExecutionRepositoryInterface, JdbcIndexerInterface<Execution> {
public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcRepository implements ExecutionRepositoryInterface, JdbcQueueIndexerInterface<Execution> {
private static final int FETCH_SIZE = 100;
private static final Field<String> STATE_CURRENT_FIELD = field("state_current", String.class);
private static final Field<String> NAMESPACE_FIELD = field("namespace", String.class);
@@ -942,6 +942,15 @@ public Execution save(DSLContext dslContext, Execution execution) {
return execution;
}

@Override
public void saveBatch(List<Execution> items) {
if (items == null) {
return;
}

this.jdbcRepository.persistBatch(items);
}

@Override
public Execution update(Execution execution) {
return this.jdbcRepository
@@ -3,7 +3,7 @@
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.kestra.jdbc.runner.JdbcIndexerInterface;
import io.kestra.jdbc.runner.JdbcQueueIndexerInterface;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.impl.DSL;
@@ -12,7 +12,7 @@
import java.util.List;
import java.util.Map;

public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRepository implements FlowTopologyRepositoryInterface, JdbcIndexerInterface<FlowTopology> {
public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRepository implements FlowTopologyRepositoryInterface, JdbcQueueIndexerInterface<FlowTopology> {
protected final io.kestra.jdbc.AbstractJdbcRepository<FlowTopology> jdbcRepository;

public AbstractJdbcFlowTopologyRepository(io.kestra.jdbc.AbstractJdbcRepository<FlowTopology> jdbcRepository) {
@@ -6,7 +6,6 @@
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.runner.JdbcIndexerInterface;
import io.micronaut.data.model.Pageable;
import jakarta.annotation.Nullable;
import org.jooq.Record;
@@ -24,7 +23,7 @@
import java.util.*;
import java.util.stream.Collectors;

public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository implements LogRepositoryInterface, JdbcIndexerInterface<LogEntry> {
public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository implements LogRepositoryInterface {
protected io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository;

public AbstractJdbcLogRepository(io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository) {
@@ -342,6 +341,15 @@ public LogEntry save(LogEntry log) {
return log;
}

@Override
public void saveBatch(List<LogEntry> items) {
if (items == null) {
return;
}

this.jdbcRepository.persistBatch(items);
}

@Override
public Integer purge(Execution execution) {
return this.jdbcRepository
@@ -355,14 +363,6 @@ public Integer purge(Execution execution) {
});
}

@Override
public LogEntry save(DSLContext dslContext, LogEntry logEntry) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(logEntry);
this.jdbcRepository.persist(logEntry, dslContext, fields);

return logEntry;
}

@Override
public void deleteByQuery(String tenantId, String executionId, String taskId, String taskRunId, Level minLevel, Integer attempt) {
this.jdbcRepository
@@ -7,7 +7,6 @@
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.runner.JdbcIndexerInterface;
import io.micrometer.common.lang.Nullable;
import io.micronaut.data.model.Pageable;
import org.jooq.*;
@@ -23,7 +22,7 @@
import java.util.Map;
import java.util.function.Function;

public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepository implements MetricRepositoryInterface, JdbcIndexerInterface<MetricEntry> {
public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepository implements MetricRepositoryInterface {
protected io.kestra.jdbc.AbstractJdbcRepository<MetricEntry> jdbcRepository;

public AbstractJdbcMetricRepository(io.kestra.jdbc.AbstractJdbcRepository<MetricEntry> jdbcRepository) {
@@ -142,6 +141,15 @@ public MetricEntry save(MetricEntry metric) {
return metric;
}

@Override
public void saveBatch(List<MetricEntry> items) {
if (items == null) {
return;
}

this.jdbcRepository.persistBatch(items);
}

@Override
public Integer purge(Execution execution) {
return this.jdbcRepository
@@ -155,14 +163,6 @@ public Integer purge(Execution execution) {
});
}

@Override
public MetricEntry save(DSLContext dslContext, MetricEntry metric) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(metric);
this.jdbcRepository.persist(metric, dslContext, fields);

return metric;
}

private List<String> queryDistinct(String tenantId, Condition condition, String field) {
return this.jdbcRepository
.getDslContextWrapper()
@@ -9,7 +9,7 @@
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.schedulers.ScheduleContextInterface;
import io.kestra.jdbc.runner.JdbcIndexerInterface;
import io.kestra.jdbc.runner.JdbcQueueIndexerInterface;
import io.kestra.jdbc.runner.JdbcSchedulerContext;
import io.micronaut.data.model.Pageable;
import jakarta.annotation.Nullable;
@@ -24,7 +24,7 @@
import java.util.Optional;
import java.util.function.Function;

public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcRepository implements TriggerRepositoryInterface, JdbcIndexerInterface<Trigger> {
public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcRepository implements TriggerRepositoryInterface, JdbcQueueIndexerInterface<Trigger> {
public static final Field<Object> NAMESPACE_FIELD = field("namespace");

protected io.kestra.jdbc.AbstractJdbcRepository<Trigger> jdbcRepository;
1 change: 1 addition & 0 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java
@@ -339,6 +339,7 @@ void reEmitWorkerJobsForWorkers(final Configuration configuration,
});
}

// TODO proto executionprocess queue without exec content
private void executionQueue(Either<Execution, DeserializationException> either) {
if (either.isRight()) {
log.error("Unable to deserialize an execution: {}", either.getRight().getMessage());