examples.smoke.minimal_oak_fine_grained
1from __future__ import annotations 2 3"""Bare-minimum OaK example built from fine-grained components. 4 5This mirrors `examples/minimal_oak.py`, but instead of implementing the four 6main OaK interfaces directly, it assembles them from the optional fine-grained 7building blocks in `oak.fine_grained`. 8 9The behavior is intentionally the same as the direct example: 10 11- a tiny integer world 12- a direct observation-to-subjective_state state builder 13- one fixed identity feature 14- no-op model learning with trivial planning 15- a simple value tracker with usage counting and no curation 16- a reactive policy that alternates actions and options 17""" 18 19from dataclasses import dataclass 20from typing import Mapping, Sequence 21 22from oak.agent import OaKAgent 23from oak.fine_grained import ( 24 ActionSelector, 25 CompositePerception, 26 CompositeReactivePolicy, 27 CompositeTransitionModel, 28 CompositeValueFunction, 29 Curator, 30 FeatureBank, 31 FeatureConstructor, 32 FeatureRanker, 33 GeneralValueFunctionLearner, 34 Option, 35 OptionLearner, 36 OptionLibrary, 37 OptionModel, 38 OptionModelLearner, 39 Planner, 40 StateBuilder, 41 SubtaskGenerator, 42 UtilityAssessor, 43 ValueEstimator, 44 WorldModel, 45) 46from oak.types import ( 47 CurationDecision, 48 FeatureCandidate, 49 FeatureId, 50 FeatureSpec, 51 GeneralValueFunctionId, 52 ModelPrediction, 53 OptionDescriptor, 54 OptionId, 55 PlanningUpdate, 56 PolicyDecision, 57 SubtaskId, 58 SubtaskSpec, 59 Transition, 60 UsageRecord, 61 UtilityRecord, 62) 63 64from .minimal_oak import ( 65 Action, 66 MinimalInfo, 67 MinimalSubjectiveState, 68 MinimalTraceStep, 69 MinimalWorld, 70 Observation, 71 _planning_budget_used, 72) 73 74 75# ───────────────────────────────────────────────────────────────────── 76# Perception components 77# ───────────────────────────────────────────────────────────────────── 78 79 80class MinimalStateBuilder(StateBuilder[Observation, Action, MinimalSubjectiveState]): 81 """Direct 
observation-to-state mapping.""" 82 83 def __init__(self) -> None: 84 self._state = MinimalSubjectiveState(0, 0, 0.0, None) 85 86 def reset(self) -> None: 87 self._state = MinimalSubjectiveState(0, 0, 0.0, None) 88 89 def update( 90 self, 91 observation: Observation, 92 reward: float, 93 last_action: Action | None, 94 ) -> MinimalSubjectiveState: 95 self._state = MinimalSubjectiveState( 96 step_index=observation, 97 observation=observation, 98 reward=reward, 99 last_action=last_action, 100 ) 101 return self._state 102 103 def current_subjective_state(self) -> MinimalSubjectiveState: 104 return self._state 105 106 107class MinimalFeatureBank(FeatureBank[MinimalSubjectiveState]): 108 """Stores one fixed identity feature.""" 109 110 def __init__(self) -> None: 111 self._features: dict[FeatureId, FeatureSpec] = { 112 "observation": FeatureSpec( 113 feature_id="observation", 114 name="Observation value", 115 description="Identity feature for the integer observation.", 116 ) 117 } 118 119 def list_features(self) -> Sequence[FeatureSpec]: 120 return tuple(self._features.values()) 121 122 def activations( 123 self, 124 subjective_state: MinimalSubjectiveState, 125 ) -> Mapping[FeatureId, float]: 126 return {"observation": float(subjective_state.observation)} 127 128 def add_candidates( 129 self, candidates: Sequence[FeatureCandidate] 130 ) -> Sequence[FeatureSpec]: 131 added: list[FeatureSpec] = [] 132 for candidate in candidates: 133 feature = FeatureSpec( 134 feature_id=candidate.feature_id, 135 name=candidate.name, 136 description=candidate.description, 137 metadata=candidate.metadata, 138 ) 139 self._features[feature.feature_id] = feature 140 added.append(feature) 141 return tuple(added) 142 143 def remove(self, feature_ids: Sequence[FeatureId]) -> None: 144 for feature_id in feature_ids: 145 self._features.pop(feature_id, None) 146 147 148class MinimalFeatureConstructor(FeatureConstructor[MinimalSubjectiveState]): 149 """Never proposes new features.""" 150 151 def 
propose( 152 self, 153 subjective_state: MinimalSubjectiveState, 154 active_features: Sequence[FeatureSpec], 155 ) -> Sequence[FeatureCandidate]: 156 return () 157 158 159class MinimalFeatureRanker(FeatureRanker): 160 """Ranks features in their existing order.""" 161 162 def rank( 163 self, 164 features: Sequence[FeatureSpec], 165 utilities: Sequence[UtilityRecord], 166 limit: int | None = None, 167 ) -> Sequence[FeatureId]: 168 feature_ids = [feature.feature_id for feature in features] 169 if limit is None: 170 return tuple(feature_ids) 171 return tuple(feature_ids[:limit]) 172 173 174class MinimalSubtaskGenerator(SubtaskGenerator[MinimalSubjectiveState]): 175 """Creates at most one subtask per feature.""" 176 177 def __init__(self) -> None: 178 self._created_subtask_for: set[FeatureId] = set() 179 180 def generate( 181 self, 182 ranked_feature_ids: Sequence[FeatureId], 183 feature_bank: FeatureBank[MinimalSubjectiveState], 184 ) -> Sequence[SubtaskSpec]: 185 created: list[SubtaskSpec] = [] 186 feature_specs = { 187 feature.feature_id: feature for feature in feature_bank.list_features() 188 } 189 for feature_id in ranked_feature_ids: 190 if feature_id in self._created_subtask_for: 191 continue 192 self._created_subtask_for.add(feature_id) 193 feature = feature_specs[feature_id] 194 created.append( 195 SubtaskSpec( 196 subtask_id=f"subtask:{feature_id}", 197 name=f"Track {feature.name}", 198 feature_id=feature_id, 199 ) 200 ) 201 return tuple(created) 202 203 204# ───────────────────────────────────────────────────────────────────── 205# Transition-model components 206# ───────────────────────────────────────────────────────────────────── 207 208 209class MinimalWorldModel(WorldModel[MinimalSubjectiveState, Action, MinimalInfo]): 210 """Trivial planner-facing model.""" 211 212 def update( 213 self, 214 transition: Transition[Action, MinimalSubjectiveState, MinimalInfo], 215 ) -> None: 216 pass 217 218 def predict_action( 219 self, 220 subjective_state: 
MinimalSubjectiveState, 221 action: Action, 222 ) -> ModelPrediction[MinimalSubjectiveState]: 223 return ModelPrediction( 224 predicted_subjective_state=subjective_state, 225 cumulative_reward=0.0, 226 steps=1, 227 ) 228 229 def predict_option( 230 self, 231 subjective_state: MinimalSubjectiveState, 232 option_id: OptionId, 233 ) -> ModelPrediction[MinimalSubjectiveState]: 234 return ModelPrediction( 235 predicted_subjective_state=subjective_state, 236 cumulative_reward=0.0, 237 steps=1, 238 ) 239 240 def add_or_replace_option_models( 241 self, models: Sequence[OptionModel[MinimalSubjectiveState]] 242 ) -> None: 243 pass 244 245 def remove_option_models(self, option_ids: Sequence[OptionId]) -> None: 246 pass 247 248 249class MinimalOptionModelLearner( 250 OptionModelLearner[MinimalSubjectiveState, Action, MinimalInfo] 251): 252 """No-op option-model learner.""" 253 254 def update( 255 self, 256 transition: Transition[Action, MinimalSubjectiveState, MinimalInfo], 257 ) -> None: 258 pass 259 260 def export_models(self) -> Sequence[OptionModel[MinimalSubjectiveState]]: 261 return () 262 263 264class MinimalPlanner(Planner[MinimalSubjectiveState, Action, MinimalInfo]): 265 """Returns one-step value targets without real search.""" 266 267 def plan_step( 268 self, 269 subjective_state: MinimalSubjectiveState, 270 model: WorldModel[MinimalSubjectiveState, Action, MinimalInfo], 271 value_function: ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo], 272 budget: int, 273 ) -> PlanningUpdate[Action]: 274 return PlanningUpdate( 275 value_targets=value_function.predict(subjective_state), 276 policy_targets={"preferred_action": 0}, 277 search_statistics={"budget_used": budget}, 278 ) 279 280 281# ───────────────────────────────────────────────────────────────────── 282# Value-function components 283# ───────────────────────────────────────────────────────────────────── 284 285 286class MinimalValueEstimator( 287 ValueEstimator[MinimalSubjectiveState, Action, 
MinimalInfo] 288): 289 """Stores latest reward as the only value estimate.""" 290 291 def __init__(self) -> None: 292 self._value: float = 0.0 293 294 def list_general_value_functions( 295 self, 296 ) -> Sequence[ 297 GeneralValueFunctionLearner[MinimalSubjectiveState, Action, MinimalInfo] 298 ]: 299 return () 300 301 def predict( 302 self, 303 subjective_state: MinimalSubjectiveState, 304 ) -> Mapping[GeneralValueFunctionId, float]: 305 return {"main": self._value} 306 307 def update( 308 self, 309 transition: Transition[Action, MinimalSubjectiveState, MinimalInfo], 310 ) -> Mapping[GeneralValueFunctionId, float]: 311 self._value = transition.reward 312 return {"main": 0.0} 313 314 def add_or_replace( 315 self, 316 learner: GeneralValueFunctionLearner[ 317 MinimalSubjectiveState, Action, MinimalInfo 318 ], 319 ) -> None: 320 pass 321 322 def remove( 323 self, 324 general_value_function_ids: Sequence[GeneralValueFunctionId], 325 ) -> None: 326 pass 327 328 329class MinimalUtilityAssessor(UtilityAssessor): 330 """Aggregates usage records into simple counts.""" 331 332 def __init__(self) -> None: 333 self._usage_records: list[UsageRecord] = [] 334 335 def observe(self, usage: Sequence[UsageRecord]) -> None: 336 self._usage_records.extend(usage) 337 338 def scores(self) -> Sequence[UtilityRecord]: 339 totals: dict[tuple[str, str], float] = {} 340 latest: dict[tuple[str, str], UsageRecord] = {} 341 for record in self._usage_records: 342 key = (record.kind.value, record.component_id) 343 totals[key] = totals.get(key, 0.0) + record.amount 344 latest[key] = record 345 return tuple( 346 UtilityRecord( 347 kind=record.kind, 348 component_id=record.component_id, 349 utility=totals[key], 350 ) 351 for key, record in latest.items() 352 ) 353 354 355class MinimalCurator(Curator): 356 """Never prunes.""" 357 358 def curate(self, utilities: Sequence[UtilityRecord]) -> CurationDecision: 359 return CurationDecision() 360 361 362# 
───────────────────────────────────────────────────────────────────── 363# Reactive-policy components 364# ───────────────────────────────────────────────────────────────────── 365 366 367@dataclass 368class MinimalOption(Option[MinimalSubjectiveState, Action]): 369 """Trivial option that always emits action=1 and stops immediately.""" 370 371 _descriptor: OptionDescriptor 372 _action: Action = 1 373 374 @property 375 def descriptor(self) -> OptionDescriptor: 376 return self._descriptor 377 378 def is_available(self, subjective_state: MinimalSubjectiveState) -> bool: 379 return True 380 381 def act(self, subjective_state: MinimalSubjectiveState) -> Action: 382 return self._action 383 384 def stop_probability(self, subjective_state: MinimalSubjectiveState) -> float: 385 return 1.0 386 387 388class MinimalActionSelector(ActionSelector[MinimalSubjectiveState, Action]): 389 """Alternates primitive actions and option selection.""" 390 391 def __init__(self) -> None: 392 self.last_td_errors: Mapping[GeneralValueFunctionId, float] = {} 393 self.last_planning_update: PlanningUpdate[Action] | None = None 394 395 def decide( 396 self, 397 subjective_state: MinimalSubjectiveState, 398 active_option: Option[MinimalSubjectiveState, Action] | None, 399 available_options: Sequence[Option[MinimalSubjectiveState, Action]], 400 ) -> PolicyDecision[Action]: 401 if subjective_state.observation % 2 == 0: 402 return PolicyDecision(action=0) 403 if available_options: 404 return PolicyDecision(option_id=available_options[0].descriptor.option_id) 405 return PolicyDecision(action=1) 406 407 def update_from_values( 408 self, 409 subjective_state: MinimalSubjectiveState, 410 td_errors: Mapping[GeneralValueFunctionId, float], 411 ) -> None: 412 self.last_td_errors = dict(td_errors) 413 414 def apply_planning_update(self, update: PlanningUpdate[Action]) -> None: 415 self.last_planning_update = update 416 417 418class MinimalOptionLibrary(OptionLibrary[MinimalSubjectiveState, Action]): 419 
"""Stores learned options.""" 420 421 def __init__(self) -> None: 422 self._options: dict[OptionId, Option[MinimalSubjectiveState, Action]] = {} 423 424 def list_options(self) -> Sequence[Option[MinimalSubjectiveState, Action]]: 425 return tuple(self._options.values()) 426 427 def get(self, option_id: OptionId) -> Option[MinimalSubjectiveState, Action]: 428 return self._options[option_id] 429 430 def add_or_replace(self, option: Option[MinimalSubjectiveState, Action]) -> None: 431 self._options[option.descriptor.option_id] = option 432 433 def remove(self, option_ids: Sequence[OptionId]) -> None: 434 for option_id in option_ids: 435 self._options.pop(option_id, None) 436 437 438class MinimalOptionLearner(OptionLearner[MinimalSubjectiveState, Action, MinimalInfo]): 439 """Creates one trivial option per subtask.""" 440 441 def __init__(self) -> None: 442 self._subtasks: dict[SubtaskId, SubtaskSpec] = {} 443 self._options: dict[OptionId, MinimalOption] = {} 444 445 def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None: 446 for subtask in subtasks: 447 self._subtasks[subtask.subtask_id] = subtask 448 option_id = f"option:{subtask.subtask_id}" 449 self._options[option_id] = MinimalOption( 450 OptionDescriptor( 451 option_id=option_id, 452 name=f"Option for {subtask.subtask_id}", 453 subtask_id=subtask.subtask_id, 454 ) 455 ) 456 457 def update( 458 self, 459 transition: Transition[Action, MinimalSubjectiveState, MinimalInfo], 460 ) -> None: 461 pass 462 463 def export_options(self) -> Sequence[Option[MinimalSubjectiveState, Action]]: 464 return tuple(self._options.values()) 465 466 def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None: 467 for subtask_id in subtask_ids: 468 self._subtasks.pop(subtask_id, None) 469 self._options.pop(f"option:{subtask_id}", None) 470 471 472# ───────────────────────────────────────────────────────────────────── 473# Wiring 474# ───────────────────────────────────────────────────────────────────── 475 476 477def 
build_minimal_agent() -> ( 478 OaKAgent[Observation, Action, MinimalSubjectiveState, MinimalInfo] 479): 480 """Construct a fully wired fine-grained smoke-test OaK agent.""" 481 perception = CompositePerception( 482 state_builder=MinimalStateBuilder(), 483 feature_bank=MinimalFeatureBank(), 484 feature_constructor=MinimalFeatureConstructor(), 485 feature_ranker=MinimalFeatureRanker(), 486 subtask_generator=MinimalSubtaskGenerator(), 487 ) 488 transition_model = CompositeTransitionModel( 489 world_model=MinimalWorldModel(), 490 option_model_learner=MinimalOptionModelLearner(), 491 planner=MinimalPlanner(), 492 ) 493 value_function = CompositeValueFunction( 494 value_estimator=MinimalValueEstimator(), 495 utility_assessor=MinimalUtilityAssessor(), 496 curator=MinimalCurator(), 497 ) 498 action_selector = MinimalActionSelector() 499 reactive_policy = CompositeReactivePolicy( 500 action_selector=action_selector, 501 option_library=MinimalOptionLibrary(), 502 option_learner=MinimalOptionLearner(), 503 ) 504 return OaKAgent( 505 perception=perception, 506 transition_model=transition_model, 507 value_function=value_function, 508 reactive_policy=reactive_policy, 509 planning_budget=4, 510 ) 511 512 513def run_minimal_episode(horizon: int = 5) -> list[MinimalTraceStep]: 514 """Run a short smoke episode and return a compact trace.""" 515 world = MinimalWorld(horizon=horizon) 516 agent = build_minimal_agent() 517 step = world.reset() 518 agent.reset() 519 520 trace: list[MinimalTraceStep] = [] 521 522 for _ in range(horizon): 523 result = agent.step(step) 524 action = result.action 525 trace.append( 526 { 527 "subjective_state": result.subjective_state, 528 "action": action, 529 "active_option_id": result.active_option_id, 530 "created_subtasks": [ 531 subtask.subtask_id for subtask in result.created_subtasks 532 ], 533 "planning_budget_used": _planning_budget_used(result.planning_update), 534 } 535 ) 536 step = world.step(action) 537 if step.terminated: 538 break 539 540 
return trace
class MinimalStateBuilder(StateBuilder[Observation, Action, MinimalSubjectiveState]):
    """State builder that copies each observation straight into the state."""

    def __init__(self) -> None:
        # Start from the same zeroed placeholder that reset() installs.
        self.reset()

    def reset(self) -> None:
        """Replace the stored state with the zeroed placeholder."""
        self._state = MinimalSubjectiveState(0, 0, 0.0, None)

    def update(
        self,
        observation: Observation,
        reward: float,
        last_action: Action | None,
    ) -> MinimalSubjectiveState:
        """Build, store, and return the state for the latest observation."""
        # The observation value fills both step_index and observation.
        new_state = MinimalSubjectiveState(
            step_index=observation,
            observation=observation,
            reward=reward,
            last_action=last_action,
        )
        self._state = new_state
        return new_state

    def current_subjective_state(self) -> MinimalSubjectiveState:
        """Return the most recently stored state."""
        return self._state
Direct observation-to-state mapping.
def update(
    self,
    observation: Observation,
    reward: float,
    last_action: Action | None,
) -> MinimalSubjectiveState:
    """Build, store, and return the state for the latest observation."""
    # The observation value fills both step_index and observation.
    new_state = MinimalSubjectiveState(
        step_index=observation,
        observation=observation,
        reward=reward,
        last_action=last_action,
    )
    self._state = new_state
    return new_state
class MinimalFeatureBank(FeatureBank[MinimalSubjectiveState]):
    """Feature registry that starts out with a single identity feature."""

    def __init__(self) -> None:
        identity = FeatureSpec(
            feature_id="observation",
            name="Observation value",
            description="Identity feature for the integer observation.",
        )
        self._features: dict[FeatureId, FeatureSpec] = {"observation": identity}

    def list_features(self) -> Sequence[FeatureSpec]:
        """Return every registered feature spec as a tuple."""
        return tuple(self._features.values())

    def activations(
        self,
        subjective_state: MinimalSubjectiveState,
    ) -> Mapping[FeatureId, float]:
        """Return the state's observation, as a float, under "observation"."""
        value = float(subjective_state.observation)
        return {"observation": value}

    def add_candidates(
        self, candidates: Sequence[FeatureCandidate]
    ) -> Sequence[FeatureSpec]:
        """Register one spec per candidate and return the new specs."""
        registered: list[FeatureSpec] = []
        for proposal in candidates:
            spec = FeatureSpec(
                feature_id=proposal.feature_id,
                name=proposal.name,
                description=proposal.description,
                metadata=proposal.metadata,
            )
            # An existing entry with the same id is overwritten.
            self._features[spec.feature_id] = spec
            registered.append(spec)
        return tuple(registered)

    def remove(self, feature_ids: Sequence[FeatureId]) -> None:
        """Drop the given feature ids; ids that are not registered are ignored."""
        for fid in feature_ids:
            if fid in self._features:
                del self._features[fid]
Stores one fixed identity feature.
def activations(
    self,
    subjective_state: MinimalSubjectiveState,
) -> Mapping[FeatureId, float]:
    """Return the state's observation, as a float, under "observation"."""
    value = float(subjective_state.observation)
    return {"observation": value}
Return per-feature activation values for the given state.
Intended for SubtaskGenerator implementations, which receive
the FeatureBank and may use activations to decide which
features warrant new subtasks.
def add_candidates(
    self, candidates: Sequence[FeatureCandidate]
) -> Sequence[FeatureSpec]:
    """Register one spec per candidate and return the new specs."""
    registered: list[FeatureSpec] = []
    for proposal in candidates:
        spec = FeatureSpec(
            feature_id=proposal.feature_id,
            name=proposal.name,
            description=proposal.description,
            metadata=proposal.metadata,
        )
        # An existing entry with the same id is overwritten.
        self._features[spec.feature_id] = spec
        registered.append(spec)
    return tuple(registered)
class MinimalFeatureConstructor(FeatureConstructor[MinimalSubjectiveState]):
    """Feature constructor that has nothing to propose."""

    def propose(
        self,
        subjective_state: MinimalSubjectiveState,
        active_features: Sequence[FeatureSpec],
    ) -> Sequence[FeatureCandidate]:
        """Return an empty tuple regardless of the arguments."""
        no_candidates: Sequence[FeatureCandidate] = ()
        return no_candidates
