Samples
Basic
This sample demonstrates how to create a basic one-step chain with parameters. It uses the timetable.add_job() function to create the chain in a single call.
-- Register a one-task job named 'notify every minute'.
-- The job runs a single SQL command calling pg_notify(); the two values in
-- job_parameters are presumably bound to the $1 and $2 placeholders of the command.
-- The call result is exposed as the `chain_id` column.
SELECT timetable.add_job(
    -- identification and schedule
    job_name          => 'notify every minute',
    job_schedule      => '* * * * *',
    -- what to execute
    job_kind          => 'SQL'::timetable.command_kind,
    job_command       => 'SELECT pg_notify($1, $2)',
    job_parameters    => '[ "TT_CHANNEL", "Ahoj from SQL base task" ]'::jsonb,
    -- execution options
    job_client_name   => NULL,
    job_max_instances => 1,
    job_live          => TRUE,
    job_self_destruct => FALSE,
    job_ignore_errors => TRUE
) AS chain_id;
Send email
This sample demonstrates how to create an advanced email job. It checks whether there are emails to send, sends them, and logs the status of the command execution. You don't need to set up anything in advance: every parameter can be specified during chain creation.
DO $$
-- An example for using the SendMail task.
-- Builds one chain ('Send Mail') with three tasks:
--   task_order 10: built-in SendMail task, configured by a single JSON parameter
--   task_order 20: SQL housekeeping task that removes the sent mail parameter
--                  and stores a summary as the parameter of the Log task
--   task_order 30: built-in Log task
DECLARE
    v_mail_task_id bigint;
    v_log_task_id  bigint;
    v_chain_id     bigint;
