Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core, jdbc): introduce a JDBC indexer #4974

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package io.kestra.core.repositories;

public interface SaveRepositoryInterface <T> {
T save(T flow);
import java.util.List;

public interface SaveRepositoryInterface<T> {
T save(T item);

default int saveBatch(List<T> items) {
throw new UnsupportedOperationException();
}
}
133 changes: 2 additions & 131 deletions core/src/main/java/io/kestra/core/runners/Indexer.java
Original file line number Diff line number Diff line change
@@ -1,133 +1,4 @@
package io.kestra.core.runners;

import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.repositories.SaveRepositoryInterface;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.utils.IdUtils;
import io.micronaut.context.annotation.Requires;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import io.micronaut.context.event.ApplicationEventPublisher;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

@SuppressWarnings("this-escape")
@Slf4j
@Singleton
@Requires(beans = {ExecutionRepositoryInterface.class, LogRepositoryInterface.class, TriggerRepositoryInterface.class})
public class Indexer implements IndexerInterface {
private final ExecutionRepositoryInterface executionRepository;
private final QueueInterface<Execution> executionQueue;
private final LogRepositoryInterface logRepository;
private final QueueInterface<LogEntry> logQueue;

private final MetricRepositoryInterface metricRepository;
private final QueueInterface<MetricEntry> metricQueue;
private final MetricRegistry metricRegistry;
private final List<Runnable> receiveCancellations = new ArrayList<>();

private final String id = IdUtils.create();
private final AtomicReference<ServiceState> state = new AtomicReference<>();
private final ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher;

@Inject
public Indexer(
ExecutionRepositoryInterface executionRepository,
@Named(QueueFactoryInterface.EXECUTION_NAMED) QueueInterface<Execution> executionQueue,
LogRepositoryInterface logRepository,
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED) QueueInterface<LogEntry> logQueue,
MetricRepositoryInterface metricRepositor,
@Named(QueueFactoryInterface.METRIC_QUEUE) QueueInterface<MetricEntry> metricQueue,
MetricRegistry metricRegistry,
ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher
) {
this.executionRepository = executionRepository;
this.executionQueue = executionQueue;
this.logRepository = logRepository;
this.logQueue = logQueue;
this.metricRepository = metricRepositor;
this.metricQueue = metricQueue;
this.metricRegistry = metricRegistry;
this.eventPublisher = eventPublisher;
setState(ServiceState.CREATED);
}

@Override
public void run() {
this.send(executionQueue, executionRepository);
this.send(logQueue, logRepository);
this.send(metricQueue, metricRepository);
setState(ServiceState.RUNNING);
}

protected <T> void send(QueueInterface<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
this.receiveCancellations.addFirst(queueInterface.receive(Indexer.class, either -> {
if (either.isRight()) {
log.error("unable to deserialize an item: {}", either.getRight().getMessage());
return;
}

T item = either.getLeft();
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_REQUEST_COUNT, "type", item.getClass().getName()).increment();
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_MESSAGE_IN_COUNT, "type", item.getClass().getName()).increment();

this.metricRegistry.timer(MetricRegistry.METRIC_INDEXER_REQUEST_DURATION, "type", item.getClass().getName()).record(() -> {
saveRepositoryInterface.save(item);
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_MESSAGE_OUT_COUNT, "type", item.getClass().getName()).increment();
});
}));
}

protected void setState(final ServiceState state) {
this.state.set(state);
this.eventPublisher.publishEvent(new ServiceStateChangeEvent(this));
}

/** {@inheritDoc} **/
@Override
public String getId() {
return id;
}
/** {@inheritDoc} **/
@Override
public ServiceType getType() {
return ServiceType.INDEXER;
}
/** {@inheritDoc} **/
@Override
public ServiceState getState() {
return state.get();
}

