examples.minimal_oak
"""Bare-minimum external implementation used to smoke-test the interface.

This module answers a single question: can the current package interfaces be
instantiated and run through a complete OaK step loop?

The implementation shows the **direct** approach: each of Sutton's four
modules (Perception, Transition Model, Value Function, Reactive Policy) is
implemented as a single class. There is no need to use the fine-grained
component interfaces or the composite wrappers — those exist for projects
that need more modularity inside each module.

What this module is:

- a tiny integer world
- a direct observation-to-subjective_state perception with one fixed feature
- a no-op transition model with trivial one-step planning
- a simple value tracker with usage counting and no curation
- a reactive policy that alternates actions and options

What this module is not:

- a trained agent
- a realistic planner
- a serious option-learning system
- a benchmark implementation
"""

# NOTE: the docstring must come first so that it is bound to ``__doc__``;
# a ``from __future__`` import is still legal directly after it.
from __future__ import annotations

from dataclasses import dataclass
from typing import Mapping, Sequence, TypedDict

from oak_architecture.agent import OaKAgent
from oak_architecture.interfaces import (
    Perception,
    ReactivePolicy,
    TransitionModel,
    ValueFunction,
    World,
)
from oak_architecture.types import (
    CurationDecision,
    FeatureId,
    FeatureSpec,
    GeneralValueFunctionId,
    OptionDescriptor,
    OptionId,
    PlanningUpdate,
    SubtaskId,
    SubtaskSpec,
    TimeStep,
    Transition,
    UsageRecord,
    UtilityRecord,
)

Observation = int
Action = int


class MinimalInfo(TypedDict, total=False):
    """Optional diagnostics that ``MinimalWorld`` attaches to a time step."""

    reset: bool  # present (True) on the step returned by reset()
    echo_action: Action  # the action that was passed to step()


class MinimalTraceStep(TypedDict):
    """Per-step record stored in the trace built by ``run_minimal_episode``."""

    subjective_state: "MinimalSubjectiveState"
    action: Action
    active_option_id: OptionId | None
    created_subtasks: list[SubtaskId]
    planning_budget_used: int | None


@dataclass(slots=True, frozen=True)
class MinimalSubjectiveState:
    """Small concrete subjective state used by the smoke implementation."""

    step_index: int
    observation: Observation
    reward: float
    last_action: Action | None


# ─────────────────────────────────────────────────────────────────────
# Environment
# ─────────────────────────────────────────────────────────────────────


class MinimalWorld(World[Observation, Action, MinimalInfo]):
    """A toy world that increments an integer observation every step."""

    def __init__(self, horizon: int = 5) -> None:
        """Create a world that terminates once ``horizon`` steps were taken."""
        self.horizon = horizon
        self.current_step = 0

    def reset(self) -> TimeStep[Observation, MinimalInfo]:
        """Rewind the step counter and return the initial time step."""
        self.current_step = 0
        return TimeStep(observation=0, reward=0.0, info={"reset": True})

    def step(self, action: Action) -> TimeStep[Observation, MinimalInfo]:
        """Advance one step; action ``1`` earns reward 1.0, anything else 0.0."""
        self.current_step += 1
        terminated = self.current_step >= self.horizon
        reward = 1.0 if action == 1 else 0.0
        return TimeStep(
            observation=self.current_step,
            reward=reward,
            terminated=terminated,
            info={"echo_action": action},
        )


# ─────────────────────────────────────────────────────────────────────
# Perception
# ─────────────────────────────────────────────────────────────────────


