任务支持
并行集合在操作调度方式上是模块化的。每个并行集合都使用任务支持对象进行参数化,该对象负责将任务调度和负载平衡到处理器。
任务支持对象在内部保留对线程池实现的引用,并决定如何以及何时将任务拆分为更小的任务。要详细了解它是如何完成的,请参阅技术报告 [1].
目前有几个任务支持实现可用于并行集合。 ForkJoinTaskSupport
在内部使用 fork-join 池,并在 JVM 1.6 或更高版本上默认使用。效率较低的 ThreadPoolTaskSupport
是 JVM 1.5 和不支持 fork-join 池的 JVM 的后备。 ExecutionContextTaskSupport
使用 scala.concurrent
中的默认执行上下文实现,并且它会重复使用 scala.concurrent
中使用的线程池(这可能是 fork-join 池或线程池执行器,具体取决于 JVM 版本)。默认情况下,执行上下文任务支持会设置到每个并行集合,因此并行集合会重复使用与 future API 相同的 fork-join 池。
以下是一种更改并行集合任务支持的方法
scala> import scala.collection.parallel._
import scala.collection.parallel._
scala> val pc = mutable.ParArray(1, 2, 3)
pc: scala.collection.parallel.mutable.ParArray[Int] = ParArray(1, 2, 3)
scala> val forkJoinPool = new java.util.concurrent.ForkJoinPool(2)
forkJoinPool: java.util.concurrent.ForkJoinPool = java.util.concurrent.ForkJoinPool@6436e181[Running, parallelism = 2, size = 0, active = 0, running = 0, steals = 0, tasks = 0, submissions = 0]
scala> pc.tasksupport = new ForkJoinTaskSupport(forkJoinPool)
pc.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ForkJoinTaskSupport@4a5d484a
scala> pc map { _ + 1 }
res0: scala.collection.parallel.mutable.ParArray[Int] = ParArray(2, 3, 4)
上述设置将并行集合设置为使用并行级别为 2 的 fork-join 池。若要设置并行集合以使用线程池执行器
scala> pc.tasksupport = new ThreadPoolTaskSupport()
pc.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ThreadPoolTaskSupport@1d914a39
scala> pc map { _ + 1 }
res1: scala.collection.parallel.mutable.ParArray[Int] = ParArray(2, 3, 4)
scala> forkJoinPool.shutdown()
请注意,如果你正在创建自己的 ForkJoinPool
实例,则应在你不再需要线程池时调用 ForkJoinPool.shutdown()
。如果你不调用 ForkJoinPool.shutdown()
并继续创建 ForkJoinPool 的新实例,则 JVM 最终可能会耗尽可用线程并抛出 java.lang.OutOfMemoryError
。
当并行集合被序列化时,任务支持字段将从序列化中省略。在反序列化并行集合时,任务支持字段将被设置为默认值 - 执行上下文任务支持。
若要实现自定义任务支持,请扩展 TaskSupport
特征并实现以下方法
def execute[R, Tp](task: Task[R, Tp]): () => R
def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R
def parallelismLevel: Int
execute
方法异步调度任务并返回一个 future 以等待计算结果。 executeAndWait
方法执行相同操作,但仅在任务完成后才返回。 parallelismLevel
仅返回任务支持用于调度任务的目标内核数。