Skip to content

Commit 6355a7e

Browse files
committed
add new files
1 parent 81de2a9 commit 6355a7e

File tree

1 file changed

+222
-0
lines changed

1 file changed

+222
-0
lines changed

surround_view/buffer.py

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
from PyQt5.QtCore import QSemaphore, QMutex, QMutexLocker, QWaitCondition
2+
from queue import Queue
3+
import numpy as np
4+
from .param_settings import project_shapes
5+
6+
7+
class Buffer(object):
8+
9+
def __init__(self, buffer_size=5):
10+
self.buffer_size = buffer_size
11+
self.free_slots = QSemaphore(self.buffer_size)
12+
self.used_slots = QSemaphore(0)
13+
self.clear_buffer_add = QSemaphore(1)
14+
self.clear_buffer_get = QSemaphore(1)
15+
self.queue_mutex = QMutex()
16+
self.queue = Queue(self.buffer_size)
17+
18+
def add(self, data, drop_if_full=False):
19+
self.clear_buffer_add.acquire()
20+
if drop_if_full:
21+
if self.free_slots.tryAcquire():
22+
self.queue_mutex.lock()
23+
self.queue.put(data)
24+
self.queue_mutex.unlock()
25+
self.used_slots.release()
26+
else:
27+
self.free_slots.acquire()
28+
self.queue_mutex.lock()
29+
self.queue.put(data)
30+
self.queue_mutex.unlock()
31+
self.used_slots.release()
32+
33+
self.clear_buffer_add.release()
34+
35+
def get(self):
36+
# acquire semaphores
37+
self.clear_buffer_get.acquire()
38+
self.used_slots.acquire()
39+
self.queue_mutex.lock()
40+
data = self.queue.get()
41+
self.queue_mutex.unlock()
42+
# release semaphores
43+
self.free_slots.release()
44+
self.clear_buffer_get.release()
45+
# return item to caller
46+
return data
47+
48+
def clear(self):
49+
# check if buffer contains items
50+
if self.queue.qsize() > 0:
51+
# stop adding items to buffer (will return false if an item is currently being added to the buffer)
52+
if self.clear_buffer_add.tryAcquire():
53+
# stop taking items from buffer (will return false if an item is currently being taken from the buffer)
54+
if self.clear_buffer_get.tryAcquire():
55+
# release all remaining slots in queue
56+
self.free_slots.release(self.queue.qsize())
57+
# acquire all queue slots
58+
self.free_slots.acquire(self.buffer_size)
59+
# reset used_slots to zero
60+
self.used_slots.acquire(self.queue.qsize())
61+
# clear buffer
62+
for _ in range(self.queue.qsize()):
63+
self.queue.get()
64+
# release all slots
65+
self.free_slots.release(self.buffer_size)
66+
# allow get method to resume
67+
self.clear_buffer_get.release()
68+
else:
69+
return False
70+
# allow add method to resume
71+
self.clear_buffer_add.release()
72+
return True
73+
else:
74+
return False
75+
else:
76+
return False
77+
78+
def size(self):
79+
return self.queue.qsize()
80+
81+
def maxsize(self):
82+
return self.buffer_size
83+
84+
def isfull(self):
85+
return self.queue.qsize() == self.buffer_size
86+
87+
def isempty(self):
88+
return self.queue.qsize() == 0
89+
90+
91+
class MultiBufferManager(object):
92+
93+
"""
94+
Class for synchronizing capture threads from different cameras.
95+
"""
96+
97+
def __init__(self, do_sync=True):
98+
self.sync_devices = set()
99+
self.do_sync = do_sync
100+
self.wc = QWaitCondition()
101+
self.mutex = QMutex()
102+
self.arrived = 0
103+
self.buffer_maps = dict()
104+
105+
def bind_thread(self, thread, buffer_size, sync=True):
106+
self.create_buffer_for_device(thread.device_id, buffer_size, sync)
107+
thread.buffer_manager = self
108+
109+
def create_buffer_for_device(self, device_id, buffer_size, sync=True):
110+
if sync:
111+
with QMutexLocker(self.mutex):
112+
self.sync_devices.add(device_id)
113+
114+
self.buffer_maps[device_id] = Buffer(buffer_size)
115+
116+
def get_device(self, device_id):
117+
return self.buffer_maps[device_id]
118+
119+
def remove_device(self, device_id):
120+
self.buffer_maps.pop(device_id)
121+
with QMutexLocker(self.mutex):
122+
if device_id in self.sync_devices:
123+
self.sync_devices.remove(device_id)
124+
self.wc.wakeAll()
125+
126+
def sync(self, device_id):
127+
# only perform sync if enabled for specified device/stream
128+
self.mutex.lock()
129+
if device_id in self.sync_devices:
130+
# increment arrived count
131+
self.arrived += 1
132+
# we are the last to arrive: wake all waiting threads
133+
if self.do_sync and self.arrived == len(self.sync_devices):
134+
self.wc.wakeAll()
135+
# still waiting for other streams to arrive: wait
136+
else:
137+
self.wc.wait(self.mutex)
138+
# decrement arrived count
139+
self.arrived -= 1
140+
self.mutex.unlock()
141+
142+
def wake_all(self):
143+
with QMutexLocker(self.mutex):
144+
self.wc.wakeAll()
145+
146+
def set_sync(self, enable):
147+
self.do_sync = enable
148+
149+
def sync_enabled(self):
150+
return self.do_sync
151+
152+
def sync_enabled_for_device(self, device_id):
153+
return device_id in self.sync_devices
154+
155+
def __contains__(self, device_id):
156+
return device_id in self.buffer_maps
157+
158+
def __str__(self):
159+
return (self.__class__.__name__ + ":\n" + \
160+
"sync: {}\n".format(self.do_sync) + \
161+
"devices: {}\n".format(tuple(self.buffer_maps.keys())) + \
162+
"sync enabled devices: {}".format(self.sync_devices))
163+
164+
165+
class ProjectedImageBuffer(object):
166+
167+
"""
168+
Class for synchronizing processing threads from different cameras.
169+
"""
170+
171+
def __init__(self, drop_if_full=True, buffer_size=8):
172+
self.sync_devices = set()
173+
self.buffer = Buffer(buffer_size)
174+
self.drop_if_full = drop_if_full
175+
self.wc = QWaitCondition()
176+
self.mutex = QMutex()
177+
self.arrived = 0
178+
self.current_frames = dict()
179+
180+
def bind_thread(self, thread):
181+
with QMutexLocker(self.mutex):
182+
self.sync_devices.add(thread.device_id)
183+
184+
shape = project_shapes[thread.camera_model.camera_name]
185+
self.current_frames[thread.device_id] = np.zeros(shape[::-1] + (3,), np.uint8)
186+
thread.proc_buffer_manager = self
187+
188+
def get(self):
189+
return self.buffer.get()
190+
191+
def set_frame_for_device(self, device_id, frame):
192+
if device_id not in self.sync_devices:
193+
raise ValueError("Device not held by the buffer: {}".format(device_id))
194+
self.current_frames[device_id] = frame
195+
196+
def sync(self, device_id):
197+
# only perform sync if enabled for specified device/stream
198+
self.mutex.lock()
199+
if device_id in self.sync_devices:
200+
# increment arrived count
201+
self.arrived += 1
202+
# we are the last to arrive: wake all waiting threads
203+
if self.arrived == len(self.sync_devices):
204+
self.buffer.add(self.current_frames, self.drop_if_full)
205+
self.wc.wakeAll()
206+
# still waiting for other streams to arrive: wait
207+
else:
208+
self.wc.wait(self.mutex)
209+
# decrement arrived count
210+
self.arrived -= 1
211+
self.mutex.unlock()
212+
213+
def wake_all(self):
214+
with QMutexLocker(self.mutex):
215+
self.wc.wakeAll()
216+
217+
def __contains__(self, device_id):
218+
return device_id in self.sync_devices
219+
220+
def __str__(self):
221+
return (self.__class__.__name__ + ":\n" + \
222+
"devices: {}\n".format(self.sync_devices))

0 commit comments

Comments
 (0)