并行集合

配置并行集合

语言

任务支持

并行集合在操作调度方式上是模块化的。每个并行集合都使用任务支持对象进行参数化,该对象负责将任务调度和负载平衡到处理器。

任务支持对象在内部保留对线程池实现的引用,并决定如何以及何时将任务拆分为更小的任务。要详细了解它是如何完成的,请参阅技术报告 [1].

目前有几个任务支持实现可用于并行集合。 ForkJoinTaskSupport 在内部使用 fork-join 池,并在 JVM 1.6 或更高版本上默认使用。效率较低的 ThreadPoolTaskSupport 是 JVM 1.5 和不支持 fork-join 池的 JVM 的后备。 ExecutionContextTaskSupport 使用 scala.concurrent 中的默认执行上下文实现,并且它会重复使用 scala.concurrent 中使用的线程池(这可能是 fork-join 池或线程池执行器,具体取决于 JVM 版本)。默认情况下,执行上下文任务支持会设置到每个并行集合,因此并行集合会重复使用与 future API 相同的 fork-join 池。

以下是一种更改并行集合任务支持的方法

scala> import scala.collection.parallel._
import scala.collection.parallel._

scala> val pc = mutable.ParArray(1, 2, 3)
pc: scala.collection.parallel.mutable.ParArray[Int] = ParArray(1, 2, 3)

scala> val forkJoinPool = new java.util.concurrent.ForkJoinPool(2)
forkJoinPool: java.util.concurrent.ForkJoinPool = java.util.concurrent.ForkJoinPool@6436e181[Running, parallelism = 2, size = 0, active = 0, running = 0, steals = 0, tasks = 0, submissions = 0]

scala> pc.tasksupport = new ForkJoinTaskSupport(forkJoinPool)
pc.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ForkJoinTaskSupport@4a5d484a

scala> pc map { _ + 1 }
res0: scala.collection.parallel.mutable.ParArray[Int] = ParArray(2, 3, 4)

上述设置将并行集合设置为使用并行级别为 2 的 fork-join 池。若要设置并行集合以使用线程池执行器

scala> pc.tasksupport = new ThreadPoolTaskSupport()
pc.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ThreadPoolTaskSupport@1d914a39

scala> pc map { _ + 1 }
res1: scala.collection.parallel.mutable.ParArray[Int] = ParArray(2, 3, 4)

scala> forkJoinPool.shutdown()

请注意,如果你正在创建自己的 ForkJoinPool 实例,则应在你不再需要线程池时调用 ForkJoinPool.shutdown()。如果你不调用 ForkJoinPool.shutdown() 并继续创建 ForkJoinPool 的新实例,则 JVM 最终可能会耗尽可用线程并抛出 java.lang.OutOfMemoryError

当并行集合被序列化时,任务支持字段将从序列化中省略。在反序列化并行集合时,任务支持字段将被设置为默认值 - 执行上下文任务支持。

若要实现自定义任务支持,请扩展 TaskSupport 特征并实现以下方法

def execute[R, Tp](task: Task[R, Tp]): () => R

def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R

def parallelismLevel: Int

execute 方法异步调度任务并返回一个 future 以等待计算结果。 executeAndWait 方法执行相同操作,但仅在任务完成后才返回。 parallelismLevel 仅返回任务支持用于调度任务的目标内核数。

参考

  1. 关于通用并行集合框架,2011 年 6 月

此页面的贡献者