examples.smoke.minimal_oak

  1from __future__ import annotations
  2
  3"""Bare-minimum external implementation used to smoke-test the interface.
  4
  5This module answers a single question: can the current package interfaces be
  6instantiated and run through a complete OaK step loop?
  7
  8The implementation shows the **direct** approach: each of Sutton's four
  9modules (Perception, Transition Model, Value Function, Reactive Policy) is
 10implemented as a single class.  There is no need to use the fine-grained
 11component interfaces or the composite wrappers, which exist for projects
 12that need more modularity inside each module.
 13
 14What this module is:
 15
 16- a tiny integer world
 17- a direct observation-to-subjective_state perception with one fixed feature
 18- a no-op transition model with trivial one-step planning
 19- a simple value tracker with usage counting and no curation
 20- a reactive policy that alternates actions and options
 21
 22What this module is not:
 23
 24- a trained agent
 25- a realistic planner
 26- a serious option-learning system
 27- a benchmark implementation
 28"""
 29
 30from dataclasses import dataclass
 31from typing import Mapping, Sequence, TypedDict
 32
 33from oak.agent import OaKAgent
 34from oak.interfaces import (
 35    Perception,
 36    ReactivePolicy,
 37    TransitionModel,
 38    ValueFunction,
 39    World,
 40)
 41from oak.types import (
 42    CurationDecision,
 43    FeatureId,
 44    FeatureSpec,
 45    GeneralValueFunctionId,
 46    OptionDescriptor,
 47    OptionId,
 48    PlanningUpdate,
 49    SubtaskId,
 50    SubtaskSpec,
 51    TimeStep,
 52    Transition,
 53    UsageRecord,
 54    UtilityRecord,
 55)
 56
# Observations in this smoke test are plain integers.
Observation = int
# Actions in this smoke test are plain integers.
Action = int
 59
 60
class MinimalInfo(TypedDict, total=False):
    """Side-channel info dictionary carried on each time step.

    Declared with ``total=False``, so every key is optional.
    """

    # ``MinimalWorld.reset`` emits ``{"reset": True}``.
    reset: bool
    # ``MinimalWorld.step`` echoes back the action it was given.
    echo_action: Action
 64
 65
class MinimalTraceStep(TypedDict):
    """One entry of the compact trace returned by ``run_minimal_episode``."""

    # Subjective state reported by the agent for this step.
    subjective_state: "MinimalSubjectiveState"
    # Action chosen by the agent for this step.
    action: Action
    # Identifier of the option reported as active, or None.
    active_option_id: OptionId | None
    # Identifiers of the subtasks the agent reported creating on this step.
    created_subtasks: list[SubtaskId]
    # Integer "budget_used" search statistic, or None when unavailable.
    planning_budget_used: int | None
 72
 73
 74def _planning_budget_used(update: PlanningUpdate[Action] | None) -> int | None:
 75    """Extract an integer planning budget from structured search statistics."""
 76    if update is None:
 77        return None
 78
 79    value = update.search_statistics.get("budget_used")
 80    if isinstance(value, bool):
 81        return None
 82    if isinstance(value, int):
 83        return value
 84    return None
 85
 86
@dataclass(slots=True, frozen=True)
class MinimalSubjectiveState:
    """Small concrete subjective state used by the smoke implementation.

    Immutable (``frozen=True``) and slot-based.  Field order matters:
    ``MinimalPerception`` builds the initial state positionally.
    """

    # ``MinimalPerception.update`` sets this equal to the observation.
    step_index: int
    # Raw integer observation.
    observation: Observation
    # Reward passed to ``MinimalPerception.update`` with the observation.
    reward: float
    # Action passed to ``MinimalPerception.update``; None in the initial state.
    last_action: Action | None
 95
 96
 97# ─────────────────────────────────────────────────────────────────────
 98# Environment
 99# ─────────────────────────────────────────────────────────────────────
100
101
class MinimalWorld(World[Observation, Action, MinimalInfo]):
    """Toy environment whose observation is a running step counter.

    Each call to ``step`` advances the counter by one, pays a reward of 1.0
    when the action equals 1 (0.0 otherwise), and reports termination once
    the counter reaches ``horizon``.
    """

    def __init__(self, horizon: int = 5) -> None:
        self.current_step = 0
        self.horizon = horizon

    def reset(self) -> TimeStep[Observation, MinimalInfo]:
        """Rewind the counter and return the initial time step."""
        self.current_step = 0
        initial_info: MinimalInfo = {"reset": True}
        return TimeStep(observation=0, reward=0.0, info=initial_info)

    def step(self, action: Action) -> TimeStep[Observation, MinimalInfo]:
        """Advance the counter by one and return the resulting time step."""
        self.current_step += 1
        step_info: MinimalInfo = {"echo_action": action}
        if action == 1:
            reward = 1.0
        else:
            reward = 0.0
        return TimeStep(
            observation=self.current_step,
            reward=reward,
            terminated=self.current_step >= self.horizon,
            info=step_info,
        )

    def close(self) -> None:
        """Nothing to release for this in-memory world."""
        return None
126
127
128# ─────────────────────────────────────────────────────────────────────
129# Perception
130# ─────────────────────────────────────────────────────────────────────
131
132
class MinimalPerception(Perception[Observation, Action, MinimalSubjectiveState]):
    """Direct observation-to-state mapping with one fixed feature.

    - The subjective state is a thin wrapper around the observation.
    - One identity feature ("observation") is registered at construction
      and stays until it is dropped through ``remove_features``.
    - No new features are ever proposed.
    - One subtask is created per feature (deduplicated).
    """

    def __init__(self) -> None:
        self._state = MinimalSubjectiveState(0, 0, 0.0, None)
        self._features: dict[FeatureId, FeatureSpec] = {
            "observation": FeatureSpec(
                feature_id="observation",
                name="Observation value",
                description="Identity feature for the integer observation.",
            )
        }
        # Feature IDs that already produced a subtask.  This is not cleared
        # by ``reset``, so each feature yields a subtask only once per
        # perception instance.
        self._created_subtask_for: set[FeatureId] = set()

    def reset(self) -> None:
        """Restore the initial subjective state.

        The feature store and the subtask de-duplication set are kept.
        """
        self._state = MinimalSubjectiveState(0, 0, 0.0, None)

    def update(
        self,
        observation: Observation,
        reward: float,
        last_action: Action | None,
    ) -> MinimalSubjectiveState:
        """Wrap the inputs in a new subjective state, store it, and return it."""
        self._state = MinimalSubjectiveState(
            # The observation doubles as the step index here.
            step_index=observation,
            observation=observation,
            reward=reward,
            last_action=last_action,
        )
        return self._state

    def current_subjective_state(self) -> MinimalSubjectiveState:
        """Return the most recently stored subjective state."""
        return self._state

    def discover_and_rank_features(
        self,
        subjective_state: MinimalSubjectiveState,
        utility_scores: Sequence[UtilityRecord],
        feature_budget: int,
    ) -> Sequence[FeatureId]:
        """Return up to ``feature_budget`` known feature IDs in insertion order.

        No new features are proposed, and ``subjective_state`` and
        ``utility_scores`` are ignored.  A zero or negative budget yields an
        empty tuple.
        """
        # Clamp the budget: a negative slice bound would count from the end
        # and return all but the last features instead of none.
        limit = max(feature_budget, 0)
        return tuple(self._features)[:limit]

    def generate_subtasks(
        self,
        ranked_feature_ids: Sequence[FeatureId],
    ) -> Sequence[SubtaskSpec]:
        """Create one subtask per feature ID that has not produced one yet.

        Feature IDs seen on an earlier call (or earlier in the same call)
        are skipped, so repeated calls return an empty tuple.
        """
        created: list[SubtaskSpec] = []
        for fid in ranked_feature_ids:
            if fid in self._created_subtask_for:
                continue
            self._created_subtask_for.add(fid)
            created.append(
                SubtaskSpec(
                    subtask_id=f"subtask:{fid}",
                    name=f"Track {fid}",
                    feature_id=fid,
                )
            )
        return tuple(created)

    def list_features(self) -> Sequence[FeatureSpec]:
        """Return the specs of all currently stored features."""
        return tuple(self._features.values())

    def remove_features(self, feature_ids: Sequence[FeatureId]) -> None:
        """Drop the given features from the store; unknown IDs are ignored."""
        for fid in feature_ids:
            self._features.pop(fid, None)
207
208
209# ─────────────────────────────────────────────────────────────────────
210# Transition Model
211# ─────────────────────────────────────────────────────────────────────
212
213
class MinimalTransitionModel(
    TransitionModel[MinimalSubjectiveState, Action, MinimalInfo]
):
    """World model that learns nothing and plans with a single value query.

    - ``update`` and both option-model hooks are deliberate no-ops.
    - ``plan`` asks the value function for one prediction and packages it.
    """

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Ignore the transition; this model has nothing to learn."""
        return None

    def integrate_option_models(self) -> None:
        """No option models exist, so there is nothing to integrate."""
        return None

    def plan(
        self,
        subjective_state: MinimalSubjectiveState,
        value_function: ValueFunction[MinimalSubjectiveState, Action, MinimalInfo],
        budget: int,
    ) -> PlanningUpdate[Action]:
        """Build a planning update from one value prediction.

        The whole ``budget`` is reported as used, and the policy target is
        always the fixed preferred action 0.
        """
        predicted_values = value_function.predict(subjective_state)
        fixed_policy_targets = {"preferred_action": 0}
        statistics = {"budget_used": budget}
        return PlanningUpdate(
            value_targets=predicted_values,
            policy_targets=fixed_policy_targets,
            search_statistics=statistics,
        )

    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
        """No option models are stored, so removal is a no-op."""
        return None
247
248
249# ─────────────────────────────────────────────────────────────────────
250# Value Function
251# ─────────────────────────────────────────────────────────────────────
252
253
class MinimalValueFunction(ValueFunction[MinimalSubjectiveState, Action, MinimalInfo]):
    """Remembers the last reward as its value, tallies usage, never prunes.

    - A single value entry ("main") mirrors the most recent reward.
    - Every usage record is kept and summed per component on request.
    - ``curate`` always returns a default, empty decision.
    """

    def __init__(self) -> None:
        self._usage_records: list[UsageRecord] = []
        self._value: float = 0.0

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
        *,
        planning: bool = False,
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Store the transition's reward and report a zero TD error.

        The ``planning`` flag is accepted but has no effect.
        """
        latest_reward = transition.reward
        self._value = latest_reward
        return {"main": 0.0}

    def predict(
        self,
        subjective_state: MinimalSubjectiveState,
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Return the stored value under "main", whatever the state."""
        prediction = {"main": self._value}
        return prediction

    def observe_usage(self, usage_records: Sequence[UsageRecord]) -> None:
        """Append the given usage records to the running history."""
        self._usage_records += usage_records

    def utility_scores(self) -> Sequence[UtilityRecord]:
        """Sum usage amounts per (kind, component) and return them as records.

        Records come back in the order each (kind, component) pair was first
        seen in the history.
        """
        summed: dict[tuple[str, str], float] = {}
        newest: dict[tuple[str, str], UsageRecord] = {}
        for usage in self._usage_records:
            pair = (usage.kind.value, usage.component_id)
            summed.setdefault(pair, 0.0)
            summed[pair] = summed[pair] + usage.amount
            newest[pair] = usage

        scores: list[UtilityRecord] = []
        for pair, total in summed.items():
            source = newest[pair]
            scores.append(
                UtilityRecord(
                    kind=source.kind,
                    component_id=source.component_id,
                    utility=total,
                )
            )
        return tuple(scores)

    def curate(self) -> CurationDecision:
        """Return a default curation decision (nothing is pruned)."""
        return CurationDecision()

    def remove(
        self,
        general_value_function_ids: Sequence[GeneralValueFunctionId],
    ) -> None:
        """No removable value functions are stored; this is a no-op."""
        return None
308
309
310# ─────────────────────────────────────────────────────────────────────
311# Reactive Policy
312# ─────────────────────────────────────────────────────────────────────
313
314
@dataclass
class _MinimalOption:
    """Degenerate option: always available, one fixed action, stops at once."""

    _descriptor: OptionDescriptor
    _action: Action = 1

    def act(self, subjective_state: MinimalSubjectiveState) -> Action:
        """Return the fixed action regardless of the state."""
        return self._action

    def stop_probability(self, subjective_state: MinimalSubjectiveState) -> float:
        """Report certainty of stopping (1.0) in every state."""
        return 1.0

    def is_available(self, subjective_state: MinimalSubjectiveState) -> bool:
        """Report the option as available in every state."""
        return True

    @property
    def descriptor(self) -> OptionDescriptor:
        """Read-only access to the descriptor given at construction."""
        return self._descriptor
334
335
class MinimalReactivePolicy(
    ReactivePolicy[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Alternates primitive actions and options, creates options from subtasks.

    - On even observations: primitive action 0.
    - On odd observations with an available option: executes the first one.
    - On odd observations without an available option: primitive action 1.
    - Options are created 1:1 from ingested subtasks.
    """

    def __init__(self) -> None:
        self._active_option: _MinimalOption | None = None
        self._options: dict[OptionId, _MinimalOption] = {}
        self._subtasks: dict[SubtaskId, SubtaskSpec] = {}
        # The latest learning signals are only recorded, never acted upon.
        self.last_td_errors: Mapping[GeneralValueFunctionId, float] = {}
        self.last_planning_update: PlanningUpdate[Action] | None = None

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
        td_errors: Mapping[GeneralValueFunctionId, float],
    ) -> None:
        """Record a copy of the TD errors; the transition itself is ignored."""
        self.last_td_errors = dict(td_errors)

    def apply_planning_update(self, update: PlanningUpdate[Action]) -> None:
        """Record the planning update without changing behavior."""
        self.last_planning_update = update

    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
        """Store each subtask and register one option derived from it.

        The option ID is ``"option:" + subtask_id``; re-ingesting a subtask
        replaces its earlier option.
        """
        for subtask in subtasks:
            self._subtasks[subtask.subtask_id] = subtask
            option_id = f"option:{subtask.subtask_id}"
            self._options[option_id] = _MinimalOption(
                OptionDescriptor(
                    option_id=option_id,
                    name=f"Option for {subtask.subtask_id}",
                    subtask_id=subtask.subtask_id,
                )
            )

    def integrate_options(self) -> None:
        """No-op: options are already registered in ``ingest_subtasks``."""

    def select_action(
        self,
        subjective_state: MinimalSubjectiveState,
        option_stop_threshold: float,
    ) -> tuple[Action, OptionId | None]:
        """Choose the next action and report which option, if any, produced it.

        Returns:
            ``(action, option_id)``; ``option_id`` is ``None`` for a
            primitive action.
        """
        # An active option keeps control while its stop probability stays
        # strictly below the threshold; otherwise it is released.
        active = self._active_option
        if active is not None:
            if active.stop_probability(subjective_state) < option_stop_threshold:
                return (active.act(subjective_state), active.descriptor.option_id)
            self._active_option = None

        # Even observation -> primitive action 0.
        if subjective_state.observation % 2 == 0:
            return (0, None)

        # Odd observation -> first option (in registration order) that
        # reports itself available in this state.
        for option in self._options.values():
            if option.is_available(subjective_state):
                self._active_option = option
                return (option.act(subjective_state), option.descriptor.option_id)

        # Odd observation with no usable option -> primitive action 1.
        return (1, None)

    def clear_active_option(self) -> None:
        """Release the active option, if any."""
        self._active_option = None

    def remove_options(self, option_ids: Sequence[OptionId]) -> None:
        """Unregister the given options and release the active one if removed.

        Unknown IDs are ignored.
        """
        removed = set(option_ids)
        for oid in removed:
            self._options.pop(oid, None)
        active = self._active_option
        if active is not None and active.descriptor.option_id in removed:
            self._active_option = None

    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
        """Forget the given subtasks together with their derived options.

        Removal goes through ``remove_options`` so that an active option
        belonging to a removed subtask is released as well, instead of
        continuing to act after it has been unregistered.
        """
        derived_option_ids: list[OptionId] = []
        for sid in subtask_ids:
            self._subtasks.pop(sid, None)
            derived_option_ids.append(f"option:{sid}")
        self.remove_options(derived_option_ids)
424
425
426# ─────────────────────────────────────────────────────────────────────
427# Wiring
428# ─────────────────────────────────────────────────────────────────────
429
430
def build_minimal_agent() -> (
    OaKAgent[Observation, Action, MinimalSubjectiveState, MinimalInfo]
):
    """Assemble an OaK agent from the four minimal module implementations.

    The agent is given a fixed planning budget of 4.
    """
    perception = MinimalPerception()
    transition_model = MinimalTransitionModel()
    value_function = MinimalValueFunction()
    reactive_policy = MinimalReactivePolicy()
    agent = OaKAgent(
        perception=perception,
        transition_model=transition_model,
        value_function=value_function,
        reactive_policy=reactive_policy,
        planning_budget=4,
    )
    return agent
442
443
def run_minimal_episode(horizon: int = 5) -> list[MinimalTraceStep]:
    """Run a short smoke episode and return a compact trace.

    Args:
        horizon: Step count at which the world terminates; also the maximum
            number of agent steps taken.

    Returns:
        One ``MinimalTraceStep`` per agent step.  The terminal time step is
        not fed back to the agent, so it adds no trace entry.
    """
    world = MinimalWorld(horizon=horizon)
    # Always close the world, even if the agent raises, mirroring
    # ``run_minimal_training``.
    try:
        agent = build_minimal_agent()
        step = world.reset()
        agent.reset()

        trace: list[MinimalTraceStep] = []

        for _ in range(horizon):
            result = agent.step(step)
            action = result.action
            trace.append(
                {
                    "subjective_state": result.subjective_state,
                    "action": action,
                    "active_option_id": result.active_option_id,
                    "created_subtasks": [
                        subtask.subtask_id for subtask in result.created_subtasks
                    ],
                    "planning_budget_used": _planning_budget_used(
                        result.planning_update
                    ),
                }
            )
            step = world.step(action)
            if step.terminated:
                break

        return trace
    finally:
        world.close()
472
473
def run_minimal_training(
    num_episodes: int = 3,
    *,
    horizon: int = 5,
    average_window: int = 100,
    solved_threshold: float | None = None,
) -> list[float]:
    """Run ``agent.train`` on a fresh minimal world and return its result.

    The world is closed afterwards, whether or not training raises.
    """
    agent = None
    world = MinimalWorld(horizon=horizon)
    agent = build_minimal_agent()
    train_options = {
        "num_episodes": num_episodes,
        "average_window": average_window,
        "solved_threshold": solved_threshold,
    }
    try:
        rewards = agent.train(world, **train_options)
        return rewards
    finally:
        world.close()
Observation = <class 'int'>
Action = <class 'int'>
class MinimalInfo(typing.TypedDict):
62class MinimalInfo(TypedDict, total=False):
63    reset: bool
64    echo_action: Action
reset: bool
echo_action: int
class MinimalTraceStep(typing.TypedDict):
67class MinimalTraceStep(TypedDict):
68    subjective_state: "MinimalSubjectiveState"
69    action: Action
70    active_option_id: OptionId | None
71    created_subtasks: list[SubtaskId]
72    planning_budget_used: int | None
subjective_state: MinimalSubjectiveState
action: int
active_option_id: str | None
created_subtasks: list[str]
planning_budget_used: int | None
@dataclass(slots=True, frozen=True)
class MinimalSubjectiveState:
88@dataclass(slots=True, frozen=True)
89class MinimalSubjectiveState:
90    """Small concrete subjective state used by the smoke implementation."""
91
92    step_index: int
93    observation: Observation
94    reward: float
95    last_action: Action | None

Small concrete subjective state used by the smoke implementation.

MinimalSubjectiveState( step_index: 'int', observation: 'Observation', reward: 'float', last_action: 'Action | None')
step_index: 'int'
observation: 'Observation'
reward: 'float'
last_action: 'Action | None'
class MinimalWorld(oak.interfaces.World[int, int, examples.smoke.minimal_oak.MinimalInfo]):
103class MinimalWorld(World[Observation, Action, MinimalInfo]):
104    """A toy world that increments an integer observation every step."""
105
106    def __init__(self, horizon: int = 5) -> None:
107        self.horizon = horizon
108        self.current_step = 0
109
110    def reset(self) -> TimeStep[Observation, MinimalInfo]:
111        self.current_step = 0
112        return TimeStep(observation=0, reward=0.0, info={"reset": True})
113
114    def step(self, action: Action) -> TimeStep[Observation, MinimalInfo]:
115        self.current_step += 1
116        terminated = self.current_step >= self.horizon
117        reward = 1.0 if action == 1 else 0.0
118        return TimeStep(
119            observation=self.current_step,
120            reward=reward,
121            terminated=terminated,
122            info={"echo_action": action},
123        )
124
125    def close(self) -> None:
126        pass

A toy world that increments an integer observation every step.

MinimalWorld(horizon: 'int' = 5)
106    def __init__(self, horizon: int = 5) -> None:
107        self.horizon = horizon
108        self.current_step = 0
horizon
current_step
def reset(self) -> 'TimeStep[Observation, MinimalInfo]':
110    def reset(self) -> TimeStep[Observation, MinimalInfo]:
111        self.current_step = 0
112        return TimeStep(observation=0, reward=0.0, info={"reset": True})
def step(self, action: 'Action') -> 'TimeStep[Observation, MinimalInfo]':
114    def step(self, action: Action) -> TimeStep[Observation, MinimalInfo]:
115        self.current_step += 1
116        terminated = self.current_step >= self.horizon
117        reward = 1.0 if action == 1 else 0.0
118        return TimeStep(
119            observation=self.current_step,
120            reward=reward,
121            terminated=terminated,
122            info={"echo_action": action},
123        )
def close(self) -> 'None':
125    def close(self) -> None:
126        pass

Release environment resources. Default is a no-op.

