ストアドプロシージャの Python ハンドラーの例¶
ワーカープロセスによる並行タスクの実行¶
Pythonワーカープロセスを使用して、並行タスクを実行することができます。これは、ウェアハウスノードの複数の CPU コアを活用した並列タスクを実行する必要がある場合に役立つ可能性があります。
注釈
Snowflakeは、組み込みのPythonのマルチプロセスモジュールは使用しないことを推奨しています。
Pythonグローバルインタープリターロック によって、マルチタスクのアプローチが CPU のすべてのコアに渡ってスケーリングできない場合に対処するために、スレッドではなく、別々のワーカープロセスを使って並行タスクを実行することができます。
Snowflakeウェアハウスでは、次の例のように、 joblib
ライブラリの Parallel
クラスを使用してこれを実行できます。
CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'joblib_multiprocessing'
PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt
def joblib_multiprocessing(session, i):
result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
return str(result)
$$;
注釈
joblib.Parallel
に使用されるデフォルトのバックエンドは、Snowflake標準のウェアハウスとSnowparkに最適されたウェアハウスで異なります。
標準ウェアハウスのデフォルト:
threading
Snowparkに最適化されたウェアハウスのデフォルト:
loky
(マルチプロセス)
次の例のように、 joblib.parallel_backend
関数を呼び出すと、デフォルトのバックエンド設定を上書きできます。
import joblib
joblib.parallel_backend('loky')
Snowpark APIs を使った非同期処理¶
以下の例では、Snowpark APIs を使用して非同期の子ジョブを開始する方法と、さまざまな条件下でのジョブの動作について説明します。
非同期子ジョブのステータスチェック¶
以下の例では、 checkStatus
プロシージャが60秒待機する非同期子ジョブを実行します。このプロシージャは、ジョブが終了する前にそのステータスをチェックするため、 False
を返します。
CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
async_job = session.sql("select system$wait(60)").collect_nowait()
return async_job.is_done()
$$;
次のコードは、このプロシージャを呼び出します。
CALL checkStatus();
+-------------+
| checkStatus |
|-------------|
| False |
+-------------+
非同期の子ジョブのキャンセル¶
以下の例では、 cancelJob
プロシージャは、SQL を使用して test_tb
テーブルにデータを挿入し、終了までに10秒かかる非同期子ジョブを実行します。そして、子ジョブが終了してデータが挿入される前に、子ジョブをキャンセルします。
CREATE OR REPLACE TABLE test_tb(c1 STRING);
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
return async_job.cancel()
$$;
CALL cancelJob();
次のコードは test_tb
テーブルにクエリしますが、データが挿入されていないため結果は返されません。
SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+
非同期子ジョブの実行中の待機とブロック¶
次の例では、 blockUntilDone
プロシージャが終了までに5秒かかる非同期の子ジョブを実行します。 snowflake.snowpark.AsyncJob.result
メソッドを使用すると、プロシージャは待機し、ジョブが終了すると戻ります。
CREATE OR REPLACE PROCEDURE blockUntilDone()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
async_job = session.sql("select system$wait(5)").collect_nowait()
return async_job.result()
$$;
次のコードでは、 blockUntilDone
プロシージャを呼び出し、5秒待って返します。
CALL blockUntilDone();
+------------------------------------------+
| blockUntilDone |
|------------------------------------------|
| [Row(SYSTEM$WAIT(5)='waited 5 seconds')] |
+------------------------------------------+
未完了の非同期子ジョブの結果をリクエストした後にエラーを返す¶
以下の例では、 earlyReturn
プロシージャが、終了までに60秒かかる非同期子ジョブを実行します。このプロシージャは、ジョブが終了する前に、ジョブの結果から DataFrame
を返そうとします。結果はエラーです。
CREATE OR REPLACE PROCEDURE earlyReturn()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
async_job = session.sql("select system$wait(60)").collect_nowait()
df = async_job.to_df()
try:
df.collect()
except Exception as ex:
return 'Error: (02000): Result for query <UUID> has expired'
$$;
次のコードは、 earlyReturn
プロシージャを呼び出し、エラーを返します。
CALL earlyReturn();
+------------------------------------------------------------+
| earlyReturn |
|------------------------------------------------------------|
| Error: (02000): Result for query <UUID> has expired |
+------------------------------------------------------------+
子ジョブが終了する前に親ジョブが終了すると、子ジョブがキャンセルされる¶
以下の例では、 earlyCancelJob
プロシージャがテーブルにデータを挿入する非同期子ジョブを実行し、終了までに10秒かかります。しかし、親ジョブ --- async_handler
--- は子ジョブが終了する前に戻り、子ジョブはキャンセルされます。
CREATE OR REPLACE PROCEDURE earlyCancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
$$;
次のコードは earlyCancelJob
プロシージャを呼び出します。その後、 test_tb
テーブルをクエリしますが、キャンセルされた子ジョブによってデータが挿入されていないため、結果は返されません。
CALL earlyCancelJob();
SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+