When there are multiple QTimers in the same QThread, why are the tasks associated with the QTimer executed serially?

75 views Asked by At

I'm trying to run multiple timers in one QThread, I expect each timer to be independent. But the final running result is not what I expected. All timers seem to be executed serially.

For example, in my code below, I created 4 timers in a QThread. I want them to start at the same time. In addition, since they obtain a public resource through a semaphore, after the timer that obtained the resource first is completed, the subsequent timer will Obtain resources and run them in turn. While other timers are running, if the previously completed timer reaches a new execution time, it blocks and waits for resources.

But the final result is that all timers run serially, that is, the following timer will only run after the previous timer completes. And the start time of the first timer in the second cycle is equal to the end time of the last timer task in the previous cycle plus the timeout time of the first timer.

I'm confused why this is happening. I guess the possible reason is that all QObjects in this thread are executed serially in a certain order. So I tried to put different timers in different sub-threads, and put these sub-threads in the current thread to achieve the effect of parallel running. But the problem is that this cannot ensure that all timer tasks have fair access to resources.

I would like to know if there is a way to get the best of both worlds, so that all timers can run according to their own set timeouts and have fair access to resources.

Below is my code:


class Worker(QObject):
    stopped = pyqtSignal()
    stop_signal = pyqtSignal()

    def __init__(self):
        super().__init__()
        self.semaphore = QSemaphore(1)
        self.runners = []
        self.threads = []

    def startTask(self):
        self.runners = []
        for i in range(1, 5):

            timer = Timer(f"runner_{i}", self)
            self.runners.append(timer)

        for timer in self.runners:
            timer.start_task()

    def stopTask(self):
        print("client stopTask is called.")
        for timer in self.runners:
            timer.stop_task()
        self.stopped.emit()


class Timer(QTimer):
    def __init__(self, name, client):
        super().__init__()
        self.name = name
        self.client = client
        self.timeout.connect(self.timer_timeout)

    def timer_timeout(self):
        logger.info(f"{self.name} start.")
        self.client.semaphore.acquire(1)
        time.sleep(1)
        logger.info(f"{self.name} stopped.")
        self.client.semaphore.release(1)

    def start_task(self):
        self.start(4000)

    def stop_task(self):
        self.stop()

class Widget(QWidget):
    stop_signal = pyqtSignal()

    def __init__(self):
        super().__init__()
        self._layout = QVBoxLayout(self)
        self.start_btn = QPushButton("start")
        self.stop_btn = QPushButton("stop")
        self._layout.addWidget(self.start_btn)
        self._layout.addWidget(self.stop_btn)
        self.start_btn.clicked.connect(self.start)
        self.stop_btn.clicked.connect(self.stop)

    def start(self):
        self._thread = QThread()
        self.worker = Worker()
        self.worker.moveToThread(self._thread)
        self._thread.started.connect(self.worker.startTask)

        self.stop_signal.connect(self.worker.stopTask)
        self.worker.stopped.connect(self._thread.quit)
        self.worker.stopped.connect(self.worker.deleteLater)

        self._thread.finished.connect(self._thread.deleteLater)
        self._thread.start()

    def stop(self):
        self.stop_signal.emit()

The following is the running result:

2023-09-18 23:03:11: runner_4 start.
2023-09-18 23:03:12: runner_4 stopped.
2023-09-18 23:03:12: runner_3 start.
2023-09-18 23:03:13: runner_3 stopped.
2023-09-18 23:03:13: runner_2 start.
2023-09-18 23:03:14: runner_2 stopped.
2023-09-18 23:03:14: runner_1 start.
2023-09-18 23:03:15: runner_1 stopped.
2023-09-18 23:03:19: runner_4 start.
2023-09-18 23:03:20: runner_4 stopped.
2023-09-18 23:03:20: runner_3 start.
2023-09-18 23:03:21: runner_3 stopped.

The result I expect is that in the first loop, all timers start at the same time. Therefore, all runners in the first round of loop should start at 2023-09-18 23:03:11.

