Unverified Commit c68052ff authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

fix(test): poll for snapshot before starting Router 2 in indexers_sync (#6707)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
Co-authored-by: default avatarClaude Opus 4.6 <noreply@anthropic.com>
parent dba51eed
...@@ -1485,8 +1485,35 @@ def _test_router_indexers_sync( ...@@ -1485,8 +1485,35 @@ def _test_router_indexers_sync(
await asyncio.sleep(5) await asyncio.sleep(5)
# Wait for a second before creating the second router # Wait for snapshot to be available before creating second router.
logger.info("Waiting for 1 second before creating second router") # In JetStream mode, the background task may purge acknowledged messages
# from the stream before the snapshot upload completes. Poll the object
# store so Router 2 can reliably download the snapshot on startup.
if durable_kv_events:
component_subject = f"namespace.{engine_workers.namespace}.component.{engine_workers.component_name}"
slugified = component_subject.lower().replace(".", "-").replace("_", "-")
bucket_name = f"{slugified}-radix-bucket"
nc = await nats.connect(servers=_nats_server())
try:
js = nc.jetstream()
for attempt in range(50):
try:
obj_store = await js.object_store(bucket_name)
await obj_store.get("radix-state")
logger.info(
f"Snapshot available in object store (attempt {attempt + 1})"
)
break
except Exception:
await asyncio.sleep(0.1)
else:
assert False, (
f"Snapshot not found in bucket '{bucket_name}' after 50 attempts (5s). "
f"Router 1 sent 25 requests with snapshot_threshold=20, snapshot should exist."
)
finally:
await nc.close()
else:
await asyncio.sleep(1) await asyncio.sleep(1)
# Create second runtime and endpoint for router 2 # Create second runtime and endpoint for router 2
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment