Exemplos de manipuladores Python para procedimentos armazenados¶
Como executar tarefas simultâneas com processos de trabalhador¶
Você pode executar tarefas simultâneas usando processos de trabalho do Python. Você pode achar isso útil quando precisar executar tarefas paralelas que aproveitam vários núcleos de CPU em nós do warehouse.
Nota
A Snowflake recomenda que você não use o módulo de multiprocessamento integrado do Python.
Para contornar os casos em que o Bloqueio de intérprete global do Python impede que uma abordagem multitarefa se espalhe por todos os núcleos da CPU, você pode executar tarefas simultâneas usando processos de trabalho separados, em vez de threads.
Você pode fazer isso nos warehouses Snowflake usando a classe joblib
da biblioteca Parallel
, como no exemplo a seguir.
CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'joblib_multiprocessing'
PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt
def joblib_multiprocessing(session, i):
result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
return str(result)
$$;
Nota
O back-end padrão usado para joblib.Parallel
difere entre os warehouses padrão Snowflake e otimizados para Snowpark.
Padrão do warehouse:
threading
Padrão do warehouse otimizado para Snowpark:
loky
(multiprocessamento)
Você pode substituir a configuração de back-end padrão chamando a função joblib.parallel_backend
, como no exemplo a seguir.
import joblib
joblib.parallel_backend('loky')
Como usar APIs Snowpark para processamento assíncrono¶
Os exemplos a seguir ilustram como você pode usar as APIs Snowpark para iniciar trabalhos filho assíncronos, além de como esses trabalhos se comportam sob diferentes condições.
Verificação do status de um trabalho filho assíncrono¶
No exemplo a seguir, o procedimento checkStatus
executa um trabalho filho assíncrono que aguarda 60 segundos. O procedimento então verifica o status do trabalho antes que ele possa ser concluído, então a verificação retorna False
.
CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
async_job = session.sql("select system$wait(60)").collect_nowait()
return async_job.is_done()
$$;
O código a seguir chama o procedimento.
CALL checkStatus();
+-------------+
| checkStatus |
|-------------|
| False |
+-------------+
Cancelamento de um trabalho filho assíncrono¶
No exemplo a seguir, o procedimento cancelJob
usa SQL para inserir dados na tabela test_tb
com um trabalho filho assíncrono que levaria 10 segundos para terminar. Em seguida, ele cancela o trabalho filho antes que ele termine e os dados tenham sido inseridos.
CREATE OR REPLACE TABLE test_tb(c1 STRING);
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
return async_job.cancel()
$$;
CALL cancelJob();
O código a seguir consulta a tabela test_tb
, mas não retorna resultados porque nenhum dado foi inserido.
SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+
Espera e bloqueio enquanto um trabalho filho assíncrono é executado¶
No exemplo a seguir, o procedimento blockUntilDone
executa um trabalho filho assíncrono que leva 5 segundos para terminar. Usando o método snowflake.snowpark.AsyncJob.result
, o procedimento espera e retorna quando o trabalho termina.
CREATE OR REPLACE PROCEDURE blockUntilDone()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
async_job = session.sql("select system$wait(5)").collect_nowait()
return async_job.result()
$$;
O código a seguir chama o procedimento blockUntilDone
, que retorna após esperar 5 segundos.
CALL blockUntilDone();
+------------------------------------------+
| blockUntilDone |
|------------------------------------------|
| [Row(SYSTEM$WAIT(5)='waited 5 seconds')] |
+------------------------------------------+
Retorno de um erro após solicitar resultados de um trabalho filho assíncrono inacabado¶
No exemplo a seguir, o procedimento earlyReturn
executa um trabalho filho assíncrono que leva 60 segundos para terminar. O procedimento então tenta retornar um DataFrame
do resultado do trabalho antes que ele possa ser concluído. O resultado é um erro.
CREATE OR REPLACE PROCEDURE earlyReturn()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
async_job = session.sql("select system$wait(60)").collect_nowait()
df = async_job.to_df()
try:
df.collect()
except Exception as ex:
return 'Error: (02000): Result for query <UUID> has expired'
$$;
O código a seguir chama o procedimento earlyReturn
, retornando o erro.
CALL earlyReturn();
+------------------------------------------------------------+
| earlyReturn |
|------------------------------------------------------------|
| Error: (02000): Result for query <UUID> has expired |
+------------------------------------------------------------+
Conclusão de um trabalho pai antes que um trabalho filho termine, cancelamento do trabalho filho¶
No exemplo a seguir, o procedimento earlyCancelJob
executa um trabalho filho assíncrono para inserir dados em uma tabela e leva 10 segundos para terminar. Entretanto, a tarefa dos pais — async_handler
— retorna antes que o trabalho filho termine, o que cancela o trabalho filho.
CREATE OR REPLACE PROCEDURE earlyCancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
$$;
O código a seguir chama o procedimento earlyCancelJob
. Em seguida, ele consulta a tabela test_tb
, que não retorna nenhum resultado porque nenhum dado foi inserido pelo trabalho filho cancelado.
CALL earlyCancelJob();
SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+