Never proposes new features.
class MinimalFeatureRanker(FeatureRanker):
    """Ranker that keeps features in the order they were given."""

    def rank(
        self,
        features: Sequence[FeatureSpec],
        utilities: Sequence[UtilityRecord],
        limit: int | None = None,
    ) -> Sequence[FeatureId]:
        """Return the feature ids in input order, sliced to `limit` if given.

        `utilities` is accepted but not consulted.
        """
        ordered = tuple(spec.feature_id for spec in features)
        return ordered if limit is None else ordered[:limit]
Ranks features in their existing order.
def rank(
    self,
    features: Sequence[FeatureSpec],
    utilities: Sequence[UtilityRecord],
    limit: int | None = None,
) -> Sequence[FeatureId]:
    """Return the feature ids in input order, sliced to `limit` if given.

    `utilities` is accepted but not consulted.
    """
    ordered = tuple(spec.feature_id for spec in features)
    return ordered if limit is None else ordered[:limit]
class MinimalSubtaskGenerator(SubtaskGenerator[MinimalSubjectiveState]):
    """Subtask generator that emits one subtask per feature id, once."""

    def __init__(self) -> None:
        # Feature ids for which a subtask has already been emitted.
        self._created_subtask_for: set[FeatureId] = set()

    def generate(
        self,
        ranked_feature_ids: Sequence[FeatureId],
        feature_bank: FeatureBank[MinimalSubjectiveState],
    ) -> Sequence[SubtaskSpec]:
        """Return new subtasks for ranked feature ids not handled before.

        Raises:
            KeyError: if a ranked id is not among the bank's listed features.
        """
        specs_by_id = {
            spec.feature_id: spec for spec in feature_bank.list_features()
        }
        new_subtasks: list[SubtaskSpec] = []
        for feature_id in ranked_feature_ids:
            if feature_id in self._created_subtask_for:
                continue
            # Look the feature up before recording it, so a failed lookup
            # does not leave the id marked as already handled.
            spec = specs_by_id[feature_id]
            self._created_subtask_for.add(feature_id)
            new_subtasks.append(
                SubtaskSpec(
                    subtask_id=f"subtask:{feature_id}",
                    name=f"Track {spec.name}",
                    feature_id=feature_id,
                )
            )
        return tuple(new_subtasks)
