Categorization stage (two-pass + single-pass)

First real implementation of the categorization pipeline stage. Ports two of the three legacy Gemini-via-OpenRouter strategies. Introduces the backend's first LLM integration, a per-item DB table, DB-backed categorizer settings, and a Storage abstraction with a local-disk implementation.

Spec Draft 2026-05-19 · pending user review

TL;DR Summary

What this changes

The categorization stage is a stub today (features/pipeline/stages.py:25). Legacy has three Gemini-via-OpenRouter variants under checkin-pipeline/app/steps/classify/; we port two-pass (current legacy default) and single-pass.

The slice does more than translate two strategy modules. Categorization is the first stage that needs a per-item DB record (deal_items; segmentation populates it), calls an LLM (new integrations/openrouter/, predictable home for appraisal next), and reads files via storage (new core/storage.py protocol with LocalFsStorage only; the Azure backend is a follow-up slice that just implements the protocol). Strategy selection mirrors segmentation: a singleton categorizer_settings row + build_categorizer() factory. Per-item failures don't fail the stage; re-runs reprocess pending + failed.

§1 Context

What exists

Legacy categorization (the source we are porting)

§2 Scope (in / out)

In scope

  • New core/storage.py: Storage protocol + LocalFsStorage. VCC_STORAGE_KIND=local (default).
  • New api/routers/files.py: GET /files/{key:path} proxy for LocalFsStorage.
  • New integrations/openrouter/: client.py (lifted), schemas.py.
  • New features/categorization/: models, settings, categorizer protocol, two strategies, factory, taxonomy loader, stage entry, prompts, TOML+CSV.
  • Extend run_segmentation_stage: insert one DealItem per crop; refactor I/O onto Storage.
  • Migration 0016_categorization.py: deal_items, categorizer_settings, three nullable columns on pipeline_runs.
  • Wire categorization_stage in features/pipeline/stages.py.
  • Worker startup validates VCC_OPENROUTER_API_KEY.
  • Manual smoke script backend/scripts/smoke_categorize.py (one fixture image; hand-run).

Out of scope

  • Describe-then-classify strategy.
  • Azure Blob Storage implementation (follow-up).
  • Source-image download from deal_images.url into storage.
  • Admin UI for editing categorizer_settings.
  • Prompt-management or taxonomy-editing UI.
  • Per-deal categorizer-strategy override.
  • Live OpenRouter calls in CI.
  • Per-LLM-call llm_calls audit table.

§3 Architecture

Directory layout

backend/src/vcc_backend/
├── core/
│   └── storage.py                   # NEW: Storage Protocol + LocalFsStorage
├── api/
│   └── routers/
│       └── files.py                 # NEW: GET /files/{key:path}
├── integrations/
│   └── openrouter/                  # NEW: first LLM integration
│       ├── __init__.py
│       ├── client.py                # lifted from checkin-pipeline/app/gemini/client.py
│       └── schemas.py               # ChatResponse, JsonResponse, TokenUsage
├── features/
│   ├── segmentation/
│   │   └── step.py                  # EXTENDED: storage I/O + DealItem insert
│   ├── categorization/              # NEW
│   │   ├── __init__.py
│   │   ├── models.py                # DealItem, CategorizerSettings ORM
│   │   ├── schemas.py               # pydantic Settings types
│   │   ├── settings.py              # get_categorizer_settings(session) + upsert
│   │   ├── categorizer.py           # Categorizer Protocol, dataclasses
│   │   ├── two_pass.py              # TwoPassCategorizer
│   │   ├── single_pass.py           # SinglePassCategorizer
│   │   ├── factory.py               # build_categorizer(settings, http)
│   │   ├── taxonomy.py              # loads TOML+CSV; resolve_subcategory()
│   │   ├── step.py                  # run_categorization_stage(session, deal)
│   │   ├── config/
│   │   │   ├── categories.toml
│   │   │   └── subcategories.csv
│   │   └── prompts/
│   │       ├── two_pass.category.md
│   │       ├── two_pass.subcategory.md
│   │       └── single_pass.md
│   └── pipeline/
│       └── stages.py                # EDITED: wire run_categorization_stage

Sequence

process_deal(deal_id)   [Procrastinate worker]
  │
  ▼
pipeline.orchestrator.run_pipeline(session, deal)
  │
  ├─► run_segmentation_stage(session, deal)
  │     │
  │     ├─► storage.write_bytes(key, crop_png_bytes)           [per crop]
  │     └─► INSERT deal_items (status=pending, crop_key, …)    [per crop]
  │
  ├─► COMMIT (segmentation transaction)
  │
  ├─► run_categorization_stage(session, deal)
  │     │
  │     ├─► load DealItem WHERE status IN ('pending','failed')
  │     ├─► settings = await get_categorizer_settings(session)
  │     ├─► categorizer = build_categorizer(settings, http)    [Two-pass | Single-pass]
  │     ├─► async with TaskGroup, Semaphore(settings.concurrency):
  │     │     ├─► storage.read_bytes(item.crop_key)
  │     │     ├─► categorizer.classify(image_bytes, size_hint)
  │     │     ├─►   → 1 or 2 calls to OpenRouterClient
  │     │     ├─►   → resolve_subcategory(label) → canonical id
  │     │     └─► UPDATE deal_item SET category/subcategory/…, status='classified'
  │     │         (per-item exception → SET status='failed', log; do NOT raise)
  │     ├─► PipelineRun.input_tokens / output_tokens / model = aggregates
  │     └─► (orchestrator commits stage transaction)
  │
  └─► run_appraisal_stage(…)   [stub today]

Why a core/storage.py seam
Segmentation today writes crops via raw pathlib.Path under data_dir. Categorization needs to read those crops. Adding a second feature that pokes data_dir directly would mean rewriting both when Azure lands. A Storage protocol contains the blast radius: Azure becomes a new class, not a refactor. LocalFsStorage is the only implementation in this slice; the API surface is pinned so the future Azure backend is mechanical.

Why integrations/openrouter/ and not features/categorization/llm.py
Mirrors integrations/hubspot/. Appraisal will be the next LLM consumer; co-locating now avoids a move later. The client is portable (httpx, no domain assumptions), which fits the integration boundary.

§4 Data model

New table: deal_items

| Column | Type | Notes |
| --- | --- | --- |
| id | UUID PK | uuid4() default |
| deal_id | UUID FK → deals.id | ON DELETE CASCADE; indexed |
| source_image | text NOT NULL | Filename of the original photo |
| crop_key | text NOT NULL | Storage-agnostic key, e.g. deals/{deal_id}/crops/IMG_0421/item_03.png |
| bbox | JSONB NULL | {x, y, w, h} from segmentation; nullable for forward-compat |
| category | text NULL | Populated by categorization |
| category_confidence | double NULL | |
| subcategory | text NULL | Free-text label from the model |
| subcategory_id | text NULL | Canonical id from resolve_subcategory() (NULL if unresolved) |
| subcategory_confidence | double NULL | |
| status | enum (pending, classified, failed) NOT NULL | default pending |
| error | text NULL | On failed, last error truncated to 2000 chars |
| created_at / updated_at | timestamptz NOT NULL | func.now() server default; onupdate on updated_at |

Lives in features/categorization/models.py. Segmentation is the row creator but does not own the table, just as PipelineRun lives in features/pipeline/ even though other features write to it.

New table: categorizer_settings (singleton)

Mirrors segmenter_settings. Single-row table; pydantic-serialized settings JSONB plus updated_at / updated_by.

class TwoPassConfig(BaseModel):
    model: str = "google/gemini-2.5-flash"
    timeout_seconds: float = 30.0
    temperature: float = 0.0

class SinglePassConfig(BaseModel):
    model: str = "google/gemini-2.5-flash"
    timeout_seconds: float = 30.0
    temperature: float = 0.0

class CategorizerSettings(BaseModel):
    kind: Literal["two_pass", "single_pass"] = "two_pass"
    concurrency: int = 10
    two_pass: TwoPassConfig = Field(default_factory=TwoPassConfig)
    single_pass: SinglePassConfig | None = None

Modified table: pipeline_runs

| Column | Type | Notes |
| --- | --- | --- |
| input_tokens | integer NULL | Sum across all LLM calls for this stage execution |
| output_tokens | integer NULL | Sum across all LLM calls for this stage execution |
| model | text NULL | Configured model at execution time |

All nullable; older stages and old rows stay NULL. No backfill.

Migration 0016_categorization.py

One Alembic revision. Up creates deal_items, categorizer_settings, and the three columns on pipeline_runs. Down drops in reverse. Append deal_items, categorizer_settings to TRUNCATE_TABLES in backend/tests/conftest.py.

§5 Storage abstraction

Storage protocol

# core/storage.py
class Storage(Protocol):
    async def read_bytes(self, key: str) -> bytes: ...
    async def write_bytes(self, key: str, data: bytes) -> None: ...
    async def exists(self, key: str) -> bool: ...
    async def delete_prefix(self, prefix: str) -> None: ...
    def signed_url(self, key: str, ttl_seconds: int = 3600) -> str: ...

Keys are POSIX-style strings (e.g. deals/{deal_id}/crops/IMG_0421/item_03.png). They never contain backslashes and never start with /. The protocol is intentionally minimal: no streaming, no metadata. Add those when needed.

LocalFsStorage(root: Path)
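
A minimal sketch of what LocalFsStorage could look like, pieced together from the protocol above and the behaviours listed under the storage tests in §12 (round-trip, recursive delete_prefix, /files/{key} URLs with URL-quoting, rejection of ..). Everything beyond those behaviours is an assumption: InvalidKeyError is a hypothetical name, and running blocking file I/O through asyncio.to_thread is one possible choice, not something the spec states.

```python
import asyncio
import shutil
import tempfile
from pathlib import Path, PurePosixPath
from urllib.parse import quote


class InvalidKeyError(ValueError):
    """Hypothetical error type for keys that break the key rules."""


class LocalFsStorage:
    def __init__(self, root: Path) -> None:
        self.root = root.resolve()

    def _path(self, key: str) -> Path:
        # Key rules: non-empty, POSIX-style, relative, no backslashes, no "..".
        parts = PurePosixPath(key).parts
        if not key or "\\" in key or key.startswith("/") or ".." in parts:
            raise InvalidKeyError(f"invalid storage key: {key!r}")
        path = (self.root / key).resolve()
        # Second check: the resolved path must still sit under the root.
        if not path.is_relative_to(self.root):
            raise InvalidKeyError(f"key escapes storage root: {key!r}")
        return path

    async def read_bytes(self, key: str) -> bytes:
        return await asyncio.to_thread(self._path(key).read_bytes)

    async def write_bytes(self, key: str, data: bytes) -> None:
        path = self._path(key)

        def _write() -> None:
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_bytes(data)

        await asyncio.to_thread(_write)

    async def exists(self, key: str) -> bool:
        return await asyncio.to_thread(self._path(key).is_file)

    async def delete_prefix(self, prefix: str) -> None:
        path = self._path(prefix)
        # Treats the prefix as a directory and removes it recursively.
        await asyncio.to_thread(lambda: shutil.rmtree(path, ignore_errors=True))

    def signed_url(self, key: str, ttl_seconds: int = 3600) -> str:
        # Nothing to sign locally; the URL points at the /files proxy route.
        self._path(key)  # validate only
        return "/files/" + quote(key)


def demo() -> dict:
    """Round-trip one crop through a temporary root and probe the edge cases."""

    async def run(root: Path) -> dict:
        storage = LocalFsStorage(root)
        key = "deals/42/crops/IMG 0421/item_03.png"
        await storage.write_bytes(key, b"png-bytes")
        out = {"data": await storage.read_bytes(key), "url": storage.signed_url(key)}
        await storage.delete_prefix("deals/42")
        out["exists_after_delete"] = await storage.exists(key)
        try:
            await storage.read_bytes("../outside.txt")
            out["escape_rejected"] = False
        except InvalidKeyError:
            out["escape_rejected"] = True
        return out

    with tempfile.TemporaryDirectory() as tmp:
        return asyncio.run(run(Path(tmp)))
```

In this sketch ttl_seconds is accepted but ignored, which keeps the signature identical to what a SAS-issuing backend would need.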

GET /files/{key:path} proxy route

Resolves the key under data_dir, validates it stays under data_dir, streams the file with sniffed Content-Type. 404 for missing keys, 400 for invalid keys. Documented as dev/local only: in production with Azure, the proxy is unmounted and the Azure backend issues real signed URLs.

Selection

Settings.storage_kind: Literal["local"] = "local" (env: VCC_STORAGE_KIND). Each kind gets its own pydantic-settings field group (only local today). core/storage.py exposes get_storage(), which is lru_cached and mirrors get_db_engine.
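
As an illustration of the cached selector, a sketch under stated assumptions: it reads the environment directly where the real code would presumably go through Settings, VCC_DATA_DIR is a hypothetical variable name standing in for data_dir, and the LocalFsStorage class here is a bare stand-in.

```python
import os
from functools import lru_cache
from pathlib import Path


class LocalFsStorage:
    """Stand-in for the real class; only the constructor matters here."""

    def __init__(self, root: Path) -> None:
        self.root = root


@lru_cache(maxsize=1)
def get_storage() -> LocalFsStorage:
    # Assumption: env read directly; the spec routes this through Settings.
    kind = os.environ.get("VCC_STORAGE_KIND", "local")
    if kind == "local":
        # VCC_DATA_DIR is a hypothetical name for the data_dir setting.
        return LocalFsStorage(Path(os.environ.get("VCC_DATA_DIR", "./data")))
    raise ValueError(f"unsupported storage kind: {kind!r}")
```

Because the result is cached for the life of the process, a test that changes the environment would need to call get_storage.cache_clear() first.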

Future Azure backend (out of slice)
AzureBlobStorage(account, container, credential) implements the same protocol. signed_url issues SAS URLs. No code change needed in segmentation or categorization; the contract is the seam.

§6 OpenRouter client (integrations/openrouter/)

Lift

Copy checkin-pipeline/app/gemini/client.py to integrations/openrouter/client.py. Rename methods onto a class:

Class takes an httpx.AsyncClient and API key in __init__; instances constructed per-stage (cheap, no global state).

Typed errors

# integrations/openrouter/client.py
class OpenRouterError(Exception):
    """Base. Carries status + first 500 bytes of body."""
class OpenRouterAuthError(OpenRouterError):    pass   # 401, 403
class OpenRouterQuotaError(OpenRouterError):   pass   # 429, 402
class OpenRouterServerError(OpenRouterError):  pass   # 5xx
class OpenRouterParseError(OpenRouterError):   pass   # JSON-mode response didn't parse

The categorization stage uses these to distinguish per-item failures (caught, item → failed) from stage-level failures (auth/quota, re-raised past the per-item try/except).
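
To make the split concrete, here is a sketch of how HTTP statuses could map onto the typed errors and how the stage could tell the two failure classes apart. error_for_status and is_stage_level are hypothetical helper names that do not appear in the spec, and treating 5xx and parse errors as per-item failures is an inference from the sentence above rather than a stated rule.

```python
from typing import Optional


class OpenRouterError(Exception):
    """Base. Carries status + first 500 bytes of body."""

    def __init__(self, status: Optional[int], body: bytes = b"") -> None:
        self.status = status
        self.body = body[:500]
        super().__init__(f"OpenRouter error (status={status})")


class OpenRouterAuthError(OpenRouterError):
    pass  # 401, 403


class OpenRouterQuotaError(OpenRouterError):
    pass  # 429, 402


class OpenRouterServerError(OpenRouterError):
    pass  # 5xx


class OpenRouterParseError(OpenRouterError):
    pass  # JSON-mode response didn't parse


def error_for_status(status: int, body: bytes) -> OpenRouterError:
    """Hypothetical helper: pick the typed error for a non-2xx response."""
    if status in (401, 403):
        return OpenRouterAuthError(status, body)
    if status in (402, 429):
        return OpenRouterQuotaError(status, body)
    if 500 <= status <= 599:
        return OpenRouterServerError(status, body)
    return OpenRouterError(status, body)


# Errors that no retry of a single item can fix; the stage should stop.
STAGE_LEVEL_ERRORS = (OpenRouterAuthError, OpenRouterQuotaError)


def is_stage_level(exc: BaseException) -> bool:
    """Hypothetical helper: True when the whole stage should fail."""
    return isinstance(exc, STAGE_LEVEL_ERRORS)
```

A per-item handler would re-raise when is_stage_level(exc) is true and otherwise record the failure on the item.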

Schemas

# integrations/openrouter/schemas.py
class TokenUsage(BaseModel):
    input_tokens: int
    output_tokens: int

class ChatResponse(BaseModel):
    text: str
    usage: TokenUsage
    model: str

class JsonResponse(BaseModel):
    data: dict[str, Any]      # already JSON-parsed; raises on bad JSON
    usage: TokenUsage
    model: str

Settings additions (core/settings.py)

openrouter_api_key:        SecretStr | None = None
openrouter_base_url:       str = "https://openrouter.ai/api/v1"
openrouter_default_model:  str = "google/gemini-2.5-flash"
storage_kind:              Literal["local"] = "local"

Worker startup adds VCC_OPENROUTER_API_KEY to the existing required-env validation block (next to HUBSPOT_ACCESS_TOKEN).
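
The existing validation block is not shown in this spec, so the following is only a sketch of the general shape such a check could take. The function names and the REQUIRED_WORKER_ENV tuple are hypothetical; only the two variable names come from the text above.

```python
import os
from collections.abc import Mapping

# The two names come from the spec; the tuple itself is an assumption.
REQUIRED_WORKER_ENV = ("HUBSPOT_ACCESS_TOKEN", "VCC_OPENROUTER_API_KEY")


def missing_required_env(env: Mapping = os.environ) -> list:
    """Return the required variables that are unset or blank."""
    return [name for name in REQUIRED_WORKER_ENV if not env.get(name, "").strip()]


def validate_worker_env(env: Mapping = os.environ) -> None:
    """Fail fast at worker startup instead of on the first LLM call."""
    missing = missing_required_env(env)
    if missing:
        raise RuntimeError(
            "missing required environment variables: " + ", ".join(missing)
        )
```

Failing at startup means a misconfigured worker never picks up a deal, which is cheaper than discovering the missing key inside build_categorizer().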

§7 Categorizer strategies

Categorizer protocol

# features/categorization/categorizer.py
class SizeHint(BaseModel):
    width_cm:  float
    height_cm: float

class ItemClassification(BaseModel):
    category:               str | None
    category_confidence:    float | None
    subcategory:            str | None
    subcategory_id:         str | None
    subcategory_confidence: float | None

class Categorizer(Protocol):
    async def classify(
        self, *, image_bytes: bytes, size_hint: SizeHint | None
    ) -> tuple[ItemClassification, TokenUsage]: ...

TwoPassCategorizer

class TwoPassCategorizer:
    def __init__(self, client, taxonomy, cfg): ...

    async def classify(self, *, image_bytes, size_hint):
        # 1. category call
        sys1 = render_prompt("two_pass.category.md",
            categories=self.taxonomy.categories_block(),
            size_suffix=format_size_suffix(size_hint))
        r1 = await self.client.vision_json_request(image_bytes, sys1, CATEGORY_SCHEMA,
            model=self.cfg.model, timeout=self.cfg.timeout_seconds, temperature=self.cfg.temperature)
        category = r1.data["category"]; cat_conf = r1.data["confidence"]

        # 2. subcategory call (scoped to chosen category)
        sys2 = render_prompt("two_pass.subcategory.md",
            subcategory_block=self.taxonomy.subcategory_block(category),
            category=category, size_suffix=format_size_suffix(size_hint))
        r2 = await self.client.vision_json_request(image_bytes, sys2, SUBCATEGORY_SCHEMA,
            model=self.cfg.model, timeout=self.cfg.timeout_seconds, temperature=self.cfg.temperature)
        subcat_label = r2.data["subcategory"]; sub_conf = r2.data["confidence"]
        subcat_id = self.taxonomy.resolve_subcategory(category, subcat_label)

        return (ItemClassification(category=category, category_confidence=cat_conf,
                                   subcategory=subcat_label, subcategory_id=subcat_id,
                                   subcategory_confidence=sub_conf),
                TokenUsage(input_tokens =r1.usage.input_tokens  + r2.usage.input_tokens,
                           output_tokens=r1.usage.output_tokens + r2.usage.output_tokens))

SinglePassCategorizer

class SinglePassCategorizer:
    async def classify(self, *, image_bytes, size_hint):
        # named system_prompt to avoid shadowing the stdlib sys module
        system_prompt = render_prompt("single_pass.md",
            tree=self.taxonomy.tree_block(), size_suffix=format_size_suffix(size_hint))
        r = await self.client.vision_json_request(image_bytes, system_prompt, SINGLE_PASS_SCHEMA,
            model=self.cfg.model, timeout=self.cfg.timeout_seconds, temperature=self.cfg.temperature)
        cat = r.data["category"]; subcat_label = r.data["subcategory"]; conf = r.data["confidence"]
        subcat_id = self.taxonomy.resolve_subcategory(cat, subcat_label)
        # single-pass returns one confidence; copy to both fields for shape parity
        return (ItemClassification(category=cat, category_confidence=conf,
                                   subcategory=subcat_label, subcategory_id=subcat_id,
                                   subcategory_confidence=conf),
                r.usage)

build_categorizer(settings, http)

# features/categorization/factory.py
async def build_categorizer(settings: CategorizerSettings, http: httpx.AsyncClient) -> Categorizer:
    cfg = get_settings()
    if not cfg.openrouter_api_key:
        raise RuntimeError("VCC_OPENROUTER_API_KEY not configured")
    client = OpenRouterClient(http, api_key=cfg.openrouter_api_key.get_secret_value(),
                              base_url=cfg.openrouter_base_url)
    taxonomy = Taxonomy.load()  # reads TOML+CSV each call
    if settings.kind == "two_pass":
        return TwoPassCategorizer(client, taxonomy, settings.two_pass)
    if settings.kind == "single_pass":
        return SinglePassCategorizer(client, taxonomy, settings.single_pass or SinglePassConfig())
    raise ValueError(f"unknown categorizer kind: {settings.kind}")

Taxonomy loader
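
A sketch of how resolve_subcategory(category, label) might map a model-produced label onto a canonical id, matching both exact and differently-cased labels as the taxonomy test in §12 expects. The CSV column names (category, subcategory_id, label) and the sample rows are invented for illustration; the shipped subcategories.csv may be laid out differently, and the TOML side is omitted.

```python
import csv
import io
from typing import Optional

# Made-up sample data; not taken from the real subcategories.csv.
SAMPLE_CSV = """category,subcategory_id,label
Furniture,furn-dining-chair,Dining Chair
Furniture,furn-side-table,Side Table
Lighting,light-floor-lamp,Floor Lamp
"""


class Taxonomy:
    def __init__(self, rows) -> None:
        # (casefolded category, casefolded label) -> canonical id
        self._ids = {}
        for row in rows:
            key = (row["category"].casefold(), row["label"].casefold())
            self._ids[key] = row["subcategory_id"]

    @classmethod
    def from_csv_text(cls, text: str) -> "Taxonomy":
        return cls(csv.DictReader(io.StringIO(text)))

    def resolve_subcategory(
        self, category: Optional[str], label: Optional[str]
    ) -> Optional[str]:
        """Return the canonical id, or None when the label is unknown."""
        if not category or not label:
            return None
        key = (category.strip().casefold(), label.strip().casefold())
        return self._ids.get(key)
```

Returning None for an unknown label lines up with the subcategory_id column being nullable: the free-text label is still stored, only the canonical id is missing.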

Prompts

Copied from checkin-pipeline/app/steps/classify/{two_pass,single_pass}/*.md, placeholder names normalized:

render_prompt(name, **kwargs) does Template(text).safe_substitute(**kwargs) against the file contents, which are re-read on each call.
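
A small sketch of that behaviour using the standard library's string.Template. The explicit prompts_dir parameter and the sample prompt text are assumptions added to keep the example self-contained; the real function presumably resolves the prompts/ directory itself.

```python
import tempfile
from pathlib import Path
from string import Template


def render_prompt(name: str, prompts_dir: Path, **kwargs: str) -> str:
    # Re-read on every call so prompt edits apply without a restart.
    text = (prompts_dir / name).read_text(encoding="utf-8")
    # safe_substitute leaves unknown $placeholders in place instead of raising.
    return Template(text).safe_substitute(**kwargs)


def demo() -> str:
    """Render a made-up prompt that contains one placeholder with no value."""
    with tempfile.TemporaryDirectory() as tmp:
        prompts_dir = Path(tmp)
        (prompts_dir / "single_pass.md").write_text(
            "Pick one of: $categories.${size_suffix} $missing", encoding="utf-8"
        )
        return render_prompt(
            "single_pass.md",
            prompts_dir,
            categories="chair, table",
            size_suffix=" Size: 10 cm",
        )
```

One consequence of safe_substitute worth keeping in mind: a misspelled placeholder is sent to the model verbatim rather than failing loudly, so the strategy tests are the natural place to assert that no $ placeholders survive rendering.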

§8 Stage entry (features/categorization/step.py)

async def run_categorization_stage(session: AsyncSession, deal: Deal) -> None:
    items = (await session.scalars(
        select(DealItem).where(DealItem.deal_id == deal.id,
                               DealItem.status.in_([ItemStatus.PENDING, ItemStatus.FAILED]))
    )).all()
    if not items:
        logger.info("categorization_skipped_no_items", extra={"deal_id": str(deal.id)})
        return

    settings   = await get_categorizer_settings(session)
    storage    = get_storage()
    totals     = {"input_tokens": 0, "output_tokens": 0}

    async with httpx.AsyncClient() as http:
        categorizer = await build_categorizer(settings, http)
        sem = asyncio.Semaphore(settings.concurrency)

        async def classify_one(item: DealItem) -> None:
            async with sem:
                try:
                    image_bytes = await storage.read_bytes(item.crop_key)
                    result, usage = await categorizer.classify(image_bytes=image_bytes, size_hint=None)
                    item.category               = result.category
                    item.category_confidence    = result.category_confidence
                    item.subcategory            = result.subcategory
                    item.subcategory_id         = result.subcategory_id
                    item.subcategory_confidence = result.subcategory_confidence
                    item.status                 = ItemStatus.CLASSIFIED
                    item.error                  = None
                    totals["input_tokens"]  += usage.input_tokens
                    totals["output_tokens"] += usage.output_tokens
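                except (OpenRouterAuthError, OpenRouterQuotaError):
                    raise  # stage-level failure: bubble up so the orchestrator rolls back and retries
                except Exception as exc: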
                    item.status = ItemStatus.FAILED
                    item.error  = repr(exc)[:2000]
                    logger.warning("categorization_item_failed",
                                   extra={"deal_id": str(deal.id),
                                          "item_id": str(item.id),
                                          "error": repr(exc)})
                finally:
                    await session.flush()  # keep ORM state consistent within the open tx

        async with asyncio.TaskGroup() as tg:
            for item in items:
                tg.create_task(classify_one(item))

    # Record aggregate usage onto the in-progress PipelineRun row
    run = await _current_pipeline_run(session, deal, PipelineStage.CATEGORIZATION)
    if run is not None:
        run.input_tokens  = totals["input_tokens"]
        run.output_tokens = totals["output_tokens"]
        run.model         = _resolve_model(settings)
    # commit happens in the orchestrator

_current_pipeline_run(...) selects the in-flight pipeline_runs row (status=running) for this deal+stage; the orchestrator at features/pipeline/orchestrator.py:31-38 creates and flushes that row before calling the stage. _resolve_model(settings) returns settings.two_pass.model or (settings.single_pass or SinglePassConfig()).model based on kind.

Why size_hint=None at the call site
Legacy's format_size_suffix expects physical dimensions in cm, which the segmentation bbox (pixel coordinates) does not provide on its own. Until segmentation grows a DPI/scale model, we pass None; the protocol leaves the seam for later. Items still classify, just without the physical-size suffix in the prompt.

Flush vs commit
session.flush() per item only pushes pending ORM changes into the open transaction so subsequent reads see them; it does not commit. If the stage itself raises (auth/quota/build error), the orchestrator's per-stage rollback wipes every flushed update, returning items to their original pending/failed state for the next retry. This is consistent with the orchestrator's documented transaction model.

§9 Error handling

§10 Concurrency
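
The bounded fan-out used by the stage entry in §8 (a semaphore sized by settings.concurrency, with per-item failures recorded rather than raised) can be shown in isolation. This is a sketch, not the stage code: run_bounded and the fake worker are hypothetical, results go into a dict instead of ORM rows, and asyncio.gather stands in for the TaskGroup used in §8.

```python
import asyncio


async def run_bounded(items, worker, concurrency: int) -> dict:
    """Run worker(item) for every item with at most `concurrency` in flight."""
    sem = asyncio.Semaphore(concurrency)
    results = {}

    async def one(item) -> None:
        async with sem:
            try:
                results[item] = ("classified", await worker(item))
            except Exception as exc:
                # Per-item failure: record it and keep the batch going.
                results[item] = ("failed", repr(exc)[:2000])

    await asyncio.gather(*(one(item) for item in items))
    return results


def demo():
    """Classify six fake items two at a time; item 3 always fails."""
    in_flight = 0
    peak = 0

    async def worker(item: int) -> int:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        try:
            await asyncio.sleep(0.01)  # stands in for the LLM round trip
            if item == 3:
                raise ValueError("bad crop")
            return item * 2
        finally:
            in_flight -= 1

    results = asyncio.run(run_bounded(range(6), worker, concurrency=2))
    return results, peak
```

The semaphore caps in-flight LLM calls per deal, not per worker process, so the effective load on OpenRouter also depends on how many deals run at once.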

§11 Configuration

Environment

| Env var | Default | Required | Purpose |
| --- | --- | --- | --- |
| VCC_STORAGE_KIND | local | no | Storage backend selector |
| VCC_OPENROUTER_API_KEY | (none) | worker startup | OpenRouter auth |
| VCC_OPENROUTER_BASE_URL | https://openrouter.ai/api/v1 | no | |
| VCC_OPENROUTER_DEFAULT_MODEL | google/gemini-2.5-flash | no | Fallback for settings defaults |

Runtime (DB-backed)

categorizer_settings.settings: kind, concurrency, per-kind model / timeout_seconds / temperature.

Repo (no env)

features/categorization/config/categories.toml, config/subcategories.csv, prompts/*.md: copied verbatim from legacy. Edited via PR.

§12 Testing

1 · Unit: strategies (test_two_pass.py, test_single_pass.py)

Mock OpenRouterClient.vision_json_request to return canned JsonResponses. Assert:

2 · Unit: taxonomy (test_taxonomy.py)

Load the real shipped TOML + CSV. Assert 11 categories load, known subcategory resolves both case-sensitively and case-insensitively, rendered blocks for each category are non-empty.

3 · Integration: stage (test_step.py, real Postgres, mocked LLM + storage)

Uses the existing testcontainers conftest. Per test:

Second test (test_step_rerun.py): pre-seed one failed and one classified item; run again; assert only failed retried, classified untouched.

4 · Unit: storage (tests/core/test_storage.py)

LocalFsStorage round-trip under tmp_path; delete_prefix recurses; signed_url returns /files/{key} with URL-quoting; path-escape attempt (..) rejected.

Not tested
Live OpenRouter in CI (the manual backend/scripts/smoke_categorize.py covers that as a hand-run before merge). The full segmentation+categorization end-to-end pipeline (segmentation's existing test gets one new assertion that DealItem rows are inserted; the orchestrator's existing test covers wiring).

Fixtures

tests/features/categorization/fixtures/sample.png: a single 64×64 RGB PNG checked into the repo. ~200 bytes.

§13 Decisions made (during brainstorm)

  1. DealItem rows come from segmentation. Cleaner contract than letting categorization scan disk. Categorization reads WHERE status IN ('pending','failed').
  2. DB-backed CategorizerSettings with kind enum (two_pass | single_pass). Mirrors SegmenterSettings; consistent with the existing factory pattern.
  3. LLM client lives in integrations/openrouter/. Parallels integrations/hubspot/; predictable home for appraisal next.
  4. Categories / subcategories / prompts: copy legacy files as-is under features/categorization/. Files are re-read on each call, so edits take effect immediately in dev.
  5. Token tracking: nullable columns on pipeline_runs (input_tokens, output_tokens, model). Cheap; useful immediately. No separate llm_calls table.
  6. Per-deal concurrency: 10, configurable. Matches legacy.
  7. Storage abstraction lands in this slice; LocalFsStorage only. Azure backend is a follow-up that only implements the protocol.
  8. Describe-then-classify is not ported. The variation registry pattern leaves the seam.
  9. Per-item failures don't fail the stage. Items get status='failed' + error; re-runs reprocess them. Stage-level failures (auth, quota, config) bubble up for orchestrator backoff/retry.