Due to resource competition, some scheduled tasks are blocked, and the completion time of subsequent scheduled tasks in a single loop may be delayed. But the completed scheduled task should immediately enter the task and wait for available resources when it reaches the next running time. That is, the next start time of runner_4 should be 2023-09-18 23:03:15. Thanks.

1

There are 1 answers

3
Poison On

After thinking for a while, I put each timer in QThread and used QWaitCondition to wake up subsequent scheduled tasks. The wake-up rule is that the subsequent task is awakened by the previous task, and the first task is awakened by the last task.

The modified code is as follows:

class WaitCondition(QWaitCondition):
    def __init__(self):
        super().__init__()
        self._waked = 0
        self._waiting = 0

    def wait(
            self,
            lockedMutex: typing.Optional[QMutex],
            deadline: QDeadlineTimer = QDeadlineTimer(QDeadlineTimer.ForeverConstant.Forever)
    ) -> bool:
        if self._waked > 0:
            self._waked -= 1
            return True
        else:
            self._waiting += 1
            return super().wait(lockedMutex, deadline)

    def wakeOne(self) -> None:
        if self._waiting > 0:
            self._waiting -= 1
            return super().wakeOne()
        else:
            self._waked += 1


class Worker(QObject):
    stopped = pyqtSignal()
    stop_signal = pyqtSignal()

    def __init__(self):
        super().__init__()
        self.runner_counter = 0
        self.condition_mutex_dict: typing.Dict[int, typing.Tuple[QWaitCondition, QMutex]] = {
            0: (WaitCondition(), QMutex()),
        }
        self.runners: typing.Dict[int, Runner] = {}
        self.threads: typing.Dict[int, QThread] = {}

    def startTask(self):
        self.runners = {}
        self.runner_counter = 2
        for i in range(1, self.runner_counter+1):
            runner = Runner(
                self,
                f"runner_{i}",
                self.condition_mutex_dict[i-1][0],
                self.condition_mutex_dict[i-1][1],
                i,
                i**2
            )
            self.condition_mutex_dict[i] = (WaitCondition(), QMutex())
            thread = QThread()
            runner.moveToThread(thread)

            thread.started.connect(runner.run)
            self.stop_signal.connect(runner.stop)
            runner.stopped.connect(thread.quit)
            runner.stopped.connect(runner.deleteLater)
            thread.finished.connect(thread.deleteLater)

            self.runners[i] = runner
            self.threads[i] = thread
            
        for thread in self.threads.values():
            thread.start()
            
        # wake the first one
        self.condition_mutex_dict[0][0].wakeOne()
        ...

class Runner(QObject):
    stopped = pyqtSignal()

    def __init__(self, client: Worker, name, wait_condition: QWaitCondition, mutex: QMutex, counter: int, interval: int):
        super().__init__()
        self.client = client
        self.name = name
        self.mutex: QMutex = mutex
        self.wait_condition: QWaitCondition = wait_condition
        self.counter = counter
        self.interval = interval

    def timerEvent(self, a0: typing.Optional['QTimerEvent']) -> None:
        self.mutex.lock()
        logger.info(f"{self.name} start.")
        self.wait_condition.wait(self.mutex)
        time.sleep(1)
        logger.info(f"{self.name} stopped.")
        self.mutex.unlock()
        if self.counter == self.client.runner_counter:
            # wake the first one
            self.client.condition_mutex_dict[0][0].wakeOne()
        else:
            # wake the next one
            self.client.condition_mutex_dict[self.counter][0].wakeOne()

    def run(self):
        self.timer = self.startTimer(self.interval*1000)
    ...

If the execution intervals of all timers are not very different, the above modification can ensure that all timers can be executed fairly.

However, if the execution interval between timers is very different, for example, the execution interval of the previous task is much longer than the execution interval of the subsequent task, when the resource is idle, the subsequent task must also wait for the previous task to wake up.

This introduces another problem, that is, tasks with short execution intervals are actually blocked for a long time before being executed, which is obviously not what we want. I wonder if there is some algorithm that can solve this problem?