GraphFrames: An Integrated API for Mixing Graph and Relational Queries

Ankur Dave (UC Berkeley), Alekh Jindal (Microsoft), Li Erran Li (Uber Technologies), Reynold Xin (Databricks), Joseph Gonzalez (UC Berkeley), Matei Zaharia (MIT)

ABSTRACT

Graph data is prevalent in many domains, but it has usually required specialized engines to analyze. This design is onerous for users and precludes optimization across complete workflows. We present GraphFrames, an integrated system that lets users combine graph algorithms, pattern matching and relational queries, and optimizes work across them. GraphFrames generalize the ideas in previous graph-on-RDBMS systems, such as GraphX and Vertexica, by letting the system materialize multiple views of the graph (not just the specific triplet views in these systems) and executing both iterative algorithms and pattern matching using joins. To make applications easy to write, GraphFrames provide a concise, declarative API based on the "data frame" concept in R that can be used for both interactive queries and standalone programs. Under this API, GraphFrames use a graph-aware join optimization algorithm across the whole computation that can select from the available views.

We implement GraphFrames over Spark SQL, enabling parallel execution on Spark and integration with custom code. We find that GraphFrames make it easy to express end-to-end workflows and match or exceed the performance of standalone tools, while enabling optimizations across workflow steps that cannot occur in current systems. In addition, we show that GraphFrames' view abstraction makes it easy to further speed up interactive queries by registering the appropriate view, and that the combination of graph and relational data allows for other optimizations, such as attribute-aware partitioning.

    gf = GraphFrame(vertices, edges)
    pairs = gf.pattern("(x:User)->(p:Product)<-(y:User)")
    pairs.where(pairs.p.category == "Books")
         .groupBy(pairs.p.name)
         .count()

Listing 1: An example of the GraphFrames API. We create a GraphFrame from two tables of vertices and edges, and then we search for all instances of a pattern, namely two users that bought the same product. The result of this search is another table on which we can then perform filtering and aggregation. The system will optimize across these steps, e.g., pushing the filter above the pattern search.
1. INTRODUCTION

Analyzing the graphs of relationships that occur in modern datasets is increasingly important, in domains including commerce, social networks, and medicine. To date, this analysis has been done through specialized systems like Neo4j [17], Titan [16] and GraphLab [11]. These systems offer two main capabilities: pattern matching to find subgraphs of interest [17], and graph algorithms such as shortest paths and PageRank [12, 11, 7].

While graph analytics is powerful, running it in a separate system is both onerous and inefficient. Most workflows involve building a graph from existing data, likely in a relational format, then running search or graph algorithms on it, and then performing further computations on the result. With isolated graph analysis systems, users have to move data manually and there is no optimization of computation across these phases of the workflow. Several recent systems have started to bridge this gap by running graph algorithms on a relational engine [7, 9], but they have no support for pattern matching and do not optimize across graph and relational queries.

We present GraphFrames, an integrated system that can combine relational processing, pattern matching and graph algorithms, and optimize computations across them. GraphFrames generalize the ideas behind GraphX [7] and Vertexica [9] by maintaining arbitrary views of a graph (e.g., triplets or triangles) and executing queries using joins across them. They then optimize execution across the relational and graph portions of the computation. A key challenge in achieving this goal is query planning. For this purpose, we extend a graph-aware dynamic programming algorithm by Huang et al. [8] to select among multiple input views and compute a join plan. We also propose an algorithm for suggesting new views based on the query workload.

To make complete graph analytics workflows easy to write, GraphFrames provide a declarative API similar to "data frames" in R, Python and Spark that integrates into procedural languages like Python. Users build up a computation out of relational operators, pattern matching, and calls to algorithms, as shown in Listing 1. The system then optimizes across these steps, selecting join plans and performing algebraic optimizations. Similar to systems like Pig [14] and Spark SQL [4], the API makes it easy to build a computation incrementally while receiving the full benefits of relational optimization. Finally, the GraphFrames API is also designed to be used interactively: users can launch a session, define views that will aid their queries, and query data interactively from a Python shell. Unlike current tools, GraphFrames let analysts perform their complete workflow in a single system.
We have implemented GraphFrames over Spark SQL [4], and made them compatible with Spark's existing DataFrame API. We show that GraphFrames match the performance of other distributed graph engines for various tasks, while enabling optimizations across the tasks that would not happen in other systems. Support for multiple views of the graph adds significant benefits: for example, materializing a few simple views can speed up queries by 10× over the algorithm in [8]. Finally, by building on Spark, GraphFrames interoperate easily with custom UDFs (e.g., ETL code) and with Spark's machine learning library and external data sources.

In summary, our contributions are:

• A declarative API that lets users combine relational processing, pattern matching and graph algorithms into complex workflows and optimizes across them.

• An execution strategy that generalizes those in Vertexica and GraphX to support multiple views of the graph.

• A graph-aware query optimization algorithm that selects join plans based on the available views.

• An implementation and evaluation of GraphFrames on Spark.
2. GRAPHFRAME API

The main programming abstraction in GraphFrames' API is a GraphFrame. Conceptually, it consists of two relations (tables) representing the vertices and edges of the graph, as well as a set of materialized views of subgraphs. The vertices and edges may have multiple attributes that are used in queries. The views are defined using patterns that match various shapes of subgraphs, as we shall describe in Section 2.2.2. For example, a user might create a view of all the triangles in the graph, which can then be used to quickly answer other queries involving triangles.

GraphFrames expose a concise language-integrated API that unifies graph analytics and relational queries. We based this API on DataFrames, a common abstraction for data science in Python and R that is also available as a declarative API on Spark SQL [4]. In this section, we first cover some background on DataFrames, and then discuss the additional operations available on GraphFrames. We demonstrate the generality of GraphFrames for analytics by mapping the core primitives of GraphX into GraphFrame operations. Finally, we discuss how GraphFrames integrate with the rest of Apache Spark (e.g., the machine learning library).

All the code examples are shown in Python. We show the GraphFrame API itself in Scala because it explicitly lists data types.

2.1 DataFrame Background

DataFrames are the main programming abstraction for manipulating tables of structured data in R, Python, and Spark. Different variants of DataFrames have slightly different semantics. For the purpose of this paper, we describe Spark's DataFrame implementation, which we build on [4]. Each DataFrame contains data grouped into named columns, and keeps track of its own schema. A DataFrame is equivalent to a table in a relational database, and can be transformed into new DataFrames using the various relational operators available in the API.

As an example, the following code snippet computes the number of female employees in each department by performing an aggregation and a join between two data frames:

    employees
      .join(dept, employees.deptId == dept.id)
      .where(employees.gender == "female")
      .groupBy(dept.id, dept.name)
      .agg(count("name"))

employees is a DataFrame, and employees.deptId is an expression representing the deptId column. Expression objects have many operators that return new expressions, including the usual comparison operators (e.g., == for equality test, > for greater than) and arithmetic ones (+, -, etc.). They also support aggregates, such as count("name").

Internally, a DataFrame object represents a logical plan to compute a dataset. A DataFrame does not need to be materialized until the user calls a special "output operation" such as save. This enables rich optimization across all operations that were used to build the DataFrame. (This aspect of Spark DataFrames is different from R and Python; in those languages, DataFrame contents are materialized eagerly after each operation, which precludes optimization across the whole logical plan [4].)

In terms of data type support, DataFrame columns support all major SQL data types, including boolean, integer, double, decimal, string, date, and timestamp, as well as complex (i.e., non-atomic) data types: structs, arrays, maps and unions. Complex data types can also be nested together to create more powerful types. In addition, DataFrames also support user-defined types [4].

2.2 GraphFrame Data Model

A GraphFrame is logically represented as two DataFrames: an edge DataFrame and a vertex DataFrame. That is to say, edges and vertices are represented in separate DataFrames, and each of them can contain attributes of any of the supported types. Take a social network graph as an example. The vertices can contain attributes including name (string), age (integer), and geographic location (a struct consisting of two floating point values for longitude and latitude), while the edges can contain an attribute for the time one user friended another (timestamp). The GraphFrame model supports user-defined attributes on each vertex and edge, and is thus equivalent to the property graph model used in many graph systems, including GraphX and GraphLab. GraphFrames are more general than Pregel/Giraph, since GraphFrames support user-defined attributes on edges.
into new DataFrames using various relational operators available in }
the API.
As an example, the following code snippet computes the number Listing 2: GraphFrame API in Scala
of female employees in each department by performing aggregation
and join between two data frames:
employees 2.2.1 Graph Construction
.join(dept , employees . deptId == dept.id)
.where( employees . gender == " female ") A GraphFrame can be constructed using two DataFrames: a
. groupBy (dept.id , dept.name) vertex DataFrame and an edge DataFrame. A DataFrame is merely
.agg(count (" name ")) a logical view (plan) and can support a wide range of sources that
employees is a DataFrame, and employees.deptId is an expres- 1 This aspect of Spark DataFrames is different from R and Python;
sion representing the deptId column. Expression objects have many in those languages, DataFrame contents are materialized eagerly
operators that return new expressions, including the usual compar- after each operation, which precludes optimization across the whole
ison operators (e.g., == for equality test, > for greater than) and logical plan [4].
2.2.1 Graph Construction

A GraphFrame can be constructed from two DataFrames: a vertex DataFrame and an edge DataFrame. A DataFrame is merely a logical view (plan) and can support a wide range of sources that implement a data source API. Some examples of DataFrame inputs include:

• a table registered in Spark SQL's system catalog
• a table in an external relational database, through JDBC
• JSON, Parquet, Avro, or CSV files on disk
• a table in memory in columnar format
• a set of documents in ElasticSearch or Solr
• results from relational transformations on the above

The following code demonstrates constructing a graph using a user table in a live transactional database and an edges table from JSON-based log files in Amazon S3:

    users = read.jdbc("mysql://...")
    likes = read.json("s3://...")
    graph = GraphFrame(users, likes)

Again, since DataFrames and GraphFrames are logical abstractions, the above code does not imply that users, likes, or graph are materialized.
2.2.2 Edges, Vertices, Triplets, and Patterns

A GraphFrame exposes four tabular views of a graph: edges, vertices, triplets, and a pattern view that supports specifying graph patterns using a syntax similar to the Cypher pattern language in Neo4j [17].

The edges view and the vertices view should be self-evident. The triplets view consists of each edge and its corresponding source and destination vertex attributes. It can be constructed using the following 3-way join:

    e.join(v, v.id == e.srcId)
     .join(v, v.id == e.dstId)

We provide it directly since the triplets view is used commonly enough. Note that the edges, vertices, and triplets views are also the three fundamental views in GraphX, so GraphFrames are at least as expressive as GraphX from the perspective of views.

In addition to the three basic tabular views, a GraphFrame also supports a pattern operator that accepts a graph pattern in a Cypher-like syntax and returns a DataFrame consisting of the edges and vertices specified by the pattern. This pattern operator enables easy expression of pattern matching in graphs.
Typical graph patterns consist of two nodes connected by a directed edge relationship, represented in the format ()-[]->(). Nodes are specified using parentheses, (), and relationships are specified using square brackets, []. Nodes and relationships are linked using an arrow-like syntax to express edge direction. The same node may be referenced in multiple relationships, allowing relationships to be composed into complex patterns. Additionally, nodes and edges can be constrained using inline type predicates expressed using colons.

For example, the following snippet finds a user u who viewed both item x and item y:

    (u: Person)-[viewed]->(x: Item), u-[viewed]->(y: Item)

The resulting DataFrame from the above pattern contains 3 structs: u, x, and y.
Note that the pattern operator is a simple and intuitive way to specify pattern matching. Under the hood it is implemented using the join and filter operators available on a GraphFrame. We provide it because it is often more natural to reason about graphs using patterns than using relational joins. The pattern operator can also be combined with other operators, as demonstrated in the next subsection.

Programmatic Pattern Generation and Pattern Library. Patterns such as cliques can be cumbersome to specify for interactive queries. We therefore provide a pattern library to programmatically generate and compose patterns. For example, a star of size K can be specified as (hub, spokes) = star(nodePred, edgePred, K), where nodePred and edgePred filter out nodes and edges. The pattern returns a hub and a list of K-1 spokes. Their names can then be used in further pattern specification, or materialized immediately; a sketch follows.
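A minimal sketch of how the pattern library might be used; the star helper is described above, while the predicate forms and the way generated names are spliced into a larger pattern are our assumptions:

    # Hypothetical: a star with a hub and K-1 = 3 spokes, where all
    # nodes are Persons and all edges are "follows" relationships.
    hub, spokes = star(lambda n: n.type == "Person",
                       lambda e: e.label == "follows",
                       K=4)

    # The generated node names can be reused in a further pattern,
    # e.g. requiring that the first two spokes also follow each other.
    g.pattern("({})-[follows]->({})".format(spokes[0], spokes[1]))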
2.2.3 View Creation

To enable reuse of computation, GraphFrames support view creation. The system materializes the view internally and uses it to speed up subsequent queries. For example, in Figure 1, if we create a view on triplets, we can reuse it to create a triangle view: the system will avoid rematerializing the triplets view when computing the triangle view. We will discuss how our query planner performs view selection to optimize the computation in the next section.

[Figure 1: View Reuse]
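A sketch of what this reuse looks like in code, using the createView method from Listing 2 (the pattern strings follow the syntax of Section 2.2.2; that the second call transparently reuses the first view is the behavior described above):

    # Materialize the triplets view once...
    triplets = g.createView("(a)-[e]->(b)")

    # ...then define a triangle view; the planner can answer this
    # pattern by self-joining the materialized triplets view instead
    # of recomputing the underlying 3-way joins.
    triangles = g.createView("(a)-[]->(b); (b)-[]->(c); (c)-[]->(a)")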
2.2.4 Relational Operators

Since a GraphFrame exposes the four tabular views, it already supports all the relational operators (e.g., project, filter, join) available on these tabular views. Relational operators on these views can already support many graph analysis algorithms. For example, the following snippet computes the out-degree of each vertex:

    g.edges.groupBy(g.edges.srcId).count()

In addition to relational operators on the tabular views, a GraphFrame also includes a few relational-like operators on the graph itself, namely select, filter, and join. These graph operators apply the corresponding relational operators to the vertices and/or edges, as appropriate. For example, filter applies a predicate to the vertex and edge tables, then removes dangling edges to ensure graph integrity. Similarly, join allows updating the vertex or edge table by joining in external data.
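For instance, a minimal sketch of the graph-level operators, assuming the signatures in Listing 2 (the column names come from the social network example in Section 2.2, and statuses is a hypothetical external DataFrame):

    # Keep only adult users; edges touching removed vertices are
    # dropped automatically to preserve graph integrity.
    adults = g.filter(g.vertices.age >= 18)

    # Join in an external table of account statuses keyed by user id.
    enriched = g.joinV(statuses, g.vertices.id == statuses.userId)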
2.2.5 Attribute-Based Partitioning

By default, GraphFrames partition a graph based on the natural partitioning scheme of its edges, as GraphX does. In [7], it was shown that natural partitioning can lead to great performance when the input is pre-partitioned.

In addition, GraphFrames support partitioning a graph based on arbitrary vertex or edge attributes. This is more general than GraphX or Giraph, which only support partitioning on vertex identifiers. It enables users to partition a graph based on their domain-specific knowledge, which can lead to strong data locality and minimize data communication.

Take the Amazon dataset [13] for example. The following snippet partitions the bipartite graph based on product categories:

    g.partitionBy(g.vertices.productCategory)

Intuitively, customers are more likely to buy products in the same category. Partitioning the Amazon graph this way puts products of the same category and their associated edges closer to each other.
2.2.6 User-Defined Functions

GraphFrames also support arbitrary user-defined functions (UDFs) in Scala, Java, and Python. The udf function accepts a lambda function as input and creates an expression that can be used in relational and graph projection. For example, given a model object for a machine learning model, we could create a UDF predicting some user behavior based on users' age and registrationTime attributes:

    model: LogisticRegressionModel = ...
    predict = udf(lambda x, y: model.predict(Vector(x, y)))
    g.select(
        predict(g.vertices.age, g.vertices.registrationTime))

Unlike database systems, which often require UDFs to be defined in a separate programming environment different from the primary query interface, the GraphFrame API supports inline definition of UDFs. We do not need the complicated packaging and registration process found in other database systems.

2.3 Generality of GraphFrames

As simple as it is, the GraphFrame abstraction is powerful enough to express many workloads. We demonstrate its expressiveness by mapping all GraphX operators to operators in GraphFrames. Since GraphX can be used to model the programming abstractions in GraphLab, Pregel, and BSP [7], by mapping GraphX operations to GraphFrames we demonstrate that GraphFrames are at least as expressive as GraphX, GraphLab, Pregel, and BSP.
GraphX's operators can be divided into three buckets: collection views, relational-like operators, and graph-parallel computations. Section 2.2.2 already demonstrated that GraphFrames provide all three fundamental collection views in GraphX (edges, vertices, and triplets).

All relational-like operators in GraphX can be trivially mapped one-to-one to GraphFrame operators. For example, the select operator is a superset of GraphX's mapV and mapE operators, and joinV and joinE are generalized variants of GraphX's leftJoinV and leftJoinE operators. The filter operator is a more general version of GraphX's subgraph operator.

In GraphX, graph-parallel computations consist of aggregateMessages (called mrTriplets in [7], but renamed in the open source GraphX system) and its variants. Similar to the Gather phase in the GAS abstraction, aggregateMessages encodes a two-stage process of graph-parallel computation. Logically, it is the composition of a projection followed by an aggregation on the triplets view. In [7], it was illustrated using the following SQL query:

    SELECT t.dstId, reduceF(mapF(t)) AS msgSum
    FROM triplets AS t GROUP BY t.dstId

This SQL query can indeed be expressed using the following GraphFrame operators:

    g.triplets
     .select(mapF(g.triplets.attribute[s]).as("mapOutput"))
     .groupBy(g.triplets.dstId)
     .agg(reduceF("mapOutput"))
We have demonstrated that GraphFrames can support all the operators available in GraphX, and consequently all operations in GraphLab, Pregel, and BSP. For convenience, we also provide an API similar to GraphX's Pregel variant for implementing iterative algorithms. We have also implemented common graph algorithms, including connected components, PageRank, and triangle counting.
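To make the join-based execution of such iterative algorithms concrete, here is a sketch of a single PageRank step written directly against plain Spark DataFrames. This is illustrative rather than GraphFrames' actual implementation; we assume a ranks DataFrame with columns (id, rank, outDeg) and an edges DataFrame with (srcId, dstId):

    from pyspark.sql import functions as F

    # Each vertex sends rank/outDegree along its out-edges (a join),
    # and new ranks are the damped sum of the messages arriving at
    # each destination (a groupBy/aggregation).
    msgs = (edges
            .join(ranks, edges.srcId == ranks.id)
            .select(edges.dstId.alias("id"),
                    (ranks.rank / ranks.outDeg).alias("msg")))
    ranks = (msgs.groupBy("id")
                 .agg((F.lit(0.15) + F.lit(0.85) * F.sum("msg"))
                      .alias("rank")))

Repeating this pair of operations is exactly the projection-plus-aggregation on the triplets view described above.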
2.4 Spark Integration

Because GraphFrames build on top of Spark, this brings three benefits. First, GraphFrames can load data from and save data to existing Spark SQL data sources such as HDFS files in JSON or Parquet format, HBase, Cassandra, etc. Second, GraphFrames can use the growing list of machine learning algorithms in MLlib. Third, GraphFrames can call the Spark DataFrame API. As an example, the following code reads user-product rating information from HDFS into a DataFrame, selecting the review text keyed by (user ID, product ID) pairs. We can then call a topic model to learn the topics of the reviews. With the topics, we can compute similar products, etc., as in [13], and do graph pattern matching to uncover communities of users who bought similar products.

    corpus = rating.read.parquet("hdfs:///...")
                   .select(pair(user_id, product_id), review_txt)
    ldaModel = LDA.train(corpus, k=10000)
    topics = ldaModel.topicsMatrix()

2.5 Putting It Together

We show the ease of developing an end-to-end graph analytics pipeline with an example in Listing 3. The example is an e-commerce application that groups users with similar interests.

    # 1. ETL
    # users:    [id: int, attributes: MapType(user_name)]
    # products: [id: int, attributes: MapType(brand,
    #            category, price)]
    # ratings:  [user_id: int, product_id: int,
    #            rating: int, review_text: string]
    # cobought: [product_1_id: int, product_2_id: int]

    vertices = users.union(products)
    graph = GraphFrame(vertices, ratings)

    # 2. Run ALS to get top 1M inferred recommendations
    # predictedRatings: [user_id: int, product_id: int,
    #                    predicted_rating: int]
    predictedRatings = ALS.train(graph, iterations=20)
                          .recommendForAll(1e6)

    densifiedGraph = GraphFrame(vertices,
        ratings.union(predictedRatings).union(cobought))

    # 3. Find groups of users with the same interests
    densifiedGraph.pattern("""(u1)-[r1]->(p1);
        (u2)-[r2]->(p2); (p1)-[]->(p2)""")
      .filter("r1.rating > 3 && r2.rating > 3")
      .select("u1.id", "u2.id")

Listing 3: GraphFrame end-to-end example in Python

The first step performs ETL to extract information on users, products, ratings and co-bought products, each represented as a DataFrame. We then construct a GraphFrame graph. The vertices contain both user nodes and product nodes, and an edge exists between a user and a product if the user rated the product, so this is a bipartite graph.

In the second step, we run collaborative filtering to compute predicted ratings, i.e., to uncover latent ratings not present in the dataset. We then create a graph densifiedGraph with the same vertices as graph and more edges, adding a product-product edge whenever the two products are co-bought.

As the final step, we find pairs of users who both gave good ratings to at least two related products. We could similarly find groups of users of size K.

This example shows the ease of using the GraphFrames API: we performed ETL, an iterative graph algorithm and graph pattern matching in one system. It is much more intuitive than coding the pipeline in SQL. Language integration also makes it easy to plug in UDFs; for example, we could create a UDF to extract the topics of a product and the topics a user is interested in.

In the next section, we highlight the opportunities for joint optimization.
3. IMPLEMENTATION

We implemented GraphFrames as a library on top of Spark SQL. The library consists of the GraphFrame interface described in Section 2, a pattern parser, and our view-based query planner. We also made improvements to Spark SQL's Catalyst optimizer to support GraphFrames.

Each GraphFrame is represented as two Spark DataFrames (a vertex DataFrame and an edge DataFrame) and a collection of user-defined views. Implementations of each of the GraphFrame methods follow naturally from this representation, and the GraphFrame interface is 250 lines of Scala. The GraphFrame operations delegate to the pattern parser and the query planner.

Our query planner is implemented as a layer on top of Spark Catalyst, taking patterns as input, collecting statistics using Catalyst APIs, and emitting a Catalyst logical plan. At query time, the planner receives the user-specified views from the GraphFrame interface. The planner can additionally suggest views when requested by the user. The query planner also accepts custom attribute-based partitioners, which it uses to make more accurate cost estimates and incorporates into the generated plans.

To simplify the query planner, we modified Catalyst to support join elimination when allowed by the foreign key relationship between vertices and edges. This required adding support for unique and foreign key constraints on DataFrames to Spark SQL. Join elimination enables the query planner to emit one join per referenced vertex; joins unnecessary for producing the final output will be eliminated. This change required 800 lines of Scala.

Our pattern parser uses the Scala parser combinator library and is implemented in 50 lines of Scala.
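As an illustration of why join elimination matters, consider a pattern whose output needs no vertex attributes; the pattern and column names below are our own example:

    # Only edge columns appear in the output. Since every srcId/dstId
    # is a foreign key into the vertex table, the planner can drop the
    # vertex joins entirely and answer this from the edge table alone.
    g.pattern("(a)-[e]->(b)")
     .select("e.srcId", "e.dstId")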
Finally, building on top of Spark enables GraphFrames to easily integrate with data sources and call Spark's machine learning libraries.

3.1 Query Optimization

The GraphFrame query planner extends the dynamic programming algorithm of Huang et al. [8] to the distributed setting and adds a view rewrite capability. The user can register arbitrary materialized views, and the planner will automatically rewrite a query to reuse a materialized view when appropriate. This is useful because pattern queries can be very expensive to run, and reusing computation across several queries improves the user experience. Our optimizer also offers suggestions for which views to create. See the appendix for details on this algorithm.
4. EVALUATION

In this section we demonstrate that GraphFrames benefit greatly in some cases from materializing appropriate views, and that they outperform a mix of systems on analytics pipelines by avoiding communication between systems and optimizing across the entire pipeline.

All experiments were conducted on Amazon EC2 using 8 r3.2xlarge worker nodes in November 2015. Each node has 8 virtual cores, 61 GB of memory, and one 160 GB SSD.

4.1 Impact of Views

We first demonstrate that materializing the appropriate views reduces query time, and in some cases can greatly improve plan selection. We ran the six queries shown in Figure 2a on a web graph dataset released by Google in 2002 [10]. This graph has 875,713 vertices and 5,105,039 edges. It is the largest graph used for these queries in [8]. Before running these queries we registered the views listed in Table 1. We then ran each query with and without view rewrite enabled. The results are reported in Figure 2b.

View      Query                 Size in Google graph
2-cycle   (a)->(b)->(a)         1,565,976
V         (c)<-(a)->(b)         67,833,471
Triangle  (a)<-(b)->(c)->(a)    28,198,954
3-cycle   (a)->(b)->(c)->(a)    11,669,313

Table 1: Views registered in the system to explore their impact on the queries in [8]

Queries 1, 2, 3, and 6 do not benefit much from views, because the main cost in these queries comes from generating unavoidably large intermediate result sets. For example, in Query 1 the bidirectional edge between vertices A and B can use the 2-cycle view, but by far the more expensive part of the plan is joining C and D to the view, because this generates all pairs of such vertices.

However, in Query 4 we observe a large speedup when using views. The order-of-magnitude speedup is because the view equivalence check exposes an opportunity to reuse an intermediate result that the planner would otherwise miss. The reuse requires recognizing that two subgraphs are isomorphic despite having different node labels, a problem that is difficult in general but becomes much easier with the right choice of view. In particular, the Triangle view is applicable both to the BCD triangle and the BCE triangle in Query 4, so the planner can replace the naive 5-way join with a single self-join of the Triangle view with equality constraints on vertices B and C.

Additionally, in Query 5, precomputing views speeds up the main query by a factor of 2 by moving the work of computing the BCD and BED triangles from the main query into the Triangle and 3-cycle views. These views are expensive to create, but since they are common patterns it is reasonable to precompute them.
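For reference, a sketch of how the four views in Table 1 would be registered with the createView method of Listing 2; the pattern strings transcribe the Query column above into the syntax of Section 2.2.2, using ";" to separate pattern elements as in Listing 3:

    two_cycle = g.createView("(a)-[]->(b); (b)-[]->(a)")
    v_shape   = g.createView("(a)-[]->(b); (a)-[]->(c)")
    triangle  = g.createView("(b)-[]->(a); (b)-[]->(c); (c)-[]->(a)")
    three_cyc = g.createView("(a)-[]->(b); (b)-[]->(c); (c)-[]->(a)")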
[Figure 2: (a) the six pattern queries from [8]; (b) runtime of each pattern query with and without views, including view creation time; (c) end-to-end pipeline runtime (ETL, training, query) using multiple systems vs. a single system.]

4.2 End-to-End Pipeline Performance

We next evaluate the end-to-end performance of a multi-step pipeline that finds groups of users with the same interests in an Amazon review dataset. We will see that using Spark and GraphFrames for the whole pipeline allows more powerful optimizations and avoids the overhead of moving data across system boundaries.

We ran the pipeline described in Listing 3 on an Amazon review dataset [13] with 82,836,502 reviews and 168,954,245 pairs of related products. Additionally, after finding groups of users with the same interests in step 3, we aggregated the result for each user to find the number of users with the same interests. To simulate running this pipeline without GraphFrames as a comparison point, we ran each stage separately using Spark, saving and loading the working data between stages. In addition to the I/O overhead, this prevented projections from being pushed down into the data scan, increasing the ETL time. Figure 2c shows this comparison.

5. RELATED WORK

To our knowledge, GraphFrames is the first system that lets users combine graph algorithms, pattern matching and relational queries in a single API, and optimizes computations across them. GraphFrames build on previous work in graph analytics using relational databases, query optimization for pattern matching, and declarative APIs for data analytics.

Graph Databases. Graph databases such as Neo4j [17] and Titan [16] focus mostly on graph queries, often using pattern matching languages like Cypher [17] and Gremlin [15]. They have very limited support for graph algorithms such as PageRank and for connecting with relational data outside the graph database. GraphFrames use the pattern matching abstraction from these systems, but can also support other parts of the graph processing workflow, such as building the graph itself out of relational queries on multiple tables, and running analytics algorithms in addition to pattern matching. GraphFrames then optimize query execution across this entire workflow. GraphFrames' language-integrated API also makes it easy to call user-defined functions (e.g., ETL code) and, in our Spark-based implementation, to call into Spark's built-in libraries, giving users a single environment in which to write end-to-end workflows.

Graph-Parallel Programming. Standalone systems including Pregel and GraphLab [12, 11] have been designed to run graph algorithms, but they require separate data export and import, and thus make end-to-end workflows complex to build. GraphFrames use parallel execution plans similar to many of these systems (e.g., the Gather-Apply-Scatter pattern) while supporting broader workflows.

Graph Processing over RDBMS. GraphX and Vertexica [7, 9] have explored running graph algorithms on relational databases or dataflow engines. Of these, GraphX materializes a triplets view of the graph to speed up the most common join in iterative graph algorithms, while the others use the raw tables in the underlying database. GraphFrames generalize the execution strategy of these systems by letting the user materialize multiple views of arbitrary patterns, which can greatly speed up common types of queries. GraphFrames also provide a much broader API, including pattern matching and relational queries, where these tasks can all be combined, whereas previous systems focused only on graph algorithms.

6. DISCUSSION AND CONCLUSION

Graph analytics applications typically require relational processing, pattern matching and iterative graph algorithms. However, these applications previously had to be implemented in multiple systems, adding both overhead and complexity. In this paper, we unify the three with the GraphFrames abstraction. The GraphFrames API is concise and declarative, based on the "data frame" concept in R, and enables easy expression and mixing of these three paradigms. GraphFrames optimize the entire computation using a graph-aware join optimization and view selection algorithm that generalizes the execution strategies of previous graph-on-RDBMS systems. GraphFrames are implemented over Spark SQL, enabling parallel execution on Spark and easy integration with Spark's external data sources, built-in libraries, and custom ETL code. We showed that GraphFrames make it easy to write complete graph processing pipelines and enable optimizations across them that are not possible in current systems.

We have open sourced our system to allow other researchers to build upon it [1].

Our current optimization algorithm produces a tree of pairwise join operators. As part of future work, we would like to support other options, such as one-shot join algorithms over multiple tables [2] and worst-case optimal join algorithms [6]. It should be possible to integrate these algorithms into our System-R based framework.

Additionally, GraphFrames currently do not provide support for processing dynamic graphs. In the future, we would like to develop efficient incremental graph update and processing support. We plan to leverage the newly available IndexedRDD [3] project to do this over Spark, or a relational database engine as an alternative backend. One interesting addition here will be deciding which graph views we wish to maintain incrementally as the graph changes.

7. REFERENCES

[1] GraphFrames: DataFrame-based graphs. https://github.com/graphframes/graphframes, Apr. 2016.
[2] Afrati, F. N., et al. GYM: A multiround join algorithm in MapReduce. CoRR abs/1410.4156 (2014).
[3] Apache Spark. Spark IndexedRDD: An efficient updatable key-value store for Apache Spark. https://github.com/amplab/spark-indexedrdd, 2015.
[4] Armbrust, M., et al. Spark SQL: Relational data processing in Spark. In SIGMOD (2015).
[5] Chirkova, R., et al. A formal perspective on the view selection problem. In VLDB (2002).
[6] Chu, S., et al. From theory to practice: Efficient join query evaluation in a parallel database system. In SIGMOD (2015).
[7] Gonzalez, J., et al. GraphX: Graph processing in a distributed dataflow framework. In OSDI (2014).
[8] Huang, J., et al. Query optimization of distributed pattern matching. In ICDE (2014).
[9] Jindal, A., et al. Vertexica: Your relational friend for graph analytics! In VLDB (2014).
[10] Leskovec, J., et al. SNAP Datasets: Stanford large network dataset collection. http://snap.stanford.edu/data, June 2014.
[11] Low, Y., et al. GraphLab: A new framework for parallel machine learning. In UAI (2010).
[12] Malewicz, G., et al. Pregel: A system for large-scale graph processing. In SIGMOD (2010).
[13] McAuley, J., et al. Inferring networks of substitutable and complementary products. In KDD (2015).
[14] Olston, C., Reed, B., Srivastava, U., Kumar, R., and Tomkins, A. Pig Latin: A not-so-foreign language for data processing. In SIGMOD (2008).
[15] Rodriguez, M. A. The Gremlin graph traversal machine and language. CoRR abs/1508.03843 (2015).
[16] Titan distributed graph database. http://thinkaurelius.github.io/titan/.
[17] Webber, J. A programmatic introduction to Neo4j. In SPLASH (2012).
APPENDIX

A. QUERY OPTIMIZATION

GraphFrame operators, both the graph operators and the relational ones, are compiled to relational operators. Thereafter, we optimize the complete pipeline by extending Catalyst. To do this, the GraphFrame query planner extends the dynamic programming algorithm of Huang et al. [8] to the distributed setting and adds the view rewrite capability. The user can register arbitrary materialized views, and the planner will automatically rewrite a query to reuse a materialized view when appropriate. This is useful because pattern queries can be very expensive to run, and reusing computation across several queries improves the user experience. The GraphFrame API also allows users to get suggestions for the views to create. We also describe how we can further extend query planning to create views adaptively.

Additionally, by building on top of Spark SQL, GraphFrames also benefit from whole-stage code generation.

A.1 Query Planner

The dynamic programming algorithm proposed in [8] recursively decomposes a pattern query into fragments, the smallest fragment being a set of co-partitioned edges, and builds a query plan in a bottom-up fashion. The original algorithm considers a single input graph. In this paper, we extend it to views, i.e., the algorithm matches the views in addition to matching the pattern query. The input to the algorithm is the base graph G, a set of graph views {GV1, GV2, .., GVn}, and the pattern query Q = (Vq, Eq). Each graph view GVi consists of the view query that was used to create the view and a cost estimator CEi. The algorithm also takes the partitioning function P as an input, as opposed to the fixed partitioning in the original algorithm. The output is the best (lowest-cost) query plan to process the query Q.

Algorithm 1: FindPlanWithViews
Input: query Q; graph G; views GV1, .., GVn; partitioning P
Output: solutions for running Q
 1  if Q.sol != null then
 2      return null;  // already generated plans for Q
 3  if Q has only one edge e = (v1, v2) then
 4      Q.sol = ("match e", scan cost of Ei, P(ei), CEi);
 5      Q.sol = Q.sol ∪ MatchViews(Q.sol, views);
 6      return;
 7  if all edges in Q are co-partitioned w.r.t. P then
 8      Q.sol = ("co-l join of Q", co-l join cost, P(Q), CEi);
 9      Q.sol = Q.sol ∪ MatchViews(Q.sol, views);
10      return;
11  T = φ;
12  LD = LinearDecomposition(Q);
13  foreach linear decomposition (q1, q2) in LD do
14      FindPlan(q1);
15      FindPlan(q2);
16      linearPlans = GenerateLinearPlans(q1, q2);
17      T = T ∪ linearPlans;
18      T = T ∪ MatchViews(linearPlans, views);
19  LDAGs = GenerateLayeredDAGs(Q);
20  foreach layered DAG d in LDAGs do
21      (q1, q2, ..., qn−1, qn) = BushyDecomposition(d);
22      for i from 1 to n do
23          FindPlan(qi);
24      bushyPlans = GenerateBushyPlans(q1, ..., qn);
25      T = T ∪ bushyPlans;
26      T = T ∪ MatchViews(bushyPlans, views);
27  Q.sol = EliminateNonMinCosts(T);

Algorithm 2: MatchViews
Input: query plan solution set S; graph views GV1, .., GVn
Output: query plan solution set from matching views
 1  V = φ;
 2  foreach solution S in S do
 3      foreach graph view GVi do
 4          queryFragment = S.plan.query;
 5          viewQuery = GVi.query;
 6          if viewQuery is equivalent to queryFragment then
 7              V = V ∪ ("scan", scan cost of GVi, GVi.p, CEi);
 8  return V;
The algorithm starts by recursively decomposing the pattern query into smaller fragments and building a query plan for each fragment. At the leaf level, i.e., when the fragment consists of only a single edge, we look up the edge (along with its associated predicates) in the base graph. At higher levels, we combine the child query plans to produce larger plans. At each level, we also check whether the query fragment matches the view query of any of the graph views. In case a match is found, we add the view as a candidate solution for the query fragment. This also takes care of combining child query plans from multiple graph views, i.e., we consider all combinations of the graph views and later pick the best one. Algorithm 1 shows the extended algorithm for finding plans using views: each time a new plan is generated for a query fragment, we match the fragment against the set of graph views (the MatchViews calls in Algorithm 1). Algorithm 2 shows the pseudocode for view matching. For every query plan solution, we check whether its query fragment is equivalent to a view query and add the view to the solution set in case a match is found. (Instead of looking for an exact match, the algorithm could also be extended to match views which contain the query fragment, as in the traditional view matching literature.) Note that we keep both solutions, the one which uses the view and the one which does not, and later pick the best one. Also note that we match the views against the logical query fragments in a bottom-up fashion, i.e., a view matched at a lower level could still be replaced by a larger (and thus more useful) view at a higher level.

Combining graph views, however, produces a new (intermediate) graph view, and so we need to consider the new (combined) cost estimate when combining it further. To handle this, we keep track of four things in the query plan solution at each level: (i) the query plan, (ii) the estimated cost, (iii) the partitioning, and (iv) the cost estimator. When combining solutions, we combine their cost estimates as well. (We can do this more efficiently by pre-computing the estimates for all combinations of the graph views.)

Figure 2 illustrates query planning using views. The system takes the given pattern query and three graph views, V1, V2, and V3, as inputs. The linear decomposition (recursively) splits the query into two fragments, such that at least one of them is not decomposable (i.e., it is either a single edge or a co-partitioned set of edges). The lower left part of Figure 2 shows one such linear decomposition and the corresponding candidate query plan using views. Here we match a view with a query fragment only when it contains the exact same set of edges. The bushy decomposition generates query fragments none of which may be non-decomposable (i.e., each query fragment can be further split into linear or bushy decompositions). The lower right part of Figure 2 shows one such bushy decomposition. We can see that the corresponding candidate plan is different and could not have been generated by the linear decomposition alone.

Our extended view matching algorithm can still leverage all of the optimizations proposed by Huang et al. [8]: considering co-partitioned edges as the smallest fragment, since they can be computed locally; performing both linear and bushy decompositions of the queries; and applying cycle-based optimization.
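For intuition, Algorithm 2's view-matching loop can be written as a few lines of executable pseudocode; canonical and scan_solution are hypothetical helpers (a query canonicalizer and a constructor for a "scan the view" plan), since the paper does not spell them out:

    def match_views(solutions, views):
        # For each candidate plan, add a "scan the materialized view"
        # alternative whenever some view's defining query is equivalent
        # to the plan's query fragment (here: same canonical edge set).
        out = []
        for s in solutions:
            for v in views:
                if canonical(v.query) == canonical(s.plan.query):
                    out.append(scan_solution(v))
        return out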
[Figure 2: View Matching during Linear and Bushy Decompositions. A pattern query over vertices A-E is planned against three views V1, V2, and V3; the linear decomposition yields the candidate plan (V1 ⋈ V2) ⋈ V3, while the bushy decomposition yields the candidate plan V1 ⋈ (V2 ⋈ V3).]
S.No.  View
1      A ← B, A ← C, B ← F
2      A ← B, A ← C, C ← E
3      A ← B, A ← C, D ← B
4      A ← B, A ← C, E ← C
5      B ← A, B ← D, E ← B

Table 2: Top-5 views of size three suggested by the system for the workload in [8]

A.2 View Selection via Query Planning

So far we assumed that the user manually creates the views; the system then generates query plans that process pattern queries using one or more of them. In some cases, however, the user may not be able to select which views to create. The question, therefore, is whether the system can suggest views to create in such scenarios. Notice that the nice thing about recursive query planning is that we are already traversing the space of all possible query fragments relevant to the given query. We can consider each of these query fragments as a candidate view: every time we generate a query plan for a fragment, we add it to a global set of candidate views.

In the end, we can rank the candidate views using different utility functions and present the top ones. One such function could be the ratio of cost, i.e., the savings due to the view, to size, i.e., the spending on the view (see the sketch below). Recall that each query plan solution (the candidate view) also contains its corresponding cost estimator, which can be used to compute these metrics. The system could also collect the candidate views over several queries before presenting the interesting ones to the user. We refer readers to the traditional view selection literature for more details [5].

The key takeaway is that we can generate interesting candidate views as a side effect of dynamic programming based query planning. This means that a user can start running pattern queries on the input graph and later create one or more of the suggested views to improve performance. To illustrate, we ran the view suggestion API for the six-query workload from [8]. Table 2 shows the top-5 views of size three produced by the system.
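A small sketch of the ranking step described above; the utility function and field names are our assumptions, with candidate views assumed to carry the estimated cost savings and materialization size produced by the planner's cost estimators:

    def rank_views(candidates, top_k=5):
        # Utility = estimated cost savings across the workload divided
        # by the estimated materialization size of the view.
        def utility(v):
            return v["cost_savings"] / max(v["size"], 1)
        return sorted(candidates, key=utility, reverse=True)[:top_k]

    # e.g., candidates collected as a side effect of dynamic programming:
    # [{"view": "(a)->(b)->(a)", "cost_savings": 9.5e6, "size": 1.5e6}, ...]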

A.3 Adaptive View Creation

We discussed how the system can help users select views. However, the views are still created manually, as an offline process. This is expensive, and often the utility of a view is not known a priori. Let us now see how we can leverage the query planning algorithm to create graph views adaptively. The key idea is to start by materializing smaller query fragments and progressively combine them to create views of larger query fragments. To do this, we annotate each solution with the list of graph views it processes, i.e., each solution now has five pieces of information: (plan, cost, partitioning, estimator, {GVi}). When combining the child query plans, we union the graph views from the children.

When the algorithm runs for the first time, there is only a single input graph view, the base graph itself. We look at all the leaf-level query plans and materialize the one(s) with the maximum utility, i.e., the most useful ones. In each subsequent run, we consider materializing the query plans which combine existing views, i.e., we essentially move the view higher up in the query tree. We still consider materializing new leaf-level plans from the base graph. Rather than combining the graph views greedily, a more sophisticated version can also keep counters on how many times each graph view is used. We can then combine the most frequent as well as the most useful graph views.

The above adaptive view creation technique has two major advantages. First, it amortizes the cost of creating a view over several queries: creating views at the lower levels involves fewer joins and is hence cheaper, and the system only spends more resources on a view if it is used more often. Second, this approach starts from more general leaf-level views, which can be used across a larger set of queries, and gradually specializes to larger views higher up in the query tree. This is useful in scenarios where a user starts with ad-hoc analysis and later converges to a specific query workload, something which is plausible in pattern matching queries.
