1. Airflow + Kubernetes • Daniel Imberman, Bloomberg • Barni Seetharaman, Google
2. Bio: Daniel + Barni • Daniel - Data Science Infrastructure @ Bloomberg LP - See our talk tomorrow: Machine Learning the Kubernetes Way • Barni Seetharaman - Kubernetes @ Google Cloud - Works on the Kubernetes Workloads API
3. Pipelines are hard
4. Pipelines are hard Raw Data Actionable Data
7. Pipelines are hard Raw Data Actionable Data TWITTER, TWEET, RETWEET and the Twitter logo are trademarks of Twitter, Inc. or its affiliates.
9. Lots of pipelines are really hard
10. Enter Apache Airflow
11. Apache Airflow • Workflow scheduler developed @ Airbnb • Converts Python code into DAGs • Has a large number of operators/hooks (HDFS, Spark, Bash, Hive, etc.)
12. Apache Airflow
14. Creating a pipeline with Airflow Define Operators Set Dependencies
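In Airflow you first define operators and then set dependencies between them, typically with `>>`. The sketch below is a toy, stdlib-only model of that idea, not Airflow's API (the `Task` class and `run_order` helper are hypothetical): `>>` records an edge, and a topological sort produces an execution order that respects every dependency.

```python
from collections import deque


class Task:
    """Toy stand-in for an Airflow operator (hypothetical, not Airflow's class)."""

    def __init__(self, task_id, dag):
        self.task_id = task_id
        self.downstream = []
        dag.append(self)  # register the task with its (list-based) toy DAG

    def __rshift__(self, other):
        # `a >> b` means "b runs after a"; returning b lets `a >> b >> c` chain
        self.downstream.append(other)
        return other


def run_order(tasks):
    """Return task_ids in an order that respects every edge (Kahn's algorithm)."""
    indegree = {t.task_id: 0 for t in tasks}
    for t in tasks:
        for d in t.downstream:
            indegree[d.task_id] += 1
    ready = deque(t for t in tasks if indegree[t.task_id] == 0)
    order = []
    while ready:
        t = ready.popleft()
        order.append(t.task_id)
        for d in t.downstream:
            indegree[d.task_id] -= 1
            if indegree[d.task_id] == 0:
                ready.append(d)
    if len(order) != len(tasks):
        raise ValueError("cycle detected: not a DAG")
    return order


# "Define operators" ...
dag = []
extract = Task("extract", dag)
clean = Task("clean", dag)
train = Task("train", dag)
report = Task("report", dag)
# ... then "set dependencies"
extract >> clean >> train
clean >> report
print(run_order(dag))
```

The cycle check is the reason the structure must be a DAG: if any task (transitively) depends on itself, no valid order exists and the sketch refuses to schedule it.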
17. Managing a pipeline with Airflow
18. State of things: Complexity • Complex to deploy and manage • Multiple components • Varied configurations (executor, DAG source) • Different failure points
19. State of things: Static Allocation Scheduler
22. Kubernetes + Airflow • Modernized stack using containers + k8s • Reduced deployment and management complexity • Dynamic resource allocation • Automatic fault remediation • Improved resource utilization
23. Airflow + Kubernetes Kubernetes Cluster 1. KubernetesPodOperator 2. KubernetesExecutor 3. AirflowOperator (k8s controller)
24. KubernetesPodOperator • Allows users to deploy arbitrary Docker images • Users can offload dependencies to containers • “Lets Airflow focus on scheduling tasks”
25. KubernetesPodOperator • Airflow workers are much lighter (they don’t require extra libraries) • Easy rollbacks + deployments through image tags
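With the KubernetesPodOperator, each task runs as a Pod built from a pinned image, so a task's dependencies travel with the image rather than with the Airflow worker. The sketch below builds a plain Kubernetes Pod manifest as a Python dict to show why a rollback reduces to changing a tag. The `pod_manifest` helper and the `example.com/ml/train` image are hypothetical; only the field names follow the Kubernetes `v1` Pod spec.

```python
def pod_manifest(task_id, image, tag, command):
    """Build a minimal Kubernetes v1 Pod manifest for one task (hypothetical helper)."""
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"name": task_id, "labels": {"task_id": task_id}},
        "spec": {
            # the Pod runs the task once; retry decisions stay with the Airflow scheduler
            "restartPolicy": "Never",
            "containers": [
                {
                    "name": "base",
                    "image": f"{image}:{tag}",  # all task dependencies live in this image
                    "command": command,
                }
            ],
        },
    }


current = pod_manifest("train-model", "example.com/ml/train", "1.4.0", ["python", "train.py"])
# Rolling back = re-running the same task with the previous image tag
rollback = pod_manifest("train-model", "example.com/ml/train", "1.3.2", ["python", "train.py"])
print(current["spec"]["containers"][0]["image"], "->", rollback["spec"]["containers"][0]["image"])
```

Because nothing else in the manifest changes, pinning explicit tags (rather than `latest`) is what makes both deployments and rollbacks reproducible.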
26. KubernetesExecutor • High levels of parallelism (dynamic allocation) • Task-level pod configuration • Fault Tolerance
27. Dynamic Allocation Scheduler
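A back-of-the-envelope way to compare static and dynamic allocation: a static pool must be sized for peak load and is reserved around the clock, while one-pod-per-task allocation reserves capacity only while tasks run. The hourly load profile and the `slot_hours` helper below are made up for illustration, and pod start-up overhead is ignored.

```python
def slot_hours(load_per_hour, static_workers=None):
    """Return (reserved, used) worker-slot-hours for an hourly task-load profile.

    static_workers=None models dynamic allocation (one pod per running task);
    an integer models a fixed pool reserved around the clock.
    Assumes each task needs one slot, the static pool is sized for peak load,
    and pod start-up time is negligible.
    """
    used = sum(load_per_hour)
    if static_workers is None:
        reserved = used  # pods exist only while their tasks run
    else:
        reserved = static_workers * len(load_per_hour)
    return reserved, used


# Hypothetical 24-hour profile: a 3-hour nightly burst, light load otherwise
load = [20, 20, 20] + [2] * 21
static_reserved, used = slot_hours(load, static_workers=20)  # pool sized for the peak
dynamic_reserved, _ = slot_hours(load)
print(f"static utilization:  {used / static_reserved:.0%}")
print(f"dynamic utilization: {used / dynamic_reserved:.0%}")
```

With these invented numbers the static pool does useful work only about 21% of the time (102 of 480 slot-hours); the bursty the workload, the larger the gap that dynamic allocation can close.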
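31. Task Level Configs
from airflow.operators.bash_operator import BashOperator

# `dag` is assumed to be a DAG object defined earlier in the file
t = BashOperator(
    task_id='account-test',
    bash_command='gcloud auth application-default login',
    dag=dag,
    # per-task pod settings for the KubernetesExecutor (keys as shown on the slide)
    executor_config={
        'request_memory': '128Mi',
        'limit_memory': '128Mi',
        'image': 'airflow/scipy:1.1.5',
        'gcp-service-account': 'service-account@xxx.iam.gserviceaccount.com',
    },
)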
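One way to picture what task-level configs buy you: the executor can translate each task's `executor_config` into the container spec of that task's Pod. The sketch below maps the memory keys from the slide onto Kubernetes `resources.requests` / `resources.limits`. The `container_for_task` helper, the `DEFAULT_IMAGE` value, and the extra CPU keys are assumptions for illustration; this is not the KubernetesExecutor's actual code.

```python
DEFAULT_IMAGE = "airflow/base:latest"  # hypothetical default worker image

# (config key, Kubernetes resources section, resource name); CPU keys are assumed
RESOURCE_KEYS = [
    ("request_memory", "requests", "memory"),
    ("request_cpu", "requests", "cpu"),
    ("limit_memory", "limits", "memory"),
    ("limit_cpu", "limits", "cpu"),
]


def container_for_task(task_id, executor_config):
    """Translate a per-task config dict into a Kubernetes container spec (sketch)."""
    resources = {}
    for key, section, name in RESOURCE_KEYS:
        if key in executor_config:
            resources.setdefault(section, {})[name] = executor_config[key]
    return {
        "name": task_id,
        "image": executor_config.get("image", DEFAULT_IMAGE),
        "resources": resources,
    }


spec = container_for_task(
    "account-test",
    {"request_memory": "128Mi", "limit_memory": "128Mi", "image": "airflow/scipy:1.1.5"},
)
print(spec)
```

Tasks that set nothing fall back to the default image and unconstrained resources, so only the heavy or unusual tasks need per-task configuration.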
35. Kubernetes Executor Scheduler Watcher Thread
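The diagram pairs the scheduler with a watcher thread. A stdlib-only sketch of that split, assuming Pod events arrive on a queue as `(pod_name, phase)` tuples (in a real deployment they would come from a Kubernetes watch on the worker Pods; the event shape and the phase-to-state mapping are simplified assumptions): the watcher turns terminal Pod phases into task states and hands them back to the scheduler through a result queue.

```python
import queue
import threading

# Kubernetes Pod phases mapped to simplified task states (assumed mapping)
PHASE_TO_STATE = {"Succeeded": "success", "Failed": "failed", "Running": "running"}


def watcher(pod_events, results):
    """Consume (pod_name, phase) events until a None sentinel arrives."""
    while True:
        event = pod_events.get()
        if event is None:
            break
        pod_name, phase = event
        state = PHASE_TO_STATE.get(phase)
        if state in ("success", "failed"):
            results.put((pod_name, state))  # only terminal states are reported back


pod_events, results = queue.Queue(), queue.Queue()
thread = threading.Thread(target=watcher, args=(pod_events, results), daemon=True)
thread.start()
for ev in [("extract-pod", "Running"), ("extract-pod", "Succeeded"), ("train-pod", "Failed"), None]:
    pod_events.put(ev)
thread.join()

# The scheduler side drains the result queue and updates task state
finished = {}
while not results.empty():
    pod, state = results.get()
    finished[pod] = state
print(finished)
```

Keeping the watch in its own thread means the scheduler loop never blocks on the Kubernetes API; it only polls a local queue.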
39. Fault Tolerance Scheduler
40. Fault Tolerance • Uses “resourceVersion” to re-create state • Maintains the latest resourceVersion in a SQL table for state recovery
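The recovery idea is to remember the last Kubernetes `resourceVersion` the watcher processed, so that after a restart the watch can resume from that point instead of starting cold. A sketch using `sqlite3` as a stand-in for Airflow's metadata database; the `watcher_checkpoint` table, its schema, and the helper functions are hypothetical.

```python
import sqlite3


def init(conn):
    """Create a one-row checkpoint table (hypothetical schema)."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS watcher_checkpoint ("
        " id INTEGER PRIMARY KEY CHECK (id = 1),"
        " resource_version TEXT NOT NULL)"
    )
    conn.commit()


def save_resource_version(conn, version):
    """Persist the newest resourceVersion after handling a watch event."""
    # resourceVersion is an opaque string in Kubernetes: store it, never do arithmetic on it
    conn.execute(
        "INSERT OR REPLACE INTO watcher_checkpoint (id, resource_version) VALUES (1, ?)",
        (version,),
    )
    conn.commit()


def load_resource_version(conn):
    """Return the last saved resourceVersion, or None if nothing was saved yet."""
    row = conn.execute("SELECT resource_version FROM watcher_checkpoint WHERE id = 1").fetchone()
    return row[0] if row else None


conn = sqlite3.connect(":memory:")  # stand-in; a real deployment would use the metadata DB
init(conn)
print(load_resource_version(conn))   # no checkpoint yet
for rv in ["1001", "1002", "1007"]:  # resourceVersions seen on watch events
    save_resource_version(conn, rv)
# After a restart, a new watch would be started from this version
resume_from = load_resource_version(conn)
print(resume_from)
```

If the stored version has become too old, the Kubernetes API answers the watch with `410 Gone`; the watcher then has to re-list the Pods and start a fresh checkpoint.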
41. DAG Propagation Scheduler DAGs DAGs
42. DAG Propagation Scheduler DAGs DAGs DAGs DAGs
43. DAG Injection • Three modes: git-init mode, persistent volume mode, and “pre-baked” mode (1.10.2) • Git-init and pre-baked modes are recommended for development and small Airflow instances, because they do not involve any distributed file systems • Persistent volume mode is recommended for large DAG folders
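The three modes are selected through the `[kubernetes]` section of `airflow.cfg`. The sketch below uses `configparser` to render one such section per mode. The key names (`git_repo`, `git_branch`, `git_subpath`, `dags_volume_claim`, `dags_in_image`) are as we recall them from Airflow 1.10.x and should be checked against the default config of your version; the repository URL and claim name are placeholders.

```python
import configparser
import io

# Assumed [kubernetes] keys (Airflow 1.10.x); all values are placeholders
MODES = {
    "git-init": {
        "git_repo": "https://github.com/example/dags.git",
        "git_branch": "master",
        "git_subpath": "dags",
    },
    "persistent-volume": {"dags_volume_claim": "airflow-dags"},
    "prebaked": {"dags_in_image": "True"},
}


def render(mode):
    """Render the [kubernetes] section for one DAG-injection mode."""
    cfg = configparser.ConfigParser()
    cfg["kubernetes"] = MODES[mode]
    buf = io.StringIO()
    cfg.write(buf)
    return buf.getvalue()


print(render("persistent-volume"))
```

Only one mode should be active per deployment; mixing, say, a volume claim with a pre-baked image leaves it ambiguous which DAG folder the workers actually read.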
44. AirflowOperator (k8s controller) • Simplifies Airflow deployment and management • Is a custom Kubernetes controller • Using CRDs, users create declarative specs describing their intent • AirflowBase • AirflowCluster
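A custom controller follows the usual Kubernetes reconcile pattern: compare the declared spec with what currently exists and emit the create/update/delete actions that close the gap. A toy sketch of that loop; the resource names and the flat `{name: version}` model are hypothetical simplifications of the objects the Airflow controller manages (StatefulSets, Services, ConfigMaps, Secrets).

```python
def reconcile(desired, observed):
    """Return actions that move `observed` toward `desired`.

    Both arguments map a child-resource name to its spec version.
    """
    actions = []
    for name, version in sorted(desired.items()):
        if name not in observed:
            actions.append(("create", name, version))
        elif observed[name] != version:
            actions.append(("update", name, version))
    for name in sorted(observed.keys() - desired.keys()):
        actions.append(("delete", name, observed[name]))
    return actions


desired = {"mc-cluster-scheduler": "1.10.1", "mc-cluster-ui": "1.10.1", "mc-cluster-worker": "1.10.1"}
observed = {"mc-cluster-scheduler": "1.10.0", "mc-cluster-ui": "1.10.1", "mc-cluster-flower": "1.10.0"}
for action in reconcile(desired, observed):
    print(action)
```

Because reconciling an already-converged state yields no actions, the controller can safely re-run the loop on every change or on a timer.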
45. AirflowBase CRD
• AirflowBase CRD
  - MySQL/Postgres/SQLProxy
  - NFS
• Used by multiple Airflow Clusters
[Diagram (slides 45-52): an AirflowBase client (kubectl) posts an AirflowBase to the k8s core API server. The Airflow Controller, working with the statefulset controller, endpoints controller, and dynamic provisioner, creates StatefulSets, Pods, Service, ConfigMap, and Secrets (Cluster, Backup, Restore). For mc-base this yields a MySQL StatefulSet (mysql.operator: False; pod1, pv1) and an NFS StatefulSet (pod1, pv1); for ck-base, a SQLProxy StatefulSet (pod1).]
apiVersion: airflow.k8s.io/v1alpha1
kind: AirflowBase
metadata:
  name: mc-base
spec:
  mysql:
    operator: False
  storage:
    version: ""

apiVersion: airflow.k8s.io/v1alpha1
kind: AirflowBase
metadata:
  name: ck-base
spec:
  sqlproxy:
    project: someproject
    region: us-central1
    instance: testsql-cluster
  storage:
    version: ""
53. AirflowCluster CRD
• Celery Executor
  - Redis
  - Airflow UI
  - Airflow Scheduler
  - Airflow Workers
• Each cluster gets its own unique SQL connection string (user:password/DB).
[Diagram: an AirflowCluster client (kubectl) posts an AirflowCluster to the k8s core API server. The Airflow Controller, working with the statefulset controller, endpoints controller, and dynamic provisioner, creates StatefulSets, Pods, Service, ConfigMap, and Secrets: StatefulSets for the Scheduler (Celery executor), Redis, Workers (pod1 … podN), and Airflow UI, with DAG PVs attached to the scheduler and worker pods.]
apiVersion: airflow.k8s.io/v1alpha1
kind: AirflowCluster
metadata:
  name: mc-cluster
spec:
  executor: Celery
  config:
    airflow:
      AIRFLOW_SOME_CONFIG: SomeValue
  redis:
    operator: False
  scheduler:
    version: "1.10.1"
  ui:
    replicas: 1
    version: "1.10.1"
  worker:
    replicas: 2
    version: "1.10.1"
  flower:
    replicas: 1
    version: "1.10.1"
  dags:
    subdir: "airflow/example_dags/"
    git:
      repo: "https://github.com/apache/incubator-airflow/"
      once: true
  airflowbase:
    name: mc-base
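A per-cluster SQL connection string implies that each AirflowCluster gets its own database user, password, and database on the shared AirflowBase. A sketch of deriving such credentials with the stdlib `secrets` module; the naming scheme, the `mysql://` URL form, the default port, and the `mc-base-sql` host are assumptions for illustration, not the operator's actual behavior.

```python
import secrets
from urllib.parse import quote


def cluster_sql_conn(cluster_name, base_host, port=3306):
    """Return (user, password, db, url) for one AirflowCluster (hypothetical scheme)."""
    ident = cluster_name.replace("-", "_")  # keep SQL identifiers free of hyphens
    user = f"{ident}_user"
    db = f"{ident}_db"
    password = secrets.token_urlsafe(16)  # random per cluster; keep it in a Kubernetes Secret
    url = f"mysql://{user}:{quote(password, safe='')}@{base_host}:{port}/{db}"
    return user, password, db, url


user, password, db, url = cluster_sql_conn("mc-cluster", "mc-base-sql")
print(user, db)
```

Separate users and databases keep one cluster's metadata invisible to another even though they share the same SQL server.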
54. AirflowCluster CRD
• AirflowCluster CRD with Local/K8s Executor
  - Airflow UI
  - Airflow Scheduler
[Diagram: an AirflowCluster client (kubectl) posts an AirflowCluster to the k8s core API server. The Airflow Controller creates StatefulSets, Pods, Service, ConfigMap, and Secrets: a Scheduler StatefulSet (pod1, running the Local/K8s executor, with a DAG PV) and an Airflow UI StatefulSet (pod1).]
apiVersion: airflow.k8s.io/v1alpha1
kind: AirflowCluster
metadata:
  name: mk-cluster
spec:
  executor: Kubernetes
  ui:
    replicas: 1
    version: "1.10.1"
  scheduler:
    version: "1.10.1"
  worker:
    version: "1.10.1"
  dags:
    subdir: "airflow/example_dags/"
    git:
      repo: "https://github.com/apache/incubator-airflow/"
      once: true
      branch: master
  airflowbase:
    name: mc-base
55. AirflowCluster CRD - Pod affinity rules
• cluster.Spec.Affinity.*.topology can be set to “kubernetes.io/hostname” to spread Pods across Nodes within a Zone
• Limits the impact of node failures within a zone
[Diagram: within Region > Zone1, the Pods of Statefulset 1 and Statefulset 2 (pod1 … podN, each with its own PV) are spread across the zone's Nodes.]
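In plain Kubernetes terms, spreading a StatefulSet's Pods across nodes is usually expressed as pod anti-affinity with `topologyKey: kubernetes.io/hostname`. The sketch below builds that stanza as a Python dict. The field names follow the Kubernetes Pod `affinity` API; the `spread_across_nodes` helper, the `app` label, and the choice of a preferred (soft) rule are assumptions, not necessarily what the operator generates.

```python
def spread_across_nodes(app_label, weight=100):
    """Soft anti-affinity: prefer not to co-locate Pods carrying the same app label."""
    return {
        "podAntiAffinity": {
            "preferredDuringSchedulingIgnoredDuringExecution": [
                {
                    "weight": weight,  # 1-100; higher means a stronger preference
                    "podAffinityTerm": {
                        "labelSelector": {"matchLabels": {"app": app_label}},
                        # hostname as the topology: prefer at most one such Pod per node;
                        # a zone label as the key would spread across zones instead
                        "topologyKey": "kubernetes.io/hostname",
                    },
                }
            ]
        }
    }


affinity = spread_across_nodes("airflow-worker")
term = affinity["podAntiAffinity"]["preferredDuringSchedulingIgnoredDuringExecution"][0]
print(term["podAffinityTerm"]["topologyKey"])
```

A soft rule still lets Pods schedule when there are fewer nodes than replicas; the `requiredDuringSchedulingIgnoredDuringExecution` variant would leave extra Pods pending instead.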
56. AirflowOperator
• Multiple DAG sources are supported via a DAG sidecar
• Custom Airflow Pod images are supported
[Diagram: on a k8s node (kubelet), each Airflow Pod runs a scheduler or worker container (the scheduler backed by MySQL) next to a DAG sidecar that fills a shared DAG volume from a GCS bucket (gcsfuse), S3 (sync), Git (git sync), or a PVC (rwx) on an NFS PV (rwx).]
DAG storage options:
1. PVC (rwx) backed by an NFS PV
2. gcsfuse + GCS bucket
3. DAGs pre-packaged in the container
4. Git-syncer that syncs DAGs from a git repo
5. S3 bucket sync that copies DAGs from S3 buckets
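The sidecar pattern boils down to a second container that keeps a shared DAG volume up to date while the scheduler or worker container only reads it. A stdlib sketch of one sync pass, with a local directory standing in for the Git/GCS/S3 source and another for the shared volume; the `sync_once` helper and its copy-if-changed rule are assumptions, and real syncers (git-sync, gcsfuse, an S3 sync) work differently.

```python
import filecmp
import shutil
import tempfile
from pathlib import Path


def sync_once(source, dag_volume):
    """Mirror *.py files from source into dag_volume; return the names that changed."""
    source, dag_volume = Path(source), Path(dag_volume)
    dag_volume.mkdir(parents=True, exist_ok=True)
    wanted = {p.name for p in source.glob("*.py")}
    changed = []
    for name in sorted(wanted):
        src, dst = source / name, dag_volume / name
        if not dst.exists() or not filecmp.cmp(src, dst, shallow=False):
            # write to a temp name, then rename, so the scheduler never parses a half-written file
            tmp = dst.with_suffix(".tmp")
            shutil.copyfile(src, tmp)
            tmp.replace(dst)
            changed.append(name)
    for stale in sorted(dag_volume.glob("*.py")):
        if stale.name not in wanted:
            stale.unlink()  # DAG was removed upstream
            changed.append(stale.name)
    return changed


# Demo: temporary directories stand in for the DAG source and the shared volume.
# A sidecar would call sync_once in a loop with a sleep between passes.
src = Path(tempfile.mkdtemp())
vol = Path(tempfile.mkdtemp())
(src / "etl.py").write_text("# DAG v1\n")
first = sync_once(src, vol)   # copies etl.py
second = sync_once(src, vol)  # nothing changed
print(first, second)
```

Splitting DAG delivery into a sidecar is what lets the same Airflow image work with any of the storage options listed above.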
57. Monitoring • Can use existing Kubernetes infrastructure • Only needed to think about Airflow, not machines
58. Prometheus
59. Elasticsearch
60. K8sExecutor Status • Has been released with Airflow 1.10 in experimental mode • Multiple companies already using in production • Helm chart in progress • AirflowOperator by end of 2018 • Active community in #sig-big-data on kubernetes.slack.com • Seeking beta testers, devs, and brave souls
61. Airflow Operator Status • Supports Airflow 1.10.1 • Available on Kubernetes Marketplace in GCP • Slack channels kubernetes.slack.com #sig-big-data #airflow-operator
62. Demo
63. Thank You Learn more: github.com/apache/incubator-airflow/ @danimberman github.com/GoogleCloudPlatform/airflow-operator @bharanis

相关幻灯片