@@ -7,8 +7,6 @@ use std::fmt::Debug;
7
7
use std:: io:: Write ;
8
8
use tokenizer:: Operator ;
9
9
10
- #[ cfg( feature = "rayon" ) ]
11
- use rayon:: iter:: IntoParallelRefIterator ;
12
10
#[ cfg( feature = "rayon" ) ]
13
11
use rayon:: prelude:: * ;
14
12
@@ -74,58 +72,43 @@ impl<
74
72
}
75
73
76
74
/// Renders a template across multiple items in parallel using Rayon with
77
- /// convenient internally-managed buffers.
75
+ /// convenient internally-managed buffers. par_chunk_size controls the number
76
+ /// of iterations that are rendered on each thread before locking the mutex
77
+ /// that wraps `output` to write out the work done so far.
78
78
///
79
- /// NOTE: This function makes serious trade-offs to enable the _maximum_ throughput.
80
- /// It is far less efficient, and builds up a single buffer containing all results
81
- /// prior to writing this buffer into the output, so it can consume much more memory,
82
- /// and the latency to first-write is also significantly higher.
79
+ /// NOTE: This function makes trade-offs to enable the _maximum_ throughput.
80
+ /// It is less efficient, but given the right par_chunk_size and right number
81
+ /// of cores, it can increase total throughput.
83
82
///
84
- /// Only use if total throughput is the sole concern .
83
+ /// A recommended starting point for par_chunk_size is 20 .
85
84
#[ cfg( feature = "rayon" ) ]
86
- pub fn par_render < ' b , RunnerItem > (
85
+ pub fn par_render < ' b , RunnerItem , Writer > (
87
86
& self ,
88
87
runner : & [ RunnerItem ] ,
89
- output : & mut Write ,
90
- ) -> Result < ( ) , Vec < :: std:: io:: Error > >
88
+ output : & mut Writer ,
89
+ par_chunk_size : usize ,
90
+ ) -> Result < ( ) , :: std:: io:: Error >
91
91
where
92
92
RunnerItem : ' b + Runner < NumEnum , StrEnum , FilterEnum > + Send + Sync ,
93
+ Writer : Write + Send ,
93
94
{
94
95
thread_local ! ( static STORE : ( Vec <f64 >, String ) = ( Vec :: with_capacity( 8 ) , String :: with_capacity( 8 ) ) ) ;
95
96
96
- let results: Result < Vec < u8 > , Vec < :: std:: io:: Error > > = runner
97
- . par_iter ( )
98
- . map ( |item| {
97
+ let output = :: std:: sync:: Mutex :: new ( output) ;
98
+
99
+ runner
100
+ . par_chunks ( par_chunk_size)
101
+ . map ( |items| {
99
102
STORE . with ( |( ref mut stack, ref mut buffer) | {
100
103
let mut write_buf = Vec :: with_capacity ( 8 ) ;
101
- write_buf. clear ( ) ;
102
- self . render_with ( item, & mut write_buf, stack, buffer)
103
- . map ( |_| write_buf)
104
- . map_err ( |e| vec ! [ e] )
104
+ for item in items {
105
+ self . render_with ( item, & mut write_buf, stack, buffer) ?;
106
+ }
107
+ output. lock ( ) . unwrap ( ) . write_all ( & write_buf) ?;
108
+ return Ok ( ( ) ) ;
105
109
} )
106
110
} )
107
- . reduce (
108
- || Ok ( Vec :: with_capacity ( 32 ) ) ,
109
- |acc, item| match ( acc, item) {
110
- ( Ok ( mut buf) , Ok ( new_buf) ) => {
111
- buf. extend ( new_buf) ;
112
- Ok ( buf)
113
- }
114
- ( Err ( mut errors) , Err ( new_errors) ) => {
115
- errors. extend ( new_errors) ;
116
- Err ( errors)
117
- }
118
- ( Err ( errs) , _) | ( _, Err ( errs) ) => Err ( errs) ,
119
- } ,
120
- ) ;
121
-
122
- match results {
123
- Ok ( result) => match output. write_all ( & result) {
124
- Ok ( _) => Ok ( ( ) ) ,
125
- Err ( e) => Err ( vec ! [ e] ) ,
126
- } ,
127
- Err ( errors) => Err ( errors) ,
128
- }
111
+ . collect ( )
129
112
}
130
113
131
114
/// Renders a template using convenient internally-managed buffers, which requires a mutable reference to self.
0 commit comments