Mars: Building a Large-Scale Tensor Computation System in Practice (斯文骏)

Razor

Published 2019/10/19 in the Technology category

Slide text
1. Mars: Building a Large-Scale Tensor Computation System in Practice. 斯文骏, Alibaba / Technical Expert
2. About Me
3. About Me. 2015.11-2017.6: PyODPS; 2017.6-2018.6: PyODPS and Mars (POC); 2018.6 to date: Mars
4. Agenda. What Mars is and why we built it; Mars's architecture and execution flow; preparing the graph; scheduling strategies; execution on workers; comparison and outlook
5. What is Mars. Mars provides APIs similar to Numpy / Scipy / Pandas / Scikit-Learn, so that Python users can write distributed code in the style they already know. Examples: from Numpy to Mars Tensor; from Pandas DataFrame to Mars DataFrame; from scikit-learn to Mars Learn
6. From Numpy to Mars Tensor

NumPy version:

    import numpy as np
    from scipy.special import erf

    def black_scholes(P, S, T, rate, vol):
        a = np.log(P / S)
        b = T * -rate
        z = T * (vol * vol * 2)
        c = 0.25 * z
        y = 1.0 / np.sqrt(z)
        w1 = (a - b + c) * y
        w2 = (a - b - c) * y
        d1 = 0.5 + 0.5 * erf(w1)
        d2 = 0.5 + 0.5 * erf(w2)
        Se = np.exp(b) * S
        call = P * d1 - Se * d2
        put = call - P + Se
        return call, put

    N = 50000000
    price = np.random.uniform(10.0, 50.0, N)
    strike = np.random.uniform(10.0, 50.0, N)
    t = np.random.uniform(1.0, 2.0, N)
    print(black_scholes(price, strike, t, 0.1, 0.2))

Mars version (only the imports and the final execute() call change):

    import mars.tensor as mt
    from mars.tensor.special import erf

    def black_scholes(P, S, T, rate, vol):
        a = mt.log(P / S)
        b = T * -rate
        z = T * (vol * vol * 2)
        c = 0.25 * z
        y = 1.0 / mt.sqrt(z)
        w1 = (a - b + c) * y
        w2 = (a - b - c) * y
        d1 = 0.5 + 0.5 * erf(w1)
        d2 = 0.5 + 0.5 * erf(w2)
        Se = mt.exp(b) * S
        call = P * d1 - Se * d2
        put = call - P + Se
        return call, put

    N = 50000000
    price = mt.random.uniform(10.0, 50.0, N)
    strike = mt.random.uniform(10.0, 50.0, N)
    t = mt.random.uniform(1.0, 2.0, N)
    print(mt.ExecutableTuple(black_scholes(price, strike, t, 0.1, 0.2)).execute())
7. From Pandas DataFrame to Mars DataFrame

Pandas version:

    import numpy as np
    import pandas as pd

    df = pd.DataFrame(np.random.rand(100000000, 4),
                      columns=list('abcd'))
    print(df.sum())

Mars version:

    import mars.tensor as mt
    import mars.dataframe as md

    df = md.DataFrame(mt.random.rand(100000000, 4),
                      columns=list('abcd'))
    print(df.sum().execute())
8. From scikit-learn to Mars Learn

scikit-learn version:

    from sklearn.datasets.samples_generator import make_blobs
    from sklearn.decomposition.pca import PCA

    X, y = make_blobs(
        n_samples=100000000, n_features=3,
        centers=[[3, 3, 3], [0, 0, 0], [1, 1, 1], [2, 2, 2]],
        cluster_std=[0.2, 0.1, 0.2, 0.2],
        random_state=9)
    pca = PCA(n_components=3)
    pca.fit(X)
    print(pca.explained_variance_ratio_)
    print(pca.explained_variance_)

Mars Learn version:

    from sklearn.datasets.samples_generator import make_blobs
    from mars.learn.decomposition.pca import PCA

    X, y = make_blobs(
        n_samples=100000000, n_features=3,
        centers=[[3, 3, 3], [0, 0, 0], [1, 1, 1], [2, 2, 2]],
        cluster_std=[0.2, 0.1, 0.2, 0.2],
        random_state=9)
    pca = PCA(n_components=3)
    pca.fit(X)
    print(pca.explained_variance_ratio_.execute())
    print(pca.explained_variance_.execute())
9. Why We Built Mars. Python's popularity; Numpy and Pandas are the de facto standard for data processing in Python; achieving higher execution efficiency requires a new framework
10. The Popularity of Python. (figure: TIOBE Index, October 2019)
11. Numpy and Pandas are the de facto standard for Python data processing. (figure: Google Trends, worldwide, 2014-2018, comparing pandas, NumPy, TensorFlow and Apache Spark)
12. Higher execution efficiency requires a new framework. In existing distributed frameworks such as Spark, an operation like matrix multiplication requires a shuffle, yet the shuffle is not strictly necessary; using finer-grained dependencies can improve efficiency significantly. (diagram: per-chunk dot and sum dependencies)
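To make the diagram concrete, here is a minimal NumPy sketch (illustration only, not Mars code) of a chunked matrix product: each output chunk depends only on one chunk-row of the left operand and one chunk-column of the right, so every task can run where its inputs live and no all-to-all shuffle is needed.

    import numpy as np

    # Each output chunk out[i, j] needs only A's row band i and B's
    # column band j; in a distributed setting each such task could run
    # on the worker that already holds most of these chunks.
    def chunked_dot(A, B, chunk=2):
        n = A.shape[0]
        out = np.zeros((n, n))
        for i in range(0, n, chunk):
            for j in range(0, n, chunk):
                for k in range(0, n, chunk):
                    out[i:i+chunk, j:j+chunk] += (
                        A[i:i+chunk, k:k+chunk] @ B[k:k+chunk, j:j+chunk])
        return out

    A = np.random.rand(4, 4)
    B = np.random.rand(4, 4)
    assert np.allclose(chunked_dot(A, B), A @ B)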
13. Mars's Architecture and Execution Flow. Mars's architecture; the actor model; execution on the scheduler (preparing the graph, scheduling strategies); execution on workers (computation and data storage)
14. The Actor Model. Mars is built on a simplified actor system we developed ourselves, which makes it straightforward to implement a distributed computation framework. (diagram: actors are addressed by references such as w:1:actor1; a call to an actor in the same process becomes a plain function call, a call into another process goes through IPC, and a call to another host, e.g. 192.168.0.2:12345, becomes a sync or async RPC routed through a per-host dispatch process)
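The slide does not spell out the actor system's API, so the following is a hypothetical single-process sketch of the pattern itself (names like Actor and CounterActor are illustrative, not Mars's classes): each actor owns a mailbox drained by one thread, so its state needs no locks; in the distributed setting, a call to an actor in another process or on another host would be routed through the dispatch process as IPC or RPC instead of a local queue.

    import queue
    import threading

    class Actor:
        def __init__(self):
            # one mailbox and one worker thread per actor: messages are
            # handled strictly one at a time, so state needs no locks
            self._mailbox = queue.Queue()
            threading.Thread(target=self._loop, daemon=True).start()

        def _loop(self):
            while True:
                method, args, reply = self._mailbox.get()
                reply.put(getattr(self, method)(*args))

        def send(self, method, *args):            # async call
            reply = queue.Queue(maxsize=1)
            self._mailbox.put((method, args, reply))
            return reply                           # future-like handle

        def call(self, method, *args):             # sync call
            return self.send(method, *args).get()

    class CounterActor(Actor):
        def __init__(self):
            super().__init__()
            self.value = 0

        def add(self, n):
            self.value += n
            return self.value

    counter = CounterActor()
    print(counter.call('add', 3))  # 3
    print(counter.call('add', 4))  # 7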
15. Mars's Architecture. (diagram: on the client side, the Tensor (Numpy), DataFrame (Pandas) and Learn (SKLearn) APIs plus expression optimization and a web frontend; a scheduler managing sessions, graphs, operands, assignment, resources and meta data; workers handling execution, quota, transfer, calculation and the storage layer; everything built on top of the actor system)
16. Preparing the Graph: from code to a coarse-grained graph

    In [1]: import mars.tensor as mt
    In [2]: import mars.dataframe as md
    In [3]: a = mt.ones((10, 10), chunk_size=5)
    In [4]: a[5, 5] = 8
    In [5]: df = md.DataFrame(a)
    In [6]: s = df.sum()
    In [7]: s.execute()
    Out[7]:
    0    10.0
    1    10.0
    2    10.0
    3    10.0
    4    10.0
    5    17.0
    6    10.0
    7    10.0
    8    10.0
    9    10.0
    dtype: float64

(diagram: on the client, each Tileable object, i.e. Tensor(a), DataFrame(df) and Series(s), holds a TileableData produced by an Operand; together they form the coarse-grained graph Ones -> IndexSetValue(indexes=(5, 5), value=8) -> FromTensor -> Sum)
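Note that nothing is computed while these expressions are built: each API call only records an operand into the coarse-grained graph, and execute() later triggers the whole pipeline. A hypothetical sketch of this deferred-execution idea (not Mars internals):

    # Each "API call" creates a node pointing at its inputs, so the
    # expression graph exists before anything runs.
    class Operand:
        def __init__(self, name, inputs=(), **params):
            self.name, self.inputs, self.params = name, list(inputs), params

        def execute(self):
            # walk inputs first, then run this node
            for i in self.inputs:
                i.execute()
            print('running', self.name, self.params)

    ones = Operand('Ones', shape=(10, 10), chunk_size=5)
    setv = Operand('IndexSetValue', [ones], indexes=(5, 5), value=8)
    frame = Operand('FromTensor', [setv])
    total = Operand('Sum', [frame])
    total.execute()   # nothing ran until this call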
17. Preparing the Graph: from the coarse-grained graph to a fine-grained graph. (diagram: the client serializes the coarse-grained graph and submits it through the web frontend to the scheduler, which tiles it; Ones becomes four TensorChunkData chunks (0, 0), (0, 1), (1, 0), (1, 1); IndexSetValue with indexes=(5, 5), value=8 is rewritten to indexes=(0, 0), value=8 on the single chunk that contains that element; each chunk then flows through its own FromTensor and Sum, and the partial results are combined by Concat and a final Sum per output chunk)
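The interesting detail is the index rewrite: with chunk_size=5, the global assignment a[5, 5] = 8 falls into chunk (1, 1) at local index (0, 0), which is why the tiled graph carries indexes=(0, 0), value=8. A small sketch of the assumed arithmetic:

    # Map a global element index to (chunk coordinate, index inside chunk).
    def tile_index(global_index, chunk_size):
        chunk_coord = tuple(i // chunk_size for i in global_index)
        local_index = tuple(i % chunk_size for i in global_index)
        return chunk_coord, local_index

    print(tile_index((5, 5), 5))   # ((1, 1), (0, 0))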
18. Preparing the Graph: optimizing the fine-grained graph (operator fusion). (diagram: on the scheduler, linear chains of chunk operands such as FromTensor -> Sum are fused into single Compose operands, so each fused chain runs as one task)
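A minimal sketch of the fusion idea (assumed semantics, not Mars's implementation): a linear chain of single-input chunk operands collapses into one Compose that applies the functions back to back, so the intermediate chunk never has to be materialized in shared storage.

    import numpy as np

    # Compose a linear chain of operands into a single task.
    def compose(*funcs):
        def fused(x):
            for f in funcs:
                x = f(x)
            return x
        return fused

    from_tensor = lambda chunk: chunk.astype(float)  # stand-in for FromTensor
    chunk_sum = lambda chunk: chunk.sum(axis=0)      # stand-in for Sum

    fused = compose(from_tensor, chunk_sum)
    print(fused(np.ones((5, 5), dtype=int)))         # one task instead of two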
19. Preparing the Graph: from the fine-grained graph to an executable graph. (diagram: the scheduler makes the fused graph executable and submits chunk tasks to processors on the workers; each processor fetches its inputs from shared storage or disk and runs the Compose operands as single units)
20. Scheduling: initial job assignment. With the amount of computation fixed, IO overhead is a major performance factor. One way to reduce IO: a sensible initial job assignment can significantly reduce data copying. (diagram: source chunks spread across Worker 1 and Worker 2)
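As a hedged illustration of why this matters, here is one simple assignment heuristic (not necessarily Mars's actual algorithm): contiguous blocks of source chunks go to the same worker, so a successor that combines neighbouring chunks finds most of its inputs locally instead of copying them over the network.

    # Spread source chunks over workers in contiguous blocks.
    def assign_initial(chunks, workers):
        per_worker = -(-len(chunks) // len(workers))   # ceil division
        return {c: workers[i // per_worker] for i, c in enumerate(chunks)}

    chunks = [(0, 0), (0, 1), (1, 0), (1, 1)]
    print(assign_initial(chunks, ['worker-1', 'worker-2']))
    # {(0, 0): 'worker-1', (0, 1): 'worker-1',
    #  (1, 0): 'worker-2', (1, 1): 'worker-2'}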
21. Scheduling: choosing successor jobs. Ideally, data produced by a predecessor job is consumed by its successors right away. Depth-first policy: among ready jobs, prefer the one with the greatest depth.
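A sketch of the bookkeeping this implies (assumed, not Mars's exact code): keep ready jobs in a priority queue keyed by depth and always pop the deepest, so a predecessor's output is consumed soon after it is produced instead of piling up in memory.

    import heapq

    class ReadyQueue:
        def __init__(self):
            self._heap = []

        def push(self, job, depth):
            heapq.heappush(self._heap, (-depth, job))  # max-heap on depth

        def pop(self):
            return heapq.heappop(self._heap)[1]

    q = ReadyQueue()
    q.push('ones_chunk', depth=0)
    q.push('sum_chunk', depth=2)
    q.push('from_tensor_chunk', depth=1)
    print(q.pop())  # 'sum_chunk': the deepest ready job runs first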
22. Scheduling: dispatching jobs. Worker-centric dispatch: the scheduler submits ready jobs to workers; each worker manages its own job queue and grabs a job to execute whenever it has free resources. Pros: the worker has the best view of its own resources, so latency is low. Cons: no global information, so jobs easily pile up on one worker. (diagram: OperandActor submits to AssignerActor, which assigns by data distribution to TaskQueueActor; ExecutorActor grabs queued jobs and runs them)
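A minimal sketch of the worker-centric queue under these assumptions: the worker owns its queue and pops a job only when it actually has a free slot, which keeps latency low but gives the scheduler no global picture.

    import collections

    class TaskQueue:
        def __init__(self, slots):
            self.slots, self.queue = slots, collections.deque()

        def submit(self, job):          # scheduler pushes ready jobs
            self.queue.append(job)

        def maybe_run(self):            # executor grabs one when idle
            if self.slots and self.queue:
                self.slots -= 1
                return self.queue.popleft()
            return None

    q = TaskQueue(slots=1)
    q.submit('job-a')
    q.submit('job-b')
    print(q.maybe_run())  # 'job-a'
    print(q.maybe_run())  # None: no free slot, 'job-b' waits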
23. Scheduling: dispatching jobs (continued). Scheduler-centric dispatch: the scheduler tracks each worker's remaining resources and is responsible for assigning and submitting jobs. Pros: it has global information and can schedule much larger workloads. Cons: its view of worker resources lags, so scheduling delays may appear (though this can be mitigated). (diagram: StatusActor periodically updates ResourceActor; OperandActor requests an allocation from AssignerActor and then asks ExecutionActor to submit)
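For contrast, a sketch of the scheduler-centric decision (assumed logic): the scheduler keeps a periodically refreshed, possibly slightly stale view of each worker's free resources and picks a worker that can fit the job.

    # Pick the worker with the most free CPU that also fits the job.
    def pick_worker(free_resources, cpu_need, mem_need):
        candidates = [(r['cpu'], w) for w, r in free_resources.items()
                      if r['cpu'] >= cpu_need and r['mem'] >= mem_need]
        if not candidates:
            return None        # job stays queued until resources free up
        return max(candidates)[1]

    free = {'worker-1': {'cpu': 2, 'mem': 4 << 30},
            'worker-2': {'cpu': 6, 'mem': 2 << 30}}
    print(pick_worker(free, cpu_need=1, mem_need=1 << 30))  # 'worker-2'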
24. Computation and Data Storage. A worker consists of a control process and several processes responsible for computation and IO, which exchange data through shared memory. (diagram: the control process hosts MemQuotaActor, ExecutionActor and DispatcherActor; a networking process hosts SenderActor and ReceiverActor, which talk to remote ends; a disk IO process hosts IORunnerActor; several CPU calc processes host CpuCalcActors and a CUDA calc process hosts CudaCalcActor; all processes share data via the Plasma Store)
25. Computation and Data Storage. From the executable graph, the worker estimates the memory a job needs and requests a memory quota from MemQuotaActor.
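A sketch of what such an estimate could look like (assumed accounting): the bytes needed for inputs plus the output are derived from chunk shapes and dtypes, and the job waits until the quota can be granted.

    import numpy as np

    # Estimate bytes for one chunk from its shape and dtype.
    def estimate_bytes(shape, dtype):
        return int(np.prod(shape)) * np.dtype(dtype).itemsize

    inputs = [((5, 5), 'float64'), ((5, 5), 'float64')]
    output = ((5,), 'float64')
    quota = sum(estimate_bytes(s, d) for s, d in inputs + [output])
    print(quota)  # 440 bytes for this toy job; real requests are per chunk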
26. Computation and Data Storage. Depending on whether the data is already in the Plasma Store, the worker loads inputs from disk or from other workers.
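A sketch of that load decision (assumed logic): prefer shared memory, then local disk, then a network transfer from the worker that owns the chunk.

    def locate_input(chunk_key, plasma_keys, disk_keys, owners):
        if chunk_key in plasma_keys:
            return 'shared memory'
        if chunk_key in disk_keys:
            return 'disk'
        return 'transfer from ' + owners[chunk_key]

    print(locate_input('chunk-0', plasma_keys={'chunk-1'},
                       disk_keys={'chunk-0'}, owners={}))  # 'disk'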
27. Computation and Data Storage. The computation runs with single-machine libraries and the result is stored back into the Plasma Store.
28. Process- and Worker-Level Failover. Because a Mars worker is multi-process, when a single process crashes, recovery only requires rebuilding the affected actors. If a whole worker fails, the Mars scheduler rebuilds lineage from the fine-grained graph and resubmits the failed jobs.
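A sketch of the lineage rebuild (assumed logic): walk the fine-grained graph backwards from the lost chunks and resubmit every ancestor whose data is also gone.

    # Collect everything that must be recomputed after losing `lost`.
    def chunks_to_recompute(lost, predecessors, available):
        to_run, stack = set(), list(lost)
        while stack:
            chunk = stack.pop()
            if chunk in to_run or chunk in available:
                continue
            to_run.add(chunk)
            stack.extend(predecessors.get(chunk, ()))
        return to_run

    preds = {'sum': ['from_tensor'], 'from_tensor': ['ones']}
    print(chunks_to_recompute(['sum'], preds, available={'from_tensor'}))
    # {'sum'}: 'from_tensor' still exists, so only the failed job reruns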
29. Comparison and Outlook. Comparing Mars, Dask and Spark; Mars vs. Dask performance; the future of Mars
30. Comparing Mars, Dask and Spark

| Item | Mars | Dask | Spark |
| --- | --- | --- | --- |
| API coverage | Numpy, Pandas, scikit-learn | Numpy, Pandas, scikit-learn, delayed | RDD, DataFrame, ML |
| Community | | ✓ | ✓ |
| Scheduling | Kubernetes | Kubernetes (basic), Yarn, HPC | Kubernetes, Yarn, Mesos |
| Fine-grained graph | ✓ | ✓ | |
| Runtime optimization (operator fusion) | ✓ | | |
| Mutable data | ✓ | | |
| Designed for multi-language support | ✓ | | ✓ |
| Fault tolerance | ✓ (worker-level) | ✓ | ✓ |
| Distributed master (no single point of failure) | ✓ | | |
31. Mars vs. Dask Performance. Distributed setup: 1 scheduler, 4 workers (24 CPU cores, 8GB RAM). When the data size grows further, Dask may crash. (bar chart: runtime in seconds for Dask and Mars across benchmarks including Black-Scholes, Cholesky, FFT, LU, Inv, Monte-Carlo Pi, QR and SVD)
32. The Future of Mars. Richer DataFrame, Learn and Tensor APIs. Stronger graph optimization: enhanced fuse operations, constant folding. Better ecosystem compatibility: support scheduling XGBoost and other open-source libraries. Actor-layer optimization for more efficient execution and networking. Roadmap and enhancement proposals: https://github.com/mars-project/mars/issues/537