当前位置: 首页 > news >正文

多进程

import time
from concurrent.futures import ProcessPoolExecutor, TimeoutError
from pydantic import BaseModel
import multiprocessing as mp
import os
import signalclass SharedData(BaseModel):value: int = 0def worker(shared_value, data_dict):# 真实场景中的任务,没有循环检查stop_eventfor i in range(30):data_dict['value'] = ishared_value.value = i * 10print(f'子进程 data_dict:{data_dict["value"]} shared_value:{shared_value.value}')time.sleep(1)return data_dict['value'] + shared_value.valuedef init_worker():"""设置子进程的信号处理"""signal.signal(signal.SIGTERM, lambda sig, frame: os._exit(0))if __name__ == '__main__':manager = mp.Manager()shared_value = manager.Value('i', 0)data_dict = manager.dict({'value': 0})with ProcessPoolExecutor(initializer=init_worker) as executor:future = executor.submit(worker, shared_value, data_dict)try:# 主进程监控5秒for count in range(5):print(f'\n第 {count + 1} 次检查 - 状态: {"运行中" if future.running() else "完成" if future.done() else "等待"}')print(f'主进程 shared_value:{shared_value.value} data_dict:{data_dict["value"]}')time.sleep(1)# 检查是否完成if not future.done():print("\n任务超时,强制终止子进程...")# 获取子进程PID并发送终止信号for pid, process in executor._processes.items():if process.is_alive():os.kill(pid, signal.SIGTERM)raise TimeoutError("任务超时")result = future.result()print(f"Result: {result}")except TimeoutError:print("子进程已被强制终止")future.cancel()finally:executor.shutdown(wait=True)print(f"最终值 - shared: {shared_value.value}, dict: {data_dict['value']}")

  

 

 

 

import time
from concurrent.futures import ProcessPoolExecutor, TimeoutError
from pydantic import BaseModel
import multiprocessing as mp
import signal
import osclass SharedData(BaseModel):value: int = 0def worker(shared_value, data_dict, stop_event):print('子进程PID', os.getpid())i = 0while i < 30 and not stop_event.is_set():# data_dict['value'] = i# shared_value.value = i * 10print(f'子进程 data_dict:{data_dict["value"]} shared_value:{shared_value.value}')i += 1time.sleep(1)return data_dict['value'] + shared_value.valueif __name__ == '__main__':print('主进程PID', os.getpid())manager = mp.Manager()shared_value = manager.Value('i', 0)data_dict = manager.dict({'value': 0})stop_event = manager.Event()with ProcessPoolExecutor() as executor:future = executor.submit(worker, shared_value, data_dict, stop_event)try:# 主进程监控5秒for _ in range(5):print("future.running() =>", future.running())print(f'主进程 shared_value:{shared_value.value} data_dict:{data_dict["value"]}')time.sleep(1)# 检查子进程是否完成result = future.result(timeout=0.1)print(f"Result: {result}")except TimeoutError:print("任务超时,通知子进程停止...")stop_event.set()  # 通知子进程优雅停止try:# 给子进程一些时间进行清理result = future.result(timeout=0.1)print(f"子进程已优雅停止,Result: {result}")except TimeoutError:print("子进程未及时响应,强制取消...")future.cancel()# 正常关闭执行器,等待所有进程完成executor.shutdown(wait=True)print("future.running() =>", future.running())print(f"最终值 - shared: {shared_value.value}, dict: {data_dict['value']}")

  

 

 

