oak_architecture

from .agent import OaKAgent
from . import fine_grained
from .interfaces import (
    ContinualLearner,
    Perception,
    ReactivePolicy,
    TransitionModel,
    ValueFunction,
    World,
)
from .types import (
    AgentStepResult,
    ComponentKind,
    CurationDecision,
    FeatureCandidate,
    FeatureSpec,
    GeneralValueFunctionSpec,
    ModelPrediction,
    OptionDescriptor,
    PlanningUpdate,
    PolicyDecision,
    SubtaskSpec,
    TimeStep,
    Transition,
    UsageRecord,
    UtilityRecord,
)

__all__ = [
    # ── Continual-learning mixin ──
    "ContinualLearner",
    # ── The four main OaK interfaces ──
    "Perception",
    "TransitionModel",
    "ValueFunction",
    "ReactivePolicy",
    # ── Agent ──
    "OaKAgent",
    # ── Environment ──
    "World",
    # ── Optional advanced assembly layer ──
    "fine_grained",
    # ── Shared types ──
    "AgentStepResult",
    "ComponentKind",
    "CurationDecision",
    "FeatureCandidate",
    "FeatureSpec",
    "GeneralValueFunctionSpec",
    "ModelPrediction",
    "OptionDescriptor",
    "PlanningUpdate",
    "PolicyDecision",
    "SubtaskSpec",
    "TimeStep",
    "Transition",
    "UsageRecord",
    "UtilityRecord",
]

Architecture Guide

The package ships four diagrams:

`oak_core`. The default conceptual slot map: OaKAgent coordinating the four main interfaces (Perception, ValueFunction, TransitionModel, ReactivePolicy) and their main data flow.

`oak_architecture`. The fine-grained slot map: Composite modules, their delegated interfaces, and the lower-level optional interfaces available inside each slot.

`oak_runtime_overview`. The top-level step path through the four main interfaces for the six phases of OaKAgent.step(...): Perceive, Learn, Grow, Plan, Act, Maintain.

`oak_runtime_sequence`. The detailed composite-wired step path: OaKAgent -> Composite* -> fine_grained interface used during that step.

What You Must Implement

OaKAgent is the canonical coordinator. It is composed of exactly four objects, one per Sutton module: perception (a Perception), value_function (a ValueFunction), transition_model (a TransitionModel), and reactive_policy (a ReactivePolicy).

You also configure scalar controls: planning_budget, the search budget passed to transition_model.plan(...), and option_stop_threshold, passed to reactive_policy.select_action(...).

OaKAgent manages its own runtime fields: the previous subjective state and the previous action, which it needs from the second step onward to assemble full Transition objects.

Your environment must implement the World protocol (reset, step, close) to use OaKAgent.train(). You can also drive the loop yourself by supplying TimeStep objects to OaKAgent.step(...) directly.
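As an illustration, here is a sketch of a toy environment satisfying a reset/step/close surface. The five-tuple return shape of step is an assumption modeled on the TimeStep fields described later (observation, reward, terminated, truncated, info); the real World protocol may differ in detail.

```python
from typing import Any, Protocol


class World(Protocol):
    """Structural stand-in for the oak_architecture World protocol."""

    def reset(self) -> Any: ...
    def step(self, action: Any) -> tuple[Any, float, bool, bool, dict]: ...
    def close(self) -> None: ...


class CountdownWorld:
    """Toy world: the observation is a counter; reward 1.0 arrives when it
    reaches zero, at which point the episode terminates."""

    def __init__(self, horizon: int = 5) -> None:
        self.horizon = horizon
        self.counter = horizon

    def reset(self) -> int:
        self.counter = self.horizon
        return self.counter

    def step(self, action: int) -> tuple[int, float, bool, bool, dict]:
        self.counter -= 1
        terminated = self.counter <= 0
        reward = 1.0 if terminated else 0.0
        return self.counter, reward, terminated, False, {}

    def close(self) -> None:
        pass
```

Because World is a Protocol, CountdownWorld needs no inheritance: implementing the three methods is enough.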

Two Ways to Implement

Direct approach: implement the four main interfaces directly. Each of your classes is a self-contained module. This is the simplest path and what the examples/smoke/minimal_oak.py example demonstrates.

Composite approach: use the fine-grained component interfaces from oak_architecture.fine_grained.components and wire them together using the composites from oak_architecture.fine_grained.composites. This is for projects that need to independently swap building blocks inside a module (e.g. replace the planner without touching the world model). The examples/smoke/minimal_oak_fine_grained.py example demonstrates this path with the same toy behavior as the direct example.

Main interface Composite class Fine-grained building blocks
Perception CompositePerception StateBuilder, FeatureBank, FeatureConstructor, FeatureRanker, SubtaskGenerator
TransitionModel CompositeTransitionModel WorldModel, OptionModelLearner, OptionModel, Planner
ValueFunction CompositeValueFunction ValueEstimator, GeneralValueFunctionLearner, UtilityAssessor, Curator, MetaStepSizeLearner
ReactivePolicy CompositeReactivePolicy ActionSelector, Option, OptionLibrary, OptionLearner, OptionKeyboard (optional)
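To make the composite idea concrete, here is a sketch of the delegation pattern using hypothetical stand-ins (StateBuilder, FeatureBank, and MyCompositePerception below are illustrative, not the real fine_grained API):

```python
from dataclasses import dataclass, field


class StateBuilder:
    """Stand-in building block: turns an observation into a subjective state."""

    def build(self, observation):
        return (observation,)


@dataclass
class FeatureBank:
    """Stand-in building block: stores feature specs by ID."""

    features: dict = field(default_factory=dict)

    def add(self, feature_id, spec):
        self.features[feature_id] = spec


class MyCompositePerception:
    """Sketch of the composite pattern: the Perception slot holds parts
    and delegates each responsibility to one of them."""

    def __init__(self, state_builder, feature_bank):
        self.state_builder = state_builder
        self.feature_bank = feature_bank

    def update(self, observation, reward, last_action):
        # Delegation is the point: swap the StateBuilder without touching
        # the FeatureBank, and vice versa.
        return self.state_builder.build(observation)
```

The same shape applies to the other three composites: each holds its building blocks and forwards the main-interface calls to them.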

Diagram-to-Code Mapping

The diagrams have different jobs, but they all describe the same implementation.

Recommended reading order for the diagrams:

  1. Read oak_core to understand the default four-interface surface.
  2. Read oak_runtime_overview for the six phases of step(...).
  3. Read oak_architecture to see how the optional fine-grained layer is assembled.
  4. Read oak_runtime_sequence to trace one composite-wired execution path.

oak_runtime_overview and oak_runtime_sequence describe the same six phases. The difference is only the level of expansion: oak_runtime_overview stays at the four-interface layer, while oak_runtime_sequence shows what happens when those slots are filled by the Composite* implementations from oak_architecture.fine_grained.composites. If either diagram ever disagrees with the code, the documentation should be fixed.

The diagrams are intentionally runtime-oriented. They are not exhaustive method inventories for the interfaces. For the full surface area (reset, predict, current_subjective_state, OptionKeyboard, and so on), use the API reference below. oak_architecture is the broadest inventory view; oak_runtime_overview and oak_runtime_sequence are narrower and only show what matters for one OaKAgent.step(...).

Step Walkthrough

Read the method as a pipeline. Each block below corresponds to the next block of code in OaKAgent.step(...).

1. Perceive

subjective_state = self.perception.update(...)

time_step is the input. It carries observation, reward, terminated, truncated, and optional info. perception must turn these into the current subjective_state. Every later call in the step uses this subjective_state, so your Perception implementation defines what the agent actually reasons over.
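A minimal sketch of this phase, with an illustrative choice of subjective state (the raw observation plus a decayed trace of recent reward; everything here is a toy, not the real Perception contract):

```python
class TabularPerception:
    """Toy Perception sketch: the subjective state is a tuple of the raw
    observation and a decayed reward trace."""

    def __init__(self, decay: float = 0.9) -> None:
        self.decay = decay
        self.reward_trace = 0.0
        self.state = None

    def update(self, observation, reward, last_action):
        # The trace gives later modules a cheap summary of recent reward.
        self.reward_trace = self.decay * self.reward_trace + reward
        self.state = (observation, round(self.reward_trace, 6))
        return self.state

    def current_subjective_state(self):
        return self.state
```

Whatever you put in the tuple here is all the rest of the agent will ever see, which is why this phase deserves design attention first.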

2. Learn

td_errors = self.value_function.update(transition)
self.reactive_policy.update(transition, td_errors)
self.transition_model.update(transition)

Learning starts only once the agent has both a previous subjective_state and a previous action. The first call to step(...) therefore sets up memory but cannot yet build a full transition.

The Transition packages the previous/next subjective states, the action, reward, the termination outcome, and optional info. All three modules receive it. value_function.update returns TD errors that reactive_policy.update uses for policy improvement.
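The first-step guard can be sketched as follows; the Transition field names below mirror this description but are assumptions about the real oak_architecture.types.Transition:

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Transition:
    """Illustrative stand-in for the real Transition type."""

    previous_state: Any
    action: Any
    reward: float
    next_state: Any
    terminated: bool
    info: Optional[dict] = None


def maybe_build_transition(prev_state, prev_action, state, reward, terminated):
    """On the first step there is no previous state/action yet, so no
    transition can be built and the Learn phase is skipped."""
    if prev_state is None or prev_action is None:
        return None
    return Transition(prev_state, prev_action, reward, state, terminated)
```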

3. Grow

ranked_feature_ids = self.perception.discover_and_rank_features(...)
created_subtasks = self.perception.generate_subtasks(ranked_feature_ids)
self.reactive_policy.ingest_subtasks(created_subtasks)
self.reactive_policy.integrate_options()
self.transition_model.integrate_option_models()

perception proposes new features, ranks them by utility, and generates subtasks from the most useful ones. reactive_policy turns subtasks into options. transition_model integrates the latest option models so planning can reason about them. In the overview diagram this appears as top-level module calls; in the detailed sequence diagram the same phase is expanded into Composite* -> fine_grained interface calls.
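The ranking-and-subtask half of this phase can be sketched with plain dictionaries (the utility-dict and subtask-dict shapes are illustrative, not the UtilityRecord/SubtaskSpec types):

```python
def discover_and_rank(feature_utilities: dict, feature_budget: int) -> list:
    """Rank feature IDs by utility score and keep the top feature_budget."""
    ranked = sorted(feature_utilities, key=feature_utilities.get, reverse=True)
    return ranked[:feature_budget]


def generate_subtasks(ranked_feature_ids: list) -> list:
    """One 'attain this feature' subtask per top-ranked feature, in the
    spirit of OaK's feature-attainment subtasks."""
    return [{"subtask_id": f"attain:{fid}", "target_feature": fid}
            for fid in ranked_feature_ids]
```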

4. Plan

planning_update = self.transition_model.plan(
    subjective_state, self.value_function, self.planning_budget
)
self.reactive_policy.apply_planning_update(planning_update)

transition_model.plan(...) receives the current subjective_state, the value_function (for state evaluation during search), and a budget. It returns a PlanningUpdate. reactive_policy is informed about the planner's output before action selection.
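A trivial-but-valid planner under this contract might do bounded one-step lookahead; the dict returned below is an illustrative stand-in for a PlanningUpdate:

```python
def plan_one_step(state, model, value_of, actions, budget):
    """Toy bounded planner: evaluate up to `budget` candidate actions by
    one-step lookahead (reward plus value of the predicted next state) and
    return a greedy policy target for this state.

    `model(state, action) -> (next_state, reward)` and `value_of(state)`
    are stand-ins for the internal world model and the supplied value
    function."""
    best_action, best_return = None, float("-inf")
    for action in actions[:budget]:
        next_state, reward = model(state, action)
        backed_up = reward + value_of(next_state)
        if backed_up > best_return:
            best_action, best_return = action, backed_up
    return {"state": state, "greedy_action": best_action,
            "value_target": best_return}
```

The budget cap is what keeps planning bounded per step, which matters in a continual-learning loop.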

5. Act

action, active_option_id = self.reactive_policy.select_action(
    subjective_state, self.option_stop_threshold
)

The reactive policy either continues an active option or makes a fresh decision. The output is always a primitive action, because that is what the caller receives in AgentStepResult.
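The continue-or-decide logic can be sketched like this; the option dict shape and greedy_primitive fallback are hypothetical stand-ins for the real option objects:

```python
def greedy_primitive(state) -> int:
    """Stand-in fresh decision: a trivial state-dependent primitive action."""
    return 0 if state < 0 else 1


def select_action(state, active_option, option_stop_threshold):
    """Continue the active option while its stopping probability stays
    below the threshold; otherwise fall back to a fresh primitive choice.
    Returns a (primitive_action, active_option_id) pair, None when no
    option is driving behavior."""
    if active_option is not None:
        if active_option["stop_probability"](state) < option_stop_threshold:
            # The option keeps control and emits the next primitive action.
            return active_option["act"](state), active_option["id"]
    # No option (or it just stopped): make a fresh decision.
    return greedy_primitive(state), None
```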

6. Maintain

self.value_function.observe_usage(usage_records)
curation_decision = self.value_function.curate()
self._apply_curation(curation_decision)

Usage records for ranked features and the active option are sent to the value function for utility tracking. The value function then decides what to prune. _apply_curation(...) dispatches the decision to the relevant modules: perception.remove_features(...), reactive_policy.remove_options(...), reactive_policy.remove_subtasks(...), transition_model.remove_option_models(...), value_function.remove(...).
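A toy curation rule under this scheme might simply prune anything whose utility estimate drops below a floor; the grouping keys below are illustrative stand-ins for the real CurationDecision fields:

```python
def curate(utility_by_component: dict, prune_below: float) -> dict:
    """Toy curation: drop every tracked structure whose utility estimate
    falls below prune_below. Keys of utility_by_component are
    (kind, component_id) pairs."""
    decision = {"features": [], "options": []}
    for (kind, component_id), utility in utility_by_component.items():
        if utility < prune_below:
            decision[kind].append(component_id)
    return decision
```

Real implementations would typically smooth utility over time before pruning, so that a structure is not dropped on one bad step.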

Training Loop

OaKAgent.train() provides a standard episode loop so implementations don't need to rewrite the reset/step/terminate boilerplate:

agent = build_my_agent()
world = MyWorld()          # must implement the World protocol

def log_episode(episode, reward, avg_reward, agent):
    if episode % 10 == 0:
        print(f"episode={episode} reward={reward:.1f} avg={avg_reward:.1f}")

rewards = agent.train(
    world,
    num_episodes=500,
    solved_threshold=475.0,  # optional early stopping
    episode_logger=log_episode,
)
world.close()

The World protocol requires three methods: reset to start a new episode, step to apply an action and advance the environment, and close to release resources.

If you need custom per-episode logging, pass episode_logger(...). If you need a fully custom training loop (non-episodic environments, multi-agent setups, custom control flow), call agent.step(time_step) directly instead.
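A fully custom loop reduces to feeding TimeStep objects into step(...) yourself. In this sketch, the TimeStep dataclass mirrors the fields named in the Step Walkthrough but is a stand-in for the real oak_architecture.types.TimeStep:

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class TimeStep:
    """Illustrative stand-in for the real TimeStep type."""

    observation: Any
    reward: float
    terminated: bool
    truncated: bool
    info: Optional[dict] = None


def drive(agent, env_stream):
    """Custom loop: wrap each (observation, reward, done) sample from an
    arbitrary stream in a TimeStep and hand it to agent.step(...)."""
    results = []
    for observation, reward, done in env_stream:
        result = agent.step(TimeStep(observation, reward, done, False, None))
        results.append(result)
        if done:
            break
    return results
```

This is the shape to reach for in non-episodic environments or multi-agent setups where train() is too rigid.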

Implementation Order

If your goal is to get a working agent quickly, implement in this order:

  1. Make Perception produce a useful subjective_state from TimeStep. Have discover_and_rank_features return a fixed list and generate_subtasks return empty.
  2. Make ReactivePolicy return valid actions from select_action. Have the other methods be no-ops.
  3. Make ValueFunction accept update and return predict values. Have curate return an empty CurationDecision.
  4. Make TransitionModel accept update and return a valid PlanningUpdate from plan, even if trivial.

That is enough to satisfy the exact call sequence of OaKAgent.step(...). After that, you can improve learning quality without changing the basic wiring.

Repository Examples

The concrete implementations live outside oak_architecture on purpose. That shows the intended usage pattern: the package provides the canonical OaKAgent coordinator and interfaces, while downstream code provides the implementations. The generated docs now include the repository-level examples package alongside the core oak_architecture API.

To run all repository smoke tests, including the minimal example:

pixi run tests

To inspect the smallest runnable example directly:

from examples import build_minimal_agent, run_minimal_episode

agent = build_minimal_agent()
trace = run_minimal_episode(horizon=5)

run_minimal_episode(...) returns a compact trace with the subjective_state, primitive action, active_option_id, created_subtasks, and planner output at each step.

Design Constraints

Keep these constraints in mind when you replace the minimal pieces with real ones:


API Documentation

class ContinualLearner:
class ContinualLearner:
    """Mixin for modules whose weights are adapted by meta-learned step sizes.

    In Sutton's OaK architecture, every learned weight has a dedicated
    step-size parameter adapted via online cross-validation (e.g. IDBD,
    Sutton 1992; Adam-IDBD, Degris et al. 2024).

    The agent loop calls `update_meta()` on all four modules after each
    learning step, passing the same error-signals dict.  Each module
    internally decides which signals are relevant and routes them to its
    per-weight step-size adaptation.

    The default implementation is a no-op so that modules without
    meta-learning still work unchanged.
    """

    def update_meta(self, error_signals: Mapping[str, float]) -> None:
        """Adapt internal per-weight step sizes given error signals.

        Parameters
        ----------
        error_signals:
            Named scalar error signals from the current learning step,
            e.g. `{"main_td_error": 0.05, "reward": 1.0}`.
            Implementations pick the signals they need and ignore the rest.
        """

Mixin for modules whose weights are adapted by meta-learned step sizes.

In Sutton's OaK architecture, every learned weight has a dedicated step-size parameter adapted via online cross-validation (e.g. IDBD, Sutton 1992; Adam-IDBD, Degris et al. 2024).

The agent loop calls update_meta() on all four modules after each learning step, passing the same error-signals dict. Each module internally decides which signals are relevant and routes them to its per-weight step-size adaptation.

The default implementation is a no-op so that modules without meta-learning still work unchanged.
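As a flavor of what an override might do, here is a deliberately crude single-weight sketch: grow the step size when successive errors agree in sign, shrink it when they flip. This is a heuristic stand-in for the IDBD family, not the algorithm from Sutton (1992):

```python
class SignAgreementLearner:
    """Sketch of a ContinualLearner override with one step size: persistent
    same-sign errors suggest under-stepping, sign flips suggest overshoot."""

    def __init__(self, step_size: float = 0.1) -> None:
        self.step_size = step_size
        self.previous_error = 0.0

    def update_meta(self, error_signals) -> None:
        # Pick the one signal this module cares about; ignore the rest.
        error = error_signals.get("main_td_error", 0.0)
        if error * self.previous_error > 0:
            self.step_size *= 1.1   # errors agree in sign: step up
        elif error * self.previous_error < 0:
            self.step_size *= 0.9   # errors flip sign: step down
        self.previous_error = error
```

Real IDBD adapts one step size per weight using a trace of past inputs; the point here is only the shape of the hook: consume a named signal, adjust internal step sizes, return nothing.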

def update_meta(self, error_signals: 'Mapping[str, float]') -> 'None':
    def update_meta(self, error_signals: Mapping[str, float]) -> None:
        """Adapt internal per-weight step sizes given error signals.

        Parameters
        ----------
        error_signals:
            Named scalar error signals from the current learning step,
            e.g. `{"main_td_error": 0.05, "reward": 1.0}`.
            Implementations pick the signals they need and ignore the rest.
        """

Adapt internal per-weight step sizes given error signals.

Parameters

error_signals: Named scalar error signals from the current learning step, e.g. {"main_td_error": 0.05, "reward": 1.0}. Implementations pick the signals they need and ignore the rest.

class Perception(oak_architecture.ContinualLearner, abc.ABC, typing.Generic[~ObservationT, ~ActionT, ~SubjectiveStateT]):
class Perception(
    ContinualLearner, ABC, Generic[ObservationT, ActionT, SubjectiveStateT]
):
    """Sutton's Perception: observations → subjective state + feature management.

    Turns raw observations into the agent's **subjective state**, the
    internal representation that every other module sees.  Also discovers,
    ranks, and manages **features** (learned representational structures
    that grow over the agent's lifetime) and generates **subtasks** from
    the most useful ones.

    The finer-grained layer splits this into `StateBuilder`,
    `FeatureBank`, `FeatureConstructor`, `FeatureRanker`, and
    `SubtaskGenerator` (see `oak_architecture.fine_grained.components`).
    """

    @abstractmethod
    def reset(self) -> None:
        """Reset all perception state for a new episode."""
        raise NotImplementedError

    @abstractmethod
    def update(
        self,
        observation: ObservationT,
        reward: float,
        last_action: ActionT | None,
    ) -> SubjectiveStateT:
        """Process a new observation and return the updated subjective state."""
        raise NotImplementedError

    @abstractmethod
    def current_subjective_state(self) -> SubjectiveStateT:
        """Return the most recently computed subjective state."""
        raise NotImplementedError

    @abstractmethod
    def discover_and_rank_features(
        self,
        subjective_state: SubjectiveStateT,
        utility_scores: Sequence[UtilityRecord],
        feature_budget: int,
    ) -> Sequence[FeatureId]:
        """Propose new features, integrate them, and return the top-ranked IDs.

        A typical implementation:

        1. Proposes candidate features from the current subjective state.
        2. Adds accepted candidates to its internal feature store.
        3. Ranks all features using the provided utility scores.
        4. Returns the top feature IDs (up to *feature_budget*).
        """
        raise NotImplementedError

    @abstractmethod
    def generate_subtasks(
        self,
        ranked_feature_ids: Sequence[FeatureId],
    ) -> Sequence[SubtaskSpec]:
        """Turn ranked feature IDs into subtask specifications."""
        raise NotImplementedError

    @abstractmethod
    def list_features(self) -> Sequence[FeatureSpec]:
        """Return all currently tracked features."""
        raise NotImplementedError

    @abstractmethod
    def remove_features(self, feature_ids: Sequence[FeatureId]) -> None:
        """Remove features by ID (called during curation)."""
        raise NotImplementedError

Sutton's Perception: observations → subjective state + feature management.

Turns raw observations into the agent's subjective state, the internal representation that every other module sees. Also discovers, ranks, and manages features (learned representational structures that grow over the agent's lifetime) and generates subtasks from the most useful ones.

The finer-grained layer splits this into StateBuilder, FeatureBank, FeatureConstructor, FeatureRanker, and SubtaskGenerator (see oak_architecture.fine_grained.components).

@abstractmethod
def reset(self) -> 'None':
    @abstractmethod
    def reset(self) -> None:
        """Reset all perception state for a new episode."""
        raise NotImplementedError

Reset all perception state for a new episode.

@abstractmethod
def update( self, observation: 'ObservationT', reward: 'float', last_action: 'ActionT | None') -> 'SubjectiveStateT':
    @abstractmethod
    def update(
        self,
        observation: ObservationT,
        reward: float,
        last_action: ActionT | None,
    ) -> SubjectiveStateT:
        """Process a new observation and return the updated subjective state."""
        raise NotImplementedError

Process a new observation and return the updated subjective state.

@abstractmethod
def current_subjective_state(self) -> 'SubjectiveStateT':
    @abstractmethod
    def current_subjective_state(self) -> SubjectiveStateT:
        """Return the most recently computed subjective state."""
        raise NotImplementedError

Return the most recently computed subjective state.

@abstractmethod
def discover_and_rank_features( self, subjective_state: 'SubjectiveStateT', utility_scores: 'Sequence[UtilityRecord]', feature_budget: 'int') -> 'Sequence[FeatureId]':
    @abstractmethod
    def discover_and_rank_features(
        self,
        subjective_state: SubjectiveStateT,
        utility_scores: Sequence[UtilityRecord],
        feature_budget: int,
    ) -> Sequence[FeatureId]:
        """Propose new features, integrate them, and return the top-ranked IDs.

        A typical implementation:

        1. Proposes candidate features from the current subjective state.
        2. Adds accepted candidates to its internal feature store.
        3. Ranks all features using the provided utility scores.
        4. Returns the top feature IDs (up to *feature_budget*).
        """
        raise NotImplementedError

Propose new features, integrate them, and return the top-ranked IDs.

A typical implementation:

  1. Proposes candidate features from the current subjective state.
  2. Adds accepted candidates to its internal feature store.
  3. Ranks all features using the provided utility scores.
  4. Returns the top feature IDs (up to feature_budget).
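The four steps above can be sketched with a minimal feature store. The indicator-style candidate proposal and the plain utility dict below are illustrative simplifications (the real signature takes UtilityRecord objects):

```python
class FeatureStore:
    """Toy walk-through of the four steps: propose, accept, rank, truncate."""

    def __init__(self) -> None:
        self.features: set = set()

    def discover_and_rank_features(self, subjective_state, utility_scores,
                                   feature_budget):
        # 1. Propose one indicator feature per active state component.
        candidates = {f"active:{name}"
                      for name, value in subjective_state.items() if value}
        # 2. Accept every candidate into the internal store.
        self.features |= candidates
        # 3. Rank all stored features by the supplied utility scores.
        ranked = sorted(self.features,
                        key=lambda fid: utility_scores.get(fid, 0.0),
                        reverse=True)
        # 4. Return the top feature IDs, capped at feature_budget.
        return ranked[:feature_budget]
```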
@abstractmethod
def generate_subtasks( self, ranked_feature_ids: 'Sequence[FeatureId]') -> 'Sequence[SubtaskSpec]':
    @abstractmethod
    def generate_subtasks(
        self,
        ranked_feature_ids: Sequence[FeatureId],
    ) -> Sequence[SubtaskSpec]:
        """Turn ranked feature IDs into subtask specifications."""
        raise NotImplementedError

Turn ranked feature IDs into subtask specifications.

@abstractmethod
def list_features(self) -> 'Sequence[FeatureSpec]':
    @abstractmethod
    def list_features(self) -> Sequence[FeatureSpec]:
        """Return all currently tracked features."""
        raise NotImplementedError

Return all currently tracked features.

@abstractmethod
def remove_features(self, feature_ids: 'Sequence[FeatureId]') -> 'None':
    @abstractmethod
    def remove_features(self, feature_ids: Sequence[FeatureId]) -> None:
        """Remove features by ID (called during curation)."""
        raise NotImplementedError

Remove features by ID (called during curation).

class TransitionModel(oak_architecture.ContinualLearner, abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
class TransitionModel(ContinualLearner, ABC, Generic[SubjectiveStateT, ActionT, InfoT]):
    """Sutton's Transition Model: world dynamics + option models + planning.

    Learns from observed transitions, maintains **option models** that
    predict the effect of temporal abstractions, and runs bounded
    **planning** using the world model and the value function to produce
    improvement signals for the reactive policy.

    The finer-grained layer splits this into `WorldModel`,
    `OptionModelLearner`, individual `OptionModel` objects, and a
    `Planner` (see `oak_architecture.fine_grained.components`).
    """

    @abstractmethod
    def update(
        self,
        transition: Transition[ActionT, SubjectiveStateT, InfoT],
    ) -> None:
        """Learn from an observed transition.

        This should update both the world model and any option-model learners.
        """
        raise NotImplementedError

    @abstractmethod
    def integrate_option_models(self) -> None:
        """Export learned option models and integrate them into the world model.

        Called after option learning so that planning reasons over fresh models.
        """
        raise NotImplementedError

    @abstractmethod
    def plan(
        self,
        subjective_state: SubjectiveStateT,
        value_function: ValueFunction[SubjectiveStateT, ActionT, InfoT],
        budget: int,
    ) -> PlanningUpdate[ActionT]:
        """Run bounded planning and return improvement signals.

        The planner uses the internal world model together with the supplied
        *value_function* (for state evaluation) to produce value targets,
        policy targets, or search statistics.
        """
        raise NotImplementedError

    @abstractmethod
    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
        """Remove option models by ID (called during curation)."""
        raise NotImplementedError

Sutton's Transition Model: world dynamics + option models + planning.

Learns from observed transitions, maintains option models that predict the effect of temporal abstractions, and runs bounded planning using the world model and the value function to produce improvement signals for the reactive policy.

The finer-grained layer splits this into WorldModel, OptionModelLearner, individual OptionModel objects, and a Planner (see oak_architecture.fine_grained.components).

@abstractmethod
def update( self, transition: 'Transition[ActionT, SubjectiveStateT, InfoT]') -> 'None':
    @abstractmethod
    def update(
        self,
        transition: Transition[ActionT, SubjectiveStateT, InfoT],
    ) -> None:
        """Learn from an observed transition.

        This should update both the world model and any option-model learners.
        """
        raise NotImplementedError

Learn from an observed transition.

This should update both the world model and any option-model learners.

@abstractmethod
def integrate_option_models(self) -> 'None':
    @abstractmethod
    def integrate_option_models(self) -> None:
        """Export learned option models and integrate them into the world model.

        Called after option learning so that planning reasons over fresh models.
        """
        raise NotImplementedError

Export learned option models and integrate them into the world model.

Called after option learning so that planning reasons over fresh models.

@abstractmethod
def plan( self, subjective_state: 'SubjectiveStateT', value_function: 'ValueFunction[SubjectiveStateT, ActionT, InfoT]', budget: 'int') -> 'PlanningUpdate[ActionT]':
    @abstractmethod
    def plan(
        self,
        subjective_state: SubjectiveStateT,
        value_function: ValueFunction[SubjectiveStateT, ActionT, InfoT],
        budget: int,
    ) -> PlanningUpdate[ActionT]:
        """Run bounded planning and return improvement signals.

        The planner uses the internal world model together with the supplied
        *value_function* (for state evaluation) to produce value targets,
        policy targets, or search statistics.
        """
        raise NotImplementedError

Run bounded planning and return improvement signals.

The planner uses the internal world model together with the supplied value_function (for state evaluation) to produce value targets, policy targets, or search statistics.

@abstractmethod
def remove_option_models(self, option_ids: 'Sequence[OptionId]') -> 'None':
    @abstractmethod
    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
        """Remove option models by ID (called during curation)."""
        raise NotImplementedError

Remove option models by ID (called during curation).

class ValueFunction(oak_architecture.ContinualLearner, abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
class ValueFunction(ContinualLearner, ABC, Generic[SubjectiveStateT, ActionT, InfoT]):
    """Sutton's Value Function: value learning + utility assessment + curation.

    Learns **predictive value signals** (TD errors, GVF predictions) from
    observed transitions and predicts cumulative signals for any given
    subjective state.  Also assesses the **utility** of the agent's learned
    structures (features, options, models) and produces concrete keep/drop
    **curation** decisions.

    The finer-grained layer splits this into `ValueEstimator`,
    `GeneralValueFunctionLearner`, `UtilityAssessor`, `Curator`, and
    `MetaStepSizeLearner` (see `oak_architecture.fine_grained.components`).
    """

    @abstractmethod
    def update(
        self,
        transition: Transition[ActionT, SubjectiveStateT, InfoT],
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Learn from a transition and return TD-error signals."""
        raise NotImplementedError

    @abstractmethod
    def predict(
        self,
        subjective_state: SubjectiveStateT,
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Predict values for the given subjective state."""
        raise NotImplementedError

    @abstractmethod
    def observe_usage(self, usage_records: Sequence[UsageRecord]) -> None:
        """Record usage evidence for utility assessment."""
        raise NotImplementedError

    @abstractmethod
    def utility_scores(self) -> Sequence[UtilityRecord]:
        """Return current utility estimates for all tracked structures."""
        raise NotImplementedError

    @abstractmethod
    def curate(self) -> CurationDecision:
        """Decide which learned structures to drop."""
        raise NotImplementedError

    @abstractmethod
    def remove(
        self,
        general_value_function_ids: Sequence[GeneralValueFunctionId],
    ) -> None:
        """Remove value functions by ID (called during curation)."""
        raise NotImplementedError

Sutton's Value Function: value learning + utility assessment + curation.

Learns predictive value signals (TD errors, GVF predictions) from observed transitions and predicts cumulative signals for any given subjective state. Also assesses the utility of the agent's learned structures (features, options, models) and produces concrete keep/drop curation decisions.

The finer-grained layer splits this into ValueEstimator, GeneralValueFunctionLearner, UtilityAssessor, Curator, and MetaStepSizeLearner (see oak_architecture.fine_grained.components).

@abstractmethod
def update( self, transition: 'Transition[ActionT, SubjectiveStateT, InfoT]') -> 'Mapping[GeneralValueFunctionId, float]':
    @abstractmethod
    def update(
        self,
        transition: Transition[ActionT, SubjectiveStateT, InfoT],
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Learn from a transition and return TD-error signals."""
        raise NotImplementedError

Learn from a transition and return TD-error signals.

@abstractmethod
def predict( self, subjective_state: 'SubjectiveStateT') -> 'Mapping[GeneralValueFunctionId, float]':
    @abstractmethod
    def predict(
        self,
        subjective_state: SubjectiveStateT,
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Predict values for the given subjective state."""
        raise NotImplementedError

Predict values for the given subjective state.

@abstractmethod
def observe_usage(self, usage_records: 'Sequence[UsageRecord]') -> 'None':
    @abstractmethod
    def observe_usage(self, usage_records: Sequence[UsageRecord]) -> None:
        """Record usage evidence for utility assessment."""
        raise NotImplementedError

Record usage evidence for utility assessment.

@abstractmethod
def utility_scores(self) -> 'Sequence[UtilityRecord]':
    @abstractmethod
    def utility_scores(self) -> Sequence[UtilityRecord]:
        """Return current utility estimates for all tracked structures."""
        raise NotImplementedError

Return current utility estimates for all tracked structures.

@abstractmethod
def curate(self) -> 'CurationDecision':
    @abstractmethod
    def curate(self) -> CurationDecision:
        """Decide which learned structures to drop."""
        raise NotImplementedError

Decide which learned structures to drop.

@abstractmethod
def remove( self, general_value_function_ids: 'Sequence[GeneralValueFunctionId]') -> 'None':
    @abstractmethod
    def remove(
        self,
        general_value_function_ids: Sequence[GeneralValueFunctionId],
    ) -> None:
        """Remove value functions by ID (called during curation)."""
        raise NotImplementedError

Remove value functions by ID (called during curation).

class ReactivePolicy(oak_architecture.ContinualLearner, abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
class ReactivePolicy(ContinualLearner, ABC, Generic[SubjectiveStateT, ActionT, InfoT]):
    """Sutton's Reactive Policy: action selection + option management.

    Selects **actions**, primitive or temporal abstractions (options),
    based on the current subjective state.  Manages the **option library**
    and **option learning** pipeline, and integrates **planning updates**
    into decision-making.

    The finer-grained layer splits this into `ActionSelector`,
    `OptionLibrary`, and `OptionLearner`
    (see `oak_architecture.fine_grained.components`).
    """

    @abstractmethod
    def update(
        self,
        transition: Transition[ActionT, SubjectiveStateT, InfoT],
        td_errors: Mapping[GeneralValueFunctionId, float],
    ) -> None:
        """Update the policy and option learners from an observed transition."""
        raise NotImplementedError

    @abstractmethod
    def apply_planning_update(self, update: PlanningUpdate[ActionT]) -> None:
        """Integrate planning improvement signals into the policy."""
        raise NotImplementedError

    @abstractmethod
    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
        """Feed newly created subtasks into the option learner."""
        raise NotImplementedError

    @abstractmethod
    def integrate_options(self) -> None:
        """Export learned options into the option library."""
        raise NotImplementedError

    @abstractmethod
    def select_action(
        self,
        subjective_state: SubjectiveStateT,
        option_stop_threshold: float,
    ) -> tuple[ActionT, OptionId | None]:
        """Choose a primitive action, possibly by continuing an active option.

        Returns a `(primitive_action, active_option_id)` pair.  When no
        option is active, *active_option_id* is `None`.
        """
        raise NotImplementedError

    @abstractmethod
    def clear_active_option(self) -> None:
        """Clear the currently executing option (e.g. at episode boundaries)."""
        raise NotImplementedError

    @abstractmethod
    def remove_options(self, option_ids: Sequence[OptionId]) -> None:
        """Remove options by ID (called during curation)."""
        raise NotImplementedError

    @abstractmethod
    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
        """Remove subtasks by ID (called during curation)."""
        raise NotImplementedError

Sutton's Reactive Policy: action selection + option management.

Selects actions, either primitive actions or temporal abstractions (options), based on the current subjective state. Manages the option library and option learning pipeline, and integrates planning updates into decision-making.

The finer-grained layer splits this into ActionSelector, OptionLibrary, and OptionLearner (see oak_architecture.fine_grained.components).
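To make the `select_action` contract concrete, here is a minimal, self-contained stand-in. It is not the real abstract base class: `TinyReactivePolicy`, its fixed-length options, and its tabular value table are all hypothetical simplifications of the interface sketched above.

```python
import random

class TinyReactivePolicy:
    """Hypothetical stand-in for the ReactivePolicy contract (illustration only)."""

    def __init__(self, actions, epsilon=0.1, seed=0):
        self.actions = list(actions)
        self.epsilon = epsilon
        self.q = {}                    # (state, action) -> value estimate
        self.active_option = None      # (option_id, remaining_steps) or None
        self.rng = random.Random(seed)

    def update(self, transition, td_errors):
        # Toy TD-style update; the real interface takes a Transition dataclass.
        state, action, reward = transition
        key = (state, action)
        self.q[key] = self.q.get(key, 0.0) + 0.1 * (reward - self.q.get(key, 0.0))

    def select_action(self, subjective_state, option_stop_threshold):
        # Keep following an active option while its stop probability stays
        # below the threshold; here the option simply runs for a fixed length.
        if self.active_option is not None:
            option_id, remaining = self.active_option
            stop_probability = 0.0 if remaining > 0 else 1.0
            if stop_probability < option_stop_threshold:
                self.active_option = (option_id, remaining - 1)
                return self.actions[0], option_id
            self.active_option = None
        # No option active: epsilon-greedy over primitive actions.
        if self.rng.random() < self.epsilon:
            return self.rng.choice(self.actions), None
        best = max(self.actions,
                   key=lambda a: self.q.get((subjective_state, a), 0.0))
        return best, None

    def clear_active_option(self):
        self.active_option = None
```

Note how `select_action` always returns a `(primitive_action, active_option_id)` pair, with `None` as the option ID once the option stops, matching the documented contract.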

@abstractmethod
def update( self, transition: 'Transition[ActionT, SubjectiveStateT, InfoT]', td_errors: 'Mapping[GeneralValueFunctionId, float]') -> 'None':
311    @abstractmethod
312    def update(
313        self,
314        transition: Transition[ActionT, SubjectiveStateT, InfoT],
315        td_errors: Mapping[GeneralValueFunctionId, float],
316    ) -> None:
317        """Update the policy and option learners from an observed transition."""
318        raise NotImplementedError

Update the policy and option learners from an observed transition.

@abstractmethod
def apply_planning_update(self, update: 'PlanningUpdate[ActionT]') -> 'None':
320    @abstractmethod
321    def apply_planning_update(self, update: PlanningUpdate[ActionT]) -> None:
322        """Integrate planning improvement signals into the policy."""
323        raise NotImplementedError

Integrate planning improvement signals into the policy.

@abstractmethod
def ingest_subtasks(self, subtasks: 'Sequence[SubtaskSpec]') -> 'None':
325    @abstractmethod
326    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
327        """Feed newly created subtasks into the option learner."""
328        raise NotImplementedError

Feed newly created subtasks into the option learner.

@abstractmethod
def integrate_options(self) -> 'None':
330    @abstractmethod
331    def integrate_options(self) -> None:
332        """Export learned options into the option library."""
333        raise NotImplementedError

Export learned options into the option library.

@abstractmethod
def select_action( self, subjective_state: 'SubjectiveStateT', option_stop_threshold: 'float') -> 'tuple[ActionT, OptionId | None]':
335    @abstractmethod
336    def select_action(
337        self,
338        subjective_state: SubjectiveStateT,
339        option_stop_threshold: float,
340    ) -> tuple[ActionT, OptionId | None]:
341        """Choose a primitive action, possibly by continuing an active option.
342
343        Returns a `(primitive_action, active_option_id)` pair.  When no
344        option is active, *active_option_id* is `None`.
345        """
346        raise NotImplementedError

Choose a primitive action, possibly by continuing an active option.

Returns a (primitive_action, active_option_id) pair. When no option is active, active_option_id is None.

@abstractmethod
def clear_active_option(self) -> 'None':
348    @abstractmethod
349    def clear_active_option(self) -> None:
350        """Clear the currently executing option (e.g. at episode boundaries)."""
351        raise NotImplementedError

Clear the currently executing option (e.g. at episode boundaries).

@abstractmethod
def remove_options(self, option_ids: 'Sequence[OptionId]') -> 'None':
353    @abstractmethod
354    def remove_options(self, option_ids: Sequence[OptionId]) -> None:
355        """Remove options by ID (called during curation)."""
356        raise NotImplementedError

Remove options by ID (called during curation).

@abstractmethod
def remove_subtasks(self, subtask_ids: 'Sequence[SubtaskId]') -> 'None':
358    @abstractmethod
359    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
360        """Remove subtasks by ID (called during curation)."""
361        raise NotImplementedError

Remove subtasks by ID (called during curation).

@dataclass
class OaKAgent(typing.Generic[~ObservationT, ~ActionT, ~SubjectiveStateT, ~InfoT]):
 51@dataclass
 52class OaKAgent(Generic[ObservationT, ActionT, SubjectiveStateT, InfoT]):
 53    """Coordinates one full OaK step across the four modules.
 54
 55    The agent is a wiring object: you provide concrete implementations of
 56    `Perception`, `TransitionModel`, `ValueFunction`, and
 57    `ReactivePolicy`, and `OaKAgent` ensures they are called in a
 58    consistent order.
 59    """
 60
 61    perception: Perception[ObservationT, ActionT, SubjectiveStateT]
 62    transition_model: TransitionModel[SubjectiveStateT, ActionT, InfoT]
 63    value_function: ValueFunction[SubjectiveStateT, ActionT, InfoT]
 64    reactive_policy: ReactivePolicy[SubjectiveStateT, ActionT, InfoT]
 65
 66    planning_budget: int = 4
 67    feature_budget: int = 4
 68    option_stop_threshold: float = 0.5
 69
 70    last_action: ActionT | None = None
 71    last_subjective_state: SubjectiveStateT | None = None
 72
 73    def __init__(
 74        self,
 75        perception: Perception[ObservationT, ActionT, SubjectiveStateT],
 76        transition_model: TransitionModel[SubjectiveStateT, ActionT, InfoT],
 77        value_function: ValueFunction[SubjectiveStateT, ActionT, InfoT],
 78        reactive_policy: ReactivePolicy[SubjectiveStateT, ActionT, InfoT],
 79        planning_budget: int = 4,
 80        feature_budget: int = 4,
 81        option_stop_threshold: float = 0.5,
 82    ):
 83        self.perception = perception
 84        self.transition_model = transition_model
 85        self.value_function = value_function
 86        self.reactive_policy = reactive_policy
 87        self.planning_budget = planning_budget
 88        self.feature_budget = feature_budget
 89        self.option_stop_threshold = option_stop_threshold
 90        self.last_action = None
 91        self.last_subjective_state = None
 92        self.__post_init__()  # dataclass won't call this automatically when __init__ is user-defined
 93    def __post_init__(self):
 94        """Validate the agent's configuration parameters."""
 95        if self.planning_budget < 1:
 96            raise ValueError("Planning budget must be at least 1.")
 97        if self.feature_budget < 1:
 98            raise ValueError("Feature budget must be at least 1.")
 99        if self.option_stop_threshold < 0 or self.option_stop_threshold > 1:
100            raise ValueError("Option stop threshold must be in [0, 1].")
101
102    def reset(self) -> None:
103        """Clear transient execution memory."""
104        self.perception.reset()
105        self.reactive_policy.clear_active_option()
106        self.last_action = None
107        self.last_subjective_state = None
108
109    def step(
110        self, time_step: TimeStep[ObservationT, InfoT]
111    ) -> AgentStepResult[ActionT, SubjectiveStateT]:
112        """Run one temporally uniform agent step.
113
114        The step follows six phases: perceive, learn, grow, plan, act, maintain.
115        """
116
117        # ================== 1. Perceive ================= #
118        subjective_state = self.perception.update(
119            observation=time_step.observation,
120            reward=time_step.reward,
121            last_action=self.last_action,
122        )
123
124        created_subtasks: Sequence[SubtaskSpec] = ()
125        ranked_feature_ids: Sequence[FeatureId] = ()
126        planning_update: PlanningUpdate[ActionT] | None = None
127        curation_decision: CurationDecision | None = None
128
129        # ================== 2. Learn ================== #
130        if self.last_subjective_state is not None and self.last_action is not None:
131            transition = Transition(
132                subjective_state=self.last_subjective_state,
133                action=self.last_action,
134                reward=time_step.reward,
135                next_subjective_state=subjective_state,
136                terminated=time_step.terminated or time_step.truncated,
137                info=time_step.info,
138            )
139            td_errors = self.value_function.update(transition)
140            self.reactive_policy.update(transition, td_errors)
141            self.transition_model.update(transition)
142
143            # Meta step-size adaptation (Sutton's IDBD / online cross-validation)
144            meta_signals = dict(td_errors)
145            meta_signals["reward"] = transition.reward
146            self.perception.update_meta(meta_signals)
147            self.value_function.update_meta(meta_signals)
148            self.reactive_policy.update_meta(meta_signals)
149            self.transition_model.update_meta(meta_signals)
150
151        # ================== 3. Grow ================== #
152        ranked_feature_ids = self.perception.discover_and_rank_features(
153            subjective_state,
154            self.value_function.utility_scores(),
155            self.feature_budget,
156        )
157        if ranked_feature_ids:
158            created_subtasks = self.perception.generate_subtasks(ranked_feature_ids)
159            if created_subtasks:
160                self.reactive_policy.ingest_subtasks(created_subtasks)
161
162        self.reactive_policy.integrate_options()
163        self.transition_model.integrate_option_models()
164
165        # ================== 4. Plan ================== #
166        planning_update = self.transition_model.plan(
167            subjective_state, self.value_function, self.planning_budget
168        )
169        self.reactive_policy.apply_planning_update(planning_update)
170
171        # ================== 5. Act ================== #
172        action, active_option_id = self.reactive_policy.select_action(
173            subjective_state, self.option_stop_threshold
174        )
175
176        # ================== 6. Maintain ================== #
177        usage_records = self._build_usage_records(ranked_feature_ids, active_option_id)
178        if usage_records:
179            self.value_function.observe_usage(usage_records)
180
181        curation_decision = self.value_function.curate()
182        self._apply_curation(curation_decision)
183
184        # ================== Update Memory ================== #
185        self.last_subjective_state = subjective_state
186        self.last_action = action
187
188        if time_step.terminated or time_step.truncated:
189            self.reactive_policy.clear_active_option()
190
191        return AgentStepResult(
192            action=action,
193            subjective_state=subjective_state,
194            active_option_id=active_option_id,
195            planning_update=planning_update,
196            created_subtasks=created_subtasks,
197            curation_decision=curation_decision,
198        )
199
200    # ── training loop ─────────────────────────────────────────────────
201
202    def train(
203        self,
204        world: World[ObservationT, ActionT, InfoT],
205        *,  # arguments after this point are keyword-only, for clarity
206        num_episodes: int = 500,
207        average_window: int = 100,
208        solved_threshold: float | None = None,
209        episode_logger: Callable[[int, float, float, Self], None] | None = None,
210    ) -> list[float]:
211        """Run the standard OaK episode loop on the given world.
212
213        Parameters
214        ----------
215        world:
216            An environment implementing the `World` protocol.
217        num_episodes:
218            Maximum number of training episodes.
219        average_window:
220            Number of recent episodes to average for performance tracking.
221        solved_threshold:
222            If set, stop early when the average reward over the last `average_window`
223            episodes reaches this value.
224        episode_logger:
225            Optional callback `(episode, episode_reward, avg_reward, agent)`
226            called after each episode. Use this to own all per-episode logging
227            or other training-side effects at the call site.
228
229        Returns
230        -------
231        list[float]
232            Per-episode reward history.
233        """
234        if average_window < 1:
235            raise ValueError("average_window must be at least 1.")
236
237        reward_history: list[float] = []
238
239        for episode in range(num_episodes):
240            time_step = world.reset()
241            self.reset()
242            episode_reward = 0.0
243
244            while True:
245                result = self.step(time_step)
246
247                if time_step.terminated or time_step.truncated:
248                    break
249
250                time_step = world.step(result.action)
251                episode_reward += time_step.reward
252
253            reward_history.append(episode_reward)
254            recent_window = reward_history[-average_window:]
255            avg_reward = sum(recent_window) / len(recent_window)
256
257            solved = (
258                solved_threshold is not None
259                and len(reward_history) >= average_window
260                and avg_reward >= solved_threshold
261            )
262
263            if episode_logger is not None:
264                episode_logger(episode, episode_reward, avg_reward, self)
265
266            if solved:
267                break
268
269        return reward_history
270
271    # ── private helpers ──────────────────────────────────────────────
272
273    def _build_usage_records(
274        self,
275        ranked_feature_ids: Sequence[FeatureId],
276        active_option_id: OptionId | None,
277    ) -> Sequence[UsageRecord]:
278        """Build minimal utility-accounting observations for the current step."""
279        usage_records = [
280            UsageRecord(ComponentKind.FEATURE, feature_id)
281            for feature_id in ranked_feature_ids
282        ]
283        if active_option_id is not None:
284            usage_records.append(UsageRecord(ComponentKind.OPTION, active_option_id))
285        return tuple(usage_records)
286
287    def _apply_curation(self, decision: CurationDecision) -> None:
288        """Dispatch curation decisions to the relevant modules."""
289        if decision.drop_features:
290            self.perception.remove_features(decision.drop_features)
291        if decision.drop_subtasks:
292            self.reactive_policy.remove_subtasks(decision.drop_subtasks)
293        if decision.drop_options:
294            self.reactive_policy.remove_options(decision.drop_options)
295        if decision.drop_option_models:
296            self.transition_model.remove_option_models(decision.drop_option_models)
297        if decision.drop_general_value_functions:
298            self.value_function.remove(decision.drop_general_value_functions)

Coordinates one full OaK step across the four modules.

The agent is a wiring object: you provide concrete implementations of Perception, TransitionModel, ValueFunction, and ReactivePolicy, and OaKAgent ensures they are called in a consistent order.
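The "consistent order" guarantee can be seen with a call-order sketch: four stand-in modules record when they are invoked, and a miniature coordinator calls them in the same perceive, learn, plan, act sequence that `OaKAgent.step()` uses. `Recorder` and `mini_step` are illustrative names, not part of the package.

```python
class Recorder:
    """Records (phase, module) whenever the module is invoked."""
    def __init__(self, name, log):
        self.name, self.log = name, log
    def __call__(self, phase):
        self.log.append((phase, self.name))

def mini_step(perception, transition_model, value_function, reactive_policy):
    # Mirrors the module ordering inside OaKAgent.step():
    # perceive first, then the learn updates, then plan, then act.
    perception("perceive")
    value_function("learn")
    reactive_policy("learn")
    transition_model("learn")
    transition_model("plan")
    reactive_policy("act")

log = []
mini_step(Recorder("perception", log), Recorder("model", log),
          Recorder("values", log), Recorder("policy", log))
```

Swapping in real implementations preserves this ordering, which is the whole point of the wiring object: modules never need to know about each other.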

OaKAgent( perception: 'Perception[ObservationT, ActionT, SubjectiveStateT]', transition_model: 'TransitionModel[SubjectiveStateT, ActionT, InfoT]', value_function: 'ValueFunction[SubjectiveStateT, ActionT, InfoT]', reactive_policy: 'ReactivePolicy[SubjectiveStateT, ActionT, InfoT]', planning_budget: 'int' = 4, feature_budget: 'int' = 4, option_stop_threshold: 'float' = 0.5)
73    def __init__(
74        self,
75        perception: Perception[ObservationT, ActionT, SubjectiveStateT],
76        transition_model: TransitionModel[SubjectiveStateT, ActionT, InfoT],
77        value_function: ValueFunction[SubjectiveStateT, ActionT, InfoT],
78        reactive_policy: ReactivePolicy[SubjectiveStateT, ActionT, InfoT],
79        planning_budget: int = 4,
80        feature_budget: int = 4,
81        option_stop_threshold: float = 0.5,
82    ):
83        self.perception = perception
84        self.transition_model = transition_model
85        self.value_function = value_function
86        self.reactive_policy = reactive_policy
87        self.planning_budget = planning_budget
88        self.feature_budget = feature_budget
89        self.option_stop_threshold = option_stop_threshold
90        self.last_action = None
91        self.last_subjective_state = None
perception: 'Perception[ObservationT, ActionT, SubjectiveStateT]'
transition_model: 'TransitionModel[SubjectiveStateT, ActionT, InfoT]'
value_function: 'ValueFunction[SubjectiveStateT, ActionT, InfoT]'
reactive_policy: 'ReactivePolicy[SubjectiveStateT, ActionT, InfoT]'
planning_budget: 'int' = 4
feature_budget: 'int' = 4
option_stop_threshold: 'float' = 0.5
last_action: 'ActionT | None' = None
last_subjective_state: 'SubjectiveStateT | None' = None
def reset(self) -> 'None':
102    def reset(self) -> None:
103        """Clear transient execution memory."""
104        self.perception.reset()
105        self.reactive_policy.clear_active_option()
106        self.last_action = None
107        self.last_subjective_state = None

Clear transient execution memory.

def step( self, time_step: 'TimeStep[ObservationT, InfoT]') -> 'AgentStepResult[ActionT, SubjectiveStateT]':
109    def step(
110        self, time_step: TimeStep[ObservationT, InfoT]
111    ) -> AgentStepResult[ActionT, SubjectiveStateT]:
112        """Run one temporally uniform agent step.
113
114        The step follows six phases: perceive, learn, grow, plan, act, maintain.
115        """
116
117        # ================== 1. Perceive ================= #
118        subjective_state = self.perception.update(
119            observation=time_step.observation,
120            reward=time_step.reward,
121            last_action=self.last_action,
122        )
123
124        created_subtasks: Sequence[SubtaskSpec] = ()
125        ranked_feature_ids: Sequence[FeatureId] = ()
126        planning_update: PlanningUpdate[ActionT] | None = None
127        curation_decision: CurationDecision | None = None
128
129        # ================== 2. Learn ================== #
130        if self.last_subjective_state is not None and self.last_action is not None:
131            transition = Transition(
132                subjective_state=self.last_subjective_state,
133                action=self.last_action,
134                reward=time_step.reward,
135                next_subjective_state=subjective_state,
136                terminated=time_step.terminated or time_step.truncated,
137                info=time_step.info,
138            )
139            td_errors = self.value_function.update(transition)
140            self.reactive_policy.update(transition, td_errors)
141            self.transition_model.update(transition)
142
143            # Meta step-size adaptation (Sutton's IDBD / online cross-validation)
144            meta_signals = dict(td_errors)
145            meta_signals["reward"] = transition.reward
146            self.perception.update_meta(meta_signals)
147            self.value_function.update_meta(meta_signals)
148            self.reactive_policy.update_meta(meta_signals)
149            self.transition_model.update_meta(meta_signals)
150
151        # ================== 3. Grow ================== #
152        ranked_feature_ids = self.perception.discover_and_rank_features(
153            subjective_state,
154            self.value_function.utility_scores(),
155            self.feature_budget,
156        )
157        if ranked_feature_ids:
158            created_subtasks = self.perception.generate_subtasks(ranked_feature_ids)
159            if created_subtasks:
160                self.reactive_policy.ingest_subtasks(created_subtasks)
161
162        self.reactive_policy.integrate_options()
163        self.transition_model.integrate_option_models()
164
165        # ================== 4. Plan ================== #
166        planning_update = self.transition_model.plan(
167            subjective_state, self.value_function, self.planning_budget
168        )
169        self.reactive_policy.apply_planning_update(planning_update)
170
171        # ================== 5. Act ================== #
172        action, active_option_id = self.reactive_policy.select_action(
173            subjective_state, self.option_stop_threshold
174        )
175
176        # ================== 6. Maintain ================== #
177        usage_records = self._build_usage_records(ranked_feature_ids, active_option_id)
178        if usage_records:
179            self.value_function.observe_usage(usage_records)
180
181        curation_decision = self.value_function.curate()
182        self._apply_curation(curation_decision)
183
184        # ================== Update Memory ================== #
185        self.last_subjective_state = subjective_state
186        self.last_action = action
187
188        if time_step.terminated or time_step.truncated:
189            self.reactive_policy.clear_active_option()
190
191        return AgentStepResult(
192            action=action,
193            subjective_state=subjective_state,
194            active_option_id=active_option_id,
195            planning_update=planning_update,
196            created_subtasks=created_subtasks,
197            curation_decision=curation_decision,
198        )

Run one temporally uniform agent step.

The step follows six phases: perceive, learn, grow, plan, act, maintain.

def train( self, world: 'World[ObservationT, ActionT, InfoT]', *, num_episodes: 'int' = 500, average_window: 'int' = 100, solved_threshold: 'float | None' = None, episode_logger: 'Callable[[int, float, float, Self], None] | None' = None) -> 'list[float]':
202    def train(
203        self,
204        world: World[ObservationT, ActionT, InfoT],
205        *,  # arguments after this point are keyword-only, for clarity
206        num_episodes: int = 500,
207        average_window: int = 100,
208        solved_threshold: float | None = None,
209        episode_logger: Callable[[int, float, float, Self], None] | None = None,
210    ) -> list[float]:
211        """Run the standard OaK episode loop on the given world.
212
213        Parameters
214        ----------
215        world:
216            An environment implementing the `World` protocol.
217        num_episodes:
218            Maximum number of training episodes.
219        average_window:
220            Number of recent episodes to average for performance tracking.
221        solved_threshold:
222            If set, stop early when the average reward over the last `average_window`
223            episodes reaches this value.
224        episode_logger:
225            Optional callback `(episode, episode_reward, avg_reward, agent)`
226            called after each episode. Use this to own all per-episode logging
227            or other training-side effects at the call site.
228
229        Returns
230        -------
231        list[float]
232            Per-episode reward history.
233        """
234        if average_window < 1:
235            raise ValueError("average_window must be at least 1.")
236
237        reward_history: list[float] = []
238
239        for episode in range(num_episodes):
240            time_step = world.reset()
241            self.reset()
242            episode_reward = 0.0
243
244            while True:
245                result = self.step(time_step)
246
247                if time_step.terminated or time_step.truncated:
248                    break
249
250                time_step = world.step(result.action)
251                episode_reward += time_step.reward
252
253            reward_history.append(episode_reward)
254            recent_window = reward_history[-average_window:]
255            avg_reward = sum(recent_window) / len(recent_window)
256
257            solved = (
258                solved_threshold is not None
259                and len(reward_history) >= average_window
260                and avg_reward >= solved_threshold
261            )
262
263            if episode_logger is not None:
264                episode_logger(episode, episode_reward, avg_reward, self)
265
266            if solved:
267                break
268
269        return reward_history

Run the standard OaK episode loop on the given world.

Parameters

world: An environment implementing the World protocol.
num_episodes: Maximum number of training episodes.
average_window: Number of recent episodes to average for performance tracking.
solved_threshold: If set, stop early when the average reward over the last average_window episodes reaches this value.
episode_logger: Optional callback (episode, episode_reward, avg_reward, agent) called after each episode. Use this to own all per-episode logging or other training-side effects at the call site.

Returns

list[float]: Per-episode reward history.
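The `solved_threshold` / `average_window` stopping rule can be restated on its own: stop at the first episode where at least `average_window` episodes exist and their mean reward reaches the threshold. `solved_at` and the reward stream below are illustrative, not part of the package.

```python
def solved_at(rewards, average_window=3, solved_threshold=0.9):
    """Return the first episode index satisfying the early-stop rule, else None."""
    history = []
    for episode, r in enumerate(rewards):
        history.append(r)
        window = history[-average_window:]
        avg = sum(window) / len(window)
        # Both conditions must hold, exactly as in train():
        # enough episodes recorded AND the windowed average is high enough.
        if len(history) >= average_window and avg >= solved_threshold:
            return episode
    return None
```

For example, with the defaults, a stream `[0.0, 1.0, 1.0, 1.0, 1.0]` first satisfies the rule at episode 3, when the trailing window `[1.0, 1.0, 1.0]` averages 1.0.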

@runtime_checkable
class World(typing.Protocol[~ObservationT, ~ActionT, ~InfoT]):
59@runtime_checkable
60class World(Protocol[ObservationT, ActionT, InfoT]):
61    """Minimal environment protocol.
62
63    A `World` may wrap a simulator, a benchmark environment, or a custom
64    continual data source.  The protocol is intentionally small so the
65    package does not depend on a specific environment library.
66
67    Implement this protocol for any environment you want to use with
68    `OaKAgent.train()`.
69    """
70
71    def reset(self) -> TimeStep[ObservationT, InfoT]: ...
72
73    def step(self, action: ActionT) -> TimeStep[ObservationT, InfoT]: ...
74
75    def close(self) -> None:
76        """Release environment resources.  Default is a no-op."""
77        ...

Minimal environment protocol.

A World may wrap a simulator, a benchmark environment, or a custom continual data source. The protocol is intentionally small so the package does not depend on a specific environment library.

Implement this protocol for any environment you want to use with OaKAgent.train().
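A minimal `World`-style environment looks like this. `ToyTimeStep` is a local stand-in for the package's `TimeStep` (field names follow the documented shape), and `RandomWalkWorld` is a made-up example environment:

```python
from dataclasses import dataclass, field

@dataclass
class ToyTimeStep:
    observation: int
    reward: float
    terminated: bool
    truncated: bool = False
    info: dict = field(default_factory=dict)

class RandomWalkWorld:
    """1-D walk on [0, n]; reward 1.0 on reaching the right edge."""
    def __init__(self, n=4):
        self.n = n
        self.pos = 0

    def reset(self):
        self.pos = self.n // 2
        return ToyTimeStep(self.pos, 0.0, False)

    def step(self, action):
        # action: -1 (left) or +1 (right); clamp to the track.
        self.pos = max(0, min(self.n, self.pos + action))
        done = self.pos in (0, self.n)
        return ToyTimeStep(self.pos, 1.0 if self.pos == self.n else 0.0, done)

    def close(self):
        # Default no-op, as the protocol allows.
        pass
```

Because `World` is a `runtime_checkable` `Protocol`, any object with matching `reset`/`step`/`close` methods qualifies; no inheritance from the package is required.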

def reset(self) -> 'TimeStep[ObservationT, InfoT]':
71    def reset(self) -> TimeStep[ObservationT, InfoT]: ...
def step(self, action: 'ActionT') -> 'TimeStep[ObservationT, InfoT]':
73    def step(self, action: ActionT) -> TimeStep[ObservationT, InfoT]: ...
def close(self) -> 'None':
75    def close(self) -> None:
76        """Release environment resources.  Default is a no-op."""
77        ...

Release environment resources. Default is a no-op.

@dataclass(slots=True, frozen=True)
class AgentStepResult(typing.Generic[~ActionT, ~SubjectiveStateT]):
245@dataclass(slots=True, frozen=True)
246class AgentStepResult(Generic[ActionT, SubjectiveStateT]):
247    """Observable result of one OaK agent step.
248
249    This is the compact object a caller receives after stepping the agent. It
250    includes the primitive action actually executed, the current subjective
251    state, and any structures or planning signals created during that step.
252    """
253
254    action: ActionT
255    subjective_state: SubjectiveStateT
256    active_option_id: OptionId | None = None
257    planning_update: PlanningUpdate[ActionT] | None = None
258    created_subtasks: Sequence[SubtaskSpec] = field(default_factory=tuple)
259    curation_decision: CurationDecision | None = None

Observable result of one OaK agent step.

This is the compact object a caller receives after stepping the agent. It includes the primitive action actually executed, the current subjective state, and any structures or planning signals created during that step.

AgentStepResult( action: 'ActionT', subjective_state: 'SubjectiveStateT', active_option_id: 'OptionId | None' = None, planning_update: 'PlanningUpdate[ActionT] | None' = None, created_subtasks: 'Sequence[SubtaskSpec]' = <factory>, curation_decision: 'CurationDecision | None' = None)
action: 'ActionT'
subjective_state: 'SubjectiveStateT'
active_option_id: 'OptionId | None'
planning_update: 'PlanningUpdate[ActionT] | None'
created_subtasks: 'Sequence[SubtaskSpec]'
curation_decision: 'CurationDecision | None'
class ComponentKind(builtins.str, enum.Enum):
60class ComponentKind(str, Enum):
61    """Kinds of learnable or managed elements in the architecture."""
62
63    FEATURE = "feature"
64    SUBTASK = "subtask"
65    OPTION = "option"
66    VALUE_FUNCTION = "value_function"
67    OPTION_MODEL = "option_model"
68    TRANSITION_MODEL = "transition_model"
69    POLICY = "policy"
70    PERCEPTION = "perception"
71    PLANNER = "planner"

Kinds of learnable or managed elements in the architecture.

FEATURE = <ComponentKind.FEATURE: 'feature'>
SUBTASK = <ComponentKind.SUBTASK: 'subtask'>
OPTION = <ComponentKind.OPTION: 'option'>
VALUE_FUNCTION = <ComponentKind.VALUE_FUNCTION: 'value_function'>
OPTION_MODEL = <ComponentKind.OPTION_MODEL: 'option_model'>
TRANSITION_MODEL = <ComponentKind.TRANSITION_MODEL: 'transition_model'>
POLICY = <ComponentKind.POLICY: 'policy'>
PERCEPTION = <ComponentKind.PERCEPTION: 'perception'>
PLANNER = <ComponentKind.PLANNER: 'planner'>
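Because `ComponentKind` subclasses both `str` and `Enum`, its members compare equal to their string values and serialize cleanly. A local two-member replica (`ComponentKindDemo`, illustrative only) shows the effect:

```python
from enum import Enum

class ComponentKindDemo(str, Enum):
    # Replica of two ComponentKind members, for illustration.
    FEATURE = "feature"
    OPTION = "option"

# Members compare equal to plain strings...
assert ComponentKindDemo.FEATURE == "feature"
# ...and can be recovered from their string value.
assert ComponentKindDemo("option") is ComponentKindDemo.OPTION
```

This is convenient when component kinds appear in logs, JSON payloads, or `UsageRecord` entries keyed by plain strings.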
@dataclass(slots=True, frozen=True)
class CurationDecision:
231@dataclass(slots=True, frozen=True)
232class CurationDecision:
233    """Pruning decision returned by the curator."""
234
235    drop_features: Sequence[FeatureId] = field(default_factory=tuple)
236    drop_subtasks: Sequence[SubtaskId] = field(default_factory=tuple)
237    drop_options: Sequence[OptionId] = field(default_factory=tuple)
238    drop_option_models: Sequence[OptionId] = field(default_factory=tuple)
239    drop_general_value_functions: Sequence[GeneralValueFunctionId] = field(
240        default_factory=tuple
241    )
242    notes: StructuredPayload = field(default_factory=dict)

Pruning decision returned by the curator.

@dataclass(slots=True, frozen=True)
class FeatureCandidate:
    """A proposed feature that may be admitted into the feature bank."""

    feature_id: FeatureId
    name: str
    origin: str
    description: str = ""
    metadata: OpenPayload = field(default_factory=dict)

A proposed feature that may be admitted into the feature bank.

@dataclass(slots=True, frozen=True)
class FeatureSpec:
    """Metadata describing a feature tracked by the agent."""

    feature_id: FeatureId
    name: str
    description: str = ""
    metadata: OpenPayload = field(default_factory=dict)

Metadata describing a feature tracked by the agent.

@dataclass(slots=True, frozen=True)
class GeneralValueFunctionSpec(Generic[ActionT, SubjectiveStateT, InfoT]):
    """General value function specification."""

    general_value_function_id: GeneralValueFunctionId
    name: str
    cumulant: ScalarSignal
    continuation: ContinuationFn
    termination_value: TerminationValueFn
    metadata: OpenPayload = field(default_factory=dict)

General value function specification.

@dataclass(slots=True, frozen=True)
class ModelPrediction(Generic[SubjectiveStateT]):
    """Prediction returned by an action or option model."""

    predicted_subjective_state: SubjectiveStateT
    cumulative_reward: float
    steps: int | None = None
    terminated: bool = False
    metadata: OpenPayload = field(default_factory=dict)

Prediction returned by an action or option model.

@dataclass(slots=True, frozen=True)
class OptionDescriptor:
    """Lightweight metadata for an option."""

    option_id: OptionId
    name: str
    subtask_id: SubtaskId | None = None
    metadata: OpenPayload = field(default_factory=dict)

Lightweight metadata for an option.

@dataclass(slots=True, frozen=True)
class PlanningUpdate(Generic[ActionT]):
    """Outputs from one planning pass."""

    value_targets: Mapping[GeneralValueFunctionId, float] = field(default_factory=dict)
    policy_targets: StructuredPayload = field(default_factory=dict)
    search_statistics: StructuredPayload = field(default_factory=dict)

Outputs from one planning pass.

@dataclass(slots=True, frozen=True)
class PolicyDecision(Generic[ActionT]):
    """Return type for reactive policy selection."""

    action: ActionT | None = None
    option_id: OptionId | None = None
    metadata: OpenPayload = field(default_factory=dict)

    def __post_init__(self) -> None:
        has_action = self.action is not None
        has_option = self.option_id is not None
        if has_action == has_option:
            raise ValueError(
                "PolicyDecision requires exactly one of action or option_id."
            )

Return type for reactive policy selection.

@dataclass(slots=True, frozen=True)
class SubtaskSpec:
    """A feature-grounded subtask description."""

    subtask_id: SubtaskId
    name: str
    feature_id: FeatureId
    intensity: float = 1.0
    general_value_function_id: GeneralValueFunctionId | None = None
    metadata: OpenPayload = field(default_factory=dict)

A feature-grounded subtask description.

@dataclass(slots=True, frozen=True)
class TimeStep(Generic[ObservationT, InfoT]):
    """One environment emission seen by the agent.

    `TimeStep` is the object passed into `OaKAgent.step(...)`. It contains the
    raw observation, scalar reward, episode-control flags, and optional
    environment metadata.
    """

    observation: ObservationT
    reward: float
    terminated: bool = False
    truncated: bool = False
    info: InfoT | None = None

One environment emission seen by the agent.

TimeStep is the object passed into OaKAgent.step(...). It contains the raw observation, scalar reward, episode-control flags, and optional environment metadata.

@dataclass(slots=True, frozen=True)
class Transition(Generic[ActionT, SubjectiveStateT, InfoT]):
    """One subjective-state transition in agent terms.

    `Transition` is constructed by the agent after two consecutive time steps.
    Learners use it instead of the raw world stream so they can access both the
    previous and next subjective state representations together with reward,
    termination, and optional environment metadata.
    """

    subjective_state: SubjectiveStateT
    action: ActionT
    reward: float
    next_subjective_state: SubjectiveStateT
    terminated: bool = False
    info: InfoT | None = None

One subjective-state transition in agent terms.

Transition is constructed by the agent after two consecutive time steps. Learners use it instead of the raw world stream so they can access both the previous and next subjective state representations together with reward, termination, and optional environment metadata.

@dataclass(slots=True, frozen=True)
class UsageRecord:
    """Usage evidence gathered for utility assessment."""

    kind: ComponentKind
    component_id: ComponentId
    amount: float = 1.0
    metadata: OpenPayload = field(default_factory=dict)

Usage evidence gathered for utility assessment.

@dataclass(slots=True, frozen=True)
class UtilityRecord:
    """Utility score for one architectural element."""

    kind: ComponentKind
    component_id: ComponentId
    utility: float
    evidence: StructuredPayload = field(default_factory=dict)

Utility score for one architectural element.
