并行集合

并发 Trie

语言

如果在遍历过程中修改数据结构,大多数并发数据结构不保证一致的遍历。事实上,大多数可变集合也是如此。并发 Trie 的特殊之处在于,它们允许你修改正在遍历的 Trie 本身。修改仅在后续遍历中可见。这适用于顺序并发 Trie 及其并行对应项。两者之间的唯一区别在于,前者按顺序遍历其元素,而后者并行遍历其元素。

这是一个不错的特性,它允许你更轻松地编写一些算法。通常,这些算法以迭代方式处理元素数据集,其中不同的元素需要不同的迭代次数才能处理。

以下示例计算一组数字的平方根。每次迭代都会更新平方根值。平方根收敛的数字将从映射中删除。

case class Entry(num: Double) {
  var sqrt = num
}

val length = 50000

// prepare the list
val entries = (1 until length) map { num => Entry(num.toDouble) }
val results = ParTrieMap()
for (e <- entries) results += ((e.num, e))

// compute square roots
while (results.nonEmpty) {
  for ((num, e) <- results) {
    val nsqrt = 0.5 * (e.sqrt + e.num / e.sqrt)
    if (math.abs(nsqrt - e.sqrt) < 0.01) {
      results.remove(num)
    } else e.sqrt = nsqrt
  }
}

请注意,在上述巴比伦方法中,平方根计算 ([3]) 中,某些数字可能比其他数字收敛得快得多。因此,我们希望将它们从 results 中移除,以便仅遍历需要处理的那些元素。

另一个示例是广度优先搜索算法,它会不断扩展节点前沿,直到找到通往目标的路径或没有更多节点可扩展。我们将 2D 地图上的节点定义为 Int 的元组。我们将 map 定义为布尔值的 2D 数组,表示各个插槽是否被占用。然后,我们声明 2 个并发 Trie 映射 - 包含所有需要扩展的节点(节点前沿)的 open,以及包含所有已扩展节点的 closed。我们希望从地图的角落开始搜索并找到通往地图中心的路径 - 我们使用适当的节点初始化 open 映射。然后,我们并行不断扩展 open 映射中的所有节点,直到没有更多节点可扩展。每次扩展一个节点时,都会将其从 open 映射中移除并放入 closed 映射中。完成后,我们将输出从目标到初始节点的路径。

val length = 1000

// define the Node type
type Node = (Int, Int);
type Parent = (Int, Int);

// operations on the Node type
def up(n: Node) = (n._1, n._2 - 1);
def down(n: Node) = (n._1, n._2 + 1);
def left(n: Node) = (n._1 - 1, n._2);
def right(n: Node) = (n._1 + 1, n._2);

// create a map and a target
val target = (length / 2, length / 2);
val map = Array.tabulate(length, length)((x, y) => (x % 3) != 0 || (y % 3) != 0 || (x, y) == target)
def onMap(n: Node) = n._1 >= 0 && n._1 < length && n._2 >= 0 && n._2 < length

// open list - the nodefront
// closed list - nodes already processed
val open = ParTrieMap[Node, Parent]()
val closed = ParTrieMap[Node, Parent]()

// add a couple of starting positions
open((0, 0)) = null
open((length - 1, length - 1)) = null
open((0, length - 1)) = null
open((length - 1, 0)) = null

// greedy bfs path search
while (open.nonEmpty && !open.contains(target)) {
  for ((node, parent) <- open) {
    def expand(next: Node) {
      if (onMap(next) && map(next._1)(next._2) && !closed.contains(next) && !open.contains(next)) {
        open(next) = node
      }
    }
    expand(up(node))
    expand(down(node))
    expand(left(node))
    expand(right(node))
    closed(node) = parent
    open.remove(node)
  }
}

// print path
var pathnode = open(target)
while (closed.contains(pathnode)) {
  print(pathnode + "->")
  pathnode = closed(pathnode)
}
println()

GitHub 上有一个生命游戏示例,它使用 Ctries 有选择地仅模拟生命游戏自动机中当前处于活动状态的部分 [4]。它还包括生命游戏模拟的基于 Swing 的可视化,您可以在其中观察调整参数如何影响性能。

并发尝试还支持可线性化、无锁、恒定时间的 快照 操作。此操作创建一个新的并发尝试,其中包含特定时间点的所有元素,从而有效地捕获特定时间点的尝试状态。 快照 操作仅仅为并发尝试创建了一个新的根。后续更新会延迟重新构建与更新相关的并发尝试部分,并使并发尝试的其余部分保持不变。首先,这意味着快照操作本身并不昂贵,因为它不会复制元素。其次,由于写时复制优化仅复制并发尝试的部分,因此后续修改会横向扩展。 readOnlySnapshot 方法比 snapshot 方法效率稍高,但返回的只读映射无法修改。并发尝试还支持基于快照机制的可线性化、恒定时间的 清除 操作。要详细了解并发尝试和快照的工作原理,请参阅 [1] 和 [2].

并发尝试的迭代器基于快照。在创建迭代器对象之前,会对并发尝试进行快照,因此迭代器仅遍历创建快照时尝试中的元素。当然,迭代器使用只读快照。

size 操作也基于快照。直接实现时, size 调用只会创建一个迭代器(即快照)并遍历元素以对其进行计数。因此,每次调用 size 都需要与元素数量成线性的时间。但是,并发尝试经过优化,可以缓存其不同部分的大小,从而将 size 方法的复杂度降低到摊销对数时间。实际上,这意味着在调用 size 一次后,后续对 size 的调用将需要最少的工作量,通常仅为自上次 size 调用以来已修改的尝试分支重新计算大小。此外,并行并发尝试的大小计算是并行执行的。

参考

  1. Cache-Aware Lock-Free Concurrent Hash Tries
  2. Concurrent Tries with Efficient Non-Blocking Snapshots
  3. Methods of computing square roots
  4. Game of Life simulation

此页的贡献者