oak_architecture.fine_grained

Optional fine-grained OaK building blocks and composites.

The default public surface of OaK is the four main interfaces in oak_architecture.interfaces together with OaKAgent.

This subpackage exposes a more detailed assembly layer for projects that want to swap internal pieces such as a planner, world model, or feature constructor independently.

 1"""Optional fine-grained OaK building blocks and composites.
 2
 3The default public surface of OaK is the four main interfaces in
 4`oak_architecture.interfaces` together with `OaKAgent`.
 5
 6This subpackage exposes a more detailed assembly layer for projects that want
 7to swap internal pieces such as a planner, world model, or feature constructor
 8independently.
 9"""
10
11from .composites import (
12    CompositePerception,
13    CompositeReactivePolicy,
14    CompositeTransitionModel,
15    CompositeValueFunction,
16)
17from .components import (
18    ActionSelector,
19    Curator,
20    FeatureBank,
21    FeatureConstructor,
22    FeatureRanker,
23    GeneralValueFunctionLearner,
24    MetaStepSizeLearner,
25    Option,
26    OptionKeyboard,
27    OptionLearner,
28    OptionLibrary,
29    OptionModel,
30    OptionModelLearner,
31    Planner,
32    StateBuilder,
33    SubtaskGenerator,
34    UtilityAssessor,
35    ValueEstimator,
36    WorldModel,
37)
38
39__all__ = [
40    "CompositePerception",
41    "CompositeTransitionModel",
42    "CompositeValueFunction",
43    "CompositeReactivePolicy",
44    "ActionSelector",
45    "Curator",
46    "FeatureBank",
47    "FeatureConstructor",
48    "FeatureRanker",
49    "GeneralValueFunctionLearner",
50    "MetaStepSizeLearner",
51    "Option",
52    "OptionKeyboard",
53    "OptionLearner",
54    "OptionLibrary",
55    "OptionModel",
56    "OptionModelLearner",
57    "Planner",
58    "StateBuilder",
59    "SubtaskGenerator",
60    "UtilityAssessor",
61    "ValueEstimator",
62    "WorldModel",
63]
class CompositePerception(oak_architecture.interfaces.Perception[~ObservationT, ~ActionT, ~SubjectiveStateT], typing.Generic[~ObservationT, ~ActionT, ~SubjectiveStateT]):
 66class CompositePerception(
 67    Perception[ObservationT, ActionT, SubjectiveStateT],
 68    Generic[ObservationT, ActionT, SubjectiveStateT],
 69):
 70    """Perception built from fine-grained components.
 71
 72    Components: `StateBuilder`, `FeatureBank`, `FeatureConstructor`,
 73    `FeatureRanker`, `SubtaskGenerator`, and optionally
 74    `MetaStepSizeLearner`.
 75    """
 76
 77    def __init__(
 78        self,
 79        state_builder: StateBuilder[ObservationT, ActionT, SubjectiveStateT],
 80        feature_bank: FeatureBank[SubjectiveStateT],
 81        feature_constructor: FeatureConstructor[SubjectiveStateT],
 82        feature_ranker: FeatureRanker,
 83        subtask_generator: SubtaskGenerator[SubjectiveStateT],
 84        meta_step_sizes: MetaStepSizeLearner | None = None,
 85    ) -> None:
 86        self._state_builder = state_builder
 87        self._feature_bank = feature_bank
 88        self._feature_constructor = feature_constructor
 89        self._feature_ranker = feature_ranker
 90        self._subtask_generator = subtask_generator
 91        self._meta_step_sizes = meta_step_sizes
 92
 93    def reset(self) -> None:
 94        self._state_builder.reset()
 95
 96    def update(
 97        self,
 98        observation: ObservationT,
 99        reward: float,
100        last_action: ActionT | None,
101    ) -> SubjectiveStateT:
102        return self._state_builder.update(observation, reward, last_action)
103
104    def current_subjective_state(self) -> SubjectiveStateT:
105        return self._state_builder.current_subjective_state()
106
107    def discover_and_rank_features(
108        self,
109        subjective_state: SubjectiveStateT,
110        utility_scores: Sequence[UtilityRecord],
111        feature_budget: int,
112    ) -> Sequence[FeatureId]:
113        candidates = self._feature_constructor.propose(
114            subjective_state, self._feature_bank.list_features()
115        )
116        if candidates:
117            self._feature_bank.add_candidates(candidates)
118        return self._feature_ranker.rank(
119            self._feature_bank.list_features(), utility_scores, limit=feature_budget
120        )
121
122    def generate_subtasks(
123        self,
124        ranked_feature_ids: Sequence[FeatureId],
125    ) -> Sequence[SubtaskSpec]:
126        return self._subtask_generator.generate(ranked_feature_ids, self._feature_bank)
127
128    def list_features(self) -> Sequence[FeatureSpec]:
129        return self._feature_bank.list_features()
130
131    def remove_features(self, feature_ids: Sequence[FeatureId]) -> None:
132        self._feature_bank.remove(feature_ids)
133
134    def update_meta(self, error_signals: Mapping[str, float]) -> None:
135        if self._meta_step_sizes is not None:
136            self._meta_step_sizes.update(error_signals)

Perception built from fine-grained components.

Components: StateBuilder, FeatureBank, FeatureConstructor, FeatureRanker, SubtaskGenerator, and optionally MetaStepSizeLearner.

CompositePerception( state_builder: 'StateBuilder[ObservationT, ActionT, SubjectiveStateT]', feature_bank: 'FeatureBank[SubjectiveStateT]', feature_constructor: 'FeatureConstructor[SubjectiveStateT]', feature_ranker: 'FeatureRanker', subtask_generator: 'SubtaskGenerator[SubjectiveStateT]', meta_step_sizes: 'MetaStepSizeLearner | None' = None)
77    def __init__(
78        self,
79        state_builder: StateBuilder[ObservationT, ActionT, SubjectiveStateT],
80        feature_bank: FeatureBank[SubjectiveStateT],
81        feature_constructor: FeatureConstructor[SubjectiveStateT],
82        feature_ranker: FeatureRanker,
83        subtask_generator: SubtaskGenerator[SubjectiveStateT],
84        meta_step_sizes: MetaStepSizeLearner | None = None,
85    ) -> None:
86        self._state_builder = state_builder
87        self._feature_bank = feature_bank
88        self._feature_constructor = feature_constructor
89        self._feature_ranker = feature_ranker
90        self._subtask_generator = subtask_generator
91        self._meta_step_sizes = meta_step_sizes
def reset(self) -> 'None':
93    def reset(self) -> None:
94        self._state_builder.reset()

Reset all perception state for a new episode.

def update( self, observation: 'ObservationT', reward: 'float', last_action: 'ActionT | None') -> 'SubjectiveStateT':
 96    def update(
 97        self,
 98        observation: ObservationT,
 99        reward: float,
100        last_action: ActionT | None,
101    ) -> SubjectiveStateT:
102        return self._state_builder.update(observation, reward, last_action)

Process a new observation and return the updated subjective state.

def current_subjective_state(self) -> 'SubjectiveStateT':
104    def current_subjective_state(self) -> SubjectiveStateT:
105        return self._state_builder.current_subjective_state()

Return the most recently computed subjective state.

def discover_and_rank_features( self, subjective_state: 'SubjectiveStateT', utility_scores: 'Sequence[UtilityRecord]', feature_budget: 'int') -> 'Sequence[FeatureId]':
107    def discover_and_rank_features(
108        self,
109        subjective_state: SubjectiveStateT,
110        utility_scores: Sequence[UtilityRecord],
111        feature_budget: int,
112    ) -> Sequence[FeatureId]:
113        candidates = self._feature_constructor.propose(
114            subjective_state, self._feature_bank.list_features()
115        )
116        if candidates:
117            self._feature_bank.add_candidates(candidates)
118        return self._feature_ranker.rank(
119            self._feature_bank.list_features(), utility_scores, limit=feature_budget
120        )

