Python-Handler-Beispiele für gespeicherte Prozeduren

Ausführen von gleichzeitigen Aufgaben mit Worker-Prozessen

Sie können gleichzeitige Aufgaben mithilfe von Python-Worker-Prozessen ausführen. Dies kann nützlich sein, wenn Sie parallele Aufgaben ausführen müssen, die die Vorteile mehrerer CPU-Kerne auf den Warehouse-Knoten ausnutzen.

Bemerkung

Snowflake empfiehlt, das integrierte Python-Multiprocessing-Modul nicht zu verwenden.

Um Fälle zu umgehen, in denen die Python Global Interpreter Lock verhindert, dass ein Multitasking-Ansatz über alle CPU-Kerne skaliert, können Sie gleichzeitige Aufgaben über separate Worker-Prozesse statt über Threads ausführen.

Sie können dies bei Snowflake-Warehouses tun, indem Sie die Klasse Parallel der Bibliothek joblib verwenden, wie in dem folgenden Beispiel.

CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  HANDLER = 'joblib_multiprocessing'
  PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt

def joblib_multiprocessing(session, i):
  result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
  return str(result)
$$;
Copy

Bemerkung

Das für joblib.Parallel verwendete Standard-Backend unterscheidet zwischen Standard- und Snowpark-optimierten Snowflake-Warehouses.

  • Standardeinstellung für Standard-Warehouses: threading

  • Standardeinstellung für Snowpark-optimierte Warehouses: loky (Multiprocessing)

Sie können die Standardeinstellung des Backends außer Kraft setzen, indem Sie die Funktion joblib.parallel_backend aufrufen, wie im folgenden Beispiel.

import joblib
joblib.parallel_backend('loky')
Copy

Verwenden von Snowpark-APIs zur asynchronen Verarbeitung

Die folgenden Beispiele zeigen, wie Sie Snowpark-APIs verwenden können, um asynchrone untergeordnete Jobs zu starten, und wie sich diese Jobs unter verschiedenen Bedingungen verhalten.

Prüfen des Status eines asynchronen untergeordneten Jobs

Im folgenden Beispiel führt die Prozedur checkStatus einen asynchronen untergeordneten Job aus, der 60 Sekunden wartet. Die Prozedur prüft dann den Status des Jobs, bevor dieser beendet sein kann, sodass die Prüfung False zurückgibt.

CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    return async_job.is_done()
$$;
Copy

Der folgende Code ruft die Prozedur auf.

CALL checkStatus();
Copy
+-------------+
| checkStatus |
|-------------|
| False       |
+-------------+

Abbruch eines asynchronen untergeordneten Jobs

Im folgenden Beispiel verwendet die Prozedur cancelJob SQL, um Daten mit einem asynchronen untergeordneten Job in die Tabelle test_tb einzufügen, der 10 Sekunden bis zur Fertigstellung benötigen würde. Der untergeordnete Job wird dann abgebrochen, bevor er beendet ist und die Daten eingefügt wurden.

CREATE OR REPLACE TABLE test_tb(c1 STRING);
Copy
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
    return async_job.cancel()
$$;

CALL cancelJob();
Copy

Der folgende Code fragt die Tabelle test_tb ab, liefert aber keine Ergebnisse, da keine Daten eingefügt wurden.

SELECT * FROM test_tb;
Copy
+----+
| C1 |
|----|
+----+

Warten und Blockieren, während ein asynchroner untergeordneter Job ausgeführt wird

Im folgenden Beispiel führt die Prozedur blockUntilDone einen asynchronen untergeordneten Job aus, der 5 Sekunden bis zur Fertigstellung benötigt. Mit der Methode snowflake.snowpark.AsyncJob.result wartet die Prozedur und kehrt zurück, wenn der Job beendet ist.

CREATE OR REPLACE PROCEDURE blockUntilDone()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(5)").collect_nowait()
    return async_job.result()
$$;
Copy

Der folgende Code ruft die Prozedur blockUntilDone auf, die nach 5 Sekunden Wartezeit zurückkehrt.

CALL blockUntilDone();
Copy
+------------------------------------------+
| blockUntilDone                               |
|------------------------------------------|
| [Row(SYSTEM$WAIT(5)='waited 5 seconds')] |
+------------------------------------------+

Zurückgeben eines Fehlers nach Abfrage von Ergebnissen aus einem nicht beendeten asynchronen untergeordneten Job

Im folgenden Beispiel führt die Prozedur earlyReturn einen asynchronen untergeordneten Job aus, der 60 Sekunden bis zur Fertigstellung benötigt. Die Prozedur versucht dann, einen DataFrame aus dem Ergebnis des Jobs zurückzugeben, bevor dieser beendet sein kann. Das Ergebnis ist ein Fehler.

CREATE OR REPLACE PROCEDURE earlyReturn()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    df = async_job.to_df()
    try:
        df.collect()
    except Exception as ex:
        return 'Error: (02000): Result for query <UUID> has expired'
$$;
Copy

Der folgende Code ruft die Prozedur earlyReturn auf und gibt den Fehler zurück.

CALL earlyReturn();
Copy
+------------------------------------------------------------+
| earlyReturn                                                 |
|------------------------------------------------------------|
| Error: (02000): Result for query <UUID> has expired        |
+------------------------------------------------------------+

Beenden eines übergeordneten Jobs vor Beendigung eines untergeordneten Jobs, Abbruch des untergeordneten Jobs

Im folgenden Beispiel führt die Prozedur earlyCancelJob einen asynchronen untergeordneten Job zum Einfügen von Daten in eine Tabelle aus und benötigt 10 Sekunden bis zur Fertigstellung. Der übergeordnete Job async_handler kehrt jedoch zurück, bevor der untergeordnete Job beendet ist, wodurch der untergeordnete Job abgebrochen wird.

CREATE OR REPLACE PROCEDURE earlyCancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
$$;
Copy

Der folgende Code ruft die Prozedur earlyCancelJob auf. Er fragt dann die Tabelle test_tb ab, die kein Ergebnis liefert, da der abgebrochene untergeordnete Job keine Daten eingefügt hat.

CALL earlyCancelJob();
SELECT * FROM test_tb;
Copy
+----+
| C1 |
|----|
+----+