class MinimalPerception(Perception[Observation, Action, MinimalSubjectiveState]):
    """Direct observation-to-state mapping with one fixed feature.

    - The subjective state is a thin wrapper around the observation.
    - One identity feature ("observation") is registered at construction
      and stays until ``remove_features`` drops it.
    - No new features are ever proposed.
    - One subtask is created per feature (deduplicated).
    """

    def __init__(self) -> None:
        self._state = MinimalSubjectiveState(0, 0, 0.0, None)
        self._features: dict[FeatureId, FeatureSpec] = {
            "observation": FeatureSpec(
                feature_id="observation",
                name="Observation value",
                description="Identity feature for the integer observation.",
            )
        }
        # Features that already produced a subtask; never cleared by reset().
        self._created_subtask_for: set[FeatureId] = set()

    def reset(self) -> None:
        """Restore the initial subjective state (features are kept)."""
        self._state = MinimalSubjectiveState(0, 0, 0.0, None)

    def update(
        self,
        observation: Observation,
        reward: float,
        last_action: Action | None,
    ) -> MinimalSubjectiveState:
        """Wrap the raw inputs in a new subjective state and remember it."""
        self._state = MinimalSubjectiveState(
            step_index=observation,
            observation=observation,
            reward=reward,
            last_action=last_action,
        )
        return self._state

    def current_subjective_state(self) -> MinimalSubjectiveState:
        """Return the most recently computed subjective state."""
        return self._state

    def discover_and_rank_features(
        self,
        subjective_state: MinimalSubjectiveState,
        utility_scores: Sequence[UtilityRecord],
        feature_budget: int,
    ) -> Sequence[FeatureId]:
        """Return up to ``feature_budget`` feature IDs in insertion order.

        No new features are proposed and ``utility_scores`` is ignored.
        A zero or negative budget yields an empty tuple.
        """
        ids = list(self._features)
        # Clamp: a negative stop index would slice from the end instead.
        return tuple(ids[: max(feature_budget, 0)])

    def generate_subtasks(
        self,
        ranked_feature_ids: Sequence[FeatureId],
    ) -> Sequence[SubtaskSpec]:
        """Create one subtask per feature that has not produced one yet."""
        created: list[SubtaskSpec] = []
        for fid in ranked_feature_ids:
            if fid in self._created_subtask_for:
                continue
            self._created_subtask_for.add(fid)
            created.append(
                SubtaskSpec(
                    subtask_id=f"subtask:{fid}",
                    name=f"Track {fid}",
                    feature_id=fid,
                )
            )
        return tuple(created)

    def list_features(self) -> Sequence[FeatureSpec]:
        """Return the currently registered feature specs."""
        return tuple(self._features.values())

    def remove_features(self, feature_ids: Sequence[FeatureId]) -> None:
        """Drop the given features; unknown IDs are ignored."""
        for fid in feature_ids:
            self._features.pop(fid, None)


# ─────────────────────────────────────────────────────────────────────
# Transition Model
# ─────────────────────────────────────────────────────────────────────