Propose new features, integrate them, and return the top-ranked IDs.

A typical implementation:

  1. Proposes candidate features from the current subjective state.
  2. Adds accepted candidates to its internal feature store.
  3. Ranks all features using the provided utility scores.
  4. Returns the top feature IDs (up to feature_budget).
def generate_subtasks( self, ranked_feature_ids: 'Sequence[FeatureId]') -> 'Sequence[SubtaskSpec]':
122    def generate_subtasks(
123        self,
124        ranked_feature_ids: Sequence[FeatureId],
125    ) -> Sequence[SubtaskSpec]:
126        return self._subtask_generator.generate(ranked_feature_ids, self._feature_bank)

Turn ranked feature IDs into subtask specifications.

def list_features(self) -> 'Sequence[FeatureSpec]':
128    def list_features(self) -> Sequence[FeatureSpec]:
129        return self._feature_bank.list_features()

Return all currently tracked features.

def remove_features(self, feature_ids: 'Sequence[FeatureId]') -> 'None':
131    def remove_features(self, feature_ids: Sequence[FeatureId]) -> None:
132        self._feature_bank.remove(feature_ids)

Remove features by ID (called during curation).

def update_meta(self, error_signals: 'Mapping[str, float]') -> 'None':
134    def update_meta(self, error_signals: Mapping[str, float]) -> None:
135        if self._meta_step_sizes is not None:
136            self._meta_step_sizes.update(error_signals)

Adapt internal per-weight step sizes given error signals.

Parameters:

error_signals: Named scalar error signals from the current learning step, e.g. {"main_td_error": 0.05, "reward": 1.0}. Implementations pick the signals they need and ignore the rest.

