Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2201.9.x] Improve query performance #745

Merged
merged 3 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ modules = [
[[package]]
org = "ballerina"
name = "http"
version = "2.11.3"
version = "2.11.5"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "auth"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -34,7 +35,8 @@ private SQLWorkerThreadPool() {

// This is similar to cachedThreadPool util from Executors.newCachedThreadPool(..); but with upper cap on threads
public static final ExecutorService SQL_EXECUTOR_SERVICE = new ThreadPoolExecutor(0, 50,
60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new SQLThreadFactory());
60L, TimeUnit.SECONDS, new BlockingTaskQueue(), new SQLThreadFactory(),
new RetryTaskRejectionPolicy());

static class SQLThreadFactory implements ThreadFactory {
@Override
Expand All @@ -44,4 +46,31 @@ public Thread newThread(Runnable r) {
return ballerinaSql;
}
}

static class BlockingTaskQueue extends LinkedBlockingQueue<Runnable> {
private static final long serialVersionUID = 1L;

@Override
public boolean offer(Runnable task) {
// By returning false, we signal the ThreadPoolExecutor to bypass this queue and attempt to
// spawn a new thread if it hasn't reached the maximum pool size. This approach favors creating
// new threads over queuing tasks, thereby enabling more aggressive parallelism.
return false;
}

public void retryTask(Runnable task) {
if (!super.offer(task)) {
throw new IllegalStateException("Falied to requeue task: " + task);
}
}
}

static class RetryTaskRejectionPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
if (executor.getQueue() instanceof BlockingTaskQueue cbq) {
cbq.retryTask(task);
}
}
}
}
Loading