作者:Philipp Haller、Aleksandar Prokopec、Heather Miller、Viktor Klang、Roland Kuhn 和 Vojin Jovanovic
简介
期货提供了一种推理方式,可以高效且非阻塞地并行执行许多操作。
一个 Future
是一个占位符对象,用于表示一个可能尚未存在的值。通常,Future 的值是并发提供的,并且随后可以使用。以这种方式组合并发任务往往会导致更快的异步非阻塞并行代码。
默认情况下,Future 和 Promise 是非阻塞的,利用回调而不是典型的阻塞操作。为了在语法和概念上简化回调的使用,Scala 提供了组合器,如 flatMap
、foreach
和 filter
,用于以非阻塞方式组合 Future。阻塞仍然可行 - 在绝对必要的情况下,可以阻塞 Future(尽管不建议这样做)。
典型的 Future 如下所示
val inverseFuture: Future[Matrix] = Future {
fatMatrix.inverse() // non-blocking long lasting computation
}(executionContext)
或使用更惯用的
implicit val ec: ExecutionContext = ...
val inverseFuture : Future[Matrix] = Future {
fatMatrix.inverse()
} // ec is implicitly passed
given ExecutionContext = ...
val inverseFuture : Future[Matrix] = Future {
fatMatrix.inverse()
} // execution context is implicitly passed
两个代码片段都将 fatMatrix.inverse()
的执行委托给 ExecutionContext
,并将计算结果体现到 inverseFuture
中。
执行上下文
Future 和 Promise 围绕 ExecutionContext
运行,负责执行计算。
ExecutionContext
类似于 Executor:它可以自由地在新的线程、线程池或当前线程中执行计算(尽管不建议在当前线程中执行计算 - 更多内容见下文)。
scala.concurrent
包开箱即用地提供了一个 ExecutionContext
实现,一个全局静态线程池。还可以将 Executor
转换为 ExecutionContext
。最后,用户可以自由地扩展 ExecutionContext
特征来实现自己的执行上下文,尽管只应在极少数情况下这样做。
全局执行上下文
ExecutionContext.global
是由 ForkJoinPool 支持的 ExecutionContext
。它应该足以满足大多数情况,但需要小心。一个 ForkJoinPool
管理有限数量的线程(最大线程数称为并行级别)。只有当每个阻塞调用都包装在 blocking
调用中时,并发阻塞计算的数量才能超过并行级别(更多内容见下文)。否则,全局执行上下文中线程池饥饿的风险,并且无法进行计算。
默认情况下,ExecutionContext.global
将其底层 fork-join 池的并行级别设置为可用处理器的数量 (Runtime.availableProcessors)。可以通过设置以下一个(或多个)VM 属性来覆盖此配置
- scala.concurrent.context.minThreads - 默认为
1
- scala.concurrent.context.numThreads - 可以是数字或形式为“xN”的乘数 (N);默认为
Runtime.availableProcessors
- scala.concurrent.context.maxThreads - 默认为
Runtime.availableProcessors
只要并行级别保持在 [minThreads; maxThreads]
内,它就会被设置为 numThreads
。
如上所述,ForkJoinPool
可以增加其 parallelismLevel
之外的线程数,以应对阻塞计算。如 ForkJoinPool
API 中所述,只有在明确通知池时才有可能
import scala.concurrent.{ Future, ExecutionContext }
import scala.concurrent.forkjoin._
// the following is equivalent to `implicit val ec = ExecutionContext.global`
import ExecutionContext.Implicits.global
Future {
ForkJoinPool.managedBlock(
new ManagedBlocker {
var done = false
def block(): Boolean = {
try {
myLock.lock()
// ...
} finally {
done = true
}
true
}
def isReleasable: Boolean = done
}
)
}
import scala.concurrent.{ Future, ExecutionContext }
import scala.concurrent.forkjoin.*
// the following is equivalent to `given ExecutionContext = ExecutionContext.global`
import ExecutionContext.Implicits.global
Future {
ForkJoinPool.managedBlock(
new ManagedBlocker {
var done = false
def block(): Boolean =
try
myLock.lock()
// ...
finally
done = true
true
def isReleasable: Boolean = done
}
)
}
幸运的是,concurrent 包提供了一种方便的方法来执行此操作
import scala.concurrent.Future
import scala.concurrent.blocking
Future {
blocking {
myLock.lock()
// ...
}
}
请注意,blocking
是一个通用构造,将在 下面更深入地讨论。
最后但并非最不重要的一点,您必须记住,ForkJoinPool
不是为长期阻塞操作设计的。即使在使用 blocking
通知后,池也可能不会像您预期的那样生成新的工作线程,并且当创建新的工作线程时,它们可能多达 32767 个。为了让您了解一下,以下代码将使用 32000 个线程
implicit val ec = ExecutionContext.global
for (i <- 1 to 32000) {
Future {
blocking {
Thread.sleep(999999)
}
}
}
given ExecutionContext = ExecutionContext.global
for i <- 1 to 32000 do
Future {
blocking {
Thread.sleep(999999)
}
}
如果您需要包装长期阻塞操作,我们建议使用专用的 ExecutionContext
,例如通过包装 Java Executor
。
改编 Java Executor
使用 ExecutionContext.fromExecutor
方法,您可以将 Java Executor
包装到 ExecutionContext
中。例如
ExecutionContext.fromExecutor(new ThreadPoolExecutor( /* your configuration */ ))
ExecutionContext.fromExecutor(ThreadPoolExecutor( /* your configuration */ ))
同步执行上下文
人们可能很想拥有一个在当前线程内运行计算的 ExecutionContext
val currentThreadExecutionContext = ExecutionContext.fromExecutor(
new Executor {
// Do not do this!
def execute(runnable: Runnable) = runnable.run()
})
应避免这样做,因为它会在您的 future 的执行中引入不确定性。
Future {
doSomething
}(ExecutionContext.global).map {
doSomethingElse
}(currentThreadExecutionContext)
doSomethingElse
调用可能在 doSomething
的线程或主线程中执行,因此可能是异步的或同步的。如 此处 所述,回调不应同时具有这两种特性。
期货
一个 Future
是一个持有值的对象,该值可能在某个时刻可用。该值通常是其他一些计算的结果
- 如果计算尚未完成,我们说
Future
未完成。 - 如果计算已完成,并带有值或异常,我们说
Future
已完成。
完成可以采取两种形式之一
- 当
Future
使用值完成时,我们说 future 已使用该值成功完成。 - 当
Future
使用计算抛出的异常完成时,我们说Future
已使用该异常失败。
一个 Future
具有一个重要的属性,即它只能分配一次。一旦 Future
对象获得值或异常,它实际上就变得不可变——它永远不能被覆盖。
创建 future 对象的最简单方法是调用 Future.apply
方法,该方法启动异步计算并返回持有该计算结果的 future。一旦 future 完成,结果就会可用。
请注意,Future[T]
是表示 future 对象的类型,而 Future.apply
是创建和调度异步计算的方法,然后返回一个将使用该计算结果完成的 future 对象。
通过一个示例最好地展示这一点。
让我们假设我们想使用某些流行社交网络的假设 API 来获取给定用户的关注者列表。我们将打开一个新会话,然后发送请求以获取特定用户的关注者列表
import scala.concurrent._
import ExecutionContext.Implicits.global
val session = socialNetwork.createSessionFor("user", credentials)
val f: Future[List[Friend]] = Future {
session.getFriends()
}
import scala.concurrent.*
import ExecutionContext.Implicits.global
val session = socialNetwork.createSessionFor("user", credentials)
val f: Future[List[Friend]] = Future {
session.getFriends()
}
在上面,我们首先导入 scala.concurrent
包的内容,以使类型 Future
可见。我们将很快解释第二个导入。
然后,我们使用假设的 createSessionFor
方法初始化会话变量,我们将使用该变量向服务器发送请求。要获取用户的关注者列表,必须通过网络发送请求,这可能需要很长时间。这通过调用返回 List[Friend]
的 getFriends
方法来说明。为了在响应到达之前更好地利用 CPU,我们不应该阻塞程序的其余部分——此计算应异步调度。Future.apply
方法的作用完全如此——它并发执行指定的计算块,在本例中向服务器发送请求并等待响应。
服务器响应后,好友列表将可用于未来的 f
。
尝试失败可能会导致异常。在以下示例中,session
值初始化不正确,因此 Future
块中的计算将抛出 NullPointerException
。然后,此 future f
将因该异常而失败,而不是成功完成
val session = null
val f: Future[List[Friend]] = Future {
session.getFriends()
}
上面的行 import ExecutionContext.Implicits.global
导入了默认全局执行上下文。执行上下文执行提交给它们的 task,您可以将执行上下文视为线程池。它们对于 Future.apply
方法至关重要,因为它们处理异步计算的执行方式和时间。您可以定义自己的执行上下文并将其与 Future
一起使用,但现在只需知道您可以按上述方式导入默认执行上下文就足够了。
我们的示例基于一个假设的社交网络 API,其中计算包括发送网络请求并等待响应。提供一个开箱即用的异步计算示例是公平的。假设您有一个文本文件,并且您想找到特定关键字的第一次出现的的位置。此计算可能涉及在从磁盘检索文件内容时进行阻塞,因此有必要与计算的其他部分同时执行它。
val firstOccurrence: Future[Int] = Future {
val source = scala.io.Source.fromFile("myText.txt")
source.toSeq.indexOfSlice("myKeyword")
}
回调
我们现在知道如何启动异步计算以创建新的 future 值,但我们尚未展示如何使用结果(一旦结果可用),以便我们可以用它做一些有用的事情。我们通常对计算结果感兴趣,而不仅仅是对它的副作用感兴趣。
在许多 future 实现中,一旦 future 的客户端对它的结果感兴趣,它就必须阻塞自己的计算并等待 future 完成 - 只有这样才能使用 future 的值来继续自己的计算。虽然 Scala Future
API 允许这样做,如我们稍后将展示的那样,但从性能的角度来看,更好的方法是以完全非阻塞的方式进行,即在 future 上注册回调。一旦 future 完成,此回调将异步调用。如果在注册回调时 future 已经完成,则回调可能会异步执行,或在同一线程上按顺序执行。
注册回调的最一般形式是使用 onComplete
方法,该方法采用类型为 Try[T] => U
的回调函数。如果 future 成功完成,则回调将应用于类型为 Success[T]
的值,否则应用于类型为 Failure[T]
的值。
Try[T]
类似于 Option[T]
或 Either[T, S]
,因为它是一个可能保存某种类型值的单子。但是,它经过专门设计,可以保存一个值或某个可抛出对象。Option[T]
可以是一个值(即 Some[T]
)或根本没有值(即 None
),而 Try[T]
在保存一个值时为 Success[T]
,否则为 Failure[T]
,它保存一个异常。Failure[T]
保存的信息比普通的 None
更丰富,它说明了为什么没有值。同时,你可以将 Try[T]
视为 Either[Throwable, T]
的一个特殊版本,专门用于左值是 Throwable
的情况。
回到我们的社交网络示例,假设我们想要获取我们自己的近期帖子列表并将其呈现到屏幕上。我们通过调用方法 getRecentPosts
来执行此操作,该方法返回 List[String]
- 近期文本帖子的列表
import scala.util.{Success, Failure}
val f: Future[List[String]] = Future {
session.getRecentPosts()
}
f.onComplete {
case Success(posts) => for (post <- posts) println(post)
case Failure(t) => println("An error has occurred: " + t.getMessage)
}
import scala.util.{Success, Failure}
val f: Future[List[String]] = Future {
session.getRecentPosts()
}
f.onComplete {
case Success(posts) => for post <- posts do println(post)
case Failure(t) => println("An error has occurred: " + t.getMessage)
}
onComplete
方法很通用,因为它允许客户端处理失败和成功的未来计算结果。如果只需要处理成功的结果,可以使用 foreach
回调
val f: Future[List[String]] = Future {
session.getRecentPosts()
}
for {
posts <- f
post <- posts
} println(post)
val f: Future[List[String]] = Future {
session.getRecentPosts()
}
for
posts <- f
post <- posts
do println(post)
Future
提供了一种干净的方式,可以使用 failed
投影来仅处理失败的结果,该投影将 Failure[Throwable]
转换为 Success[Throwable]
。下面关于 投影 的部分提供了执行此操作的示例。
回到之前搜索关键字首次出现的示例,你可能希望将关键字的位置打印到屏幕上
val firstOccurrence: Future[Int] = Future {
val source = scala.io.Source.fromFile("myText.txt")
source.toSeq.indexOfSlice("myKeyword")
}
firstOccurrence.onComplete {
case Success(idx) => println("The keyword first appears at position: " + idx)
case Failure(t) => println("Could not process file: " + t.getMessage)
}
onComplete
和 foreach
方法的结果类型都是 Unit
,这意味着无法链接对这些方法的调用。请注意,此设计是有意的,以避免暗示链接的调用可能意味着对已注册回调的执行顺序(在同一 future 上注册的回调是无序的)。
也就是说,我们现在应该评论一下回调确切的调用时间。由于它需要 future 中的值可用,因此只有在 future 完成后才能调用它。但是,无法保证它将由完成 future 的线程或创建回调的线程调用。相反,回调由某个线程在 future 对象完成后的某个时间执行。我们说回调最终执行。
此外,回调执行的顺序是未预定义的,即使是在同一应用程序的不同运行之间。事实上,回调可能不会按顺序一个接一个地调用,而可能同时并发执行。这意味着在以下示例中,变量 totalA
可能未被设置为从计算文本中获取的小写和大小写 a
字符的正确数量。
@volatile var totalA = 0
val text = Future {
"na" * 16 + "BATMAN!!!"
}
text.foreach { txt =>
totalA += txt.count(_ == 'a')
}
text.foreach { txt =>
totalA += txt.count(_ == 'A')
}
在上面,两个回调可能一个接一个地执行,在这种情况下,变量 totalA
具有预期值 18
。但是,它们也可能并发执行,因此 totalA
可能最终为 16
或 2
,因为 +=
不是原子操作(即它包含一个读取和一个写入步骤,可以与其他读取和写入任意交错)。
为了完整起见,此处列出了回调的语义
-
在将来注册一个
onComplete
回调可确保在将来最终完成后调用相应的闭包。 -
注册一个
foreach
回调具有与onComplete
相同的语义,不同之处在于仅在将来成功完成后才调用闭包。 -
在已经完成的将来注册一个回调将最终导致执行回调(如 1 所暗示的)。
-
如果在将来注册了多个回调,则它们的执行顺序未定义。事实上,回调可能彼此并发执行。但是,特定的
ExecutionContext
实现可能导致一个明确的顺序。 -
如果某些回调抛出异常,则无论如何都会执行其他回调。
-
如果某些回调永远不会完成(例如回调包含一个无限循环),则其他回调可能根本不会被执行。在这些情况下,潜在的阻塞回调必须使用
blocking
构造(见下文)。 -
一旦执行,回调将从 future 对象中移除,从而有资格进行 GC。
函数式组合和 For-Comprehensions
我们展示的回调机制足以将 future 结果与后续计算链接起来。但是,它有时不方便,并且会导致代码庞大。我们通过一个示例来说明这一点。假设我们有一个用于与货币交易服务进行交互的 API。假设我们想购买美元,但仅在有利可图时才购买。我们首先展示如何使用回调来实现这一点
val rateQuote = Future {
connection.getCurrentValue(USD)
}
for (quote <- rateQuote) {
val purchase = Future {
if (isProfitable(quote)) connection.buy(amount, quote)
else throw new Exception("not profitable")
}
for (amount <- purchase)
println("Purchased " + amount + " USD")
}
val rateQuote = Future {
connection.getCurrentValue(USD)
}
for quote <- rateQuote do
val purchase = Future {
if isProfitable(quote) then connection.buy(amount, quote)
else throw Exception("not profitable")
}
for amount <- purchase do
println("Purchased " + amount + " USD")
我们首先创建一个 future rateQuote
,它获取当前汇率。从服务器获取此值并成功完成 future 后,计算将在 foreach
回调中进行,我们准备决定是否购买。因此,我们创建另一个 future purchase
,它仅在有利可图时做出购买决定,然后发送请求。最后,一旦完成购买,我们将打印一条通知消息到标准输出。
这是可行的,但由于两个原因而不方便。首先,我们必须使用 foreach
并将第二个 purchase
future 嵌套在其中。想象一下,在 purchase
完成后,我们想出售其他一些货币。我们必须在 foreach
回调中重复此模式,这会使代码过度缩进、庞大且难以理解。
其次,purchase
future 不在与代码其余部分相同的范围内——它只能在 foreach
回调中执行。这意味着应用程序的其他部分看不到 purchase
future,并且无法向其注册另一个 foreach
回调,例如出售其他一些货币。
由于这两个原因,futures 提供了允许更直接组合的组合器。其中一个基本组合器是 map
,它给定一个 future 和一个 future 值的映射函数,产生一个新的 future,该 future 在原始 future 成功完成后使用映射值完成。你可以像推理映射集合一样推理映射 futures。
让我们使用 map
组合器重写前面的示例
val rateQuote = Future {
connection.getCurrentValue(USD)
}
val purchase = rateQuote.map { quote =>
if (isProfitable(quote)) connection.buy(amount, quote)
else throw new Exception("not profitable")
}
purchase.foreach { amount =>
println("Purchased " + amount + " USD")
}
val rateQuote = Future {
connection.getCurrentValue(USD)
}
val purchase = rateQuote.map { quote =>
if isProfitable(quote) then connection.buy(amount, quote)
else throw Exception("not profitable")
}
purchase.foreach { amount =>
println("Purchased " + amount + " USD")
}
通过对 rateQuote
使用 map
,我们消除了一个 foreach
回调,更重要的是,嵌套。如果我们现在决定出售其他一些货币,则只需再次对 purchase
使用 map
即可。
但是,如果 isProfitable
返回 false
,从而导致抛出异常,会发生什么情况?在这种情况下,purchase
会因该异常而失败。此外,假设连接已断开,并且 getCurrentValue
抛出异常,导致 rateQuote
失败。在这种情况下,我们将没有要映射的值,因此 purchase
将自动因与 rateQuote
相同的异常而失败。
总之,如果原始 future 成功完成,则返回的 future 将使用原始 future 中映射的值完成。如果映射函数抛出异常,则 future 将使用该异常完成。如果原始 future 因异常而失败,则返回的 future 也将包含相同的异常。这种异常传播语义也存在于其他组合器中。
future 的设计目标之一是使其能够用于 for-comprehension。出于此原因,future 还具有 flatMap
和 withFilter
组合器。flatMap
方法采用一个将值映射到新 future g
的函数,然后返回一个在 g
完成后完成的 future。
假设我们要将美元兑换为瑞士法郎 (CHF)。我们必须获取这两种货币的报价,然后根据这两个报价决定是否购买。以下是在 for-comprehension 中使用 flatMap
和 withFilter
的示例
val usdQuote = Future { connection.getCurrentValue(USD) }
val chfQuote = Future { connection.getCurrentValue(CHF) }
val purchase = for {
usd <- usdQuote
chf <- chfQuote
if isProfitable(usd, chf)
} yield connection.buy(amount, chf)
purchase foreach { amount =>
println("Purchased " + amount + " CHF")
}
val usdQuote = Future { connection.getCurrentValue(USD) }
val chfQuote = Future { connection.getCurrentValue(CHF) }
val purchase = for
usd <- usdQuote
chf <- chfQuote
if isProfitable(usd, chf)
yield connection.buy(amount, chf)
purchase.foreach { amount =>
println("Purchased " + amount + " CHF")
}
purchase
future 仅在 usdQuote
和 chfQuote
都完成后才完成 - 它取决于这两个 future 的值,因此它自己的计算不能更早开始。
上面的 for-comprehension 翻译为
val purchase = usdQuote.flatMap {
usd =>
chfQuote
.withFilter(chf => isProfitable(usd, chf))
.map(chf => connection.buy(amount, chf))
}
这比 for-comprehension 稍微难理解一些,但我们对其进行分析以更好地理解 flatMap
操作。flatMap
操作将它自己的值映射到其他 future 中。一旦这个不同的 future 完成,结果 future 将使用其值完成。在我们的示例中,flatMap
使用 usdQuote
future 的值将 chfQuote
的值映射到第三个 future 中,该 future 发送请求购买一定数量的瑞士法郎。结果 future purchase
仅在从 map
返回的第三个 future 完成后才完成。
这可能会令人费解,但幸运的是,flatMap
操作很少在 for-comprehension 之外使用,后者更易于使用和理解。
filter
组合器创建一个新 future,其中仅当原始 future 满足某个谓词时才包含原始 future 的值。否则,新 future 将因 NoSuchElementException
而失败。对于 future,调用 filter
的效果与调用 withFilter
完全相同。
collect
和 filter
组合器之间的关系类似于集合 API 中这些方法的关系。
由于 Future
特征在概念上可以包含两种类型的值(计算结果和异常),因此需要处理异常的组合器。
假设根据 rateQuote
我们决定购买一定数量。connection.buy
方法采用要购买的 amount
和预期的 quote
。它返回购买的数量。如果在此期间 quote
发生变化,它将抛出 QuoteChangedException
并且不会购买任何东西。如果我们希望 future 包含 0
而不是异常,则使用 recover
组合器
val purchase: Future[Int] = rateQuote.map {
quote => connection.buy(amount, quote)
}.recover {
case QuoteChangedException() => 0
}
recover
组合器创建一个新 future,如果原始 future 成功完成,则该 future 将保存与原始 future 相同的结果。如果没有,则将部分函数参数应用于导致原始 future 失败的 Throwable
。如果它将 Throwable
映射到某个值,则新 future 将成功完成并带有该值。如果部分函数未在该 Throwable
上定义,则结果 future 将因相同的 Throwable
而失败。
recoverWith
组合器创建一个新 future,如果原始 future 成功完成,则该 future 将保存与原始 future 相同的结果。否则,将部分函数应用于导致原始 future 失败的 Throwable
。如果它将 Throwable
映射到某个 future,则此 future 将使用该 future 的结果完成。它与 recover
的关系类似于 flatMap
与 map
的关系。
组合器 fallbackTo
创建一个新 future,如果此 future 成功完成,则该 future 保存此 future 的结果,否则保存参数 future 的成功结果。如果此 future 和参数 future 都失败,则新 future 将使用此 future 中的异常完成,如下例所示,该示例尝试打印美元值,但如果无法获取美元值,则打印瑞士法郎值
val usdQuote = Future {
connection.getCurrentValue(USD)
}.map {
usd => "Value: " + usd + "$"
}
val chfQuote = Future {
connection.getCurrentValue(CHF)
}.map {
chf => "Value: " + chf + "CHF"
}
val anyQuote = usdQuote.fallbackTo(chfQuote)
anyQuote.foreach { println(_) }
andThen
组合器纯粹用于产生副作用。它返回一个新 future,其结果与当前 future 完全相同,无论当前 future 是否失败。一旦当前 future 使用结果完成,则调用与 andThen
相对应的闭包,然后使用与当前 future 相同的结果完成新 future。这确保了多个 andThen
调用是有序的,如下例所示,该示例将社交网络中的最新帖子存储到可变集中,然后将所有帖子呈现到屏幕上
val allPosts = mutable.Set[String]()
Future {
session.getRecentPosts()
}.andThen {
case Success(posts) => allPosts ++= posts
}.andThen {
case _ =>
clearAll()
for (post <- allPosts) render(post)
}
val allPosts = mutable.Set[String]()
Future {
session.getRecentPosts()
}.andThen {
case Success(posts) => allPosts ++= posts
}.andThen {
case _ =>
clearAll()
for post <- allPosts do render(post)
}
总之,期货上的组合器是纯函数式的。每个组合器返回一个新期货,它与从中派生的期货相关。
投影
为了对作为异常返回的结果启用 for-comprehension,期货还具有投影。如果原始期货失败,failed
投影会返回一个包含类型为 Throwable
的值的期货。如果原始期货成功,failed
投影会因 NoSuchElementException
而失败。以下是一个将异常打印到屏幕的示例
val f = Future {
2 / 0
}
for (exc <- f.failed) println(exc)
val f = Future {
2 / 0
}
for exc <- f.failed do println(exc)
此示例中的 for-comprehension 转换为
f.failed.foreach(exc => println(exc))
因为此处 f
不成功,所以闭包已注册到新成功的 Future[Throwable]
上的 foreach
回调。以下示例不会在屏幕上打印任何内容
val g = Future {
4 / 2
}
for (exc <- g.failed) println(exc)
val g = Future {
4 / 2
}
for exc <- g.failed do println(exc)
扩展期货
计划支持使用其他实用方法扩展 Futures API。这将允许外部框架提供更专业的实用程序。
阻塞
期货通常是异步的,并且不会阻塞基础执行线程。但是,在某些情况下,有必要进行阻塞。我们区分两种形式的执行线程阻塞:从期货内部调用阻塞线程的任意代码,以及从另一个期货外部阻塞,等待该期货完成。
在期货内部阻塞
正如在全局 ExecutionContext
中看到的那样,可以使用 blocking
构造通知 ExecutionContext
阻塞调用。但是,实现完全由 ExecutionContext
自行决定。虽然一些 ExecutionContext
(例如 ExecutionContext.global
)通过 ManagedBlocker
实现 blocking
,但一些执行上下文(例如固定线程池)
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(x))
将什么都不做,如下所示
implicit val ec =
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))
Future {
blocking { blockingStuff() }
}
given ExecutionContext =
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))
Future {
blocking { blockingStuff() }
}
与以下效果相同
Future { blockingStuff() }
阻塞代码也可能抛出异常。在这种情况下,异常将转发给调用者。
在期货外部阻塞
如前所述,强烈建议不要阻塞期货,以提高性能并防止死锁。期货上的回调和组合器是使用其结果的首选方法。但是,在某些情况下可能需要进行阻塞,并且 Futures and Promises API 支持这种操作。
在上面的货币交易示例中,一个阻塞位置是在应用程序的末尾,以确保所有期货都已完成。以下是如何阻塞期货结果的示例
import scala.concurrent._
import scala.concurrent.duration._
object awaitPurchase {
def main(args: Array[String]): Unit = {
val rateQuote = Future {
connection.getCurrentValue(USD)
}
val purchase = rateQuote.map { quote =>
if (isProfitable(quote)) connection.buy(amount, quote)
else throw new Exception("not profitable")
}
Await.result(purchase, 0.nanos)
}
}
import scala.concurrent.*
import scala.concurrent.duration.*
@main def awaitPurchase =
val rateQuote = Future {
connection.getCurrentValue(USD)
}
val purchase = rateQuote.map { quote =>
if isProfitable(quote) then connection.buy(amount, quote)
else throw Exception("not profitable")
}
Await.result(purchase, 0.nanos)
如果 future 失败,则会将异常转发给调用者,future 会因该异常而失败。这包括 failed
投影——如果原始 future 成功完成,则阻塞它会导致抛出 NoSuchElementException
。
或者,调用 Await.ready
会一直等到 future 完成,但不会检索其结果。同样,如果 future 失败,调用该方法不会抛出异常。
Future
特征实现了 Awaitable
特征,方法为 ready()
和 result()
。这些方法不能由客户端直接调用——它们只能由执行上下文调用。
异常
当异步计算抛出未处理的异常时,与这些计算关联的 future 会失败。失败的 future 会存储 Throwable
的实例,而不是结果值。 Future
提供 failed
投影方法,该方法允许将此 Throwable
视为另一个 Future
的成功值。以下异常会收到特殊处理
-
scala.runtime.NonLocalReturnControl[_]
– 此异常保存与返回关联的值。通常,方法体中的return
构造会转换为带有此异常的throw
。存储到 future 或 promise 中的是关联值,而不是保留此异常。 -
ExecutionException
- 当计算因未处理的InterruptedException
、Error
或scala.util.control.ControlThrowable
而失败时存储。在这种情况下,ExecutionException
将未处理的异常作为其原因。其基本原理是防止传播关键的和与控制流相关的异常,这些异常通常不会由客户端代码处理,同时告知客户端计算在哪个 future 中失败。
致命异常(由 NonFatal
确定)从执行失败的异步计算的线程中重新抛出。这会将问题告知管理执行线程的代码,并允许它在必要时快速失败。有关哪些异常被视为致命异常的更精确描述,请参见 NonFatal
。
ExecutionContext.global
默认情况下通过打印堆栈跟踪来处理致命异常。
致命异常意味着与计算关联的 Future
永远不会完成。也就是说,“致命”意味着该错误对于 ExecutionContext
来说是不可恢复的,并且也不打算由用户代码处理。相比之下,应用程序代码可能会尝试从“失败”的 Future
中恢复,该 Future
已完成但带有异常。
可以使用处理致命异常的报告器来自定义执行上下文。请参见工厂方法 fromExecutor
和 fromExecutorService
。
由于有必要为执行线程设置 UncaughtExceptionHandler
,因此当传递 null
执行器时,fromExecutor
将创建一个与 global
配置相同的上下文,但使用提供的报告器来处理异常。
以下示例演示如何获取具有自定义错误处理的 ExecutionContext
,并显示了不同异常的结果,如上所述
import java.util.concurrent.{ForkJoinPool, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success}
object Test extends App {
def crashing(): Int = throw new NoSuchMethodError("test")
def failing(): Int = throw new NumberFormatException("test")
def interrupt(): Int = throw new InterruptedException("test")
def erroring(): Int = throw new AssertionError("test")
// computations can fail in the middle of a chain of combinators, after the initial Future job has completed
def testCrashes()(implicit ec: ExecutionContext): Future[Int] =
Future.unit.map(_ => crashing())
def testFails()(implicit ec: ExecutionContext): Future[Int] =
Future.unit.map(_ => failing())
def testInterrupted()(implicit ec: ExecutionContext): Future[Int] =
Future.unit.map(_ => interrupt())
def testError()(implicit ec: ExecutionContext): Future[Int] =
Future.unit.map(_ => erroring())
// Wait for 1 second for the the completion of the passed `future` value and print it
def check(future: Future[Int]): Unit =
try {
Await.ready(future, 1.second)
for (completion <- future.value) {
println(s"completed $completion")
// In case of failure, also print the cause of the exception, when defined
completion match {
case Failure(exception) if exception.getCause != null =>
println(s" caused by ${exception.getCause}")
_ => ()
}
}
} catch {
// If the future value did not complete within 1 second, the call
// to `Await.ready` throws a TimeoutException
case _: TimeoutException => println(s"did not complete")
}
def reporter(t: Throwable) = println(s"reported $t")
locally {
// using the `global` implicit context
import ExecutionContext.Implicits._
// a successful Future
check(Future(42)) // completed Success(42)
// a Future that completes with an application exception
check(Future(failing())) // completed Failure(java.lang.NumberFormatException: test)
// same, but the exception is thrown somewhere in the chain of combinators
check(testFails()) // completed Failure(java.lang.NumberFormatException: test)
// a Future that does not complete because of a linkage error;
// the trace is printed to stderr by default
check(testCrashes()) // did not complete
// a Future that completes with an operational exception that is wrapped
check(testInterrupted()) // completed Failure(java.util.concurrent.ExecutionException: Boxed Exception)
// caused by java.lang.InterruptedException: test
// a Future that completes due to a failed assert, which is bad for the app,
// but is handled the same as interruption
check(testError()) // completed Failure(java.util.concurrent.ExecutionException: Boxed Exception)
// caused by java.lang.AssertionError: test
}
locally {
// same as `global`, but adds a custom reporter that will handle uncaught
// exceptions and errors reported to the context
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(null, reporter)
check(testCrashes()) // reported java.lang.NoSuchMethodError: test
// did not complete
}
locally {
// does not handle uncaught exceptions; the executor would have to be
// configured separately
val executor = ForkJoinPool.commonPool()
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor, reporter)
// the reporter is not invoked and the Future does not complete
check(testCrashes()) // did not complete
}
locally {
// sample minimal configuration for a context and underlying pool that
// use the reporter
val handler: Thread.UncaughtExceptionHandler =
(_: Thread, t: Throwable) => reporter(t)
val executor = new ForkJoinPool(
Runtime.getRuntime.availableProcessors,
ForkJoinPool.defaultForkJoinWorkerThreadFactory, // threads use the pool's handler
handler,
/*asyncMode=*/ false
)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor, reporter)
check(testCrashes()) // reported java.lang.NoSuchMethodError: test
// did not complete
}
}
import java.util.concurrent.{ForkJoinPool, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success}
def crashing(): Int = throw new NoSuchMethodError("test")
def failing(): Int = throw new NumberFormatException("test")
def interrupt(): Int = throw new InterruptedException("test")
def erroring(): Int = throw new AssertionError("test")
// computations can fail in the middle of a chain of combinators,
// after the initial Future job has completed
def testCrashes()(using ExecutionContext): Future[Int] =
Future.unit.map(_ => crashing())
def testFails()(using ExecutionContext): Future[Int] =
Future.unit.map(_ => failing())
def testInterrupted()(using ExecutionContext): Future[Int] =
Future.unit.map(_ => interrupt())
def testError()(using ExecutionContext): Future[Int] =
Future.unit.map(_ => erroring())
// Wait for 1 second for the the completion of the passed `future` value and print it
def check(future: Future[Int]): Unit =
try
Await.ready(future, 1.second)
for completion <- future.value do
println(s"completed $completion")
// In case of failure, also print the cause of the exception, when defined
completion match
case Failure(exception) if exception.getCause != null =>
println(s" caused by ${exception.getCause}")
case _ => ()
catch
// If the future value did not complete within 1 second, the call
// to `Await.ready` throws a TimeoutException
case _: TimeoutException => println(s"did not complete")
def reporter(t: Throwable) = println(s"reported $t")
@main def test(): Unit =
locally:
// using the `global` implicit context
import ExecutionContext.Implicits.given
// a successful Future
check(Future(42)) // completed Success(42)
// a Future that completes with an application exception
check(Future(failing())) // completed Failure(java.lang.NumberFormatException: test)
// same, but the exception is thrown somewhere in the chain of combinators
check(testFails()) // completed Failure(java.lang.NumberFormatException: test)
// a Future that does not complete because of a linkage error;
// the trace is printed to stderr by default
check(testCrashes()) // did not complete
// a Future that completes with an operational exception that is wrapped
check(testInterrupted()) // completed Failure(java.util.concurrent.ExecutionException: Boxed Exception)
// caused by java.lang.InterruptedException: test
// a Future that completes due to a failed assert, which is bad for the app,
// but is handled the same as interruption
check(testError()) // completed Failure(java.util.concurrent.ExecutionException: Boxed Exception)
// caused by java.lang.AssertionError: test
locally:
// same as `global`, but adds a custom reporter that will handle uncaught
// exceptions and errors reported to the context
given ExecutionContext = ExecutionContext.fromExecutor(null, reporter)
check(testCrashes()) // reported java.lang.NoSuchMethodError: test
// did not complete
locally:
// does not handle uncaught exceptions; the executor would have to be
// configured separately
val executor = ForkJoinPool.commonPool()
given ExecutionContext = ExecutionContext.fromExecutor(executor, reporter)
// the reporter is not invoked and the Future does not complete
check(testCrashes()) // did not complete
locally:
// sample minimal configuration for a context and underlying pool that
// use the reporter
val handler: Thread.UncaughtExceptionHandler =
(_: Thread, t: Throwable) => reporter(t)
val executor = new ForkJoinPool(
Runtime.getRuntime.availableProcessors,
ForkJoinPool.defaultForkJoinWorkerThreadFactory, // threads use the pool's handler
handler,
/*asyncMode=*/ false
)
given ExecutionContext = ExecutionContext.fromExecutor(executor, reporter)
check(testCrashes()) // reported java.lang.NoSuchMethodError: test
// did not complete
end test
承诺
到目前为止,我们只考虑了使用 Future
方法启动的异步计算创建的 Future
对象。但是,也可以使用承诺创建期货。
虽然期货被定义为为尚未存在的某个结果创建的只读占位符对象,但承诺可以被认为是一个可写的、单赋值容器,它完成了一个期货。也就是说,可以使用 success
方法通过成功完成一个承诺(“完成”该承诺)来使用一个承诺来用一个值成功完成一个期货。相反,还可以使用 failure
方法通过使承诺失败来使用承诺用异常完成期货。
承诺 p
完成由 p.future
返回的期货。此期货特定于承诺 p
。根据实现,可能是 p.future eq p
。
考虑以下生产者-消费者示例,其中一个计算产生一个值并将其传递给另一个消耗该值的计算。此值的传递是使用承诺完成的。
import scala.concurrent.{ Future, Promise }
import scala.concurrent.ExecutionContext.Implicits.global
val p = Promise[T]()
val f = p.future
val producer = Future {
val r = produceSomething()
p.success(r)
continueDoingSomethingUnrelated()
}
val consumer = Future {
startDoingSomething()
f.foreach { r =>
doSomethingWithResult()
}
}
在此,我们创建一个承诺并使用其 future
方法来获取它完成的 Future
。然后,我们开始两个异步计算。第一个执行一些计算,产生一个值 r
,然后使用该值通过实现承诺 p
来完成未来 f
。第二个执行一些计算,然后读取已完成的未来 f
的结果 r
。请注意,consumer
可以获取结果,而无需等待 producer
任务完成执行 continueDoingSomethingUnrelated()
方法。
如前所述,承诺具有单一赋值语义。因此,它们只能完成一次。对已完成(或失败)的承诺调用 success
将抛出 IllegalStateException
。
以下示例显示如何使承诺失败。
val p = Promise[T]()
val f = p.future
val producer = Future {
val r = someComputation
if (isInvalid(r))
p.failure(new IllegalStateException)
else {
val q = doSomeMoreComputation(r)
p.success(q)
}
}
val p = Promise[T]()
val f = p.future
val producer = Future {
val r = someComputation
if isInvalid(r) then
p.failure(new IllegalStateException)
else
val q = doSomeMoreComputation(r)
p.success(q)
}
在此,producer
计算中间结果 r
,并检查它是否有效。如果无效,它将通过使用异常完成承诺 p
来使承诺失败。在这种情况下,关联的未来 f
失败。否则,producer
将继续其计算,并最终通过完成承诺 p
来使用有效结果完成未来 f
。
承诺也可以使用 complete
方法完成,该方法采用一个潜在值 Try[T]
– 即类型为 Failure[Throwable]
的失败结果或类型为 Success[T]
的成功结果。
类似于 success
,对已完成的承诺调用 failure
和 complete
将抛出 IllegalStateException
。
使用迄今为止描述的操作编写的程序的一个优点是,通过单子操作组合而成的 promise 和 future,在没有副作用的情况下,这些程序是确定性的。此处的确定性意味着,在程序中未抛出任何异常的情况下,程序的结果(在 future 中观察到的值)将始终相同,而与并行程序的执行计划无关。
在某些情况下,客户端可能只想在 promise 尚未完成时才完成它(例如,从多个不同的 future 执行了多个 HTTP 请求,并且客户端只对第一个 HTTP 响应感兴趣 - 对应于完成 promise 的第一个 future)。出于这些原因,promise 上存在方法 tryComplete
、trySuccess
和 tryFailure
。客户端应意识到,使用这些方法会导致程序不确定,而是取决于执行计划。
方法 completeWith
使用另一个 future 完成 promise。在 future 完成后,promise 也将使用该 future 的结果完成。以下程序打印 1
val f = Future { 1 }
val p = Promise[Int]()
p.completeWith(f)
p.future.foreach { x =>
println(x)
}
当使用异常使 promise 失败时,将专门处理 Throwable
的三个子类型。如果用于中断 promise 的 Throwable
是 scala.runtime.NonLocalReturnControl
,则 promise 将使用相应的值完成。如果用于中断 promise 的 Throwable
是 Error
、InterruptedException
或 scala.util.control.ControlThrowable
的实例,则 Throwable
将被包装为新 ExecutionException
的原因,而该异常又会使 promise 失败。
使用 promise,future 的 onComplete
方法和 future
构造,您可以实现前面描述的任何函数组合组合器。假设您想实现一个新的组合器 first
,它采用两个 future f
和 g
,并生成一个第三个 future,该 future 由 f
或 g
完成(以先完成者为准),但仅在成功的情况下。
以下是如何执行此操作的示例
def first[T](f: Future[T], g: Future[T]): Future[T] = {
val p = Promise[T]
f.foreach { x =>
p.trySuccess(x)
}
g.foreach { x =>
p.trySuccess(x)
}
p.future
}
def first[T](f: Future[T], g: Future[T]): Future[T] =
val p = Promise[T]
f.foreach { x =>
p.trySuccess(x)
}
g.foreach { x =>
p.trySuccess(x)
}
p.future
请注意,在此实现中,如果 f
和 g
都未成功,则 first(f, g)
永远不会完成(无论是使用值还是异常)。
实用工具
为了简化并发应用程序中的时间处理,scala.concurrent
引入了 Duration
抽象。不应将 Duration
视为另一个通用时间抽象。它旨在与并发库一起使用,并驻留在 scala.concurrent
包中。
Duration
是表示时间长度的基本类。它可以是有限的,也可以是无限的。有限的持续时间用 FiniteDuration
类表示,该类由 Long
长度和 java.util.concurrent.TimeUnit
构成。无限持续时间(也从 Duration
扩展)仅存在两个实例,Duration.Inf
和 Duration.MinusInf
。该库还提供了几个 Duration
子类,用于隐式转换,不应使用这些子类。
抽象 Duration
包含允许以下操作的方法
- 转换为不同的时间单位(
toNanos
、toMicros
、toMillis
、toSeconds
、toMinutes
、toHours
、toDays
和toUnit(unit: TimeUnit)
)。 - 比较持续时间(
<
、<=
、>
和>=
)。 - 算术运算(
+
、-
、*
、/
和unary_-
)。 this
持续时间和参数中提供的持续时间之间的最小值和最大值(min
、max
)。- 检查持续时间是否有限(
isFinite
)。
Duration
可以通过以下方式实例化
- 隐式地从类型
Int
和Long
,例如,val d = 100 millis
。 - 通过传递
Long
长度和java.util.concurrent.TimeUnit
,例如,val d = Duration(100, MILLISECONDS)
。 - 通过解析表示时间段的字符串,例如,
val d = Duration("1.2 µs")
。
Duration 还提供了 unapply
方法,因此它可以在模式匹配结构中使用。示例
import scala.concurrent.duration._
import java.util.concurrent.TimeUnit._
// instantiation
val d1 = Duration(100, MILLISECONDS) // from Long and TimeUnit
val d2 = Duration(100, "millis") // from Long and String
val d3 = 100 millis // implicitly from Long, Int or Double
val d4 = Duration("1.2 µs") // from String
// pattern matching
val Duration(length, unit) = 5 millis
import scala.concurrent.duration.*
import java.util.concurrent.TimeUnit.*
// instantiation
val d1 = Duration(100, MILLISECONDS) // from Long and TimeUnit
val d2 = Duration(100, "millis") // from Long and String
val d3 = 100.millis // implicitly from Long, Int or Double
val d4 = Duration("1.2 µs") // from String
// pattern matching
val Duration(length, unit) = 5.millis