A Deep Dive into Structured Streaming

1. A Deep Dive into Structured Streaming – Wenchen Fan (范文臣)
2. Complexities in stream processing – Complex data: diverse data formats (JSON, Avro, binary, …); data can be dirty, late, and out-of-order. Complex workloads: combining streaming with interactive queries, machine learning. Complex systems: diverse storage systems (Kafka, S3, Kinesis, RDBMS, …); system failures.
3. building robust stream processing apps is hard
4. Structured Streaming – stream processing on the Spark SQL engine: fast, scalable, fault-tolerant; rich, unified, high-level APIs to deal with complex data and complex workloads; a rich ecosystem of data sources to integrate with many storage systems.
5. you should not have to reason about streaming
6. you should write simple queries & Spark should continuously update the answer
7. Structured Streaming Model
8. Model – Trigger: how frequently to check the input for new data (e.g. every 1 sec). Input: data from the source treated as an append-only table; each trigger sees all data that has arrived up to that point (data up to 1, 2, 3, …). Query: operations on the input – the usual map/filter/reduce plus new window and session operations.
9. Model – Result: the final result table produced by the query, updated every trigger interval. Output: the part of the result written to the data sink after every trigger. Complete output: write the full result table every time (output for data up to 1, 2, 3, …).
10. Model – Output modes: Complete output – write the full result table every time. Update output – write only the rows that changed in the result since the previous trigger. Append output – write only new rows. *Not all output modes are feasible with all queries. See the sketch below.
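As a minimal sketch of the three modes (the built-in rate source, the console sink, and the bucket column are stand-ins chosen purely for illustration), the same aggregation can run in complete or update mode, while append mode is rejected because an unwindowed aggregation never finalizes its rows:

// Minimal sketch of output modes; rate source and console sink are stand-ins.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("output-modes").getOrCreate()
import spark.implicits._

val counts = spark.readStream
  .format("rate")                   // generates (timestamp, value) rows, handy for testing
  .load()
  .groupBy($"value" % 10 as "bucket")
  .count()

// "complete" writes the whole result table every trigger;
// "update" writes only the buckets whose counts changed;
// "append" would throw an AnalysisException here, because an unwindowed
// aggregation (with no watermark) never finalizes its rows.
val query = counts.writeStream
  .format("console")
  .outputMode("complete")           // or "update"
  .start()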
11. API - Dataset/DataFrame: a single API for static, bounded data and for streaming, unbounded data.
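A small sketch of what that single API buys you, assuming a hypothetical /data/logs path of JSON records with illustrative level and service columns: the transformation is identical, only the read and write calls change.

// Same transformation on bounded and unbounded data; path and columns are hypothetical.
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder.appName("single-api").getOrCreate()
import spark.implicits._

def errorCounts(logs: DataFrame): DataFrame =
  logs.filter($"level" === "ERROR").groupBy($"service").count()

// Batch: bounded data, computed once.
val staticLogs = spark.read.json("/data/logs")
errorCounts(staticLogs).show()

// Streaming: unbounded data, the same query continuously updated as new files arrive.
val streamingLogs = spark.readStream.schema(staticLogs.schema).json("/data/logs")
errorCounts(streamingLogs).writeStream
  .format("console")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/single-api-checkpoint")
  .start()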
12. Streaming word count
13. Anatomy of a Streaming Word Count

spark.readStream
  .format("kafka")
  .option("subscribe", "input")
  .load()
  .groupBy($"value".cast("string"))
  .count()
  .writeStream
  .format("kafka")
  .option("topic", "output")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .outputMode(OutputMode.Complete())
  .option("checkpointLocation", "…")
  .start()

Source
• Specify one or more locations to read data from
• Built-in support for Files/Kafka/Socket; pluggable
• Can include multiple sources of different types using union()
14. Anatomy of a Streaming Query

spark.readStream
  .format("kafka")
  .option("subscribe", "input")
  .load()
  .groupBy('value.cast("string") as 'key)
  .agg(count("*") as 'value)
  .writeStream
  .format("kafka")
  .option("topic", "output")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .outputMode(OutputMode.Complete())
  .option("checkpointLocation", "…")
  .start()

Transformation
• Using DataFrames, Datasets and/or SQL
• Catalyst figures out how to execute the transformation incrementally
• Internal processing is always exactly-once
15. Anatomy of a Streaming Query (same query as above)

Sink
• Accepts the output of each batch
• Where supported, sinks are transactional and exactly-once (e.g. files)
• Use foreach to execute arbitrary code
16. Anatomy of a Streaming Query (same query, with .outputMode("append"))

Output mode – what is output
• Complete – output the whole answer every time
• Update – output changed rows
• Append – output new rows only

Trigger – when to output
• Specified as a time; eventually will support data size
• No trigger means as fast as possible
17. Anatomy of a Streaming Query (same query)

Checkpoint
• Tracks the progress of a query in persistent storage
• Can be used to restart the query if there is a failure
18. Fault-tolerance with Checkpointing – checkpointing tracks the progress (offsets) of consuming data from the source, along with intermediate state. At each trigger (t=1, t=2, t=3, …) the new data is processed, and offsets and metadata are saved as JSON in a write-ahead log. A query can resume after a failure, and even after changing your streaming transformations, giving end-to-end exactly-once guarantees.
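Pulling slides 13-17 together, here is a minimal, self-contained sketch of the word-count query; the Kafka bootstrap servers, topic names and checkpoint path are placeholders, and the spark-sql-kafka connector is assumed to be on the classpath. Restarting the same code with the same checkpointLocation resumes from the offsets recorded in the checkpoint.

// Consolidated sketch of the streaming word count; servers, topics and paths are placeholders.
// Requires the spark-sql-kafka-0-10 connector on the classpath.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

val spark = SparkSession.builder.appName("streaming-word-count").getOrCreate()
import spark.implicits._

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")             // placeholder
  .option("subscribe", "input")
  .load()
  .groupBy($"value".cast("string") as "key")
  .agg(count("*").cast("string") as "value")                   // Kafka sink needs string/binary key and value
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")             // placeholder
  .option("topic", "output")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .outputMode(OutputMode.Complete())
  .option("checkpointLocation", "/tmp/wordcount-checkpoint")   // same path on restart resumes from stored offsets
  .start()

query.awaitTermination()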
19. Underneath the Hood
20. Batch Execution on Spark SQL – a DataFrame/Dataset is converted into a logical plan, an abstract representation of the query.
21. Batch Execution on Spark SQL – the Catalyst planner takes the query (SQL AST, DataFrame or Dataset) through analysis (unresolved logical plan → logical plan, using the catalog), logical optimization (optimized logical plan), physical planning (candidate physical plans compared by a cost model), and code generation (selected physical plan → RDDs).
22. Batch Execution on Spark SQL – the planner turns the logical plan into an execution plan and runs super-optimized Spark jobs to compute the results, using Project Tungsten: code optimizations (bytecode generation, JVM intrinsics, vectorization, operations on serialized data) and memory optimizations (compact and fast encoding, off-heap memory).
23. Continuous Incremental Execution – the planner knows how to convert a streaming logical plan into a continuous series of incremental execution plans (1, 2, 3, 4, …), each processing the next chunk of streaming data.
24. Continuous Incremental Execution – the planner polls the sources for new data, then incrementally executes on it and writes to the sink. For example, Incremental Execution 1 processes offsets [19-105] (count: 87) and Incremental Execution 2 processes offsets [106-197] (count: 92).
25. Continuous Aggregations – running aggregates are maintained as in-memory state, backed by a WAL in the file system for fault-tolerance. Incremental Execution 1 (offsets [19-105]) produces a running count of 87 and saves state 87; Incremental Execution 2 (offsets [106-197]) adds 92 for a running count of 87 + 92 = 179 and saves state 179. State data is generated and used across incremental executions.
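A small sketch of a query whose running aggregate is kept as state across triggers; the socket source, console sink and checkpoint path are stand-ins chosen for illustration:

// Running word counts: each trigger processes only newly arrived lines, but the
// per-word counts are running totals kept as versioned state between incremental
// executions and checkpointed for fault-tolerance.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("running-counts").getOrCreate()
import spark.implicits._

val words = spark.readStream
  .format("socket")                        // stand-in source for illustration
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .as[String]
  .flatMap(_.split(" "))

val runningCounts = words.groupBy($"value" as "word").count()

runningCounts.writeStream
  .format("console")
  .outputMode("update")                    // emit only the words whose running count changed
  .option("checkpointLocation", "/tmp/running-counts-checkpoint")
  .start()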
26. Fault-tolerance – all data and metadata in the system (planner, source, state, sink, and the incremental executions themselves) needs to be recoverable / replayable.
27. Fault-tolerance – fault-tolerant planner: the planner tracks offsets by writing the offset range of each execution to a write-ahead log (WAL) in HDFS before execution.
28. Fault-tolerance – failed planner: if the planner fails, the current incremental execution fails with it.
29. Fault-tolerance – restarted planner: on restart, the planner reads the offsets back from the WAL to recover from the failure and re-executes the exact same range of offsets, regenerating the same executions.
30. Fault-tolerance – fault-tolerant sources: Structured Streaming sources are replayable by design (e.g. Kafka, files) and generate exactly the same data given the offsets recovered by the planner.
31. Fault-tolerance – fault-tolerant state: intermediate state data is maintained in versioned key-value maps on Spark workers, backed by HDFS, and made fault-tolerant with a WAL. The planner makes sure the correct version of the state is used to re-execute after a failure.
32. Fault-tolerance – fault-tolerant sinks: sinks are idempotent by design and handle re-executions so the output is never double-committed.
33. offset tracking in WAL + state management + fault-tolerant sources and sinks = end-to-end exactly-once guarantees
34. Fast, fault-tolerant, exactly-once stateful stream processing without having to reason about streaming
35. Continuous Processing – a continuous processing mode that runs without micro-batches: long-running Spark tasks that checkpoint periodically, giving ≤1 ms latency (on par with per-record streaming systems). It is a one-line change: trigger(Trigger.Continuous("1 second")). Added in Spark 2.3. See the sketch below.
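As a sketch, switching a Kafka-to-Kafka query to continuous mode only touches the trigger; note that continuous mode in Spark 2.3 supports map/filter-style queries, not aggregations. Servers, topics and the checkpoint path are placeholders.

// Continuous trigger on a simple projection query; placeholders throughout.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.appName("continuous-mode").getOrCreate()

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")   // placeholder
  .option("subscribe", "input")
  .load()
  .selectExpr("key", "value")                        // map/filter-style work only in continuous mode
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")   // placeholder
  .option("topic", "output")
  .trigger(Trigger.Continuous("1 second"))           // the one-line change: checkpoint every second
  .option("checkpointLocation", "/tmp/continuous-checkpoint")
  .start()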
36. Building Complex Continuous Apps
37. Metric Processing @ Databricks – events generated by user actions (logins, clicks, Spark job updates) feed four workloads: ETL (clean, normalize and store historical data), Dashboards (analyze trends in usage as they occur), Alerts (notify engineers of critical issues), and Ad-hoc Analysis (diagnose issues when they occur).
38. Metric Processing @ Databricks – pipeline overview: incoming metrics are processed by a Filter job that drives Alerts, and by an ETL job that writes to Databricks Delta, which in turn serves Dashboards and Ad-hoc Analysis.
39. ETL – Read JSON records from Kafka

val rawLogs = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("subscribe", "rawLogs")
  .load()

val augmentedLogs = rawLogs
  .withColumn("msg", from_json($"value".cast("string"), schema))
  .select("timestamp", "msg.*")
  .join(spark.table("customers"), "customer_id")

• DataFrames can be reused for multiple streams
• Can build libraries of useful DataFrames and share code between applications
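The schema referenced above is not shown in the deck; a hypothetical definition, with field names taken from the queries on the later slides, might look like this:

// Hypothetical schema for the JSON payload; field names are illustrative only.
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("customer_id", StringType)   // used in the join with the customers table
  .add("eventType", StringType)     // used by the filter job
  .add("metric", StringType)        // used by the dashboard query
  .add("level", StringType)         // used by the ad-hoc analysis query
  .add("load", DoubleType)          // used by the alerting query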
40. ETL – Write to Databricks Delta – store the augmented stream as efficient columnar data for later processing. Latency: ~1 minute.

augmentedLogs
  .repartition(1)
  .writeStream
  .format("delta")
  .option("path", "/data/metrics")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()

• Buffer data and write one large file every minute for efficient reads
41. Dashboards on Databricks Delta – always up-to-date visualizations of important business trends. Latency: ~1 minute to hours (configurable).

val logins = spark.readStream.format("delta").load("/data/metrics")
  .where("metric = 'login'")
  .groupBy(window($"timestamp", "1 minute"))
  .count()

display(logins) // visualize in Databricks notebooks
42. Filter and write to Kafka – forward filtered and augmented events back to Kafka. Latency: ~100 ms on average.

val filteredLogs = augmentedLogs
  .where("eventType = 'clusterHeartbeat'")
  .selectExpr("to_json(struct(*)) AS value")

filteredLogs.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("topic", "clusterHeartbeats")
  .start()

• to_json() converts the columns back into a JSON string, which is then written to a different Kafka topic
43. Alerts – e.g. alert when Spark cluster load > threshold. Latency: ~100 ms.

sparkErrors
  .as[ClusterHeartBeat]
  .filter(_.load > 99)
  .writeStream
  .foreach(new PagerdutySink(credentials)) // notify PagerDuty
  .start()
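PagerdutySink itself is not shown in the deck; a minimal ForeachWriter sketch of what such a sink could look like follows, where the ClusterHeartBeat fields and the PagerDuty client call are assumptions:

// Hypothetical ForeachWriter-based sink; ClusterHeartBeat and the PagerDuty call are assumptions.
import org.apache.spark.sql.ForeachWriter

case class ClusterHeartBeat(clusterId: String, load: Double)

class PagerdutySink(credentials: String) extends ForeachWriter[ClusterHeartBeat] {
  // Called once per partition per epoch; return true to process this partition.
  override def open(partitionId: Long, epochId: Long): Boolean = true

  // Called for each overloaded-cluster record; fire a page here.
  override def process(record: ClusterHeartBeat): Unit = {
    // e.g. pagerDutyClient.trigger(credentials, s"Cluster ${record.clusterId} load ${record.load}")
  }

  // Called when the partition is done or on error; release any resources.
  override def close(errorOrNull: Throwable): Unit = ()
}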
44. Ad-hoc Analysis on Databricks Delta – troubleshoot problems as they occur, with the latest information. Latency: ~1 minute.

SELECT * FROM delta.`/data/metrics`
WHERE level IN ('WARN', 'ERROR')
  AND customer = '…'
  AND timestamp < now() - INTERVAL 1 HOUR

• The query reads the latest data at the time it is executed
45. Metric Processing @ Databricks – 14+ billion records / hour with 10 nodes; the same pipeline (Filter → Alerts, ETL → Dashboards and Ad-hoc Analysis) meets diverse latency requirements as efficiently as possible.
46. Structured Streaming @ Databricks – hundreds of customer streaming apps in production on Databricks; the largest app processes tens of trillions of records per month.
47. More Info

Structured Streaming Programming Guide
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

Databricks blog posts for more focused discussions:
https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
https://databricks.com/blog/2017/01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1.html
https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html
https://databricks.com/blog/2017/10/11/benchmarking-structured-streaming-on-databricks-runtime-against-state-of-the-art-streaming-systems.html
https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html
https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html
https://databricks.com/blog/2018/03/20/low-latency-continuous-processing-mode-in-structured-streaming-in-apache-spark-2-3-0.html

and more to come, stay tuned!
48. Thank You Q&A
