tests: backfill coverage for openrouter transport, auth sessions, cadence
Three new test files covering modules the audit flagged as having zero direct coverage: - test_openrouter_transport.py (18 tests): provider chain selection, endpoint resolution, _call_provider parse path (including the reasoning-field fallback and token-based cost estimation), and call_llm's cross-provider failover. Uses httpx.MockTransport so no network. Patches _call_provider for failover tests to bypass tenacity's retry delays. - test_auth_session.py (7 tests): sign/verify round-trip, tampered cookie rejection, expired cookie rejection (via TTL monkeypatch), garbage input handling, salt isolation between session and pending serializers, and rejection of cookies signed with a different secret. - test_cadence_policy.py (16 tests): is_active_window weekday/weekend + half-open interval boundaries, min_gap_hours across bands, should_run gating for first-run / active / off-hours / weekend / naive-datetime cases, and the NEWS_POLICY 20-minute / 3-hour variations. Suite goes from 291 to 336 passing.
This commit is contained in:
parent
83995e96c8
commit
f9f4f25ef7
3 changed files with 500 additions and 0 deletions
81
tests/test_auth_session.py
Normal file
81
tests/test_auth_session.py
Normal file
|
|
@ -0,0 +1,81 @@
|
|||
"""Session cookie sign/verify — security-critical edges that the
|
||||
existing test suite uses as a fixture (``sign_session(1)`` for cookies)
|
||||
but doesn't actually probe.
|
||||
|
||||
Covers:
|
||||
- Round-trip: sign(user_id) → verify → user_id
|
||||
- Tampered cookie → None (not raised)
|
||||
- Expired cookie → None (via itsdangerous max_age)
|
||||
- Garbage / non-serializer-format input → None
|
||||
- Wrong-salt isolation: a pending cookie can't be unlocked by the
|
||||
session verifier (and vice versa)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from itsdangerous import URLSafeTimedSerializer
|
||||
|
||||
from app import auth
|
||||
|
||||
|
||||
def test_session_signed_token_round_trips():
|
||||
cookie = auth.sign_session(42)
|
||||
assert auth.verify_session(cookie) == 42
|
||||
|
||||
|
||||
def test_session_token_is_opaque_url_safe():
|
||||
"""Sanity: the serializer produces a URL-safe string with at least
|
||||
two dot-separated segments (payload.timestamp.signature). Not a
|
||||
semantic test, but catches a future swap to an un-encoded format."""
|
||||
cookie = auth.sign_session(7)
|
||||
assert "." in cookie
|
||||
assert " " not in cookie
|
||||
|
||||
|
||||
def test_tampered_session_cookie_returns_none():
|
||||
"""Flip a single character in the signature segment and verify
|
||||
the cookie no longer authenticates — without exceptions leaking."""
|
||||
cookie = auth.sign_session(99)
|
||||
# Flip the last character (signature segment).
|
||||
tampered = cookie[:-1] + ("a" if cookie[-1] != "a" else "b")
|
||||
assert auth.verify_session(tampered) is None
|
||||
|
||||
|
||||
def test_garbage_session_cookie_returns_none():
|
||||
assert auth.verify_session("not-a-real-cookie") is None
|
||||
assert auth.verify_session("") is None
|
||||
assert auth.verify_session("a.b.c") is None
|
||||
|
||||
|
||||
def test_expired_session_cookie_returns_none(monkeypatch):
|
||||
"""Forge a cookie with an ancient timestamp and confirm the TTL
|
||||
check rejects it. We bypass sign_session() so the timestamp is
|
||||
in our control rather than `now`."""
|
||||
s = auth._serializer()
|
||||
# itsdangerous stores the issued-at timestamp in a base62 segment.
|
||||
# Easier than hand-building: monkeypatch the SESSION_TTL_SECONDS
|
||||
# to a negative value so any freshly-signed cookie is "expired"
|
||||
# the moment we verify it.
|
||||
cookie = auth.sign_session(123)
|
||||
monkeypatch.setattr(auth, "SESSION_TTL_SECONDS", -1)
|
||||
assert auth.verify_session(cookie) is None
|
||||
|
||||
|
||||
def test_session_serializer_isolated_from_pending_serializer():
|
||||
"""A pending-verify cookie must not authenticate as a session
|
||||
(different salts), and vice versa — otherwise the half-finished
|
||||
OTP flow becomes a free login."""
|
||||
pending = auth.sign_pending("u@x", 5)
|
||||
session = auth.sign_session(5)
|
||||
assert auth.verify_session(pending) is None
|
||||
assert auth.verify_pending(session) is None
|
||||
|
||||
|
||||
def test_session_cookie_signed_with_different_secret_rejected(monkeypatch):
|
||||
"""Defence-in-depth: signing with a different secret produces a
|
||||
cookie that the live verifier (using the configured secret)
|
||||
rejects. Confirms we're actually checking the HMAC, not just the
|
||||
payload format."""
|
||||
rogue = URLSafeTimedSerializer("totally-different-secret",
|
||||
salt="cassandra-session-v1")
|
||||
rogue_cookie = rogue.dumps({"uid": 1})
|
||||
assert auth.verify_session(rogue_cookie) is None
|
||||
163
tests/test_cadence_policy.py
Normal file
163
tests/test_cadence_policy.py
Normal file
|
|
@ -0,0 +1,163 @@
|
|||
"""Cadence policy — the gate that ai_log_job and indicator_summary_job
|
||||
use to throttle OpenRouter spend outside active market hours.
|
||||
|
||||
Pure-function module, so tests just construct timestamps and assert on
|
||||
the (should_run, reason) tuple. Uses the default policy (active window
|
||||
07:00-21:00 UTC weekdays, no off-hours runs without 4+ hours since
|
||||
last success, weekends 12+ hours).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
import pytest
|
||||
|
||||
from app.services.cadence import DEFAULT_POLICY, NEWS_POLICY, CadencePolicy
|
||||
|
||||
|
||||
def _utc(year, month, day, hour, minute=0):
|
||||
return datetime(year, month, day, hour, minute, tzinfo=timezone.utc)
|
||||
|
||||
|
||||
# Pick reference timestamps used across tests. Wednesday 12:00 UTC is
|
||||
# squarely inside the active window; Wednesday 03:00 is off-hours;
|
||||
# Saturday 12:00 is weekend.
|
||||
_WED_NOON = _utc(2026, 5, 27, 12) # Wednesday 12:00
|
||||
_WED_PRE_DAWN = _utc(2026, 5, 27, 3) # Wednesday 03:00
|
||||
_SAT_NOON = _utc(2026, 5, 30, 12) # Saturday 12:00
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# is_active_window
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_active_window_weekday_noon_is_active():
|
||||
assert DEFAULT_POLICY.is_active_window(_WED_NOON) is True
|
||||
|
||||
|
||||
def test_active_window_weekday_predawn_is_off_hours():
|
||||
assert DEFAULT_POLICY.is_active_window(_WED_PRE_DAWN) is False
|
||||
|
||||
|
||||
def test_active_window_weekend_always_off_hours():
|
||||
"""Weekends bypass the hour check — even Saturday noon is throttled."""
|
||||
assert DEFAULT_POLICY.is_active_window(_SAT_NOON) is False
|
||||
|
||||
|
||||
def test_active_window_boundary_inclusive_start_exclusive_end():
|
||||
"""07:00 UTC is the first active hour; 21:00 is the first off-hour.
|
||||
Locks the half-open interval semantics in place."""
|
||||
assert DEFAULT_POLICY.is_active_window(_utc(2026, 5, 27, 7)) is True
|
||||
assert DEFAULT_POLICY.is_active_window(_utc(2026, 5, 27, 21)) is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# min_gap_hours
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_min_gap_uses_zero_during_active_window():
|
||||
assert DEFAULT_POLICY.min_gap_hours(_WED_NOON) == 0.0
|
||||
|
||||
|
||||
def test_min_gap_uses_off_hours_value_at_night():
|
||||
assert DEFAULT_POLICY.min_gap_hours(_WED_PRE_DAWN) == 4.0
|
||||
|
||||
|
||||
def test_min_gap_uses_weekend_value_on_saturday():
|
||||
assert DEFAULT_POLICY.min_gap_hours(_SAT_NOON) == 12.0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# should_run — the function jobs call
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_should_run_first_ever_call_always_proceeds():
|
||||
ok, reason = DEFAULT_POLICY.should_run(None, now=_WED_NOON)
|
||||
assert ok is True
|
||||
assert "no prior" in reason.lower()
|
||||
|
||||
|
||||
def test_should_run_during_active_window_always_proceeds():
|
||||
"""Default policy has active_gap_h=0, so even a run from 1 minute ago
|
||||
is allowed when we're in the active window."""
|
||||
last = _WED_NOON - timedelta(minutes=1)
|
||||
ok, reason = DEFAULT_POLICY.should_run(last, now=_WED_NOON)
|
||||
assert ok is True
|
||||
assert "active" in reason
|
||||
|
||||
|
||||
def test_should_run_off_hours_too_soon_is_throttled():
|
||||
"""Off-hours requires 4+ hours since last success. 1 hour ago → no."""
|
||||
last = _WED_PRE_DAWN - timedelta(hours=1)
|
||||
ok, reason = DEFAULT_POLICY.should_run(last, now=_WED_PRE_DAWN)
|
||||
assert ok is False
|
||||
assert "throttled" in reason
|
||||
assert "off-hours" in reason
|
||||
|
||||
|
||||
def test_should_run_off_hours_after_gap_proceeds():
|
||||
last = _WED_PRE_DAWN - timedelta(hours=5)
|
||||
ok, reason = DEFAULT_POLICY.should_run(last, now=_WED_PRE_DAWN)
|
||||
assert ok is True
|
||||
assert "off-hours" in reason
|
||||
|
||||
|
||||
def test_should_run_weekend_requires_12h_gap():
|
||||
"""Weekend gap is 12h. 6h is too soon; 13h is enough."""
|
||||
ok6, _ = DEFAULT_POLICY.should_run(
|
||||
_SAT_NOON - timedelta(hours=6), now=_SAT_NOON,
|
||||
)
|
||||
ok13, _ = DEFAULT_POLICY.should_run(
|
||||
_SAT_NOON - timedelta(hours=13), now=_SAT_NOON,
|
||||
)
|
||||
assert ok6 is False
|
||||
assert ok13 is True
|
||||
|
||||
|
||||
def test_should_run_naive_datetime_treated_as_utc():
|
||||
"""The DB column comes back as a naive datetime in some test paths;
|
||||
the policy must coerce it to UTC rather than crash on tz subtraction."""
|
||||
naive_last = _WED_PRE_DAWN.replace(tzinfo=None) - timedelta(hours=5)
|
||||
ok, _ = DEFAULT_POLICY.should_run(naive_last, now=_WED_PRE_DAWN)
|
||||
assert ok is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# NEWS_POLICY — tighter gaps so 3 runs/hour during the active window.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_news_policy_active_gap_is_twenty_minutes():
|
||||
# 20 minutes = 1/3 hour. Verify a 15-min-ago run is throttled but
|
||||
# a 21-min-ago one is allowed.
|
||||
last_15 = _WED_NOON - timedelta(minutes=15)
|
||||
last_21 = _WED_NOON - timedelta(minutes=21)
|
||||
assert NEWS_POLICY.should_run(last_15, now=_WED_NOON)[0] is False
|
||||
assert NEWS_POLICY.should_run(last_21, now=_WED_NOON)[0] is True
|
||||
|
||||
|
||||
def test_news_policy_off_hours_gap_is_three_hours():
|
||||
last_2h = _WED_PRE_DAWN - timedelta(hours=2)
|
||||
last_4h = _WED_PRE_DAWN - timedelta(hours=4)
|
||||
assert NEWS_POLICY.should_run(last_2h, now=_WED_PRE_DAWN)[0] is False
|
||||
assert NEWS_POLICY.should_run(last_4h, now=_WED_PRE_DAWN)[0] is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Bespoke policy — confirms the dataclass is reconfigurable for callers
|
||||
# (the audit flagged this as risky to over-fit to defaults).
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_custom_policy_with_active_gap_throttles_within_window():
|
||||
"""active_gap_h=0.5 means even during the active window a run from
|
||||
20 minutes ago is throttled — verifies the gate isn't hardcoded to
|
||||
'always run during active'."""
|
||||
p = CadencePolicy(active_gap_h=0.5)
|
||||
last = _WED_NOON - timedelta(minutes=20)
|
||||
ok, reason = p.should_run(last, now=_WED_NOON)
|
||||
assert ok is False
|
||||
assert "throttled" in reason
|
||||
256
tests/test_openrouter_transport.py
Normal file
256
tests/test_openrouter_transport.py
Normal file
|
|
@ -0,0 +1,256 @@
|
|||
"""Transport-layer tests for app.services.openrouter.
|
||||
|
||||
The companion file `test_openrouter_prompt.py` covers prompt building;
|
||||
this one covers the HTTP plumbing: provider chain selection, endpoint
|
||||
resolution, the per-call retry/parse path in `_call_provider`, and
|
||||
fallback behaviour in `call_llm`. Network requests are intercepted with
|
||||
``httpx.MockTransport`` so nothing hits the wire.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from unittest.mock import patch
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from app.config import get_settings
|
||||
from app.services import openrouter as ot
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _estimate_cost_usd
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_estimate_cost_known_model_uses_table_rates():
|
||||
# deepseek-v4-flash table: 0.07/M input, 0.28/M output.
|
||||
# 1000 in + 2000 out = 0.000_07 + 0.000_56 = 0.000_63.
|
||||
cost = ot._estimate_cost_usd("deepseek-v4-flash", 1000, 2000)
|
||||
assert cost == pytest.approx(0.00063, rel=1e-9)
|
||||
|
||||
|
||||
def test_estimate_cost_handles_provider_prefixed_model_name():
|
||||
# OpenRouter-style model strings use the slash-prefixed form.
|
||||
cost = ot._estimate_cost_usd("deepseek/deepseek-v4-flash", 1000, 2000)
|
||||
assert cost == pytest.approx(0.00063, rel=1e-9)
|
||||
|
||||
|
||||
def test_estimate_cost_unknown_model_returns_none():
|
||||
assert ot._estimate_cost_usd("never-heard-of-this-model", 100, 200) is None
|
||||
|
||||
|
||||
def test_estimate_cost_missing_tokens_returns_none():
|
||||
assert ot._estimate_cost_usd("deepseek-v4-flash", None, 200) is None
|
||||
assert ot._estimate_cost_usd("deepseek-v4-flash", 100, None) is None
|
||||
assert ot._estimate_cost_usd("deepseek-v4-flash", None, None) is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _provider_chain / llm_configured / active_model
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _configure(monkeypatch, **overrides):
|
||||
"""Apply a small bundle of LLM settings for one test."""
|
||||
s = get_settings()
|
||||
defaults = {
|
||||
"LLM_PROVIDER": "deepseek",
|
||||
"LLM_FALLBACK": "openrouter",
|
||||
"DEEPSEEK_API_KEY": "",
|
||||
"OPENROUTER_API_KEY": "",
|
||||
"DEEPSEEK_MODEL": "deepseek-v4-flash",
|
||||
"OPENROUTER_MODEL": "deepseek/deepseek-v4-flash",
|
||||
"DEEPSEEK_URL": "https://api.deepseek.com/chat/completions",
|
||||
}
|
||||
defaults.update(overrides)
|
||||
for k, v in defaults.items():
|
||||
monkeypatch.setattr(s, k, v, raising=False)
|
||||
|
||||
|
||||
def test_provider_chain_drops_providers_without_keys(monkeypatch):
|
||||
_configure(monkeypatch, DEEPSEEK_API_KEY="sk-deepseek") # openrouter key missing
|
||||
assert ot._provider_chain() == ["deepseek"]
|
||||
assert ot.llm_configured() is True
|
||||
|
||||
|
||||
def test_provider_chain_lists_primary_then_fallback(monkeypatch):
|
||||
_configure(monkeypatch,
|
||||
DEEPSEEK_API_KEY="sk-deepseek", OPENROUTER_API_KEY="sk-openrouter")
|
||||
assert ot._provider_chain() == ["deepseek", "openrouter"]
|
||||
|
||||
|
||||
def test_provider_chain_skips_duplicate_when_primary_equals_fallback(monkeypatch):
|
||||
_configure(monkeypatch, LLM_FALLBACK="deepseek", DEEPSEEK_API_KEY="sk")
|
||||
assert ot._provider_chain() == ["deepseek"]
|
||||
|
||||
|
||||
def test_llm_configured_false_when_no_keys(monkeypatch):
|
||||
_configure(monkeypatch) # both keys empty
|
||||
assert ot.llm_configured() is False
|
||||
assert ot._provider_chain() == []
|
||||
assert ot.active_model() == "unknown"
|
||||
|
||||
|
||||
def test_active_model_reflects_primary(monkeypatch):
|
||||
_configure(monkeypatch,
|
||||
LLM_PROVIDER="openrouter", OPENROUTER_API_KEY="sk-or",
|
||||
DEEPSEEK_API_KEY="")
|
||||
assert ot.active_model() == "deepseek/deepseek-v4-flash" # OPENROUTER_MODEL
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _endpoint_for
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_endpoint_for_unknown_provider_raises(monkeypatch):
|
||||
_configure(monkeypatch, DEEPSEEK_API_KEY="sk")
|
||||
with pytest.raises(RuntimeError, match="Unknown LLM provider"):
|
||||
ot._endpoint_for("anthropic")
|
||||
|
||||
|
||||
def test_endpoint_for_provider_without_key_raises(monkeypatch):
|
||||
_configure(monkeypatch) # both keys empty
|
||||
with pytest.raises(RuntimeError, match="DEEPSEEK_API_KEY not set"):
|
||||
ot._endpoint_for("deepseek")
|
||||
with pytest.raises(RuntimeError, match="OPENROUTER_API_KEY not set"):
|
||||
ot._endpoint_for("openrouter")
|
||||
|
||||
|
||||
def test_endpoint_for_openrouter_includes_attribution_and_no_train_headers(monkeypatch):
|
||||
_configure(monkeypatch, OPENROUTER_API_KEY="sk-or")
|
||||
url, key, model, headers = ot._endpoint_for("openrouter")
|
||||
assert url.endswith("/chat/completions")
|
||||
assert key == "sk-or"
|
||||
assert headers["X-OR-Allow-Training"] == "false"
|
||||
assert "HTTP-Referer" in headers and "X-Title" in headers
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _call_provider (through call_llm so retry doesn't fire — happy paths only)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _mock_post(callback):
|
||||
"""Wrap a callback into an httpx.MockTransport. Callback receives the
|
||||
request and returns either an httpx.Response or raises."""
|
||||
return httpx.MockTransport(callback)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_llm_returns_parsed_log_result(monkeypatch):
|
||||
_configure(monkeypatch, DEEPSEEK_API_KEY="sk-deepseek", LLM_FALLBACK="")
|
||||
|
||||
def handler(request: httpx.Request) -> httpx.Response:
|
||||
body = json.loads(request.content.decode())
|
||||
assert body["model"] == "deepseek-v4-flash"
|
||||
return httpx.Response(200, json={
|
||||
"choices": [{"message": {"content": "hello"}, "finish_reason": "stop"}],
|
||||
"usage": {"prompt_tokens": 100, "completion_tokens": 200},
|
||||
})
|
||||
|
||||
async with httpx.AsyncClient(transport=_mock_post(handler)) as client:
|
||||
result = await ot.call_llm(client, [{"role": "user", "content": "hi"}])
|
||||
|
||||
assert result.content == "hello"
|
||||
# Model is prefixed with the answering provider for ledger traceability.
|
||||
assert result.model == "deepseek/deepseek-v4-flash"
|
||||
assert result.prompt_tokens == 100
|
||||
assert result.completion_tokens == 200
|
||||
# DeepSeek doesn't return cost — estimated from tokens.
|
||||
# 100 * 0.07 + 200 * 0.28 = 7 + 56 = 63 → 0.000063.
|
||||
assert result.cost_usd == pytest.approx(0.000063, rel=1e-9)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_llm_uses_upstream_cost_when_provided(monkeypatch):
|
||||
"""When the upstream supplies usage.cost (OpenRouter), we trust it
|
||||
and skip the per-model table estimate."""
|
||||
_configure(monkeypatch, LLM_PROVIDER="openrouter",
|
||||
OPENROUTER_API_KEY="sk-or", LLM_FALLBACK="")
|
||||
|
||||
def handler(request: httpx.Request) -> httpx.Response:
|
||||
return httpx.Response(200, json={
|
||||
"choices": [{"message": {"content": "ok"}, "finish_reason": "stop"}],
|
||||
"usage": {"prompt_tokens": 50, "completion_tokens": 50, "cost": 0.0042},
|
||||
})
|
||||
|
||||
async with httpx.AsyncClient(transport=_mock_post(handler)) as client:
|
||||
result = await ot.call_llm(client, [{"role": "user", "content": "hi"}])
|
||||
|
||||
assert result.cost_usd == 0.0042
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_llm_falls_back_to_reasoning_field_when_content_null(monkeypatch):
|
||||
"""Thinking models sometimes return null `content` plus a populated
|
||||
`reasoning` block — we surface the reasoning so the caller still gets
|
||||
something usable rather than treating the row as empty."""
|
||||
_configure(monkeypatch, DEEPSEEK_API_KEY="sk-d", LLM_FALLBACK="")
|
||||
|
||||
def handler(request: httpx.Request) -> httpx.Response:
|
||||
return httpx.Response(200, json={
|
||||
"choices": [{
|
||||
"message": {"content": None, "reasoning": "deep thought"},
|
||||
"finish_reason": "stop",
|
||||
}],
|
||||
"usage": {"prompt_tokens": 10, "completion_tokens": 20},
|
||||
})
|
||||
|
||||
async with httpx.AsyncClient(transport=_mock_post(handler)) as client:
|
||||
result = await ot.call_llm(client, [{"role": "user", "content": "hi"}])
|
||||
assert result.content == "deep thought"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_llm_raises_when_no_provider_configured(monkeypatch):
|
||||
_configure(monkeypatch) # both keys empty
|
||||
async with httpx.AsyncClient() as client:
|
||||
with pytest.raises(RuntimeError, match="No LLM provider configured"):
|
||||
await ot.call_llm(client, [{"role": "user", "content": "hi"}])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# call_llm fallback chain — patch _call_provider to bypass the retry/sleep
|
||||
# decorator and exercise the cross-provider failover logic directly.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_llm_falls_back_to_secondary_when_primary_raises(monkeypatch):
|
||||
_configure(monkeypatch,
|
||||
DEEPSEEK_API_KEY="sk-d", OPENROUTER_API_KEY="sk-or")
|
||||
|
||||
calls = []
|
||||
success = ot.LogResult(
|
||||
content="from-fallback", model="openrouter/deepseek/deepseek-v4-flash",
|
||||
prompt_tokens=1, completion_tokens=2, cost_usd=0.0,
|
||||
)
|
||||
|
||||
async def fake(_client, provider, _messages, _model, _max_tokens):
|
||||
calls.append(provider)
|
||||
if provider == "deepseek":
|
||||
raise RuntimeError("primary down")
|
||||
return success
|
||||
|
||||
with patch.object(ot, "_call_provider", fake):
|
||||
async with httpx.AsyncClient() as client:
|
||||
result = await ot.call_llm(client, [{"role": "user", "content": "hi"}])
|
||||
|
||||
assert calls == ["deepseek", "openrouter"]
|
||||
assert result.content == "from-fallback"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_llm_raises_last_exception_when_chain_exhausted(monkeypatch):
|
||||
_configure(monkeypatch,
|
||||
DEEPSEEK_API_KEY="sk-d", OPENROUTER_API_KEY="sk-or")
|
||||
|
||||
async def fake(_client, provider, _messages, _model, _max_tokens):
|
||||
raise RuntimeError(f"{provider} broken")
|
||||
|
||||
with patch.object(ot, "_call_provider", fake):
|
||||
async with httpx.AsyncClient() as client:
|
||||
with pytest.raises(RuntimeError, match="openrouter broken"):
|
||||
await ot.call_llm(client, [{"role": "user", "content": "hi"}])
|
||||
Loading…
Add table
Add a link
Reference in a new issue