Query Optimization for Massively Parallel Data Processing
MapReduce has been widely recognized as an efficient tool for large-scale data analysis. It achieves high performance by exploiting parallelism among processing nodes while providing a simple interface for upper-layer applications. Some vendors have enhanced their data warehouse systems by integrating MapReduce into them. However, existing MapReduce-based query processing systems, such as Hive, fall short of the query optimization and competency of conventional database systems. Given an SQL query, Hive translates the query into a set of MapReduce jobs sentence by sentence. This design assumes that users can optimize their queries before submitting them to the system. Unfortunately, manual query optimization is time-consuming and difficult, even for an experienced database user or administrator.
In this paper, we propose a query optimization scheme for MapReduce-based processing systems. Specifically, we embed into Hive a query optimizer which is designed to generate an efficient query plan based on our proposed cost model. Experiments carried out on our in-house cluster confirm the effectiveness of our query optimizer.
Keywords: MapReduce, Hive, Query Optimization, Multi-way Join
1 Introduction
MapReduce [15] has been widely recognized as an efficient tool for large-scale data analysis. It achieves high performance by exploiting parallelism among a set of nodes. Massively Parallel Processing (MPP) data warehouse systems, such as Aster [3] and Greenplum [4], have recently integrated MapReduce into their systems.
Experiments in [17] show that combining MapReduce and data warehouse systems produces better performance. Besides efficiency, MapReduce simplifies the deployment of MPP systems by providing two user-friendly interfaces: map and reduce. Applications implemented through the extension of the framework are naturally parallelizable and fault-tolerant.
To build applications on MapReduce, users must transform and code them as customized map and reduce functions. One major weakness of MapReduce is its lack of high-level declarative languages. In comparison, SQL, which is supported by most DBMSs, hides implementation details (e.g., access method and plan optimization), thereby simplifying application programming. Recently, some high-level languages have been proposed for MapReduce, such as Pig [24] and Hive [28, 29]. These languages resemble SQL in many ways and are thus familiar to database users. Given a query, they automatically transform the query into a set of MapReduce jobs. Compared to the original MapReduce system, such systems are more suited for MPP data warehousing applications. Users can leverage them to process their data without having to model their application as a sequence of MapReduce operators. Although the syntax and grammar of these systems are similar to SQL, such systems interpret declarative queries procedurally and strictly follow the processing logic specified by users in generating the corresponding map and reduce operations [2, 24]. For example, consider the following Hive query for the TPC-H [5] schema:
SELECT avg(quantity), avg(totalprice), nationkey
FROM (
  SELECT temp.quantity, temp.totalprice, c.nationkey
  FROM (
    SELECT l.quantity, o.totalprice, o.custkey
    FROM lineitem l JOIN orders o
    ON (l.orderkey=o.orderkey)
  ) temp JOIN customer c ON (temp.custkey=c.custkey)
) finaltable GROUP BY nationkey
As is well recognized in conventional query processing, a good plan can improve query performance by orders of magnitude. In current systems, such as Pig [24] and Hive [28, 29], users submit queries in the query language supported by the system, and the query specification largely fixes the plan that the underlying system uses to evaluate the query. Therefore, as in conventional database systems, a query optimizer is needed to produce near-optimal query execution plans.
In this paper, we propose AQUA (Automatic QUery Analyzer), a query optimization method designed for MapReduce-based MPP systems. Based on our experience with query processing in Hive, we find that the performance bottleneck of a MapReduce-based system is the cost of saving intermediate results. To provide fine-grained fault tolerance, MapReduce systems flush the results of each job back to the DFS (Distributed File System) as a backup, and the next job reads the results of the previous job to continue processing. The I/O cost of the DFS is significantly higher than that of local storage because network cost is incurred and multiple replicas are usually kept. An efficient MapReduce query plan should therefore avoid generating too many intermediate results.
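To make the cost of intermediate results concrete, the following is a minimal sketch of a toy I/O estimate. It is not the cost model proposed in this paper: the function name, the replication factor of 3, the unit costs, and the table sizes are all hypothetical values chosen for illustration.

```python
def intermediate_io_cost(sizes_gb, replication=3, write_cost=1.0, read_cost=1.0):
    """Toy DFS I/O cost of materializing intermediate results between jobs.

    Assumptions (illustrative only, not taken from the paper): every
    intermediate result is written `replication` times and read back once
    by the next job; costs are in arbitrary units per GB.
    """
    total = 0.0
    for size in sizes_gb:
        total += size * replication * write_cost  # write every replica
        total += size * read_cost                 # the next job reads one copy
    return total


# Hypothetical sizes (GB) for the intermediate tables of the example query.
per_join_plan = intermediate_io_cost([40.0, 40.0])  # temp, then finaltable
grouped_plan = intermediate_io_cost([40.0])         # both joins in one job
print(per_join_plan, grouped_plan)
```

Under these assumed numbers, evaluating both joins in a single job halves the DFS traffic spent on intermediate results, which is the effect the optimizer aims for.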
To address the above requirement, AQUA adopts a two-phase query optimizer. In phase 1, the user's query is parsed into a join graph, based on which we adaptively group the join operators. Each group may contain more than one join operator and is evaluated by a single MapReduce job. In this way, both the total number of MapReduce jobs and the intermediate results that need to be written back to the DFS are reduced. In phase 2, the intermediate results of the groups are joined together to generate the final query results. We examine all plausibly good plans and select the one that minimizes processing cost. The second phase is similar to a conventional cost-based query optimizer in a DBMS.
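The idea of grouping join operators so that each group runs as one MapReduce job can be sketched as follows. The grouping rule used here (joins on the same key form one group) is a deliberately simple stand-in and should not be read as AQUA's adaptive grouping. The join list is our reading of the join predicates in the public TPC-H Q9 text with column prefixes dropped, and the function name is hypothetical.

```python
from collections import defaultdict

# Join predicates of TPC-H Q9 as (table, table, join key); prefixes such as
# l_ and ps_ are dropped so that matching keys share one name.
Q9_JOINS = [
    ("supplier", "lineitem", "suppkey"),
    ("partsupp", "lineitem", "suppkey"),
    ("partsupp", "lineitem", "partkey"),
    ("part", "lineitem", "partkey"),
    ("orders", "lineitem", "orderkey"),
    ("supplier", "nation", "nationkey"),
]


def group_by_join_key(joins):
    """Stand-in rule: joins on the same key go into one group (one job)."""
    groups = defaultdict(list)
    for left, right, key in joins:
        groups[key].append((left, right))
    return dict(groups)


groups = group_by_join_key(Q9_JOINS)
for key, members in groups.items():
    print(key, members)
print(len(Q9_JOINS), "joins ->", len(groups), "groups")
```

This rule only makes sense when the joins in a group are connected through a common table, as they are here through lineitem; a realistic grouping would also have to weigh the cost of each candidate group.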
To facilitate our cost estimation, we design a cost model to analyze relational operators in MapReduce jobs. Just as in traditional query optimization, the system maintains statistics about the underlying database to enable the optimizer to estimate the cost of various query plans. After a plan is selected, the expression tree is changed adaptively an
2 Related Work
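Aster [3] and Greenplum [4] are two commercial systems that integrate the MapReduce framework. In these systems, MapReduce is used to implement user-defined functions that traditional parallel database systems do not support efficiently. In [17], Greenplum shows how MapReduce operators can be combined with other relational operators to achieve superior performance. In contrast to these approaches, Pig Latin [24] and Hive [28] provide pure MapReduce solutions: they define an SQL-like high-level language for processing large-scale analytic workloads. Queries expressed in the high-level language are translated into a set of MapReduce jobs and submitted to Hadoop [1]. All processing logic is implemented within the MapReduce framework, which makes the systems easy to deploy. A performance comparison in [27] shows that Hive is more efficient than Pig. Hive applies rule-based query optimization to push down selections and projections and to generate map-side joins. Its rule-based optimizer focuses on optimizing the processing of a single MapReduce job, whereas our approach applies a cost-based optimizer to optimize multiple jobs together. Recently, [23] proposed a work-sharing framework for Hive that merges jobs from different queries into shared work. AQUA adopts a similar idea, intra-query sharing, to improve performance.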
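In general, AQUA follows the principles of traditional query optimization. Query optimization has been studied for many years in conventional database systems [19, 9], parallel database systems [18], and distributed database systems [7], and many state-of-the-art solutions have been proposed. However, query optimization for MapReduce-based systems differs from traditional query optimization in three significant ways: 1) MapReduce is a blocking operation, and all intermediate results must be physically materialized; 2) MapReduce uses scanning as its processing strategy and does not support pipelining; 3) data is shuffled between mappers and reducers. We therefore propose a cost model tailored to the MapReduce framework and build an optimizer on top of it to prune poor plans.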
3 Query Optimization
3.1 Phase 1: Selecting the Join Strategy
DEFINITION 3. Joining Graph
Given a query $Q$, its joining graph is defined as $G_Q = (V, E)$, where:
If table $T_i$ is involved in $Q$, we have a node $n_i$ in $V$ that denotes the table.
If $T_i \bowtie_{T_i.k = T_j.k} T_j$ is a join operation in $Q$, we create an undirected edge $e = (n_i, n_j)$ and $e$'s label is set as $k$.
Figure 2: Joining Graph For TPC-H Q9
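The joining graph definition above can be sketched in a few lines of Python. This is an illustrative construction under our own assumptions, not the implementation inside AQUA: the function name and the adjacency-list representation are hypothetical, and the input is the three-table query from the introduction rather than Q9 (customer is joined on custkey, which originates from orders).

```python
from collections import defaultdict


def build_joining_graph(tables, joins):
    """Build G_Q = (V, E) from the tables of Q and its (T_i, T_j, k) joins."""
    nodes = set(tables)              # one node n_i per table T_i involved in Q
    edges = []                       # undirected edges, each labelled with k
    adjacency = defaultdict(list)
    for t_i, t_j, k in joins:
        nodes.update((t_i, t_j))
        edges.append((frozenset((t_i, t_j)), k))  # frozenset: no direction
        adjacency[t_i].append((t_j, k))
        adjacency[t_j].append((t_i, k))
    return nodes, edges, dict(adjacency)


tables = ["lineitem", "orders", "customer"]
joins = [
    ("lineitem", "orders", "orderkey"),
    ("orders", "customer", "custkey"),
]
nodes, edges, adjacency = build_joining_graph(tables, joins)
print(sorted(nodes))
print(adjacency["orders"])
```

Edges are stored as a list rather than a set so that two tables joined on two different keys, as partsupp and lineitem are in Q9, keep both labelled edges.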
4 Implementation Details
5 Conclusion