Akash Box Akash Notes3
topic/gpdb-users/-6Q8Dv_caGM (Greenplum benchmark)
https://www.dremio.com/apache-arrow-explained/
https://community.dremio.com/t/what-dremio-is-exactly/3244/4
First and foremost, Dremio is a scale-out SQL engine based on Apache Arrow. You can
think of it as an alternative to Presto, Hive LLAP, Impala, etc. Note that Dremio is focused
on fast, interactive queries, so there’s usually still a role for Hive or Spark for long-running
ETL workloads (e.g., things that take hours to complete).
Unlike Presto, Hive LLAP, Impala, and other SQL engines, Dremio includes extensive
support for Data Reflections (similar to materialized views), and query rewriting. So, like
Teradata and Oracle, Dremio will automatically rewrite your queries to make use of Data
Reflections when the costs are determined to be less than running the query on the raw
source data. This can easily be 100x-1000x faster than querying the raw data. Also similar
to Teradata, Dremio provides extensive workload management capabilities, so operators
can prioritize certain workloads.
As you have noted, in addition to providing the world’s fastest way to query data on the data
lake, Dremio also provides capabilities around 1) data curation, 2) data virtualization, 3) row
and column-level access controls, 4) an integrated data catalog, and 5) data lineage
capabilities. Yes there are products for each of these, but they are mostly proprietary and
stand-alone tools. We think the key to making data consumers self-sufficient is to
provide an integrated platform that removes friction and simplifies the daily workflow for
analytics.
https://www.dremio.com/podcasts/columnar-data-apache-arrow-parquet/
https://www.dremio.com/webinars/apache-arrow-calcite-parquet-relational-cache/
https://blog.cloudera.com/apache-hive-druid-part-1-3/
One good combo: Druid on Hive, with Tableau for visualization.
https://info.dataengconf.com/hubfs/bcn18-talks/Reza%20Shiftehfar_DataEng2018_Barcelona_Creating%20an%20Extensible%20Big%20Data%20Platform.pdf
https://github.com/solversa/marmaray
https://www2.omnisci.com/resources/technical-whitepaper/lp?_ga=2.104027008.1019306009.1588761553-194589801.1588761553
http://www.timestored.com/time-series-data/column-oriented-databases
Dremio includes a vectorized Parquet reader that reads directly to Arrow, so you should see
some nice benefits when Parquet is the source if you’re seeing reads as the bottleneck
(assuming I/O isn’t the limiting factor, of course).
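As a rough illustration of the Parquet-to-Arrow path (using the pyarrow library rather than Dremio itself), a Parquet file can be read directly into an in-memory Arrow table; the file and column names below are placeholders:

# Minimal sketch: read a Parquet file straight into an Arrow table with pyarrow.
# "events.parquet" and the column names are placeholders, not from the source above.
import pyarrow.parquet as pq

table = pq.read_table("events.parquet", columns=["user_id", "ts"])  # columnar projection
print(table.schema)
print(table.num_rows)

# Arrow tables convert cheaply to pandas for local inspection.
df = table.to_pandas()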
And I agree that Hive and Spark still have a role in the modern Big Data stack. Hive/Spark
can be especially good for “long haul” ETL, for jobs that run for extended periods of time.
Dremio is especially good at low-latency query processing, and at “last mile” ETL where
transformations are applied without making copies of the data. Of course Dremio does
other things, like a data catalog users can search, data lineage, curation abilities, etc. But it
can make a lot of sense to combine Hive, Spark, and Dremio together.
The only thing I see is that Presto was able to view the “map” type we have in our data
while for Dremio some “flattening” (and complex interpretation) was needed when working
with Hive.
There are trade-offs. For Presto to be of any use, it needs Hive and tables. Dremio can go
directly to HDFS and that works quite nicely, including reading from “maps” (if you do some
simple flattening), but Hive integration lacks support for the “map” and “list” types (because
Apache Drill also lacks them, I believe).
Both runs had the exact same query, adapted to the SQL dialect of each.
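To illustrate the kind of flattening discussed above, here is a small PySpark sketch (not Dremio or Presto syntax, which have their own FLATTEN/UNNEST constructs) that explodes a map column into key/value rows; the data and column names are made up:

# Sketch: flatten a map<string,string> column into (key, value) rows with PySpark.
# The data and column names are hypothetical; this only illustrates the general idea.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

spark = SparkSession.builder.appName("flatten-map-demo").getOrCreate()

df = spark.createDataFrame(
    [(1, {"color": "red", "size": "L"}), (2, {"color": "blue"})],
    ["id", "attrs"],  # attrs is inferred as a map column
)

flat = df.select(col("id"), explode(col("attrs")).alias("attr_key", "attr_value"))
flat.show()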
On the one hand, we have a wealth of options at pretty much all levels of the
stack including:
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
https://ondataengineering.net/tech-vendors/apache/ (Drill)
https://stackoverflow.com/questions/41107363/benchmarks-of-apache-geode
I'm doing some research and I need benchmarks of Apache Geode, maybe in comparison with
Redis or Hazelcast. I would appreciate it if someone could point me to some.
In one of Pivotal’s presentations I saw some benchmarks, but they just showed that “Geode is
faster than Cassandra” in some unspecified aspects, and they promised to publish benchmarks,
but I cannot find them.
https://medium.com/@leventov/comparison-of-the-open-source-olap-systems-for-big-data-clickhouse-
druid-and-pinot-8e042a5ed1c7 ClickHouse, Druid, and Pinot
https://en.wikipedia.org/wiki/List_of_in-memory_databases
https://www.quora.com/Are-there-any-open-source-columnar-column-store-databases
Red Hat Red Hat JBoss Data Grid 7.0 October 1, 2016
Redis Labs Redis Labs Enterprise Cluster (RLEC) 4.3 October 1, 2016
(same tech that runs Redis Cloud)
https://towardsdatascience.com/interactively-analyse-100gb-of-json-data-with-spark-e018f9436e76
https://www.reddit.com/r/datasets/comments/3bxlg7/i_have_every_publicly_available_reddit_comment/
~1 TB of data: Reddit comments from 2007 to 2015, containing 1.7 billion comments
http://sentiwordnet.isti.cnr.it/
101: MPI vs Hadoop (MPI wins)
https://thescipub.com/pdf/10.3844/jcssp.2017.781.794.pdf
https://www.quora.com/When-do-we-use-Hadoop-Spark-Flink-MPI-PVM-and-OpenMP-and-why
A detailed study of Spark processing was conducted with Spark 2.0.2 on Hadoop 2.7.3 and YARN on a Google
Cloud cluster setup. Experimental results were analyzed based on peak memory utilization, peak CPU
utilization and execution time. Sentiment analysis was chosen as the application; it was
implemented in Scala for Spark processing and programmed in C++ for the implementation in MPI. In
the experiment, the execution times on Spark and MPI show that the execution speed of MPI was
approximately 2× faster than Spark processing with configuration II, at par with the results in
(Reyes-Ortiz et al., 2015).
https://www.quora.com/When-do-we-use-Hadoop-Spark-Flink-MPI-PVM-and-OpenMP-and-why
102.
Apache Spark and Apache Flink are emerging as big data platforms for large-scale parallel machine
learning. Even though these big data frameworks are designed differently, they follow the data flow
model for execution and user
103.
https://journalofbigdata.springeropen.com/articles/10.1186/s40537-019-0215-2 IoT applications for smart cities, large-scale architecture
104.
https://www.cs.utah.edu/~hari/files/pubs/sc13.pdf
In our largest run, sorting 100TB of records using 1792 hosts, we achieved an end-to-end throughput of
1.24TB/min using our general-purpose sorter, improving on the current Daytona record holder by 65%.
105.
http://dsc.soic.indiana.edu/publications/OgreFacetsv10.pdf
Benchmark of benchmarks. Wonderful paper
106.
https://s.yimg.com/ge/labs/v1/files/ycsb-v4.pdf
http://www.brianfrankcooper.net/home/publications/ycsb.pdf
Setup
– Six server-class machines: 8 cores (2 x quad-core) 2.5 GHz CPUs, 8 GB RAM, 6 x 146 GB 15K RPM SAS drives in RAID 1+0, Gigabit Ethernet, RHEL 4
– Plus extra machines for clients, routers, controllers, etc.
– Cassandra 0.5.0 (0.6.0-beta2 for range queries)
– HBase 0.20.3
– MySQL 5.1.32 organized into a sharded configuration
– Sherpa 1.8 with MySQL 5.1.24
– No replication; force updates to disk (except HBase, which primarily commits to memory)
Workloads
– 120 million 1 KB records = 20 GB per server
– Reads retrieve the whole record; updates write a single field
– 100 or more client threads
Caveats
– Write performance would be improved for Sherpa, sharded MySQL and Cassandra with a dedicated log disk
– We tuned each system as well as we knew how, with assistance from the teams of developers
Comment: Cassandra is optimized for writes, and achieves higher throughput and lower latency. Sherpa and MySQL achieve roughly comparable performance, as both are limited by MySQL’s capabilities. HBase has good write latency, because of commits to memory, and somewhat higher read latency, because of the need to reconstruct records.
106.
http://www.vldb.org/pvldb/vol12/p2300-cooper.pdf
107.
https://cassandra.apache.org/
Some of the largest production deployments include Apple's, with over 75,000 nodes
storing over 10 PB of data, Netflix (2,500 nodes, 420 TB, over 1 trillion requests per day),
Chinese search engine Easou (270 nodes, 300 TB, over 800 million requests per day), and
eBay (over 100 nodes, 250 TB).
108.
https://thenewstack.io/apache-streaming-projects-exploratory-guide/
Apache streaming projects, also used for streaming-data ETL or general-purpose ETL.
Notable is Apache Apex (by DataTorrent; claimed to be 100 times faster than Apache Spark).
Apache Apex is positioned as the industry’s only open-source enterprise-grade engine capable of handling
batch data as well as streaming data needs. It is a data-in-motion platform that allows for a unification
of processing of real-time streams of unbounded data (streaming job), or bounded data in conventional
files (batch job).
Apache Samza
Apache Samza was developed at LinkedIn to avoid the large turn-around times involved in Hadoop’s
batch processing. It is built on top of Apache Kafka, a low-latency distributed messaging system. Samza
was built to provide a lightweight framework for continuous data processing.
The combination of Kafka and Samza is analogous to HDFS and MapReduce. If HDFS acts as the input for
MapReduce jobs, Kafka ingests data processed by Samza. Samza can continuously compute results as
and when the data arrives delivering sub-second response times.
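A minimal sketch of the Kafka side of such a pipeline, using the kafka-python client (Samza itself is a Java framework, so this only illustrates the produce/consume pattern); the broker address and topic name are assumptions:

# Sketch: produce and continuously consume events with kafka-python.
# Broker address and topic are placeholders; a Samza job would sit on the consumer side.
import json
from kafka import KafkaConsumer, KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
producer.send("page-views", {"user": "u1", "url": "/home"})
producer.flush()

consumer = KafkaConsumer(
    "page-views",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda b: json.loads(b.decode("utf-8")),
)
for msg in consumer:  # continuous processing loop, results as data arrives
    print(msg.value)
    break  # remove the break to keep consuming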
109.
https://towardsdatascience.com/a-brief-introduction-to-two-data-processing-architectures-lambda-and-kappa-for-big-data-4f35c28005bb
https://hazelcast.com/glossary/kappa-architecture/
110
https://aws.amazon.com/blogs/apn/optimizing-presto-sql-on-amazon-emr-to-deliver-faster-query-processing/
The migration to Amazon EMR Presto SQL allowed Seagate Technologies to continue processing
petabytes of data fast, at a reduced cost on Amazon EMR, and resulted in a 10x improvement in
query execution time.
Since we deployed the solution with network load balancing, Seagate could switch clusters without
impacting their end user. Seagate saved a lot of CPU time, and their end users could get query
results in minutes rather than hours. This improved the overall efficiency of both their Amazon EMR
cluster and business operations.
111.
https://aws.amazon.com/blogs/database/run-a-petabyte-scale-cluster-in-amazon-elasticsearch-service/
When you use Amazon Elasticsearch Service (Amazon ES) for log data, you’re drinking from what
usually becomes a forceful firehose. As your Elasticsearch and Kibana knowledge deepens, you find
many compelling uses of your data. As your customer base scales up and you scale your
infrastructure to handle it, you generate even more log data.
We recently doubled our support limit for large clusters in Amazon ES. Today, we support a default
limit of 20 data instances, with up to 200 data instances in a single cluster with a limit raise for your
domain. If you choose I3.16xlarge.elasticsearch instances for your data instances, that gives you
up to 3 PB of storage in your cluster. What are the best practices for successfully running these XL
workloads?
Use I3 instances
You need to use I3s to get to petabyte scale. I3 instances have NVMe SSD ephemeral store. At the
large end, I3.16xlarge.elasticsearch instances have 15.2 TB storage per node, for a total of 3.04 PB
in a 200 node domain. If you choose one of our EBS-backed instance types, you can deploy up to
1.5 TB of EBS volume per data instance in Amazon ES, giving you a maximum cluster size of 300
TB. Note that we are always evaluating and adding new instance types, and evaluating and
changing our service limits. Future service improvements might change the recommendations
highlighted in this post.
Amazon Elasticsearch Service domains offer attached storage of up to 3 PB. You can configure a
domain with 200 i3.16xlarge.elasticsearch instance types, each with 15 TB of storage. Because of
the sheer difference in scale, recommendations for domains of this size differ from our general
recommendations. This section discusses considerations for creating domains, costs, storage, and
shard size.
While this section frequently references the i3.16xlarge.elasticsearch instance types, you can use
several other instance types to reach 1 PB of total domain storage.
Amazon Elasticsearch Service (Amazon ES) is a managed service that makes it easy to deploy, operate,
and scale Elasticsearch clusters in the AWS Cloud. Elasticsearch is a popular open-source search and
analytics engine for use cases such as log analytics, real-time application monitoring, and clickstream
analysis. With Amazon ES, you get direct access to the Elasticsearch APIs; existing code and applications
work seamlessly with the service.
https://aws.amazon.com/elasticsearch-service/pricing/
Storage Optimized - Current Generation
Instance type                 vCPU    Memory (GiB)    Instance storage (GB)    Price per hour
i3.large.elasticsearch          2       15.25          1 x 475 NVMe SSD          $0.25
i3.xlarge.elasticsearch         4       30.5           1 x 950 NVMe SSD          $0.50
i3.2xlarge.elasticsearch        8       61             1 x 1900 NVMe SSD         $1.00
i3.4xlarge.elasticsearch       16       122            2 x 1900 NVMe SSD         $2.00
i3.8xlarge.elasticsearch       32       244            4 x 1900 NVMe SSD         $3.99
i3.16xlarge.elasticsearch      64       488            8 x 1900 NVMe SSD         $7.99
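Using the per-node figures quoted above (15.2 TB of NVMe storage and $7.99/hour for i3.16xlarge.elasticsearch), a quick back-of-the-envelope check of the 3 PB ceiling and the raw on-demand cost of a maximal 200-node domain; the 730 hours/month figure is an assumption:

# Back-of-the-envelope sizing for a 200-node i3.16xlarge.elasticsearch domain,
# using the 15.2 TB/node and $7.99/hour figures quoted above.
nodes = 200
storage_tb_per_node = 15.2
price_per_hour = 7.99
hours_per_month = 730  # assumed average month

total_storage_pb = nodes * storage_tb_per_node / 1000.0
monthly_cost = nodes * price_per_hour * hours_per_month

print(f"Total storage: {total_storage_pb:.2f} PB")        # ~3.04 PB
print(f"On-demand cost: ${monthly_cost:,.0f} per month")   # ~$1,166,540 per month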
112.
https://cdn2.hubspot.net/hubfs/488249/Asset%20PDFs/Benchmark_BI-on-Hadoop_Performance_Q4_2016.pdf#page=11
In order to evaluate the base performance of the SQL-on-Hadoop engines, we compared the performance of
Cloudera Impala, Hive-on-Tez, Spark SQL, and Presto running against a retail data set as defined in the
initial section of this document. The test environment was an isolated 12-node cluster with 1 master node,
1 gateway node, and 10 data nodes. Benchmark cluster configuration details: 128 GB RAM per node; 32 CPU
cores per data (worker) node; 2 x 512 MB SSD storage per data (worker) node. The SQL-on-Hadoop engine
benchmarks were performed by running each engine individually against the same base Hadoop cluster
configuration and data set. Details about engine-specific configurations and settings are below:
Conclusion: All engines demonstrate consistent query performance degradation under concurrent
workload. • There were no query failures for any of the engines tested, up to 25 concurrent queries. •
While all engines can effectively handle a large number of concurrent analytic queries, Spark, Impala,
and Presto maintain a better average performance profile vs. Hive as concurrent query workload
increases.
113.
https://eng.uber.com/uber-big-data-platform/
https://eng.uber.com/hoodie/
library to improve Hadoop performance
Generation 3: Rebuilding our Big Data platform for the long term
By early 2017, our Big Data platform was used by engineering and operations teams
across the company, enabling them to access new and historical data all in one place.
Users could easily access data in Hive, Presto, Spark, Vertica, Notebook, and more
warehouse options all through a single UI portal tailored to their needs. With over 100
petabytes of data in HDFS, 100,000 vcores in our compute cluster, 100,000 Presto queries
per day, 10,000 Spark jobs per day, and 20,000 Hive queries per day, our Hadoop
analytics architecture was hitting scalability limitations and many services were affected
by high data latency.
Fortunately, since our underlying infrastructure was horizontally scalable to address the
immediate business needs, we had enough time to study our data content, data access
patterns, and user-specific requirements to identify the most pressing concerns before building
the next generation. Our research revealed four main pain points:
1. HDFS scalability limitation: This issue is faced by many companies who rely on
HDFS to scale their big data infrastructures. By design, HDFS is bottlenecked by
its NameNode capacity, so that storing large numbers of small files can
significantly affect performance. This limitation usually occurs when data size
grows beyond ten petabytes and becomes a real issue beyond 50-100 petabytes.
Fortunately, there are relatively straightforward solutions to scale HDFS from a
few tens to a few hundreds of petabytes, for instance leveraging ViewFS and
using HDFS NameNode Federation. By controlling the number of small files and
moving different parts of our data to separate clusters (e.g., HBase and Yarn app
logs moved into a separate HDFS cluster), we were able to mitigate this HDFS
limitation.
2. Faster data in Hadoop: Uber’s business operates in real time and as such, our
services require access to data that is as fresh as possible. As a result, 24-hour
data latency was way too slow for many use cases and there was huge demand
for faster data delivery. Our second generation Big Data platform’s snapshot-
based ingestion method was inefficient and prevented us from ingesting data with
lower latency. To speed up data delivery, we had to re-architect our pipeline to
the incremental ingestion of only updated and new data.
3. Support of updates and deletes in Hadoop and Parquet: Uber’s data contains
a lot of updates, ranging in age from the past few days (e.g., a rider or driver-
partner adjusting a recent trip fare) to a few weeks (e.g., a rider rating their last
trip the next time they take a new trip) or even a few months (e.g., backfilling or
adjusting past data due to a business need). With snapshot-based ingestion of
data, we ingest a fresh copy of the source data every 24 hours. In other words,
we ingest all updates at one time, once per day. However, with the need for
fresher data and incremental ingestion, our solution must be able to support
update and delete operations for existing data. However, since our Big Data is
stored in HDFS and Parquet, it is not possible to directly support update
operations on the existing data. On the other hand, our data contains extremely
wide tables (around 1,000 columns per table) with five or more levels of nesting
while user queries usually only touch a few of these columns, preventing us from
using non-columnar formats in a cost-efficient way. To prepare our Big Data
platform for long-term growth, we had to find a way to solve this limitation within
our HDFS file system so that we can support update/delete operations too.
4. Faster ETL and modeling: Similar to raw data ingestion, ETL and modeling jobs
were snapshot-based, requiring our platform to rebuild derived tables in every
run. To reduce data latency for modeled tables, ETL jobs also needed to become
incremental. This required the ETL jobs to incrementally pull out only the changed
data from the raw source table and update the previous derived output table
instead of rebuilding the entire output table every few hours.
Introducing Hudi
With the above requirements in mind, we built Hadoop Upserts anD Incrementals (Hudi), an open
source Spark library that provides an abstraction layer on top of HDFS and Parquet to support
the required update and delete operations. Hudi can be used from any Spark job, is horizontally
scalable, and only relies on HDFS to operate. As a result, any Big Data platform that needs to
support update/delete operations for the historical data can leverage Hudi.
Hudi enables us to update, insert, and delete existing Parquet data in Hadoop. Moreover, Hudi
allows data users to incrementally pull out only changed data, significantly improving query
efficiency and allowing for incremental updates of derived modeled tables.
Raw data in our Hadoop ecosystem is partitioned based on time and any of the old partitions
can potentially receive updates at a later time. Thus, for a data user or an ETL job relying on
these raw source data tables, the only way to know what date partition contains updated data is
to scan the entire source table and filter out records based on some known notion of time. This
results in a computationally expensive query requiring a full source table scan and prevents ETL
jobs from running very frequently.
With Hudi, users can simply pass on their last checkpoint timestamp and retrieve all the records
that have been updated since, regardless of whether these updates are new records added to
recent date partitions or updates to older data (e.g., a new trip happening today versus an
updated trip from 6 months ago), without running an expensive query that scans the entire
source table.
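A rough sketch of the incremental-pull pattern described here, expressed with Hudi's Spark datasource options (option names follow the Hudi documentation and may differ across releases; the table path and checkpoint timestamp are placeholders):

# Sketch: pull only the records changed since a checkpoint from a Hudi table via Spark.
# Base path and begin instant time are placeholders; older Hudi releases use
# .format("org.apache.hudi") and slightly different option names, so verify per version.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hudi-incremental-demo").getOrCreate()

last_checkpoint = "20200101000000"  # commit time of the last successful ETL run

incremental = (
    spark.read.format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", last_checkpoint)
    .load("hdfs:///data/raw/trips")  # Hudi table base path (placeholder)
)

# Only rows upserted after the checkpoint come back, regardless of which date
# partition they live in, so no full source-table scan is needed.
incremental.createOrReplaceTempView("trips_delta")
spark.sql("SELECT COUNT(*) FROM trips_delta").show()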
Using the Hudi library, we were able to move away from the snapshot-based ingestion of raw
data to an incremental ingestion model that enables us to reduce data latency from 24 hours to
less than one hour. Figure 5, below, depicts our Big Data platform after the incorporation of
Hudi:
Figure 5: The third generation of our Big Data platform incorporates faster, incremental data ingestion
(using our open source Marmaray framework), as well as more efficient storage and serving of data via
our open source Hudi library.
114.
https://eng.uber.com/introducing-ludwig/
Ludwig AI
115.
https://github.com/horovod/horovod
https://uber.github.io/ludwig/
116.
http://boston.lti.cs.cmu.edu/classes/95-865-K/HW/HW2/
117.
https://databricks.com/blog/2014/10/10/spark-petabyte-sort.html
118.
https://journalofbigdata.springeropen.com/track/pdf/10.1186/s40537-019-0215-2
Evaluation of distributed stream processing frameworks for IoT applications in Smart Cities
http://pages.cs.wisc.edu/~shivaram/cs744-readings/Heron.pdf
Apache Heron: Twitter’s open-sourced Heron is a re-imagined Storm with an emphasis on higher scalability and
better debuggability [32]. The goals of developing Heron are handling petabytes of data, improving
developer productivity, simplifying debugging and providing better efficiency [33]. As shown in Fig. 7,
Heron consists of three key components:
Apache Storm, Apache Flink and Spark Streaming. Flink performs best on latency.
119.
Big data size: total ~20 PB DW on S3; read ~10% of the DW daily; write ~10% of read data daily; ~500 billion
events daily; ~350 active platform users (~100 analysts)
120
https://conferences.oreilly.com/strata/strata-ny-2019/public/schedule/detail/76539
Uber relies heavily on making data-driven decisions in every product area and needs to store
and process an ever-increasing amount of data. But building a reliable big data platform is
extremely challenging when it has to store and serve hundreds of petabytes of data in real time.
The company redesigned traditional big data platform solutions to provide faster, more reliable,
and more performant access by adding a few critical technologies that overcome their
limitations.
Reza Shiftehfar reflects on the challenges faced and proposes architectural solutions to scale a
big data platform to ingest, store, and serve 100+ PB of data with minute-level latency while
efficiently utilizing the hardware and meeting security needs. You’ll get a behind-the-scenes look
at the current big data technology landscape, including various existing open source
technologies (e.g., Hadoop, Spark, Hive, Presto, Kafka, and Avro) as well as Uber’s own tools
such as Hudi and Marmaray.
https://github.com/apache/incubator-hudi
Hudi is an open source analytical storage system created at Uber to manage petabytes of data
on HDFS-like distributed storage. Hudi provides near-real-time ingestion and provides
different views of the data: a read-optimized view for batch analytics, a real-time view for driving
dashboards, and an incremental view for powering data pipelines. Hudi also effectively
manages files on underlying storage to maximize operational health and reliability. Reza details
how Hudi lowers data latency across the board while simultaneously achieving orders of
magnitude of efficiency over traditional batch ingestion. He then makes the case for near-real-
time dashboards built on top of Hudi datasets, which can be cheaper than pure streaming
architectures.
Marmaray is an open source plug-in based pipeline platform connecting any arbitrary data
source to any data sink. It allows unified and efficient ingestion of raw data from a variety of
sources to Hadoop as well as the dispersal of the derived analysis result out of Hadoop to any
online data store. Reza explains how Uber built and designed a common set of abstractions to
handle both the ingestion and dispersal use cases, along with the challenges and lessons
learned from developing the core library and setting up an on-demand self-service workflow.
Along the way, you’ll see how Uber scaled the platform to move around billions of records per
day.
You’ll also dive into the technical aspects of how to rearchitect the ingestion platform to bring in
10+ trillion events per day at minute-level latency, how to scale the storage platform, and how to
redesign the processing platform to efficiently serve millions of queries and jobs per day. You’ll
leave with greater insight into how things work in an extensible modern big data platform and
inspired to reenvision your own data platform to make it more generic and flexible for future new
requirements.
121.
https://hal.archives-ouvertes.fr/hal-01984327/document
122.
http://downloads.hindawi.com/journals/sp/2020/4176794.pdf
123.
https://dzone.com/articles/hadoop-cluster-capacity-planning
124.
https://www.cisco.com/c/dam/en_us/training-events/events/Strata-Data-Conference/Strata-Data-PDFs/Analytics-Zoo_Building-End-to-End-Integrated-Big-Data-Analytics-and-AI-Solutions.pdf
125.
https://www.vldb.org/pvldb/vol10/p901-anderson.pdf
Code at https://github.com/intel/spark-mpi-adapter
https://github.com/asonje/PAT
Apache Spark is a popular framework for data analytics with attractive features such as fault tolerance
and interoperability with the Hadoop ecosystem. Unfortunately, many analytics operations in Spark are
an order of magnitude or more slower compared to native implementations written with high
performance computing tools such as MPI. There is a need to bridge the performance gap while
retaining the benefits of the Spark ecosystem such as availability, productivity, and fault tolerance. In
this paper, we propose a system for integrating MPI with Spark and analyze the costs and benefits of
doing so for four distributed graph and machine learning applications. We show that offloading
computation to an MPI environment from within Spark provides 3.1−17.7× speedups on the four sparse
applications, including all of the overheads. This opens up an avenue to reuse existing MPI libraries in
Spark with little effort.
Spark+MPI For the Spark+MPI implementation of the applications, we use two readily available MPI-
based libraries. For PR, SSSP, and LDA, we use GraphMat [32], a framework for graph analytics. It
provides high performance by leveraging optimized OpenMP-parallel sparse matrix primitives.
GraphMat was recently extended to distributed systems through the integration of MPI-based
distributed primitives [2]. Graph data is distributed across nodes in a 2D block cyclic format and vertex
properties are distributed across nodes as well. Messages derived from vertex properties are
compressed into sparse vectors and sent to other nodes using MPI. Local sparse matrix-vector
multiplications are parallelized with OpenMP. For CPD, we use the Surprisingly ParalleL spArse Tensor
Toolkit (SPLATT), an MPI+OpenMP library for factoring large, sparse tensors [30]. It uses a medium-
grained decomposition which distributes an N-mode tensor over an N-dimensional grid of MPI processes
[31]. The medium-grained decomposition requires two communication stages, but limits communication
in the ith mode to processes that share a coordinate in the ith dimension of the process grid.
To test the performance of Spark+MPI, we perform experiments on a cluster with up-to-date hardware
and software using datasets representative of the applications we consider.

6.1 Configuration
Each machine in our cluster has 2 Intel Xeon E5-2699 v4 processors (22 cores, 44 threads each), 128 GiB
DDR4 RAM, an SSD, a 1 Gbit/s Ethernet interface, and a 100 Gbit/s Intel Omni-Path interface. We use
CentOS 7.2 (with Linux 3.10), Intel Compiler 16.0.3, Intel MKL 11.3.3, Intel MPI 5.1.3, Oracle Java
1.8.0, HDFS 2.7.2, and Spark 2.0.0. Our cluster has 13 machines. We run the HDFS and Spark master on
one machine, the Spark driver on another machine, and 11 Spark workers on the remaining machines in
the cluster. When we run MPI on the cluster, it uses the 12 compute nodes (the driver + 11 workers).

Spark is configured to use the G1GC garbage collector and the Kryo serializer. We considered (1)
allowing Spark to choose the default number of partitions based on the number of HDFS blocks, (2)
setting the number of partitions to the total number of cores, and (3) setting the number of partitions to
twice the number of cores. We found that (1) provided the best overall performance for all Spark
implementations except for CPD, which excelled with (3). We use this setting for our results. We also use
the default partitioning strategy for GraphX data structures. When Spark is running alone, we give 96
GiB of memory to both the driver and the executors. When we run MPI alongside Spark, we give the
Spark driver and executors 80 GiB of memory. We set the number of driver and executor cores to 88,
which is equal to the number of threads the CPUs can execute concurrently, meaning that one executor
runs per worker. When we launch MPI, we set the number of OpenMP threads to 44, equal to the
number of cores in each machine. We run one MPI process per machine.

We disregard the time to initially load input datasets from HDFS into Spark. Each benchmark begins
timing only after the input dataset has been loaded from HDFS, stored in an RDD which is cached using
cache, and then counted, which ensures that the previous tasks complete before moving on to the rest of
the application. Because Spark uses lazy execution, we need to take actions such as these to ensure that
all computations are actually executed at the expected time during each benchmark. PR, SSSP, and LDA
all produce their outputs in RDDs. We call cache and count on these outputs to do this, and we include
the runtime for this overhead as part of the total algorithm runtime. For CPD, both the Spark
implementation and the Spark+MPI implementation produce outputs on the driver. The overhead time to
transfer these outputs from RDDs to the driver for the Spark+MPI implementation is included as part of
the total algorithm runtime. We also call cache and count on the Spark data structures after their
construction and include this as part of Spark data structure construction time.

PR, LDA, and CPD all use double-precision arithmetic for all implementations. For SSSP, we store the
depths as Int for all implementations. GraphX represents vertex IDs as Long, while GraphMat and
OptSpark store these as Int. We run PR for 54 iterations, which is the total number of iterations required
for convergence in GraphMat for our dataset (for Spark and SparkOpt, we set the number of iterations to
this number). We solve LDA with 20-dimensional factors, α and η equal to the Spark defaults (3.5 and
1.1, respectively), and run for 10 iterations. We run CPD with 10-dimensional factors for 22 iterations,
which is the total number of iterations needed for convergence in SPLATT. SSSP naturally runs the same
number of iterations in each implementation. Micro-benchmarks run for 10 iterations total and the
runtime of the first iteration is discarded. For system-level performance analysis, we use the sar tool from
the Sysstat package executed with the Intel Performance Analysis Tool. We collect samples every 1 s for
CPU usage and network transfer.
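For reference, the Spark settings described in this setup (Kryo serializer, G1GC, 80 GiB driver/executor memory when MPI runs alongside, 88 driver/executor cores so that one executor runs per worker) roughly translate to a configuration like the sketch below; the values come from the paragraph above, the rest is boilerplate and would normally be passed via spark-submit:

# Sketch of the Spark configuration described above (Kryo, G1GC, 80 GiB memory,
# 88 cores per executor/driver). In practice driver memory must be set at submit
# time (spark-submit --driver-memory 80g), not after the JVM has started.
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = (
    SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
    .set("spark.driver.extraJavaOptions", "-XX:+UseG1GC")
    .set("spark.executor.memory", "80g")
    .set("spark.driver.memory", "80g")
    .set("spark.executor.cores", "88")
    .set("spark.driver.cores", "88")
)

spark = SparkSession.builder.config(conf=conf).appName("spark-mpi-setup").getOrCreate()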
126.
https://opensource.com/business/15/1/apache-spark-new-world-record
In October 2014, Databricks participated in the Sort Benchmark and set a new
world record for sorting 100 terabytes (TB) of data, or 1 trillion 100-byte
records. The team used Apache Spark on 207 EC2 virtual machines and
sorted 100 TB of data in 23 minutes.
Named after Jim Gray, the benchmark workload is resource intensive by any
measure: sorting 100 TB of data following the strict rules generates 500 TB of
disk I/O and 200 TB of network I/O. Organizations from around the world often
build dedicated sort machines (specialized software and sometimes
specialized hardware) to compete in this benchmark.
                                  Hadoop MR record     Spark record          Spark 1 PB
Data size                         102.5 TB             100 TB                1000 TB
Elapsed time                      72 mins              23 mins               234 mins
# Nodes                           2100                 206                   190
# Cores                           50400 physical       6592 virtualized      6080 virtualized
Cluster disk throughput (est.)    3150 GB/s            618 GB/s              570 GB/s
Sort Benchmark Daytona rules      Yes                  Yes                   No
Network                           dedicated data       virtualized (EC2),    virtualized (EC2),
                                  center, 10 Gbps      10 Gbps network       10 Gbps network
Sort rate                         1.42 TB/min          4.27 TB/min           4.27 TB/min
Sort rate/node                    0.67 GB/min          20.7 GB/min           22.5 GB/min
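The per-node rates in the table follow directly from the headline numbers; a quick arithmetic check:

# Sanity-check the sort rates quoted above from the headline data size / time / node counts.
runs = {
    "Hadoop MR record": {"tb": 102.5, "minutes": 72, "nodes": 2100},
    "Spark 100 TB":     {"tb": 100.0, "minutes": 23, "nodes": 206},
    "Spark 1 PB":       {"tb": 1000.0, "minutes": 234, "nodes": 190},
}

for name, r in runs.items():
    rate = r["tb"] / r["minutes"]            # cluster-wide TB/min
    per_node = rate * 1000 / r["nodes"]      # GB/min per node
    print(f"{name}: {rate:.2f} TB/min, {per_node:.2f} GB/min per node")

# Prints roughly 1.42 / 4.35 / 4.27 TB/min and 0.68 / 21.1 / 22.5 GB/min per node,
# in line with the table; small differences come from rounding of the elapsed times.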
127.
https://aws.amazon.com/snow/
AWS Snowball
AWS Snowball is a data migration and edge computing device that comes in two device options:
Compute Optimized and Storage Optimized. Snowball Edge Storage Optimized devices provide 40
vCPUs of compute capacity coupled with 80 terabytes of usable block or Amazon S3-compatible
object storage. It is well-suited for local storage and large-scale data transfer. Snowball Edge
Compute Optimized devices provide 52 vCPUs, 42 terabytes of usable block or object storage, and
an optional GPU for use cases such as advanced machine learning and full motion video analysis in
disconnected environments. Customers can use these two options for data collection, machine
learning and processing, and storage in environments with intermittent connectivity (such as
manufacturing, industrial, and transportation) or in extremely remote locations (such as military or
maritime operations) before shipping it back to AWS. These devices may also be rack mounted and
clustered together to build larger, temporary installations.
AWS Snowmobile
AWS Snowmobile moves up to 100 PB of data in a 45-foot long ruggedized shipping container and
is ideal for multi-petabyte or Exabyte-scale digital media migrations and data center shutdowns. A
Snowmobile arrives at the customer site and appears as a network-attached data store for more
secure, high-speed data transfer. After data is transferred to Snowmobile, it is driven back to an
AWS Region where the data is loaded into Amazon S3. Snowmobile is tamper-resistant, waterproof,
and temperature controlled with multiple layers of logical and physical security -- including
encryption, fire suppression, dedicated security personnel, GPS tracking, alarm monitoring, 24/7
video surveillance, and an escort security vehicle during transit.
[Comparison table: AWS Snowball Edge Storage Optimized vs. AWS Snowball Edge Compute Optimized vs. AWS Snowmobile, covering use cases such as data migration and data analytics.]
128.
http://helper.ipam.ucla.edu/publications/dmc2017/dmc2017_14378.pdf
https://arxiv.org/pdf/1508.01443.pdf
129.
https://opensource.com/business/15/1/apache-spark-new-world-record
(same Databricks sort-record write-up and table as item 126 above)
130.
https://databricks.com/blog/2018/05/03/benchmarking-apache-spark-on-a-single-node-machine.html
We also tested the minimum file size at which Pandas fails to load data on the i3.8xlarge
instance.
For Spark, it is easy to scale from a small data set on a laptop to “big data” on a
cluster with a single API. Even on a single node, Spark’s operators spill data to
disk if it does not fit in memory, allowing it to run well on data of any size.
Software
OS: Ubuntu 16.04
Spark: Apache Spark 2.3.0 in local cluster mode
Pandas version: 0.20.3
Python version: 2.7.12
PySpark and Pandas
The input dataset for our benchmark is table “store_sales” from TPC-DS, which
has 23 columns and the data types are Long/Double.
Scalability
Pandas requires a lot of memory to load data files. The following test
loads table “store_sales” at scale factors 10 to 270 using Pandas and PyArrow and
records the maximum resident set size of the Python process. As the graph below
suggests, as the data size increases linearly, so does the resident set size
(RSS) on the single-node machine.
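A minimal sketch of this kind of single-node comparison, loading the same file with pandas and with Spark in local mode and checking the Python process's peak RSS (the file path is a placeholder for a TPC-DS store_sales export):

# Sketch: load one dataset with pandas and with Spark in local mode and compare memory.
# "store_sales.parquet" is a placeholder path; ru_maxrss is KiB on Linux, bytes on macOS.
import resource

import pandas as pd
from pyspark.sql import SparkSession

path = "store_sales.parquet"

pdf = pd.read_parquet(path)  # pandas materializes the whole table in RAM
rss_kib = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
print(f"pandas rows={len(pdf)}, peak RSS ~{rss_kib / 1024:.0f} MiB")

spark = SparkSession.builder.master("local[*]").getOrCreate()
sdf = spark.read.parquet(path)  # Spark operators can spill to disk when memory is tight
print("spark rows =", sdf.count())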
132.
Jason (Jinquan) Dai Intel Corporation Yiheng Wang∗ Tencent Inc. Xin Qiu Intel Corporation Ding Ding
Intel Corporation
https://arxiv.org/pdf/1804.05839.pdf
https://github.com/intel-analytics/analytics-zoo
https://software.intel.com/content/www/us/en/develop/tools/math-kernel-library.html
https://software.intel.com/content/www/us/en/develop/articles/intel-mkl-dnn-part-1-library-overview-
and-installation.html
This section evaluates the computing performance and scalability of neural network training in BigDL. In
addition, while we do not report inference performance results in this section, Section 5.1 shows the
comparison of a real-world object detection inference pipeline running on BigDL vs. Caffe (and as
reported by JD.com, the BigDL inference pipeline running on 24 Intel Xeon servers is 3.83x faster than
Caffe running on 5 servers and 20 GPU cards).
To study the computing performance of BigDL, we compare the training speed of the NCF model using
BigDL and PyTorch. MLPerf has provided a reference implementation of the NCF program [40] based on
PyTorch 0.4, which trains a movie recommender using the MovieLens 20Million dataset (ml-20m) [41], a
widely used benchmark dataset with 20 million ratings and 465,000 tags applied to 27,000 movies by
138,000 users. It also provides the reference training speed of the PyTorch implementation (to achieve
the target accuracy goal) on a single Nvidia P100 GPU. We have implemented the same NCF program
using BigDL 0.7.0 and Spark 2.1.0 [42]. We then trained the program on a dual-socket Intel Skylake 8180
2.5GHz server (with 56 cores in total and 384GB memory), and it took 29.8 minutes to converge and
achieve the same accuracy goal.
Two categories of neural network models are used in this section to evaluate the performance and
scalability of BigDL, namely, neural collaborative filtering (NCF) and convolutional neural network (CNN),
which are representatives of the workloads that BigDL users run in their production Big Data platform.
Neural Collaborative Filtering (NCF) [34] is one of the most commonly used neural network models for
recommendation, and has also been included in MLPerf [35], a widely used benchmark suite for
measuring training and inference performance of machine learning hardware, software, and services. In
our experiments, we compare the training performance of BigDL (running on Intel Xeon server) vs.
PyTorch (running on GPU).
132.
https://engineering.fb.com/core-data/apache-spark-scale-a-60-tb-production-use-case/
While this post details our most challenging use case for Spark, a growing
number of customer teams have deployed Spark workloads into production.
Performance, maintainability, and flexibility are the strengths that continue to
drive more use cases to Spark. Facebook is excited to be a part of the Spark
open source community and will work together to develop Spark toward its full
potential.
133.
https://opensource.com/article/19/3/sql-scale-apache-spark-sql-and-dataframes
134.
https://ssc.io/pdf/tpctc18.pdf
135.
https://supercomputersfordl2017.github.io/
https://supercomputersfordl2017.github.io/Presentations/TrainLongerPresentation.pdf
https://supercomputersfordl2017.github.io/Presentations/DeepReinforcementLearningatScale.pdf
136.
https://singa.apache.org/docs/graph/
137:
Jie Jiang1,2,†, Lele Yu1,†, Jiawei Jiang1, Yuhong Liu2 and Bin Cui1,∗
1Key Lab of High Confidence Software Technologies (MOE), School of EECS, Peking University, Beijing
100871, China and 2Data Platform, Tencent Inc., Shenzhen 518057, China
(i) Integration with existing ecosystems: To integrate easily with existing ecosystems, ideally, our users
wanted a system that is written in Java and can be deployed easily with HDFS and YARN. In fact, in our
experience, it is difficult to deploy systems such as Petuum in their environment. However, this imposes
a question about performance: can we achieve reasonable speed with a Java-based distributed ML
system instead of C/C++-based systems?
(ii) One-system-fits-all: Existing ML systems support either data parallelism [3,6,7] or model parallelism
[8,9,10]. Because of the diverse range of applications that we need to support, our users wanted a
‘single’ system to support all these different types of parallelism. This requires us to carefully design the
system architecture to accommodate both types of parallelism in an effective and efficient way.
C++ vs Java (Spark and others use only one type of parallelism, model or data); Angel addresses this
problem.
Techniques/algorithms it supports: Angel is used in the production environment of Tencent to run
models including LR (Logistic Regression), Support Vector Machine (SVM), LDA, GBDT, KMeans and Matrix
Factorization (MF) for commercial recommendation, user profiling and various applications. We have
successfully scaled Angel up to hundreds of machines, processing hundreds of gigabytes of data with
billions of model parameters. Compared with existing systems such as Petuum and Spark, Angel is at
least 2×, and sometimes up to 10×, faster.
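To make the data-parallel parameter-server idea concrete (a generic toy sketch, not Angel's actual API), the pattern is: each worker pulls the current parameters, computes a gradient on its data shard, and pushes the update back to the server:

# Toy sketch of data-parallel training with a parameter server (not Angel's API).
import numpy as np

class ParameterServer:
    def __init__(self, dim, lr=0.1):
        self.w = np.zeros(dim)
        self.lr = lr

    def pull(self):
        return self.w.copy()

    def push(self, grad):
        self.w -= self.lr * grad  # simple SGD update applied at the server

def worker_gradient(w, X_shard, y_shard):
    # Gradient of squared loss for a linear model on this worker's shard.
    pred = X_shard @ w
    return X_shard.T @ (pred - y_shard) / len(y_shard)

rng = np.random.default_rng(0)
X, y = rng.normal(size=(1000, 5)), rng.normal(size=1000)
ps = ParameterServer(dim=5)
shards = np.array_split(np.arange(1000), 4)  # 4 simulated workers

for _ in range(20):                          # synchronous training rounds
    for idx in shards:
        w = ps.pull()                        # pull current parameters
        ps.push(worker_gradient(w, X[idx], y[idx]))  # push gradient update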
138.
https://mesosphere.github.io/marathon/
https://princessleia.com/presentations/2017/The_SMACK_Stack_at_Dublin_Kafka_Meetup.pdf
https://mesosphere.github.io/presentations/2016-09-20-velocitycon-ny/continuous-delivery-with-dcos-and-jenkins.pdf
https://indico.cern.ch/event/469775/contributions/2148329/attachments/1285672/1912108/Introduction_to_Mesos_and_DCOS.pdf
https://www.confluent.io/wp-content/uploads/2016/09/2016-09-27-Confluent-Mesosphere-White-Paper-1.pdf
https://www.jfokus.se/jfokus20-preso/Orchestration-deep-dive.pdf
https://assets.ext.hpe.com/is/content/hpedam/documents/a00039000-9999/a00039256/a00039256enw.pdf
https://h20195.www2.hpe.com/V2/GetPDF.aspx/4AA0-8758ENW.pdf
Marathon
A container orchestration platform for Mesos and DC/OS
https://www.voltdb.com/wp-content/uploads/2017/03/hv-white-paper-voltdb-technical-overview.pdf
https://www.voltdb.com/wp-content/uploads/2017/04/VoltDB_Technical_Overview_14Sept17.pdf
For fast, data-driven applications, VoltDB offers substantial performance and cost advantages, as
illustrated by the results of the TPC-C-like benchmark below, in which VoltDB and a well-known OLTP
DBMS were compared running the same test on identical hardware (Dell R610, 2x 2.66 GHz quad-core
Xeon 5550 with 12x 4 GB (48 GB) DDR3-1333 registered ECC DIMMs, 3x 72 GB 15K RPM 2.5in enterprise
SAS 6 Gbps drives):
As expected, with increasing partition count and increasing core count the throughput increases
significantly. For the 27-node cluster with 4 cores and 4 partitions, the throughput was just under 75K
operations per second, while for the 27-node cluster with 16 cores and 16 partitions, the throughput
reached over 3 million operations per second!
https://downloads.voltdb.com/documentation/UsingVoltDB.pdf
140.
What about HDFS & Amazon Glacier?
• Use HDFS for very frequently accessed (hot) data
• Use Amazon S3 Standard for frequently accessed data
• Use Amazon S3 Standard – IA for infrequently accessed data
• Use Amazon Glacier for archiving cold data
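As a sketch of how the S3 side of that tiering can be automated, a lifecycle rule can transition objects from Standard to Standard-IA to Glacier as they age; the bucket name, prefix, and day thresholds below are assumptions:

# Sketch: S3 lifecycle rule that ages data from Standard to Standard-IA to Glacier.
# Bucket name, prefix, and the 30/180-day thresholds are placeholders.
import boto3

s3 = boto3.client("s3")
s3.put_bucket_lifecycle_configuration(
    Bucket="my-data-lake",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "tier-warm-then-cold",
                "Filter": {"Prefix": "events/"},
                "Status": "Enabled",
                "Transitions": [
                    {"Days": 30, "StorageClass": "STANDARD_IA"},
                    {"Days": 180, "StorageClass": "GLACIER"},
                ],
            }
        ]
    },
)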
142.
AWS
Presto: a SQL query engine that works with NoSQL and RDBMS sources.
Presto can query data where it is stored, without needing to move data into a separate analytics
system. Query execution runs in parallel over a pure memory-based architecture, with most
results returning in seconds. You’ll find it used by many well-known companies
like Facebook, Airbnb, Netflix, Atlassian, and Nasdaq.
What is the history of Presto?
Presto started as a project at Facebook, to run interactive analytic queries against a 300PB data
warehouse, built with large Hadoop/HDFS-based clusters. Prior to building Presto, Facebook used
Apache Hive, which it created and rolled out in 2008, to bring the familiarity of the SQL syntax to the
Hadoop ecosystem. Hive had a significant impact on the Hadoop ecosystem for simplifying complex
Java MapReduce jobs into SQL-like queries, while being able to execute jobs at high scale.
However, it wasn’t optimized for fast performance needed in interactive queries.
In 2012, the Facebook Data Infrastructure group built Presto, an interactive query system that could
operate quickly at petabyte scale. It was rolled out company-wide in spring 2013. In November
2013, Facebook open sourced Presto under the Apache Software License, and made it available for
anyone to download on GitHub. Today, Presto has become a popular choice for doing interactive
queries on Hadoop, and has a lot of contributions from Facebook and other organizations.
Facebook’s implementation of Presto is used by over a thousand employees, who run more than
30,000 queries, processing one petabyte of data daily.
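A small sketch of issuing an interactive query from Python with the presto-python-client package; the coordinator host, catalog, schema, and table are placeholders:

# Sketch: run an interactive query against a Presto coordinator using presto-python-client.
# Host, catalog, schema, and table names are placeholders.
import prestodb

conn = prestodb.dbapi.connect(
    host="presto-coordinator.internal",
    port=8080,
    user="analyst",
    catalog="hive",
    schema="default",
)
cur = conn.cursor()
cur.execute("SELECT ds, count(*) FROM web_events GROUP BY ds ORDER BY ds DESC LIMIT 7")
for row in cur.fetchall():
    print(row)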
https://aws.amazon.com/blogs/big-data/amazon-emr-introduces-emr-runtime-for-apache-spark/
143
https://arxiv.org/ftp/arxiv/papers/1907/1907.06690.pdf
A Scalable Framework for Multilevel Streaming Data Analytics using Deep Learning
Shihao Ge School of Computing Queen’s University Kingston, ON, Canada ge.g@queensu.ca Haruna Isah
School of Computing Queen’s University Kingston, ON, Canada isah@cs.queensu.ca
3 Million Ops per second benchmark
144. https://research.fb.com/wp-content/uploads/2016/11/realtime_data_processing_at_facebook.pdf
Realtime data processing systems are used widely to provide insights about events as they happen.
Many companies have developed their own systems: examples include Twitter’s Storm [28] and Heron
[20], Google’s Millwheel [9], and LinkedIn’s Samza [4]. We present Facebook’s Puma, Swift, and Stylus
stream processing systems here. The following qualities are all important in the design of a realtime
data system.
Design decisions (systems in order: Puma, Stylus, Swift, Storm, Heron, Spark Streaming, Millwheel, Flink, Samza):
Language paradigm: SQL, C++, Python, Java, Java, Functional, C++, Functional, Java
Data transfer: Scribe, Scribe, Scribe, RPC, Stream Manager, RPC, RPC, RPC, Kafka
Reprocessing: same code, same code, no batch, same DSL, same DSL, same code, same code, same code, no batch
Processing semantics: at least once, at most once, exactly once, or best effort, depending on the system
State-saving mechanism: remote DB, local DB, global snapshot, or limited, depending on the system
145. https://assets.ext.hpe.com/is/content/hpedam/documents/a00066000-6999/a00066812/a00066812enw.pdf
HPE Reference Architecture for Real-Time Streaming Analytics with HPE Elastic Platform for Analytics (EPA) and MapR Data Platform (MapR)
Resource management by engine: Spark Streaming: Native, YARN, Mesos, Kubernetes; Flink: YARN, Kubernetes; Storm: YARN, Mesos. Throughput: High.
Solution overview
The HPE Elastic Platform for Analytics (EPA) is a premier modular infrastructure foundation to accelerate business
insights, enabling organizations to rapidly deploy and efficiently scale. The HPE EPA solution for real-time streaming
analytics enables high-volume data ingest, analytics, and data visualization with HPE Apollo servers. The advantages
of adopting a combined HPE EPA and MapR solution are below.
• Ingesting huge volumes and generating a variety of real-time data by sensors with Edge Infrastructure
• Streaming a massive volume of (structured and unstructured) data to a scalable data pipeline with Core Infrastructure
• Persisting high velocity data (structured and unstructured) to Data Lake Storage Infrastructure.
Solution Architecture
At the core of this Reference Architecture is the underlying infrastructure building blocks. These blocks form the foundation
of the EPA solution designs and can be pieced together in different ways to solve unique customer needs. This section will
detail storage, compute, and AI accelerator blocks based on the HPE Apollo 4200, HPE Apollo 2000 and HPE Apollo 6500
servers required to build and implement streaming analytics on EPA Platform. The blocks defined in this section may be
modified (e.g., processor model, memory, etc.) to address new or changing workloads and environments. Specific workload
services and behavior will drive the final configuration requirements including compute and storage definition and
quantity. Below are the infrastructure building blocks required to implement the HPE EPA real-time
streaming analytics solution with the MapR Data Platform.
Table 1. Solution components
Blocks                        Model
Batch and Streaming Block     HPE Apollo 2000 (XL 170r) Gen10
146.
ELK
Amazon Elasticsearch Service also integrates with other AWS services such as Amazon Kinesis
Data Firehose, Amazon CloudWatch Logs, and AWS IoT giving you the flexibility to select the data
ingestion tool that meets your use case requirements.
https://aws.amazon.com/elasticsearch-service/the-elk-stack/
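As a sketch of one such integration path, an application can push records into a Kinesis Data Firehose delivery stream that has been configured (separately, in the console or IaC) to deliver into the Amazon ES domain; the stream name and payload are assumptions:

# Sketch: send a log record to a Kinesis Data Firehose delivery stream that is
# configured to deliver into an Amazon ES domain. Stream name and payload are placeholders.
import json
import boto3

firehose = boto3.client("firehose")
firehose.put_record(
    DeliveryStreamName="app-logs-to-es",
    Record={"Data": (json.dumps({"level": "INFO", "msg": "user login"}) + "\n").encode("utf-8")},
)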
147.