Skip to content

Commit 2b3ac3a

Browse files
authored
Remove parade from code (treeverse#1389)
1 parent 8a11945 commit 2b3ac3a

File tree

13 files changed

+217
-1959
lines changed

13 files changed

+217
-1959
lines changed

api/controller.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import (
3535
"github.com/treeverse/lakefs/graveler"
3636
"github.com/treeverse/lakefs/httputil"
3737
"github.com/treeverse/lakefs/logging"
38-
"github.com/treeverse/lakefs/parade"
3938
"github.com/treeverse/lakefs/permissions"
4039
"github.com/treeverse/lakefs/stats"
4140
"github.com/treeverse/lakefs/upload"
@@ -56,7 +55,6 @@ type Dependencies struct {
5655
Cataloger catalog.Cataloger
5756
Auth auth.Service
5857
BlockAdapter block.Adapter
59-
Parade parade.Parade
6058
MetadataManager auth.MetadataManager
6159
Migrator db.Migrator
6260
Collector stats.Collector
@@ -69,7 +67,6 @@ func (d *Dependencies) WithContext(ctx context.Context) *Dependencies {
6967
Cataloger: d.Cataloger,
7068
Auth: d.Auth,
7169
BlockAdapter: d.BlockAdapter.WithContext(ctx),
72-
Parade: d.Parade,
7370
MetadataManager: d.MetadataManager,
7471
Migrator: d.Migrator,
7572
Collector: d.Collector,

api/serve_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ func setupHandler(t testing.TB, blockstoreType string, opts ...testutil.GetDBOpt
9191
Cataloger: cataloger,
9292
Auth: authService,
9393
BlockAdapter: blockAdapter,
94-
Parade: nil,
9594
MetadataManager: meta,
9695
Migrator: migrator,
9796
Collector: &nullCollector{},

cmd/lakefs/cmd/run.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"github.com/treeverse/lakefs/gateway/simulator"
2929
"github.com/treeverse/lakefs/httputil"
3030
"github.com/treeverse/lakefs/logging"
31-
"github.com/treeverse/lakefs/parade"
3231
"github.com/treeverse/lakefs/stats"
3332
)
3433

@@ -95,8 +94,6 @@ var runCmd = &cobra.Command{
9594
// update health info with installation ID
9695
httputil.SetHealthHandlerInfo(metadata.InstallationID)
9796

98-
// parade
99-
paradeDB := parade.NewParadeDB(dbPool.Pool())
10097
defer func() {
10198
_ = cataloger.Close()
10299
}()
@@ -110,7 +107,6 @@ var runCmd = &cobra.Command{
110107
Cataloger: cataloger,
111108
Auth: authService,
112109
BlockAdapter: blockStore,
113-
Parade: paradeDB,
114110
MetadataManager: authMetadataManager,
115111
Migrator: migrator,
116112
Collector: bufferedCollector,

ddl/000022_drop_parade.down.sql

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
BEGIN;
2+
3+
CREATE EXTENSION IF NOT EXISTS pgcrypto SCHEMA public CASCADE;
4+
5+
CREATE TYPE task_status_code_value AS ENUM (
6+
'pending', -- waiting for an actor to perform it (new or being retried)
7+
'in-progress', -- task is being performed by an actor
8+
'aborted', -- an actor has aborted this task with message, will not be reissued
9+
'completed' -- an actor has completed this task with message, will not reissued
10+
);
11+
12+
CREATE TABLE IF NOT EXISTS tasks (
13+
id VARCHAR NOT NULL PRIMARY KEY, -- nanoid
14+
15+
action VARCHAR NOT NULL, -- name (type) of action to perform
16+
body TEXT, -- data used by action
17+
status TEXT, -- status text defined by action, visible to action
18+
19+
status_code task_status_code_value NOT NULL DEFAULT 'pending', -- internal status code, used by parade to issue tasks
20+
21+
num_tries INTEGER NOT NULL DEFAULT 0, -- number of attempts actors have made on this task
22+
num_failures INTEGER NOT NULL DEFAULT 0, -- number of those attempts that failed
23+
max_tries INTEGER,
24+
25+
total_dependencies INTEGER, -- number of tasks that must signal this task
26+
num_signals INTEGER NOT NULL DEFAULT 0, -- number of tasks that have already signalled this task
27+
28+
actor_id VARCHAR, -- ID of performing actor if in-progress
29+
action_deadline TIMESTAMPTZ, -- offer this task to other actors once action_deadline has elapsed
30+
performance_token UUID,
31+
32+
-- BUG(ariels): add REFERENCES dependency to each of the to_signal_after
33+
-- tasks. Or at least add triggers that perform ON DELETE
34+
-- CASCADE.
35+
to_signal_after VARCHAR ARRAY, -- IDs to signal after performing this task
36+
notify_channel_after VARCHAR -- (if non-NULL) name of a channel to NOTIFY when this task ends
37+
);
38+
39+
-- Returns true if task with this id, code and deadline can
40+
-- be allocated.
41+
CREATE OR REPLACE FUNCTION can_allocate_task(id VARCHAR, code task_status_code_value, deadline TIMESTAMPTZ, num_signals INTEGER, total_dependencies INTEGER)
42+
RETURNS BOOLEAN
43+
LANGUAGE sql IMMUTABLE AS $$
44+
SELECT (code = 'pending' OR (code = 'in-progress' AND deadline < NOW())) AND
45+
(total_dependencies IS NULL OR num_signals = total_dependencies)
46+
$$;
47+
48+
-- Marks up to `max_tasks' on one of `actions' as in-progress and
49+
-- belonging to `actor_id' and returns their ids and a "performance
50+
-- token". Both must be returned to complete the task successfully.
51+
CREATE OR REPLACE FUNCTION own_tasks(
52+
max_tasks INTEGER, actions VARCHAR ARRAY, owner_id VARCHAR, max_duration INTERVAL
53+
)
54+
RETURNS TABLE(task_id VARCHAR, token UUID, num_failures INTEGER, action VARCHAR, body TEXT)
55+
LANGUAGE sql VOLATILE AS $$
56+
UPDATE tasks
57+
SET actor_id = owner_id,
58+
status_code = 'in-progress',
59+
num_tries = num_tries + 1,
60+
performance_token = public.gen_random_uuid(),
61+
action_deadline = NOW() + max_duration -- NULL if max_duration IS NULL
62+
WHERE id IN (
63+
SELECT id
64+
FROM tasks
65+
WHERE can_allocate_task(id, status_code, action_deadline, num_signals, total_dependencies) AND
66+
action = ANY(actions) AND
67+
(max_tries IS NULL OR num_tries < max_tries)
68+
-- maybe: AND not_before <= NOW()
69+
-- maybe: ORDER BY priority (eventually)
70+
ORDER BY random()
71+
FOR UPDATE SKIP LOCKED
72+
LIMIT max_tasks)
73+
RETURNING id, performance_token, num_failures, action, body
74+
$$;
75+
76+
-- Extends ownership of task id by an extra max_duration, if it is still locked with performance
77+
-- token.
78+
CREATE OR REPLACE FUNCTION extend_task_deadline(
79+
task_id VARCHAR, token UUID, max_duration INTERVAL
80+
) RETURNS BOOLEAN
81+
LANGUAGE sql VOLATILE AS $$
82+
UPDATE tasks
83+
SET action_deadline = NOW() + max_duration
84+
WHERE id = task_id AND performance_token = token
85+
RETURNING true;
86+
$$;
87+
88+
CREATE OR REPLACE FUNCTION no_more_tries(r tasks)
89+
RETURNS BOOLEAN LANGUAGE sql IMMUTABLE AS $$
90+
SELECT $1.num_tries >= $1.max_tries
91+
$$;
92+
93+
-- Returns an owned task id that was locked with token. It is an error
94+
-- to return a task with the wrong token; that can happen if the
95+
-- deadline expired and the task was given to another actor.
96+
CREATE OR REPLACE FUNCTION return_task(
97+
task_id VARCHAR, token UUID, result_status TEXT, result_status_code task_status_code_value
98+
) RETURNS INTEGER
99+
LANGUAGE plpgsql AS $$
100+
DECLARE
101+
num_updated INTEGER;
102+
channel VARCHAR;
103+
to_signal VARCHAR ARRAY;
104+
105+
106+
107+
BEGIN
108+
CASE result_status_code
109+
WHEN 'aborted', 'completed' THEN
110+
UPDATE tasks INTO channel, to_signal
111+
SET status = result_status,
112+
status_code = result_status_code,
113+
actor_id = NULL,
114+
performance_token = NULL
115+
WHERE id = task_id AND performance_token = token
116+
RETURNING notify_channel_after, to_signal_after;
117+
WHEN 'pending' THEN
118+
UPDATE tasks INTO channel, to_signal
119+
SET status = result_status,
120+
status_code = (CASE WHEN no_more_tries(tasks) THEN 'aborted' ELSE 'pending' END)::task_status_code_value,
121+
actor_id = NULL,
122+
performance_token = NULL
123+
WHERE id = task_id AND performance_token = token
124+
RETURNING (CASE WHEN no_more_tries(tasks) THEN notify_channel_after ELSE NULL END),
125+
(CASE WHEN no_more_tries(tasks) THEN to_signal_after ELSE NULL END);
126+
ELSE
127+
RAISE EXCEPTION 'cannot return task to status %', result_status;
128+
END CASE;
129+
130+
GET DIAGNOSTICS num_updated := ROW_COUNT;
131+
132+
UPDATE tasks
133+
SET num_signals = num_signals+1,
134+
num_failures = num_failures + CASE WHEN result_status_code = 'aborted'::task_status_code_value THEN 1 ELSE 0 END
135+
WHERE id = ANY(to_signal);
136+
137+
IF channel IS NOT NULL THEN
138+
PERFORM pg_notify(channel, NULL);
139+
END IF;
140+
141+
RETURN num_updated;
142+
END;
143+
$$;
144+
145+
-- (Utility for delete_task function: remove all dependencies from task ID, returning ids of any
146+
-- tasks with no remaining dependencies.)
147+
CREATE OR REPLACE FUNCTION remove_task_dependencies(task_id VARCHAR)
148+
RETURNS SETOF VARCHAR
149+
LANGUAGE sql VOLATILE AS $$
150+
WITH updates AS (
151+
SELECT UNNEST(to_signal_after) effect_id,
152+
(CASE WHEN status_code IN ('aborted', 'completed') THEN 0 ELSE 1 END) delta
153+
FROM tasks
154+
WHERE id=task_id),
155+
signalled_ids AS (
156+
UPDATE tasks
157+
SET total_dependencies = total_dependencies-updates.delta
158+
FROM updates
159+
WHERE id=updates.effect_id
160+
RETURNING (CASE WHEN total_dependencies = num_signals THEN tasks.id ELSE NULL END) id
161+
)
162+
SELECT id FROM signalled_ids WHERE id IS NOT NULL;
163+
$$;
164+
165+
CREATE TYPE tasks_recurse_value AS ENUM ('new', 'in-progress', 'done');
166+
167+
-- Deletes taskIds from column id of task_id_name (with columns id (an ID) and mark (a
168+
-- recurse_value), presumably a temporary table) and empties it, decrements each of its
169+
-- dependent tasks, and deletes that task (effectively recursively) if it has no further
170+
-- dependencies. Uses table tasks for storage of to-be-deleted tasks during the operation.
171+
-- Returns the total number of tasks deleted. No abort marking is performed -- make sure to
172+
-- abort the task first!
173+
CREATE OR REPLACE FUNCTION delete_tasks(task_id_name TEXT) RETURNS VOID LANGUAGE plpgsql AS $$
174+
DECLARE
175+
total_num_updated INTEGER;
176+
num_updated INTEGER;
177+
row_count INTEGER;
178+
BEGIN
179+
LOOP
180+
EXECUTE format($Q$
181+
UPDATE %1$I SET mark='in-progress' WHERE mark='new'
182+
$Q$, task_id_name);
183+
EXECUTE format($Q$
184+
WITH new_to_delete AS (
185+
SELECT remove_task_dependencies(id) id FROM %1$I WHERE mark='in-progress'
186+
)
187+
INSERT INTO %1$I (SELECT id, 'new' mark FROM new_to_delete)
188+
$Q$, task_id_name);
189+
GET DIAGNOSTICS row_count = ROW_COUNT;
190+
EXIT WHEN row_count=0;
191+
EXECUTE format($Q$
192+
UPDATE %1$I SET mark='done' WHERE mark='in-progress'
193+
$Q$, task_id_name);
194+
END LOOP;
195+
EXECUTE format($Q$
196+
DELETE FROM tasks WHERE id IN (SELECT id FROM %1$I)
197+
$Q$, task_id_name);
198+
END;
199+
$$;
200+
201+
END;

ddl/000022_drop_parade.up.sql

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
BEGIN;
2+
3+
DROP FUNCTION IF EXISTS delete_tasks;
4+
DROP TYPE IF EXISTS tasks_recurse_value;
5+
DROP FUNCTION IF EXISTS remove_task_dependencies;
6+
DROP FUNCTION IF EXISTS return_task;
7+
DROP FUNCTION IF EXISTS no_more_tries;
8+
DROP FUNCTION IF EXISTS extend_task_deadline;
9+
DROP FUNCTION IF EXISTS own_tasks;
10+
DROP FUNCTION IF EXISTS can_allocate_task;
11+
DROP TABLE IF EXISTS tasks;
12+
DROP TYPE IF EXISTS task_status_code_value;
13+
14+
END;

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,14 @@ require (
4545
github.com/jackc/pgconn v1.8.0
4646
github.com/jackc/pgerrcode v0.0.0-20201024163028-a0d42d470451
4747
github.com/jackc/pgproto3/v2 v2.0.6
48-
github.com/jackc/pgtype v1.6.2
48+
github.com/jackc/pgtype v1.6.2 // indirect
4949
github.com/jackc/pgx/v4 v4.10.1
5050
github.com/jamiealquiza/tachymeter v2.0.0+incompatible
5151
github.com/jedib0t/go-pretty v4.3.0+incompatible
5252
github.com/jessevdk/go-flags v1.4.0
5353
github.com/johannesboyne/gofakes3 v0.0.0-20200716060623-6b2b4cb092cc
5454
github.com/kr/pretty v0.2.1 // indirect
55-
github.com/lib/pq v1.9.0
55+
github.com/lib/pq v1.9.0 // indirect
5656
github.com/lunixbochs/vtclean v1.0.0 // indirect
5757
github.com/manifoldco/promptui v0.8.0
5858
github.com/matoous/go-nanoid v1.5.0

loadtest/local_load_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ func TestLocalLoad(t *testing.T) {
7373
Cataloger: cataloger,
7474
Auth: authService,
7575
BlockAdapter: blockAdapter,
76-
Parade: nil,
7776
MetadataManager: meta,
7877
Migrator: migrator,
7978
Collector: &nullCollector{},

0 commit comments

Comments
 (0)