examples.smoke.minimal_oak_fine_grained

  1from __future__ import annotations
  2
  3"""Bare-minimum OaK example built from fine-grained components.
  4
  5This mirrors `examples/minimal_oak.py`, but instead of implementing the four
  6main OaK interfaces directly, it assembles them from the optional fine-grained
  7building blocks in `oak.fine_grained`.
  8
  9The behavior is intentionally the same as the direct example:
 10
 11- a tiny integer world
 12- a direct observation-to-subjective_state state builder
 13- one fixed identity feature
 14- no-op model learning with trivial planning
 15- a simple value tracker with usage counting and no curation
 16- a reactive policy that alternates actions and options
 17"""
 18
 19from dataclasses import dataclass
 20from typing import Mapping, Sequence
 21
 22from oak.agent import OaKAgent
 23from oak.fine_grained import (
 24    ActionSelector,
 25    CompositePerception,
 26    CompositeReactivePolicy,
 27    CompositeTransitionModel,
 28    CompositeValueFunction,
 29    Curator,
 30    FeatureBank,
 31    FeatureConstructor,
 32    FeatureRanker,
 33    GeneralValueFunctionLearner,
 34    Option,
 35    OptionLearner,
 36    OptionLibrary,
 37    OptionModel,
 38    OptionModelLearner,
 39    Planner,
 40    StateBuilder,
 41    SubtaskGenerator,
 42    UtilityAssessor,
 43    ValueEstimator,
 44    WorldModel,
 45)
 46from oak.types import (
 47    CurationDecision,
 48    FeatureCandidate,
 49    FeatureId,
 50    FeatureSpec,
 51    GeneralValueFunctionId,
 52    ModelPrediction,
 53    OptionDescriptor,
 54    OptionId,
 55    PlanningUpdate,
 56    PolicyDecision,
 57    SubtaskId,
 58    SubtaskSpec,
 59    Transition,
 60    UsageRecord,
 61    UtilityRecord,
 62)
 63
 64from .minimal_oak import (
 65    Action,
 66    MinimalInfo,
 67    MinimalSubjectiveState,
 68    MinimalTraceStep,
 69    MinimalWorld,
 70    Observation,
 71    _planning_budget_used,
 72)
 73
 74
 75# ─────────────────────────────────────────────────────────────────────
 76# Perception components
 77# ─────────────────────────────────────────────────────────────────────
 78
 79
 80class MinimalStateBuilder(StateBuilder[Observation, Action, MinimalSubjectiveState]):
 81    """Direct observation-to-state mapping."""
 82
 83    def __init__(self) -> None:
 84        self._state = MinimalSubjectiveState(0, 0, 0.0, None)
 85
 86    def reset(self) -> None:
 87        self._state = MinimalSubjectiveState(0, 0, 0.0, None)
 88
 89    def update(
 90        self,
 91        observation: Observation,
 92        reward: float,
 93        last_action: Action | None,
 94    ) -> MinimalSubjectiveState:
 95        self._state = MinimalSubjectiveState(
 96            step_index=observation,
 97            observation=observation,
 98            reward=reward,
 99            last_action=last_action,
100        )
101        return self._state
102
103    def current_subjective_state(self) -> MinimalSubjectiveState:
104        return self._state
105
106
class MinimalFeatureBank(FeatureBank[MinimalSubjectiveState]):
    """Feature registry that starts with a single identity feature.

    The bank is created holding the ``"observation"`` feature. Candidates can
    be added and features removed, but ``activations`` only ever reports the
    ``"observation"`` value.
    """

    def __init__(self) -> None:
        identity_feature = FeatureSpec(
            feature_id="observation",
            name="Observation value",
            description="Identity feature for the integer observation.",
        )
        # Keyed by feature id; dict order is the order features were added.
        self._features: dict[FeatureId, FeatureSpec] = {
            "observation": identity_feature,
        }

    def list_features(self) -> Sequence[FeatureSpec]:
        """Return the registered feature specs as a tuple, in insertion order."""
        registered = self._features.values()
        return tuple(registered)

    def activations(
        self,
        subjective_state: MinimalSubjectiveState,
    ) -> Mapping[FeatureId, float]:
        """Return the observation, cast to ``float``, under ``"observation"``."""
        observed = float(subjective_state.observation)
        return {"observation": observed}

    def add_candidates(
        self, candidates: Sequence[FeatureCandidate]
    ) -> Sequence[FeatureSpec]:
        """Register every candidate as a feature, replacing same-id entries.

        Returns:
            A tuple of the ``FeatureSpec`` objects created, in input order.
        """
        accepted: list[FeatureSpec] = []
        for proposal in candidates:
            spec = FeatureSpec(
                feature_id=proposal.feature_id,
                name=proposal.name,
                description=proposal.description,
                metadata=proposal.metadata,
            )
            self._features[spec.feature_id] = spec
            accepted.append(spec)
        return tuple(accepted)

    def remove(self, feature_ids: Sequence[FeatureId]) -> None:
        """Drop the given features; ids that are not registered are ignored."""
        for doomed_id in feature_ids:
            if doomed_id in self._features:
                del self._features[doomed_id]
146
147
class MinimalFeatureConstructor(FeatureConstructor[MinimalSubjectiveState]):
    """Feature constructor that never has anything to propose."""

    def propose(
        self,
        subjective_state: MinimalSubjectiveState,
        active_features: Sequence[FeatureSpec],
    ) -> Sequence[FeatureCandidate]:
        """Return an empty tuple; both arguments are ignored."""
        no_candidates: tuple[FeatureCandidate, ...] = tuple()
        return no_candidates
157
158
class MinimalFeatureRanker(FeatureRanker):
    """Ranker that preserves the order in which features are supplied."""

    def rank(
        self,
        features: Sequence[FeatureSpec],
        utilities: Sequence[UtilityRecord],
        limit: int | None = None,
    ) -> Sequence[FeatureId]:
        """Return the feature ids in input order, optionally truncated.

        Args:
            features: Specs whose ids are returned.
            utilities: Ignored; utilities do not influence the order.
            limit: When not ``None``, the result is sliced as ``[:limit]``.

        Returns:
            A tuple of feature ids.
        """
        ordered_ids = tuple(spec.feature_id for spec in features)
        return ordered_ids if limit is None else ordered_ids[:limit]
172
173
class MinimalSubtaskGenerator(SubtaskGenerator[MinimalSubjectiveState]):
    """Creates at most one subtask per feature."""

    def __init__(self) -> None:
        # Feature ids for which a subtask has already been emitted.
        self._created_subtask_for: set[FeatureId] = set()

    def generate(
        self,
        ranked_feature_ids: Sequence[FeatureId],
        feature_bank: FeatureBank[MinimalSubjectiveState],
    ) -> Sequence[SubtaskSpec]:
        """Emit a subtask for every ranked feature not handled before.

        Args:
            ranked_feature_ids: Feature ids to consider, in priority order.
            feature_bank: Source of the feature specs used to name subtasks.

        Returns:
            A tuple of the newly created subtasks, in ranked order.

        Raises:
            KeyError: If a ranked id has no spec in ``feature_bank``. The id is
                not recorded as handled in that case, so a later call can
                still create its subtask.
        """
        specs_by_id = {
            spec.feature_id: spec for spec in feature_bank.list_features()
        }
        created: list[SubtaskSpec] = []
        for feature_id in ranked_feature_ids:
            if feature_id in self._created_subtask_for:
                continue
            # Resolve the spec before marking the feature as handled: if the
            # lookup fails, the feature must stay eligible for a later call.
            feature = specs_by_id[feature_id]
            self._created_subtask_for.add(feature_id)
            created.append(
                SubtaskSpec(
                    subtask_id=f"subtask:{feature_id}",
                    name=f"Track {feature.name}",
                    feature_id=feature_id,
                )
            )
        return tuple(created)
