Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added experimental background job queue #20985

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ghost/core/core/server/data/schema/schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,9 @@ module.exports = {
started_at: {type: 'dateTime', nullable: true},
finished_at: {type: 'dateTime', nullable: true},
created_at: {type: 'dateTime', nullable: false},
updated_at: {type: 'dateTime', nullable: true}
updated_at: {type: 'dateTime', nullable: true},
metadata: {type: 'string', maxlength: 2000, nullable: true},
queue_entry: {type: 'integer', nullable: true, unsigned: true}
9larsons marked this conversation as resolved.
Show resolved Hide resolved
},
redirects: {
id: {type: 'string', maxlength: 24, nullable: false, primary: true},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
const logging = require('@tryghost/logging');
const JobManager = require('../../services/jobs');
const path = require('path');

class EmailAnalyticsServiceWrapper {
init() {
Expand All @@ -11,7 +13,7 @@ class EmailAnalyticsServiceWrapper {
const MailgunProvider = require('@tryghost/email-analytics-provider-mailgun');
const {EmailRecipientFailure, EmailSpamComplaintEvent, Email} = require('../../models');
const StartEmailAnalyticsJobEvent = require('./events/StartEmailAnalyticsJobEvent');

const {MemberEmailAnalyticsUpdateEvent} = require('@tryghost/member-events');
const domainEvents = require('@tryghost/domain-events');
const config = require('../../../shared/config');
const settings = require('../../../shared/settings-cache');
Expand Down Expand Up @@ -47,14 +49,28 @@ class EmailAnalyticsServiceWrapper {
providers: [
new MailgunProvider({config, settings})
],
queries
queries,
domainEvents
});

// We currently cannot trigger a non-offloaded job from the job manager
// So the email analytics jobs simply emits an event.
domainEvents.subscribe(StartEmailAnalyticsJobEvent, async () => {
await this.startFetch();
});

domainEvents.subscribe(MemberEmailAnalyticsUpdateEvent, async (event) => {
const memberId = event.data.memberId;
await JobManager.addQueuedJob({
name: `update-member-email-analytics-${memberId}`,
metadata: {
job: path.resolve(__dirname, 'jobs/update-member-email-analytics'),
data: {
memberId
}
}
});
});
}

async fetchLatestOpenedEvents({maxEvents} = {maxEvents: Infinity}) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
const queries = require('../../lib/queries');

/**
* Updates email analytics for a specific member
*
* @param {Object} options - The options object
* @param {string} options.memberId - The ID of the member to update analytics for
* @returns {Promise<Object>} The result of the aggregation query (1/0)
*/
module.exports = async function updateMemberEmailAnalytics({memberId}) {
const result = await queries.aggregateMemberStats(memberId);
return result;
};
5 changes: 3 additions & 2 deletions ghost/core/core/server/services/jobs/job-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const logging = require('@tryghost/logging');
const models = require('../../models');
const sentry = require('../../../shared/sentry');
const domainEvents = require('@tryghost/domain-events');
const config = require('../../../shared/config');

const errorHandler = (error, workerMeta) => {
logging.info(`Capturing error for worker during execution of job: ${workerMeta.name}`);
Expand All @@ -24,7 +25,7 @@ const workerMessageHandler = ({name, message}) => {
const initTestMode = () => {
// Output job queue length every 5 seconds
setInterval(() => {
logging.warn(`${jobManager.queue.length()} jobs in the queue. Idle: ${jobManager.queue.idle()}`);
logging.warn(`${jobManager.inlineQueue.length()} jobs in the queue. Idle: ${jobManager.inlineQueue.idle()}`);

const runningScheduledjobs = Object.keys(jobManager.bree.workers);
if (Object.keys(jobManager.bree.workers).length) {
Expand All @@ -42,7 +43,7 @@ const initTestMode = () => {
}, 5000);
};

const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents});
const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, config});

module.exports = jobManager;
module.exports.initTestMode = initTestMode;
4 changes: 2 additions & 2 deletions ghost/core/core/server/services/mentions-jobs/job-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const workerMessageHandler = ({name, message}) => {
const initTestMode = () => {
// Output job queue length every 5 seconds
setInterval(() => {
logging.warn(`${jobManager.queue.length()} jobs in the queue. Idle: ${jobManager.queue.idle()}`);
logging.warn(`${jobManager.inlineQueue.length()} jobs in the queue. Idle: ${jobManager.inlineQueue.idle()}`);

const runningScheduledjobs = Object.keys(jobManager.bree.workers);
if (Object.keys(jobManager.bree.workers).length) {
Expand All @@ -42,7 +42,7 @@ const initTestMode = () => {
}, 5000);
};

const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents});
const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, isDuplicate: true});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocker but just a note for future: it would be good to get to the bottom of why we need two instances of the job system, and ideally fix that so we don't need this anymore

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed! I've got a note to look into this because this is rather clunky way of handling it that I'm also really not a fan of.


module.exports = jobManager;
module.exports.initTestMode = initTestMode;
100 changes: 100 additions & 0 deletions ghost/core/test/integration/jobs/job-queue.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
const assert = require('assert/strict');
const path = require('path');
const configUtils = require('../../utils/configUtils');
const dbUtils = require('../../utils/db-utils');
const models = require('../../../core/server/models');

// Helper function to wait for job completion
async function waitForJobCompletion(jobName, maxWaitTimeMs = 5000, checkIntervalMs = 50) {
return new Promise((resolve, reject) => {
const startTime = Date.now();
const intervalId = setInterval(async () => {
if (Date.now() - startTime >= maxWaitTimeMs) {
clearInterval(intervalId);
reject(new Error(`Job ${jobName} did not complete within ${maxWaitTimeMs}ms`));
}
const job = await models.Job.findOne({name: jobName});
if (!job) {
clearInterval(intervalId);
resolve();
}
}, checkIntervalMs);
});
}

describe('Job Queue', function () {
let jobService;

describe.only('enabled by config', function () {
beforeEach(async function () {
models.init();
configUtils.set('services:jobs:queue:enabled', true);
await new Promise((resolve) => {
setTimeout(resolve, 500);
});
jobService = require('../../../core/server/services/jobs/job-service');
});

afterEach(async function () {
await configUtils.restore();
await dbUtils.teardown();
});

it('should add and execute a job in the queue', async function () {
this.timeout(10000); // Increase timeout if needed

const job = {
name: `add-random-numbers-${Date.now()}`,
metadata: {
job: path.resolve(__dirname, './test-job.js'),
data: {}
}
};

// Add the job to the queue
console.log('Adding job to queue:', job.name);
const result = await jobService.addQueuedJob(job);
console.log('Job added:', result);
assert.ok(result);

Check warning on line 58 in ghost/core/test/integration/jobs/job-queue.test.js

View workflow job for this annotation

GitHub Actions / Database tests (Node 18.12.1, sqlite3)

Retried 'Job Queue enabled by config should add and execute a job in the queue' due to 'The expression evaluated to a falsy value: assert.ok(result) '

// Wait for the job to complete
await waitForJobCompletion(job.name, 8000); // Increase wait time

// Check job status
const jobEntry = await models.Job.findOne({name: job.name});
console.log('Job status:', jobEntry ? jobEntry.status : 'Not found');

// Verify that the job no longer exists in the queue
assert.equal(jobEntry, null);
});
});

describe('not enabled', function () {
beforeEach(async function () {
models.init();
jobService = require('../../../core/server/services/jobs/job-service');
});

afterEach(async function () {
await dbUtils.teardown();
});

it('should not add a job to the queue when disabled', async function () {
const job = {
name: `add-random-numbers-${Date.now()}`,
metadata: {
job: path.resolve(__dirname, './test-job.js'),
data: {}
}
};

// Attempt to add the job to the queue
const result = await jobService.addQueuedJob(job);
assert.equal(result, undefined);

// Verify that the job doesn't exist in the queue
const jobEntry = await models.Job.findOne({name: job.name});
assert.equal(jobEntry, null);
});
});
});
7 changes: 7 additions & 0 deletions ghost/core/test/integration/jobs/test-job.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module.exports = function testJob() {
const num1 = Math.floor(Math.random() * 100);
const num2 = Math.floor(Math.random() * 100);
const result = num1 + num2;

return result;
};
2 changes: 1 addition & 1 deletion ghost/core/test/unit/server/data/schema/integrity.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const validateRouteSettings = require('../../../../../core/server/services/route
*/
describe('DB version integrity', function () {
// Only these variables should need updating
const currentSchemaHash = 'a4f016480ff73c6f52ee4c86482b45a7';
const currentSchemaHash = '1110f25f639c22135b9845c72f0be7ef';
const currentFixturesHash = 'a489d615989eab1023d4b8af0ecee7fd';
const currentSettingsHash = '051ef2a50e2edb8723e89461448313cb';
const currentRoutesHash = '3d180d52c663d173a6be791ef411ed01';
Expand Down
12 changes: 10 additions & 2 deletions ghost/email-analytics-service/lib/EmailAnalyticsService.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const EventProcessingResult = require('./EventProcessingResult');
const logging = require('@tryghost/logging');
const errors = require('@tryghost/errors');
const {MemberEmailAnalyticsUpdateEvent} = require('@tryghost/member-events');

/**
* @typedef {import('@tryghost/email-service').EmailEventProcessor} EmailEventProcessor
Expand Down Expand Up @@ -73,13 +74,15 @@ module.exports = class EmailAnalyticsService {
* @param {object} dependencies.queries
* @param {EmailEventProcessor} dependencies.eventProcessor
* @param {object} dependencies.providers
* @param {import('@tryghost/domain-events')} dependencies.domainEvents
*/
constructor({config, settings, queries, eventProcessor, providers}) {
constructor({config, settings, queries, eventProcessor, providers, domainEvents}) {
this.config = config;
this.settings = settings;
this.queries = queries;
this.eventProcessor = eventProcessor;
this.providers = providers;
this.domainEvents = domainEvents;
}

getStatus() {
Expand Down Expand Up @@ -511,7 +514,12 @@ module.exports = class EmailAnalyticsService {
startTime = Date.now();
logging.info(`[EmailAnalytics] Aggregating for ${memberIds.length} members`);
for (const memberId of memberIds) {
await this.aggregateMemberStats(memberId);
if (this.config?.get('services:jobs:queue:enabled')) {
// With the queue enabled we will dispatch an event to update the member email analytics on the background queue (multithreaded :))
await this.domainEvents.dispatch(MemberEmailAnalyticsUpdateEvent.create({memberId}));
} else {
await this.aggregateMemberStats(memberId);
}
}
endTime = Date.now() - startTime;
logging.info(`[EmailAnalytics] Aggregating for ${memberIds.length} members took ${endTime}ms`);
Expand Down
Loading
Loading