Exemplos de manipuladores Python para procedimentos armazenados

Como executar tarefas simultâneas com processos de trabalhador

Você pode executar tarefas simultâneas usando processos de trabalho do Python. Você pode achar isso útil quando precisar executar tarefas paralelas que aproveitam vários núcleos de CPU em nós do warehouse.

Nota

A Snowflake recomenda que você não use o módulo de multiprocessamento integrado do Python.

Para contornar os casos em que o Bloqueio de intérprete global do Python impede que uma abordagem multitarefa se espalhe por todos os núcleos da CPU, você pode executar tarefas simultâneas usando processos de trabalho separados, em vez de threads.

Você pode fazer isso nos warehouses Snowflake usando a classe joblib da biblioteca Parallel, como no exemplo a seguir.

CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  HANDLER = 'joblib_multiprocessing'
  PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt

def joblib_multiprocessing(session, i):
  result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
  return str(result)
$$;
Copy

Nota

O back-end padrão usado para joblib.Parallel difere entre os warehouses padrão Snowflake e otimizados para Snowpark.

  • Padrão do warehouse: threading

  • Padrão do warehouse otimizado para Snowpark: loky (multiprocessamento)

Você pode substituir a configuração de back-end padrão chamando a função joblib.parallel_backend, como no exemplo a seguir.

import joblib
joblib.parallel_backend('loky')
Copy

Como usar APIs Snowpark para processamento assíncrono

Os exemplos a seguir ilustram como você pode usar as APIs Snowpark para iniciar trabalhos filho assíncronos, além de como esses trabalhos se comportam sob diferentes condições.

Verificação do status de um trabalho filho assíncrono

No exemplo a seguir, o procedimento checkStatus executa um trabalho filho assíncrono que aguarda 60 segundos. O procedimento então verifica o status do trabalho antes que ele possa ser concluído, então a verificação retorna False.

CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    return async_job.is_done()
$$;
Copy

O código a seguir chama o procedimento.

CALL checkStatus();
Copy
+-------------+
| checkStatus |
|-------------|
| False       |
+-------------+

Cancelamento de um trabalho filho assíncrono

No exemplo a seguir, o procedimento cancelJob usa SQL para inserir dados na tabela test_tb com um trabalho filho assíncrono que levaria 10 segundos para terminar. Em seguida, ele cancela o trabalho filho antes que ele termine e os dados tenham sido inseridos.

CREATE OR REPLACE TABLE test_tb(c1 STRING);
Copy
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
    return async_job.cancel()
$$;

CALL cancelJob();
Copy

O código a seguir consulta a tabela test_tb, mas não retorna resultados porque nenhum dado foi inserido.

SELECT * FROM test_tb;
Copy
+----+
| C1 |
|----|
+----+

Espera e bloqueio enquanto um trabalho filho assíncrono é executado

No exemplo a seguir, o procedimento blockUntilDone executa um trabalho filho assíncrono que leva 5 segundos para terminar. Usando o método snowflake.snowpark.AsyncJob.result, o procedimento espera e retorna quando o trabalho termina.

CREATE OR REPLACE PROCEDURE blockUntilDone()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(5)").collect_nowait()
    return async_job.result()
$$;
Copy

O código a seguir chama o procedimento blockUntilDone, que retorna após esperar 5 segundos.

CALL blockUntilDone();
Copy
+------------------------------------------+
| blockUntilDone                               |
|------------------------------------------|
| [Row(SYSTEM$WAIT(5)='waited 5 seconds')] |
+------------------------------------------+

Retorno de um erro após solicitar resultados de um trabalho filho assíncrono inacabado

No exemplo a seguir, o procedimento earlyReturn executa um trabalho filho assíncrono que leva 60 segundos para terminar. O procedimento então tenta retornar um DataFrame do resultado do trabalho antes que ele possa ser concluído. O resultado é um erro.

CREATE OR REPLACE PROCEDURE earlyReturn()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    df = async_job.to_df()
    try:
        df.collect()
    except Exception as ex:
        return 'Error: (02000): Result for query <UUID> has expired'
$$;
Copy

O código a seguir chama o procedimento earlyReturn, retornando o erro.

CALL earlyReturn();
Copy
+------------------------------------------------------------+
| earlyReturn                                                 |
|------------------------------------------------------------|
| Error: (02000): Result for query <UUID> has expired        |
+------------------------------------------------------------+

Conclusão de um trabalho pai antes que um trabalho filho termine, cancelamento do trabalho filho

No exemplo a seguir, o procedimento earlyCancelJob executa um trabalho filho assíncrono para inserir dados em uma tabela e leva 10 segundos para terminar. Entretanto, a tarefa dos pais — async_handler — retorna antes que o trabalho filho termine, o que cancela o trabalho filho.

CREATE OR REPLACE PROCEDURE earlyCancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
$$;
Copy

O código a seguir chama o procedimento earlyCancelJob. Em seguida, ele consulta a tabela test_tb, que não retorna nenhum resultado porque nenhum dado foi inserido pelo trabalho filho cancelado.

CALL earlyCancelJob();
SELECT * FROM test_tb;
Copy
+----+
| C1 |
|----|
+----+