From 0704e800a926f39a65f13c284f4987c7f18e5d05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Fri, 20 Sep 2024 10:51:42 +0200 Subject: [PATCH] feat(core, jdbc): introduce a JDBC indexer --- .../repositories/SaveRepositoryInterface.java | 10 +- .../java/io/kestra/core/runners/Indexer.java | 133 +----------------- .../postgres/PostgresRepository.java | 22 +++ .../kestra/jdbc/AbstractJdbcRepository.java | 19 +++ .../AbstractJdbcExecutionRepository.java | 14 +- .../AbstractJdbcFlowTopologyRepository.java | 4 +- .../repository/AbstractJdbcLogRepository.java | 21 +-- .../AbstractJdbcMetricRepository.java | 21 +-- .../AbstractJdbcTriggerRepository.java | 4 +- .../io/kestra/jdbc/runner/JdbcExecutor.java | 1 + .../io/kestra/jdbc/runner/JdbcIndexer.java | 113 +++++++++++++-- .../java/io/kestra/jdbc/runner/JdbcQueue.java | 20 +++ .../kestra/jdbc/runner/JdbcQueueIndexer.java | 9 +- ...ce.java => JdbcQueueIndexerInterface.java} | 2 +- .../io/kestra/jdbc/runner/JdbcQueueTest.java | 9 +- 15 files changed, 226 insertions(+), 176 deletions(-) rename jdbc/src/main/java/io/kestra/jdbc/runner/{JdbcIndexerInterface.java => JdbcQueueIndexerInterface.java} (68%) diff --git a/core/src/main/java/io/kestra/core/repositories/SaveRepositoryInterface.java b/core/src/main/java/io/kestra/core/repositories/SaveRepositoryInterface.java index bcc85356e8..5005971005 100644 --- a/core/src/main/java/io/kestra/core/repositories/SaveRepositoryInterface.java +++ b/core/src/main/java/io/kestra/core/repositories/SaveRepositoryInterface.java @@ -1,5 +1,11 @@ package io.kestra.core.repositories; -public interface SaveRepositoryInterface { - T save(T flow); +import java.util.List; + +public interface SaveRepositoryInterface { + T save(T item); + + default int saveBatch(List items) { + throw new UnsupportedOperationException(); + } } diff --git a/core/src/main/java/io/kestra/core/runners/Indexer.java b/core/src/main/java/io/kestra/core/runners/Indexer.java index 24f561cdf8..bd42182f5a 100644 --- a/core/src/main/java/io/kestra/core/runners/Indexer.java +++ b/core/src/main/java/io/kestra/core/runners/Indexer.java @@ -1,133 +1,4 @@ package io.kestra.core.runners; - -import io.kestra.core.metrics.MetricRegistry; -import io.kestra.core.models.executions.Execution; -import io.kestra.core.models.executions.LogEntry; -import io.kestra.core.models.executions.MetricEntry; -import io.kestra.core.queues.QueueFactoryInterface; -import io.kestra.core.queues.QueueInterface; -import io.kestra.core.repositories.ExecutionRepositoryInterface; -import io.kestra.core.repositories.LogRepositoryInterface; -import io.kestra.core.repositories.MetricRepositoryInterface; -import io.kestra.core.repositories.SaveRepositoryInterface; -import io.kestra.core.repositories.TriggerRepositoryInterface; -import io.kestra.core.server.ServiceStateChangeEvent; -import io.kestra.core.utils.IdUtils; -import io.micronaut.context.annotation.Requires; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicReference; - -import io.micronaut.context.event.ApplicationEventPublisher; -import jakarta.annotation.PreDestroy; -import jakarta.inject.Inject; -import jakarta.inject.Named; -import jakarta.inject.Singleton; -import lombok.extern.slf4j.Slf4j; - -@SuppressWarnings("this-escape") -@Slf4j -@Singleton -@Requires(beans = {ExecutionRepositoryInterface.class, LogRepositoryInterface.class, TriggerRepositoryInterface.class}) -public class Indexer implements IndexerInterface { - private final ExecutionRepositoryInterface executionRepository; - private final QueueInterface executionQueue; - private final LogRepositoryInterface logRepository; - private final QueueInterface logQueue; - - private final MetricRepositoryInterface metricRepository; - private final QueueInterface metricQueue; - private final MetricRegistry metricRegistry; - private final List receiveCancellations = new ArrayList<>(); - - private final String id = IdUtils.create(); - private final AtomicReference state = new AtomicReference<>(); - private final ApplicationEventPublisher eventPublisher; - - @Inject - public Indexer( - ExecutionRepositoryInterface executionRepository, - @Named(QueueFactoryInterface.EXECUTION_NAMED) QueueInterface executionQueue, - LogRepositoryInterface logRepository, - @Named(QueueFactoryInterface.WORKERTASKLOG_NAMED) QueueInterface logQueue, - MetricRepositoryInterface metricRepositor, - @Named(QueueFactoryInterface.METRIC_QUEUE) QueueInterface metricQueue, - MetricRegistry metricRegistry, - ApplicationEventPublisher eventPublisher - ) { - this.executionRepository = executionRepository; - this.executionQueue = executionQueue; - this.logRepository = logRepository; - this.logQueue = logQueue; - this.metricRepository = metricRepositor; - this.metricQueue = metricQueue; - this.metricRegistry = metricRegistry; - this.eventPublisher = eventPublisher; - setState(ServiceState.CREATED); - } - - @Override - public void run() { - this.send(executionQueue, executionRepository); - this.send(logQueue, logRepository); - this.send(metricQueue, metricRepository); - setState(ServiceState.RUNNING); - } - - protected void send(QueueInterface queueInterface, SaveRepositoryInterface saveRepositoryInterface) { - this.receiveCancellations.addFirst(queueInterface.receive(Indexer.class, either -> { - if (either.isRight()) { - log.error("unable to deserialize an item: {}", either.getRight().getMessage()); - return; - } - - T item = either.getLeft(); - this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_REQUEST_COUNT, "type", item.getClass().getName()).increment(); - this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_MESSAGE_IN_COUNT, "type", item.getClass().getName()).increment(); - - this.metricRegistry.timer(MetricRegistry.METRIC_INDEXER_REQUEST_DURATION, "type", item.getClass().getName()).record(() -> { - saveRepositoryInterface.save(item); - this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_MESSAGE_OUT_COUNT, "type", item.getClass().getName()).increment(); - }); - })); - } - - protected void setState(final ServiceState state) { - this.state.set(state); - this.eventPublisher.publishEvent(new ServiceStateChangeEvent(this)); - } - - /** {@inheritDoc} **/ - @Override - public String getId() { - return id; - } - /** {@inheritDoc} **/ - @Override - public ServiceType getType() { - return ServiceType.INDEXER; - } - /** {@inheritDoc} **/ - @Override - public ServiceState getState() { - return state.get(); - } - - @PreDestroy - @Override - public void close() { - setState(ServiceState.TERMINATING); - this.receiveCancellations.forEach(Runnable::run); - try { - this.executionQueue.close(); - this.logQueue.close(); - this.metricQueue.close(); - setState(ServiceState.TERMINATED_GRACEFULLY); - } catch (IOException e) { - log.error("Failed to close the queue", e); - setState(ServiceState.TERMINATED_FORCED); - } - } +// NOTE: this class is not used anymore but must be kept as it is used in as queue consumer both in JDBC and Kafka +public class Indexer { } diff --git a/jdbc-postgres/src/main/java/io/kestra/repository/postgres/PostgresRepository.java b/jdbc-postgres/src/main/java/io/kestra/repository/postgres/PostgresRepository.java index 2b8d0b1620..2421baf35a 100644 --- a/jdbc-postgres/src/main/java/io/kestra/repository/postgres/PostgresRepository.java +++ b/jdbc-postgres/src/main/java/io/kestra/repository/postgres/PostgresRepository.java @@ -22,6 +22,7 @@ import org.jooq.SelectConditionStep; import org.jooq.impl.DSL; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -75,6 +76,27 @@ public void persist(T entity, DSLContext context, @Nullable Map, .execute(); } + @Override + public int persistBatch(List items) { + return dslContextWrapper.transactionResult(configuration -> { + DSLContext dslContext = DSL.using(configuration); + var inserts = items.stream().map(item -> { + Map, Object> finalFields = this.persistFields(item); + + return dslContext + .insertInto(table) + .set(AbstractJdbcRepository.field("key"), key(item)) + .set(finalFields) + .onConflict(AbstractJdbcRepository.field("key")) + .doUpdate() + .set(finalFields); + }) + .toList(); + + return Arrays.stream(dslContext.batch(inserts).execute()).sum(); + }); + } + @SuppressWarnings("unchecked") @Override public ArrayListTotal fetchPage(DSLContext context, SelectConditionStep select, Pageable pageable, RecordMapper mapper) { diff --git a/jdbc/src/main/java/io/kestra/jdbc/AbstractJdbcRepository.java b/jdbc/src/main/java/io/kestra/jdbc/AbstractJdbcRepository.java index bb487b4cb6..a988480719 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/AbstractJdbcRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/AbstractJdbcRepository.java @@ -96,6 +96,25 @@ public void persist(T entity, DSLContext dslContext, Map, Object> .execute(); } + public int persistBatch(List items) { + return dslContextWrapper.transactionResult(configuration -> { + DSLContext dslContext = DSL.using(configuration); + var inserts = items.stream().map(item -> { + Map, Object> finalFields = this.persistFields(item); + + return dslContext + .insertInto(table) + .set(io.kestra.jdbc.repository.AbstractJdbcRepository.field("key"), key(item)) + .set(finalFields) + .onDuplicateKeyUpdate() + .set(finalFields); + }) + .toList(); + + return Arrays.stream(dslContext.batch(inserts).execute()).sum(); + }); + } + public int delete(T entity) { return dslContextWrapper.transactionResult(configuration -> { return this.delete(DSL.using(configuration), entity); diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcExecutionRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcExecutionRepository.java index c4790dbc41..94b54e865a 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcExecutionRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcExecutionRepository.java @@ -18,9 +18,10 @@ import io.kestra.core.runners.Executor; import io.kestra.core.runners.ExecutorState; import io.kestra.core.utils.DateUtils; +import io.kestra.core.utils.ListUtils; import io.kestra.core.utils.NamespaceUtils; import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage; -import io.kestra.jdbc.runner.JdbcIndexerInterface; +import io.kestra.jdbc.runner.JdbcQueueIndexerInterface; import io.micronaut.context.ApplicationContext; import io.micronaut.context.event.ApplicationEventPublisher; import io.micronaut.data.model.Pageable; @@ -57,7 +58,7 @@ import java.util.function.Function; import java.util.stream.Collectors; -public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcRepository implements ExecutionRepositoryInterface, JdbcIndexerInterface { +public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcRepository implements ExecutionRepositoryInterface, JdbcQueueIndexerInterface { private static final int FETCH_SIZE = 100; private static final Field STATE_CURRENT_FIELD = field("state_current", String.class); private static final Field NAMESPACE_FIELD = field("namespace", String.class); @@ -942,6 +943,15 @@ public Execution save(DSLContext dslContext, Execution execution) { return execution; } + @Override + public int saveBatch(List items) { + if (ListUtils.isEmpty(items)) { + return 0; + } + + return this.jdbcRepository.persistBatch(items); + } + @Override public Execution update(Execution execution) { return this.jdbcRepository diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcFlowTopologyRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcFlowTopologyRepository.java index f608169ef3..d67263f77c 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcFlowTopologyRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcFlowTopologyRepository.java @@ -3,7 +3,7 @@ import io.kestra.core.models.flows.Flow; import io.kestra.core.models.topologies.FlowTopology; import io.kestra.core.repositories.FlowTopologyRepositoryInterface; -import io.kestra.jdbc.runner.JdbcIndexerInterface; +import io.kestra.jdbc.runner.JdbcQueueIndexerInterface; import org.jooq.*; import org.jooq.Record; import org.jooq.impl.DSL; @@ -12,7 +12,7 @@ import java.util.List; import java.util.Map; -public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRepository implements FlowTopologyRepositoryInterface, JdbcIndexerInterface { +public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRepository implements FlowTopologyRepositoryInterface, JdbcQueueIndexerInterface { protected final io.kestra.jdbc.AbstractJdbcRepository jdbcRepository; public AbstractJdbcFlowTopologyRepository(io.kestra.jdbc.AbstractJdbcRepository jdbcRepository) { diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcLogRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcLogRepository.java index c172380178..d31e8567e8 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcLogRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcLogRepository.java @@ -6,7 +6,7 @@ import io.kestra.core.repositories.ArrayListTotal; import io.kestra.core.repositories.LogRepositoryInterface; import io.kestra.core.utils.DateUtils; -import io.kestra.jdbc.runner.JdbcIndexerInterface; +import io.kestra.core.utils.ListUtils; import io.micronaut.data.model.Pageable; import jakarta.annotation.Nullable; import org.jooq.Record; @@ -24,7 +24,7 @@ import java.util.*; import java.util.stream.Collectors; -public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository implements LogRepositoryInterface, JdbcIndexerInterface { +public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository implements LogRepositoryInterface { protected io.kestra.jdbc.AbstractJdbcRepository jdbcRepository; public AbstractJdbcLogRepository(io.kestra.jdbc.AbstractJdbcRepository jdbcRepository) { @@ -342,6 +342,15 @@ public LogEntry save(LogEntry log) { return log; } + @Override + public int saveBatch(List items) { + if (ListUtils.isEmpty(items)) { + return 0; + } + + return this.jdbcRepository.persistBatch(items); + } + @Override public Integer purge(Execution execution) { return this.jdbcRepository @@ -355,14 +364,6 @@ public Integer purge(Execution execution) { }); } - @Override - public LogEntry save(DSLContext dslContext, LogEntry logEntry) { - Map, Object> fields = this.jdbcRepository.persistFields(logEntry); - this.jdbcRepository.persist(logEntry, dslContext, fields); - - return logEntry; - } - @Override public void deleteByQuery(String tenantId, String executionId, String taskId, String taskRunId, Level minLevel, Integer attempt) { this.jdbcRepository diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcMetricRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcMetricRepository.java index d920a0ebe0..ea489e705f 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcMetricRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcMetricRepository.java @@ -7,7 +7,7 @@ import io.kestra.core.repositories.ArrayListTotal; import io.kestra.core.repositories.MetricRepositoryInterface; import io.kestra.core.utils.DateUtils; -import io.kestra.jdbc.runner.JdbcIndexerInterface; +import io.kestra.core.utils.ListUtils; import io.micrometer.common.lang.Nullable; import io.micronaut.data.model.Pageable; import org.jooq.*; @@ -23,7 +23,7 @@ import java.util.Map; import java.util.function.Function; -public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepository implements MetricRepositoryInterface, JdbcIndexerInterface { +public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepository implements MetricRepositoryInterface { protected io.kestra.jdbc.AbstractJdbcRepository jdbcRepository; public AbstractJdbcMetricRepository(io.kestra.jdbc.AbstractJdbcRepository jdbcRepository) { @@ -142,6 +142,15 @@ public MetricEntry save(MetricEntry metric) { return metric; } + @Override + public int saveBatch(List items) { + if (ListUtils.isEmpty(items)) { + return 0; + } + + return this.jdbcRepository.persistBatch(items); + } + @Override public Integer purge(Execution execution) { return this.jdbcRepository @@ -155,14 +164,6 @@ public Integer purge(Execution execution) { }); } - @Override - public MetricEntry save(DSLContext dslContext, MetricEntry metric) { - Map, Object> fields = this.jdbcRepository.persistFields(metric); - this.jdbcRepository.persist(metric, dslContext, fields); - - return metric; - } - private List queryDistinct(String tenantId, Condition condition, String field) { return this.jdbcRepository .getDslContextWrapper() diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java index 4afc6e31d7..312542e0eb 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java @@ -9,7 +9,7 @@ import io.kestra.core.repositories.ArrayListTotal; import io.kestra.core.repositories.TriggerRepositoryInterface; import io.kestra.core.schedulers.ScheduleContextInterface; -import io.kestra.jdbc.runner.JdbcIndexerInterface; +import io.kestra.jdbc.runner.JdbcQueueIndexerInterface; import io.kestra.jdbc.runner.JdbcSchedulerContext; import io.micronaut.data.model.Pageable; import jakarta.annotation.Nullable; @@ -24,7 +24,7 @@ import java.util.Optional; import java.util.function.Function; -public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcRepository implements TriggerRepositoryInterface, JdbcIndexerInterface { +public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcRepository implements TriggerRepositoryInterface, JdbcQueueIndexerInterface { public static final Field NAMESPACE_FIELD = field("namespace"); protected io.kestra.jdbc.AbstractJdbcRepository jdbcRepository; diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java index 4c16a12c64..bdecf58542 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java @@ -339,6 +339,7 @@ void reEmitWorkerJobsForWorkers(final Configuration configuration, }); } + // TODO proto executionprocess queue without exec content private void executionQueue(Either either) { if (either.isRight()) { log.error("Unable to deserialize an execution: {}", either.getRight().getMessage()); diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcIndexer.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcIndexer.java index f7c5a5b0db..459354cdf6 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcIndexer.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcIndexer.java @@ -1,36 +1,131 @@ package io.kestra.jdbc.runner; +import io.kestra.core.metrics.MetricRegistry; +import io.kestra.core.models.executions.LogEntry; +import io.kestra.core.models.executions.MetricEntry; +import io.kestra.core.queues.QueueFactoryInterface; +import io.kestra.core.queues.QueueInterface; +import io.kestra.core.repositories.LogRepositoryInterface; +import io.kestra.core.repositories.MetricRepositoryInterface; +import io.kestra.core.repositories.SaveRepositoryInterface; import io.kestra.core.runners.Indexer; import io.kestra.core.runners.IndexerInterface; -import io.micronaut.context.annotation.Replaces; +import io.kestra.core.server.ServiceStateChangeEvent; +import io.kestra.core.utils.IdUtils; +import io.kestra.core.utils.ListUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import io.micronaut.context.event.ApplicationEventPublisher; +import jakarta.annotation.PreDestroy; +import jakarta.inject.Inject; +import jakarta.inject.Named; import jakarta.inject.Singleton; import lombok.extern.slf4j.Slf4j; +/** + * This class is responsible to batch-indexed asynchronously queue messages.