Creates at most one subtask per feature.
def generate(
    self,
    ranked_feature_ids: Sequence[FeatureId],
    feature_bank: FeatureBank[MinimalSubjectiveState],
) -> Sequence[SubtaskSpec]:
    """Return new subtasks for ranked feature ids not handled before.

    Raises:
        KeyError: if a ranked id is not among the bank's listed features.
    """
    specs_by_id = {
        spec.feature_id: spec for spec in feature_bank.list_features()
    }
    new_subtasks: list[SubtaskSpec] = []
    for feature_id in ranked_feature_ids:
        if feature_id in self._created_subtask_for:
            continue
        # Look the feature up before recording it, so a failed lookup
        # does not leave the id marked as already handled.
        spec = specs_by_id[feature_id]
        self._created_subtask_for.add(feature_id)
        new_subtasks.append(
            SubtaskSpec(
                subtask_id=f"subtask:{feature_id}",
                name=f"Track {spec.name}",
                feature_id=feature_id,
            )
        )
    return tuple(new_subtasks)
class MinimalWorldModel(WorldModel[MinimalSubjectiveState, Action, MinimalInfo]):
    """World model that learns nothing and predicts an unchanged state."""

    @staticmethod
    def _unchanged(
        subjective_state: MinimalSubjectiveState,
    ) -> ModelPrediction[MinimalSubjectiveState]:
        """Predict the input state again, with zero reward over one step."""
        return ModelPrediction(
            predicted_subjective_state=subjective_state,
            cumulative_reward=0.0,
            steps=1,
        )

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Ignore the transition."""
        return None

    def predict_action(
        self,
        subjective_state: MinimalSubjectiveState,
        action: Action,
    ) -> ModelPrediction[MinimalSubjectiveState]:
        """Return the unchanged-state prediction; `action` is not used."""
        return self._unchanged(subjective_state)

    def predict_option(
        self,
        subjective_state: MinimalSubjectiveState,
        option_id: OptionId,
    ) -> ModelPrediction[MinimalSubjectiveState]:
        """Return the unchanged-state prediction; `option_id` is not used."""
        return self._unchanged(subjective_state)

    def add_or_replace_option_models(
        self, models: Sequence[OptionModel[MinimalSubjectiveState]]
    ) -> None:
        """Ignore the supplied option models."""
        return None

    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
        """Ignore the removal request."""
        return None
Trivial planner-facing model.
class MinimalOptionModelLearner(
    OptionModelLearner[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Option-model learner that learns nothing and exports nothing."""

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Ignore the transition."""
        return None

    def export_models(self) -> Sequence[OptionModel[MinimalSubjectiveState]]:
        """Return an empty tuple."""
        no_models: Sequence[OptionModel[MinimalSubjectiveState]] = ()
        return no_models
No-op option-model learner.
class MinimalPlanner(Planner[MinimalSubjectiveState, Action, MinimalInfo]):
    """Planner that echoes current value predictions without searching."""

    def plan_step(
        self,
        subjective_state: MinimalSubjectiveState,
        model: WorldModel[MinimalSubjectiveState, Action, MinimalInfo],
        value_function: ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo],
        budget: int,
    ) -> PlanningUpdate[Action]:
        """Return a planning update built from the value function alone.

        `model` is not consulted, and the whole `budget` is reported as used.
        """
        value_targets = value_function.predict(subjective_state)
        return PlanningUpdate(
            value_targets=value_targets,
            policy_targets={"preferred_action": 0},
            search_statistics={"budget_used": budget},
        )
Returns one-step value targets without real search.
def plan_step(
    self,
    subjective_state: MinimalSubjectiveState,
    model: WorldModel[MinimalSubjectiveState, Action, MinimalInfo],
    value_function: ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo],
    budget: int,
) -> PlanningUpdate[Action]:
    """Return a planning update built from the value function alone.

    `model` is not consulted, and the whole `budget` is reported as used.
    """
    value_targets = value_function.predict(subjective_state)
    return PlanningUpdate(
        value_targets=value_targets,
        policy_targets={"preferred_action": 0},
        search_statistics={"budget_used": budget},
    )
class MinimalValueEstimator(
    ValueEstimator[MinimalSubjectiveState, Action, MinimalInfo]
):
    """Value estimator whose single estimate is the last reward it saw."""

    def __init__(self) -> None:
        self._value: float = 0.0

    def list_general_value_functions(
        self,
    ) -> Sequence[
        GeneralValueFunctionLearner[MinimalSubjectiveState, Action, MinimalInfo]
    ]:
        """Return an empty tuple; no learners are managed."""
        return tuple()

    def predict(
        self,
        subjective_state: MinimalSubjectiveState,
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Return the stored value under the "main" key."""
        return {"main": self._value}

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Store the transition's reward and return a fixed 0.0 for "main"."""
        self._value = transition.reward
        return {"main": 0.0}

    def add_or_replace(
        self,
        learner: GeneralValueFunctionLearner[
            MinimalSubjectiveState, Action, MinimalInfo
        ],
    ) -> None:
        """Ignore the supplied learner."""
        return None

    def remove(
        self,
        general_value_function_ids: Sequence[GeneralValueFunctionId],
    ) -> None:
        """Ignore the removal request."""
        return None
