Flutter 的运行模型与多线程 Isolate
Posted on Wed, 25 Dec 2024 11:21:52 +0800 by LiangMingJian
Flutter 的运行模型与多线程 Isolate
Flutter 的运行模型
Flutter 的编写语言是 Dart,Dart 是一种单线程语言,意味着同一时刻程序只能执行一个操作,其他操作在这个操作完成后执行,只要这个操作还在执行,它就不会被其他 Dart 代码中断。依赖于 Dart 的 Flutter 也是如此。
void myBigLoop(){
for (int i = 0; i < 1000000; i++){
_doSomethingSynchronously();
}
}
在上述例子中,myBigLoop()
方法在执行完成前永远不会被中断,在整个方法执行期间应用将会被阻塞。
在这里需要关注的是 Dart 的代码序列器(事件循环)。
当你启动一个 Flutter 或 Dart 应用时,应用将创建并启动一个新的线程进程 Isolate,这个线程将是整个应用的主线程。
在主线程启动后,应用会在此初始化 2 个 FIFO(先进先出)队列,MicroTask
和 Event
队列,在上述操作执行完成后,才会执行main()
方法,并启动事件循环。
事件循环是一种由一个内部时钟控制的无限循环,在每个时钟周期内,如果没有其他 Dart 代码执行,则执行以下操作:
void eventLoop(){
while (microTaskQueue.isNotEmpty){
fetchFirstMicroTaskFromQueue();
executeThisMicroTask();
return;
}
if (eventQueue.isNotEmpty){
fetchFirstEventFromQueue();
executeThisEventRelatedCode();
}
}
可以注意到,这个操作的作用是从MicroTask
和 Event
队列提取出事件到循环中执行,直到两个队列中所有事件执行完成。
MicroTask
:用于非常简短且需要异步执行的内部动作,这些动作需要在其他事件完成之后并在将执行权送还给Event
队列之前运行。Event
:大多数需要使用异步的动作都使用Event
队列进行处理,如外部事件 I/O,绘图等。值得注意的是,Future 操作也通过Event
队列处理。
Future
Future 是一个异步执行并且在未来的某一个时刻完成(或失败)的任务。Future 并非并行执行,而是遵循事件循环处理事件的顺序规则执行。当你实例化一个 Future 时,应用会执行以下操作:
- 该 Future 的一个实例被创建并记录在由 Dart 管理的内部数组中;
- 需要由此 Future 执行的代码直接推送到 Event 队列中去;
- 该 Future 实例会返回一个状态(= incomplete);
- 如果存在下一个同步代码,执行它(非 Future 的执行代码);
只要事件循环从 Event 循环中获取它,被 Future 引用的代码将像其他任何 Event 一样执行。当该代码将被执行并将完成(或失败)时,then()
或 catchError()
方法将直接被触发。
void main(){
print('Before the Future');
Future((){
print('Running the Future');
}).then((_){
print('Future is complete');
});
print('After the Future');
}
上述代码执行输出
Before the Future
After the Future
Running the Future
Future is complete
应用的执行执行流程如下:
print("Before the Future")
- 将
(){print("Running the Future");}
添加到Event
队列; print("After the Future")
- 事件循环获取(在第二步引用的)代码并执行它
- 当代码执行时,它会查找
then()
语句并执行它
Async
当你使用 async 关键字作为方法声明的后缀时,Dart 会将其理解为:
- 该方法的返回值是一个 Future;
- 它同步执行该方法的代码直到第一个 await 关键字,然后它暂停该方法其他部分的执行;
- 一旦由 await 关键字引用的 Future 执行完成,下一行代码将立即执行。
这是 Dart 异步代码的同步实现。注意到的是,async 并非并行执行,也是遵循事件循环处理事件的顺序规则执行。
伪多线程 Isolate
Isolate 是 Dart 中的 线程。然而,它与常规「线程」的实现存在较大差异,Isolate 在 Flutter 中并不共享内存,不同 Isolate 之间通过消息进行通信,实际上 Isolate 更新进程。
每个 Isolate 都有自己的事件循环及队列,这意味着在一个 Isolate 中运行的代码与另外一个 Isolate 不会存在任何关联。
如何启动 Isolate
创建并握手
由于Isolate 不共享任何内存并通过消息进行交互,因此,我们需要在调用者与 isolate 间建立通信。每个 Isolate 都暴露一个将消息传递给 Isolate 的被称为 SendPort 的端口,调用者和 isolate 需要互相知道彼此的端口才能进行通信。需要注意的是约束 isolate 的入口必须是顶级函数或静态方法。
// 新的 isolate 端口
// 该端口将在未来使用
// 用来给 isolate 发送消息
SendPort newIsolateSendPort;
// 新 Isolate 实例
Isolate newIsolate;
// 启动一个新的 isolate
// 然后开始第一次握手
void callerCreateIsolate() async {
// 本地临时 ReceivePort
// 用于检索新的 isolate 的 SendPort
ReceivePort receivePort = ReceivePort();
// 初始化新的 isolate
newIsolate = await Isolate.spawn(
callbackFunction,
receivePort.sendPort,
);
// 检索要用于进一步通信的端口
newIsolateSendPort = await receivePort.first;
}
// 新 isolate 的入口
static void callbackFunction(SendPort callerSendPort){
// 一个 SendPort 实例,用来接收来自调用者的消息
ReceivePort newIsolateReceivePort = ReceivePort();
// 向调用者提供此 isolate 的 SendPort 引用
callerSendPort.send(newIsolateReceivePort.sendPort);
//
// 进一步流程
//
}
向 Isolate 提交消息
// 向新 isolate 发送消息并接收回复的方法
// 在该例中,我将使用字符串进行通信操作
// (发送和接收的数据)
Future<String> sendReceive(String messageToBeSent) async {
// 创建一个临时端口来接收回复
ReceivePort port = ReceivePort();
// 发送消息到 Isolate,并且
// 通知该 isolate 哪个端口是用来提供回复的
newIsolateSendPort.send(
CrossIsolatesMessage<String>(
sender: port.sendPort,
message: messageToBeSent,
)
);
// 等待回复并返回
return port.first;
}
// 扩展回调函数来处理接输入报文
static void callbackFunction(SendPort callerSendPort){
// 初始化一个 SendPort 来接收来自调用者的消息
ReceivePort newIsolateReceivePort = ReceivePort();
// 向调用者提供该 isolate 的 SendPort 引用
callerSendPort.send(newIsolateReceivePort.sendPort);
// 监听输入报文、处理并提供回复的
// Isolate 主程序
newIsolateReceivePort.listen((dynamic message){
CrossIsolatesMessage incomingMessage = message as CrossIsolatesMessage;
// 处理消息
String newMessage = "complemented string " + incomingMessage.message;
// 发送处理的结果
incomingMessage.sender.send(newMessage);
});
}
// 帮助类
class CrossIsolatesMessage<T> {
final SendPort sender;
final T message;
CrossIsolatesMessage({
@required this.sender,
this.message,
});
}
销毁这个新的 Isolate 实例
// 释放一个 isolate 的例程
void dispose(){
newIsolate?.kill(priority: Isolate.immediate);
newIsolate = null;
}
简单示例
//请求的目的端口
static SendPort server_TargetPort = null;
//客户端发起连接,拿到服务端的消息接收端口
void Connect() async
{
ReceivePort client_receivePort = ReceivePort();
//client_receivePort.sendPort 是指client_receivePort用于接收消息的端口
await Isolate.spawn(Server_onReceivedMsg, [client_receivePort.sendPort,"hello world"]);
//自行设计的约定,第一个消息为服务端消息接收端口
server_TargetPort = await client_receivePort.first;
}
//发送消息
void SendToServer(String msg)
{
ReceivePort port = ReceivePort();
server_TargetPort.send([port.sendPort,msg])
var resp = await port.first;
print(resp.toString());
}
//服务端接收消息
void Server_onReceivedMsg(List args) async
{
//第一个参数为客户端拥用于接收消息的端口
SendPort sendPort = args[0];
//第二个参数为"hello world"
print(arg[1].toString());
//创建服务端端口
ReceivePort server_receivePort = ReceivePort();
//把服务端接收消息的端口发送给客户端
sendPort.send(server_receivePort.sendPort);
//循环接收消息
await for (List data in server_receivePort)
{
SendPort replayTo = data[0];
String msg = data[1];
replayTo.send("success : " + msg);
}
}
void test()
{
Connect();
SendToServer("abc");
SendToServer("efg");
}