Maximo Guerrero
Posted on December 31, 2019
Sometimes you need a simple job queue to enable offline or deferred processing of data. While this is not a pub/sub system which is what a lot of people us Queues for, you could expand upon it.
I will be using Postgres as the database (you could apply the concepts in this post to other RDMS). What this is not is a tutorial in SQL, I assume you know how to write and understand basic sql(select, insert, update, and delete).
Let's start with the definition of our table. One thing you will notice is that we have a column called jobData of type json, this is so that you store the data need to process a job in a structured way. Feel free to change it to TEXT or anything else.
CREATE TABLE public."jobQueue"
(
"jobId" serial NOT NULL ,
"jobData" json NOT NULL,
status character varying ,
added timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
started timestamp without time zone,
ended timestamp without time zone,
CONSTRAINT "jobQueue_pkey" PRIMARY KEY ("jobId")
)
Now the secret to using a database table as a queue, is to lock the row for updates while you're getting a job from the queue.
Let's create a function that will take as an argument the number of jobs to pull off the queue.
CREATE OR REPLACE FUNCTION public."jobQueue_getJobs"(
"_numJobsToGet" integer)
RETURNS TABLE("jobId" integer, status character varying)
LANGUAGE 'plpgsql'
AS $BODY$
DECLARE _jobId int;
DECLARE _status character varying;
BEGIN
FOR _jobId,_status IN
SELECT jq1."jobId", jq1.status, jq1.type
FROM "jobQueue" jq1
WHERE
jq1.status = 'new'
and
jq1.added < NOW()
ORDER BY added
LIMIT "_numJobsToGet" FOR UPDATE SKIP LOCKED
LOOP
UPDATE "jobQueue" jq2 SET jq2."status" = 'pickedup'
WHERE jq2."jobId" = _jobId;
RETURN NEXT ;
END LOOP;
RETURN;
END
So executing this function will return a list of jobIds' that then you can be used to select and update the jobQueue table without any other process stepping on your jobs.
One thing about this process is that if a process dies and a job is stuck in limbo you can restart it without any data loss.
Once your done processing the job you can mark it as done or remove it from the queue if you are space constrained.
Posted on December 31, 2019
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.
Related
November 26, 2024
November 25, 2024
November 23, 2024