class MinimalPerception(oak.interfaces.Perception[int, int, examples.smoke.minimal_oak.MinimalSubjectiveState]):
134class MinimalPerception(Perception[Observation, Action, MinimalSubjectiveState]):
135    """Direct observation-to-state mapping with one fixed feature.
136
137    - The subjective state is a thin wrapper around the observation.
138    - One identity feature ("observation") is always present.
139    - No new features are ever proposed.
140    - One subtask is created per feature (deduplicated).
141    """
142
143    def __init__(self) -> None:
144        self._state = MinimalSubjectiveState(0, 0, 0.0, None)
145        self._features: dict[FeatureId, FeatureSpec] = {
146            "observation": FeatureSpec(
147                feature_id="observation",
148                name="Observation value",
149                description="Identity feature for the integer observation.",
150            )
151        }
152        self._created_subtask_for: set[FeatureId] = set()
153
154    def reset(self) -> None:
155        self._state = MinimalSubjectiveState(0, 0, 0.0, None)
156
157    def update(
158        self,
159        observation: Observation,
160        reward: float,
161        last_action: Action | None,
162    ) -> MinimalSubjectiveState:
163        self._state = MinimalSubjectiveState(
164            step_index=observation,
165            observation=observation,
166            reward=reward,
167            last_action=last_action,
168        )
169        return self._state
170
171    def current_subjective_state(self) -> MinimalSubjectiveState:
172        return self._state
173
174    def discover_and_rank_features(
175        self,
176        subjective_state: MinimalSubjectiveState,
177        utility_scores: Sequence[UtilityRecord],
178        feature_budget: int,
179    ) -> Sequence[FeatureId]:
180        # No new features proposed; rank existing ones in insertion order.
181        ids = list(self._features.keys())
182        return tuple(ids[:feature_budget])
183
184    def generate_subtasks(
185        self,
186        ranked_feature_ids: Sequence[FeatureId],
187    ) -> Sequence[SubtaskSpec]:
188        created: list[SubtaskSpec] = []
189        for fid in ranked_feature_ids:
190            if fid in self._created_subtask_for:
191                continue
192            self._created_subtask_for.add(fid)
193            created.append(
194                SubtaskSpec(
195                    subtask_id=f"subtask:{fid}",
196                    name=f"Track {fid}",
197                    feature_id=fid,
198                )
199            )
200        return tuple(created)
201
202    def list_features(self) -> Sequence[FeatureSpec]:
203        return tuple(self._features.values())
204
205    def remove_features(self, feature_ids: Sequence[FeatureId]) -> None:
206        for fid in feature_ids:
207            self._features.pop(fid, None)

Direct observation-to-state mapping with one fixed feature.

  • The subjective state is a thin wrapper around the observation.
  • One identity feature ("observation") is always present.
  • No new features are ever proposed.
  • One subtask is created per feature (deduplicated).
def reset(self) -> 'None':
154    def reset(self) -> None:
155        self._state = MinimalSubjectiveState(0, 0, 0.0, None)

Reset all perception state for a new episode.

def update( self, observation: 'Observation', reward: 'float', last_action: 'Action | None') -> 'MinimalSubjectiveState':
157    def update(
158        self,
159        observation: Observation,
160        reward: float,
161        last_action: Action | None,
162    ) -> MinimalSubjectiveState:
163        self._state = MinimalSubjectiveState(
164            step_index=observation,
165            observation=observation,
166            reward=reward,
167            last_action=last_action,
168        )
169        return self._state

Process a new observation and return the updated subjective state.

def current_subjective_state(self) -> 'MinimalSubjectiveState':
171    def current_subjective_state(self) -> MinimalSubjectiveState:
172        return self._state

Return the most recently computed subjective state.

def discover_and_rank_features( self, subjective_state: 'MinimalSubjectiveState', utility_scores: 'Sequence[UtilityRecord]', feature_budget: 'int') -> 'Sequence[FeatureId]':
174    def discover_and_rank_features(
175        self,
176        subjective_state: MinimalSubjectiveState,
177        utility_scores: Sequence[UtilityRecord],
178        feature_budget: int,
179    ) -> Sequence[FeatureId]:
180        # No new features proposed; rank existing ones in insertion order.
181        ids = list(self._features.keys())
182        return tuple(ids[:feature_budget])

Propose new features, integrate them, and return the top-ranked IDs.

A typical implementation:

  1. Proposes candidate features from the current subjective state.
  2. Adds accepted candidates to its internal feature store.
  3. Ranks all features using the provided utility scores.
  4. Returns the top feature IDs (up to feature_budget).
def generate_subtasks( self, ranked_feature_ids: 'Sequence[FeatureId]') -> 'Sequence[SubtaskSpec]':
184    def generate_subtasks(
185        self,
186        ranked_feature_ids: Sequence[FeatureId],
187    ) -> Sequence[SubtaskSpec]:
188        created: list[SubtaskSpec] = []
189        for fid in ranked_feature_ids:
190            if fid in self._created_subtask_for:
191                continue
192            self._created_subtask_for.add(fid)
193            created.append(
194                SubtaskSpec(
195                    subtask_id=f"subtask:{fid}",
196                    name=f"Track {fid}",
197                    feature_id=fid,
198                )
199            )
200        return tuple(created)

Turn ranked feature IDs into subtask specifications.

def list_features(self) -> 'Sequence[FeatureSpec]':
202    def list_features(self) -> Sequence[FeatureSpec]:
203        return tuple(self._features.values())

Return all currently tracked features.

def remove_features(self, feature_ids: 'Sequence[FeatureId]') -> 'None':
205    def remove_features(self, feature_ids: Sequence[FeatureId]) -> None:
206        for fid in feature_ids:
207            self._features.pop(fid, None)

Remove features by ID (called during curation).

class MinimalTransitionModel(oak.interfaces.TransitionModel[examples.smoke.minimal_oak.MinimalSubjectiveState, int, examples.smoke.minimal_oak.MinimalInfo]):
215class MinimalTransitionModel(
216    TransitionModel[MinimalSubjectiveState, Action, MinimalInfo]
217):
218    """Trivial world model with one-step lookahead planning.
219
220    - No real model learning (update is a no-op).
221    - No option models.
222    - Planning calls predict once and returns value targets.
223    """
224
225    def update(
226        self,
227        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
228    ) -> None:
229        pass
230
231    def integrate_option_models(self) -> None:
232        pass
233
234    def plan(
235        self,
236        subjective_state: MinimalSubjectiveState,
237        value_function: ValueFunction[MinimalSubjectiveState, Action, MinimalInfo],
238        budget: int,
239    ) -> PlanningUpdate[Action]:
240        return PlanningUpdate(
241            value_targets=value_function.predict(subjective_state),
242            policy_targets={"preferred_action": 0},
243            search_statistics={"budget_used": budget},
244        )
245
246    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
247        pass