Stores latest reward as the only value estimate.
def list_general_value_functions(
    self,
) -> Sequence[
    GeneralValueFunctionLearner[MinimalSubjectiveState, Action, MinimalInfo]
]:
    """Return an empty tuple; no learners are managed."""
    return tuple()
Return all managed GVF learners.
Intended for Planner implementations that need to inspect
the GVF bank (e.g., to evaluate auxiliary predictions during
planning).
def add_or_replace(
    self,
    learner: GeneralValueFunctionLearner[
        MinimalSubjectiveState, Action, MinimalInfo
    ],
) -> None:
    """Ignore the supplied learner."""
    return None
Add or replace a GVF learner in the bank.
Used for dynamic GVF management, e.g., creating new GVFs when new subtasks or options are discovered.
class MinimalUtilityAssessor(UtilityAssessor):
    """Utility assessor that sums usage amounts per component."""

    def __init__(self) -> None:
        self._usage_records: list[UsageRecord] = []

    def observe(self, usage: Sequence[UsageRecord]) -> None:
        """Append the given usage records to the stored history."""
        self._usage_records.extend(usage)

    def scores(self) -> Sequence[UtilityRecord]:
        """Return one utility record per (kind, component id) pair.

        Each record's utility is the sum of the amounts observed for that
        pair; records appear in the order the pairs were first observed.
        """
        # key -> (latest record seen for the key, running total of amounts)
        aggregated: dict[tuple[str, str], tuple[UsageRecord, float]] = {}
        for usage in self._usage_records:
            key = (usage.kind.value, usage.component_id)
            _, running = aggregated.get(key, (usage, 0.0))
            aggregated[key] = (usage, running + usage.amount)
        return tuple(
            UtilityRecord(
                kind=latest.kind,
                component_id=latest.component_id,
                utility=total,
            )
            for latest, total in aggregated.values()
        )
