I have a pending_message_monitor that processes messages in the PEL (pending entries list), like this:
def consume_and_ack_messages(self, group: GroupInfo):
    message_processor = TaskDispatchMessageProcessor(
        stream=group.stream_name,
        group_name=group.group_name,
        consumer=self.consumer,
        name_suffix=group.name_suffix,
        message_processor_role='main_message_processor',
    )
    logging.info('%s start consume from %s', group.group_name, group.stream_name)
    response = self.consumer.read_group(
        group_name=group.group_name,
        consumer_name='consumer',
        stream=group.stream_name,
        block=10,
        count=BATCH_CONSUME_COUNT,
    )
    if not response:
        return
    for _, message_list in response:
        try:
            message_processor.process_message_list(message_list)
        except Exception as e:
            logging.error('Fail to consume message %s with: %s', str(message_list), str(e))

def start(self):
    self.run()

def start_pending_message_monitor(self):
    if self.pending_message_monitor_thread is None:
        self.pending_message_monitor_thread = threading.Thread(
            target=self.pending_messages_monitor_thread_impl,
            daemon=True,
        )
        self.pending_message_monitor_thread.start()
    else:
        logging.warning('pending message monitor thread is already running')

def run(self):
    self._create_consumer_group()
    self.start_pending_message_monitor()
    while True:
        for group in self.master_groups:
            self.consume_and_ack_messages(group)

def monitor_pending_messages(self):
    pending_info = self.consumer.xpending(self.stream, self.group_name)
    pending_messages = self.consumer.xpending_range(
        stream=self.stream,
        group_name=self.group_name,
        min=pending_info['min'],
        max=pending_info['max'],
        count=pending_count,
        idle=60000,  # 60s
    )
    for msg in pending_messages:
        msg_id = msg['message_id']
        if pending_message_list := self.consumer.xrange(
            self.stream, start_id=msg_id, end_id=msg_id, count=1
        ):
            self.message_processor.process_message_list(pending_message_list)
            continue
The message_id returned by xpending_range is a string, but the message_id returned by xread_group in my main consumer is bytes. When I try to ack these two different types of messages, the string one is acked correctly, but xack returns 0 for the bytes one. The pending_message_monitor and the main consumer use the same Redis client.
But if we force a uniform type, like this, xack no longer returns 0.
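The snippet referred to above is not reproduced in this thread. As a rough illustration only, forcing a uniform type could look something like the sketch below. The helper names `normalize_message_id` and `normalize_message_ids` are hypothetical and not taken from the original code, and the sketch assumes IDs arrive either as `str` or as UTF-8 encoded `bytes`.

```python
from typing import Iterable, List, Union

StreamId = Union[str, bytes]


def normalize_message_id(message_id: StreamId, as_bytes: bool = False) -> StreamId:
    """Return a stream ID as str (default) or as bytes.

    Hypothetical helper: assumes the ID is either str or UTF-8 bytes.
    """
    if isinstance(message_id, bytes):
        return message_id if as_bytes else message_id.decode('utf-8')
    if isinstance(message_id, str):
        return message_id.encode('utf-8') if as_bytes else message_id
    raise TypeError(f'unsupported message id type: {type(message_id).__name__}')


def normalize_message_ids(
    message_ids: Iterable[StreamId], as_bytes: bool = False
) -> List[StreamId]:
    """Normalize every ID in a batch to the same type before acking."""
    return [normalize_message_id(m, as_bytes=as_bytes) for m in message_ids]
```

The normalized IDs would then be passed to the ack call in place of the raw values, so that both the main consumer and the pending-message monitor hand over IDs of the same type.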
Hi @tianming123, can you please share more info on the client initialization and library version? I tried to reproduce your issue with the sync Redis standalone client and the code currently in master, and everything seems to work as expected: both commands return the same type of output. When the client is set not to decode responses, both IDs are of type bytes and xack processes them without issues.
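When gathering the details requested above, it may help to record which Python types the IDs from each code path actually have. The helper below is a hypothetical sketch (the name `id_type_counts` does not come from the thread or from any library); it only inspects values that have already been read and does not talk to Redis.

```python
from collections import Counter
from typing import Any, Iterable


def id_type_counts(message_ids: Iterable[Any]) -> Counter:
    """Count how many IDs of each Python type appear in a batch.

    Hypothetical diagnostic: a result with more than one key means the
    batch mixes types (for example both 'str' and 'bytes').
    """
    return Counter(type(message_id).__name__ for message_id in message_ids)
```

Logging this count for the IDs seen by the main consumer and for those seen by the pending-message monitor would show whether the two paths really disagree on type.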
Can anyone give me some suggestions? Thanks.