
Add support for fetching and ingesting Arrow Table/RecordBatch data #375


Closed
mauropagano opened this issue Aug 5, 2024 · 42 comments
Labels
enhancement New feature or request patch available

Comments

@mauropagano

mauropagano commented Aug 5, 2024

It's hard to deny that Arrow has become the de-facto standard for in-memory representation of tabular data.
Multiple competing products (Snowflake, BigQuery, etc.) as well as libraries (ADBC, TurboODBC) enable extracting data as Arrow, unlocking better performance, lower memory usage and increased consistency/interoperability between platforms.

In the Python space, there are a number of libraries, especially around data processing, data science and ML (pandas, polars, duckdb, etc.), that work on Arrow data natively, normally via zero-copy.

It would be extremely beneficial to have python-oracledb be able to:

  1. Extract data as Arrow, something like cursor.fetch_as_arrow() that can return either an Arrow Table or RecordBatch from the query. This method could bypass the Python representation, speed up data extraction and ultimately keep Oracle closer to where some processing occurs.
  2. In the opposite direction, enable ingesting an Arrow Table/RecordBatch into the database, something like cursor.executemany("...", arrow_object). This could skip the Python representation, use less memory and ultimately entice users to rely more on Oracle for processing that works better in-db / for storing data produced elsewhere.
@mauropagano mauropagano added the enhancement New feature or request label Aug 5, 2024
@mauropagano mauropagano changed the title Add support for fetching and ingesting data as Arrow Tables/RecordBatch Add support for fetching and ingesting Arrow Table/RecordBatch data Aug 5, 2024
@anthony-tuininga
Member

Thanks, @mauropagano. We will look into this and get back to you.

@cjbj
Member

cjbj commented Aug 8, 2024

Tagging @aosingh who is exploring the space (e.g. he created apache/arrow-nanoarrow#502) and working with us on what python-oracledb could do.

@mauropagano if you have pseudocode of end-to-end uses of this datatype, and how it would plug into Pandas, SQLAlchemy etc. (or suggestions of what would need to be done to such libraries), please share.

@mauropagano
Author

Thanks @cjbj

These are a couple of quick pseudo-code examples that come to mind, but really this feature would save time/memory in almost all use cases where data starts or lands in Oracle and any of the modern Python data processing/science libs are in the mix.

The main goal of this ER, in my mind, is to avoid going through Python types just to immediately convert into Arrow (or vice versa); that takes a significant toll on performance and memory for no real benefit.

Some examples below will look a bit clunky because, as of now, there is not yet a zero-copy way to create a pandas df from arrow (1), I believe.

(1) lots of ways have already been added in pandas 2 to go from at-rest formats to pandas with an Arrow backend; you just can't yet "map" a df on top of that arrow table
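For example, this kind of thing already works today (a rough sketch, assuming pandas 2.x and an illustrative parquet file name):

import pandas as pd

# at-rest data straight into an Arrow-backed frame is already possible in pandas 2.x
df = pd.read_parquet("objects.parquet", dtype_backend="pyarrow")
# but there is no pd.DataFrame(arrow_table)-style call that simply "maps" a frame
# on top of an existing in-memory Arrow Table without a conversion step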

Extracting data from Oracle to pandas because you need to apply fancy lib X that requires pandas

In batches, via pyarrow.RecordBatch

cursor.execute("....")
# this would return a pyarrow.RecordBatch, could work like fetchmany()
for rb in cursor.fetch_record_batch(n=100_000):
    # the to_pandas() is because of (1), but this is DRASTICALLY faster than tuple/dict -> pandas
    fancy_lib_output = fancy_lib.do_fancy_stuff(rb.to_pandas())

All at once, via pyarrow.Table

cursor.execute("....")
# same (1) applies; once to_pandas() can be skipped for some pd.DataFrame(arrow_obj), you'd save the temporary 2x memory
df = cursor.fetch_arrow_table().to_pandas(self_destruct=True)
fancy_lib_output = fancy_lib.do_fancy_stuff(df) 

Ingesting data into Oracle

In batches, from pq

pf = pyarrow.parquet.ParquetFile(f)
# there are a lot of ways to do this, picking reading a single row_group at time vs iter_batches() just to minimize waste
for rg_num in range(pf.num_row_groups):
     cursor.execute("insert into ...", pf.read_row_group(rg_num)) # executemany maybe? turboodbc calls it executemanycolumns()

All at once, via pandas.read_csv(..., dtype_backend="pyarrow") (somewhat an example of (1) above, though this is quite silly; if the goal is just to load a CSV there are better ways)

df = pd.read_csv(f, dtype_backend="pyarrow") 
cursor.execute("insert into ...", df)

From other libraries with arrow backend, e.g. polars

for df_slice in df.iter_slices(n_rows=100_000):  # df here is polars.DataFrame
    cursor.execute("insert into ...", df_slice.to_arrow()) # to_arrow() is mostly zero-copy 

Hope this helps; if there's anything else I can provide, just let me know.

@hvbtup

hvbtup commented Aug 21, 2024

Note: Arrow is not the only format for storing big tables in a columnar format.
There is also the Apache Parquet format, but Oracle already supports reading and writing parquet files on the DB side, see https://blogs.oracle.com/datawarehousing/post/parquet-files-oracle-database.

I think what would generally be more useful is a SQL*Plus (and/or sqlcli) command to upload or download files as BLOB data between the client and the DB.

@mauropagano
Author

@cjbj I was wondering if you guys have made a decision about supporting Arrow or not.

@cjbj
Member

cjbj commented Jan 14, 2025

@mauropagano work is ongoing. There are some technical and legal things to be done before we can release anything.

@cjbj
Member

cjbj commented Feb 17, 2025

An update on this project, thanks to @aosingh we are well on track for the next release of python-oracledb to have support for querying data into Python DataFrame Interchange Protocol format. (Inserting into the DB is a future task).

Comments welcome. (Tagging @mauropagano)

Examples might be like the following (subject to last minute changes!).

To use data in Pandas:

import pandas

# Get an OracleDataFrame.
sql = "select id, name from SampleQueryTab order by id"
odf = connection.fetch_df_all(statement=sql, arraysize=100)

# Get a Pandas DataFrame from the data: this is a zero copy call
df = pandas.api.interchange.from_dataframe(odf)

# Perform various Pandas operations on the DataFrame
print(df.columns)

and to write in Parquet file format

sql = "select id, name from SampleQueryTab order by id"
pqwriter = None

for odf in connection.fetch_df_batches(statement=sql, size=FETCH_BATCH_SIZE):

    pyarrow_table = pyarrow.Table.from_arrays(
        arrays=odf.column_arrays(), names=odf.column_names()
    )

    if not pqwriter:
        pqwriter = pq.ParquetWriter(PARQUET_FILE_NAME, pyarrow_table.schema)

    print(f"Writing a batch of {odf.num_rows()} rows")
    pqwriter.write_table(pyarrow_table)

pqwriter.close()

We'll do some more testing before pushing anything to GitHub.

@mauropagano
Author

Thanks @cjbj , this is really exciting!

Few questions based on the examples:

  1. What does .fetch_df_all() and .fetch_df_batches() return exactly? Asking because pyarrow.Table.from_arrays() is a little misleading (pyarrow.Table.from_dataframe() seems a bit more natural)
  2. Does it mean that python-oracledb will provide a concrete implementation of the DataFrame class (which is abstract in the protocol specification) ?
  3. Internally, does the API still fetch data from the DB into Python objects? I'm assuming no (to unblock the perf gains this change can bring), just checking
  4. There is no mention of the Arrow conversion being zero-copy; from the protocol spec it should be as well (not just pandas), again just checking
  5. Do you have a sense if the ingest will come in the release after the next or if it's a longer term goal?

Again, this is really exciting, thanks for making it happen!

@cjbj
Member

cjbj commented Feb 17, 2025

@mauropagano They return OracleDataFrame which is an Oracle implementation of the DataFrame class shown in https://data-apis.org/dataframe-protocol/latest/API.html

Current doc is:

Connection.fetch_df_all(statement, parameters=None, arraysize=None)

Fetches all rows of the SQL query statement, returning them in an OracleDataFrame object. An empty OracleDataFrame is returned if there are no rows available.

The parameters parameter can be a list of tuples, where each tuple item maps to one bind variable placeholder in statement. It can also be a list of dictionaries, where the keys match the bind variable placeholder names in statement.

The arraysize parameter can be specified to tune performance of fetching data across the network. It defaults to defaults.arraysize. Internally, fetch_df_all()’s Cursor.prefetchrows size is always set to the value of the explicit or default arraysize parameter value.

and

Connection.fetch_df_batches(statement, parameters=None, size=None)

This returns an iterator yielding the next size rows of the SQL query statement in each iteration as an OracleDataFrame object. An empty OracleDataFrame is returned if there are no rows available.

The parameters parameter can be a list of tuples, where each tuple item maps to one bind variable placeholder in statement. It can also be a list of dictionaries, where the keys match the bind variable placeholder names in statement.

The size parameter controls the number of records fetched in each batch. It defaults to defaults.arraysize. Internally, fetch_df_batches()’s Cursor.arraysize and Cursor.prefetchrows sizes are always set to the value of the explicit or default size parameter value.

    sql = "select * from departments"
    odf = connection.fetch_df_all(statement=sql, arraysize=1000)
    print(type(odf))

gives:

<class 'oracledb.interchange.dataframe.OracleDataFrame'>
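A matching fetch_df_batches() loop would look like this (a sketch following the doc above, reusing the num_rows() accessor from the earlier example):

    for odf in connection.fetch_df_batches(statement="select * from departments", size=1000):
        print(odf.num_rows())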

We'd be very happy to hear of suggestions. I'm tempted to designate the support as "preview", which will give us scope to break the API if better ideas come up during the early days.

I don't have a timeframe for INSERT support. I'd like it to be soon, but let us get this release out before we prioritize all the tasks to do.

@mauropagano
Author

Thanks for sharing the docstrings.

I believe these two operations cover what most people would like to do here.
Even though I can't think of any significant changes on top of what you shared, I very much agree that a preview API makes sense this early on.

Is this going to work in both thin and thick mode?

Out of curiosity, do you have any very-rough-and-subject-to-change numbers around speedup and memory footprint changes?

@cjbj
Member

cjbj commented Feb 18, 2025

It will work in Thin & Thick.

Performance is being checked; we have one area of improvement that needs further investigation post initial release.

I will update my previous comments and change to use the current API doc.

anthony-tuininga added a commit that referenced this issue Feb 18, 2025
zero copy interchange with popular data frame libraries (#375).
@anthony-tuininga
Member

@mauropagano, the code has been uploaded. I have initiated a build which you can use for testing once it completes. Let us know any feedback you have!

@mauropagano
Author

Quick thing, PEP 604 didn't make it until 3.10 so something like this

def __dlpack_device__(self) -> Tuple["DlpackDeviceType", int | None]

crashes on

TypeError: unsupported operand type(s) for |: 'type' and 'NoneType'

for versions below 3.10. Reporting it since the wheels go all the way back to 3.8.
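On those versions the annotation needs the pre-PEP 604 spelling, something like:

from typing import Optional, Tuple

def __dlpack_device__(self) -> Tuple["DlpackDeviceType", Optional[int]]:
    ...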

@anthony-tuininga
Member

Thanks, @mauropagano. I changed most of those but apparently missed one. I'll get that corrected!

@mauropagano
Author

Thanks @anthony-tuininga , just let me know when you get a build going so I can follow that (I can test more on older versions)

@anthony-tuininga
Member

The patches have been uploaded (including support for asyncio with data frames) and a new build has been initiated.

@mauropagano
Author

Thanks @anthony-tuininga , will test as soon as it's out

Out of curiosity, are the .fetch_df* methods defined on the Connection object instead of the Cursor because you need to ensure all the other options are set at the cursor level, and so you need to control a level up?

@anthony-tuininga
Member

The primary reason is that in my understanding it doesn't make sense to have a cursor which allows for fetching rows natively, then swaps over to fetching Arrow data, then swaps back to fetching rows natively. My understanding is that if you want a dataframe, you want a dataframe containing all of the rows and not just some of them. Is there a legitimate need for swapping back and forth between data frames and native rows on the same cursor?
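In other words, the intended split is roughly this (a quick sketch):

with connection.cursor() as cursor:  # native Python rows
    cursor.execute("select id, name from SampleQueryTab")
    rows = cursor.fetchall()

# Arrow-backed data frames come from the connection-level methods instead
odf = connection.fetch_df_all(statement="select id, name from SampleQueryTab")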

@mauropagano
Author

IMO no; I agree with you, removing the option to even open that door is a good decision.

@mauropagano
Author

Just tested and it works great!
I forgot how slow pandas.api.interchange.from_dataframe() is :-)

Btw there is a comment in 5.1.9.2 that says pandas.api.interchange.from_dataframe() is zero-copy and unless I'm doing something wrong the call is not zero copy (since you can't have an Arrow-backed Pandas DF at this time, I believe).

From memory_profiler (SQL is just a select * from a materialization of dba_objects)

Filename: test_odf.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    25    108.1 MiB    108.1 MiB           1   @profile
    26                                         def f2():
    27    136.0 MiB     27.9 MiB           1       odf = conn.fetch_df_all(statement=SQL, arraysize=5000)
    28    136.3 MiB      0.3 MiB           1       t = pyarrow.interchange.from_dataframe(odf)
    29    136.3 MiB      0.0 MiB           1       return t


Done fetching
Filename: test_odf.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    31    136.3 MiB    136.3 MiB           1   @profile
    32                                         def f3():
    33    161.5 MiB     25.2 MiB           1       odf = conn.fetch_df_all(statement=SQL, arraysize=5000)
    34    161.5 MiB      0.0 MiB           1       print(f"Done fetching")
    35    242.6 MiB     81.1 MiB           1       df = pd.api.interchange.from_dataframe(odf)
    36    242.6 MiB      0.0 MiB           1       return df

@anthony-tuininga
Member

Btw there is a comment in 5.1.9.2 that says pandas.api.interchange.from_dataframe() is zero-copy and unless I'm doing something wrong the call is not zero copy (since you can't have an Arrow-backed Pandas DF at this time, I believe).

I vaguely recall that there are circumstances under which zero copy does not happen -- such as when nulls exist, if I remember correctly. @aosingh would know better but he is away at the moment.

Glad to hear it is working well for you, though!

@cjbj
Member

cjbj commented Feb 19, 2025

@mauropagano what were your memory & perf stats like compared with getting the same dataframe format in python-oracledb 2.5.1?

@hvbtup

hvbtup commented Feb 19, 2025

This sounds cool.

At the SQL*Net level, the data is probably still transferred the old-fashioned way.

I wonder if a future SQL*Net version could support a column-based protocol as well.
Couldn't that reduce the network transfer volume significantly?

@cjbj
Member

cjbj commented Feb 19, 2025

I will see what the current thoughts are around this, particularly re VECTORS. The current protocol is very efficient for what it does now. Overall the data size would presumably be roughly the same, since it's the same data. But maybe there could be time benefits in some cases, e.g. when the data is stored in columns. There are a lot of DB layers, disk formats, Exadata storage cell magic things etc. going on past the SQL*Net layer.

@mauropagano
Author

what were your memory & perf stats like compared with getting the same dataframe format in python-oracledb 2.5.1?

About double the memory when building pandas straight off cursor.fetchall().

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    14     71.9 MiB     71.9 MiB           1   @profile
    15                                         def f():
    16     71.9 MiB      0.0 MiB           1       with conn.cursor() as cursor:
    17     71.9 MiB      0.0 MiB           1           cursor.arraysize = 5000
    18     72.2 MiB      0.3 MiB           1           cursor.execute(SQL)
    19    126.9 MiB     54.6 MiB          29           df = pd.DataFrame(cursor.fetchall(), columns=[c[0] for c in cursor.description])
    20    126.9 MiB      0.0 MiB           1           return df

In my previous post, I believe pd.api.interchange.from_dataframe() is taking a very unhappy path internally, which is why it uses more memory and is so much slower.
Probably going arrow -> pandas is always a better idea (and it's what pandas does internally when the df provided implements PyCapsule, I believe).

Perf-wise it's about 50% slower: 0.6 secs for conn.fetch_df_all() vs 0.9 for the code above. Granted, it's not exactly a fair comparison as it's doing more work (but it's work I'd argue most people do not want done).

Btw, outputs are not equivalent in case of NULLs (None vs NaN comes out), which is expected imo, just figured I'd say it out loud.
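By the arrow -> pandas path I mean roughly this (a sketch reusing the column accessors from the earlier example):

import pyarrow

odf = conn.fetch_df_all(statement=SQL, arraysize=5000)
tbl = pyarrow.Table.from_arrays(arrays=odf.column_arrays(), names=odf.column_names())
df = tbl.to_pandas()  # arrow -> pandas directly, skipping pd.api.interchange.from_dataframe()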

@aosingh
Member

aosingh commented Feb 20, 2025

@mauropagano Thank you for trying out the fetch_df_* APIs and giving us feedback.

I would like to answer some questions raised in the thread. Hopefully, this helps:

  • OracleDataFrame is an Arrow-backed DataFrame. python-oracledb fetches the data directly into Arrow arrays without going through Python values

  • Each column in OracleDataFrame exposes an Arrow PyCapsule interface giving access to the underlying Arrow array

  • pyarrow.Table can be constructed by directly accessing the arrow arrays as shown below. Here, pyarrow uses the aforementioned Arrow PyCapsule Interface to build a Table object

pyarrow_table = pyarrow.Table.from_arrays(
    arrays=odf.column_arrays(), names=odf.column_names()
)
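From there an Arrow-backed pandas DataFrame can also be built directly (a sketch, assuming pandas 2.x):

import pandas

# keeps Arrow-backed dtypes instead of converting columns to NumPy object/float
df = pyarrow_table.to_pandas(types_mapper=pandas.ArrowDtype)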

@mauropagano
Author

@aosingh thanks so much for sharing additional info on the implementation!

python-oracledb fetches the data directly into Arrow arrays without going through Python values

This is really exciting!
It will also enable writing efficient dump-to-parquet libraries.

pyarrow.Table can be constructed by directly accessing the arrow arrays as shown below.

Why pa.Table.from_arrays() vs pa.interchange.from_dataframe()?
Is that because, behind the scenes, the data is already Arrow? The latter seems to work just fine btw; asking to make sure I use the API the way it's intended.

Also using pyarrow.Table.to_pandas() will be memory efficient because it uses pandas.ArrowDtype that allows using Arrow-backed data types for better memory efficiency and performance

From the pandas API (here), I would expect the DF to be converted via Arrow.
I'm AFK right now but I seem to recall OracleDataFrame didn't have __arrow_c_stream__ set, is that possible? If yes, why?

@mauropagano
Author

NVM the question on pa.Table.from_arrays() vs interchange.from_dataframe(), the former is WAY faster!

@mauropagano
Author

@aosingh asking only since you mentioned it: how much do you want to support Polars here?

Straight conversion from OracleDataFrame to polars doesn't work for a query on dba_objects.
I believe it's on the polars side but figured I'd share.

  File ".../test_odf.py", line 30, in f2
    t = pl.from_dataframe(odf)
  File ".../.venv/lib/python3.9/site-packages/polars/convert/general.py", line 883, in from_dataframe
    return from_dataframe(df, allow_copy=allow_copy)
  File ".../.venv/lib/python3.9/site-packages/polars/interchange/from_dataframe.py", line 46, in from_dataframe
    return _from_dataframe(
  File ".../.venv/lib/python3.9/site-packages/polars/interchange/from_dataframe.py", line 55, in _from_dataframe
    polars_chunk = _protocol_df_chunk_to_polars(chunk, allow_copy=allow_copy)
  File ".../.venv/lib/python3.9/site-packages/polars/interchange/from_dataframe.py", line 71, in _protocol_df_chunk_to_polars
    dtype = dtype_to_polars_dtype(column.dtype)
  File ".../.venv/lib/python3.9/site-packages/polars/interchange/utils.py", line 117, in dtype_to_polars_dtype
    return _temporal_dtype_to_polars_dtype(format_str, dtype)
  File ".../.venv/lib/python3.9/site-packages/polars/interchange/utils.py", line 145, in _temporal_dtype_to_polars_dtype
    raise NotImplementedError(msg)
NotImplementedError: unsupported temporal data type: (<DtypeKind.DATETIME: 22>, 64, 'tss:', '=')

@aosingh
Member

aosingh commented Feb 21, 2025

@mauropagano

From the pandas API (here), I would expect the DF to be converted via Arrow.
I'm AFK right now but I seem to recall OracleDataFrame didn't have __arrow_c_stream__ set, is that possible? If yes, why?

You are right, OracleDataFrame doesn't have the stream interface implemented yet. We have implemented the __arrow_c_array__() interface on individual columns. To implement the streaming interface __arrow_c_stream__() for 2D data we will need to do some investigation. For now, pandas.from_dataframe() will use __dataframe__() to build a pandas-compatible dataframe. This is mostly zero-copy and I expect memory performance to be much better starting with pandas 3.0 (or whenever pandas switches to Arrow string types).

Straight conversion from OracleDataFrame to polars doesn't work for a query on dba_objects.
I believe it's on the polars side but figured I'd share.

Yes, correct. It seems polars' implementation of from_dataframe doesn't support the seconds time unit; it supports milliseconds, microseconds and nanoseconds. I would try polars.from_arrow(pa.table) as a workaround, along the lines of the sketch below.
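Something like this (assumes the column accessors shown earlier and that polars is installed):

import polars as pl
import pyarrow

pa_table = pyarrow.Table.from_arrays(
    arrays=odf.column_arrays(), names=odf.column_names()
)
df_pl = pl.from_arrow(pa_table)  # bypasses polars' interchange-protocol path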

@cjbj
Member

cjbj commented Feb 21, 2025

We should review & update the examples.

@anthony-tuininga
Member

This has been included in python-oracledb 3.0.0 which was just released.

@cjbj
Member

cjbj commented Mar 3, 2025

Although we closed the issue, work will continue in this area.

@cjbj
Member

cjbj commented Mar 3, 2025

And some of that data frame optimization work has already been pushed to GitHub. A package build has been initiated here.

Anyone playing with Connection.fetch_df_all(), Connection.fetch_df_batches(), AsyncConnection.fetch_df_all(), or AsyncConnection.fetch_df_batches() might want to grab this new development build.

@mauropagano
Author

Thanks @cjbj , @anthony-tuininga and @aosingh for making it happen! I've already pulled 3.0 and started using it!

These new changes will go in 3.0.1, right?

@cjbj
Member

cjbj commented Mar 4, 2025

These new changes will go in 3.0.1, right?

Yes.

@dev-named-jay

Just want to say THANK YOU to the team who worked on this!! I'm seeing, on average, a 60% speed-up in my extract scripts compared with pd.read_sql; some tables are as much as 78-85% faster.

With read_sql, I can use dtype to force some of my columns to str. Is that possible with fetch_df_batches?

@anthony-tuininga
Member

Glad to hear that it is working well for you! Can you share the method you use for converting to string with read_sql? We don't have that capability at this time, but that can be considered for a future enhancement.

@cjbj
Member

cjbj commented Mar 12, 2025

@dev-named-jay can you open a new issue (as an enhancement) so we can track & discuss your type mapping request?

Is there anything else you think would enhance interaction with data frame libraries?

@mauropagano
Author

Reporting some results now that I was able to test this out in the wild:

  1. Creating (sparse) numpy ndarrays from a DB table is between 2x and 3x faster; the gain comes both from the extract and from being able to work directly in a more efficient format
  2. Creating (large) numpy recarrays from a DB table is between 3x and 10x faster. The larger the data, the larger the benefit (and this is with non-zero-copy).
  3. Creating PQ files from RecordBatches is about 3x faster and a lot more memory friendly

In general, this change enabled some serious performance improvements for me.
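For anyone curious, the recarray path looks roughly like this (a generic sketch with numpy and pyarrow, not necessarily exactly what I ran):

import numpy as np
import pyarrow

odf = conn.fetch_df_all(statement=SQL)
tbl = pyarrow.Table.from_arrays(arrays=odf.column_arrays(), names=odf.column_names())
rec = np.rec.fromarrays(
    [tbl.column(name).to_numpy() for name in tbl.column_names],  # one numpy array per column
    names=tbl.column_names,
)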

@cjbj
Member

cjbj commented May 29, 2025

@mauropagano thanks for sharing. Those are impressive improvements!