class MinimalTransitionModel(
    TransitionModel[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Trivial world model whose planning only evaluates the current state.

    - No real model learning (update is a no-op).
    - No option models.
    - Planning calls predict once and returns value targets.
    """

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Ignore the transition; nothing is learned."""

    def integrate_option_models(self) -> None:
        """Do nothing; this model keeps no option models."""

    def plan(
        self,
        subjective_state: MinimalSubjectiveState,
        value_function: ValueFunction[
            MinimalSubjectiveState, Action, MinimalInfo
        ],
        budget: int,
    ) -> PlanningUpdate[Action]:
        """Return the value prediction for the current state as targets.

        The whole ``budget`` is reported as used and the preferred action
        is always ``0``.
        """
        return PlanningUpdate(
            value_targets=value_function.predict(subjective_state),
            policy_targets={"preferred_action": 0},
            search_statistics={"budget_used": budget},
        )

    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
        """Do nothing; there are no option models to remove."""


# ─────────────────────────────────────────────────────────────────────
# Value Function
# ─────────────────────────────────────────────────────────────────────


class MinimalValueFunction(
    ValueFunction[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Stores latest reward as a value, counts usage, never curates.

    - One implicit value learner ("main") that stores the latest reward.
    - Usage records are accumulated for utility scoring.
    - Curation always returns an empty decision (no pruning).
    """

    def __init__(self) -> None:
        self._value: float = 0.0
        self._usage_records: list[UsageRecord] = []

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Store the transition reward; the reported error is always 0.0."""
        self._value = transition.reward
        return {"main": 0.0}

    def predict(
        self,
        subjective_state: MinimalSubjectiveState,
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Return the stored value under "main", whatever the state is."""
        return {"main": self._value}

    def observe_usage(self, usage_records: Sequence[UsageRecord]) -> None:
        """Append the records to the usage history."""
        self._usage_records.extend(usage_records)

    def utility_scores(self) -> Sequence[UtilityRecord]:
        """Sum recorded usage amounts per (kind, component_id) pair."""
        totals: dict[tuple[str, str], float] = {}
        latest: dict[tuple[str, str], UsageRecord] = {}
        for record in self._usage_records:
            key = (record.kind.value, record.component_id)
            totals[key] = totals.get(key, 0.0) + record.amount
            latest[key] = record
        return tuple(
            UtilityRecord(
                kind=record.kind,
                component_id=record.component_id,
                utility=totals[key],
            )
            for key, record in latest.items()
        )

    def curate(self) -> CurationDecision:
        """Return an empty curation decision."""
        return CurationDecision()

    def remove(
        self,
        general_value_function_ids: Sequence[GeneralValueFunctionId],
    ) -> None:
        """Do nothing; the single implicit learner cannot be removed."""


# ─────────────────────────────────────────────────────────────────────
# Reactive Policy
# ─────────────────────────────────────────────────────────────────────


@dataclass
class _MinimalOption:
    """Trivial option that always emits action=1 and stops immediately."""

    _descriptor: OptionDescriptor
    _action: Action = 1

    @property
    def descriptor(self) -> OptionDescriptor:
        return self._descriptor

    def is_available(self, subjective_state: MinimalSubjectiveState) -> bool:
        return True

    def act(self, subjective_state: MinimalSubjectiveState) -> Action:
        return self._action

    def stop_probability(self, subjective_state: MinimalSubjectiveState) -> float:
        return 1.0


class MinimalReactivePolicy(
    ReactivePolicy[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Alternates primitive actions and options, creates options from subtasks.

    - On even observations: primitive action 0.
    - On odd observations with options available: executes the first
      option that reports itself available.
    - On odd observations without options: primitive action 1.
    - Options are created 1:1 from ingested subtasks.
    """

    def __init__(self) -> None:
        self._active_option: _MinimalOption | None = None
        self._options: dict[OptionId, _MinimalOption] = {}
        self._subtasks: dict[SubtaskId, SubtaskSpec] = {}
        self.last_td_errors: Mapping[GeneralValueFunctionId, float] = {}
        self.last_planning_update: PlanningUpdate[Action] | None = None

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
        td_errors: Mapping[GeneralValueFunctionId, float],
    ) -> None:
        """Keep a private copy of the latest TD errors; nothing is learned."""
        self.last_td_errors = dict(td_errors)

    def apply_planning_update(self, update: PlanningUpdate[Action]) -> None:
        """Remember the latest planning update for inspection."""
        self.last_planning_update = update

    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
        """Register each subtask and create its ``option:<subtask_id>`` option."""
        for subtask in subtasks:
            self._subtasks[subtask.subtask_id] = subtask
            option_id = f"option:{subtask.subtask_id}"
            self._options[option_id] = _MinimalOption(
                OptionDescriptor(
                    option_id=option_id,
                    name=f"Option for {subtask.subtask_id}",
                    subtask_id=subtask.subtask_id,
                )
            )

    def integrate_options(self) -> None:
        pass  # options already registered in ingest_subtasks

    def select_action(
        self,
        subjective_state: MinimalSubjectiveState,
        option_stop_threshold: float,
    ) -> tuple[Action, OptionId | None]:
        """Choose a primitive action, possibly through an option.

        An active option keeps control while its stop probability is
        strictly below ``option_stop_threshold``; otherwise it is dropped
        and the even/odd rule from the class docstring applies.
        """
        active = self._active_option
        if active is not None:
            if active.stop_probability(subjective_state) < option_stop_threshold:
                return (active.act(subjective_state), active.descriptor.option_id)
            self._active_option = None

        # Even observation → primitive action 0
        if subjective_state.observation % 2 == 0:
            return (0, None)

        # Odd observation → first available option, or primitive action 1
        candidate = next(
            (
                option
                for option in self._options.values()
                if option.is_available(subjective_state)
            ),
            None,
        )
        if candidate is None:
            return (1, None)
        self._active_option = candidate
        return (candidate.act(subjective_state), candidate.descriptor.option_id)

    def clear_active_option(self) -> None:
        """Forget the currently executing option, if any."""
        self._active_option = None

    def remove_options(self, option_ids: Sequence[OptionId]) -> None:
        """Drop the given options and deactivate the active one if removed."""
        for oid in option_ids:
            self._options.pop(oid, None)
        if (
            self._active_option is not None
            and self._active_option.descriptor.option_id in option_ids
        ):
            self._active_option = None

    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
        """Drop the given subtasks together with their derived options.

        The active option is cleared when its option is removed, so that
        a deleted option can never keep executing.
        """
        for sid in subtask_ids:
            self._subtasks.pop(sid, None)
            option_id = f"option:{sid}"
            self._options.pop(option_id, None)
            active = self._active_option
            if active is not None and active.descriptor.option_id == option_id:
                self._active_option = None


# ─────────────────────────────────────────────────────────────────────
# Wiring
# ─────────────────────────────────────────────────────────────────────


def build_minimal_agent() -> (
    OaKAgent[Observation, Action, MinimalSubjectiveState, MinimalInfo]
):
    """Construct a fully wired smoke-test OaK agent."""
    return OaKAgent(
        perception=MinimalPerception(),
        transition_model=MinimalTransitionModel(),
        value_function=MinimalValueFunction(),
        reactive_policy=MinimalReactivePolicy(),
        planning_budget=4,
    )


def run_minimal_episode(horizon: int = 5) -> list[MinimalTraceStep]:
    """Run a short smoke episode and return a compact trace.

    At most ``horizon`` agent steps are taken; the loop also stops as soon
    as the world reports termination.
    """
    world = MinimalWorld(horizon=horizon)
    agent = build_minimal_agent()
    step = world.reset()
    agent.reset()

    trace: list[MinimalTraceStep] = []

    for _ in range(horizon):
        result = agent.step(step)
        action = result.action
        trace.append(
            {
                "subjective_state": result.subjective_state,
                "action": action,
                "active_option_id": result.active_option_id,
                "created_subtasks": [
                    subtask.subtask_id for subtask in result.created_subtasks
                ],
                "planning_budget_used": (
                    result.planning_update.search_statistics["budget_used"]
                    if result.planning_update is not None
                    else None
                ),
            }
        )
        step = world.step(action)
        if step.terminated:
            break

    return trace
class MinimalTraceStep(TypedDict):
    """Per-step record stored in the trace built by ``run_minimal_episode``."""

    # Subjective state reported by the agent for this step.
    subjective_state: "MinimalSubjectiveState"
    # Primitive action the agent chose; it is then passed to the world.
    action: Action
    # ID of the option in control of this step, or None.
    active_option_id: OptionId | None
    # IDs of the subtasks the agent reported as created during this step.
    created_subtasks: list[SubtaskId]
    # "budget_used" search statistic, or None when no planning update exists.
    planning_budget_used: int | None
@dataclass(slots=True, frozen=True)
class MinimalSubjectiveState:
    """Small concrete subjective state used by the smoke implementation.

    Instances are immutable (``frozen=True``) and carry no per-instance
    ``__dict__`` (``slots=True``).
    """

    # MinimalPerception.update sets this to the same value as `observation`.
    step_index: int
    # Raw integer observation received from the world.
    observation: Observation
    # Reward that accompanied the observation.
    reward: float
    # Action taken before this observation; None in the initial state.
    last_action: Action | None
Small concrete subjective state used by the smoke implementation.
class MinimalWorld(World[Observation, Action, MinimalInfo]):
    """Toy environment whose observation is simply a running step counter.

    The episode is reported as terminated once the counter reaches
    ``horizon``. Taking action ``1`` is rewarded with 1.0; every other
    action yields 0.0.
    """

    def __init__(self, horizon: int = 5) -> None:
        """Start at step 0 with the given episode length."""
        self.current_step = 0
        self.horizon = horizon

    def reset(self) -> TimeStep[Observation, MinimalInfo]:
        """Rewind the counter and emit the initial, reward-free time step."""
        self.current_step = 0
        initial_info = {"reset": True}
        return TimeStep(observation=0, reward=0.0, info=initial_info)

    def step(self, action: Action) -> TimeStep[Observation, MinimalInfo]:
        """Advance the counter by one and report the resulting time step."""
        self.current_step += 1
        now = self.current_step

        if action == 1:
            reward = 1.0
        else:
            reward = 0.0

        return TimeStep(
            observation=now,
            reward=reward,
            terminated=now >= self.horizon,
            info={"echo_action": action},
        )
A toy world that increments an integer observation every step.
def step(self, action: Action) -> TimeStep[Observation, MinimalInfo]:
    """Advance the counter by one and report the resulting time step.

    The new counter value doubles as the observation. The step is
    flagged as terminated once the counter reaches ``self.horizon``, and
    the received action is echoed back in ``info``.
    """
    self.current_step += 1
    now = self.current_step

    # Only action 1 is rewarded.
    if action == 1:
        reward = 1.0
    else:
        reward = 0.0

    return TimeStep(
        observation=now,
        reward=reward,
        terminated=now >= self.horizon,
        info={"echo_action": action},
    )
class MinimalPerception(Perception[Observation, Action, MinimalSubjectiveState]):
    """Direct observation-to-state mapping with one fixed feature.

    - The subjective state is a thin wrapper around the observation.
    - One identity feature ("observation") is registered at construction
      and stays until ``remove_features`` drops it.
    - No new features are ever proposed.
    - One subtask is created per feature (deduplicated).
    """

    def __init__(self) -> None:
        self._state = MinimalSubjectiveState(0, 0, 0.0, None)
        self._features: dict[FeatureId, FeatureSpec] = {
            "observation": FeatureSpec(
                feature_id="observation",
                name="Observation value",
                description="Identity feature for the integer observation.",
            )
        }
        # Features that already produced a subtask; never cleared by reset().
        self._created_subtask_for: set[FeatureId] = set()

    def reset(self) -> None:
        """Restore the initial subjective state (features are kept)."""
        self._state = MinimalSubjectiveState(0, 0, 0.0, None)

    def update(
        self,
        observation: Observation,
        reward: float,
        last_action: Action | None,
    ) -> MinimalSubjectiveState:
        """Wrap the raw inputs in a new subjective state and remember it."""
        self._state = MinimalSubjectiveState(
            step_index=observation,
            observation=observation,
            reward=reward,
            last_action=last_action,
        )
        return self._state

    def current_subjective_state(self) -> MinimalSubjectiveState:
        """Return the most recently computed subjective state."""
        return self._state

    def discover_and_rank_features(
        self,
        subjective_state: MinimalSubjectiveState,
        utility_scores: Sequence[UtilityRecord],
        feature_budget: int,
    ) -> Sequence[FeatureId]:
        """Return up to ``feature_budget`` feature IDs in insertion order.

        No new features are proposed and ``utility_scores`` is ignored.
        A zero or negative budget yields an empty tuple.
        """
        ids = list(self._features)
        # Clamp: a negative stop index would slice from the end instead.
        return tuple(ids[: max(feature_budget, 0)])

    def generate_subtasks(
        self,
        ranked_feature_ids: Sequence[FeatureId],
    ) -> Sequence[SubtaskSpec]:
        """Create one subtask per feature that has not produced one yet."""
        created: list[SubtaskSpec] = []
        for fid in ranked_feature_ids:
            if fid in self._created_subtask_for:
                continue
            self._created_subtask_for.add(fid)
            created.append(
                SubtaskSpec(
                    subtask_id=f"subtask:{fid}",
                    name=f"Track {fid}",
                    feature_id=fid,
                )
            )
        return tuple(created)

    def list_features(self) -> Sequence[FeatureSpec]:
        """Return the currently registered feature specs."""
        return tuple(self._features.values())

    def remove_features(self, feature_ids: Sequence[FeatureId]) -> None:
        """Drop the given features; unknown IDs are ignored."""
        for fid in feature_ids:
            self._features.pop(fid, None)
Direct observation-to-state mapping with one fixed feature.
- The subjective state is a thin wrapper around the observation.
- One identity feature ("observation") is registered at construction and stays present unless remove_features drops it.
- No new features are ever proposed.
- One subtask is created per feature (deduplicated).
def update(
    self,
    observation: Observation,
    reward: float,
    last_action: Action | None,
) -> MinimalSubjectiveState:
    """Build the subjective state for a new observation and store it.

    The observation is used both as the step index and as the
    observation field of the new state. The same object is kept in
    ``self._state`` and returned.
    """
    # Field order: step_index, observation, reward, last_action.
    fresh_state = MinimalSubjectiveState(
        observation,
        observation,
        reward,
        last_action,
    )
    self._state = fresh_state
    return fresh_state
Process a new observation and return the updated subjective state.
Return the most recently computed subjective state.
def discover_and_rank_features(
    self,
    subjective_state: MinimalSubjectiveState,
    utility_scores: Sequence[UtilityRecord],
    feature_budget: int,
) -> Sequence[FeatureId]:
    """Return up to ``feature_budget`` feature IDs in insertion order.

    No new features are proposed, and both ``subjective_state`` and
    ``utility_scores`` are ignored: the ranking is simply the order in
    which features were registered.

    A zero or negative ``feature_budget`` yields an empty tuple.
    """
    ids = list(self._features)
    # Clamp the budget: a negative stop index would slice from the end
    # and return all but the last N features instead of nothing.
    return tuple(ids[: max(feature_budget, 0)])
Propose new features, integrate them, and return the top-ranked IDs.
A typical implementation:
- Proposes candidate features from the current subjective state.
- Adds accepted candidates to its internal feature store.
- Ranks all features using the provided utility scores.
- Returns the top feature IDs (up to feature_budget).
def generate_subtasks(
    self,
    ranked_feature_ids: Sequence[FeatureId],
) -> Sequence[SubtaskSpec]:
    """Emit a subtask for every feature that has not produced one yet.

    Features already recorded in ``self._created_subtask_for`` are
    skipped; newly handled ones are added to that set, so each feature
    yields at most one subtask. The result keeps the input order.
    """
    already_done = self._created_subtask_for
    fresh: list[SubtaskSpec] = []
    for feature_id in ranked_feature_ids:
        if feature_id not in already_done:
            already_done.add(feature_id)
            spec = SubtaskSpec(
                subtask_id=f"subtask:{feature_id}",
                name=f"Track {feature_id}",
                feature_id=feature_id,
            )
            fresh.append(spec)
    return tuple(fresh)
Turn ranked feature IDs into subtask specifications.
class MinimalTransitionModel(
    TransitionModel[MinimalSubjectiveState, Action, MinimalInfo]
):
    """World-model stand-in that learns nothing.

    - ``update``, ``integrate_option_models`` and ``remove_option_models``
      are no-ops.
    - ``plan`` queries the value function once for the current state and
      returns that prediction as the value targets.
    """

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Ignore the transition; no model is learned."""
        return None

    def integrate_option_models(self) -> None:
        """Do nothing; no option models are kept."""
        return None

    def plan(
        self,
        subjective_state: MinimalSubjectiveState,
        value_function: ValueFunction[
            MinimalSubjectiveState, Action, MinimalInfo
        ],
        budget: int,
    ) -> PlanningUpdate[Action]:
        """Package one value prediction as a planning update.

        The whole ``budget`` is reported as used and the preferred action
        is always ``0``.
        """
        predicted = value_function.predict(subjective_state)
        policy_hint = {"preferred_action": 0}
        stats = {"budget_used": budget}
        return PlanningUpdate(
            value_targets=predicted,
            policy_targets=policy_hint,
            search_statistics=stats,
        )

    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
        """Do nothing; there are no option models to remove."""
        return None
Trivial world model whose planning step only evaluates the current state (no actual lookahead).
- No real model learning (update is a no-op).
- No option models.
- Planning calls predict once and returns value targets.
def update(
    self,
    transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
) -> None:
    """Accept a transition and deliberately learn nothing from it.

    The argument is ignored and no state is modified.
    """
    return None
Learn from an observed transition.
This should update both the world model and any option-model learners.
Export learned option models and integrate them into the world model.
Called after option learning so that planning reasons over fresh models.
def plan(
    self,
    subjective_state: MinimalSubjectiveState,
    value_function: ValueFunction[
        MinimalSubjectiveState, Action, MinimalInfo
    ],
    budget: int,
) -> PlanningUpdate[Action]:
    """Package a single value prediction as a planning update.

    ``value_function.predict`` is called once for ``subjective_state``
    and its result becomes the value targets. The policy targets always
    prefer action ``0``, and the full ``budget`` is reported as used.
    """
    predicted = value_function.predict(subjective_state)
    policy_hint = {"preferred_action": 0}
    stats = {"budget_used": budget}
    return PlanningUpdate(
        value_targets=predicted,
        policy_targets=policy_hint,
        search_statistics=stats,
    )
Run bounded planning and return improvement signals.
The planner uses the internal world model together with the supplied value_function (for state evaluation) to produce value targets, policy targets, or search statistics.
class MinimalValueFunction(
    ValueFunction[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Value tracker that remembers the last reward and tallies usage.

    - A single implicit learner, "main", whose value is the most recent
      transition reward.
    - Usage records are kept and summed on demand into utility scores.
    - Curation never prunes anything.
    """

    def __init__(self) -> None:
        self._usage_records: list[UsageRecord] = []
        self._value: float = 0.0

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Store the transition reward; the reported error is always 0.0."""
        latest_reward = transition.reward
        self._value = latest_reward
        return {"main": 0.0}

    def predict(
        self,
        subjective_state: MinimalSubjectiveState,
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Return the stored value under "main", whatever the state is."""
        stored = self._value
        return {"main": stored}

    def observe_usage(self, usage_records: Sequence[UsageRecord]) -> None:
        """Append the records to the usage history."""
        self._usage_records += usage_records

    def utility_scores(self) -> Sequence[UtilityRecord]:
        """Sum recorded usage amounts per (kind, component_id) pair.

        Scores appear in the order their pair was first seen; ``kind`` and
        ``component_id`` are taken from the latest record of each pair.
        """
        # key -> (running total, most recent record for that key)
        tally: dict[tuple[str, str], tuple[float, UsageRecord]] = {}
        for usage in self._usage_records:
            key = (usage.kind.value, usage.component_id)
            running, _ = tally.get(key, (0.0, usage))
            tally[key] = (running + usage.amount, usage)
        return tuple(
            UtilityRecord(
                kind=last.kind,
                component_id=last.component_id,
                utility=total,
            )
            for total, last in tally.values()
        )

    def curate(self) -> CurationDecision:
        """Return an empty curation decision."""
        decision = CurationDecision()
        return decision

    def remove(
        self,
        general_value_function_ids: Sequence[GeneralValueFunctionId],
    ) -> None:
        """Do nothing; the single implicit learner cannot be removed."""
        return None
Stores latest reward as a value, counts usage, never curates.
- One implicit value learner ("main") that stores the latest reward.
- Usage records are accumulated for utility scoring.
- Curation always returns an empty decision (no pruning).
def update(
    self,
    transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
) -> Mapping[GeneralValueFunctionId, float]:
    """Remember the transition's reward as the current value.

    Returns a mapping with the single key "main" whose error signal is
    always 0.0; no real temporal-difference error is computed.
    """
    latest_reward = transition.reward
    self._value = latest_reward
    return {"main": 0.0}
Learn from a transition and return TD-error signals.
def predict(
    self,
    subjective_state: MinimalSubjectiveState,
) -> Mapping[GeneralValueFunctionId, float]:
    """Return the stored value under the key "main".

    ``subjective_state`` is ignored: the prediction is whatever
    ``self._value`` currently holds.
    """
    stored = self._value
    return {"main": stored}
Predict values for the given subjective state.
def observe_usage(self, usage_records: Sequence[UsageRecord]) -> None:
    """Append the given records to the accumulated usage history.

    The existing list in ``self._usage_records`` is extended in place.
    """
    self._usage_records += usage_records
Record usage evidence for utility assessment.
def utility_scores(self) -> Sequence[UtilityRecord]:
    """Sum recorded usage amounts per (kind, component_id) pair.

    One ``UtilityRecord`` is produced per distinct pair, in the order the
    pair was first seen. Its ``kind`` and ``component_id`` come from the
    most recent usage record of that pair, and its ``utility`` is the
    total of all recorded amounts for the pair.
    """
    # key -> (running total, most recent record for that key)
    tally: dict[tuple[str, str], tuple[float, UsageRecord]] = {}
    for usage in self._usage_records:
        key = (usage.kind.value, usage.component_id)
        running, _ = tally.get(key, (0.0, usage))
        tally[key] = (running + usage.amount, usage)
    return tuple(
        UtilityRecord(
            kind=last.kind,
            component_id=last.component_id,
            utility=total,
        )
        for total, last in tally.values()
    )
Return current utility estimates for all tracked structures.
class MinimalReactivePolicy(
    ReactivePolicy[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Alternates primitive actions and options, creates options from subtasks.

    - On even observations: primitive action 0.
    - On odd observations with options available: executes the first
      option that reports itself available.
    - On odd observations without options: primitive action 1.
    - Options are created 1:1 from ingested subtasks.
    """

    def __init__(self) -> None:
        self._active_option: _MinimalOption | None = None
        self._options: dict[OptionId, _MinimalOption] = {}
        self._subtasks: dict[SubtaskId, SubtaskSpec] = {}
        self.last_td_errors: Mapping[GeneralValueFunctionId, float] = {}
        self.last_planning_update: PlanningUpdate[Action] | None = None

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
        td_errors: Mapping[GeneralValueFunctionId, float],
    ) -> None:
        """Keep a private copy of the latest TD errors; nothing is learned."""
        self.last_td_errors = dict(td_errors)

    def apply_planning_update(self, update: PlanningUpdate[Action]) -> None:
        """Remember the latest planning update for inspection."""
        self.last_planning_update = update

    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
        """Register each subtask and create its ``option:<subtask_id>`` option."""
        for subtask in subtasks:
            self._subtasks[subtask.subtask_id] = subtask
            option_id = f"option:{subtask.subtask_id}"
            self._options[option_id] = _MinimalOption(
                OptionDescriptor(
                    option_id=option_id,
                    name=f"Option for {subtask.subtask_id}",
                    subtask_id=subtask.subtask_id,
                )
            )

    def integrate_options(self) -> None:
        pass  # options already registered in ingest_subtasks

    def select_action(
        self,
        subjective_state: MinimalSubjectiveState,
        option_stop_threshold: float,
    ) -> tuple[Action, OptionId | None]:
        """Choose a primitive action, possibly through an option.

        An active option keeps control while its stop probability is
        strictly below ``option_stop_threshold``; otherwise it is dropped
        and the even/odd rule from the class docstring applies.
        """
        active = self._active_option
        if active is not None:
            if active.stop_probability(subjective_state) < option_stop_threshold:
                return (active.act(subjective_state), active.descriptor.option_id)
            self._active_option = None

        # Even observation → primitive action 0
        if subjective_state.observation % 2 == 0:
            return (0, None)

        # Odd observation → first available option, or primitive action 1
        candidate = next(
            (
                option
                for option in self._options.values()
                if option.is_available(subjective_state)
            ),
            None,
        )
        if candidate is None:
            return (1, None)
        self._active_option = candidate
        return (candidate.act(subjective_state), candidate.descriptor.option_id)

    def clear_active_option(self) -> None:
        """Forget the currently executing option, if any."""
        self._active_option = None

    def remove_options(self, option_ids: Sequence[OptionId]) -> None:
        """Drop the given options and deactivate the active one if removed."""
        for oid in option_ids:
            self._options.pop(oid, None)
        if (
            self._active_option is not None
            and self._active_option.descriptor.option_id in option_ids
        ):
            self._active_option = None

    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
        """Drop the given subtasks together with their derived options.

        The active option is cleared when its option is removed, so that
        a deleted option can never keep executing.
        """
        for sid in subtask_ids:
            self._subtasks.pop(sid, None)
            option_id = f"option:{sid}"
            self._options.pop(option_id, None)
            active = self._active_option
            if active is not None and active.descriptor.option_id == option_id:
                self._active_option = None
Alternates primitive actions and options, creates options from subtasks.
- On even observations: primitive action 0.
- On odd observations with options available: executes the first option.
- On odd observations without options: primitive action 1.
- Options are created 1:1 from ingested subtasks.
def update(
    self,
    transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    td_errors: Mapping[GeneralValueFunctionId, float],
) -> None:
    """Record the latest TD errors; the transition itself is ignored.

    A new dict is stored in ``self.last_td_errors``, so later changes to
    the caller's mapping do not show up here.
    """
    snapshot = {gvf_id: error for gvf_id, error in td_errors.items()}
    self.last_td_errors = snapshot
Update the policy and option learners from an observed transition.
def apply_planning_update(self, update: PlanningUpdate[Action]) -> None:
    """Store the given planning update in ``self.last_planning_update``.

    The update is only remembered (by reference); this method does
    nothing else with it.
    """
    self.last_planning_update = update
Integrate planning improvement signals into the policy.
def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
    """Register subtasks and create one option for each of them.

    Every subtask is stored under its ``subtask_id``, and an option named
    ``option:<subtask_id>`` is stored alongside it. Ingesting the same
    subtask ID again overwrites both entries.
    """
    for spec in subtasks:
        subtask_id = spec.subtask_id
        option_id = f"option:{subtask_id}"
        descriptor = OptionDescriptor(
            option_id=option_id,
            name=f"Option for {subtask_id}",
            subtask_id=subtask_id,
        )
        self._subtasks[subtask_id] = spec
        self._options[option_id] = _MinimalOption(descriptor)
Feed newly created subtasks into the option learner.
def select_action(
    self,
    subjective_state: MinimalSubjectiveState,
    option_stop_threshold: float,
) -> tuple[Action, OptionId | None]:
    """Choose a primitive action, possibly by running an option.

    Decision order:

    1. An active option keeps control while its stop probability is
       strictly below ``option_stop_threshold``; otherwise it is dropped.
    2. An even observation yields primitive action ``0``.
    3. An odd observation starts the first registered option that
       reports itself available, or yields primitive action ``1`` when
       there is none.

    Returns:
        ``(primitive_action, active_option_id)``; the ID is ``None`` when
        the action does not come from an option.
    """
    active = self._active_option
    if active is not None:
        if active.stop_probability(subjective_state) < option_stop_threshold:
            return (active.act(subjective_state), active.descriptor.option_id)
        self._active_option = None

    # Even observation → primitive action 0
    if subjective_state.observation % 2 == 0:
        return (0, None)

    # Odd observation → first available option, or primitive action 1.
    # ``is_available`` is consulted so an unavailable option is never run.
    candidate = next(
        (
            option
            for option in self._options.values()
            if option.is_available(subjective_state)
        ),
        None,
    )
    if candidate is None:
        return (1, None)
    self._active_option = candidate
    return (candidate.act(subjective_state), candidate.descriptor.option_id)
Choose a primitive action, possibly by continuing an active option.
Returns a (primitive_action, active_option_id) pair. When no
option is active, active_option_id is None.
Clear the currently executing option (e.g. at episode boundaries).
def remove_options(self, option_ids: Sequence[OptionId]) -> None:
    """Forget the given options; unknown IDs are silently skipped.

    If the currently active option is among the removed IDs it is
    deactivated as well.
    """
    for option_id in option_ids:
        self._options.pop(option_id, None)

    active = self._active_option
    if active is None:
        return
    if active.descriptor.option_id in option_ids:
        self._active_option = None
Remove options by ID (called during curation).
def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
    """Drop the given subtasks together with their derived options.

    For each ID, the subtask and its ``option:<subtask_id>`` option are
    removed; unknown IDs are ignored. If the removed option is the
    active one, it is deactivated too, so a deleted option can never
    keep executing (the same guarantee ``remove_options`` gives).
    """
    for sid in subtask_ids:
        self._subtasks.pop(sid, None)
        option_id = f"option:{sid}"
        self._options.pop(option_id, None)
        active = self._active_option
        if active is not None and active.descriptor.option_id == option_id:
            self._active_option = None
Remove subtasks by ID (called during curation).
def build_minimal_agent() -> (
    OaKAgent[Observation, Action, MinimalSubjectiveState, MinimalInfo]
):
    """Assemble an ``OaKAgent`` from the four minimal modules.

    Each module is freshly constructed, and the agent is given a
    planning budget of 4.
    """
    perception = MinimalPerception()
    transition_model = MinimalTransitionModel()
    value_function = MinimalValueFunction()
    reactive_policy = MinimalReactivePolicy()
    return OaKAgent(
        perception=perception,
        transition_model=transition_model,
        value_function=value_function,
        reactive_policy=reactive_policy,
        planning_budget=4,
    )
Construct a fully wired smoke-test OaK agent.
def run_minimal_episode(horizon: int = 5) -> list[MinimalTraceStep]:
    """Drive one short episode and collect a per-step trace.

    A fresh world and agent are created and reset. The agent is then
    stepped at most ``horizon`` times, and the loop ends early once the
    world reports termination. Each trace entry records the agent's
    subjective state, chosen action, active option ID, the IDs of newly
    created subtasks, and the planning budget used (``None`` when the
    step produced no planning update).
    """
    world = MinimalWorld(horizon=horizon)
    agent = build_minimal_agent()
    step = world.reset()
    agent.reset()

    trace: list[MinimalTraceStep] = []

    for _ in range(horizon):
        result = agent.step(step)

        planning = result.planning_update
        if planning is None:
            budget_used = None
        else:
            budget_used = planning.search_statistics["budget_used"]

        new_subtask_ids = [spec.subtask_id for spec in result.created_subtasks]

        entry: MinimalTraceStep = {
            "subjective_state": result.subjective_state,
            "action": result.action,
            "active_option_id": result.active_option_id,
            "created_subtasks": new_subtask_ids,
            "planning_budget_used": budget_used,
        }
        trace.append(entry)

        # Feed the chosen action back into the world for the next step.
        step = world.step(result.action)
        if step.terminated:
            break

    return trace
Run a short smoke episode and return a compact trace.