并行集合

概述

语言

Aleksandar Prokopec、Heather Miller

如果你正在使用 Scala 2.13+ 并希望使用 Scala 的并行集合,则必须导入单独的模块,如 此处 所述。

动机

近年来,处理器制造商已从单核架构转向多核架构,在此期间,学术界和工业界一致认为,流行的并行编程仍然是一项艰巨的挑战。

并行集合已包含在 Scala 标准库中,目的是通过免除用户低级并行化细节来促进并行编程,同时为他们提供一种熟悉且简单的顶级抽象。希望是,并且仍然是,集合抽象背后的隐式并行性将使主流开发人员的工作流程更接近可靠的并行执行。

这个想法很简单 - 集合是一种理解良好且经常使用的编程抽象。鉴于它们的规律性,它们能够以透明的方式有效地并行化。通过允许用户“交换”顺序集合以并行操作,Scala 的并行集合在使并行性更容易地引入更多代码方面迈出了一大步。

采用以下顺序示例,我们对某个大型集合执行单子操作

val list = (1 to 10000).toList
list.map(_ + 42)

要并行执行同一操作,只需在顺序集合 list 上调用 par 方法。之后,可以像通常使用顺序集合一样使用并行集合。可以通过简单执行以下操作并行化上述示例

list.par.map(_ + 42)

Scala 并行集合库的设计灵感源自 Scala(顺序)集合库(在 2.8 中引入),并与之深度集成。它为 Scala(顺序)集合库中的许多重要数据结构提供了并行对应项,包括

  • ParArray
  • ParVector
  • mutable.ParHashMap
  • mutable.ParHashSet
  • immutable.ParHashMap
  • immutable.ParHashSet
  • ParRange
  • ParTrieMapcollection.concurrent.TrieMap 在 2.10 中是新的)

除了通用架构外,Scala 并行集合库还与顺序集合库共享可扩展性。也就是说,与普通顺序集合一样,用户可以集成自己的集合类型,并自动继承标准库中其他并行集合上所有预定义的(并行)操作。

一些示例

为了尝试说明并行集合的通用性和实用性,我们提供了一些简单的示例用法,所有这些用法都以并行方式透明执行。

注意:以下一些示例在小型集合上运行,不推荐这样做。它们仅作为示例提供,仅供说明目的。作为一般启发式,当集合较大(通常为数千个元素)时,加速往往会很明显。(有关并行集合大小与性能之间关系的更多信息,请参阅本指南的性能部分的相应小节。)

map

使用并行 mapString 集合转换为全大写

scala> val lastNames = List("Smith","Jones","Frankenstein","Bach","Jackson","Rodin").par
lastNames: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Frankenstein, Bach, Jackson, Rodin)

scala> lastNames.map(_.toUpperCase)
res0: scala.collection.parallel.immutable.ParSeq[String] = ParVector(SMITH, JONES, FRANKENSTEIN, BACH, JACKSON, RODIN)

fold

ParArray 上通过 fold 求和

scala> val parArray = (1 to 10000).toArray.par
parArray: scala.collection.parallel.mutable.ParArray[Int] = ParArray(1, 2, 3, ...

scala> parArray.fold(0)(_ + _)
res0: Int = 50005000

filter

使用并行filter来选择字母“I”之后按字母顺序排列的姓氏。

scala> val lastNames = List("Smith","Jones","Frankenstein","Bach","Jackson","Rodin").par
lastNames: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Frankenstein, Bach, Jackson, Rodin)

scala> lastNames.filter(_.head >= 'J')
res0: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Jackson, Rodin)

创建并行集合

并行集合的用途与顺序集合完全相同——唯一值得注意的区别是获取并行集合的方式。

通常,创建并行集合有两种选择

首先,使用new关键字和正确的导入语句

import scala.collection.parallel.immutable.ParVector
val pv = new ParVector[Int]

其次,通过转换顺序集合

val pv = Vector(1,2,3,4,5,6,7,8,9).par

这里重要的是对这些转换方法进行扩展——顺序集合可以通过调用顺序集合的par方法转换为并行集合,同样,并行集合可以通过调用并行集合的seq方法转换为顺序集合。

注意:本质上是顺序的集合(从某种意义上说,必须一个接一个地访问元素),如列表、队列和流,通过将元素复制到类似的并行集合中而转换为其并行对应项。一个示例是List——它被转换为标准不可变并行序列,即ParVector。当然,这些集合类型所需的复制会产生其他任何集合类型(如ArrayVectorHashMap等)不会产生的开销。

有关并行集合转换的更多信息,请参阅本指南的转换具体并行集合类部分。

语义

