@@ -609,7 +609,8 @@ cdef class ReadBuffer:
609
609
self ._finish_message()
610
610
return mem
611
611
612
- cdef redirect_messages(self , WriteBuffer buf, char mtype):
612
+ cdef redirect_messages(self , WriteBuffer buf, char mtype,
613
+ int stop_at = 0 ):
613
614
if not self ._current_message_ready:
614
615
raise exceptions.BufferError(
615
616
' consume_full_messages called on a buffer without a '
@@ -644,7 +645,10 @@ cdef class ReadBuffer:
644
645
else :
645
646
return
646
647
647
- # Fast path: exhaust buf0 as efficiently as possible.
648
+ if stop_at and buf._length >= stop_at:
649
+ return
650
+
651
+ # Fast path: exhaust buf0 as efficiently as possible.
648
652
if self ._pos0 + 5 <= self ._len0:
649
653
cbuf = cpython.PyBytes_AS_STRING(self ._buf0)
650
654
new_pos0 = self ._pos0
@@ -657,14 +661,17 @@ cdef class ReadBuffer:
657
661
if (cbuf + new_pos0)[0 ] != mtype:
658
662
done = 1
659
663
break
664
+ if (stop_at and
665
+ (buf._length + new_pos0 - self ._pos0) > stop_at):
666
+ done = 1
667
+ break
660
668
msg_len = hton.unpack_int32(cbuf + new_pos0 + 1 ) + 1
661
669
if new_pos0 + msg_len > cbuf_len:
662
670
break
663
671
new_pos0 += msg_len
664
672
665
673
if new_pos0 != self ._pos0:
666
- if PG_DEBUG:
667
- assert self ._pos0 < new_pos0 <= self ._len0
674
+ assert self ._pos0 < new_pos0 <= self ._len0
668
675
669
676
pos_delta = new_pos0 - self ._pos0
670
677
buf.write_cstr(
@@ -674,8 +681,7 @@ cdef class ReadBuffer:
674
681
self ._pos0 = new_pos0
675
682
self ._length -= pos_delta
676
683
677
- if PG_DEBUG:
678
- assert self ._length >= 0
684
+ assert self ._length >= 0
679
685
680
686
if done:
681
687
# The next message is of a different type.
0 commit comments