Exemples de gestionnaires Python pour les procédures stockées¶
Exécution de tâches simultanées à l’aide de processus de tâches worker¶
Vous pouvez exécuter des tâches simultanées à l’aide de processus de travail Python. Cela peut s’avérer utile lorsque vous devez exécuter des tâches parallèles qui tirent parti de plusieurs cœurs CPU sur les nœuds de l’entrepôt.
Note
Snowflake vous recommande de ne pas utiliser le module de multitraitement intégré à Python.
Pour contourner les cas où le Python Global Interpreter Lock empêche une approche multitâche de s’étendre à tous les cœurs CPU, vous pouvez exécuter des tâches concurrentes à l’aide de processus de travail distincts, plutôt que de threads.
Vous pouvez le faire sur des entrepôts Snowflake en utilisant la classe Parallel
de la bibliothèque joblib
comme dans l’exemple suivant.
CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'joblib_multiprocessing'
PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt
def joblib_multiprocessing(session, i):
result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
return str(result)
$$;
Note
Le backend par défaut utilisé pour joblib.Parallel
diffère entre les entrepôts standards de Snowflake et les entrepôts optimisés pour Snowpark.
Valeur par défaut de l’entrepôt standard :
threading
Valeur par défaut de l’entrepôt optimisé pour Snowpark :
loky
(multitraitement)
Vous pouvez remplacer le paramètre de backend par défaut en appelant la fonction joblib.parallel_backend
comme dans l’exemple suivant.
import joblib
joblib.parallel_backend('loky')
Utilisation d’APIs Snowpark pour le traitement asynchrone¶
Les exemples suivants illustrent la manière dont vous pouvez utiliser des APIs Snowpark pour lancer des tâches enfant asynchrones, ainsi que le comportement de ces tâches dans différentes conditions.
Vérification du statut d’un job enfant asynchrone¶
Dans l’exemple suivant, la procédure checkStatus
exécute une tâche enfant asynchrone qui attend 60 secondes. La procédure vérifie ensuite le statut de la tâche avant qu’elle ne soit terminée, de sorte que la vérification renvoie False
.
CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
async_job = session.sql("select system$wait(60)").collect_nowait()
return async_job.is_done()
$$;
Le code suivant appelle la procédure.
CALL checkStatus();
+-------------+
| checkStatus |
|-------------|
| False |
+-------------+
Annulation d’un job enfant asynchrone¶
Dans l’exemple suivant, la procédure cancelJob
utilise SQL pour insérer des données dans la table test_tb
avec une tâche enfant asynchrone qui prendrait 10 secondes pour se terminer. Elle annule ensuite la tâche enfant avant qu’elle ne soit terminée et que les données n’aient été insérées.
CREATE OR REPLACE TABLE test_tb(c1 STRING);
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
return async_job.cancel()
$$;
CALL cancelJob();
Le code suivant interroge la table test_tb
, mais ne renvoie aucun résultat car aucune donnée n’a été insérée.
SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+
Attente et blocage pendant l’exécution d’un job enfant asynchrone¶
Dans l’exemple suivant, la procédure blockUntilDone
exécute une tâche enfant asynchrone qui prend 5 secondes pour se terminer. En utilisant la méthode snowflake.snowpark.AsyncJob.result
, la procédure attend et revient lorsque la tâche est terminée.
CREATE OR REPLACE PROCEDURE blockUntilDone()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
async_job = session.sql("select system$wait(5)").collect_nowait()
return async_job.result()
$$;
Le code suivant appelle la procédure blockUntilDone
, qui revient après 5 secondes d’attente.
CALL blockUntilDone();
+------------------------------------------+
| blockUntilDone |
|------------------------------------------|
| [Row(SYSTEM$WAIT(5)='waited 5 seconds')] |
+------------------------------------------+
Renvoi d’une erreur après la requête de résultats d’un job enfant asynchrone non terminé¶
Dans l’exemple suivant, la procédure earlyReturn
exécute une tâche enfant asynchrone qui prend 60 secondes pour se terminer. La procédure tente alors de renvoyer un DataFrame
à partir du résultat de la tâche avant qu’elle n’ait pu se terminer. Le résultat est une erreur.
CREATE OR REPLACE PROCEDURE earlyReturn()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
async_job = session.sql("select system$wait(60)").collect_nowait()
df = async_job.to_df()
try:
df.collect()
except Exception as ex:
return 'Error: (02000): Result for query <UUID> has expired'
$$;
Le code suivant appelle la procédure earlyReturn
et renvoie l’erreur.
CALL earlyReturn();
+------------------------------------------------------------+
| earlyReturn |
|------------------------------------------------------------|
| Error: (02000): Result for query <UUID> has expired |
+------------------------------------------------------------+
Fin d’un job parent avant la fin d’un job enfant, annulation du job enfant¶
Dans l’exemple suivant, la procédure earlyCancelJob
exécute une tâche enfant asynchrone pour insérer des données dans une table et prend 10 secondes pour se terminer. Cependant, la tâche parent — async_handler
— revient avant que la tâche enfant ne se termine, ce qui annule la tâche enfant.
CREATE OR REPLACE PROCEDURE earlyCancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
$$;
Le code suivant appelle la procédure earlyCancelJob
. Il interroge ensuite la table test_tb
, qui ne renvoie aucun résultat car aucune donnée n’a été insérée par la tâche enfant annulée.
CALL earlyCancelJob();
SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+