import time
from concurrent.futures import ProcessPoolExecutor, TimeoutError
from pydantic import BaseModel
import multiprocessing as mp
import os
import signalclass SharedData(BaseModel):value: int = 0# 全局变量存储executor
executor = Nonedef worker(shared_value, data_dict):# 真实场景中的任务,没有循环检查stop_eventfor i in range(30):# data_dict['value'] = i# shared_value.value = i * 10print(f'子进程 data_dict:{data_dict["value"]} shared_value:{shared_value.value}')time.sleep(1)return data_dict['value'] + shared_value.valuedef init_worker():"""设置子进程的信号处理"""signal.signal(signal.SIGTERM, lambda sig, frame: os._exit(0))def create_executor():"""创建并返回executor实例"""global executorif executor is None:executor = ProcessPoolExecutor(initializer=init_worker)return executordef shutdown_executor(wait=True):"""关闭executor"""global executorif executor is not None:executor.shutdown(wait=wait)executor = Nonedef submit_task(shared_value, data_dict):"""提交任务到executor"""global executorif executor is None:create_executor()return executor.submit(worker, shared_value, data_dict)def check_process_status(future):"""检查子进程运行状态"""try:# 检查future状态if future.done():if future.cancelled():print("子进程任务已被取消")return "cancelled"else:try:result = future.result(timeout=0)print(f"子进程已完成,结果: {result}")return "completed"except Exception as e:print(f"子进程执行出错: {e}")return "error"else:print("子进程仍在运行中...")return "running"except TimeoutError:print("检查状态超时")return "timeout"if __name__ == '__main__':manager = mp.Manager()shared_value = manager.Value('i', 0)data_dict = manager.dict({'value': 0})try:# 在程序不同地方使用executorcreate_executor()future = submit_task(shared_value, data_dict)# 主进程监控5秒for count in range(5):data_dict['value'] = countshared_value.value = count * 10time.sleep(1)# 在另一个地方检查状态status = check_process_status(future)# 如果需要终止if status == "running":print("\n任务超时,强制终止子进程...")# 获取子进程PID并发送终止信号for pid, process in executor._processes.items():if process.is_alive():os.kill(pid, signal.SIGTERM)raise TimeoutError("任务超时")result = future.result()print(f"Result: {result}")except TimeoutError:print("子进程已被强制终止")if 'future' in locals():future.cancel()finally:# 在程序结束前检查子进程状态if 'future' in locals():final_status = check_process_status(future)print(f"最终任务状态: {final_status}")# 关闭executorshutdown_executor(wait=True)print(f"最终值 - shared: {shared_value.value}, dict: {data_dict['value']}")

  

http://www.sczhlp.com/news/110445/

相关文章:

  • 93. 递归实现组合型枚举
  • Sort方法学习(伪代码记录)
  • 深入解析:【每日一问】运算放大器与比较器有什么区别?
  • 9.17支配对问题专题总结
  • 医疗软件网站建设公司排名珠海网站建设珠海
  • 现在都用什么软件做网站百度电脑网页版入口
  • 网站设计怎么弄腾讯企点网页版
  • 考试类网站如何做广州平台公司
  • wordpress的站点地址如何配置有什么学做木工的网站吗
  • 潍坊在线网站建设电商运营的基本内容
  • 哪些网上可以赚钱的网站数据库2008做企业网站
  • 网站营销方案设计公司pageadmin官网
  • Xじゃないか
  • 开源收银体系_大型收银系统源码_OctShop
  • 个人网站首页布局设计华强北商城官网app
  • 已有备 网站新增网站网上注册公司申请入口
  • 石家庄网站建设人员wordpress用什么开发工具
  • 微网站背景图片好学校平台网站模板下载不了
  • 上海国际建设总承包公司网站Excel怎么做网站链接
  • 网站页面字体设置做公司的网站的需求有哪些内容
  • 常德投诉网站wordpress可以自定义模型吗
  • 网页模板网站wordpress 菜单没了
  • 建设工程网站什么时候可以同步qq空间注册申请
  • 直播型网站开发极客 pthyon 做网站
  • 记录知识
  • AT_agc058_b [AGC058B] Adjacent Chmax
  • Jenkins CVE-2018-1000600漏洞利用与SSRF攻击分析
  • 唐山网站建设方案策划网站建设哪些好
  • 状元村建设官方网站江苏住房城乡建设网站
  • 便宜做网站公司网站建设方案保障措施