Python-Handler-Beispiele für gespeicherte Prozeduren¶
Ausführen von gleichzeitigen Aufgaben mit Worker-Prozessen¶
Sie können gleichzeitige Aufgaben mithilfe von Python-Worker-Prozessen ausführen. Dies kann nützlich sein, wenn Sie parallele Aufgaben ausführen müssen, die die Vorteile mehrerer CPU-Kerne auf den Warehouse-Knoten ausnutzen.
Bemerkung
Snowflake empfiehlt, das integrierte Python-Multiprocessing-Modul nicht zu verwenden.
Um Fälle zu umgehen, in denen die Python Global Interpreter Lock verhindert, dass ein Multitasking-Ansatz über alle CPU-Kerne skaliert, können Sie gleichzeitige Aufgaben über separate Worker-Prozesse statt über Threads ausführen.
Sie können dies bei Snowflake-Warehouses tun, indem Sie die Klasse Parallel
der Bibliothek joblib
verwenden, wie in dem folgenden Beispiel.
CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'joblib_multiprocessing'
PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt
def joblib_multiprocessing(session, i):
result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
return str(result)
$$;
Bemerkung
Das für joblib.Parallel
verwendete Standard-Backend unterscheidet zwischen Standard- und Snowpark-optimierten Snowflake-Warehouses.
Standardeinstellung für Standard-Warehouses:
threading
Standardeinstellung für Snowpark-optimierte Warehouses:
loky
(Multiprocessing)
Sie können die Standardeinstellung des Backends außer Kraft setzen, indem Sie die Funktion joblib.parallel_backend
aufrufen, wie im folgenden Beispiel.
import joblib
joblib.parallel_backend('loky')
Verwenden von Snowpark-APIs zur asynchronen Verarbeitung¶
Die folgenden Beispiele zeigen, wie Sie Snowpark-APIs verwenden können, um asynchrone untergeordnete Jobs zu starten, und wie sich diese Jobs unter verschiedenen Bedingungen verhalten.
Prüfen des Status eines asynchronen untergeordneten Jobs¶
Im folgenden Beispiel führt die Prozedur checkStatus
einen asynchronen untergeordneten Job aus, der 60 Sekunden wartet. Die Prozedur prüft dann den Status des Jobs, bevor dieser beendet sein kann, sodass die Prüfung False
zurückgibt.
CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
async_job = session.sql("select system$wait(60)").collect_nowait()
return async_job.is_done()
$$;
Der folgende Code ruft die Prozedur auf.
CALL checkStatus();
+-------------+
| checkStatus |
|-------------|
| False |
+-------------+
Abbruch eines asynchronen untergeordneten Jobs¶
Im folgenden Beispiel verwendet die Prozedur cancelJob
SQL, um Daten mit einem asynchronen untergeordneten Job in die Tabelle test_tb
einzufügen, der 10 Sekunden bis zur Fertigstellung benötigen würde. Der untergeordnete Job wird dann abgebrochen, bevor er beendet ist und die Daten eingefügt wurden.
CREATE OR REPLACE TABLE test_tb(c1 STRING);
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
return async_job.cancel()
$$;
CALL cancelJob();
Der folgende Code fragt die Tabelle test_tb
ab, liefert aber keine Ergebnisse, da keine Daten eingefügt wurden.
SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+
Warten und Blockieren, während ein asynchroner untergeordneter Job ausgeführt wird¶
Im folgenden Beispiel führt die Prozedur blockUntilDone
einen asynchronen untergeordneten Job aus, der 5 Sekunden bis zur Fertigstellung benötigt. Mit der Methode snowflake.snowpark.AsyncJob.result
wartet die Prozedur und kehrt zurück, wenn der Job beendet ist.
CREATE OR REPLACE PROCEDURE blockUntilDone()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
async_job = session.sql("select system$wait(5)").collect_nowait()
return async_job.result()
$$;
Der folgende Code ruft die Prozedur blockUntilDone
auf, die nach 5 Sekunden Wartezeit zurückkehrt.
CALL blockUntilDone();
+------------------------------------------+
| blockUntilDone |
|------------------------------------------|
| [Row(SYSTEM$WAIT(5)='waited 5 seconds')] |
+------------------------------------------+
Zurückgeben eines Fehlers nach Abfrage von Ergebnissen aus einem nicht beendeten asynchronen untergeordneten Job¶
Im folgenden Beispiel führt die Prozedur earlyReturn
einen asynchronen untergeordneten Job aus, der 60 Sekunden bis zur Fertigstellung benötigt. Die Prozedur versucht dann, einen DataFrame
aus dem Ergebnis des Jobs zurückzugeben, bevor dieser beendet sein kann. Das Ergebnis ist ein Fehler.
CREATE OR REPLACE PROCEDURE earlyReturn()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
async_job = session.sql("select system$wait(60)").collect_nowait()
df = async_job.to_df()
try:
df.collect()
except Exception as ex:
return 'Error: (02000): Result for query <UUID> has expired'
$$;
Der folgende Code ruft die Prozedur earlyReturn
auf und gibt den Fehler zurück.
CALL earlyReturn();
+------------------------------------------------------------+
| earlyReturn |
|------------------------------------------------------------|
| Error: (02000): Result for query <UUID> has expired |
+------------------------------------------------------------+
Beenden eines übergeordneten Jobs vor Beendigung eines untergeordneten Jobs, Abbruch des untergeordneten Jobs¶
Im folgenden Beispiel führt die Prozedur earlyCancelJob
einen asynchronen untergeordneten Job zum Einfügen von Daten in eine Tabelle aus und benötigt 10 Sekunden bis zur Fertigstellung. Der übergeordnete Job async_handler
kehrt jedoch zurück, bevor der untergeordnete Job beendet ist, wodurch der untergeordnete Job abgebrochen wird.
CREATE OR REPLACE PROCEDURE earlyCancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
$$;
Der folgende Code ruft die Prozedur earlyCancelJob
auf. Er fragt dann die Tabelle test_tb
ab, die kein Ergebnis liefert, da der abgebrochene untergeordnete Job keine Daten eingefügt hat.
CALL earlyCancelJob();
SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+