注册

计算耗时? Isolate 来帮忙

一、问题引入 - 计算密集型任务


假如现在有个需求,我想要计算 1 亿1~10000 间随机数的平均值,在界面上显示结果,该怎么办?

可能有小伙伴踊跃发言:这还不简单,生成 1 亿 个随机数,算呗。




1. 搭建测试场景

如下,写个简单的测试界面,界面中有计算结果和耗时的信息。点击运行按钮,触发 _doTask 方法进行运算。计算完后将结果展示出来:



代码详见: 【async/isolate/01】



1667781807811.png


void _doTask() {
int sum = 0;
int startTime = DateTime.now().millisecondsSinceEpoch;
for(int i = 0;i;i++){>


可以看到,这样是可以实现需求的,总耗时在 8.5 秒左右。细心的朋友可能会发现,在点击按键触发 _doTask 时,FloatingActionButton 的水波纹并没有出现,仿佛是卡死一般。为了应证这点,我们再进行一个对比实验。















请点击前请点击后

2. 计算耗时阻塞

如下所示,我们让 CupertinoActivityIndicator 一直处于运动状态,作为界面 未被卡死 的标志。当点击运行时,可以看出指示器被卡住了, 再点击按钮也没有任何的水波纹反映,这说明:



计算的耗时任务会阻塞 Dart 的线程,界面因此无法有任何响应。
















未执行前执行前后37.gif35.gif
Row(
mainAxisAlignment: MainAxisAlignment.center,
children: const [Text("动画指示器示意: "), CupertinoActivityIndicator()],
),


3. 计算耗时阻塞的解决方案

有人说,用异步的方式触发 _doTask 呗,比如用 FuturescheduleMicrotask 包一下,或 Stream 异步处理。有这个想法的人可以试一试,如果你看懂前面几篇看到了原理,就知道是不可行的,这些工具只不过是回调包装而已。只要计算的任务仍是 Dart 在单线程中处理的,就无法避免阻塞。现在的问题相当于:



一个人无法同时做 洗漱扫地 的任务。



一旦阻塞,界面就无法有任何响应,自然也无法展示加载中的动画,这对于用户体验来说是极其糟糕的。那如何让计算密集型的耗时任务,在处理时不阻塞呢? 我们可以好好品味一下这句话:


image.png


这句话言外之意给出了两种解决方案:



【1】. 将计算密集型的耗时任务,从 Dart 端剥离,交由 其他机体 来处理。

【2】. 在 Dart 中通过 多线程 的方式处理,从而不阻塞主线程。



方式一其实很好理解,比如耗时的任务交由服务端来完成,客户端通过 接口请求 ,获取响应结果。这样计算型的密集任务,对于 Flutter 而言,就转换成了一个网络的 IO 任务。或者通过 插件 的方式,将计算的耗时任务交由平台来通过多线程处理,而 Dart 端只需要通过回调处理即可,也不会阻塞。


方式一处理的本质上都是将计算密集型的任务转移到其他机体中,从而让 Dart 避免处理计算密集型的耗时任务。这种方式需要其他语言或后端的支持,想要实现是有一定门槛的。那如何直接在 Flutter 中,通过 Dart 语言处理计算密集型的任务呢?


这就是我们今天的主角: Isolate 。 可能很多人潜意识里 Dart 是单线程模型,无法通过多线程的处理任务,这种认知就狭隘了。其实 Dart 提供了 Isolate, 本质上是通过 C++ 创建线程,隔离出另一份区间来通过 Dart 处理任务。它相当于线程的一种上层封装,屏蔽了很多内部细节,可以通过 Dart 语言直接操作。



二、从 compute 函数认识 Isolate


首先,我们通过 compute 函数认识一下计算密集型的耗时任务该如何处理。 compute 函数字如其名,用于处理计算。只要简单看一下,就知道它本身是 Isolate 的一个简单的封装使用方式。它作为全局函数被定义在 foundation/isolates.dart 中:


image.png



1. 认识 compute 函数

既然是函数,那使用时就非常简单,调用就行了。关于函数的调用,比较重要的是 入参返回值泛型。从上面函数定义中可以看出,它就是 isolate 包中的 compute 函数, 其中泛型有两个 QR ,返回值是 R 泛型的 Future 对象,很明显该泛型表示结果 Result;第二入参是 Q 泛型的 message ,表示消息类型;第三入参是可选参数,用于调试时的标签。


---->[_isolates_io.dart#compute]----
/// The dart:io implementation of [isolate.compute].
Future compute(
isolates.ComputeCallback callback,
Q message,
{ String? debugLabel })
async {
,>
,>

看到这里,很自然地就可以想到,这里第一参中传入的 callback 就是计算任务,它将被在其他的 isolate 中被执行,然后返回计算结果。下面我们来看一下在当前场景下的使用方式。在此之前,先封装一下返回的结果。通过 TaskResult 记录结果,作为 compute 的返回值:



代码详见: 【async/isolate/02_compute】



class TaskResult {
final int cost;
final double result;

TaskResult({required this.cost, required this.result});
}


2. compute 函数的使用

compute 方法在传入两个参数,其一是 _doTaskInCompute ,也就是计算的耗时任务,其二是传递的信息,这里不需要,传空值字符串。虽然方法的泛型可以不传,但严谨一些的话,可也以把泛型加上,这样可读性更好一些:


void _doTask() async {
TaskResult taskResult = await compute(
_doTaskInCompute, '',
debugLabel: "task1");
setState(() {
result = taskResult.result;
cost = taskResult.cost;
});
},>


对于 compute 而言,传入的回调有一个非常重要的注意点:



函数必须是 静态函数 或者 全局函数



static Random random = Random();

static Future _doTaskInCompute(String arg) async {
int count = 100000000;
double result = 0;
int cost = 0;
int sum = 0;
int startTime = DateTime.now().millisecondsSinceEpoch;
for (int i = 0; i < count; i++) {
sum += random.nextInt(10000);
}
int endTime = DateTime.now().millisecondsSinceEpoch;
result = sum / count;
cost = endTime - startTime;
return TaskResult(
result: result,
cost: cost,
);
}


下面看一下用和不用 compute 处理的效果差异,如下左图是使用 compute 的效果,在进行计算的同时指示器的动画仍在运动,桌面计算操作并未影响主线程,界面仍可以触发响应,这就和前面产生了鲜明的对比。















用 compute不用 compute38.gif35.gif

3. 理解 compute 的作用

如下,在 _doTaskInCompute 中打断点调试一下,可以看出此时除了 main 还有一个 task1 的栈帧。此时断点停留在新帧中, main 仍处于运行状态:


image.png


image.png


这就相当于计算任务不想自己处理,找另外一个人来做。每块处理任务的单元,就可以视为一个 isolate。它们之间的信息数据在内存中是不互通的,这也是为什么起名为 隔离 isolate 的原因。 这种特性能非常有效地避免多线程中操作同一内存数据的风险。 但同时也需要引入一个 通信机制 来处理两个 isolate 间的通信。


image.png


其实这和 客户端 - 服务端 的模型非常相似,通过 发送端 SendPort 发送消息,通过接收端 RawReceivePort 接收消息。从 compute 方法的源码中可以简单地看出,其本质是通过 Isolate.spawn 实现的 Isolate 创建。


image.png


这里有个小细节要注意,通过多次测试发现 compute 中的计算耗时要普遍高于主线程中的耗时。这并不是说新建的 isolate 在计算能力上远小于 主 isolate, 毕竟这里是 1 亿 次的计算,任何微小的细节都将被放大 1 亿 倍。这里的关注点应在于 新 isolate 可以独立于 主 isolate 运行,并且可以通过通信机制将结果返回给 主 isolate



4. compute 参数传递与多个 isolate

如果是大量的相互独立的计算耗时任务,可以开启多个 isolate 共同处理,最后进行结果汇总。比如这里 1 亿 次的计算,我们可以开 2isolate , 分别处理 5000 万 个计算任务。如下所示,总耗时就是 6 秒左右。当然创建 isolate 也是有资源消耗的,并不是说创建 100 个就能把耗时降低 100 倍。


39.gif


关于传参非常简单,compute 第一泛型是参数类型,这里可以指定 int类型作为 _doTaskInCompute 任务的入参,指定计算的次数。这里通过两个 compute 创建两个 isolate 同时处理 5000 万 个随机数的的平均值,来模拟那些相互独立的任务:



代码详见: 【async/isolate/03_compute】



image.png


最后通过 Future.await 对多个异步任务进行结果汇总,示意图如下,这样就相当于又开了一个 isolate 进行处理计算任务:


image.png


对于 isolate 千万不要盲目使用,一定要认清当前任务是否真有必要使用。比如几百微秒就能处理完成的任务,用 isolate 就是拿导弹打蚊子。或者那些并非由 Dart 端处理的 IO 密集型 任务,用 isolate 就相当于你打开了烧水按钮,又找来一个人专门看着烧水的过程。这种多此一举的行为,都是对于异步不理解的表现。


一般而言,客户端中并没有太多需要处理复杂计算的场景,只有一些特定场景的软件,比如需要进行大量的文字解析、复杂的图片处理等。



三、分析 compute 函数的源码实现


到这可能有人觉得,新开一个 isolate好简单啊,compute 函数处理一下就好啦。但是,简单必然有简单的 局限性,仔细思考一下,会发现 compute 函数有个缺陷:它只会 "闷头干活",只有任务完成才会通过 Future 通知 main isolate


也就是说,对于 UI 界面来说无法无法感知到 任务执行进度 信息,处理展示 计算中... 之外没什么能干的。这在某些特别耗时的场景中会造成用户的等待焦虑,我们需要让干活的 isolate 抽空通知一下 main isolate,所以对 isolate 之间的通信方式,是有必要了解的。


image.png


既然 compute 在完成任务时可以进行一次通信,那么就可以从 compute 函数的源码中去分析这种通信的方式。



1. 接收端口的创建与处理器设置

如下所示,在一开始会创建一个 Flow 对象,从该对象的成员中可以看出,它只负责维护两个整型 id_type 的数值信息。接下来会创建 RawReceivePort 对象,是不是有点眼熟?


image.png



还记得那个经常在面前晃的 _RawRecivePortImpl类吗? RawReceivePort 的默认工厂构造方法创建的就是 _RawReceivePortImpl 对象,如下代码所示:


image.png


---->[isolate_patch.dart/RawReceivePort]----
@patch
class RawReceivePort {
@patch
factory RawReceivePort([Function? handler, String debugName = '']) {
_RawReceivePortImpl result = new _RawReceivePortImpl(debugName);
result.handler = handler;
return result;
}
}


接下来,会创建一个 Completer 对象,并在为 port 设置信息的 handler 处理器,在处理回调中触发 completer#complete 方法,表示异步任务完成。也就是说处理器接收信息之时,就是 completer 中异步任务完成之日。


如果不知道 Completer 和接收端口设置 handler 是干嘛的,可以分别到 【第五篇·第二节】【第六篇·第一节】 温故,这里就不赘述了。


---->[_isolates_io.dart#compute]----
final Completer completer = Completer();
port.handler = (dynamic msg) {
timeEndAndCleanup();
completer.complete(msg);
};


2. 认识 Isolate.spawn 方法

接下来会触发 Isolate.spawn 方法,该方法是生成 isolate 的核心。其中传入的 回调 callback消息 message 以及发送的端口 SendPort 会组合成 _IsolateConfiguration 作为第二参数:


image.png


通过 Isolate.spawn 方法的定义可以看出,第一参是一个入口函数,第二参是函数入参。所以上面红框中的对象将作为 _spawn 函数的入参。从这里可以看出第一参 _spawn 函数应该是在新 isolate 中执行的。


external static Future spawn(
void entryPoint(T message), T message,
{bool paused = false,
bool errorsAreFatal = true,
SendPort? onExit,
SendPort? onError,
@Since("2.3") String? debugName})
;


下面是在耗时任务中打断点的效果,其中很清晰地展现出 _spawn 方法到 _doTaskInCompute 的过程。


image.png


如下,是 _spawn 的处理流程,上面的调试发生在 127 行,此时触发回调方法,获取结果。然后在关闭 isolate 时,将结果发送出去,流程其实并不复杂。


image.png


有一个小细节,结果通过 _buildSuccessResponse 方法处理了一下,关闭时发送的消息是列表,后期会根据列表的长度判断任务处理的正确性。


List _buildSuccessResponse(R result) {
return List.filled(1, result);
}


3. 异步任务的结束

从前面测试中可以知道 compute 函数返回值是一个泛型为结果的 Future 对象,那这个返回值是什么呢?如下可以看出当结果列表长度为 1 表示任务成功完成,返回 completer 任务结果的首元素:


image.png


再结合 completer 触发 complete 完成的时机,就不难知道。最终的结果是由接收端接收到的信息,调试如下:


image.png


也就是说,isolate 关闭时发送的信息,将会被 接收端的处理器 监听到。这就是 compute 函数源码的全部处理逻辑,总的来看还是非常简单的。就是,使用 Completer ,基于 Isolate.spawn 的简单封装,屏蔽了用户对 RawReceivePort 的感知,从而简化使用。



四、Isolate 发送和接收消息的使用


通过 compute 函数我们知道 isoalte 之间有着一套消息 发送 - 监听 的机制。我们可以利用这个机制在某些时刻发送进度消息传给 main isolate,这样 UI 界面中就可以展示出 耗时任务 的进度。如下所示,每当 100 万次 计算时,发送消息通知 main isolate :


40.gif



1. 使用 Isolate.spawn

compute 函数为了简化使用,将 发送 - 监听 的处理封装在了内部,用户无法操作。使用为了能使用该功能,我们可以主动来使用 Isolate.spawn 。如下所示,创建 RawReceivePort,并设置 handler 处理器器,这里通过 handleMessage 函数来单独处理。



代码详见: 【async/isolate/04_spawn】



然后调用 Isolate.spawn 来开启新 isolate,其中第一参是在新 isolate 中处理的耗时任务,第二参是任务的入参。这里将发送端口传入 _doTaskInCompute 方法,以便发送消息:


void _doTask() async {
final receivePort = RawReceivePort();
receivePort.handler = handleMessage;
await Isolate.spawn(
_doTaskInCompute,
receivePort.sendPort,
onError: receivePort.sendPort,
onExit: receivePort.sendPort,
);
}


2. 通过端口发送消息

SendPort 传入 _doTaskInCompute 中,如下 tag1 处,可以每隔 1000000 次发送一次进度通知。在任务完成后,使用 Isolate.exit 方法关闭当前 isolate 并发送结果数据。


static void _doTaskInCompute(SendPort port) async {
int count = 100000000;
double result = 0;
int cost = 0;
int sum = 0;
int startTime = DateTime.now().millisecondsSinceEpoch;
for (int i = 0; i < count; i++) {
sum += random.nextInt(10000);
if (i % 1000000 == 0) { // tag1
port.send(i / count);
}
}
int endTime = DateTime.now().millisecondsSinceEpoch;
result = sum / count;
cost = endTime - startTime;
Isolate.exit(port, TaskResult(result: result, cost: cost));
}


3. 通过接收端处理消息

接下来只要在 handleMessage 方法中处理发送端传递的消息即可,可以根据消息的类型判断是什么消息,比如这里如果是 double 表示是进度,通知 UI 更新进度值。另外,如果不同类型的消息非常多,也可以自己定义一套发送结果的规范方便处理。


void handleMessage(dynamic msg) {
print("=========$msg===============");
if (msg is TaskResult) {
progress = 1;
setState(() {
result = msg.result;
cost = msg.cost;
});
}
if (msg is double) {
setState(() {
progress = msg;
});
}
}


其实学会了如何通过 Isolate.spawn 处理计算耗时任务,以及通过 SendPort-RawReceivePort 处理 发送 - 监听 消息,就能满足绝大多数对 Isolate 的使用场景。如果不需要在任务执行过程中发送通知,使用 compute 函数会方便一些。最后还是要强调一点,不要滥用 Isolate ,使用前动动脑子,思考一下是否真的是计算耗时任务,是否真的需要在 Dart 端来完成。开一个 isolate 至少要消耗 30 kb


image.png


作者:张风捷特烈
链接:https://juejin.cn/post/7163431846783483912
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

0 个评论

要回复文章请先登录注册