examples.smoke.minimal_oak
1from __future__ import annotations 2 3"""Bare-minimum external implementation used to smoke-test the interface. 4 5This module answers a single question: can the current package interfaces be 6instantiated and run through a complete OaK step loop? 7 8The implementation shows the **direct** approach: each of Sutton's four 9modules (Perception, Transition Model, Value Function, Reactive Policy) is 10implemented as a single class. There is no need to use the fine-grained 11component interfaces or the composite wrappers, which exist for projects 12that need more modularity inside each module. 13 14What this module is: 15 16- a tiny integer world 17- a direct observation-to-subjective_state perception with one fixed feature 18- a no-op transition model with trivial one-step planning 19- a simple value tracker with usage counting and no curation 20- a reactive policy that alternates actions and options 21 22What this module is not: 23 24- a trained agent 25- a realistic planner 26- a serious option-learning system 27- a benchmark implementation 28""" 29 30from dataclasses import dataclass 31from typing import Mapping, Sequence, TypedDict 32 33from oak.agent import OaKAgent 34from oak.interfaces import ( 35 Perception, 36 ReactivePolicy, 37 TransitionModel, 38 ValueFunction, 39 World, 40) 41from oak.types import ( 42 CurationDecision, 43 FeatureId, 44 FeatureSpec, 45 GeneralValueFunctionId, 46 OptionDescriptor, 47 OptionId, 48 PlanningUpdate, 49 SubtaskId, 50 SubtaskSpec, 51 TimeStep, 52 Transition, 53 UsageRecord, 54 UtilityRecord, 55) 56 57Observation = int 58Action = int 59 60 61class MinimalInfo(TypedDict, total=False): 62 reset: bool 63 echo_action: Action 64 65 66class MinimalTraceStep(TypedDict): 67 subjective_state: "MinimalSubjectiveState" 68 action: Action 69 active_option_id: OptionId | None 70 created_subtasks: list[SubtaskId] 71 planning_budget_used: int | None 72 73 74def _planning_budget_used(update: PlanningUpdate[Action] | None) -> int | None: 75 
"""Extract an integer planning budget from structured search statistics.""" 76 if update is None: 77 return None 78 79 value = update.search_statistics.get("budget_used") 80 if isinstance(value, bool): 81 return None 82 if isinstance(value, int): 83 return value 84 return None 85 86 87@dataclass(slots=True, frozen=True) 88class MinimalSubjectiveState: 89 """Small concrete subjective state used by the smoke implementation.""" 90 91 step_index: int 92 observation: Observation 93 reward: float 94 last_action: Action | None 95 96 97# ───────────────────────────────────────────────────────────────────── 98# Environment 99# ───────────────────────────────────────────────────────────────────── 100 101 102class MinimalWorld(World[Observation, Action, MinimalInfo]): 103 """A toy world that increments an integer observation every step.""" 104 105 def __init__(self, horizon: int = 5) -> None: 106 self.horizon = horizon 107 self.current_step = 0 108 109 def reset(self) -> TimeStep[Observation, MinimalInfo]: 110 self.current_step = 0 111 return TimeStep(observation=0, reward=0.0, info={"reset": True}) 112 113 def step(self, action: Action) -> TimeStep[Observation, MinimalInfo]: 114 self.current_step += 1 115 terminated = self.current_step >= self.horizon 116 reward = 1.0 if action == 1 else 0.0 117 return TimeStep( 118 observation=self.current_step, 119 reward=reward, 120 terminated=terminated, 121 info={"echo_action": action}, 122 ) 123 124 def close(self) -> None: 125 pass 126 127 128# ───────────────────────────────────────────────────────────────────── 129# Perception 130# ───────────────────────────────────────────────────────────────────── 131 132 133class MinimalPerception(Perception[Observation, Action, MinimalSubjectiveState]): 134 """Direct observation-to-state mapping with one fixed feature. 135 136 - The subjective state is a thin wrapper around the observation. 137 - One identity feature ("observation") is always present. 138 - No new features are ever proposed. 
139 - One subtask is created per feature (deduplicated). 140 """ 141 142 def __init__(self) -> None: 143 self._state = MinimalSubjectiveState(0, 0, 0.0, None) 144 self._features: dict[FeatureId, FeatureSpec] = { 145 "observation": FeatureSpec( 146 feature_id="observation", 147 name="Observation value", 148 description="Identity feature for the integer observation.", 149 ) 150 } 151 self._created_subtask_for: set[FeatureId] = set() 152 153 def reset(self) -> None: 154 self._state = MinimalSubjectiveState(0, 0, 0.0, None) 155 156 def update( 157 self, 158 observation: Observation, 159 reward: float, 160 last_action: Action | None, 161 ) -> MinimalSubjectiveState: 162 self._state = MinimalSubjectiveState( 163 step_index=observation, 164 observation=observation, 165 reward=reward, 166 last_action=last_action, 167 ) 168 return self._state 169 170 def current_subjective_state(self) -> MinimalSubjectiveState: 171 return self._state 172 173 def discover_and_rank_features( 174 self, 175 subjective_state: MinimalSubjectiveState, 176 utility_scores: Sequence[UtilityRecord], 177 feature_budget: int, 178 ) -> Sequence[FeatureId]: 179 # No new features proposed; rank existing ones in insertion order. 
180 ids = list(self._features.keys()) 181 return tuple(ids[:feature_budget]) 182 183 def generate_subtasks( 184 self, 185 ranked_feature_ids: Sequence[FeatureId], 186 ) -> Sequence[SubtaskSpec]: 187 created: list[SubtaskSpec] = [] 188 for fid in ranked_feature_ids: 189 if fid in self._created_subtask_for: 190 continue 191 self._created_subtask_for.add(fid) 192 created.append( 193 SubtaskSpec( 194 subtask_id=f"subtask:{fid}", 195 name=f"Track {fid}", 196 feature_id=fid, 197 ) 198 ) 199 return tuple(created) 200 201 def list_features(self) -> Sequence[FeatureSpec]: 202 return tuple(self._features.values()) 203 204 def remove_features(self, feature_ids: Sequence[FeatureId]) -> None: 205 for fid in feature_ids: 206 self._features.pop(fid, None) 207 208 209# ───────────────────────────────────────────────────────────────────── 210# Transition Model 211# ───────────────────────────────────────────────────────────────────── 212 213 214class MinimalTransitionModel( 215 TransitionModel[MinimalSubjectiveState, Action, MinimalInfo] 216): 217 """Trivial world model with one-step lookahead planning. 218 219 - No real model learning (update is a no-op). 220 - No option models. 221 - Planning calls predict once and returns value targets. 
222 """ 223 224 def update( 225 self, 226 transition: Transition[Action, MinimalSubjectiveState, MinimalInfo], 227 ) -> None: 228 pass 229 230 def integrate_option_models(self) -> None: 231 pass 232 233 def plan( 234 self, 235 subjective_state: MinimalSubjectiveState, 236 value_function: ValueFunction[MinimalSubjectiveState, Action, MinimalInfo], 237 budget: int, 238 ) -> PlanningUpdate[Action]: 239 return PlanningUpdate( 240 value_targets=value_function.predict(subjective_state), 241 policy_targets={"preferred_action": 0}, 242 search_statistics={"budget_used": budget}, 243 ) 244 245 def remove_option_models(self, option_ids: Sequence[OptionId]) -> None: 246 pass 247 248 249# ───────────────────────────────────────────────────────────────────── 250# Value Function 251# ───────────────────────────────────────────────────────────────────── 252 253 254class MinimalValueFunction(ValueFunction[MinimalSubjectiveState, Action, MinimalInfo]): 255 """Stores latest reward as a value, counts usage, never curates. 256 257 - One implicit value learner ("main") that stores the latest reward. 258 - Usage records are accumulated for utility scoring. 259 - Curation always returns an empty decision (no pruning). 
260 """ 261 262 def __init__(self) -> None: 263 self._value: float = 0.0 264 self._usage_records: list[UsageRecord] = [] 265 266 def update( 267 self, 268 transition: Transition[Action, MinimalSubjectiveState, MinimalInfo], 269 *, 270 planning: bool = False, 271 ) -> Mapping[GeneralValueFunctionId, float]: 272 self._value = transition.reward 273 return {"main": 0.0} 274 275 def predict( 276 self, 277 subjective_state: MinimalSubjectiveState, 278 ) -> Mapping[GeneralValueFunctionId, float]: 279 return {"main": self._value} 280 281 def observe_usage(self, usage_records: Sequence[UsageRecord]) -> None: 282 self._usage_records.extend(usage_records) 283 284 def utility_scores(self) -> Sequence[UtilityRecord]: 285 totals: dict[tuple[str, str], float] = {} 286 latest: dict[tuple[str, str], UsageRecord] = {} 287 for record in self._usage_records: 288 key = (record.kind.value, record.component_id) 289 totals[key] = totals.get(key, 0.0) + record.amount 290 latest[key] = record 291 return tuple( 292 UtilityRecord( 293 kind=record.kind, 294 component_id=record.component_id, 295 utility=totals[key], 296 ) 297 for key, record in latest.items() 298 ) 299 300 def curate(self) -> CurationDecision: 301 return CurationDecision() 302 303 def remove( 304 self, 305 general_value_function_ids: Sequence[GeneralValueFunctionId], 306 ) -> None: 307 pass 308 309 310# ───────────────────────────────────────────────────────────────────── 311# Reactive Policy 312# ───────────────────────────────────────────────────────────────────── 313 314 315@dataclass 316class _MinimalOption: 317 """Trivial option that always emits action=1 and stops immediately.""" 318 319 _descriptor: OptionDescriptor 320 _action: Action = 1 321 322 @property 323 def descriptor(self) -> OptionDescriptor: 324 return self._descriptor 325 326 def is_available(self, subjective_state: MinimalSubjectiveState) -> bool: 327 return True 328 329 def act(self, subjective_state: MinimalSubjectiveState) -> Action: 330 return 
self._action 331 332 def stop_probability(self, subjective_state: MinimalSubjectiveState) -> float: 333 return 1.0 334 335 336class MinimalReactivePolicy( 337 ReactivePolicy[MinimalSubjectiveState, Action, MinimalInfo] 338): 339 """Alternates primitive actions and options, creates options from subtasks. 340 341 - On even observations: primitive action 0. 342 - On odd observations with options available: executes the first option. 343 - On odd observations without options: primitive action 1. 344 - Options are created 1:1 from ingested subtasks. 345 """ 346 347 def __init__(self) -> None: 348 self._active_option: _MinimalOption | None = None 349 self._options: dict[OptionId, _MinimalOption] = {} 350 self._subtasks: dict[SubtaskId, SubtaskSpec] = {} 351 self.last_td_errors: Mapping[GeneralValueFunctionId, float] = {} 352 self.last_planning_update: PlanningUpdate[Action] | None = None 353 354 def update( 355 self, 356 transition: Transition[Action, MinimalSubjectiveState, MinimalInfo], 357 td_errors: Mapping[GeneralValueFunctionId, float], 358 ) -> None: 359 self.last_td_errors = dict(td_errors) 360 361 def apply_planning_update(self, update: PlanningUpdate[Action]) -> None: 362 self.last_planning_update = update 363 364 def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None: 365 for subtask in subtasks: 366 self._subtasks[subtask.subtask_id] = subtask 367 option_id = f"option:{subtask.subtask_id}" 368 self._options[option_id] = _MinimalOption( 369 OptionDescriptor( 370 option_id=option_id, 371 name=f"Option for {subtask.subtask_id}", 372 subtask_id=subtask.subtask_id, 373 ) 374 ) 375 376 def integrate_options(self) -> None: 377 pass # options already registered in ingest_subtasks 378 379 def select_action( 380 self, 381 subjective_state: MinimalSubjectiveState, 382 option_stop_threshold: float, 383 ) -> tuple[Action, OptionId | None]: 384 # Check if active option should continue 385 if self._active_option is not None: 386 stop_prob = 
self._active_option.stop_probability(subjective_state) 387 if stop_prob < option_stop_threshold: 388 return ( 389 self._active_option.act(subjective_state), 390 self._active_option.descriptor.option_id, 391 ) 392 self._active_option = None 393 394 # Even observation → primitive action 0 395 if subjective_state.observation % 2 == 0: 396 return (0, None) 397 398 # Odd observation → first available option, or primitive action 1 399 options = list(self._options.values()) 400 if options: 401 self._active_option = options[0] 402 return ( 403 self._active_option.act(subjective_state), 404 self._active_option.descriptor.option_id, 405 ) 406 return (1, None) 407 408 def clear_active_option(self) -> None: 409 self._active_option = None 410 411 def remove_options(self, option_ids: Sequence[OptionId]) -> None: 412 for oid in option_ids: 413 self._options.pop(oid, None) 414 if ( 415 self._active_option is not None 416 and self._active_option.descriptor.option_id in option_ids 417 ): 418 self._active_option = None 419 420 def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None: 421 for sid in subtask_ids: 422 self._subtasks.pop(sid, None) 423 self._options.pop(f"option:{sid}", None) 424 425 426# ───────────────────────────────────────────────────────────────────── 427# Wiring 428# ───────────────────────────────────────────────────────────────────── 429 430 431def build_minimal_agent() -> ( 432 OaKAgent[Observation, Action, MinimalSubjectiveState, MinimalInfo] 433): 434 """Construct a fully wired smoke-test OaK agent.""" 435 return OaKAgent( 436 perception=MinimalPerception(), 437 transition_model=MinimalTransitionModel(), 438 value_function=MinimalValueFunction(), 439 reactive_policy=MinimalReactivePolicy(), 440 planning_budget=4, 441 ) 442 443 444def run_minimal_episode(horizon: int = 5) -> list[MinimalTraceStep]: 445 """Run a short smoke episode and return a compact trace.""" 446 world = MinimalWorld(horizon=horizon) 447 agent = build_minimal_agent() 448 step = 
world.reset() 449 agent.reset() 450 451 trace: list[MinimalTraceStep] = [] 452 453 for _ in range(horizon): 454 result = agent.step(step) 455 action = result.action 456 trace.append( 457 { 458 "subjective_state": result.subjective_state, 459 "action": action, 460 "active_option_id": result.active_option_id, 461 "created_subtasks": [ 462 subtask.subtask_id for subtask in result.created_subtasks 463 ], 464 "planning_budget_used": _planning_budget_used(result.planning_update), 465 } 466 ) 467 step = world.step(action) 468 if step.terminated: 469 break 470 471 return trace 472 473 474def run_minimal_training( 475 num_episodes: int = 3, 476 *, 477 horizon: int = 5, 478 average_window: int = 100, 479 solved_threshold: float | None = None, 480) -> list[float]: 481 """Train the minimal smoke agent for a few episodes and return rewards.""" 482 world = MinimalWorld(horizon=horizon) 483 agent = build_minimal_agent() 484 try: 485 return agent.train( 486 world, 487 num_episodes=num_episodes, 488 average_window=average_window, 489 solved_threshold=solved_threshold, 490 ) 491 finally: 492 world.close()
class MinimalTraceStep(TypedDict):
    """One entry of the compact trace built by ``run_minimal_episode``."""

    # Subjective state reported by the agent's step result.
    subjective_state: "MinimalSubjectiveState"
    # Primitive action reported by the agent's step result.
    action: Action
    # ID of the option reported as active, or None.
    active_option_id: OptionId | None
    # IDs of the subtasks reported as created during the step.
    created_subtasks: list[SubtaskId]
    # Integer "budget_used" search statistic, or None when unavailable.
    planning_budget_used: int | None
88@dataclass(slots=True, frozen=True) 89class MinimalSubjectiveState: 90 """Small concrete subjective state used by the smoke implementation.""" 91 92 step_index: int 93 observation: Observation 94 reward: float 95 last_action: Action | None
Small concrete subjective state used by the smoke implementation.
class MinimalWorld(World[Observation, Action, MinimalInfo]):
    """Counter-based toy environment whose observation is the step count."""

    def __init__(self, horizon: int = 5) -> None:
        self.horizon = horizon
        self.current_step = 0

    def reset(self) -> TimeStep[Observation, MinimalInfo]:
        """Rewind the counter to zero and emit the initial time step."""
        self.current_step = 0
        initial_info: MinimalInfo = {"reset": True}
        return TimeStep(observation=0, reward=0.0, info=initial_info)

    def step(self, action: Action) -> TimeStep[Observation, MinimalInfo]:
        """Advance the counter by one and report the resulting time step.

        Action 1 earns a reward of 1.0, anything else 0.0.  The step is
        flagged as terminated once the counter has reached ``horizon``.
        """
        self.current_step += 1
        is_done = self.current_step >= self.horizon
        if action == 1:
            earned = 1.0
        else:
            earned = 0.0
        echo: MinimalInfo = {"echo_action": action}
        return TimeStep(
            observation=self.current_step,
            reward=earned,
            terminated=is_done,
            info=echo,
        )

    def close(self) -> None:
        """Nothing to release."""
        return None
A toy world that increments an integer observation every step.
def step(self, action: Action) -> TimeStep[Observation, MinimalInfo]:
    """Advance the counter by one and report the resulting time step.

    Action 1 earns a reward of 1.0, anything else 0.0.  The step is flagged
    as terminated once the counter has reached ``self.horizon``.
    """
    self.current_step += 1
    is_done = self.current_step >= self.horizon
    if action == 1:
        earned = 1.0
    else:
        earned = 0.0
    echo = {"echo_action": action}
    return TimeStep(
        observation=self.current_step,
        reward=earned,
        terminated=is_done,
        info=echo,
    )
class MinimalPerception(Perception[Observation, Action, MinimalSubjectiveState]):
    """Direct observation-to-state mapping with one fixed feature.

    - The subjective state is a thin wrapper around the observation.
    - One identity feature ("observation") is present until it is removed
      through ``remove_features``.
    - No new features are ever proposed.
    - One subtask is created per feature (deduplicated).
    """

    def __init__(self) -> None:
        self._state = MinimalSubjectiveState(0, 0, 0.0, None)
        self._features: dict[FeatureId, FeatureSpec] = {
            "observation": FeatureSpec(
                feature_id="observation",
                name="Observation value",
                description="Identity feature for the integer observation.",
            )
        }
        # Features that already produced a subtask; prevents duplicates.
        self._created_subtask_for: set[FeatureId] = set()

    def reset(self) -> None:
        """Restore the initial subjective state; features are kept."""
        self._state = MinimalSubjectiveState(0, 0, 0.0, None)

    def update(
        self,
        observation: Observation,
        reward: float,
        last_action: Action | None,
    ) -> MinimalSubjectiveState:
        """Wrap the inputs in a new subjective state, store it and return it."""
        self._state = MinimalSubjectiveState(
            # The observation doubles as the step index here.
            step_index=observation,
            observation=observation,
            reward=reward,
            last_action=last_action,
        )
        return self._state

    def current_subjective_state(self) -> MinimalSubjectiveState:
        """Return the most recently stored subjective state."""
        return self._state

    def discover_and_rank_features(
        self,
        subjective_state: MinimalSubjectiveState,
        utility_scores: Sequence[UtilityRecord],
        feature_budget: int,
    ) -> Sequence[FeatureId]:
        """Return at most ``feature_budget`` feature IDs in insertion order.

        No new features are proposed, and ``subjective_state`` and
        ``utility_scores`` are not consulted.
        """
        # Clamp at zero: a negative stop index would slice from the end and
        # could return more IDs than the budget allows.
        limit = max(feature_budget, 0)
        return tuple(list(self._features)[:limit])

    def generate_subtasks(
        self,
        ranked_feature_ids: Sequence[FeatureId],
    ) -> Sequence[SubtaskSpec]:
        """Create one subtask for each feature that does not have one yet."""
        created: list[SubtaskSpec] = []
        for fid in ranked_feature_ids:
            if fid in self._created_subtask_for:
                continue
            self._created_subtask_for.add(fid)
            created.append(
                SubtaskSpec(
                    subtask_id=f"subtask:{fid}",
                    name=f"Track {fid}",
                    feature_id=fid,
                )
            )
        return tuple(created)

    def list_features(self) -> Sequence[FeatureSpec]:
        """Return the specs of all stored features."""
        return tuple(self._features.values())

    def remove_features(self, feature_ids: Sequence[FeatureId]) -> None:
        """Drop the given features; unknown IDs are ignored."""
        for fid in feature_ids:
            self._features.pop(fid, None)
Direct observation-to-state mapping with one fixed feature.
- The subjective state is a thin wrapper around the observation.
- One identity feature ("observation") is always present.
- No new features are ever proposed.
- One subtask is created per feature (deduplicated).
def update(
    self,
    observation: Observation,
    reward: float,
    last_action: Action | None,
) -> MinimalSubjectiveState:
    """Build a fresh subjective state from the inputs, store it, return it.

    The observation is used both as the step index and as the observation
    field of the new state.
    """
    fresh_state = MinimalSubjectiveState(
        step_index=observation,
        observation=observation,
        reward=reward,
        last_action=last_action,
    )
    self._state = fresh_state
    return fresh_state
Process a new observation and return the updated subjective state.
Return the most recently computed subjective state.
def discover_and_rank_features(
    self,
    subjective_state: MinimalSubjectiveState,
    utility_scores: Sequence[UtilityRecord],
    feature_budget: int,
) -> Sequence[FeatureId]:
    """Return at most ``feature_budget`` feature IDs in insertion order.

    No new features are proposed, and ``subjective_state`` and
    ``utility_scores`` are not consulted.  A zero or negative budget yields
    an empty tuple.
    """
    # Clamp at zero: a negative stop index would slice from the end and
    # could return more IDs than the budget allows.
    limit = max(feature_budget, 0)
    return tuple(list(self._features)[:limit])
Propose new features, integrate them, and return the top-ranked IDs.
A typical implementation:
- Proposes candidate features from the current subjective state.
- Adds accepted candidates to its internal feature store.
- Ranks all features using the provided utility scores.
- Returns the top feature IDs (up to feature_budget).
def generate_subtasks(
    self,
    ranked_feature_ids: Sequence[FeatureId],
) -> Sequence[SubtaskSpec]:
    """Emit one new subtask per feature that has not produced one before.

    Features already recorded in ``self._created_subtask_for`` are skipped;
    newly handled ones are recorded there.
    """
    already_done = self._created_subtask_for
    fresh_specs: list[SubtaskSpec] = []
    for feature_id in ranked_feature_ids:
        if feature_id not in already_done:
            already_done.add(feature_id)
            spec = SubtaskSpec(
                subtask_id=f"subtask:{feature_id}",
                name=f"Track {feature_id}",
                feature_id=feature_id,
            )
            fresh_specs.append(spec)
    return tuple(fresh_specs)
Turn ranked feature IDs into subtask specifications.
class MinimalTransitionModel(
    TransitionModel[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Placeholder world model whose planner makes a single value lookup.

    - ``update`` learns nothing.
    - Option models are neither integrated nor removed.
    - ``plan`` calls ``predict`` once and wraps the result.
    """

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Ignore the transition; no model is learned."""
        return None

    def integrate_option_models(self) -> None:
        """Do nothing; there are no option models."""
        return None

    def plan(
        self,
        subjective_state: MinimalSubjectiveState,
        value_function: ValueFunction[MinimalSubjectiveState, Action, MinimalInfo],
        budget: int,
    ) -> PlanningUpdate[Action]:
        """Wrap one ``predict`` call in a planning update.

        The policy target is fixed to action 0 and the whole ``budget`` is
        reported as used.
        """
        predicted_values = value_function.predict(subjective_state)
        fixed_policy = {"preferred_action": 0}
        statistics = {"budget_used": budget}
        return PlanningUpdate(
            value_targets=predicted_values,
            policy_targets=fixed_policy,
            search_statistics=statistics,
        )

    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
        """Do nothing; there are no option models."""
        return None
Trivial world model with one-step lookahead planning.
- No real model learning (update is a no-op).
- No option models.
- Planning calls predict once and returns value targets.
def update(
    self,
    transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
) -> None:
    """Ignore the transition; this model learns nothing."""
    return None
Learn from an observed transition.
This should update both the world model and any option-model learners.
Export learned option models and integrate them into the world model.
Called after option learning so that planning reasons over fresh models.
def plan(
    self,
    subjective_state: MinimalSubjectiveState,
    value_function: ValueFunction[MinimalSubjectiveState, Action, MinimalInfo],
    budget: int,
) -> PlanningUpdate[Action]:
    """Wrap one ``predict`` call in a planning update.

    The policy target is fixed to action 0 and the whole ``budget`` is
    reported as used in the search statistics.
    """
    predicted_values = value_function.predict(subjective_state)
    fixed_policy = {"preferred_action": 0}
    statistics = {"budget_used": budget}
    return PlanningUpdate(
        value_targets=predicted_values,
        policy_targets=fixed_policy,
        search_statistics=statistics,
    )
Run bounded planning and return improvement signals.
The planner uses the internal world model together with the supplied value_function (for state evaluation) to produce value targets, policy targets, or search statistics.
class MinimalValueFunction(ValueFunction[MinimalSubjectiveState, Action, MinimalInfo]):
    """Remembers the last reward, logs usage records and never prunes.

    - The single value, exposed under "main", is the latest reward seen.
    - Usage records are kept in a list and summed on demand.
    - ``curate`` always returns an empty decision.
    """

    def __init__(self) -> None:
        self._value: float = 0.0
        self._usage_records: list[UsageRecord] = []

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
        *,
        planning: bool = False,
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Store the transition reward; report a zero error for "main".

        ``planning`` is accepted but not used.
        """
        latest_reward = transition.reward
        self._value = latest_reward
        return {"main": 0.0}

    def predict(
        self,
        subjective_state: MinimalSubjectiveState,
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Return the stored value under "main", whatever the state."""
        stored = self._value
        return {"main": stored}

    def observe_usage(self, usage_records: Sequence[UsageRecord]) -> None:
        """Append the given records to the usage log."""
        log = self._usage_records
        log.extend(usage_records)

    def utility_scores(self) -> Sequence[UtilityRecord]:
        """Sum logged amounts per ``(kind, component_id)`` pair.

        Results are ordered by the first appearance of each pair; ``kind``
        and ``component_id`` come from the latest record of that pair.
        """
        aggregated: dict[tuple[str, str], tuple[UsageRecord, float]] = {}
        for rec in self._usage_records:
            ident = (rec.kind.value, rec.component_id)
            _, running = aggregated.get(ident, (rec, 0.0))
            aggregated[ident] = (rec, running + rec.amount)

        scores: list[UtilityRecord] = []
        for rec, total in aggregated.values():
            scores.append(
                UtilityRecord(
                    kind=rec.kind,
                    component_id=rec.component_id,
                    utility=total,
                )
            )
        return tuple(scores)

    def curate(self) -> CurationDecision:
        """Return an empty curation decision."""
        decision = CurationDecision()
        return decision

    def remove(
        self,
        general_value_function_ids: Sequence[GeneralValueFunctionId],
    ) -> None:
        """Do nothing; the single implicit learner is never removed."""
        return None
Stores latest reward as a value, counts usage, never curates.
- One implicit value learner ("main") that stores the latest reward.
- Usage records are accumulated for utility scoring.
- Curation always returns an empty decision (no pruning).
def update(
    self,
    transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    *,
    planning: bool = False,
) -> Mapping[GeneralValueFunctionId, float]:
    """Store the transition reward; report a zero error for "main".

    ``planning`` is accepted but not used.
    """
    latest_reward = transition.reward
    self._value = latest_reward
    return {"main": 0.0}
Learn from a transition and return TD-error signals.
def predict(
    self,
    subjective_state: MinimalSubjectiveState,
) -> Mapping[GeneralValueFunctionId, float]:
    """Return the stored value under "main", whatever the state."""
    stored = self._value
    return {"main": stored}
Predict values for the given subjective state.
def observe_usage(self, usage_records: Sequence[UsageRecord]) -> None:
    """Append the given records, in order, to ``self._usage_records``."""
    log = self._usage_records
    log.extend(usage_records)
Record usage evidence for utility assessment.
def utility_scores(self) -> Sequence[UtilityRecord]:
    """Sum logged amounts per ``(kind, component_id)`` pair.

    Results are ordered by the first appearance of each pair; ``kind`` and
    ``component_id`` are taken from the latest record of that pair.
    """
    aggregated: dict[tuple[str, str], tuple[UsageRecord, float]] = {}
    for rec in self._usage_records:
        ident = (rec.kind.value, rec.component_id)
        # Start from 0.0 so the running total is accumulated as a float sum.
        _, running = aggregated.get(ident, (rec, 0.0))
        aggregated[ident] = (rec, running + rec.amount)

    scores: list[UtilityRecord] = []
    for rec, total in aggregated.values():
        scores.append(
            UtilityRecord(
                kind=rec.kind,
                component_id=rec.component_id,
                utility=total,
            )
        )
    return tuple(scores)
Return current utility estimates for all tracked structures.
class MinimalReactivePolicy(
    ReactivePolicy[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Alternates primitive actions and options, creates options from subtasks.

    - On even observations: primitive action 0.
    - On odd observations with options available: executes the first option.
    - On odd observations without options: primitive action 1.
    - Options are created 1:1 from ingested subtasks.
    """

    def __init__(self) -> None:
        self._active_option: _MinimalOption | None = None
        self._options: dict[OptionId, _MinimalOption] = {}
        self._subtasks: dict[SubtaskId, SubtaskSpec] = {}
        self.last_td_errors: Mapping[GeneralValueFunctionId, float] = {}
        self.last_planning_update: PlanningUpdate[Action] | None = None

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
        td_errors: Mapping[GeneralValueFunctionId, float],
    ) -> None:
        """Keep a copy of the latest TD errors; nothing is learned."""
        self.last_td_errors = dict(td_errors)

    def apply_planning_update(self, update: PlanningUpdate[Action]) -> None:
        """Remember the latest planning update; it does not affect actions."""
        self.last_planning_update = update

    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
        """Store each subtask and register one option for it."""
        for subtask in subtasks:
            self._subtasks[subtask.subtask_id] = subtask
            option_id = f"option:{subtask.subtask_id}"
            self._options[option_id] = _MinimalOption(
                OptionDescriptor(
                    option_id=option_id,
                    name=f"Option for {subtask.subtask_id}",
                    subtask_id=subtask.subtask_id,
                )
            )

    def integrate_options(self) -> None:
        """No-op: options are already registered in ``ingest_subtasks``."""

    def select_action(
        self,
        subjective_state: MinimalSubjectiveState,
        option_stop_threshold: float,
    ) -> tuple[Action, OptionId | None]:
        """Return a ``(primitive_action, active_option_id)`` pair."""
        # Keep running the active option while its stop probability stays
        # below the threshold; otherwise drop it and choose afresh.
        if self._active_option is not None:
            stop_prob = self._active_option.stop_probability(subjective_state)
            if stop_prob < option_stop_threshold:
                return (
                    self._active_option.act(subjective_state),
                    self._active_option.descriptor.option_id,
                )
            self._active_option = None

        # Even observation → primitive action 0
        if subjective_state.observation % 2 == 0:
            return (0, None)

        # Odd observation → first registered option, or primitive action 1
        options = list(self._options.values())
        if options:
            self._active_option = options[0]
            return (
                self._active_option.act(subjective_state),
                self._active_option.descriptor.option_id,
            )
        return (1, None)

    def clear_active_option(self) -> None:
        """Forget the currently executing option."""
        self._active_option = None

    def remove_options(self, option_ids: Sequence[OptionId]) -> None:
        """Remove options by ID and deactivate the active one if removed."""
        for oid in option_ids:
            self._options.pop(oid, None)
        if (
            self._active_option is not None
            and self._active_option.descriptor.option_id in option_ids
        ):
            self._active_option = None

    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
        """Remove subtasks and the options that were created for them.

        If the active option belongs to a removed subtask it is deactivated,
        matching ``remove_options``; otherwise a dropped option could still
        be returned as the active one.
        """
        removed_option_ids = {f"option:{sid}" for sid in subtask_ids}
        for sid in subtask_ids:
            self._subtasks.pop(sid, None)
        for oid in removed_option_ids:
            self._options.pop(oid, None)
        active = self._active_option
        if active is not None and active.descriptor.option_id in removed_option_ids:
            self._active_option = None
