#pragma once #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS #include #include #include #include #include #include #include #include #include #include #include namespace node { // Represents a sequenced collection of data sources that can be // consumed as a single logical stream of data. Sources can be // memory-resident or streaming. // // There are two essential kinds of DataQueue: // // * Idempotent - Multiple reads always produce the same result. // This is even the case if individual sources // are not memory-resident. Reads never change // the state of the DataQueue. Every entry in // an Idempotent DataQueue must also be idempotent. // // * Non-idempotent - Reads are destructive of the internal state. // A non-idempotent DataQueue can be read at // most once and only by a single reader. // Entries in a non-idempotent DataQueue can // be a mix of idempotent and non-idempotent // entries. // // The DataQueue is essentially a collection of DataQueue::Entry // instances. A DataQueue::Entry is a single logical source of // data. The data may be memory-resident or streaming. The entry // can be idempotent or non-idempotent. An entry cannot be read // by itself, it must be part of a DataQueue to be consumed. // // Example of creating an idempotent DataQueue: // // std::shared_ptr store1 = getBackingStoreSomehow(); // std::shared_ptr store2 = getBackingStoreSomehow(); // // std::vector> list; // list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore( // store1, 0, len1)); // list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore( // store2, 0, len2)); // // std::shared_ptr data_queue = // DataQueue::CreateIdempotent(std::move(list)); // // Importantly, idempotent DataQueue's are immutable and all entries // must be provided when the DataQueue is constructed. Every entry // must be idempotent with known sizes. The entries may be memory // resident or streaming. Streaming entries must be capable of // being read multiple times. // // Because idempotent DataQueue's will always produce the same results // when read, they can be sliced. Slices yield a new DataQueue instance // that is a subset view over the original: // // std::shared_ptr slice = data_queue.slice( // 5, v8::Just(10UL)); // // Example of creating a non-idempotent DataQueue: // // std::shared_ptr store1 = getBackingStoreSomehow(); // std::shared_ptr store2 = getBackingStoreSomehow(); // // std::shared_ptr data_queue = DataQueue::Create(); // // data_queue->append(DataQueue::CreateInMemoryEntryFromBackingStore( // store1, 0, len1)); // // data_queue->append(DataQueue::CreateInMemoryEntryFromBackingStore( // store2, 0, len2)); // // These data-queues can have new entries appended to them. Entries can // be memory-resident or streaming. Streaming entries might not have // a known size. Entries may not be capable of being read multiple // times. // // A non-idempotent data queue will, by default, allow any amount of // entries to be appended to it. To limit the size of the DataQueue, // or the close the DataQueue (preventing new entries from being // appending), use the cap() method. The DataQueue can be capped // at a specific size or whatever size it currently it. // // It might not be possible for a non-idempotent DataQueue to provide // a size because it might not know how much data a streaming entry // will ultimately provide. // // Non-idempotent DataQueues cannot be sliced. // // To read from a DataQueue, we use the node::bob::Source API // (see src/node_bob.h). // // std::shared_ptr reader = data_queue->get_reader(); // // reader->Pull( // [](int status, const DataQueue::Vec* vecs, // uint64_t count, Done done) { // // status is one of node::bob::Status // // vecs is zero or more data buffers containing the read data // // count is the number of vecs // // done is a callback to be invoked when done processing the data // }, options, nullptr, 0, 16); // // Keep calling Pull() until status is equal to node::bob::Status::STATUS_EOS. // // For idempotent DataQueues, any number of readers can be created and // pull concurrently from the same DataQueue. The DataQueue can be read // multiple times. Successful reads should always produce the same result. // If, for whatever reason, the implementation cannot ensure that the // data read will remain the same, the read must fail with an error status. // // For non-idempotent DataQueues, only a single reader is ever allowed for // the DataQueue, and the data can only ever be read once. class DataQueue : public MemoryRetainer { public: struct Vec { uint8_t* base; uint64_t len; }; // A DataQueue::Reader consumes the DataQueue. If the data queue is // idempotent, multiple Readers can be attached to the DataQueue at // any given time, all guaranteed to yield the same result when the // data is read. Otherwise, only a single Reader can be attached. class Reader : public MemoryRetainer, public bob::Source { public: using Next = bob::Next; using Done = bob::Done; }; // A BackpressureListener can be used to receive notifications // when a non-idempotent DataQueue releases entries as they // are consumed. class BackpressureListener { public: virtual void EntryRead(size_t amount) = 0; }; // A DataQueue::Entry represents a logical chunk of data in the queue. // The entry may or may not represent memory-resident data. It may // or may not be consumable more than once. class Entry : public MemoryRetainer { public: // Returns a new Entry that is a view over this entries data // from the start offset to the ending offset. If the end // offset is omitted, the slice extends to the end of the // data. // // Creating a slice is only possible if is_idempotent() returns // true. This is because consuming either the original entry or // the new entry would change the state of the other in non- // deterministic ways. When is_idempotent() returns false, slice() // must return a nulled unique_ptr. // // Creating a slice is also only possible if the size of the // entry is known. If size() returns std::nullopt, slice() // must return a nulled unique_ptr. virtual std::unique_ptr slice( uint64_t start, std::optional end = std::nullopt) = 0; // Returns the number of bytes represented by this Entry if it is // known. Certain types of entries, such as those backed by streams // might not know the size in advance and therefore cannot provide // a value. In such cases, size() must return v8::Nothing. // // If the entry is idempotent, a size should always be available. virtual std::optional size() const = 0; // When true, multiple reads on the object must produce the exact // same data or the reads will fail. Some sources of entry data, // such as streams, may not be capable of preserving idempotency // and therefore must not claim to be. If an entry claims to be // idempotent and cannot preserve that quality, subsequent reads // must fail with an error when a variance is detected. virtual bool is_idempotent() const = 0; }; // Creates an idempotent DataQueue with a pre-established collection // of entries. All of the entries must also be idempotent otherwise // an empty std::unique_ptr will be returned. static std::shared_ptr CreateIdempotent( std::vector> list); // Creates a non-idempotent DataQueue. This kind of queue can be // mutated and updated such that multiple reads are not guaranteed // to produce the same result. The entries added can be of any type. static std::shared_ptr Create( std::optional capped = std::nullopt); // Creates an idempotent Entry from a v8::ArrayBufferView. To help // ensure idempotency, the underlying ArrayBuffer is detached from // the BackingStore. It is the callers responsibility to ensure that // the BackingStore is not otherwise modified through any other // means. If the ArrayBuffer is not detachable, nullptr will be // returned. static std::unique_ptr CreateInMemoryEntryFromView( v8::Local view); // Creates an idempotent Entry from a v8::BackingStore. It is the // callers responsibility to ensure that the BackingStore is not // otherwise modified through any other means. If the ArrayBuffer // is not detachable, nullptr will be returned. static std::unique_ptr CreateInMemoryEntryFromBackingStore( std::shared_ptr store, uint64_t offset, uint64_t length); static std::unique_ptr CreateDataQueueEntry( std::shared_ptr data_queue); static std::unique_ptr CreateFdEntry(Environment* env, v8::Local path); // Creates a Reader for the given queue. If the queue is idempotent, // any number of readers can be created, all of which are guaranteed // to provide the same data. Otherwise, only a single reader is // permitted. virtual std::shared_ptr get_reader() = 0; // Append a single new entry to the queue. Appending is only allowed // when is_idempotent() is false. std::nullopt will be returned // if is_idempotent() is true. std::optional(false) will be returned if the // data queue is not idempotent but the entry otherwise cannot be added. virtual std::optional append(std::unique_ptr entry) = 0; // Caps the size of this DataQueue preventing additional entries to // be added if those cause the size to extend beyond the specified // limit. // // If limit is zero, or is less than the known current size of the // data queue, the limit is set to the current known size, meaning // that no additional entries can be added at all. // // If the size of the data queue is not known, the limit will be // ignored and no additional entries will be allowed at all. // // If is_idempotent is true capping is unnecessary because the data // queue cannot be appended to. In that case, cap() is a non-op. // // If the data queue has already been capped, cap can be called // again with a smaller size. virtual void cap(uint64_t limit = 0) = 0; // Returns a new DataQueue that is a view over this queues data // from the start offset to the ending offset. If the end offset // is omitted, the slice extends to the end of the data. // // The slice will coverage a range from start up to, but excluding, end. // // Creating a slice is only possible if is_idempotent() returns // true. This is because consuming either the original DataQueue or // the new queue would change the state of the other in non- // deterministic ways. When is_idempotent() returns false, slice() // must return a nulled unique_ptr. // // Creating a slice is also only possible if the size of the // DataQueue is known. If size() returns std::nullopt, slice() // must return a null unique_ptr. virtual std::shared_ptr slice( uint64_t start, std::optional end = std::nullopt) = 0; // The size of DataQueue is the total size of all of its member entries. // If any of the entries is not able to specify a size, the DataQueue // will also be incapable of doing so, in which case size() must return // std::nullopt. virtual std::optional size() const = 0; // A DataQueue is idempotent only if all of its member entries are // idempotent. virtual bool is_idempotent() const = 0; // True only if cap is called or the data queue is a limited to a // fixed size. virtual bool is_capped() const = 0; // If the data queue has been capped, and the size of the data queue // is known, maybeCapRemaining will return the number of additional // bytes the data queue can receive before reaching the cap limit. // If the size of the queue cannot be known, or the cap has not // been set, maybeCapRemaining() will return std::nullopt. virtual std::optional maybeCapRemaining() const = 0; // BackpressureListeners only work on non-idempotent DataQueues. virtual void addBackpressureListener(BackpressureListener* listener) = 0; virtual void removeBackpressureListener(BackpressureListener* listener) = 0; static void Initialize(Environment* env, v8::Local target); static void RegisterExternalReferences(ExternalReferenceRegistry* registry); }; } // namespace node #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS