Scala 3 — 书籍

并发

语言

当你想在 Scala 中编写并行和并发应用程序时,可以使用本机 Java Thread,但 Scala Future 提供了一种更高级和更惯用的方法,因此更受青睐,并在本章中介绍。

简介

以下是 Scala Future 的 Scaladoc 描述

Future 表示一个值,它当前可能可用或不可用,但将在某个时间点可用,或者如果无法提供该值,则表示一个异常。”

为了演示这意味着什么,我们先来看看单线程编程。在单线程世界中,你可以将方法调用的结果绑定到变量,如下所示

def aShortRunningTask(): Int = 42
val x = aShortRunningTask()

在此代码中,值 42 立即绑定到 x

当您使用 Future 时,分配过程看起来类似

def aLongRunningTask(): Future[Int] = ???
val x = aLongRunningTask()

但这种情况下的主要区别在于,由于 aLongRunningTask 返回需要不确定时间,因此 x 中的值可能当前不可用,但它将在某个时间点(未来)可用。

另一种看待此问题的方法是阻塞。在此单线程示例中,println 语句直到 aShortRunningTask 完成才打印

def aShortRunningTask(): Int =
  Thread.sleep(500)
  42
val x = aShortRunningTask()
println("Here")

相反,如果 aShortRunningTask 被创建为 Future,则 println 语句几乎立即打印,因为 aShortRunningTask 在其他线程上产生 - 它不会阻塞。

在本章中,您将看到如何使用 future,包括如何在并行运行多个 future 并在 for 表达式中组合其结果。您还将看到用于处理 future 中的值(一旦返回)的方法示例。

当您考虑 future 时,重要的是要知道它们被设计为一次性,“在其他线程上处理此相对较慢的计算,并在完成后用结果给我回电”的结构。作为对比点,Akka 旨在长期运行并在其生命周期内响应许多请求。虽然一个 actor 可能永远存在,但 future 最终包含仅运行一次的计算结果。

REPL 中的示例

future 用于创建临时的并发空间。例如,当您需要调用运行不确定时间量的算法(例如调用远程微服务)时,您会使用 future,因此您希望在主线程之外运行它。

为了演示其工作原理,让我们从 REPL 中的 Future 示例开始。首先,粘贴这些必需的 import 语句

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

现在,您已准备好创建未来。对于此示例,首先定义一个长期运行的单线程算法

def longRunningAlgorithm() =
  Thread.sleep(10_000)
  42

该花哨算法在十秒延迟后返回整数值 42。现在,通过将其包装到 Future 构造函数中并为变量分配结果来调用该算法

scala> val eventualInt = Future(longRunningAlgorithm())
eventualInt: scala.concurrent.Future[Int] = Future(<not completed>)

您的计算(调用 longRunningAlgorithm())立即开始运行。如果您立即检查变量 eventualInt 的值,您会看到未来尚未完成

scala> eventualInt
val res1: scala.concurrent.Future[Int] = Future(<not completed>)

但是,如果您在十秒后再次检查,您会看到它已成功完成

scala> eventualInt
val res2: scala.concurrent.Future[Int] = Future(Success(42))

虽然这是一个相对简单的示例,但它展示了基本方法:只需使用您的长期运行算法构建一个新的 Future

需要注意的一件事是,您预期的 42 被包装在 Success 中,而 Success 进一步被包装在 Future 中。这是一个需要理解的关键概念:Future 中的值始终是 scala.util.Try 类型之一的实例:SuccessFailure。因此,当您使用 future 的结果时,您使用通常的 Try 处理技术。

使用 map 与 futures

Future 有一个 map 方法,您可以像在集合上使用 map 方法一样使用它。这是在创建变量 a 之后立即调用 map 时结果的样子

scala> val a = Future(longRunningAlgorithm()).map(_ * 2)
a: scala.concurrent.Future[Int] = Future(<not completed>)

如所示,对于使用 longRunningAlgorithm 创建的 future,初始输出显示 Future(<not completed>)。但是,当您在十秒后检查 a 的值时,您会看到它包含预期的结果 84

scala> a
res1: scala.concurrent.Future[Int] = Future(Success(84))

再次,成功的结果被包装在 SuccessFuture 中。

使用回调方法与 futures

除了像 map 这样的高阶函数之外,您还可以将回调方法与 futures 一起使用。一个常用的回调方法是 onComplete,它采用一个部分函数,您可以在其中处理 SuccessFailure 案例

Future(longRunningAlgorithm()).onComplete {
  case Success(value) => println(s"Got the callback, value = $value")
  case Failure(e) => e.printStackTrace
}

当你将该代码粘贴到 REPL 中时,最终会看到结果

Got the callback, value = 42

其他 Future 方法

Future 类有其他你可以使用的方法。它有一些可以在 Scala 集合类中找到的方法,包括

  • filter
  • flatMap
  • map

它的回调方法是

  • onComplete
  • andThen
  • foreach

其他转换方法包括

  • fallbackTo
  • recover
  • recoverWith

请参阅Futures and Promises页面,了解可用于 future 的其他方法。

运行多个 future 并连接其结果

要并行运行多个计算并在所有 future 完成时连接其结果,请使用 for 表达式。

正确的方法是

  1. 启动返回 Future 结果的计算
  2. for 表达式中合并其结果
  3. 使用 onComplete 或类似技术提取合并的结果

一个示例

以下示例显示了正确方法的三个步骤。关键是首先启动返回 future 的计算,然后在 for 表达式中连接它们

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

val startTime = System.currentTimeMillis()
def delta() = System.currentTimeMillis() - startTime
def sleep(millis: Long) = Thread.sleep(millis)

@main def multipleFutures1 =

  println(s"creating the futures:   ${delta()}")

  // (1) start the computations that return futures
  val f1 = Future { sleep(800); 1 }   // eventually returns 1
  val f2 = Future { sleep(200); 2 }   // eventually returns 2
  val f3 = Future { sleep(400); 3 }   // eventually returns 3

  // (2) join the futures in a `for` expression
  val result =
    for
      r1 <- f1
      r2 <- f2
      r3 <- f3
    yield
      println(s"in the 'yield': ${delta()}")
      (r1 + r2 + r3)

  // (3) process the result
  result.onComplete {
    case Success(x) =>
      println(s"in the Success case: ${delta()}")
      println(s"result = $x")
    case Failure(e) =>
      e.printStackTrace
  }

  println(s"before the 'sleep(3000)': ${delta()}")

  // important for a little parallel demo: keep the jvm alive
  sleep(3000)

当你运行该应用程序时,你将看到类似这样的输出

creating the futures:   1
before the 'sleep(3000)': 2
in the 'yield': 806
in the Success case: 806
result = 6

正如该输出所示,future 创建得非常快,并且在方法末尾的 sleep(3000) 语句正之前的打印语句在短短两毫秒内就到达了。所有这些代码都在 JVM 的主线程上运行。然后,在 806 毫秒时,三个 future 完成,并且 yield 块中的代码运行。然后,代码立即进入 onComplete 方法中的 Success 案例。

806 毫秒的输出是查看三个计算是否并行运行的关键。如果它们是顺序运行的,总时间将大约为 1400 毫秒,即三个计算的睡眠时间的总和。但由于它们是并行运行的,因此总时间仅比运行时间最长的计算稍长:f1,为 800 毫秒。

请注意,如果计算在 for 表达式中运行,它们将顺序执行,而不是并行执行

// Sequential execution (no parallelism!)
for
  r1 <- Future { sleep(800); 1 }
  r2 <- Future { sleep(200); 2 }
  r3 <- Future { sleep(400); 3 }
yield
  r1 + r2 + r3

因此,如果您希望计算可能并行运行,请记住在 for 表达式之外运行它们。

返回 Future 的方法

到目前为止,您已经了解如何将单线程算法传递到 Future 构造函数中。您可以使用相同技术创建返回 Future 的方法

// simulate a slow-running method
def slowlyDouble(x: Int, delay: Long): Future[Int] = Future {
  sleep(delay)
  x * 2
}

与前面的示例一样,只需将方法调用的结果分配给新变量。然后,当您立即检查结果时,您会看到它尚未完成,但在延迟时间之后,future 将有结果

scala> val f = slowlyDouble(2, 5_000L)
val f: concurrent.Future[Int] = Future(<not completed>)

scala> f
val res0: concurrent.Future[Int] = Future(<not completed>)

scala> f
val res1: concurrent.Future[Int] = Future(Success(4))

关于 future 的要点

希望这些示例让您了解 Scala future 的工作原理。总而言之,关于 future 的一些要点是

  • 您构建 future 以在主线程之外运行任务
  • future 旨在用于一次性、可能长时间运行的并发任务,这些任务最终返回一个值;它们创建了一个临时的并发空间
  • 一构建 future,它就会开始运行
  • future 比线程的优点在于,它们可与 for 表达式配合使用,并附带各种回调方法,简化了使用并发线程的过程
  • 使用 future 时,您不必关心线程管理的底层细节
  • 您可以使用回调方法(如 onCompleteandThen)或转换方法(如 filtermap 等)来处理 future 的结果。
  • Future 中的值始终是 Try 类型之一的实例:SuccessFailure
  • 如果您使用多个 future 来产生单个结果,请将它们组合在 for 表达式中

此外,正如您在这些示例中的 import 语句中所看到的,Scala Future 依赖于 ExecutionContext

有关 future 的更多详细信息,请参阅 Futures and Promises,这是一篇讨论 future、promise 和执行上下文的文章。它还提供了有关如何将 for 表达式转换为 flatMap 操作的讨论。

此页面的贡献者