Trivial world model with one-step lookahead planning.

  • No real model learning (update is a no-op).
  • No option models.
  • Planning calls predict once and returns value targets.
def update( self, transition: 'Transition[Action, MinimalSubjectiveState, MinimalInfo]') -> 'None':
225    def update(
226        self,
227        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
228    ) -> None:
229        pass

Learn from an observed transition.

This should update both the world model and any option-model learners.

def integrate_option_models(self) -> 'None':
231    def integrate_option_models(self) -> None:
232        pass

Export learned option models and integrate them into the world model.

Called after option learning so that planning reasons over fresh models.

def plan( self, subjective_state: 'MinimalSubjectiveState', value_function: 'ValueFunction[MinimalSubjectiveState, Action, MinimalInfo]', budget: 'int') -> 'PlanningUpdate[Action]':
234    def plan(
235        self,
236        subjective_state: MinimalSubjectiveState,
237        value_function: ValueFunction[MinimalSubjectiveState, Action, MinimalInfo],
238        budget: int,
239    ) -> PlanningUpdate[Action]:
240        return PlanningUpdate(
241            value_targets=value_function.predict(subjective_state),
242            policy_targets={"preferred_action": 0},
243            search_statistics={"budget_used": budget},
244        )

Run bounded planning and return improvement signals.

The planner uses the internal world model together with the supplied value_function (for state evaluation) to produce value targets, policy targets, or search statistics.

def remove_option_models(self, option_ids: 'Sequence[OptionId]') -> 'None':
246    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
247        pass

Remove option models by ID (called during curation).

class MinimalValueFunction(oak.interfaces.ValueFunction[examples.smoke.minimal_oak.MinimalSubjectiveState, int, examples.smoke.minimal_oak.MinimalInfo]):
255class MinimalValueFunction(ValueFunction[MinimalSubjectiveState, Action, MinimalInfo]):
256    """Stores latest reward as a value, counts usage, never curates.
257
258    - One implicit value learner ("main") that stores the latest reward.
259    - Usage records are accumulated for utility scoring.
260    - Curation always returns an empty decision (no pruning).
261    """
262
263    def __init__(self) -> None:
264        self._value: float = 0.0
265        self._usage_records: list[UsageRecord] = []
266
267    def update(
268        self,
269        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
270        *,
271        planning: bool = False,
272    ) -> Mapping[GeneralValueFunctionId, float]:
273        self._value = transition.reward
274        return {"main": 0.0}
275
276    def predict(
277        self,
278        subjective_state: MinimalSubjectiveState,
279    ) -> Mapping[GeneralValueFunctionId, float]:
280        return {"main": self._value}
281
282    def observe_usage(self, usage_records: Sequence[UsageRecord]) -> None:
283        self._usage_records.extend(usage_records)
284
285    def utility_scores(self) -> Sequence[UtilityRecord]:
286        totals: dict[tuple[str, str], float] = {}
287        latest: dict[tuple[str, str], UsageRecord] = {}
288        for record in self._usage_records:
289            key = (record.kind.value, record.component_id)
290            totals[key] = totals.get(key, 0.0) + record.amount
291            latest[key] = record
292        return tuple(
293            UtilityRecord(
294                kind=record.kind,
295                component_id=record.component_id,
296                utility=totals[key],
297            )
298            for key, record in latest.items()
299        )
300
301    def curate(self) -> CurationDecision:
302        return CurationDecision()
303
304    def remove(
305        self,
306        general_value_function_ids: Sequence[GeneralValueFunctionId],
307    ) -> None:
308        pass

Stores latest reward as a value, counts usage, never curates.

  • One implicit value learner ("main") that stores the latest reward.
  • Usage records are accumulated for utility scoring.
  • Curation always returns an empty decision (no pruning).
def update( self, transition: 'Transition[Action, MinimalSubjectiveState, MinimalInfo]', *, planning: 'bool' = False) -> 'Mapping[GeneralValueFunctionId, float]':
267    def update(
268        self,
269        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
270        *,
271        planning: bool = False,
272    ) -> Mapping[GeneralValueFunctionId, float]:
273        self._value = transition.reward
274        return {"main": 0.0}

Learn from a transition and return TD-error signals.

def predict( self, subjective_state: 'MinimalSubjectiveState') -> 'Mapping[GeneralValueFunctionId, float]':
276    def predict(
277        self,
278        subjective_state: MinimalSubjectiveState,
279    ) -> Mapping[GeneralValueFunctionId, float]:
280        return {"main": self._value}

Predict values for the given subjective state.

def observe_usage(self, usage_records: 'Sequence[UsageRecord]') -> 'None':
282    def observe_usage(self, usage_records: Sequence[UsageRecord]) -> None:
283        self._usage_records.extend(usage_records)

Record usage evidence for utility assessment.

def utility_scores(self) -> 'Sequence[UtilityRecord]':
285    def utility_scores(self) -> Sequence[UtilityRecord]:
286        totals: dict[tuple[str, str], float] = {}
287        latest: dict[tuple[str, str], UsageRecord] = {}
288        for record in self._usage_records:
289            key = (record.kind.value, record.component_id)
290            totals[key] = totals.get(key, 0.0) + record.amount
291            latest[key] = record
292        return tuple(
293            UtilityRecord(
294                kind=record.kind,
295                component_id=record.component_id,
296                utility=totals[key],
297            )
298            for key, record in latest.items()
299        )

Return current utility estimates for all tracked structures.

def curate(self) -> 'CurationDecision':
301    def curate(self) -> CurationDecision:
302        return CurationDecision()

Decide which learned structures to drop.

