Exemples de gestionnaires Python pour les procédures stockées

Exécution de tâches simultanées à l’aide de processus de tâches worker

Vous pouvez exécuter des tâches simultanées à l’aide de processus de travail Python. Cela peut s’avérer utile lorsque vous devez exécuter des tâches parallèles qui tirent parti de plusieurs cœurs CPU sur les nœuds de l’entrepôt.

Note

Snowflake vous recommande de ne pas utiliser le module de multitraitement intégré à Python.

Pour contourner les cas où le Python Global Interpreter Lock empêche une approche multitâche de s’étendre à tous les cœurs CPU, vous pouvez exécuter des tâches concurrentes à l’aide de processus de travail distincts, plutôt que de threads.

Vous pouvez le faire sur des entrepôts Snowflake en utilisant la classe Parallel de la bibliothèque joblib comme dans l’exemple suivant.

CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  HANDLER = 'joblib_multiprocessing'
  PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt

def joblib_multiprocessing(session, i):
  result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
  return str(result)
$$;
Copy

Note

Le backend par défaut utilisé pour joblib.Parallel diffère entre les entrepôts standards de Snowflake et les entrepôts optimisés pour Snowpark.

  • Valeur par défaut de l’entrepôt standard : threading

  • Valeur par défaut de l’entrepôt optimisé pour Snowpark : loky (multitraitement)

Vous pouvez remplacer le paramètre de backend par défaut en appelant la fonction joblib.parallel_backend comme dans l’exemple suivant.

import joblib
joblib.parallel_backend('loky')
Copy

Utilisation d’APIs Snowpark pour le traitement asynchrone

Les exemples suivants illustrent la manière dont vous pouvez utiliser des APIs Snowpark pour lancer des tâches enfant asynchrones, ainsi que le comportement de ces tâches dans différentes conditions.

Vérification du statut d’un job enfant asynchrone

Dans l’exemple suivant, la procédure checkStatus exécute une tâche enfant asynchrone qui attend 60 secondes. La procédure vérifie ensuite le statut de la tâche avant qu’elle ne soit terminée, de sorte que la vérification renvoie False.

CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    return async_job.is_done()
$$;
Copy

Le code suivant appelle la procédure.

CALL checkStatus();
Copy
+-------------+
| checkStatus |
|-------------|
| False       |
+-------------+

Annulation d’un job enfant asynchrone

Dans l’exemple suivant, la procédure cancelJob utilise SQL pour insérer des données dans la table test_tb avec une tâche enfant asynchrone qui prendrait 10 secondes pour se terminer. Elle annule ensuite la tâche enfant avant qu’elle ne soit terminée et que les données n’aient été insérées.

CREATE OR REPLACE TABLE test_tb(c1 STRING);
Copy
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
    return async_job.cancel()
$$;

CALL cancelJob();
Copy

Le code suivant interroge la table test_tb, mais ne renvoie aucun résultat car aucune donnée n’a été insérée.

SELECT * FROM test_tb;
Copy
+----+
| C1 |
|----|
+----+

Attente et blocage pendant l’exécution d’un job enfant asynchrone

Dans l’exemple suivant, la procédure blockUntilDone exécute une tâche enfant asynchrone qui prend 5 secondes pour se terminer. En utilisant la méthode snowflake.snowpark.AsyncJob.result, la procédure attend et revient lorsque la tâche est terminée.

CREATE OR REPLACE PROCEDURE blockUntilDone()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(5)").collect_nowait()
    return async_job.result()
$$;
Copy

Le code suivant appelle la procédure blockUntilDone, qui revient après 5 secondes d’attente.

CALL blockUntilDone();
Copy
+------------------------------------------+
| blockUntilDone                               |
|------------------------------------------|
| [Row(SYSTEM$WAIT(5)='waited 5 seconds')] |
+------------------------------------------+

Renvoi d’une erreur après la requête de résultats d’un job enfant asynchrone non terminé

Dans l’exemple suivant, la procédure earlyReturn exécute une tâche enfant asynchrone qui prend 60 secondes pour se terminer. La procédure tente alors de renvoyer un DataFrame à partir du résultat de la tâche avant qu’elle n’ait pu se terminer. Le résultat est une erreur.

CREATE OR REPLACE PROCEDURE earlyReturn()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    df = async_job.to_df()
    try:
        df.collect()
    except Exception as ex:
        return 'Error: (02000): Result for query <UUID> has expired'
$$;
Copy

Le code suivant appelle la procédure earlyReturn et renvoie l’erreur.

CALL earlyReturn();
Copy
+------------------------------------------------------------+
| earlyReturn                                                 |
|------------------------------------------------------------|
| Error: (02000): Result for query <UUID> has expired        |
+------------------------------------------------------------+

Fin d’un job parent avant la fin d’un job enfant, annulation du job enfant

Dans l’exemple suivant, la procédure earlyCancelJob exécute une tâche enfant asynchrone pour insérer des données dans une table et prend 10 secondes pour se terminer. Cependant, la tâche parent — async_handler — revient avant que la tâche enfant ne se termine, ce qui annule la tâche enfant.

CREATE OR REPLACE PROCEDURE earlyCancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
$$;
Copy

Le code suivant appelle la procédure earlyCancelJob. Il interroge ensuite la table test_tb, qui ne renvoie aucun résultat car aucune donnée n’a été insérée par la tâche enfant annulée.

CALL earlyCancelJob();
SELECT * FROM test_tb;
Copy
+----+
| C1 |
|----|
+----+