+ * Some queue messages are indexed synchronously via the {@link JdbcQueueIndexer}. + */ +@SuppressWarnings("this-escape") +@Slf4j @Singleton -@Replaces(Indexer.class) @JdbcRunnerEnabled -@Slf4j public class JdbcIndexer implements IndexerInterface { + private final LogRepositoryInterface logRepository; + private final JdbcQueue logQueue; + + private final MetricRepositoryInterface metricRepository; + private final JdbcQueue metricQueue; + private final MetricRegistry metricRegistry; + private final List receiveCancellations = new ArrayList<>(); + + private final String id = IdUtils.create(); + private final AtomicReference state = new AtomicReference<>(); + private final ApplicationEventPublisher eventPublisher; + + @Inject + public JdbcIndexer( + LogRepositoryInterface logRepository, + @Named(QueueFactoryInterface.WORKERTASKLOG_NAMED) QueueInterface logQueue, + MetricRepositoryInterface metricRepositor, + @Named(QueueFactoryInterface.METRIC_QUEUE) QueueInterface metricQueue, + MetricRegistry metricRegistry, + ApplicationEventPublisher eventPublisher + ) { + this.logRepository = logRepository; + this.logQueue = (JdbcQueue) logQueue; + this.metricRepository = metricRepositor; + this.metricQueue = (JdbcQueue) metricQueue; + this.metricRegistry = metricRegistry; + this.eventPublisher = eventPublisher; + + setState(ServiceState.CREATED); + } + + @Override public void run() { + this.sendBatch(logQueue, logRepository); + this.sendBatch(metricQueue, metricRepository); + setState(ServiceState.RUNNING); + } + + protected void sendBatch(JdbcQueue queueInterface, SaveRepositoryInterface saveRepositoryInterface) { + this.receiveCancellations.addFirst(queueInterface.receiveBatch(Indexer.class, eithers -> { + // first, log all deserialization issues + eithers.stream().filter(either -> either.isRight()).forEach(either -> log.error("unable to deserialize an item: {}", either.getRight().getMessage())); + + // then index all correctly deserialized items + List items = eithers.stream().filter(either -> either.isLeft()).map(either -> either.getLeft()).toList(); + if (!ListUtils.isEmpty(items)) { + String itemClassName = items.getFirst().getClass().getName(); + this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_REQUEST_COUNT, "type", itemClassName).increment(); + this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_MESSAGE_IN_COUNT, "type", itemClassName).increment(items.size()); + + this.metricRegistry.timer(MetricRegistry.METRIC_INDEXER_REQUEST_DURATION, "type", itemClassName).record(() -> { + int saved = saveRepositoryInterface.saveBatch(items); + this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_MESSAGE_OUT_COUNT, "type", itemClassName).increment(saved); + }); + } + })); } + protected void setState(final ServiceState state) { + this.state.set(state); + this.eventPublisher.publishEvent(new ServiceStateChangeEvent(this)); + } + + /** {@inheritDoc} **/ @Override public String getId() { - return ""; + return id; } - + /** {@inheritDoc} **/ @Override public ServiceType getType() { - return null; + return ServiceType.INDEXER; } - + /** {@inheritDoc} **/ @Override public ServiceState getState() { - return ServiceState.RUNNING; + return state.get(); } + @PreDestroy @Override public void close() { - + setState(ServiceState.TERMINATING); + this.receiveCancellations.forEach(Runnable::run); + try { + this.logQueue.close(); + this.metricQueue.close(); + setState(ServiceState.TERMINATED_GRACEFULLY); + } catch (IOException e) { + log.error("Failed to close the queue", e); + setState(ServiceState.TERMINATED_FORCED); + } } } diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java index 6608a8c96f..8e05fa26ea 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java @@ -221,6 +221,26 @@ public Runnable receive(String consumerGroup, Class queueType, Consumer queueType, Consumer>> consumer) { + return receiveBatch(null, queueType, consumer); + } + + public Runnable receiveBatch(String consumerGroup, Class queueType, Consumer>> consumer) { + return receiveBatch(consumerGroup, queueType, consumer, true); + } + + public Runnable receiveBatch(String consumerGroup, Class queueType, Consumer>> consumer, boolean forUpdate) { + return this.receiveImpl( + consumerGroup, + queueType, + (dslContext, eithers) -> { + consumer.accept(eithers); + }, + false, + forUpdate + ); + } + public Runnable receiveTransaction(String consumerGroup, Class queueType, BiConsumer>> consumer) { return this.receiveImpl( consumerGroup, diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueueIndexer.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueueIndexer.java index 44af37776a..4b3a63be3c 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueueIndexer.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueueIndexer.java @@ -11,16 +11,21 @@ import java.util.HashMap; import java.util.Map; +/** + * This class is responsible to index the queue synchronously at message production time.

+ * Some queue messages are batch-indexed asynchronously via the {@link JdbcIndexer} + * which listen to (receive) those queue messages. + */ @Slf4j @Singleton public class JdbcQueueIndexer { - private final Map, JdbcIndexerInterface> repositories = new HashMap<>(); + private final Map, JdbcQueueIndexerInterface> repositories = new HashMap<>(); private final MetricRegistry metricRegistry; @Inject public JdbcQueueIndexer(ApplicationContext applicationContext) { - applicationContext.getBeansOfType(JdbcIndexerInterface.class) + applicationContext.getBeansOfType(JdbcQueueIndexerInterface.class) .forEach(saveRepositoryInterface -> { String typeName = ((ParameterizedType) ((Class) saveRepositoryInterface.getClass() .getGenericSuperclass()).getGenericInterfaces()[1]).getActualTypeArguments()[0].getTypeName(); diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcIndexerInterface.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueueIndexerInterface.java similarity index 68% rename from jdbc/src/main/java/io/kestra/jdbc/runner/JdbcIndexerInterface.java rename to jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueueIndexerInterface.java index b05dcb8c95..64b635dbac 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcIndexerInterface.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueueIndexerInterface.java @@ -2,6 +2,6 @@ import org.jooq.DSLContext; -public interface JdbcIndexerInterface { +public interface JdbcQueueIndexerInterface { T save(DSLContext context, T message); } diff --git a/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcQueueTest.java b/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcQueueTest.java index 4a98e0c19a..ea2bda7852 100644 --- a/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcQueueTest.java +++ b/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcQueueTest.java @@ -4,7 +4,6 @@ import io.kestra.core.queues.QueueException; import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; -import io.kestra.core.runners.Indexer; import io.kestra.core.utils.TestsUtils; import io.kestra.plugin.core.debug.Return; import io.kestra.core.utils.IdUtils; @@ -81,7 +80,7 @@ void withType() throws InterruptedException, QueueException { flowQueue.emit(builder("io.kestra.f1")); CountDownLatch countDownLatch = new CountDownLatch(1); - Flux receive = TestsUtils.receive(flowQueue, Indexer.class, either -> { + Flux receive = TestsUtils.receive(flowQueue, JdbcIndexer.class, either -> { countDownLatch.countDown(); }); @@ -93,7 +92,7 @@ void withType() throws InterruptedException, QueueException { flowQueue.emit(builder("io.kestra.f2")); CountDownLatch countDownLatch2 = new CountDownLatch(1); - receive = TestsUtils.receive(flowQueue, Indexer.class, either -> { + receive = TestsUtils.receive(flowQueue, JdbcIndexer.class, either -> { countDownLatch2.countDown(); }); countDownLatch2.await(5, TimeUnit.SECONDS); @@ -107,7 +106,7 @@ void withGroupAndType() throws InterruptedException, QueueException { flowQueue.emit("consumer_group", builder("io.kestra.f1")); CountDownLatch countDownLatch = new CountDownLatch(1); - Flux receive = TestsUtils.receive(flowQueue, "consumer_group", Indexer.class, either -> { + Flux receive = TestsUtils.receive(flowQueue, "consumer_group", JdbcIndexer.class, either -> { countDownLatch.countDown(); }); @@ -119,7 +118,7 @@ void withGroupAndType() throws InterruptedException, QueueException { flowQueue.emit("consumer_group", builder("io.kestra.f2")); CountDownLatch countDownLatch2 = new CountDownLatch(1); - receive = TestsUtils.receive(flowQueue, "consumer_group", Indexer.class, either -> { + receive = TestsUtils.receive(flowQueue, "consumer_group", JdbcIndexer.class, either -> { countDownLatch2.countDown(); }); countDownLatch2.await(5, TimeUnit.SECONDS);