examples.minimal_oak_fine_grained
"""Bare-minimum OaK example built from fine-grained components.

This mirrors `examples/minimal_oak.py`, but instead of implementing the four
main OaK interfaces directly, it assembles them from the optional fine-grained
building blocks in `oak_architecture.fine_grained`.

The behavior is intentionally the same as the direct example:

- a tiny integer world
- a direct observation-to-subjective_state state builder
- one fixed identity feature
- no-op model learning with trivial planning
- a simple value tracker with usage counting and no curation
- a reactive policy that alternates actions and options
"""

# NOTE: the docstring above must stay the first statement of the module;
# placed after the ``__future__`` import it is only a discarded string
# expression and ``__doc__`` ends up as ``None``.
from __future__ import annotations

from dataclasses import dataclass
from typing import Mapping, Sequence

from oak_architecture.agent import OaKAgent
from oak_architecture.fine_grained import (
    ActionSelector,
    CompositePerception,
    CompositeReactivePolicy,
    CompositeTransitionModel,
    CompositeValueFunction,
    Curator,
    FeatureBank,
    FeatureConstructor,
    FeatureRanker,
    GeneralValueFunctionLearner,
    Option,
    OptionLearner,
    OptionLibrary,
    OptionModel,
    OptionModelLearner,
    Planner,
    StateBuilder,
    SubtaskGenerator,
    UtilityAssessor,
    ValueEstimator,
    WorldModel,
)
from oak_architecture.types import (
    CurationDecision,
    FeatureCandidate,
    FeatureId,
    FeatureSpec,
    GeneralValueFunctionId,
    ModelPrediction,
    OptionDescriptor,
    OptionId,
    PlanningUpdate,
    PolicyDecision,
    SubtaskId,
    SubtaskSpec,
    Transition,
    UsageRecord,
    UtilityRecord,
)

from .minimal_oak import (
    Action,
    MinimalInfo,
    MinimalSubjectiveState,
    MinimalTraceStep,
    MinimalWorld,
    Observation,
)


# ─────────────────────────────────────────────────────────────────────
# Perception components
# ─────────────────────────────────────────────────────────────────────


class MinimalStateBuilder(
    StateBuilder[Observation, Action, MinimalSubjectiveState]
):
    """Direct observation-to-state mapping."""

    def __init__(self) -> None:
        self._state = MinimalSubjectiveState(0, 0, 0.0, None)

    def reset(self) -> None:
        """Restore the same all-zero state that the constructor installs."""
        self._state = MinimalSubjectiveState(0, 0, 0.0, None)

    def update(
        self,
        observation: Observation,
        reward: float,
        last_action: Action | None,
    ) -> MinimalSubjectiveState:
        """Store and return a state built directly from the arguments.

        The observation is used both as ``step_index`` and as ``observation``.
        """
        self._state = MinimalSubjectiveState(
            step_index=observation,
            observation=observation,
            reward=reward,
            last_action=last_action,
        )
        return self._state

    def current_subjective_state(self) -> MinimalSubjectiveState:
        """Return the most recently stored state."""
        return self._state


class MinimalFeatureBank(FeatureBank[MinimalSubjectiveState]):
    """Stores one fixed identity feature."""

    def __init__(self) -> None:
        self._features: dict[FeatureId, FeatureSpec] = {
            "observation": FeatureSpec(
                feature_id="observation",
                name="Observation value",
                description="Identity feature for the integer observation.",
            )
        }

    def list_features(self) -> Sequence[FeatureSpec]:
        """Return the stored feature specs as a tuple, in insertion order."""
        return tuple(self._features.values())

    def activations(
        self,
        subjective_state: MinimalSubjectiveState,
    ) -> Mapping[FeatureId, float]:
        """Return the observation, cast to float, under the key "observation"."""
        return {"observation": float(subjective_state.observation)}

    def add_candidates(
        self, candidates: Sequence[FeatureCandidate]
    ) -> Sequence[FeatureSpec]:
        """Convert each candidate to a spec, store it, and return the new specs.

        A candidate whose ``feature_id`` is already stored replaces that entry.
        """
        added: list[FeatureSpec] = []
        for candidate in candidates:
            feature = FeatureSpec(
                feature_id=candidate.feature_id,
                name=candidate.name,
                description=candidate.description,
                metadata=candidate.metadata,
            )
            self._features[feature.feature_id] = feature
            added.append(feature)
        return tuple(added)

    def remove(self, feature_ids: Sequence[FeatureId]) -> None:
        """Drop the given feature ids; ids that are not stored are ignored."""
        for feature_id in feature_ids:
            self._features.pop(feature_id, None)


class MinimalFeatureConstructor(FeatureConstructor[MinimalSubjectiveState]):
    """Never proposes new features."""

    def propose(
        self,
        subjective_state: MinimalSubjectiveState,
        active_features: Sequence[FeatureSpec],
    ) -> Sequence[FeatureCandidate]:
        """Return an empty tuple regardless of the arguments."""
        return ()


class MinimalFeatureRanker(FeatureRanker):
    """Ranks features in their existing order."""

    def rank(
        self,
        features: Sequence[FeatureSpec],
        utilities: Sequence[UtilityRecord],
        limit: int | None = None,
    ) -> Sequence[FeatureId]:
        """Return the feature ids in input order, sliced to ``limit`` if given.

        ``utilities`` is accepted but not consulted.
        """
        feature_ids = tuple(feature.feature_id for feature in features)
        return feature_ids if limit is None else feature_ids[:limit]