202
203
204# ─────────────────────────────────────────────────────────────────────
205# Transition-model components
206# ─────────────────────────────────────────────────────────────────────
207
208
class MinimalWorldModel(WorldModel[MinimalSubjectiveState, Action, MinimalInfo]):
    """World model that learns nothing and predicts no change.

    Every prediction returns the input state with zero cumulative reward and
    a duration of one step.
    """

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Ignore the transition; nothing is learned."""

    def predict_action(
        self,
        subjective_state: MinimalSubjectiveState,
        action: Action,
    ) -> ModelPrediction[MinimalSubjectiveState]:
        """Predict an unchanged state, zero reward, one step; ignores ``action``."""
        prediction = ModelPrediction(
            predicted_subjective_state=subjective_state,
            cumulative_reward=0.0,
            steps=1,
        )
        return prediction

    def predict_option(
        self,
        subjective_state: MinimalSubjectiveState,
        option_id: OptionId,
    ) -> ModelPrediction[MinimalSubjectiveState]:
        """Predict an unchanged state, zero reward, one step; ignores ``option_id``."""
        prediction = ModelPrediction(
            predicted_subjective_state=subjective_state,
            cumulative_reward=0.0,
            steps=1,
        )
        return prediction

    def add_or_replace_option_models(
        self, models: Sequence[OptionModel[MinimalSubjectiveState]]
    ) -> None:
        """Accept option models without storing them."""

    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
        """Do nothing; no option models are stored."""
247
248
class MinimalOptionModelLearner(
    OptionModelLearner[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Option-model learner that learns nothing and exports nothing."""

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Ignore the transition."""

    def export_models(self) -> Sequence[OptionModel[MinimalSubjectiveState]]:
        """Return an empty tuple of option models."""
        return tuple()
262
263
class MinimalPlanner(Planner[MinimalSubjectiveState, Action, MinimalInfo]):
    """Planner that performs no search and echoes current value estimates."""

    def plan_step(
        self,
        subjective_state: MinimalSubjectiveState,
        model: WorldModel[MinimalSubjectiveState, Action, MinimalInfo],
        value_function: ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo],
        budget: int,
    ) -> PlanningUpdate[Action]:
        """Produce a planning update without consulting ``model``.

        Value targets are the value function's predictions for
        ``subjective_state``, the policy target is always action ``0``, and
        the whole ``budget`` is reported as used.
        """
        current_estimates = value_function.predict(subjective_state)
        fixed_policy_targets = {"preferred_action": 0}
        statistics = {"budget_used": budget}
        return PlanningUpdate(
            value_targets=current_estimates,
            policy_targets=fixed_policy_targets,
            search_statistics=statistics,
        )
279
280
281# ─────────────────────────────────────────────────────────────────────
282# Value-function components
283# ─────────────────────────────────────────────────────────────────────
284
285
class MinimalValueEstimator(
    ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Value estimator whose single estimate is the last reward it saw.

    The estimate is reported under the ``"main"`` key. No general value
    function learners are ever stored.
    """

    def __init__(self) -> None:
        # Reported estimate; stays at zero until the first update.
        self._value: float = 0.0

    def list_general_value_functions(
        self,
    ) -> Sequence[
        GeneralValueFunctionLearner[MinimalSubjectiveState, Action, MinimalInfo]
    ]:
        """Return an empty tuple of learners."""
        return tuple()

    def predict(
        self,
        subjective_state: MinimalSubjectiveState,
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Return the stored estimate under ``"main"``; the state is ignored."""
        estimate = self._value
        return {"main": estimate}

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Overwrite the estimate with the transition's reward.

        Returns:
            A mapping that always reports ``0.0`` for ``"main"``.
        """
        self._value = transition.reward
        reported = {"main": 0.0}
        return reported

    def add_or_replace(
        self,
        learner: GeneralValueFunctionLearner[
            MinimalSubjectiveState, Action, MinimalInfo
        ],
    ) -> None:
        """Accept a learner without storing it."""

    def remove(
        self,
        general_value_function_ids: Sequence[GeneralValueFunctionId],
    ) -> None:
        """Do nothing; no learners are stored."""
327
328
class MinimalUtilityAssessor(UtilityAssessor):
    """Aggregates usage records into per-component totals.

    Totals are folded in as records arrive, so memory grows with the number
    of distinct components rather than with the number of records observed,
    and ``scores`` does not re-scan past records.
    """

    def __init__(self) -> None:
        # Both dicts share keys of (kind value, component id); their insertion
        # order is the order in which each component was first observed.
        self._totals: dict[tuple[str, str], float] = {}
        self._latest: dict[tuple[str, str], UsageRecord] = {}

    def observe(self, usage: Sequence[UsageRecord]) -> None:
        """Add each record's ``amount`` to the total of its component."""
        for record in usage:
            key = (record.kind.value, record.component_id)
            self._totals[key] = self._totals.get(key, 0.0) + record.amount
            # The most recent record supplies kind/component_id for scores().
            self._latest[key] = record

    def scores(self) -> Sequence[UtilityRecord]:
        """Return one ``UtilityRecord`` per component seen so far.

        Returns:
            A tuple ordered by first observation; each record's ``utility``
            is the sum of all amounts observed for that component.
        """
        return tuple(
            UtilityRecord(
                kind=record.kind,
                component_id=record.component_id,
                utility=self._totals[key],
            )
            for key, record in self._latest.items()
        )
353
354
class MinimalCurator(Curator):
    """Curator that always returns a default-constructed decision."""

    def curate(self, utilities: Sequence[UtilityRecord]) -> CurationDecision:
        """Return ``CurationDecision()``; the utilities are ignored."""
        default_decision = CurationDecision()
        return default_decision
360
361
362# ─────────────────────────────────────────────────────────────────────
363# Reactive-policy components
364# ─────────────────────────────────────────────────────────────────────
365
366
@dataclass
class MinimalOption(Option[MinimalSubjectiveState, Action]):
    """One-step option: emits its stored action and always terminates.

    The stored action defaults to ``1`` and the option reports itself as
    available in every state.
    """

    # Descriptor handed back by the ``descriptor`` property.
    _descriptor: OptionDescriptor
    # Primitive action returned by ``act``.
    _action: Action = 1

    @property
    def descriptor(self) -> OptionDescriptor:
        """The descriptor this option was constructed with."""
        return self._descriptor

    def stop_probability(self, subjective_state: MinimalSubjectiveState) -> float:
        """Return ``1.0`` regardless of state."""
        return 1.0

    def act(self, subjective_state: MinimalSubjectiveState) -> Action:
        """Return the stored action regardless of state."""
        return self._action

    def is_available(self, subjective_state: MinimalSubjectiveState) -> bool:
        """Return ``True`` regardless of state."""
        return True
386
387
class MinimalActionSelector(ActionSelector[MinimalSubjectiveState, Action]):
    """Selector that switches on the parity of the observation.

    Even observations yield primitive action ``0``; odd observations yield the
    first available option, or primitive action ``1`` when there is none.
    """

    def __init__(self) -> None:
        # Most recent inputs received, kept only for inspection.
        self.last_td_errors: Mapping[GeneralValueFunctionId, float] = {}
        self.last_planning_update: PlanningUpdate[Action] | None = None

    def decide(
        self,
        subjective_state: MinimalSubjectiveState,
        active_option: Option[MinimalSubjectiveState, Action] | None,
        available_options: Sequence[Option[MinimalSubjectiveState, Action]],
    ) -> PolicyDecision[Action]:
        """Choose a primitive action or an option for this step.

        ``active_option`` is not consulted.
        """
        observation_is_even = subjective_state.observation % 2 == 0
        if observation_is_even:
            return PolicyDecision(action=0)
        if not available_options:
            return PolicyDecision(action=1)
        first_option = available_options[0]
        return PolicyDecision(option_id=first_option.descriptor.option_id)

    def update_from_values(
        self,
        subjective_state: MinimalSubjectiveState,
        td_errors: Mapping[GeneralValueFunctionId, float],
    ) -> None:
        """Store a ``dict`` copy of the TD errors; the state is ignored."""
        snapshot = dict(td_errors)
        self.last_td_errors = snapshot

    def apply_planning_update(self, update: PlanningUpdate[Action]) -> None:
        """Remember the planning update without acting on it."""
        self.last_planning_update = update
416
417
class MinimalOptionLibrary(OptionLibrary[MinimalSubjectiveState, Action]):
    """Dictionary-backed store of options keyed by option id."""

    def __init__(self) -> None:
        self._options: dict[OptionId, Option[MinimalSubjectiveState, Action]] = {}

    def list_options(self) -> Sequence[Option[MinimalSubjectiveState, Action]]:
        """Return the stored options as a tuple, in insertion order."""
        stored = self._options.values()
        return tuple(stored)

    def get(self, option_id: OptionId) -> Option[MinimalSubjectiveState, Action]:
        """Return the option with this id.

        Raises:
            KeyError: If no option is stored under ``option_id``.
        """
        found = self._options[option_id]
        return found

    def add_or_replace(self, option: Option[MinimalSubjectiveState, Action]) -> None:
        """Store the option under its descriptor's id, replacing any previous one."""
        key = option.descriptor.option_id
        self._options[key] = option

    def remove(self, option_ids: Sequence[OptionId]) -> None:
        """Drop the given options; ids that are not stored are ignored."""
        for doomed_id in option_ids:
            if doomed_id in self._options:
                del self._options[doomed_id]
436
437
class MinimalOptionLearner(OptionLearner[MinimalSubjectiveState, Action, MinimalInfo]):
    """Option learner that pairs every subtask with one ``MinimalOption``.

    Options are created when subtasks are ingested; transitions are ignored.
    """

    def __init__(self) -> None:
        self._subtasks: dict[SubtaskId, SubtaskSpec] = {}
        # Keyed by "option:<subtask id>".
        self._options: dict[OptionId, MinimalOption] = {}

    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
        """Record each subtask and (re)create its option."""
        for spec in subtasks:
            self._subtasks[spec.subtask_id] = spec
            option_key = f"option:{spec.subtask_id}"
            descriptor = OptionDescriptor(
                option_id=option_key,
                name=f"Option for {spec.subtask_id}",
                subtask_id=spec.subtask_id,
            )
            self._options[option_key] = MinimalOption(descriptor)

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Ignore the transition; options are never refined."""

    def export_options(self) -> Sequence[Option[MinimalSubjectiveState, Action]]:
        """Return the current options as a tuple, in creation order."""
        current = self._options.values()
        return tuple(current)

    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
        """Forget the given subtasks and their options; unknown ids are ignored."""
        for doomed_id in subtask_ids:
            option_key = f"option:{doomed_id}"
            self._subtasks.pop(doomed_id, None)
            self._options.pop(option_key, None)
470
471
472# ─────────────────────────────────────────────────────────────────────
473# Wiring
474# ─────────────────────────────────────────────────────────────────────
475
476
def build_minimal_agent() -> (
    OaKAgent[Observation, Action, MinimalSubjectiveState, MinimalInfo]
):
    """Assemble an ``OaKAgent`` from the minimal fine-grained components.

    Each of the four top-level agent parts is a composite wrapping freshly
    constructed ``Minimal*`` components. The planning budget is fixed at 4.
    """
    perception = CompositePerception(
        state_builder=MinimalStateBuilder(),
        feature_bank=MinimalFeatureBank(),
        feature_constructor=MinimalFeatureConstructor(),
        feature_ranker=MinimalFeatureRanker(),
        subtask_generator=MinimalSubtaskGenerator(),
    )
    transition_model = CompositeTransitionModel(
        world_model=MinimalWorldModel(),
        option_model_learner=MinimalOptionModelLearner(),
        planner=MinimalPlanner(),
    )
    value_function = CompositeValueFunction(
        value_estimator=MinimalValueEstimator(),
        utility_assessor=MinimalUtilityAssessor(),
        curator=MinimalCurator(),
    )
    reactive_policy = CompositeReactivePolicy(
        action_selector=MinimalActionSelector(),
        option_library=MinimalOptionLibrary(),
        option_learner=MinimalOptionLearner(),
    )
    agent = OaKAgent(
        perception=perception,
        transition_model=transition_model,
        value_function=value_function,
        reactive_policy=reactive_policy,
        planning_budget=4,
    )
    return agent
511
512
def run_minimal_episode(horizon: int = 5) -> list[MinimalTraceStep]:
    """Drive a freshly built agent through one short episode.

    Args:
        horizon: Passed to ``MinimalWorld`` and also used as the maximum
            number of agent steps.

    Returns:
        One trace entry per agent step taken. The loop stops early once the
        world reports termination.
    """
    world = MinimalWorld(horizon=horizon)
    agent = build_minimal_agent()
    current_step = world.reset()
    agent.reset()

    trace: list[MinimalTraceStep] = []

    for _ in range(horizon):
        outcome = agent.step(current_step)
        chosen_action = outcome.action
        entry = {
            "subjective_state": outcome.subjective_state,
            "action": chosen_action,
            "active_option_id": outcome.active_option_id,
            "created_subtasks": [
                subtask.subtask_id for subtask in outcome.created_subtasks
            ],
            "planning_budget_used": _planning_budget_used(outcome.planning_update),
        }
        trace.append(entry)
        # Advance the world with the agent's action before checking for the end.
        current_step = world.step(chosen_action)
        if current_step.terminated:
            break

    return trace
class MinimalStateBuilder(oak.fine_grained.components.StateBuilder[int, int, examples.smoke.minimal_oak.MinimalSubjectiveState]):
 81class MinimalStateBuilder(StateBuilder[Observation, Action, MinimalSubjectiveState]):
 82    """Direct observation-to-state mapping."""
 83
 84    def __init__(self) -> None:
 85        self._state = MinimalSubjectiveState(0, 0, 0.0, None)
 86
 87    def reset(self) -> None:
 88        self._state = MinimalSubjectiveState(0, 0, 0.0, None)
 89
 90    def update(
 91        self,
 92        observation: Observation,
 93        reward: float,
 94        last_action: Action | None,
 95    ) -> MinimalSubjectiveState:
 96        self._state = MinimalSubjectiveState(
 97            step_index=observation,
 98            observation=observation,
 99            reward=reward,
100            last_action=last_action,
101        )
102        return self._state
103
104    def current_subjective_state(self) -> MinimalSubjectiveState:
105        return self._state

Direct observation-to-state mapping.

def reset(self) -> 'None':
87    def reset(self) -> None:
88        self._state = MinimalSubjectiveState(0, 0, 0.0, None)
def update( self, observation: 'Observation', reward: 'float', last_action: 'Action | None') -> 'MinimalSubjectiveState':
 90    def update(
 91        self,
 92        observation: Observation,
 93        reward: float,
 94        last_action: Action | None,
 95    ) -> MinimalSubjectiveState:
 96        self._state = MinimalSubjectiveState(
 97            step_index=observation,
 98            observation=observation,
 99            reward=reward,
100            last_action=last_action,
101        )
102        return self._state
def current_subjective_state(self) -> 'MinimalSubjectiveState':
104    def current_subjective_state(self) -> MinimalSubjectiveState:
105        return self._state
class MinimalFeatureBank(oak.fine_grained.components.FeatureBank[examples.smoke.minimal_oak.MinimalSubjectiveState]):
class MinimalFeatureBank(FeatureBank[MinimalSubjectiveState]):
    """Feature registry that starts with a single identity feature.

    The bank is created holding the ``"observation"`` feature. Candidates can
    be added and features removed, but ``activations`` only ever reports the
    ``"observation"`` value.
    """

    def __init__(self) -> None:
        identity_feature = FeatureSpec(
            feature_id="observation",
            name="Observation value",
            description="Identity feature for the integer observation.",
        )
        # Keyed by feature id; dict order is the order features were added.
        self._features: dict[FeatureId, FeatureSpec] = {
            "observation": identity_feature,
        }

    def list_features(self) -> Sequence[FeatureSpec]:
        """Return the registered feature specs as a tuple, in insertion order."""
        registered = self._features.values()
        return tuple(registered)

    def activations(
        self,
        subjective_state: MinimalSubjectiveState,
    ) -> Mapping[FeatureId, float]:
        """Return the observation, cast to ``float``, under ``"observation"``."""
        observed = float(subjective_state.observation)
        return {"observation": observed}

    def add_candidates(
        self, candidates: Sequence[FeatureCandidate]
    ) -> Sequence[FeatureSpec]:
        """Register every candidate as a feature, replacing same-id entries.

        Returns:
            A tuple of the ``FeatureSpec`` objects created, in input order.
        """
        accepted: list[FeatureSpec] = []
        for proposal in candidates:
            spec = FeatureSpec(
                feature_id=proposal.feature_id,
                name=proposal.name,
                description=proposal.description,
                metadata=proposal.metadata,
            )
            self._features[spec.feature_id] = spec
            accepted.append(spec)
        return tuple(accepted)

    def remove(self, feature_ids: Sequence[FeatureId]) -> None:
        """Drop the given features; ids that are not registered are ignored."""
        for doomed_id in feature_ids:
            if doomed_id in self._features:
                del self._features[doomed_id]

Stores one fixed identity feature.

def list_features(self) -> 'Sequence[FeatureSpec]':
120    def list_features(self) -> Sequence[FeatureSpec]:
121        return tuple(self._features.values())
def activations( self, subjective_state: 'MinimalSubjectiveState') -> 'Mapping[FeatureId, float]':
123    def activations(
124        self,
125        subjective_state: MinimalSubjectiveState,
126    ) -> Mapping[FeatureId, float]:
127        return {"observation": float(subjective_state.observation)}

Return per-feature activation values for the given state.

Intended for SubtaskGenerator implementations, which receive the FeatureBank and may use activations to decide which features warrant new subtasks.

def add_candidates( self, candidates: 'Sequence[FeatureCandidate]') -> 'Sequence[FeatureSpec]':
129    def add_candidates(
130        self, candidates: Sequence[FeatureCandidate]
131    ) -> Sequence[FeatureSpec]:
132        added: list[FeatureSpec] = []
133        for candidate in candidates:
134            feature = FeatureSpec(
135                feature_id=candidate.feature_id,
136                name=candidate.name,
137                description=candidate.description,
138                metadata=candidate.metadata,
139            )
140            self._features[feature.feature_id] = feature
141            added.append(feature)
142        return tuple(added)
def remove(self, feature_ids: 'Sequence[FeatureId]') -> 'None':
144    def remove(self, feature_ids: Sequence[FeatureId]) -> None:
145        for feature_id in feature_ids:
146            self._features.pop(feature_id, None)
class MinimalFeatureConstructor(oak.fine_grained.components.FeatureConstructor[examples.smoke.minimal_oak.MinimalSubjectiveState]):
class MinimalFeatureConstructor(FeatureConstructor[MinimalSubjectiveState]):
    """Feature constructor that never has anything to propose."""

    def propose(
        self,
        subjective_state: MinimalSubjectiveState,
        active_features: Sequence[FeatureSpec],
    ) -> Sequence[FeatureCandidate]:
        """Return an empty tuple; both arguments are ignored."""
        no_candidates: tuple[FeatureCandidate, ...] = tuple()
        return no_candidates

Never proposes new features.

def propose( self, subjective_state: 'MinimalSubjectiveState', active_features: 'Sequence[FeatureSpec]') -> 'Sequence[FeatureCandidate]':
152    def propose(
153        self,
154        subjective_state: MinimalSubjectiveState,
155        active_features: Sequence[FeatureSpec],
156    ) -> Sequence[FeatureCandidate]:
157        return ()
class MinimalFeatureRanker(oak.fine_grained.components.FeatureRanker):
class MinimalFeatureRanker(FeatureRanker):
    """Ranker that preserves the order in which features are supplied."""

    def rank(
        self,
        features: Sequence[FeatureSpec],
        utilities: Sequence[UtilityRecord],
        limit: int | None = None,
    ) -> Sequence[FeatureId]:
        """Return the feature ids in input order, optionally truncated.

        Args:
            features: Specs whose ids are returned.
            utilities: Ignored; utilities do not influence the order.
            limit: When not ``None``, the result is sliced as ``[:limit]``.

        Returns:
            A tuple of feature ids.
        """
        ordered_ids = tuple(spec.feature_id for spec in features)
        return ordered_ids if limit is None else ordered_ids[:limit]

Ranks features in their existing order.

def rank( self, features: 'Sequence[FeatureSpec]', utilities: 'Sequence[UtilityRecord]', limit: 'int | None' = None) -> 'Sequence[FeatureId]':
163    def rank(
164        self,
165        features: Sequence[FeatureSpec],
166        utilities: Sequence[UtilityRecord],
167        limit: int | None = None,
168    ) -> Sequence[FeatureId]:
169        feature_ids = [feature.feature_id for feature in features]
170        if limit is None:
171            return tuple(feature_ids)
172        return tuple(feature_ids[:limit])
class MinimalSubtaskGenerator(oak.fine_grained.components.SubtaskGenerator[examples.smoke.minimal_oak.MinimalSubjectiveState]):
class MinimalSubtaskGenerator(SubtaskGenerator[MinimalSubjectiveState]):
    """Creates at most one subtask per feature."""

    def __init__(self) -> None:
        # Feature ids for which a subtask has already been emitted.
        self._created_subtask_for: set[FeatureId] = set()

    def generate(
        self,
        ranked_feature_ids: Sequence[FeatureId],
        feature_bank: FeatureBank[MinimalSubjectiveState],
    ) -> Sequence[SubtaskSpec]:
        """Emit a subtask for every ranked feature not handled before.

        Args:
            ranked_feature_ids: Feature ids to consider, in priority order.
            feature_bank: Source of the feature specs used to name subtasks.

        Returns:
            A tuple of the newly created subtasks, in ranked order.

        Raises:
            KeyError: If a ranked id has no spec in ``feature_bank``. The id is
                not recorded as handled in that case, so a later call can
                still create its subtask.
        """
        specs_by_id = {
            spec.feature_id: spec for spec in feature_bank.list_features()
        }
        created: list[SubtaskSpec] = []
        for feature_id in ranked_feature_ids:
            if feature_id in self._created_subtask_for:
                continue
            # Resolve the spec before marking the feature as handled: if the
            # lookup fails, the feature must stay eligible for a later call.
            feature = specs_by_id[feature_id]
            self._created_subtask_for.add(feature_id)
            created.append(
                SubtaskSpec(
                    subtask_id=f"subtask:{feature_id}",
                    name=f"Track {feature.name}",
                    feature_id=feature_id,
                )
            )
        return tuple(created)

Creates at most one subtask per feature.

def generate( self, ranked_feature_ids: 'Sequence[FeatureId]', feature_bank: 'FeatureBank[MinimalSubjectiveState]') -> 'Sequence[SubtaskSpec]':
181    def generate(
182        self,
183        ranked_feature_ids: Sequence[FeatureId],
184        feature_bank: FeatureBank[MinimalSubjectiveState],
185    ) -> Sequence[SubtaskSpec]:
186        created: list[SubtaskSpec] = []
187        feature_specs = {
188            feature.feature_id: feature for feature in feature_bank.list_features()
189        }
190        for feature_id in ranked_feature_ids:
191            if feature_id in self._created_subtask_for:
192                continue
193            self._created_subtask_for.add(feature_id)
194            feature = feature_specs[feature_id]
195            created.append(
196                SubtaskSpec(
197                    subtask_id=f"subtask:{feature_id}",
198                    name=f"Track {feature.name}",
199                    feature_id=feature_id,
200                )
201            )
202        return tuple(created)
class MinimalWorldModel(oak.fine_grained.components.WorldModel[examples.smoke.minimal_oak.MinimalSubjectiveState, int, examples.smoke.minimal_oak.MinimalInfo]):
class MinimalWorldModel(WorldModel[MinimalSubjectiveState, Action, MinimalInfo]):
    """World model that learns nothing and predicts no change.

    Every prediction returns the input state with zero cumulative reward and
    a duration of one step.
    """

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Ignore the transition; nothing is learned."""

    def predict_action(
        self,
        subjective_state: MinimalSubjectiveState,
        action: Action,
    ) -> ModelPrediction[MinimalSubjectiveState]:
        """Predict an unchanged state, zero reward, one step; ignores ``action``."""
        prediction = ModelPrediction(
            predicted_subjective_state=subjective_state,
            cumulative_reward=0.0,
            steps=1,
        )
        return prediction

    def predict_option(
        self,
        subjective_state: MinimalSubjectiveState,
        option_id: OptionId,
    ) -> ModelPrediction[MinimalSubjectiveState]:
        """Predict an unchanged state, zero reward, one step; ignores ``option_id``."""
        prediction = ModelPrediction(
            predicted_subjective_state=subjective_state,
            cumulative_reward=0.0,
            steps=1,
        )
        return prediction

    def add_or_replace_option_models(
        self, models: Sequence[OptionModel[MinimalSubjectiveState]]
    ) -> None:
        """Accept option models without storing them."""

    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
        """Do nothing; no option models are stored."""

Trivial planner-facing model.

def update( self, transition: 'Transition[Action, MinimalSubjectiveState, MinimalInfo]') -> 'None':
213    def update(
214        self,
215        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
216    ) -> None:
217        pass
def predict_action( self, subjective_state: 'MinimalSubjectiveState', action: 'Action') -> 'ModelPrediction[MinimalSubjectiveState]':
219    def predict_action(
220        self,
221        subjective_state: MinimalSubjectiveState,
222        action: Action,
223    ) -> ModelPrediction[MinimalSubjectiveState]:
224        return ModelPrediction(
225            predicted_subjective_state=subjective_state,
226            cumulative_reward=0.0,
227            steps=1,
228        )
def predict_option( self, subjective_state: 'MinimalSubjectiveState', option_id: 'OptionId') -> 'ModelPrediction[MinimalSubjectiveState]':
230    def predict_option(
231        self,
232        subjective_state: MinimalSubjectiveState,
233        option_id: OptionId,
234    ) -> ModelPrediction[MinimalSubjectiveState]:
235        return ModelPrediction(
236            predicted_subjective_state=subjective_state,
237            cumulative_reward=0.0,
238            steps=1,
239        )
def add_or_replace_option_models(self, models: 'Sequence[OptionModel[MinimalSubjectiveState]]') -> 'None':
241    def add_or_replace_option_models(
242        self, models: Sequence[OptionModel[MinimalSubjectiveState]]
243    ) -> None:
244        pass
def remove_option_models(self, option_ids: 'Sequence[OptionId]') -> 'None':
246    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
247        pass
class MinimalOptionModelLearner(oak.fine_grained.components.OptionModelLearner[examples.smoke.minimal_oak.MinimalSubjectiveState, int, examples.smoke.minimal_oak.MinimalInfo]):
class MinimalOptionModelLearner(
    OptionModelLearner[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Option-model learner that learns nothing and exports nothing."""

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Ignore the transition."""

    def export_models(self) -> Sequence[OptionModel[MinimalSubjectiveState]]:
        """Return an empty tuple of option models."""
        return tuple()

No-op option-model learner.

def update( self, transition: 'Transition[Action, MinimalSubjectiveState, MinimalInfo]') -> 'None':
255    def update(
256        self,
257        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
258    ) -> None:
259        pass
def export_models(self) -> 'Sequence[OptionModel[MinimalSubjectiveState]]':
261    def export_models(self) -> Sequence[OptionModel[MinimalSubjectiveState]]:
262        return ()
class MinimalPlanner(oak.fine_grained.components.Planner[examples.smoke.minimal_oak.MinimalSubjectiveState, int, examples.smoke.minimal_oak.MinimalInfo]):
class MinimalPlanner(Planner[MinimalSubjectiveState, Action, MinimalInfo]):
    """Planner stub that echoes current value predictions without search."""

    def plan_step(
        self,
        subjective_state: MinimalSubjectiveState,
        model: WorldModel[MinimalSubjectiveState, Action, MinimalInfo],
        value_function: ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo],
        budget: int,
    ) -> PlanningUpdate[Action]:
        """Build a planning update from the value estimator alone.

        ``model`` is not consulted. The value targets are whatever
        ``value_function.predict`` returns for ``subjective_state``, the
        policy target is the constant ``{"preferred_action": 0}``, and the
        whole ``budget`` is reported as used.
        """
        current_values = value_function.predict(subjective_state)
        fixed_policy_targets = {"preferred_action": 0}
        statistics = {"budget_used": budget}
        return PlanningUpdate(
            value_targets=current_values,
            policy_targets=fixed_policy_targets,
            search_statistics=statistics,
        )

Returns one-step value targets without real search.

def plan_step( self, subjective_state: 'MinimalSubjectiveState', model: 'WorldModel[MinimalSubjectiveState, Action, MinimalInfo]', value_function: 'ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo]', budget: 'int') -> 'PlanningUpdate[Action]':
268    def plan_step(
269        self,
270        subjective_state: MinimalSubjectiveState,
271        model: WorldModel[MinimalSubjectiveState, Action, MinimalInfo],
272        value_function: ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo],
273        budget: int,
274    ) -> PlanningUpdate[Action]:
275        return PlanningUpdate(
276            value_targets=value_function.predict(subjective_state),
277            policy_targets={"preferred_action": 0},
278            search_statistics={"budget_used": budget},
279        )
class MinimalValueEstimator(oak.fine_grained.components.ValueEstimator[examples.smoke.minimal_oak.MinimalSubjectiveState, int, examples.smoke.minimal_oak.MinimalInfo]):
class MinimalValueEstimator(
    ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Value estimator whose single estimate is the most recent reward.

    The estimate is published under the ``"main"`` key. No general value
    function learners are managed, so the bank-management methods are no-ops.
    """

    def __init__(self) -> None:
        # Last reward seen by ``update``; 0.0 until the first transition.
        self._value: float = 0.0

    def list_general_value_functions(
        self,
    ) -> Sequence[
        GeneralValueFunctionLearner[MinimalSubjectiveState, Action, MinimalInfo]
    ]:
        """Return an empty tuple; this estimator manages no GVF learners."""
        return tuple()

    def predict(
        self,
        subjective_state: MinimalSubjectiveState,
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Return the stored value under ``"main"``, ignoring the state."""
        latest_value = self._value
        return {"main": latest_value}

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Store ``transition.reward`` and report ``0.0`` for ``"main"``."""
        self._value = transition.reward
        reported = {"main": 0.0}
        return reported

    def add_or_replace(
        self,
        learner: GeneralValueFunctionLearner[
            MinimalSubjectiveState, Action, MinimalInfo
        ],
    ) -> None:
        """Accept ``learner`` and discard it; no bank is maintained."""

    def remove(
        self,
        general_value_function_ids: Sequence[GeneralValueFunctionId],
    ) -> None:
        """Accept the ids and do nothing; there is nothing to remove."""

Stores latest reward as the only value estimate.

def list_general_value_functions( self) -> 'Sequence[GeneralValueFunctionLearner[MinimalSubjectiveState, Action, MinimalInfo]]':
295    def list_general_value_functions(
296        self,
297    ) -> Sequence[
298        GeneralValueFunctionLearner[MinimalSubjectiveState, Action, MinimalInfo]
299    ]:
300        return ()

Return all managed GVF learners.

Intended for Planner implementations that need to inspect the GVF bank (e.g., to evaluate auxiliary predictions during planning).

def predict( self, subjective_state: 'MinimalSubjectiveState') -> 'Mapping[GeneralValueFunctionId, float]':
302    def predict(
303        self,
304        subjective_state: MinimalSubjectiveState,
305    ) -> Mapping[GeneralValueFunctionId, float]:
306        return {"main": self._value}
def update( self, transition: 'Transition[Action, MinimalSubjectiveState, MinimalInfo]') -> 'Mapping[GeneralValueFunctionId, float]':
308    def update(
309        self,
310        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
311    ) -> Mapping[GeneralValueFunctionId, float]:
312        self._value = transition.reward
313        return {"main": 0.0}
def add_or_replace( self, learner: 'GeneralValueFunctionLearner[MinimalSubjectiveState, Action, MinimalInfo]') -> 'None':
315    def add_or_replace(
316        self,
317        learner: GeneralValueFunctionLearner[
318            MinimalSubjectiveState, Action, MinimalInfo
319        ],
320    ) -> None:
321        pass

Add or replace a GVF learner in the bank.

Used for dynamic GVF management, e.g., creating new GVFs when new subtasks or options are discovered.

def remove( self, general_value_function_ids: 'Sequence[GeneralValueFunctionId]') -> 'None':
323    def remove(
324        self,
325        general_value_function_ids: Sequence[GeneralValueFunctionId],
326    ) -> None:
327        pass
class MinimalUtilityAssessor(oak.fine_grained.components.UtilityAssessor):
class MinimalUtilityAssessor(UtilityAssessor):
    """Aggregates usage records into per-component utility totals.

    Usage is folded into running totals as it is observed. Memory is
    therefore bounded by the number of distinct components rather than by
    the number of usage records ever seen, and ``scores`` does not have to
    rescan the full history on every call.
    """

    def __init__(self) -> None:
        # Both dicts are keyed by (kind value, component id). Dict insertion
        # order means ``scores`` reports components in first-seen order.
        self._totals: dict[tuple[str, str], float] = {}
        self._latest: dict[tuple[str, str], UsageRecord] = {}

    def observe(self, usage: Sequence[UsageRecord]) -> None:
        """Add each record's ``amount`` to the total for its component."""
        for record in usage:
            key = (record.kind.value, record.component_id)
            self._totals[key] = self._totals.get(key, 0.0) + record.amount
            # Keep the newest record so ``scores`` can echo its kind and id.
            self._latest[key] = record

    def scores(self) -> Sequence[UtilityRecord]:
        """Return one ``UtilityRecord`` per component seen so far.

        Each record's ``utility`` is the sum of all observed amounts for
        that (kind, component id) pair.
        """
        return tuple(
            UtilityRecord(
                kind=record.kind,
                component_id=record.component_id,
                utility=self._totals[key],
            )
            for key, record in self._latest.items()
        )

Aggregates usage records into simple counts.

def observe(self, usage: 'Sequence[UsageRecord]') -> 'None':
336    def observe(self, usage: Sequence[UsageRecord]) -> None:
337        self._usage_records.extend(usage)
def scores(self) -> 'Sequence[UtilityRecord]':
339    def scores(self) -> Sequence[UtilityRecord]:
340        totals: dict[tuple[str, str], float] = {}
341        latest: dict[tuple[str, str], UsageRecord] = {}
342        for record in self._usage_records:
343            key = (record.kind.value, record.component_id)
344            totals[key] = totals.get(key, 0.0) + record.amount
345            latest[key] = record
346        return tuple(
347            UtilityRecord(
348                kind=record.kind,
349                component_id=record.component_id,
350                utility=totals[key],
351            )
352            for key, record in latest.items()
353        )
class MinimalCurator(oak.fine_grained.components.Curator):
class MinimalCurator(Curator):
    """Curator that never prunes anything."""

    def curate(self, utilities: Sequence[UtilityRecord]) -> CurationDecision:
        """Return a default ``CurationDecision``, ignoring ``utilities``."""
        decision = CurationDecision()
        return decision

Never prunes.

def curate(self, utilities: 'Sequence[UtilityRecord]') -> 'CurationDecision':
359    def curate(self, utilities: Sequence[UtilityRecord]) -> CurationDecision:
360        return CurationDecision()
@dataclass
class MinimalOption(oak.fine_grained.components.Option[examples.smoke.minimal_oak.MinimalSubjectiveState, int]):
@dataclass
class MinimalOption(Option[MinimalSubjectiveState, Action]):
    """Trivial option: always available, one fixed action, stops at once."""

    # Descriptor handed in at construction; exposed via ``descriptor``.
    _descriptor: OptionDescriptor
    # Primitive action returned by ``act``; defaults to 1.
    _action: Action = 1

    @property
    def descriptor(self) -> OptionDescriptor:
        """The descriptor this option was constructed with."""
        stored = self._descriptor
        return stored

    def is_available(self, subjective_state: MinimalSubjectiveState) -> bool:
        """Report the option as available in every state."""
        return True

    def act(self, subjective_state: MinimalSubjectiveState) -> Action:
        """Return the configured action, regardless of the state."""
        chosen = self._action
        return chosen

    def stop_probability(self, subjective_state: MinimalSubjectiveState) -> float:
        """Return ``1.0`` for every state."""
        return 1.0

Trivial option that always emits action=1 and stops immediately.

MinimalOption(_descriptor: 'OptionDescriptor', _action: 'Action' = 1)
descriptor: 'OptionDescriptor'
375    @property
376    def descriptor(self) -> OptionDescriptor:
377        return self._descriptor
def is_available(self, subjective_state: 'MinimalSubjectiveState') -> 'bool':
379    def is_available(self, subjective_state: MinimalSubjectiveState) -> bool:
380        return True

Whether this option can be initiated in the given state.

Intended for ActionSelector implementations, which receive available options and may filter by initiation conditions.

def act(self, subjective_state: 'MinimalSubjectiveState') -> 'Action':
382    def act(self, subjective_state: MinimalSubjectiveState) -> Action:
383        return self._action
def stop_probability(self, subjective_state: 'MinimalSubjectiveState') -> 'float':
385    def stop_probability(self, subjective_state: MinimalSubjectiveState) -> float:
386        return 1.0
class MinimalActionSelector(oak.fine_grained.components.ActionSelector[examples.smoke.minimal_oak.MinimalSubjectiveState, int]):
class MinimalActionSelector(ActionSelector[MinimalSubjectiveState, Action]):
    """Picks a primitive action or an option based on observation parity.

    The selector does not learn: TD errors and planning updates are only
    recorded on the instance so they can be inspected afterwards.
    """

    def __init__(self) -> None:
        # Copy of the TD errors from the latest ``update_from_values`` call.
        self.last_td_errors: Mapping[GeneralValueFunctionId, float] = {}
        # Latest update passed to ``apply_planning_update``, if any.
        self.last_planning_update: PlanningUpdate[Action] | None = None

    def decide(
        self,
        subjective_state: MinimalSubjectiveState,
        active_option: Option[MinimalSubjectiveState, Action] | None,
        available_options: Sequence[Option[MinimalSubjectiveState, Action]],
    ) -> PolicyDecision[Action]:
        """Choose action 0 on even observations, otherwise prefer an option.

        On an odd observation the first entry of ``available_options`` is
        selected by id; if there are no options, primitive action 1 is
        returned. ``active_option`` is not consulted.
        """
        observation_is_even = subjective_state.observation % 2 == 0
        if observation_is_even:
            return PolicyDecision(action=0)
        if not available_options:
            return PolicyDecision(action=1)
        first_option = available_options[0]
        return PolicyDecision(option_id=first_option.descriptor.option_id)

    def update_from_values(
        self,
        subjective_state: MinimalSubjectiveState,
        td_errors: Mapping[GeneralValueFunctionId, float],
    ) -> None:
        """Keep a dict copy of ``td_errors``; the state is unused."""
        snapshot = dict(td_errors)
        self.last_td_errors = snapshot

    def apply_planning_update(self, update: PlanningUpdate[Action]) -> None:
        """Remember ``update`` as the latest planning update."""
        self.last_planning_update = update

Alternates primitive actions and option selection.

last_td_errors: 'Mapping[GeneralValueFunctionId, float]'
last_planning_update: 'PlanningUpdate[Action] | None'
def decide( self, subjective_state: 'MinimalSubjectiveState', active_option: 'Option[MinimalSubjectiveState, Action] | None', available_options: 'Sequence[Option[MinimalSubjectiveState, Action]]') -> 'PolicyDecision[Action]':
396    def decide(
397        self,
398        subjective_state: MinimalSubjectiveState,
399        active_option: Option[MinimalSubjectiveState, Action] | None,
400        available_options: Sequence[Option[MinimalSubjectiveState, Action]],
401    ) -> PolicyDecision[Action]:
402        if subjective_state.observation % 2 == 0:
403            return PolicyDecision(action=0)
404        if available_options:
405            return PolicyDecision(option_id=available_options[0].descriptor.option_id)
406        return PolicyDecision(action=1)
def update_from_values( self, subjective_state: 'MinimalSubjectiveState', td_errors: 'Mapping[GeneralValueFunctionId, float]') -> 'None':
408    def update_from_values(
409        self,
410        subjective_state: MinimalSubjectiveState,
411        td_errors: Mapping[GeneralValueFunctionId, float],
412    ) -> None:
413        self.last_td_errors = dict(td_errors)
def apply_planning_update(self, update: 'PlanningUpdate[Action]') -> 'None':
415    def apply_planning_update(self, update: PlanningUpdate[Action]) -> None:
416        self.last_planning_update = update
class MinimalOptionLibrary(oak.fine_grained.components.OptionLibrary[examples.smoke.minimal_oak.MinimalSubjectiveState, int]):
class MinimalOptionLibrary(OptionLibrary[MinimalSubjectiveState, Action]):
    """Dictionary-backed store of options, keyed by option id."""

    def __init__(self) -> None:
        # option id -> option; insertion order is the listing order.
        self._options: dict[OptionId, Option[MinimalSubjectiveState, Action]] = {}

    def list_options(self) -> Sequence[Option[MinimalSubjectiveState, Action]]:
        """Return all stored options as a tuple."""
        stored = self._options.values()
        return tuple(stored)

    def get(self, option_id: OptionId) -> Option[MinimalSubjectiveState, Action]:
        """Return the option stored under ``option_id``.

        Raises:
            KeyError: if no option with that id is stored.
        """
        option = self._options[option_id]
        return option

    def add_or_replace(self, option: Option[MinimalSubjectiveState, Action]) -> None:
        """Store ``option`` under its descriptor's ``option_id``."""
        key = option.descriptor.option_id
        self._options[key] = option

    def remove(self, option_ids: Sequence[OptionId]) -> None:
        """Drop the listed options; ids that are not stored are ignored."""
        for stale_id in option_ids:
            if stale_id in self._options:
                del self._options[stale_id]

Stores learned options.

def list_options(self) -> 'Sequence[Option[MinimalSubjectiveState, Action]]':
425    def list_options(self) -> Sequence[Option[MinimalSubjectiveState, Action]]:
426        return tuple(self._options.values())
def get(self, option_id: 'OptionId') -> 'Option[MinimalSubjectiveState, Action]':
428    def get(self, option_id: OptionId) -> Option[MinimalSubjectiveState, Action]:
429        return self._options[option_id]
def add_or_replace(self, option: 'Option[MinimalSubjectiveState, Action]') -> 'None':
431    def add_or_replace(self, option: Option[MinimalSubjectiveState, Action]) -> None:
432        self._options[option.descriptor.option_id] = option
def remove(self, option_ids: 'Sequence[OptionId]') -> 'None':
434    def remove(self, option_ids: Sequence[OptionId]) -> None:
435        for option_id in option_ids:
436            self._options.pop(option_id, None)
class MinimalOptionLearner(oak.fine_grained.components.OptionLearner[examples.smoke.minimal_oak.MinimalSubjectiveState, int, examples.smoke.minimal_oak.MinimalInfo]):
class MinimalOptionLearner(OptionLearner[MinimalSubjectiveState, Action, MinimalInfo]):
    """Keeps one trivial ``MinimalOption`` per ingested subtask.

    Options are identified as ``"option:<subtask_id>"``. Nothing is learned
    from transitions.
    """

    def __init__(self) -> None:
        # Subtasks seen so far, and the option created for each of them.
        self._subtasks: dict[SubtaskId, SubtaskSpec] = {}
        self._options: dict[OptionId, MinimalOption] = {}

    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
        """Record each subtask and (re)create its option.

        Ingesting a subtask id that is already known overwrites both the
        stored spec and its option.
        """
        for subtask in subtasks:
            self._subtasks[subtask.subtask_id] = subtask
            option_id = f"option:{subtask.subtask_id}"
            descriptor = OptionDescriptor(
                option_id=option_id,
                name=f"Option for {subtask.subtask_id}",
                subtask_id=subtask.subtask_id,
            )
            self._options[option_id] = MinimalOption(descriptor)

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Ignore ``transition``; the options are fixed once created."""

    def export_options(self) -> Sequence[Option[MinimalSubjectiveState, Action]]:
        """Return every currently held option as a tuple."""
        held = self._options.values()
        return tuple(held)

    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
        """Forget the listed subtasks and their options; unknown ids are ignored."""
        for subtask_id in subtask_ids:
            derived_option_id = f"option:{subtask_id}"
            self._subtasks.pop(subtask_id, None)
            self._options.pop(derived_option_id, None)

Creates one trivial option per subtask.

def ingest_subtasks(self, subtasks: 'Sequence[SubtaskSpec]') -> 'None':
446    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
447        for subtask in subtasks:
448            self._subtasks[subtask.subtask_id] = subtask
449            option_id = f"option:{subtask.subtask_id}"
450            self._options[option_id] = MinimalOption(
451                OptionDescriptor(
452                    option_id=option_id,
453                    name=f"Option for {subtask.subtask_id}",
454                    subtask_id=subtask.subtask_id,
455                )
456            )
def update( self, transition: 'Transition[Action, MinimalSubjectiveState, MinimalInfo]') -> 'None':
458    def update(
459        self,
460        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
461    ) -> None:
462        pass
def export_options(self) -> 'Sequence[Option[MinimalSubjectiveState, Action]]':
464    def export_options(self) -> Sequence[Option[MinimalSubjectiveState, Action]]:
465        return tuple(self._options.values())
def remove_subtasks(self, subtask_ids: 'Sequence[SubtaskId]') -> 'None':
467    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
468        for subtask_id in subtask_ids:
469            self._subtasks.pop(subtask_id, None)
470            self._options.pop(f"option:{subtask_id}", None)
def build_minimal_agent() -> 'OaKAgent[Observation, Action, MinimalSubjectiveState, MinimalInfo]':
def build_minimal_agent() -> (
    OaKAgent[Observation, Action, MinimalSubjectiveState, MinimalInfo]
):
    """Assemble the smoke-test agent from the minimal fine-grained parts.

    Each of the four top-level OaK interfaces is a ``Composite*`` wrapper
    around the ``Minimal*`` components of this module. The agent is created
    with a planning budget of 4.
    """
    return OaKAgent(
        perception=CompositePerception(
            state_builder=MinimalStateBuilder(),
            feature_bank=MinimalFeatureBank(),
            feature_constructor=MinimalFeatureConstructor(),
            feature_ranker=MinimalFeatureRanker(),
            subtask_generator=MinimalSubtaskGenerator(),
        ),
        transition_model=CompositeTransitionModel(
            world_model=MinimalWorldModel(),
            option_model_learner=MinimalOptionModelLearner(),
            planner=MinimalPlanner(),
        ),
        value_function=CompositeValueFunction(
            value_estimator=MinimalValueEstimator(),
            utility_assessor=MinimalUtilityAssessor(),
            curator=MinimalCurator(),
        ),
        reactive_policy=CompositeReactivePolicy(
            action_selector=MinimalActionSelector(),
            option_library=MinimalOptionLibrary(),
            option_learner=MinimalOptionLearner(),
        ),
        planning_budget=4,
    )

Construct a fully wired fine-grained smoke-test OaK agent.

def run_minimal_episode(horizon: 'int' = 5) -> 'list[MinimalTraceStep]':
def run_minimal_episode(horizon: int = 5) -> list[MinimalTraceStep]:
    """Run the minimal agent in ``MinimalWorld`` and collect a trace.

    At most ``horizon`` agent steps are taken; the loop ends early once the
    world reports termination. One trace entry is recorded per agent step.
    """
    world = MinimalWorld(horizon=horizon)
    agent = build_minimal_agent()
    timestep = world.reset()
    agent.reset()

    trace: list[MinimalTraceStep] = []
    steps_taken = 0
    while steps_taken < horizon:
        steps_taken += 1
        outcome = agent.step(timestep)
        chosen_action = outcome.action
        created_ids = [subtask.subtask_id for subtask in outcome.created_subtasks]
        budget_used = _planning_budget_used(outcome.planning_update)
        entry = {
            "subjective_state": outcome.subjective_state,
            "action": chosen_action,
            "active_option_id": outcome.active_option_id,
            "created_subtasks": created_ids,
            "planning_budget_used": budget_used,
        }
        trace.append(entry)
        # Advance the world with the action the agent just chose.
        timestep = world.step(chosen_action)
        if timestep.terminated:
            break

    return trace

Run a short smoke episode and return a compact trace.