Aggregates usage records into simple counts.
def scores(self) -> Sequence[UtilityRecord]:
    """Return one utility record per (kind, component id) pair.

    Each record's utility is the sum of the amounts observed for that
    pair; records appear in the order the pairs were first observed.
    """
    # key -> (latest record seen for the key, running total of amounts)
    aggregated: dict[tuple[str, str], tuple[UsageRecord, float]] = {}
    for usage in self._usage_records:
        key = (usage.kind.value, usage.component_id)
        _, running = aggregated.get(key, (usage, 0.0))
        aggregated[key] = (usage, running + usage.amount)
    return tuple(
        UtilityRecord(
            kind=latest.kind,
            component_id=latest.component_id,
            utility=total,
        )
        for latest, total in aggregated.values()
    )
class MinimalCurator(Curator):
    """Curator that always returns a default-constructed decision."""

    def curate(self, utilities: Sequence[UtilityRecord]) -> CurationDecision:
        """Return `CurationDecision()`; `utilities` is not consulted."""
        decision = CurationDecision()
        return decision
Never prunes.
@dataclass
class MinimalOption(Option[MinimalSubjectiveState, Action]):
    """Option that emits a fixed action and always reports stop probability 1.0."""

    # Descriptor exposed through the `descriptor` property.
    _descriptor: OptionDescriptor
    # Action returned by `act`; defaults to 1.
    _action: Action = 1

    @property
    def descriptor(self) -> OptionDescriptor:
        """Return the descriptor supplied at construction."""
        return self._descriptor

    def is_available(self, subjective_state: MinimalSubjectiveState) -> bool:
        """Return True for every state."""
        return True

    def act(self, subjective_state: MinimalSubjectiveState) -> Action:
        """Return the fixed action, independent of the state."""
        return self._action

    def stop_probability(self, subjective_state: MinimalSubjectiveState) -> float:
        """Return 1.0 for every state."""
        return 1.0