@PreDestroy
@Override
public void close() {
setState(ServiceState.TERMINATING);
this.receiveCancellations.forEach(Runnable::run);
try {
this.executionQueue.close();
this.logQueue.close();
this.metricQueue.close();
setState(ServiceState.TERMINATED_GRACEFULLY);
} catch (IOException e) {
log.error("Failed to close the queue", e);
setState(ServiceState.TERMINATED_FORCED);
}
}
// NOTE: this class is not used anymore but must be kept as it is used in as queue consumer both in JDBC and Kafka
public class Indexer {
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.jooq.SelectConditionStep;
import org.jooq.impl.DSL;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -75,6 +76,27 @@ public void persist(T entity, DSLContext context, @Nullable Map<Field<Object>,
.execute();
}

@Override
public int persistBatch(List<T> items) {
return dslContextWrapper.transactionResult(configuration -> {
DSLContext dslContext = DSL.using(configuration);
var inserts = items.stream().map(item -> {
Map<Field<Object>, Object> finalFields = this.persistFields(item);

return dslContext
.insertInto(table)
.set(AbstractJdbcRepository.field("key"), key(item))
.set(finalFields)
.onConflict(AbstractJdbcRepository.field("key"))
.doUpdate()
.set(finalFields);
})
.toList();

return Arrays.stream(dslContext.batch(inserts).execute()).sum();
});
}

@SuppressWarnings("unchecked")
@Override
public <R extends Record, E> ArrayListTotal<E> fetchPage(DSLContext context, SelectConditionStep<R> select, Pageable pageable, RecordMapper<R, E> mapper) {
Expand Down
19 changes: 19 additions & 0 deletions jdbc/src/main/java/io/kestra/jdbc/AbstractJdbcRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,25 @@ public void persist(T entity, DSLContext dslContext, Map<Field<Object>, Object>
.execute();
}

public int persistBatch(List<T> items) {
return dslContextWrapper.transactionResult(configuration -> {
DSLContext dslContext = DSL.using(configuration);
var inserts = items.stream().map(item -> {
Map<Field<Object>, Object> finalFields = this.persistFields(item);

return dslContext
.insertInto(table)
.set(io.kestra.jdbc.repository.AbstractJdbcRepository.field("key"), key(item))
.set(finalFields)
.onDuplicateKeyUpdate()
.set(finalFields);
})
.toList();

return Arrays.stream(dslContext.batch(inserts).execute()).sum();
});
}

public int delete(T entity) {
return dslContextWrapper.transactionResult(configuration -> {
return this.delete(DSL.using(configuration), entity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
import io.kestra.core.runners.Executor;
import io.kestra.core.runners.ExecutorState;
import io.kestra.core.utils.DateUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.NamespaceUtils;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.runner.JdbcIndexerInterface;
import io.kestra.jdbc.runner.JdbcQueueIndexerInterface;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.data.model.Pageable;
Expand Down Expand Up @@ -57,7 +58,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcRepository implements ExecutionRepositoryInterface, JdbcIndexerInterface<Execution> {
public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcRepository implements ExecutionRepositoryInterface, JdbcQueueIndexerInterface<Execution> {
private static final int FETCH_SIZE = 100;
private static final Field<String> STATE_CURRENT_FIELD = field("state_current", String.class);
private static final Field<String> NAMESPACE_FIELD = field("namespace", String.class);
Expand Down Expand Up @@ -942,6 +943,15 @@ public Execution save(DSLContext dslContext, Execution execution) {
return execution;
}

@Override
public int saveBatch(List<Execution> items) {
if (ListUtils.isEmpty(items)) {
return 0;
}

return this.jdbcRepository.persistBatch(items);
}

@Override
public Execution update(Execution execution) {
return this.jdbcRepository
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.kestra.jdbc.runner.JdbcIndexerInterface;
import io.kestra.jdbc.runner.JdbcQueueIndexerInterface;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.impl.DSL;
Expand All @@ -12,7 +12,7 @@
import java.util.List;
import java.util.Map;

public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRepository implements FlowTopologyRepositoryInterface, JdbcIndexerInterface<FlowTopology> {
public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRepository implements FlowTopologyRepositoryInterface, JdbcQueueIndexerInterface<FlowTopology> {
protected final io.kestra.jdbc.AbstractJdbcRepository<FlowTopology> jdbcRepository;

public AbstractJdbcFlowTopologyRepository(io.kestra.jdbc.AbstractJdbcRepository<FlowTopology> jdbcRepository) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.runner.JdbcIndexerInterface;
import io.kestra.core.utils.ListUtils;
import io.micronaut.data.model.Pageable;
import jakarta.annotation.Nullable;
import org.jooq.Record;
Expand All @@ -24,7 +24,7 @@
import java.util.*;
import java.util.stream.Collectors;

public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository implements LogRepositoryInterface, JdbcIndexerInterface<LogEntry> {
public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository implements LogRepositoryInterface {
protected io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository;

public AbstractJdbcLogRepository(io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository) {
Expand Down Expand Up @@ -342,6 +342,15 @@ public LogEntry save(LogEntry log) {
return log;
}

@Override
public int saveBatch(List<LogEntry> items) {
if (ListUtils.isEmpty(items)) {
return 0;
}

return this.jdbcRepository.persistBatch(items);
}

@Override
public Integer purge(Execution execution) {
return this.jdbcRepository
Expand All @@ -355,14 +364,6 @@ public Integer purge(Execution execution) {
});
}

@Override
public LogEntry save(DSLContext dslContext, LogEntry logEntry) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(logEntry);
this.jdbcRepository.persist(logEntry, dslContext, fields);

return logEntry;
}

@Override
public void deleteByQuery(String tenantId, String executionId, String taskId, String taskRunId, Level minLevel, Integer attempt) {
this.jdbcRepository
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.utils.DateUtils;
import io.kestra.jdbc.runner.JdbcIndexerInterface;
import io.kestra.core.utils.ListUtils;
import io.micrometer.common.lang.Nullable;
import io.micronaut.data.model.Pageable;
import org.jooq.*;
Expand All @@ -23,7 +23,7 @@
import java.util.Map;
import java.util.function.Function;

public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepository implements MetricRepositoryInterface, JdbcIndexerInterface<MetricEntry> {
public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepository implements MetricRepositoryInterface {
protected io.kestra.jdbc.AbstractJdbcRepository<MetricEntry> jdbcRepository;

public AbstractJdbcMetricRepository(io.kestra.jdbc.AbstractJdbcRepository<MetricEntry> jdbcRepository) {
Expand Down Expand Up @@ -142,6 +142,15 @@ public MetricEntry save(MetricEntry metric) {
return metric;
}

@Override
public int saveBatch(List<MetricEntry> items) {
if (ListUtils.isEmpty(items)) {
return 0;
}

return this.jdbcRepository.persistBatch(items);
}

@Override
public Integer purge(Execution execution) {
return this.jdbcRepository
Expand All @@ -155,14 +164,6 @@ public Integer purge(Execution execution) {
});
}

@Override
public MetricEntry save(DSLContext dslContext, MetricEntry metric) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(metric);
this.jdbcRepository.persist(metric, dslContext, fields);

return metric;
}

private List<String> queryDistinct(String tenantId, Condition condition, String field) {
return this.jdbcRepository
.getDslContextWrapper()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.schedulers.ScheduleContextInterface;
import io.kestra.jdbc.runner.JdbcIndexerInterface;
import io.kestra.jdbc.runner.JdbcQueueIndexerInterface;
import io.kestra.jdbc.runner.JdbcSchedulerContext;
import io.micronaut.data.model.Pageable;
import jakarta.annotation.Nullable;
Expand All @@ -24,7 +24,7 @@
import java.util.Optional;
import java.util.function.Function;

public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcRepository implements TriggerRepositoryInterface, JdbcIndexerInterface<Trigger> {
public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcRepository implements TriggerRepositoryInterface, JdbcQueueIndexerInterface<Trigger> {
public static final Field<Object> NAMESPACE_FIELD = field("namespace");

protected io.kestra.jdbc.AbstractJdbcRepository<Trigger> jdbcRepository;
Expand Down
1 change: 1 addition & 0 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ void reEmitWorkerJobsForWorkers(final Configuration configuration,
});
}

// TODO proto executionprocess queue without exec content
private void executionQueue(Either<Execution, DeserializationException> either) {
if (either.isRight()) {
log.error("Unable to deserialize an execution: {}", either.getRight().getMessage());
Expand Down
Loading
Loading