class CompositeTransitionModel(oak_architecture.interfaces.TransitionModel[~SubjectiveStateT, ~ActionT, ~InfoT], typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
253class CompositeTransitionModel(
254    TransitionModel[SubjectiveStateT, ActionT, InfoT],
255    Generic[SubjectiveStateT, ActionT, InfoT],
256):
257    """TransitionModel built from fine-grained components.
258
259    Components: `WorldModel`, `OptionModelLearner`, `Planner`, and
260    optionally `MetaStepSizeLearner`.
261    """
262
263    def __init__(
264        self,
265        world_model: WorldModel[SubjectiveStateT, ActionT, InfoT],
266        option_model_learner: OptionModelLearner[SubjectiveStateT, ActionT, InfoT],
267        planner: Planner[SubjectiveStateT, ActionT, InfoT],
268        meta_step_sizes: MetaStepSizeLearner | None = None,
269    ) -> None:
270        self._world_model = world_model
271        self._option_model_learner = option_model_learner
272        self._planner = planner
273        self._meta_step_sizes = meta_step_sizes
274
275    def update(
276        self,
277        transition: Transition[ActionT, SubjectiveStateT, InfoT],
278    ) -> None:
279        self._world_model.update(transition)
280        self._option_model_learner.update(transition)
281
282    def integrate_option_models(self) -> None:
283        models = self._option_model_learner.export_models()
284        self._world_model.add_or_replace_option_models(models)
285
286    def plan(
287        self,
288        subjective_state: SubjectiveStateT,
289        value_function: ValueFunction[SubjectiveStateT, ActionT, InfoT],
290        budget: int,
291    ) -> PlanningUpdate[ActionT]:
292        adapter = _ValueEstimatorAdapter(value_function)
293        return self._planner.plan_step(
294            subjective_state, self._world_model, adapter, budget
295        )
296
297    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
298        self._world_model.remove_option_models(option_ids)
299
300    def update_meta(self, error_signals: Mapping[str, float]) -> None:
301        if self._meta_step_sizes is not None:
302            self._meta_step_sizes.update(error_signals)

TransitionModel built from fine-grained components.

Components: WorldModel, OptionModelLearner, Planner, and optionally MetaStepSizeLearner.

CompositeTransitionModel( world_model: 'WorldModel[SubjectiveStateT, ActionT, InfoT]', option_model_learner: 'OptionModelLearner[SubjectiveStateT, ActionT, InfoT]', planner: 'Planner[SubjectiveStateT, ActionT, InfoT]', meta_step_sizes: 'MetaStepSizeLearner | None' = None)
263    def __init__(
264        self,
265        world_model: WorldModel[SubjectiveStateT, ActionT, InfoT],
266        option_model_learner: OptionModelLearner[SubjectiveStateT, ActionT, InfoT],
267        planner: Planner[SubjectiveStateT, ActionT, InfoT],
268        meta_step_sizes: MetaStepSizeLearner | None = None,
269    ) -> None:
270        self._world_model = world_model
271        self._option_model_learner = option_model_learner
272        self._planner = planner
273        self._meta_step_sizes = meta_step_sizes
def update( self, transition: 'Transition[ActionT, SubjectiveStateT, InfoT]') -> 'None':
275    def update(
276        self,
277        transition: Transition[ActionT, SubjectiveStateT, InfoT],
278    ) -> None:
279        self._world_model.update(transition)
280        self._option_model_learner.update(transition)

Learn from an observed transition.

This should update both the world model and any option-model learners.

def integrate_option_models(self) -> 'None':
282    def integrate_option_models(self) -> None:
283        models = self._option_model_learner.export_models()
284        self._world_model.add_or_replace_option_models(models)

Export learned option models and integrate them into the world model.

Called after option learning so that planning reasons over fresh models.

def plan( self, subjective_state: 'SubjectiveStateT', value_function: 'ValueFunction[SubjectiveStateT, ActionT, InfoT]', budget: 'int') -> 'PlanningUpdate[ActionT]':
286    def plan(
287        self,
288        subjective_state: SubjectiveStateT,
289        value_function: ValueFunction[SubjectiveStateT, ActionT, InfoT],
290        budget: int,
291    ) -> PlanningUpdate[ActionT]:
292        adapter = _ValueEstimatorAdapter(value_function)
293        return self._planner.plan_step(
294            subjective_state, self._world_model, adapter, budget
295        )

Run bounded planning and return improvement signals.

The planner uses the internal world model together with the supplied value_function (for state evaluation) to produce value targets, policy targets, or search statistics.

def remove_option_models(self, option_ids: 'Sequence[OptionId]') -> 'None':
297    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
298        self._world_model.remove_option_models(option_ids)

Remove option models by ID (called during curation).

def update_meta(self, error_signals: 'Mapping[str, float]') -> 'None':
300    def update_meta(self, error_signals: Mapping[str, float]) -> None:
301        if self._meta_step_sizes is not None:
302            self._meta_step_sizes.update(error_signals)

Adapt internal per-weight step sizes given error signals.

Parameters:

error_signals: Named scalar error signals from the current learning step, e.g. {"main_td_error": 0.05, "reward": 1.0}. Implementations pick the signals they need and ignore the rest.

class CompositeValueFunction(oak_architecture.interfaces.ValueFunction[~SubjectiveStateT, ~ActionT, ~InfoT], typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
144class CompositeValueFunction(
145    ValueFunction[SubjectiveStateT, ActionT, InfoT],
146    Generic[SubjectiveStateT, ActionT, InfoT],
147):
148    """ValueFunction built from fine-grained components.
149
150    Components: `ValueEstimator`, `UtilityAssessor`, `Curator`,
151    and optionally `MetaStepSizeLearner`.
152    """
153
154    def __init__(
155        self,
156        value_estimator: ValueEstimator[SubjectiveStateT, ActionT, InfoT],
157        utility_assessor: UtilityAssessor,
158        curator: Curator,
159        meta_step_sizes: MetaStepSizeLearner | None = None,
160    ) -> None:
161        self._value_estimator = value_estimator
162        self._utility_assessor = utility_assessor
163        self._curator = curator
164        self._meta_step_sizes = meta_step_sizes
165
166    def update(
167        self,
168        transition: Transition[ActionT, SubjectiveStateT, InfoT],
169    ) -> Mapping[GeneralValueFunctionId, float]:
170        return self._value_estimator.update(transition)
171
172    def predict(
173        self,
174        subjective_state: SubjectiveStateT,
175    ) -> Mapping[GeneralValueFunctionId, float]:
176        return self._value_estimator.predict(subjective_state)
177
178    def observe_usage(self, usage_records: Sequence[UsageRecord]) -> None:
179        self._utility_assessor.observe(usage_records)
180
181    def utility_scores(self) -> Sequence[UtilityRecord]:
182        return self._utility_assessor.scores()
183
184    def curate(self) -> CurationDecision:
185        scores = self._utility_assessor.scores()
186        if not scores:
187            return CurationDecision()
188        return self._curator.curate(scores)
189
190    def remove(
191        self,
192        general_value_function_ids: Sequence[GeneralValueFunctionId],
193    ) -> None:
194        self._value_estimator.remove(general_value_function_ids)
195
196    def update_meta(self, error_signals: Mapping[str, float]) -> None:
197        if self._meta_step_sizes is not None:
198            self._meta_step_sizes.update(error_signals)

ValueFunction built from fine-grained components.

Components: ValueEstimator, UtilityAssessor, Curator, and optionally MetaStepSizeLearner.

CompositeValueFunction( value_estimator: 'ValueEstimator[SubjectiveStateT, ActionT, InfoT]', utility_assessor: 'UtilityAssessor', curator: 'Curator', meta_step_sizes: 'MetaStepSizeLearner | None' = None)
154    def __init__(
155        self,
156        value_estimator: ValueEstimator[SubjectiveStateT, ActionT, InfoT],
157        utility_assessor: UtilityAssessor,
158        curator: Curator,
159        meta_step_sizes: MetaStepSizeLearner | None = None,
160    ) -> None:
161        self._value_estimator = value_estimator
162        self._utility_assessor = utility_assessor
163        self._curator = curator
164        self._meta_step_sizes = meta_step_sizes
def update( self, transition: 'Transition[ActionT, SubjectiveStateT, InfoT]') -> 'Mapping[GeneralValueFunctionId, float]':
166    def update(
167        self,
168        transition: Transition[ActionT, SubjectiveStateT, InfoT],
169    ) -> Mapping[GeneralValueFunctionId, float]:
170        return self._value_estimator.update(transition)

Learn from a transition and return TD-error signals.

def predict( self, subjective_state: 'SubjectiveStateT') -> 'Mapping[GeneralValueFunctionId, float]':
172    def predict(
173        self,
174        subjective_state: SubjectiveStateT,
175    ) -> Mapping[GeneralValueFunctionId, float]:
176        return self._value_estimator.predict(subjective_state)

Predict values for the given subjective state.

def observe_usage(self, usage_records: 'Sequence[UsageRecord]') -> 'None':
178    def observe_usage(self, usage_records: Sequence[UsageRecord]) -> None:
179        self._utility_assessor.observe(usage_records)

Record usage evidence for utility assessment.

def utility_scores(self) -> 'Sequence[UtilityRecord]':
181    def utility_scores(self) -> Sequence[UtilityRecord]:
182        return self._utility_assessor.scores()

Return current utility estimates for all tracked structures.

def curate(self) -> 'CurationDecision':
184    def curate(self) -> CurationDecision:
185        scores = self._utility_assessor.scores()
186        if not scores:
187            return CurationDecision()
188        return self._curator.curate(scores)

Decide which learned structures to drop.

def remove( self, general_value_function_ids: 'Sequence[GeneralValueFunctionId]') -> 'None':
190    def remove(
191        self,
192        general_value_function_ids: Sequence[GeneralValueFunctionId],
193    ) -> None:
194        self._value_estimator.remove(general_value_function_ids)

Remove value functions by ID (called during curation).

def update_meta(self, error_signals: 'Mapping[str, float]') -> 'None':
196    def update_meta(self, error_signals: Mapping[str, float]) -> None:
197        if self._meta_step_sizes is not None:
198            self._meta_step_sizes.update(error_signals)

Adapt internal per-weight step sizes given error signals.

Parameters:

error_signals: Named scalar error signals from the current learning step, e.g. {"main_td_error": 0.05, "reward": 1.0}. Implementations pick the signals they need and ignore the rest.

class CompositeReactivePolicy(oak_architecture.interfaces.ReactivePolicy[~SubjectiveStateT, ~ActionT, ~InfoT], typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
310class CompositeReactivePolicy(
311    ReactivePolicy[SubjectiveStateT, ActionT, InfoT],
312    Generic[SubjectiveStateT, ActionT, InfoT],
313):
314    """ReactivePolicy built from fine-grained components.
315
316    Components: `ActionSelector`, `OptionLibrary`, `OptionLearner`,
317    and optionally `OptionKeyboard` and `MetaStepSizeLearner`.
318    """
319
320    def __init__(
321        self,
322        action_selector: ActionSelector[SubjectiveStateT, ActionT],
323        option_library: OptionLibrary[SubjectiveStateT, ActionT],
324        option_learner: OptionLearner[SubjectiveStateT, ActionT, InfoT],
325        option_keyboard: OptionKeyboard | None = None,
326        meta_step_sizes: MetaStepSizeLearner | None = None,
327    ) -> None:
328        self._action_selector = action_selector
329        self._option_library = option_library
330        self._option_learner = option_learner
331        self._option_keyboard = option_keyboard
332        self._meta_step_sizes = meta_step_sizes
333        self._active_option: Option[SubjectiveStateT, ActionT] | None = None
334
335    def update(
336        self,
337        transition: Transition[ActionT, SubjectiveStateT, InfoT],
338        td_errors: Mapping[GeneralValueFunctionId, float],
339    ) -> None:
340        self._action_selector.update_from_values(
341            transition.next_subjective_state, td_errors
342        )
343        self._option_learner.update(transition)
344
345    def apply_planning_update(self, update: PlanningUpdate[ActionT]) -> None:
346        self._action_selector.apply_planning_update(update)
347
348    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
349        self._option_learner.ingest_subtasks(subtasks)
350
351    def integrate_options(self) -> None:
352        for option in self._option_learner.export_options():
353            self._option_library.add_or_replace(option)
354
355    def select_action(
356        self,
357        subjective_state: SubjectiveStateT,
358        option_stop_threshold: float,
359    ) -> tuple[ActionT, OptionId | None]:
360        if self._active_option is not None:
361            stop_prob = self._active_option.stop_probability(subjective_state)
362            if stop_prob < option_stop_threshold:
363                return (
364                    self._active_option.act(subjective_state),
365                    self._active_option.descriptor.option_id,
366                )
367            self._active_option = None
368
369        decision = self._action_selector.decide(
370            subjective_state=subjective_state,
371            active_option=None,
372            available_options=self._option_library.list_options(),
373        )
374
375        # Option composition via the keyboard: the ActionSelector may
376        # place per-option intensities in metadata["option_intensities"]
377        # to request blended behaviour rather than a single option.
378        intensities = decision.metadata.get("option_intensities")
379        if intensities is not None and self._option_keyboard is not None:
380            descriptor = self._option_keyboard.compose(intensities)
381            self._active_option = self._option_library.get(descriptor.option_id)
382            return (
383                self._active_option.act(subjective_state),
384                descriptor.option_id,
385            )
386
387        if decision.option_id is not None:
388            self._active_option = self._option_library.get(decision.option_id)
389            return (
390                self._active_option.act(subjective_state),
391                self._active_option.descriptor.option_id,
392            )
393
394        if decision.action is None:
395            raise RuntimeError(
396                "ActionSelector returned neither a primitive action nor an option."
397            )
398
399        return decision.action, None
400
401    def clear_active_option(self) -> None:
402        self._active_option = None
403
404    def remove_options(self, option_ids: Sequence[OptionId]) -> None:
405        self._option_library.remove(option_ids)
406        if (
407            self._active_option is not None
408            and self._active_option.descriptor.option_id in option_ids
409        ):
410            self._active_option = None
411
412    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
413        self._option_learner.remove_subtasks(subtask_ids)
414
415    def update_meta(self, error_signals: Mapping[str, float]) -> None:
416        if self._meta_step_sizes is not None:
417            self._meta_step_sizes.update(error_signals)

ReactivePolicy built from fine-grained components.

Components: ActionSelector, OptionLibrary, OptionLearner, and optionally OptionKeyboard and MetaStepSizeLearner.

CompositeReactivePolicy( action_selector: 'ActionSelector[SubjectiveStateT, ActionT]', option_library: 'OptionLibrary[SubjectiveStateT, ActionT]', option_learner: 'OptionLearner[SubjectiveStateT, ActionT, InfoT]', option_keyboard: 'OptionKeyboard | None' = None, meta_step_sizes: 'MetaStepSizeLearner | None' = None)
320    def __init__(
321        self,
322        action_selector: ActionSelector[SubjectiveStateT, ActionT],
323        option_library: OptionLibrary[SubjectiveStateT, ActionT],
324        option_learner: OptionLearner[SubjectiveStateT, ActionT, InfoT],
325        option_keyboard: OptionKeyboard | None = None,
326        meta_step_sizes: MetaStepSizeLearner | None = None,
327    ) -> None:
328        self._action_selector = action_selector
329        self._option_library = option_library
330        self._option_learner = option_learner
331        self._option_keyboard = option_keyboard
332        self._meta_step_sizes = meta_step_sizes
333        self._active_option: Option[SubjectiveStateT, ActionT] | None = None
def update( self, transition: 'Transition[ActionT, SubjectiveStateT, InfoT]', td_errors: 'Mapping[GeneralValueFunctionId, float]') -> 'None':
335    def update(
336        self,
337        transition: Transition[ActionT, SubjectiveStateT, InfoT],
338        td_errors: Mapping[GeneralValueFunctionId, float],
339    ) -> None:
340        self._action_selector.update_from_values(
341            transition.next_subjective_state, td_errors
342        )
343        self._option_learner.update(transition)

Update the policy and option learners from an observed transition.

def apply_planning_update(self, update: 'PlanningUpdate[ActionT]') -> 'None':
345    def apply_planning_update(self, update: PlanningUpdate[ActionT]) -> None:
346        self._action_selector.apply_planning_update(update)

Integrate planning improvement signals into the policy.

def ingest_subtasks(self, subtasks: 'Sequence[SubtaskSpec]') -> 'None':
348    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
349        self._option_learner.ingest_subtasks(subtasks)

Feed newly created subtasks into the option learner.

def integrate_options(self) -> 'None':
351    def integrate_options(self) -> None:
352        for option in self._option_learner.export_options():
353            self._option_library.add_or_replace(option)

Export learned options into the option library.

def select_action( self, subjective_state: 'SubjectiveStateT', option_stop_threshold: 'float') -> 'tuple[ActionT, OptionId | None]':
355    def select_action(
356        self,
357        subjective_state: SubjectiveStateT,
358        option_stop_threshold: float,
359    ) -> tuple[ActionT, OptionId | None]:
360        if self._active_option is not None:
361            stop_prob = self._active_option.stop_probability(subjective_state)
362            if stop_prob < option_stop_threshold:
363                return (
364                    self._active_option.act(subjective_state),
365                    self._active_option.descriptor.option_id,
366                )
367            self._active_option = None
368
369        decision = self._action_selector.decide(
370            subjective_state=subjective_state,
371            active_option=None,
372            available_options=self._option_library.list_options(),
373        )
374
375        # Option composition via the keyboard: the ActionSelector may
376        # place per-option intensities in metadata["option_intensities"]
377        # to request blended behaviour rather than a single option.
378        intensities = decision.metadata.get("option_intensities")
379        if intensities is not None and self._option_keyboard is not None:
380            descriptor = self._option_keyboard.compose(intensities)
381            self._active_option = self._option_library.get(descriptor.option_id)
382            return (
383                self._active_option.act(subjective_state),
384                descriptor.option_id,
385            )
386
387        if decision.option_id is not None:
388            self._active_option = self._option_library.get(decision.option_id)
389            return (
390                self._active_option.act(subjective_state),
391                self._active_option.descriptor.option_id,
392            )
393
394        if decision.action is None:
395            raise RuntimeError(
396                "ActionSelector returned neither a primitive action nor an option."
397            )
398
399        return decision.action, None

Choose a primitive action, possibly by continuing an active option.

Returns a (primitive_action, active_option_id) pair. When no option is active, active_option_id is None.

def clear_active_option(self) -> 'None':
401    def clear_active_option(self) -> None:
402        self._active_option = None

Clear the currently executing option (e.g. at episode boundaries).

def remove_options(self, option_ids: 'Sequence[OptionId]') -> 'None':
404    def remove_options(self, option_ids: Sequence[OptionId]) -> None:
405        self._option_library.remove(option_ids)
406        if (
407            self._active_option is not None
408            and self._active_option.descriptor.option_id in option_ids
409        ):
410            self._active_option = None

Remove options by ID (called during curation).

def remove_subtasks(self, subtask_ids: 'Sequence[SubtaskId]') -> 'None':
412    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
413        self._option_learner.remove_subtasks(subtask_ids)

Remove subtasks by ID (called during curation).

def update_meta(self, error_signals: 'Mapping[str, float]') -> 'None':
415    def update_meta(self, error_signals: Mapping[str, float]) -> None:
416        if self._meta_step_sizes is not None:
417            self._meta_step_sizes.update(error_signals)

Adapt internal per-weight step sizes given error signals.

Parameters:

error_signals: Named scalar error signals from the current learning step, e.g. {"main_td_error": 0.05, "reward": 1.0}. Implementations pick the signals they need and ignore the rest.

class ActionSelector(abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT]):
444class ActionSelector(ABC, Generic[SubjectiveStateT, ActionT]):
445    """Chooses primitive actions or options from the current subjective state.
446
447    This is the foreground action-selection mechanism.  It may be as small
448    as a hand-written policy for a toy domain or as complex as a learned
449    policy head over a rich subjective state representation.
450    """
451
452    @abstractmethod
453    def decide(
454        self,
455        subjective_state: SubjectiveStateT,
456        active_option: Option[SubjectiveStateT, ActionT] | None,
457        available_options: Sequence[Option[SubjectiveStateT, ActionT]],
458    ) -> "PolicyDecision[ActionT]":
459        raise NotImplementedError
460
461    @abstractmethod
462    def update_from_values(
463        self,
464        subjective_state: SubjectiveStateT,
465        td_errors: Mapping[GeneralValueFunctionId, float],
466    ) -> None:
467        raise NotImplementedError
468
469    @abstractmethod
470    def apply_planning_update(self, update: PlanningUpdate[ActionT]) -> None:
471        raise NotImplementedError

Chooses primitive actions or options from the current subjective state.

This is the foreground action-selection mechanism. It may be as small as a hand-written policy for a toy domain or as complex as a learned policy head over a rich subjective state representation.

@abstractmethod
def decide( self, subjective_state: 'SubjectiveStateT', active_option: 'Option[SubjectiveStateT, ActionT] | None', available_options: 'Sequence[Option[SubjectiveStateT, ActionT]]') -> 'PolicyDecision[ActionT]':
452    @abstractmethod
453    def decide(
454        self,
455        subjective_state: SubjectiveStateT,
456        active_option: Option[SubjectiveStateT, ActionT] | None,
457        available_options: Sequence[Option[SubjectiveStateT, ActionT]],
458    ) -> "PolicyDecision[ActionT]":
459        raise NotImplementedError
@abstractmethod
def update_from_values( self, subjective_state: 'SubjectiveStateT', td_errors: 'Mapping[GeneralValueFunctionId, float]') -> 'None':
461    @abstractmethod
462    def update_from_values(
463        self,
464        subjective_state: SubjectiveStateT,
465        td_errors: Mapping[GeneralValueFunctionId, float],
466    ) -> None:
467        raise NotImplementedError
@abstractmethod
def apply_planning_update(self, update: 'PlanningUpdate[ActionT]') -> 'None':
469    @abstractmethod
470    def apply_planning_update(self, update: PlanningUpdate[ActionT]) -> None:
471        raise NotImplementedError
class Curator(abc.ABC):
346class Curator(ABC):
347    """Prunes low-utility architectural elements."""
348
349    @abstractmethod
350    def curate(self, utilities: Sequence[UtilityRecord]) -> CurationDecision:
351        raise NotImplementedError

Prunes low-utility architectural elements.

@abstractmethod
def curate(self, utilities: 'Sequence[UtilityRecord]') -> 'CurationDecision':
349    @abstractmethod
350    def curate(self, utilities: Sequence[UtilityRecord]) -> CurationDecision:
351        raise NotImplementedError
class FeatureBank(abc.ABC, typing.Generic[~SubjectiveStateT]):
100class FeatureBank(ABC, Generic[SubjectiveStateT]):
101    """Stores currently active features and their activations."""
102
103    @abstractmethod
104    def list_features(self) -> Sequence[FeatureSpec]:
105        raise NotImplementedError
106
107    @abstractmethod
108    def activations(
109        self,
110        subjective_state: SubjectiveStateT,
111    ) -> Mapping[FeatureId, float]:
112        """Return per-feature activation values for the given state.
113
114        Intended for `SubtaskGenerator` implementations, which receive
115        the `FeatureBank` and may use activations to decide which
116        features warrant new subtasks.
117        """
118        raise NotImplementedError
119
120    @abstractmethod
121    def add_candidates(
122        self, candidates: Sequence[FeatureCandidate]
123    ) -> Sequence[FeatureSpec]:
124        raise NotImplementedError
125
126    @abstractmethod
127    def remove(self, feature_ids: Sequence[FeatureId]) -> None:
128        raise NotImplementedError

Stores currently active features and their activations.

@abstractmethod
def list_features(self) -> 'Sequence[FeatureSpec]':
103    @abstractmethod
104    def list_features(self) -> Sequence[FeatureSpec]:
105        raise NotImplementedError
@abstractmethod
def activations( self, subjective_state: 'SubjectiveStateT') -> 'Mapping[FeatureId, float]':
107    @abstractmethod
108    def activations(
109        self,
110        subjective_state: SubjectiveStateT,
111    ) -> Mapping[FeatureId, float]:
112        """Return per-feature activation values for the given state.
113
114        Intended for `SubtaskGenerator` implementations, which receive
115        the `FeatureBank` and may use activations to decide which
116        features warrant new subtasks.
117        """
118        raise NotImplementedError

Return per-feature activation values for the given state.

Intended for SubtaskGenerator implementations, which receive the FeatureBank and may use activations to decide which features warrant new subtasks.

@abstractmethod
def add_candidates( self, candidates: 'Sequence[FeatureCandidate]') -> 'Sequence[FeatureSpec]':
120    @abstractmethod
121    def add_candidates(
122        self, candidates: Sequence[FeatureCandidate]
123    ) -> Sequence[FeatureSpec]:
124        raise NotImplementedError
@abstractmethod
def remove(self, feature_ids: 'Sequence[FeatureId]') -> 'None':
126    @abstractmethod
127    def remove(self, feature_ids: Sequence[FeatureId]) -> None:
128        raise NotImplementedError
class FeatureConstructor(abc.ABC, typing.Generic[~SubjectiveStateT]):
131class FeatureConstructor(ABC, Generic[SubjectiveStateT]):
132    """Proposes new candidate features."""
133
134    @abstractmethod
135    def propose(
136        self,
137        subjective_state: SubjectiveStateT,
138        active_features: Sequence[FeatureSpec],
139    ) -> Sequence[FeatureCandidate]:
140        raise NotImplementedError

Proposes new candidate features.

@abstractmethod
def propose( self, subjective_state: 'SubjectiveStateT', active_features: 'Sequence[FeatureSpec]') -> 'Sequence[FeatureCandidate]':
134    @abstractmethod
135    def propose(
136        self,
137        subjective_state: SubjectiveStateT,
138        active_features: Sequence[FeatureSpec],
139    ) -> Sequence[FeatureCandidate]:
140        raise NotImplementedError
class FeatureRanker(abc.ABC):
143class FeatureRanker(ABC):
144    """Ranks features for downstream use."""
145
146    @abstractmethod
147    def rank(
148        self,
149        features: Sequence[FeatureSpec],
150        utilities: Sequence[UtilityRecord],
151        limit: int | None = None,
152    ) -> Sequence[FeatureId]:
153        raise NotImplementedError

Ranks features for downstream use.

@abstractmethod
def rank( self, features: 'Sequence[FeatureSpec]', utilities: 'Sequence[UtilityRecord]', limit: 'int | None' = None) -> 'Sequence[FeatureId]':
146    @abstractmethod
147    def rank(
148        self,
149        features: Sequence[FeatureSpec],
150        utilities: Sequence[UtilityRecord],
151        limit: int | None = None,
152    ) -> Sequence[FeatureId]:
153        raise NotImplementedError
class GeneralValueFunctionLearner(abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
264class GeneralValueFunctionLearner(ABC, Generic[SubjectiveStateT, ActionT, InfoT]):
265    """Learns one General Value Function online."""
266
267    @property
268    @abstractmethod
269    def spec(self) -> GeneralValueFunctionSpec[ActionT, SubjectiveStateT, InfoT]:
270        raise NotImplementedError
271
272    @abstractmethod
273    def predict(
274        self,
275        subjective_state: SubjectiveStateT,
276        action: ActionT | None = None,
277    ) -> float:
278        raise NotImplementedError
279
280    @abstractmethod
281    def update(self, transition: Transition[ActionT, SubjectiveStateT, InfoT]) -> float:
282        raise NotImplementedError

Learns one General Value Function online.

spec: 'GeneralValueFunctionSpec[ActionT, SubjectiveStateT, InfoT]'
267    @property
268    @abstractmethod
269    def spec(self) -> GeneralValueFunctionSpec[ActionT, SubjectiveStateT, InfoT]:
270        raise NotImplementedError
@abstractmethod
def predict( self, subjective_state: 'SubjectiveStateT', action: 'ActionT | None' = None) -> 'float':
272    @abstractmethod
273    def predict(
274        self,
275        subjective_state: SubjectiveStateT,
276        action: ActionT | None = None,
277    ) -> float:
278        raise NotImplementedError
@abstractmethod
def update( self, transition: 'Transition[ActionT, SubjectiveStateT, InfoT]') -> 'float':
280    @abstractmethod
281    def update(self, transition: Transition[ActionT, SubjectiveStateT, InfoT]) -> float:
282        raise NotImplementedError
class MetaStepSizeLearner(abc.ABC):
354class MetaStepSizeLearner(ABC):
355    """Adapts per-weight step sizes using meta-gradient methods.
356
357    Implementations may use IDBD (Sutton 1992), Adam-IDBD
358    (Degris et al. 2024), or other online cross-validation algorithms.
359    Each learned weight in the target module gets a dedicated step-size
360    parameter adapted by this learner.
361
362    The agent loop passes error signals (TD errors, reward, etc.) to
363    each module's `update_meta()`; composite implementations delegate
364    to this learner.
365    """
366
367    @abstractmethod
368    def update(self, error_signals: Mapping[str, float]) -> None:
369        """Receive error signals and adapt per-weight step sizes."""
370        raise NotImplementedError

Adapts per-weight step sizes using meta-gradient methods.

Implementations may use IDBD (Sutton 1992), Adam-IDBD (Degris et al. 2024), or other online cross-validation algorithms. Each learned weight in the target module gets a dedicated step-size parameter adapted by this learner.

The agent loop passes error signals (TD errors, reward, etc.) to each module's update_meta(); composite implementations delegate to this learner.

@abstractmethod
def update(self, error_signals: 'Mapping[str, float]') -> 'None':
367    @abstractmethod
368    def update(self, error_signals: Mapping[str, float]) -> None:
369        """Receive error signals and adapt per-weight step sizes."""
370        raise NotImplementedError

Receive error signals and adapt per-weight step sizes.

class Option(abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT]):
378class Option(ABC, Generic[SubjectiveStateT, ActionT]):
379    """Temporal abstraction consisting of a policy and termination condition."""
380
381    @property
382    @abstractmethod
383    def descriptor(self) -> OptionDescriptor:
384        raise NotImplementedError
385
386    @abstractmethod
387    def is_available(self, subjective_state: SubjectiveStateT) -> bool:
388        """Whether this option can be initiated in the given state.
389
390        Intended for `ActionSelector` implementations, which receive
391        available options and may filter by initiation conditions.
392        """
393        raise NotImplementedError
394
395    @abstractmethod
396    def act(self, subjective_state: SubjectiveStateT) -> ActionT:
397        raise NotImplementedError
398
399    @abstractmethod
400    def stop_probability(self, subjective_state: SubjectiveStateT) -> float:
401        raise NotImplementedError

Temporal abstraction consisting of a policy and termination condition.

descriptor: 'OptionDescriptor'
381    @property
382    @abstractmethod
383    def descriptor(self) -> OptionDescriptor:
384        raise NotImplementedError
@abstractmethod
def is_available(self, subjective_state: 'SubjectiveStateT') -> 'bool':
386    @abstractmethod
387    def is_available(self, subjective_state: SubjectiveStateT) -> bool:
388        """Whether this option can be initiated in the given state.
389
390        Intended for `ActionSelector` implementations, which receive
391        available options and may filter by initiation conditions.
392        """
393        raise NotImplementedError

Whether this option can be initiated in the given state.

Intended for ActionSelector implementations, which receive available options and may filter by initiation conditions.

@abstractmethod
def act(self, subjective_state: 'SubjectiveStateT') -> 'ActionT':
395    @abstractmethod
396    def act(self, subjective_state: SubjectiveStateT) -> ActionT:
397        raise NotImplementedError
@abstractmethod
def stop_probability(self, subjective_state: 'SubjectiveStateT') -> 'float':
399    @abstractmethod
400    def stop_probability(self, subjective_state: SubjectiveStateT) -> float:
401        raise NotImplementedError
class OptionKeyboard(abc.ABC):
474class OptionKeyboard(ABC):
475    """Composes multiple options into a single blended behavior.
476
477    Named after Sutton's analogy: each option is a key on a keyboard,
478    and playing a "chord" (setting per-option intensities) produces a
479    composed temporal abstraction.  The `ActionSelector` determines
480    the intensities, then the keyboard produces a new option descriptor
481    representing the blended behavior.
482
483    Used by `CompositeReactivePolicy` when an `ActionSelector`
484    returns a `PolicyDecision` with `option_intensities` set.
485    """
486
487    @abstractmethod
488    def compose(self, intensities: Sequence[float]) -> OptionDescriptor:
489        """Blend options according to *intensities* and return the result."""
490        raise NotImplementedError

Composes multiple options into a single blended behavior.

Named after Sutton's analogy: each option is a key on a keyboard, and playing a "chord" (setting per-option intensities) produces a composed temporal abstraction. The ActionSelector determines the intensities, then the keyboard produces a new option descriptor representing the blended behavior.

Used by CompositeReactivePolicy when an ActionSelector returns a PolicyDecision with option_intensities set.

@abstractmethod
def compose(self, intensities: 'Sequence[float]') -> 'OptionDescriptor':
487    @abstractmethod
488    def compose(self, intensities: Sequence[float]) -> OptionDescriptor:
489        """Blend options according to *intensities* and return the result."""
490        raise NotImplementedError

Blend options according to intensities and return the result.

class OptionLearner(abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
424class OptionLearner(ABC, Generic[SubjectiveStateT, ActionT, InfoT]):
425    """Learns options from subtasks and experience."""
426
427    @abstractmethod
428    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
429        raise NotImplementedError
430
431    @abstractmethod
432    def update(self, transition: Transition[ActionT, SubjectiveStateT, InfoT]) -> None:
433        raise NotImplementedError
434
435    @abstractmethod
436    def export_options(self) -> Sequence[Option[SubjectiveStateT, ActionT]]:
437        raise NotImplementedError
438
439    @abstractmethod
440    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
441        raise NotImplementedError

Learns options from subtasks and experience.

@abstractmethod
def ingest_subtasks(self, subtasks: 'Sequence[SubtaskSpec]') -> 'None':
427    @abstractmethod
428    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
429        raise NotImplementedError
@abstractmethod
def update( self, transition: 'Transition[ActionT, SubjectiveStateT, InfoT]') -> 'None':
431    @abstractmethod
432    def update(self, transition: Transition[ActionT, SubjectiveStateT, InfoT]) -> None:
433        raise NotImplementedError
@abstractmethod
def export_options(self) -> 'Sequence[Option[SubjectiveStateT, ActionT]]':
435    @abstractmethod
436    def export_options(self) -> Sequence[Option[SubjectiveStateT, ActionT]]:
437        raise NotImplementedError
@abstractmethod
def remove_subtasks(self, subtask_ids: 'Sequence[SubtaskId]') -> 'None':
439    @abstractmethod
440    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
441        raise NotImplementedError
class OptionLibrary(abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT]):
404class OptionLibrary(ABC, Generic[SubjectiveStateT, ActionT]):
405    """Stores learned options."""
406
407    @abstractmethod
408    def list_options(self) -> Sequence[Option[SubjectiveStateT, ActionT]]:
409        raise NotImplementedError
410
411    @abstractmethod
412    def get(self, option_id: OptionId) -> Option[SubjectiveStateT, ActionT]:
413        raise NotImplementedError
414
415    @abstractmethod
416    def add_or_replace(self, option: Option[SubjectiveStateT, ActionT]) -> None:
417        raise NotImplementedError
418
419    @abstractmethod
420    def remove(self, option_ids: Sequence[OptionId]) -> None:
421        raise NotImplementedError

Stores learned options.

@abstractmethod
def list_options(self) -> 'Sequence[Option[SubjectiveStateT, ActionT]]':
407    @abstractmethod
408    def list_options(self) -> Sequence[Option[SubjectiveStateT, ActionT]]:
409        raise NotImplementedError
@abstractmethod
def get(self, option_id: 'OptionId') -> 'Option[SubjectiveStateT, ActionT]':
411    @abstractmethod
412    def get(self, option_id: OptionId) -> Option[SubjectiveStateT, ActionT]:
413        raise NotImplementedError
@abstractmethod
def add_or_replace(self, option: 'Option[SubjectiveStateT, ActionT]') -> 'None':
415    @abstractmethod
416    def add_or_replace(self, option: Option[SubjectiveStateT, ActionT]) -> None:
417        raise NotImplementedError
@abstractmethod
def remove(self, option_ids: 'Sequence[OptionId]') -> 'None':
419    @abstractmethod
420    def remove(self, option_ids: Sequence[OptionId]) -> None:
421        raise NotImplementedError
class OptionModel(abc.ABC, typing.Generic[~SubjectiveStateT]):
212class OptionModel(ABC, Generic[SubjectiveStateT]):
213    """Predictive model for one option."""
214
215    @property
216    @abstractmethod
217    def option_id(self) -> OptionId:
218        raise NotImplementedError
219
220    @abstractmethod
221    def predict(
222        self,
223        subjective_state: SubjectiveStateT,
224    ) -> ModelPrediction[SubjectiveStateT]:
225        raise NotImplementedError

Predictive model for one option.

option_id: 'OptionId'
215    @property
216    @abstractmethod
217    def option_id(self) -> OptionId:
218        raise NotImplementedError
@abstractmethod
def predict( self, subjective_state: 'SubjectiveStateT') -> 'ModelPrediction[SubjectiveStateT]':
220    @abstractmethod
221    def predict(
222        self,
223        subjective_state: SubjectiveStateT,
224    ) -> ModelPrediction[SubjectiveStateT]:
225        raise NotImplementedError
class OptionModelLearner(abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
228class OptionModelLearner(ABC, Generic[SubjectiveStateT, ActionT, InfoT]):
229    """Learns option models from experience."""
230
231    @abstractmethod
232    def update(self, transition: Transition[ActionT, SubjectiveStateT, InfoT]) -> None:
233        raise NotImplementedError
234
235    @abstractmethod
236    def export_models(self) -> Sequence[OptionModel[SubjectiveStateT]]:
237        raise NotImplementedError

Learns option models from experience.

@abstractmethod
def update( self, transition: 'Transition[ActionT, SubjectiveStateT, InfoT]') -> 'None':
231    @abstractmethod
232    def update(self, transition: Transition[ActionT, SubjectiveStateT, InfoT]) -> None:
233        raise NotImplementedError
@abstractmethod
def export_models(self) -> 'Sequence[OptionModel[SubjectiveStateT]]':
235    @abstractmethod
236    def export_models(self) -> Sequence[OptionModel[SubjectiveStateT]]:
237        raise NotImplementedError
class Planner(abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
240class Planner(ABC, Generic[SubjectiveStateT, ActionT, InfoT]):
241    """Produces planning updates from the world model.
242
243    The planner does not directly act in the world.  Instead it returns
244    improvement signals, targets, or search statistics that the reactive
245    policy and value learners can use.
246    """
247
248    @abstractmethod
249    def plan_step(
250        self,
251        subjective_state: SubjectiveStateT,
252        model: WorldModel[SubjectiveStateT, ActionT, InfoT],
253        value_function: ValueEstimator[SubjectiveStateT, ActionT, InfoT],
254        budget: int,
255    ) -> PlanningUpdate[ActionT]:
256        raise NotImplementedError

Produces planning updates from the world model.

The planner does not directly act in the world. Instead it returns improvement signals, targets, or search statistics that the reactive policy and value learners can use.

@abstractmethod
def plan_step( self, subjective_state: 'SubjectiveStateT', model: 'WorldModel[SubjectiveStateT, ActionT, InfoT]', value_function: 'ValueEstimator[SubjectiveStateT, ActionT, InfoT]', budget: 'int') -> 'PlanningUpdate[ActionT]':
248    @abstractmethod
249    def plan_step(
250        self,
251        subjective_state: SubjectiveStateT,
252        model: WorldModel[SubjectiveStateT, ActionT, InfoT],
253        value_function: ValueEstimator[SubjectiveStateT, ActionT, InfoT],
254        budget: int,
255    ) -> PlanningUpdate[ActionT]:
256        raise NotImplementedError
class StateBuilder(abc.ABC, typing.Generic[~ObservationT, ~ActionT, ~SubjectiveStateT]):
74class StateBuilder(ABC, Generic[ObservationT, ActionT, SubjectiveStateT]):
75    """Builds and updates the subjective state seen by every other component.
76
77    This is where an implementation decides what *subjective_state* means.
78    For a simple domain it may be a hand-built summary; for a more ambitious
79    project it may be the output of a learned encoder or recurrent memory.
80    """
81
82    @abstractmethod
83    def reset(self) -> None:
84        raise NotImplementedError
85
86    @abstractmethod
87    def update(
88        self,
89        observation: ObservationT,
90        reward: float,
91        last_action: ActionT | None,
92    ) -> SubjectiveStateT:
93        raise NotImplementedError
94
95    @abstractmethod
96    def current_subjective_state(self) -> SubjectiveStateT:
97        raise NotImplementedError

Builds and updates the subjective state seen by every other component.

This is where an implementation decides what subjective_state means. For a simple domain it may be a hand-built summary; for a more ambitious project it may be the output of a learned encoder or recurrent memory.

@abstractmethod
def reset(self) -> 'None':
82    @abstractmethod
83    def reset(self) -> None:
84        raise NotImplementedError
@abstractmethod
def update( self, observation: 'ObservationT', reward: 'float', last_action: 'ActionT | None') -> 'SubjectiveStateT':
86    @abstractmethod
87    def update(
88        self,
89        observation: ObservationT,
90        reward: float,
91        last_action: ActionT | None,
92    ) -> SubjectiveStateT:
93        raise NotImplementedError
@abstractmethod
def current_subjective_state(self) -> 'SubjectiveStateT':
95    @abstractmethod
96    def current_subjective_state(self) -> SubjectiveStateT:
97        raise NotImplementedError
class SubtaskGenerator(abc.ABC, typing.Generic[~SubjectiveStateT]):
156class SubtaskGenerator(ABC, Generic[SubjectiveStateT]):
157    """Maps ranked features to subtasks."""
158
159    @abstractmethod
160    def generate(
161        self,
162        ranked_feature_ids: Sequence[FeatureId],
163        feature_bank: FeatureBank[SubjectiveStateT],
164    ) -> Sequence[SubtaskSpec]:
165        raise NotImplementedError

Maps ranked features to subtasks.

@abstractmethod
def generate( self, ranked_feature_ids: 'Sequence[FeatureId]', feature_bank: 'FeatureBank[SubjectiveStateT]') -> 'Sequence[SubtaskSpec]':
159    @abstractmethod
160    def generate(
161        self,
162        ranked_feature_ids: Sequence[FeatureId],
163        feature_bank: FeatureBank[SubjectiveStateT],
164    ) -> Sequence[SubtaskSpec]:
165        raise NotImplementedError
class UtilityAssessor(abc.ABC):
334class UtilityAssessor(ABC):
335    """Aggregates usage signals into utility estimates."""
336
337    @abstractmethod
338    def observe(self, usage: Sequence[UsageRecord]) -> None:
339        raise NotImplementedError
340
341    @abstractmethod
342    def scores(self) -> Sequence[UtilityRecord]:
343        raise NotImplementedError

Aggregates usage signals into utility estimates.

@abstractmethod
def observe(self, usage: 'Sequence[UsageRecord]') -> 'None':
337    @abstractmethod
338    def observe(self, usage: Sequence[UsageRecord]) -> None:
339        raise NotImplementedError
@abstractmethod
def scores(self) -> 'Sequence[UtilityRecord]':
341    @abstractmethod
342    def scores(self) -> Sequence[UtilityRecord]:
343        raise NotImplementedError
class ValueEstimator(abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
285class ValueEstimator(ABC, Generic[SubjectiveStateT, ActionT, InfoT]):
286    """Owns the main and auxiliary value learners.
287
288    A minimal implementation can expose a single predictive learner.  A
289    richer implementation can maintain a bank of General Value Functions.
290    """
291
292    @abstractmethod
293    def list_general_value_functions(
294        self,
295    ) -> Sequence[GeneralValueFunctionLearner[SubjectiveStateT, ActionT, InfoT]]:
296        """Return all managed GVF learners.
297
298        Intended for `Planner` implementations that need to inspect
299        the GVF bank (e.g., to evaluate auxiliary predictions during
300        planning).
301        """
302        raise NotImplementedError
303
304    @abstractmethod
305    def predict(
306        self, subjective_state: SubjectiveStateT
307    ) -> Mapping[GeneralValueFunctionId, float]:
308        raise NotImplementedError
309
310    @abstractmethod
311    def update(
312        self, transition: Transition[ActionT, SubjectiveStateT, InfoT]
313    ) -> Mapping[GeneralValueFunctionId, float]:
314        raise NotImplementedError
315
316    @abstractmethod
317    def add_or_replace(
318        self, learner: GeneralValueFunctionLearner[SubjectiveStateT, ActionT, InfoT]
319    ) -> None:
320        """Add or replace a GVF learner in the bank.
321
322        Used for dynamic GVF management, e.g., creating new GVFs when
323        new subtasks or options are discovered.
324        """
325        raise NotImplementedError
326
327    @abstractmethod
328    def remove(
329        self, general_value_function_ids: Sequence[GeneralValueFunctionId]
330    ) -> None:
331        raise NotImplementedError

Owns the main and auxiliary value learners.

A minimal implementation can expose a single predictive learner. A richer implementation can maintain a bank of General Value Functions.

@abstractmethod
def list_general_value_functions( self) -> 'Sequence[GeneralValueFunctionLearner[SubjectiveStateT, ActionT, InfoT]]':
292    @abstractmethod
293    def list_general_value_functions(
294        self,
295    ) -> Sequence[GeneralValueFunctionLearner[SubjectiveStateT, ActionT, InfoT]]:
296        """Return all managed GVF learners.
297
298        Intended for `Planner` implementations that need to inspect
299        the GVF bank (e.g., to evaluate auxiliary predictions during
300        planning).
301        """
302        raise NotImplementedError

Return all managed GVF learners.

Intended for Planner implementations that need to inspect the GVF bank (e.g., to evaluate auxiliary predictions during planning).

@abstractmethod
def predict( self, subjective_state: 'SubjectiveStateT') -> 'Mapping[GeneralValueFunctionId, float]':
304    @abstractmethod
305    def predict(
306        self, subjective_state: SubjectiveStateT
307    ) -> Mapping[GeneralValueFunctionId, float]:
308        raise NotImplementedError
@abstractmethod
def update( self, transition: 'Transition[ActionT, SubjectiveStateT, InfoT]') -> 'Mapping[GeneralValueFunctionId, float]':
310    @abstractmethod
311    def update(
312        self, transition: Transition[ActionT, SubjectiveStateT, InfoT]
313    ) -> Mapping[GeneralValueFunctionId, float]:
314        raise NotImplementedError
@abstractmethod
def add_or_replace( self, learner: 'GeneralValueFunctionLearner[SubjectiveStateT, ActionT, InfoT]') -> 'None':
316    @abstractmethod
317    def add_or_replace(
318        self, learner: GeneralValueFunctionLearner[SubjectiveStateT, ActionT, InfoT]
319    ) -> None:
320        """Add or replace a GVF learner in the bank.
321
322        Used for dynamic GVF management, e.g., creating new GVFs when
323        new subtasks or options are discovered.
324        """
325        raise NotImplementedError

Add or replace a GVF learner in the bank.

Used for dynamic GVF management, e.g., creating new GVFs when new subtasks or options are discovered.

@abstractmethod
def remove( self, general_value_function_ids: 'Sequence[GeneralValueFunctionId]') -> 'None':
327    @abstractmethod
328    def remove(
329        self, general_value_function_ids: Sequence[GeneralValueFunctionId]
330    ) -> None:
331        raise NotImplementedError
class WorldModel(abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
173class WorldModel(ABC, Generic[SubjectiveStateT, ActionT, InfoT]):
174    """Predictive world model for actions and options.
175
176    This is the planner-facing model of what will happen next.  It may be
177    learned, analytic, approximate, or hybrid, as long as it can answer the
178    bounded queries the planner needs.
179    """
180
181    @abstractmethod
182    def update(self, transition: Transition[ActionT, SubjectiveStateT, InfoT]) -> None:
183        raise NotImplementedError
184
185    @abstractmethod
186    def predict_action(
187        self,
188        subjective_state: SubjectiveStateT,
189        action: ActionT,
190    ) -> ModelPrediction[SubjectiveStateT]:
191        raise NotImplementedError
192
193    @abstractmethod
194    def predict_option(
195        self,
196        subjective_state: SubjectiveStateT,
197        option_id: OptionId,
198    ) -> ModelPrediction[SubjectiveStateT]:
199        raise NotImplementedError
200
201    @abstractmethod
202    def add_or_replace_option_models(
203        self, models: Sequence[OptionModel[SubjectiveStateT]]
204    ) -> None:
205        raise NotImplementedError
206
207    @abstractmethod
208    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
209        raise NotImplementedError

Predictive world model for actions and options.

This is the planner-facing model of what will happen next. It may be learned, analytic, approximate, or hybrid, as long as it can answer the bounded queries the planner needs.

@abstractmethod
def update( self, transition: 'Transition[ActionT, SubjectiveStateT, InfoT]') -> 'None':
181    @abstractmethod
182    def update(self, transition: Transition[ActionT, SubjectiveStateT, InfoT]) -> None:
183        raise NotImplementedError
@abstractmethod
def predict_action( self, subjective_state: 'SubjectiveStateT', action: 'ActionT') -> 'ModelPrediction[SubjectiveStateT]':
185    @abstractmethod
186    def predict_action(
187        self,
188        subjective_state: SubjectiveStateT,
189        action: ActionT,
190    ) -> ModelPrediction[SubjectiveStateT]:
191        raise NotImplementedError
@abstractmethod
def predict_option( self, subjective_state: 'SubjectiveStateT', option_id: 'OptionId') -> 'ModelPrediction[SubjectiveStateT]':
193    @abstractmethod
194    def predict_option(
195        self,
196        subjective_state: SubjectiveStateT,
197        option_id: OptionId,
198    ) -> ModelPrediction[SubjectiveStateT]:
199        raise NotImplementedError
@abstractmethod
def add_or_replace_option_models(self, models: 'Sequence[OptionModel[SubjectiveStateT]]') -> 'None':
201    @abstractmethod
202    def add_or_replace_option_models(
203        self, models: Sequence[OptionModel[SubjectiveStateT]]
204    ) -> None:
205        raise NotImplementedError
@abstractmethod
def remove_option_models(self, option_ids: 'Sequence[OptionId]') -> 'None':
207    @abstractmethod
208    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
209        raise NotImplementedError