examples.minimal_oak_fine_grained

  1from __future__ import annotations
  2
  3"""Bare-minimum OaK example built from fine-grained components.
  4
  5This mirrors `examples/minimal_oak.py`, but instead of implementing the four
  6main OaK interfaces directly, it assembles them from the optional fine-grained
  7building blocks in `oak_architecture.fine_grained`.
  8
  9The behavior is intentionally the same as the direct example:
 10
 11- a tiny integer world
 12- a direct observation-to-subjective_state state builder
 13- one fixed identity feature
 14- no-op model learning with trivial planning
 15- a simple value tracker with usage counting and no curation
 16- a reactive policy that alternates actions and options
 17"""
 18
 19from dataclasses import dataclass
 20from typing import Mapping, Sequence
 21
 22from oak_architecture.agent import OaKAgent
 23from oak_architecture.fine_grained import (
 24    ActionSelector,
 25    CompositePerception,
 26    CompositeReactivePolicy,
 27    CompositeTransitionModel,
 28    CompositeValueFunction,
 29    Curator,
 30    FeatureBank,
 31    FeatureConstructor,
 32    FeatureRanker,
 33    GeneralValueFunctionLearner,
 34    Option,
 35    OptionLearner,
 36    OptionLibrary,
 37    OptionModel,
 38    OptionModelLearner,
 39    Planner,
 40    StateBuilder,
 41    SubtaskGenerator,
 42    UtilityAssessor,
 43    ValueEstimator,
 44    WorldModel,
 45)
 46from oak_architecture.types import (
 47    CurationDecision,
 48    FeatureCandidate,
 49    FeatureId,
 50    FeatureSpec,
 51    GeneralValueFunctionId,
 52    ModelPrediction,
 53    OptionDescriptor,
 54    OptionId,
 55    PlanningUpdate,
 56    PolicyDecision,
 57    SubtaskId,
 58    SubtaskSpec,
 59    Transition,
 60    UsageRecord,
 61    UtilityRecord,
 62)
 63
 64from .minimal_oak import (
 65    Action,
 66    MinimalInfo,
 67    MinimalSubjectiveState,
 68    MinimalTraceStep,
 69    MinimalWorld,
 70    Observation,
 71)
 72
 73
 74# ─────────────────────────────────────────────────────────────────────
 75# Perception components
 76# ─────────────────────────────────────────────────────────────────────
 77
 78
 79class MinimalStateBuilder(
 80    StateBuilder[Observation, Action, MinimalSubjectiveState]
 81):
 82    """Direct observation-to-state mapping."""
 83
 84    def __init__(self) -> None:
 85        self._state = MinimalSubjectiveState(0, 0, 0.0, None)
 86
 87    def reset(self) -> None:
 88        self._state = MinimalSubjectiveState(0, 0, 0.0, None)
 89
 90    def update(
 91        self,
 92        observation: Observation,
 93        reward: float,
 94        last_action: Action | None,
 95    ) -> MinimalSubjectiveState:
 96        self._state = MinimalSubjectiveState(
 97            step_index=observation,
 98            observation=observation,
 99            reward=reward,
100            last_action=last_action,
101        )
102        return self._state
103
104    def current_subjective_state(self) -> MinimalSubjectiveState:
105        return self._state
106
107
class MinimalFeatureBank(FeatureBank[MinimalSubjectiveState]):
    """Feature store seeded with a single identity feature.

    Features are kept in a dict keyed by feature id, so insertion order is
    the listing order and re-adding an id overwrites the earlier entry.
    """

    def __init__(self) -> None:
        identity = FeatureSpec(
            feature_id="observation",
            name="Observation value",
            description="Identity feature for the integer observation.",
        )
        self._features: dict[FeatureId, FeatureSpec] = {"observation": identity}

    def list_features(self) -> Sequence[FeatureSpec]:
        """Return every stored feature spec as an immutable snapshot."""
        snapshot = tuple(self._features.values())
        return snapshot

    def activations(
        self,
        subjective_state: MinimalSubjectiveState,
    ) -> Mapping[FeatureId, float]:
        """Return the observation, as a float, under the "observation" key."""
        value = float(subjective_state.observation)
        return {"observation": value}

    @staticmethod
    def _spec_from(candidate: FeatureCandidate) -> FeatureSpec:
        """Copy a candidate's fields into a new ``FeatureSpec``."""
        return FeatureSpec(
            feature_id=candidate.feature_id,
            name=candidate.name,
            description=candidate.description,
            metadata=candidate.metadata,
        )

    def add_candidates(
        self, candidates: Sequence[FeatureCandidate]
    ) -> Sequence[FeatureSpec]:
        """Store every candidate as a feature and return the stored specs.

        Candidates are accepted unconditionally; one whose id already
        exists replaces the stored feature.
        """
        registered: list[FeatureSpec] = []
        for candidate in candidates:
            spec = self._spec_from(candidate)
            self._features[spec.feature_id] = spec
            registered.append(spec)
        return tuple(registered)

    def remove(self, feature_ids: Sequence[FeatureId]) -> None:
        """Drop the given feature ids; ids that are not stored are ignored."""
        for doomed in feature_ids:
            self._features.pop(doomed, None)
147
148
class MinimalFeatureConstructor(FeatureConstructor[MinimalSubjectiveState]):
    """Feature constructor that has nothing to suggest."""

    def propose(
        self,
        subjective_state: MinimalSubjectiveState,
        active_features: Sequence[FeatureSpec],
    ) -> Sequence[FeatureCandidate]:
        """Return an empty tuple regardless of the state or active features."""
        no_candidates: tuple[FeatureCandidate, ...] = tuple()
        return no_candidates
158
159
class MinimalFeatureRanker(FeatureRanker):
    """Ranks features in their existing order."""

    def rank(
        self,
        features: Sequence[FeatureSpec],
        utilities: Sequence[UtilityRecord],
        limit: int | None = None,
    ) -> Sequence[FeatureId]:
        """Return feature ids in the order given, optionally truncated.

        Args:
            features: Features to rank; their order is kept as-is.
            utilities: Accepted for interface compatibility and ignored.
            limit: Maximum number of ids to return. ``None`` means no cap.
                Zero or a negative value yields an empty result.

        Returns:
            A tuple of at most ``limit`` feature ids.
        """
        ordered = tuple(feature.feature_id for feature in features)
        if limit is None:
            return ordered
        # A raw negative stop would slice from the end (``ids[:-1]`` drops
        # the last feature), which is never what "at most N" means.
        return ordered[: max(limit, 0)]
