1
1
import bz2
2
+ from datetime import timedelta
2
3
import gzip
3
4
from multiprocessing import Pool
4
5
import os
5
6
from shutil import copyfileobj
7
+ import subprocess
6
8
import time
7
9
8
10
import cxutils as cx
9
11
import lz4 .frame
10
12
import numpy as np
11
13
import pandas as pd
14
+ import wfdb
15
+ from wfdb .io ._signal import wfdbfmtres
12
16
import zstd
13
17
14
18
15
19
def compress_file (file , fmt , level ):
16
20
"""
17
- Compress and decompress a file with a particular fmt. Return
18
- compressed size and time for compression/decompression.
21
+ Compress and decompress a single file.
22
+
23
+ Parameters
24
+ ----------
25
+ file : str
26
+ Full file path
27
+ fmt : str
28
+ The compression format
29
+ level : int
30
+ The compression level
31
+
32
+ Returns
33
+ -------
34
+ compressed_size : int
35
+ Compressed file size in bytes
36
+ compression_time : float
37
+ Time taken to compress, in seconds.
38
+ decompression_time : float
39
+ Time taken to decompress, in seconds.
19
40
20
41
"""
21
-
22
42
with open (file , 'rb' ) as f_in :
23
43
u_data = f_in .read ()
24
44
@@ -42,34 +62,39 @@ def compress_file(file, fmt, level):
42
62
c_data = lz4 .frame .compress (u_data , compression_level = level )
43
63
t1 = time .time ()
44
64
u_data = lz4 .frame .decompress (c_data )
45
-
46
- compressed_size = len (c_data )
65
+ elif fmt == 'flac' :
66
+ # command line processing
67
+ record = wfdb .rdheader (file [:- 4 ])
68
+ out_file = os .path .join ('/home/cx1111/Downloads/writedir/' , os .path .basename (file ).strip ('.dat' ) + '.flac' )
69
+ # Write the file since we need to decompress it
70
+ compress_command = "flac %s --endian=little --channels=%d --sample-rate=%d --bps=%d --sign=signed -%d -o %s" % (
71
+ file , record .n_sig , record .fs , wfdbfmtres (record .fmt [0 ]), level , out_file )
72
+ subprocess .run (compress_command , shell = True )
73
+ t1 = time .time ()
74
+ decompress_command = "flac -d %s -c" % out_file
75
+ subprocess .run (decompress_command , shell = True )
47
76
48
77
t2 = time .time ()
49
- t_compress = t1 - t0
50
- t_decompress = t2 - t1
51
78
52
- return compressed_size , t_compress , t_decompress
79
+ if fmt == 'flac' :
80
+ compressed_size = os .path .getsize (out_file )
81
+ os .remove (out_file )
82
+ else :
83
+ compressed_size = len (c_data )
53
84
85
+ compression_time = t1 - t0
86
+ decompression_time = t2 - t1
54
87
55
- # can change header
56
- # flac, wabpack
57
- def test_compression (fmt , compress_level ):
58
- """
59
- Test compression on target dat files.
60
-
61
- From mitdb and first 50 patient records of mimic3wdb/matched/
88
+ return compressed_size , compression_time , decompression_time
62
89
63
- Total size is about 10 Gb.
64
90
91
+ def test_compression (fmt , compress_level , test_dat_files ):
65
92
"""
66
- data_dirs = (['/home/cx1111/Downloads/data/mitdb' ]
67
- + cx .list_dirs ('/home/cx1111/Downloads/data/mimic3wdb/matched' ))
68
-
69
- test_dat_files = cx .list_files (data_dirs )
93
+ Test a type of compression of a specified level, on all target dat
94
+ files.
70
95
96
+ """
71
97
n_files = len (test_dat_files )
72
-
73
98
uncompressed_sizes = [os .path .getsize (file ) for file in test_dat_files ]
74
99
75
100
with Pool (os .cpu_count () - 1 ) as pool :
@@ -79,92 +104,65 @@ def test_compression(fmt, compress_level):
79
104
n_files * [compress_level ]))
80
105
compressed_sizes , compression_times , decompression_times = zip (* output )
81
106
82
- uncompressed_sizes = np . array ( uncompressed_sizes )
83
- compressed_sizes = np .array (compressed_sizes )
84
- decompression_times = np .array ( decompression_times )
85
- compression_ratios = uncompressed_sizes / compressed_sizes
107
+ # Calculate performance summary
108
+ compression_ratio = np .sum ( uncompressed_sizes ) / np . sum (compressed_sizes )
109
+ compression_time = np .sum ( compression_times )
110
+ decompression_time = np . sum ( decompression_times )
86
111
87
- # Return the compression ratios and time taken
88
- return (uncompressed_sizes , compressed_sizes , compression_ratios ,
89
- compression_times , decompression_times )
112
+ return compression_ratio , compression_time , decompression_time
90
113
91
114
92
def compare_compressions(fmts, compress_levels):
    """
    Run the full compression test for each compression format/level
    pair, and return the aggregate results. Times are rounded to the
    nearest second.

    The data is the waveforms of the first 100 patients of
    mimic3wdb/matched/. Total size is about 22 Gb.

    Parameters
    ----------
    fmts : list
        Compression format names, paired element-wise with
        `compress_levels`.
    compress_levels : list
        Compression level for each entry of `fmts`.

    Returns
    -------
    compression_results : pandas.DataFrame
        One row per format/level combination. Columns: format,
        compression level, overall compression ratio, compression
        time, and decompression time (times formatted as H:MM:SS
        strings).
    dataset_info : dict
        Summary of the tested dataset, with keys 'n_files' (number of
        dat files tested) and 'uncompressed_total' (human-readable
        total uncompressed size).

    """
    # Files to be compressed
    data_dirs = cx.list_dirs('/home/cx1111/Downloads/data/mimic3wdb/matched')
    test_dat_files = cx.list_files(data_dirs, extensions=['dat'])

    # NOTE(review): kludgy, slightly inaccurate workaround — when flac
    # is among the tested formats, drop '*n.dat' files from the shared
    # file list for ALL formats, presumably because flac cannot encode
    # them. Confirm why these files are excluded.
    if 'flac' in fmts:
        test_dat_files = [file for file in test_dat_files
                          if not file.endswith('n.dat')]

    n_files = len(test_dat_files)
    uncompressed_sizes = [os.path.getsize(file) for file in test_dat_files]
    uncompressed_total = cx.readable_size(np.sum(uncompressed_sizes))
    dataset_info = {'n_files': n_files,
                    'uncompressed_total': uncompressed_total}

    # One results row per format/level pair
    compression_results = pd.DataFrame(columns=['fmt', 'compress_level',
                                                'compression_ratio',
                                                'time_compress',
                                                'time_decompress'])

    # Iterate through the format/level pairs
    for i, (fmt, compress_level) in enumerate(zip(fmts, compress_levels)):
        print('Testing %s with compress level=%d ...' % (fmt, compress_level))

        (compression_ratio, compression_time,
         decompression_time) = test_compression(fmt=fmt,
                                                compress_level=compress_level,
                                                test_dat_files=test_dat_files)

        # Render times as H:MM:SS; truncate to whole seconds first.
        compression_results.loc[i] = [
            fmt, compress_level,
            '%.2f' % compression_ratio,
            str(timedelta(seconds=int(compression_time))),
            str(timedelta(seconds=int(decompression_time)))]

    print('Full benchmark complete')
    return compression_results, dataset_info
0 commit comments