Alternates primitive actions and options, creates options from subtasks.
- On even observations: primitive action 0.
- On odd observations with options available: executes the first option.
- On odd observations without options: primitive action 1.
- Options are created 1:1 from ingested subtasks.
def update(
    self,
    transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    td_errors: Mapping[GeneralValueFunctionId, float],
) -> None:
    """Keep a shallow copy of the latest TD errors; nothing is learned.

    ``transition`` is accepted but not used.
    """
    snapshot = {**td_errors}
    self.last_td_errors = snapshot
Update the policy and option learners from an observed transition.
def apply_planning_update(self, update: PlanningUpdate[Action]) -> None:
    """Remember ``update`` as ``self.last_planning_update``.

    The update is only stored; nothing else is done with it here.
    """
    self.last_planning_update = update
    return None
Integrate planning improvement signals into the policy.
def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
    """Store each subtask and register one option for it.

    The option ID is the subtask ID prefixed with ``option:``; a repeated
    subtask ID overwrites the earlier subtask and option entries.
    """
    for spec in subtasks:
        sid = spec.subtask_id
        self._subtasks[sid] = spec
        oid = f"option:{sid}"
        descriptor = OptionDescriptor(
            option_id=oid,
            name=f"Option for {sid}",
            subtask_id=sid,
        )
        self._options[oid] = _MinimalOption(descriptor)
