Efficiently Handling Full Queues in Python for Real-Time Frame Processing


I am working on a process that captures and processes frames from two sensors and then shares them with the main process by putting them on queues. I am not sure this is the most efficient way to share frames between processes, but I need the whole cycle to be very fast, ideally no more than 50 ms in total. The system almost works as intended, with one significant problem: when a queue is full and I remove an item from it, the get call blocks for roughly 5000 ms.

I have tried several ways of removing an item from a full queue, including .get_nowait(), .get(timeout=0.001), and .get(block=False, timeout=0.001). None of them fixed the problem. My goal is to free a slot quickly once a queue reaches capacity so that the newest frames can still be enqueued. Any insights would be greatly appreciated.
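The "make room, then enqueue" pattern described above can be sketched as follows. This is only a minimal illustration under stated assumptions: put_latest is a hypothetical helper name that does not appear in the code in this question, and the demo uses the thread-based queue.Queue so it runs on its own. multiprocessing.Queue raises the same queue.Full and queue.Empty exceptions, and when block is False the timeout argument of get() is ignored, so the non-blocking calls are used directly.

```python
import queue


def put_latest(q, item):
    """Enqueue item without blocking, discarding the oldest entry if q is full.

    Hypothetical helper for illustration. Returns True if the item was
    enqueued and False if the queue was still full after one retry.
    """
    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        pass

    # The queue is full: drop the oldest entry to make room.
    try:
        q.get_nowait()
    except queue.Empty:
        # Another consumer emptied the queue in the meantime.
        pass

    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        # Another producer refilled the queue first; give up on this item.
        return False


if __name__ == "__main__":
    demo = queue.Queue(maxsize=2)
    for frame_id in range(5):
        put_latest(demo, frame_id)
    # Only the two newest ids remain in the queue.
    print([demo.get_nowait(), demo.get_nowait()])
```

Catching the exceptions instead of checking full() first avoids the gap between the check and the call, during which another process can change the queue's state.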

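from time import time

import cv2
from torch.multiprocessing import Queue, Value, set_start_method

# manager = Manager()
set_start_method('spawn')  # call once, before creating queues or processes
queue_raw_stream_1 = Queue(maxsize=20)  # each queue holds at most 20 items
queue_raw_stream_2 = Queue(maxsize=20)
stop_flag = Value('b', False)  # shared boolean flag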


frames = dict()
cnt = 0
if self.isStereo:
    while not self.stopGrabbing.value:
        if self.m.has_completed_imageSet():
            ls = time()
            imageSet = self.m.get_imageSet()
            
            _pointcloud = imageSet[0].buffer.payload.components[0].data.reshape(-1, 3), imageSet[1].buffer.payload.components[0].data.reshape(-1, 3)
            _intensity  = imageSet[0].buffer.payload.components[1].data.reshape(-1, 3), imageSet[1].buffer.payload.components[1].data.reshape(-1, 3)
            _confidence = imageSet[0].buffer.payload.components[2].data.reshape(-1, 3), imageSet[1].buffer.payload.components[2].data.reshape(-1, 3)
            _2d = imageSet[2].buffer.payload.components[0].data.reshape(1024, 1280), imageSet[3].buffer.payload.components[0].data.reshape(1024, 1280)
            frames["timestamps"] = [imset.timestamp for imset in imageSet]
            frames["2D"] = cv2.cvtColor(_2d[0], cv2.COLOR_BayerBG2BGR), cv2.cvtColor(_2d[1], cv2.COLOR_BayerBG2BGR)
            frames["3D"] = _pointcloud
            frames["conf"] = _confidence
            frames["intensity"] = _intensity
            frames["id"] = cnt
            ls_1 = time()
            print(f"FIRST PART Duration {(ls_1 - ls)*1000} ms")
            # Remove one item if a queue is full. With block=False the
            # timeout argument is ignored, so it is omitted here.
            if self.queue_raw_stream_1.full():
                self.queue_raw_stream_1.get(block=False)

            if self.queue_raw_stream_2.full():
                self.queue_raw_stream_2.get(block=False)
            print(f"REMOVE PART Duration {(time() - ls_1)*1000} ms")
            ls_2 = time()
            self.queue_raw_stream_1.put(frames)
            self.queue_raw_stream_2.put(frames)
            print(f"QUEUING PART Duration {(time() - ls_2)*1000} ms")
            imageSet[0].buffer.queue()
            imageSet[1].buffer.queue()
            imageSet[2].buffer.queue()
            imageSet[3].buffer.queue()
            print(f"FULL Duration {(time() - ls)*1000} ms")
            cnt += 1

Output

FIRST PART Duration 48.102617263793945 ms
REMOVE PART Duration 5483.046054840088 ms
QUEUING PART Duration 0.02384185791015625 ms
FULL Duration 5531.17251 ms
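
One thing the timings above may not show is the cost of serializing each frame dictionary. As far as the standard library documentation describes, multiprocessing.Queue.put() hands the object to a background feeder thread, which pickles it and writes it to a pipe, so put() itself can return in a fraction of a millisecond. A rough sketch for estimating that hidden cost follows. It assumes 8-bit BGR images at the 1024 x 1280 resolution used above and uses zero-filled byte buffers as stand-ins for real sensor data; make_dummy_frames and measure_pickle are hypothetical names.

```python
import pickle
from time import perf_counter

# Assumed image geometry: 1024 x 1280 pixels, 3 channels, 1 byte per channel.
HEIGHT, WIDTH, CHANNELS = 1024, 1280, 3


def make_dummy_frames():
    """Build a stand-in for the frames dict holding two BGR-sized buffers."""
    n_bytes = HEIGHT * WIDTH * CHANNELS
    # Two separate objects, so pickle serializes both buffers in full.
    left = bytes(n_bytes)
    right = bytes(n_bytes)
    return {"2D": (left, right), "id": 0}


def measure_pickle(obj):
    """Return (payload size in bytes, elapsed milliseconds) for one pickle.dumps."""
    start = perf_counter()
    payload = pickle.dumps(obj)
    elapsed_ms = (perf_counter() - start) * 1000
    return len(payload), elapsed_ms


if __name__ == "__main__":
    size, ms = measure_pickle(make_dummy_frames())
    print(f"pickled {size / 1e6:.1f} MB in {ms:.2f} ms")
```

Real frames also carry the point cloud, confidence, and intensity arrays, so the actual payload per put() would be larger than this estimate.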