Spark SQL Optimization Practices at ByteDance (Guo Jun)

Razor

Published 2019/10/19 in the Technology category

Text content
1. Spark SQL
3. Spark SQL / Druid ETL OLAP
4. • Spark SQL • Spark SQL • Spark Shuffle
5. Spark SQL (Catalyst diagram): SQL / DataFrame / Dataset → Parser → Unresolved Logical Plan → Analyzer (Catalog) → Resolved Logical Plan → Optimizer (RBO) → Optimized Logical Plan → Query Planner → Physical Plan → Cost Model (CBO) → Selected Physical Plan → DAG of RDDs; AE also appears in the diagram
6. Spark SQL • Bucket Join
7. Spark SQL: Bucket Join [Diagram: Sort Merge Join of Table 1 and Table 2; the input partitions of the two tables (partition 0 … partition m and partition 0 … partition k) each go through Shuffle and Sort into partition 0 … partition n before the join]
8. Spark SQL: Bucket Join [Diagram: Table 1 and Table 2, each stored as bucket 0 … bucket n, joined bucket by bucket; labels: Shuffle, Join]
9. Spark SQL: Bucket. Plan 1: HiveTableScan (outputPartitioning: UnknownPartitioning, outputOrdering: Nil) on both sides, then Exchange and Sort on both sides, then Sort Merge Join. Plan 2: HiveTableScan (outputPartitioning: HashPartitioning(id, n, HiveHash), outputOrdering: SortOrder(id)) on both sides, then Sort Merge Join directly. Sort Merge Join requirements shown: requireChildDistribution: HashClusteredDistribution(id, n, HiveHash), requireChildOrdering: SortOrder(id); and requireChildDistribution: HashClusteredDistribution(id, n, Murmur3Hash), requireChildOrdering: SortOrder(id)
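The two plans on slide 9 differ only in whether the scan's output already satisfies what the Sort Merge Join requires. The following is a minimal sketch of that decision, loosely modeled on the idea behind Spark's EnsureRequirements rule and not its actual code. The class `HashPartitioning`, the function `plan_join_side`, and the tuple-based ordering are hypothetical simplifications introduced here for illustration.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class HashPartitioning:
    """Simplified stand-in: partitioned by hash(keys) % num_partitions."""
    keys: Tuple[str, ...]
    num_partitions: int
    hash_name: str  # e.g. "HiveHash" or "Murmur3Hash"


def plan_join_side(scan_partitioning: Optional[HashPartitioning],
                   scan_ordering: Tuple[str, ...],
                   required: HashPartitioning):
    """Return the operators needed on one side of a sort merge join."""
    ops = ["HiveTableScan"]
    if scan_partitioning != required:
        # Keys, bucket count, or hash function differ: a shuffle is needed.
        ops.append("Exchange")
        scan_ordering = ()  # a shuffle destroys any existing order
    if scan_ordering != required.keys:
        ops.append("Sort")
    return ops


hive_req = HashPartitioning(("id",), 8, "HiveHash")
murmur_req = HashPartitioning(("id",), 8, "Murmur3Hash")
bucketed_scan = HashPartitioning(("id",), 8, "HiveHash")

print(plan_join_side(None, (), hive_req))               # unknown partitioning
print(plan_join_side(bucketed_scan, ("id",), hive_req))  # matching hash
print(plan_join_side(bucketed_scan, ("id",), murmur_req))  # hash mismatch
```

In this sketch a table bucketed with HiveHash avoids the Exchange only when the join also asks for HiveHash; a requirement expressed with Murmur3Hash does not match, so the shuffle and sort come back.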
10. Spark SQL: Bucket [Diagram: writing a bucketed table in Hive vs. Spark SQL; labels: M and R tasks, output files bucket 0, bucket 1, … bucket (n-1)]
11. Spark SQL: Bucket, Bucket Join. Table A (3 buckets): bucket 0 (0, 3, 6, 9, 12, 15), bucket 1 (1, 4, 7, 10, 13, 16), bucket 2 (2, 5, 8, 11, 14, 17). Table B (6 buckets): bucket 0 (0, 6, 12), bucket 1 (1, 7, 13), bucket 2 (2, 8, 14), bucket 3 (3, 9, 15), bucket 4 (4, 10, 16), bucket 5 (5, 11, 17). Plan operators: TableScan (Table A), TableScan (Table B), Sort, Sort Merge Join
12. Spark SQL: Bucket, Bucket Join. Table A (3 buckets): bucket 0 (0, 3, 6, 9, 12, 15), bucket 1 (1, 4, 7, 10, 13, 16), bucket 2 (2, 5, 8, 11, 14, 17), plus copies bucket 0' (0, 3, 6, 9, 12, 15), bucket 1' (1, 4, 7, 10, 13, 16), bucket 2' (2, 5, 8, 11, 14, 17). Table B (6 buckets): bucket 0 (0, 6, 12), bucket 1 (1, 7, 13), bucket 2 (2, 8, 14), bucket 3 (3, 9, 15), bucket 4 (4, 10, 16), bucket 5 (5, 11, 17). Plan operators: TableScan (Table A), TableScan (Table A), TableScan (Table B), BucketUnion, Sort Merge Join
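The bucket layout on slides 11 and 12 can be reproduced with a small sketch. It assumes integer keys assigned to buckets by `key % num_buckets` (which matches the bucket contents shown, but stands in for the real hash function) and that the larger bucket count is a multiple of the smaller one. The function names `bucketize` and `bucket_join_unequal` are hypothetical; the second one imitates the effect of scanning Table A twice and lining the copies up against Table B's six buckets.

```python
def bucketize(keys, num_buckets):
    """Assign each integer key to bucket key % num_buckets."""
    buckets = [[] for _ in range(num_buckets)]
    for k in keys:
        buckets[k % num_buckets].append(k)
    return buckets


def bucket_join_unequal(small_buckets, large_buckets):
    """Join two bucketed tables without a shuffle when the bucket counts
    differ by an integer multiple."""
    n_small, n_large = len(small_buckets), len(large_buckets)
    assert n_large % n_small == 0, "bucket counts must be multiples"
    out = []
    for j, large in enumerate(large_buckets):
        # Bucket j of the large table can only match bucket j % n_small of
        # the small table, so that bucket is re-read (the 0', 1', 2' copies).
        small = small_buckets[j % n_small]
        out.extend((a, b) for a in small for b in large if a == b)
    return sorted(out)


table_a = bucketize(range(18), 3)   # Table A, 3 buckets
table_b = bucketize(range(18), 6)   # Table B, 6 buckets
joined = bucket_join_unequal(table_a, table_b)
print(table_a[0], table_b[0], table_b[3])
print(len(joined))
```

Each bucket of Table A is read twice here, and the join condition discards the keys that belong to the other copy, which is the price paid for avoiding the shuffle.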
13. Spark SQL ——Bucket Bucket Join • Bucket • Bucket Bucket • Bucket • Bucket Bucket • Ø Ø Bucket Table Property Bucket Join SortMergeJoin
14. Spark SQL: Bucket. Table X (Bucket by A) and Table Y (Bucket by A); plan operators: TableScan, TableScan, Sort on A B, Sort on A B, Sort Merge Join on A B
Table X (Bucket by A), by column: A = X X X Y Y Y Z Z Z; B = 1 2 4 6 7 8 2 4 5; C = 1 3 2 7 3 5 8 3 2
Table Y (Bucket by A), by column: A = X X X Y Y Y Z Z Z; B = 2 1 4 8 6 7 2 5 4; C = 3 1 2 5 7 3 8 2 3
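A sketch of why slide 14's plan has Sort operators but no shuffle: both tables are bucketed by A and the join keys (A, B) include A, so rows that can match already sit in the same bucket and only a local sort is needed. The rows below are read column-wise from the slide; treating each distinct value of A as its own bucket, and the helper names `bucket_by_a` and `merge_join`, are assumptions made for illustration.

```python
def merge_join(left, right, key):
    """Sort-merge join of two row lists already sorted by key(row)."""
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        kl, kr = key(left[i]), key(right[j])
        if kl < kr:
            i += 1
        elif kl > kr:
            j += 1
        else:
            # Find the run of right rows sharing this key.
            j_end = j
            while j_end < len(right) and key(right[j_end]) == kl:
                j_end += 1
            # Pair every left row in the run with every right row in the run.
            while i < len(left) and key(left[i]) == kl:
                out.extend((left[i], r) for r in right[j:j_end])
                i += 1
            j = j_end
    return out


def bucket_by_a(rows):
    """Group rows by column A (one bucket per value, for simplicity)."""
    buckets = {}
    for row in rows:
        buckets.setdefault(row[0], []).append(row)
    return buckets


table_x = [("X", 1, 1), ("X", 2, 3), ("X", 4, 2), ("Y", 6, 7), ("Y", 7, 3),
           ("Y", 8, 5), ("Z", 2, 8), ("Z", 4, 3), ("Z", 5, 2)]
table_y = [("X", 2, 3), ("X", 1, 1), ("X", 4, 2), ("Y", 8, 5), ("Y", 6, 7),
           ("Y", 7, 3), ("Z", 2, 8), ("Z", 5, 2), ("Z", 4, 3)]


def key_ab(row):
    return (row[0], row[1])


bx, by = bucket_by_a(table_x), bucket_by_a(table_y)
result = []
for a in sorted(bx):
    # Local sort on (A, B) inside each bucket; no data moves between buckets.
    left = sorted(bx[a], key=key_ab)
    right = sorted(by.get(a, []), key=key_ab)
    result.extend(merge_join(left, right, key_ab))

print(len(result))
```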
15. Spark SQL —— Spark SQL (Map/Struct/Array) • • • • Filter JSON CPU
16. Spark SQL —— • key • Map/ Array/ Struct Example • • • event_log event date/hour/app params (MapType) stg_app_event_log key event • • • : 60TB memory+3000 core / hour 180TB disk /day key 15 key 8 key • • • • 31% (6628/21130) 29% (21559/75203) params params 860h/ 2800h/
17. Spark SQL ——
18. Spark SQL: time / IO, original vs. Rewrite
SQL_adhoc_6: 6.3 min / 797.6 GB; Rewrite: 3.4 min / 111.8 GB; 85.3%↑ / 86%↓
SQL_adhoc_7: 16.5 min / 3.2 TB; Rewrite: 5.0 min / 111.1 GB; 230%↑ / 96.6%↓
SQL_etl_2: 24 min / 3.7 TB; Rewrite: 9.1 min / 686.1 GB; 130.8%↑ / 82%↓
19. Spark SQL —— • OLAP • Aggregate Aggregate Join Join • • SQL • • Rollup Query Rewrite
20. Spark SQL (query rewrite example)
CREATE TABLE user_event (user STRING, event_type STRING, num INT, date INT, hour INT) USING parquet PARTITIONED BY (date, hour)
CREATE MATERIALIZED VIEW date_hour_user_agg PARTITIONED BY (date) ON TABLE user_event AS SELECT date, user, sum(num) AS sum_num FROM user_event GROUP BY date, user
Query: SELECT user, sum(num) FROM user_event WHERE date = 20190101 GROUP BY user
rewrite: SELECT user, sum_num FROM date_hour_user_agg WHERE date = 20190101
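The rewrite on slide 20 can be checked on a toy dataset. The sketch below uses Python's built-in `sqlite3` rather than Spark, and emulates the materialized view with an ordinary table because SQLite has no materialized views. It assumes the view stores `sum(num)` under the name `sum_num`, which is what the rewritten query selects; the sample rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE user_event "
    "(user TEXT, event_type TEXT, num INTEGER, date INTEGER, hour INTEGER)"
)
conn.executemany(
    "INSERT INTO user_event VALUES (?, ?, ?, ?, ?)",
    [
        ("alice", "click", 3, 20190101, 0),
        ("alice", "view", 4, 20190101, 5),
        ("bob", "click", 1, 20190101, 7),
        ("bob", "click", 9, 20190102, 1),
    ],
)

# Stand-in for the materialized view: a pre-aggregated table.
conn.execute(
    "CREATE TABLE date_hour_user_agg AS "
    "SELECT date, user, sum(num) AS sum_num FROM user_event GROUP BY date, user"
)

original = conn.execute(
    "SELECT user, sum(num) FROM user_event "
    "WHERE date = 20190101 GROUP BY user ORDER BY user"
).fetchall()

rewritten = conn.execute(
    "SELECT user, sum_num FROM date_hour_user_agg "
    "WHERE date = 20190101 ORDER BY user"
).fetchall()

print(original)
print(rewritten)
```

The rewritten query needs no GROUP BY because the stand-in view is already grouped by (date, user) and the filter fixes date to a single value.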
21. Spark SQL —— • Partial / Final Aggregator • Spark Shuffle • Split • • Adaptive Execution • • Bloom Filter • Runtime Filter • RTO • Shuffle • Local Sort • MultiJoin • • Bucket
22. Spark Shuffle • HDFS • Shuffle Shuffle
23. Spark Shuffle [Diagram: in the Map Stage, Shuffle Mapper 1, 2, and 3 each write Partition 0, Partition 1, and Partition 2; in the Reduce Stage, Shuffle Reducer 1 reads Partition 0, Shuffle Reducer 2 reads Partition 1, and Shuffle Reducer 3 reads Partition 2]
24. Spark Shuffle Shuffle
25. Spark Shuffle Shuffle Read
26. Spark Shuffle cpu.busy = 99% cpu.user = 97% Node Manager CPU Shuffle Read
27. Spark Shuffle Node Manager CPU Shuffle Read
28. Spark Shuffle DataNode IO Shuffle Read
29. Spark Shuffle Shuffle HDFS
30. Spark Shuffle: Shuffle 800GB, Mapper 1200, Reducer 2200 vs. Shuffle 450GB, Mapper 10000, Reducer 10000; Shuffle Read
31. Spark Shuffle 10000 mapper 10000 reducer 60 45 30 2=2 RPC
32. Spark Shuffle: Shuffle and HDFS [Diagram labels: Shuffle Mapper 1 … N, each with a Shuffle Writer and Local Disk; Shuffle Service; HDFS; Shuffle Reducer 1 … N, each with a Shuffle Reader]
33. Spark Shuffle HDFS 57%+ Shuffle 14%+ Shuffle 18%+ 12%+
34. Spark Shuffle Mapper Mapper Shuffle Read IO
35. Spark Shuffle Reducer M*N IO N IO
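Slide 35 contrasts M*N IOs with N IOs on the reducer side. One way to read that, sketched below as an assumption since the slide's explanatory text is missing, is that M mappers each write one block per reduce partition, so an unmerged shuffle read touches M*N blocks, while merging each partition's blocks into one file beforehand leaves N reads. All function names are hypothetical and the "blocks" are just labels.

```python
def map_outputs(num_mappers, num_reducers):
    """Each mapper writes one block per reduce partition."""
    return [[f"m{m}-p{p}" for p in range(num_reducers)]
            for m in range(num_mappers)]


def read_unmerged(outputs, partition):
    """A reducer fetches its block from every mapper: one read per mapper."""
    blocks = [mapper_blocks[partition] for mapper_blocks in outputs]
    return blocks, len(blocks)


def merge_partitions(outputs):
    """Concatenate each partition's blocks from all mappers into one file."""
    num_reducers = len(outputs[0])
    return [[mapper_blocks[p] for mapper_blocks in outputs]
            for p in range(num_reducers)]


def read_merged(merged, partition):
    """A reducer reads a single merged file: one read."""
    return merged[partition], 1


M, N = 4, 3
outputs = map_outputs(M, N)
merged = merge_partitions(outputs)

unmerged_reads = sum(read_unmerged(outputs, p)[1] for p in range(N))
merged_reads = sum(read_merged(merged, p)[1] for p in range(N))
print(unmerged_reads, merged_reads)

# Same counting at the scale quoted on slide 30 (10000 mappers and reducers).
print(10000 * 10000, 10000)
```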
36. Spark Shuffle
37. Spark Shuffle
38. Spark SQL • • • ACID • Dynamic Group Partition • Runtime Filter • • Aggregation • Filter • Catalog Service