Add support to write in parquet format #115
Conversation
fix bug when processing an input file (rather than sequences) using t…
convert floats to str in AIRR output building
bump version to v0.5.8
Refactoring temporary json file concatenation
* Uncomment dask dataframe import.
* Remove json output type from list of parquet incompatible formats.
* Specify dtypes for dask dataframe read from json.
* Enable reading json files into dask dataframe and writing as parquet file.
* Enable files to be read in binary mode for all output formats.
* Change json output field for j gene from score to assigner_score.
* Revert change in line position to read file in binary mode.
* Converting IMGT positions from integers or floats to string.
* Coerce raw_position to be string type.
* Add schema for JSON fields datatypes to override when writing to parquet.
* Additional schema attributes.
* Convert schema to full pyarrow schema for full dataset.
* Add columns desired order and dtypes for dataframe metadata.
* Reorder dtype fields.
* Remove unneeded column and dtype information.
* Edit json reading and parquet writing code.
* Add additional schema attributes involved in BCR.
* Reorder schema fields.
* Reorder pyarrow schema.
* Replace string dtypes with object dtypes.
* Add function attribute to indicate if parquet will be written to `write_output` function.
* Write parquet files directly in place of temporary JSON files.
* Add flag to ignore datatype conversion errors when casting integer columns with NaNs, and change output file name.
* Edit concat_outputs to simply move files instead for parquet files generated from json output.
* Edit file path from string concatenation to os path join.
* Minor edit to os.path.join.
* Added if statement to check if file exists before attempting to delete temporary file.
* Add `.snappy` file extension to parquet files.
* Simplified file naming by simply moving files to the directory instead.
* Simplify specifying columns by changing `schema.names` to `dtypes`.
* Parse strings of dictionary into dictionary with `json.loads` before loading into dataframe.
* Read in temporary parquet files, repartition and write back parquet files.
* Remove setting writing metadata file in parquet to False as it's the default function argument.
* Remove unused imports.
* Remove if condition to check for temp files before deleting them.
* Fix chunking of fastq files
* Ignore vscode
* Replace double quotation marks with single quotes for consistency with the rest of the codebase.
* Add empty line at EOF.
* Allow matplotlib to be installed to the latest version since scanpy has upgraded their matplotlib support.
* Add comments to better explain code edits.
@@ -440,32 +439,45 @@ def concat_outputs(input_file, temp_output_file_dicts, output_dir, args):
     if args.gzip:
         ohandle = gzip.open(ofile + ".gz", 'wb')
     else:
-        ohandle = open(ofile, 'w')
+        ohandle = open(ofile, 'wb')
Opening in binary mode is important in order to use `shutil.copyfileobj`.
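A minimal sketch of the pattern this enables, appending one temp file to the combined output in binary mode with the 16 MB buffer mentioned in the PR description (file names here are placeholders, not abstar's actual paths):

```python
import shutil

# Placeholder file names; the real code builds these from the output dir and temp files.
combined_path = "combined_output.json"
temp_path = "temp_chunk_0.json"

# Both handles are opened in binary mode so shutil.copyfileobj can stream raw bytes;
# the third argument sets the copy buffer size (16 MB per read).
with open(combined_path, "ab") as out_file, open(temp_path, "rb") as temp_file:
    shutil.copyfileobj(temp_file, out_file, 16 * 1024 * 1024)
```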
     with ohandle as out_file:
         # JSON-formatted files don't have headers, so we don't worry about it
-        if output_type == 'json':
+        if output_type == 'json' and not args.parquet:
Since we're not writing JSON files when `args.parquet` is passed.
-            for line in f:
-                out_file.write(line)
-                out_file.write('\n')
+            with open(temp_file, 'rb') as f:
Reading the temp file in binary mode slightly increases file transfer efficiency.
             for j, line in enumerate(f):
                 if i == 0:
                     out_file.write(line)
                 elif j >= 1:
                     out_file.write(line)
-                out_file.write('\n')
Removed writing this newline here; it is now written in abstar/utils/output.py (line 664) instead.
+            if output_type == 'json':
+                df = dd.read_parquet(os.path.dirname(temp_files[0]))  # Read in all parquet files in temp dir
+                df.repartition(partition_size='100MB').to_parquet(
Repartitioning is necessary to coalesce the large number of small parquet files produced by pandas into a smaller number of larger partitions managed by dask. This also has the advantage of producing a single parquet dataset consisting of multiple partitions, rather than many small separate files.
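A rough sketch of that coalescing step, assuming `temp_dir` and `final_dir` stand in for the paths abstar actually uses:

```python
import dask.dataframe as dd

# Assumed paths; the real code derives them from the temp files and output directory.
temp_dir = "abstar_temp_parquet"    # directory holding many small per-chunk parquet files
final_dir = "abstar_final_parquet"  # destination for the consolidated dataset

df = dd.read_parquet(temp_dir)                # read every parquet file in the temp dir
df = df.repartition(partition_size="100MB")   # coalesce into fewer, ~100 MB partitions
df.to_parquet(final_dir)                      # write one multi-partition parquet dataset
```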
@@ -96,8 +96,8 @@ def json_formatted(self):
         j = {'in_frame': 'yes' if self.in_frame else 'no',
              'length': self.length,
              'sequence': self.sequence,
-             'position': self.imgt_position,
+             'position': str(self.imgt_position),
              'codon': self.imgt_codon}
Coercing to string, since otherwise some of these IMGT positions are integers while others are floating points (for example, position `112.1`). These positions have no numerical significance, so they can instead be treated as string values.
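For illustration (values are made up), mixing integer and insertion-style positions in one field is what forces the cast:

```python
# Hypothetical IMGT positions: plain positions are integers, insertions like 112.1 are floats.
positions = [111, 112, 112.1, 113]

# Casting everything to str gives one consistent, non-numeric representation.
positions_as_str = [str(p) for p in positions]
print(positions_as_str)  # ['111', '112', '112.1', '113']
```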
@@ -240,7 +241,7 @@ def _build_json_output(self, raw=False):
             'score': self.antibody.j.score,
             'assigner_score': self.antibody.j.assigner_score,
             'others': [{'full': o.full,
-                        'score': o.assigner_score}
+                        'assigner_score': o.assigner_score}
Renaming this key, since everywhere else in the codebase `o.assigner_score` is assigned to the `assigner_score` key.
@@ -545,6 +546,70 @@ def get_parquet_dtypes(output_format):
     if output_format.lower() == 'tabular':
         dtypes = {'var_ins': 'object', 'var_del': 'object',
                   'var_muts_nt': 'object', 'var_muts_aa': 'object'}
+    elif output_format == "json":
Specifying `dtypes` is important so the data is read into a pandas dataframe with the appropriate types.
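A hedged sketch of how such a mapping might be applied when reading the temporary JSON into a dataframe (the call and the glob path are assumptions, not the PR's literal code):

```python
import dask.dataframe as dd

# The four integer fields named in the PR description; everything else defaults to object/str.
dtypes = {
    'isotype_score': 'int64',
    'junc_len': 'int64',
    'cdr3_len': 'int64',
    'mut_count_nt': 'int64',
}

# Hypothetical glob for the temp JSON output; dask forwards extra keyword arguments
# to pandas.read_json, which accepts a dtype mapping.
df = dd.read_json('temp_output/*.json', lines=True, dtype=dtypes)
```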
@@ -545,6 +546,70 @@ def get_parquet_dtypes(output_format):
     if output_format.lower() == 'tabular':
         dtypes = {'var_ins': 'object', 'var_del': 'object',
                   'var_muts_nt': 'object', 'var_muts_aa': 'object'}
+    elif output_format == "json":
+        dtypes = {
+            'seq_id': object,
Setting the dtype to `object` instead of `str` so that the column can accommodate nested dictionaries.
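A small, made-up example of why `object` matters here: an `object` column can hold parsed dictionaries as-is, which a `str` dtype could not:

```python
import pandas as pd

# Made-up records mimicking a nested JSON field; not the actual abstar schema.
records = [
    {'seq_id': 'seq_1', 'germ_alignments': {'v': 'ATGC...', 'j': 'TGGC...'}},
    {'seq_id': 'seq_2', 'germ_alignments': {'v': 'ATGA...', 'j': 'TGGA...'}},
]

df = pd.DataFrame(records).astype({'seq_id': object, 'germ_alignments': object})
print(df.dtypes)                           # both columns have dtype 'object'
print(type(df.loc[0, 'germ_alignments']))  # <class 'dict'> -- the nested dict is preserved
```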
     with open(output_file, 'w') as f:
         f.write('\n'.join(output_dict[fmt]))

+    if fmt == 'json' and write_parquet:
Writing parquet files directly where possible, to save time and computation. It is unlikely a user would need both `json` and `parquet` output at the same time.
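A sketch of what writing a chunk of output straight to parquet could look like, assuming pandas with a parquet engine such as pyarrow installed (records and the file name are placeholders):

```python
import pandas as pd

# Placeholder records for one processing chunk.
output_records = [
    {'seq_id': 'seq_1', 'cdr3_len': 15},
    {'seq_id': 'seq_2', 'cdr3_len': 17},
]

df = pd.DataFrame(output_records)
# Write the chunk directly as a snappy-compressed parquet file,
# skipping the temporary JSON round-trip described in this PR.
df.to_parquet('chunk_0.snappy.parquet', compression='snappy', index=False)
```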
This is a rather large PR with several different features and bugfixes. I've added comments along the way to further explain.

* Use `shutil.copyfileobj` to append the contents of one file to another in binary mode, in chunks of 16 MB at a time.
* `master` currently has the `dask.dataframe` import commented out. Uncommenting this line allows `dask.dataframe` to be used to read JSON/CSV files and write them as parquet.
* Temporary output files, with `dtypes` depending on the output type (`["imgt", "tabular", "airr"]`), are read into dask dataframes before converting to parquet. Edits were made to first define `dtypes` for the JSON input. Apart from the 4 integer fields `["isotype_score", "junc_len", "cdr3_len", "mut_count_nt"]`, all other fields are given a `dtype` of `str`. The dask dataframes are later written as parquet files with standard `"snappy"` compression, ignoring index writes (see the sketch after this list).
* Coerce `raw_position`, `position`, `codon`, `imgt_position`, and `imgt_codon` to string type. The data types for these fields were a mixture of integers and floats, which was not only inconsistent but also inaccurate, as these numbers are simply genomic positions on which no arithmetic would be performed.
* Temporary JSON output is read in large blocks (2**28 bytes, i.e. 256 MiB), adding `dtype` information when reading into the dataframe. Metadata is also produced by providing an empty pandas dataframe with the correct column order and data types for dask to use as a reference. The processed dataframe is then written out as parquet files.
* When the output format is `json` and writing to parquet is also enabled, parquet files are now written in place of temporary JSON files. This saves time and compute in the writing and processing of JSON files.
* Fixed the chunking of fastq files, where an issue with the trailing `\n` caused the handling by biopython to fail for the last chunk.
* No longer specifying `maxtasksperchild` when creating the multiprocessing pool resource: abstar randomly hangs at the end of the multi-threaded process. One possible reason is that specifying `maxtasksperchild` when creating python's `multiprocessing` pool, on a Linux system where the default start method is fork, is memory- and thread-unsafe. This leads to a race condition in which multiprocessing cannot safely clean up resources when the pool is terminated, resulting in an indefinite wait for a signal from the child process.
* Pinned `matplotlib` and `numpy` versions in `requirements.txt`, as newer versions are incompatible with the codebase.
* Added `.vscode` to `.gitignore` to ignore vscode-specific config files.
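As referenced in the dask bullet above, a hedged end-to-end sketch of the JSON-to-parquet path (paths, field names, and the exact blocksize handling are assumptions, not the PR's literal code):

```python
import dask.dataframe as dd
import pandas as pd

# Assumed dtype mapping: the four integer fields from the PR description, everything else str.
dtypes = {'seq_id': str, 'isotype_score': 'int64', 'junc_len': 'int64',
          'cdr3_len': 'int64', 'mut_count_nt': 'int64'}

# Empty pandas frame with the desired column order and dtypes, used as dask metadata.
meta = pd.DataFrame({col: pd.Series(dtype=dt) for col, dt in dtypes.items()})

# Read line-delimited JSON in large blocks (2**28 bytes = 256 MiB), then write snappy parquet.
df = dd.read_json('temp_output/*.json', lines=True, blocksize=2**28, meta=meta)
df.to_parquet('final_output_parquet', compression='snappy', write_index=False)
```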