@@ -26,7 +26,7 @@ JsonStream::~JsonStream() {
26
26
#endif
27
27
}
28
28
29
-
29
+ /* // this implementation is untested and unlikely to work
30
30
void JsonStream::set_new_buffer(const char *buf, size_t len) {
31
31
#ifdef SIMDJSON_THREADS_ENABLED
32
32
if(stage_1_thread.joinable()) {
@@ -35,41 +35,40 @@ void JsonStream::set_new_buffer(const char *buf, size_t len) {
35
35
#endif
36
36
this->_buf = buf;
37
37
this->_len = len;
38
- _batch_size = 0 ;
39
- _batch_size = 0 ;
38
+ _batch_size = 0; // why zero?
39
+ _batch_size = 0; // what? this repeats the assignment on the previous line
40
40
next_json = 0;
41
41
current_buffer_loc = 0;
42
42
n_parsed_docs = 0;
43
- error_on_last_attempt= false ;
44
43
load_next_batch = true;
45
- }
44
+ }*/
46
45
47
- // todo: this code is too complicated, it should be greatly simplified
46
+
47
+ #ifdef SIMDJSON_THREADS_ENABLED
48
+
49
+ // threaded version of json_parse
50
+ // todo: simplify this code further
48
51
int JsonStream::json_parse (ParsedJson &pj) {
49
- if (pj.byte_capacity == 0 ) {
52
+ if (unlikely ( pj.byte_capacity == 0 ) ) {
50
53
const bool allocok = pj.allocate_capacity (_batch_size);
51
- const bool allocok_thread = pj_thread.allocate_capacity (_batch_size);
52
- if (!allocok || !allocok_thread) {
53
- std::cerr << " can't allocate memory" << std::endl;
54
- return false ;
54
+ if (!allocok) {
55
+ pj.error_code = simdjson::MEMALLOC;
56
+ return pj.error_code ;
55
57
}
58
+ } else if (unlikely (pj.byte_capacity < _batch_size)) {
59
+ pj.error_code = simdjson::CAPACITY;
60
+ return pj.error_code ;
56
61
}
57
- else if (pj .byte_capacity < _batch_size) {
58
- return simdjson::CAPACITY ;
59
- }
60
- # ifdef SIMDJSON_THREADS_ENABLED
61
- if (current_buffer_loc == last_json_buffer_loc) {
62
- load_next_batch = true ;
62
+ if ( unlikely (pj_thread .byte_capacity < _batch_size) ) {
63
+ const bool allocok_thread = pj_thread. allocate_capacity (_batch_size) ;
64
+ if (!allocok_thread) {
65
+ pj. error_code = simdjson::MEMALLOC;
66
+ return pj. error_code ;
67
+ }
63
68
}
64
- #endif
65
-
66
- if (load_next_batch) {
67
- #ifdef SIMDJSON_THREADS_ENABLED
69
+ if (unlikely (load_next_batch)) {
68
70
// First time loading
69
71
if (!stage_1_thread.joinable ()) {
70
- _buf = _buf + current_buffer_loc;
71
- _len -= current_buffer_loc;
72
- n_bytes_parsed += current_buffer_loc;
73
72
_batch_size = std::min (_batch_size, _len);
74
73
_batch_size = trimmed_length_safe_utf8 ((const char *)_buf, _batch_size);
75
74
if (_batch_size == 0 ) {
@@ -100,8 +99,8 @@ int JsonStream::json_parse(ParsedJson &pj) {
100
99
_buf = _buf + last_json_buffer_loc;
101
100
_len -= last_json_buffer_loc;
102
101
n_bytes_parsed += last_json_buffer_loc;
103
- last_json_buffer_loc = 0 ; // because we want to use it in the if above.
104
102
}
103
+ // let us decide whether we will start a new thread
105
104
if (_len - _batch_size > 0 ) {
106
105
last_json_buffer_loc = pj.structural_indexes [find_last_json_buf_idx (_buf,_batch_size,pj)];
107
106
_batch_size = std::min (_batch_size, _len - last_json_buffer_loc);
@@ -122,15 +121,43 @@ int JsonStream::json_parse(ParsedJson &pj) {
122
121
});
123
122
}
124
123
}
124
+ next_json = 0 ;
125
+ load_next_batch = false ;
126
+ } // load_next_batch
127
+ int res = best_stage2 (_buf, _len, pj, next_json);
128
+ if (res == simdjson::SUCCESS_AND_HAS_MORE) {
129
+ n_parsed_docs++;
130
+ current_buffer_loc = pj.structural_indexes [next_json];
131
+ load_next_batch = (current_buffer_loc == last_json_buffer_loc);
132
+ } else if (res == simdjson::SUCCESS) {
133
+ n_parsed_docs++;
134
+ if (_len > _batch_size) {
135
+ current_buffer_loc = pj.structural_indexes [next_json - 1 ];
136
+ load_next_batch = true ;
137
+ res = simdjson::SUCCESS_AND_HAS_MORE;
138
+ }
139
+ }
140
+ return res;
141
+ }
142
+
143
+ #else // SIMDJSON_THREADS_ENABLED
125
144
126
- // If we loaded a perfect amount of documents last time, we need to skip the first element,
127
- // because it represents the end of the last document
128
- next_json = next_json == 1 ;
129
- #else
145
+ // single-threaded version of json_parse
146
+ int JsonStream::json_parse (ParsedJson &pj) {
147
+ if (unlikely (pj.byte_capacity == 0 )) {
148
+ const bool allocok = pj.allocate_capacity (_batch_size);
149
+ if (!allocok) {
150
+ pj.error_code = simdjson::MEMALLOC;
151
+ return pj.error_code ;
152
+ }
153
+ } else if (unlikely (pj.byte_capacity < _batch_size)) {
154
+ pj.error_code = simdjson::CAPACITY;
155
+ return pj.error_code ;
156
+ }
157
+ if (unlikely (load_next_batch)) {
130
158
_buf = _buf + current_buffer_loc;
131
159
_len -= current_buffer_loc;
132
160
n_bytes_parsed += current_buffer_loc;
133
-
134
161
_batch_size = std::min (_batch_size, _len);
135
162
_batch_size = trimmed_length_safe_utf8 ((const char *)_buf, _batch_size);
136
163
int stage1_is_ok = best_stage1 (_buf, _batch_size, pj, true );
@@ -144,51 +171,27 @@ int JsonStream::json_parse(ParsedJson &pj) {
144
171
return pj.error_code ;
145
172
}
146
173
pj.n_structural_indexes = last_index + 1 ;
147
- #endif
148
174
load_next_batch = false ;
149
-
150
- }
151
- // #define SIMDJSON_IREALLYNEEDHELP
152
- #ifdef SIMDJSON_IREALLYNEEDHELP // for debugging
153
- size_t oldnext_json = next_json;
154
- #endif
175
+ } // load_next_batch
155
176
int res = best_stage2 (_buf, _len, pj, next_json);
156
- #ifdef SIMDJSON_IREALLYNEEDHELP // for debugging
157
- int sizeofdoc = pj.structural_indexes [next_json]-pj.structural_indexes [oldnext_json];
158
- printf (" size = %d\n " , sizeofdoc);
159
- if (sizeofdoc > 0 ) {
160
- printf (" %.*s\n " ,sizeofdoc, _buf + pj.structural_indexes [oldnext_json]);
161
- } else {
162
- printf (" <empty>\n " );
163
- }
164
- #endif
165
-
166
- if (res == simdjson::SUCCESS_AND_HAS_MORE) {
167
- error_on_last_attempt = false ;
177
+ if (likely (res == simdjson::SUCCESS_AND_HAS_MORE)) {
168
178
n_parsed_docs++;
169
179
current_buffer_loc = pj.structural_indexes [next_json];
170
180
} else if (res == simdjson::SUCCESS) {
171
- error_on_last_attempt = false ;
172
181
n_parsed_docs++;
173
182
if (_len > _batch_size) {
174
183
current_buffer_loc = pj.structural_indexes [next_json - 1 ];
175
- #ifndef SIMDJSON_THREADS_ENABLED
176
184
next_json = 1 ;
177
- #endif
178
185
load_next_batch = true ;
179
186
res = simdjson::SUCCESS_AND_HAS_MORE;
180
187
}
181
188
}
182
- // We assume the error is because the json was not loaded completely in this batch.
183
- // Load a new batch and if the error persists, it's a genuine error.
184
- else if (!error_on_last_attempt) {
185
- load_next_batch = true ;
186
- error_on_last_attempt = true ;
187
- res = json_parse (pj);
188
- }
189
189
return res;
190
190
}
191
191
192
+ #endif // SIMDJSON_THREADS_ENABLED
193
+
194
+
192
195
// Read-only accessor: returns the current_buffer_loc member without modifying
// the stream. json_parse assigns that member from pj.structural_indexes after
// a document has been parsed successfully.
size_t JsonStream::get_current_buffer_loc () const {
193
196
return current_buffer_loc;
194
197
}
0 commit comments