
Darwin: Scale-In Stream Processing

Lawrence Benson
Hasso Plattner Institute
University of Potsdam, Germany
lawrence.benson@hpi.de

Tilmann Rabl
Hasso Plattner Institute
University of Potsdam, Germany
tilmann.rabl@hpi.de

ABSTRACT

Companies increasingly rely on stream processing engines (SPEs) to quickly analyze data and monitor infrastructure. These systems enable continuous querying of data at high rates. Current production-level systems, such as Apache Flink and Spark, rely on clusters of servers to scale out processing capacity. Yet, these scale-out systems are resource inefficient and cannot fully utilize the hardware. As a solution, hardware-optimized, single-server, scale-up SPEs were developed. To get the best performance, they neglect essential features for industry adoption, such as larger-than-memory state and recovery. This requires users to choose between high performance and system availability. While some streaming workloads can afford to lose or reprocess large amounts of data, others cannot, forcing them to accept lower performance. Users also face a large performance drop once their workloads slightly exceed a single server and force them to use scale-out SPEs.

To acknowledge that real-world stream processing setups have drastically varying performance and availability requirements, we propose scale-in processing. Scale-in processing is a new paradigm that adapts to various application demands by achieving high hardware utilization on a wide range of single- and multi-node hardware setups, reducing overall infrastructure requirements. In contrast to scaling-up or -out, it focuses on fully utilizing the given hardware instead of demanding more or ever-larger servers. We present Darwin, our scale-in SPE prototype that tailors its execution towards arbitrary target environments by compiling stream processing queries while supporting recoverable larger-than-memory state management. Early results show that Darwin achieves an order of magnitude speed-up over current scale-out systems and matches processing rates of scale-up systems.

This paper is published under the Creative Commons Attribution 4.0 International (CC-BY 4.0) license. Authors reserve their rights to disseminate the work on their personal and corporate Web sites with the appropriate attribution, provided that you attribute the original work to the authors and CIDR 2022. 12th Annual Conference on Innovative Data Systems Research (CIDR ’22). January 9–12, 2022, Chaminade, USA.

1 INTRODUCTION

Today’s large-scale Internet companies use stream processing engines (SPEs) to process up to terabytes of incoming data per second [1]. However, recent studies show that widely used SPEs such as Apache Flink and Spark Streaming do not fully utilize the underlying hardware and are resource inefficient [26, 27]. Thus, companies must scale out their analytics jobs to millions of cores and tens of thousands of commodity servers. At this scale, large infrastructure teams are necessary to optimize analytics pipelines to maintain such a high level of processing capacity.

We briefly illustrate the required scale with an example from Alibaba’s 2020 Singles’ Day. At its peak, they processed 4 billion events per second in a Flink cluster with 1.5 million CPUs [1]. Running a general purpose VM (e.g., ecs.g6.4xlarge) with 16 cores on Alibaba’s cloud currently costs $1/hour [2]. Scaling this to 1.5 million cores with 93,750 VMs totals $93,750/hour or $2.25 million for the entire day, just in nominal infrastructure cost.

To overcome resource inefficiency, new scale-up SPEs were proposed that, e.g., use query compilation [11], optimize for NUMA-awareness [28], or utilize GPU-CPU co-processing [16], promising up to hundreds of millions of events per second on a single node. To showcase the stark contrast of scale-up to scale-out system performance, we assume a scale-up system with 100 million events per second in our Alibaba example. With this system, the workload could run on 40 VMs with 52 cores each (e.g., ecs.g6.26xlarge at $6/h) for a total of $5,760/day [2], resulting in a nearly 400× reduction in price. While this calculation is simplified, it clearly shows the huge gap between what is achieved with current scale-out SPEs and what is possible with proposed scale-up systems.

However, the price to pay for this high performance and reduced infrastructure cost lies in a reduced feature set. While scale-up systems utilize the hardware more efficiently, they lack support for larger-than-memory state and crash recovery, which limits their use in production setups. When the server or application crashes, all state is lost and must be reprocessed. For workloads with small windows, reprocessing includes only a few hours of old data. For unbounded or large-window streaming jobs with global state, reprocessing may span days or weeks of old data, which quickly becomes infeasible.

In our example, additional problems arise that limit the use of scale-up systems. Even with modern high-speed networks, it is currently not possible to ingest terabytes of data per second into a single server. To overcome this limitation, large-scale pipelines must build on scale-out systems, accepting the performance penalty they entail. Thus, we see a huge performance gap between workloads that fit onto a single server and can run in scale-up systems and ones that do not fit onto a single server and must retreat to scale-out SPEs.

In addition to the limitations discussed above, most companies simply do not operate at Internet scale and process significantly less data. These smaller companies often cannot afford large infrastructure teams to optimize and tune their multi-node pipelines. Due to the resource inefficiency of current systems, this puts them at the disadvantage of still needing clusters or large machines to analyze smaller data volumes. And even for large companies, reducing these costs is highly beneficial, as infrastructure is the dominant cost factor in cloud environments [10].

Also, regardless of scale, various business workloads require high availability and cannot afford a full reprocessing after all in-memory data is lost due to a crash in scale-up systems. This again forces users to choose inefficient scale-out systems over highly tuned scale-up SPEs, as production-grade scale-out systems support persistent, larger-than-memory state and crash recovery.

Table 1: Feature set of existing SPEs with regard to resource efficiency and state management.

System               | Scale | Language | State                   | Recovery | Hardware Resource Efficiency
Flink [4]            | out   | JVM      | in-memory + persistent  | ✓        | –
Spark Streaming [25] | out   | JVM      | in-memory + persistent  | ✓        | –
Drizzle [23]         | out   | JVM      | in-memory + persistent  | ✓        | –
Hazelcast Jet [9]    | out   | JVM      | distributed in-memory   | ✓        | cooperative multi-threading
Briskstream [28]     | up    | JVM      | in-memory               | –        | NUMA-aware scheduling
Saber [16]           | up    | JVM      | in-memory               | –        | CPU/GPU co-processing
Trill [5]            | up    | C#       | in-memory               | –        | query compilation, row/column layout
StreamBox [20]       | up    | C++      | in-memory               | –        | NUMA-aware, lock-free
Grizzly [11]         | up    | C++      | in-memory               | –        | adaptive query compilation, NUMA-aware

However, scale-out SPEs rely on slow secondary storage for this, further decreasing overall system performance. Recent developments in storage technology significantly improve the performance of persistent storage devices, allowing us to reduce the gap between high-performance and persistent state in SPEs. Persistent Memory (PMem) offers byte-addressability at close-to-DRAM speed with SSD-like capacity [7, 24], and modern NVMe SSDs achieve up to 7 GB/s and two million random IOPS [13]. Efficiently incorporating these technologies into streaming applications has the potential to radically shift the way SPEs interact with persistent state.

Following both efficient hardware utilization and durable state management, we observe that systems support either one or the other, but not both. In this space, we identify three key challenges current SPEs face: resource inefficiency, state management, and overall system optimization. Overcoming these challenges heavily impacts SPE performance and constitutes an important step towards industry adoption and system maturity.

Based on these challenges, we propose scale-in stream processing, a new paradigm that adapts to varying application demands by achieving high hardware utilization on a wide range of hardware setups, reducing overall infrastructure requirements. In contrast to scaling-up or -out, it focuses on fully utilizing the given hardware instead of demanding more or ever-larger servers. Scale-in processing combines scale-out and scale-up concepts to efficiently process streaming data without sacrificing larger-than-memory state or crash recovery. To scale in, we adapt common scale-up approaches that optimize for the underlying hardware and common scale-out approaches that enable large state management. On the one hand, this allows scale-in SPEs to support large-scale processing when raw processing power is needed. On the other hand, when data volumes are low and infrastructure cost is more important than performance, scaling in reduces the hardware requirements, resulting in lower infrastructure cost and operational complexity, while still offering key functionality such as crash recovery. Compared to the current performance drop when switching from scale-up to scale-out SPEs, scale-in allows for graceful scaling when workloads exceed a single server by optimizing for both single- and multi-node setups.

To this end, we introduce Darwin, our scale-in SPE prototype. Darwin leverages query compilation and modern storage to fully utilize the underlying hardware and handle large recoverable state. In summary, we make the following contributions.

1) We propose scale-in stream processing, a new paradigm that adapts to varying application demands by achieving high hardware utilization on a wide range of hardware setups, reducing overall infrastructure requirements.
2) We present Darwin, a scale-in SPE prototype that optimizes for high overall hardware utilization while supporting recoverable larger-than-memory state.

The remainder of this paper is structured as follows. In Section 2, we present current challenges in SPEs. In Section 3, we present scale-in processing and its opportunities. We introduce our scale-in SPE prototype Darwin in Section 4 before concluding in Section 5.

2 CURRENT SPE CHALLENGES

Recent work in stream processing focuses either on scale-up or scale-out concepts. Scale-up systems optimize for high system utilization while scale-out systems focus on application stability and efficient large state management. Both areas show promising advancements, but combining them has received little attention. In Section 2.1, we compare nine SPEs to see how they offer resource efficiency and state management. From this comparison, we observe that numerous challenges remain in the intersection of scale-up and scale-out systems. In Sections 2.2 to 2.4, we present three major challenges that current SPEs face: resource inefficiency, state management, and overall system optimization.

2.1 Focus of Existing Systems

In this section, we briefly discuss the feature sets of common SPEs with regard to resource efficiency and state management. We show the comparison in Table 1. For most features, we observe a clear distinction between scale-up and scale-out systems. While scale-out systems support recoverable, larger-than-memory state with persistent or distributed state management, all scale-up systems support only in-memory state without recovery. Instead, scale-up systems offer optimizations for higher hardware resource utilization on a single server. These include query compilation, NUMA-awareness, lock-free data sharing, and GPU co-processing. Except for NUMA-awareness, none of these are exclusive to large servers and are applicable optimizations also in commodity machines. They represent a large area of improvement for scale-out systems, which currently offer very little hardware optimization.

Finally, all selected scale-out systems target the JVM with managed languages. The group of scale-up systems is not as homogeneous, spanning managed languages and C++. The recent scale-up SPE Grizzly demonstrates large performance gains by using a system language such as C++, outperforming other scale-up and JVM-based systems by orders of magnitude [11]. Thus, using a system-level language is highly advantageous to achieve high utilization when targeting the underlying hardware.

Overall, we observe distinct characteristics for scale-up and scale-out systems. While none of the scale-up systems offer recoverable state management, the scale-out systems neglect hardware optimizations. To achieve high performance and resource efficiency, future SPEs must focus on combining these features. Scaling-up should not come at the price of data loss and scaling-out should not come at the price of poor hardware utilization.

2.2 Resource Inefficiency

Recent studies show that widely used scale-out SPEs do not fully utilize the underlying hardware [26, 27]. When designing future SPEs, overcoming this resource inefficiency has great potential to reduce cost and improve performance. Higher resource utilization leads to higher system performance, i.e., higher throughput or lower latency. However, when processing smaller data volumes, scalability is not the primary concern for many users. Efficient server use allows users to reduce the number of required servers while still satisfying their performance needs. This not only reduces infrastructure cost but also overall system complexity.

2.3 State Management

Recent scale-up SPEs focus on maximizing hardware utilization through, e.g., query compilation [11], CPU-GPU co-processing [16], or NUMA-awareness [28]. Yet, none of these systems support larger-than-memory state or crash recovery, both important features for industry adoption. We identify efficient state management as a largely uninvestigated topic in scale-up SPEs compared to numerous computational improvements.

Common scale-out SPEs such as Apache Flink use persistent state backends (e.g., RocksDB) to handle larger-than-memory state. However, general-purpose key-value stores do not always fit stream-specific state access patterns [14]. They treat state as a black box, while many streaming-specific patterns are known in advance. Also, currently used general-purpose stores are not optimized for emerging storage technology. Research on modern storage-aware systems shows significant performance gains compared to traditional approaches [3, 17]. Storage-aware and streaming-specific state management presents a wide range of research challenges to improve the overall performance of modern SPEs.

2.4 Overall System Optimization

Database systems show that optimizing the overall system brings large performance benefits. Databases are commonly implemented in system languages such as C and C++, which compile to machine code. They are highly tuned towards the underlying system for maximum performance and offer, e.g., hardware-conscious joins and indexes, CPU-optimized scans, or NUMA-aware scheduling. On the other hand, many widely used SPEs are written in high-level languages such as Java and Scala, targeting the JVM. Especially memory management has a high performance impact due to, e.g., garbage collection overhead. Also, common SPEs often do not optimize internal operators at the level known from databases. Overall, we observe a major gap between optimization levels in SPEs and database systems. With the increasing maturity of SPEs, reducing this gap is essential to improve the performance of future applications. Fortunately, many operations are similar in databases and SPEs, allowing us to benefit from database optimization research.

3 SCALE-IN STREAM PROCESSING

To overcome the current challenges in stream processing and acknowledge the fact that real-world setups have drastically varying performance and availability requirements, we propose scale-in processing. Scale-in processing is a new paradigm that adapts to varying application demands by achieving high hardware utilization on a wide range of hardware setups, reducing overall infrastructure requirements. In contrast to scaling-up or -out, it focuses on fully utilizing the given hardware instead of requiring more or ever-larger servers. We identify six opportunities for scale-in SPEs, based on the challenges discussed in Section 2. In Section 3.1, we discuss how query compilation and specialized network communication aid in overcoming general resource inefficiency. In Section 3.2, we present three opportunities to improve state management in scale-in systems: emerging persistent storage media, crash recovery, and streaming-specific access patterns. To increase the overall system performance, we discuss CPU-aware optimizations in Section 3.3.

To achieve high performance, scale-up systems commonly optimize for large high-end servers and scale-out SPEs commonly add more commodity servers. Solving performance limitations by adding more machines results in neglected individual server performance and poor resource utilization in scale-out systems. On the other hand, current scale-up systems are confined to a single server and require ever-larger machines to overcome performance issues. By combining both scale-up and scale-out concepts, scale-in SPEs treat every machine as a server that requires optimization, even if it contains only off-the-shelf components. To this end, scale-in systems adapt their execution to the underlying hardware and the specified queries. For large workloads, scale-in systems achieve the raw performance known from scale-up systems and for medium or small pipelines, they reduce hardware requirements while still offering key functionality such as crash recovery.

3.1 Opportunities for Resource Inefficiency

In this section, we discuss how query compilation and network communication aid in overcoming general resource inefficiency.

Query Compilation. At the core of scale-in processing, query compilation allows for hardware-conscious optimization on each server. This has many advantages, which are clearly demonstrated in previous work on SPEs [11] and database systems [21]. Compiling queries allows the compiler to optimize the execution for the given CPU without pre-compiling the runtime engine for all possible systems. This enables compiler features such as auto-vectorization and architecture-aware tuning without any development overhead.
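The following sketch illustrates this idea; it is an illustrative example rather than code generated by Darwin. A fused filter-and-sum kernel of this kind is portable when compiled generically, and the identical source can be auto-vectorized when compiled with a CPU-targeting flag such as -march=native.

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

struct Record {
  int64_t key;
  int64_t value;
};

// SELECT SUM(value) WHERE key == filter_key, fused into one tight loop, as a
// query compiler might emit it for this particular predicate and schema.
int64_t filter_sum(const std::vector<Record>& records, int64_t filter_key) {
  int64_t sum = 0;
  for (const Record& r : records) {
    // Branch-free form keeps the loop easy for the compiler to auto-vectorize.
    sum += (r.key == filter_key) ? r.value : 0;
  }
  return sum;
}

int main() {
  std::vector<Record> input = {{1, 10}, {2, 20}, {1, 30}};
  // Built with generic flags (e.g., -O3) this stays portable scalar code;
  // built with -O3 -march=native the compiler may emit SIMD instructions for
  // the local CPU without any source changes.
  std::cout << filter_sum(input, 1) << "\n";  // prints 40
}
```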

To highlight this advantage, we run a short experiment with CPU-specific optimization enabled and disabled. We execute a query in-memory based on Nexmark Q3, containing an auction-like setup with a join and aggregation. We first compile the query generically, i.e., with no CPU-aware compiler flags. This represents the general case in which we do not target the underlying hardware and use a generic implementation for all servers. We then enable CPU optimizations via -march=native. On an Intel Xeon Gold 5220S CPU, adding the CPU optimizations achieves a 12% higher throughput without any changes to the code. This shows that even without explicitly writing CPU-optimized code, automatically targeting the underlying hardware is very beneficial.

Additionally, query compilation allows us to integrate query information as compile-time information, enabling additional optimizations. A push-based execution model improves data locality and reduces the number of virtual method calls compared to interpreted execution by compiling the entire execution into a tight loop [21]. Overall, query compilation allows us to tailor the execution exactly to the given query, system, and user requirements. It is the foundation for numerous other optimizations that we describe below, such as CPU optimization and state management.

Network Communication. An important component of scale-out SPEs is network communication between the servers. Recent work shows that the network constitutes a performance bottleneck and causes resource inefficiency on the servers [15]. In their study, the authors show that scale-out systems like Flink reach network limits long before they reach the actual saturating data rate. When performing a distributed aggregation across multiple nodes with 1 GBit LAN, Flink sustains only 1.2 million events/s, which amounts to only 40 MB or 1/3 of the network bandwidth. While this constitutes the main bottleneck in 1 GBit LAN, today even medium-sized cloud instances have 10 or more GBit/s network connections, matching or surpassing SSD storage bandwidth. Managing this bandwidth with techniques like late merging [26] to reduce data shuffling or user-space networking to reduce TCP/IP overhead [18] enables higher effective bandwidth utilization even for smaller workloads.

For large-scale workloads, advances in network technology drastically improve cross-server communication performance via high-bandwidth Infiniband and RDMA connections of up to 200 GBit/s per network card [19]. Current research demonstrates that SPEs benefit from RDMA for data ingestion [26] and that RDMA-based message passing achieves very high throughput with low latency [22]. These two findings show that there is a large potential for optimizing SPEs through fast RDMA connections. Especially in combination with modern byte-addressable storage, such as PMem, this opens new opportunities for more efficient checkpointing, state migration, and recovery approaches.

3.2 Opportunities for State Management

In this section, we present three opportunities to improve state management in scale-in systems: emerging persistent storage media, crash recovery, and streaming-specific access patterns.

Persistent Storage. Scale-out systems use persistent storage to handle larger-than-memory state. This entails a large performance decrease, as storage access is significantly slower than DRAM access. However, new persistent storage technology is closing the gap between slow secondary storage and fast volatile memory. Recent work on PMem storage systems shows that persistency can be achieved with less than 2× performance decrease [3]. Also, fast NVMe SSDs are used in modern database systems to extend storage capacity while still offering close-to-DRAM performance [17].

Figure 1: Insert and get performance (Viper, TBB, RocksDB).

We show the performance of modern storage systems in Figure 1. We choose Intel’s TBB concurrent hash map as a representative of in-memory state management, as it is used in recent scale-up SPEs [11, 20]. We choose RocksDB as a representative of a classical generic byte-based key-value store, as it is used in Apache Flink. Finally, we choose Viper as a representative of modern storage-aware key-value stores, which is based on a hybrid DRAM-PMem index and log structure [3]. Viper stores records directly in a PMem log without intermediate buffering in DRAM. To leverage the higher random access performance of DRAM compared to PMem for index updates, its index is located in DRAM. This hybrid design achieves 4× higher insert rates than PMem-only stores.

We prefill 100 million 50 Byte records before measuring another 100 million inserts/gets with 32 threads. Viper slightly outperforms TBB for inserts and is only ~2× slower for gets. We note that recent work shows TBB to not be the fastest concurrent in-memory storage system [6], but it is used in common scale-up SPEs [11, 20] and serves as an in-memory reference. The clear gap between Viper/TBB and RocksDB shows the major shift in persistent storage performance that systems can leverage. Unlike existing scale-out systems, storage-aware scale-in systems do not need to trade performance for persistence. Fast storage enables both efficient recoverability and high overall throughput.

Recovery. Considering server-local state is necessary when restarting an application after a crash and highly beneficial for regular application restarts. Scale-up systems run on high-end servers that contain hundreds of GBs of state. For specialized hardware, replacing the server is not always possible and transferring its state to another server quickly becomes a recovery bottleneck. In this case, state recovery must occur on the same server. Additionally, current scale-out SPEs use server-local state when restarting an application on the same node without a crash, e.g., when re-scaling or deploying a newer version. For both recovery and restarting, it is essential to have persistent state that outlives the application.

Figure 2: Checkpoint and recovery duration (Viper, TBB, RocksDB).

We show the advantage of using modern storage technology to achieve efficient checkpointing and recovery in Figure 2. In this microbenchmark, we store 200 million 50 Byte records, i.e., 10 GB raw data, in three different storage instances. As representatives of their respective system classes, we again use TBB, Viper, and RocksDB.

To persist TBB’s data, we store all entries in a tightly packed byte array in a file stored on SSD. We see that the persistent systems RocksDB and Viper perform a checkpoint very efficiently. RocksDB must only flush its volatile write buffer and Viper must persist only metadata. TBB takes significantly longer, as all in-memory data is converted and copied to secondary storage, which is I/O-bound. For recovery, we see that RocksDB performs best, as it reads only metadata and immediately accepts requests. Viper must recover its volatile index, which depends on the number of entries. TBB reads all data from storage and re-creates its in-memory state, which takes significantly longer than the other two systems.

This experiment shows a large difference in the recovery performance of volatile and persistent state. Expanding on this is a key element of scale-in stream processing, as it impacts both runtime performance while checkpointing and start-up time after a crash. With state in the order of TBs, re-creating in-memory state from secondary storage becomes infeasible.

Streaming-specific State Access. In addition to storage-aware state management, streaming-specific access patterns improve performance even on slow storage media. We demonstrate this based on current behavior in Apache Flink. In Flink, windowed operations store incoming events in a list of records belonging to a given window. When using the RocksDB backend, the operator gets the current value, deserializes it, appends the new value, and serializes the updated list back to its byte representation. This incurs an unnecessarily high overhead for each record. As the records are not needed immediately and are accessed only as a list, they can be buffered in small in-memory lists before writing to RocksDB.

Figure 3: Grouped state access performance for a) shared and b) individual storage instances (Viper, TBB, RocksDB).

We show the effects of buffering records in Figure 3. In this experiment, we prefill 100 million 50 Byte records before performing another 100 million inserts with 32 threads. We distinguish between a shared instance, in which all threads operate on the same store, and individual instances, in which each thread has its own store. We store the records in small blocks, consisting of up to 50 records. We observe that Viper outperforms TBB for individual instances but performs worse for the shared one. This difference is important when designing streaming state, as it can either be shared across operator instances, e.g., in Grizzly [11], or partitioned by key, e.g., in Flink. Our results show that depending on the underlying storage and chosen system, one or the other is more beneficial. More importantly, RocksDB performs between 14–20× worse than TBB when storing individual records, showing the high overhead of persistent storage. However, compared to individual records in TBB, 50 grouped records are only 2–3× slower in RocksDB. This shows that streaming-specific access significantly improves state performance, even for low-end storage media.

3.3 Opportunities for System Optimization

In this section, we discuss CPU-aware optimizations to increase the overall system performance.

CPU-aware Optimization. Poor CPU utilization is a major contributor to resource inefficiency in current scale-out SPEs [27]. To overcome this, scale-in SPEs target the system’s CPU to achieve higher overall utilization. Recent work shows the potential of adapting OLAP queries towards a given workload and system setup [12]. Exploring different computation modes, such as compiled or vectorized execution, is heavily researched in databases. However, they have received little attention in SPEs so far. Transferring these concepts to SPEs has the potential to further increase the overall system performance. For example, storing network-buffered records in a row or column format depending on the data and query allows for a performance trade-off between processing time and ingestion rate, while also enabling scalar or vectorized execution modes.

Another optimization is based on simultaneous multithreading (SMT). Depending on the workload, using SMT hides memory access latency while not using it improves cache locality. When multiple operators have low CPU consumption, they are placed on the same core to achieve better utilization. When CPU utilization is high, features such as explicit SIMD instructions achieve higher throughput with the same utilization. While query compilation generally targets the underlying CPU, explicit optimizations and domain knowledge additionally improve performance.

4 INTRODUCING DARWIN

In this section, we present Darwin, our scale-in SPE prototype. Darwin treats each server it runs on like a scale-up system by fully utilizing the given hardware configuration. To this end, it uses query compilation to generate efficient execution plans for each query targeting the server’s hardware. This targeting currently includes storage-aware state management and CPU-specific optimizations. As Darwin is still in early stages, we plan to add support for more opportunities discussed in Section 3, e.g., network-aware data transfer to achieve efficient multi-server processing or efficient checkpointing and recovery mechanisms.

4.1 Darwin Architecture

In this section, we present Darwin’s high-level architecture, components, and execution flow, as shown in Figure 4.

Data Pipeline. Users create queries via a data pipeline object, which currently offers an SQL-like API inspired by Apache Flink’s Table API [8]. Additionally, the user configures runtime options such as the compiler to use, which storage medium to use for state, and which architecture to optimize for. To run on heterogeneous hardware without manually adapting for each server, this config also auto-detects system characteristics.

Query Plan. From the query, a query plan is created. The query plan represents the logical version of the query, similar to relational algebra for classical database queries. Compared to relational algebra, it requires a few additions, e.g., for windowing logic or external I/O. The query plan is the first step in the execution in which optimizations are performed, e.g., predicate push-down.
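To make the data pipeline and config described above more concrete, the following is a hypothetical sketch of how such a query definition could look. All class and method names (PipelineConfig, DataPipeline, source, filter, tumbling_window, sum, sink) are assumptions for illustration only, not Darwin’s actual API; the real interface is SQL-like and inspired by Flink’s Table API.

```cpp
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical runtime options, mirroring the config described above.
struct PipelineConfig {
  std::string compiler = "g++";        // compiler used for the generated query
  std::string state_medium = "pmem";   // storage medium for operator state
  std::string target_arch = "native";  // architecture to optimize for
};

// Hypothetical fluent pipeline object; it only records the logical steps here.
class DataPipeline {
 public:
  explicit DataPipeline(PipelineConfig config) : config_(std::move(config)) {}

  DataPipeline& source(const std::string& name) { steps_.push_back("SOURCE " + name); return *this; }
  DataPipeline& filter(const std::string& pred) { steps_.push_back("FILTER " + pred); return *this; }
  DataPipeline& tumbling_window(int seconds) { steps_.push_back("TUMBLE " + std::to_string(seconds) + "s"); return *this; }
  DataPipeline& sum(const std::string& attr) { steps_.push_back("SUM " + attr); return *this; }
  DataPipeline& sink(const std::string& name) { steps_.push_back("SINK " + name); return *this; }

  // In Darwin, this step would build the logical query plan, translate it to
  // an operator graph, generate C++ code, compile it, and run it.
  void execute() const {
    std::cout << "compile for " << config_.target_arch << " with " << config_.compiler
              << ", state on " << config_.state_medium << "\n";
    for (const std::string& step : steps_) std::cout << step << "\n";
  }

 private:
  PipelineConfig config_;
  std::vector<std::string> steps_;
};

int main() {
  PipelineConfig config;  // auto-detection of system characteristics omitted
  DataPipeline(config).source("events").filter("e.a == 42").tumbling_window(60).sum("e.value").sink("results").execute();
}
```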

Figure 4: Darwin’s architecture and execution flow.
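The query plan and generated code that Figure 4 depicts are described in the following paragraphs. As a rough, assumed approximation of what the generated query could boil down to (a filter on e.a, tumbling-window assignment, and a hash-based sum aggregation), consider the sketch below; the types, the constant predicate, and the plain in-memory map standing in for the PMem-backed state are illustrative assumptions, not Darwin’s actual generated output.

```cpp
#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <vector>

struct Event {
  int64_t a;          // filtered attribute
  int64_t key;        // aggregation key
  int64_t value;      // aggregated value
  int64_t timestamp;  // event time in milliseconds
};

constexpr int64_t kWindowSizeMs = 60'000;  // 60 s tumbling windows

int main() {
  std::vector<Event> input = {{42, 1, 10, 5}, {7, 1, 99, 10}, {42, 2, 20, 70'000}};

  // State: window start -> (key -> running sum). In Darwin, this state could
  // instead live in a persistent store such as Viper, as specified by the user.
  std::unordered_map<int64_t, std::unordered_map<int64_t, int64_t>> sums;

  for (const Event& e : input) {  // push-based: the whole pipeline is one tight loop
    if (e.a != 42) continue;      // FilterOperator: equality predicate on e.a
    int64_t window = e.timestamp - (e.timestamp % kWindowSizeMs);  // tumbling window assigner
    sums[window][e.key] += e.value;  // SumOperator: hash-based sum aggregation
  }

  // Sink pipeline: runs after the aggregation, i.e., when a window is triggered.
  for (const auto& [window, per_key] : sums)
    for (const auto& [key, sum] : per_key)
      std::cout << "window=" << window << " key=" << key << " sum=" << sum << "\n";
}
```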

Operator Graph. Together with the config, the query plan is translated to an operator graph. We briefly describe the translation based on the query plan shown in Figure 4. Starting from the sink, each node recursively translates its input node. The resulting operator graph represents the physical operators, i.e., the specific implementation chosen for the given query, system, and config. As start and end nodes, source and sink operators require special treatment. They contain buffering logic, e.g., for external network-based I/O, and either have no input or output operators. After the source is translated, the selection node is translated to an equality-filter operator. The translation of window nodes requires reordering, as windowing logic impacts the operator order. The tumbling window assignment and trigger run before the aggregation, but count-based triggers run afterward. Thus, window translation requires splitting and distributing certain nodes across the operator graph. For the aggregation, the translator chooses a hash-based sum aggregation operator with state in PMem, as specified by the user.

Query Generator. The query generator takes the operator graph and generates a C++ string representation of the actual query. In Figure 4, we show a pseudo-code version of the produced code. When the FilterOperator is called, it receives a record with schema information. From this, it generates a conditional statement with the correct predicate (e.g., equality) based on the filtered attribute (e.g., e.a). Depending on the predicate and execution model, the filter operator can also produce vectorized or SIMD-based filters, allowing for more fine-grained hardware optimization. Afterward, it calls the downstream tumbling assigner. The assigner generates code to assign the record to a tumbling window by mapping the record’s timestamp to a window key. This process is continued until all operators are called and the query is fully generated.

As the windowed aggregation buffers data, it represents a pipeline breaker [11, 21]. The sink operator is executed after the aggregation is complete, i.e., when the window is triggered. The query generator creates a new function for the sink, which represents a new pipeline that can be executed independently. Splitting pipelines allows us to independently scale sources, sinks, and other operators.

As the query generation contains runtime information such as data types or filter conditions, the generated query is optimized accordingly. The SumOperator knows the key and value types, so it instantiates a state object with them. This is an advantage over key-value store interfaces such as RocksDB in Flink, which operate on generic byte representations. Storing records with explicit type information removes serialization and deserialization overhead and allows the compiler to optimize data move instructions, e.g., by issuing SIMD loads/stores instead of regular 8 Byte movs [3].

Query Compiler. Once the query code is generated, the query compiler compiles it. It uses information in the config to target the underlying hardware, e.g., by enabling vectorization features of the CPU. As the compiler runs independently of Darwin, users can specify a different compiler than was used to compile Darwin. Darwin can be compiled once and distributed while still providing flexibility towards the system it executes queries on. The code is compiled into a shared library that is dynamically loaded by Darwin during execution. This allows Darwin to interact with the query, e.g., when passing allocated memory, data, or other resources.

Query Execution. In the last step, the query is loaded and executed. Depending on the generated pipeline and specified parallelism, multiple source, sink, and operator instances are started. During execution, Darwin monitors the performance of the query to allow for changes in parallelism and thread placement. If pipelines have low utilization, they are merged to free resources. If pipelines are creating backpressure, Darwin splits them to keep up with the data rate. This approach allows for some flexibility during runtime when data loads vary or are skewed. It also supports adaptive changes to the query if gathered performance metrics and data characteristics allow for more aggressive optimization [11].
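A minimal sketch of this generate-compile-load flow on a POSIX system is shown below; the file paths, the compiler invocation, and the exported entry point name are assumptions for illustration, not Darwin’s actual implementation.

```cpp
#include <dlfcn.h>     // dlopen/dlsym (link with -ldl on some systems)
#include <cstdlib>
#include <fstream>
#include <stdexcept>
#include <string>

// Entry point the generated source is assumed to export as
// extern "C" void run_query();
using QueryFn = void (*)();

QueryFn compile_and_load(const std::string& generated_code) {
  // 1) Write the generated C++ string to a source file.
  std::ofstream("/tmp/query.cpp") << generated_code;

  // 2) Compile it into a shared library, targeting the local CPU. The compiler
  //    can differ from the one used to build the engine itself.
  if (std::system("g++ -O3 -march=native -shared -fPIC /tmp/query.cpp -o /tmp/query.so") != 0)
    throw std::runtime_error("query compilation failed");

  // 3) Load the library at runtime and resolve the query entry point, so the
  //    engine can pass memory, data, and other resources to the query.
  void* handle = dlopen("/tmp/query.so", RTLD_NOW);
  if (handle == nullptr) throw std::runtime_error(dlerror());
  auto query = reinterpret_cast<QueryFn>(dlsym(handle, "run_query"));
  if (query == nullptr) throw std::runtime_error("entry point not found");
  return query;
}
```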

4.2 Performance

We compare the performance of Darwin with the state-of-the-art scale-up SPE Grizzly [11] and the widely used scale-out SPE Apache Flink [4]. In this experiment, we run a 60-second tumbling window sum aggregation on 32 Byte records with 15000 unique keys. Our server contains an Intel Xeon Gold 6240L CPU with 18 cores, 96 GB DRAM, and 1.5 TB (6× 256 GB) Intel Optane DC Persistent Memory 100 Series. The experiments are run with 32 threads. For the in-memory version, Grizzly and Darwin use the TBB concurrent map. For the persistent version, Darwin uses the hybrid DRAM-PMem key-value store Viper [3] and Flink uses RocksDB.

Figure 5: Throughput of Darwin, Grizzly, and Flink in Mops/s for a) in-memory and b) persistent state.

We show the results in Figure 5. On the left, we see that Darwin performs equally to Grizzly for in-memory processing, as both systems are limited by TBB to store the aggregations. We note that this is not the highest performance that Grizzly can achieve, but the additional optimizations proposed by the authors are orthogonal to the basic concept and could also be applied to Darwin. On the right, we see that Darwin outperforms Flink by over an order of magnitude (12×). Additionally, in-memory Darwin is only 1.7× better than the persistent version, which is in line with the PMem–DRAM gap presented in the Viper paper [3]. This shows that modern storage significantly closes the gap between volatile and existing durable state management, enabling us to efficiently support larger-than-memory state. Overall, our results show that Darwin achieves both state-of-the-art scale-up performance and an order of magnitude improvement over existing larger-than-memory scale-out systems.

5 CONCLUSION

To bridge the gap between performance and core features, we propose scale-in stream processing and present our prototype system Darwin. By achieving high hardware utilization on a wide range of hardware setups, scale-in systems adapt to application-specific demands. Combining scale-out concepts with advancements in persistent storage and scale-up concepts that focus on the underlying hardware, scale-in processing achieves high system utilization without sacrificing key features for industry adoption, such as recoverable, larger-than-memory state. Our scale-in SPE prototype Darwin uses query compilation and storage-aware state management to match state-of-the-art scale-up performance while outperforming existing scale-out systems by an order of magnitude. Scale-in processing enables application-proportional scaling of server requirements, making it economical for all levels of performance needs.

ACKNOWLEDGMENTS

This work was partially funded by the German Ministry for Education and Research (ref. 01IS18025A and ref. 01IS18037A), the German Research Foundation (ref. 414984028), and the European Union’s Horizon 2020 research and innovation programme (ref. 957407).

REFERENCES

[1] Alibaba. 2020. Four Billion Records per Second! www.alibabacloud.com/blog/four-billion-records-per-second-stream-batch-integration-implementation-of-alibaba-cloud-realtime-compute-for-apache-flink-during-double-11_596962
[2] Alibaba. 2021. Elastic Compute Service. www.alibabacloud.com/product/ecs
[3] Lawrence Benson, Hendrik Makait, and Tilmann Rabl. 2021. Viper: An Efficient Hybrid PMem-DRAM Key-Value Store. Proceedings of the VLDB Endowment 14, 9, 1544–1556. https://doi.org/10.14778/3461535.3461543
[4] Paris Carbone, Stephan Ewen, Seif Haridi, Asterios Katsifodimos, Volker Markl, and Kostas Tzoumas. 2015. Apache Flink(TM): Stream and Batch Processing in a Single Engine. IEEE Data Eng. Bull. 38, 4, 28–38.
[5] Badrish Chandramouli, Jonathan Goldstein, Mike Barnett, Robert DeLine, Danyel Fisher, John C. Platt, James F. Terwilliger, and John Wernsing. 2014. Trill: a high-performance incremental query processor for diverse analytics. PVLDB 8, 4, 401–412. https://doi.org/10.14778/2735496.2735503
[6] Badrish Chandramouli, Guna Prasaad, Donald Kossmann, Justin Levandoski, James Hunter, and Mike Barnett. 2018. FASTER: A Concurrent Key-Value Store with In-Place Updates. In SIGMOD ’18. ACM, 275–290. https://doi.org/10.1145/3183713.3196898
[7] Björn Daase, Lars Jonas Bollmeier, Lawrence Benson, and Tilmann Rabl. 2021. Maximizing persistent memory bandwidth utilization for OLAP workloads. In SIGMOD ’21. ACM. https://doi.org/10.1145/3448016.3457292
[8] Apache Flink. 2021. Table API & SQL. ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/overview
[9] Can Gencer, Marko Topolnik, Viliam Ďurina, Emin Demirci, Ensar B. Kahveci, Ali Gürbüz, Ondřej Lukáš, József Bartók, Grzegorz Gierlach, František Hartman, Ufuk Yılmaz, Mehmet Doğan, Mohamed Mandouh, Marios Fragkoulis, and Asterios Katsifodimos. 2021. Hazelcast Jet: Low-latency Stream Processing at the 99.99th Percentile. arXiv:2103.10169 [cs].
[10] Albert Greenberg, James Hamilton, Dave Maltz, and Parveen Patel. 2009. The Cost of a Cloud: Research Problems in Data Center Networks. Computer Communications Review 39, 1.
[11] Philipp M. Grulich, Sebastian Breß, Steffen Zeuch, Jonas Traub, Janis von Bleichert, Zongxiong Chen, Tilmann Rabl, and Volker Markl. 2020. Grizzly: Efficient Stream Processing Through Adaptive Query Compilation. In SIGMOD ’20. ACM, 2487–2503. https://doi.org/10.1145/3318464.3389739
[12] Tim Gubner and Peter Boncz. 2021. Charting the design space of query execution using VOILA. PVLDB 14, 6, 1067–1079. https://doi.org/10.14778/3447689.3447709
[13] Intel. 2021. Optane SSD P5800X. www.intel.com/content/www/us/en/products/docs/memory-storage/solid-state-drives/data-center-ssds/optane-ssd-p5800x-p5801x-brief.html
[14] Vasiliki Kalavri and John Liagouris. 2020. In support of workload-aware streaming state management. https://www.usenix.org/conference/hotstorage20/presentation/kalavri
[15] Jeyhun Karimov, Tilmann Rabl, Asterios Katsifodimos, Roman Samarev, Henri Heiskanen, and Volker Markl. 2018. Benchmarking Distributed Stream Processing Engines. In ICDE. IEEE, 1507–1518.
[16] Alexandros Koliousis, Matthias Weidlich, Raul Castro Fernandez, Alexander L. Wolf, Paolo Costa, and Peter Pietzuch. 2016. SABER: Window-Based Hybrid Stream Processing for Heterogeneous Architectures. In SIGMOD ’16. ACM, 555–569. https://doi.org/10.1145/2882903.2882906
[17] Viktor Leis, Michael Haubenschild, Alfons Kemper, and Thomas Neumann. 2018. LeanStore: In-Memory Data Management beyond Main Memory. In ICDE ’18. IEEE, 185–196. https://doi.org/10.1109/ICDE.2018.00026
[18] Lucas Lersch, Ivan Schreter, Ismail Oukid, and Wolfgang Lehner. 2020. Enabling low tail latency on multicore key-value stores. Proceedings of the VLDB Endowment 13, 7, 1091–1104. https://doi.org/10.14778/3384345.3384356
[19] Mellanox. 2021. 200Gb/s ConnectX-6 Ethernet Single/Dual-Port Adapter IC. www.mellanox.com/products/ethernet-adapter-ic/connectx-6-en-ic
[20] Hongyu Miao, Heejin Park, Myeongjae Jeon, Gennady Pekhimenko, Kathryn Mckinley, and Felix Xiaozhu Lin. 2017. StreamBox: Modern Stream Processing on a Multicore Machine. In ATC. USENIX Association, 617–629.
[21] Thomas Neumann. 2011. Efficiently compiling efficient query plans for modern hardware. PVLDB 4, 9, 539–550. https://doi.org/10.14778/2002938.2002940
[22] Lasse Thostrup, Jan Skrzypczak, Matthias Jasny, Tobias Ziegler, and Carsten Binnig. 2021. DFI: The Data Flow Interface for High-Speed Networks. In SIGMOD ’21. ACM, 1825–1837. https://doi.org/10.1145/3448016.3452816
[23] Shivaram Venkataraman, Aurojit Panda, Kay Ousterhout, Michael Armbrust, Ali Ghodsi, Michael J. Franklin, Benjamin Recht, and Ion Stoica. 2017. Drizzle: Fast and Adaptable Stream Processing at Scale. In SOSP ’17. ACM, 374–389. https://doi.org/10.1145/3132747.3132750
[24] Jian Yang, Juno Kim, Morteza Hoseinzadeh, Joseph Izraelevitz, and Steven Swanson. 2020. An Empirical Guide to the Behavior and Use of Scalable Persistent Memory. In FAST ’20. USENIX Association, 169–182.
[25] Matei Zaharia, Scott Shenker, Haoyuan Li, Tathagata Das, Timothy Hunter, and Ion Stoica. 2013. Discretized streams: fault-tolerant streaming computation at scale. In SOSP ’13. ACM, 423–438. https://doi.org/10.1145/2517349.2522737
[26] Steffen Zeuch, Bonaventura Del Monte, Jeyhun Karimov, Clemens Lutz, Manuel Renz, Jonas Traub, Sebastian Breß, Tilmann Rabl, and Volker Markl. 2019. Analyzing Efficient Stream Processing on Modern Hardware. Proceedings of the VLDB Endowment 12, 5, 516–530. https://doi.org/10.14778/3303753.3303758
[27] Shuhao Zhang, Bingsheng He, Daniel Dahlmeier, Amelie Chi Zhou, and Thomas Heinze. 2017. Revisiting the Design of Data Stream Processing Systems on Multi-Core Processors. In ICDE ’17. 659–670. https://doi.org/10.1109/ICDE.2017.119
[28] Shuhao Zhang, Jiong He, Amelie Chi Zhou, and Bingsheng He. 2019. BriskStream: Scaling Data Stream Processing on Shared-Memory Multicore Architectures. In SIGMOD ’19. ACM, 705–722. https://doi.org/10.1145/3299869.3300067
