ストアドプロシージャの Python ハンドラーの例

ワーカープロセスによる並行タスクの実行

Pythonワーカープロセスを使用して、並行タスクを実行することができます。これは、ウェアハウスノードの複数の CPU コアを活用した並列タスクを実行する必要がある場合に役立つ可能性があります。

注釈

Snowflakeは、組み込みのPythonのマルチプロセスモジュールは使用しないことを推奨しています。

Pythonグローバルインタープリターロック によって、マルチタスクのアプローチが CPU のすべてのコアに渡ってスケーリングできない場合に対処するために、スレッドではなく、別々のワーカープロセスを使って並行タスクを実行することができます。

Snowflakeウェアハウスでは、次の例のように、 joblib ライブラリの Parallel クラスを使用してこれを実行できます。

CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  HANDLER = 'joblib_multiprocessing'
  PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt

def joblib_multiprocessing(session, i):
  result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
  return str(result)
$$;
Copy

注釈

joblib.Parallel に使用されるデフォルトのバックエンドは、Snowflake標準のウェアハウスとSnowparkに最適されたウェアハウスで異なります。

  • 標準ウェアハウスのデフォルト: threading

  • Snowparkに最適化されたウェアハウスのデフォルト: loky (マルチプロセス)

次の例のように、 joblib.parallel_backend 関数を呼び出すと、デフォルトのバックエンド設定を上書きできます。

import joblib
joblib.parallel_backend('loky')
Copy

Snowpark APIs を使った非同期処理

以下の例では、Snowpark APIs を使用して非同期の子ジョブを開始する方法と、さまざまな条件下でのジョブの動作について説明します。

非同期子ジョブのステータスチェック

以下の例では、 checkStatus プロシージャが60秒待機する非同期子ジョブを実行します。このプロシージャは、ジョブが終了する前にそのステータスをチェックするため、 False を返します。

CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    return async_job.is_done()
$$;
Copy

次のコードは、このプロシージャを呼び出します。

CALL checkStatus();
Copy
+-------------+
| checkStatus |
|-------------|
| False       |
+-------------+

非同期の子ジョブのキャンセル

以下の例では、 cancelJob プロシージャは、SQL を使用して test_tb テーブルにデータを挿入し、終了までに10秒かかる非同期子ジョブを実行します。そして、子ジョブが終了してデータが挿入される前に、子ジョブをキャンセルします。

CREATE OR REPLACE TABLE test_tb(c1 STRING);
Copy
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
    return async_job.cancel()
$$;

CALL cancelJob();
Copy

次のコードは test_tb テーブルにクエリしますが、データが挿入されていないため結果は返されません。

SELECT * FROM test_tb;
Copy
+----+
| C1 |
|----|
+----+

非同期子ジョブの実行中の待機とブロック

次の例では、 blockUntilDone プロシージャが終了までに5秒かかる非同期の子ジョブを実行します。 snowflake.snowpark.AsyncJob.result メソッドを使用すると、プロシージャは待機し、ジョブが終了すると戻ります。

CREATE OR REPLACE PROCEDURE blockUntilDone()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(5)").collect_nowait()
    return async_job.result()
$$;
Copy

次のコードでは、 blockUntilDone プロシージャを呼び出し、5秒待って返します。

CALL blockUntilDone();
Copy
+------------------------------------------+
| blockUntilDone                               |
|------------------------------------------|
| [Row(SYSTEM$WAIT(5)='waited 5 seconds')] |
+------------------------------------------+

未完了の非同期子ジョブの結果をリクエストした後にエラーを返す

以下の例では、 earlyReturn プロシージャが、終了までに60秒かかる非同期子ジョブを実行します。このプロシージャは、ジョブが終了する前に、ジョブの結果から DataFrame を返そうとします。結果はエラーです。

CREATE OR REPLACE PROCEDURE earlyReturn()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    df = async_job.to_df()
    try:
        df.collect()
    except Exception as ex:
        return 'Error: (02000): Result for query <UUID> has expired'
$$;
Copy

次のコードは、 earlyReturn プロシージャを呼び出し、エラーを返します。

CALL earlyReturn();
Copy
+------------------------------------------------------------+
| earlyReturn                                                 |
|------------------------------------------------------------|
| Error: (02000): Result for query <UUID> has expired        |
+------------------------------------------------------------+

子ジョブが終了する前に親ジョブが終了すると、子ジョブがキャンセルされる

以下の例では、 earlyCancelJob プロシージャがテーブルにデータを挿入する非同期子ジョブを実行し、終了までに10秒かかります。しかし、親ジョブ --- async_handler --- は子ジョブが終了する前に戻り、子ジョブはキャンセルされます。

CREATE OR REPLACE PROCEDURE earlyCancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
$$;
Copy

次のコードは earlyCancelJob プロシージャを呼び出します。その後、 test_tb テーブルをクエリしますが、キャンセルされた子ジョブによってデータが挿入されていないため、結果は返されません。

CALL earlyCancelJob();
SELECT * FROM test_tb;
Copy
+----+
| C1 |
|----|
+----+