def remove( self, general_value_function_ids: 'Sequence[GeneralValueFunctionId]') -> 'None':
304    def remove(
305        self,
306        general_value_function_ids: Sequence[GeneralValueFunctionId],
307    ) -> None:
308        pass

Remove value functions by ID (called during curation).

class MinimalReactivePolicy(oak.interfaces.ReactivePolicy[examples.smoke.minimal_oak.MinimalSubjectiveState, int, examples.smoke.minimal_oak.MinimalInfo]):
class MinimalReactivePolicy(
    ReactivePolicy[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Alternates primitive actions and options, creates options from subtasks.

    - On even observations: primitive action 0.
    - On odd observations with options available: executes the first option.
    - On odd observations without options: primitive action 1.
    - Options are created 1:1 from ingested subtasks.
    - A running option keeps control, whatever the observation parity, until
      its stop probability reaches the caller's threshold.
    - Removing an option, or the subtask it was built from, also deactivates
      it if it is the one currently running.
    """

    def __init__(self) -> None:
        """Start with no subtasks, no options, and nothing running."""
        self._active_option: _MinimalOption | None = None
        self._options: dict[OptionId, _MinimalOption] = {}
        self._subtasks: dict[SubtaskId, SubtaskSpec] = {}
        # Last learning signals received; stored but not otherwise used here.
        self.last_td_errors: Mapping[GeneralValueFunctionId, float] = {}
        self.last_planning_update: PlanningUpdate[Action] | None = None

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
        td_errors: Mapping[GeneralValueFunctionId, float],
    ) -> None:
        """Store a private copy of ``td_errors``; ``transition`` is unused."""
        self.last_td_errors = dict(td_errors)

    def apply_planning_update(self, update: PlanningUpdate[Action]) -> None:
        """Remember the most recent planning update without acting on it."""
        self.last_planning_update = update

    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
        """Register each subtask and create (or replace) its paired option.

        The option id is ``"option:<subtask_id>"``; ingesting a subtask id
        again overwrites both the stored spec and the stored option.
        """
        for subtask in subtasks:
            self._subtasks[subtask.subtask_id] = subtask
            option_id = f"option:{subtask.subtask_id}"
            self._options[option_id] = _MinimalOption(
                OptionDescriptor(
                    option_id=option_id,
                    name=f"Option for {subtask.subtask_id}",
                    subtask_id=subtask.subtask_id,
                )
            )

    def integrate_options(self) -> None:
        """Do nothing: options are already registered in ingest_subtasks."""

    def select_action(
        self,
        subjective_state: MinimalSubjectiveState,
        option_stop_threshold: float,
    ) -> tuple[Action, OptionId | None]:
        """Pick the next primitive action and report which option chose it.

        Args:
            subjective_state: Current state; its ``observation`` parity
                drives the choice when no option is running.
            option_stop_threshold: A running option continues only while its
                stop probability is strictly below this value.

        Returns:
            ``(action, option_id)``; ``option_id`` is ``None`` when the
            action is primitive (0 on even observations, 1 on odd ones with
            no options registered).
        """
        running = self._active_option
        if running is not None:
            if running.stop_probability(subjective_state) < option_stop_threshold:
                return (
                    running.act(subjective_state),
                    running.descriptor.option_id,
                )
            # The option asked to stop: fall through to a fresh choice.
            self._active_option = None

        # Even observation -> primitive action 0
        if subjective_state.observation % 2 == 0:
            return (0, None)

        # Odd observation -> first registered option, or primitive action 1.
        # next(iter(...)) takes the first value without copying them all.
        first_option = next(iter(self._options.values()), None)
        if first_option is None:
            return (1, None)
        self._active_option = first_option
        return (
            first_option.act(subjective_state),
            first_option.descriptor.option_id,
        )

    def clear_active_option(self) -> None:
        """Forget the running option, if any."""
        self._active_option = None

    def remove_options(self, option_ids: Sequence[OptionId]) -> None:
        """Drop the given options; unknown ids are ignored.

        If the running option is among them it is deactivated as well.
        """
        for oid in option_ids:
            self._options.pop(oid, None)
        active = self._active_option
        if active is not None and active.descriptor.option_id in option_ids:
            self._active_option = None

    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
        """Drop the given subtasks and the options derived from them.

        Unknown ids are ignored.  If the running option was derived from one
        of the removed subtasks it is deactivated too, so a removed option
        cannot keep executing (the same guarantee remove_options gives).
        """
        removed_option_ids: set[OptionId] = set()
        for sid in subtask_ids:
            option_id = f"option:{sid}"
            self._subtasks.pop(sid, None)
            self._options.pop(option_id, None)
            removed_option_ids.add(option_id)
        active = self._active_option
        if (
            active is not None
            and active.descriptor.option_id in removed_option_ids
        ):
            self._active_option = None

Alternates primitive actions and options, creates options from subtasks.

  • On even observations: primitive action 0.
  • On odd observations with options available: executes the first option.
  • On odd observations without options: primitive action 1.
  • Options are created 1:1 from ingested subtasks.
last_td_errors: 'Mapping[GeneralValueFunctionId, float]'
last_planning_update: 'PlanningUpdate[Action] | None'
def update( self, transition: 'Transition[Action, MinimalSubjectiveState, MinimalInfo]', td_errors: 'Mapping[GeneralValueFunctionId, float]') -> 'None':
355    def update(
356        self,
357        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
358        td_errors: Mapping[GeneralValueFunctionId, float],
359    ) -> None:
360        self.last_td_errors = dict(td_errors)

Update the policy and option learners from an observed transition.

def apply_planning_update(self, update: 'PlanningUpdate[Action]') -> 'None':
362    def apply_planning_update(self, update: PlanningUpdate[Action]) -> None:
363        self.last_planning_update = update

Integrate planning improvement signals into the policy.

def ingest_subtasks(self, subtasks: 'Sequence[SubtaskSpec]') -> 'None':
365    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
366        for subtask in subtasks:
367            self._subtasks[subtask.subtask_id] = subtask
368            option_id = f"option:{subtask.subtask_id}"
369            self._options[option_id] = _MinimalOption(
370                OptionDescriptor(
371                    option_id=option_id,
372                    name=f"Option for {subtask.subtask_id}",
373                    subtask_id=subtask.subtask_id,
374                )
375            )

Feed newly created subtasks into the option learner.

def integrate_options(self) -> 'None':
377    def integrate_options(self) -> None:
378        pass  # options already registered in ingest_subtasks

Export learned options into the option library.

def select_action( self, subjective_state: 'MinimalSubjectiveState', option_stop_threshold: 'float') -> 'tuple[Action, OptionId | None]':
380    def select_action(
381        self,
382        subjective_state: MinimalSubjectiveState,
383        option_stop_threshold: float,
384    ) -> tuple[Action, OptionId | None]:
385        # Check if active option should continue
386        if self._active_option is not None:
387            stop_prob = self._active_option.stop_probability(subjective_state)
388            if stop_prob < option_stop_threshold:
389                return (
390                    self._active_option.act(subjective_state),
391                    self._active_option.descriptor.option_id,
392                )
393            self._active_option = None
394
395        # Even observation → primitive action 0
396        if subjective_state.observation % 2 == 0:
397            return (0, None)
398
399        # Odd observation → first available option, or primitive action 1
400        options = list(self._options.values())
401        if options:
402            self._active_option = options[0]
403            return (
404                self._active_option.act(subjective_state),
405                self._active_option.descriptor.option_id,
406            )
407        return (1, None)

Choose a primitive action, possibly by continuing an active option.

Returns a (primitive_action, active_option_id) pair. When no option is active, active_option_id is None.

def clear_active_option(self) -> 'None':
409    def clear_active_option(self) -> None:
410        self._active_option = None

Clear the currently executing option (e.g. at episode boundaries).

def remove_options(self, option_ids: 'Sequence[OptionId]') -> 'None':
412    def remove_options(self, option_ids: Sequence[OptionId]) -> None:
413        for oid in option_ids:
414            self._options.pop(oid, None)
415        if (
416            self._active_option is not None
417            and self._active_option.descriptor.option_id in option_ids
418        ):
419            self._active_option = None

Remove options by ID (called during curation).

def remove_subtasks(self, subtask_ids: 'Sequence[SubtaskId]') -> 'None':
421    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
422        for sid in subtask_ids:
423            self._subtasks.pop(sid, None)
424            self._options.pop(f"option:{sid}", None)

Remove subtasks by ID (called during curation).

def build_minimal_agent() -> 'OaKAgent[Observation, Action, MinimalSubjectiveState, MinimalInfo]':
def build_minimal_agent() -> (
    OaKAgent[Observation, Action, MinimalSubjectiveState, MinimalInfo]
):
    """Assemble an OaK agent from the four minimal modules.

    Each call creates fresh module instances and uses a planning budget of 4.
    """
    perception = MinimalPerception()
    transition_model = MinimalTransitionModel()
    value_function = MinimalValueFunction()
    reactive_policy = MinimalReactivePolicy()
    return OaKAgent(
        perception=perception,
        transition_model=transition_model,
        value_function=value_function,
        reactive_policy=reactive_policy,
        planning_budget=4,
    )

Construct a fully wired smoke-test OaK agent.

def run_minimal_episode(horizon: 'int' = 5) -> 'list[MinimalTraceStep]':
def run_minimal_episode(horizon: int = 5) -> list[MinimalTraceStep]:
    """Run a short smoke episode and return a compact trace.

    Args:
        horizon: Upper bound on the number of agent steps; also passed to
            ``MinimalWorld`` as its horizon.

    Returns:
        One trace entry per agent step taken, in order.  The loop ends early
        when the world reports ``terminated``; the step that triggered
        termination is still recorded.
    """
    world = MinimalWorld(horizon=horizon)
    try:
        agent = build_minimal_agent()
        step = world.reset()
        agent.reset()

        trace: list[MinimalTraceStep] = []

        for _ in range(horizon):
            result = agent.step(step)
            action = result.action
            trace.append(
                {
                    "subjective_state": result.subjective_state,
                    "action": action,
                    "active_option_id": result.active_option_id,
                    "created_subtasks": [
                        subtask.subtask_id for subtask in result.created_subtasks
                    ],
                    "planning_budget_used": _planning_budget_used(
                        result.planning_update
                    ),
                }
            )
            step = world.step(action)
            if step.terminated:
                break

        return trace
    finally:
        # Release the world even if a step raises, as run_minimal_training does.
        world.close()

Run a short smoke episode and return a compact trace.

def run_minimal_training( num_episodes: 'int' = 3, *, horizon: 'int' = 5, average_window: 'int' = 100, solved_threshold: 'float | None' = None) -> 'list[float]':
def run_minimal_training(
    num_episodes: int = 3,
    *,
    horizon: int = 5,
    average_window: int = 100,
    solved_threshold: float | None = None,
) -> list[float]:
    """Train a fresh minimal agent on a fresh world and return the result.

    Args:
        num_episodes: Forwarded to ``agent.train``.
        horizon: Horizon of the ``MinimalWorld`` that is created.
        average_window: Forwarded to ``agent.train``.
        solved_threshold: Forwarded to ``agent.train``.

    Returns:
        Whatever ``agent.train`` returns.  The world is closed afterwards,
        whether training finished or raised.
    """
    world = MinimalWorld(horizon=horizon)
    agent = build_minimal_agent()
    try:
        rewards = agent.train(
            world,
            num_episodes=num_episodes,
            average_window=average_window,
            solved_threshold=solved_threshold,
        )
    finally:
        world.close()
    return rewards

Train the minimal smoke agent for a few episodes and return rewards.