Feed newly created subtasks into the option learner.
def select_action(
    self,
    subjective_state: MinimalSubjectiveState,
    option_stop_threshold: float,
) -> tuple[Action, OptionId | None]:
    """Return a ``(primitive_action, active_option_id)`` pair.

    An active option keeps running while its stop probability is below
    ``option_stop_threshold``.  Otherwise even observations yield primitive
    action 0, and odd observations start the first registered option or,
    when none exists, yield primitive action 1.
    """
    running = self._active_option
    if running is not None:
        if running.stop_probability(subjective_state) < option_stop_threshold:
            return running.act(subjective_state), running.descriptor.option_id
        # The option has stopped; fall through to a fresh choice.
        self._active_option = None

    if subjective_state.observation % 2 == 0:
        return 0, None

    if not self._options:
        return 1, None

    # Dicts preserve insertion order, so this is the first registered option.
    chosen = next(iter(self._options.values()))
    self._active_option = chosen
    return chosen.act(subjective_state), chosen.descriptor.option_id
Choose a primitive action, possibly by continuing an active option.
Returns a (primitive_action, active_option_id) pair. When no
option is active, active_option_id is None.
Clear the currently executing option (e.g. at episode boundaries).
def remove_options(self, option_ids: Sequence[OptionId]) -> None:
    """Remove options by ID and deactivate the active one if it was removed.

    Unknown IDs are ignored.
    """
    registry = self._options
    for option_id in option_ids:
        registry.pop(option_id, None)

    active = self._active_option
    if active is None:
        return
    if active.descriptor.option_id in option_ids:
        self._active_option = None
