Darwin: Scale-In Stream Processing
Lawrence Benson and Tilmann Rabl. CIDR '22, January 9–12, 2022, Chaminade, USA.
Table 1: Feature set of existing SPEs with regard to resource efficiency and state management.
scale-out SPEs rely on slow secondary storage for this, further decreasing overall system performance. Recent developments in storage technology significantly improve the performance of persistent storage devices, allowing us to reduce the gap between high-performance and persistent state in SPEs. Persistent Memory (PMem) offers byte-addressability at close-to-DRAM speed with SSD-like capacity [7, 24] and modern NVMe SSDs achieve up to 7 GB/s and two million random IOPS [13]. Efficiently incorporating these technologies into streaming applications has the potential to radically shift the way SPEs interact with persistent state.

Considering both efficient hardware utilization and durable state management, we observe that systems support either one or the other, but not both. In this space, we identify three key challenges current SPEs face: resource inefficiency, state management, and overall system optimization. Overcoming these challenges heavily impacts SPE performance and constitutes an important step towards industry adoption and system maturity.

Based on these challenges, we propose scale-in stream processing, a new paradigm that adapts to varying application demands by achieving high hardware utilization on a wide range of hardware setups, reducing overall infrastructure requirements. In contrast to scaling-up or -out, it focuses on fully utilizing the given hardware instead of demanding more or ever-larger servers. Scale-in processing combines scale-out and scale-up concepts to efficiently process streaming data without sacrificing larger-than-memory state or crash recovery. To scale in, we adapt common scale-up approaches that optimize for the underlying hardware and common scale-out approaches that enable large state management. On the one hand, this allows scale-in SPEs to support large-scale processing when raw processing power is needed. On the other hand, when data volumes are low and infrastructure cost is more important than performance, scaling-in reduces the hardware requirements, resulting in lower infrastructure cost and operational complexity, while still offering key functionality such as crash recovery. Compared to the current performance drop when switching from scale-up to scale-out SPEs, scale-in allows for graceful scaling when workloads exceed a single server by optimizing for both single- and multi-node setups.

To this end, we introduce Darwin, our scale-in SPE prototype. Darwin leverages query compilation and modern storage to fully utilize the underlying hardware and handle large recoverable state.

In summary, we make the following contributions.
1) We propose scale-in stream processing, a new paradigm that adapts to varying application demands by achieving high hardware utilization on a wide range of hardware setups, reducing overall infrastructure requirements.
2) We present Darwin, a scale-in SPE prototype that optimizes for high overall hardware utilization while supporting recoverable larger-than-memory state.

The remainder of this paper is structured as follows. In Section 2, we present current challenges in SPEs. In Section 3, we present scale-in processing and its opportunities. We introduce our scale-in SPE prototype Darwin in Section 4 before concluding in Section 5.

2 CURRENT SPE CHALLENGES
Recent work in stream processing focuses either on scale-up or scale-out concepts. Scale-up systems optimize for high system utilization while scale-out systems focus on application stability and efficient large state management. Both areas show promising advancements, but combining them has received little attention. In Section 2.1, we compare nine SPEs with regard to their resource efficiency and state management. From this comparison, we observe that numerous challenges remain in the intersection of scale-up and scale-out systems. In Sections 2.2 to 2.4, we present three major challenges that current SPEs face: resource inefficiency, state management, and overall system optimization.

2.1 Focus of Existing Systems
In this section, we briefly discuss the feature sets of common SPEs with regard to resource efficiency and state management. We show the comparison in Table 1. For most features, we observe a clear distinction between scale-up and scale-out systems. While scale-out systems support recoverable, larger-than-memory state with persistent or distributed state management, all scale-up systems support only in-memory state without recovery. Instead, scale-up systems offer optimizations for higher hardware resource utilization on a single server. These include query compilation, NUMA-awareness, lock-free data sharing, and GPU co-processing. Except for NUMA-awareness, none of these are exclusive to large servers; they are also applicable on commodity machines. They represent a large area of improvement for scale-out systems, which currently offer very little hardware optimization.
Finally, all selected scale-out systems target the JVM with managed languages. The group of scale-up systems is not as homogeneous, spanning managed languages and C++. The recent scale-up SPE Grizzly demonstrates large performance gains by using a system language such as C++, outperforming other scale-up and JVM-based systems by orders of magnitude [11]. Thus, using a system-level language is highly advantageous to achieve high utilization when targeting the underlying hardware.

Overall, we observe distinct characteristics for scale-up and scale-out systems. While none of the scale-up systems offer recoverable state management, the scale-out systems neglect hardware optimizations. To achieve high performance and resource efficiency, future SPEs must focus on combining these features. Scaling-up should not come at the price of data loss and scaling-out should not come at the price of poor hardware utilization.

2.2 Resource Inefficiency
Recent studies show that widely used scale-out SPEs do not fully utilize the underlying hardware [26, 27]. When designing future SPEs, overcoming this resource inefficiency has great potential to reduce cost and improve performance. Higher resource utilization leads to higher system performance, i.e., higher throughput or lower latency. However, when processing smaller data volumes, scalability is not the primary concern for many users. Efficient server use allows users to reduce the number of required servers while still satisfying their performance needs. This not only reduces infrastructure cost but also overall system complexity.

2.3 State Management
Recent scale-up SPEs focus on maximizing hardware utilization through, e.g., query compilation [11], CPU-GPU co-processing [16], or NUMA-awareness [28]. Yet, none of these systems support larger-than-memory state or crash recovery, both important features for industry adoption. We identify efficient state management as a largely uninvestigated topic in scale-up SPEs compared to numerous computational improvements.

Common scale-out SPEs such as Apache Flink use persistent state backends (e.g., RocksDB) to handle larger-than-memory state. However, general-purpose key-value stores do not always fit stream-specific state access patterns [14]. They treat state as a black box, while many streaming-specific patterns are known in advance. Also, currently used general-purpose stores are not optimized for emerging storage technology. Research on modern storage-aware systems shows significant performance gains compared to traditional approaches [3, 17]. Storage-aware and streaming-specific state management presents a wide range of research challenges to improve the overall performance of modern SPEs.

2.4 Overall System Optimization
Database systems show that optimizing the overall system brings large performance benefits. Databases are commonly implemented in system languages such as C and C++, which compile to machine code. They are highly tuned towards the underlying system for maximum performance and offer, e.g., hardware-conscious joins and indexes, CPU-optimized scans, or NUMA-aware scheduling.

On the other hand, many widely used SPEs are written in high-level languages such as Java and Scala, targeting the JVM. Memory management in particular has a high performance impact due to, e.g., garbage collection overhead. Also, common SPEs often do not optimize internal operators at the level known from databases. Overall, we observe a major gap between the optimization levels of SPEs and database systems. With the increasing maturity of SPEs, reducing this gap is essential to improve the performance of future applications. Fortunately, many operations are similar in databases and SPEs, allowing us to benefit from database optimization research.

3 SCALE-IN STREAM PROCESSING
To overcome the current challenges in stream processing and acknowledge the fact that real-world setups have drastically varying performance and availability requirements, we propose scale-in processing. Scale-in processing is a new paradigm that adapts to varying application demands by achieving high hardware utilization on a wide range of hardware setups, reducing overall infrastructure requirements. In contrast to scaling-up or -out, it focuses on fully utilizing the given hardware instead of requiring more or ever-larger servers. We identify six opportunities for scale-in SPEs, based on the challenges discussed in Section 2. In Section 3.1, we discuss how query compilation and specialized network communication aid in overcoming general resource inefficiency. In Section 3.2, we present three opportunities to improve state management in scale-in systems: emerging persistent storage media, crash recovery, and streaming-specific access patterns. To increase the overall system performance, we discuss CPU-aware optimizations in Section 3.3.

To achieve high performance, scale-up systems commonly optimize for large high-end servers and scale-out SPEs commonly add more commodity servers. Solving performance limitations by adding more machines results in neglected individual server performance and poor resource utilization in scale-out systems. On the other hand, current scale-up systems are confined to a single server and require ever-larger machines to overcome performance issues. By combining both scale-up and scale-out concepts, scale-in SPEs treat every machine as a server that requires optimization, even if it contains only off-the-shelf components. To this end, scale-in systems adapt their execution to the underlying hardware and the specified queries. For large workloads, scale-in systems achieve the raw performance known from scale-up systems and for medium or small pipelines, they reduce hardware requirements while still offering key functionality such as crash recovery.

3.1 Opportunities for Resource Inefficiency
In this section, we discuss how query compilation and network communication aid in overcoming general resource inefficiency.

Query Compilation. At the core of scale-in processing, query compilation allows for hardware-conscious optimization on each server. This has many advantages, which are clearly demonstrated in previous work on SPEs [11] and database systems [21]. Compiling queries allows the compiler to optimize the execution for the given CPU without pre-compiling the runtime engine for all possible systems. This enables compiler features such as auto-vectorization and architecture-aware tuning without any development overhead.
CIDR ’22, January 9–12, 2022, Chaminade, USA Lawrence Benson and Tilmann Rabl
Figure 1: Insert and get performance (µs per record).
Figure 2: Checkpoint and recovery duration (in s).

To illustrate this benefit, we run a sample query in-memory based on Nexmark Q3, containing an auction-like setup with a join and aggregation. We first compile the query generically, i.e., without CPU-aware compiler flags. This represents the general case in which we do not target the underlying hardware and use a generic implementation for all servers. We then enable CPU optimizations via -march=native. On an Intel Xeon Gold 5220S CPU, adding the CPU optimizations achieves a 12% higher throughput without any changes to the code. This shows that even without explicitly writing CPU-optimized code, automatically targeting the underlying hardware is very beneficial.
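As a minimal sketch of why this works, consider the kind of hot loop a compiled query produces. The record layout and constants below are illustrative assumptions, not Darwin's actual data model; the point is that the identical source code can be auto-vectorized when built with -march=native, while a generic build must target the baseline instruction set.

```cpp
// Hypothetical hot loop of a compiled query: sum all values whose key
// matches a filter predicate. Record layout and sizes are illustrative only.
//
// Generic build:    g++ -O3 query.cpp
// CPU-aware build:  g++ -O3 -march=native query.cpp
// With -march=native, the compiler may auto-vectorize this loop using the
// widest SIMD extension of the local CPU (e.g., AVX-512 on a Xeon Gold
// 5220S) without any change to the source code.
#include <cstdint>
#include <cstdio>
#include <vector>

struct Record {
  std::uint64_t key;
  std::uint64_t value;
};

std::uint64_t filtered_sum(const std::vector<Record>& records,
                           std::uint64_t filter_key) {
  std::uint64_t sum = 0;
  for (const Record& r : records) {
    if (r.key == filter_key) {
      sum += r.value;
    }
  }
  return sum;
}

int main() {
  std::vector<Record> records(1'000'000);
  for (std::uint64_t i = 0; i < records.size(); ++i) {
    records[i] = Record{i % 16, i};
  }
  std::printf("sum = %llu\n",
              static_cast<unsigned long long>(filtered_sum(records, 3)));
  return 0;
}
```

Only the compiler invocation differs between the two builds, which mirrors the generic versus -march=native setup described above.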
Additionally, query compilation allows us to integrate query information as compile-time information, enabling additional optimizations. A push-based execution model improves data locality and reduces the number of virtual method calls compared to interpreted execution by compiling the entire execution into a tight loop [21]. Overall, query compilation allows us to tailor the execution exactly to the given query, system, and user requirements. It is the foundation for numerous other optimizations that we describe below, such as CPU optimization and state management.

Network Communication. An important component of scale-out SPEs is network communication between the servers. Recent work shows that the network constitutes a performance bottleneck and causes resource inefficiency on the servers [15]. In their study, the authors show that scale-out systems like Flink reach network limits long before they reach the actual saturating data rate. When performing a distributed aggregation across multiple nodes with 1 GBit LAN, Flink sustains only 1.2 million events/s, which amounts to only 40 MB/s, or 1/3 of the network bandwidth. While this constitutes the main bottleneck in 1 GBit LAN, today even medium-sized cloud instances have 10 or more GBit/s network connections, matching or surpassing SSD storage bandwidth. Managing this bandwidth with techniques like late merging [26] to reduce data shuffling or user-space networking to reduce TCP/IP overhead [18] enables higher effective bandwidth utilization even for smaller workloads.

For large-scale workloads, advances in network technology drastically improve cross-server communication performance via high-bandwidth InfiniBand and RDMA connections of up to 200 GBit/s per network card [19]. Current research demonstrates that SPEs benefit from RDMA for data ingestion [26] and that RDMA-based message passing achieves very high throughput with low latency [22]. These two findings show that there is a large potential for optimizing SPEs through fast RDMA connections. Especially in combination with modern byte-addressable storage, such as PMem, this opens new opportunities for more efficient checkpointing, state migration, and recovery approaches.

3.2 Opportunities for State Management
In this section, we present three opportunities to improve state management in scale-in systems: emerging persistent storage media, crash recovery, and streaming-specific access patterns.

Persistent Storage. Scale-out systems use persistent storage to handle larger-than-memory state. This entails a large performance decrease as storage access is significantly slower than DRAM access. However, new persistent storage technology is closing the gap between slow secondary storage and fast volatile memory. Recent work on PMem storage systems shows that persistency can be achieved with less than a 2× performance decrease [3]. Also, fast NVMe SSDs are used in modern database systems to extend storage capacity while still offering close-to-DRAM performance [17].

We show the performance of modern storage systems in Figure 1. We choose Intel's TBB concurrent hash map as a representative of in-memory state management, as it is used in recent scale-up SPEs [11, 20]. We choose RocksDB as a representative of a classical generic byte-based key-value store, as it is used in Apache Flink. Finally, we choose Viper as a representative of modern storage-aware key-value stores, which is based on a hybrid DRAM-PMem index and log structure [3]. Viper stores records directly in a PMem log without intermediate buffering in DRAM. To leverage the higher random access performance of DRAM compared to PMem for index updates, its index is located in DRAM. This hybrid design achieves 4× higher insert rates than PMem-only stores.

We prefill 100 million 50 Byte records before measuring another 100 million inserts/gets with 32 threads. Viper slightly outperforms TBB for inserts and is only ~2× slower for gets. We note that recent work shows that TBB is not the fastest concurrent in-memory storage system [6], but it is used in common scale-up SPEs [11, 20] and serves as an in-memory reference. The clear gap between Viper/TBB and RocksDB shows the major shift in persistent storage performance that systems can leverage. Unlike existing scale-out systems, storage-aware scale-in systems do not need to trade performance for persistence. Fast storage enables both efficient recoverability and high overall throughput.

Recovery. Considering server-local state is necessary when restarting an application after a crash and highly beneficial for regular application restarts. Scale-up systems run on high-end servers that contain hundreds of GBs of state. For specialized hardware, replacing the server is not always possible and transferring its state to another server quickly becomes a recovery bottleneck. In this case, state recovery must occur on the same server. Additionally, current scale-out SPEs use server-local state when restarting an application on the same node without a crash, e.g., when re-scaling or deploying a newer version. For both recovery and restarting, it is essential to have persistent state that outlives the application.

We show the advantage of using modern storage technology to achieve efficient checkpointing and recovery in Figure 2. In this microbenchmark, we store 200 million 50 Byte records, i.e., 10 GB of raw data, in three different storage instances. As representatives of their respective system classes, we again use TBB, Viper, and RocksDB. To persist TBB's data, we store all entries in a tightly
Operator Graph. Together with the config, the query plan is translated to an operator graph. We briefly describe the translation based on the query plan shown in Figure 4. Starting from the sink, each node recursively translates its input node. The resulting operator graph represents the physical operators, i.e., the specific implementation chosen for the given query, system, and config. As start and end nodes, source and sink operators require special treatment. They contain buffering logic, e.g., for external network-based I/O, and have either no input or no output operators. After the source is translated, the selection node is translated to an equality-filter operator. The translation of window nodes requires reordering, as windowing logic impacts the operator order. The tumbling window assignment and trigger run before the aggregation, but count-based triggers run afterward. Thus, window translation requires splitting and distributing certain nodes across the operator graph. For the aggregation, the translator chooses a hash-based sum aggregation operator with state in PMem, as specified by the user.

Query Generator. The query generator takes the operator graph and generates a C++ string representation of the actual query. In Figure 4, we show a pseudo-code version of the produced code. When the FilterOperator is called, it receives a record with schema information. From this, it generates a conditional statement with the correct predicate (e.g., equality) based on the filtered attribute (e.g., e.a). Depending on the predicate and execution model, the filter operator can also produce vectorized or SIMD-based filters, allowing for more fine-grained hardware optimization. Afterward, it calls the downstream tumbling assigner. The assigner generates code to assign the record to a tumbling window by mapping the record's timestamp to a window key. This process is continued until all operators are called and the query is fully generated.

As the windowed aggregation buffers data, it represents a pipeline breaker [11, 21]. The sink operator is executed after the aggregation is complete, i.e., when the window is triggered. The query generator creates a new function for the sink, which represents a new pipeline that can be executed independently. Splitting pipelines allows us to independently scale sources, sinks, and other operators.

As the query generation contains runtime information such as data types or filter conditions, the generated query is optimized accordingly. The SumOperator knows the key and value types, so it instantiates a state object with them. This is an advantage over key-value store interfaces such as RocksDB in Flink, which operate on generic byte representations. Storing records with explicit type information removes serialization and deserialization overhead and allows the compiler to optimize data move instructions, e.g., by issuing SIMD loads/stores instead of regular 8 Byte movs [3].

Query Compiler. Once the query code is generated, the query compiler compiles it. It uses information in the config to target the underlying hardware, e.g., by enabling vectorization features of the CPU. As the compiler runs independently of Darwin, users can specify a different compiler than the one used to compile Darwin. Darwin can be compiled once and distributed while still providing flexibility towards the system it executes queries on. The code is compiled into a shared library that is dynamically loaded by Darwin during execution. This allows Darwin to interact with the query, e.g., when passing allocated memory, data, or other resources.

Query Execution. In the last step, the query is loaded and executed. Depending on the generated pipeline and specified parallelism, multiple source, sink, and operator instances are started. During execution, Darwin monitors the performance of the query to allow for changes in parallelism and thread placement. If pipelines have low utilization, they are merged to free resources. If pipelines are creating backpressure, Darwin splits them to keep up with the data rate. This approach allows for some flexibility during runtime when data loads vary or are skewed. It also supports adaptive changes to the query if gathered performance metrics and data characteristics allow for more aggressive optimization [11].

4.2 Performance
We compare the performance of Darwin with the state-of-the-art scale-up SPE Grizzly [11] and the widely used scale-out SPE Apache Flink [4]. In this experiment, we run a 60-second tumbling window sum aggregation on 32 Byte records with 15000 unique keys. Our server contains an Intel Xeon Gold 6240L CPU with 18 cores, 96 GB DRAM, and 1.5 TB (6× 256 GB) Intel Optane DC Persistent Memory 100 Series. The experiments are run with 32 threads. For the in-memory version, Grizzly and Darwin use the TBB concurrent map. For the persistent version, Darwin uses the hybrid DRAM-PMem key-value store Viper [3] and Flink uses RocksDB.

We show the results in Figure 5. On the left, we see that Darwin performs equally to Grizzly for in-memory processing, as both systems are limited by TBB to store the aggregations. We note that this is not the highest performance that Grizzly can achieve, but the additional optimizations proposed by the authors are orthogonal to the basic concept and could also be applied to Darwin.
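To make the query generation step described above more concrete, the following is a minimal, hypothetical sketch of the kind of code a compiling SPE could emit for a filter followed by a tumbling-window sum aggregation. The record layout, names, and constants are illustrative assumptions and not Darwin's actual generated output.

```cpp
// Hypothetical sketch of generated query code: filter on e.a, assign each
// record to a tumbling window, and maintain a typed per-key sum.
// All names, types, and constants are illustrative assumptions.
#include <cstdint>
#include <cstdio>
#include <functional>
#include <unordered_map>
#include <utility>
#include <vector>

struct Event {
  std::int64_t a;          // filtered attribute (e.a)
  std::int64_t key;        // aggregation key
  std::int64_t value;      // aggregated value
  std::int64_t timestamp;  // event time in ms
};

// Typed window state: (window start, key) -> running sum. Because the types
// are known at compile time, no generic byte (de)serialization is needed.
using WindowKey = std::pair<std::int64_t, std::int64_t>;
struct WindowKeyHash {
  std::size_t operator()(const WindowKey& k) const {
    return std::hash<std::int64_t>()(k.first) ^
           (std::hash<std::int64_t>()(k.second) << 1);
  }
};
using SumState = std::unordered_map<WindowKey, std::int64_t, WindowKeyHash>;

constexpr std::int64_t kWindowSizeMs = 60'000;  // 60 s tumbling window
constexpr std::int64_t kFilterValue = 42;       // predicate constant from plan

// One compiled pipeline: filter -> tumbling assigner -> sum, pushed through
// a single tight loop. The window trigger and sink would form a second
// pipeline that runs once a window closes.
void run_pipeline(const std::vector<Event>& input, SumState& state) {
  for (const Event& e : input) {
    if (e.a != kFilterValue) continue;                  // FilterOperator
    const std::int64_t window_start =
        (e.timestamp / kWindowSizeMs) * kWindowSizeMs;  // tumbling assigner
    state[{window_start, e.key}] += e.value;            // SumOperator
  }
}

int main() {
  const std::vector<Event> input = {
      {42, 1, 10, 5}, {42, 1, 5, 10}, {7, 2, 3, 20}};
  SumState state;
  run_pipeline(input, state);
  std::printf("open windows: %zu\n", state.size());
  return 0;
}
```

A translation unit of this shape would then be compiled into a shared library by the configured compiler and loaded dynamically for execution, as described above.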
Figure 5: Throughput in Mops/s for Darwin, Grizzly, and Flink (Darwin and Grizzly both reach 56.7 Mops/s).

REFERENCES
[6] Badrish Chandramouli, Guna Prasaad, Donald Kossmann, Justin Levandoski, James Hunter, and Mike Barnett. 2018. FASTER: A Concurrent Key-Value Store with In-Place Updates. In SIGMOD 2018.