Categorization stage (two-pass + single-pass)
First real implementation of the categorization pipeline stage. Ports two of the three legacy Gemini-via-OpenRouter strategies. Introduces the backend's first LLM integration, a per-item DB table, DB-backed categorizer settings, and a Storage abstraction with a local-disk implementation.
TL;DR
The categorization stage is a stub today (features/pipeline/stages.py:25). Legacy has three Gemini-via-OpenRouter variants under checkin-pipeline/app/steps/classify/; we port two-pass (current legacy default) and single-pass.
The slice does more than translate two strategy modules. Categorization is the first stage that needs a per-item DB record (deal_items; segmentation populates it). It is also the first to call an LLM (new integrations/openrouter/, the predictable home for appraisal next) and to read files via storage (new core/storage.py protocol). Only LocalFsStorage ships here; the Azure backend is a follow-up slice that just implements the protocol. Strategy selection mirrors segmentation: a singleton categorizer_settings row plus a build_categorizer() factory. Per-item failures don't fail the stage; re-runs reprocess pending + failed.
§1 Context
What exists
- features/pipeline/stages.py:25: categorization_stage is a return None stub. STAGES / NEXT_STAGE already route SEGMENTATION → CATEGORIZATION → APPRAISAL.
- features/segmentation/: the precedent we mirror. step.py entry, config.py with kind enum + build_segmenter() factory, settings.py over a singleton DB row.
- features/pipeline/models.py: PipelineRun audit row. No token-usage columns today.
- core/settings.py:19: data_dir: Path = Path("data"). The only storage primitive in the backend; segmentation pokes pathlib.Path directly.
- features/deal_images/: stores URLs of source photos, not bytes on disk. Not relevant to crop storage.
- No backend code imports any LLM SDK; httpx>=0.27 is the only HTTP dependency.
Legacy categorization (the source we are porting)
- checkin-pipeline/app/steps/classify/two_pass/: two Gemini vision calls (category → category-scoped subcategory). Legacy default.
- checkin-pipeline/app/steps/classify/single_pass/: one Gemini vision JSON call returning both fields.
- checkin-pipeline/app/steps/classify/describe_classify/: vision-describe + text-classify. Not ported in this slice.
- _shared.py: CLASSIFICATION_SCHEMA, resolve_subcategory(), format_size_suffix(), strip_code_fences().
- deal_step.py: orchestrator with a registry-based variation loader, per-deal Semaphore(10), and token accumulation on pipeline_runs.
- app/gemini/client.py: OpenRouter HTTP client with three wrappers (vision_request, vision_json_request, text_json_request).
- config/categories.toml (11 categories) + subcategories.csv (per-category list with hint metadata).
- app/steps/_prompts.py reads prompt .md files on every call (no caching) and injects {categories} and {subcategory_tree}.
§2 Scope (in / out)
In scope
- New core/storage.py: Storage protocol + LocalFsStorage. VCC_STORAGE_KIND=local (default).
- New api/routers/files.py: GET /files/{key:path} proxy for LocalFsStorage.
- New integrations/openrouter/: client.py (lifted), schemas.py.
- New features/categorization/: models, settings, categorizer protocol, two strategies, factory, taxonomy loader, stage entry, prompts, TOML + CSV.
- Extend run_segmentation_stage: insert one DealItem per crop; refactor I/O onto Storage.
- Migration 0016_categorization.py: deal_items, categorizer_settings, three nullable columns on pipeline_runs.
- Wire categorization_stage in features/pipeline/stages.py.
- Worker startup validates VCC_OPENROUTER_API_KEY.
- Manual smoke script backend/scripts/smoke_categorize.py (one fixture image; hand-run).
Out of scope
- Describe-then-classify strategy.
- Azure Blob Storage implementation (follow-up).
- Source-image download from deal_images.url into storage.
- Admin UI for editing categorizer_settings.
- Prompt-management or taxonomy-editing UI.
- Per-deal categorizer-strategy override.
- Live OpenRouter calls in CI.
- Per-LLM-call llm_calls audit table.
§3 Architecture
Directory layout
```
backend/src/vcc_backend/
├── core/
│   └── storage.py              # NEW: Storage Protocol + LocalFsStorage
├── api/
│   └── routers/
│       └── files.py            # NEW: GET /files/{key:path}
├── integrations/
│   └── openrouter/             # NEW: first LLM integration
│       ├── __init__.py
│       ├── client.py           # lifted from checkin-pipeline/app/gemini/client.py
│       └── schemas.py          # ChatResponse, JsonResponse, TokenUsage
└── features/
    ├── segmentation/
    │   └── step.py             # EXTENDED: storage I/O + DealItem insert
    ├── categorization/         # NEW
    │   ├── __init__.py
    │   ├── models.py           # DealItem, CategorizerSettings ORM
    │   ├── schemas.py          # pydantic Settings types
    │   ├── settings.py         # get_categorizer_settings(session) + upsert
    │   ├── categorizer.py      # Categorizer Protocol, dataclasses
    │   ├── two_pass.py         # TwoPassCategorizer
    │   ├── single_pass.py      # SinglePassCategorizer
    │   ├── factory.py          # build_categorizer(settings, http)
    │   ├── taxonomy.py         # loads TOML+CSV; resolve_subcategory()
    │   ├── step.py             # run_categorization_stage(session, deal)
    │   ├── config/
    │   │   ├── categories.toml
    │   │   └── subcategories.csv
    │   └── prompts/
    │       ├── two_pass.category.md
    │       ├── two_pass.subcategory.md
    │       └── single_pass.md
    └── pipeline/
        └── stages.py           # EDITED: wire run_categorization_stage
```
Sequence
core/storage.py seam
Segmentation today writes crops via raw pathlib.Path under data_dir. Categorization needs to read those crops. Adding a second feature that pokes data_dir directly would mean rewriting both when Azure lands. A Storage protocol contains the blast radius: Azure becomes a new class, not a refactor. LocalFsStorage is the only implementation in this slice; the API surface is pinned so the future Azure backend is mechanical.
integrations/openrouter/ and not features/categorization/llm.py
Mirrors integrations/hubspot/. Appraisal will be the next LLM consumer; co-locating now avoids a move later. The client is portable (httpx, no domain assumptions), so it fits the integration boundary.
§4 Data model
New table: deal_items
| Column | Type | Notes |
|---|---|---|
| id | UUID PK | uuid4() default |
| deal_id | UUID FK → deals.id | ON DELETE CASCADE; indexed |
| source_image | text NOT NULL | Filename of the original photo |
| crop_key | text NOT NULL | Storage-agnostic key, e.g. deals/{deal_id}/crops/IMG_0421/item_03.png |
| bbox | JSONB NULL | {x, y, w, h} from segmentation; nullable for forward-compat |
| category | text NULL | Populated by categorization |
| category_confidence | double NULL | |
| subcategory | text NULL | Free-text label from the model |
| subcategory_id | text NULL | Canonical id from resolve_subcategory() (NULL if unresolved) |
| subcategory_confidence | double NULL | |
| status | enum: pending / classified / failed, NOT NULL | default pending |
| error | text NULL | When status is failed: the last error, truncated to 2000 chars |
| created_at / updated_at | timestamptz NOT NULL | func.now() server default; onupdate on updated_at |
Lives in features/categorization/models.py. Segmentation creates the rows but does not own the table, just as PipelineRun lives in features/pipeline/ even though other features write to it.
New table: categorizer_settings (singleton)
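The crop_key column holds keys in a single fixed shape. Below is a minimal sketch of how segmentation might build them. The helper name make_crop_key is hypothetical. The layout details (the source photo's filename stem as the folder, a zero-padded two-digit item index, a .png extension) are inferred from the example key in the table, not taken from the segmentation code:

```python
from pathlib import PurePosixPath
from uuid import UUID


def make_crop_key(deal_id: UUID, source_image: str, index: int) -> str:
    """Build a POSIX-style storage key for one crop (hypothetical helper).

    Mirrors the documented example deals/{deal_id}/crops/IMG_0421/item_03.png:
    the source photo's stem names the folder and the item index is zero-padded.
    """
    if not source_image or "/" in source_image or "\\" in source_image:
        raise ValueError(f"source_image must be a bare filename: {source_image!r}")
    stem = PurePosixPath(source_image).stem  # "IMG_0421.jpg" -> "IMG_0421"
    return f"deals/{deal_id}/crops/{stem}/item_{index:02d}.png"
```

Keys built this way never start with / and never contain backslashes, which matches the key rules in §5.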
Mirrors segmenter_settings. Single-row table; pydantic-serialized settings JSONB plus updated_at / updated_by.
```python
from typing import Literal

from pydantic import BaseModel, Field


class TwoPassConfig(BaseModel):
    model: str = "google/gemini-2.5-flash"
    timeout_seconds: float = 30.0
    temperature: float = 0.0


class SinglePassConfig(BaseModel):
    model: str = "google/gemini-2.5-flash"
    timeout_seconds: float = 30.0
    temperature: float = 0.0


class CategorizerSettings(BaseModel):
    kind: Literal["two_pass", "single_pass"] = "two_pass"
    # ge=1: asyncio.Semaphore(0) would block every item forever.
    concurrency: int = Field(default=10, ge=1)
    two_pass: TwoPassConfig = Field(default_factory=TwoPassConfig)
    single_pass: SinglePassConfig | None = None
```
Modified table: pipeline_runs
| Column | Type | Notes |
|---|---|---|
| input_tokens | integer NULL | Sum across all LLM calls for this stage execution |
| output_tokens | integer NULL | Sum across all LLM calls for this stage execution |
| model | text NULL | Configured model at execution time |
All nullable; older stages and old rows stay NULL. No backfill.
Migration 0016_categorization.py
One Alembic revision. Up creates deal_items, categorizer_settings, and the three columns on pipeline_runs. Down drops in reverse. Append deal_items, categorizer_settings to TRUNCATE_TABLES in backend/tests/conftest.py.
§5 Storage abstraction
Storage protocol
```python
# core/storage.py
from typing import Protocol


class Storage(Protocol):
    async def read_bytes(self, key: str) -> bytes: ...
    async def write_bytes(self, key: str, data: bytes) -> None: ...
    async def exists(self, key: str) -> bool: ...
    async def delete_prefix(self, prefix: str) -> None: ...
    def signed_url(self, key: str, ttl_seconds: int = 3600) -> str: ...
```
Keys are POSIX-style strings (e.g. deals/{deal_id}/crops/IMG_0421/item_03.png). They never contain backslashes and never start with /. The protocol is intentionally minimal: no streaming, no metadata. Add those when needed.
LocalFsStorage(root: Path)
- root is Settings.data_dir (default Path("data")).
- read_bytes / write_bytes go through aiofiles; parent dirs are created on write.
- delete_prefix recursively removes root / prefix; keys are validated so a .. escape is rejected.
- signed_url(key) returns f"/files/{quote(key)}". The ttl_seconds argument is accepted but ignored (documented).
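Below is a minimal sketch of LocalFsStorage, under two stated assumptions. First, it swaps aiofiles for asyncio.to_thread so that it runs on the standard library alone. Second, it rejects keys that are empty, absolute, contain backslashes or .. segments, or resolve outside root. The real class may differ in detail:

```python
import asyncio
import shutil
from pathlib import Path, PurePosixPath
from urllib.parse import quote


class LocalFsStorage:
    """Local-disk Storage sketch; asyncio.to_thread stands in for aiofiles."""

    def __init__(self, root: Path) -> None:
        self.root = Path(root).resolve()

    def _path(self, key: str) -> Path:
        # Keys must be relative POSIX paths without ".." segments.
        if not key or "\\" in key or key.startswith("/") or ".." in PurePosixPath(key).parts:
            raise ValueError(f"invalid storage key: {key!r}")
        path = (self.root / key).resolve()
        # Defence in depth: a symlink must not lead outside root either.
        if not path.is_relative_to(self.root):
            raise ValueError(f"key escapes storage root: {key!r}")
        return path

    async def read_bytes(self, key: str) -> bytes:
        return await asyncio.to_thread(self._path(key).read_bytes)

    async def write_bytes(self, key: str, data: bytes) -> None:
        path = self._path(key)

        def _write() -> None:
            path.parent.mkdir(parents=True, exist_ok=True)  # parent dirs created on write
            path.write_bytes(data)

        await asyncio.to_thread(_write)

    async def exists(self, key: str) -> bool:
        return await asyncio.to_thread(self._path(key).is_file)

    async def delete_prefix(self, prefix: str) -> None:
        path = self._path(prefix.rstrip("/"))
        if path == self.root:
            raise ValueError("refusing to delete the storage root")
        if path.is_dir():
            await asyncio.to_thread(shutil.rmtree, path)
        elif path.exists():
            await asyncio.to_thread(path.unlink)

    def signed_url(self, key: str, ttl_seconds: int = 3600) -> str:
        # ttl_seconds is accepted for protocol parity but ignored locally.
        self._path(key)  # validate only
        return f"/files/{quote(key)}"
```

Object stores treat a prefix as a plain string match, whereas this local version deletes one directory (or file). Callers that pass whole path segments, such as deals/{deal_id}/, get the same result on both backends.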
GET /files/{key:path} proxy route
Resolves the key under data_dir, validates that it stays under data_dir, and streams the file with a sniffed Content-Type. Returns 404 for missing keys and 400 for invalid keys. Documented as dev/local only: in production with Azure, the proxy is unmounted and the Azure backend issues real signed URLs.
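The route's decision logic can be kept framework-agnostic and unit-testable. Below is a sketch built around a hypothetical helper, resolve_file_request, that returns (status, content_type, body). It sniffs only PNG and JPEG magic bytes before falling back to an extension-based mimetypes guess. It also reads the whole file, whereas the real route would stream it (for example via FastAPI's FileResponse):

```python
import mimetypes
from pathlib import Path, PurePosixPath

# Magic-byte prefixes for the image types crops are expected to use.
_MAGIC = {b"\x89PNG\r\n\x1a\n": "image/png", b"\xff\xd8\xff": "image/jpeg"}


def sniff_content_type(path: Path, head: bytes) -> str:
    for magic, ctype in _MAGIC.items():
        if head.startswith(magic):
            return ctype
    return mimetypes.guess_type(path.name)[0] or "application/octet-stream"


def resolve_file_request(data_dir: Path, key: str) -> tuple[int, str, bytes]:
    """Return (status, content_type, body) for GET /files/{key} (hypothetical helper)."""
    root = data_dir.resolve()
    if not key or "\\" in key or key.startswith("/") or ".." in PurePosixPath(key).parts:
        return 400, "text/plain", b"invalid key"
    path = (root / key).resolve()
    if not path.is_relative_to(root):  # e.g. a symlink pointing outside data_dir
        return 400, "text/plain", b"invalid key"
    if not path.is_file():
        return 404, "text/plain", b"not found"
    body = path.read_bytes()
    return 200, sniff_content_type(path, body[:8]), body
```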
Selection
Settings.storage_kind: Literal["local"] = "local" (env: VCC_STORAGE_KIND). Each kind gets its own pydantic-settings field group (only local today). core/storage.py exposes get_storage(), which is lru_cached and mirrors get_db_engine.
In the follow-up slice, AzureBlobStorage(account, container, credential) implements the same protocol, and its signed_url issues SAS URLs. No code change is needed in segmentation or categorization: the contract is the seam.
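Below is a sketch of the lru_cached factory. It uses a dataclass stand-in for Settings and a trimmed LocalFsStorage; StorageSettings and this get_settings are hypothetical:

```python
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
from typing import Literal


@dataclass(frozen=True)
class StorageSettings:  # hypothetical stand-in for the relevant core.settings fields
    storage_kind: Literal["local"] = "local"
    data_dir: Path = Path("data")


class LocalFsStorage:  # trimmed stand-in: only the constructor matters here
    def __init__(self, root: Path) -> None:
        self.root = root


def get_settings() -> StorageSettings:
    return StorageSettings()  # the real app reads VCC_* environment variables


@lru_cache(maxsize=1)
def get_storage() -> LocalFsStorage:
    """Process-wide Storage instance, built once from settings."""
    settings = get_settings()
    if settings.storage_kind == "local":
        return LocalFsStorage(settings.data_dir)
    raise ValueError(f"unknown storage kind: {settings.storage_kind}")
```

Because the instance is cached per process, a test that swaps storage needs get_storage.cache_clear() (or a monkeypatch of get_storage itself) around the patch.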
§6 OpenRouter client (integrations/openrouter/)
Lift
Copy checkin-pipeline/app/gemini/client.py to integrations/openrouter/client.py. Rename methods onto a class:
- gemini_vision_request → OpenRouterClient.vision_request(image, system, model, …) → ChatResponse
- gemini_vision_json_request → OpenRouterClient.vision_json_request(image, system, schema, model, …) → JsonResponse
- gemini_text_json_request → OpenRouterClient.text_json_request(prompt, schema, model, …) → JsonResponse
Class takes an httpx.AsyncClient and API key in __init__; instances constructed per-stage (cheap, no global state).
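OpenRouter exposes an OpenAI-compatible chat-completions endpoint (POST {base_url}/chat/completions). Below is a sketch of the request body vision_json_request might send. It assumes the image travels as a base64 data URL in an image_url content part and the schema goes in response_format as json_schema. build_vision_json_body is a hypothetical name, and the lifted legacy client may shape the body differently:

```python
import base64
from typing import Any


def build_vision_json_body(image: bytes, system: str, schema: dict[str, Any], *,
                           model: str, temperature: float = 0.0,
                           mime: str = "image/png") -> dict[str, Any]:
    """Chat-completions body: system prompt + one image, JSON-schema-constrained output."""
    data_url = f"data:{mime};base64,{base64.b64encode(image).decode('ascii')}"
    return {
        "model": model,
        "temperature": temperature,
        "messages": [
            {"role": "system", "content": system},
            {"role": "user", "content": [
                {"type": "image_url", "image_url": {"url": data_url}},
            ]},
        ],
        "response_format": {
            "type": "json_schema",
            "json_schema": {"name": "classification", "strict": True, "schema": schema},
        },
    }
```

The client would then send the body with something like await http.post(f"{base_url}/chat/completions", json=body, headers={"Authorization": f"Bearer {api_key}"}, timeout=timeout).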
Typed errors
```python
# integrations/openrouter/client.py
class OpenRouterError(Exception):
    """Base. Carries status + first 500 bytes of body."""


class OpenRouterAuthError(OpenRouterError): pass    # 401, 403
class OpenRouterQuotaError(OpenRouterError): pass   # 429, 402
class OpenRouterServerError(OpenRouterError): pass  # 5xx
class OpenRouterParseError(OpenRouterError): pass   # JSON-mode response didn't parse
```
The categorization stage uses these to distinguish per-item failures (caught; item → failed) from stage-level failures (auth/quota, re-raised past the per-item try/except).
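Below is a fuller sketch of the same hierarchy. It assumes a (status, body) constructor and a hypothetical raise_for_openrouter_status dispatcher. STAGE_FATAL is the tuple a per-item handler would re-raise rather than record:

```python
class OpenRouterError(Exception):
    """Base. Carries status + first 500 bytes of body."""

    def __init__(self, status: int, body: bytes) -> None:
        self.status = status
        self.body = body[:500]
        super().__init__(f"OpenRouter HTTP {status}: {self.body!r}")


class OpenRouterAuthError(OpenRouterError): pass    # 401, 403
class OpenRouterQuotaError(OpenRouterError): pass   # 429, 402
class OpenRouterServerError(OpenRouterError): pass  # 5xx
class OpenRouterParseError(OpenRouterError): pass   # JSON-mode response didn't parse

_BY_STATUS = {401: OpenRouterAuthError, 403: OpenRouterAuthError,
              402: OpenRouterQuotaError, 429: OpenRouterQuotaError}

# Errors that abort the whole stage instead of failing a single item.
STAGE_FATAL = (OpenRouterAuthError, OpenRouterQuotaError)


def raise_for_openrouter_status(status: int, body: bytes) -> None:
    """Map a non-2xx HTTP status to the typed error; 2xx returns silently."""
    if 200 <= status < 300:
        return
    if status in _BY_STATUS:
        raise _BY_STATUS[status](status, body)
    if status >= 500:
        raise OpenRouterServerError(status, body)
    raise OpenRouterError(status, body)
```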
Schemas
```python
# integrations/openrouter/schemas.py
from typing import Any

from pydantic import BaseModel


class TokenUsage(BaseModel):
    input_tokens: int
    output_tokens: int


class ChatResponse(BaseModel):
    text: str
    usage: TokenUsage
    model: str


class JsonResponse(BaseModel):
    data: dict[str, Any]  # already JSON-parsed; the client raises OpenRouterParseError on bad JSON
    usage: TokenUsage
    model: str
```
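Below is one way a raw completion could become a JsonResponse. The sketch assumes the OpenAI-compatible response shape OpenRouter returns (choices[0].message.content, usage.prompt_tokens, usage.completion_tokens). It uses dataclasses in place of the pydantic models and reimplements strip_code_fences here, so legacy's _shared.py version may behave differently. ParseError stands in for OpenRouterParseError:

```python
import json
import re
from dataclasses import dataclass
from typing import Any

_TICKS = "`" * 3
# Matches one optional surrounding markdown fence, optionally tagged "json".
_FENCED = re.compile(r"^\s*" + _TICKS + r"(?:json)?\s*(.*?)\s*" + _TICKS + r"\s*$", re.DOTALL)


class ParseError(ValueError):  # stand-in for OpenRouterParseError
    pass


@dataclass(frozen=True)
class TokenUsage:
    input_tokens: int
    output_tokens: int


@dataclass(frozen=True)
class JsonResponse:
    data: dict[str, Any]
    usage: TokenUsage
    model: str


def strip_code_fences(text: str) -> str:
    """Remove one surrounding markdown code fence if present, else just trim."""
    m = _FENCED.match(text)
    return m.group(1) if m else text.strip()


def parse_json_completion(payload: dict[str, Any]) -> JsonResponse:
    content = payload["choices"][0]["message"]["content"]
    try:
        data = json.loads(strip_code_fences(content))
    except json.JSONDecodeError as exc:
        raise ParseError(f"non-JSON model output: {content[:200]!r}") from exc
    if not isinstance(data, dict):
        raise ParseError(f"expected a JSON object, got {type(data).__name__}")
    usage = payload.get("usage") or {}
    return JsonResponse(
        data=data,
        usage=TokenUsage(input_tokens=usage.get("prompt_tokens", 0),
                         output_tokens=usage.get("completion_tokens", 0)),
        model=payload.get("model", ""),
    )
```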
Settings additions (core/settings.py)
```python
# Fields added to the existing Settings class (pydantic-settings, env prefix VCC_)
openrouter_api_key: SecretStr | None = None
openrouter_base_url: str = "https://openrouter.ai/api/v1"
openrouter_default_model: str = "google/gemini-2.5-flash"
storage_kind: Literal["local"] = "local"
```
Worker startup adds VCC_OPENROUTER_API_KEY to the existing required-env validation block (next to HUBSPOT_ACCESS_TOKEN).
§7 Categorizer strategies
Categorizer protocol
```python
# features/categorization/categorizer.py
from typing import Protocol

from pydantic import BaseModel

from vcc_backend.integrations.openrouter.schemas import TokenUsage


class SizeHint(BaseModel):
    width_cm: float
    height_cm: float


class ItemClassification(BaseModel):
    category: str | None
    category_confidence: float | None
    subcategory: str | None
    subcategory_id: str | None
    subcategory_confidence: float | None


class Categorizer(Protocol):
    async def classify(
        self, *, image_bytes: bytes, size_hint: SizeHint | None
    ) -> tuple[ItemClassification, TokenUsage]: ...
```
TwoPassCategorizer
```python
# Project imports omitted: OpenRouterClient, Taxonomy, TwoPassConfig, render_prompt,
# format_size_suffix, CATEGORY_SCHEMA, SUBCATEGORY_SCHEMA, TokenUsage, ...
class TwoPassCategorizer:
    def __init__(self, client: OpenRouterClient, taxonomy: Taxonomy, cfg: TwoPassConfig) -> None:
        self.client, self.taxonomy, self.cfg = client, taxonomy, cfg

    async def classify(
        self, *, image_bytes: bytes, size_hint: SizeHint | None
    ) -> tuple[ItemClassification, TokenUsage]:
        size_suffix = format_size_suffix(size_hint)
        # 1. Category call over the full category list.
        sys1 = render_prompt("two_pass.category.md",
                             categories=self.taxonomy.categories_block(),
                             size_suffix=size_suffix)
        r1 = await self.client.vision_json_request(
            image_bytes, sys1, CATEGORY_SCHEMA, model=self.cfg.model,
            timeout=self.cfg.timeout_seconds, temperature=self.cfg.temperature)
        category, cat_conf = r1.data["category"], r1.data["confidence"]
        # 2. Subcategory call, scoped to the chosen category.
        sys2 = render_prompt("two_pass.subcategory.md",
                             subcategory_block=self.taxonomy.subcategory_block(category),
                             category=category, size_suffix=size_suffix)
        r2 = await self.client.vision_json_request(
            image_bytes, sys2, SUBCATEGORY_SCHEMA, model=self.cfg.model,
            timeout=self.cfg.timeout_seconds, temperature=self.cfg.temperature)
        subcat_label, sub_conf = r2.data["subcategory"], r2.data["confidence"]
        subcat_id = self.taxonomy.resolve_subcategory(category, subcat_label)
        classification = ItemClassification(
            category=category, category_confidence=cat_conf,
            subcategory=subcat_label, subcategory_id=subcat_id,
            subcategory_confidence=sub_conf)
        # Both passes are billed; report their combined usage.
        usage = TokenUsage(
            input_tokens=r1.usage.input_tokens + r2.usage.input_tokens,
            output_tokens=r1.usage.output_tokens + r2.usage.output_tokens)
        return classification, usage
```
SinglePassCategorizer
```python
class SinglePassCategorizer:
    def __init__(self, client: OpenRouterClient, taxonomy: Taxonomy, cfg: SinglePassConfig) -> None:
        self.client, self.taxonomy, self.cfg = client, taxonomy, cfg

    async def classify(
        self, *, image_bytes: bytes, size_hint: SizeHint | None
    ) -> tuple[ItemClassification, TokenUsage]:
        system = render_prompt("single_pass.md",
                               tree=self.taxonomy.tree_block(),
                               size_suffix=format_size_suffix(size_hint))
        r = await self.client.vision_json_request(
            image_bytes, system, SINGLE_PASS_SCHEMA, model=self.cfg.model,
            timeout=self.cfg.timeout_seconds, temperature=self.cfg.temperature)
        cat, subcat_label, conf = r.data["category"], r.data["subcategory"], r.data["confidence"]
        subcat_id = self.taxonomy.resolve_subcategory(cat, subcat_label)
        # Single-pass returns one confidence; copy it to both fields for shape parity.
        return (ItemClassification(category=cat, category_confidence=conf,
                                   subcategory=subcat_label, subcategory_id=subcat_id,
                                   subcategory_confidence=conf),
                r.usage)
```
build_categorizer(settings, http)
```python
# features/categorization/factory.py
async def build_categorizer(settings: CategorizerSettings, http: httpx.AsyncClient) -> Categorizer:
    cfg = get_settings()
    if not cfg.openrouter_api_key:
        raise RuntimeError("VCC_OPENROUTER_API_KEY not configured")
    client = OpenRouterClient(http, api_key=cfg.openrouter_api_key.get_secret_value(),
                              base_url=cfg.openrouter_base_url)
    taxonomy = Taxonomy.load()  # reads TOML+CSV each call
    if settings.kind == "two_pass":
        return TwoPassCategorizer(client, taxonomy, settings.two_pass)
    if settings.kind == "single_pass":
        return SinglePassCategorizer(client, taxonomy, settings.single_pass or SinglePassConfig())
    raise ValueError(f"unknown categorizer kind: {settings.kind}")
```
Taxonomy loader
- Taxonomy.load() reads config/categories.toml + config/subcategories.csv from disk on every call (no cache; matches legacy).
- categories_block() renders category names + descriptions.
- subcategory_block(category) renders the category-scoped subcategory list with hint metadata.
- tree_block() renders the full category → subcategory tree.
- resolve_subcategory(category, label) does a case-insensitive substring match, preferring exact matches, and returns the canonical CSV id or None.
Prompts
Copied from checkin-pipeline/app/steps/classify/{two_pass,single_pass}/*.md, placeholder names normalized:
- prompts/two_pass.category.md: {categories}, {size_suffix}
- prompts/two_pass.subcategory.md: {category}, {subcategory_block}, {size_suffix}
- prompts/single_pass.md: {tree}, {size_suffix}
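render_prompt(name, **kwargs) re-reads the file on each call and substitutes placeholders with safe_substitute semantics (unknown placeholders are left intact rather than raising). Note that string.Template only recognizes $name and ${name}. If render_prompt is built on Template(text).safe_substitute(**kwargs), the prompt files must spell placeholders as ${categories}, ${size_suffix}, and so on; a bare {categories} would pass through unsubstituted.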
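Keeping the normalized {name} placeholders calls for a small substitute for Template, because Template matches only $-prefixed placeholders. Below is a sketch with the same leave-unknown-intact behaviour. The names render_text and render_prompt(prompts_dir, name, ...) are hypothetical. The regex only matches {identifier}, so JSON examples inside a prompt, such as {"category": ...}, pass through untouched:

```python
import re
from pathlib import Path

_PLACEHOLDER = re.compile(r"\{([A-Za-z_][A-Za-z0-9_]*)\}")


def render_text(text: str, **values: str) -> str:
    """Replace {name} placeholders that have a value; leave everything else as-is."""
    # With a function as the replacement, re.sub inserts the result literally.
    return _PLACEHOLDER.sub(lambda m: str(values.get(m.group(1), m.group(0))), text)


def render_prompt(prompts_dir: Path, name: str, **values: str) -> str:
    # Re-read on every call (no caching), matching legacy behaviour.
    return render_text((prompts_dir / name).read_text(encoding="utf-8"), **values)
```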
§8 Stage entry (features/categorization/step.py)
```python
import asyncio

import httpx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from vcc_backend.integrations.openrouter.client import OpenRouterAuthError, OpenRouterQuotaError
# ...plus project imports: Deal, DealItem, ItemStatus, PipelineStage, get_storage,
# get_categorizer_settings, build_categorizer, logger, and the private helpers below.


async def run_categorization_stage(session: AsyncSession, deal: Deal) -> None:
    items = (await session.scalars(
        select(DealItem).where(
            DealItem.deal_id == deal.id,
            DealItem.status.in_([ItemStatus.PENDING, ItemStatus.FAILED]),
        )
    )).all()
    if not items:
        logger.info("categorization_skipped_no_items", extra={"deal_id": str(deal.id)})
        return

    settings = await get_categorizer_settings(session)
    storage = get_storage()
    totals = {"input_tokens": 0, "output_tokens": 0}
    # An AsyncSession must not be used by concurrent tasks, so ORM writes and
    # flushes are serialized; only storage reads and LLM calls run in parallel.
    session_lock = asyncio.Lock()

    async with httpx.AsyncClient() as http:
        categorizer = await build_categorizer(settings, http)
        sem = asyncio.Semaphore(settings.concurrency)

        async def classify_one(item: DealItem) -> None:
            async with sem:
                try:
                    image_bytes = await storage.read_bytes(item.crop_key)
                    result, usage = await categorizer.classify(
                        image_bytes=image_bytes, size_hint=None)
                except (OpenRouterAuthError, OpenRouterQuotaError):
                    raise  # stage-level failure: abort the stage (see §9)
                except Exception as exc:
                    logger.warning("categorization_item_failed",
                                   extra={"deal_id": str(deal.id),
                                          "item_id": str(item.id),
                                          "error": repr(exc)})
                    async with session_lock:
                        item.status = ItemStatus.FAILED
                        item.error = repr(exc)[:2000]
                        await session.flush()  # into the open tx; no commit
                    return
                async with session_lock:
                    item.category = result.category
                    item.category_confidence = result.category_confidence
                    item.subcategory = result.subcategory
                    item.subcategory_id = result.subcategory_id
                    item.subcategory_confidence = result.subcategory_confidence
                    item.status = ItemStatus.CLASSIFIED
                    item.error = None
                    totals["input_tokens"] += usage.input_tokens
                    totals["output_tokens"] += usage.output_tokens
                    await session.flush()  # into the open tx; no commit

        try:
            async with asyncio.TaskGroup() as tg:
                for item in items:
                    tg.create_task(classify_one(item))
        except ExceptionGroup as eg:
            # Unwrap so the orchestrator sees the typed OpenRouter error, not the group.
            raise eg.exceptions[0] from eg

    # Record aggregate usage onto the in-progress PipelineRun row.
    run = await _current_pipeline_run(session, deal, PipelineStage.CATEGORIZATION)
    if run is not None:
        run.input_tokens = totals["input_tokens"]
        run.output_tokens = totals["output_tokens"]
        run.model = _resolve_model(settings)
    # Commit happens in the orchestrator.
```
_current_pipeline_run(...) selects the in-flight pipeline_runs row (status=running) for this deal + stage; the orchestrator at features/pipeline/orchestrator.py:31-38 creates and flushes that row before calling the stage. _resolve_model(settings) returns settings.two_pass.model when kind is two_pass, and (settings.single_pass or SinglePassConfig()).model when kind is single_pass.
size_hint=None at the call site
Legacy's format_size_suffix expects physical dimensions in cm, which the segmentation bbox (pixel coordinates) does not provide on its own. Until segmentation grows a DPI/scale model, we pass None; the protocol leaves the seam for later. Items still classify, just without the physical-size suffix in the prompt.
session.flush() per item only pushes pending ORM changes into the open transaction so subsequent reads see them β it does not commit. If the stage itself raises (auth/quota/build error), the orchestrator's per-stage rollback wipes every flushed update, returning items to their original pending/failed state for the next retry. Consistent with the orchestrator's documented transaction model.
§9 Error handling
- Per-item LLM failure (timeout, OpenRouter 5xx, JSON parse error): item marked failed, error captured, stage proceeds. Re-runs reprocess pending + failed.
- Per-item storage read failure: same as per-item LLM failure.
- Settings / build_categorizer / get_storage failure: bubble up. The orchestrator rolls back the stage transaction, the deal stays at SEGMENTATION, and Procrastinate retries with backoff.
- OpenRouter auth (401/403) / quota (429/402): bubble up on the first such response; it is not worth burning items on a known-bad config. OpenRouterClient raises the typed OpenRouterAuthError / OpenRouterQuotaError on these statuses, and classify_one re-raises them past the try/except. Other exceptions are caught.
- TaskGroup cancellation: if any task re-raises (the auth/quota cases), the TaskGroup cancels the rest and raises an ExceptionGroup wrapping the error. Item updates already flushed live only in the open transaction, which then rolls back; the items reprocess on retry.
§10 Concurrency
- asyncio.Semaphore(settings.concurrency) per deal (default 10).
- No global cap across deals. The worker is single-process today; concurrent deals are not a real concern.
- When the worker scales, a global cap will likely live on the OpenRouterClient (out of slice).
§11 Configuration
Environment
| Env var | Default | Required | Purpose |
|---|---|---|---|
| VCC_STORAGE_KIND | local | no | Storage backend selector |
| VCC_OPENROUTER_API_KEY | (none) | worker startup | OpenRouter auth |
| VCC_OPENROUTER_BASE_URL | https://openrouter.ai/api/v1 | no | |
| VCC_OPENROUTER_DEFAULT_MODEL | google/gemini-2.5-flash | no | Fallback for settings defaults |
Runtime (DB-backed)
categorizer_settings.settings: kind, concurrency, per-kind model / timeout_seconds / temperature.
Repo (no env)
features/categorization/config/categories.toml, config/subcategories.csv, prompts/*.md: copied verbatim from legacy. Edited via PR.
§12 Testing
1 · Unit: strategies (test_two_pass.py, test_single_pass.py)
Mock OpenRouterClient.vision_json_request to return canned JsonResponses. Assert:
- Two-pass makes exactly two calls; single-pass makes one.
- Prompt text contains the injected {categories} / {subcategory_block} / {tree}.
- size_suffix is appended when size_hint is provided and omitted when it is not.
- A known subcategory label resolves to the right id; case-insensitive matching works; an unknown label yields subcategory_id=None (label preserved).
- TokenUsage is summed across passes (two-pass) or passed through (single-pass).
2 · Unit: taxonomy (test_taxonomy.py)
Load the real shipped TOML + CSV. Assert 11 categories load, known subcategory resolves both case-sensitively and case-insensitively, rendered blocks for each category are non-empty.
3 · Integration: stage (test_step.py, real Postgres, mocked LLM + storage)
Uses the existing testcontainers conftest. Per test:
- Create a Deal and insert 3 DealItem rows.
- Patch OpenRouterClient.vision_json_request to return [success, raises httpx.ReadTimeout, success]. Seed categorizer_settings with kind=single_pass so that each item consumes exactly one response; two_pass makes two calls per item and would exhaust the list.
- Patch Storage.read_bytes (monkeypatch at the get_storage() cache) to return a canned 64×64 PNG.
- Run run_categorization_stage(session, deal).
- Assert: 2 items classified with fields set, 1 item failed with classification NULL and error set, the PipelineRun for CATEGORIZATION has aggregate tokens + model, and the stage returned without raising.
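The [success, timeout, success] sequence maps onto unittest.mock.AsyncMock's side_effect: each await returns the next item in the list, and exception instances in the list are raised. The sketch below uses a plain dict and TimeoutError as stand-ins for a JsonResponse and httpx.ReadTimeout:

```python
import asyncio
from unittest.mock import AsyncMock

# Stand-ins: the real test returns JsonResponse objects and raises httpx.ReadTimeout.
ok = {"category": "Chair", "confidence": 0.9}
vision_json_request = AsyncMock(side_effect=[ok, TimeoutError("read timeout"), ok])


async def call_three() -> list[str]:
    results = []
    for _ in range(3):
        try:
            await vision_json_request(b"png", "system prompt", {})
            results.append("ok")
        except TimeoutError:
            results.append("timeout")
    return results
```

With concurrency above 1, the order in which items reach the mock is not fixed, so which item fails can vary. Assert on counts (2 classified, 1 failed), as the list above does, rather than on a particular item.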
Second test (test_step_rerun.py): pre-seed one failed and one classified item; run again; assert only failed retried, classified untouched.
4 · Unit: storage (tests/core/test_storage.py)
LocalFsStorage round-trip under tmp_path; delete_prefix recurses; signed_url returns /files/{key} with URL-quoting; path-escape attempt (..) rejected.
Not covered by automated tests:
- Live OpenRouter calls (backend/scripts/smoke_categorize.py handles that, hand-run before merge).
- A full segmentation + categorization end-to-end pipeline (segmentation's existing test gets one new assertion that DealItem rows are inserted; the orchestrator's existing test covers wiring).
Fixtures
tests/features/categorization/fixtures/sample.png: a single 64×64 RGB PNG checked into the repo, ~200 bytes.
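If the fixture ever needs regenerating, a solid-colour PNG can be written with the standard library alone. Below is a sketch; solid_png and the colour are arbitrary choices, and the exact byte count depends on zlib:

```python
import struct
import zlib


def _chunk(kind: bytes, data: bytes) -> bytes:
    # PNG chunk = length, type, data, CRC-32 over type + data.
    return (struct.pack(">I", len(data)) + kind + data
            + struct.pack(">I", zlib.crc32(kind + data) & 0xFFFFFFFF))


def solid_png(width: int = 64, height: int = 64,
              rgb: tuple[int, int, int] = (200, 120, 40)) -> bytes:
    """Encode a solid-colour 8-bit RGB PNG using only the standard library."""
    row = b"\x00" + bytes(rgb) * width  # filter type 0 (None) + pixel bytes
    ihdr = struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0)  # 8-bit, colour type 2 (RGB)
    return (b"\x89PNG\r\n\x1a\n" + _chunk(b"IHDR", ihdr)
            + _chunk(b"IDAT", zlib.compress(row * height, 9)) + _chunk(b"IEND", b""))
```

For example: Path("tests/features/categorization/fixtures/sample.png").write_bytes(solid_png()).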
§13 Decisions made (during brainstorm)
- DealItem rows come from segmentation. This is a cleaner contract than letting categorization scan disk. Categorization reads WHERE status IN ('pending','failed').
- DB-backed CategorizerSettings with a kind enum (two_pass | single_pass). Mirrors SegmenterSettings; consistent with the existing factory pattern.
- The LLM client lives in integrations/openrouter/. Parallels integrations/hubspot/; predictable home for appraisal next.
- Categories / subcategories / prompts: copy the legacy files as-is under features/categorization/. Re-read on each call, so edits show up live in dev.
- Token tracking: nullable columns on pipeline_runs (input_tokens, output_tokens, model). Cheap and useful immediately. No separate llm_calls table.
- Per-deal concurrency: 10, configurable. Matches legacy.
- The Storage abstraction lands in this slice, with LocalFsStorage only. The Azure backend is a follow-up that only implements the protocol.
- Describe-then-classify is not ported. The variation registry pattern leaves the seam.
- Per-item failures don't fail the stage. Items get status='failed' + error, and re-runs reprocess them. Stage-level failures (auth, quota, config) bubble up for orchestrator backoff/retry.