|
261 | 261 | (fn [tuple-batch sequence-id end-of-batch?]
|
262 | 262 | ;;(log-debug "Processing message " msg)
|
263 | 263 | (doseq [[task-id msg] tuple-batch]
|
264 |
| - (let [^Tuple tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))] |
| 264 | + (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))] |
265 | 265 | (tuple-action-fn task-id tuple)
|
266 | 266 | )))))
|
267 | 267 |
|
|
290 | 290 | (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
|
291 | 291 | (.add event-queue #(fail-spout-msg executor-data (task-datas task-id) spout-id tuple-info time-delta)))
|
292 | 292 | )))
|
293 |
| - tuple-action-fn (fn [task-id ^Tuple tuple] |
| 293 | + tuple-action-fn (fn [task-id ^TupleImpl tuple] |
294 | 294 | (let [id (.getValue tuple 0)
|
295 | 295 | [stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)]
|
296 | 296 | (when spout-id
|
|
320 | 320 | (let [tuple-id (if rooted?
|
321 | 321 | (MessageId/makeRootId root-id id)
|
322 | 322 | (MessageId/makeUnanchored))]
|
323 |
| - (Tuple. worker-context |
324 |
| - values |
325 |
| - task-id |
326 |
| - out-stream-id |
327 |
| - tuple-id)))] |
| 323 | + (TupleImpl. worker-context |
| 324 | + values |
| 325 | + task-id |
| 326 | + out-stream-id |
| 327 | + tuple-id)))] |
328 | 328 | (dorun
|
329 | 329 | (map transfer-fn out-tasks out-tuples))
|
330 | 330 | (if rooted?
|
|
386 | 386 | ]
|
387 | 387 | ))
|
388 | 388 |
|
389 |
| -(defn- tuple-time-delta! [^Map start-times ^Tuple tuple] |
390 |
| - (let [ms (.remove start-times tuple)] |
| 389 | +(defn- tuple-time-delta! [^TupleImpl tuple] |
| 390 | + (let [ms (.getSampleStartTime tuple)] |
391 | 391 | (if ms
|
392 | 392 | (time-delta-ms ms))))
|
393 | 393 |
|
394 | 394 | (defn put-xor! [^Map pending key id]
|
395 | 395 | (let [curr (or (.get pending key) (long 0))]
|
396 |
| - ;; TODO: this portion is not thread safe (multiple threads updating same value at same time) |
397 | 396 | (.put pending key (bit-xor curr id))))
|
398 | 397 |
|
399 | 398 | (defmethod mk-threads :bolt [executor-data task-datas]
|
400 | 399 | (let [component-id (:component-id executor-data)
|
401 |
| - tuple-start-times (HashMap.) |
402 | 400 | transfer-fn (:transfer-fn executor-data)
|
403 | 401 | worker-context (:worker-context executor-data)
|
404 | 402 | storm-conf (:storm-conf executor-data)
|
405 | 403 | executor-stats (:stats executor-data)
|
406 |
| - pending-acks (HashMap.) |
407 | 404 | report-error-fn (:report-error-fn executor-data)
|
408 | 405 | sampler (:sampler executor-data)
|
409 |
| - tuple-action-fn (fn [task-id ^Tuple tuple] |
| 406 | + tuple-action-fn (fn [task-id ^TupleImpl tuple] |
410 | 407 | ;; synchronization needs to be done with a key provided by this bolt, otherwise:
|
411 | 408 | ;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
|
412 | 409 | ;; TODO: for state sync, need to first send sync messages in a loop and receive tuples until synchronization
|
|
425 | 422 | ;; need to do it this way to avoid reflection
|
426 | 423 | (let [^IBolt bolt-obj (-> task-id task-datas :object)]
|
427 | 424 | (when (sampler)
|
428 |
| - (.put tuple-start-times tuple (System/currentTimeMillis))) |
| 425 | + (.setSampleStartTime tuple (System/currentTimeMillis))) |
429 | 426 | (.execute bolt-obj tuple)))]
|
430 | 427 | (log-message "Preparing bolt " component-id ":" (keys task-datas))
|
431 | 428 | (doseq [[task-id task-data] task-datas
|
|
438 | 435 | (tasks-fn stream values))]
|
439 | 436 | (doseq [t out-tasks
|
440 | 437 | :let [anchors-to-ids (HashMap.)]]
|
441 |
| - (doseq [^Tuple a anchors |
| 438 | + (doseq [^TupleImpl a anchors |
442 | 439 | :let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]]
|
443 | 440 | (when (pos? (count root-ids))
|
444 | 441 | (let [edge-id (MessageId/generateId)]
|
445 |
| - (put-xor! pending-acks a edge-id) |
| 442 | + (.updateAckVal a edge-id) |
446 | 443 | (doseq [root-id root-ids]
|
447 | 444 | (put-xor! anchors-to-ids root-id edge-id))
|
448 | 445 | )))
|
449 | 446 | (transfer-fn t
|
450 |
| - (Tuple. worker-context |
451 |
| - values |
452 |
| - task-id |
453 |
| - stream |
454 |
| - (MessageId/makeId anchors-to-ids)))) |
| 447 | + (TupleImpl. worker-context |
| 448 | + values |
| 449 | + task-id |
| 450 | + stream |
| 451 | + (MessageId/makeId anchors-to-ids)))) |
455 | 452 | (or out-tasks [])))]]
|
456 | 453 | (.prepare bolt-obj
|
457 | 454 | storm-conf
|
|
463 | 460 | (emitDirect [this task stream anchors values]
|
464 | 461 | (bolt-emit stream anchors values task))
|
465 | 462 | (^void ack [this ^Tuple tuple]
|
466 |
| - (let [ack-val (or (.remove pending-acks tuple) (long 0))] |
| 463 | + (let [^TupleImpl tuple tuple |
| 464 | + ack-val (.getAckVal tuple)] |
467 | 465 | (doseq [[root id] (.. tuple getMessageId getAnchorsToIds)]
|
468 | 466 | (task/send-unanchored task-data
|
469 | 467 | ACKER-ACK-STREAM-ID
|
470 | 468 | [root (bit-xor id ack-val)])
|
471 | 469 | ))
|
472 |
| - (let [delta (tuple-time-delta! tuple-start-times tuple)] |
| 470 | + (let [delta (tuple-time-delta! tuple)] |
473 | 471 | (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple delta))
|
474 | 472 | (when delta
|
475 | 473 | (stats/bolt-acked-tuple! executor-stats
|
|
478 | 476 | delta)
|
479 | 477 | )))
|
480 | 478 | (^void fail [this ^Tuple tuple]
|
481 |
| - (.remove pending-acks tuple) |
482 | 479 | (doseq [root (.. tuple getMessageId getAnchors)]
|
483 | 480 | (task/send-unanchored task-data
|
484 | 481 | ACKER-FAIL-STREAM-ID
|
485 | 482 | [root]))
|
486 |
| - (let [delta (tuple-time-delta! tuple-start-times tuple) |
| 483 | + (let [delta (tuple-time-delta! tuple) |
487 | 484 | ]
|
488 | 485 | (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple delta))
|
489 | 486 | (when delta
|
|
0 commit comments