GraphFrames: An Integrated API for Mixing Graph and Relational Queries

Ankur Dave⋆, Alekh Jindal‡, Li Erran Li†, Reynold Xin♯, Joseph Gonzalez⋆, Matei Zaharia◦
⋆University of California, Berkeley   ◦MIT   †Uber Technologies   ♯Databricks   ‡Microsoft
ABSTRACT

Graph data is prevalent in many domains, but it has usually required specialized engines to analyze. This design is onerous for users and precludes optimization across complete workflows. We present GraphFrames, an integrated system that lets users combine graph algorithms, pattern matching and relational queries, and optimizes work across them. GraphFrames generalize the ideas in previous graph-on-RDBMS systems, such as GraphX and Vertexica, by letting the system materialize multiple views of the graph (not just the specific triplet views in these systems) and executing both iterative algorithms and pattern matching using joins. To make applications easy to write, GraphFrames provide a concise, declarative API based on the "data frame" concept in R that can be used for both interactive queries and standalone programs. Under this API, GraphFrames use a graph-aware join optimization algorithm across the whole computation that can select from the available views.

We implement GraphFrames over Spark SQL, enabling parallel execution on Spark and integration with custom code. We find that GraphFrames make it easy to express end-to-end workflows and match or exceed the performance of standalone tools, while enabling optimizations across workflow steps that cannot occur in current systems. In addition, we show that GraphFrames' view abstraction makes it easy to further speed up interactive queries by registering the appropriate view, and that the combination of graph and relational data allows for other optimizations, such as attribute-aware partitioning.

  gf = GraphFrame(vertices, edges)
  pairs = gf.pattern("(x:User)->(p:Product)<-(y:User)")
  pairs.where(pairs.p.category == "Books")
       .groupBy(pairs.p.name)
       .count()

Listing 1: An example of the GraphFrames API. We create a GraphFrame from two tables of vertices and edges, and then we search for all instances of a pattern, namely two users that bought the same product. The result of this search is another table on which we can then perform filtering and aggregation. The system will optimize across these steps, e.g., pushing the filter above the pattern search.
1. INTRODUCTION

Analyzing the graphs of relationships that occur in modern datasets is increasingly important, in domains including commerce, social networks, and medicine. To date, this analysis has been done through specialized systems like Neo4j [17], Titan [16] and GraphLab [11]. These systems offer two main capabilities: pattern matching to find subgraphs of interest [17] and graph algorithms such as shortest paths and PageRank [12, 11, 7].

While graph analytics is powerful, running it in a separate system is both onerous and inefficient. Most workflows involve building a graph from existing data, likely in a relational format, then running search or graph algorithms on it, and then performing further computations on the result. With isolated graph analysis systems, users have to move data manually and there is no optimization of computation across these phases of the workflow. Several recent systems have started to bridge this gap by running graph algorithms on a relational engine [7, 9], but they have no support for pattern matching and do not optimize across graph and relational queries.

We present GraphFrames, an integrated system that can combine relational processing, pattern matching and graph algorithms and optimize computations across them. GraphFrames generalize the ideas behind GraphX [7] and Vertexica [9] by maintaining arbitrary views of a graph (e.g., triplets or triangles) and executing queries using joins across them. They then optimize execution across the relational and graph portions of the computation. A key challenge in achieving this goal is query planning. For this purpose, we extend a graph-aware dynamic programming algorithm by Huang et al. [8] to select among multiple input views and compute a join plan. We also propose an algorithm for suggesting new views based on the query workload.

To make complete graph analytics workflows easy to write, GraphFrames provide a declarative API similar to "data frames" in R, Python and Spark that integrates into procedural languages like Python. Users build up a computation out of relational operators, pattern matching, and calls to algorithms, as shown in Listing 1. The system then optimizes across these steps, selecting join plans and performing algebraic optimizations. Similar to systems like Pig [14] and Spark SQL [4], the API makes it easy to build a computation incrementally while receiving the full benefits of relational optimization. Finally, the GraphFrames API is also designed to be used interactively: users can launch a session, define views that will aid their queries, and query data interactively from a Python shell. Unlike current tools, GraphFrames let analysts perform their complete workflow in a single system.

We have implemented GraphFrames over Spark SQL [4], and made them compatible with Spark's existing DataFrame API. We show that GraphFrames match the performance of other distributed graph engines for various tasks, while enabling optimizations across the tasks that would not happen in other systems. Support for multiple views of the graph adds significant benefits: for example, materializing a few simple views can speed up queries by 10× over the algorithm in [8]. Finally, by building on Spark, GraphFrames interoperate easily with custom UDFs (e.g., ETL code) and with Spark's machine learning library and external data sources.
In summary, our contributions are:

• A declarative API that lets users combine relational processing, pattern matching and graph algorithms into complex workflows and optimizes across them.

• An execution strategy that generalizes those in Vertexica and GraphX to support multiple views of the graph.

• A graph-aware query optimization algorithm that selects join plans based on the available views.

• An implementation and evaluation of GraphFrames on Spark.
2. GRAPHFRAME API

The main programming abstraction in GraphFrames' API is a GraphFrame. Conceptually, it consists of two relations (tables) representing the vertices and edges of the graph, as well as a set of materialized views of subgraphs. The vertices and edges may have multiple attributes that are used in queries. The views are defined using patterns to match various shapes of subgraphs, as we shall describe in Section 2.2.2. For example, a user might create a view of all the triangles in the graph, which can then be used to quickly answer other queries involving triangles.

GraphFrames expose a concise language-integrated API that unifies graph analytics and relational queries. We based this API on DataFrames, a common abstraction for data science in Python and R that is also available as a declarative API on Spark SQL [4]. In this section, we first cover some background on DataFrames, and then discuss the additional operations available on GraphFrames. We demonstrate the generality of GraphFrames for analytics by mapping the core primitives in GraphX into GraphFrame operations. Finally, we discuss how GraphFrames integrate with the rest of Apache Spark (e.g., the machine learning library).

All the code examples are shown in Python. We show the GraphFrame API itself in Scala because it explicitly lists data types.

2.1 DataFrame Background

DataFrame operations take expressions over columns, including comparison and arithmetic ones (+, -, etc.). They also support aggregates, such as count("name").

Internally, a DataFrame object represents a logical plan to compute a dataset. A DataFrame does not need to be materialized until the user calls a special "output operation" such as save. This enables rich optimization across all operations that were used to build the DataFrame.

In terms of data type support, DataFrame columns support all major SQL data types, including boolean, integer, double, decimal, string, date, and timestamp, as well as complex (i.e., non-atomic) data types: structs, arrays, maps and unions. Complex data types can also be nested together to create more powerful types. In addition, DataFrames also support user-defined types [4].
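As an illustration of this lazy, logical-plan behavior, here is a minimal sketch using the present-day PySpark DataFrame API (not code from the paper; the data and names are invented):

  from pyspark.sql import SparkSession

  spark = SparkSession.builder.appName("df-example").getOrCreate()
  users = spark.createDataFrame(
      [("alice", 34), ("bob", 28), ("carol", 41)], ["name", "age"])

  # where() and groupBy() only extend the logical plan; nothing runs yet.
  adults_by_age = users.where(users.age > 30).groupBy("age").count()

  # show() is an output operation: the full plan is optimized, then executed.
  adults_by_age.show()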
2.2 GraphFrame Data Model

A GraphFrame is logically represented as two DataFrames: an edge DataFrame and a vertex DataFrame. That is to say, edges and vertices are represented in separate DataFrames, and each of them can contain attributes that are part of the supported types. Take a social network graph as an example. The vertices can contain attributes including name (string), age (integer), and geographic location (a struct consisting of two floating point values for longitude and latitude), while the edges can contain an attribute for the time at which one user friended another (timestamp). The GraphFrame model supports user-defined attributes on each vertex and edge, and thus is equivalent to the property graph model used in many graph systems, including GraphX and GraphLab. GraphFrames are more general than Pregel/Giraph since GraphFrames support user-defined attributes on edges.

Similar to DataFrames, a GraphFrame object is internally represented as a logical plan, and as a result the declaration of a GraphFrame object does not necessarily imply the materialization of its data.

Next, we explain how a GraphFrame can be constructed and the operations available on it.

  class GraphFrame {
    // Different views on the graph
    def vertices: DataFrame
    def edges: DataFrame
    def triplets: DataFrame
    // Pattern matching
    def pattern(pattern: String): DataFrame
    ...
  }
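For example, the social network graph above could be constructed as follows. This is a sketch using the open-source GraphFrames package [1], which expects an "id" column on vertices and "src"/"dst" columns on edges, and which names the pattern operator find rather than pattern; the data values are invented:

  from pyspark.sql import SparkSession
  from graphframes import GraphFrame

  spark = SparkSession.builder.getOrCreate()

  vertices = spark.createDataFrame(
      [(1, "alice", 34, (37.77, -122.42)),   # location becomes a struct
       (2, "bob", 28, (40.71, -74.01))],
      ["id", "name", "age", "location"])

  edges = spark.createDataFrame(
      [(1, 2, "2015-03-14 09:26:53")],       # friendship time (as a string here)
      ["src", "dst", "friended_at"])

  g = GraphFrame(vertices, edges)
  g.triplets.show()                          # the (src, edge, dst) view
  g.find("(a)-[e]->(b)").show()              # a one-edge pattern query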
[Figure: (a) Pattern queries [8]; (b) performance of pattern queries Q1–Q6 with and without views, including view creation time (runtime in seconds); (c) end-to-end pipeline performance: multiple systems vs. single system.]
…it easy to call user-defined functions (e.g., ETL code) and, in our Spark-based implementation, to call into Spark's built-in libraries, giving users a single environment in which to write end-to-end workflows.

Graph-Parallel Programming. Standalone systems including Pregel and GraphLab [12, 11] have been designed to run graph algorithms, but they require separate data export and import and thus make end-to-end workflows complex to build. GraphFrames use similar parallel execution plans to many of these systems (e.g., the Gather-Apply-Scatter pattern) while supporting broader workflows.

Graph Processing over RDBMS. GraphX and Vertexica [7, 9] have explored running graph algorithms on relational databases or dataflow engines. Of these, GraphX materializes a triplets view of the graph to speed up the most common join in iterative graph algorithms, while the others use the raw tables in the underlying database. GraphFrames generalize the execution strategy in these systems by letting the user materialize multiple views of arbitrary patterns, which can greatly speed up common types of queries. GraphFrames also provide a much broader API, including pattern matching and relational queries, where these tasks can all be combined, whereas previous systems only focused on graph algorithms.
6. DISCUSSION AND CONCLUSION

Graph analytics applications typically require relational processing, pattern matching and iterative graph algorithms. However, these applications previously had to be implemented in multiple systems, adding both overhead and complexity. In this paper, we aim to unify the three with the GraphFrames abstraction. The GraphFrames API is concise and declarative, based on the "data frame" concept in R, and enables easy expression and mixing of these three paradigms. GraphFrames optimize the entire computation using a graph-aware join optimization and view selection algorithm that generalizes the execution strategies in previous graph-on-RDBMS systems. GraphFrames are implemented over Spark SQL, enabling parallel execution on Spark and easy integration with Spark's external data sources, built-in libraries, and custom ETL code. We showed that GraphFrames make it easy to write complete graph processing pipelines and enable optimizations across them that are not possible in current systems.

We have open sourced our system to allow other researchers to build upon it [1].

Our current optimization algorithm produces a tree of pairwise join operators. As part of future work, we would like to support other options, such as one-shot join algorithms over multiple tables [2] and worst-case optimal join algorithms [6]. It should be possible to integrate these algorithms into our System R-based framework.

Additionally, GraphFrames currently do not provide support for processing dynamic graphs. In the future, we would like to develop efficient incremental graph update and processing support. We plan to leverage the newly available IndexedRDD [3] project to do this over Spark, or a relational database engine as an alternative backend. One interesting addition here will be deciding which graph views we wish to maintain incrementally as the graph changes.

7. REFERENCES

[1] GraphFrames: DataFrame-based graphs. https://github.com/graphframes/graphframes, Apr. 2016.
[2] Afrati, F. N., et al. GYM: A multiround join algorithm in MapReduce. CoRR abs/1410.4156 (2014).
[3] Apache Spark. Spark IndexedRDD: An efficient updatable key-value store for Apache Spark. https://github.com/amplab/spark-indexedrdd, 2015.
[4] Armbrust, M., et al. Spark SQL: Relational data processing in Spark. In SIGMOD (2015).
[5] Chirkova, R., et al. A formal perspective on the view selection problem. In VLDB (2002).
[6] Chu, S., et al. From theory to practice: Efficient join query evaluation in a parallel database system. In SIGMOD (2015).
[7] Gonzalez, J., et al. GraphX: Graph processing in a distributed dataflow framework. In OSDI (2014).
[8] Huang, J., et al. Query optimization of distributed pattern matching. In ICDE (2014).
[9] Jindal, A., et al. Vertexica: Your relational friend for graph analytics! In VLDB (2014).
[10] Leskovec, J., et al. SNAP Datasets: Stanford large network dataset collection. http://snap.stanford.edu/data, June 2014.
[11] Low, Y., et al. GraphLab: A new framework for parallel machine learning. In UAI (2010).
[12] Malewicz, G., et al. Pregel: A system for large-scale graph processing. In SIGMOD (2010).
[13] McAuley, J., et al. Inferring networks of substitutable and complementary products. In KDD (2015).
[14] Olston, C., et al. Pig Latin: A not-so-foreign language for data processing. In SIGMOD (2008).
[15] Rodriguez, M. A. The Gremlin graph traversal machine and language. CoRR abs/1508.03843 (2015).
[16] Titan distributed graph database. http://thinkaurelius.github.io/titan/.
[17] Webber, J. A programmatic introduction to Neo4j. In SPLASH (2012).
APPENDIX

A. QUERY OPTIMIZATION

GraphFrame operators, including both the graph and the relational operators, are compiled down to relational operators. Thereafter, we optimize the complete pipeline by extending Catalyst. To do this, the GraphFrame query planner extends the dynamic programming algorithm of Huang et al. [8] to the distributed setting and adds the view rewrite capability. The user can register arbitrary materialized views and the planner will automatically rewrite the query to reuse a materialized view when appropriate. This is useful because pattern queries can be very expensive to run, and reusing computation across several queries can improve the user experience. The GraphFrame API also allows users to get suggestions for the views to create. We also describe how we can further extend the query planning to create the views adaptively.

Additionally, by building on top of Spark SQL, GraphFrames also benefit from whole-stage code generation.

A.1 Query Planner

The dynamic programming algorithm proposed in [8] recursively decomposes a pattern query into fragments, the smallest fragment being a set of co-partitioned edges, and builds a query plan in a bottom-up fashion. The original algorithm considers a single input graph. In this paper, we extend it to views, i.e., the algorithm matches the views in addition to matching the pattern query. The input to the algorithm is the base graph G, a set of graph views {GV1, GV2, ..., GVn}, and the pattern query Q = (Vq, Eq). Each graph view GVi consists of the view query that was used to create the view and a cost estimator CEi. The algorithm also takes the partitioning function P as an input, as opposed to a fixed partitioning in the original algorithm. The output is the best query plan (lowest cost) to process the query Q.

  Algorithm 1: FindPlanWithViews
  Input : query Q; graph G; views GV1, ..., GVn; partitioning P
  Output: solutions for running Q
   1  if Q.sol != null then
   2      return;                        // already generated plans for Q
   3  if Q has only one edge e = (v1, v2) then
   4      Q.sol = ("match e", scan cost of Ei, P(ei), CEi);
   5      Q.sol = Q.sol ∪ MatchViews(Q.sol, views);
   6      return;
   7  if all edges in Q are co-partitioned w.r.t. P then
   8      Q.sol = ("co-located join of Q", co-located join cost, P(Q), CEi);
   9      Q.sol = Q.sol ∪ MatchViews(Q.sol, views);
  10      return;
  11  T = φ;
  12  LD = LinearDecomposition(Q);
  13  foreach linear decomposition (q1, q2) in LD do
  14      FindPlan(q1);
  15      FindPlan(q2);
  16      linearPlans = GenerateLinearPlans(q1, q2);
  17      T = T ∪ linearPlans;
  18      T = T ∪ MatchViews(linearPlans, views);
  19  LDAGs = GenerateLayeredDAGs(Q);
  20  foreach layered DAG d in LDAGs do
  21      (q1, q2, ..., qn−1, qn) = BushyDecomposition(d);
  22      for i from 1 to n do
  23          FindPlan(qi);
  24      bushyPlans = GenerateBushyPlans(q1, ..., qn);
  25      T = T ∪ bushyPlans;
  26      T = T ∪ MatchViews(bushyPlans, views);
  27  Q.sol = EliminateNonMinCosts(T);

  Algorithm 2: MatchViews
  Input : query plan solution set S; graph views GV1, ..., GVn
  Output: query plan solutions obtained by matching views
  1  V = φ;
  2  foreach solution s in S do
  3      foreach graph view GVi do
  4          queryFragment = s.plan.query;
  5          viewQuery = GVi.query;
  6          if viewQuery is equivalent to queryFragment then
  7              V = V ∪ ("scan GVi", scan cost of GVi, GVi.P, CEi);
  8  return V;

The algorithm starts by recursively decomposing the pattern query into smaller fragments and building the query plan for each fragment. At the leaf level, i.e., when the fragment consists of only a single edge, we look up the edge (along with its associated predicates) in the base graph. At higher levels, we combine the child query plans to produce larger plans. At each level, we also check whether the query fragment matches the view query of any of the graph views. In case a match is found, we add the view as a candidate solution for the query fragment. This also takes care of combining child query plans from multiple graph views, i.e., we consider all combinations of the graph views and later pick the best one. Algorithm 1 shows the extended algorithm for finding plans using views: each time a new plan is generated for a query fragment, we match the fragment against the set of graph views (the calls to MatchViews). Algorithm 2 shows the pseudocode for view matching. For every query plan solution, we check whether its query fragment is equivalent to a view query³ and add the view to the solution set in case a match is found. Note that we keep both solutions, the one which uses the view and the one which does not, and later pick the best one. Also note that we match the views on the logical query fragments in a bottom-up fashion, i.e., a view matched at lower levels could still be replaced by a larger (and thus more useful) view at higher levels.

³ Instead of looking for an exact match, the algorithm could also be extended to match views which contain the query fragment, as in the traditional view matching literature.
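To make the recursion concrete, the following is a minimal, hypothetical Python sketch of Algorithms 1 and 2 (not the paper's implementation). It simplifies heavily: fragments are sets of edges, costs are assumed scalar constants, and partitioning, memoization, cost-estimator propagation, and bushy decompositions are all omitted.

  EDGE_SCAN_COST = 10.0  # assumed cost of scanning one base edge
  JOIN_COST = 5.0        # assumed fixed cost added per pairwise join

  def match_views(fragment, views):
      # Algorithm 2: a view is a candidate if its query is equivalent
      # (here: equal as an edge set) to the query fragment.
      return [("scan view %s" % sorted(v), cost)
              for v, cost in views.items() if v == fragment]

  def linear_decompositions(query):
      # Split off one edge at a time; the remainder is planned recursively.
      for edge in sorted(query):
          yield frozenset([edge]), query - {edge}

  def find_plan(query, views):
      # Algorithm 1 (simplified): build candidate plans bottom-up, add
      # matching views at every level, and keep only the cheapest plan.
      candidates = []
      if len(query) == 1:
          (edge,) = query
          candidates.append(("match %s" % edge, EDGE_SCAN_COST))
      else:
          for q1, q2 in linear_decompositions(query):
              p1, c1 = find_plan(q1, views)
              p2, c2 = find_plan(q2, views)
              candidates.append(("(%s JOIN %s)" % (p1, p2),
                                 c1 + c2 + JOIN_COST))
      candidates += match_views(query, views)      # MatchViews
      return min(candidates, key=lambda s: s[1])   # EliminateNonMinCosts

  # A three-edge path query, with one two-edge view materialized.
  query = frozenset(["A->B", "B->C", "C->D"])
  views = {frozenset(["A->B", "B->C"]): 4.0}
  print(find_plan(query, views))  # the view replaces two base-edge scans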
Combining graph views, however, produces a new (intermediate) graph view, and so we need to consider the new (combined) cost estimate when combining it further. To handle this, we keep track of four things in the query plan solution at each level: (i) the query plan, (ii) the estimated cost, (iii) the partitioning, and (iv) the cost estimator. When combining the solutions, we combine their cost estimates as well⁴.

⁴ We can do this more efficiently by pre-computing the estimates for all combinations of the graph views.
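As an illustration of this bookkeeping (hypothetical names, not the paper's code), a solution can be modeled as a record carrying the four items, with the estimators combined whenever two child solutions are joined; summing the child estimates is just one assumed combination rule:

  from dataclasses import dataclass
  from typing import Callable

  @dataclass
  class Solution:
      plan: str                         # (i) the query plan
      cost: float                       # (ii) its estimated cost
      partitioning: str                 # (iii) output partitioning
      estimate: Callable[[str], float]  # (iv) cost estimator for this plan

  def combine(s1: Solution, s2: Solution, join_cost: float) -> Solution:
      # Joining two solutions yields an intermediate "view", so the new
      # solution carries a combined estimator, not just a combined cost.
      return Solution(
          plan="(%s JOIN %s)" % (s1.plan, s2.plan),
          cost=s1.cost + s2.cost + join_cost,
          partitioning=s1.partitioning,  # simplification: keep left side's
          estimate=lambda frag: s1.estimate(frag) + s2.estimate(frag),
      )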
Figure 2 illustrates query planning using views. The system takes the given pattern query and the three graph views, V1, V2, and V3, as inputs. The linear decomposition (recursively) splits the query into two fragments, such that at least one of them is not decomposable (i.e., it is either a single edge or a co-partitioned set of edges). The lower left part of Figure 2 shows one such linear decomposition and the corresponding candidate query plan using views. Here we match a view with a query fragment only when it contains the exact same set of edges. The bushy decomposition generates query fragments none of which may be non-decomposable (i.e., each query fragment could be further split into linear or bushy decompositions). The lower right part of Figure 2 shows one such bushy decomposition. We can see that the corresponding candidate query plan is different and could not have been generated by the linear decomposition alone.
[Figure 2: View Matching during Linear and Bushy Decompositions. A pattern query over vertices A–E and three views V1, V2, V3; the linear decomposition yields the candidate plan (V1 ⋈ V2) ⋈ V3, while the bushy decomposition yields the candidate plan V1 ⋈ (V2 ⋈ V3).]

Our extended view matching algorithm can still leverage all of the optimizations proposed by Huang et al. [8], including considering co-partitioned edges as the smallest fragment since they can be computed locally, performing both linear and bushy decomposition of the queries, and applying cycle-based optimization.

We discussed how the system can help users to select views. However, the views are still created manually as an offline process. This is expensive, and often the utility of a view is not known a priori. Let us now see how we can leverage the query planning algorithm to adaptively create the graph views. The key idea is to start by materializing smaller query fragments and progressively combine them to create views of larger query fragments. To do this, we annotate each solution with the list of graph views it processes, i.e., a solution now has five pieces of information: (plan, cost, partitioning, estimator, {GVi}). When combining the child query plans, we union the graph views from the children. When the algorithm runs for the first time, there is only a single input graph view, which is the base graph itself. We look at all the leaf-level query plans and materialize the one(s) having the maximum utility, i.e., the most useful ones. In each subsequent run, we consider materializing the query plans which combine existing views, i.e., we essentially move the view higher up in the query tree. We still consider materializing new leaf-level plans from the base graph. Rather than combining the graph views greedily, a fancier version can also keep counters on how many times each graph view is used. We can then combine the most frequent as well as the most useful graph views.
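Such a counter-based variant could be sketched as follows (hypothetical structure, not the paper's code): each planned query records which fragments its best plan used, and the system periodically materializes the most frequently used fragments as new views.

  from collections import Counter

  view_usage = Counter()   # fragment -> number of best plans that used it

  def record_best_plan(fragments_used):
      # Called after planning each query, with the edge-set fragments
      # that appeared in the winning plan.
      for fragment in fragments_used:
          view_usage[fragment] += 1

  def views_to_materialize(k, materialized):
      # Suggest the k most frequently used fragments not yet materialized.
      return [frag for frag, _ in view_usage.most_common()
              if frag not in materialized][:k]

  record_best_plan({frozenset(["A->B", "B->C"]), frozenset(["C->D"])})
  record_best_plan({frozenset(["A->B", "B->C"])})
  print(views_to_materialize(1, set()))  # suggests the two-edge fragment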
The above adaptive view creation technique has two major advantages. First, it amortizes the cost of creating the views over several queries: creating views at the lower levels involves fewer joins and hence is cheaper, and the system only spends more resources on creating a view in case it is used more often. Second, this approach starts from more general leaf-level views, which could be used across a larger set of queries, and gradually specializes to larger views higher up in the query tree. This is useful in scenarios where a user starts from ad-hoc analysis and later converges to a specific query workload — something which is plausible in pattern matching queries.

  S.No.   View
  1       A ← B, A ← C, B ← F
  2       A ← B, A ← C, C ← E
  3       A ← B, A ← C, D ← B
  4       A ← B, A ← C, E ← C
  5       B ← A, B ← D, E ← B

Table 2: Top-5 views of size three suggested by the system for the workload in [8].