Skip to content

Why py-redis xack can not process message_id with type bytes and string simultaneously #3638

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
tianming123 opened this issue May 7, 2025 · 1 comment

Comments

@tianming123
Copy link

tianming123 commented May 7, 2025

I have a pending_message_monitor to process message in PEL, like this

  def consume_and_ack_messages(self, group: GroupInfo):
          message_processor = TaskDispatchMessageProcessor(
              stream=group.stream_name,
              group_name=group.group_name,
              consumer=self.consumer,
              name_suffix=group.name_suffix,
              message_processor_role='main_message_processor',
          )
  
          logging.info('%s start consume from %s', group.group_name, group.stream_name)
          response = self.consumer.read_group(
              group_name=group.group_name,
              consumer_name='consumer',
              stream=group.stream_name,
              block=10,
              count=BATCH_CONSUME_COUNT,
          )
  
          if not response:
              return
          for _, message_list in response:
              try:
                  message_processor.process_message_list(message_list)
              except Exception as e:
                  logging.error('Fail to consume message %s with: %s', str(message_list), str(e))

    def start(self):
        self.run()

    def start_pending_message_monitor(self):
        if self.pending_message_monitor_thread is None:
            self.pending_message_monitor_thread = threading.Thread(
                target=self.pending_messages_monitor_thread_impl,
                daemon=True,
            )
            self.pending_message_monitor_thread.start()
        else:
            logging.warning('pending message monitor thread is already running')

    def run(self):
        self._create_consumer_group()
        self.start_pending_message_monitor()
        while True:
            for group in self.master_groups:
                self.consume_and_ack_messages(group)

    def monitor_pending_messages(self):
         pending_info = self.consumer.xpending(self.stream, self.group_name)
 
        pending_messages = self.consumer.xpending_range(
            stream=self.stream,
            group_name=self.group_name,
            min=pending_info['min'],
            max=pending_info['max'],
            count=pending_count,
            idle=60000,  # 60s
        )
        for msg in pending_messages:
            msg_id = msg['message_id']
            if pending_message_list := self.consumer.xrange(
                self.stream, start_id=msg_id, end_id=msg_id, count=1
            ):
                self.message_processor.process_message_list(pending_message_list)
                continue

The type of message_id from xpending_range is string,but the type of message_id from xread_group in my main consumer is bytes. when i try to ack those two two different types of messages, the string one can be ack correctly,but bytes one xack return 0. pending_message_monitor and main consumer use the same redis client.

But if we force uniform type, like this, xack will not return 0 any more.

  message_id_str = (
                  message_id.decode('utf-8') if isinstance(message_id, bytes) else message_id
              )
    ack_count = self.consumer.ack(
        stream=self.stream,
        group_name=self.group_name,
        stream_id=message_id_str,
    )

Can anyone give me some suggestions,thanks

@petyaslavova
Copy link
Collaborator

Hi @tianming123, can you please share more info on the client initialization and lib version? I tried to reproduce your issue with sync Redis standalone client and the code that is currently in master and everything seems to be working as expected - both commands return the same type of output - when client is set not to decode responses, both ids are of type bytes and xack processes them without issues.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants