# Mars: Practices in Building a Large-Scale Tensor Computation System (斯文骏)

#### Razor

Published 2019/10/19 in the Technology category

1. Mars: Practices in Building a Large-Scale Tensor Computation System. 斯文骏, Alibaba / Technical Expert
2. About me
3. About me: 2015.11-2017.6 PyODPS; 2017.6-2018.6 PyODPS and Mars (POC); 2018.6 to present: Mars
4. Agenda: what Mars is and why we built it; Mars's architecture and execution process; preparing the graph; scheduling strategies; execution on the worker side; comparison and outlook
5. What is Mars? Mars provides APIs similar to NumPy / SciPy / Pandas / scikit-learn, letting Python users write distributed code in a way they already know. Examples: from NumPy to Mars Tensor; from Pandas DataFrame to Mars DataFrame; from scikit-learn to Mars Learn.
6. From NumPy to Mars Tensor. With NumPy and SciPy:

```python
import numpy as np
from scipy.special import erf

def black_scholes(P, S, T, rate, vol):
    a = np.log(P / S)
    b = T * -rate
    z = T * (vol * vol * 2)
    c = 0.25 * z
    y = 1.0 / np.sqrt(z)
    w1 = (a - b + c) * y
    w2 = (a - b - c) * y
    d1 = 0.5 + 0.5 * erf(w1)
    d2 = 0.5 + 0.5 * erf(w2)
    Se = np.exp(b) * S
    call = P * d1 - Se * d2
    put = call - P + Se
    return call, put

N = 50000000
price = np.random.uniform(10.0, 50.0, N)
strike = np.random.uniform(10.0, 50.0, N)
t = np.random.uniform(1.0, 2.0, N)
print(black_scholes(price, strike, t, 0.1, 0.2))
```

The same code with Mars Tensor:

```python
import mars.tensor as mt
from mars.tensor.special import erf

def black_scholes(P, S, T, rate, vol):
    a = mt.log(P / S)
    b = T * -rate
    z = T * (vol * vol * 2)
    c = 0.25 * z
    y = 1.0 / mt.sqrt(z)
    w1 = (a - b + c) * y
    w2 = (a - b - c) * y
    d1 = 0.5 + 0.5 * erf(w1)
    d2 = 0.5 + 0.5 * erf(w2)
    Se = mt.exp(b) * S
    call = P * d1 - Se * d2
    put = call - P + Se
    return call, put

N = 50000000
price = mt.random.uniform(10.0, 50.0, N)
strike = mt.random.uniform(10.0, 50.0, N)
t = mt.random.uniform(1.0, 2.0, N)
print(mt.ExecutableTuple(black_scholes(price, strike, t, 0.1, 0.2)).execute())
```
7. From Pandas DataFrame to Mars DataFrame. With Pandas:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame(np.random.rand(100000000, 4),
                  columns=list('abcd'))
print(df.sum())
```

With Mars DataFrame:

```python
import mars.tensor as mt
import mars.dataframe as md

df = md.DataFrame(mt.random.rand(100000000, 4),
                  columns=list('abcd'))
print(df.sum().execute())
```
8. From scikit-learn to Mars Learn. With scikit-learn:

```python
from sklearn.datasets.samples_generator import make_blobs
from sklearn.decomposition.pca import PCA

X, y = make_blobs(
    n_samples=100000000, n_features=3,
    centers=[[3, 3, 3], [0, 0, 0], [1, 1, 1], [2, 2, 2]],
    cluster_std=[0.2, 0.1, 0.2, 0.2],
    random_state=9)
pca = PCA(n_components=3)
pca.fit(X)
print(pca.explained_variance_ratio_)
print(pca.explained_variance_)
```

With Mars Learn:

```python
from sklearn.datasets.samples_generator import make_blobs
from mars.learn.decomposition.pca import PCA

X, y = make_blobs(
    n_samples=100000000, n_features=3,
    centers=[[3, 3, 3], [0, 0, 0], [1, 1, 1], [2, 2, 2]],
    cluster_std=[0.2, 0.1, 0.2, 0.2],
    random_state=9)
pca = PCA(n_components=3)
pca.fit(X)
print(pca.explained_variance_ratio_.execute())
print(pca.explained_variance_.execute())
```
9. Why we built Mars: Python is popular; NumPy and Pandas are the de facto standard for data processing in Python; reaching higher execution efficiency requires a new framework.
10. Python is popular. [Figure: TIOBE Index, October 2019.]
11. NumPy and Pandas are the de facto standard for Python data processing. [Figure: Google Trends (worldwide), 2014-2018, comparing search interest in pandas, NumPy, TensorFlow and Apache Spark.]
12. Reaching higher execution efficiency requires a new framework. In existing distributed frameworks such as Spark, an operation like matrix multiplication requires a shuffle, yet the shuffle is not actually necessary: using finer-grained dependencies can improve efficiency significantly. [Figure: chunk-level dot and sum operands.]
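The claim above can be sketched in plain NumPy (illustrative only, not Mars internals): in a blocked matrix multiply, each output block depends only on one row band of A and one column band of B, so chunk-level dependencies suffice and no all-to-all shuffle of the data is required.

```python
import numpy as np

def blocked_matmul(A, B, chunk=2):
    """Multiply A @ B block by block, exposing fine-grained dependencies."""
    n, m = A.shape[0], B.shape[1]
    C = np.zeros((n, m))
    for i in range(0, n, chunk):
        for j in range(0, m, chunk):
            for k in range(0, A.shape[1], chunk):
                # fine-grained dependency: this output block reads only
                # one chunk of A and one chunk of B per step
                C[i:i+chunk, j:j+chunk] += (
                    A[i:i+chunk, k:k+chunk] @ B[k:k+chunk, j:j+chunk])
    return C

A = np.random.rand(6, 6)
B = np.random.rand(6, 6)
assert np.allclose(blocked_matmul(A, B), A @ B)
```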
13. Mars's architecture and execution process: the Mars architecture; the actor model; execution on the scheduler side (preparing the graph, scheduling strategies); execution on the worker side (computation and data storage).
14. Actor model. Mars is built on a simplified actor system developed in-house, which makes it convenient to implement a distributed computation framework. [Figure: actors communicate by plain function calls inside a process, by IPC between processes on one machine (routed through a dispatch process), and by RPC, synchronous or asynchronous, between machines, e.g. actor w:1:actor1 at 192.168.0.1:12345 calling w:2:actor1 at 192.168.0.2:12345.]
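The core idea can be sketched in a few lines (this is an illustrative toy, not Mars's actual actor API): each actor owns a mailbox and processes one message at a time on its own thread, so actor state needs no locking, and a synchronous call is just "enqueue, then wait for the reply".

```python
import queue
import threading

class Actor:
    """Toy actor: a mailbox plus a single thread draining it."""

    def __init__(self):
        self._mailbox = queue.Queue()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self):
        while True:
            msg, reply = self._mailbox.get()
            if msg is None:          # stop sentinel
                break
            reply.put(self.on_receive(msg))

    def send(self, msg):
        """Synchronous call: enqueue a message and wait for the result."""
        reply = queue.Queue()
        self._mailbox.put((msg, reply))
        return reply.get()

    def stop(self):
        self._mailbox.put((None, None))

class CounterActor(Actor):
    """Example actor holding private, lock-free state."""

    def __init__(self):
        self.count = 0
        super().__init__()

    def on_receive(self, msg):
        self.count += msg
        return self.count

counter = CounterActor()
assert counter.send(3) == 3     # messages are processed strictly in order
assert counter.send(4) == 7
counter.stop()
```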
15. Mars architecture. [Architecture diagram: client-side expression layers Tensor (NumPy), DataFrame (Pandas) and Learn (scikit-learn) with expression optimization; a web frontend; schedulers holding Session, Graph, Meta, Resource, Quota and Assigner components; workers handling Execution, Transfer, Operand, Calc and the storage layer; all built on the actor system.]
16. Preparing the graph: from code to the coarse-grained graph.

```python
In [1]: import mars.tensor as mt

In [2]: import mars.dataframe as md

In [3]: a = mt.ones((10, 10), chunk_size=5)

In [4]: a[5, 5] = 8

In [5]: df = md.DataFrame(a)

In [6]: s = df.sum()

In [7]: s.execute()
Out[7]:
0    10.0
1    10.0
2    10.0
3    10.0
4    10.0
5    17.0
6    10.0
7    10.0
8    10.0
9    10.0
dtype: float64
```

[Figure: on the client side this builds a coarse-grained graph of Tileables, each wrapping its TileableData and operand: TensorData (Ones) → TensorData (IndexSetValue, indexes (5, 5), value 8) → DataFrameData (FromTensor) → SeriesData (Sum).]
17. Preparing the graph: from the coarse-grained graph to the fine-grained graph. [Figure: the client serializes the coarse-grained graph and submits it to the scheduler via the web frontend; the scheduler then tiles it: Ones becomes four TensorChunkData chunks (0, 0), (0, 1), (1, 0) and (1, 1); IndexSetValue(indexes=(5, 5), value=8) is rewritten onto a single chunk with local indexes (0, 0); FromTensor and Sum run per chunk; Concat operands merge the partial sums into the final SeriesChunkData.]
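The tiling step above can be sketched as follows (a simplified illustration, not Mars's internal API): a coarse-grained operand over a (10, 10) tensor with chunk_size=5 splits into a 2 x 2 grid of chunk operands indexed (0, 0) through (1, 1).

```python
import itertools

def tile(shape, chunk_size):
    """Yield (chunk_index, chunk_shape) for every chunk of a tensor."""
    # per-dimension split sizes, e.g. 10 with chunk_size 5 -> [5, 5]
    splits = [
        [min(chunk_size, dim - start) for start in range(0, dim, chunk_size)]
        for dim in shape
    ]
    for idx in itertools.product(*(range(len(s)) for s in splits)):
        yield idx, tuple(splits[d][i] for d, i in enumerate(idx))

chunks = dict(tile((10, 10), 5))
assert len(chunks) == 4 and chunks[(1, 1)] == (5, 5)
```

Edge chunks simply get whatever remains, so shapes that do not divide evenly still tile cleanly.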
18. Preparing the graph: optimizing the fine-grained graph (operand fusion). [Figure: on the scheduler, each linear chain of chunk operands, such as FromTensor → Sum, is fused into a single Compose operand, so intermediate chunk data never needs to be materialized separately.]
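A minimal sketch of this fusion pass (illustrative, not Mars's implementation): walk the chunk graph and merge every run of nodes connected by single-in, single-out edges into one composed chain.

```python
def fuse_chains(graph):
    """graph: dict node -> list of successor nodes (a DAG).
    Returns a list of chains; each chain of length > 1 is fusable."""
    indeg = {n: 0 for n in graph}
    pred = {}
    for node, succs in graph.items():
        for s in succs:
            indeg[s] += 1
            pred[s] = node          # only meaningful when indeg[s] == 1
    chains = []
    for node in graph:
        # skip nodes that extend an existing chain
        if indeg[node] == 1 and len(graph[pred[node]]) == 1:
            continue
        chain = [node]
        # greedily follow single-in/single-out edges
        while len(graph[chain[-1]]) == 1:
            nxt = graph[chain[-1]][0]
            if indeg[nxt] != 1:
                break
            chain.append(nxt)
        chains.append(chain)
    return chains

# hypothetical chunk graph: two FromTensor -> Sum branches into one Concat -> Sum
g = {
    'from_tensor_0': ['sum_0'], 'sum_0': ['concat'],
    'from_tensor_1': ['sum_1'], 'sum_1': ['concat'],
    'concat': ['sum'], 'sum': [],
}
assert fuse_chains(g) == [
    ['from_tensor_0', 'sum_0'],
    ['from_tensor_1', 'sum_1'],
    ['concat', 'sum'],
]
```

Each returned chain maps onto one Compose operand in the optimized graph.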
19. Preparing the graph: from the fine-grained graph to the executable graph. [Figure: the scheduler makes each fused operand executable and submits it to worker processors; a processor fetches its input chunks from shared storage or disk, possibly from another worker, executes the Compose operand, and writes the result back to shared storage.]
20. Scheduling strategy: initial job assignment. When the amount of computation is fixed, IO overhead becomes a major performance factor. One way to reduce IO overhead: a well-chosen initial job assignment can significantly reduce data copying. [Figure: chunk placement across Worker 1 and Worker 2.]
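One way to make the idea concrete (a simplified sketch with hypothetical names, not Mars's actual assigner): pin all source chunks that feed the same downstream consumer onto one worker, so the consumer reads its inputs locally instead of copying them across the network.

```python
def assign_initial(consumers, workers):
    """consumers: dict consumer id -> list of source chunk ids it reads.
    Returns dict source chunk id -> worker."""
    assignment = {}
    for i, (consumer, deps) in enumerate(consumers.items()):
        worker = workers[i % len(workers)]       # spread consumers round-robin
        for src in deps:
            # keep the first placement if a source feeds several consumers
            assignment.setdefault(src, worker)
    return assignment

# hypothetical chunk graph: two partial sums, each reading two Ones chunks
consumers = {
    'sum_left':  ['ones_00', 'ones_10'],
    'sum_right': ['ones_01', 'ones_11'],
}
placement = assign_initial(consumers, ['worker1', 'worker2'])
# both inputs of each Sum land on the same worker: no cross-worker copies
assert placement['ones_00'] == placement['ones_10']
assert placement['ones_01'] == placement['ones_11']
```

With a naive per-chunk round-robin instead, each Sum would need to pull half of its inputs from the other worker.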