Skip to content

Commit

Permalink
Fix if lag is not avaliable (#28)
Browse files Browse the repository at this point in the history
* Fix if lag is not avaliable

* Add test case on count_stream_added

* Fix testcase
  • Loading branch information
Wh1isper authored Aug 12, 2024
1 parent 22fba04 commit 6fe28e8
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
25 changes: 24 additions & 1 deletion brq/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ async def count_stream(self, function_name: str) -> int:
return 0
return await self.redis.xlen(stream_name)

async def count_stream_added(self, function_name: str) -> int:
stream_name = self.get_stream_name(function_name)
if not await self.redis.exists(stream_name):
return 0

xinfo_stream = await self.redis.xinfo_stream(stream_name)
return xinfo_stream.get("entries-added")

async def count_processing_jobs(
self, function_name: str, group_name: str = "default-workers"
) -> int:
Expand Down Expand Up @@ -209,8 +217,23 @@ async def count_unprocessed_jobs(
gropu_infos = await self.redis.xinfo_groups(stream_name)
for group_info in gropu_infos:
if group_info["name"] == group_name:
lag = group_info.get("lag") or 0
pending = group_info.get("pending") or 0
lag = group_info.get("lag")
if lag is None:
logger.warning(
f"Lag is not available for group `{group_name}` in stream `{stream_name}`. Try calculate it."
)
"""
1. Is empty
2. Stream last-generated-id were deleted, the lag possibly is length of stream
"""
entries_read = group_info.get("entries-read")
if entries_read is None:
return pending
added_count = await self.count_stream_added(function_name)
if added_count is None:
return pending
return added_count - entries_read - pending
return lag + pending
return await self.count_stream(function_name)

Expand Down
2 changes: 2 additions & 0 deletions tests/test_brq.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@ async def test_count_jobs(async_redis_client, redis_version):
if redis_version == "7":
assert await consumer.count_undelivered_jobs("delay_job") == 1
assert await consumer.count_unprocessed_jobs("delay_job") == 1
assert await consumer.count_stream_added("delay_job") == 1
else:
assert await consumer.count_undelivered_jobs("delay_job") == None
assert await consumer.count_unprocessed_jobs("delay_job") == 0
assert await consumer.count_stream_added("delay_job") == None
await browser.status()
loop = asyncio.get_event_loop()
loop.create_task(consumer.run())
Expand Down

0 comments on commit 6fe28e8

Please sign in to comment.