0% found this document useful (0 votes)
6 views

functions

The document defines two PostgreSQL functions for managing data warehouse operations. The first function, 'dwh_dynamic_insert_update_delete', handles inserting, updating, and deleting records between source and target tables while logging the changes. The second function, 'process_job_queue', processes jobs from a queue, executing the first function for each pending job and updating their status accordingly.

Uploaded by

rafreenimam
Copyright
© All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as TXT or PDF, or read online on Scribd
0% found this document useful (0 votes)
6 views

functions

The document defines two PostgreSQL functions for managing data warehouse operations. The first function, 'dwh_dynamic_insert_update_delete', handles inserting, updating, and deleting records between source and target tables while logging the changes. The second function, 'process_job_queue', processes jobs from a queue, executing the first function for each pending job and updating their status accordingly.

Uploaded by

rafreenimam
Copyright
© All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as TXT or PDF, or read online on Scribd
You are on page 1/ 3

-- DROP FUNCTION datawarehouse.dwh_dynamic_insert_update_delete(text, text, text, text, text, text);

-- Synchronises a versioned warehouse table with its source (staging) table
-- and logs how many rows were inserted, re-versioned and flagged as deleted.
--
-- Parameters:
--   source_schema / source_table : table holding the current source snapshot.
--   target_schema / target_table : warehouse table that receives the versions.
--   unique_key                   : name of the business-key column, present in
--                                  both tables.
--   hash_column                  : name of the row-hash column, present in both
--                                  tables, used to detect changed rows.
--
-- The target table is written as "all source columns" followed by
-- wa_iscurr, wa_isdel, wk_root, load_date (in that order), because the
-- INSERTs below use st.* without an explicit column list.
-- NOTE(review): this relies on the column order of both tables lining up;
-- an explicit column list cannot be built here without knowing the schema.
--
-- All identifiers are injected with format()'s %I so they are quoted safely.
-- Positional arguments used in every statement:
--   %1$I.%2$I = target table, %3$I.%4$I = source table,
--   %5$I = unique key column, %6$I = hash column.
CREATE OR REPLACE FUNCTION datawarehouse.dwh_dynamic_insert_update_delete(
    source_schema text,
    source_table text,
    target_schema text,
    target_table text,
    unique_key text,
    hash_column text
)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    inserted_count integer;
    updated_count integer;
    deleted_count integer;
BEGIN
    -- Step 1: insert source rows whose key has never been loaded into the
    -- target (no version of that key exists at all).
    EXECUTE format('
        INSERT INTO %1$I.%2$I
        SELECT
            st.*,
            ''Y'' AS wa_iscurr,
            ''N'' AS wa_isdel,
            nextval(''datawarehouse.wk_root'')::bigint AS wk_root,
            current_timestamp AS load_date
        FROM
            %3$I.%4$I AS st
        WHERE
            NOT EXISTS (
                SELECT 1
                FROM %1$I.%2$I AS dw
                WHERE dw.%5$I = st.%5$I
            );
    ', target_schema, target_table, source_schema, source_table, unique_key);

    GET DIAGNOSTICS inserted_count = ROW_COUNT;

    -- Step 2: close the current version of every key whose hash differs from
    -- the source. IS DISTINCT FROM is used so a NULL hash on either side is
    -- treated as a change instead of being silently skipped by "<>".
    EXECUTE format('
        UPDATE
            %1$I.%2$I AS dw
        SET
            wa_iscurr = ''N''
        FROM
            %3$I.%4$I AS st
        WHERE
            dw.%5$I = st.%5$I
            AND dw.wa_iscurr = ''Y''
            AND dw.%6$I IS DISTINCT FROM st.%6$I;
    ', target_schema, target_table, source_schema, source_table, unique_key,
       hash_column);

    -- Step 3: insert a new current version for every source key that now has
    -- no current version in the target. After steps 1 and 2 these are exactly
    -- the keys whose version was just closed, plus keys that were flagged as
    -- deleted earlier and have reappeared in the source.
    -- The previous implementation joined against ALL historical versions and
    -- compared a literal column named "hash_column", which either failed or
    -- inserted one duplicate per differing historical version on every run.
    -- NULL keys are excluded because step 1 already inserted them.
    EXECUTE format('
        INSERT INTO %1$I.%2$I
        SELECT
            st.*,
            ''Y'' AS wa_iscurr,
            ''N'' AS wa_isdel,
            nextval(''datawarehouse.wk_root'')::bigint AS wk_root,
            current_timestamp AS load_date
        FROM
            %3$I.%4$I AS st
        WHERE
            st.%5$I IS NOT NULL
            AND NOT EXISTS (
                SELECT 1
                FROM %1$I.%2$I AS dw
                WHERE dw.%5$I = st.%5$I
                  AND dw.wa_iscurr = ''Y''
            );
    ', target_schema, target_table, source_schema, source_table, unique_key);

    GET DIAGNOSTICS updated_count = ROW_COUNT;

    -- Step 4: flag current target rows whose key no longer exists in the
    -- source as deleted. The previous implementation compared the target
    -- table with itself, so rows removed from the source were never detected.
    -- Restricting to wa_iscurr = 'Y' keeps deleted_count equal to the number
    -- of rows newly flagged in this run.
    -- NOTE(review): an empty source table flags every current row as deleted;
    -- callers must make sure the source load succeeded before calling this.
    EXECUTE format('
        UPDATE
            %1$I.%2$I AS dw
        SET
            wa_isdel = ''Y'',
            wa_iscurr = ''N''
        WHERE
            dw.wa_iscurr = ''Y''
            AND NOT EXISTS (
                SELECT 1
                FROM %3$I.%4$I AS st
                WHERE st.%5$I = dw.%5$I
            );
    ', target_schema, target_table, source_schema, source_table, unique_key);

    GET DIAGNOSTICS deleted_count = ROW_COUNT;

    -- Record the row counts of this run in the log table (log_gegevens) so
    -- the amount of processed records can always be checked afterwards.
    INSERT INTO datawarehouse.log_gegevens (
        table_name,
        inserted_count,
        updated_count,
        deleted_count,
        load_time
    )
    VALUES (
        target_table,
        inserted_count,
        updated_count,
        deleted_count,
        current_timestamp
    );
END;
$function$
;

-- DROP FUNCTION datawarehouse.process_job_queue();

-- Processes the job queue table job_queue_dwh: every job with status
-- 'Pending' is taken in id order, handed to
-- datawarehouse.dwh_dynamic_insert_update_delete_2 and then marked
-- 'Completed'. Takes no parameters and returns nothing.
--
-- NOTE(review): job_queue_dwh is referenced without a schema, so it is
-- resolved through the caller's search_path.
-- NOTE(review): dwh_dynamic_insert_update_delete_2 (8 arguments, including
-- unique_key2 and unique_key3) is not defined in this file; it is assumed to
-- exist in the datawarehouse schema.
CREATE OR REPLACE FUNCTION datawarehouse.process_job_queue()
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    remaining_jobs INTEGER;
    curr_job RECORD;
BEGIN
    -- Prepare the next run: when no 'dwh_wender%' job is left unfinished
    -- (i.e. all of them are 'Completed'), put the jobs back to 'Pending'.
    -- The previous check counted the jobs that WERE 'Completed' and reset
    -- when that count was 0, which is the inverse of the intent: after one
    -- full run the queue was never re-armed again.
    -- IS DISTINCT FROM also counts a NULL status as "not completed".
    SELECT COUNT(*)
    INTO remaining_jobs
    FROM job_queue_dwh
    WHERE status IS DISTINCT FROM 'Completed'
      AND target_table LIKE 'dwh_wender%';

    IF remaining_jobs = 0 THEN
        -- Whole-table update, as in the original implementation.
        -- NOTE(review): the count above is limited to 'dwh_wender%' targets
        -- while this reset is not; confirm that is intended.
        UPDATE job_queue_dwh SET status = 'Pending';
    END IF;

    LOOP
        -- Take the next pending job; SKIP LOCKED lets concurrent workers
        -- pass over rows that another transaction has already locked.
        SELECT *
        INTO curr_job
        FROM job_queue_dwh
        WHERE status = 'Pending'
        ORDER BY id
        LIMIT 1
        FOR UPDATE SKIP LOCKED;

        -- Leave the loop when no more 'Pending' jobs are found.
        EXIT WHEN NOT FOUND;

        -- Run the load for this job using the job's own settings.
        PERFORM datawarehouse.dwh_dynamic_insert_update_delete_2(
            curr_job.source_schema,
            curr_job.source_table,
            curr_job.target_schema,
            curr_job.target_table,
            curr_job.unique_key,
            curr_job.hash_column,
            curr_job.unique_key2,
            curr_job.unique_key3
        );

        -- Mark the job as done so the next iteration picks the following one.
        UPDATE job_queue_dwh SET status = 'Completed' WHERE id = curr_job.id;
    END LOOP;
END;
$function$
;

You might also like