虽然并行集合抽象感觉与普通的顺序集合非常相似,但重要的是要注意其语义不同,尤其是在副作用和非关联操作方面。

为了了解这是如何发生的,首先,我们可视化如何在并行中执行操作。从概念上讲,Scala 的并行集合框架通过递归“拆分”给定集合,并行对集合的每个分区应用操作,然后重新“组合”并行完成的所有结果,从而并行化并行集合上的操作。

并行集合的这些并发和“无序”语义导致以下两个含义

  1. 产生副作用的操作可能导致不确定性
  2. 非关联操作导致不确定性

产生副作用的操作

鉴于并行集合框架的并发执行语义,通常应避免对集合执行导致副作用的操作,以保持确定性。一个简单的示例是使用访问器方法,如 foreach,以递增在传递给 foreach 的闭包外部声明的 var

scala> var sum = 0
sum: Int = 0

scala> val list = (1 to 1000).toList.par
list: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3,…

scala> list.foreach(sum += _); sum
res01: Int = 467766

scala> var sum = 0
sum: Int = 0

scala> list.foreach(sum += _); sum
res02: Int = 457073

scala> var sum = 0
sum: Int = 0

scala> list.foreach(sum += _); sum
res03: Int = 468520

这里,我们可以看到每次 sum 重新初始化为 0,并且再次对 list 调用 foreach 时,sum 都包含不同的值。这种非确定性的来源是数据竞争——对同一可变变量的并发读/写。

在上述示例中,两个线程有可能在 sum 中读取相同值,花费一些时间对 sum 的该值执行一些操作,然后尝试向 sum 写入新值,这可能会导致有价值结果的覆盖(因此丢失),如下所示

ThreadA: read value in sum, sum = 0                value in sum: 0
ThreadB: read value in sum, sum = 0                value in sum: 0
ThreadA: increment sum by 760, write sum = 760     value in sum: 760
ThreadB: increment sum by 12, write sum = 12       value in sum: 12

上述示例说明了一个场景,其中两个线程读取相同的值 0,然后其中一个或另一个线程才能将 0 与并行集合的其分区中的元素求和。在这种情况下,ThreadA 读取 0 并将其与其元素 0+760 求和,而在 ThreadB 的情况下,将 0 与其元素 0+12 求和。在计算各自的和之后,它们各自在 sum 中写入其计算值。由于 ThreadA 击败了 ThreadB,因此它首先写入,只是 sum 中的值很快被 ThreadB 覆盖,实际上完全覆盖(因此丢失)值 760

非结合操作

鉴于这种“乱序”语义,还必须小心仅执行结合操作,以避免非确定性。也就是说,给定一个并行集合 pcoll,在对 pcoll 调用高阶函数(如 pcoll.reduce(func))时,应确保 func 应用于 pcoll 元素的顺序可以是任意的。一个简单但显而易见的示例是一个非结合操作,例如减法

scala> val list = (1 to 1000).toList.par
list: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3,…

scala> list.reduce(_-_)
res01: Int = -228888

scala> list.reduce(_-_)
res02: Int = -61000

scala> list.reduce(_-_)
res03: Int = -331818

在上述示例中,我们采用一个 ParVector[Int],调用 reduce,并传递给它 _-_,它只是取两个未命名的元素,并从第二个元素中减去第一个元素。由于并行集合框架生成线程,这些线程实际上独立地在集合的不同部分执行 reduce(_-_),因此在同一集合上对 reduce(_-_) 运行两次的结果将不同。

注意:通常,人们认为,与非关联操作一样,传递给并行集合中高阶函数的非交换操作同样会导致非确定性行为。事实并非如此,一个简单的示例是字符串连接——一个关联但非交换的操作

scala> val strings = List("abc","def","ghi","jk","lmnop","qrs","tuv","wx","yz").par
strings: scala.collection.parallel.immutable.ParSeq[java.lang.String] = ParVector(abc, def, ghi, jk, lmnop, qrs, tuv, wx, yz)

scala> val alphabet = strings.reduce(_++_)
alphabet: java.lang.String = abcdefghijklmnopqrstuvwxyz

并行集合的“无序”语义仅表示操作将按无序方式执行(在时间意义上。即非顺序),并不意味着结果将按无序方式重新“组合”(在空间意义上)。相反,结果通常始终按顺序重新组合——即按该顺序分解为分区 A、B、C 的并行集合将按 A、B、C 的顺序重新组合。而不是像 B、C、A 这样的其他任意顺序。

有关并行集合如何拆分和组合不同并行集合类型上的操作的更多信息,请参阅本指南的架构部分。

此页面的贡献者