"""SQLAlchemy models for Cassandra. Schema rationale lives in /home/gg/.claude/plans/ok-i-think-this-tidy-lake.md. All datetimes are tz-aware UTC (see app.db.utcnow). """ from __future__ import annotations from datetime import datetime, date from sqlalchemy import ( JSON, BigInteger, Boolean, Date, DateTime, Float, ForeignKey, Index, Integer, LargeBinary, SmallInteger, String, Text, UniqueConstraint, text, ) from sqlalchemy.orm import Mapped, mapped_column, relationship from app.db import Base, utcnow # Portable autoincrement primary-key type. SQLite only treats `INTEGER # PRIMARY KEY` as a ROWID alias (the bit that auto-fills); plain BIGINT # requires explicit values, which breaks our async tests. `with_variant` # emits INTEGER on SQLite and keeps BIGINT everywhere else. _PK = BigInteger().with_variant(Integer(), "sqlite") class Quote(Base): __tablename__ = "quotes" id: Mapped[int] = mapped_column(_PK, primary_key=True, autoincrement=True) symbol: Mapped[str] = mapped_column(String(128), nullable=False) source: Mapped[str] = mapped_column(String(32), nullable=False) label: Mapped[str] = mapped_column(String(128), default="") group_name: Mapped[str] = mapped_column(String(64), nullable=False) price: Mapped[float | None] = mapped_column(Float) currency: Mapped[str | None] = mapped_column(String(8)) as_of: Mapped[str | None] = mapped_column(String(16)) # provider date string changes: Mapped[dict | None] = mapped_column(JSON) # {"1d": x, "1m": y, ...} error: Mapped[str | None] = mapped_column(String(255)) fetched_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow) __table_args__ = ( Index("ix_quotes_symbol_fetched", "symbol", "fetched_at"), Index("ix_quotes_group", "group_name"), ) class QuoteDaily(Base): """Daily rollup — sparkline source. PK on (symbol, date).""" __tablename__ = "quotes_daily" symbol: Mapped[str] = mapped_column(String(64), primary_key=True) date: Mapped[date] = mapped_column(Date, primary_key=True) close: Mapped[float | None] = mapped_column(Float) high: Mapped[float | None] = mapped_column(Float) low: Mapped[float | None] = mapped_column(Float) source: Mapped[str] = mapped_column(String(32)) class Headline(Base): __tablename__ = "headlines" id: Mapped[int] = mapped_column(_PK, primary_key=True, autoincrement=True) source: Mapped[str] = mapped_column(String(64), nullable=False) category: Mapped[str] = mapped_column(String(32), nullable=False) title: Mapped[str] = mapped_column(String(512), nullable=False) url: Mapped[str] = mapped_column(String(1024), nullable=False) published_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) fetched_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow) fingerprint: Mapped[str] = mapped_column(String(40), nullable=False) # sha1 of normalised title # Semantic content tags from app.services.news_tagging. NULL = not yet # tagged; the next news_job run picks it up. Each entry is one of the # values in news_tagging.TAG_VOCABULARY. tags: Mapped[list[str] | None] = mapped_column(JSON, nullable=True) __table_args__ = ( UniqueConstraint("fingerprint", name="uq_headlines_fingerprint"), Index("ix_headlines_published", "published_at"), Index("ix_headlines_category_published", "category", "published_at"), ) class Feed(Base): """Persisted feed state; bootstrapped from default.toml on first startup.""" __tablename__ = "feeds" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) category: Mapped[str] = mapped_column(String(32), nullable=False) name: Mapped[str] = mapped_column(String(64), nullable=False) url: Mapped[str] = mapped_column(String(1024), nullable=False) enabled: Mapped[bool] = mapped_column(Boolean, default=True) consecutive_failures: Mapped[int] = mapped_column(Integer, default=0) last_success_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) __table_args__ = ( UniqueConstraint("category", "name", name="uq_feeds_cat_name"), ) class StrategicLog(Base): __tablename__ = "strategic_logs" id: Mapped[int] = mapped_column(_PK, primary_key=True, autoincrement=True) generated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, index=True) model: Mapped[str] = mapped_column(String(64), nullable=False) anchor_date: Mapped[str | None] = mapped_column(String(16)) prompt_version: Mapped[int] = mapped_column(Integer, default=1) tone: Mapped[str | None] = mapped_column(String(16)) # NOVICE|INTERMEDIATE|PRO analysis: Mapped[str | None] = mapped_column(String(16)) # DRY|SPECULATIVE content: Mapped[str] = mapped_column(Text, nullable=False) prompt_tokens: Mapped[int | None] = mapped_column(Integer) completion_tokens: Mapped[int | None] = mapped_column(Integer) cost_usd: Mapped[float | None] = mapped_column(Float) class StrategicLogTranslation(Base): """Cached translation of a single StrategicLog row. Populated by ai_log_job after the English row is committed: one row per (log_id, lang) combination. The /log endpoint serves the matching row when available and falls back to the English source when no row exists yet (e.g. translation failed or the language was added after the log was generated). No user attribution — the cache is shared. Setting `lang` on a user just selects which (already-translated) variant they see. """ __tablename__ = "strategic_log_translations" id: Mapped[int] = mapped_column(_PK, primary_key=True, autoincrement=True) log_id: Mapped[int] = mapped_column( BigInteger().with_variant(Integer(), "sqlite"), ForeignKey("strategic_logs.id", ondelete="CASCADE"), nullable=False, ) lang: Mapped[str] = mapped_column(String(8), nullable=False) content_md: Mapped[str] = mapped_column(Text, nullable=False) generated_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, default=utcnow, ) llm_model: Mapped[str | None] = mapped_column(String(64)) llm_cost_usd: Mapped[float | None] = mapped_column(Float) __table_args__ = ( UniqueConstraint("log_id", "lang", name="uq_slt_log_lang"), ) class IndicatorSummary(Base): """Short AI-generated read for one indicator group, regenerated hourly. The latest row per group_name is what the dashboard renders.""" __tablename__ = "indicator_summaries" id: Mapped[int] = mapped_column(_PK, primary_key=True, autoincrement=True) group_name: Mapped[str] = mapped_column(String(64), nullable=False) generated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow) model: Mapped[str] = mapped_column(String(64), nullable=False) tone: Mapped[str | None] = mapped_column(String(16)) analysis: Mapped[str | None] = mapped_column(String(16)) prompt_version: Mapped[int] = mapped_column(Integer, default=1) content: Mapped[str] = mapped_column(Text, nullable=False) prompt_tokens: Mapped[int | None] = mapped_column(Integer) completion_tokens: Mapped[int | None] = mapped_column(Integer) cost_usd: Mapped[float | None] = mapped_column(Float) __table_args__ = (Index("ix_indsumm_group_generated", "group_name", "generated_at"),) class AICall(Base): """Cost ledger for OpenRouter calls. Feeds the monthly cap check.""" __tablename__ = "ai_calls" id: Mapped[int] = mapped_column(_PK, primary_key=True, autoincrement=True) called_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, index=True) model: Mapped[str] = mapped_column(String(64), nullable=False) prompt_tokens: Mapped[int | None] = mapped_column(Integer) completion_tokens: Mapped[int | None] = mapped_column(Integer) cost_usd: Mapped[float | None] = mapped_column(Float) status: Mapped[str] = mapped_column(String(16), default="ok") error: Mapped[str | None] = mapped_column(String(512)) # Portfolio / PortfolioSnapshot / Position removed in Phase G — # holdings live in the browser, the server stores only the anonymous # ticker universe + public market data. class User(Base): """A user account. Authentication is e-mail-only via one-time codes (see EmailOTP) — no passwords. Possessing an active session cookie means the user proved control of `email` at session creation time, so a separate `email_verified` flag would be redundant.""" __tablename__ = "users" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) email: Mapped[str] = mapped_column(String(255), nullable=False) tier: Mapped[str] = mapped_column(String(16), default="free") # free | paid | enterprise settings_json: Mapped[dict | None] = mapped_column(JSON) created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow) last_login_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) # Referral code is unique + URL-safe; generated on first need rather # than at row creation so existing accounts get one the next time # they hit /settings. referral_code: Mapped[str | None] = mapped_column(String(16), nullable=True) referred_by_user_id: Mapped[int | None] = mapped_column( ForeignKey("users.id", ondelete="SET NULL"), nullable=True, ) # Paid-tier credit window. Null = no credit. When set and > now(), # the user gets paid-tier features regardless of `tier`. Populated # by admin CLI (manual grants) and by referral conversion (45 days # per converted referral, both parties). credit_until: Mapped[datetime | None] = mapped_column( DateTime(timezone=True), nullable=True, ) email_digest_opt_in: Mapped[bool] = mapped_column( Boolean, nullable=False, default=True, server_default=text("1"), ) # NULL = use INTERMEDIATE at render time. Server-side mirror of the # dashboard tone, decoupled because the dashboard pref is localStorage. digest_tone: Mapped[str | None] = mapped_column(String(16)) # Preferred language for AI-generated content (strategic log, # digest emails, portfolio commentary). Default 'en'. The settings # PATCH endpoint validates against ACTIVE_LANGUAGES in # app/services/i18n.py before writing. lang: Mapped[str] = mapped_column( String(8), nullable=False, default="en", server_default="en", ) # Polar (MoR) linkage — populated by the polar_webhook handler the # first time we see a subscription/order event for the user. The # customer id is the stable join key; the subscription id is what # we cancel against from /settings. polar_customer_id: Mapped[str | None] = mapped_column(String(64), nullable=True) polar_subscription_id: Mapped[str | None] = mapped_column(String(64), nullable=True) # Stripe (merchant-on-record for read.markets). Populated on the # first checkout.session.completed event via client_reference_id; # used thereafter to match incoming subscription/invoice events # back to this row. stripe_customer_id: Mapped[str | None] = mapped_column(String(64), nullable=True) stripe_subscription_id: Mapped[str | None] = mapped_column(String(64), nullable=True) # Set when a subscription is in `trialing` state — drives the # "Free trial — N days remaining" hint on /settings. Cleared on # subscription.revoked or when status transitions out of trialing. stripe_trial_end_at: Mapped[datetime | None] = mapped_column( DateTime(timezone=True), nullable=True, ) __table_args__ = ( UniqueConstraint("email", name="uq_users_email"), UniqueConstraint("referral_code", name="uq_users_referral_code"), UniqueConstraint("polar_customer_id", name="uq_users_polar_customer"), UniqueConstraint("stripe_customer_id", name="uq_users_stripe_customer"), ) class PortfolioSync(Base): """Opt-in encrypted backup of a user's pie. Stored as opaque bytes: the client encrypts the pie with a PIN-derived key (AES-GCM), and the server wraps that ciphertext again with a per-user key derived from PORTFOLIO_SYNC_PEPPER + user_id (also AES-GCM). A DB-only leak yields nothing usable without the env-only pepper; a pepper-only leak still leaves the attacker brute-forcing the PIN through PBKDF2(600k). One row per user. Absent row = sync disabled for that user. The fetch_window_* fields drive a sliding-window rate limit on GET so the pepper-leak threat model can't degenerate into an unthrottled brute force against the inner PBKDF2.""" __tablename__ = "portfolio_sync" user_id: Mapped[int] = mapped_column( ForeignKey("users.id", ondelete="CASCADE"), primary_key=True, ) outer_ciphertext: Mapped[bytes] = mapped_column(LargeBinary, nullable=False) outer_nonce: Mapped[bytes] = mapped_column(LargeBinary, nullable=False) version: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=1) created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow) updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow) fetch_window_start: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) fetch_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0) # 8-byte HKDF fingerprint of the pepper that wrapped this row. A # mismatch against the current pepper means the row is orphaned # (pepper was rotated) — distinct from genuine GCM corruption. pepper_fp: Mapped[bytes | None] = mapped_column(LargeBinary(length=8)) class Referral(Base): """One row per captured (referrer, referred) pair. Created at signup when the new user supplied a valid `?ref=`. The conversion fields (`converted_at`, `credited_at`) stay null until the referred user makes their first paid subscription — the Stripe webhook calls ``referral_service.convert_referral`` to fill them in and extend both parties' ``credit_until``.""" __tablename__ = "referrals" id: Mapped[int] = mapped_column(_PK, primary_key=True, autoincrement=True) referrer_user_id: Mapped[int] = mapped_column( ForeignKey("users.id", ondelete="CASCADE"), nullable=False, ) referred_user_id: Mapped[int] = mapped_column( ForeignKey("users.id", ondelete="CASCADE"), nullable=False, ) created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow) converted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) credited_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) __table_args__ = ( UniqueConstraint("referred_user_id", name="uq_referrals_referred"), Index("ix_referrals_referrer", "referrer_user_id"), ) class EmailOTP(Base): """One-time codes for email verification. The plaintext 6-digit code is sent in the email; we store an argon2 hash, expiry, attempt count, and a used_at timestamp so a single code can't be reused or brute-forced.""" __tablename__ = "email_otps" id: Mapped[int] = mapped_column(_PK, primary_key=True, autoincrement=True) email: Mapped[str] = mapped_column(String(255), nullable=False) code_hash: Mapped[str] = mapped_column(String(255), nullable=False) created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow) expires_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) attempts: Mapped[int] = mapped_column(Integer, default=0) used_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) purpose: Mapped[str] = mapped_column(String(16), default="signup") __table_args__ = (Index("ix_otps_email_created", "email", "created_at"),) class InstrumentMap(Base): """Maps T212's tickers/shortnames to Yahoo Finance tickers so we can refresh prices via Yahoo after a user uploads a T212 pie CSV. Synced periodically from T212's /equity/metadata/instruments endpoint via the admin's read-only API key. Each row is one T212 listing. Multiple rows can share a shortName (e.g. SHEL on LSE in GBX vs SHEL on NYSE in USD); the resolver picks the right one per user.""" __tablename__ = "instrument_map" id: Mapped[int] = mapped_column(_PK, primary_key=True, autoincrement=True) t212_ticker: Mapped[str] = mapped_column(String(64), nullable=False) t212_shortname: Mapped[str] = mapped_column(String(32), nullable=False) yahoo_ticker: Mapped[str | None] = mapped_column(String(32)) name: Mapped[str] = mapped_column(String(128), nullable=False) currency: Mapped[str | None] = mapped_column(String(8)) isin: Mapped[str | None] = mapped_column(String(16)) instrument_type: Mapped[str | None] = mapped_column(String(16)) manual: Mapped[bool] = mapped_column(Boolean, default=False) last_verified_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow) __table_args__ = ( UniqueConstraint("t212_ticker", name="uq_imap_t212_ticker"), Index("ix_imap_shortname", "t212_shortname"), Index("ix_imap_isin", "isin"), ) class TickerUniverse(Base): """The set of public tickers Cassandra is currently tracking. Populated as the union of all users' holdings, *without user attribution* — once a ticker is in the universe, the row carries no signal as to who put it there. The /api/universe endpoint returns the entire set (gzipped) to every authenticated client, so the request body itself doesn't leak which tickers belong to which user. Eviction policy: passive aging. last_referenced_at is bumped whenever the ticker appears in /api/portfolio/parse or /api/analyze. A nightly cron prunes rows older than UNIVERSE_EVICTION_TTL (60 days). """ __tablename__ = "ticker_universe" yahoo_ticker: Mapped[str] = mapped_column(String(32), primary_key=True) currency: Mapped[str | None] = mapped_column(String(8)) first_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow) last_referenced_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow) __table_args__ = (Index("ix_universe_last_ref", "last_referenced_at"),) class JobRun(Base): """One row per scheduled-job invocation; powers /api/health + the ops footer.""" __tablename__ = "job_runs" id: Mapped[int] = mapped_column(_PK, primary_key=True, autoincrement=True) name: Mapped[str] = mapped_column(String(64), nullable=False) started_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow) finished_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) status: Mapped[str] = mapped_column(String(16), default="running") # running|success|failed error: Mapped[str | None] = mapped_column(Text) items_written: Mapped[int | None] = mapped_column(Integer) __table_args__ = (Index("ix_jobruns_name_started", "name", "started_at"),) class EmailSend(Base): """Audit row per digest email send. Used for idempotency (don't send twice on the same UTC day) and for surfacing 'last delivery' on the Settings page.""" __tablename__ = "email_sends" id: Mapped[int] = mapped_column(_PK, primary_key=True, autoincrement=True) user_id: Mapped[int] = mapped_column( ForeignKey("users.id", ondelete="CASCADE"), nullable=False, ) kind: Mapped[str] = mapped_column(String(16), nullable=False) # "daily" | "weekly" sent_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), default=utcnow, nullable=False, ) status: Mapped[str] = mapped_column(String(16), nullable=False) # "sent" | "skipped" | "error" error: Mapped[str | None] = mapped_column(String(255)) __table_args__ = ( Index("ix_email_sends_user_kind_sent", "user_id", "kind", "sent_at"), ) class PolarEvent(Base): """Audit + idempotency table for inbound Polar (MoR) webhook deliveries. Polar uses the Standard Webhooks spec, which guarantees each delivery carries a unique `webhook-id` header. We store that ID under a UNIQUE constraint so a replay of the same event is a no-op (the INSERT fails and the handler returns the prior result). `processed_at` distinguishes "delivered and handled" from "delivered but the handler crashed mid-flight" — the latter rows are what an operator looks at when investigating a stuck subscription.""" __tablename__ = "polar_events" id: Mapped[int] = mapped_column(_PK, primary_key=True, autoincrement=True) event_id: Mapped[str] = mapped_column(String(128), nullable=False) event_type: Mapped[str] = mapped_column(String(64), nullable=False) received_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), default=utcnow, nullable=False, ) processed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) error: Mapped[str | None] = mapped_column(Text) # Raw JSON body, kept for forensics. Truncated to 16 KiB to keep # one bad request from blowing up the row. payload: Mapped[str] = mapped_column(Text, nullable=False) __table_args__ = ( UniqueConstraint("event_id", name="uq_polar_events_event_id"), Index("ix_polar_events_type_received", "event_type", "received_at"), ) class StripeEvent(Base): """Audit + idempotency table for inbound Stripe webhook deliveries. Same shape and purpose as PolarEvent — Stripe's `event.id` plays the same role as Standard Webhooks' `webhook-id`. We keep the tables distinct (rather than a single 'webhook_events' table) so an operator can look at the audit trail per processor without filtering on a `source` column.""" __tablename__ = "stripe_events" id: Mapped[int] = mapped_column(_PK, primary_key=True, autoincrement=True) event_id: Mapped[str] = mapped_column(String(128), nullable=False) event_type: Mapped[str] = mapped_column(String(64), nullable=False) received_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), default=utcnow, nullable=False, ) processed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) error: Mapped[str | None] = mapped_column(Text) payload: Mapped[str] = mapped_column(Text, nullable=False) __table_args__ = ( UniqueConstraint("event_id", name="uq_stripe_events_event_id"), Index("ix_stripe_events_type_received", "event_type", "received_at"), ) class CsvFormatTemplate(Base): """Cached column-mapping for a single broker CSV format. Populated on the first upload of a previously-unseen format via the LLM-fallback parser. Subsequent uploads of the same format (identified by ``fingerprint``, a sha256 of the normalised header row) replay ``mapping`` deterministically with no LLM call. The table holds the actual ``headers`` and one anonymous ``sample_row`` from the originating upload — there is no ``user_id`` column, no link back to the uploader. The sample exists so the operator has concrete material to look at when hand-writing future native parsers; the system never auto-generates or modifies parser code from this data. """ __tablename__ = "csv_format_templates" id: Mapped[int] = mapped_column(_PK, primary_key=True, autoincrement=True) fingerprint: Mapped[str] = mapped_column(String(64), unique=True, nullable=False) headers: Mapped[list[str]] = mapped_column(JSON, nullable=False) sample_row: Mapped[list[str]] = mapped_column(JSON, nullable=False) mapping: Mapped[dict[str, str | None]] = mapped_column(JSON, nullable=False) preamble_rows: Mapped[int] = mapped_column(Integer, nullable=False, default=0) delimiter: Mapped[str] = mapped_column(String(1), nullable=False, default=",") broker_label: Mapped[str | None] = mapped_column(String(128)) first_seen_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, default=utcnow, ) # use_count and last_used_at are application-managed: parse_with_llm # increments use_count and sets last_used_at = utcnow() on every cache hit. # No onupdate hook — we don't want unrelated writes (e.g. broker_label edits) # to re-stamp last_used_at. use_count: Mapped[int] = mapped_column(Integer, nullable=False, default=1) last_used_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, default=utcnow, ) llm_model: Mapped[str | None] = mapped_column(String(64)) llm_cost_usd: Mapped[float | None] = mapped_column(Float)