最新的网站搭建工具,建立网站的目的,做dnf辅助网站,自己做的网站不满屏并发模型 并发模型是用来实现不同应用场景中并发任务的编程模型#xff0c;常见的并发模型分为基于内存共享的并发模型和基于消息通信的并发模型。 Actor并发模型作为基于消息通信并发模型的典型代表#xff0c;不需要开发者去面对锁带来的一系列复杂偶发的问题#xff0c;同… 并发模型 并发模型是用来实现不同应用场景中并发任务的编程模型常见的并发模型分为基于内存共享的并发模型和基于消息通信的并发模型。 Actor并发模型作为基于消息通信并发模型的典型代表不需要开发者去面对锁带来的一系列复杂偶发的问题同时并发度也相对较高因此得到了广泛的支持和使用。 当前鸿蒙ArkTS提供了TaskPool和Worker两种并发能力TaskPool和Worker都基于Actor并发模型实现。 内存共享并发模型指多线程同时执行任务这些线程依赖同一内存并且都有权限访问线程访问内存前需要抢占并锁定内存的使用权没有抢占到内存的线程需要等待其他线程释放使用权再执行。 Actor并发模型每一个线程都是一个独立Actor每个Actor有自己独立的内存Actor之间通过消息传递机制触发对方Actor的行为不同Actor之间不能直接访问对方的内存空间。Actor并发模型对比内存共享并发模型的优势在于不同线程间内存隔离不会产生不同线程竞争同一内存资源的问题。开发者不需要考虑对内存上锁导致的一系列功能、性能问题提升了开发效率。 由于Actor并发模型线程之间不共享内存需要通过线程间通信机制传输并发任务和任务结果。
TaskPool简介 任务池TaskPool作用是为应用程序提供一个多线程的运行环境降低整体资源的消耗、提高系统的整体性能且您无需关心线程实例的生命周期。 TaskPool支持开发者在宿主线程封装任务抛给任务队列系统选择合适的工作线程进行任务的分发及执行再将结果返回给宿主线程。接口直观易用支持任务的执行、取消以及指定优先级的能力同时通过系统统一线程管理结合动态调度及负载均衡算法可以节约系统资源。系统默认会启动一个任务工作线程当任务较多时会扩容工作线程数量上限跟当前设备的物理核数相关具体数量内部管理保证最优的调度及执行效率长时间没有任务分发时会缩容减少工作线程数量。
TaskPool注意事项 实现任务的函数需要使用Concurrent装饰器标注且仅支持在.ets文件中使用。 从API version 11开始跨并发实例传递带方法的实例对象时该类必须使用装饰器Sendable装饰器标注且仅支持在.ets文件中使用。 任务函数在TaskPool工作线程的执行耗时不能超过3分钟不包含Promise和async/await异步调用的耗时例如网络下载、文件读写等I/O任务的耗时否则会被强制退出。 实现任务的函数入参需满足序列化支持的类型详情请参见线程间通信对象。 ArrayBuffer参数在TaskPool中默认转移需要设置转移列表的话可通过接口setTransferList()设置。 由于不同线程中上下文对象是不同的因此TaskPool工作线程只能使用线程安全的库例如UI相关的非线程安全库不能使用。 序列化传输的数据量大小限制为16MB。 Priority的IDLE优先级是用来标记需要在后台运行的耗时任务例如数据同步、备份它的优先级别是最低的。这种优先级标记的任务只会在所有线程都空闲的情况下触发执行并且只会占用一个线程来执行。 Promise不支持跨线程传递如果TaskPool返回pending或rejected状态的Promise会返回失败对于fulfilled状态的PromiseTaskPool会解析返回的结果如果结果可以跨线程传递则返回成功。 不支持在TaskPool工作线程中使用AppStorage。
TaskPool应用实例 生产者消费者模型应用taskPool的具体代码实现
1.生产者 import { taskpool } from kit.ArkTS;
import { stingToUint8, uint8TransformString } from ./utils;Concurrent
export async function producer(ArrayBuffer: Int32Array, dataBuffer: Uint8Array, newStr: string) {let i32a ArrayBuffer;let array dataBufferif (array[array.length-1] ! 0) {taskpool.Task.sendData(false)let runner new taskpool.SequenceRunner()console.log(-----atomics-producer-push-fal- newStr)return}let jsonStr: string uint8TransformString(array)let arr: string[] []try {arr JSON.parse(jsonStr) as string[]} catch (e) {taskpool.Task.sendData(false)return}arr.push(newStr)let newArrJson JSON.stringify(arr) ?? //console.log(newArrJson newArrJson)let isFinish stingToUint8(newArrJson,array,4)if (!isFinish) {arr.pop()let newArrJson1 JSON.stringify(arr) ?? stingToUint8(newArrJson1,array,4)taskpool.Task.sendData(false)console.log(-----atomics-producer-push-fal- newStr)}else{console.log(-----atomics-producer-push-sec- newStr)}Atomics.notify(i32a, 0, 1)Promise.resolve()
}2.消费者 import { getStringArrayFromJson, testMethod, uint8TransformString} from ./utils;
import { buffer, taskpool } from kit.ArkTS;
import { ThreadUtils } from ./ThreadUtils;Concurrent
export async function consumerTask(ArrayBuffer: Int32Array, dataBuffer: Uint8Array): Promisevoid {let i32a ArrayBuffer;let array dataBufferwhile (true) {let jsonStr: string uint8TransformString(array)let arr getStringArrayFromJson(jsonStr)if (arr.length 0) {Atomics.wait(i32a, 0, 0);} else {let i 4for (let index 0; index array.byteLength; index) {if (i array.byteLength) {break}Atomics.store(array, i, 0)}taskpool.Task.sendData(true)let writeResult: boolean truewhile ((writeResult true || writeResult false)) {let ele arr.shift()if (!ele) {break}writeResult await ThreadUtils.getInstance().writeToFile(ele)console.log(-----atomics-consumer- ele)}}}
}
3.字符串和字节码相互转换工具
export function testMethod(str: string) {console.log(--test-function-str- str)
}
export function uint8TransformString(array:Uint8Array): string{let jsonStr: string JSON.stringify([])let tempArr: number[] []let j 0for (let index 0; index array.length; index) {if (array[index] 0) {continue}tempArr[j] array[index]}let temp new Uint8Array(tempArr)if (temp.byteLength 0) {let str ;for (let i 0; i temp.length; ) {let byte1 temp[i];let codePoint: numberif (byte1 7 0) { // 1字节codePoint byte1;i 1;} else if (byte1 5 0b110) { // 2字节codePoint ((byte1 0b11111) 6) | (temp[i 1] 0b111111);i 2;} else if (byte1 4 0b1110) { // 3字节codePoint ((byte1 0b1111) 12) | ((temp[i 1] 0b111111) 6) | (temp[i 2] 0b111111);i 3;} else {// 错误处理不支持的字节序列i 1; // 跳过当前字节continue;}str String.fromCodePoint(codePoint)console.info(字节流转成可理解的字符串: str);}jsonStr str}return jsonStr
}
//
export function stingToUint8(json: string, array:Uint8Array,formIndex: number 0) : boolean{let i formIndexlet isFinish truefor (let index 0; index json.length; index) {if (i array.byteLength) {if (index json.length - 1) {isFinish false}break}const element json.charCodeAt(index);if (element 0x7FF) {Atomics.store(array, i, (0xE0 | (element 12)))Atomics.store(array, i, (0x80 | ((element 6) 0x3F)))Atomics.store(array, i, (0x80 | (element 0x3F)))} else if (element 0x7F) {Atomics.store(array, i, (0xC0 | (element 6)))Atomics.store(array, i, (0x80 | (element 0x3F)))} else {Atomics.store(array, i, (element))}}//剩余空间赋值0for (let index i; index array.length; index) {array[index] 0}return isFinish
}
4.单例工具
import { taskpool } from kit.ArkTS;
import { it } from ohos/hypium;
import { consumerTask } from ./consumer;
import { producer } from ./product;export class ThreadUtils {private tempLogList: Arraystring new Array()private static instance: ThreadUtilsprivate sab :SharedArrayBufferprivate ui8 :Uint8Arrayprivate i32a :Int32Arrayprivate constructor(bufferSize:number 1024) {this.sab new SharedArrayBuffer(bufferSize)this.ui8 new Uint8Array(this.sab)this.i32a new Int32Array(this.sab)this.startConsumer()};writeLog(log: string) {if (this.flag) {this.tempLogList.push(log)}else {this.product(log)}}public static getInstance(bufferSize:number 1024): ThreadUtils {if (!ThreadUtils.instance) {ThreadUtils.instance new ThreadUtils(bufferSize);}return ThreadUtils.instance;}async writeToFile(content: string): Promiseboolean {return new Promise((resolve, reject) {setTimeout(() {console.log(日志写入完成 content)console.log(pop element content)resolve(true)}, 4000)})}lastTask:taskpool.Task | undefinedflag falseasync product(log: string):Promiseboolean {return new Promiseboolean((resolve,reject){let newLog loglet task new taskpool.Task(producer, this.i32a, this.ui8, newLog)if (this.lastTask) {task.addDependency(this.lastTask)}this.lastTask tasktask.onReceiveData((success: boolean) {if (!success) {this.flag truethis.tempLogList.unshift(log)resolve(false)}})taskpool.execute(task).then((){console.log(------taskpool.execute.then-----)resolve(true)});})}isWhile falseasync startConsumer() {let task new taskpool.Task(consumerTask, this.i32a, this.ui8)task.onReceiveData(async (hasSpace: boolean) {if (hasSpace) {this.flag falseif (this.tempLogList.length 0 this.isWhile false){let item this.tempLogList.shift()console.log(---item--- item)this.isWhile truelet com truewhile (item this.flag false com){com await this.product(item)item this.tempLogList.shift()}this.isWhile false}}})taskpool.execute(task)}}
5.页面UI应用
import { buffer, taskpool } from kit.ArkTS;
import { consumerTask } from ../consumer;
import { producer } from ../product;
import { router } from kit.ArkUI;
import { ThreadUtils } from ../ThreadUtils;Entry
Component
struct Index {timer -1count 0logTool ThreadUtils.getInstance(32)aboutToAppear(): void {}State inputText:string build() {Column({space: 20}) {TextInput({text: $$this.inputText}).width(80%)Button() {Text(生产日志).padding(10)}.backgroundColor(Color.Gray).onClick(async () {this.timer setInterval((){this.logTool.writeLog (item this.count)this.count 1},1000)})Button() {Text(停止生产).padding(10)}.backgroundColor(Color.Gray).onClick(async () {clearInterval(this.timer)// router.pushUrl({// url: pages/TaskPoolPage// })})}.alignItems(HorizontalAlign.Center).justifyContent(FlexAlign.Center).height(100%).width(100%)}
}