
Commit 951c4be

Simpler jsonstream (simdjson#436)
* One simplification.
* Removing untested functions.
1 parent: 9842e1f

2 files changed (+68 / -64 lines)

include/simdjson/jsonstream.h

Lines changed: 6 additions & 5 deletions
@@ -98,12 +98,14 @@ namespace simdjson {
     /* Sets a new buffer for this JsonStream. Will also reinitialize all the variables,
      * which acts as a reset. A new JsonStream without initializing again.
      * */
-    void set_new_buffer(const char *buf, size_t len);
+    // todo: implement and test this function, note that _batch_size is mutable
+    // void set_new_buffer(const char *buf, size_t len);

     /* Sets a new buffer for this JsonStream. Will also reinitialize all the variables,
      * which is basically a reset. A new JsonStream without initializing again.
      * */
-    void set_new_buffer(const std::string &s) { set_new_buffer(s.data(), s.size()); }
+    // todo: implement and test this function, note that _batch_size is mutable
+    // void set_new_buffer(const std::string &s) { set_new_buffer(s.data(), s.size()); }

     /* Returns the location (index) of where the next document should be in the buffer.
      * Can be used for debugging, it tells the user the position of the end of the last
@@ -123,18 +125,17 @@ namespace simdjson {
     size_t _len;
     size_t _batch_size;
     size_t next_json{0};
-    bool error_on_last_attempt{false};
     bool load_next_batch{true};
     size_t current_buffer_loc{0};
     size_t last_json_buffer_loc{0};
     size_t n_parsed_docs{0};
     size_t n_bytes_parsed{0};
 #ifdef SIMDJSON_THREADS_ENABLED
     int stage1_is_ok_thread{0};
-#endif
-
     std::thread stage_1_thread;
     simdjson::ParsedJson pj_thread;
+#endif
+
   };
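With both set_new_buffer overloads commented out, the only supported way to point the parser at a different input is to construct a new JsonStream. A minimal sketch, assuming JsonStream keeps a constructor taking a (buf, len) pair, which is not shown in this diff; the helper name and its signature are illustrative, not part of simdjson:

    #include "simdjson/jsonstream.h"
    #include <string>

    // Illustrative helper: instead of calling the now-commented-out set_new_buffer(),
    // build a fresh JsonStream over the next buffer. pj can be reused across streams.
    void parse_two_buffers(const std::string &first, const std::string &second,
                           simdjson::ParsedJson &pj) {
        simdjson::JsonStream first_stream(first.data(), first.size());
        // ... drain first_stream with repeated first_stream.json_parse(pj) calls ...
        simdjson::JsonStream second_stream(second.data(), second.size());
        // ... then drain second_stream the same way ...
    }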

src/jsonstream.cpp

Lines changed: 62 additions & 59 deletions
@@ -26,7 +26,7 @@ JsonStream::~JsonStream() {
 #endif
 }

-
+/* // this implementation is untested and unlikely to work
 void JsonStream::set_new_buffer(const char *buf, size_t len) {
 #ifdef SIMDJSON_THREADS_ENABLED
     if(stage_1_thread.joinable()) {
@@ -35,41 +35,40 @@ void JsonStream::set_new_buffer(const char *buf, size_t len) {
 #endif
     this->_buf = buf;
     this->_len = len;
-    _batch_size = 0;
-    _batch_size = 0;
+    _batch_size = 0; // why zero?
+    _batch_size = 0; // waat??
     next_json = 0;
     current_buffer_loc = 0;
     n_parsed_docs = 0;
-    error_on_last_attempt= false;
     load_next_batch = true;
-}
+}*/

-// todo: this code is too complicated, it should be greatly simplified
+
+#ifdef SIMDJSON_THREADS_ENABLED
+
+// threaded version of json_parse
+// todo: simplify this code further
 int JsonStream::json_parse(ParsedJson &pj) {
-    if (pj.byte_capacity == 0) {
+    if (unlikely(pj.byte_capacity == 0)) {
         const bool allocok = pj.allocate_capacity(_batch_size);
-        const bool allocok_thread = pj_thread.allocate_capacity(_batch_size);
-        if (!allocok || !allocok_thread) {
-            std::cerr << "can't allocate memory" << std::endl;
-            return false;
+        if (!allocok) {
+            pj.error_code = simdjson::MEMALLOC;
+            return pj.error_code;
         }
+    } else if (unlikely(pj.byte_capacity < _batch_size)) {
+        pj.error_code = simdjson::CAPACITY;
+        return pj.error_code;
     }
-    else if (pj.byte_capacity < _batch_size) {
-        return simdjson::CAPACITY;
-    }
-#ifdef SIMDJSON_THREADS_ENABLED
-    if(current_buffer_loc == last_json_buffer_loc) {
-        load_next_batch = true;
+    if(unlikely(pj_thread.byte_capacity < _batch_size)) {
+        const bool allocok_thread = pj_thread.allocate_capacity(_batch_size);
+        if (!allocok_thread) {
+            pj.error_code = simdjson::MEMALLOC;
+            return pj.error_code;
+        }
     }
-#endif
-
-    if (load_next_batch) {
-#ifdef SIMDJSON_THREADS_ENABLED
+    if (unlikely(load_next_batch)) {
         //First time loading
         if(!stage_1_thread.joinable()) {
-            _buf = _buf + current_buffer_loc;
-            _len -= current_buffer_loc;
-            n_bytes_parsed += current_buffer_loc;
             _batch_size = std::min(_batch_size, _len);
             _batch_size = trimmed_length_safe_utf8((const char*)_buf, _batch_size);
             if(_batch_size == 0) {
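The rewritten preamble reports allocation problems through pj.error_code (simdjson::MEMALLOC when allocate_capacity fails, simdjson::CAPACITY when the ParsedJson is too small for the batch) instead of printing to std::cerr and returning false. A minimal sketch of pre-sizing the ParsedJson so neither path triggers; the function name and batch_size parameter are illustrative:

    #include "simdjson/jsonstream.h"
    #include <cstddef>

    // Illustrative pre-allocation: give pj at least batch_size bytes of capacity up
    // front so json_parse neither allocates lazily (the MEMALLOC path) nor rejects
    // the batch (the CAPACITY path). allocate_capacity returns false on allocation
    // failure, as used in the diff above.
    bool prepare(simdjson::ParsedJson &pj, size_t batch_size) {
        return pj.allocate_capacity(batch_size);
    }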
@@ -100,8 +99,8 @@ int JsonStream::json_parse(ParsedJson &pj) {
             _buf = _buf + last_json_buffer_loc;
             _len -= last_json_buffer_loc;
             n_bytes_parsed += last_json_buffer_loc;
-            last_json_buffer_loc = 0; //because we want to use it in the if above.
         }
+        // let us decide whether we will start a new thread
         if(_len - _batch_size > 0) {
             last_json_buffer_loc = pj.structural_indexes[find_last_json_buf_idx(_buf,_batch_size,pj)];
             _batch_size = std::min(_batch_size, _len - last_json_buffer_loc);
@@ -122,15 +121,43 @@ int JsonStream::json_parse(ParsedJson &pj) {
             });
         }
     }
+        next_json = 0;
+        load_next_batch = false;
+    } // load_next_batch
+    int res = best_stage2(_buf, _len, pj, next_json);
+    if (res == simdjson::SUCCESS_AND_HAS_MORE) {
+        n_parsed_docs++;
+        current_buffer_loc = pj.structural_indexes[next_json];
+        load_next_batch = (current_buffer_loc == last_json_buffer_loc);
+    } else if (res == simdjson::SUCCESS) {
+        n_parsed_docs++;
+        if(_len > _batch_size) {
+            current_buffer_loc = pj.structural_indexes[next_json - 1];
+            load_next_batch = true;
+            res = simdjson::SUCCESS_AND_HAS_MORE;
+        }
+    }
+    return res;
+}
+
+#else // SIMDJSON_THREADS_ENABLED

-    //If we loaded a perfect amount of documents last time, we need to skip the first element,
-    // because it represents the end of the last document
-    next_json = next_json == 1;
-#else
+// single-threaded version of json_parse
+int JsonStream::json_parse(ParsedJson &pj) {
+    if (unlikely(pj.byte_capacity == 0)) {
+        const bool allocok = pj.allocate_capacity(_batch_size);
+        if (!allocok) {
+            pj.error_code = simdjson::MEMALLOC;
+            return pj.error_code;
+        }
+    } else if (unlikely(pj.byte_capacity < _batch_size)) {
+        pj.error_code = simdjson::CAPACITY;
+        return pj.error_code;
+    }
+    if (unlikely(load_next_batch)) {
         _buf = _buf + current_buffer_loc;
         _len -= current_buffer_loc;
         n_bytes_parsed += current_buffer_loc;
-
         _batch_size = std::min(_batch_size, _len);
         _batch_size = trimmed_length_safe_utf8((const char*)_buf, _batch_size);
         int stage1_is_ok = best_stage1(_buf, _batch_size, pj, true);
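The threaded and single-threaded implementations of json_parse are now two separate function bodies selected by SIMDJSON_THREADS_ENABLED at compile time, rather than one body laced with #ifdef blocks. A small illustrative check of which variant a translation unit sees; how the macro gets defined (by the build system or a manual -D flag) depends on your setup and is not shown in this diff:

    #include "simdjson/jsonstream.h"
    #include <cstdio>

    // Illustrative only: report which json_parse variant is compiled in.
    void report_jsonstream_variant() {
    #ifdef SIMDJSON_THREADS_ENABLED
        std::puts("threaded json_parse: stage 1 of the next batch can overlap stage 2");
    #else
        std::puts("single-threaded json_parse: each batch is scanned, then parsed");
    #endif
    }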
@@ -144,51 +171,27 @@ int JsonStream::json_parse(ParsedJson &pj) {
             return pj.error_code;
         }
         pj.n_structural_indexes = last_index + 1;
-#endif
         load_next_batch = false;
-
-    }
-    //#define SIMDJSON_IREALLYNEEDHELP
-#ifdef SIMDJSON_IREALLYNEEDHELP // for debugging
-    size_t oldnext_json = next_json;
-#endif
+    } // load_next_batch
     int res = best_stage2(_buf, _len, pj, next_json);
-#ifdef SIMDJSON_IREALLYNEEDHELP // for debugging
-    int sizeofdoc = pj.structural_indexes[next_json]-pj.structural_indexes[oldnext_json];
-    printf("size = %d\n", sizeofdoc);
-    if(sizeofdoc > 0) {
-        printf("%.*s\n",sizeofdoc, _buf + pj.structural_indexes[oldnext_json]);
-    } else {
-        printf("<empty>\n");
-    }
-#endif
-
-    if (res == simdjson::SUCCESS_AND_HAS_MORE) {
-        error_on_last_attempt = false;
+    if (likely(res == simdjson::SUCCESS_AND_HAS_MORE)) {
         n_parsed_docs++;
         current_buffer_loc = pj.structural_indexes[next_json];
     } else if (res == simdjson::SUCCESS) {
-        error_on_last_attempt = false;
         n_parsed_docs++;
         if(_len > _batch_size) {
             current_buffer_loc = pj.structural_indexes[next_json - 1];
-#ifndef SIMDJSON_THREADS_ENABLED
             next_json = 1;
-#endif
             load_next_batch = true;
             res = simdjson::SUCCESS_AND_HAS_MORE;
         }
     }
-    // We assume the error is because the json was not loaded completely in this batch.
-    // Load a new batch and if the error persists, it's a genuine error.
-    else if(!error_on_last_attempt) {
-        load_next_batch = true;
-        error_on_last_attempt = true;
-        res = json_parse(pj);
-    }
     return res;
 }

+#endif // SIMDJSON_THREADS_ENABLED
+
+
 size_t JsonStream::get_current_buffer_loc() const {
     return current_buffer_loc;
 }
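Both variants keep the same calling convention visible in this diff: each json_parse call fills pj with one document and returns simdjson::SUCCESS_AND_HAS_MORE until the final document, which returns simdjson::SUCCESS; anything else is an error code such as MEMALLOC or CAPACITY. A minimal consumption loop, assuming a JsonStream constructor taking (buf, len), which is not part of this diff; parse_all and ndjson are placeholder names:

    #include "simdjson/jsonstream.h"
    #include <string>

    // Sketch of the intended calling pattern for json_parse, based on the return
    // codes used above. ndjson holds one or more concatenated JSON documents.
    int parse_all(const std::string &ndjson) {
        simdjson::ParsedJson pj;
        simdjson::JsonStream js(ndjson.data(), ndjson.size());
        int ret = simdjson::SUCCESS_AND_HAS_MORE;
        while (ret == simdjson::SUCCESS_AND_HAS_MORE) {
            ret = js.json_parse(pj);
            if (ret == simdjson::SUCCESS || ret == simdjson::SUCCESS_AND_HAS_MORE) {
                // pj now holds the document that was just parsed; use it here.
            }
        }
        return ret; // simdjson::SUCCESS on a clean finish, an error code otherwise
    }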