Remove options by ID (called during curation).
def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
    """Remove subtasks and the options that were created for them.

    Unknown IDs are ignored.  If the active option belongs to a removed
    subtask it is deactivated, matching ``remove_options``; otherwise a
    dropped option could still be returned as the active one.
    """
    # Options are registered under "option:<subtask_id>".
    removed_option_ids = {f"option:{sid}" for sid in subtask_ids}
    for sid in subtask_ids:
        self._subtasks.pop(sid, None)
    for oid in removed_option_ids:
        self._options.pop(oid, None)
    active = self._active_option
    if active is not None and active.descriptor.option_id in removed_option_ids:
        self._active_option = None
Remove subtasks by ID (called during curation).
def build_minimal_agent() -> (
    OaKAgent[Observation, Action, MinimalSubjectiveState, MinimalInfo]
):
    """Assemble an ``OaKAgent`` from the four minimal modules.

    Each module is freshly constructed and the planning budget is fixed at 4.
    """
    perception = MinimalPerception()
    transition_model = MinimalTransitionModel()
    value_function = MinimalValueFunction()
    reactive_policy = MinimalReactivePolicy()
    return OaKAgent(
        perception=perception,
        transition_model=transition_model,
        value_function=value_function,
        reactive_policy=reactive_policy,
        planning_budget=4,
    )
