저장 프로시저용 Python 처리기 예

워커 프로세스로 동시 작업 실행하기

Python 작업자 프로세스를 사용하여 동시 작업을 실행할 수 있습니다. 웨어하우스 노드에서 여러 CPU 코어를 활용하는 병렬 작업을 실행해야 할 때 이 기능이 유용할 수 있습니다.

참고

기본 제공된 Python 다중 처리 모듈을 사용하지 않는 것이 좋습니다.

Python Global Interpreter Lock 으로 인해 멀티태스킹 접근 방식이 모든 CPU 코어에서 확장되지 못하는 문제를 해결하려면 스레드가 아닌 별도의 작업자 프로세스를 사용하여 동시 작업을 실행할 수 있습니다.

다음 예에서처럼 joblib 라이브러리의 Parallel 클래스를 사용하여 Snowflake 웨어하우스에서 이 작업을 수행할 수 있습니다.

CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  HANDLER = 'joblib_multiprocessing'
  PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt

def joblib_multiprocessing(session, i):
  result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
  return str(result)
$$;
Copy

참고

joblib.Parallel 에 사용되는 기본 백엔드는 Snowflake 표준과 Snowpark 최적화 웨어하우스 간에 다릅니다.

  • 표준 웨어하우스 기본값: threading

  • Snowpark 최적화 웨어하우스 기본값: loky (다중 처리)

다음 예에서처럼 joblib.parallel_backend 함수를 호출하여 기본 백엔드 설정을 재정의할 수 있습니다.

import joblib
joblib.parallel_backend('loky')
Copy

비동기 처리에 Snowpark APIs 사용하기

다음 예제에서는 Snowpark APIs를 사용하여 비동기 하위 작업을 시작하는 방법과 다양한 조건에서 이러한 작업이 어떻게 작동하는지 설명합니다.

비동기 하위 작업의 상태 확인하기

다음 예제에서 checkStatus 프로시저는 60초 동안 기다리는 비동기 하위 작업을 실행합니다. 그런 다음 프로시저는 작업이 완료되기 전에 작업의 상태를 확인하므로 확인은 False 를 반환합니다.

CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    return async_job.is_done()
$$;
Copy

다음 코드는 프로시저를 호출합니다.

CALL checkStatus();
Copy
+-------------+
| checkStatus |
|-------------|
| False       |
+-------------+

비동기 하위 작업 취소하기

다음 예제에서, cancelJob 프로시저는 SQL을 사용하여 완료하는 데 10초가 걸리는 비동기 하위 작업으로 test_tb 테이블에 데이터를 삽입합니다. 그런 다음 작업이 완료되고 데이터가 삽입되기 전에 하위 작업을 취소합니다.

CREATE OR REPLACE TABLE test_tb(c1 STRING);
Copy
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
    return async_job.cancel()
$$;

CALL cancelJob();
Copy

다음 코드는 test_tb 테이블을 쿼리하지만, 데이터가 삽입되지 않았기 때문에 결과를 반환하지 않습니다.

SELECT * FROM test_tb;
Copy
+----+
| C1 |
|----|
+----+

비동기 하위 작업이 실행되는 동안 대기 및 차단하기

다음 예제에서, blockUntilDone 프로시저는 완료하는 데 5초가 걸리는 비동기 하위 작업을 실행합니다. snowflake.snowpark.AsyncJob.result 메서드를 사용하면 프로시저가 대기하다가 작업이 완료되면 반환합니다.

CREATE OR REPLACE PROCEDURE blockUntilDone()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(5)").collect_nowait()
    return async_job.result()
$$;
Copy

다음 코드는 5초를 기다린 후 반환되는 blockUntilDone 프로시저를 호출합니다.

CALL blockUntilDone();
Copy
+------------------------------------------+
| blockUntilDone                               |
|------------------------------------------|
| [Row(SYSTEM$WAIT(5)='waited 5 seconds')] |
+------------------------------------------+

완료되지 않은 비동기 하위 작업의 결과를 요청한 후 오류 반환하기

다음 예제에서, earlyReturn 프로시저는 완료하는 데 60초가 걸리는 비동기 하위 작업을 실행합니다. 그런 다음 프로시저는 작업이 완료되기 전에 작업 결과에서 DataFrame 의 반환을 시도합니다. 결과적으로 오류가 발생합니다.

CREATE OR REPLACE PROCEDURE earlyReturn()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    df = async_job.to_df()
    try:
        df.collect()
    except Exception as ex:
        return 'Error: (02000): Result for query <UUID> has expired'
$$;
Copy

다음 코드는 earlyReturn 프로시저를 호출하여 오류를 반환합니다.

CALL earlyReturn();
Copy
+------------------------------------------------------------+
| earlyReturn                                                 |
|------------------------------------------------------------|
| Error: (02000): Result for query <UUID> has expired        |
+------------------------------------------------------------+

하위 작업이 완료되기 전에 상위 작업을 완료하고 하위 작업 취소하기

다음 예제에서, earlyCancelJob 프로시저는 비동기 하위 작업을 실행하여 테이블에 데이터를 삽입하고 완료하는 데 10초가 걸립니다. 그러나 상위 작업 async_handler 는 하위 작업이 완료되기 전에 반환되어 하위 작업이 취소됩니다.

CREATE OR REPLACE PROCEDURE earlyCancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
$$;
Copy

다음 코드는 earlyCancelJob 프로시저를 호출합니다. 그런 다음 test_tb 테이블을 쿼리하며, 이 작업은 취소된 하위 작업에서 삽입된 데이터가 없기 때문에 결과를 반환하지 않습니다.

CALL earlyCancelJob();
SELECT * FROM test_tb;
Copy
+----+
| C1 |
|----|
+----+