Skip to content

Commit 68e8bb7

Browse files
committed
Use advisory lock around load_message.py
Avoid loading two messages at the same time. In particular this can cause issues if it's two copies of the same message on different lists, which can cause a UNIQUE violation in the loader. It could also be a problem if two messages on a new thread arrives in parallel, which could cause two separate threads to be created. This could be made more efficient by properly ordering the operations on storage and using ON CONFLICT, but it's a very rare occassion and it doesn't matter that we have to wait for a second or two for a previous storage to complete.
1 parent a2a9033 commit 68e8bb7

File tree

1 file changed

+11
-1
lines changed

1 file changed

+11
-1
lines changed

loader/load_message.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,19 @@ def log_failed_message(listid, srctype, src, msg, err):
8585
connstr = 'need_connstr'
8686

8787
conn = psycopg2.connect(connstr)
88+
curs = conn.cursor()
89+
90+
# Take an advisory lock to force serialization.
91+
# We could do this "properly" by reordering operations and using ON CONFLICT,
92+
# but concurrency is not that important and this is easier...
93+
try:
94+
curs.execute("SET statement_timeout='30s'")
95+
curs.execute("SELECT pg_advisory_xact_lock(8059944559669076)")
96+
except Exception, e:
97+
print("Failed to wait on advisory lock: %s" % e)
98+
sys.exit(1)
8899

89100
# Get the listid we're working on
90-
curs = conn.cursor()
91101
curs.execute("SELECT listid FROM lists WHERE listname=%(list)s", {
92102
'list': opt.list
93103
})

0 commit comments

Comments
 (0)