当你想在 Scala 中编写并行和并发应用程序时,可以使用本机 Java Thread,但 Scala Future 提供了一种更高级和更惯用的方法,因此更受青睐,并在本章中介绍。
简介
以下是 Scala Future 的 Scaladoc 描述
“
Future表示一个值,它当前可能可用或不可用,但将在某个时间点可用,或者如果无法提供该值,则表示一个异常。”
为了演示这意味着什么,我们先来看看单线程编程。在单线程世界中,你可以将方法调用的结果绑定到变量,如下所示
def aShortRunningTask(): Int = 42
val x = aShortRunningTask()
在此代码中,值 42 立即绑定到 x。
当您使用 Future 时,分配过程看起来类似
def aLongRunningTask(): Future[Int] = ???
val x = aLongRunningTask()
但这种情况下的主要区别在于,由于 aLongRunningTask 返回需要不确定时间,因此 x 中的值可能当前不可用,但它将在某个时间点(未来)可用。
另一种看待此问题的方法是阻塞。在此单线程示例中,println 语句直到 aShortRunningTask 完成才打印
def aShortRunningTask(): Int =
Thread.sleep(500)
42
val x = aShortRunningTask()
println("Here")
相反,如果 aShortRunningTask 被创建为 Future,则 println 语句几乎立即打印,因为 aShortRunningTask 在其他线程上产生 - 它不会阻塞。
在本章中,您将看到如何使用 future,包括如何在并行运行多个 future 并在 for 表达式中组合其结果。您还将看到用于处理 future 中的值(一旦返回)的方法示例。
当您考虑 future 时,重要的是要知道它们被设计为一次性,“在其他线程上处理此相对较慢的计算,并在完成后用结果给我回电”的结构。作为对比点,Akka 旨在长期运行并在其生命周期内响应许多请求。虽然一个 actor 可能永远存在,但 future 最终包含仅运行一次的计算结果。
REPL 中的示例
future 用于创建临时的并发空间。例如,当您需要调用运行不确定时间量的算法(例如调用远程微服务)时,您会使用 future,因此您希望在主线程之外运行它。
为了演示其工作原理,让我们从 REPL 中的 Future 示例开始。首先,粘贴这些必需的 import 语句
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
现在,您已准备好创建未来。对于此示例,首先定义一个长期运行的单线程算法
def longRunningAlgorithm() =
Thread.sleep(10_000)
42
该花哨算法在十秒延迟后返回整数值 42。现在,通过将其包装到 Future 构造函数中并为变量分配结果来调用该算法
scala> val eventualInt = Future(longRunningAlgorithm())
eventualInt: scala.concurrent.Future[Int] = Future(<not completed>)
您的计算(调用 longRunningAlgorithm())立即开始运行。如果您立即检查变量 eventualInt 的值,您会看到未来尚未完成
scala> eventualInt
val res1: scala.concurrent.Future[Int] = Future(<not completed>)
但是,如果您在十秒后再次检查,您会看到它已成功完成
scala> eventualInt
val res2: scala.concurrent.Future[Int] = Future(Success(42))
虽然这是一个相对简单的示例,但它展示了基本方法:只需使用您的长期运行算法构建一个新的 Future。
需要注意的一件事是,您预期的 42 被包装在 Success 中,而 Success 进一步被包装在 Future 中。这是一个需要理解的关键概念:Future 中的值始终是 scala.util.Try 类型之一的实例:Success 或 Failure。因此,当您使用 future 的结果时,您使用通常的 Try 处理技术。
使用 map 与 futures
Future 有一个 map 方法,您可以像在集合上使用 map 方法一样使用它。这是在创建变量 a 之后立即调用 map 时结果的样子
scala> val a = Future(longRunningAlgorithm()).map(_ * 2)
a: scala.concurrent.Future[Int] = Future(<not completed>)
如所示,对于使用 longRunningAlgorithm 创建的 future,初始输出显示 Future(<not completed>)。但是,当您在十秒后检查 a 的值时,您会看到它包含预期的结果 84
scala> a
res1: scala.concurrent.Future[Int] = Future(Success(84))
再次,成功的结果被包装在 Success 和 Future 中。
使用回调方法与 futures
除了像 map 这样的高阶函数之外,您还可以将回调方法与 futures 一起使用。一个常用的回调方法是 onComplete,它采用一个部分函数,您可以在其中处理 Success 和 Failure 案例
Future(longRunningAlgorithm()).onComplete {
case Success(value) => println(s"Got the callback, value = $value")
case Failure(e) => e.printStackTrace
}
当你将该代码粘贴到 REPL 中时,最终会看到结果
Got the callback, value = 42
其他 Future 方法
Future 类有其他你可以使用的方法。它有一些可以在 Scala 集合类中找到的方法,包括
filterflatMapmap
它的回调方法是
onCompleteandThenforeach
其他转换方法包括
fallbackTorecoverrecoverWith
请参阅Futures and Promises页面,了解可用于 future 的其他方法。
运行多个 future 并连接其结果
要并行运行多个计算并在所有 future 完成时连接其结果,请使用 for 表达式。
正确的方法是
- 启动返回
Future结果的计算 - 在
for表达式中合并其结果 - 使用
onComplete或类似技术提取合并的结果
一个示例
以下示例显示了正确方法的三个步骤。关键是首先启动返回 future 的计算,然后在 for 表达式中连接它们
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
val startTime = System.currentTimeMillis()
def delta() = System.currentTimeMillis() - startTime
def sleep(millis: Long) = Thread.sleep(millis)
@main def multipleFutures1 =
println(s"creating the futures: ${delta()}")
// (1) start the computations that return futures
val f1 = Future { sleep(800); 1 } // eventually returns 1
val f2 = Future { sleep(200); 2 } // eventually returns 2
val f3 = Future { sleep(400); 3 } // eventually returns 3
// (2) join the futures in a `for` expression
val result =
for
r1 <- f1
r2 <- f2
r3 <- f3
yield
println(s"in the 'yield': ${delta()}")
(r1 + r2 + r3)
// (3) process the result
result.onComplete {
case Success(x) =>
println(s"in the Success case: ${delta()}")
println(s"result = $x")
case Failure(e) =>
e.printStackTrace
}
println(s"before the 'sleep(3000)': ${delta()}")
// important for a little parallel demo: keep the jvm alive
sleep(3000)
当你运行该应用程序时,你将看到类似这样的输出
creating the futures: 1
before the 'sleep(3000)': 2
in the 'yield': 806
in the Success case: 806
result = 6
正如该输出所示,future 创建得非常快,并且在方法末尾的 sleep(3000) 语句正之前的打印语句在短短两毫秒内就到达了。所有这些代码都在 JVM 的主线程上运行。然后,在 806 毫秒时,三个 future 完成,并且 yield 块中的代码运行。然后,代码立即进入 onComplete 方法中的 Success 案例。
806 毫秒的输出是查看三个计算是否并行运行的关键。如果它们是顺序运行的,总时间将大约为 1400 毫秒,即三个计算的睡眠时间的总和。但由于它们是并行运行的,因此总时间仅比运行时间最长的计算稍长:f1,为 800 毫秒。
请注意,如果计算在
for表达式中运行,它们将顺序执行,而不是并行执行// Sequential execution (no parallelism!) for r1 <- Future { sleep(800); 1 } r2 <- Future { sleep(200); 2 } r3 <- Future { sleep(400); 3 } yield r1 + r2 + r3因此,如果您希望计算可能并行运行,请记住在
for表达式之外运行它们。
返回 Future 的方法
到目前为止,您已经了解如何将单线程算法传递到 Future 构造函数中。您可以使用相同技术创建返回 Future 的方法
// simulate a slow-running method
def slowlyDouble(x: Int, delay: Long): Future[Int] = Future {
sleep(delay)
x * 2
}
与前面的示例一样,只需将方法调用的结果分配给新变量。然后,当您立即检查结果时,您会看到它尚未完成,但在延迟时间之后,future 将有结果
scala> val f = slowlyDouble(2, 5_000L)
val f: concurrent.Future[Int] = Future(<not completed>)
scala> f
val res0: concurrent.Future[Int] = Future(<not completed>)
scala> f
val res1: concurrent.Future[Int] = Future(Success(4))
关于 future 的要点
希望这些示例让您了解 Scala future 的工作原理。总而言之,关于 future 的一些要点是
- 您构建 future 以在主线程之外运行任务
- future 旨在用于一次性、可能长时间运行的并发任务,这些任务最终返回一个值;它们创建了一个临时的并发空间
- 一构建 future,它就会开始运行
- future 比线程的优点在于,它们可与
for表达式配合使用,并附带各种回调方法,简化了使用并发线程的过程 - 使用 future 时,您不必关心线程管理的底层细节
- 您可以使用回调方法(如
onComplete和andThen)或转换方法(如filter、map等)来处理 future 的结果。 Future中的值始终是Try类型之一的实例:Success或Failure- 如果您使用多个 future 来产生单个结果,请将它们组合在
for表达式中
此外,正如您在这些示例中的 import 语句中所看到的,Scala Future 依赖于 ExecutionContext。
有关 future 的更多详细信息,请参阅 Futures and Promises,这是一篇讨论 future、promise 和执行上下文的文章。它还提供了有关如何将 for 表达式转换为 flatMap 操作的讨论。