
Add support to write in parquet format #115


Merged: 25 commits into brineylab:development on Mar 18, 2023

Conversation

@srgk26 (Contributor) commented Mar 9, 2023

This is a rather large PR with several different features and bugfixes. I've added comments along the way to further explain.

  1. Optimise concatenation of temporary JSON files. The current implementation reads the temporary JSON files line by line and appends them to a single output file in text mode. This edit concatenates them more efficiently by:
  • calling shutil.copyfileobj to append the contents of one file to another in binary mode, in chunks of 16 MB at a time;
  • opening the files in binary mode rather than plain text mode to support this.
  2. Add support to write in parquet format:
  • The current master branch has the dask.dataframe import commented out. Uncommenting this line allows dask.dataframe to be used to read JSON/CSV output and write parquet.
  • The current version doesn't allow parquet conversion from JSON output. Only the smaller, summarised CSV outputs (with delimiters and dtypes that vary by output type ["imgt", "tabular", "airr"]) are read into dask dataframes before converting to parquet. This edit first defines dtypes for the JSON input: apart from the four integer fields ["isotype_score", "junc_len", "cdr3_len", "mut_count_nt"], all fields are given dtype str. The dask dataframe is then written as a parquet file with standard "snappy" compression, without writing the index.
  • A complete pyarrow schema is specified for every struct, field, and datatype expected in both the BCR and TCR JSON output. This schema is passed as an argument when writing the parquet file.
  • raw_position, position, codon, imgt_position, and imgt_codon are coerced to string. These fields were previously a mixture of integers and floats, which was not only inconsistent but also misleading, since the values are genomic positions on which no arithmetic is performed.
  • JSON files are read in blocks of 256 MiB (2**28 bytes), with dtype information supplied when reading into the dataframe. Metadata is provided as an empty pandas dataframe with the correct column order and data types for dask to use as a reference. The processed dataframe is then written to parquet files.
  • Where the output type is json and parquet writing is also enabled, parquet files are written in place of the temporary JSON files. This saves the time and compute spent writing and reprocessing JSON files.
  3. Fix chunk size:
  • The partitioning of fastq files added an extra, unnecessary \n, which caused biopython to fail on the last chunk. This is now fixed.
  • Additionally, the suffix counter was wrong for the final chunk: the counter was incremented before the filename was created, whereas elsewhere it was incremented afterwards. This produced suffixes 0, 1, 2, ..., n-1, n+1, skipping the nth suffix. Also fixed (see the fastq chunking sketch after this list).
  • A new test checks this change. To make the test easy to write, the way command-line arguments are passed around was changed so the main methods can be called from tests.
  4. Remove maxtasksperchild when creating the multiprocessing pool:

Abstar randomly hangs at the end of the multi-threaded run. One likely cause is that specifying maxtasksperchild when creating Python's multiprocessing pool, on a Linux system where the default start method is fork, is not memory- or thread-safe. This can lead to a race condition in which multiprocessing cannot cleanly shut down workers when the pool is terminated, leaving it waiting indefinitely for a signal from a child process. A minimal sketch of the change follows.
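A minimal sketch of the change described above, assuming a stand-in worker function and chunk list rather than abstar's actual job setup (names here are illustrative, not abstar's API):

```python
import multiprocessing as mp

def process_chunk(chunk_file):
    # stand-in for abstar's per-chunk processing
    return chunk_file

def run_jobs(chunk_files, n_procs):
    # Previously the pool was created with maxtasksperchild set; combined with
    # the default 'fork' start method on Linux, recycling workers could leave
    # the parent waiting indefinitely for a signal from a terminated child.
    # The fix is simply to create the pool without maxtasksperchild.
    with mp.Pool(processes=n_procs) as pool:
        return pool.map(process_chunk, chunk_files)
```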

  5. Pin matplotlib and numpy versions in requirements.txt, since newer versions are incompatible with the codebase.
  6. Add .vscode to .gitignore to ignore VS Code-specific config files.
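As referenced in item 3, here is a hedged sketch of the corrected fastq chunking logic; the helper name and layout are hypothetical and only illustrate the two fixes (no stray trailing newline on the last chunk, and the suffix counter incremented only after the filename is built):

```python
import itertools

def split_fastq(records, chunksize, prefix):
    """Split an iterable of FASTQ record strings into numbered chunk files.

    Each element of `records` is one complete 4-line FASTQ record with no
    trailing newline. This is a hypothetical helper, not abstar's function.
    """
    chunk_files = []
    counter = 0
    records = iter(records)
    while True:
        chunk = list(itertools.islice(records, chunksize))
        if not chunk:
            break
        fname = f"{prefix}_{counter}.fastq"  # build the filename first...
        counter += 1                         # ...then increment the suffix
        with open(fname, "w") as f:
            f.write("\n".join(chunk))        # no extra '\n' after the last record
        chunk_files.append(fname)
    return chunk_files
```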

briney and others added 24 commits January 13, 2022 07:12
fix bug when processing an input file (rather than sequences) using t…
convert floats to str in AIRR output building
Refactoring temporary json file concatenation
* Uncomment dask dataframe import.

* Remove json output type from list of parquet incompatible formats.

* Specify dtypes for dask dataframe read from json.

* Enable reading json files into dask dataframe and writing as parquet file.

* Enable files to be read in binary mode for all output formats.

* Change json output field for j gene from score to assigner_score.

* Revert change in line position to read file in binary mode.

* Converting IMGT positions from integers or floats to string.

* Coerce raw_position to be stringtype.

* Add schema for JSON fields datatypes to override when writing to parquet.

* Additional schema attributes.

* Convert schema to full pyarrow schema for full dataset.

* Add columns desired order and dtypes for dataframe metadata.

* Reorder dtype fields.

* Remove unneeded column and dtype information.

* Edit json reading and parquet writing code.

* Add additional schema attributes involved in BCR.

* Reorder schema fields.

* Reorder pyarrow schema.
* Replace string dtypes with object dtypes.

* Add function attribute to indicate if parquet will be written to `write_output` function.

* Write parquet files directly in place of temporary JSON files.

* Add flag to ignore datatype conversion errors when casting integer columns with NaNs, and change output file name.

* Edit concat_outputs to simply move parquet files generated from json output instead.

* Edit file path from string concatenation to os path join.

* Minor edit to os.path.join.

* Added if statement to check if file exists before attempting to delete temporary file.

* Add `.snappy` file extension to parquet files.

* Simplified the file naming by simply moving the file into the output directory instead.

* Simplify specifying columns by changing `schema.names` to `dtypes`.

* Parse strings of dictionary into dictionary with `json.loads` before loading into dataframe.

* Read in temporary parquet files, repartition and write back parquet files.

* Remove setting writing metadata file in parquet to False as it's the default function argument.

* Remove unused imports.

* Remove if condition to check for temp files before deleting them.
* Fix chunking of fastq files

* ignore vscode
* Replace double quotation marks with single quotes for consistency with the rest of the codebase.

* Add empty line at EOF.

* Allow matplotlib to be installed at the latest version, since scanpy has upgraded its matplotlib support.

* Add comments to better explain code edits.
@srgk26 srgk26 marked this pull request as ready for review March 9, 2023 13:23
@srgk26 srgk26 marked this pull request as draft March 9, 2023 13:23
@@ -440,32 +439,45 @@ def concat_outputs(input_file, temp_output_file_dicts, output_dir, args):
         if args.gzip:
             ohandle = gzip.open(ofile + ".gz", 'wb')
         else:
-            ohandle = open(ofile, 'w')
+            ohandle = open(ofile, 'wb')
@srgk26 (Contributor, Author) commented:
Opening in binary mode is important to use shutil.copyfileobj.
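A minimal sketch of the concatenation pattern this enables (the 16 MB buffer is taken from the PR summary; file names are illustrative):

```python
import shutil

CHUNK = 16 * 1024 * 1024  # copy in 16 MB blocks

with open("merged_output.json", "wb") as out_file:
    for temp_file in ["temp_chunk_0.json", "temp_chunk_1.json"]:
        with open(temp_file, "rb") as f:
            # copyfileobj streams bytes between the two binary handles
            shutil.copyfileobj(f, out_file, length=CHUNK)
        out_file.write(b"\n")  # separate records from consecutive temp files
```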

     with ohandle as out_file:
         # JSON-formatted files don't have headers, so we don't worry about it
-        if output_type == 'json':
+        if output_type == 'json' and not args.parquet:
@srgk26 (Contributor, Author) commented:
Since we're not writing JSON files when args.parquet is passed.

-                for line in f:
-                    out_file.write(line)
-                out_file.write('\n')
+            with open(temp_file, 'rb') as f:
@srgk26 (Contributor, Author) commented:
Slightly increases file transfer efficiency.

             for j, line in enumerate(f):
                 if i == 0:
                     out_file.write(line)
                 elif j >= 1:
                     out_file.write(line)
-            out_file.write('\n')
@srgk26 (Contributor, Author) commented:
The newline is no longer written here; it is written in abstar/utils/output.py (line 664) instead.


+    if output_type == 'json':
+        df = dd.read_parquet(os.path.dirname(temp_files[0]))  # Read in all parquet files in temp dir
+        df.repartition(partition_size='100MB').to_parquet(
@srgk26 (Contributor, Author) commented:
Repartitioning is necessary to coalesce the large number of small parquet files produced by pandas into a smaller number of larger partitions managed by dask. This also has the advantage of producing a single parquet dataset made up of multiple partitions, rather than many small separate files.
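A sketch of that coalescing step, assuming the temp directory holds the per-chunk parquet files (the 100MB target comes from the diff above; paths are illustrative):

```python
import dask.dataframe as dd

temp_dir = "temp/"  # directory containing the many small per-chunk parquet files
df = dd.read_parquet(temp_dir)

# Coalesce into ~100MB partitions and write a single parquet dataset
df.repartition(partition_size="100MB").to_parquet(
    "output/sample_parquet", compression="snappy", write_index=False
)
```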

@@ -96,8 +96,8 @@ def json_formatted(self):
         j = {'in_frame': 'yes' if self.in_frame else 'no',
              'length': self.length,
              'sequence': self.sequence,
-             'position': self.imgt_position,
-             'codon': self.imgt_codon}
+             'position': str(self.imgt_position),
@srgk26 (Contributor, Author) commented:
Coercing to string; otherwise some of these IMGT positions are integers while others are floats (for example, position 112.1). The positions have no numerical significance, so they can be treated as string values.

@@ -240,7 +241,7 @@ def _build_json_output(self, raw=False):
                     'score': self.antibody.j.score,
                     'assigner_score': self.antibody.j.assigner_score,
                     'others': [{'full': o.full,
-                                'score': o.assigner_score}
+                                'assigner_score': o.assigner_score}
@srgk26 (Contributor, Author) commented:
Renaming this: everywhere else in the codebase, o.assigner_score is assigned to the assigner_score key.

@@ -545,6 +546,70 @@ def get_parquet_dtypes(output_format):
     if output_format.lower() == 'tabular':
         dtypes = {'var_ins': 'object', 'var_del': 'object',
                   'var_muts_nt': 'object', 'var_muts_aa': 'object'}
+    elif output_format == "json":
@srgk26 (Contributor, Author) commented:
Specifying dtypes is important for reading the data into a pandas dataframe correctly.

@@ -545,6 +546,70 @@ def get_parquet_dtypes(output_format):
     if output_format.lower() == 'tabular':
         dtypes = {'var_ins': 'object', 'var_del': 'object',
                   'var_muts_nt': 'object', 'var_muts_aa': 'object'}
+    elif output_format == "json":
+        dtypes = {
+            'seq_id': object,
@srgk26 (Contributor, Author) commented:
Setting as object instead of str to be able to accommodate nested dictionaries.
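A hedged sketch of the dtype handling described here and in the PR summary: integer fields get numeric dtypes, everything else (including nested dicts) is read as object, and an empty dataframe supplies dask's metadata. The field list is a small illustrative subset, not the full schema:

```python
import pandas as pd
import dask.dataframe as dd

int_fields = ["isotype_score", "junc_len", "cdr3_len", "mut_count_nt"]
obj_fields = ["seq_id", "vdj_nt", "var_muts_nt"]  # illustrative subset of the JSON fields

dtypes = {name: object for name in obj_fields}
dtypes.update({name: "float64" for name in int_fields})  # floats tolerate missing values

# Empty dataframe with the desired column order and dtypes, used as dask's meta.
# In real use, meta must list every column present in the JSON records.
meta = pd.DataFrame({col: pd.Series(dtype=dt) for col, dt in dtypes.items()})

df = dd.read_json("temp/*.json", lines=True, blocksize=2**28,  # 256 MiB blocks
                  dtype=dtypes, meta=meta)
df.to_parquet("output/json_parquet", compression="snappy", write_index=False)
```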

             with open(output_file, 'w') as f:
                 f.write('\n'.join(output_dict[fmt]))

+        if fmt == 'json' and write_parquet:
@srgk26 (Contributor, Author) commented:
Directly writing parquet files where possible, to save time and computation. It is unlikely a user would need to use both json and parquet at the same time.
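A hedged sketch of that branch: the JSON output strings are parsed and written straight to a parquet chunk, bypassing the temporary JSON file. The schema, helper name, and paths are illustrative, not abstar's exact code:

```python
import json
import pandas as pd
import pyarrow as pa

# Illustrative two-field schema; the real schema covers every BCR/TCR field.
schema = pa.schema([("seq_id", pa.string()), ("junc_len", pa.int64())])

def write_parquet_chunk(json_strings, parquet_file):
    records = [json.loads(s) for s in json_strings]   # parse JSON strings into dicts
    df = pd.DataFrame(records, columns=schema.names)  # keep a stable column order
    # pandas forwards `schema` to pyarrow's Table.from_pandas when engine="pyarrow"
    df.to_parquet(parquet_file, engine="pyarrow", compression="snappy",
                  schema=schema, index=False)
```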

@srgk26 srgk26 marked this pull request as ready for review March 9, 2023 16:08
@briney briney merged commit 7fbe9fe into brineylab:development Mar 18, 2023