Added experimental background job queue #20985

Open
wants to merge 23 commits into
base: main
Changes from 2 commits
4 changes: 3 additions & 1 deletion ghost/core/core/server/data/schema/schema.js
@@ -985,7 +985,9 @@ module.exports = {
started_at: {type: 'dateTime', nullable: true},
finished_at: {type: 'dateTime', nullable: true},
created_at: {type: 'dateTime', nullable: false},
- updated_at: {type: 'dateTime', nullable: true}
+ updated_at: {type: 'dateTime', nullable: true},
+ metadata: {type: 'text', maxlength: 1000000000, fieldtype: 'long', nullable: true},
Contributor

Do you have a sense of how big we'd expect this field to actually be? I wonder if allowing a maxlength this large is potentially encouraging misuse of this field?

Contributor Author

I originally had it like other text fields at 2000 chars. I think that ought to be sufficient, but this is why I was holding off on the migration. We could start with 2000 and go from there?

Contributor

Yeah 2000 seems more sane to me — in most cases this should basically be a resource ID and maybe a couple small pieces of metadata (e.g. a timestamp or two, maybe a url pointing to a file or storage bucket, etc.)

Contributor Author

Cool, I'll bump back. Basically, since this is a 'JSON' field I had copied what we had elsewhere. Agree it seems to be overkill and it's easier to bump up than down.

Contributor Author

Adjusted. If I mark this resolved, does it also resolve it for you? I forget how GH handles that.
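
To make the size discussion in this thread concrete, here is a minimal sketch of a guard that serializes job metadata and rejects anything over the 2000-character limit agreed on above. The helper name `serializeJobMetadata` and the constant `MAX_METADATA_LENGTH` are hypothetical and are not part of this PR.

```javascript
// Hypothetical limit mirroring the 2000-character maxlength discussed in this thread.
const MAX_METADATA_LENGTH = 2000;

/**
 * Serializes job metadata to JSON and enforces the column limit.
 * Illustrative helper only; it does not exist in the PR.
 */
function serializeJobMetadata(metadata) {
    const serialized = JSON.stringify(metadata);
    if (serialized.length > MAX_METADATA_LENGTH) {
        throw new Error(`Job metadata is ${serialized.length} characters; limit is ${MAX_METADATA_LENGTH}`);
    }
    return serialized;
}

// A typical payload: a job path plus a resource ID.
const payload = serializeJobMetadata({
    job: '/path/to/jobs/update-member-email-analytics',
    data: {memberId: 'abc123'}
});
console.log(payload.length <= MAX_METADATA_LENGTH); // true
```

A check like this would surface oversized payloads at enqueue time rather than at the database layer.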

+ queue_entry: {type: 'integer', nullable: true, unsigned: true}
9larsons marked this conversation as resolved.
},
redirects: {
id: {type: 'string', maxlength: 24, nullable: false, primary: true},
@@ -1,4 +1,6 @@
const logging = require('@tryghost/logging');
+ const JobManager = require('../../services/jobs');
+ const path = require('path');

class EmailAnalyticsServiceWrapper {
init() {
@@ -11,7 +13,7 @@ class EmailAnalyticsServiceWrapper {
const MailgunProvider = require('@tryghost/email-analytics-provider-mailgun');
const {EmailRecipientFailure, EmailSpamComplaintEvent, Email} = require('../../models');
const StartEmailAnalyticsJobEvent = require('./events/StartEmailAnalyticsJobEvent');
-
+ const {MemberEmailAnalyticsUpdateEvent} = require('@tryghost/member-events');
const domainEvents = require('@tryghost/domain-events');
const config = require('../../../shared/config');
const settings = require('../../../shared/settings-cache');
@@ -47,14 +49,28 @@ class EmailAnalyticsServiceWrapper {
providers: [
new MailgunProvider({config, settings})
],
- queries
+ queries,
+ domainEvents
});

// We currently cannot trigger a non-offloaded job from the job manager
// So the email analytics jobs simply emits an event.
domainEvents.subscribe(StartEmailAnalyticsJobEvent, async () => {
await this.startFetch();
});

+ domainEvents.subscribe(MemberEmailAnalyticsUpdateEvent, async (event) => {
+ const memberId = event.data.memberId;
+ await JobManager.addQueuedJob({
+ name: `update-member-email-analytics-${memberId}`,
+ metadata: {
+ job: path.resolve(__dirname, 'jobs/update-member-email-analytics'),
+ data: {
+ memberId
+ }
+ }
+ });
+ });
}

async fetchLatestOpenedEvents({maxEvents} = {maxEvents: Infinity}) {
@@ -29,7 +29,7 @@ module.exports = {
const m = Math.floor(Math.random() * 5); // 0-4

jobsService.addJob({
- at: `${s} ${m}/5 * * * *`,
+ at: `${s} ${m}/1 * * * *`,
9larsons marked this conversation as resolved.
job: path.resolve(__dirname, 'fetch-latest/index.js'),
name: 'email-analytics-fetch-latest'
});
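
The effect of changing the step from `/5` to `/1` can be illustrated with a small expansion helper. This is a sketch that assumes the common cron convention where `start/step` means "from `start` to the end of the range, every `step` units"; the scheduler behind the job manager may interpret the field differently, and `expandStepField` is a hypothetical name.

```javascript
/**
 * Expands a cron "start/step" field into the list of matching values.
 * Assumes "start/step" means start, start + step, ... up to max.
 */
function expandStepField(field, min, max) {
    const [startPart, stepPart] = field.split('/');
    const start = startPart === '*' ? min : Number(startPart);
    const step = Number(stepPart);
    const values = [];
    for (let value = start; value <= max; value += step) {
        values.push(value);
    }
    return values;
}

// Minute field with a random offset of 2, before and after the change
console.log(expandStepField('2/5', 0, 59).length); // 12
console.log(expandStepField('2/1', 0, 59).length); // 58
```

Under that assumed convention, `${m}/1` would fire every minute from `m` through 59 and skip the minutes before `m` in each hour.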
@@ -0,0 +1,13 @@
+ const queries = require('../../lib/queries');
+
+ /**
+ * Updates email analytics for a specific member
+ *
+ * @param {Object} options - The options object
+ * @param {string} options.memberId - The ID of the member to update analytics for
+ * @returns {Promise<Object>} The result of the aggregation query (1/0)
+ */
+ module.exports = async function updateMemberEmailAnalytics({memberId}) {
+ const result = await queries.aggregateMemberStats(memberId);
+ return result;
+ };
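
For context, a queue consumer could turn the stored `metadata` back into a function call roughly as follows. This is only a sketch: the PR's `JobQueueManager` is not shown on this page, so `runQueuedJob` and the temporary job file are hypothetical stand-ins.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

/**
 * Loads the module at metadata.job and calls it with metadata.data.
 * Hypothetical consumer; the real queue manager may work differently.
 */
async function runQueuedJob({metadata}) {
    const jobFn = require(metadata.job);
    return jobFn(metadata.data);
}

// Demo: write a throwaway job module with the same shape as the job above
const jobDir = fs.mkdtempSync(path.join(os.tmpdir(), 'queued-job-'));
const jobPath = path.join(jobDir, 'echo-member.js');
fs.writeFileSync(
    jobPath,
    'module.exports = async function ({memberId}) { return {memberId, updated: true}; };'
);

const demoRun = runQueuedJob({
    name: 'update-member-email-analytics-abc123',
    metadata: {job: jobPath, data: {memberId: 'abc123'}}
});
demoRun.then(result => console.log(result));
```

Keeping the job as a single exported async function that takes one options object is what makes this kind of generic dispatch possible.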
4 changes: 3 additions & 1 deletion ghost/core/core/server/services/jobs/job-service.js
@@ -8,6 +8,8 @@ const logging = require('@tryghost/logging');
const models = require('../../models');
const sentry = require('../../../shared/sentry');
const domainEvents = require('@tryghost/domain-events');
+ const config = require('../../../shared/config');
+ const db = require('../../data/db');

const errorHandler = (error, workerMeta) => {
logging.info(`Capturing error for worker during execution of job: ${workerMeta.name}`);
@@ -42,7 +44,7 @@ const initTestMode = () => {
}, 5000);
};

- const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents});
+ const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, config, db: db});

module.exports = jobManager;
module.exports.initTestMode = initTestMode;
@@ -42,7 +42,7 @@ const initTestMode = () => {
}, 5000);
};

- const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents});
+ const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, isDuplicate: true});
Contributor

Not a blocker but just a note for future: it would be good to get to the bottom of why we need two instances of the job system, and ideally fix that so we don't need this anymore

Contributor Author

Agreed! I've got a note to look into this because this is a rather clunky way of handling it that I'm also really not a fan of.


module.exports = jobManager;
module.exports.initTestMode = initTestMode;
12 changes: 10 additions & 2 deletions ghost/email-analytics-service/lib/EmailAnalyticsService.js
@@ -1,6 +1,7 @@
const EventProcessingResult = require('./EventProcessingResult');
const logging = require('@tryghost/logging');
const errors = require('@tryghost/errors');
+ const {MemberEmailAnalyticsUpdateEvent} = require('@tryghost/member-events');

/**
* @typedef {import('@tryghost/email-service').EmailEventProcessor} EmailEventProcessor
@@ -73,13 +74,15 @@ module.exports = class EmailAnalyticsService {
* @param {object} dependencies.queries
* @param {EmailEventProcessor} dependencies.eventProcessor
* @param {object} dependencies.providers
+ * @param {import('@tryghost/domain-events')} dependencies.domainEvents
*/
- constructor({config, settings, queries, eventProcessor, providers}) {
+ constructor({config, settings, queries, eventProcessor, providers, domainEvents}) {
this.config = config;
this.settings = settings;
this.queries = queries;
this.eventProcessor = eventProcessor;
this.providers = providers;
+ this.domainEvents = domainEvents;
}

getStatus() {
@@ -511,7 +514,12 @@ module.exports = class EmailAnalyticsService {
startTime = Date.now();
logging.info(`[EmailAnalytics] Aggregating for ${memberIds.length} members`);
for (const memberId of memberIds) {
- await this.aggregateMemberStats(memberId);
+ if (this.config.get('services:jobs:queue:enabled')) {
+ // With the queue enabled we will dispatch an event to update the member email analytics on the background queue (multithreaded :))
+ await this.domainEvents.dispatch(MemberEmailAnalyticsUpdateEvent.create({memberId}));
+ } else {
+ await this.aggregateMemberStats(memberId);
+ }
}
endTime = Date.now() - startTime;
logging.info(`[EmailAnalytics] Aggregating for ${memberIds.length} members took ${endTime}ms`);
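
The config-gated branching in this hunk can be exercised in isolation with stubs. A minimal sketch, assuming a `config.get` that returns a boolean and a `domainEvents.dispatch` that simply records events; `aggregateMembers`, `makeStubs`, and the plain-object event shape are hypothetical and not taken from the PR.

```javascript
// Mirrors the loop in the hunk: dispatch an event when the queue is enabled,
// otherwise aggregate inline.
async function aggregateMembers({memberIds, config, domainEvents, aggregateMemberStats}) {
    for (const memberId of memberIds) {
        if (config.get('services:jobs:queue:enabled')) {
            await domainEvents.dispatch({type: 'MemberEmailAnalyticsUpdateEvent', data: {memberId}});
        } else {
            await aggregateMemberStats(memberId);
        }
    }
}

// Builds recording stubs for one run with the flag on or off
function makeStubs(enabled) {
    const dispatched = [];
    const aggregated = [];
    return {
        dispatched,
        aggregated,
        config: {get: key => key === 'services:jobs:queue:enabled' && enabled},
        domainEvents: {dispatch: async event => dispatched.push(event)},
        aggregateMemberStats: async memberId => aggregated.push(memberId)
    };
}

const queueOn = makeStubs(true);
const queueOff = makeStubs(false);
const gatedRuns = Promise.all([
    aggregateMembers({memberIds: ['a', 'b'], ...queueOn}),
    aggregateMembers({memberIds: ['a', 'b'], ...queueOff})
]).then(() => {
    console.log(queueOn.dispatched.length, queueOn.aggregated.length); // 2 0
    console.log(queueOff.dispatched.length, queueOff.aggregated.length); // 0 2
});
```

With the flag off, the behaviour is unchanged from before the PR, which is what keeps the feature safe to ship as experimental.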
84 changes: 65 additions & 19 deletions ghost/job-manager/lib/JobManager.js
@@ -10,6 +10,7 @@ const logging = require('@tryghost/logging');
const isCronExpression = require('./is-cron-expression');
const assembleBreeJob = require('./assemble-bree-job');
const JobsRepository = require('./JobsRepository');
+ const JobQueueManager = require('./JobQueueManager');

const worker = async (task, callback) => {
try {
@@ -27,22 +28,36 @@ const ALL_STATUSES = {
queued: 'queued'
};

+ /**
+ * @typedef {Object} ScheduledJob
+ * @property {Function | string} job - Function or path to a module defining a job
+ * @property {string} [name] - Unique job name, if not provided takes function name or job script filename
+ * @property {string | Date} [at] - Date, cron or human readable schedule format
+ * @property {Object} [data] - Data to be passed into the job
+ * @property {boolean} [offloaded=true] - If true, creates an "offloaded" job running in a worker thread. If false, runs an "inline" job on the same event loop
+ */
class JobManager {
#domainEvents;
#completionPromises = new Map();
+ #jobQueueManager = null;
+ #config;

/**
* @param {Object} options
* @param {Function} [options.errorHandler] - custom job error handler
* @param {Function} [options.workerMessageHandler] - custom message handler coming from workers
* @param {Object} [options.JobModel] - a model which can persist job data in the storage
* @param {Object} [options.domainEvents] - domain events emitter
+ * @param {Object} [options.config] - config
+ * @param {boolean} [options.isDuplicate] - if true, the job manager will not initialize the job queue
+ * @param {Object} [options.db] - the database object
*/
- constructor({errorHandler, workerMessageHandler, JobModel, domainEvents}) {
- this.queue = fastq(this, worker, 3);
+ constructor({errorHandler, workerMessageHandler, JobModel, domainEvents, config, isDuplicate = false, db}) {
+ this.inlineQueue = fastq(this, worker, 3);
9larsons marked this conversation as resolved.
this._jobMessageHandler = this._jobMessageHandler.bind(this);
this._jobErrorHandler = this._jobErrorHandler.bind(this);
this.#domainEvents = domainEvents;
+ this.#config = config;

const combinedMessageHandler = workerMessageHandler
? ({name, message}) => {
@@ -72,7 +87,15 @@ class JobManager {
});

if (JobModel) {
- this._jobsRepository = new JobsRepository({JobModel});
+ this._jobsRepository = new JobsRepository({JobModel, db});
}
+
+ // We have a duplicate job manager in Ghost core for the mentions job service that should be
+ // refactored to use the job queue when we're able.
+ if (!isDuplicate && this.#config.get('services:jobs:queue:enabled')) {
+ logging.info(`[JobManager] Initializing job queue based on config`);
+ this.#jobQueueManager = new JobQueueManager({JobModel, config, db});
+ this.#jobQueueManager.init();
+ }
}

@@ -94,6 +117,31 @@ class JobManager {
};
}

+ /**
+ * @typedef {Object} QueuedJob
+ * @property {string} name - The name or identifier of the job.
+ * @property {Object} metadata - Metadata associated with the job.
+ * @property {string} metadata.job - The absolute path to the job to execute.
+ * @property {Object} metadata.data - The data associated with the job.
+ */
+
+ /**
+ * @method addQueuedJob
+ * @async
+ * @description Adds a new job to the job repository, which will be polled and executed by the job queue manager.
+ * @param {QueuedJob} job - The job to be added to the queue.
+ * @returns {Promise<Object>} The added job model.
+ */
+ async addQueuedJob({name, metadata}) {
+ // Adding some extra security so we don't add jobs when the queue is disabled from callers.
+ if (this.#config.get('services:jobs:queue:enabled')) {
+ const model = await this.#jobQueueManager.addJob({name, metadata});
+ return model;
+ }
+ logging.warn('[JobManager] Job queue is disabled but job was attempted to be added. job: ', name);
+ return Promise.reject();
+ }
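
Because the disabled path calls `Promise.reject()` without a reason, a caller that catches the rejection receives `undefined`. The sketch below reproduces that behaviour with a stubbed manager and shows one possible alternative that rejects with an `Error`; `makeManager` and the `rejectWithError` flag are hypothetical and not part of the PR.

```javascript
// Stubbed manager that mimics the enabled/disabled branching of addQueuedJob
function makeManager({enabled, rejectWithError = false}) {
    const jobs = [];
    return {
        jobs,
        async addQueuedJob({name, metadata}) {
            if (enabled) {
                const model = {name, metadata, status: 'queued'};
                jobs.push(model);
                return model;
            }
            // rejectWithError === false matches the reason-less rejection in the hunk above
            const reason = rejectWithError
                ? new Error(`Job queue is disabled; cannot add job: ${name}`)
                : undefined;
            return Promise.reject(reason);
        }
    };
}

const job = {
    name: 'update-member-email-analytics-abc123',
    metadata: {job: '/abs/path/to/job', data: {memberId: 'abc123'}}
};

const rejectionReasons = Promise.all([
    makeManager({enabled: false}).addQueuedJob(job).catch(reason => reason),
    makeManager({enabled: false, rejectWithError: true}).addQueuedJob(job).catch(reason => reason)
]);

rejectionReasons.then(([bare, withError]) => {
    console.log(bare); // undefined
    console.log(withError instanceof Error); // true
});
```

Rejecting with an `Error` gives callers a message and stack trace to log, at the cost of a slightly noisier disabled path.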

async _jobMessageHandler({name, message}) {
if (name) {
if (message === ALL_STATUSES.started) {
@@ -128,7 +176,7 @@ class JobManager {
this.#completionPromises.delete(name);
}

- if (this.queue.length() <= 1) {
+ if (this.inlineQueue.length() <= 1) {
if (this.#completionPromises.has('all')) {
for (const listeners of this.#completionPromises.get('all')) {
listeners.resolve();
@@ -168,7 +216,7 @@ class JobManager {
this.#completionPromises.delete(jobMeta.name);
}

- if (this.queue.length() <= 1) {
+ if (this.inlineQueue.length() <= 1) {
if (this.#completionPromises.has('all')) {
for (const listeners of this.#completionPromises.get('all')) {
listeners.reject(error);
@@ -181,7 +229,7 @@ class JobManager {

/**
* By default schedules an "offloaded" job. If `offloaded: true` parameter is set,
- * puts an "inline" immediate job into the queue.
+ * puts an "inline" immediate job into the inlineQueue.
*
* @param {Object} GhostJob - job options
* @prop {Function | String} GhostJob.job - function or path to a module defining a job
@@ -192,7 +240,7 @@ class JobManager {
*/
addJob({name, at, job, data, offloaded = true}) {
if (offloaded) {
- logging.info('Adding offloaded job to the queue');
+ logging.info('Adding offloaded job to the inline job queue');
let schedule;

if (!name) {
@@ -229,9 +277,9 @@ class JobManager {
this.bree.add(breeJob);
return this.bree.start(name);
} else {
- logging.info(`Adding one-off job to queue with current length = ${this.queue.length()} called '${name || 'anonymous'}'`);
+ logging.info(`Adding one-off job to inlineQueue with current length = ${this.inlineQueue.length()} called '${name || 'anonymous'}'`);

- this.queue.push(async () => {
+ this.inlineQueue.push(async () => {
try {
// NOTE: setting the status here otherwise it is impossible to
// distinguish between states when the job fails immediately
@@ -325,13 +373,11 @@ class JobManager {
/**
* Awaits completion of the offloaded one-off job.
* CAUTION: it might take a long time to resolve!
- * @param {String} name one-off job name
+ * @param {string} name one-off job name
* @returns resolves with a Job model at current state
*/
async awaitOneOffCompletion(name) {
- const persistedJob = await this._jobsRepository.read({
- name
- });
+ const persistedJob = await this._jobsRepository.read(name);

if (!persistedJob || ![ALL_STATUSES.finished, ALL_STATUSES.failed].includes(persistedJob.get('status'))) {
// NOTE: can implement exponential backoff here if that's ever needed
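
The exponential backoff mentioned in the NOTE could look roughly like this. A sketch only: `pollUntilDone`, its options, and the in-memory `readJob` stub are hypothetical; the real method reads through `this._jobsRepository` and checks `persistedJob.get('status')`.

```javascript
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

/**
 * Polls readJob() until the job is finished or failed,
 * doubling the wait between reads up to maxDelay.
 */
async function pollUntilDone(readJob, {initialDelay = 10, maxDelay = 1000, factor = 2} = {}) {
    let delay = initialDelay;
    for (;;) {
        const job = await readJob();
        if (job && ['finished', 'failed'].includes(job.status)) {
            return job;
        }
        await sleep(delay);
        delay = Math.min(delay * factor, maxDelay);
    }
}

// Stub repository: the job reports "finished" on the third read
let reads = 0;
const readJob = async () => ({status: ++reads >= 3 ? 'finished' : 'started'});

const polled = pollUntilDone(readJob);
polled.then(job => console.log(job.status, reads)); // finished 3
```

Capping the delay keeps a long-running job from pushing the polling interval out indefinitely.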
@@ -366,7 +412,7 @@ class JobManager {
const name = 'all';

return new Promise((resolve, reject) => {
- if (this.queue.idle()) {
+ if (this.inlineQueue.idle()) {
resolve();
return;
}
@@ -379,7 +425,7 @@ class JobManager {
}

/**
- * Removes an "offloaded" job from scheduled jobs queue.
+ * Removes an "offloaded" job from scheduled jobs inlineQueue.
* It's NOT yet possible to remove "inline" jobs (will be possible when scheduling is added https://github.com/breejs/bree/issues/68).
* The method will throw an Error if job with provided name does not exist.
*
@@ -398,15 +444,15 @@ class JobManager {
async shutdown(options) {
await this.bree.stop();

- if (this.queue.idle()) {
+ if (this.inlineQueue.idle()) {
return;
}

- logging.warn('Waiting for busy job queue');
+ logging.warn('Waiting for busy job in inline job queue');

- await pWaitFor(() => this.queue.idle() === true, options);
+ await pWaitFor(() => this.inlineQueue.idle() === true, options);

- logging.warn('Job queue finished');
+ logging.warn('Inline job queue finished');
}
}