173
174
class MinimalSubtaskGenerator(SubtaskGenerator[MinimalSubjectiveState]):
    """Creates at most one subtask per feature."""

    def __init__(self) -> None:
        # Feature ids that have already been given a subtask.
        self._created_subtask_for: set[FeatureId] = set()

    def generate(
        self,
        ranked_feature_ids: Sequence[FeatureId],
        feature_bank: FeatureBank[MinimalSubjectiveState],
    ) -> Sequence[SubtaskSpec]:
        """Create subtasks for ranked features that do not have one yet.

        Args:
            ranked_feature_ids: Candidate feature ids, in priority order.
            feature_bank: Bank used to look up each feature's spec.

        Returns:
            A tuple of newly created subtask specs, in ranked order. Ids
            that already have a subtask, and ids the bank does not
            contain, contribute nothing.
        """
        known = {
            feature.feature_id: feature for feature in feature_bank.list_features()
        }
        created: list[SubtaskSpec] = []
        for feature_id in ranked_feature_ids:
            if feature_id in self._created_subtask_for:
                continue
            feature = known.get(feature_id)
            if feature is None:
                # Not in the bank (e.g. stale ranking). Skip it WITHOUT
                # recording it, so it can still get a subtask if the
                # feature shows up later.
                continue
            self._created_subtask_for.add(feature_id)
            created.append(
                SubtaskSpec(
                    subtask_id=f"subtask:{feature_id}",
                    name=f"Track {feature.name}",
                    feature_id=feature_id,
                )
            )
        return tuple(created)
