2022年 11月 8日

python怎么完成工作流_在python中实现此工作流的最佳方法

我想用Python实现以下工作流。在这个例子中,我有5个进程在运行平行。一过程被定义为经理,而其他人都是工人。经理以循环方式多次执行控制例行程序,直到所有工人停止。在

关于这个问题的主要观点是,每个工人从一个不同的工作清单开始。然而,当他们在处理他们的工作时,他们会将更多的工作添加到自己的列表中,并且在某些情况下,重复工作的情况会发生在工人身上。经理的职责是避免重复工作。在

e8b46cd04c484a859e12c7165d0e68ab.png

我目前正在尝试使用Python多处理和管道来实现这一点。下面是我的代码,用于对通信进行测试:import multiprocessing as mp

import time

import random

def checkManager(i, data_pipe, com_pipe, work_list):

com_out, com_in = com_pipe

if com_out.poll(): ##check if there is anything to read

msg = com_out.recv()

print i, ” received message “, msg

if msg == “SEND”:

data_out, data_in = data_pipe

data_in.send(work_list)

work_list = com_out.recv()

return work_list

def myfunc(i, data_pipe, com_pipe, work_list):

print “starting worker “, i, ” with list: “, work_list

while work_list != []:

time.sleep(3) ##sleep just to simulate some work delay

work_list = checkManager(i, data_pipe, com_pipe, work_list) ##check if manager wants to comunicate

print “stopping worker “, i

print “Starting…”

data_pipe = mp.Pipe() ##pipe to receive work lists from all workers

pipes = [] ##comunication pipe for each worker

workers = []

##spawn workers

for i in range(0,4):

pipe = mp.Pipe() ##pipe for comunication for that process

r = random.randint(10, 100) ##create random list just to test

p = mp.Process(target=myfunc,args=(i, data_pipe, pipe, range(r))) ##create process

pipes.append(pipe)

workers.append(p)

p.start()

index = 0

stopped_workers = []

data_out, data_in = data_pipe

while len(stopped_workers) != len(workers):

time.sleep(2)

for i in range(0, len(workers)):

if i in stopped_workers: ##check if wworker has already stopepd

continue

r = random.randint(0,100) ##just to avoid send the request all the times..

if r > 80:

print “Comunication with “, i

output, input = pipes[i]

input.send(“SEND”) ## send message

work_list = data_out.recv() #block untill receive data

print “data: “, work_list

input.send([]) ##send an empty list just to test if it stops

stopped_workers.append(i) ##add to the workers that already stopped

print “Stoping main”

在这个简单的测试中一切都很好,但是我希望这是尽可能有效的,并且有一些东西我不喜欢在我的代码。在

首先,我认为如果我有一个机制,可以向工作进程发送一个信号,而不是让它们不时检查某个函数,那么效率会更高。我试着用信号,但一直没能正常工作。

除此之外,我创建的管道数量与进程数量相同,我不确定这是否是最佳解决方案。我用多处理.池,但这似乎不是解决我问题的好办法。在

最后,使用python MPI库实现所有功能会更好吗?在

提前谢谢