Trivial option that always emits action=1 and stops immediately.
Whether this option can be initiated in the given state.
Intended for ActionSelector implementations, which receive
available options and may filter by initiation conditions.
class MinimalActionSelector(ActionSelector[MinimalSubjectiveState, Action]):
    """Selector that picks by the parity of the observation."""

    def __init__(self) -> None:
        # Last inputs received by the two update hooks, kept for inspection.
        self.last_td_errors: Mapping[GeneralValueFunctionId, float] = {}
        self.last_planning_update: PlanningUpdate[Action] | None = None

    def decide(
        self,
        subjective_state: MinimalSubjectiveState,
        active_option: Option[MinimalSubjectiveState, Action] | None,
        available_options: Sequence[Option[MinimalSubjectiveState, Action]],
    ) -> PolicyDecision[Action]:
        """Choose action 0 on even observations, otherwise the first option.

        When the observation is odd and no option is available, action 1 is
        chosen instead. `active_option` is not consulted.
        """
        is_even = subjective_state.observation % 2 == 0
        if is_even:
            return PolicyDecision(action=0)
        if not available_options:
            return PolicyDecision(action=1)
        first_option = available_options[0]
        return PolicyDecision(option_id=first_option.descriptor.option_id)

    def update_from_values(
        self,
        subjective_state: MinimalSubjectiveState,
        td_errors: Mapping[GeneralValueFunctionId, float],
    ) -> None:
        """Store a dict copy of `td_errors`."""
        self.last_td_errors = dict(td_errors)

    def apply_planning_update(self, update: PlanningUpdate[Action]) -> None:
        """Store the planning update."""
        self.last_planning_update = update
Alternates primitive actions and option selection.
def decide(
    self,
    subjective_state: MinimalSubjectiveState,
    active_option: Option[MinimalSubjectiveState, Action] | None,
    available_options: Sequence[Option[MinimalSubjectiveState, Action]],
) -> PolicyDecision[Action]:
    """Choose action 0 on even observations, otherwise the first option.

    When the observation is odd and no option is available, action 1 is
    chosen instead. `active_option` is not consulted.
    """
    is_even = subjective_state.observation % 2 == 0
    if is_even:
        return PolicyDecision(action=0)
    if not available_options:
        return PolicyDecision(action=1)
    first_option = available_options[0]
    return PolicyDecision(option_id=first_option.descriptor.option_id)
