并行集合

创建自定义并行集合

语言

没有合并器的并行集合

就像可以定义自定义顺序集合而不定义它们的生成器一样,也可以定义并行集合而不定义它们的合并器。没有合并器的后果是,转换器方法(例如 mapflatMapcollectfilter,…)默认情况下会返回层次结构中最接近的标准集合类型。例如,范围没有生成器,因此映射范围的元素会创建一个向量。

在以下示例中,我们定义了一个并行字符串集合。由于字符串在逻辑上是不可变序列,因此我们让并行字符串继承 immutable.ParSeq[Char]

class ParString(val str: String)
extends immutable.ParSeq[Char] {

接下来,我们定义在每个不可变序列中找到的方法

  def apply(i: Int) = str.charAt(i)

  def length = str.length

我们还必须定义此并行集合的顺序对应项。在这种情况下,我们返回 WrappedString

  def seq = new collection.immutable.WrappedString(str)

最后,我们必须为我们的并行字符串集合定义一个分隔符。我们命名分隔符 ParStringSplitter,并让它继承一个序列分隔符,即 SeqSplitter[Char]

  def splitter = new ParStringSplitter(str, 0, str.length)

  class ParStringSplitter(private var s: String, private var i: Int, private val ntl: Int)
  extends SeqSplitter[Char] {

    final def hasNext = i < ntl

    final def next = {
      val r = s.charAt(i)
      i += 1
      r
    }

上面,ntl 表示字符串的总长度,i 是当前位置,s 是字符串本身。

除了顺序集合迭代器中的 nexthasNext 之外,并行集合迭代器或分隔器还需要更多方法。首先,它们有一个名为 remaining 的方法,该方法返回此分隔器尚未遍历的元素数。接下来,它们有一个名为 dup 的方法,该方法复制当前分隔器。

    def remaining = ntl - i

    def dup = new ParStringSplitter(s, i, ntl)

最后,方法 splitpsplit 用于创建分隔器,这些分隔器遍历当前分隔器的元素子集。方法 split 的契约是它返回一个分隔器序列,该序列遍历当前分隔器遍历的元素的不相交、不重叠的子集,并且这些子集中没有一个是空的。如果当前分隔器有 1 个或更少的元素,则 split 仅返回此分隔器的序列。方法 psplit 必须返回一个分隔器序列,该序列遍历与 sizes 参数指定的元素数量完全相同。如果 sizes 参数指定的元素少于当前分隔器,则在末尾追加一个包含其余元素的附加分隔器。如果 sizes 参数需要比当前分隔器中剩余的元素更多,它将为每个大小追加一个空分隔器。最后,调用 splitpsplit 会使当前分隔器无效。

   def split = {
      val rem = remaining
      if (rem >= 2) psplit(rem / 2, rem - rem / 2)
      else Seq(this)
    }

    def psplit(sizes: Int*): Seq[ParStringSplitter] = {
      val splitted = new ArrayBuffer[ParStringSplitter]
      for (sz <- sizes) {
        val next = (i + sz) min ntl
        splitted += new ParStringSplitter(s, i, next)
        i = next
      }
      if (remaining > 0) splitted += new ParStringSplitter(s, i, ntl)
      splitted
    }
  }
}

上面,split 是根据 psplit 实现的,这通常是并行序列的情况。实现并行映射、集合或可迭代对象的分割器通常更容易,因为它不需要 psplit

因此,我们获得了一个并行字符串类。唯一的缺点是调用转换器方法(如 filter)不会生成并行字符串,而是生成并行向量,这可能不是最优的 - 从过滤后的向量中再次生成字符串可能会很昂贵。

带合并器的并行集合

假设我们要 filter 并行字符串的字符,例如去除逗号。如上所述,调用 filter 会生成一个并行向量,而我们希望获得一个并行字符串(因为 API 中的某些接口可能需要一个顺序字符串)。

