oak_architecture
```python
from .agent import OaKAgent
from . import fine_grained
from .interfaces import (
    ContinualLearner,
    Perception,
    ReactivePolicy,
    TransitionModel,
    ValueFunction,
    World,
)
from .types import (
    AgentStepResult,
    ComponentKind,
    CurationDecision,
    FeatureCandidate,
    FeatureSpec,
    GeneralValueFunctionSpec,
    ModelPrediction,
    OptionDescriptor,
    PlanningUpdate,
    PolicyDecision,
    SubtaskSpec,
    TimeStep,
    Transition,
    UsageRecord,
    UtilityRecord,
)

__all__ = [
    # ── Continual-learning mixin ──
    "ContinualLearner",
    # ── The four main OaK interfaces ──
    "Perception",
    "TransitionModel",
    "ValueFunction",
    "ReactivePolicy",
    # ── Agent ──
    "OaKAgent",
    # ── Environment ──
    "World",
    # ── Optional advanced assembly layer ──
    "fine_grained",
    # ── Shared types ──
    "AgentStepResult",
    "ComponentKind",
    "CurationDecision",
    "FeatureCandidate",
    "FeatureSpec",
    "GeneralValueFunctionSpec",
    "ModelPrediction",
    "OptionDescriptor",
    "PlanningUpdate",
    "PolicyDecision",
    "SubtaskSpec",
    "TimeStep",
    "Transition",
    "UsageRecord",
    "UtilityRecord",
]
```
Architecture Guide
Diagram Gallery
[Diagram images omitted from this text rendering. The oak_runtime_sequence diagram traces each OaKAgent -> Composite* -> fine_grained interface call used during one step(...).]

What You Must Implement
OaKAgent is the canonical coordinator. It is composed of exactly four
objects, one per Sutton module:
- perception: implements Perception. It receives raw environment data and must return the current subjective_state. It also manages feature discovery, ranking, and subtask generation.
- transition_model: implements TransitionModel. It learns from transitions, maintains option models, and runs bounded planning using the world model and value function.
- value_function: implements ValueFunction. It learns from Transition objects, predicts values, tracks the utility of learned structures, and produces curation decisions.
- reactive_policy: implements ReactivePolicy. It selects actions (primitive or options), manages the option library and option learning, and integrates planning updates.
You also configure scalar controls:
- planning_budget
- feature_budget
- option_stop_threshold
OaKAgent manages these runtime fields itself:
- last_action
- last_subjective_state
Your environment must implement the World protocol (reset, step,
close) to use OaKAgent.train(). You can also drive the loop yourself
by supplying TimeStep objects to OaKAgent.step(...) directly.
Two Ways to Implement
Direct approach: implement the four main interfaces directly. Each of
your classes is a self-contained module. This is the simplest path and what
the examples/smoke/minimal_oak.py example demonstrates.
Composite approach: use the fine-grained component interfaces from
oak_architecture.fine_grained.components and wire them together using the
composites from oak_architecture.fine_grained.composites. This is for
projects that need to independently swap building blocks inside a module
(e.g. replace the planner without touching the world model). The
examples/smoke/minimal_oak_fine_grained.py example demonstrates this path with
the same toy behavior as the direct example.
| Main interface | Composite class | Fine-grained building blocks |
|---|---|---|
| Perception | CompositePerception | StateBuilder, FeatureBank, FeatureConstructor, FeatureRanker, SubtaskGenerator |
| TransitionModel | CompositeTransitionModel | WorldModel, OptionModelLearner, OptionModel, Planner |
| ValueFunction | CompositeValueFunction | ValueEstimator, GeneralValueFunctionLearner, UtilityAssessor, Curator, MetaStepSizeLearner |
| ReactivePolicy | CompositeReactivePolicy | ActionSelector, Option, OptionLibrary, OptionLearner, OptionKeyboard (optional) |
Diagram-to-Code Mapping
The diagrams have different jobs, but they all describe the same implementation:
- oak_core: the default conceptual slot map: OaKAgent plus the four main interfaces and the main data flow between them.
- oak_architecture: the fine-grained slot map: Composite modules, their delegated interfaces, and associated optional interfaces from oak_architecture.fine_grained.components.
- oak_runtime_overview: the top-level phase-by-phase sequence at the four-interface layer.
- oak_runtime_sequence: the composite-wired per-step call order, showing only the fine-grained interfaces actually touched during one step(...).
Recommended reading order for the diagrams:
1. Read oak_core to understand the default four-interface surface.
2. Read oak_runtime_overview for the six phases of step(...).
3. Read oak_architecture to see how the optional fine-grained layer is assembled.
4. Read oak_runtime_sequence to trace one composite-wired execution path.
oak_runtime_overview and oak_runtime_sequence describe the same six
phases. The difference is only the level of expansion: oak_runtime_overview
stays at the four-interface layer, while oak_runtime_sequence shows what
happens when those slots are filled by the Composite* implementations from
oak_architecture.fine_grained.composites. If either diagram ever disagrees
with the code, the documentation should be fixed.
The diagrams are intentionally runtime-oriented. They are not exhaustive
method inventories for the interfaces. For the full surface area
(reset, predict, current_subjective_state, OptionKeyboard, and so on),
use the API reference below. oak_architecture is the broadest inventory
view; oak_runtime_overview and oak_runtime_sequence are narrower and only
show what matters for one OaKAgent.step(...).
Step Walkthrough
Read the method as a pipeline. Each block below corresponds to the next
block of code in OaKAgent.step(...).
1. Perceive
```python
subjective_state = self.perception.update(...)
```
time_step is the input. It carries observation, reward, terminated,
truncated, and optional info. perception must turn these into the
current subjective_state. Every later call in the step uses this
subjective_state, so your Perception implementation defines what the
agent actually reasons over.
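As an illustration, a minimal Perception could discretize the raw observation into a small, hashable tuple. This is only a sketch (the class name is illustrative, and it implements just the two methods used in this phase):

```python
class TupleStatePerception:
    """Illustrative perception: subjective state = rounded observation tuple."""

    def __init__(self, precision: int = 1):
        self.precision = precision
        self._state = None

    def update(self, observation, reward, last_action):
        # Round each observation component so downstream modules see a
        # compact, hashable subjective state.
        self._state = tuple(round(x, self.precision) for x in observation)
        return self._state

    def current_subjective_state(self):
        return self._state
```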
2. Learn
```python
td_errors = self.value_function.update(transition)
self.reactive_policy.update(transition, td_errors)
self.transition_model.update(transition)
```
Learning starts only once the agent has both a previous subjective_state
and a previous action. The first call to step(...) therefore sets up
memory but cannot yet build a full transition.
The Transition packages the previous/next subjective states, the action,
reward, the termination outcome, and optional info. All three modules
receive it.
value_function.update returns TD errors that reactive_policy.update
uses for policy improvement.
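A tabular TD(0) learner illustrates the shape of this contract: update consumes a transition and returns TD-error signals keyed by an identifier, as the ValueFunction interface requires. A sketch under simplified types (plain state keys instead of the package's generics, and a single "main" signal):

```python
from collections import defaultdict

class TabularTDValueFunction:
    """Illustrative TD(0) value learner keyed by subjective state."""

    def __init__(self, alpha: float = 0.5, gamma: float = 0.9):
        self.alpha, self.gamma = alpha, gamma
        self.values = defaultdict(float)

    def update(self, prev_state, reward, next_state, terminated):
        # Bootstrapped target; no bootstrap past a terminal state.
        target = reward + (0.0 if terminated else self.gamma * self.values[next_state])
        td_error = target - self.values[prev_state]
        self.values[prev_state] += self.alpha * td_error
        # Mirrors ValueFunction.update returning TD errors by GVF id.
        return {"main": td_error}
```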
3. Grow
```python
ranked_feature_ids = self.perception.discover_and_rank_features(...)
created_subtasks = self.perception.generate_subtasks(ranked_feature_ids)
self.reactive_policy.ingest_subtasks(created_subtasks)
self.reactive_policy.integrate_options()
self.transition_model.integrate_option_models()
```
perception proposes new features, ranks them by utility, and generates
subtasks from the most useful ones. reactive_policy turns subtasks into
options. transition_model integrates the latest option models so planning
can reason about them. In the overview diagram this appears as top-level
module calls; in the detailed sequence diagram the same phase is expanded into
Composite* -> fine_grained interface calls.
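The ranking half of this phase can be as simple as ordering known features by utility and truncating to the budget. A sketch (the real interface receives a sequence of UtilityRecord objects; a plain dict stands in here):

```python
def rank_features(feature_ids, utility_scores, feature_budget):
    """Illustrative ranker: order features by utility, keep the top
    feature_budget IDs. Unknown features default to zero utility."""
    ranked = sorted(
        feature_ids,
        key=lambda fid: utility_scores.get(fid, 0.0),
        reverse=True,
    )
    return ranked[:feature_budget]
```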
4. Plan
```python
planning_update = self.transition_model.plan(
    subjective_state, self.value_function, self.planning_budget
)
self.reactive_policy.apply_planning_update(planning_update)
```
transition_model.plan(...) receives the current subjective_state, the
value_function (for state evaluation during search), and a budget. It
returns a PlanningUpdate. reactive_policy is informed about the
planner's output before action selection.
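A one-step-lookahead sketch shows how a bounded planner can combine a learned model with the value function to produce improvement signals. Everything here is illustrative: `model` and `value_of` are stand-ins for the TransitionModel internals and ValueFunction.predict, and the returned dict stands in for a PlanningUpdate:

```python
def plan_one_step(state, model, value_of, actions, budget):
    """Illustrative bounded planner: score up to `budget` actions by
    one-step lookahead, then emit greedy value and policy targets.

    model(state, action) -> (predicted_reward, predicted_next_state)
    value_of(state)      -> scalar state value
    """
    scored = {}
    for action in actions[:budget]:  # respect the planning budget
        reward, next_state = model(state, action)
        scored[action] = reward + value_of(next_state)
    best = max(scored, key=scored.get)
    # Shaped like a PlanningUpdate: one value target plus a policy target.
    return {"value_target": scored[best], "preferred_action": best}
```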
5. Act
```python
action, active_option_id = self.reactive_policy.select_action(
    subjective_state, self.option_stop_threshold
)
```
The reactive policy either continues an active option or makes a fresh
decision. The output is always a primitive action, because that is what
the caller receives in AgentStepResult.
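The continue-or-decide logic can be sketched as follows. The option object's `stop_probability` and `act` methods are illustrative names, not the package's API; the point is the flow: an active option keeps control until its stop probability crosses the threshold, and the result is always a primitive action:

```python
class ConstantOption:
    """Tiny stand-in option used to exercise the selector below."""

    def __init__(self, action, stop_p):
        self._action, self._stop_p = action, stop_p

    def stop_probability(self, state):
        return self._stop_p

    def act(self, state):
        return self._action

def select_action(state, active_option_id, options, option_stop_threshold, fallback):
    """Continue the active option unless its stop probability exceeds the
    threshold; otherwise make a fresh primitive decision."""
    if active_option_id is not None:
        option = options[active_option_id]
        if option.stop_probability(state) <= option_stop_threshold:
            return option.act(state), active_option_id  # option keeps control
    return fallback, None  # fresh primitive decision, no active option
```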
6. Maintain
```python
self.value_function.observe_usage(usage_records)
curation_decision = self.value_function.curate()
self._apply_curation(curation_decision)
```
Usage records for ranked features and the active option are sent to the
value function for utility tracking. The value function then decides what
to prune. _apply_curation(...) dispatches the decision to the relevant
modules: perception.remove_features(...), reactive_policy.remove_options(...),
reactive_policy.remove_subtasks(...), transition_model.remove_option_models(...),
value_function.remove(...).
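The fan-out can be sketched as a plain dispatch function. A dict stands in for CurationDecision (field names here are illustrative), and `modules` supplies objects exposing the five removal methods named in the walkthrough:

```python
def apply_curation(decision, modules):
    """Illustrative dispatch of a curation decision to the modules that
    own each kind of learned structure."""
    modules["perception"].remove_features(decision.get("feature_ids", []))
    modules["reactive_policy"].remove_options(decision.get("option_ids", []))
    modules["reactive_policy"].remove_subtasks(decision.get("subtask_ids", []))
    modules["transition_model"].remove_option_models(decision.get("option_ids", []))
    modules["value_function"].remove(decision.get("gvf_ids", []))
```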
Training Loop
OaKAgent.train() provides a standard episode loop so implementations
don't need to rewrite the reset/step/terminate boilerplate:
```python
agent = build_my_agent()
world = MyWorld()  # must implement the World protocol

def log_episode(episode, reward, avg_reward, agent):
    if episode % 10 == 0:
        print(f"episode={episode} reward={reward:.1f} avg={avg_reward:.1f}")

rewards = agent.train(
    world,
    num_episodes=500,
    solved_threshold=475.0,  # optional early stopping
    episode_logger=log_episode,
)
world.close()
```
The World protocol requires three methods:
- reset() -> TimeStep -- start a new episode
- step(action) -> TimeStep -- advance one step
- close() -> None -- release resources (can be a no-op)
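A World can be very small. This sketch defines a local TimeStep stand-in with the fields named in the walkthrough (the real dataclass lives in oak_architecture.types) and an environment that terminates after a fixed horizon:

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class TimeStep:
    """Stand-in for oak_architecture.types.TimeStep (fields assumed)."""
    observation: Any
    reward: float = 0.0
    terminated: bool = False
    truncated: bool = False
    info: dict = field(default_factory=dict)

class CountdownWorld:
    """Illustrative World: the episode ends after `horizon` steps."""

    def __init__(self, horizon: int = 3):
        self.horizon = horizon
        self.t = 0

    def reset(self) -> TimeStep:
        self.t = 0
        return TimeStep(observation=self.t)

    def step(self, action) -> TimeStep:
        self.t += 1
        return TimeStep(
            observation=self.t,
            reward=1.0,
            terminated=self.t >= self.horizon,
        )

    def close(self) -> None:
        pass  # no resources to release
```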
If you need custom per-episode logging, pass episode_logger(...). If you
need a fully custom training loop (non-episodic environments, multi-agent
setups, custom control flow), call agent.step(time_step) directly instead.
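A custom loop built around step(...) might look like the sketch below. It assumes the step result exposes the chosen primitive action as an `action` attribute (a stand-in for AgentStepResult, whose exact field names you should check against the API reference):

```python
def run_episode(agent, world, max_steps=1000):
    """Illustrative hand-rolled loop equivalent to one train() episode."""
    time_step = world.reset()
    total_reward = 0.0
    for _ in range(max_steps):
        result = agent.step(time_step)        # full OaK step
        time_step = world.step(result.action)  # act in the environment
        total_reward += time_step.reward
        if time_step.terminated or time_step.truncated:
            break
    return total_reward
```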
Implementation Order
If your goal is to get a working agent quickly, implement in this order:
1. Make Perception produce a useful subjective_state from TimeStep. Have discover_and_rank_features return a fixed list and generate_subtasks return an empty sequence.
2. Make ReactivePolicy return valid actions from select_action. Have the other methods be no-ops.
3. Make ValueFunction accept update calls and return values from predict. Have curate return an empty CurationDecision.
4. Make TransitionModel accept update calls and return a valid PlanningUpdate from plan, even if trivial.
That is enough to satisfy the exact call sequence of OaKAgent.step(...).
After that, you can improve learning quality without changing the basic
wiring.
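Stubs satisfying that order can be only a few lines each. A self-contained sketch (plain Python values stand in for the package's generic types, and plain dicts stand in for CurationDecision and PlanningUpdate):

```python
class StubPerception:
    def reset(self): self.state = None
    def update(self, observation, reward, last_action):
        self.state = tuple(observation)  # subjective state = raw tuple
        return self.state
    def current_subjective_state(self): return self.state
    def discover_and_rank_features(self, state, utility_scores, feature_budget):
        return ["f0"][:feature_budget]   # fixed feature list
    def generate_subtasks(self, ranked_feature_ids): return []
    def list_features(self): return []
    def remove_features(self, feature_ids): pass

class StubReactivePolicy:
    def update(self, transition, td_errors): pass
    def apply_planning_update(self, update): pass
    def ingest_subtasks(self, subtasks): pass
    def integrate_options(self): pass
    def select_action(self, state, option_stop_threshold):
        return 0, None                   # always action 0, no active option
    def clear_active_option(self): pass
    def remove_options(self, option_ids): pass
    def remove_subtasks(self, subtask_ids): pass

class StubValueFunction:
    def update(self, transition): return {"main": 0.0}
    def predict(self, state): return {"main": 0.0}
    def observe_usage(self, usage_records): pass
    def utility_scores(self): return []
    def curate(self): return {}          # empty curation decision
    def remove(self, gvf_ids): pass

class StubTransitionModel:
    def update(self, transition): pass
    def integrate_option_models(self): pass
    def plan(self, state, value_function, budget):
        return {}                        # trivial planning update
    def remove_option_models(self, option_ids): pass
```

In a real project these classes would subclass the abstract interfaces so that missing methods fail loudly; the stubs above only mirror the call surface.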
Repository Examples
The concrete implementations live outside oak_architecture on purpose.
That shows the intended usage pattern: the package provides the canonical
OaKAgent coordinator and interfaces, while downstream code provides the
implementations. The generated docs now include the repository-level
examples package alongside the core oak_architecture API.
- examples/smoke/minimal_oak.py: a full smoke-path implementation using the direct approach. Each of the four interfaces is implemented as a single class with intentionally small behavior.
- examples/smoke/minimal_oak_fine_grained.py: the same toy environment built from the fine-grained composite building blocks instead of direct interface implementations.
- examples/cartpole/: a fuller learning agent that exercises discovery, perception, planning, value learning, and reactive control together.
To run all repository smoke tests, including the minimal example:
```shell
pixi run tests
```
To inspect the smallest runnable example directly:
```python
from examples import build_minimal_agent, run_minimal_episode

agent = build_minimal_agent()
trace = run_minimal_episode(horizon=5)
```
run_minimal_episode(...) returns a compact trace with the
subjective_state, primitive action, active_option_id,
created_subtasks, and planner output at each step.
Design Constraints
Keep these constraints in mind when you replace the minimal pieces with real ones:
- Perception should define a useful subjective_state for the domain. The rest of the agent only sees that representation.
- ReactivePolicy should stay focused on choosing between primitive actions and options. It should not absorb the work of planning or prediction.
- ValueFunction should start with one meaningful predictive target before you expand to many General Value Functions.
- TransitionModel should make honest predictions. Bounded planning becomes misleading quickly if the model invents certainty it does not have.
- ValueFunction.curate() should stay conservative until you have stable evidence that a learned structure is safely removable.
API Documentation
```python
class ContinualLearner:
    """Mixin for modules whose weights are adapted by meta-learned step sizes.

    In Sutton's OaK architecture, every learned weight has a dedicated
    step-size parameter adapted via online cross-validation (e.g. IDBD,
    Sutton 1992; Adam-IDBD, Degris et al. 2024).

    The agent loop calls `update_meta()` on all four modules after each
    learning step, passing the same error-signals dict. Each module
    internally decides which signals are relevant and routes them to its
    per-weight step-size adaptation.

    The default implementation is a no-op so that modules without
    meta-learning still work unchanged.
    """

    def update_meta(self, error_signals: Mapping[str, float]) -> None:
        """Adapt internal per-weight step sizes given error signals.

        Parameters
        ----------
        error_signals:
            Named scalar error signals from the current learning step,
            e.g. `{"main_td_error": 0.05, "reward": 1.0}`.
            Implementations pick the signals they need and ignore the rest.
        """
```
Mixin for modules whose weights are adapted by meta-learned step sizes.
In Sutton's OaK architecture, every learned weight has a dedicated step-size parameter adapted via online cross-validation (e.g. IDBD, Sutton 1992; Adam-IDBD, Degris et al. 2024).
The agent loop calls update_meta() on all four modules after each
learning step, passing the same error-signals dict. Each module
internally decides which signals are relevant and routes them to its
per-weight step-size adaptation.
The default implementation is a no-op so that modules without meta-learning still work unchanged.
```python
def update_meta(self, error_signals: Mapping[str, float]) -> None:
    """Adapt internal per-weight step sizes given error signals.

    Parameters
    ----------
    error_signals:
        Named scalar error signals from the current learning step,
        e.g. `{"main_td_error": 0.05, "reward": 1.0}`.
        Implementations pick the signals they need and ignore the rest.
    """
```
Adapt internal per-weight step sizes given error signals.
Parameters
error_signals:
Named scalar error signals from the current learning step,
e.g. {"main_td_error": 0.05, "reward": 1.0}.
Implementations pick the signals they need and ignore the rest.
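A concrete update_meta implementation typically wraps a per-weight step-size learner. The sketch below is a simplified, scalar IDBD-style adaptation (loosely after Sutton 1992, not the package's MetaStepSizeLearner): the log step size moves with the correlation between the current error-times-input and a trace of recent updates:

```python
import math

class IDBDStepSize:
    """Illustrative scalar IDBD-style step-size adaptation (simplified)."""

    def __init__(self, alpha0: float = 0.1, theta: float = 0.01):
        self.log_alpha = math.log(alpha0)  # log of the step size
        self.theta = theta                 # meta step size
        self.trace = 0.0                   # h: trace of recent weight movement

    def update_meta(self, td_error: float, feature: float) -> float:
        # Grow the step size when successive updates correlate,
        # shrink it when they oscillate in sign.
        self.log_alpha += self.theta * td_error * feature * self.trace
        alpha = math.exp(self.log_alpha)
        decay = max(0.0, 1.0 - alpha * feature * feature)
        self.trace = self.trace * decay + alpha * td_error * feature
        return alpha
```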
```python
class Perception(
    ContinualLearner, ABC, Generic[ObservationT, ActionT, SubjectiveStateT]
):
    """Sutton's Perception: observations → subjective state + feature management.

    Turns raw observations into the agent's **subjective state**, the
    internal representation that every other module sees. Also discovers,
    ranks, and manages **features** (learned representational structures
    that grow over the agent's lifetime) and generates **subtasks** from
    the most useful ones.

    The finer-grained layer splits this into `StateBuilder`,
    `FeatureBank`, `FeatureConstructor`, `FeatureRanker`, and
    `SubtaskGenerator` (see `oak_architecture.fine_grained.components`).
    """

    @abstractmethod
    def reset(self) -> None:
        """Reset all perception state for a new episode."""
        raise NotImplementedError

    @abstractmethod
    def update(
        self,
        observation: ObservationT,
        reward: float,
        last_action: ActionT | None,
    ) -> SubjectiveStateT:
        """Process a new observation and return the updated subjective state."""
        raise NotImplementedError

    @abstractmethod
    def current_subjective_state(self) -> SubjectiveStateT:
        """Return the most recently computed subjective state."""
        raise NotImplementedError

    @abstractmethod
    def discover_and_rank_features(
        self,
        subjective_state: SubjectiveStateT,
        utility_scores: Sequence[UtilityRecord],
        feature_budget: int,
    ) -> Sequence[FeatureId]:
        """Propose new features, integrate them, and return the top-ranked IDs.

        A typical implementation:

        1. Proposes candidate features from the current subjective state.
        2. Adds accepted candidates to its internal feature store.
        3. Ranks all features using the provided utility scores.
        4. Returns the top feature IDs (up to *feature_budget*).
        """
        raise NotImplementedError

    @abstractmethod
    def generate_subtasks(
        self,
        ranked_feature_ids: Sequence[FeatureId],
    ) -> Sequence[SubtaskSpec]:
        """Turn ranked feature IDs into subtask specifications."""
        raise NotImplementedError

    @abstractmethod
    def list_features(self) -> Sequence[FeatureSpec]:
        """Return all currently tracked features."""
        raise NotImplementedError

    @abstractmethod
    def remove_features(self, feature_ids: Sequence[FeatureId]) -> None:
        """Remove features by ID (called during curation)."""
        raise NotImplementedError
```
Sutton's Perception: observations → subjective state + feature management.
Turns raw observations into the agent's subjective state, the internal representation that every other module sees. Also discovers, ranks, and manages features (learned representational structures that grow over the agent's lifetime) and generates subtasks from the most useful ones.
The finer-grained layer splits this into StateBuilder,
FeatureBank, FeatureConstructor, FeatureRanker, and
SubtaskGenerator (see oak_architecture.fine_grained.components).
```python
@abstractmethod
def reset(self) -> None:
    """Reset all perception state for a new episode."""
    raise NotImplementedError
```
Reset all perception state for a new episode.
```python
@abstractmethod
def update(
    self,
    observation: ObservationT,
    reward: float,
    last_action: ActionT | None,
) -> SubjectiveStateT:
    """Process a new observation and return the updated subjective state."""
    raise NotImplementedError
```
Process a new observation and return the updated subjective state.
```python
@abstractmethod
def current_subjective_state(self) -> SubjectiveStateT:
    """Return the most recently computed subjective state."""
    raise NotImplementedError
```
Return the most recently computed subjective state.
```python
@abstractmethod
def discover_and_rank_features(
    self,
    subjective_state: SubjectiveStateT,
    utility_scores: Sequence[UtilityRecord],
    feature_budget: int,
) -> Sequence[FeatureId]:
    """Propose new features, integrate them, and return the top-ranked IDs.

    A typical implementation:

    1. Proposes candidate features from the current subjective state.
    2. Adds accepted candidates to its internal feature store.
    3. Ranks all features using the provided utility scores.
    4. Returns the top feature IDs (up to *feature_budget*).
    """
    raise NotImplementedError
```
Propose new features, integrate them, and return the top-ranked IDs.
A typical implementation:
- Proposes candidate features from the current subjective state.
- Adds accepted candidates to its internal feature store.
- Ranks all features using the provided utility scores.
- Returns the top feature IDs (up to feature_budget).
```python
@abstractmethod
def generate_subtasks(
    self,
    ranked_feature_ids: Sequence[FeatureId],
) -> Sequence[SubtaskSpec]:
    """Turn ranked feature IDs into subtask specifications."""
    raise NotImplementedError
```
Turn ranked feature IDs into subtask specifications.
```python
@abstractmethod
def list_features(self) -> Sequence[FeatureSpec]:
    """Return all currently tracked features."""
    raise NotImplementedError
```
Return all currently tracked features.
```python
@abstractmethod
def remove_features(self, feature_ids: Sequence[FeatureId]) -> None:
    """Remove features by ID (called during curation)."""
    raise NotImplementedError
```
Remove features by ID (called during curation).
```python
class TransitionModel(ContinualLearner, ABC, Generic[SubjectiveStateT, ActionT, InfoT]):
    """Sutton's Transition Model: world dynamics + option models + planning.

    Learns from observed transitions, maintains **option models** that
    predict the effect of temporal abstractions, and runs bounded
    **planning** using the world model and the value function to produce
    improvement signals for the reactive policy.

    The finer-grained layer splits this into `WorldModel`,
    `OptionModelLearner`, individual `OptionModel` objects, and a
    `Planner` (see `oak_architecture.fine_grained.components`).
    """

    @abstractmethod
    def update(
        self,
        transition: Transition[ActionT, SubjectiveStateT, InfoT],
    ) -> None:
        """Learn from an observed transition.

        This should update both the world model and any option-model learners.
        """
        raise NotImplementedError

    @abstractmethod
    def integrate_option_models(self) -> None:
        """Export learned option models and integrate them into the world model.

        Called after option learning so that planning reasons over fresh models.
        """
        raise NotImplementedError

    @abstractmethod
    def plan(
        self,
        subjective_state: SubjectiveStateT,
        value_function: ValueFunction[SubjectiveStateT, ActionT, InfoT],
        budget: int,
    ) -> PlanningUpdate[ActionT]:
        """Run bounded planning and return improvement signals.

        The planner uses the internal world model together with the supplied
        *value_function* (for state evaluation) to produce value targets,
        policy targets, or search statistics.
        """
        raise NotImplementedError

    @abstractmethod
    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
        """Remove option models by ID (called during curation)."""
        raise NotImplementedError
```
Sutton's Transition Model: world dynamics + option models + planning.
Learns from observed transitions, maintains option models that predict the effect of temporal abstractions, and runs bounded planning using the world model and the value function to produce improvement signals for the reactive policy.
The finer-grained layer splits this into WorldModel,
OptionModelLearner, individual OptionModel objects, and a
Planner (see oak_architecture.fine_grained.components).
```python
@abstractmethod
def update(
    self,
    transition: Transition[ActionT, SubjectiveStateT, InfoT],
) -> None:
    """Learn from an observed transition.

    This should update both the world model and any option-model learners.
    """
    raise NotImplementedError
```
Learn from an observed transition.
This should update both the world model and any option-model learners.
```python
@abstractmethod
def integrate_option_models(self) -> None:
    """Export learned option models and integrate them into the world model.

    Called after option learning so that planning reasons over fresh models.
    """
    raise NotImplementedError
```
Export learned option models and integrate them into the world model.
Called after option learning so that planning reasons over fresh models.
```python
@abstractmethod
def plan(
    self,
    subjective_state: SubjectiveStateT,
    value_function: ValueFunction[SubjectiveStateT, ActionT, InfoT],
    budget: int,
) -> PlanningUpdate[ActionT]:
    """Run bounded planning and return improvement signals.

    The planner uses the internal world model together with the supplied
    *value_function* (for state evaluation) to produce value targets,
    policy targets, or search statistics.
    """
    raise NotImplementedError
```
Run bounded planning and return improvement signals.
The planner uses the internal world model together with the supplied value_function (for state evaluation) to produce value targets, policy targets, or search statistics.
```python
@abstractmethod
def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
    """Remove option models by ID (called during curation)."""
    raise NotImplementedError
```
Remove option models by ID (called during curation).
```python
class ValueFunction(ContinualLearner, ABC, Generic[SubjectiveStateT, ActionT, InfoT]):
    """Sutton's Value Function: value learning + utility assessment + curation.

    Learns **predictive value signals** (TD errors, GVF predictions) from
    observed transitions and predicts cumulative signals for any given
    subjective state. Also assesses the **utility** of the agent's learned
    structures (features, options, models) and produces concrete keep/drop
    **curation** decisions.

    The finer-grained layer splits this into `ValueEstimator`,
    `GeneralValueFunctionLearner`, `UtilityAssessor`, `Curator`, and
    `MetaStepSizeLearner` (see `oak_architecture.fine_grained.components`).
    """

    @abstractmethod
    def update(
        self,
        transition: Transition[ActionT, SubjectiveStateT, InfoT],
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Learn from a transition and return TD-error signals."""
        raise NotImplementedError

    @abstractmethod
    def predict(
        self,
        subjective_state: SubjectiveStateT,
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Predict values for the given subjective state."""
        raise NotImplementedError

    @abstractmethod
    def observe_usage(self, usage_records: Sequence[UsageRecord]) -> None:
        """Record usage evidence for utility assessment."""
        raise NotImplementedError

    @abstractmethod
    def utility_scores(self) -> Sequence[UtilityRecord]:
        """Return current utility estimates for all tracked structures."""
        raise NotImplementedError

    @abstractmethod
    def curate(self) -> CurationDecision:
        """Decide which learned structures to drop."""
        raise NotImplementedError

    @abstractmethod
    def remove(
        self,
        general_value_function_ids: Sequence[GeneralValueFunctionId],
    ) -> None:
        """Remove value functions by ID (called during curation)."""
        raise NotImplementedError
```
Sutton's Value Function: value learning + utility assessment + curation.
Learns predictive value signals (TD errors, GVF predictions) from observed transitions and predicts cumulative signals for any given subjective state. Also assesses the utility of the agent's learned structures (features, options, models) and produces concrete keep/drop curation decisions.
The finer-grained layer splits this into ValueEstimator,
GeneralValueFunctionLearner, UtilityAssessor, Curator, and
MetaStepSizeLearner (see oak_architecture.fine_grained.components).
```python
@abstractmethod
def update(
    self,
    transition: Transition[ActionT, SubjectiveStateT, InfoT],
) -> Mapping[GeneralValueFunctionId, float]:
    """Learn from a transition and return TD-error signals."""
    raise NotImplementedError
```
Learn from a transition and return TD-error signals.
```python
@abstractmethod
def predict(
    self,
    subjective_state: SubjectiveStateT,
) -> Mapping[GeneralValueFunctionId, float]:
    """Predict values for the given subjective state."""
    raise NotImplementedError
```
Predict values for the given subjective state.
```python
@abstractmethod
def observe_usage(self, usage_records: Sequence[UsageRecord]) -> None:
    """Record usage evidence for utility assessment."""
    raise NotImplementedError
```
Record usage evidence for utility assessment.
```python
@abstractmethod
def utility_scores(self) -> Sequence[UtilityRecord]:
    """Return current utility estimates for all tracked structures."""
    raise NotImplementedError
```
Return current utility estimates for all tracked structures.
```python
@abstractmethod
def curate(self) -> CurationDecision:
    """Decide which learned structures to drop."""
    raise NotImplementedError
```
Decide which learned structures to drop.
```python
@abstractmethod
def remove(
    self,
    general_value_function_ids: Sequence[GeneralValueFunctionId],
) -> None:
    """Remove value functions by ID (called during curation)."""
    raise NotImplementedError
```
Remove value functions by ID (called during curation).
```python
class ReactivePolicy(ContinualLearner, ABC, Generic[SubjectiveStateT, ActionT, InfoT]):
    """Sutton's Reactive Policy: action selection + option management.

    Selects **actions**, primitive or temporal abstractions (options),
    based on the current subjective state. Manages the **option library**
    and **option learning** pipeline, and integrates **planning updates**
    into decision-making.

    The finer-grained layer splits this into `ActionSelector`,
    `OptionLibrary`, and `OptionLearner`
    (see `oak_architecture.fine_grained.components`).
    """

    @abstractmethod
    def update(
        self,
        transition: Transition[ActionT, SubjectiveStateT, InfoT],
        td_errors: Mapping[GeneralValueFunctionId, float],
    ) -> None:
        """Update the policy and option learners from an observed transition."""
        raise NotImplementedError

    @abstractmethod
    def apply_planning_update(self, update: PlanningUpdate[ActionT]) -> None:
        """Integrate planning improvement signals into the policy."""
        raise NotImplementedError

    @abstractmethod
    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
        """Feed newly created subtasks into the option learner."""
        raise NotImplementedError

    @abstractmethod
    def integrate_options(self) -> None:
        """Export learned options into the option library."""
        raise NotImplementedError

    @abstractmethod
    def select_action(
        self,
        subjective_state: SubjectiveStateT,
        option_stop_threshold: float,
    ) -> tuple[ActionT, OptionId | None]:
        """Choose a primitive action, possibly by continuing an active option.

        Returns a `(primitive_action, active_option_id)` pair. When no
        option is active, *active_option_id* is `None`.
        """
        raise NotImplementedError

    @abstractmethod
    def clear_active_option(self) -> None:
        """Clear the currently executing option (e.g. at episode boundaries)."""
        raise NotImplementedError

    @abstractmethod
    def remove_options(self, option_ids: Sequence[OptionId]) -> None:
        """Remove options by ID (called during curation)."""
        raise NotImplementedError

    @abstractmethod
    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
        """Remove subtasks by ID (called during curation)."""
        raise NotImplementedError
```
Sutton's Reactive Policy: action selection + option management.
Selects actions, primitive or temporal abstractions (options), based on the current subjective state. Manages the option library and option learning pipeline, and integrates planning updates into decision-making.
The finer-grained layer splits this into ActionSelector,
OptionLibrary, and OptionLearner
(see oak_architecture.fine_grained.components).
```python
@abstractmethod
def update(
    self,
    transition: Transition[ActionT, SubjectiveStateT, InfoT],
    td_errors: Mapping[GeneralValueFunctionId, float],
) -> None:
    """Update the policy and option learners from an observed transition."""
    raise NotImplementedError
```
Update the policy and option learners from an observed transition.
```python
@abstractmethod
def apply_planning_update(self, update: PlanningUpdate[ActionT]) -> None:
    """Integrate planning improvement signals into the policy."""
    raise NotImplementedError
```
Integrate planning improvement signals into the policy.
```python
@abstractmethod
def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
    """Feed newly created subtasks into the option learner."""
    raise NotImplementedError
```
Feed newly created subtasks into the option learner.
```python
@abstractmethod
def integrate_options(self) -> None:
    """Export learned options into the option library."""
    raise NotImplementedError
```
Export learned options into the option library.
```python
@abstractmethod
def select_action(
    self,
    subjective_state: SubjectiveStateT,
    option_stop_threshold: float,
) -> tuple[ActionT, OptionId | None]:
    """Choose a primitive action, possibly by continuing an active option.

    Returns a `(primitive_action, active_option_id)` pair. When no
    option is active, *active_option_id* is `None`.
    """
    raise NotImplementedError
```
Choose a primitive action, possibly by continuing an active option.
Returns a (primitive_action, active_option_id) pair. When no
option is active, active_option_id is None.
```python
@abstractmethod
def clear_active_option(self) -> None:
    """Clear the currently executing option (e.g. at episode boundaries)."""
    raise NotImplementedError
```
```python
@abstractmethod
def remove_options(self, option_ids: Sequence[OptionId]) -> None:
    """Remove options by ID (called during curation)."""
    raise NotImplementedError
```
```python
@abstractmethod
def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
    """Remove subtasks by ID (called during curation)."""
    raise NotImplementedError
```
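The removal hooks can be backed by something as simple as a dict keyed by ID. `ToyOptionLibrary` below is a hypothetical sketch, not the package's implementation; treating unknown IDs as a no-op is an assumption (the real interface may choose to raise instead).

```python
from collections.abc import Sequence


class ToyOptionLibrary:
    """Minimal sketch: store options by ID and drop them during curation."""

    def __init__(self) -> None:
        self._options: dict[str, object] = {}

    def add(self, option_id: str, option: object) -> None:
        self._options[option_id] = option

    def remove_options(self, option_ids: Sequence[str]) -> None:
        # Assumed to be idempotent: unknown IDs are ignored rather than raising,
        # so curation can safely re-issue the same drop list.
        for option_id in option_ids:
            self._options.pop(option_id, None)

    def __len__(self) -> int:
        return len(self._options)
```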
```python
@dataclass
class OaKAgent(Generic[ObservationT, ActionT, SubjectiveStateT, InfoT]):
    """Coordinates one full OaK step across the four modules.

    The agent is a wiring object: you provide concrete implementations of
    `Perception`, `TransitionModel`, `ValueFunction`, and
    `ReactivePolicy`, and `OaKAgent` ensures they are called in a
    consistent order.
    """

    perception: Perception[ObservationT, ActionT, SubjectiveStateT]
    transition_model: TransitionModel[SubjectiveStateT, ActionT, InfoT]
    value_function: ValueFunction[SubjectiveStateT, ActionT, InfoT]
    reactive_policy: ReactivePolicy[SubjectiveStateT, ActionT, InfoT]

    planning_budget: int = 4
    feature_budget: int = 4
    option_stop_threshold: float = 0.5

    # Execution memory; excluded from the generated constructor.
    last_action: ActionT | None = field(init=False, default=None)
    last_subjective_state: SubjectiveStateT | None = field(init=False, default=None)

    def __post_init__(self) -> None:
        """Validate the configured budgets and thresholds."""
        if self.planning_budget < 1:
            raise ValueError("Planning budget must be at least 1.")
        if self.feature_budget < 1:
            raise ValueError("Feature budget must be at least 1.")
        if not 0.0 <= self.option_stop_threshold <= 1.0:
            raise ValueError("Option stop threshold must be in [0, 1].")

    def reset(self) -> None:
        """Clear transient execution memory."""
        self.perception.reset()
        self.reactive_policy.clear_active_option()
        self.last_action = None
        self.last_subjective_state = None

    def step(
        self, time_step: TimeStep[ObservationT, InfoT]
    ) -> AgentStepResult[ActionT, SubjectiveStateT]:
        """Run one temporally uniform agent step.

        The step follows six phases: perceive, learn, grow, plan, act, maintain.
        """

        # ================== 1. Perceive ================== #
        subjective_state = self.perception.update(
            observation=time_step.observation,
            reward=time_step.reward,
            last_action=self.last_action,
        )

        created_subtasks: Sequence[SubtaskSpec] = ()
        ranked_feature_ids: Sequence[FeatureId] = ()
        planning_update: PlanningUpdate[ActionT] | None = None
        curation_decision: CurationDecision | None = None

        # ================== 2. Learn ================== #
        if self.last_subjective_state is not None and self.last_action is not None:
            transition = Transition(
                subjective_state=self.last_subjective_state,
                action=self.last_action,
                reward=time_step.reward,
                next_subjective_state=subjective_state,
                terminated=time_step.terminated or time_step.truncated,
                info=time_step.info,
            )
            td_errors = self.value_function.update(transition)
            self.reactive_policy.update(transition, td_errors)
            self.transition_model.update(transition)

            # Meta step-size adaptation (Sutton's IDBD / online cross-validation)
            meta_signals = dict(td_errors)
            meta_signals["reward"] = transition.reward
            self.perception.update_meta(meta_signals)
            self.value_function.update_meta(meta_signals)
            self.reactive_policy.update_meta(meta_signals)
            self.transition_model.update_meta(meta_signals)

        # ================== 3. Grow ================== #
        ranked_feature_ids = self.perception.discover_and_rank_features(
            subjective_state,
            self.value_function.utility_scores(),
            self.feature_budget,
        )
        if ranked_feature_ids:
            created_subtasks = self.perception.generate_subtasks(ranked_feature_ids)
            if created_subtasks:
                self.reactive_policy.ingest_subtasks(created_subtasks)

        self.reactive_policy.integrate_options()
        self.transition_model.integrate_option_models()

        # ================== 4. Plan ================== #
        planning_update = self.transition_model.plan(
            subjective_state, self.value_function, self.planning_budget
        )
        self.reactive_policy.apply_planning_update(planning_update)

        # ================== 5. Act ================== #
        action, active_option_id = self.reactive_policy.select_action(
            subjective_state, self.option_stop_threshold
        )

        # ================== 6. Maintain ================== #
        usage_records = self._build_usage_records(ranked_feature_ids, active_option_id)
        if usage_records:
            self.value_function.observe_usage(usage_records)

        curation_decision = self.value_function.curate()
        self._apply_curation(curation_decision)

        # ================== Update Memory ================== #
        self.last_subjective_state = subjective_state
        self.last_action = action

        if time_step.terminated or time_step.truncated:
            self.reactive_policy.clear_active_option()

        return AgentStepResult(
            action=action,
            subjective_state=subjective_state,
            active_option_id=active_option_id,
            planning_update=planning_update,
            created_subtasks=created_subtasks,
            curation_decision=curation_decision,
        )

    # ── training loop ─────────────────────────────────────────────────

    def train(
        self,
        world: World[ObservationT, ActionT, InfoT],
        *,  # keyword-only arguments for clarity
        num_episodes: int = 500,
        average_window: int = 100,
        solved_threshold: float | None = None,
        episode_logger: Callable[[int, float, float, Self], None] | None = None,
    ) -> list[float]:
        """Run the standard OaK episode loop on the given world.

        Parameters
        ----------
        world:
            An environment implementing the `World` protocol.
        num_episodes:
            Maximum number of training episodes.
        average_window:
            Number of recent episodes to average for performance tracking.
        solved_threshold:
            If set, stop early when the average reward over the last
            `average_window` episodes reaches this value.
        episode_logger:
            Optional callback `(episode, episode_reward, avg_reward, agent)`
            called after each episode. Use this to own all per-episode logging
            or other training side effects at the call site.

        Returns
        -------
        list[float]
            Per-episode reward history.
        """
        if average_window < 1:
            raise ValueError("average_window must be at least 1.")

        reward_history: list[float] = []

        for episode in range(num_episodes):
            time_step = world.reset()
            self.reset()
            episode_reward = 0.0

            while True:
                result = self.step(time_step)

                if time_step.terminated or time_step.truncated:
                    break

                time_step = world.step(result.action)
                episode_reward += time_step.reward

            reward_history.append(episode_reward)
            recent_window = reward_history[-average_window:]
            avg_reward = sum(recent_window) / len(recent_window)

            solved = (
                solved_threshold is not None
                and len(reward_history) >= average_window
                and avg_reward >= solved_threshold
            )

            if episode_logger is not None:
                episode_logger(episode, episode_reward, avg_reward, self)

            if solved:
                break

        return reward_history

    # ── private helpers ───────────────────────────────────────────────

    def _build_usage_records(
        self,
        ranked_feature_ids: Sequence[FeatureId],
        active_option_id: OptionId | None,
    ) -> Sequence[UsageRecord]:
        """Build minimal utility-accounting observations for the current step."""
        usage_records = [
            UsageRecord(ComponentKind.FEATURE, feature_id)
            for feature_id in ranked_feature_ids
        ]
        if active_option_id is not None:
            usage_records.append(UsageRecord(ComponentKind.OPTION, active_option_id))
        return tuple(usage_records)

    def _apply_curation(self, decision: CurationDecision) -> None:
        """Dispatch curation decisions to the relevant modules."""
        if decision.drop_features:
            self.perception.remove_features(decision.drop_features)
        if decision.drop_subtasks:
            self.reactive_policy.remove_subtasks(decision.drop_subtasks)
        if decision.drop_options:
            self.reactive_policy.remove_options(decision.drop_options)
        if decision.drop_option_models:
            self.transition_model.remove_option_models(decision.drop_option_models)
        if decision.drop_general_value_functions:
            self.value_function.remove(decision.drop_general_value_functions)
```
Coordinates one full OaK step across the four modules.

The agent is a wiring object: you provide concrete implementations of `Perception`, `TransitionModel`, `ValueFunction`, and `ReactivePolicy`, and `OaKAgent` ensures they are called in a consistent order.
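The `[0, 1]` threshold check and the budget floors can be exercised in isolation with a stripped-down replica that carries only the validated hyperparameters. `ToyAgentConfig` below is hypothetical, not the real `OaKAgent`:

```python
from dataclasses import dataclass


@dataclass
class ToyAgentConfig:
    """Replica of OaKAgent's validated hyperparameters (illustration only)."""
    planning_budget: int = 4
    feature_budget: int = 4
    option_stop_threshold: float = 0.5

    def __post_init__(self) -> None:
        # Same validation rules as the agent: budgets are positive integers,
        # and the stop threshold is a probability-like value in [0, 1].
        if self.planning_budget < 1:
            raise ValueError("Planning budget must be at least 1.")
        if self.feature_budget < 1:
            raise ValueError("Feature budget must be at least 1.")
        if not 0.0 <= self.option_stop_threshold <= 1.0:
            raise ValueError("Option stop threshold must be in [0, 1].")
```

Constructing with the defaults succeeds; `planning_budget=0` or `option_stop_threshold=1.5` raises `ValueError` before the object is usable.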
```python
def reset(self) -> None:
    """Clear transient execution memory."""
    self.perception.reset()
    self.reactive_policy.clear_active_option()
    self.last_action = None
    self.last_subjective_state = None
```
```python
def step(
    self, time_step: TimeStep[ObservationT, InfoT]
) -> AgentStepResult[ActionT, SubjectiveStateT]:
    """Run one temporally uniform agent step.

    The step follows six phases: perceive, learn, grow, plan, act, maintain.
    """

    # ================== 1. Perceive ================== #
    subjective_state = self.perception.update(
        observation=time_step.observation,
        reward=time_step.reward,
        last_action=self.last_action,
    )

    created_subtasks: Sequence[SubtaskSpec] = ()
    ranked_feature_ids: Sequence[FeatureId] = ()
    planning_update: PlanningUpdate[ActionT] | None = None
    curation_decision: CurationDecision | None = None

    # ================== 2. Learn ================== #
    if self.last_subjective_state is not None and self.last_action is not None:
        transition = Transition(
            subjective_state=self.last_subjective_state,
            action=self.last_action,
            reward=time_step.reward,
            next_subjective_state=subjective_state,
            terminated=time_step.terminated or time_step.truncated,
            info=time_step.info,
        )
        td_errors = self.value_function.update(transition)
        self.reactive_policy.update(transition, td_errors)
        self.transition_model.update(transition)

        # Meta step-size adaptation (Sutton's IDBD / online cross-validation)
        meta_signals = dict(td_errors)
        meta_signals["reward"] = transition.reward
        self.perception.update_meta(meta_signals)
        self.value_function.update_meta(meta_signals)
        self.reactive_policy.update_meta(meta_signals)
        self.transition_model.update_meta(meta_signals)

    # ================== 3. Grow ================== #
    ranked_feature_ids = self.perception.discover_and_rank_features(
        subjective_state,
        self.value_function.utility_scores(),
        self.feature_budget,
    )
    if ranked_feature_ids:
        created_subtasks = self.perception.generate_subtasks(ranked_feature_ids)
        if created_subtasks:
            self.reactive_policy.ingest_subtasks(created_subtasks)

    self.reactive_policy.integrate_options()
    self.transition_model.integrate_option_models()

    # ================== 4. Plan ================== #
    planning_update = self.transition_model.plan(
        subjective_state, self.value_function, self.planning_budget
    )
    self.reactive_policy.apply_planning_update(planning_update)

    # ================== 5. Act ================== #
    action, active_option_id = self.reactive_policy.select_action(
        subjective_state, self.option_stop_threshold
    )

    # ================== 6. Maintain ================== #
    usage_records = self._build_usage_records(ranked_feature_ids, active_option_id)
    if usage_records:
        self.value_function.observe_usage(usage_records)

    curation_decision = self.value_function.curate()
    self._apply_curation(curation_decision)

    # ================== Update Memory ================== #
    self.last_subjective_state = subjective_state
    self.last_action = action

    if time_step.terminated or time_step.truncated:
        self.reactive_policy.clear_active_option()

    return AgentStepResult(
        action=action,
        subjective_state=subjective_state,
        active_option_id=active_option_id,
        planning_update=planning_update,
        created_subtasks=created_subtasks,
        curation_decision=curation_decision,
    )
```
```python
def train(
    self,
    world: World[ObservationT, ActionT, InfoT],
    *,  # keyword-only arguments for clarity
    num_episodes: int = 500,
    average_window: int = 100,
    solved_threshold: float | None = None,
    episode_logger: Callable[[int, float, float, Self], None] | None = None,
) -> list[float]:
    """Run the standard OaK episode loop on the given world.

    Parameters
    ----------
    world:
        An environment implementing the `World` protocol.
    num_episodes:
        Maximum number of training episodes.
    average_window:
        Number of recent episodes to average for performance tracking.
    solved_threshold:
        If set, stop early when the average reward over the last
        `average_window` episodes reaches this value.
    episode_logger:
        Optional callback `(episode, episode_reward, avg_reward, agent)`
        called after each episode. Use this to own all per-episode logging
        or other training side effects at the call site.

    Returns
    -------
    list[float]
        Per-episode reward history.
    """
    if average_window < 1:
        raise ValueError("average_window must be at least 1.")

    reward_history: list[float] = []

    for episode in range(num_episodes):
        time_step = world.reset()
        self.reset()
        episode_reward = 0.0

        while True:
            result = self.step(time_step)

            if time_step.terminated or time_step.truncated:
                break

            time_step = world.step(result.action)
            episode_reward += time_step.reward

        reward_history.append(episode_reward)
        recent_window = reward_history[-average_window:]
        avg_reward = sum(recent_window) / len(recent_window)

        solved = (
            solved_threshold is not None
            and len(reward_history) >= average_window
            and avg_reward >= solved_threshold
        )

        if episode_logger is not None:
            episode_logger(episode, episode_reward, avg_reward, self)

        if solved:
            break

    return reward_history
```
```python
@runtime_checkable
class World(Protocol[ObservationT, ActionT, InfoT]):
    """Minimal environment protocol.

    A `World` may wrap a simulator, a benchmark environment, or a custom
    continual data source. The protocol is intentionally small so the
    package does not depend on a specific environment library.

    Implement this protocol for any environment you want to use with
    `OaKAgent.train()`.
    """

    def reset(self) -> TimeStep[ObservationT, InfoT]: ...

    def step(self, action: ActionT) -> TimeStep[ObservationT, InfoT]: ...

    def close(self) -> None:
        """Release environment resources. Default is a no-op."""
        ...
```
```python
def reset(self) -> TimeStep[ObservationT, InfoT]: ...
```

```python
def step(self, action: ActionT) -> TimeStep[ObservationT, InfoT]: ...
```
```python
@dataclass(slots=True, frozen=True)
class AgentStepResult(Generic[ActionT, SubjectiveStateT]):
    """Observable result of one OaK agent step.

    This is the compact object a caller receives after stepping the agent. It
    includes the primitive action actually executed, the current subjective
    state, and any structures or planning signals created during that step.
    """

    action: ActionT
    subjective_state: SubjectiveStateT
    active_option_id: OptionId | None = None
    planning_update: PlanningUpdate[ActionT] | None = None
    created_subtasks: Sequence[SubtaskSpec] = field(default_factory=tuple)
    curation_decision: CurationDecision | None = None
```
```python
class ComponentKind(str, Enum):
    """Kinds of learnable or managed elements in the architecture."""

    FEATURE = "feature"
    SUBTASK = "subtask"
    OPTION = "option"
    VALUE_FUNCTION = "value_function"
    OPTION_MODEL = "option_model"
    TRANSITION_MODEL = "transition_model"
    POLICY = "policy"
    PERCEPTION = "perception"
    PLANNER = "planner"
```
```python
@dataclass(slots=True, frozen=True)
class CurationDecision:
    """Pruning decision returned by the curator."""

    drop_features: Sequence[FeatureId] = field(default_factory=tuple)
    drop_subtasks: Sequence[SubtaskId] = field(default_factory=tuple)
    drop_options: Sequence[OptionId] = field(default_factory=tuple)
    drop_option_models: Sequence[OptionId] = field(default_factory=tuple)
    drop_general_value_functions: Sequence[GeneralValueFunctionId] = field(
        default_factory=tuple
    )
    notes: StructuredPayload = field(default_factory=dict)
```
```python
@dataclass(slots=True, frozen=True)
class FeatureCandidate:
    """A proposed feature that may be admitted into the feature bank."""

    feature_id: FeatureId
    name: str
    origin: str
    description: str = ""
    metadata: OpenPayload = field(default_factory=dict)
```
```python
@dataclass(slots=True, frozen=True)
class FeatureSpec:
    """Metadata describing a feature tracked by the agent."""

    feature_id: FeatureId
    name: str
    description: str = ""
    metadata: OpenPayload = field(default_factory=dict)
```
```python
@dataclass(slots=True, frozen=True)
class GeneralValueFunctionSpec(Generic[ActionT, SubjectiveStateT, InfoT]):
    """General value function specification."""

    general_value_function_id: GeneralValueFunctionId
    name: str
    cumulant: ScalarSignal
    continuation: ContinuationFn
    termination_value: TerminationValueFn
    metadata: OpenPayload = field(default_factory=dict)
```
```python
@dataclass(slots=True, frozen=True)
class ModelPrediction(Generic[SubjectiveStateT]):
    """Prediction returned by an action or option model."""

    predicted_subjective_state: SubjectiveStateT
    cumulative_reward: float
    steps: int | None = None
    terminated: bool = False
    metadata: OpenPayload = field(default_factory=dict)
```
```python
@dataclass(slots=True, frozen=True)
class OptionDescriptor:
    """Lightweight metadata for an option."""

    option_id: OptionId
    name: str
    subtask_id: SubtaskId | None = None
    metadata: OpenPayload = field(default_factory=dict)
```
```python
@dataclass(slots=True, frozen=True)
class PlanningUpdate(Generic[ActionT]):
    """Outputs from one planning pass."""

    value_targets: Mapping[GeneralValueFunctionId, float] = field(default_factory=dict)
    policy_targets: StructuredPayload = field(default_factory=dict)
    search_statistics: StructuredPayload = field(default_factory=dict)
```
```python
@dataclass(slots=True, frozen=True)
class PolicyDecision(Generic[ActionT]):
    """Return type for reactive policy selection."""

    action: ActionT | None = None
    option_id: OptionId | None = None
    metadata: OpenPayload = field(default_factory=dict)

    def __post_init__(self) -> None:
        has_action = self.action is not None
        has_option = self.option_id is not None
        if has_action == has_option:
            raise ValueError(
                "PolicyDecision requires exactly one of action or option_id."
            )
```
```python
@dataclass(slots=True, frozen=True)
class SubtaskSpec:
    """A feature-grounded subtask description."""

    subtask_id: SubtaskId
    name: str
    feature_id: FeatureId
    intensity: float = 1.0
    general_value_function_id: GeneralValueFunctionId | None = None
    metadata: OpenPayload = field(default_factory=dict)
```
```python
@dataclass(slots=True, frozen=True)
class TimeStep(Generic[ObservationT, InfoT]):
    """One environment emission seen by the agent.

    `TimeStep` is the object passed into `OaKAgent.step(...)`. It contains the
    raw observation, scalar reward, episode-control flags, and optional
    environment metadata.
    """

    observation: ObservationT
    reward: float
    terminated: bool = False
    truncated: bool = False
    info: InfoT | None = None
```
```python
@dataclass(slots=True, frozen=True)
class Transition(Generic[ActionT, SubjectiveStateT, InfoT]):
    """One subjective-state transition in agent terms.

    `Transition` is constructed by the agent after two consecutive time steps.
    Learners use it instead of the raw world stream so they can access both the
    previous and next subjective state representations together with reward,
    termination, and optional environment metadata.
    """

    subjective_state: SubjectiveStateT
    action: ActionT
    reward: float
    next_subjective_state: SubjectiveStateT
    terminated: bool = False
    info: InfoT | None = None
```
```python
@dataclass(slots=True, frozen=True)
class UsageRecord:
    """Usage evidence gathered for utility assessment."""

    kind: ComponentKind
    component_id: ComponentId
    amount: float = 1.0
    metadata: OpenPayload = field(default_factory=dict)
```
```python
@dataclass(slots=True, frozen=True)
class UtilityRecord:
    """Utility score for one architectural element."""

    kind: ComponentKind
    component_id: ComponentId
    utility: float
    evidence: StructuredPayload = field(default_factory=dict)
```