Construct a fully wired smoke-test OaK agent.
def run_minimal_episode(horizon: int = 5) -> list[MinimalTraceStep]:
    """Run a short smoke episode and return a compact trace.

    At most ``horizon`` agent steps are taken.  The loop stops as soon as the
    world reports termination, so the terminal time step is not passed to the
    agent.  The world is closed even if a step raises, as in
    ``run_minimal_training``.
    """
    world = MinimalWorld(horizon=horizon)
    agent = build_minimal_agent()
    trace: list[MinimalTraceStep] = []

    try:
        step = world.reset()
        agent.reset()

        for _ in range(horizon):
            result = agent.step(step)
            action = result.action
            trace.append(
                {
                    "subjective_state": result.subjective_state,
                    "action": action,
                    "active_option_id": result.active_option_id,
                    "created_subtasks": [
                        subtask.subtask_id for subtask in result.created_subtasks
                    ],
                    "planning_budget_used": _planning_budget_used(
                        result.planning_update
                    ),
                }
            )
            step = world.step(action)
            if step.terminated:
                break
    finally:
        world.close()

    return trace
Run a short smoke episode and return a compact trace.
def run_minimal_training(
    num_episodes: int = 3,
    *,
    horizon: int = 5,
    average_window: int = 100,
    solved_threshold: float | None = None,
) -> list[float]:
    """Train a fresh minimal agent and return what ``agent.train`` returns.

    A new ``MinimalWorld`` with the given ``horizon`` is created for the run
    and is closed afterwards, whether or not training raises.
    """
    world = MinimalWorld(horizon=horizon)
    agent = build_minimal_agent()
    try:
        episode_rewards = agent.train(
            world,
            num_episodes=num_episodes,
            average_window=average_window,
            solved_threshold=solved_threshold,
        )
    finally:
        world.close()
    return episode_rewards
Train the minimal smoke agent for a few episodes and return rewards.