Apache Spark 2.4 and the Future, by Gengliang Wang

QCon Conference

Published 2019/06/25 in the Technology category


Transcript
1. Apache Spark 2.4 and the Future. Gengliang Wang, May 2019
3. About me • Active contributor in the Apache Spark community (GitHub: gengliangwang) • Software Engineer at
7. Release: November 8, 2018. Blog: https://t.co/k7kEHrNZXp. About 1,100 tickets in total.
8. Main features of Spark 2.4: Barrier Execution, Spark on Kubernetes (beta support), Scala 2.12, PySpark improvements, Structured Streaming, Image source, Native Avro support, Built-in source improvements, Higher-order functions, Various SQL features
10. Apache Spark: The First Unified Analytics Engine. Uniquely combines data & AI technologies. Runtime: Delta; Spark Core Engine; Big Data Processing (ETL + SQL + Streaming); Machine Learning (MLlib + SparkR)
11. Where the two ecosystems cross: on the Spark side, Map/Reduce, RDD, Project Tungsten, DataFrame-based APIs, ML Pipelines API, Structured Streaming, Continuous Processing, Pandas UDF, TensorFrames, 50+ data sources, and Python/Java/R interfaces; on the AI/ML side, TensorFlow (tf.data, tf.transform, XLA), Keras, Horovod, Distributed TensorFlow, TensorFlowOnSpark, CaffeOnSpark, Caffe/PyTorch/MXNet, xgboost, scikit-learn, LIBLINEAR, glmnet, GraphLab, and pandas/numpy/scipy/R
12. Different execution models. Spark: tasks are independent of each other; embarrassingly parallel and massively scalable; if one task crashes, only that task is rerun. Distributed training: complete coordination among tasks; optimized for communication; if one task crashes, all tasks must be rerun.
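The two recovery models described above can be contrasted in a toy pure-Python sketch (this is not the Spark API; the function names and the RuntimeError-based failure signal are illustrative only):

```python
def run_independent(tasks, run):
    """Spark-style: tasks are independent, so on failure only the
    failed task is rerun."""
    results = {}
    for i, t in enumerate(tasks):
        while True:
            try:
                results[i] = run(t)
                break
            except RuntimeError:
                continue  # rerun just this one task
    return results

def run_gang(tasks, run):
    """Distributed-training-style: tasks coordinate, so one failure
    forces a restart of the whole gang."""
    while True:
        try:
            return {i: run(t) for i, t in enumerate(tasks)}
        except RuntimeError:
            continue  # restart all tasks together
```

The difference in wasted work is exactly why gang (barrier) scheduling needs first-class support in the scheduler rather than the default per-task retry.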
13. Project Hydrogen: Spark + AI. Adds gang scheduling, embedding distributed deep-learning jobs into Spark stages.
14. Project Hydrogen: Spark + AI. Barrier Execution Mode, Optimized Data Exchange, Accelerator-Aware Scheduling. Demo: https://vimeo.com/274267107 & https://youtu.be/hwbsWkhBeWM
16. Pandas UDFs • Spark 2.3 introduced vectorized Pandas UDFs, which process data with Pandas and the Arrow format; their performance is a big improvement over the earlier Python UDFs. • Spark 2.4 introduces Grouped Aggregate Pandas UDFs, which can define aggregate functions (group by, window).
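The semantics of a grouped-aggregate UDF, one scalar result per group after a group-by, can be sketched without Spark or pandas; the function and field names below are illustrative only, not the PySpark API:

```python
from collections import defaultdict
from statistics import mean

def grouped_agg(rows, key, value, agg=mean):
    """Group rows (dicts) by `key`, then reduce each group's `value`s
    to a single scalar, like a Grouped Aggregate Pandas UDF applied
    after df.groupBy(key)."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row[value])
    return {k: agg(vs) for k, vs in groups.items()}
```

In real PySpark the aggregation body would receive a whole pandas Series per group at once, which is what makes the vectorized form fast.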
17. Other notable changes: [SPARK-24396] Add Structured Streaming ForeachWriter for Python; [SPARK-23030] Use Arrow stream format for creating from and collecting Pandas DataFrames; [SPARK-24624] Support mixture of Python UDF and Scalar Pandas UDF; [SPARK-23874] Upgrade Apache Arrow to 0.10.0, allowing BinaryType support [ARROW-2141]; [SPARK-25004] Add spark.executor.pyspark.memory limit
19. Flexible Streaming Sink [SPARK-24565]: the output of each micro-batch is exposed as a DataFrame via foreachBatch(f: Dataset[T] => Unit)
20. Reuse existing batch data sources
21. Write to multiple locations
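What foreachBatch buys you, reusing batch writers and fanning out each micro-batch to several sinks, can be sketched in plain Python; the lists standing in for sinks and the runner function are illustrative, not the Spark API:

```python
def foreach_batch_runner(micro_batches, handler):
    """Invoke `handler(batch, epoch_id)` once per micro-batch,
    mirroring the shape of Structured Streaming's foreachBatch hook."""
    for epoch_id, batch in enumerate(micro_batches):
        handler(batch, epoch_id)

# Two stand-in "sinks"; one handler writes every batch to both locations.
sink_a, sink_b = [], []

def write_to_both(batch, epoch_id):
    sink_a.extend(batch)                    # reuse an existing batch writer
    sink_b.append((epoch_id, len(batch)))   # and write to a second location

foreach_batch_runner([[1, 2], [3]], write_to_both)
```

In real code the handler receives a genuine DataFrame, so any existing batch sink (JDBC, Delta, files, ...) can be called inside it.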
22. Structured Streaming: [SPARK-24662] Support for the LIMIT operator for streams in Append and Complete output modes. [SPARK-24763] Remove redundant key data from value in streaming aggregation. [SPARK-24156] Faster generation of output results and/or state cleanup with stateful operations (mapGroupsWithState, stream-stream join, streaming aggregation, streaming dropDuplicates) when there is no data in the input stream. [SPARK-24730] Support for choosing either the min or max watermark when there are multiple input streams in a query.
24. Avro • Apache Avro (https://avro.apache.org) is a data serialization format • Widely used in the Spark and Hadoop ecosystems • The Spark-Avro package (https://github.com/databricks/spark-avro) is now inlined into Spark [SPARK-24768] • Improves the experience of using Spark SQL and Structured Streaming
25. Avro [SPARK-24811] from_avro/to_avro support reading and writing Avro data within a DataFrame, for example: 1. decode the Avro data, 2. filter by `favorite_color`, 3. output `name` in Avro format
26. Avro performance [SPARK-24800] Refactor Avro serializer and deserializer: 2x faster. External library path: Avro data → Row → InternalRow; native reader path: Avro data → InternalRow. Runtime comparison (lower is better). Notebook: https://dbricks.co/AvroPerf
27. Avro logical types. Avro upgraded from 1.7.7 to 1.8 [SPARK-24771]. Logical type support: Date [SPARK-24772], Decimal [SPARK-24774], Timestamp [SPARK-24773]. Options: compression, ignoreExtension, recordNamespace, recordName, avroSchema. Blog: Apache Avro as a Built-in Data Source in Apache Spark 2.4. https://t.co/jks7j27PxJ
29. Image schema data source [SPARK-22666] Spark data source for the image format. Blog: Introducing Built-in Image Data Source in Apache Spark 2.4 https://t.co/VUN3VWK75z
30. Build a custom image classifier
32. Parquet updated from 1.8.2 to 1.10.0 [SPARK-23972]. • PARQUET-1025: support new min-max statistics in parquet-mr • PARQUET-225: INT64 support for delta encoding • PARQUET-1142: enable parquet.filter.dictionary.enabled by default. Predicate pushdown: String [SPARK-23972] (20x faster), Decimal [SPARK-24549], Timestamp [SPARK-24718], Date [SPARK-23727], Byte/Short [SPARK-24706], StringStartsWith [SPARK-24638], IN [SPARK-17091]
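Min/max-statistics pushdown works by skipping whole row groups whose value range cannot satisfy the predicate; a schematic version for an equality predicate (the dict layout for row-group stats is an assumption for illustration, not the Parquet API):

```python
def prune_row_groups(row_groups, predicate_value):
    """Keep only row groups whose [min, max] statistics could
    contain a row matching `col = predicate_value`; the rest are
    skipped without being read."""
    return [rg for rg in row_groups
            if rg["min"] <= predicate_value <= rg["max"]]
```

Pushing more predicate types (String, Decimal, Timestamp, Date, ...) down to this layer is what the listed SPARK tickets enable.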
33. ORC: the native vectorized ORC reader is GA! • Native ORC reader is on by default [SPARK-23456] • Update ORC from 1.4.1 to 1.5.2 [SPARK-24576] • Turn on ORC filter push-down by default [SPARK-21783] • Use native ORC reader to read Hive serde tables by default [SPARK-22279] • Avoid creating a reader for all ORC files [SPARK-25126]
34. CSV • Option samplingRatio for schema inference [SPARK-23846] • Option enforceSchema for throwing an exception when the user-specified schema doesn't match the CSV header [SPARK-23786] • Option encoding for specifying the encoding of outputs [SPARK-19018]. Performance: • Parsing only required columns in the CSV parser [SPARK-24244] • Speed up count() for JSON and CSV [SPARK-24959] • Better performance by switching to uniVocity 2.7.3 [SPARK-24945]
35. JSON • Option encoding for specifying the encoding of inputs and outputs [SPARK-23723] • Option dropFieldIfAllNull for ignoring columns of all-null values or empty arrays/structs during JSON schema inference [SPARK-23772] • Option lineSep for defining the line separator used for parsing [SPARK-23765] • Speed up count() for JSON and CSV [SPARK-24959]
36. JDBC • Option queryTimeout for the number of seconds the driver will wait for a Statement object to execute [SPARK-23856] • Option query for specifying the query to read from JDBC [SPARK-24423] • Option pushDownFilters for specifying whether filter pushdown is allowed [SPARK-24288] • Auto-correction of partition column names [SPARK-24327] • Support Date/Timestamp in a JDBC partition column when reading in parallel from multiple workers [SPARK-22814] • Add cascadeTruncate option to the JDBC data source [SPARK-22880]
38. Higher-order functions: traverse arrays, maps, and structs. Example schema for tbl_nested: key: long (nullable = false); values: array of long (nullable = false, containsNull = false). Why not a UDF? Expensive data serialization.
39. Higher-order functions. 1) Check whether an element exists: SELECT EXISTS(values, e -> e > 30) AS v FROM tbl_nested; 2) Transform an array: SELECT TRANSFORM(values, e -> e * e) AS v FROM tbl_nested;
40. Higher-order functions. 3) Filter an array: SELECT FILTER(values, e -> e > 30) AS v FROM tbl_nested; 4) Aggregate: SELECT REDUCE(values, 0, (value, acc) -> value + acc) AS sum FROM tbl_nested;
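The four SQL higher-order functions shown on these slides map directly onto familiar functional primitives; a pure-Python rendering of the same lambdas over a sample values array (the sample data is illustrative):

```python
from functools import reduce

values = [10, 20, 40]   # one row's `values` array

exists_v    = any(e > 30 for e in values)       # EXISTS(values, e -> e > 30)
transform_v = [e * e for e in values]           # TRANSFORM(values, e -> e * e)
filter_v    = [e for e in values if e > 30]     # FILTER(values, e -> e > 30)
sum_v       = reduce(lambda acc, e: acc + e, values, 0)  # REDUCE(values, 0, ...)
```

Because these run as SQL expressions inside the engine, they avoid the per-element serialization cost a Python or Scala UDF would pay on nested data.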
41. Built-in functions [SPARK-23899] New or extended built-in functions for ArrayType and MapType • 26 functions for ArrayType: transform, filter, reduce, array_distinct, array_intersect, array_union, array_except, array_join, array_max, array_min, ... • 3 functions for MapType: map_from_arrays, map_from_entries, map_concat. Blog: Introducing New Built-in and Higher-Order Functions for Complex Data Types in Apache Spark 2.4. https://t.co/p1TRRtabJJ
43. When there are many columns/functions: [SPARK-16406] Analyzer: improve performance of LogicalPlan.resolve by adding an indexing structure to resolve(...) to find potential matches quicker. [SPARK-23963] Properly handle a large number of columns in a query on a text-based Hive table: turning a list into an array makes a Hive table scan 10x faster when there are many columns. [SPARK-23486] Analyzer: cache the function name from the external catalog for lookupFunctions
44. Optimizer/Planner: [SPARK-23803] Support bucket pruning: prune buckets that cannot satisfy equal-to predicates, to reduce the number of files to scan. [SPARK-24802] Optimization rule exclusion: disable a list of optimization rules in the optimizer. [SPARK-4502] Nested schema pruning for Parquet tables: column pruning on nested fields. More: [SPARK-24339] [SPARK-23877] [SPARK-23957] [SPARK-25212] …
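Bucket pruning for an equal-to predicate exploits the fact that a bucketed table routes each value to exactly one bucket, so only that bucket's files need scanning; schematically (Python's built-in hash() stands in for Spark's actual bucketing hash function):

```python
def buckets_to_scan(value, num_buckets):
    """For a predicate `col = value` on a table bucketed by col into
    num_buckets buckets, only the bucket the value hashes to can
    contain matching rows; every other bucket is pruned."""
    return {hash(value) % num_buckets}
```

With, say, 256 buckets, an equality lookup touches 1/256 of the table's files instead of all of them.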
46. Native Spark app on K8s: a new Spark scheduler backend. Blog: What's New for Apache Spark on Kubernetes in the Upcoming Apache Spark 2.4 Release https://t.co/uUpdUj2Z4B
48. Scala 2.12 beta support [SPARK-14220] Build Spark against Scala 2.12. All the tests PASS! https://dbricks.co/Scala212Jenkins
49. What’s Next?
50. Disclaimer: this deck may include predictions and outlook for the future Apache Spark 3.0 release. They are intended to outline the general direction of the open-source community and are provided for reference only; they are not a commitment to deliver any code or functionality. Whether and when any feature mentioned in this presentation ships is decided by the ASF and the Apache Spark PMC.
51. What's Next? GPU-aware Scheduling, Vectorization in SparkR, Spark Graph, Data Source APIs, Hadoop 3.x, Scala 2.12, Adaptive Execution, Spark on Kubernetes, ANSI-SQL Parser, Join Hint
52. Project Hydrogen: Spark + AI. GPU-aware scheduling • GPUs are widely used to accelerate specific workloads such as deep learning
53. Data Source API V2 • A new, stable API that supports a variety of data sources • A unified API for batch and stream processing • Better support for multiple metadata stores (Hive, Delta, etc.)
54. Adaptive query execution: based on the plan nodes that have already finished executing, re-optimize the rest of the query plan • Reduce the number of reducers • Convert sort-merge joins into broadcast hash joins • Handle data skew. Intel blog: https://tinyurl.com/y3rjwcos
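The first of these optimizations, reducing the reducer count, amounts to coalescing small adjacent shuffle partitions once their real sizes are known at runtime; a schematic version (sizes in bytes, and the target threshold is an assumed knob, not an actual Spark configuration name):

```python
def coalesce_partitions(sizes, target):
    """Merge adjacent shuffle-partition sizes so that each coalesced
    partition reaches at least `target` bytes (the last one may be
    smaller). Fewer, larger partitions mean fewer reducer tasks."""
    merged, current = [], 0
    for s in sizes:
        current += s
        if current >= target:
            merged.append(current)
            current = 0
    if current:
        merged.append(current)
    return merged
```

The key point is that the decision uses observed sizes from completed stages, which a purely static planner cannot know in advance.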
55. Adaptive query execution
56. Spark on K8s • Support pod templates to customize the driver and executor pods • Dynamic resource allocation • External shuffle service
57. Other Apache Spark 3.0 goals • Hadoop 3.x support • Upgrade Hive execution from 1.2.1 to 2.3.4 • Scala 2.12 GA • Better ANSI SQL compliance • Improved PySpark usability
58. Pandas DataFrame vs Spark DataFrame
• Column: df['col'] vs df['col']
• Mutability: mutable vs immutable
• Add a column: df['c'] = df['a'] + df['b'] vs df.withColumn('c', df['a'] + df['b'])
• Rename columns: df.columns = ['a', 'b'] vs df.select(df['c1'].alias('a'), df['c2'].alias('b'))
• Value count: df['col'].value_counts() vs df.groupBy('col').count().orderBy('count', ascending=False)
59. Koalas: Pandas API on Spark. A new open-source project
60. Koalas: a unified API for small datasets on laptops and large datasets on clusters. github.com/databricks/koalas (Apache 2.0 license). pip install koalas
62. Q&A