class MinimalSubtaskGenerator(SubtaskGenerator[MinimalSubjectiveState]):
    """Creates at most one subtask per feature."""

    def __init__(self) -> None:
        self._created_subtask_for: set[FeatureId] = set()

    def generate(
        self,
        ranked_feature_ids: Sequence[FeatureId],
        feature_bank: FeatureBank[MinimalSubjectiveState],
    ) -> Sequence[SubtaskSpec]:
        """Create a subtask for every ranked feature not handled before.

        Raises:
            KeyError: if a ranked id is not among the bank's listed features.
        """
        feature_specs = {
            feature.feature_id: feature for feature in feature_bank.list_features()
        }
        created: list[SubtaskSpec] = []
        for feature_id in ranked_feature_ids:
            if feature_id in self._created_subtask_for:
                continue
            # Look up and build first: a failure here must not leave the id
            # marked as handled, or its subtask could never be created later.
            feature = feature_specs[feature_id]
            subtask = SubtaskSpec(
                subtask_id=f"subtask:{feature_id}",
                name=f"Track {feature.name}",
                feature_id=feature_id,
            )
            self._created_subtask_for.add(feature_id)
            created.append(subtask)
        return tuple(created)


# ─────────────────────────────────────────────────────────────────────
# Transition-model components
# ─────────────────────────────────────────────────────────────────────


class MinimalWorldModel(WorldModel[MinimalSubjectiveState, Action, MinimalInfo]):
    """Trivial planner-facing model."""

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Do nothing; this model does not learn from transitions."""
        pass

    def predict_action(
        self,
        subjective_state: MinimalSubjectiveState,
        action: Action,
    ) -> ModelPrediction[MinimalSubjectiveState]:
        """Predict an unchanged state, zero reward, and one step."""
        return ModelPrediction(
            predicted_subjective_state=subjective_state,
            cumulative_reward=0.0,
            steps=1,
        )

    def predict_option(
        self,
        subjective_state: MinimalSubjectiveState,
        option_id: OptionId,
    ) -> ModelPrediction[MinimalSubjectiveState]:
        """Predict an unchanged state, zero reward, and one step."""
        return ModelPrediction(
            predicted_subjective_state=subjective_state,
            cumulative_reward=0.0,
            steps=1,
        )

    def add_or_replace_option_models(
        self, models: Sequence[OptionModel[MinimalSubjectiveState]]
    ) -> None:
        """Do nothing; option models are not stored."""
        pass

    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
        """Do nothing; there are no stored option models to remove."""
        pass


class MinimalOptionModelLearner(
    OptionModelLearner[MinimalSubjectiveState, Action, MinimalInfo]
):
    """No-op option-model learner."""

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Do nothing with the transition."""
        pass

    def export_models(self) -> Sequence[OptionModel[MinimalSubjectiveState]]:
        """Return an empty tuple; no models are ever learned."""
        return ()


class MinimalPlanner(Planner[MinimalSubjectiveState, Action, MinimalInfo]):
    """Returns one-step value targets without real search."""

    def plan_step(
        self,
        subjective_state: MinimalSubjectiveState,
        model: WorldModel[MinimalSubjectiveState, Action, MinimalInfo],
        value_function: ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo],
        budget: int,
    ) -> PlanningUpdate[Action]:
        """Package the current value predictions as a planning update.

        ``model`` is not consulted, and the whole ``budget`` is reported as
        used under the "budget_used" statistic.
        """
        return PlanningUpdate(
            value_targets=value_function.predict(subjective_state),
            policy_targets={"preferred_action": 0},
            search_statistics={"budget_used": budget},
        )


# ─────────────────────────────────────────────────────────────────────
# Value-function components
# ─────────────────────────────────────────────────────────────────────