BEGIN
    -- Create the chain and remember its id
    INSERT INTO timetable.chain (chain_name, max_instances, live)
    VALUES ('Send Mail', 1, TRUE)
    RETURNING chain_id INTO v_chain_id;

    -- Add SendMail task
    INSERT INTO timetable.task (chain_id, task_order, kind, command)
    SELECT v_chain_id, 10, 'BUILTIN', 'SendMail'
    RETURNING task_id INTO v_mail_task_id;

    -- Create the parameters for the SendMail task
    --   "username":       The username used for authenticating on the mail server
    --   "password":       The password used for authenticating on the mail server
    --   "serverhost":     The IP address or hostname of the mail server
    --   "serverport":     The port of the mail server
    --   "senderaddr":     The email that will appear as the sender
    --   "ccaddr":         String array of the recipients(Cc) email addresses
    --   "bccaddr":        String array of the recipients(Bcc) email addresses
    --   "toaddr":         String array of the recipients(To) email addresses
    --   "subject":        Subject of the email
    --   "attachment":     String array of the attachments (local file)
    --   "attachmentdata": Pairs of name and base64-encoded content
    --   "msgbody":        The body of the email
    INSERT INTO timetable.parameter (task_id, order_id, value)
    VALUES (v_mail_task_id, 1, '{
        "username":     "user@example.com",
        "password":     "password",
        "serverhost":   "smtp.example.com",
        "serverport":   587,
        "senderaddr":   "user@example.com",
        "ccaddr":       ["recipient_cc@example.com"],
        "bccaddr":      ["recipient_bcc@example.com"],
        "toaddr":       ["recipient@example.com"],
        "subject":      "pg_timetable - No Reply",
        "attachment":   ["D:\\Go stuff\\Books\\Concurrency in Go.pdf","report.yaml"],
        "attachmentdata": [{"name": "File.txt", "base64data": "RmlsZSBDb250ZW50"}],
        "msgbody":      "<b>Hello User,</b> <p>I got some Go books for you enjoy</p> <i>pg_timetable</i>!",
        "contenttype":  "text/html; charset=UTF-8"
    }'::jsonb);

    -- Add Log task and make it the last task using `task_order` column (=30)
    INSERT INTO timetable.task (chain_id, task_order, kind, command)
    SELECT v_chain_id, 30, 'BUILTIN', 'Log'
    RETURNING task_id INTO v_log_task_id;

    -- Add housekeeping task, that will delete sent mail and update parameter for the previous logging task
    -- Since we're using special add_task() function we don't need to specify the `chain_id`.
    -- Function will take the same `chain_id` from the parent task, SendMail in this particular case
    PERFORM timetable.add_task(
        kind      => 'SQL',
        parent_id => v_mail_task_id,
        -- The two %s placeholders are replaced with the SendMail and Log task ids.
        -- The CTE returns the "toaddr" element (as the JSON text of the recipients
        -- array) of every deleted mail parameter, so the logged summary really
        -- lists the recipients rather than the SMTP login name.
        command   => format(
            $query$WITH sent_mail(toaddr) AS (DELETE FROM timetable.parameter WHERE task_id = %s RETURNING value->>'toaddr')
            INSERT INTO timetable.parameter (task_id, order_id, value)
            SELECT %s, 1, to_jsonb('Sent emails to: ' || string_agg(sent_mail.toaddr, ';'))
            FROM sent_mail
            ON CONFLICT (task_id, order_id) DO UPDATE SET value = EXCLUDED.value$query$,
            v_mail_task_id, v_log_task_id
        ),
        order_delta => 10
    );

    -- In the end we should have something like this. Note, that even though the Log task was created earlier,
    -- it will be executed later due to the `task_order` column.
    -- timetable=> SELECT task_id, chain_id, task_order, kind, left(command, 50) FROM timetable.task ORDER BY task_order;
    --  task_id | chain_id | task_order |  kind   |                        left
    -- ---------+----------+------------+---------+---------------------------------------------------------------
    --       45 |       24 |         10 | BUILTIN | SendMail
    --       47 |       24 |         20 | SQL     | WITH sent_mail(toaddr) AS (DELETE FROM timetable.p
    --       46 |       24 |         30 | BUILTIN | Log
    -- (3 rows)
END;
$$
LANGUAGE PLPGSQL;
Download, Transform and Import
This sample demonstrates how to create an enhanced three-step chain with parameters, followed by a final cleanup task. It uses a DO statement to directly update the timetable schema tables.
-- Prepare the destination table 'city' (created only when it does not exist yet)
CREATE TABLE IF NOT EXISTS public.city (
    city              text,
    lat               numeric,
    lng               numeric,
    country           text,
    iso2              text,
    admin_name        text,
    capital           text,
    population        bigint,
    population_proper bigint
);

-- Give the `scheduler` role every privilege on the table
GRANT ALL ON TABLE public.city TO scheduler;
-- An enhanced example consisting of three tasks plus a final cleanup task:
-- 1. Download text file from internet using BUILT-IN command
-- 2. Remove accents (diacritic signs) from letters using PROGRAM command (can be done with `unaccent` PostgreSQL extension)
-- 3. Import text file as CSV file using BUILT-IN command (can be done with `psql -c \copy`)
-- 4. Remove the downloaded and converted .csv files using PROGRAM command
DO $$
DECLARE
    v_task_id  bigint;  -- id of the task created most recently; reused for every step
    v_chain_id bigint;  -- id of the chain all tasks belong to
BEGIN
    -- Create the chain with default values executed every minute (NULL == '* * * * *' :: timetable.cron)
    INSERT INTO timetable.chain (chain_name, live)
    VALUES ('Download locations and aggregate', TRUE)
    RETURNING chain_id INTO v_chain_id;

    -- Step 1. Download file from the server
    -- Create the task
    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
    VALUES (v_chain_id, 1, 'BUILTIN', 'Download', TRUE)
    RETURNING task_id INTO v_task_id;

    -- Create the parameters for the step 1:
    INSERT INTO timetable.parameter (task_id, order_id, value)
    VALUES (v_task_id, 1,
        '{
            "workersnum": 1,
            "fileurls": ["https://simplemaps.com/static/data/country-cities/mt/mt.csv"],
            "destpath": "."
        }'::jsonb);

    RAISE NOTICE 'Step 1 completed. Chain added with ID: %; DownloadFile task added with ID: %', v_chain_id, v_task_id;

    -- Step 2. Transform Unicode characters into ASCII
    -- Create the program task to call 'uconv' and name it 'unaccent'
    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
    VALUES (v_chain_id, 2, 'PROGRAM', 'uconv', TRUE, 'unaccent')
    RETURNING task_id INTO v_task_id;

    -- Create the parameters for the 'unaccent' task. Input and output files in this case
    -- Under Windows we should call PowerShell instead of "uconv" with command:
    -- Set-content "orte_ansi.txt" ((Get-content "orte.txt").Normalize("FormD") -replace '\p{M}', '')
    INSERT INTO timetable.parameter (task_id, order_id, value)
    VALUES (v_task_id, 1, '["-x", "Latin-ASCII", "-o", "mt_ansi.csv", "mt.csv"]'::jsonb);

    RAISE NOTICE 'Step 2 completed. Unaccent task added with ID: %', v_task_id;

    -- Step 3. Import ASCII file to PostgreSQL table using "CopyFromFile" built-in command
    INSERT INTO timetable.task (chain_id, task_order, kind, command)
    VALUES (v_chain_id, 3, 'BUILTIN', 'CopyFromFile')
    RETURNING task_id INTO v_task_id;

    -- Add the parameters for the import task. Execute client side COPY to 'city' from 'mt_ansi.csv'
    INSERT INTO timetable.parameter (task_id, order_id, value)
    VALUES (v_task_id, 1, '{"sql": "COPY city FROM STDIN (FORMAT csv, HEADER true)", "filename": "mt_ansi.csv" }'::jsonb);

    RAISE NOTICE 'Step 3 completed. Import task added with ID: %', v_task_id;

    -- Step 4. Clean up: remove the .csv files by running `bash -c "rm *.csv"`
    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
    VALUES (v_chain_id, 4, 'PROGRAM', 'bash', TRUE, 'remove .csv')
    RETURNING task_id INTO v_task_id;

    INSERT INTO timetable.parameter (task_id, order_id, value)
    VALUES (v_task_id, 1, '["-c", "rm *.csv"]'::jsonb);

    RAISE NOTICE 'Step 4 completed. Cleanup task added with ID: %', v_task_id;
END;
$$ LANGUAGE PLPGSQL;
Run tasks in autonomous transaction
This sample demonstrates how to run special tasks outside of the chain transaction context. This is useful for special routines and/or non-transactional operations, e.g. CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE, etc.
-- An advanced example showing how to use autonomous tasks.
-- This one-task chain will execute the test_proc() procedure.
-- Since the procedure makes two commits (one after each call to f())
-- we cannot use it as a regular task, because all regular tasks
-- must be executed in the context of a single chain transaction.
-- The same rule applies to some other SQL commands,
-- e.g. CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE, etc.
-- Helper used by the sample: raises the given text as a NOTICE message.
--   msg: text to put into the notice
CREATE OR REPLACE FUNCTION f (msg TEXT)
RETURNS void
LANGUAGE plpgsql
AS $body$
BEGIN
    RAISE NOTICE '%', msg;
END;
$body$;
-- Sample procedure with its own transaction control:
-- it calls f() twice and issues a COMMIT after each call.
CREATE OR REPLACE PROCEDURE test_proc ()
LANGUAGE plpgsql
AS $body$
BEGIN
    -- first unit of work, committed on its own
    PERFORM f('hey 1');
    COMMIT;

    -- second unit of work, committed on its own
    PERFORM f('hey 2');
    COMMIT;
END;
$body$;
-- Create the chain and its single autonomous task in one statement,
-- then return the ids of both new rows.
WITH cte_chain (v_chain_id) AS (
    -- the chain itself
    INSERT INTO timetable.chain
        (chain_name, run_at, max_instances, live, self_destruct)
    VALUES
        ('call proc() every 10 sec', '@every 10 seconds', 1, TRUE, FALSE)
    RETURNING chain_id
),
cte_task (v_task_id) AS (
    -- the task calling test_proc(); `autonomous` is set to TRUE for it
    INSERT INTO timetable.task
        (chain_id, task_order, kind, command, ignore_error, autonomous)
    SELECT
        c.v_chain_id,
        10,
        'SQL',
        'CALL test_proc()',
        TRUE,
        TRUE
    FROM cte_chain AS c
    RETURNING task_id
)
SELECT
    c.v_chain_id,
    t.v_task_id
FROM cte_task AS t
CROSS JOIN cte_chain AS c;
Shutdown the scheduler and terminate the session
This sample demonstrates how to shut down the scheduler using a special built-in task. This can be used to control maintenance windows, to restart the scheduler for update purposes, or to stop the session before the database is dropped.
-- This one-task chain (aka job) terminates the pg_timetable session
-- by running the built-in 'Shutdown' command.
-- Useful for maintenance purposes or before the database is destroyed.
-- Restarting pg_timetable afterwards, if needed, is up to the operator.
SELECT timetable.add_job(
    job_kind     => 'BUILTIN',
    job_command  => 'Shutdown',
    job_schedule => '* * 1 * *',
    job_name     => 'Shutdown pg_timetable session on schedule'
);
Access previous task result code and output from the next task
This sample demonstrates how to check the result code and output of a previous task. A following task can observe a failed previous task only if ignore_error = true is set for the failed task; otherwise, the scheduler stops the chain. This sample shows how to calculate the number of failed, successful, and total tasks executed. Based on these values, we can calculate the success ratio.
-- Build a chain of 500 randomly failing tasks followed by one reporting task,
-- and return the id of the new chain.
WITH cte_chain (v_chain_id) AS (
    -- the chain that all tasks below are attached to
    INSERT INTO timetable.chain (chain_name, run_at, max_instances, live)
    VALUES ('many tasks', '* * * * *', 1, TRUE)
    RETURNING chain_id
),
cte_tasks (v_task_id) AS (
    -- 500 tasks with task_order 1..500; round(random()) is 0 or 1, so some of
    -- them fail with a division by zero, which ignore_error = true tolerates
    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
    SELECT
        c.v_chain_id,
        step.task_no,
        'SQL',
        'SELECT 1.0 / round(random())::int4;',
        TRUE
    FROM cte_chain AS c
    CROSS JOIN generate_series(1, 500) AS step (task_no)
    RETURNING task_id
),
report_task (v_task_id) AS (
    -- last task (task_order 501): counts succeeded / failed / total rows of
    -- timetable.execution_log for the current chain and transaction and raises
    -- the summary as a NOTICE; the command text is stored exactly as written
    INSERT INTO timetable.task (chain_id, task_order, kind, command)
    SELECT c.v_chain_id, 501, 'SQL', $CMD$DO
$$
DECLARE
s TEXT;
BEGIN
WITH report AS (
SELECT
count(*) FILTER (WHERE returncode = 0) AS success,
count(*) FILTER (WHERE returncode != 0) AS fail,
count(*) AS total
FROM timetable.execution_log
WHERE chain_id = current_setting('pg_timetable.current_chain_id')::bigint
AND txid = txid_current()
)
SELECT 'Tasks executed:' || total ||
'; succeeded: ' || success ||
'; failed: ' || fail ||
'; ratio: ' || 100.0*success/GREATEST(total,1)
INTO s
FROM report;
RAISE NOTICE '%', s;
END;
$$
$CMD$
    FROM cte_chain AS c
    RETURNING task_id
)
SELECT c.v_chain_id
FROM cte_chain AS c;