203
204
205# ─────────────────────────────────────────────────────────────────────
206# Transition-model components
207# ─────────────────────────────────────────────────────────────────────
208
209
class MinimalWorldModel(WorldModel[MinimalSubjectiveState, Action, MinimalInfo]):
    """World model that learns nothing and predicts no change.

    Every prediction echoes the queried state back with zero cumulative
    reward over one step. Updates and option-model bookkeeping are
    accepted and ignored.
    """

    @staticmethod
    def _stay_put(
        state: MinimalSubjectiveState,
    ) -> ModelPrediction[MinimalSubjectiveState]:
        """Build the constant "nothing happens" prediction for ``state``."""
        return ModelPrediction(
            predicted_subjective_state=state,
            cumulative_reward=0.0,
            steps=1,
        )

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Ignore the transition; this model has no learnable state."""
        return None

    def predict_action(
        self,
        subjective_state: MinimalSubjectiveState,
        action: Action,
    ) -> ModelPrediction[MinimalSubjectiveState]:
        """Predict an unchanged state for any primitive action."""
        return self._stay_put(subjective_state)

    def predict_option(
        self,
        subjective_state: MinimalSubjectiveState,
        option_id: OptionId,
    ) -> ModelPrediction[MinimalSubjectiveState]:
        """Predict an unchanged state for any option."""
        return self._stay_put(subjective_state)

    def add_or_replace_option_models(
        self, models: Sequence[OptionModel[MinimalSubjectiveState]]
    ) -> None:
        """Accept option models and discard them."""
        return None

    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
        """Do nothing; no option models are stored."""
        return None
248
249
class MinimalOptionModelLearner(
    OptionModelLearner[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Option-model learner that never learns and never exports a model."""

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Ignore the transition."""
        return None

    def export_models(self) -> Sequence[OptionModel[MinimalSubjectiveState]]:
        """Return an empty tuple; there are no learned models."""
        return tuple()
263
264
class MinimalPlanner(Planner[MinimalSubjectiveState, Action, MinimalInfo]):
    """Planner stub that performs no search.

    It reports the value function's current predictions as value targets,
    a fixed policy target, and the full budget as "used".
    """

    def plan_step(
        self,
        subjective_state: MinimalSubjectiveState,
        model: WorldModel[MinimalSubjectiveState, Action, MinimalInfo],
        value_function: ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo],
        budget: int,
    ) -> PlanningUpdate[Action]:
        """Produce a planning update without consulting ``model``.

        ``budget`` is echoed back under ``search_statistics["budget_used"]``.
        """
        current_values = value_function.predict(subjective_state)
        fixed_policy = {"preferred_action": 0}
        statistics = {"budget_used": budget}
        return PlanningUpdate(
            value_targets=current_values,
            policy_targets=fixed_policy,
            search_statistics=statistics,
        )
280
281
282# ─────────────────────────────────────────────────────────────────────
283# Value-function components
284# ─────────────────────────────────────────────────────────────────────
285
286
class MinimalValueEstimator(
    ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Value estimator whose single estimate is the last reward it saw.

    The estimate is reported under the key "main". No general value
    function learners are stored.
    """

    def __init__(self) -> None:
        self._value: float = 0.0

    def list_general_value_functions(
        self,
    ) -> Sequence[
        GeneralValueFunctionLearner[MinimalSubjectiveState, Action, MinimalInfo]
    ]:
        """Return an empty tuple; no learners are registered."""
        return tuple()

    def predict(
        self,
        subjective_state: MinimalSubjectiveState,
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Return the stored estimate; the state itself is not consulted."""
        estimate = self._value
        return {"main": estimate}

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Replace the estimate with the transition's reward.

        Always returns ``{"main": 0.0}``.
        """
        latest_reward = transition.reward
        self._value = latest_reward
        return {"main": 0.0}

    def add_or_replace(
        self,
        learner: GeneralValueFunctionLearner[
            MinimalSubjectiveState, Action, MinimalInfo
        ],
    ) -> None:
        """Accept a learner and discard it."""
        return None

    def remove(
        self,
        general_value_function_ids: Sequence[GeneralValueFunctionId],
    ) -> None:
        """Do nothing; there are no stored learners to remove."""
        return None
328
329
class MinimalUtilityAssessor(UtilityAssessor):
    """Aggregates usage records into simple per-component totals.

    Totals are accumulated as records arrive, so memory is bounded by the
    number of distinct ``(kind, component_id)`` pairs rather than by the
    number of records ever observed, and ``scores`` does no re-aggregation.
    """

    def __init__(self) -> None:
        # Running sum of ``amount`` per (kind value, component id).
        self._totals: dict[tuple[str, str], float] = {}
        # Most recent record per key; supplies ``kind`` and ``component_id``
        # for the emitted score. Dict order is first-seen order.
        self._latest: dict[tuple[str, str], UsageRecord] = {}

    def observe(self, usage: Sequence[UsageRecord]) -> None:
        """Fold a batch of usage records into the running totals."""
        for record in usage:
            key = (record.kind.value, record.component_id)
            self._totals[key] = self._totals.get(key, 0.0) + record.amount
            self._latest[key] = record

    def scores(self) -> Sequence[UtilityRecord]:
        """Return one utility record per component seen so far.

        Records are ordered by first appearance; ``utility`` is the sum of
        all observed amounts for that component.
        """
        return tuple(
            UtilityRecord(
                kind=record.kind,
                component_id=record.component_id,
                utility=self._totals[key],
            )
            for key, record in self._latest.items()
        )
354
355
class MinimalCurator(Curator):
    """Curator that always returns a default-constructed decision."""

    def curate(self, utilities: Sequence[UtilityRecord]) -> CurationDecision:
        """Return a default ``CurationDecision``; ``utilities`` is ignored."""
        keep_everything = CurationDecision()
        return keep_everything
361
362
363# ─────────────────────────────────────────────────────────────────────
364# Reactive-policy components
365# ─────────────────────────────────────────────────────────────────────
366
367
@dataclass
class MinimalOption(Option[MinimalSubjectiveState, Action]):
    """One-shot option that emits a fixed action.

    The action defaults to ``1``. The option is available in every state
    and reports a stop probability of 1.0.
    """

    _descriptor: OptionDescriptor
    _action: Action = 1

    def is_available(self, subjective_state: MinimalSubjectiveState) -> bool:
        """Return ``True`` for every state."""
        return True

    def stop_probability(self, subjective_state: MinimalSubjectiveState) -> float:
        """Return 1.0 for every state."""
        return 1.0

    def act(self, subjective_state: MinimalSubjectiveState) -> Action:
        """Return the configured action regardless of the state."""
        chosen = self._action
        return chosen

    @property
    def descriptor(self) -> OptionDescriptor:
        """Descriptor supplied at construction time."""
        return self._descriptor
387
388
class MinimalActionSelector(ActionSelector[MinimalSubjectiveState, Action]):
    """Selector driven purely by the parity of the observation.

    Even observations yield primitive action 0. Odd observations yield the
    first available option, or primitive action 1 when there is none. The
    latest TD errors and planning update are recorded on the instance but
    do not influence decisions.
    """

    def __init__(self) -> None:
        self.last_td_errors: Mapping[GeneralValueFunctionId, float] = {}
        self.last_planning_update: PlanningUpdate[Action] | None = None

    def decide(
        self,
        subjective_state: MinimalSubjectiveState,
        active_option: Option[MinimalSubjectiveState, Action] | None,
        available_options: Sequence[Option[MinimalSubjectiveState, Action]],
    ) -> PolicyDecision[Action]:
        """Pick an action or option; ``active_option`` is not consulted."""
        is_even = subjective_state.observation % 2 == 0
        if is_even:
            return PolicyDecision(action=0)
        if not available_options:
            return PolicyDecision(action=1)
        first_choice = available_options[0]
        return PolicyDecision(option_id=first_choice.descriptor.option_id)

    def update_from_values(
        self,
        subjective_state: MinimalSubjectiveState,
        td_errors: Mapping[GeneralValueFunctionId, float],
    ) -> None:
        """Store a copy of the given TD errors."""
        copied = dict(td_errors)
        self.last_td_errors = copied

    def apply_planning_update(self, update: PlanningUpdate[Action]) -> None:
        """Remember the most recent planning update."""
        self.last_planning_update = update
417
418
class MinimalOptionLibrary(OptionLibrary[MinimalSubjectiveState, Action]):
    """Dict-backed registry of options keyed by option id."""

    def __init__(self) -> None:
        self._options: dict[OptionId, Option[MinimalSubjectiveState, Action]] = {}

    def list_options(self) -> Sequence[Option[MinimalSubjectiveState, Action]]:
        """Return all stored options as an immutable snapshot."""
        snapshot = tuple(self._options.values())
        return snapshot

    def get(self, option_id: OptionId) -> Option[MinimalSubjectiveState, Action]:
        """Return the option with this id; raises ``KeyError`` if absent."""
        found = self._options[option_id]
        return found

    def add_or_replace(self, option: Option[MinimalSubjectiveState, Action]) -> None:
        """Store the option under its descriptor's id, replacing any prior one."""
        key = option.descriptor.option_id
        self._options[key] = option

    def remove(self, option_ids: Sequence[OptionId]) -> None:
        """Drop the given ids; ids that are not stored are ignored."""
        for doomed in option_ids:
            self._options.pop(doomed, None)
437
438
class MinimalOptionLearner(OptionLearner[MinimalSubjectiveState, Action, MinimalInfo]):
    """Option learner that mints one ``MinimalOption`` per ingested subtask.

    Nothing is learned from experience; options exist as soon as their
    subtask is ingested and disappear when the subtask is removed.
    """

    def __init__(self) -> None:
        self._subtasks: dict[SubtaskId, SubtaskSpec] = {}
        self._options: dict[OptionId, MinimalOption] = {}

    @staticmethod
    def _option_id_for(subtask_id: SubtaskId) -> OptionId:
        """Derive the option id that belongs to ``subtask_id``."""
        return f"option:{subtask_id}"

    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
        """Record each subtask and create (or overwrite) its option."""
        for spec in subtasks:
            sid = spec.subtask_id
            self._subtasks[sid] = spec
            oid = self._option_id_for(sid)
            descriptor = OptionDescriptor(
                option_id=oid,
                name=f"Option for {spec.subtask_id}",
                subtask_id=spec.subtask_id,
            )
            self._options[oid] = MinimalOption(descriptor)

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Ignore the transition; options are fixed at creation."""
        return None

    def export_options(self) -> Sequence[Option[MinimalSubjectiveState, Action]]:
        """Return all current options as an immutable snapshot."""
        snapshot = tuple(self._options.values())
        return snapshot

    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
        """Forget the given subtasks and their options; unknown ids are ignored."""
        for sid in subtask_ids:
            self._subtasks.pop(sid, None)
            self._options.pop(self._option_id_for(sid), None)
471
472
473# ─────────────────────────────────────────────────────────────────────
474# Wiring
475# ─────────────────────────────────────────────────────────────────────
476
477
def build_minimal_agent() -> (
    OaKAgent[Observation, Action, MinimalSubjectiveState, MinimalInfo]
):
    """Assemble an ``OaKAgent`` from the minimal fine-grained components.

    Each of the four top-level interfaces is a composite wrapper around
    the ``Minimal*`` classes in this module. The planning budget is 4.
    """
    return OaKAgent(
        perception=CompositePerception(
            state_builder=MinimalStateBuilder(),
            feature_bank=MinimalFeatureBank(),
            feature_constructor=MinimalFeatureConstructor(),
            feature_ranker=MinimalFeatureRanker(),
            subtask_generator=MinimalSubtaskGenerator(),
        ),
        transition_model=CompositeTransitionModel(
            world_model=MinimalWorldModel(),
            option_model_learner=MinimalOptionModelLearner(),
            planner=MinimalPlanner(),
        ),
        value_function=CompositeValueFunction(
            value_estimator=MinimalValueEstimator(),
            utility_assessor=MinimalUtilityAssessor(),
            curator=MinimalCurator(),
        ),
        reactive_policy=CompositeReactivePolicy(
            action_selector=MinimalActionSelector(),
            option_library=MinimalOptionLibrary(),
            option_learner=MinimalOptionLearner(),
        ),
        planning_budget=4,
    )
512
513
def run_minimal_episode(horizon: int = 5) -> list[MinimalTraceStep]:
    """Drive a freshly built agent through one short episode.

    The loop runs for at most ``horizon`` agent steps and ends early when
    the world reports termination. One trace entry is recorded per agent
    step, before the chosen action is applied to the world.
    """
    world = MinimalWorld(horizon=horizon)
    agent = build_minimal_agent()
    env_step = world.reset()
    agent.reset()

    history: list[MinimalTraceStep] = []

    for _ in range(horizon):
        outcome = agent.step(env_step)
        chosen_action = outcome.action

        # The budget figure only exists when planning produced an update.
        if outcome.planning_update is not None:
            budget_used = int(outcome.planning_update.search_statistics["budget_used"])
        else:
            budget_used = None

        entry = {
            "subjective_state": outcome.subjective_state,
            "action": chosen_action,
            "active_option_id": outcome.active_option_id,
            "created_subtasks": [
                spec.subtask_id for spec in outcome.created_subtasks
            ],
            "planning_budget_used": budget_used,
        }
        history.append(entry)

        env_step = world.step(chosen_action)
        if env_step.terminated:
            break

    return history
class MinimalStateBuilder(oak_architecture.fine_grained.components.StateBuilder[int, int, examples.minimal_oak.MinimalSubjectiveState]):
 80class MinimalStateBuilder(
 81    StateBuilder[Observation, Action, MinimalSubjectiveState]
 82):
 83    """Direct observation-to-state mapping."""
 84
 85    def __init__(self) -> None:
 86        self._state = MinimalSubjectiveState(0, 0, 0.0, None)
 87
 88    def reset(self) -> None:
 89        self._state = MinimalSubjectiveState(0, 0, 0.0, None)
 90
 91    def update(
 92        self,
 93        observation: Observation,
 94        reward: float,
 95        last_action: Action | None,
 96    ) -> MinimalSubjectiveState:
 97        self._state = MinimalSubjectiveState(
 98            step_index=observation,
 99            observation=observation,
100            reward=reward,
101            last_action=last_action,
102        )
103        return self._state
104
105    def current_subjective_state(self) -> MinimalSubjectiveState:
106        return self._state

Direct observation-to-state mapping.

def reset(self) -> 'None':
88    def reset(self) -> None:
89        self._state = MinimalSubjectiveState(0, 0, 0.0, None)
def update( self, observation: 'Observation', reward: 'float', last_action: 'Action | None') -> 'MinimalSubjectiveState':
 91    def update(
 92        self,
 93        observation: Observation,
 94        reward: float,
 95        last_action: Action | None,
 96    ) -> MinimalSubjectiveState:
 97        self._state = MinimalSubjectiveState(
 98            step_index=observation,
 99            observation=observation,
100            reward=reward,
101            last_action=last_action,
102        )
103        return self._state
def current_subjective_state(self) -> 'MinimalSubjectiveState':
105    def current_subjective_state(self) -> MinimalSubjectiveState:
106        return self._state
class MinimalFeatureBank(oak_architecture.fine_grained.components.FeatureBank[examples.minimal_oak.MinimalSubjectiveState]):
109class MinimalFeatureBank(FeatureBank[MinimalSubjectiveState]):
110    """Stores one fixed identity feature."""
111
112    def __init__(self) -> None:
113        self._features: dict[FeatureId, FeatureSpec] = {
114            "observation": FeatureSpec(
115                feature_id="observation",
116                name="Observation value",
117                description="Identity feature for the integer observation.",
118            )
119        }
120
121    def list_features(self) -> Sequence[FeatureSpec]:
122        return tuple(self._features.values())
123
124    def activations(
125        self,
126        subjective_state: MinimalSubjectiveState,
127    ) -> Mapping[FeatureId, float]:
128        return {"observation": float(subjective_state.observation)}
129
130    def add_candidates(
131        self, candidates: Sequence[FeatureCandidate]
132    ) -> Sequence[FeatureSpec]:
133        added: list[FeatureSpec] = []
134        for candidate in candidates:
135            feature = FeatureSpec(
136                feature_id=candidate.feature_id,
137                name=candidate.name,
138                description=candidate.description,
139                metadata=candidate.metadata,
140            )
141            self._features[feature.feature_id] = feature
142            added.append(feature)
143        return tuple(added)
144
145    def remove(self, feature_ids: Sequence[FeatureId]) -> None:
146        for feature_id in feature_ids:
147            self._features.pop(feature_id, None)

Stores one fixed identity feature.

def list_features(self) -> 'Sequence[FeatureSpec]':
121    def list_features(self) -> Sequence[FeatureSpec]:
122        return tuple(self._features.values())
def activations( self, subjective_state: 'MinimalSubjectiveState') -> 'Mapping[FeatureId, float]':
124    def activations(
125        self,
126        subjective_state: MinimalSubjectiveState,
127    ) -> Mapping[FeatureId, float]:
128        return {"observation": float(subjective_state.observation)}

Return per-feature activation values for the given state.

Intended for SubtaskGenerator implementations, which receive the FeatureBank and may use activations to decide which features warrant new subtasks.

def add_candidates( self, candidates: 'Sequence[FeatureCandidate]') -> 'Sequence[FeatureSpec]':
130    def add_candidates(
131        self, candidates: Sequence[FeatureCandidate]
132    ) -> Sequence[FeatureSpec]:
133        added: list[FeatureSpec] = []
134        for candidate in candidates:
135            feature = FeatureSpec(
136                feature_id=candidate.feature_id,
137                name=candidate.name,
138                description=candidate.description,
139                metadata=candidate.metadata,
140            )
141            self._features[feature.feature_id] = feature
142            added.append(feature)
143        return tuple(added)
def remove(self, feature_ids: 'Sequence[FeatureId]') -> 'None':
145    def remove(self, feature_ids: Sequence[FeatureId]) -> None:
146        for feature_id in feature_ids:
147            self._features.pop(feature_id, None)
class MinimalFeatureConstructor(oak_architecture.fine_grained.components.FeatureConstructor[examples.minimal_oak.MinimalSubjectiveState]):
150class MinimalFeatureConstructor(FeatureConstructor[MinimalSubjectiveState]):
151    """Never proposes new features."""
152
153    def propose(
154        self,
155        subjective_state: MinimalSubjectiveState,
156        active_features: Sequence[FeatureSpec],
157    ) -> Sequence[FeatureCandidate]:
158        return ()

Never proposes new features.

def propose( self, subjective_state: 'MinimalSubjectiveState', active_features: 'Sequence[FeatureSpec]') -> 'Sequence[FeatureCandidate]':
153    def propose(
154        self,
155        subjective_state: MinimalSubjectiveState,
156        active_features: Sequence[FeatureSpec],
157    ) -> Sequence[FeatureCandidate]:
158        return ()
class MinimalFeatureRanker(oak_architecture.fine_grained.components.FeatureRanker):
161class MinimalFeatureRanker(FeatureRanker):
162    """Ranks features in their existing order."""
163
164    def rank(
165        self,
166        features: Sequence[FeatureSpec],
167        utilities: Sequence[UtilityRecord],
168        limit: int | None = None,
169    ) -> Sequence[FeatureId]:
170        feature_ids = [feature.feature_id for feature in features]
171        if limit is None:
172            return tuple(feature_ids)
173        return tuple(feature_ids[:limit])

Ranks features in their existing order.

def rank( self, features: 'Sequence[FeatureSpec]', utilities: 'Sequence[UtilityRecord]', limit: 'int | None' = None) -> 'Sequence[FeatureId]':
164    def rank(
165        self,
166        features: Sequence[FeatureSpec],
167        utilities: Sequence[UtilityRecord],
168        limit: int | None = None,
169    ) -> Sequence[FeatureId]:
170        feature_ids = [feature.feature_id for feature in features]
171        if limit is None:
172            return tuple(feature_ids)
173        return tuple(feature_ids[:limit])
class MinimalSubtaskGenerator(oak_architecture.fine_grained.components.SubtaskGenerator[examples.minimal_oak.MinimalSubjectiveState]):
176class MinimalSubtaskGenerator(SubtaskGenerator[MinimalSubjectiveState]):
177    """Creates at most one subtask per feature."""
178
179    def __init__(self) -> None:
180        self._created_subtask_for: set[FeatureId] = set()
181
182    def generate(
183        self,
184        ranked_feature_ids: Sequence[FeatureId],
185        feature_bank: FeatureBank[MinimalSubjectiveState],
186    ) -> Sequence[SubtaskSpec]:
187        created: list[SubtaskSpec] = []
188        feature_specs = {
189            feature.feature_id: feature for feature in feature_bank.list_features()
190        }
191        for feature_id in ranked_feature_ids:
192            if feature_id in self._created_subtask_for:
193                continue
194            self._created_subtask_for.add(feature_id)
195            feature = feature_specs[feature_id]
196            created.append(
197                SubtaskSpec(
198                    subtask_id=f"subtask:{feature_id}",
199                    name=f"Track {feature.name}",
200                    feature_id=feature_id,
201                )
202            )
203        return tuple(created)

Creates at most one subtask per feature.

def generate( self, ranked_feature_ids: 'Sequence[FeatureId]', feature_bank: 'FeatureBank[MinimalSubjectiveState]') -> 'Sequence[SubtaskSpec]':
182    def generate(
183        self,
184        ranked_feature_ids: Sequence[FeatureId],
185        feature_bank: FeatureBank[MinimalSubjectiveState],
186    ) -> Sequence[SubtaskSpec]:
187        created: list[SubtaskSpec] = []
188        feature_specs = {
189            feature.feature_id: feature for feature in feature_bank.list_features()
190        }
191        for feature_id in ranked_feature_ids:
192            if feature_id in self._created_subtask_for:
193                continue
194            self._created_subtask_for.add(feature_id)
195            feature = feature_specs[feature_id]
196            created.append(
197                SubtaskSpec(
198                    subtask_id=f"subtask:{feature_id}",
199                    name=f"Track {feature.name}",
200                    feature_id=feature_id,
201                )
202            )
203        return tuple(created)
class MinimalWorldModel(oak_architecture.fine_grained.components.WorldModel[examples.minimal_oak.MinimalSubjectiveState, int, examples.minimal_oak.MinimalInfo]):
211class MinimalWorldModel(WorldModel[MinimalSubjectiveState, Action, MinimalInfo]):
212    """Trivial planner-facing model."""
213
214    def update(
215        self,
216        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
217    ) -> None:
218        pass
219
220    def predict_action(
221        self,
222        subjective_state: MinimalSubjectiveState,
223        action: Action,
224    ) -> ModelPrediction[MinimalSubjectiveState]:
225        return ModelPrediction(
226            predicted_subjective_state=subjective_state,
227            cumulative_reward=0.0,
228            steps=1,
229        )
230
231    def predict_option(
232        self,
233        subjective_state: MinimalSubjectiveState,
234        option_id: OptionId,
235    ) -> ModelPrediction[MinimalSubjectiveState]:
236        return ModelPrediction(
237            predicted_subjective_state=subjective_state,
238            cumulative_reward=0.0,
239            steps=1,
240        )
241
242    def add_or_replace_option_models(
243        self, models: Sequence[OptionModel[MinimalSubjectiveState]]
244    ) -> None:
245        pass
246
247    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
248        pass

Trivial planner-facing model.

def update( self, transition: 'Transition[Action, MinimalSubjectiveState, MinimalInfo]') -> 'None':
214    def update(
215        self,
216        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
217    ) -> None:
218        pass
def predict_action( self, subjective_state: 'MinimalSubjectiveState', action: 'Action') -> 'ModelPrediction[MinimalSubjectiveState]':
220    def predict_action(
221        self,
222        subjective_state: MinimalSubjectiveState,
223        action: Action,
224    ) -> ModelPrediction[MinimalSubjectiveState]:
225        return ModelPrediction(
226            predicted_subjective_state=subjective_state,
227            cumulative_reward=0.0,
228            steps=1,
229        )
def predict_option( self, subjective_state: 'MinimalSubjectiveState', option_id: 'OptionId') -> 'ModelPrediction[MinimalSubjectiveState]':
231    def predict_option(
232        self,
233        subjective_state: MinimalSubjectiveState,
234        option_id: OptionId,
235    ) -> ModelPrediction[MinimalSubjectiveState]:
236        return ModelPrediction(
237            predicted_subjective_state=subjective_state,
238            cumulative_reward=0.0,
239            steps=1,
240        )
def add_or_replace_option_models(self, models: 'Sequence[OptionModel[MinimalSubjectiveState]]') -> 'None':
242    def add_or_replace_option_models(
243        self, models: Sequence[OptionModel[MinimalSubjectiveState]]
244    ) -> None:
245        pass
def remove_option_models(self, option_ids: 'Sequence[OptionId]') -> 'None':
247    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
248        pass
class MinimalOptionModelLearner(oak_architecture.fine_grained.components.OptionModelLearner[examples.minimal_oak.MinimalSubjectiveState, int, examples.minimal_oak.MinimalInfo]):
251class MinimalOptionModelLearner(
252    OptionModelLearner[MinimalSubjectiveState, Action, MinimalInfo]
253):
254    """No-op option-model learner."""
255
256    def update(
257        self,
258        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
259    ) -> None:
260        pass
261
262    def export_models(self) -> Sequence[OptionModel[MinimalSubjectiveState]]:
263        return ()

No-op option-model learner.

def update( self, transition: 'Transition[Action, MinimalSubjectiveState, MinimalInfo]') -> 'None':
256    def update(
257        self,
258        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
259    ) -> None:
260        pass
def export_models(self) -> 'Sequence[OptionModel[MinimalSubjectiveState]]':
262    def export_models(self) -> Sequence[OptionModel[MinimalSubjectiveState]]:
263        return ()
class MinimalPlanner(oak_architecture.fine_grained.components.Planner[examples.minimal_oak.MinimalSubjectiveState, int, examples.minimal_oak.MinimalInfo]):
class MinimalPlanner(Planner[MinimalSubjectiveState, Action, MinimalInfo]):
    """Planner stub that echoes current value predictions without searching."""

    def plan_step(
        self,
        subjective_state: MinimalSubjectiveState,
        model: WorldModel[MinimalSubjectiveState, Action, MinimalInfo],
        value_function: ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo],
        budget: int,
    ) -> PlanningUpdate[Action]:
        """Build a planning update from the value function's own prediction.

        ``model`` is not consulted. The value targets are whatever
        ``value_function.predict`` returns for ``subjective_state``, the
        policy target is always ``{"preferred_action": 0}``, and the whole
        ``budget`` is reported as used.
        """
        current_values = value_function.predict(subjective_state)
        fixed_policy_targets = {"preferred_action": 0}
        statistics = {"budget_used": budget}
        return PlanningUpdate(
            value_targets=current_values,
            policy_targets=fixed_policy_targets,
            search_statistics=statistics,
        )

Returns one-step value targets without real search.

def plan_step( self, subjective_state: 'MinimalSubjectiveState', model: 'WorldModel[MinimalSubjectiveState, Action, MinimalInfo]', value_function: 'ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo]', budget: 'int') -> 'PlanningUpdate[Action]':
269    def plan_step(
270        self,
271        subjective_state: MinimalSubjectiveState,
272        model: WorldModel[MinimalSubjectiveState, Action, MinimalInfo],
273        value_function: ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo],
274        budget: int,
275    ) -> PlanningUpdate[Action]:
276        return PlanningUpdate(
277            value_targets=value_function.predict(subjective_state),
278            policy_targets={"preferred_action": 0},
279            search_statistics={"budget_used": budget},
280        )
class MinimalValueEstimator(oak_architecture.fine_grained.components.ValueEstimator[examples.minimal_oak.MinimalSubjectiveState, int, examples.minimal_oak.MinimalInfo]):
class MinimalValueEstimator(
    ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Value estimator whose single estimate is the most recent reward.

    The estimate is published under the key ``"main"``. No learners are
    managed: listing returns nothing, and adding or removing learners has
    no effect.
    """

    def __init__(self) -> None:
        # Reward of the latest transition passed to ``update``; 0.0 until then.
        self._value: float = 0.0

    def list_general_value_functions(
        self,
    ) -> Sequence[
        GeneralValueFunctionLearner[MinimalSubjectiveState, Action, MinimalInfo]
    ]:
        """Return an empty tuple; this estimator manages no learners."""
        no_learners = ()
        return no_learners

    def predict(
        self,
        subjective_state: MinimalSubjectiveState,
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Return the stored value under ``"main"``; the state is ignored."""
        estimates = {"main": self._value}
        return estimates

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Remember the transition's reward and report 0.0 for ``"main"``."""
        latest_reward = transition.reward
        self._value = latest_reward
        return {"main": 0.0}

    def add_or_replace(
        self,
        learner: GeneralValueFunctionLearner[
            MinimalSubjectiveState, Action, MinimalInfo
        ],
    ) -> None:
        """Ignore the learner; nothing is registered."""
        return None

    def remove(
        self,
        general_value_function_ids: Sequence[GeneralValueFunctionId],
    ) -> None:
        """Ignore the removal request; there is nothing to remove."""
        return None

Stores latest reward as the only value estimate.

def list_general_value_functions( self) -> 'Sequence[GeneralValueFunctionLearner[MinimalSubjectiveState, Action, MinimalInfo]]':
296    def list_general_value_functions(
297        self,
298    ) -> Sequence[
299        GeneralValueFunctionLearner[MinimalSubjectiveState, Action, MinimalInfo]
300    ]:
301        return ()

Return all managed GVF learners.

Intended for Planner implementations that need to inspect the GVF bank (e.g., to evaluate auxiliary predictions during planning).

def predict( self, subjective_state: 'MinimalSubjectiveState') -> 'Mapping[GeneralValueFunctionId, float]':
303    def predict(
304        self,
305        subjective_state: MinimalSubjectiveState,
306    ) -> Mapping[GeneralValueFunctionId, float]:
307        return {"main": self._value}
def update( self, transition: 'Transition[Action, MinimalSubjectiveState, MinimalInfo]') -> 'Mapping[GeneralValueFunctionId, float]':
309    def update(
310        self,
311        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
312    ) -> Mapping[GeneralValueFunctionId, float]:
313        self._value = transition.reward
314        return {"main": 0.0}
def add_or_replace( self, learner: 'GeneralValueFunctionLearner[MinimalSubjectiveState, Action, MinimalInfo]') -> 'None':
316    def add_or_replace(
317        self,
318        learner: GeneralValueFunctionLearner[
319            MinimalSubjectiveState, Action, MinimalInfo
320        ],
321    ) -> None:
322        pass

Add or replace a GVF learner in the bank.

Used for dynamic GVF management, e.g., creating new GVFs when new subtasks or options are discovered.

def remove( self, general_value_function_ids: 'Sequence[GeneralValueFunctionId]') -> 'None':
324    def remove(
325        self,
326        general_value_function_ids: Sequence[GeneralValueFunctionId],
327    ) -> None:
328        pass
class MinimalUtilityAssessor(oak_architecture.fine_grained.components.UtilityAssessor):
class MinimalUtilityAssessor(UtilityAssessor):
    """Sums observed usage amounts per (kind, component) pair."""

    def __init__(self) -> None:
        # Every usage record ever observed, in arrival order.
        self._usage_records: list[UsageRecord] = []

    def observe(self, usage: Sequence[UsageRecord]) -> None:
        """Append the given usage records to the retained history."""
        history = self._usage_records
        history.extend(usage)

    def scores(self) -> Sequence[UtilityRecord]:
        """Return one utility record per distinct (kind, component) pair.

        Each record's utility is the sum of ``amount`` over all retained
        usage records sharing that pair. Its ``kind`` and ``component_id``
        come from the most recent such record. Results are ordered by the
        first appearance of each pair.
        """
        # Maps each pair to (latest record, running total). Re-assigning an
        # existing key keeps its original position in the dict.
        aggregated: dict[tuple[str, str], tuple[UsageRecord, float]] = {}
        for usage_record in self._usage_records:
            bucket = (usage_record.kind.value, usage_record.component_id)
            _, running_total = aggregated.get(bucket, (usage_record, 0.0))
            aggregated[bucket] = (
                usage_record,
                running_total + usage_record.amount,
            )
        return tuple(
            UtilityRecord(
                kind=last_seen.kind,
                component_id=last_seen.component_id,
                utility=total,
            )
            for last_seen, total in aggregated.values()
        )

Aggregates usage records into simple counts.

def observe(self, usage: 'Sequence[UsageRecord]') -> 'None':
337    def observe(self, usage: Sequence[UsageRecord]) -> None:
338        self._usage_records.extend(usage)
def scores(self) -> 'Sequence[UtilityRecord]':
340    def scores(self) -> Sequence[UtilityRecord]:
341        totals: dict[tuple[str, str], float] = {}
342        latest: dict[tuple[str, str], UsageRecord] = {}
343        for record in self._usage_records:
344            key = (record.kind.value, record.component_id)
345            totals[key] = totals.get(key, 0.0) + record.amount
346            latest[key] = record
347        return tuple(
348            UtilityRecord(
349                kind=record.kind,
350                component_id=record.component_id,
351                utility=totals[key],
352            )
353            for key, record in latest.items()
354        )
class MinimalCurator(oak_architecture.fine_grained.components.Curator):
class MinimalCurator(Curator):
    """Curator that always returns a default curation decision."""

    def curate(self, utilities: Sequence[UtilityRecord]) -> CurationDecision:
        """Return a default-constructed decision; ``utilities`` is ignored."""
        decision = CurationDecision()
        return decision

Never prunes.

def curate(self, utilities: 'Sequence[UtilityRecord]') -> 'CurationDecision':
360    def curate(self, utilities: Sequence[UtilityRecord]) -> CurationDecision:
361        return CurationDecision()
@dataclass
class MinimalOption(oak_architecture.fine_grained.components.Option[examples.minimal_oak.MinimalSubjectiveState, int]):
@dataclass
class MinimalOption(Option[MinimalSubjectiveState, Action]):
    """Option that is always available, emits one fixed action, then stops.

    The emitted action defaults to 1 and the stop probability is always 1.0.
    """

    # Metadata exposed through the ``descriptor`` property.
    _descriptor: OptionDescriptor
    # Action returned by ``act`` regardless of the state.
    _action: Action = 1

    @property
    def descriptor(self) -> OptionDescriptor:
        """The descriptor supplied at construction."""
        return self._descriptor

    def is_available(self, subjective_state: MinimalSubjectiveState) -> bool:
        """Return True for every state."""
        return True

    def act(self, subjective_state: MinimalSubjectiveState) -> Action:
        """Return the configured action; the state is ignored."""
        chosen_action = self._action
        return chosen_action

    def stop_probability(self, subjective_state: MinimalSubjectiveState) -> float:
        """Return 1.0 for every state."""
        return 1.0

Trivial option that always emits action=1 and stops immediately.

MinimalOption(_descriptor: 'OptionDescriptor', _action: 'Action' = 1)
descriptor: 'OptionDescriptor'
376    @property
377    def descriptor(self) -> OptionDescriptor:
378        return self._descriptor
def is_available(self, subjective_state: 'MinimalSubjectiveState') -> 'bool':
380    def is_available(self, subjective_state: MinimalSubjectiveState) -> bool:
381        return True

Whether this option can be initiated in the given state.

Intended for ActionSelector implementations, which receive available options and may filter by initiation conditions.

def act(self, subjective_state: 'MinimalSubjectiveState') -> 'Action':
383    def act(self, subjective_state: MinimalSubjectiveState) -> Action:
384        return self._action
def stop_probability(self, subjective_state: 'MinimalSubjectiveState') -> 'float':
386    def stop_probability(self, subjective_state: MinimalSubjectiveState) -> float:
387        return 1.0
class MinimalActionSelector(oak_architecture.fine_grained.components.ActionSelector[examples.minimal_oak.MinimalSubjectiveState, int]):
class MinimalActionSelector(ActionSelector[MinimalSubjectiveState, Action]):
    """Picks action 0 on even observations, otherwise an option or action 1.

    The selector does not learn. It only records the most recent value and
    planning feedback it was given.
    """

    def __init__(self) -> None:
        # Copy of the mapping most recently passed to ``update_from_values``.
        self.last_td_errors: Mapping[GeneralValueFunctionId, float] = {}
        # Most recent update passed to ``apply_planning_update``, if any.
        self.last_planning_update: PlanningUpdate[Action] | None = None

    def decide(
        self,
        subjective_state: MinimalSubjectiveState,
        active_option: Option[MinimalSubjectiveState, Action] | None,
        available_options: Sequence[Option[MinimalSubjectiveState, Action]],
    ) -> PolicyDecision[Action]:
        """Choose the next primitive action or option.

        An even observation yields action 0. An odd observation yields the
        first available option, or action 1 when there are no options.
        ``active_option`` is not consulted.
        """
        observation_is_even = subjective_state.observation % 2 == 0
        if observation_is_even:
            return PolicyDecision(action=0)
        if not available_options:
            return PolicyDecision(action=1)
        first_option = available_options[0]
        return PolicyDecision(option_id=first_option.descriptor.option_id)

    def update_from_values(
        self,
        subjective_state: MinimalSubjectiveState,
        td_errors: Mapping[GeneralValueFunctionId, float],
    ) -> None:
        """Store a dict copy of ``td_errors``; the state is ignored."""
        snapshot = dict(td_errors)
        self.last_td_errors = snapshot

    def apply_planning_update(self, update: PlanningUpdate[Action]) -> None:
        """Keep a reference to the latest planning update."""
        self.last_planning_update = update

Alternates primitive actions and option selection.

last_td_errors: 'Mapping[GeneralValueFunctionId, float]'
last_planning_update: 'PlanningUpdate[Action] | None'
def decide( self, subjective_state: 'MinimalSubjectiveState', active_option: 'Option[MinimalSubjectiveState, Action] | None', available_options: 'Sequence[Option[MinimalSubjectiveState, Action]]') -> 'PolicyDecision[Action]':
397    def decide(
398        self,
399        subjective_state: MinimalSubjectiveState,
400        active_option: Option[MinimalSubjectiveState, Action] | None,
401        available_options: Sequence[Option[MinimalSubjectiveState, Action]],
402    ) -> PolicyDecision[Action]:
403        if subjective_state.observation % 2 == 0:
404            return PolicyDecision(action=0)
405        if available_options:
406            return PolicyDecision(option_id=available_options[0].descriptor.option_id)
407        return PolicyDecision(action=1)
def update_from_values( self, subjective_state: 'MinimalSubjectiveState', td_errors: 'Mapping[GeneralValueFunctionId, float]') -> 'None':
409    def update_from_values(
410        self,
411        subjective_state: MinimalSubjectiveState,
412        td_errors: Mapping[GeneralValueFunctionId, float],
413    ) -> None:
414        self.last_td_errors = dict(td_errors)
def apply_planning_update(self, update: 'PlanningUpdate[Action]') -> 'None':
416    def apply_planning_update(self, update: PlanningUpdate[Action]) -> None:
417        self.last_planning_update = update
class MinimalOptionLibrary(oak_architecture.fine_grained.components.OptionLibrary[examples.minimal_oak.MinimalSubjectiveState, int]):
class MinimalOptionLibrary(OptionLibrary[MinimalSubjectiveState, Action]):
    """In-memory registry of options keyed by option id."""

    def __init__(self) -> None:
        # Registered options, keyed by ``descriptor.option_id``.
        self._options: dict[OptionId, Option[MinimalSubjectiveState, Action]] = {}

    def list_options(self) -> Sequence[Option[MinimalSubjectiveState, Action]]:
        """Return the registered options as a tuple, in insertion order."""
        registered = self._options.values()
        return tuple(registered)

    def get(self, option_id: OptionId) -> Option[MinimalSubjectiveState, Action]:
        """Return the option registered under ``option_id``.

        Raises:
            KeyError: if no option is registered under that id.
        """
        registry = self._options
        return registry[option_id]

    def add_or_replace(self, option: Option[MinimalSubjectiveState, Action]) -> None:
        """Register ``option`` under its own id, replacing any earlier entry."""
        key = option.descriptor.option_id
        self._options[key] = option

    def remove(self, option_ids: Sequence[OptionId]) -> None:
        """Unregister the given ids; ids that are not registered are skipped."""
        registry = self._options
        for stale_id in option_ids:
            registry.pop(stale_id, None)

Stores learned options.

def list_options(self) -> 'Sequence[Option[MinimalSubjectiveState, Action]]':
426    def list_options(self) -> Sequence[Option[MinimalSubjectiveState, Action]]:
427        return tuple(self._options.values())
def get(self, option_id: 'OptionId') -> 'Option[MinimalSubjectiveState, Action]':
429    def get(self, option_id: OptionId) -> Option[MinimalSubjectiveState, Action]:
430        return self._options[option_id]
def add_or_replace(self, option: 'Option[MinimalSubjectiveState, Action]') -> 'None':
432    def add_or_replace(self, option: Option[MinimalSubjectiveState, Action]) -> None:
433        self._options[option.descriptor.option_id] = option
def remove(self, option_ids: 'Sequence[OptionId]') -> 'None':
435    def remove(self, option_ids: Sequence[OptionId]) -> None:
436        for option_id in option_ids:
437            self._options.pop(option_id, None)
class MinimalOptionLearner(oak_architecture.fine_grained.components.OptionLearner[examples.minimal_oak.MinimalSubjectiveState, int, examples.minimal_oak.MinimalInfo]):
class MinimalOptionLearner(OptionLearner[MinimalSubjectiveState, Action, MinimalInfo]):
    """Keeps one ``MinimalOption`` per ingested subtask; learns nothing."""

    def __init__(self) -> None:
        # Ingested subtasks, keyed by subtask id.
        self._subtasks: dict[SubtaskId, SubtaskSpec] = {}
        # One option per subtask, keyed by "option:<subtask id>".
        self._options: dict[OptionId, MinimalOption] = {}

    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
        """Record each subtask and create (or recreate) its option.

        The option id is ``"option:<subtask id>"``. Ingesting a subtask id
        again overwrites both the stored subtask and its option.
        """
        for subtask in subtasks:
            subtask_id = subtask.subtask_id
            self._subtasks[subtask_id] = subtask
            option_id = f"option:{subtask_id}"
            descriptor = OptionDescriptor(
                option_id=option_id,
                name=f"Option for {subtask_id}",
                subtask_id=subtask_id,
            )
            self._options[option_id] = MinimalOption(descriptor)

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Ignore the transition; options are never refined."""
        return None

    def export_options(self) -> Sequence[Option[MinimalSubjectiveState, Action]]:
        """Return all current options as a tuple, in insertion order."""
        current_options = self._options.values()
        return tuple(current_options)

    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
        """Drop the given subtasks and their options; unknown ids are skipped."""
        for subtask_id in subtask_ids:
            option_id = f"option:{subtask_id}"
            self._subtasks.pop(subtask_id, None)
            self._options.pop(option_id, None)

Creates one trivial option per subtask.

def ingest_subtasks(self, subtasks: 'Sequence[SubtaskSpec]') -> 'None':
447    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
448        for subtask in subtasks:
449            self._subtasks[subtask.subtask_id] = subtask
450            option_id = f"option:{subtask.subtask_id}"
451            self._options[option_id] = MinimalOption(
452                OptionDescriptor(
453                    option_id=option_id,
454                    name=f"Option for {subtask.subtask_id}",
455                    subtask_id=subtask.subtask_id,
456                )
457            )
def update( self, transition: 'Transition[Action, MinimalSubjectiveState, MinimalInfo]') -> 'None':
459    def update(
460        self,
461        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
462    ) -> None:
463        pass
def export_options(self) -> 'Sequence[Option[MinimalSubjectiveState, Action]]':
465    def export_options(self) -> Sequence[Option[MinimalSubjectiveState, Action]]:
466        return tuple(self._options.values())
def remove_subtasks(self, subtask_ids: 'Sequence[SubtaskId]') -> 'None':
468    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
469        for subtask_id in subtask_ids:
470            self._subtasks.pop(subtask_id, None)
471            self._options.pop(f"option:{subtask_id}", None)
def build_minimal_agent() -> 'OaKAgent[Observation, Action, MinimalSubjectiveState, MinimalInfo]':
def build_minimal_agent() -> (
    OaKAgent[Observation, Action, MinimalSubjectiveState, MinimalInfo]
):
    """Assemble an ``OaKAgent`` from the minimal fine-grained components.

    Each of the four composite interfaces is built from freshly constructed
    minimal components. The agent is created with a planning budget of 4.
    """
    # Perception: state building plus feature and subtask handling.
    perception_stack = CompositePerception(
        state_builder=MinimalStateBuilder(),
        feature_bank=MinimalFeatureBank(),
        feature_constructor=MinimalFeatureConstructor(),
        feature_ranker=MinimalFeatureRanker(),
        subtask_generator=MinimalSubtaskGenerator(),
    )
    # Transition model: world model, option-model learner, and planner.
    model_stack = CompositeTransitionModel(
        world_model=MinimalWorldModel(),
        option_model_learner=MinimalOptionModelLearner(),
        planner=MinimalPlanner(),
    )
    # Value function: estimator, utility assessor, and curator.
    value_stack = CompositeValueFunction(
        value_estimator=MinimalValueEstimator(),
        utility_assessor=MinimalUtilityAssessor(),
        curator=MinimalCurator(),
    )
    # Reactive policy: selector, option library, and option learner.
    policy_stack = CompositeReactivePolicy(
        action_selector=MinimalActionSelector(),
        option_library=MinimalOptionLibrary(),
        option_learner=MinimalOptionLearner(),
    )
    return OaKAgent(
        perception=perception_stack,
        transition_model=model_stack,
        value_function=value_stack,
        reactive_policy=policy_stack,
        planning_budget=4,
    )

Construct a fully wired fine-grained smoke-test OaK agent.

def run_minimal_episode(horizon: 'int' = 5) -> 'list[MinimalTraceStep]':
def run_minimal_episode(horizon: int = 5) -> list[MinimalTraceStep]:
    """Drive a fresh agent through a fresh world and record each step.

    The loop runs at most ``horizon`` agent steps and stops early as soon as
    the world reports termination. Each trace entry holds the subjective
    state, the chosen action, the active option id, the ids of newly created
    subtasks, and the planning budget used (``None`` when the step produced
    no planning update).
    """
    world = MinimalWorld(horizon=horizon)
    agent = build_minimal_agent()
    step = world.reset()
    agent.reset()

    trace: list[MinimalTraceStep] = []

    for _ in range(horizon):
        result = agent.step(step)
        action = result.action

        new_subtask_ids = [created.subtask_id for created in result.created_subtasks]
        if result.planning_update is not None:
            budget_used = int(result.planning_update.search_statistics["budget_used"])
        else:
            budget_used = None

        entry = {
            "subjective_state": result.subjective_state,
            "action": action,
            "active_option_id": result.active_option_id,
            "created_subtasks": new_subtask_ids,
            "planning_budget_used": budget_used,
        }
        trace.append(entry)

        # Advance the world with the chosen action; stop once it terminates.
        step = world.step(action)
        if step.terminated:
            break

    return trace

Run a short smoke episode and return a compact trace.