6
6
from copy import deepcopy
7
7
from glob import glob
8
8
import os
9
+ import pickle
9
10
import pwd
10
11
import shutil
11
12
from socket import gethostname
@@ -55,7 +56,7 @@ def report_crash(node, traceback=None, hostname=None):
55
56
exc_traceback )
56
57
timeofcrash = strftime ('%Y%m%d-%H%M%S' )
57
58
login_name = pwd .getpwuid (os .geteuid ())[0 ]
58
- crashfile = 'crash-%s-%s-%s.npz ' % (timeofcrash ,
59
+ crashfile = 'crash-%s-%s-%s.pklz ' % (timeofcrash ,
59
60
login_name ,
60
61
name )
61
62
crashdir = node .config ['execution' ]['crashdump_dir' ]
@@ -66,7 +67,8 @@ def report_crash(node, traceback=None, hostname=None):
66
67
crashfile = os .path .join (crashdir , crashfile )
67
68
logger .info ('Saving crash info to %s' % crashfile )
68
69
logger .info ('' .join (traceback ))
69
- np .savez (crashfile , node = node , traceback = traceback )
70
+ savepkl (crashfile , dict (node = node , traceback = traceback ))
71
+ #np.savez(crashfile, node=node, traceback=traceback)
70
72
return crashfile
71
73
72
74
@@ -303,8 +305,8 @@ def _submit_mapnode(self, jobid):
303
305
self .mapnodesubids [self .depidx .shape [0 ] + i ] = jobid
304
306
self .procs .extend (mapnodesubids )
305
307
self .depidx = ssp .vstack ((self .depidx ,
306
- ssp .lil_matrix (np .zeros (( numnodes ,
307
- self .depidx .shape [1 ])))),
308
+ ssp .lil_matrix (np .zeros (
309
+ ( numnodes , self .depidx .shape [1 ])))),
308
310
'lil' )
309
311
self .depidx = ssp .hstack ((self .depidx ,
310
312
ssp .lil_matrix (
@@ -349,16 +351,19 @@ def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None):
349
351
if self ._status_callback :
350
352
self ._status_callback (self .procs [jobid ], 'start' )
351
353
continue_with_submission = True
352
- if str2bool (self .procs [jobid ].config ['execution' ]['local_hash_check' ]):
354
+ if str2bool (self .procs [jobid ].config ['execution' ]
355
+ ['local_hash_check' ]):
353
356
logger .debug ('checking hash locally' )
354
357
try :
355
358
hash_exists , _ , _ , _ = self .procs [
356
359
jobid ].hash_exists ()
357
360
logger .debug ('Hash exists %s' % str (hash_exists ))
358
361
if (hash_exists and
359
- (self .procs [jobid ].overwrite == False or
360
- (self .procs [jobid ].overwrite == None and
361
- not self .procs [jobid ]._interface .always_run ))):
362
+ (self .procs [jobid ].overwrite == False or
363
+ (self .procs [jobid ].overwrite == None and
364
+ not self .procs [jobid ]._interface .always_run )
365
+ )
366
+ ):
362
367
continue_with_submission = False
363
368
self ._task_finished_cb (jobid )
364
369
self ._remove_node_dirs ()
@@ -436,7 +441,8 @@ def _remove_node_dirs(self):
436
441
"""Removes directories whose outputs have already been used up
437
442
"""
438
443
if str2bool (self ._config ['execution' ]['remove_node_directories' ]):
439
- for idx in np .nonzero ((self .refidx .sum (axis = 1 ) == 0 ).__array__ ())[0 ]:
444
+ for idx in np .nonzero (
445
+ (self .refidx .sum (axis = 1 ) == 0 ).__array__ ())[0 ]:
440
446
if idx in self .mapnodesubids :
441
447
continue
442
448
if self .proc_done [idx ] and (not self .proc_pending [idx ]):
@@ -506,9 +512,13 @@ def _get_result(self, taskid):
506
512
'traceback' : None }
507
513
results_file = None
508
514
try :
509
- raise IOError (('Job (%s) finished or terminated, but results file '
510
- 'does not exist. Batch dir contains crashdump '
511
- 'file if node raised an exception' % node_dir ))
515
+ error_message = ('Job id ({0}) finished or terminated, but '
516
+ 'results file does not exist after ({1}) '
517
+ 'seconds. Batch dir contains crashdump file '
518
+ 'if node raised an exception.\n '
519
+ 'Node working directory: ({2}) ' .format (
520
+ taskid ,timeout ,node_dir ) )
521
+ raise IOError (error_message )
512
522
except IOError , e :
513
523
result_data ['traceback' ] = format_exc ()
514
524
else :
@@ -582,13 +592,17 @@ def _get_args(self, node, keywords):
582
592
value = getattr (self , "_" + keyword )
583
593
if keyword == "template" and os .path .isfile (value ):
584
594
value = open (value ).read ()
585
- if hasattr (node , "plugin_args" ) and isinstance (node .plugin_args , dict ) and keyword in node .plugin_args :
586
- if keyword == "template" and os .path .isfile (node .plugin_args [keyword ]):
595
+ if (hasattr (node , "plugin_args" ) and
596
+ isinstance (node .plugin_args , dict ) and
597
+ keyword in node .plugin_args ):
598
+ if (keyword == "template" and
599
+ os .path .isfile (node .plugin_args [keyword ])):
587
600
tmp_value = open (node .plugin_args [keyword ]).read ()
588
601
else :
589
602
tmp_value = node .plugin_args [keyword ]
590
603
591
- if 'overwrite' in node .plugin_args and node .plugin_args ['overwrite' ]:
604
+ if ('overwrite' in node .plugin_args and
605
+ node .plugin_args ['overwrite' ]):
592
606
value = tmp_value
593
607
else :
594
608
value += tmp_value
0 commit comments