DB Tsai (蔡東邦): Making Nested Columns First-Class Citizens in Apache Spark SQL

1. Enhancements and Usage of Apache Spark at Apple
DB Tsai, ArchSummit 2019
5. Siri
The world's most popular intelligent assistant service, powering every iPhone, iPad, Mac, Apple TV, Apple Watch, and HomePod
6. Siri Open Source Team
• We're Spark, Hadoop, and HBase PMC members, committers, and contributors
• We're advocates for open source
• We push our internal changes back upstream
• We work with the communities to review pull requests and to develop new features and bug fixes
7. Siri Data
• Machine learning is used to personalize your experience throughout your day
• We believe privacy is a fundamental human right
8. Data in an ML Product
(From "Hidden Technical Debt in Machine Learning Systems", NIPS 2015)
9. Siri Analytics Scale
• Large volume of requests; data centers all over the world
• Hadoop / YARN clusters with thousands of nodes
• HDFS holds hundreds of PB
• Hundreds of TB of raw event data per day
• Processing with Spark, Pig, and MapReduce
10. Siri Data Pipelines
• Gold Standard Dataset: provides accurate and reliable references
• Easy to explore the existing data
• Data consumers should be able to iterate fast
• Easy to share new data
• Apache Spark is one of the frameworks in the pipelines
11. Unified Pipeline
• A single repo for Spark applications across Siri
• Shared business-logic code to avoid discrepancies
• Raw data is cleaned, joined, and transformed into one standardized data model for data consumers to query
12. Technical Details about Our Data
• The schema is checked in as case classes, and CI ensures schema changes won't break the jobs
• Deeply nested relational model with 5 top-level columns
• Around 2,000 fields in total
• Stored in Parquet format, partitioned by UTC date (a minimal write sketch follows)
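A minimal sketch of that storage layout, not the production pipeline; the events DataFrame, its event_time column, and the output path are hypothetical:

import org.apache.spark.sql.functions.{col, to_date}

// Derive a date column from the event timestamp and use it as the partition key.
val partitioned = events.withColumn("utc_date", to_date(col("event_time")))
partitioned.write
  .partitionBy("utc_date")   // one directory per date, e.g. utc_date=2019-07-01
  .parquet("/data/events")   // hypothetical path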
13. Data Usage Patterns
• Data access patterns are very different for data producers and data consumers
• Data producers tend to read all of the big data and integrate it to produce another big dataset, such as the Gold Standard Dataset
• Data consumers tend to read a subset of the big data to produce very small or aggregated data
14. An Example of a Hierarchically Organized Table
Real estate information can be naturally modeled by:

case class Address(houseNumber: Int, streetAddress: String, city: String, state: String, zipCode: String)
case class Facts(price: Int, size: Int, yearBuilt: Int)
case class School(name: String)
case class Home(address: Address, facts: Facts, schools: List[School])
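A hedged sketch, not from the talk, of how such a Dataset might be materialized and registered as the homes view used in the queries below; the sample values and path are made up:

import spark.implicits._

val homes = Seq(
  Home(
    Address(1, "Main St", "Cupertino", "CA", "95014"),
    Facts(price = 2500000, size = 2000, yearBuilt = 1995),
    List(School("Hypothetical High")))
).toDS()

homes.write.mode("overwrite").parquet("/tmp/homes")                // hypothetical path
spark.read.parquet("/tmp/homes").createOrReplaceTempView("homes")  // view queried below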
15. Nested SQL Schema

sql("select * from homes").printSchema()

root
 |-- address: struct (nullable = true)
 |    |-- houseNumber: integer (nullable = true)
 |    |-- streetAddress: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zipCode: string (nullable = true)
 |-- facts: struct (nullable = true)
 |    |-- price: integer (nullable = true)
 |    |-- size: integer (nullable = true)
 |    |-- yearBuilt: integer (nullable = true)
 |-- schools: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
16. Find cities with houses worth more than 2M

sql("select address.city from homes where facts.price > 2000000").explain(true)

== Physical Plan ==
*(1) Project [address#55.city AS city#75]
+- *(1) Filter (isnotnull(facts#56) && (facts#56.price > 2000000))
   +- *(1) FileScan parquet [address#55,facts#56],
        DataFilters: [isnotnull(facts#56), (facts#56.price > 2000000)],
        Format: Parquet, PushedFilters: [IsNotNull(facts)],
        ReadSchema: struct<address:struct<houseNumber:int,streetAddress:string,city:string,state:string,zipCode:strin…>,facts:struct<price:int,…>>

• We only need two nested fields, address.city and facts.price
• But the entire address and facts structs are read
17. [SPARK-4502], [SPARK-25363] Parquet with Nested Columns
• Parquet is a columnar storage format designed with complex nested data structures in mind
• Supports very efficient compression and encoding schemes
• As a columnar format, each nested column is stored separately, as if it were a flattened table
• There was no easy way to cherry-pick a couple of nested columns in Spark
• Foundation: allow reading a subset of nested columns right after the Parquet FileScan (a manual sketch of such a narrow read follows)
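For context, a narrow read can also be forced by hand by supplying an explicit read schema; a hedged sketch of that workaround, not from the talk, with a hypothetical path:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Only the nested fields named in the supplied schema are materialized from Parquet.
val prunedSchema = StructType(Seq(
  StructField("address", StructType(Seq(StructField("city", StringType)))),
  StructField("facts", StructType(Seq(StructField("price", IntegerType))))))
val narrowHomes = spark.read.schema(prunedSchema).parquet("/tmp/homes")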
18. Find cities with houses worth more than 2M
• With [SPARK-4502], [SPARK-25363]

sql("select address.city from homes where facts.price > 2000000")

== Physical Plan ==
*(1) Project [address#55.city AS city#77]
+- *(1) Filter (isnotnull(facts#56) && (facts#56.price > 2000000))
   +- *(1) FileScan parquet [address#55,facts#56]
        DataFilters: [isnotnull(facts#56), (facts#56.price > 2000000)],
        Format: Parquet, PushedFilters: [IsNotNull(facts)],
        ReadSchema: struct<address:struct<city:string>,facts:struct<price:int>>

• Only two nested columns are read!
19. Find cities with houses worth more than 2M
• With [SPARK-4502], [SPARK-25363]

sql("select address.city from homes where facts.price > 2000000")

== Physical Plan ==
*(1) Project [address#55.city AS city#77]
+- *(1) Filter (isnotnull(facts#56) && (facts#56.price > 2000000))
   +- *(1) FileScan parquet [address#55,facts#56]
        DataFilters: [isnotnull(facts#56), (facts#56.price > 2000000)],
        Format: Parquet, PushedFilters: [IsNotNull(facts)],
        ReadSchema: struct<address:struct<city:string>,facts:struct<price:int>>

• Parquet predicate pushdown does not work for nested fields in Spark: PushedFilters contains only IsNotNull(facts), not the price predicate
20. Find cities with houses worth more than 2M
• With [SPARK-25556]

sql("select address.city from homes where facts.price > 2000000")

== Physical Plan ==
*(1) Project [address#55.city AS city#77]
+- *(1) Filter (isnotnull(facts#56) && (facts#56.price > 2000000))
   +- *(1) FileScan parquet [address#55,facts#56]
        DataFilters: [isnotnull(facts#56), (facts#56.price > 2000000)],
        Format: Parquet, PushedFilters: [IsNotNull(facts), GreaterThan(facts.price,2000000)],
        ReadSchema: struct<address:struct<city:string>,facts:struct<price:int>>

• Predicate pushdown into Parquet for nested fields provides a significant performance gain: non-matches are eliminated earlier, so less data is read and less processing is needed (the relevant session flags are sketched below)
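A sketch of the session flags that govern these behaviors in upstream Spark; the key names are real SQLConf entries, but defaults and availability vary by Spark version:

// Nested column pruning for Parquet ([SPARK-4502] / [SPARK-25363]).
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")
// Parquet filter pushdown in general; pushdown for *nested* fields additionally
// requires [SPARK-25556] or a Spark version that includes it.
spark.conf.set("spark.sql.parquet.filterPushdown", "true")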
21. Applying a UDF after repartition

import org.apache.spark.sql.functions.{col, udf}

val areaUdf = udf { (city: String, state: String, zipCode: String) =>
  s"$city, $state $zipCode"
}

sql("select * from homes").repartition(1).select(
  areaUdf(col("address.city"), col("address.state"), col("address.zipCode"))
).explain()

== Physical Plan ==
*(2) Project [UDF(address#58.city, address#58.state, address#58.zipCode) AS UDF(address.city, address.state, address.zipCode)#70]
+- Exchange RoundRobinPartitioning(1)
   +- *(1) Project [address#58]
      +- *(1) FileScan parquet [address#58] Format: Parquet,
           ReadSchema: struct<address:struct<houseNumber:int,streetAddress:string,city:string,state:string,zipCode:string>>
22. Problems in Supporting Nested Structures in Spark
• Root-level columns are represented by Attribute, the base of leaf named expressions
• To get a nested field from a root-level column, a GetStructField expression whose child is the Attribute has to be used
• All column-pruning logic works at the Attribute level, so either the entire root column is kept or the entire root column is pruned
• There is no easy way to cherry-pick a couple of nested columns in this model (see the Catalyst sketch below)
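In Catalyst terms, a rough sketch of the two representations; AttributeReference and GetStructField are real internal Catalyst classes, but this is illustrative code under the assumption of a Spark 2.4-era API, not code from the talk:

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GetStructField}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// A root-level column is a single Attribute covering the whole struct...
val addressType = StructType(Seq(
  StructField("city", StringType),
  StructField("state", StringType)))
val address = AttributeReference("address", addressType)()

// ...and a nested field is a GetStructField wrapping that Attribute, so pruning
// that reasons only about Attributes must keep or drop all of address at once.
val city = GetStructField(address, ordinal = 0, name = Some("city"))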
23. [SPARK-25603] Generalize Nested Column Pruning
• [SPARK-4502] and [SPARK-25363] are the foundation for better nested-structure support with Parquet in Spark
• If an operator such as Repartition, Sample, or Limit is applied after the Parquet FileScan, nested column pruning does not kick in
• We address this by flattening the nested fields into Alias expressions right after the data is read (roughly the manual rewrite sketched below)
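The effect is roughly what a user could do by hand before the fix; a hedged sketch of that manual flattening, reusing areaUdf from slide 21 (alias names are arbitrary):

import org.apache.spark.sql.functions.col

// Project the needed nested fields to flat aliases *before* the repartition,
// so only address.city/state/zipCode need to survive past the FileScan.
val flattened = sql("select * from homes")
  .select(
    col("address.city").as("city"),
    col("address.state").as("state"),
    col("address.zipCode").as("zipCode"))
  .repartition(1)
  .select(areaUdf(col("city"), col("state"), col("zipCode")))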
24. Applying a UDF after repartition

sql("select * from homes").repartition(1).select(
  areaUdf(col("address.city"), col("address.state"), col("address.zipCode"))
).explain()

== Physical Plan ==
*(2) Project [UDF(_gen_alias_84#84, _gen_alias_85#85, _gen_alias_86#86) AS UDF(address.city, address.state, address.zipCode)#64]
+- Exchange RoundRobinPartitioning(1)
   +- *(1) Project [address#55.city AS _gen_alias_84#84, address#55.state AS _gen_alias_85#85, address#55.zipCode AS _gen_alias_86#86]
      +- *(1) FileScan parquet [address#55] ReadSchema: struct<address:struct<city:string,state:string,zipCode:string>>

• Nested fields are replaced by Alias expressions with flattened structures
• Only the three used nested fields are read from the Parquet files
25. Production Query - Finding a Needle in a Haystack
26. Spark 2.3.1: 1.2 h wall-clock time, 7.1 TB read
27. Spark 2.4 with [SPARK-4502], [SPARK-25363], and [SPARK-25556]: 3.3 min wall-clock time, 840 GB read (vs. 1.2 h and 7.1 TB before)
28.
• 21x faster in wall-clock time
• 8x less data read
• More power efficient
29. Conclusions
With some work, engineering rigor, and some optimizations, Spark can run at very large scale at lightning speed.
30. Thank you
33. ™ and © 2019 Apple Inc. All rights reserved.