class MinimalValueEstimator(
    ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Stores latest reward as the only value estimate."""

    def __init__(self) -> None:
        self._value: float = 0.0

    def list_general_value_functions(
        self,
    ) -> Sequence[
        GeneralValueFunctionLearner[MinimalSubjectiveState, Action, MinimalInfo]
    ]:
        """Return an empty tuple; no GVF learners are managed."""
        return ()

    def predict(
        self,
        subjective_state: MinimalSubjectiveState,
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Return the stored value under the key "main", ignoring the state."""
        return {"main": self._value}

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Overwrite the stored value with the transition reward.

        The returned mapping always reports 0.0 for "main".
        """
        self._value = transition.reward
        return {"main": 0.0}

    def add_or_replace(
        self,
        learner: GeneralValueFunctionLearner[
            MinimalSubjectiveState, Action, MinimalInfo
        ],
    ) -> None:
        """Do nothing; learners are not stored."""
        pass

    def remove(
        self,
        general_value_function_ids: Sequence[GeneralValueFunctionId],
    ) -> None:
        """Do nothing; there are no stored learners to remove."""
        pass


class MinimalUtilityAssessor(UtilityAssessor):
    """Aggregates usage records into per-component usage totals."""

    def __init__(self) -> None:
        # Both maps are keyed by (kind value, component id). Totals are kept
        # incrementally so memory stays bounded by the number of distinct
        # components instead of growing with every observed record.
        self._totals: dict[tuple[str, str], float] = {}
        self._latest: dict[tuple[str, str], UsageRecord] = {}

    def observe(self, usage: Sequence[UsageRecord]) -> None:
        """Fold each record's ``amount`` into the running total for its key."""
        for record in usage:
            key = (record.kind.value, record.component_id)
            self._totals[key] = self._totals.get(key, 0.0) + record.amount
            self._latest[key] = record

    def scores(self) -> Sequence[UtilityRecord]:
        """Return one utility record per key, in first-seen order."""
        return tuple(
            UtilityRecord(
                kind=record.kind,
                component_id=record.component_id,
                utility=self._totals[key],
            )
            for key, record in self._latest.items()
        )


class MinimalCurator(Curator):
    """Never prunes."""

    def curate(self, utilities: Sequence[UtilityRecord]) -> CurationDecision:
        """Return a default-constructed decision, ignoring ``utilities``."""
        return CurationDecision()


# ─────────────────────────────────────────────────────────────────────
# Reactive-policy components
# ─────────────────────────────────────────────────────────────────────


@dataclass
class MinimalOption(Option[MinimalSubjectiveState, Action]):
    """Trivial option that emits a fixed action (1 by default) and stops at once."""

    _descriptor: OptionDescriptor
    _action: Action = 1

    @property
    def descriptor(self) -> OptionDescriptor:
        """Descriptor supplied at construction."""
        return self._descriptor

    def is_available(self, subjective_state: MinimalSubjectiveState) -> bool:
        """Return True for every state."""
        return True

    def act(self, subjective_state: MinimalSubjectiveState) -> Action:
        """Return the fixed action, ignoring the state."""
        return self._action

    def stop_probability(self, subjective_state: MinimalSubjectiveState) -> float:
        """Return 1.0 for every state."""
        return 1.0


class MinimalActionSelector(ActionSelector[MinimalSubjectiveState, Action]):
    """Alternates primitive actions and option selection."""

    def __init__(self) -> None:
        self.last_td_errors: Mapping[GeneralValueFunctionId, float] = {}
        self.last_planning_update: PlanningUpdate[Action] | None = None

    def decide(
        self,
        subjective_state: MinimalSubjectiveState,
        active_option: Option[MinimalSubjectiveState, Action] | None,
        available_options: Sequence[Option[MinimalSubjectiveState, Action]],
    ) -> PolicyDecision[Action]:
        """Pick action 0 on even observations, otherwise prefer an option.

        On odd observations the first available option is selected; when no
        option is available the primitive action 1 is used. ``active_option``
        is not consulted.
        """
        if subjective_state.observation % 2 == 0:
            return PolicyDecision(action=0)
        if available_options:
            return PolicyDecision(option_id=available_options[0].descriptor.option_id)
        return PolicyDecision(action=1)

    def update_from_values(
        self,
        subjective_state: MinimalSubjectiveState,
        td_errors: Mapping[GeneralValueFunctionId, float],
    ) -> None:
        """Keep a dict copy of the latest TD errors."""
        self.last_td_errors = dict(td_errors)

    def apply_planning_update(self, update: PlanningUpdate[Action]) -> None:
        """Remember the latest planning update."""
        self.last_planning_update = update


class MinimalOptionLibrary(OptionLibrary[MinimalSubjectiveState, Action]):
    """Stores learned options."""

    def __init__(self) -> None:
        self._options: dict[OptionId, Option[MinimalSubjectiveState, Action]] = {}

    def list_options(self) -> Sequence[Option[MinimalSubjectiveState, Action]]:
        """Return the stored options as a tuple, in insertion order."""
        return tuple(self._options.values())

    def get(self, option_id: OptionId) -> Option[MinimalSubjectiveState, Action]:
        """Return the option stored under ``option_id``.

        Raises:
            KeyError: if no option is stored under that id.
        """
        return self._options[option_id]

    def add_or_replace(self, option: Option[MinimalSubjectiveState, Action]) -> None:
        """Store ``option`` under its descriptor's option id."""
        self._options[option.descriptor.option_id] = option

    def remove(self, option_ids: Sequence[OptionId]) -> None:
        """Drop the given option ids; ids that are not stored are ignored."""
        for option_id in option_ids:
            self._options.pop(option_id, None)


class MinimalOptionLearner(OptionLearner[MinimalSubjectiveState, Action, MinimalInfo]):
    """Creates one trivial option per subtask."""

    def __init__(self) -> None:
        self._subtasks: dict[SubtaskId, SubtaskSpec] = {}
        self._options: dict[OptionId, MinimalOption] = {}

    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
        """Record each subtask and create its option under "option:<subtask id>"."""
        for subtask in subtasks:
            self._subtasks[subtask.subtask_id] = subtask
            option_id = f"option:{subtask.subtask_id}"
            self._options[option_id] = MinimalOption(
                OptionDescriptor(
                    option_id=option_id,
                    name=f"Option for {subtask.subtask_id}",
                    subtask_id=subtask.subtask_id,
                )
            )

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Do nothing with the transition."""
        pass

    def export_options(self) -> Sequence[Option[MinimalSubjectiveState, Action]]:
        """Return the created options as a tuple, in insertion order."""
        return tuple(self._options.values())

    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
        """Forget the given subtasks together with their options."""
        for subtask_id in subtask_ids:
            self._subtasks.pop(subtask_id, None)
            self._options.pop(f"option:{subtask_id}", None)


# ─────────────────────────────────────────────────────────────────────
# Wiring
# ─────────────────────────────────────────────────────────────────────


def build_minimal_agent() -> (
    OaKAgent[Observation, Action, MinimalSubjectiveState, MinimalInfo]
):
    """Construct a fully wired fine-grained smoke-test OaK agent."""
    perception = CompositePerception(
        state_builder=MinimalStateBuilder(),
        feature_bank=MinimalFeatureBank(),
        feature_constructor=MinimalFeatureConstructor(),
        feature_ranker=MinimalFeatureRanker(),
        subtask_generator=MinimalSubtaskGenerator(),
    )
    transition_model = CompositeTransitionModel(
        world_model=MinimalWorldModel(),
        option_model_learner=MinimalOptionModelLearner(),
        planner=MinimalPlanner(),
    )
    value_function = CompositeValueFunction(
        value_estimator=MinimalValueEstimator(),
        utility_assessor=MinimalUtilityAssessor(),
        curator=MinimalCurator(),
    )
    action_selector = MinimalActionSelector()
    reactive_policy = CompositeReactivePolicy(
        action_selector=action_selector,
        option_library=MinimalOptionLibrary(),
        option_learner=MinimalOptionLearner(),
    )
    return OaKAgent(
        perception=perception,
        transition_model=transition_model,
        value_function=value_function,
        reactive_policy=reactive_policy,
        planning_budget=4,
    )


def run_minimal_episode(horizon: int = 5) -> list[MinimalTraceStep]:
    """Run a short smoke episode and return a compact trace.

    At most ``horizon`` agent steps are taken; the loop ends earlier when the
    world reports ``terminated``.
    """
    world = MinimalWorld(horizon=horizon)
    agent = build_minimal_agent()
    step = world.reset()
    agent.reset()

    trace: list[MinimalTraceStep] = []

    for _ in range(horizon):
        result = agent.step(step)
        action = result.action
        trace.append(
            {
                "subjective_state": result.subjective_state,
                "action": action,
                "active_option_id": result.active_option_id,
                "created_subtasks": [
                    subtask.subtask_id for subtask in result.created_subtasks
                ],
                "planning_budget_used": (
                    int(result.planning_update.search_statistics["budget_used"])
                    if result.planning_update is not None
                    else None
                ),
            }
        )
        step = world.step(action)
        if step.terminated:
            break

    return trace
class MinimalStateBuilder(
    StateBuilder[Observation, Action, MinimalSubjectiveState]
):
    """State builder that copies each observation straight into the state."""

    @staticmethod
    def _initial_state() -> MinimalSubjectiveState:
        # Shared by the constructor and reset() so both install the same
        # all-zero state with no previous action.
        return MinimalSubjectiveState(0, 0, 0.0, None)

    def __init__(self) -> None:
        self._state = self._initial_state()

    def reset(self) -> None:
        """Replace the held state with a fresh initial state."""
        self._state = self._initial_state()

    def update(
        self,
        observation: Observation,
        reward: float,
        last_action: Action | None,
    ) -> MinimalSubjectiveState:
        """Build, store, and return a state from the raw step data.

        The observation fills both the ``step_index`` and the ``observation``
        field of the new state.
        """
        fresh = MinimalSubjectiveState(
            step_index=observation,
            observation=observation,
            reward=reward,
            last_action=last_action,
        )
        self._state = fresh
        return fresh

    def current_subjective_state(self) -> MinimalSubjectiveState:
        """Return the state held since the last reset or update."""
        return self._state
Direct observation-to-state mapping.
def update(
    self,
    observation: Observation,
    reward: float,
    last_action: Action | None,
) -> MinimalSubjectiveState:
    """Build, store, and return a state from the raw step data.

    The observation fills both the ``step_index`` and the ``observation``
    field of the new state.
    """
    fresh = MinimalSubjectiveState(
        step_index=observation,
        observation=observation,
        reward=reward,
        last_action=last_action,
    )
    self._state = fresh
    return fresh
class MinimalFeatureBank(FeatureBank[MinimalSubjectiveState]):
    """Feature bank that starts with a single identity feature."""

    def __init__(self) -> None:
        identity = FeatureSpec(
            feature_id="observation",
            name="Observation value",
            description="Identity feature for the integer observation.",
        )
        self._features: dict[FeatureId, FeatureSpec] = {"observation": identity}

    def list_features(self) -> Sequence[FeatureSpec]:
        """Return a tuple snapshot of the stored specs in insertion order."""
        return (*self._features.values(),)

    def activations(
        self,
        subjective_state: MinimalSubjectiveState,
    ) -> Mapping[FeatureId, float]:
        """Map "observation" to the state's observation converted to float."""
        value = float(subjective_state.observation)
        return {"observation": value}

    def add_candidates(
        self, candidates: Sequence[FeatureCandidate]
    ) -> Sequence[FeatureSpec]:
        """Turn candidates into specs, store them, and return them as a tuple.

        A candidate whose ``feature_id`` is already stored replaces that entry.
        """
        accepted: list[FeatureSpec] = []
        for proposal in candidates:
            spec = FeatureSpec(
                feature_id=proposal.feature_id,
                name=proposal.name,
                description=proposal.description,
                metadata=proposal.metadata,
            )
            self._features[spec.feature_id] = spec
            accepted.append(spec)
        return tuple(accepted)

    def remove(self, feature_ids: Sequence[FeatureId]) -> None:
        """Delete the given ids, silently skipping ids that are not stored."""
        for doomed in feature_ids:
            if doomed in self._features:
                del self._features[doomed]
Stores one fixed identity feature.
def activations(
    self,
    subjective_state: MinimalSubjectiveState,
) -> Mapping[FeatureId, float]:
    """Map "observation" to the state's observation converted to float."""
    value = float(subjective_state.observation)
    return {"observation": value}
Return per-feature activation values for the given state.
Intended for SubtaskGenerator implementations, which receive
the FeatureBank and may use activations to decide which
features warrant new subtasks.
def add_candidates(
    self, candidates: Sequence[FeatureCandidate]
) -> Sequence[FeatureSpec]:
    """Turn candidates into specs, store them, and return them as a tuple.

    A candidate whose ``feature_id`` is already stored replaces that entry.
    """
    accepted: list[FeatureSpec] = []
    for proposal in candidates:
        spec = FeatureSpec(
            feature_id=proposal.feature_id,
            name=proposal.name,
            description=proposal.description,
            metadata=proposal.metadata,
        )
        self._features[spec.feature_id] = spec
        accepted.append(spec)
    return tuple(accepted)
class MinimalFeatureConstructor(FeatureConstructor[MinimalSubjectiveState]):
    """Feature constructor that has nothing to propose."""

    def propose(
        self,
        subjective_state: MinimalSubjectiveState,
        active_features: Sequence[FeatureSpec],
    ) -> Sequence[FeatureCandidate]:
        """Return an empty tuple; neither argument is inspected."""
        no_candidates: tuple[FeatureCandidate, ...] = ()
        return no_candidates
Never proposes new features.
class MinimalFeatureRanker(FeatureRanker):
    """Ranker that keeps features in the order it receives them."""

    def rank(
        self,
        features: Sequence[FeatureSpec],
        utilities: Sequence[UtilityRecord],
        limit: int | None = None,
    ) -> Sequence[FeatureId]:
        """Return the feature ids in input order, sliced to ``limit`` if given.

        ``utilities`` is accepted but not consulted.
        """
        ordered_ids = tuple(spec.feature_id for spec in features)
        return ordered_ids if limit is None else ordered_ids[:limit]
Ranks features in their existing order.
def rank(
    self,
    features: Sequence[FeatureSpec],
    utilities: Sequence[UtilityRecord],
    limit: int | None = None,
) -> Sequence[FeatureId]:
    """Return the feature ids in input order, sliced to ``limit`` if given.

    ``utilities`` is accepted but not consulted.
    """
    ordered_ids = tuple(spec.feature_id for spec in features)
    return ordered_ids if limit is None else ordered_ids[:limit]
class MinimalSubtaskGenerator(SubtaskGenerator[MinimalSubjectiveState]):
    """Creates at most one subtask per feature."""

    def __init__(self) -> None:
        # Feature ids for which a subtask has already been handed out.
        self._created_subtask_for: set[FeatureId] = set()

    def generate(
        self,
        ranked_feature_ids: Sequence[FeatureId],
        feature_bank: FeatureBank[MinimalSubjectiveState],
    ) -> Sequence[SubtaskSpec]:
        """Create a subtask for every ranked feature not handled before.

        Args:
            ranked_feature_ids: Feature ids to consider, in ranking order.
            feature_bank: Bank whose listed features supply the feature names.

        Returns:
            A tuple with the newly created subtask specs, in ranking order.

        Raises:
            KeyError: if a ranked id is not among the bank's listed features.
        """
        feature_specs = {
            feature.feature_id: feature for feature in feature_bank.list_features()
        }
        created: list[SubtaskSpec] = []
        for feature_id in ranked_feature_ids:
            if feature_id in self._created_subtask_for:
                continue
            # Look up and build first: a failure here must not leave the id
            # marked as handled, or its subtask could never be created later.
            feature = feature_specs[feature_id]
            subtask = SubtaskSpec(
                subtask_id=f"subtask:{feature_id}",
                name=f"Track {feature.name}",
                feature_id=feature_id,
            )
            self._created_subtask_for.add(feature_id)
            created.append(subtask)
        return tuple(created)
Creates at most one subtask per feature.
def generate(
    self,
    ranked_feature_ids: Sequence[FeatureId],
    feature_bank: FeatureBank[MinimalSubjectiveState],
) -> Sequence[SubtaskSpec]:
    """Create a subtask for every ranked feature not handled before.

    Args:
        ranked_feature_ids: Feature ids to consider, in ranking order.
        feature_bank: Bank whose listed features supply the feature names.

    Returns:
        A tuple with the newly created subtask specs, in ranking order.

    Raises:
        KeyError: if a ranked id is not among the bank's listed features.
    """
    feature_specs = {
        feature.feature_id: feature for feature in feature_bank.list_features()
    }
    created: list[SubtaskSpec] = []
    for feature_id in ranked_feature_ids:
        if feature_id in self._created_subtask_for:
            continue
        # Look up and build first: a failure here must not leave the id
        # marked as handled, or its subtask could never be created later.
        feature = feature_specs[feature_id]
        subtask = SubtaskSpec(
            subtask_id=f"subtask:{feature_id}",
            name=f"Track {feature.name}",
            feature_id=feature_id,
        )
        self._created_subtask_for.add(feature_id)
        created.append(subtask)
    return tuple(created)
class MinimalWorldModel(WorldModel[MinimalSubjectiveState, Action, MinimalInfo]):
    """World model whose predictions never change the state."""

    @staticmethod
    def _unchanged(
        subjective_state: MinimalSubjectiveState,
    ) -> ModelPrediction[MinimalSubjectiveState]:
        # Single source for both predict_* methods: same state, no reward,
        # one step.
        return ModelPrediction(
            predicted_subjective_state=subjective_state,
            cumulative_reward=0.0,
            steps=1,
        )

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Discard the transition; nothing is learned."""
        return None

    def predict_action(
        self,
        subjective_state: MinimalSubjectiveState,
        action: Action,
    ) -> ModelPrediction[MinimalSubjectiveState]:
        """Predict the same state, zero reward, and one step for any action."""
        return self._unchanged(subjective_state)

    def predict_option(
        self,
        subjective_state: MinimalSubjectiveState,
        option_id: OptionId,
    ) -> ModelPrediction[MinimalSubjectiveState]:
        """Predict the same state, zero reward, and one step for any option."""
        return self._unchanged(subjective_state)

    def add_or_replace_option_models(
        self, models: Sequence[OptionModel[MinimalSubjectiveState]]
    ) -> None:
        """Discard the supplied option models."""
        return None

    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
        """Do nothing; no option models are stored."""
        return None
Trivial planner-facing model.
class MinimalOptionModelLearner(
    OptionModelLearner[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Option-model learner that learns nothing and exports nothing."""

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Discard the transition."""
        return None

    def export_models(self) -> Sequence[OptionModel[MinimalSubjectiveState]]:
        """Return an empty tuple."""
        no_models: tuple[OptionModel[MinimalSubjectiveState], ...] = ()
        return no_models
No-op option-model learner.
class MinimalPlanner(Planner[MinimalSubjectiveState, Action, MinimalInfo]):
    """Planner that reports current value predictions instead of searching."""

    def plan_step(
        self,
        subjective_state: MinimalSubjectiveState,
        model: WorldModel[MinimalSubjectiveState, Action, MinimalInfo],
        value_function: ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo],
        budget: int,
    ) -> PlanningUpdate[Action]:
        """Wrap the value function's predictions in a planning update.

        ``model`` is not consulted. The policy target is fixed and the whole
        ``budget`` is reported as used.
        """
        predicted_values = value_function.predict(subjective_state)
        policy_targets = {"preferred_action": 0}
        statistics = {"budget_used": budget}
        return PlanningUpdate(
            value_targets=predicted_values,
            policy_targets=policy_targets,
            search_statistics=statistics,
        )
Returns one-step value targets without real search.
def plan_step(
    self,
    subjective_state: MinimalSubjectiveState,
    model: WorldModel[MinimalSubjectiveState, Action, MinimalInfo],
    value_function: ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo],
    budget: int,
) -> PlanningUpdate[Action]:
    """Wrap the value function's predictions in a planning update.

    ``model`` is not consulted. The policy target is fixed and the whole
    ``budget`` is reported as used.
    """
    predicted_values = value_function.predict(subjective_state)
    policy_targets = {"preferred_action": 0}
    statistics = {"budget_used": budget}
    return PlanningUpdate(
        value_targets=predicted_values,
        policy_targets=policy_targets,
        search_statistics=statistics,
    )
class MinimalValueEstimator(
    ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Value estimator whose single estimate is the most recent reward."""

    def __init__(self) -> None:
        self._value: float = 0.0

    def list_general_value_functions(
        self,
    ) -> Sequence[
        GeneralValueFunctionLearner[MinimalSubjectiveState, Action, MinimalInfo]
    ]:
        """Return an empty tuple; no GVF learners are managed."""
        return tuple()

    def predict(
        self,
        subjective_state: MinimalSubjectiveState,
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Report the stored value under "main"; the state is not inspected."""
        prediction = {"main": self._value}
        return prediction

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Replace the stored value with the transition's reward.

        The returned mapping always reports 0.0 for "main".
        """
        latest_reward = transition.reward
        self._value = latest_reward
        return {"main": 0.0}

    def add_or_replace(
        self,
        learner: GeneralValueFunctionLearner[
            MinimalSubjectiveState, Action, MinimalInfo
        ],
    ) -> None:
        """Discard the learner; nothing is stored."""
        return None

    def remove(
        self,
        general_value_function_ids: Sequence[GeneralValueFunctionId],
    ) -> None:
        """Do nothing; no learners are stored."""
        return None
Stores latest reward as the only value estimate.
def list_general_value_functions(
    self,
) -> Sequence[
    GeneralValueFunctionLearner[MinimalSubjectiveState, Action, MinimalInfo]
]:
    """Return an empty tuple; no GVF learners are managed."""
    return tuple()
Return all managed GVF learners.
Intended for Planner implementations that need to inspect
the GVF bank (e.g., to evaluate auxiliary predictions during
planning).
def add_or_replace(
    self,
    learner: GeneralValueFunctionLearner[
        MinimalSubjectiveState, Action, MinimalInfo
    ],
) -> None:
    """Discard the learner; nothing is stored."""
    return None
Add or replace a GVF learner in the bank.
Used for dynamic GVF management, e.g., creating new GVFs when new subtasks or options are discovered.
class MinimalUtilityAssessor(UtilityAssessor):
    """Aggregates usage records into per-component usage totals.

    Totals are maintained incrementally as records are observed, so memory is
    bounded by the number of distinct (kind, component id) pairs and
    ``scores()`` no longer rescans the whole usage history on every call.
    """

    def __init__(self) -> None:
        # Both maps are keyed by (kind value, component id).
        self._totals: dict[tuple[str, str], float] = {}
        # Most recent record per key; supplies ``kind`` and ``component_id``
        # for the emitted utility record.
        self._latest: dict[tuple[str, str], UsageRecord] = {}

    def observe(self, usage: Sequence[UsageRecord]) -> None:
        """Fold each record's ``amount`` into the running total for its key.

        Args:
            usage: Records to accumulate, processed in the given order.
        """
        for record in usage:
            key = (record.kind.value, record.component_id)
            self._totals[key] = self._totals.get(key, 0.0) + record.amount
            self._latest[key] = record

    def scores(self) -> Sequence[UtilityRecord]:
        """Return one utility record per key, in first-seen order.

        Returns:
            A tuple of records whose ``utility`` is the summed ``amount`` of
            every usage record observed for that key.
        """
        return tuple(
            UtilityRecord(
                kind=record.kind,
                component_id=record.component_id,
                utility=self._totals[key],
            )
            for key, record in self._latest.items()
        )
Aggregates usage records into per-component usage totals.
def scores(self) -> Sequence[UtilityRecord]:
    """Sum the recorded usage amounts per (kind, component id) pair.

    Returns one utility record per pair, in first-seen order, carrying the
    summed ``amount`` of every stored usage record for that pair.
    """
    running: dict[tuple[str, str], float] = {}
    last_seen: dict[tuple[str, str], UsageRecord] = {}
    for usage in self._usage_records:
        pair = (usage.kind.value, usage.component_id)
        running[pair] = running.get(pair, 0.0) + usage.amount
        last_seen[pair] = usage

    results: list[UtilityRecord] = []
    for pair, usage in last_seen.items():
        results.append(
            UtilityRecord(
                kind=usage.kind,
                component_id=usage.component_id,
                utility=running[pair],
            )
        )
    return tuple(results)
class MinimalCurator(Curator):
    """Curator that always answers with a default decision."""

    def curate(self, utilities: Sequence[UtilityRecord]) -> CurationDecision:
        """Return a default-constructed decision; ``utilities`` is not read."""
        decision = CurationDecision()
        return decision
Never prunes.
@dataclass
class MinimalOption(Option[MinimalSubjectiveState, Action]):
    """Option that emits one fixed action (1 by default) and then stops."""

    # Descriptor handed back unchanged by the ``descriptor`` property.
    _descriptor: OptionDescriptor
    # Action returned by every ``act`` call.
    _action: Action = 1

    @property
    def descriptor(self) -> OptionDescriptor:
        """Descriptor supplied at construction."""
        return self._descriptor

    def is_available(self, subjective_state: MinimalSubjectiveState) -> bool:
        """Report the option as available in every state."""
        return True

    def act(self, subjective_state: MinimalSubjectiveState) -> Action:
        """Return the fixed action; the state is not inspected."""
        return self._action

    def stop_probability(self, subjective_state: MinimalSubjectiveState) -> float:
        """Return 1.0 for every state."""
        return 1.0
Trivial option that emits a fixed action (1 by default) and stops immediately.
Whether this option can be initiated in the given state.
Intended for ActionSelector implementations, which receive
available options and may filter by initiation conditions.
class MinimalActionSelector(ActionSelector[MinimalSubjectiveState, Action]):
    """Selector that switches between a primitive action and an option."""

    def __init__(self) -> None:
        # Both attributes only record the latest inputs; decide() ignores them.
        self.last_td_errors: Mapping[GeneralValueFunctionId, float] = {}
        self.last_planning_update: PlanningUpdate[Action] | None = None

    def decide(
        self,
        subjective_state: MinimalSubjectiveState,
        active_option: Option[MinimalSubjectiveState, Action] | None,
        available_options: Sequence[Option[MinimalSubjectiveState, Action]],
    ) -> PolicyDecision[Action]:
        """Choose by the parity of the observation.

        Even observations yield action 0. Odd observations yield the first
        available option, or action 1 when none is available.
        ``active_option`` is not consulted.
        """
        observation_is_even = subjective_state.observation % 2 == 0
        if observation_is_even:
            return PolicyDecision(action=0)
        if not available_options:
            return PolicyDecision(action=1)
        first_option = available_options[0]
        return PolicyDecision(option_id=first_option.descriptor.option_id)

    def update_from_values(
        self,
        subjective_state: MinimalSubjectiveState,
        td_errors: Mapping[GeneralValueFunctionId, float],
    ) -> None:
        """Store a dict copy of the TD errors; the state is not inspected."""
        snapshot = dict(td_errors)
        self.last_td_errors = snapshot

    def apply_planning_update(self, update: PlanningUpdate[Action]) -> None:
        """Store the planning update as the latest one seen."""
        self.last_planning_update = update
Alternates primitive actions and option selection.
def decide(
    self,
    subjective_state: MinimalSubjectiveState,
    active_option: Option[MinimalSubjectiveState, Action] | None,
    available_options: Sequence[Option[MinimalSubjectiveState, Action]],
) -> PolicyDecision[Action]:
    """Choose by the parity of the observation.

    Even observations yield action 0. Odd observations yield the first
    available option, or action 1 when none is available.
    ``active_option`` is not consulted.
    """
    observation_is_even = subjective_state.observation % 2 == 0
    if observation_is_even:
        return PolicyDecision(action=0)
    if not available_options:
        return PolicyDecision(action=1)
    first_option = available_options[0]
    return PolicyDecision(option_id=first_option.descriptor.option_id)
class MinimalOptionLibrary(OptionLibrary[MinimalSubjectiveState, Action]):
    """Dictionary-backed store of options keyed by option id."""

    def __init__(self) -> None:
        self._options: dict[OptionId, Option[MinimalSubjectiveState, Action]] = {}

    def list_options(self) -> Sequence[Option[MinimalSubjectiveState, Action]]:
        """Return a tuple snapshot of the stored options in insertion order."""
        return (*self._options.values(),)

    def get(self, option_id: OptionId) -> Option[MinimalSubjectiveState, Action]:
        """Return the option stored under ``option_id``.

        Raises:
            KeyError: if no option is stored under that id.
        """
        found = self._options[option_id]
        return found

    def add_or_replace(self, option: Option[MinimalSubjectiveState, Action]) -> None:
        """Store ``option`` under the option id from its descriptor."""
        key = option.descriptor.option_id
        self._options[key] = option

    def remove(self, option_ids: Sequence[OptionId]) -> None:
        """Delete the given ids, silently skipping ids that are not stored."""
        for doomed in option_ids:
            if doomed in self._options:
                del self._options[doomed]
Stores learned options.
class MinimalOptionLearner(OptionLearner[MinimalSubjectiveState, Action, MinimalInfo]):
    """Option learner that pairs every ingested subtask with one option."""

    def __init__(self) -> None:
        self._subtasks: dict[SubtaskId, SubtaskSpec] = {}
        self._options: dict[OptionId, MinimalOption] = {}

    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
        """Record each subtask and create its option.

        The option id is "option:" followed by the subtask id.
        """
        for spec in subtasks:
            self._subtasks[spec.subtask_id] = spec
            option_id = f"option:{spec.subtask_id}"
            descriptor = OptionDescriptor(
                option_id=option_id,
                name=f"Option for {spec.subtask_id}",
                subtask_id=spec.subtask_id,
            )
            self._options[option_id] = MinimalOption(descriptor)

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Discard the transition; options are not trained."""
        return None

    def export_options(self) -> Sequence[Option[MinimalSubjectiveState, Action]]:
        """Return a tuple snapshot of the created options in insertion order."""
        return (*self._options.values(),)

    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
        """Forget the given subtasks and the options created for them."""
        for gone in subtask_ids:
            self._subtasks.pop(gone, None)
            self._options.pop(f"option:{gone}", None)
Creates one trivial option per subtask.
def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
    """Record each subtask and create its option.

    The option id is "option:" followed by the subtask id.
    """
    for spec in subtasks:
        self._subtasks[spec.subtask_id] = spec
        option_id = f"option:{spec.subtask_id}"
        descriptor = OptionDescriptor(
            option_id=option_id,
            name=f"Option for {spec.subtask_id}",
            subtask_id=spec.subtask_id,
        )
        self._options[option_id] = MinimalOption(descriptor)
def build_minimal_agent() -> (
    OaKAgent[Observation, Action, MinimalSubjectiveState, MinimalInfo]
):
    """Assemble an OaK agent from the minimal fine-grained components.

    Each of the four composite interfaces is built from the ``Minimal*``
    classes of this module, and the agent gets a planning budget of 4.
    """
    # Keyword arguments are evaluated left to right, so the components are
    # still constructed in the order perception, transition model, value
    # function, reactive policy.
    return OaKAgent(
        perception=CompositePerception(
            state_builder=MinimalStateBuilder(),
            feature_bank=MinimalFeatureBank(),
            feature_constructor=MinimalFeatureConstructor(),
            feature_ranker=MinimalFeatureRanker(),
            subtask_generator=MinimalSubtaskGenerator(),
        ),
        transition_model=CompositeTransitionModel(
            world_model=MinimalWorldModel(),
            option_model_learner=MinimalOptionModelLearner(),
            planner=MinimalPlanner(),
        ),
        value_function=CompositeValueFunction(
            value_estimator=MinimalValueEstimator(),
            utility_assessor=MinimalUtilityAssessor(),
            curator=MinimalCurator(),
        ),
        reactive_policy=CompositeReactivePolicy(
            action_selector=MinimalActionSelector(),
            option_library=MinimalOptionLibrary(),
            option_learner=MinimalOptionLearner(),
        ),
        planning_budget=4,
    )
Construct a fully wired fine-grained smoke-test OaK agent.
def run_minimal_episode(horizon: int = 5) -> list[MinimalTraceStep]:
    """Drive a freshly built agent through one short episode.

    At most ``horizon`` agent steps are taken; the loop ends earlier when the
    world reports ``terminated``. Returns one trace entry per agent step.
    """
    world = MinimalWorld(horizon=horizon)
    agent = build_minimal_agent()
    step = world.reset()
    agent.reset()

    trace: list[MinimalTraceStep] = []
    for _ in range(horizon):
        result = agent.step(step)
        action = result.action
        entry: MinimalTraceStep = {
            "subjective_state": result.subjective_state,
            "action": action,
            "active_option_id": result.active_option_id,
            "created_subtasks": [
                created.subtask_id for created in result.created_subtasks
            ],
            # No planning update means there is no budget figure to report.
            "planning_budget_used": (
                None
                if result.planning_update is None
                else int(result.planning_update.search_statistics["budget_used"])
            ),
        }
        trace.append(entry)

        step = world.step(action)
        if step.terminated:
            break

    return trace
Run a short smoke episode and return a compact trace.