# Mars: A Parallel and Distributed Accelerator for NumPy and pandas (Qin Xuye)

#### CodeWarrior

Published 2019/10/11 in the Technology category

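1. Mars: A Parallel and Distributed Accelerator for NumPy and pandas. Qin Xuye
2. Contents
   • Mars in 30 seconds
   • Background and motivation
   • What Mars can do and how it does it
   • Performance and outlook
3. Part 1: Mars in 30 seconds
   • From NumPy to Mars tensor
   • From pandas to Mars DataFrame
   • From scikit-learn to Mars learn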
4. From NumPy to Mars tensor

   NumPy version (run time: 11.9 s; peak memory: 5479.47):

       import numpy as np
       from scipy.special import erf

       def black_scholes(P, S, T, rate, vol):
           a = np.log(P / S)
           b = T * -rate
           z = T * (vol * vol * 2)
           c = 0.25 * z
           y = 1.0 / np.sqrt(z)
           w1 = (a - b + c) * y
           w2 = (a - b - c) * y
           d1 = 0.5 + 0.5 * erf(w1)
           d2 = 0.5 + 0.5 * erf(w2)
           Se = np.exp(b) * S
           call = P * d1 - Se * d2
           put = call - P + Se
           return call, put

       N = 50000000
       price = np.random.uniform(10.0, 50.0, N)
       strike = np.random.uniform(10.0, 50.0, N)
       t = np.random.uniform(1.0, 2.0, N)
       print(black_scholes(price, strike, t, 0.1, 0.2))

   Mars tensor version (run time: 5.48 s; peak memory: 1647.85):

       import mars.tensor as mt
       from mars.tensor.special import erf

       def black_scholes(P, S, T, rate, vol):
           a = mt.log(P / S)
           b = T * -rate
           z = T * (vol * vol * 2)
           c = 0.25 * z
           y = 1.0 / mt.sqrt(z)
           w1 = (a - b + c) * y
           w2 = (a - b - c) * y
           d1 = 0.5 + 0.5 * erf(w1)
           d2 = 0.5 + 0.5 * erf(w2)
           Se = mt.exp(b) * S
           call = P * d1 - Se * d2
           put = call - P + Se
           return call, put

       N = 50000000
       price = mt.random.uniform(10.0, 50.0, N)
       strike = mt.random.uniform(10.0, 50.0, N)
       t = mt.random.uniform(1.0, 2.0, N)
       # Nothing is computed until execute() is called on the pair of results.
       print(mt.ExecutableTuple(black_scholes(price, strike, t, 0.1, 0.2)).execute())
5. From pandas to Mars DataFrame

   pandas version (run time: 18.7 s; peak memory: 3430.29):

       import numpy as np
       import pandas as pd

       df = pd.DataFrame(np.random.rand(100000000, 4),
                         columns=list('abcd'))
       print(df.sum())

   Mars DataFrame version (run time: 5.25 s; peak memory: 2007.92):

       import mars.tensor as mt
       import mars.dataframe as md

       df = md.DataFrame(mt.random.rand(100000000, 4),
                         columns=list('abcd'))
       print(df.sum().execute())
6. From scikit-learn to Mars learn

   scikit-learn version (run time: 19.1 s; peak memory: 7314.82):

       from sklearn.datasets.samples_generator import make_blobs
       from sklearn.decomposition.pca import PCA

       X, y = make_blobs(n_samples=100000000, n_features=3,
                         centers=[[3, 3, 3], [0, 0, 0], [1, 1, 1], [2, 2, 2]],
                         cluster_std=[0.2, 0.1, 0.2, 0.2],
                         random_state=9)
       pca = PCA(n_components=3)
       pca.fit(X)
       print(pca.explained_variance_ratio_)
       print(pca.explained_variance_)

   Mars learn version (run time: 12.8 s; peak memory: 3814.32):

       from sklearn.datasets.samples_generator import make_blobs
       from mars.learn.decomposition.pca import PCA

       X, y = make_blobs(n_samples=100000000, n_features=3,
                         centers=[[3, 3, 3], [0, 0, 0], [1, 1, 1], [2, 2, 2]],
                         cluster_std=[0.2, 0.1, 0.2, 0.2],
                         random_state=9)
       pca = PCA(n_components=3)
       pca.fit(X)
       print(pca.explained_variance_ratio_.execute())
       print(pca.explained_variance_.execute())
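7. Part 2: Background and motivation
   • Python keeps growing in popularity
   • AI is booming, and data processing is often the bottleneck in the machine learning lifecycle
   • The importance of NumPy and pandas
   • Habit is productivity: familiar tools carry no learning cost
   • Current problems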
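9. The machine learning lifecycle (diagram): data; data processing / data analysis, which often takes 80% of the time; feature engineering / model training; model deployment / maintenance / improvement. New data is fed to the trained model to produce predictions.
11. The ever-growing data science technology stack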
12. NumPy
    • ndarray: multidimensional arrays
    • Fast mathematical functions that operate on whole arrays (no loops needed)
    • Tools for reading and writing data on disk and for working with memory-mapped files
    • Linear algebra, random number generation, and Fourier transforms
    • The foundation of pandas, SciPy, scikit-learn, TensorFlow, and PyTorch

    Vectorized version. `a[:, np.newaxis, :]` has shape 1000x1x10, and the whole cell is more than 50 times faster than the loop version:

        In [76]: a = np.random.rand(1000, 10)

        In [77]: %%time
            ...: b = a[:, np.newaxis, :] - a
            ...: print(np.sqrt((b ** 2).sum(-1)))
            ...:
        [[0.         1.34092722 1.03167424 ... 1.28462032 1.10915735 1.30688143]
         [1.34092722 0.         1.54184418 ... 1.36835935 1.46841795 1.50831086]
         [1.03167424 1.54184418 0.         ... 1.18083129 0.86086453 1.16513362]
         ...
         [1.28462032 1.36835935 1.18083129 ... 0.         0.93105133 1.23865215]
         [1.10915735 1.46841795 0.86086453 ... 0.93105133 0.         1.19356203]
         [1.30688143 1.50831086 1.16513362 ... 1.23865215 1.19356203 0.        ]]
        CPU times: user 103 ms, sys: 75.9 ms, total: 179 ms
        Wall time: 193 ms

    Loop version:

        In [78]: %%time
            ...: from math import sqrt
            ...: b = np.empty((1000, 1000))
            ...: for i in range(1000):
            ...:     for j in range(1000):
            ...:         acc = 0.0
            ...:         for k in range(10):
            ...:             acc += (a[i, k] - a[j, k]) ** 2
            ...:         b[i, j] = sqrt(acc)
            ...: print(b)
            ...:
        [[0.         1.34092722 1.03167424 ... 1.28462032 1.10915735 1.30688143]
         [1.34092722 0.         1.54184418 ... 1.36835935 1.46841795 1.50831086]
         [1.03167424 1.54184418 0.         ... 1.18083129 0.86086453 1.16513362]
         ...
         [1.28462032 1.36835935 1.18083129 ... 0.         0.93105133 1.23865215]
         [1.10915735 1.46841795 0.86086453 ... 0.93105133 0.         1.19356203]
         [1.30688143 1.50831086 1.16513362 ... 1.23865215 1.19356203 0.        ]]
        CPU times: user 10 s, sys: 29.5 ms, total: 10 s
        Wall time: 10.1 s
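The two cells on this slide compute the same pairwise Euclidean distance matrix. The following is a minimal sketch that checks this on a smaller input; the seed, the 200 x 10 size, and the variable names are assumptions chosen so the loop finishes quickly, not values from the talk.

```python
from math import sqrt

import numpy as np

rng = np.random.default_rng(0)   # fixed seed, chosen only for reproducibility
a = rng.random((200, 10))        # smaller than the slide's 1000 x 10

# Explicit triple loop, mirroring the slow cell.
n, d = a.shape
b_loop = np.empty((n, n))
for i in range(n):
    for j in range(n):
        acc = 0.0
        for k in range(d):
            acc += (a[i, k] - a[j, k]) ** 2
        b_loop[i, j] = sqrt(acc)

# Broadcasting: (n, 1, d) - (n, d) -> (n, n, d), then reduce over the last axis.
diff = a[:, np.newaxis, :] - a
b_vec = np.sqrt((diff ** 2).sum(-1))

print(diff.shape)
print(np.allclose(b_loop, b_vec))
```

The broadcast version materializes an intermediate of shape (n, n, d), which is why memory becomes the limiting factor as n grows.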
13. NumPy is the cornerstone and the standard of data science
    • ndarray/tensor: CuPy, TensorFlow, PyTorch, JAX, and others
    • It defines a standard interface (the NumPy-ish API)
    • NEP-18: the __array_function__ protocol. Any library that implements this protocol can be called through NumPy

    NumPy:

        In [1]: import numpy as np

        In [3]: %%time
           ...: a = np.random.rand(5000, 10)
           ...: _ = ((a[:, np.newaxis, :] - a) ** 2).sum(axis=-1)
           ...:
        CPU times: user 2.97 s, sys: 923 ms, total: 3.9 s
        Wall time: 2.77 s

    CuPy, by swapping the import:

        In [2]: import cupy as cp

        In [4]: %%time
           ...: a = cp.random.rand(5000, 10)
           ...: _ = ((a[:, cp.newaxis, :] - a) ** 2).sum(axis=-1)
           ...:
        CPU times: user 373 ms, sys: 163 ms, total: 536 ms
        Wall time: 538 ms

    CuPy, called directly through NumPy:

        $ NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1 ipython
        Python 3.7.3 (default, Mar 27 2019, 16:54:48)
        Type 'copyright', 'credits' or 'license' for more information
        IPython 7.4.0 -- An enhanced Interactive Python. Type '?' for help.

        In [1]: import numpy as np

        In [2]: import cupy as cp

        In [3]: %%time
           ...: a = cp.random.rand(50, 5)
           ...: np.linalg.svd(a)
           ...:
        CPU times: user 416 ms, sys: 249 ms, total: 665 ms
        Wall time: 670 ms
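To make the NEP-18 bullet concrete, here is a minimal sketch of a class that opts into the `__array_function__` protocol. `WrappedArray` is a hypothetical name invented for this illustration; it is not part of NumPy, CuPy, or Mars, and it simply forwards to NumPy and re-wraps the result. The sketch assumes NumPy 1.17 or later, where the protocol is enabled by default.

```python
import numpy as np


class WrappedArray:
    """Hypothetical array-like type used only to illustrate NEP-18."""

    def __init__(self, data):
        self.data = np.asarray(data)

    def __array_function__(self, func, types, args, kwargs):
        # NumPy hands over the public function that was called (for example
        # np.sum). Unwrap our own type, run the function on plain ndarrays,
        # and wrap the result again. Only positional arguments are unwrapped.
        unwrapped = [a.data if isinstance(a, WrappedArray) else a for a in args]
        return WrappedArray(func(*unwrapped, **kwargs))


x = WrappedArray([[1.0, 2.0], [3.0, 4.0]])
total = np.sum(x)     # dispatched to WrappedArray.__array_function__
mean = np.mean(x)

print(type(total).__name__, float(total.data), float(mean.data))
```

A library like CuPy would run its own GPU kernel at the point where this sketch forwards to NumPy; the dispatch mechanism is the same.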
14. pandas
    • DataFrame: tabular data
    • Series: a one-dimensional set of data
    • Index: the index column; both DataFrame and Series carry one
    • Powerful data analysis, processing, and visualization capabilities
    • Built on NumPy
    • A de facto standard: GPU implementations (RAPIDS cuDF) and others follow it

    pandas:

        In [6]: %%time
           ...: import pandas as pd
           ...: ratings = pd.read_csv('ml-20m/ratings.csv')
           ...: ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']})
           ...:
        CPU times: user 10.5 s, sys: 1.58 s, total: 12.1 s
        Wall time: 18 s

    cuDF:

        In [7]: %%time
           ...: import cudf
           ...: ratings = cudf.read_csv('ml-20m/ratings.csv')
           ...: ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']})
           ...:
        CPU times: user 1.2 s, sys: 409 ms, total: 1.61 s
        Wall time: 1.66 s
15. Data Engineer Data Scientist
16. The pandas, NumPy, and scikit-learn interfaces have already proven to be usable and useful. Why should anyone have to learn a reinvented set of tools?
17. Part 3: What Mars can do and how it does it
    • Mars is a parallel and distributed accelerator for NumPy and pandas
    • How Mars parallelizes and distributes work automatically
18. Mars tensor: a parallel and distributed NumPy that implements 70% of the commonly used NumPy interfaces
    • Tensor creation: ones, empty, zeros, ones_like, …
    • Basic manipulations: astype, transpose, broadcast_to, …
    • Indexing: Slice, Boolean indexing, Fancy indexing, newaxis, Ellipsis
    • Random sampling: rand, randint, beta, binomial, …
    • Aggregation: sum, nansum, max, all, mean, …
    • Discrete Fourier transform
    • Linear Algebra: QR, SVD, Cholesky, inv, norm, …
19. Lazy execution and eager mode

    Lazy execution (the default). Nothing is computed until execute() is called:

        In [13]: a = mt.random.rand(10000, 10000)

        In [14]: b = (a + 1) * 2 - 10

        In [15]: b
        Out[15]: Tensor

        In [18]: c = b.sum()

        In [19]: c.execute()
        Out[19]: -699994492.9534674
        CPU times: user 2.98 s, sys: 547 ms, total: 3.52 s
        Wall time: 847 ms
        In [5] used 254.8555 MiB RAM in 0.95s, peaked 469.05 MiB above current, total RAM usage 362.24 MiB

    With eager mode turned on, every step runs immediately:

        In [20]: from mars.config import options

        In [21]: options.eager_mode = True

        In [22]: a = mt.random.rand(10000, 10000)

        In [23]: b = (a + 1) * 2 - 10

        In [29]: b
        Out[29]: Tensor

        In [25]: c = b.sum()

        In [26]: c
        Out[26]: Tensor
        CPU times: user 3.46 s, sys: 1.95 s, total: 5.42 s
        Wall time: 1.28 s
        In [6] used 3056.3398 MiB RAM in 1.39s, peaked 0.00 MiB above current, total RAM usage 3164.35 MiB
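The difference between the two modes can be sketched with a toy deferred array. `LazyTensor` and `lazy_ones` are hypothetical names for this illustration and say nothing about how Mars is implemented; the sketch only shows that arithmetic can build a recipe first and compute when `execute()` is called.

```python
import numpy as np


class LazyTensor:
    """Hypothetical deferred array: it stores a recipe (a thunk), not data."""

    def __init__(self, thunk):
        self._thunk = thunk

    def __add__(self, other):
        return LazyTensor(lambda: self._thunk() + other)

    def __sub__(self, other):
        return LazyTensor(lambda: self._thunk() - other)

    def __mul__(self, other):
        return LazyTensor(lambda: self._thunk() * other)

    def sum(self):
        return LazyTensor(lambda: self._thunk().sum())

    def execute(self):
        # Only here is any array allocated or any arithmetic performed.
        return self._thunk()


def lazy_ones(shape):
    return LazyTensor(lambda: np.ones(shape))


a = lazy_ones((100, 100))
c = ((a + 1) * 2 - 10).sum()   # builds the recipe; nothing is computed yet
result = c.execute()           # each element is (1 + 1) * 2 - 10 = -6
print(result)
```

Because a deferred system sees the whole expression before running it, it is free to fuse or chunk the work, which is consistent with the lower peak memory reported for the lazy run on the slide.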
20. Mars and NEP-18: keeping up with the standard
    • Mars implements the __array_function__ interface
    • Combined with eager mode, even the import swap becomes unnecessary

        In [6]: np.sum(a)
        Out[6]: Tensor

        In [7]: np.inner(a, a)
        Out[7]: Tensor

        In [9]: np.linalg.qr(a[:4, None])
        Out[9]: (Tensor, Tensor)
21. Staying compatible with NumPy behavior is very hard

    View: writing to a slice also changes the original tensor.

        In [8]: a = mt.ones((5, 8), dtype=int)

        In [9]: b = a[:3, :2]

        In [10]: b[0, 0] = 2

        In [11]: b.execute()
        Out[11]:
        array([[2, 1],
               [1, 1],
               [1, 1]])

        In [12]: a.execute()
        Out[12]:
        array([[2, 1, 1, 1, 1, 1, 1, 1],
               [1, 1, 1, 1, 1, 1, 1, 1],
               [1, 1, 1, 1, 1, 1, 1, 1],
               [1, 1, 1, 1, 1, 1, 1, 1],
               [1, 1, 1, 1, 1, 1, 1, 1]])

    Copy: the original tensor is left untouched.

        In [2]: a = mt.ones((5, 8), dtype=int)

        In [3]: b = a[:3, :2].copy()

        In [4]: b[0, 0] = 2

        In [5]: b.execute()
        Out[5]:
        array([[2, 1],
               [1, 1],
               [1, 1]])

        In [6]: a.execute()
        Out[6]:
        array([[1, 1, 1, 1, 1, 1, 1, 1],
               [1, 1, 1, 1, 1, 1, 1, 1],
               [1, 1, 1, 1, 1, 1, 1, 1],
               [1, 1, 1, 1, 1, 1, 1, 1],
               [1, 1, 1, 1, 1, 1, 1, 1]])
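For reference, this is the plain NumPy behavior that the slide says Mars has to reproduce. The sketch uses only NumPy; the variable names are chosen for this illustration.

```python
import numpy as np

# Basic slicing returns a view: writes go through to the parent array.
a = np.ones((5, 8), dtype=int)
b = a[:3, :2]
b[0, 0] = 2
view_changed_parent = bool(a[0, 0] == 2)

# An explicit copy owns its data: the parent array is unchanged.
a2 = np.ones((5, 8), dtype=int)
b2 = a2[:3, :2].copy()
b2[0, 0] = 2
copy_changed_parent = bool(a2[0, 0] == 2)

print(view_changed_parent, copy_changed_parent)
print(np.shares_memory(a, b), np.shares_memory(a2, b2))
```

In a chunked, distributed tensor there is no single shared buffer, so view semantics like these have to be tracked explicitly rather than falling out of the memory layout.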
22. Single-machine parallelism

    NumPy:

        In [5]: import numpy as np

        In [6]: a = np.random.rand(5000, 10)

        In [7]: %%time
           ...: b = a[:, np.newaxis, :] - a
           ...: print(np.sqrt(b ** 2).sum(-1))
           ...:
        [[0.         2.82510037 4.46837739 ... 3.27907163 3.4719372  3.37709946]
         [2.82510037 0.         3.32650516 ... 2.53584057 3.22452394 2.45815415]
         [4.46837739 3.32650516 0.         ... 3.08332206 2.3909553  3.10370547]
         ...
         [3.27907163 2.53584057 3.08332206 ... 0.         2.21482239 3.39591248]
         [3.4719372  3.22452394 2.3909553  ... 2.21482239 0.         2.76933364]
         [3.37709946 2.45815415 3.10370547 ... 3.39591248 2.76933364 0.        ]]
        CPU times: user 3.8 s, sys: 2.72 s, total: 6.52 s
        Wall time: 3.44 s

    Mars, by swapping the import. If chunk_size is omitted, Mars computes it automatically; chunk_size has a large effect on performance. Lazy execution leaves room for more optimization.

        In [8]: import mars.tensor as mt

        In [12]: ma = mt.array(a, chunk_size=2000)

        In [13]: %%time
            ...: b = ma[:, mt.newaxis, :] - ma
            ...: print(mt.sqrt(b ** 2).sum(-1).execute())
            ...:
        [[0.         2.82510037 4.46837739 ... 3.27907163 3.4719372  3.37709946]
         [2.82510037 0.         3.32650516 ... 2.53584057 3.22452394 2.45815415]
         [4.46837739 3.32650516 0.         ... 3.08332206 2.3909553  3.10370547]
         ...
         [3.27907163 2.53584057 3.08332206 ... 0.         2.21482239 3.39591248]
         [3.4719372  3.22452394 2.3909553  ... 2.21482239 0.         2.76933364]
         [3.37709946 2.45815415 3.10370547 ... 3.39591248 2.76933364 0.        ]]
        CPU times: user 6.92 s, sys: 1.42 s, total: 8.34 s
        Wall time: 1.95 s

    Mars uses multiple cores automatically, an improvement of about 40%.
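23. Distributed: the number of points grows to 500,000

        import mars.tensor as mt

        a = mt.random.rand(500000, 10, chunk_size=10000)
        b = a[:, mt.newaxis, :] - a
        mt.triu(mt.sqrt((b ** 2).sum(axis=-1)))

    • The final result needs 1.82T of storage
    • The workload is 10,000 times that of the 5,000-point case. Single-machine NumPy is estimated to need 5.42 hours, and only if 18.2T of memory is available
    • Mars finishes in 7 minutes on a distributed cluster of 37 workers (8 cores, 32G each)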
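The figures on this slide can be reproduced with a few lines of arithmetic. This sketch assumes float64 elements (8 bytes) and reads "T" as 2**40 bytes; neither assumption is stated on the slide. The 1.95 s figure is the Mars wall time from the single-machine slide, and the estimate of 5.42 hours appears to correspond to scaling that time by 10,000.

```python
n, d = 500_000, 10
itemsize = 8                 # bytes per float64 element (assumption)
TIB = 2 ** 40                # "T" read as tebibytes (assumption)

# The final distance matrix has shape (n, n).
result_tib = n * n * itemsize / TIB

# The broadcast difference b has shape (n, n, d).
intermediate_tib = n * n * d * itemsize / TIB

# Pairwise work grows with the square of the number of points.
scale = (n // 5000) ** 2

# Scaling the 1.95 s wall time from the 5,000-point run.
hours = scale * 1.95 / 3600

print(round(result_tib, 2), round(intermediate_tib, 1), scale, round(hours, 2))
```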
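24. Extending what NumPy can do

    Sparse
    • 😁 Much real-world data is stored in sparse form
    • 😭 Only one- and two-dimensional sparse tensors are supported for now

        import mars.tensor as mt

        a = mt.eye(1000, sparse=True, gpu=True)
        (a + 1).sum(axis=1)

    GPU
    • 😁 Use the GPU to accelerate some workloads, including sparse data
    • 😭 Distributed support is in progress

        import mars.tensor as mt

        a = mt.random.rand(1000, 2000, gpu=True)
        (a + 1).sum(axis=1)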
25. Mutable tensor: reading and writing a mutable tensor

    Create a mutable tensor:

        a = mt.mutable_tensor('a', shape=(10000, 10000), chunk_size=2000, dtype=float, fill_value=0.5)

    Get an existing mutable tensor:

        a = mt.mutable_tensor('a')

    Write data to a mutable tensor (writes can run in parallel):

        a[100:300, 200:500] = np.random.rand(200, 300)

    Read data from a mutable tensor:

        a[100:300, 200:500]

    Convert it to an ordinary tensor:

        tensor = a.seal()
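26. Mars learn: a parallel and distributed scikit-learn
    • A considerable share of scikit-learn's algorithms are implemented with NumPy
    • Porting mars.learn.decomposition.PCA and mars.learn.decomposition.TruncatedSVD from scikit-learn was very simple. Less than 1% of the code needed substantive changes, yet the ported versions gain the advantages of Mars tensor
27. Mars DataFrame: a parallel and distributed pandas
    • Implemented interfaces
      • Creating a DataFrame: DataFrame, from_records
      • Basic arithmetic
      • Math
      • Indexing: iloc, column selection, set_index
      • Reduction: aggregation
      • Groupby (in progress): grouped aggregation
      • merge/join
    • Roadmap: https://github.com/mars-project/mars/issues/495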
28. Staying compatible with pandas behavior is just as hard: automatic index alignment

        In [3]: import mars.dataframe as md

        In [4]: df = md.DataFrame({'a': [1, 2, 3], 'b': [3, 4, 5]}, index=[0, 1, 1])

        In [10]: df.execute()
        Out[10]:
           a  b
        0  1  3
        1  2  4
        1  3  5

        In [6]: df2 = md.DataFrame({'a': [2, 3, 4], 'c': [4, 5, 6]}, index=[1, 0, 1])

        In [11]: df2.execute()
        Out[11]:
           a  c
        1  2  4
        0  3  5
        1  4  6

        In [9]: (df + df2).execute()
        Out[9]:
           a   b   c
        0  4 NaN NaN
        1  4 NaN NaN
        1  6 NaN NaN
        1  5 NaN NaN
        1  7 NaN NaN
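The same alignment rule can be observed in plain pandas, which is the behavior Mars DataFrame aims to match. This is a small sketch using only pandas; the checks avoid depending on row order among the duplicated labels.

```python
import pandas as pd

df = pd.DataFrame({'a': [1, 2, 3], 'b': [3, 4, 5]}, index=[0, 1, 1])
df2 = pd.DataFrame({'a': [2, 3, 4], 'c': [4, 5, 6]}, index=[1, 0, 1])

# Rows are matched by index label, not by position. Label 1 appears twice in
# each frame, so it yields 2 x 2 = 4 rows; label 0 yields one row.
# Columns 'b' and 'c' exist in only one frame each, so they become NaN.
total = df + df2
print(total)

a_values = sorted(total['a'].tolist())
```

Label 0 gives 1 + 3 = 4, and label 1 gives every pairing of {2, 3} with {2, 4}: 4, 6, 5, and 7. A distributed implementation has to reproduce this even when rows with the same label live in different chunks.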
29. What happens when Mars tensor, mutable tensor, and DataFrame are put together?
30. ① An external system, such as a graph computing system, produces data and stores it directly in Mars as a tensor (the data producer):

        import mars.tensor as mt

        # perform computation
        # ...

        if rank == 0:
            # create mutable tensor on master or first worker
            mtensor = mt.mutable_tensor(name='relation', shape=(1000, 1000),
                                        chunk_size=500, fill_value=1)
        else:
            mtensor = mt.mutable_tensor(name='relation')

        # write data into mutable tensor for each worker
        mtensor[:400, :300] = array1
        mtensor[500:, 500:] = array2

        if rank == 0:
            tensor = mtensor.seal()

    ② Mars reads an external data source, such as a big-data store, and combines it with the data written to Mars earlier:

        import mars.dataframe as md
        from odps import ODPS

        o = ODPS('access id', 'access key', 'project')
        df = md.DataFrame(tensor, columns=['id' + str(i) for i in range(1000)])
        # read data from MaxCompute
        df2 = o.to_mars_dataframe('table_name', partition='ds=xxxxxxxx')
        # merge data
        df3 = df2.merge(df, left_on='id', right_on='id0')

    ③ An external system, such as TensorFlow, reads the Mars data directly for training (the data consumer):

        import tensorflow as tf

        # fetch batch data
        data = df3.iloc[100:200].execute()
        a = tf.placeholder(tf.float64, shape=(100, 1000), name='a')
        b = tf.reduce_sum(a + 1, axis=1)
        sess = tf.Session()
        sess.run(b, feed_dict={a: data})
31. How does Mars achieve parallel and distributed execution? Let's look at the design philosophy behind Mars
32. Philosophy 1: divide and conquer

        In [1]: import mars.tensor as mt

        In [2]: import mars.dataframe as md

        In [3]: a = mt.ones((10, 10), chunk_size=5)

        In [4]: a[5, 5] = 8

        In [5]: df = md.DataFrame(a)

        In [6]: s = df.sum()

        In [7]: s.execute()
        Out[7]:
        0    10.0
        1    10.0
        2    10.0
        3    10.0
        4    10.0
        5    17.0
        6    10.0
        7    10.0
        8    10.0
        9    10.0
        dtype: float64

    Coarse-grained graph (diagram): Ones → TensorData → IndexSetValue (indexes: (5, 5), value: 8) → TensorData, the data of Tensor(a) → FromTensor → DataFrameData, the data of DataFrame(df) → Sum → SeriesData, the data of Series(s). Legend: Tileable, data, TileableData, Operand.
33. Coarse-grained graph and fine-grained graph (diagram). Tile turns the coarse-grained graph (Ones → TensorData → IndexSetValue with indexes: (5, 5), value: 8 → TensorData → FromTensor → DataFrameData → Sum → SeriesData) into a fine-grained graph of chunks: four Ones operands produce TensorChunkData (0, 0), (1, 0), (0, 1), and (1, 1); IndexSetValue becomes a chunk-level operand with indexes: (0, 0), value: 8; each FromTensor produces a DataFrameChunkData; per-chunk Sum operands are combined through Concat and a final Sum into SeriesChunkData.
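The tiling step in the diagram can be imitated by hand with NumPy. This is a sketch of the divide-and-conquer idea only, with variable names invented for this illustration; it is not Mars code and it ignores scheduling entirely.

```python
import numpy as np

chunk_size = 5
shape = (10, 10)
n_row_blocks = shape[0] // chunk_size
n_col_blocks = shape[1] // chunk_size

# Tile: one 5 x 5 block of ones per chunk index (row_block, col_block).
chunks = {(i, j): np.ones((chunk_size, chunk_size))
          for i in range(n_row_blocks)
          for j in range(n_col_blocks)}

# a[5, 5] = 8 lands in chunk (1, 1) at local position (0, 0),
# matching the chunk-level IndexSetValue in the diagram.
row, col = 5, 5
chunk_key = (row // chunk_size, col // chunk_size)
local_index = (row % chunk_size, col % chunk_size)
chunks[chunk_key][local_index] = 8

# Map: column sums inside each chunk.
partial = {key: block.sum(axis=0) for key, block in chunks.items()}

# Combine: stack the partial sums that share a column block, sum them,
# then concatenate the column blocks into the final result.
col_blocks = []
for j in range(n_col_blocks):
    stacked = np.stack([partial[(i, j)] for i in range(n_row_blocks)])
    col_blocks.append(stacked.sum(axis=0))
result = np.concatenate(col_blocks)

print(chunk_key, local_index)
print(result)
```

Each per-chunk sum is independent of the others, which is what lets a scheduler place them on different cores or workers.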
34. Fine-grained graph before and after operator fusion (diagram). Before fusion, each chunk flows through separate Ones, IndexSetValue (indexes: (0, 0), value: 8), FromTensor, and Sum operands, producing TensorChunkData and DataFrameChunkData along the way, and the partial results are joined by Concat and Sum into SeriesChunkData. After Fuse, each chain of chunk-level operands is replaced by a single Compose operand that outputs a DataFrameChunkData or SeriesChunkData.
35. Philosophy 2: standing on the shoulders of giants

    |                               | Tensor                     | DataFrame         |
    |-------------------------------|----------------------------|-------------------|
    | Operator implementation (CPU) | NumPy                      | pandas            |
    | Operator implementation (GPU) | CuPy                       | cuDF              |
    | CPU operator fusion           | Numexpr, JAX (in progress) | Not yet supported |
    | GPU operator fusion           | CuPy                       | Not yet supported |

    NumPy:

        In [13]: import numpy as np

        In [14]: a = np.random.rand(1000, 1000)

        In [15]: %time ((a + 1) * 2 - 10).sum()
        CPU times: user 7.63 ms, sys: 9.59 ms, total: 17.2 ms
        Wall time: 31.7 ms
        Out[15]: -7001585.663330172

    Numexpr. Mars generates the expression string automatically when it fuses operators:

        In [16]: import numexpr as ne

        In [17]: %time ne.evaluate('sum((a + 1) * 2 - 10)')
        CPU times: user 4.31 ms, sys: 1.1 ms, total: 5.41 ms
        Wall time: 4.4 ms
        Out[17]: array(-7001585.66332956)

    JAX:

        In [18]: import jax

        In [20]: import jax.numpy as jnp

        In [21]: @jax.jit
            ...: def _temp(x):
            ...:     return jnp.sum((x + 1) * 2 - 10)
            ...:

        In [23]: %time _temp(a)
        CPU times: user 1.64 ms, sys: 1.43 ms, total: 3.07 ms
        Wall time: 2.37 ms
        Out[23]: DeviceArray(-7001603., dtype=float32)
36. Client and server

    Client:

        import mars.tensor as mt
        import mars.dataframe as md
        from mars.session import new_session

        new_session('http://web:12345').as_default()

        # rand takes the dimensions as separate arguments
        a = mt.random.rand(10, 10, chunk_size=5)
        df = md.DataFrame(a)
        print(df.sum().execute())

    Server (diagram): the client serializes the coarse-grained graph (Rand → TensorData → FromTensor → DataFrameData → Sum → SeriesData) and submits it through the REST API. The server deserializes it, tiles it into a chunk-level graph (Rand, FromTensor, Sum, and Concat over TensorChunkData, DataFrameChunkData, and SeriesChunkData), fuses operands into Compose nodes, and assigns the graph to the schedulers. Workers execute it in their processes, with data held in shared memory and in disk/cloud storage, and the result chunks are returned through Fetch.
37. Mutable tensor: client and server

    Client:

        import mars.tensor as mt

        # perform computation
        # ...

        if rank == 0:
            # create mutable tensor on master or first worker
            mtensor = mt.mutable_tensor(name='relation', shape=(1000, 1000),
                                        chunk_size=500, fill_value=1)
        else:
            mtensor = mt.mutable_tensor(name='relation')

        # write data into mutable tensor for each worker
        mtensor[:400, :300] = array1
        mtensor[500:600, 400:700] = array2
        mtensor[500:, 500:] = array3

        if rank == 0:
            tensor = mtensor.seal()

    Server (diagram): requests are serialized and sent through the REST API to the schedulers and workers.
    • Create: chunks are assigned to workers, (0, 0) → worker0, (0, 1) → worker1, (1, 0) → worker0, (1, 1) → worker1, and a buffer is created for each chunk
    • Write: each write is recorded in the buffer of chunks (0, 0), (0, 1), (1, 0), or (1, 1) as <coordinate, ts, value>
    • Seal: the final data is computed from the <coordinate, ts, value> records, fill_value, and so on
    • Worker processes keep data in shared memory and in disk/cloud storage
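The write-then-seal flow on this slide can be sketched in a single process. `BufferedTensor` is a hypothetical stand-in invented for this illustration. It logs one record per slice write rather than per coordinate, uses a simple counter as the timestamp, and keeps everything in one buffer instead of one buffer per chunk, so it should be read as a sketch of the idea under those assumptions, not as the Mars implementation.

```python
import numpy as np


class BufferedTensor:
    """Hypothetical single-process stand-in for a mutable tensor."""

    def __init__(self, shape, fill_value):
        self.shape = shape
        self.fill_value = fill_value
        self._records = []   # list of (timestamp, index, value)
        self._clock = 0      # stand-in for the "ts" field on the slide

    def __setitem__(self, index, value):
        # A write only appends a record; no dense array exists yet.
        self._clock += 1
        self._records.append((self._clock, index, np.asarray(value)))

    def seal(self):
        # Start from fill_value and replay the records in timestamp order,
        # so that a later write wins wherever two writes overlap.
        out = np.full(self.shape, self.fill_value, dtype=float)
        for _, index, value in sorted(self._records, key=lambda r: r[0]):
            out[index] = value
        return out


t = BufferedTensor((4, 4), fill_value=0.5)
t[0:2, 0:2] = np.zeros((2, 2))
t[1:3, 1:3] = np.ones((2, 2))    # overlaps the first write at [1, 1]
sealed = t.seal()
print(sealed)
```

Because writes are only appended to a log, writers do not need to coordinate with each other until seal time, which fits the slide's note that writes can run in parallel.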
38. Part 4: Performance and outlook. Mars compared with Dask
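39. Mars compared with Dask

    | Item                                  | Mars                                 | Dask                                                                 |
    |---------------------------------------|--------------------------------------|----------------------------------------------------------------------|
    | API richness                          | 😭 (NumPy, pandas, scikit-learn)     | 😁 (NumPy, pandas, scikit-learn, delayed)                            |
    | Community                             | 😭                                   | 😁                                                                   |
    | Scheduling                            | 😭 (good Kubernetes support)         | 😁 (average Kubernetes support; also supports Yarn and supercomputers) |
    | Runtime optimization (operator fusion) | 😁                                  | 😭                                                                   |
    | Mutable data                          | 😁                                   | 😭                                                                   |
    | Designed for multi-language support   | 😁                                   | 😭                                                                   |
    | Fault tolerance                       | 😁 (worker-level fault tolerance)    | 😭                                                                   |
    | Distributed master, no single point of failure | 😁                          | 😭                                                                   |
    | Work stealing                         | 😭                                   | 😁                                                                   |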
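40. Mars and Dask performance comparison (single machine)
    • MacBook Pro, 2.2 GHz Intel Core i7, 16G memory
    • Mars 0.3.0a2, Dask 2.2.0
    • Bar chart, Dask vs. Mars. Y-axis: run time in seconds. Benchmarks: Black-Scholes, Cholesky, Dot, FFT, Inv, LU, Monte-Carlo-Pi, QR, RNG, SVD
    • Bar labels in the order they survive in the transcript, not matched to series: 143.64, 83.37, 54.92, 53.06, 51.25, 43.97, 43.97, 36.47, 34.65, 48.5, 36.09, 31.44, 27.33, 24.9, 22.34, 18.1, 16.94, 24.55, 6.04, 4.73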
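41. Mars and Dask performance comparison (distributed)
    • 1 scheduler, 4 workers (24 cores, 80G each)
    • One annoyance: Dask sometimes crashed while we were running the benchmark
    • Bar chart, Dask vs. Mars. Y-axis: run time in seconds. Benchmarks: Black-Scholes, Cholesky, Dot, FFT, Inv, LU, Monte-Carlo-Pi, QR, RNG, SVD
    • Bar labels in the order they survive in the transcript, not matched to series: 2,361.37, 843.82, 621.78, 549.21, 448.64, 343.62, 442.58, 354.6, 269.46, 164.93, 159.36, 148.95, 131.19, 28.05, 163.64, 226.57, 132.03, 111.24, 6.84, 6.19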
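42. Mars: current status and direction
    • pip install pymars
    • Mars source code: https://github.com/mars-project/mars
    • Mars documentation: https://docs.mars-project.io/zh_CN/latest/
    • Dual-version releases
    • Direction:
      • Community is the priority
      • Technology:
        • Roadmap and Enhancement Proposals: https://github.com/mars-project/mars/issues/537
        • Richer DataFrame, learn, and Tensor interfaces
        • Better operator fusion
        • Optimizing the Mars actors layer for more efficient execution and networking
        • Support for more schedulers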
43. THANK YOU. Weibo & Zhihu: @秦续业