总决赛冠军队伍作死小分队答辩PPT

第三届阿里中间件性能大赛 总决赛冠军队伍 作死小分队 决赛答辩PPT

2. 团队介绍 “作死小分队” • 傅宇,Splunk研发中心,毕业于南京大学 • 吴迪,Splunk研发中心,毕业于北京大学 2
3. 议程 • 问题回顾 • 算法思路 • 架构设计 • 实现细节优化 • 总结 3
4. 问题回顾 4
5. 问题 • 输入一批单表的增量数据变更信息,进行重放计算 • 变更包含 Insert/Update/Delete 三种类型 • 主键可能发生变更(UpdatePK) • 输出指定范围的查询结果 5
6. 重要信息 • 16核CPU • 并行算法! • 必须顺序读文件 • One-Pass算法 • Server端输入,Client端输出 6
7. 算法思路 7
8. 如何并行? • 串行算法:单表、单线程 • 如何切分这个问题,且能并行计算? 8
9. 分桶 • 把数据表按 hash(PK) 切分成 N 个 Bucket 9
10. 分桶 • 把 Insert / Update / Delete 分发到相应 PK 所在的 桶即可 • 对同一条记录的修改,在队列中按顺序出现 10 INSERT id null 13 score null 90 UPDATE id 13 13 score 90 95 DELETE id 13 null score 95 null
11. 别忘了UpdatePK • 怎样处理UpdatePK? UPDATE id 11 13 14 score 90 95
12. 别忘了UpdatePK • 把更新后的 PK 转发到原来的桶 • 然而,Parser无法并行! UPDATE id 13 14 PK Bucket … … 14 3 … … 12 14 90 95 Bucket 3 relayMap DELETE id score ...
13. 别忘了UpdatePK • 先把数据“搬”到新的桶,再做其他的 Update UPDATE id 13 14 score 90 data Bucket 3 13 Bucket 4 95
14. Milestone 1 Bucket 14
15. 等等……! 说的容易,你搬一个试试啊! 15
16. Channel • 用一个 channel 连接 UpdatePKSrc 事件和 UpdatePKDst 事件 16
17. Channel • 要搬运的记录 data 通过 channel 从发送方传递到 接收方 • buf_size = 0 双方都要等待 • buf_size = 1 接收方等待 • 可用 CountDownLatch 实现 17
18. 当前队列 UPDATE id 18 13 14 score 90 95
19. 如果 Bucket 3 的动作略快一些…… 19
20. 如果 Bucket 3 的动作略快一些…… 20
21. 如果 Bucket 4 的动作略快一些…… 21
22. 如果 Bucket 4 的动作略快一些…… 22
23. 阻塞 • 用 Channel 的思路没错,但是会阻塞 • 事实上,这比你想的还要糟糕 23
24. 循环阻塞!退化到协程 UPDATE id 0 100 UPDATE id 1 101 UPDATE id 2 102 UPDATE id 3 103 …… hash(x) = x mod 3 24
25. 解决阻塞 • 先回忆一下 Future/Promise 25
26. Future/Promise in Clojure ;; an example to show a future that delivers promise user=> (def p (promise)) #'user/p ;; future that will deliver the promise from another thread after 10 sec delay user=> (future (Thread/sleep 10000) (deliver p 123)) #future[{:status :pending, :val nil} 0x9a51df1] ;; within 10 seconds dereference p, and wait for delivery of the value user=> @p 123 阻塞(等待)10s 26
27. 非阻塞方法 ;; Create a promise user=> (def p (promise)) #'user/p ; p is our promise ;; Check if was delivered/realized user=> (realized? p) false ; No yet ;; Delivering the promise user=> (deliver p 42) #<[email protected]: 42> ;; Check again if it was delivered user=> (realized? p) true ; Yes! ;; Deref to see what has been delivered user=> @p 27 42
28. 解决阻塞 • 如果把要“搬运”的数据看作一个 Promise • 如果接收时发现没有准备好,就延迟执行 final class Promise<T> { volatile T data; public boolean ready() { return data != null; } public T get() { return data; } public void set(T data) { this.data = data; } } 28
29. 延迟执行 • 如果UpdatePKDst没有拿到data,就延迟执行 • 一旦某个 UpdatePKDst 被延迟执行,所有后续对 该 PK 的操作也无法进行 • 用 HashMap + LinkedList 管理被延迟的 task 29
30. UPDATE id PK 13 14 score Task List … 14 Head UpdatePKDst … blockedTable 30 Task 1 90 95 Task 1
31. UPDATE id 13 14 score 90 95 Task 1 UPDATE id 14 14 score 95 98 Task 2 PK Task List … 14 Head … blockedTable 31 Task 1 Task 2 UpdatePKDst Update
32. UPDATE id 13 14 score 90 95 Task 1 UPDATE id 14 14 score 95 98 Task 2 DELETE id 14 NULL score 98 NULL Task 3 PK Task List … 14 Head … blockedTable 32 Task 1 Task 2 Task 3 UpdatePKDst Update Delete
33. 延迟执行 • 当 Promise.ready() 的时候,再执行 PK Task List Run! … 14 Head … blockedTable 33 Task 1 Task 2 Task 3 UpdatePKDst Update Delete
34. 现在的 Bucket 设计 34
35. 如何检查 Promise.ready() 遍历:效率太低! • 对每个发送方(其他各 bucket)维护一个队列 • 队列内一定是依次完成的 • 所以,只需要检查各队列头部 等待Bucket3 35 UpdatePKDst UpdatePKDst Promise Promise …
36. Milestone 2 Promise 36
37. 架构设计 37
38. 基本概念 • Segment:输入的一个16MB分片,作为一批 • Task:每个变更记录(I/U/D) • Reader:负责读取文件 • Parser:负责读取 task 类型和PK,生产 task • Worker:对应一个 bucket 38
39. 流水线设计 39
40. RingBuffer 相比 BlockingQueue • 速度更快,本身不需要产生新对象 • 静态分配 slot 内存,对象池化 • 简单,只用一个RingBuffer就够了 40
41. 第三方库 Disruptor https://gotocon.com/dl/goto-cph-2012/slides/java/Disruptor.pdf 41
42. RingBuffer 视角的流水线 parsed 42
43. Demo: Reader 前进一步 43
44. Demo: 某个 Parser 跟进一步 44
45. Demo: 各个 Worker 跟进一步 45
46. Milestone 3 RingBuffer 46
47. 最后:输出 • 各bucket 按 PK 顺序准备好需要输出的记录 • PK + 格式化的文本 • 在主线程merge各bucket的结果,同时进行输出 • 用大小为 K 的最小堆,merge K 个有序 stream PS. 后来用了更暴力的方法,这里就不说了! 47
48. 优势和创新点 • 正确性 • 无锁,无阻塞 • 可伸缩 • 流处理 48
49. 实现细节和优化 49
50. 数据存储 • 使用 long 数组来存储每行数据 • 对于字符串数据: • 直接读出bytes,StringStore类负责写入磁盘,并把 offset + length放进一个long型 • 如果bytes长度不大于7,可以直接压缩到一个long中 50
51. 原生类型数据结构 • Java 范型对于 primitive type 是很低效的 • 例如 Map<Long, Integer> • 浪费了大量内存 • 大量的 boxing/unboxing • 第三方库:FastUtil, Koloboke 51
52. 数组池 • 如果为每行数据都创建一个 long[],代价高 • 使用一个巨大的 long[],存放所有的行数据,用 offset 来查找某一行 52
53. 对象池 + 数组化 数组化之前,总计会产生上亿个 Task 对象(Promise 同理) • 提高创建速度 • 降低内存开销 • 保证顺序access final class Task { byte opcode; long key; int promise; int data; } // In class Segment opcodes = new byte[MAX_N]; offsets = new int[MAX_N]; keys = new long[MAX_N]; promises = new int[MAX_N]; 53
54. Pool 的懒初始化 • 对象池优化后,程序启动时间大大增加,拖慢了 main 函数的启动 • 对部分 Pool,第一次使用时才创建数组空间 54
55. 线程数调优 • 如何选取最合适的 Parser 和 Worker 线程数? • 监控 Reader, Parser 和 Worker 的进度,推测性能 瓶颈在哪一方 55
56. 重写网络传输和 Logging • Netty 和 logback 都比较重量级 • 自己实现了网络传输和 Logger,提高启动速度 • 网络传输使用 nio.SocketChannel + 双缓冲 56
57. 回顾 Bucket 按主键哈希把数据表分桶,各自处理一个队列 Promise 暂存还没拿到数据的UpdatePKDst Task 而不是阻塞等待 RingBuffer Reader — Parsers — Workers,批处理,预分配空间 57
58. Thanks! Q&A 58