Prioritise one Luigi workflow over another workflow


I have two Luigi workflows, wfA and wfB. I set a priority on every task of both workflows, and every wfA task has a higher priority than any wfB task. If I run both workflows simultaneously with --workers=1, each workflow has one active task. If I choose --workers=3, each workflow has three active tasks, but the two workflows are still independent: the priority field only has an effect within a single workflow.

What I would like to do is set a global workers value (shared by all workflows) and have tasks scheduled by highest priority across all workflows.
So, in my example, if I set a global workers limit of 3 and then run both wfA and wfB, I want all wfA tasks to be scheduled (in groups of at most 3) and then all wfB tasks, because wfA tasks have a higher priority than wfB tasks.

Here is an example of wfA and wfB.

wfA:

import time
import luigi


class TaskA1(luigi.Task):
    priority = 110

    def output(self):
        return luigi.LocalTarget("output1.txt")

    def run(self):
        with self.output().open('w') as f:
            f.write("This is some dummy data")
        time.sleep(8)


class TaskA2(luigi.Task):
    priority = 111

    def output(self):
        return luigi.LocalTarget("output2.txt")

    def run(self):
        with self.output().open('w') as f:
            f.write("This is some dummy data")
        time.sleep(8)


class TaskA3(luigi.Task):
    priority = 112

    def output(self):
        return luigi.LocalTarget("output3.txt")

    def run(self):
        with self.output().open('w') as f:
            f.write("This is some dummy data")
        time.sleep(8)


class TaskA4(luigi.Task):
    priority = 113

    def requires(self):
        return [TaskA1(), TaskA2(), TaskA3()]

    def output(self):
        return luigi.LocalTarget("output4.txt")

    def run(self):
        with self.output().open('w') as f:
            f.write("This is some dummy data")
        time.sleep(8)


if __name__ == "__main__":
    luigi.build([TaskA4()], workers=3)

wfB:

import time
import luigi


class TaskB1(luigi.Task):
    priority = 10

    def output(self):
        return luigi.LocalTarget("output1.txt")

    def run(self):
        with self.output().open('w') as f:
            f.write("This is some dummy data")
        time.sleep(8)


class TaskB2(luigi.Task):
    priority = 11

    def output(self):
        return luigi.LocalTarget("output2.txt")

    def run(self):
        with self.output().open('w') as f:
            f.write("This is some dummy data")
        time.sleep(8)


class TaskB3(luigi.Task):
    priority = 12

    def output(self):
        return luigi.LocalTarget("output3.txt")

    def run(self):
        with self.output().open('w') as f:
            f.write("This is some dummy data")
        time.sleep(8)


class TaskB4(luigi.Task):
    priority = 13

    def requires(self):
        return [TaskB1(), TaskB2(), TaskB3()]

    def output(self):
        return luigi.LocalTarget("output4.txt")

    def run(self):
        with self.output().open('w') as f:
            f.write("This is some dummy data")
        time.sleep(8)


if __name__ == "__main__":
    luigi.build([TaskB4()], workers=3)

I want to execute both workflows simultaneously and have the scheduler run the wfA tasks first, then the wfB tasks.
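
To make the expected behaviour concrete, here is a minimal plain-Python simulation of the scheduling order I am after. It is not Luigi code and uses no Luigi API: the function name `simulate_global_schedule` and the task dictionary are hypothetical and only for illustration. It also assumes every task takes the same time, so tasks run in rounds of at most `workers` tasks.

```python
import heapq


def simulate_global_schedule(tasks, workers):
    """Simulate one global pool of `workers` slots shared by all workflows.

    `tasks` maps a task name to (priority, [names of required tasks]).
    Returns a list of rounds; each round lists the tasks started together,
    highest priority first.
    """
    done = set()
    pending = dict(tasks)
    rounds = []
    while pending:
        # A task is ready once all of its requirements have finished.
        ready = [
            (-priority, name)
            for name, (priority, deps) in pending.items()
            if all(dep in done for dep in deps)
        ]
        if not ready:
            raise ValueError("unsatisfiable dependencies")
        # Pick the highest-priority ready tasks, regardless of workflow.
        batch = [name for _, name in heapq.nsmallest(workers, ready)]
        rounds.append(batch)
        done.update(batch)
        for name in batch:
            del pending[name]
    return rounds


# Same priorities and dependencies as wfA and wfB above.
all_tasks = {
    "TaskA1": (110, []),
    "TaskA2": (111, []),
    "TaskA3": (112, []),
    "TaskA4": (113, ["TaskA1", "TaskA2", "TaskA3"]),
    "TaskB1": (10, []),
    "TaskB2": (11, []),
    "TaskB3": (12, []),
    "TaskB4": (13, ["TaskB1", "TaskB2", "TaskB3"]),
}

if __name__ == "__main__":
    for number, batch in enumerate(simulate_global_schedule(all_tasks, 3), 1):
        print(number, batch)
```

In this sketch, wfB tasks only start when a slot would otherwise stay idle (for example, next to TaskA4, which is the only wfA task left in the second round). That would be acceptable for my use case.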