class MinimalOptionLibrary(OptionLibrary[MinimalSubjectiveState, Action]):
    """Dictionary-backed option store keyed by option id."""

    def __init__(self) -> None:
        self._options: dict[OptionId, Option[MinimalSubjectiveState, Action]] = {}

    def list_options(self) -> Sequence[Option[MinimalSubjectiveState, Action]]:
        """Return all stored options as a tuple."""
        return tuple(self._options.values())

    def get(self, option_id: OptionId) -> Option[MinimalSubjectiveState, Action]:
        """Return the option stored under `option_id`.

        Raises:
            KeyError: if no option is stored under that id.
        """
        return self._options[option_id]

    def add_or_replace(self, option: Option[MinimalSubjectiveState, Action]) -> None:
        """Store `option` under the option id of its descriptor."""
        key = option.descriptor.option_id
        self._options[key] = option

    def remove(self, option_ids: Sequence[OptionId]) -> None:
        """Drop the given option ids; ids that are not stored are ignored."""
        for oid in option_ids:
            if oid in self._options:
                del self._options[oid]
Stores learned options.
class MinimalOptionLearner(OptionLearner[MinimalSubjectiveState, Action, MinimalInfo]):
    """Option learner that pairs every ingested subtask with one option."""

    def __init__(self) -> None:
        self._subtasks: dict[SubtaskId, SubtaskSpec] = {}
        self._options: dict[OptionId, MinimalOption] = {}

    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
        """Record each subtask and create its "option:<subtask id>" option."""
        for spec in subtasks:
            sid = spec.subtask_id
            self._subtasks[sid] = spec
            option_id = f"option:{sid}"
            descriptor = OptionDescriptor(
                option_id=option_id,
                name=f"Option for {sid}",
                subtask_id=sid,
            )
            self._options[option_id] = MinimalOption(descriptor)

    def update(
        self,
        transition: Transition[Action, MinimalSubjectiveState, MinimalInfo],
    ) -> None:
        """Ignore the transition."""
        return None

    def export_options(self) -> Sequence[Option[MinimalSubjectiveState, Action]]:
        """Return all created options as a tuple."""
        return tuple(self._options.values())

    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
        """Forget the given subtasks and the options created for them."""
        for sid in subtask_ids:
            self._subtasks.pop(sid, None)
            self._options.pop(f"option:{sid}", None)
Creates one trivial option per subtask.
def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
    """Record each subtask and create its "option:<subtask id>" option."""
    for spec in subtasks:
        sid = spec.subtask_id
        self._subtasks[sid] = spec
        option_id = f"option:{sid}"
        descriptor = OptionDescriptor(
            option_id=option_id,
            name=f"Option for {sid}",
            subtask_id=sid,
        )
        self._options[option_id] = MinimalOption(descriptor)
def build_minimal_agent() -> (
    OaKAgent[Observation, Action, MinimalSubjectiveState, MinimalInfo]
):
    """Assemble an OaK agent from the minimal components in this module.

    The agent is created with a planning budget of 4.
    """
    return OaKAgent(
        perception=CompositePerception(
            state_builder=MinimalStateBuilder(),
            feature_bank=MinimalFeatureBank(),
            feature_constructor=MinimalFeatureConstructor(),
            feature_ranker=MinimalFeatureRanker(),
            subtask_generator=MinimalSubtaskGenerator(),
        ),
        transition_model=CompositeTransitionModel(
            world_model=MinimalWorldModel(),
            option_model_learner=MinimalOptionModelLearner(),
            planner=MinimalPlanner(),
        ),
        value_function=CompositeValueFunction(
            value_estimator=MinimalValueEstimator(),
            utility_assessor=MinimalUtilityAssessor(),
            curator=MinimalCurator(),
        ),
        reactive_policy=CompositeReactivePolicy(
            action_selector=MinimalActionSelector(),
            option_library=MinimalOptionLibrary(),
            option_learner=MinimalOptionLearner(),
        ),
        planning_budget=4,
    )
Construct a fully wired fine-grained smoke-test OaK agent.
def run_minimal_episode(horizon: int = 5) -> list[MinimalTraceStep]:
    """Drive a fresh agent through a fresh world and return a per-step trace.

    At most `horizon` agent steps are taken; the loop ends early as soon as
    the world reports a terminated step.
    """
    world = MinimalWorld(horizon=horizon)
    agent = build_minimal_agent()
    step = world.reset()
    agent.reset()

    trace: list[MinimalTraceStep] = []

    for _ in range(horizon):
        result = agent.step(step)
        chosen_action = result.action
        created_ids = [subtask.subtask_id for subtask in result.created_subtasks]
        entry: MinimalTraceStep = {
            "subjective_state": result.subjective_state,
            "action": chosen_action,
            "active_option_id": result.active_option_id,
            "created_subtasks": created_ids,
            "planning_budget_used": _planning_budget_used(result.planning_update),
        }
        trace.append(entry)
        step = world.step(chosen_action)
        if step.terminated:
            break

    return trace
Run a short smoke episode and return a compact trace.