From 77d4f44c48ba32b5c4d17c5c201bd48453ed2058 Mon Sep 17 00:00:00 2001 From: Lazy-Y <122320573+owen-zora@users.noreply.github.com> Date: Thu, 25 May 2023 10:07:57 -0700 Subject: [PATCH 1/4] Update consumer.py --- faust/transport/consumer.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index 0feba69c0..c000d5de4 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -749,10 +749,17 @@ async def getmany(self, timeout: float) -> AsyncIterator[Tuple[TP, Message]]: or tp in active_partitions or tp in self._buffered_partitions ): - highwater_mark = self.highwater(tp) - self.app.monitor.track_tp_end_offset(tp, highwater_mark) - # convert timestamp to seconds from int milliseconds. - yield tp, to_message(tp, record) + try: + highwater_mark = self.highwater(tp) + self.app.monitor.track_tp_end_offset(tp, highwater_mark) + # convert timestamp to seconds from int milliseconds. + yield tp, to_message(tp, record) + except StopAsyncIteration: + raise + except Exception as exc: + self.log.exception( + "Error while processing message", exc_info=exc + ) else: self.log.dev( "getmany called while flow not active. Seek back to committed offsets." From 50b978efd8caa600ae185913ddcc38bbb54e9bd4 Mon Sep 17 00:00:00 2001 From: Lazy-Y <122320573+owen-zora@users.noreply.github.com> Date: Thu, 25 May 2023 10:13:40 -0700 Subject: [PATCH 2/4] fix bugs --- faust/streams.py | 7 +++++++ faust/transport/conductor.py | 18 ++++++++++++++++-- faust/transport/consumer.py | 4 +++- 3 files changed, 26 insertions(+), 3 deletions(-) diff --git a/faust/streams.py b/faust/streams.py index f23d874ad..c3d3070de 100644 --- a/faust/streams.py +++ b/faust/streams.py @@ -1211,6 +1211,13 @@ async def _py_aiter(self) -> AsyncIterator[T_co]: # We want to ack the filtered message # otherwise the lag would increase value = skipped_value + except StopAsyncIteration: + raise + except Exception as exc: + value = skipped_value + self.log.exception( + "Error in processor %r: %r", _shortlabel(processor), exc + ) try: if value is not skipped_value: diff --git a/faust/transport/conductor.py b/faust/transport/conductor.py index 45a147ceb..c62bd991a 100644 --- a/faust/transport/conductor.py +++ b/faust/transport/conductor.py @@ -266,12 +266,26 @@ def _compile_message_handler(self) -> ConsumerCallback: async def on_message(message: Message) -> None: tp = TP(topic=message.topic, partition=0) - return await get_callback_for_tp(tp)(message) + try: + return await get_callback_for_tp(tp)(message) + except KeyError: + self.logger.warning( + "client: get_callback_for_tp No callback for %r", tp + ) + except Exception as exc: + self.logger.exception(f"client: get_callback_for_tp {exc}") else: async def on_message(message: Message) -> None: - return await get_callback_for_tp(message.tp)(message) + try: + return await get_callback_for_tp(message.tp)(message) + except KeyError: + self.logger.warning( + "both: get_callback_for_tp No callback for %r", message.tp + ) + except Exception as exc: + self.logger.exception(f"client: get_callback_for_tp {exc}") return on_message diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index c000d5de4..78d60bd9d 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -1125,7 +1125,9 @@ def _new_offset(self, tp: TP) -> Optional[int]: # the return value will be None (the same as 31) if self._committed_offset[tp]: if min(acked) - self._committed_offset[tp] > 1: - return None + new_acked = list(range(self._committed_offset[tp] + 1, min(acked))) + self.log.dev(f"insert new ack {new_acked=}") + acked = new_acked + acked # Note: acked is always kept sorted. # find first list of consecutive numbers From b1c88a73a283024e8fd2bfd1483c96c4b4a606d3 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Thu, 25 May 2023 14:43:30 -0400 Subject: [PATCH 3/4] fixup exc --- faust/streams.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/faust/streams.py b/faust/streams.py index 00f18d1ad..2e6fa2769 100644 --- a/faust/streams.py +++ b/faust/streams.py @@ -1202,6 +1202,7 @@ async def _py_aiter(self) -> AsyncIterator[T_co]: sensor_state = None # reduce using processors + processor = None try: for processor in processors: with trace(f"processor-{_shortlabel(processor)}"): @@ -1213,10 +1214,10 @@ async def _py_aiter(self) -> AsyncIterator[T_co]: value = skipped_value except StopAsyncIteration: raise - except Exception: + except Exception as exc: value = skipped_value self.log.exception( - "Error in processor %r: %r", _shortlabel(processor), exc + f"Error in processor {_shortlabel(processor)}: {exc}" ) try: From a07b0e565c7c81cc61fcbbdd4f6ed7b9e46e155e Mon Sep 17 00:00:00 2001 From: Lazy-Y <122320573+owen-zora@users.noreply.github.com> Date: Thu, 25 May 2023 14:27:53 -0700 Subject: [PATCH 4/4] Update consumer.py --- faust/transport/consumer.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index 78d60bd9d..c000d5de4 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -1125,9 +1125,7 @@ def _new_offset(self, tp: TP) -> Optional[int]: # the return value will be None (the same as 31) if self._committed_offset[tp]: if min(acked) - self._committed_offset[tp] > 1: - new_acked = list(range(self._committed_offset[tp] + 1, min(acked))) - self.log.dev(f"insert new ack {new_acked=}") - acked = new_acked + acked + return None # Note: acked is always kept sorted. # find first list of consecutive numbers