为避免这种情况,我们必须为并行字符串集合编写一个合并器。这次我们还将继承 ParSeqLike 特性,以确保 filter 的返回类型更具体 - 一个 ParString 而不是 ParSeq[Char]ParSeqLike 有一个第三个类型参数,它指定并行集合的顺序对应类型的类型(与只有两个类型参数的顺序 *Like 特性不同)。

class ParString(val str: String)
extends immutable.ParSeq[Char]
   with ParSeqLike[Char, ParString, collection.immutable.WrappedString]

所有方法都与之前保持相同,但我们添加了一个受保护的附加方法 newCombiner,它在内部由 filter 使用。

  protected[this] override def newCombiner: Combiner[Char, ParString] = new ParStringCombiner

接下来,我们定义 ParStringCombiner 类。合并器是构建器的子类型,它们引入了一个名为 combine 的附加方法,该方法将另一个合并器作为参数,并返回一个新合并器,其中包含当前合并器和参数合并器的元素。调用 combine 后,当前合并器和参数合并器将失效。如果参数与当前合并器是同一对象,则 combine 仅返回当前合并器。此方法预计是高效的,在最坏情况下,其运行时间与元素数量成对数关系,因为它在并行计算期间被多次调用。

我们的 ParStringCombiner 将在内部维护一系列字符串构建器。它将通过向序列中最后一个字符串构建器添加元素来实现 +=,并通过连接当前和参数组合器的字符串构建器列表来实现 combine。在并行计算结束时调用的 result 方法将通过将所有字符串构建器附加在一起来生成一个并行字符串。这样,元素仅在最后复制一次,而不是每次调用 combine 时都复制。理想情况下,我们希望并行化此过程并在并行中复制它们(这是为并行数组执行的操作),但如果不利用字符串的内部表示,这是我们所能做的最好的事情——我们必须忍受这个顺序瓶颈。

private class ParStringCombiner extends Combiner[Char, ParString] {
  var sz = 0
  val chunks = new ArrayBuffer[StringBuilder] += new StringBuilder
  var lastc = chunks.last

  def size: Int = sz

  def +=(elem: Char): this.type = {
    lastc += elem
    sz += 1
    this
  }

  def clear = {
    chunks.clear
    chunks += new StringBuilder
    lastc = chunks.last
    sz = 0
  }

  def result: ParString = {
    val rsb = new StringBuilder
    for (sb <- chunks) rsb.append(sb)
    new ParString(rsb.toString)
  }

  def combine[U <: Char, NewTo >: ParString](other: Combiner[U, NewTo]) = if (other eq this) this else {
    val that = other.asInstanceOf[ParStringCombiner]
    sz += that.sz
    chunks ++= that.chunks
    lastc = chunks.last
    this
  }
}

我如何一般实现我的组合器?

没有预定义的配方——这取决于手头的 data-structure,并且通常需要实施者发挥一些独创性。但是,通常会采用一些方法

  1. 连接和合并。一些 data-structure 具有这些操作的高效实现(通常是对数)。如果手头的集合是由这样的 data-structure 支持的,那么它的组合器可以是集合本身。手指树、绳索和各种堆特别适合这种方法。

  2. 两阶段评估。并行数组和并行哈希表中采用的一种方法,它假设元素可以有效地部分排序到可连接的存储桶中,从中可以并行构建最终的 data-structure。在第一阶段,不同的处理器独立填充这些存储桶并将存储桶连接在一起。在第二阶段,分配数据结构,不同的处理器使用来自不相交存储桶的元素并行填充数据结构的不同部分。必须注意,不同的处理器永远不会修改数据结构的同一部分,否则可能会出现细微的并发错误。正如我们在上一节中所示,这种方法很容易应用于随机访问序列。

  3. 并发数据结构。虽然后两种方法实际上不需要数据结构本身的任何同步原语,但它们假设可以并发地构造数据结构,以使两个不同的处理器永远不会修改相同的内存位置。存在大量并发数据结构,可以由多个处理器安全地修改——并发跳跃表、并发哈希表、拆分有序列表、并发 AVL 树,仅举几例。在这种情况下,一个重要的考虑因素是并发数据结构具有水平可扩展的插入方法。对于并发并行集合,组合器可以是集合本身,并且所有执行并行操作的处理器之间共享单个组合器实例。

与集合框架集成

我们的 ParString 类尚未完成。虽然我们已经实现了一个自定义组合器,该组合器将由 filterpartitiontakeWhilespan 等方法使用,但大多数转换器方法都需要一个隐式的 CanBuildFrom 证据(有关完整说明,请参阅 Scala 集合指南)。为了使其可用并完全将 ParString 与集合框架集成,我们必须混合一个名为 GenericParTemplate 的附加特征,并定义 ParString 的伴随对象。

class ParString(val str: String)
extends immutable.ParSeq[Char]
   with GenericParTemplate[Char, ParString]
   with ParSeqLike[Char, ParString, collection.immutable.WrappedString] {

  def companion = ParString

在伴随对象内部,我们为 CanBuildFrom 参数提供了一个隐式证据。

object ParString {
  implicit def canBuildFrom: CanCombineFrom[ParString, Char, ParString] =
    new CanCombinerFrom[ParString, Char, ParString] {
      def apply(from: ParString) = newCombiner
      def apply() = newCombiner
    }

  def newBuilder: Combiner[Char, ParString] = newCombiner

  def newCombiner: Combiner[Char, ParString] = new ParStringCombiner

  def apply(elems: Char*): ParString = {
	val cb = newCombiner
	cb ++= elems
	cb.result
  }
}

进一步自定义——并发和其他集合

实现并发集合(与并行集合不同,并发集合是可以并发修改的集合,如 collection.concurrent.TrieMap)并不总是简单明了的。组合器尤其经常需要大量思考。在迄今为止描述的大多数并行集合中,组合器使用两步评估。第一步,元素由不同的处理器添加到组合器,并且组合器合并在一起。第二步,在所有元素可用后,构建结果集合。

组合器的另一种方法是将结果集合构造为元素。这要求集合是线程安全的——组合器必须允许并发元素插入。在这种情况下,所有处理器共享一个组合器。

要并行化并发集合,其组合器必须覆盖方法 canBeShared 以返回 true。这将确保在调用并行操作时只创建一个组合器。接下来,+= 方法必须是线程安全的。最后,如果当前组合器和参数组合器相同,方法 combine 仍返回当前组合器,否则可以自由地抛出异常。

将分隔器划分为更小的分隔器以实现更好的负载平衡。默认情况下,remaining 方法返回的信息用于决定何时停止划分分隔器。对于某些集合,调用 remaining 方法可能代价高昂,应使用其他方法来决定何时划分分隔器。在这种情况下,应覆盖分隔器中的 shouldSplitFurther 方法。

如果剩余元素的数量大于集合大小除以并行级别乘以八,则默认实现将划分分隔器。

def shouldSplitFurther[S](coll: ParIterable[S], parallelismLevel: Int) =
    remaining > thresholdFromSize(coll.size, parallelismLevel)

同样,分隔器可以记录其被划分的次数,并通过在划分次数大于 3 + log(parallelismLevel) 时返回 true 来实现 shouldSplitFurther。这避免了调用 remaining

此外,如果对特定集合调用 remaining 不是一项廉价的操作(即需要计算集合中的元素数量),则应覆盖分隔器中的 isRemainingCheap 方法以返回 false

最后,如果分隔器中的 remaining 方法的实现非常繁琐,则可以覆盖其集合中的 isStrictSplitterCollection 方法以返回 false。此类集合将无法执行一些依赖于分隔器严格性的方法,即在 remaining 方法中返回正确的值。重要的是,这不会影响在 for-comprehensions 中使用的方法。

此页面的贡献者