examples.minimal_oak

  1from __future__ import annotations
  2
  3"""Bare-minimum external implementation used to smoke-test the interface.
  4
  5This module answers a single question: can the current package interfaces be
  6instantiated and run through a complete OaK step loop?
  7
  8The implementation shows the **direct** approach: each of Sutton's four
  9modules (Perception, Transition Model, Value Function, Reactive Policy) is
 10implemented as a single class.  There is no need to use the fine-grained
 11component interfaces or the composite wrappers — those exist for projects
 12that need more modularity inside each module.
 13
 14What this module is:
 15
 16- a tiny integer world
 17- a direct observation-to-subjective_state perception with one fixed feature
 18- a no-op transition model with trivial one-step planning
 19- a simple value tracker with usage counting and no curation
 20- a reactive policy that alternates actions and options
 21
 22What this module is not:
 23
 24- a trained agent
 25- a realistic planner
 26- a serious option-learning system
 27- a benchmark implementation
 28"""
 29
 30from dataclasses import dataclass
 31from typing import Mapping, Sequence, TypedDict
 32
 33from oak_architecture.agent import OaKAgent
 34from oak_architecture.interfaces import (
 35    Perception,
 36    ReactivePolicy,
 37    TransitionModel,
 38    ValueFunction,
 39    World,
 40)
 41from oak_architecture.types import (
 42    CurationDecision,
 43    FeatureId,
 44    FeatureSpec,
 45    GeneralValueFunctionId,
 46    OptionDescriptor,
 47    OptionId,
 48    PlanningUpdate,
 49    SubtaskId,
 50    SubtaskSpec,
 51    TimeStep,
 52    Transition,
 53    UsageRecord,
 54    UtilityRecord,
 55)
 56
# Type aliases for the toy world: both observations and actions are plain ints.
Observation = int
Action = int
 59
 60
class MinimalInfo(TypedDict, total=False):
    """Diagnostic info emitted by :class:`MinimalWorld`.

    ``total=False``: each key may be absent — ``reset`` appears only in the
    time step returned by ``reset()``, ``echo_action`` only in steps.
    """

    reset: bool  # True in the initial TimeStep produced by reset()
    echo_action: Action  # the action that produced this TimeStep
 64
 65
class MinimalTraceStep(TypedDict):
    """One entry of the trace returned by ``run_minimal_episode``."""

    subjective_state: "MinimalSubjectiveState"  # state seen by the agent this step
    action: Action  # primitive action actually taken
    active_option_id: OptionId | None  # option in control, if any
    created_subtasks: list[SubtaskId]  # subtasks created during this step
    planning_budget_used: int | None  # from planning search stats, if planning ran
 72
 73
@dataclass(slots=True, frozen=True)
class MinimalSubjectiveState:
    """Small concrete subjective state used by the smoke implementation."""

    step_index: int  # mirrors the observation (MinimalPerception sets both equal)
    observation: Observation  # raw integer observation from the world
    reward: float  # reward that accompanied this observation
    last_action: Action | None  # None before the first action of an episode
 82
 83
 84# ─────────────────────────────────────────────────────────────────────
 85# Environment
 86# ─────────────────────────────────────────────────────────────────────
 87
 88
 89class MinimalWorld(World[Observation, Action, MinimalInfo]):
 90    """A toy world that increments an integer observation every step."""
 91
 92    def __init__(self, horizon: int = 5) -> None:
 93        self.horizon = horizon
 94        self.current_step = 0
 95
 96    def reset(self) -> TimeStep[Observation, MinimalInfo]:
 97        self.current_step = 0
 98        return TimeStep(observation=0, reward=0.0, info={"reset": True})
 99
100    def step(self, action: Action) -> TimeStep[Observation, MinimalInfo]:
101        self.current_step += 1
102        terminated = self.current_step >= self.horizon
103        reward = 1.0 if action == 1 else 0.0
104        return TimeStep(
105            observation=self.current_step,
106            reward=reward,
107            terminated=terminated,
108            info={"echo_action": action},
109        )
110
111
112# ─────────────────────────────────────────────────────────────────────
113# Perception
114# ─────────────────────────────────────────────────────────────────────
115
116
class MinimalPerception(Perception[Observation, Action, MinimalSubjectiveState]):
    """Direct observation-to-state mapping with one fixed feature.

    - The subjective state is a thin wrapper around the observation.
    - One identity feature ("observation") is always present.
    - No new features are ever proposed.
    - One subtask is created per feature (deduplicated).
    """

    def __init__(self) -> None:
        self._state = MinimalSubjectiveState(0, 0, 0.0, None)
        identity_feature = FeatureSpec(
            feature_id="observation",
            name="Observation value",
            description="Identity feature for the integer observation.",
        )
        self._features: dict[FeatureId, FeatureSpec] = {
            "observation": identity_feature
        }
        # Feature IDs that already received a subtask (prevents duplicates).
        self._created_subtask_for: set[FeatureId] = set()

    def reset(self) -> None:
        """Drop per-episode state back to the initial subjective state."""
        self._state = MinimalSubjectiveState(0, 0, 0.0, None)

    def update(
        self,
        observation: Observation,
        reward: float,
        last_action: Action | None,
    ) -> MinimalSubjectiveState:
        """Wrap the raw observation into a fresh subjective state."""
        new_state = MinimalSubjectiveState(
            step_index=observation,
            observation=observation,
            reward=reward,
            last_action=last_action,
        )
        self._state = new_state
        return new_state

    def current_subjective_state(self) -> MinimalSubjectiveState:
        """Return the most recently computed subjective state."""
        return self._state

    def discover_and_rank_features(
        self,
        subjective_state: MinimalSubjectiveState,
        utility_scores: Sequence[UtilityRecord],
        feature_budget: int,
    ) -> Sequence[FeatureId]:
        # Nothing new is ever proposed; keep insertion order and truncate
        # to the requested budget.
        return tuple(self._features)[:feature_budget]

    def generate_subtasks(
        self,
        ranked_feature_ids: Sequence[FeatureId],
    ) -> Sequence[SubtaskSpec]:
        """Emit exactly one subtask per not-yet-seen feature ID."""
        new_specs: list[SubtaskSpec] = []
        for feature_id in ranked_feature_ids:
            if feature_id in self._created_subtask_for:
                continue  # this feature already has its subtask
            self._created_subtask_for.add(feature_id)
            new_specs.append(
                SubtaskSpec(
                    subtask_id=f"subtask:{feature_id}",
                    name=f"Track {feature_id}",
                    feature_id=feature_id,
                )
            )
        return tuple(new_specs)

    def list_features(self) -> Sequence[FeatureSpec]:
        """Return all currently tracked features."""
        return tuple(self._features.values())

    def remove_features(self, feature_ids: Sequence[FeatureId]) -> None:
        """Drop features by ID; unknown IDs are ignored."""
        for feature_id in feature_ids:
            self._features.pop(feature_id, None)
191
192
193# ─────────────────────────────────────────────────────────────────────
194# Transition Model
195# ─────────────────────────────────────────────────────────────────────
196
197
class MinimalTransitionModel(
    TransitionModel[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Trivial world model with one-step lookahead planning.

    - No real model learning (update is a no-op).
    - No option models.
    - Planning calls predict once and returns value targets.
    """

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Model learning is intentionally a no-op in the smoke setup."""

    def integrate_option_models(self) -> None:
        """No option models exist, so there is nothing to integrate."""

    def plan(
        self,
        subjective_state: MinimalSubjectiveState,
        value_function: ValueFunction[
            MinimalSubjectiveState, Action, MinimalInfo
        ],
        budget: int,
    ) -> PlanningUpdate[Action]:
        """One-step lookahead: evaluate the current state, echo the budget."""
        current_values = value_function.predict(subjective_state)
        return PlanningUpdate(
            value_targets=current_values,
            policy_targets={"preferred_action": 0},
            search_statistics={"budget_used": budget},
        )

    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
        """No option models are stored, so removal is a no-op."""
233
234
235# ─────────────────────────────────────────────────────────────────────
236# Value Function
237# ─────────────────────────────────────────────────────────────────────
238
239
class MinimalValueFunction(
    ValueFunction[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Stores latest reward as a value, counts usage, never curates.

    - One implicit value learner ("main") that stores the latest reward.
    - Usage records are accumulated for utility scoring.
    - Curation always returns an empty decision (no pruning).
    """

    def __init__(self) -> None:
        self._value: float = 0.0
        self._usage_records: list[UsageRecord] = []

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Remember the latest reward; report a zero TD error for "main"."""
        self._value = transition.reward
        return {"main": 0.0}

    def predict(
        self,
        subjective_state: MinimalSubjectiveState,
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Return the stored value regardless of the queried state."""
        return {"main": self._value}

    def observe_usage(self, usage_records: Sequence[UsageRecord]) -> None:
        """Accumulate usage evidence for later utility scoring."""
        self._usage_records.extend(usage_records)

    def utility_scores(self) -> Sequence[UtilityRecord]:
        """Aggregate accumulated usage into one utility record per component.

        Utility is the sum of ``amount`` over all records sharing the same
        (kind, component_id) key; the kind/component of the most recent
        record for each key is carried into the output.
        """
        accumulated: dict[tuple[str, str], float] = {}
        most_recent: dict[tuple[str, str], UsageRecord] = {}
        for usage in self._usage_records:
            key = (usage.kind.value, usage.component_id)
            accumulated[key] = accumulated.get(key, 0.0) + usage.amount
            most_recent[key] = usage
        results: list[UtilityRecord] = []
        for key, usage in most_recent.items():
            results.append(
                UtilityRecord(
                    kind=usage.kind,
                    component_id=usage.component_id,
                    utility=accumulated[key],
                )
            )
        return tuple(results)

    def curate(self) -> CurationDecision:
        """Never prune anything: always an empty decision."""
        return CurationDecision()

    def remove(
        self,
        general_value_function_ids: Sequence[GeneralValueFunctionId],
    ) -> None:
        """Only the implicit "main" learner exists; removal is a no-op."""
294
295
296# ─────────────────────────────────────────────────────────────────────
297# Reactive Policy
298# ─────────────────────────────────────────────────────────────────────
299
300
@dataclass
class _MinimalOption:
    """Trivial option that always emits action=1 and stops immediately."""

    # Descriptor assigned at creation time; exposed read-only via `descriptor`.
    _descriptor: OptionDescriptor
    # The single primitive action this option ever emits.
    _action: Action = 1

    @property
    def descriptor(self) -> OptionDescriptor:
        # Read-only view of the descriptor this option was created with.
        return self._descriptor

    def is_available(self, subjective_state: MinimalSubjectiveState) -> bool:
        # The option can be started from any state.
        return True

    def act(self, subjective_state: MinimalSubjectiveState) -> Action:
        # State-independent: always emit the configured action.
        return self._action

    def stop_probability(self, subjective_state: MinimalSubjectiveState) -> float:
        # Certain termination: the option runs for at most one step.
        return 1.0
320
321
class MinimalReactivePolicy(
    ReactivePolicy[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Alternates primitive actions and options, creates options from subtasks.

    - On even observations: primitive action 0.
    - On odd observations with options available: executes the first option.
    - On odd observations without options: primitive action 1.
    - Options are created 1:1 from ingested subtasks.
    """

    def __init__(self) -> None:
        self._active_option: _MinimalOption | None = None
        self._options: dict[OptionId, _MinimalOption] = {}
        self._subtasks: dict[SubtaskId, SubtaskSpec] = {}
        # Kept for inspection by the smoke test / debugging.
        self.last_td_errors: Mapping[GeneralValueFunctionId, float] = {}
        self.last_planning_update: PlanningUpdate[Action] | None = None

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
        td_errors: Mapping[GeneralValueFunctionId, float],
    ) -> None:
        """Record the latest TD errors; no actual policy learning."""
        self.last_td_errors = dict(td_errors)

    def apply_planning_update(self, update: PlanningUpdate[Action]) -> None:
        """Record the latest planning update; no actual policy learning."""
        self.last_planning_update = update

    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
        """Register each subtask and create one matching option for it."""
        for spec in subtasks:
            self._subtasks[spec.subtask_id] = spec
            option_id = f"option:{spec.subtask_id}"
            descriptor = OptionDescriptor(
                option_id=option_id,
                name=f"Option for {spec.subtask_id}",
                subtask_id=spec.subtask_id,
            )
            self._options[option_id] = _MinimalOption(descriptor)

    def integrate_options(self) -> None:
        pass  # options already registered in ingest_subtasks

    def select_action(
        self,
        subjective_state: MinimalSubjectiveState,
        option_stop_threshold: float,
    ) -> tuple[Action, OptionId | None]:
        """Continue an active option, else choose by observation parity."""
        active = self._active_option
        if active is not None:
            if active.stop_probability(subjective_state) < option_stop_threshold:
                # The option keeps control for another step.
                return active.act(subjective_state), active.descriptor.option_id
            # The option terminates here; fall through to fresh selection.
            self._active_option = None

        # Even observation → primitive action 0.
        if subjective_state.observation % 2 == 0:
            return 0, None

        # Odd observation → start the first registered option, if any ...
        for option in self._options.values():
            self._active_option = option
            return option.act(subjective_state), option.descriptor.option_id

        # ... otherwise fall back to primitive action 1.
        return 1, None

    def clear_active_option(self) -> None:
        """Forcibly drop the active option."""
        self._active_option = None

    def remove_options(self, option_ids: Sequence[OptionId]) -> None:
        """Delete options by ID, deactivating the active one if removed."""
        for option_id in option_ids:
            self._options.pop(option_id, None)
        active = self._active_option
        if active is not None and active.descriptor.option_id in option_ids:
            self._active_option = None

    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
        """Delete subtasks and the options derived 1:1 from them."""
        for subtask_id in subtask_ids:
            self._subtasks.pop(subtask_id, None)
            self._options.pop(f"option:{subtask_id}", None)
410
411
412# ─────────────────────────────────────────────────────────────────────
413# Wiring
414# ─────────────────────────────────────────────────────────────────────
415
416
def build_minimal_agent() -> (
    OaKAgent[Observation, Action, MinimalSubjectiveState, MinimalInfo]
):
    """Construct a fully wired smoke-test OaK agent."""
    # One concrete implementation per OaK module, plus a tiny planning budget.
    agent = OaKAgent(
        perception=MinimalPerception(),
        transition_model=MinimalTransitionModel(),
        value_function=MinimalValueFunction(),
        reactive_policy=MinimalReactivePolicy(),
        planning_budget=4,
    )
    return agent
428
429
def run_minimal_episode(horizon: int = 5) -> list[MinimalTraceStep]:
    """Run a short smoke episode and return a compact trace."""
    world = MinimalWorld(horizon=horizon)
    agent = build_minimal_agent()
    step = world.reset()
    agent.reset()

    trace: list[MinimalTraceStep] = []
    for _ in range(horizon):
        result = agent.step(step)
        planning = result.planning_update
        budget_used = (
            planning.search_statistics["budget_used"]
            if planning is not None
            else None
        )
        entry: MinimalTraceStep = {
            "subjective_state": result.subjective_state,
            "action": result.action,
            "active_option_id": result.active_option_id,
            "created_subtasks": [
                subtask.subtask_id for subtask in result.created_subtasks
            ],
            "planning_budget_used": budget_used,
        }
        trace.append(entry)
        step = world.step(result.action)
        if step.terminated:
            break

    return trace
Observation = <class 'int'>
Action = <class 'int'>
class MinimalInfo(typing.TypedDict):
62class MinimalInfo(TypedDict, total=False):
63    reset: bool
64    echo_action: Action
reset: bool
echo_action: int
class MinimalTraceStep(typing.TypedDict):
67class MinimalTraceStep(TypedDict):
68    subjective_state: "MinimalSubjectiveState"
69    action: Action
70    active_option_id: OptionId | None
71    created_subtasks: list[SubtaskId]
72    planning_budget_used: int | None
subjective_state: MinimalSubjectiveState
action: int
active_option_id: str | None
created_subtasks: list[str]
planning_budget_used: int | None
@dataclass(slots=True, frozen=True)
class MinimalSubjectiveState:
75@dataclass(slots=True, frozen=True)
76class MinimalSubjectiveState:
77    """Small concrete subjective state used by the smoke implementation."""
78
79    step_index: int
80    observation: Observation
81    reward: float
82    last_action: Action | None

Small concrete subjective state used by the smoke implementation.

MinimalSubjectiveState( step_index: 'int', observation: 'Observation', reward: 'float', last_action: 'Action | None')
step_index: 'int'
observation: 'Observation'
reward: 'float'
last_action: 'Action | None'
class MinimalWorld(oak_architecture.interfaces.World[int, int, examples.minimal_oak.MinimalInfo]):
 90class MinimalWorld(World[Observation, Action, MinimalInfo]):
 91    """A toy world that increments an integer observation every step."""
 92
 93    def __init__(self, horizon: int = 5) -> None:
 94        self.horizon = horizon
 95        self.current_step = 0
 96
 97    def reset(self) -> TimeStep[Observation, MinimalInfo]:
 98        self.current_step = 0
 99        return TimeStep(observation=0, reward=0.0, info={"reset": True})
100
101    def step(self, action: Action) -> TimeStep[Observation, MinimalInfo]:
102        self.current_step += 1
103        terminated = self.current_step >= self.horizon
104        reward = 1.0 if action == 1 else 0.0
105        return TimeStep(
106            observation=self.current_step,
107            reward=reward,
108            terminated=terminated,
109            info={"echo_action": action},
110        )

A toy world that increments an integer observation every step.

MinimalWorld(horizon: 'int' = 5)
93    def __init__(self, horizon: int = 5) -> None:
94        self.horizon = horizon
95        self.current_step = 0
horizon
current_step
def reset(self) -> 'TimeStep[Observation, MinimalInfo]':
97    def reset(self) -> TimeStep[Observation, MinimalInfo]:
98        self.current_step = 0
99        return TimeStep(observation=0, reward=0.0, info={"reset": True})
def step(self, action: 'Action') -> 'TimeStep[Observation, MinimalInfo]':
101    def step(self, action: Action) -> TimeStep[Observation, MinimalInfo]:
102        self.current_step += 1
103        terminated = self.current_step >= self.horizon
104        reward = 1.0 if action == 1 else 0.0
105        return TimeStep(
106            observation=self.current_step,
107            reward=reward,
108            terminated=terminated,
109            info={"echo_action": action},
110        )
class MinimalPerception(oak_architecture.interfaces.Perception[int, int, examples.minimal_oak.MinimalSubjectiveState]):
118class MinimalPerception(Perception[Observation, Action, MinimalSubjectiveState]):
119    """Direct observation-to-state mapping with one fixed feature.
120
121    - The subjective state is a thin wrapper around the observation.
122    - One identity feature ("observation") is always present.
123    - No new features are ever proposed.
124    - One subtask is created per feature (deduplicated).
125    """
126
127    def __init__(self) -> None:
128        self._state = MinimalSubjectiveState(0, 0, 0.0, None)
129        self._features: dict[FeatureId, FeatureSpec] = {
130            "observation": FeatureSpec(
131                feature_id="observation",
132                name="Observation value",
133                description="Identity feature for the integer observation.",
134            )
135        }
136        self._created_subtask_for: set[FeatureId] = set()
137
138    def reset(self) -> None:
139        self._state = MinimalSubjectiveState(0, 0, 0.0, None)
140
141    def update(
142        self,
143        observation: Observation,
144        reward: float,
145        last_action: Action | None,
146    ) -> MinimalSubjectiveState:
147        self._state = MinimalSubjectiveState(
148            step_index=observation,
149            observation=observation,
150            reward=reward,
151            last_action=last_action,
152        )
153        return self._state
154
155    def current_subjective_state(self) -> MinimalSubjectiveState:
156        return self._state
157
158    def discover_and_rank_features(
159        self,
160        subjective_state: MinimalSubjectiveState,
161        utility_scores: Sequence[UtilityRecord],
162        feature_budget: int,
163    ) -> Sequence[FeatureId]:
164        # No new features proposed; rank existing ones in insertion order.
165        ids = list(self._features.keys())
166        return tuple(ids[:feature_budget])
167
168    def generate_subtasks(
169        self,
170        ranked_feature_ids: Sequence[FeatureId],
171    ) -> Sequence[SubtaskSpec]:
172        created: list[SubtaskSpec] = []
173        for fid in ranked_feature_ids:
174            if fid in self._created_subtask_for:
175                continue
176            self._created_subtask_for.add(fid)
177            created.append(
178                SubtaskSpec(
179                    subtask_id=f"subtask:{fid}",
180                    name=f"Track {fid}",
181                    feature_id=fid,
182                )
183            )
184        return tuple(created)
185
186    def list_features(self) -> Sequence[FeatureSpec]:
187        return tuple(self._features.values())
188
189    def remove_features(self, feature_ids: Sequence[FeatureId]) -> None:
190        for fid in feature_ids:
191            self._features.pop(fid, None)

Direct observation-to-state mapping with one fixed feature.

  • The subjective state is a thin wrapper around the observation.
  • One identity feature ("observation") is always present.
  • No new features are ever proposed.
  • One subtask is created per feature (deduplicated).
def reset(self) -> 'None':
138    def reset(self) -> None:
139        self._state = MinimalSubjectiveState(0, 0, 0.0, None)

Reset all perception state for a new episode.

def update( self, observation: 'Observation', reward: 'float', last_action: 'Action | None') -> 'MinimalSubjectiveState':
141    def update(
142        self,
143        observation: Observation,
144        reward: float,
145        last_action: Action | None,
146    ) -> MinimalSubjectiveState:
147        self._state = MinimalSubjectiveState(
148            step_index=observation,
149            observation=observation,
150            reward=reward,
151            last_action=last_action,
152        )
153        return self._state

Process a new observation and return the updated subjective state.

def current_subjective_state(self) -> 'MinimalSubjectiveState':
155    def current_subjective_state(self) -> MinimalSubjectiveState:
156        return self._state

Return the most recently computed subjective state.

def discover_and_rank_features( self, subjective_state: 'MinimalSubjectiveState', utility_scores: 'Sequence[UtilityRecord]', feature_budget: 'int') -> 'Sequence[FeatureId]':
158    def discover_and_rank_features(
159        self,
160        subjective_state: MinimalSubjectiveState,
161        utility_scores: Sequence[UtilityRecord],
162        feature_budget: int,
163    ) -> Sequence[FeatureId]:
164        # No new features proposed; rank existing ones in insertion order.
165        ids = list(self._features.keys())
166        return tuple(ids[:feature_budget])

Propose new features, integrate them, and return the top-ranked IDs.

A typical implementation:

  1. Proposes candidate features from the current subjective state.
  2. Adds accepted candidates to its internal feature store.
  3. Ranks all features using the provided utility scores.
  4. Returns the top feature IDs (up to feature_budget).
def generate_subtasks( self, ranked_feature_ids: 'Sequence[FeatureId]') -> 'Sequence[SubtaskSpec]':
168    def generate_subtasks(
169        self,
170        ranked_feature_ids: Sequence[FeatureId],
171    ) -> Sequence[SubtaskSpec]:
172        created: list[SubtaskSpec] = []
173        for fid in ranked_feature_ids:
174            if fid in self._created_subtask_for:
175                continue
176            self._created_subtask_for.add(fid)
177            created.append(
178                SubtaskSpec(
179                    subtask_id=f"subtask:{fid}",
180                    name=f"Track {fid}",
181                    feature_id=fid,
182                )
183            )
184        return tuple(created)

Turn ranked feature IDs into subtask specifications.

def list_features(self) -> 'Sequence[FeatureSpec]':
186    def list_features(self) -> Sequence[FeatureSpec]:
187        return tuple(self._features.values())

Return all currently tracked features.

def remove_features(self, feature_ids: 'Sequence[FeatureId]') -> 'None':
189    def remove_features(self, feature_ids: Sequence[FeatureId]) -> None:
190        for fid in feature_ids:
191            self._features.pop(fid, None)

Remove features by ID (called during curation).

class MinimalTransitionModel(oak_architecture.interfaces.TransitionModel[examples.minimal_oak.MinimalSubjectiveState, int, examples.minimal_oak.MinimalInfo]):
199class MinimalTransitionModel(
200    TransitionModel[MinimalSubjectiveState, Action, MinimalInfo]
201):
202    """Trivial world model with one-step lookahead planning.
203
204    - No real model learning (update is a no-op).
205    - No option models.
206    - Planning calls predict once and returns value targets.
207    """
208
209    def update(
210        self,
211        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
212    ) -> None:
213        pass
214
215    def integrate_option_models(self) -> None:
216        pass
217
218    def plan(
219        self,
220        subjective_state: MinimalSubjectiveState,
221        value_function: ValueFunction[
222            MinimalSubjectiveState, Action, MinimalInfo
223        ],
224        budget: int,
225    ) -> PlanningUpdate[Action]:
226        return PlanningUpdate(
227            value_targets=value_function.predict(subjective_state),
228            policy_targets={"preferred_action": 0},
229            search_statistics={"budget_used": budget},
230        )
231
232    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
233        pass

Trivial world model with one-step lookahead planning.

  • No real model learning (update is a no-op).
  • No option models.
  • Planning calls predict once and returns value targets.
def update( self, transition: 'Transition[Action, MinimalSubjectiveState, MinimalInfo]') -> 'None':
209    def update(
210        self,
211        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
212    ) -> None:
213        pass

Learn from an observed transition.

This should update both the world model and any option-model learners.

def integrate_option_models(self) -> 'None':
215    def integrate_option_models(self) -> None:
216        pass

Export learned option models and integrate them into the world model.

Called after option learning so that planning reasons over fresh models.

def plan( self, subjective_state: 'MinimalSubjectiveState', value_function: 'ValueFunction[MinimalSubjectiveState, Action, MinimalInfo]', budget: 'int') -> 'PlanningUpdate[Action]':
218    def plan(
219        self,
220        subjective_state: MinimalSubjectiveState,
221        value_function: ValueFunction[
222            MinimalSubjectiveState, Action, MinimalInfo
223        ],
224        budget: int,
225    ) -> PlanningUpdate[Action]:
226        return PlanningUpdate(
227            value_targets=value_function.predict(subjective_state),
228            policy_targets={"preferred_action": 0},
229            search_statistics={"budget_used": budget},
230        )

Run bounded planning and return improvement signals.

The planner uses the internal world model together with the supplied value_function (for state evaluation) to produce value targets, policy targets, or search statistics.

def remove_option_models(self, option_ids: 'Sequence[OptionId]') -> 'None':
232    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
233        pass

Remove option models by ID (called during curation).

class MinimalValueFunction(oak_architecture.interfaces.ValueFunction[examples.minimal_oak.MinimalSubjectiveState, int, examples.minimal_oak.MinimalInfo]):
241class MinimalValueFunction(
242    ValueFunction[MinimalSubjectiveState, Action, MinimalInfo]
243):
244    """Stores latest reward as a value, counts usage, never curates.
245
246    - One implicit value learner ("main") that stores the latest reward.
247    - Usage records are accumulated for utility scoring.
248    - Curation always returns an empty decision (no pruning).
249    """
250
251    def __init__(self) -> None:
252        self._value: float = 0.0
253        self._usage_records: list[UsageRecord] = []
254
255    def update(
256        self,
257        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
258    ) -> Mapping[GeneralValueFunctionId, float]:
259        self._value = transition.reward
260        return {"main": 0.0}
261
262    def predict(
263        self,
264        subjective_state: MinimalSubjectiveState,
265    ) -> Mapping[GeneralValueFunctionId, float]:
266        return {"main": self._value}
267
268    def observe_usage(self, usage_records: Sequence[UsageRecord]) -> None:
269        self._usage_records.extend(usage_records)
270
271    def utility_scores(self) -> Sequence[UtilityRecord]:
272        totals: dict[tuple[str, str], float] = {}
273        latest: dict[tuple[str, str], UsageRecord] = {}
274        for record in self._usage_records:
275            key = (record.kind.value, record.component_id)
276            totals[key] = totals.get(key, 0.0) + record.amount
277            latest[key] = record
278        return tuple(
279            UtilityRecord(
280                kind=record.kind,
281                component_id=record.component_id,
282                utility=totals[key],
283            )
284            for key, record in latest.items()
285        )
286
287    def curate(self) -> CurationDecision:
288        return CurationDecision()
289
290    def remove(
291        self,
292        general_value_function_ids: Sequence[GeneralValueFunctionId],
293    ) -> None:
294        pass

Stores latest reward as a value, counts usage, never curates.

  • One implicit value learner ("main") that stores the latest reward.
  • Usage records are accumulated for utility scoring.
  • Curation always returns an empty decision (no pruning).
def update( self, transition: 'Transition[Action, MinimalSubjectiveState, MinimalInfo]') -> 'Mapping[GeneralValueFunctionId, float]':
255    def update(
256        self,
257        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
258    ) -> Mapping[GeneralValueFunctionId, float]:
259        self._value = transition.reward
260        return {"main": 0.0}

Learn from a transition and return TD-error signals.

def predict( self, subjective_state: 'MinimalSubjectiveState') -> 'Mapping[GeneralValueFunctionId, float]':
262    def predict(
263        self,
264        subjective_state: MinimalSubjectiveState,
265    ) -> Mapping[GeneralValueFunctionId, float]:
266        return {"main": self._value}

Predict values for the given subjective state.

def observe_usage(self, usage_records: 'Sequence[UsageRecord]') -> 'None':
268    def observe_usage(self, usage_records: Sequence[UsageRecord]) -> None:
269        self._usage_records.extend(usage_records)

Record usage evidence for utility assessment.

def utility_scores(self) -> 'Sequence[UtilityRecord]':
271    def utility_scores(self) -> Sequence[UtilityRecord]:
272        totals: dict[tuple[str, str], float] = {}
273        latest: dict[tuple[str, str], UsageRecord] = {}
274        for record in self._usage_records:
275            key = (record.kind.value, record.component_id)
276            totals[key] = totals.get(key, 0.0) + record.amount
277            latest[key] = record
278        return tuple(
279            UtilityRecord(
280                kind=record.kind,
281                component_id=record.component_id,
282                utility=totals[key],
283            )
284            for key, record in latest.items()
285        )

Return current utility estimates for all tracked structures.

def curate(self) -> 'CurationDecision':
287    def curate(self) -> CurationDecision:
288        return CurationDecision()

Decide which learned structures to drop.

def remove( self, general_value_function_ids: 'Sequence[GeneralValueFunctionId]') -> 'None':
290    def remove(
291        self,
292        general_value_function_ids: Sequence[GeneralValueFunctionId],
293    ) -> None:
294        pass

Remove value functions by ID (called during curation).

class MinimalReactivePolicy(oak_architecture.interfaces.ReactivePolicy[examples.minimal_oak.MinimalSubjectiveState, int, examples.minimal_oak.MinimalInfo]):
323class MinimalReactivePolicy(
324    ReactivePolicy[MinimalSubjectiveState, Action, MinimalInfo]
325):
326    """Alternates primitive actions and options, creates options from subtasks.
327
328    - On even observations: primitive action 0.
329    - On odd observations with options available: executes the first option.
330    - On odd observations without options: primitive action 1.
331    - Options are created 1:1 from ingested subtasks.
332    """
333
334    def __init__(self) -> None:
335        self._active_option: _MinimalOption | None = None
336        self._options: dict[OptionId, _MinimalOption] = {}
337        self._subtasks: dict[SubtaskId, SubtaskSpec] = {}
338        self.last_td_errors: Mapping[GeneralValueFunctionId, float] = {}
339        self.last_planning_update: PlanningUpdate[Action] | None = None
340
341    def update(
342        self,
343        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
344        td_errors: Mapping[GeneralValueFunctionId, float],
345    ) -> None:
346        self.last_td_errors = dict(td_errors)
347
348    def apply_planning_update(self, update: PlanningUpdate[Action]) -> None:
349        self.last_planning_update = update
350
351    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
352        for subtask in subtasks:
353            self._subtasks[subtask.subtask_id] = subtask
354            option_id = f"option:{subtask.subtask_id}"
355            self._options[option_id] = _MinimalOption(
356                OptionDescriptor(
357                    option_id=option_id,
358                    name=f"Option for {subtask.subtask_id}",
359                    subtask_id=subtask.subtask_id,
360                )
361            )
362
363    def integrate_options(self) -> None:
364        pass  # options already registered in ingest_subtasks
365
366    def select_action(
367        self,
368        subjective_state: MinimalSubjectiveState,
369        option_stop_threshold: float,
370    ) -> tuple[Action, OptionId | None]:
371        # Check if active option should continue
372        if self._active_option is not None:
373            stop_prob = self._active_option.stop_probability(subjective_state)
374            if stop_prob < option_stop_threshold:
375                return (
376                    self._active_option.act(subjective_state),
377                    self._active_option.descriptor.option_id,
378                )
379            self._active_option = None
380
381        # Even observation → primitive action 0
382        if subjective_state.observation % 2 == 0:
383            return (0, None)
384
385        # Odd observation → first available option, or primitive action 1
386        options = list(self._options.values())
387        if options:
388            self._active_option = options[0]
389            return (
390                self._active_option.act(subjective_state),
391                self._active_option.descriptor.option_id,
392            )
393        return (1, None)
394
395    def clear_active_option(self) -> None:
396        self._active_option = None
397
398    def remove_options(self, option_ids: Sequence[OptionId]) -> None:
399        for oid in option_ids:
400            self._options.pop(oid, None)
401        if (
402            self._active_option is not None
403            and self._active_option.descriptor.option_id in option_ids
404        ):
405            self._active_option = None
406
407    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
408        for sid in subtask_ids:
409            self._subtasks.pop(sid, None)
410            self._options.pop(f"option:{sid}", None)

Alternates between primitive actions and options, and creates options from subtasks.

  • On even observations: primitive action 0.
  • On odd observations with options available: executes the first option.
  • On odd observations without options: primitive action 1.
  • Options are created 1:1 from ingested subtasks.
last_td_errors: 'Mapping[GeneralValueFunctionId, float]'
last_planning_update: 'PlanningUpdate[Action] | None'
def update( self, transition: 'Transition[Action, MinimalSubjectiveState, MinimalInfo]', td_errors: 'Mapping[GeneralValueFunctionId, float]') -> 'None':
341    def update(
342        self,
343        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
344        td_errors: Mapping[GeneralValueFunctionId, float],
345    ) -> None:
346        self.last_td_errors = dict(td_errors)

Update the policy and option learners from an observed transition.

def apply_planning_update(self, update: 'PlanningUpdate[Action]') -> 'None':
348    def apply_planning_update(self, update: PlanningUpdate[Action]) -> None:
349        self.last_planning_update = update

Integrate planning improvement signals into the policy.

def ingest_subtasks(self, subtasks: 'Sequence[SubtaskSpec]') -> 'None':
351    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
352        for subtask in subtasks:
353            self._subtasks[subtask.subtask_id] = subtask
354            option_id = f"option:{subtask.subtask_id}"
355            self._options[option_id] = _MinimalOption(
356                OptionDescriptor(
357                    option_id=option_id,
358                    name=f"Option for {subtask.subtask_id}",
359                    subtask_id=subtask.subtask_id,
360                )
361            )

Feed newly created subtasks into the option learner.

def integrate_options(self) -> 'None':
363    def integrate_options(self) -> None:
364        pass  # options already registered in ingest_subtasks

Export learned options into the option library.

def select_action( self, subjective_state: 'MinimalSubjectiveState', option_stop_threshold: 'float') -> 'tuple[Action, OptionId | None]':
366    def select_action(
367        self,
368        subjective_state: MinimalSubjectiveState,
369        option_stop_threshold: float,
370    ) -> tuple[Action, OptionId | None]:
371        # Check if active option should continue
372        if self._active_option is not None:
373            stop_prob = self._active_option.stop_probability(subjective_state)
374            if stop_prob < option_stop_threshold:
375                return (
376                    self._active_option.act(subjective_state),
377                    self._active_option.descriptor.option_id,
378                )
379            self._active_option = None
380
381        # Even observation → primitive action 0
382        if subjective_state.observation % 2 == 0:
383            return (0, None)
384
385        # Odd observation → first available option, or primitive action 1
386        options = list(self._options.values())
387        if options:
388            self._active_option = options[0]
389            return (
390                self._active_option.act(subjective_state),
391                self._active_option.descriptor.option_id,
392            )
393        return (1, None)

Choose a primitive action, possibly by continuing an active option.

Returns a (primitive_action, active_option_id) pair. When no option is active, active_option_id is None.

def clear_active_option(self) -> 'None':
395    def clear_active_option(self) -> None:
396        self._active_option = None

Clear the currently executing option (e.g. at episode boundaries).

def remove_options(self, option_ids: 'Sequence[OptionId]') -> 'None':
398    def remove_options(self, option_ids: Sequence[OptionId]) -> None:
399        for oid in option_ids:
400            self._options.pop(oid, None)
401        if (
402            self._active_option is not None
403            and self._active_option.descriptor.option_id in option_ids
404        ):
405            self._active_option = None

Remove options by ID (called during curation).

def remove_subtasks(self, subtask_ids: 'Sequence[SubtaskId]') -> 'None':
407    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
408        for sid in subtask_ids:
409            self._subtasks.pop(sid, None)
410            self._options.pop(f"option:{sid}", None)

Remove subtasks by ID (called during curation).

def build_minimal_agent() -> 'OaKAgent[Observation, Action, MinimalSubjectiveState, MinimalInfo]':
418def build_minimal_agent() -> (
419    OaKAgent[Observation, Action, MinimalSubjectiveState, MinimalInfo]
420):
421    """Construct a fully wired smoke-test OaK agent."""
422    return OaKAgent(
423        perception=MinimalPerception(),
424        transition_model=MinimalTransitionModel(),
425        value_function=MinimalValueFunction(),
426        reactive_policy=MinimalReactivePolicy(),
427        planning_budget=4,
428    )

Construct a fully wired smoke-test OaK agent.

def run_minimal_episode(horizon: 'int' = 5) -> 'list[MinimalTraceStep]':
431def run_minimal_episode(horizon: int = 5) -> list[MinimalTraceStep]:
432    """Run a short smoke episode and return a compact trace."""
433    world = MinimalWorld(horizon=horizon)
434    agent = build_minimal_agent()
435    step = world.reset()
436    agent.reset()
437
438    trace: list[MinimalTraceStep] = []
439
440    for _ in range(horizon):
441        result = agent.step(step)
442        action = result.action
443        trace.append(
444            {
445                "subjective_state": result.subjective_state,
446                "action": action,
447                "active_option_id": result.active_option_id,
448                "created_subtasks": [
449                    subtask.subtask_id for subtask in result.created_subtasks
450                ],
451                "planning_budget_used": (
452                    result.planning_update.search_statistics["budget_used"]
453                    if result.planning_update is not None
454                    else None
455                ),
456            }
457        )
458        step = world.step(action)
459        if step.terminated:
460            break
461
462    return trace

Run a short smoke episode and return a compact trace.