oak.fine_grained

Optional fine-grained OaK building blocks and composites.

The default public surface of OaK is the four main interfaces in oak.interfaces together with OaKAgent.

This subpackage exposes a more detailed assembly layer for projects that want to swap internal pieces such as a planner, world model, or feature constructor independently.

 1"""Optional fine-grained OaK building blocks and composites.
 2
 3The default public surface of OaK is the four main interfaces in
 4`oak.interfaces` together with `OaKAgent`.
 5
 6This subpackage exposes a more detailed assembly layer for projects that want
 7to swap internal pieces such as a planner, world model, or feature constructor
 8independently.
 9"""
10
11from .composites import (
12    CompositePerception,
13    CompositeReactivePolicy,
14    CompositeTransitionModel,
15    CompositeValueFunction,
16)
17from .components import (
18    ActionSelector,
19    Curator,
20    FeatureBank,
21    FeatureConstructor,
22    FeatureRanker,
23    GeneralValueFunctionLearner,
24    MetaStepSizeLearner,
25    Option,
26    OptionKeyboard,
27    OptionLearner,
28    OptionLibrary,
29    OptionModel,
30    OptionModelLearner,
31    Planner,
32    StateBuilder,
33    SubtaskGenerator,
34    UtilityAssessor,
35    ValueEstimator,
36    WorldModel,
37)
38
__all__ = [
    # Composites: implementations of the four main interfaces, re-exported
    # from `.composites`.
    "CompositePerception",
    "CompositeTransitionModel",
    "CompositeValueFunction",
    "CompositeReactivePolicy",
    # Fine-grained building blocks, re-exported from `.components`.
    "ActionSelector",
    "Curator",
    "FeatureBank",
    "FeatureConstructor",
    "FeatureRanker",
    "GeneralValueFunctionLearner",
    "MetaStepSizeLearner",
    "Option",
    "OptionKeyboard",
    "OptionLearner",
    "OptionLibrary",
    "OptionModel",
    "OptionModelLearner",
    "Planner",
    "StateBuilder",
    "SubtaskGenerator",
    "UtilityAssessor",
    "ValueEstimator",
    "WorldModel",
]
class CompositePerception(oak.interfaces.Perception[~ObservationT, ~ActionT, ~SubjectiveStateT], typing.Generic[~ObservationT, ~ActionT, ~SubjectiveStateT]):
 83class CompositePerception(
 84    Perception[ObservationT, ActionT, SubjectiveStateT],
 85    Generic[ObservationT, ActionT, SubjectiveStateT],
 86):
 87    """Perception built from fine-grained components.
 88
 89    Components: `StateBuilder`, `FeatureBank`, `FeatureConstructor`,
 90    `FeatureRanker`, `SubtaskGenerator`, and optionally
 91    `MetaStepSizeLearner`.
 92    """
 93
 94    def __init__(
 95        self,
 96        state_builder: StateBuilder[ObservationT, ActionT, SubjectiveStateT],
 97        feature_bank: FeatureBank[SubjectiveStateT],
 98        feature_constructor: FeatureConstructor[SubjectiveStateT],
 99        feature_ranker: FeatureRanker,
100        subtask_generator: SubtaskGenerator[SubjectiveStateT],
101        meta_step_sizes: MetaStepSizeLearner | None = None,
102    ) -> None:
103        self._state_builder = state_builder
104        self._feature_bank = feature_bank
105        self._feature_constructor = feature_constructor
106        self._feature_ranker = feature_ranker
107        self._subtask_generator = subtask_generator
108        self._meta_step_sizes = meta_step_sizes
109
110    def reset(self) -> None:
111        self._state_builder.reset()
112
113    def update(
114        self,
115        observation: ObservationT,
116        reward: float,
117        last_action: ActionT | None,
118    ) -> SubjectiveStateT:
119        return self._state_builder.update(observation, reward, last_action)
120
121    def current_subjective_state(self) -> SubjectiveStateT:
122        return self._state_builder.current_subjective_state()
123
124    def discover_and_rank_features(
125        self,
126        subjective_state: SubjectiveStateT,
127        utility_scores: Sequence[UtilityRecord],
128        feature_budget: int,
129    ) -> Sequence[FeatureId]:
130        candidates = self._feature_constructor.propose(
131            subjective_state, self._feature_bank.list_features()
132        )
133        if candidates:
134            self._feature_bank.add_candidates(candidates)
135        return self._feature_ranker.rank(
136            self._feature_bank.list_features(), utility_scores, limit=feature_budget
137        )
138
139    def generate_subtasks(
140        self,
141        ranked_feature_ids: Sequence[FeatureId],
142    ) -> Sequence[SubtaskSpec]:
143        return self._subtask_generator.generate(ranked_feature_ids, self._feature_bank)
144
145    def list_features(self) -> Sequence[FeatureSpec]:
146        return self._feature_bank.list_features()
147
148    def remove_features(self, feature_ids: Sequence[FeatureId]) -> None:
149        self._feature_bank.remove(feature_ids)
150
151    def update_meta(self, error_signals: Mapping[str, float]) -> None:
152        if self._meta_step_sizes is not None:
153            self._meta_step_sizes.update(error_signals)

Perception built from fine-grained components.

Components: StateBuilder, FeatureBank, FeatureConstructor, FeatureRanker, SubtaskGenerator, and optionally MetaStepSizeLearner.

CompositePerception( state_builder: 'StateBuilder[ObservationT, ActionT, SubjectiveStateT]', feature_bank: 'FeatureBank[SubjectiveStateT]', feature_constructor: 'FeatureConstructor[SubjectiveStateT]', feature_ranker: 'FeatureRanker', subtask_generator: 'SubtaskGenerator[SubjectiveStateT]', meta_step_sizes: 'MetaStepSizeLearner | None' = None)
 94    def __init__(
 95        self,
 96        state_builder: StateBuilder[ObservationT, ActionT, SubjectiveStateT],
 97        feature_bank: FeatureBank[SubjectiveStateT],
 98        feature_constructor: FeatureConstructor[SubjectiveStateT],
 99        feature_ranker: FeatureRanker,
100        subtask_generator: SubtaskGenerator[SubjectiveStateT],
101        meta_step_sizes: MetaStepSizeLearner | None = None,
102    ) -> None:
103        self._state_builder = state_builder
104        self._feature_bank = feature_bank
105        self._feature_constructor = feature_constructor
106        self._feature_ranker = feature_ranker
107        self._subtask_generator = subtask_generator
108        self._meta_step_sizes = meta_step_sizes
def reset(self) -> 'None':
110    def reset(self) -> None:
111        self._state_builder.reset()

Reset all perception state for a new episode.

def update( self, observation: 'ObservationT', reward: 'float', last_action: 'ActionT | None') -> 'SubjectiveStateT':
113    def update(
114        self,
115        observation: ObservationT,
116        reward: float,
117        last_action: ActionT | None,
118    ) -> SubjectiveStateT:
119        return self._state_builder.update(observation, reward, last_action)

Process a new observation and return the updated subjective state.

def current_subjective_state(self) -> 'SubjectiveStateT':
121    def current_subjective_state(self) -> SubjectiveStateT:
122        return self._state_builder.current_subjective_state()

Return the most recently computed subjective state.

def discover_and_rank_features( self, subjective_state: 'SubjectiveStateT', utility_scores: 'Sequence[UtilityRecord]', feature_budget: 'int') -> 'Sequence[FeatureId]':
124    def discover_and_rank_features(
125        self,
126        subjective_state: SubjectiveStateT,
127        utility_scores: Sequence[UtilityRecord],
128        feature_budget: int,
129    ) -> Sequence[FeatureId]:
130        candidates = self._feature_constructor.propose(
131            subjective_state, self._feature_bank.list_features()
132        )
133        if candidates:
134            self._feature_bank.add_candidates(candidates)
135        return self._feature_ranker.rank(
136            self._feature_bank.list_features(), utility_scores, limit=feature_budget
137        )

Propose new features, integrate them, and return the top-ranked IDs.

A typical implementation:

  1. Proposes candidate features from the current subjective state.
  2. Adds accepted candidates to its internal feature store.
  3. Ranks all features using the provided utility scores.
  4. Returns the top feature IDs (up to feature_budget).
def generate_subtasks( self, ranked_feature_ids: 'Sequence[FeatureId]') -> 'Sequence[SubtaskSpec]':
139    def generate_subtasks(
140        self,
141        ranked_feature_ids: Sequence[FeatureId],
142    ) -> Sequence[SubtaskSpec]:
143        return self._subtask_generator.generate(ranked_feature_ids, self._feature_bank)

Turn ranked feature IDs into subtask specifications.

def list_features(self) -> 'Sequence[FeatureSpec]':
145    def list_features(self) -> Sequence[FeatureSpec]:
146        return self._feature_bank.list_features()

Return all currently tracked features.

def remove_features(self, feature_ids: 'Sequence[FeatureId]') -> 'None':
148    def remove_features(self, feature_ids: Sequence[FeatureId]) -> None:
149        self._feature_bank.remove(feature_ids)

Remove features by ID (called during curation).

def update_meta(self, error_signals: 'Mapping[str, float]') -> 'None':
151    def update_meta(self, error_signals: Mapping[str, float]) -> None:
152        if self._meta_step_sizes is not None:
153            self._meta_step_sizes.update(error_signals)

Adapt internal per-weight step sizes given error signals.

Parameters

error_signals: Named scalar error signals from the current learning step, e.g. {"main_td_error": 0.05, "reward": 1.0}. Implementations pick the signals they need and ignore the rest.

class CompositeTransitionModel(oak.interfaces.TransitionModel[~SubjectiveStateT, ~ActionT, ~InfoT], typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
class CompositeTransitionModel(
    TransitionModel[SubjectiveStateT, ActionT, InfoT],
    Generic[SubjectiveStateT, ActionT, InfoT],
):
    """TransitionModel assembled from independently swappable components.

    Collaborators: `WorldModel`, `OptionModelLearner`, `Planner`, and
    (optionally) `MetaStepSizeLearner`.
    """

    def __init__(
        self,
        world_model: WorldModel[SubjectiveStateT, ActionT, InfoT],
        option_model_learner: OptionModelLearner[SubjectiveStateT, ActionT, InfoT],
        planner: Planner[SubjectiveStateT, ActionT, InfoT],
        meta_step_sizes: MetaStepSizeLearner | None = None,
    ) -> None:
        """Store the collaborators; none of them is invoked here."""
        self._planner = planner
        self._world_model = world_model
        self._option_model_learner = option_model_learner
        self._meta_step_sizes = meta_step_sizes

    def update(
        self,
        transition: Transition[ActionT, SubjectiveStateT, InfoT],
    ) -> None:
        """Learn from an observed transition.

        The world model is updated first, then the option-model learner.
        """
        world = self._world_model
        world.update(transition)
        learner = self._option_model_learner
        learner.update(transition)

    def integrate_option_models(self) -> None:
        """Export learned option models and hand them to the world model."""
        exported = self._option_model_learner.export_models()
        world = self._world_model
        world.add_or_replace_option_models(exported)

    def plan(
        self,
        subjective_state: SubjectiveStateT,
        value_function: ValueFunction[SubjectiveStateT, ActionT, InfoT],
        budget: int,
    ) -> PlanningUpdate[ActionT]:
        """Run one planner step and return its `PlanningUpdate`.

        ``value_function`` is wrapped in `_ValueEstimatorAdapter` before it
        is passed to the planner alongside the internal world model and
        ``budget``.
        """
        evaluator = _ValueEstimatorAdapter(value_function)
        planner = self._planner
        return planner.plan_step(
            subjective_state, self._world_model, evaluator, budget
        )

    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
        """Remove the given option models from the world model."""
        world = self._world_model
        world.remove_option_models(option_ids)

    def update_meta(self, error_signals: Mapping[str, float]) -> None:
        """Forward error signals to the meta step-size learner, if any.

        Does nothing when no `MetaStepSizeLearner` was supplied.
        """
        learner = self._meta_step_sizes
        if learner is None:
            return
        learner.update(error_signals)

TransitionModel built from fine-grained components.

Components: WorldModel, OptionModelLearner, Planner, and optionally MetaStepSizeLearner.

CompositeTransitionModel( world_model: 'WorldModel[SubjectiveStateT, ActionT, InfoT]', option_model_learner: 'OptionModelLearner[SubjectiveStateT, ActionT, InfoT]', planner: 'Planner[SubjectiveStateT, ActionT, InfoT]', meta_step_sizes: 'MetaStepSizeLearner | None' = None)
284    def __init__(
285        self,
286        world_model: WorldModel[SubjectiveStateT, ActionT, InfoT],
287        option_model_learner: OptionModelLearner[SubjectiveStateT, ActionT, InfoT],
288        planner: Planner[SubjectiveStateT, ActionT, InfoT],
289        meta_step_sizes: MetaStepSizeLearner | None = None,
290    ) -> None:
291        self._world_model = world_model
292        self._option_model_learner = option_model_learner
293        self._planner = planner
294        self._meta_step_sizes = meta_step_sizes
def update( self, transition: 'Transition[ActionT, SubjectiveStateT, InfoT]') -> 'None':
296    def update(
297        self,
298        transition: Transition[ActionT, SubjectiveStateT, InfoT],
299    ) -> None:
300        self._world_model.update(transition)
301        self._option_model_learner.update(transition)

Learn from an observed transition.

This should update both the world model and any option-model learners.

def integrate_option_models(self) -> 'None':
303    def integrate_option_models(self) -> None:
304        models = self._option_model_learner.export_models()
305        self._world_model.add_or_replace_option_models(models)

Export learned option models and integrate them into the world model.

Called after option learning so that planning reasons over fresh models.

def plan( self, subjective_state: 'SubjectiveStateT', value_function: 'ValueFunction[SubjectiveStateT, ActionT, InfoT]', budget: 'int') -> 'PlanningUpdate[ActionT]':
307    def plan(
308        self,
309        subjective_state: SubjectiveStateT,
310        value_function: ValueFunction[SubjectiveStateT, ActionT, InfoT],
311        budget: int,
312    ) -> PlanningUpdate[ActionT]:
313        adapter = _ValueEstimatorAdapter(value_function)
314        return self._planner.plan_step(
315            subjective_state, self._world_model, adapter, budget
316        )

Run bounded planning and return improvement signals.

The planner uses the internal world model together with the supplied value_function (for state evaluation) to produce value targets, policy targets, or search statistics.

def remove_option_models(self, option_ids: 'Sequence[OptionId]') -> 'None':
318    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
319        self._world_model.remove_option_models(option_ids)

Remove option models by ID (called during curation).

def update_meta(self, error_signals: 'Mapping[str, float]') -> 'None':
321    def update_meta(self, error_signals: Mapping[str, float]) -> None:
322        if self._meta_step_sizes is not None:
323            self._meta_step_sizes.update(error_signals)

Adapt internal per-weight step sizes given error signals.

Parameters

error_signals: Named scalar error signals from the current learning step, e.g. {"main_td_error": 0.05, "reward": 1.0}. Implementations pick the signals they need and ignore the rest.

class CompositeValueFunction(oak.interfaces.ValueFunction[~SubjectiveStateT, ~ActionT, ~InfoT], typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
class CompositeValueFunction(
    ValueFunction[SubjectiveStateT, ActionT, InfoT],
    Generic[SubjectiveStateT, ActionT, InfoT],
):
    """ValueFunction assembled from independently swappable components.

    Collaborators: `ValueEstimator`, `UtilityAssessor`, `Curator`, and
    (optionally) `MetaStepSizeLearner`.
    """

    def __init__(
        self,
        value_estimator: ValueEstimator[SubjectiveStateT, ActionT, InfoT],
        utility_assessor: UtilityAssessor,
        curator: Curator,
        meta_step_sizes: MetaStepSizeLearner | None = None,
    ) -> None:
        """Store the collaborators; none of them is invoked here."""
        self._curator = curator
        self._utility_assessor = utility_assessor
        self._value_estimator = value_estimator
        self._meta_step_sizes = meta_step_sizes

    def update(
        self,
        transition: Transition[ActionT, SubjectiveStateT, InfoT],
        *,
        planning: bool = False,
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Learn from a transition and return the estimator's TD errors.

        When ``planning`` is true the estimator is not touched and an empty
        mapping is returned.
        """
        return {} if planning else self._value_estimator.update(transition)

    def predict(
        self,
        subjective_state: SubjectiveStateT,
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Return the value estimator's predictions for ``subjective_state``."""
        estimator = self._value_estimator
        return estimator.predict(subjective_state)

    def observe_usage(self, usage_records: Sequence[UsageRecord]) -> None:
        """Pass usage records to the utility assessor."""
        assessor = self._utility_assessor
        assessor.observe(usage_records)

    def utility_scores(self) -> Sequence[UtilityRecord]:
        """Return the utility assessor's current scores."""
        assessor = self._utility_assessor
        return assessor.scores()

    def curate(self) -> CurationDecision:
        """Decide which learned structures to drop.

        Returns a default-constructed `CurationDecision` without consulting
        the curator when the utility assessor reports no scores.
        """
        current_scores = self._utility_assessor.scores()
        return (
            self._curator.curate(current_scores)
            if current_scores
            else CurationDecision()
        )

    def remove(
        self,
        general_value_function_ids: Sequence[GeneralValueFunctionId],
    ) -> None:
        """Remove the given value functions from the value estimator."""
        estimator = self._value_estimator
        estimator.remove(general_value_function_ids)

    def update_meta(self, error_signals: Mapping[str, float]) -> None:
        """Forward error signals to the meta step-size learner, if any.

        Does nothing when no `MetaStepSizeLearner` was supplied.
        """
        learner = self._meta_step_sizes
        if learner is None:
            return
        learner.update(error_signals)

ValueFunction built from fine-grained components.

Components: ValueEstimator, UtilityAssessor, Curator, and optionally MetaStepSizeLearner.

CompositeValueFunction( value_estimator: 'ValueEstimator[SubjectiveStateT, ActionT, InfoT]', utility_assessor: 'UtilityAssessor', curator: 'Curator', meta_step_sizes: 'MetaStepSizeLearner | None' = None)
171    def __init__(
172        self,
173        value_estimator: ValueEstimator[SubjectiveStateT, ActionT, InfoT],
174        utility_assessor: UtilityAssessor,
175        curator: Curator,
176        meta_step_sizes: MetaStepSizeLearner | None = None,
177    ) -> None:
178        self._value_estimator = value_estimator
179        self._utility_assessor = utility_assessor
180        self._curator = curator
181        self._meta_step_sizes = meta_step_sizes
def update( self, transition: 'Transition[ActionT, SubjectiveStateT, InfoT]', *, planning: 'bool' = False) -> 'Mapping[GeneralValueFunctionId, float]':
183    def update(
184        self,
185        transition: Transition[ActionT, SubjectiveStateT, InfoT],
186        *,
187        planning: bool = False,
188    ) -> Mapping[GeneralValueFunctionId, float]:
189        if planning:
190            return {}
191        return self._value_estimator.update(transition)

Learn from a transition and return TD-error signals.

def predict( self, subjective_state: 'SubjectiveStateT') -> 'Mapping[GeneralValueFunctionId, float]':
193    def predict(
194        self,
195        subjective_state: SubjectiveStateT,
196    ) -> Mapping[GeneralValueFunctionId, float]:
197        return self._value_estimator.predict(subjective_state)

Predict values for the given subjective state.

def observe_usage(self, usage_records: 'Sequence[UsageRecord]') -> 'None':
199    def observe_usage(self, usage_records: Sequence[UsageRecord]) -> None:
200        self._utility_assessor.observe(usage_records)

Record usage evidence for utility assessment.

def utility_scores(self) -> 'Sequence[UtilityRecord]':
202    def utility_scores(self) -> Sequence[UtilityRecord]:
203        return self._utility_assessor.scores()

Return current utility estimates for all tracked structures.

def curate(self) -> 'CurationDecision':
205    def curate(self) -> CurationDecision:
206        scores = self._utility_assessor.scores()
207        if not scores:
208            return CurationDecision()
209        return self._curator.curate(scores)

Decide which learned structures to drop.

def remove( self, general_value_function_ids: 'Sequence[GeneralValueFunctionId]') -> 'None':
211    def remove(
212        self,
213        general_value_function_ids: Sequence[GeneralValueFunctionId],
214    ) -> None:
215        self._value_estimator.remove(general_value_function_ids)

Remove value functions by ID (called during curation).

def update_meta(self, error_signals: 'Mapping[str, float]') -> 'None':
217    def update_meta(self, error_signals: Mapping[str, float]) -> None:
218        if self._meta_step_sizes is not None:
219            self._meta_step_sizes.update(error_signals)

Adapt internal per-weight step sizes given error signals.

Parameters

error_signals: Named scalar error signals from the current learning step, e.g. {"main_td_error": 0.05, "reward": 1.0}. Implementations pick the signals they need and ignore the rest.

class CompositeReactivePolicy(oak.interfaces.ReactivePolicy[~SubjectiveStateT, ~ActionT, ~InfoT], typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
class CompositeReactivePolicy(
    ReactivePolicy[SubjectiveStateT, ActionT, InfoT],
    Generic[SubjectiveStateT, ActionT, InfoT],
):
    """ReactivePolicy assembled from independently swappable components.

    Collaborators: `ActionSelector`, `OptionLibrary`, `OptionLearner`, and
    (optionally) `OptionKeyboard` and `MetaStepSizeLearner`.  The policy
    also remembers which option, if any, is currently executing.
    """

    def __init__(
        self,
        action_selector: ActionSelector[SubjectiveStateT, ActionT],
        option_library: OptionLibrary[SubjectiveStateT, ActionT],
        option_learner: OptionLearner[SubjectiveStateT, ActionT, InfoT],
        option_keyboard: OptionKeyboard | None = None,
        meta_step_sizes: MetaStepSizeLearner | None = None,
    ) -> None:
        """Store the collaborators and start with no active option."""
        # No option is executing until `select_action` picks one.
        self._active_option: Option[SubjectiveStateT, ActionT] | None = None
        self._action_selector = action_selector
        self._option_library = option_library
        self._option_learner = option_learner
        self._option_keyboard = option_keyboard
        self._meta_step_sizes = meta_step_sizes

    def update(
        self,
        transition: Transition[ActionT, SubjectiveStateT, InfoT],
        td_errors: Mapping[GeneralValueFunctionId, float],
    ) -> None:
        """Update the action selector and option learner from a transition.

        The action selector receives the transition's next subjective state
        together with ``td_errors``; the option learner receives the whole
        transition.
        """
        selector = self._action_selector
        selector.update_from_values(transition.next_subjective_state, td_errors)
        learner = self._option_learner
        learner.update(transition)

    def apply_planning_update(self, update: PlanningUpdate[ActionT]) -> None:
        """Pass a planning update on to the action selector."""
        selector = self._action_selector
        selector.apply_planning_update(update)

    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
        """Feed newly created subtasks into the option learner."""
        learner = self._option_learner
        learner.ingest_subtasks(subtasks)

    def integrate_options(self) -> None:
        """Add or replace every option exported by the option learner."""
        for learned in self._option_learner.export_options():
            self._option_library.add_or_replace(learned)

    def select_action(
        self,
        subjective_state: SubjectiveStateT,
        option_stop_threshold: float,
    ) -> tuple[ActionT, OptionId | None]:
        """Choose a primitive action, possibly by continuing an active option.

        Order of precedence:

        1. An active option keeps control while its stop probability is
           below ``option_stop_threshold``; otherwise it is cleared.
        2. The action selector decides afresh.  If its decision metadata
           carries ``"option_intensities"`` and a keyboard is configured,
           the keyboard composes an option, which becomes active.
        3. Otherwise an option named by the decision becomes active.
        4. Otherwise the decision's primitive action is returned.

        Returns:
            A ``(primitive_action, active_option_id)`` pair; the ID is
            ``None`` when a primitive action was chosen directly.

        Raises:
            RuntimeError: If the decision holds neither an action nor an
                option.
        """
        running = self._active_option
        if running is not None:
            if running.stop_probability(subjective_state) < option_stop_threshold:
                primitive = running.act(subjective_state)
                return primitive, running.descriptor.option_id
            # The option asked to stop: release it and decide again below.
            self._active_option = None

        library = self._option_library
        decision = self._action_selector.decide(
            subjective_state=subjective_state,
            active_option=None,
            available_options=library.list_options(),
        )

        # Option composition via the keyboard: the ActionSelector may
        # place per-option intensities in metadata["option_intensities"]
        # to request blended behaviour rather than a single option.
        requested = decision.metadata.get("option_intensities")
        intensities = _as_option_intensities(requested)
        keyboard = self._option_keyboard
        if keyboard is not None and intensities is not None:
            descriptor = keyboard.compose(intensities)
            blended = library.get(descriptor.option_id)
            self._active_option = blended
            return blended.act(subjective_state), descriptor.option_id

        if decision.option_id is not None:
            chosen = library.get(decision.option_id)
            self._active_option = chosen
            primitive = chosen.act(subjective_state)
            return primitive, chosen.descriptor.option_id

        if decision.action is None:
            raise RuntimeError(
                "ActionSelector returned neither a primitive action nor an option."
            )

        return decision.action, None

    def clear_active_option(self) -> None:
        """Forget the currently executing option, if any."""
        self._active_option = None

    def remove_options(self, option_ids: Sequence[OptionId]) -> None:
        """Remove options from the library by ID.

        The active option is also cleared when its ID is among those removed.
        """
        self._option_library.remove(option_ids)
        running = self._active_option
        if running is None:
            return
        if running.descriptor.option_id in option_ids:
            self._active_option = None

    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
        """Remove the given subtasks from the option learner."""
        learner = self._option_learner
        learner.remove_subtasks(subtask_ids)

    def update_meta(self, error_signals: Mapping[str, float]) -> None:
        """Forward error signals to the meta step-size learner, if any.

        Does nothing when no `MetaStepSizeLearner` was supplied.
        """
        learner = self._meta_step_sizes
        if learner is None:
            return
        learner.update(error_signals)

ReactivePolicy built from fine-grained components.

Components: ActionSelector, OptionLibrary, OptionLearner, and optionally OptionKeyboard and MetaStepSizeLearner.

CompositeReactivePolicy( action_selector: 'ActionSelector[SubjectiveStateT, ActionT]', option_library: 'OptionLibrary[SubjectiveStateT, ActionT]', option_learner: 'OptionLearner[SubjectiveStateT, ActionT, InfoT]', option_keyboard: 'OptionKeyboard | None' = None, meta_step_sizes: 'MetaStepSizeLearner | None' = None)
341    def __init__(
342        self,
343        action_selector: ActionSelector[SubjectiveStateT, ActionT],
344        option_library: OptionLibrary[SubjectiveStateT, ActionT],
345        option_learner: OptionLearner[SubjectiveStateT, ActionT, InfoT],
346        option_keyboard: OptionKeyboard | None = None,
347        meta_step_sizes: MetaStepSizeLearner | None = None,
348    ) -> None:
349        self._action_selector = action_selector
350        self._option_library = option_library
351        self._option_learner = option_learner
352        self._option_keyboard = option_keyboard
353        self._meta_step_sizes = meta_step_sizes
354        self._active_option: Option[SubjectiveStateT, ActionT] | None = None
def update( self, transition: 'Transition[ActionT, SubjectiveStateT, InfoT]', td_errors: 'Mapping[GeneralValueFunctionId, float]') -> 'None':
356    def update(
357        self,
358        transition: Transition[ActionT, SubjectiveStateT, InfoT],
359        td_errors: Mapping[GeneralValueFunctionId, float],
360    ) -> None:
361        self._action_selector.update_from_values(
362            transition.next_subjective_state, td_errors
363        )
364        self._option_learner.update(transition)

Update the policy and option learners from an observed transition.

def apply_planning_update(self, update: 'PlanningUpdate[ActionT]') -> 'None':
366    def apply_planning_update(self, update: PlanningUpdate[ActionT]) -> None:
367        self._action_selector.apply_planning_update(update)

Integrate planning improvement signals into the policy.

def ingest_subtasks(self, subtasks: 'Sequence[SubtaskSpec]') -> 'None':
369    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
370        self._option_learner.ingest_subtasks(subtasks)

Feed newly created subtasks into the option learner.

def integrate_options(self) -> 'None':
372    def integrate_options(self) -> None:
373        for option in self._option_learner.export_options():
374            self._option_library.add_or_replace(option)

Export learned options into the option library.

def select_action( self, subjective_state: 'SubjectiveStateT', option_stop_threshold: 'float') -> 'tuple[ActionT, OptionId | None]':
376    def select_action(
377        self,
378        subjective_state: SubjectiveStateT,
379        option_stop_threshold: float,
380    ) -> tuple[ActionT, OptionId | None]:
381        if self._active_option is not None:
382            stop_prob = self._active_option.stop_probability(subjective_state)
383            if stop_prob < option_stop_threshold:
384                return (
385                    self._active_option.act(subjective_state),
386                    self._active_option.descriptor.option_id,
387                )
388            self._active_option = None
389
390        decision = self._action_selector.decide(
391            subjective_state=subjective_state,
392            active_option=None,
393            available_options=self._option_library.list_options(),
394        )
395
396        # Option composition via the keyboard: the ActionSelector may
397        # place per-option intensities in metadata["option_intensities"]
398        # to request blended behaviour rather than a single option.
399        intensities = _as_option_intensities(
400            decision.metadata.get("option_intensities")
401        )
402        if intensities is not None and self._option_keyboard is not None:
403            descriptor = self._option_keyboard.compose(intensities)
404            self._active_option = self._option_library.get(descriptor.option_id)
405            return (
406                self._active_option.act(subjective_state),
407                descriptor.option_id,
408            )
409
410        if decision.option_id is not None:
411            self._active_option = self._option_library.get(decision.option_id)
412            return (
413                self._active_option.act(subjective_state),
414                self._active_option.descriptor.option_id,
415            )
416
417        if decision.action is None:
418            raise RuntimeError(
419                "ActionSelector returned neither a primitive action nor an option."
420            )
421
422        return decision.action, None

Choose a primitive action, possibly by continuing an active option.

Returns a (primitive_action, active_option_id) pair. When no option is active, active_option_id is None.

def clear_active_option(self) -> 'None':
424    def clear_active_option(self) -> None:
425        self._active_option = None

Clear the currently executing option (e.g. at episode boundaries).

def remove_options(self, option_ids: 'Sequence[OptionId]') -> 'None':
427    def remove_options(self, option_ids: Sequence[OptionId]) -> None:
428        self._option_library.remove(option_ids)
429        if (
430            self._active_option is not None
431            and self._active_option.descriptor.option_id in option_ids
432        ):
433            self._active_option = None

Remove options by ID (called during curation).

def remove_subtasks(self, subtask_ids: 'Sequence[SubtaskId]') -> 'None':
435    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
436        self._option_learner.remove_subtasks(subtask_ids)

Remove subtasks by ID (called during curation).

def update_meta(self, error_signals: 'Mapping[str, float]') -> 'None':
438    def update_meta(self, error_signals: Mapping[str, float]) -> None:
439        if self._meta_step_sizes is not None:
440            self._meta_step_sizes.update(error_signals)

Adapt internal per-weight step sizes given error signals.

Parameters

error_signals: Named scalar error signals from the current learning step, e.g. {"main_td_error": 0.05, "reward": 1.0}. Implementations pick the signals they need and ignore the rest.

class ActionSelector(abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT]):
class ActionSelector(ABC, Generic[SubjectiveStateT, ActionT]):
    """Chooses primitive actions or options from the current subjective state.

    This is the foreground action-selection mechanism.  It may be as small
    as a hand-written policy for a toy domain or as complex as a learned
    policy head over a rich subjective state representation.

    Every method is abstract; a concrete selector must override all three.
    """

    @abstractmethod
    def decide(
        self,
        subjective_state: SubjectiveStateT,
        active_option: Option[SubjectiveStateT, ActionT] | None,
        available_options: Sequence[Option[SubjectiveStateT, ActionT]],
    ) -> "PolicyDecision[ActionT]":
        """Return a `PolicyDecision` for *subjective_state*.

        *active_option* is the option currently executing, or ``None``
        when there is none; *available_options* are the options the
        selector is offered.

        NOTE(review): the composite reactive policy appears to require
        the result to carry either an option id or a primitive action
        and raises ``RuntimeError`` otherwise -- confirm against
        `PolicyDecision`.
        """
        raise NotImplementedError

    @abstractmethod
    def update_from_values(
        self,
        subjective_state: SubjectiveStateT,
        td_errors: Mapping[GeneralValueFunctionId, float],
    ) -> None:
        """Update the selector from *td_errors* for *subjective_state*.

        *td_errors* maps General Value Function ids to one float each.
        """
        raise NotImplementedError

    @abstractmethod
    def apply_planning_update(self, update: PlanningUpdate[ActionT]) -> None:
        """Incorporate a `PlanningUpdate` (the type `Planner.plan_step` returns)."""
        raise NotImplementedError

Chooses primitive actions or options from the current subjective state.

This is the foreground action-selection mechanism. It may be as small as a hand-written policy for a toy domain or as complex as a learned policy head over a rich subjective state representation.

@abstractmethod
def decide( self, subjective_state: 'SubjectiveStateT', active_option: 'Option[SubjectiveStateT, ActionT] | None', available_options: 'Sequence[Option[SubjectiveStateT, ActionT]]') -> "'PolicyDecision[ActionT]'":
452    @abstractmethod
453    def decide(
454        self,
455        subjective_state: SubjectiveStateT,
456        active_option: Option[SubjectiveStateT, ActionT] | None,
457        available_options: Sequence[Option[SubjectiveStateT, ActionT]],
458    ) -> "PolicyDecision[ActionT]":
459        raise NotImplementedError
@abstractmethod
def update_from_values( self, subjective_state: 'SubjectiveStateT', td_errors: 'Mapping[GeneralValueFunctionId, float]') -> 'None':
461    @abstractmethod
462    def update_from_values(
463        self,
464        subjective_state: SubjectiveStateT,
465        td_errors: Mapping[GeneralValueFunctionId, float],
466    ) -> None:
467        raise NotImplementedError
@abstractmethod
def apply_planning_update(self, update: 'PlanningUpdate[ActionT]') -> 'None':
469    @abstractmethod
470    def apply_planning_update(self, update: PlanningUpdate[ActionT]) -> None:
471        raise NotImplementedError
class Curator(abc.ABC):
class Curator(ABC):
    """Prunes low-utility architectural elements."""

    @abstractmethod
    def curate(self, utilities: Sequence[UtilityRecord]) -> CurationDecision:
        """Turn the given *utilities* into a `CurationDecision`.

        NOTE(review): the decision presumably names the elements to
        prune -- confirm against `CurationDecision`.
        """
        raise NotImplementedError

Prunes low-utility architectural elements.

@abstractmethod
def curate(self, utilities: 'Sequence[UtilityRecord]') -> 'CurationDecision':
349    @abstractmethod
350    def curate(self, utilities: Sequence[UtilityRecord]) -> CurationDecision:
351        raise NotImplementedError
class FeatureBank(abc.ABC, typing.Generic[~SubjectiveStateT]):
class FeatureBank(ABC, Generic[SubjectiveStateT]):
    """Stores currently active features and their activations."""

    @abstractmethod
    def list_features(self) -> Sequence[FeatureSpec]:
        """Return the `FeatureSpec` of each feature held by the bank."""
        raise NotImplementedError

    @abstractmethod
    def activations(
        self,
        subjective_state: SubjectiveStateT,
    ) -> Mapping[FeatureId, float]:
        """Return per-feature activation values for the given state.

        Intended for `SubtaskGenerator` implementations, which receive
        the `FeatureBank` and may use activations to decide which
        features warrant new subtasks.
        """
        raise NotImplementedError

    @abstractmethod
    def add_candidates(
        self, candidates: Sequence[FeatureCandidate]
    ) -> Sequence[FeatureSpec]:
        """Offer *candidates* to the bank and return `FeatureSpec` records.

        NOTE(review): presumably the returned specs describe the
        candidates that were actually admitted -- confirm with an
        implementation.
        """
        raise NotImplementedError

    @abstractmethod
    def remove(self, feature_ids: Sequence[FeatureId]) -> None:
        """Remove the features identified by *feature_ids*."""
        raise NotImplementedError

Stores currently active features and their activations.

@abstractmethod
def list_features(self) -> 'Sequence[FeatureSpec]':
103    @abstractmethod
104    def list_features(self) -> Sequence[FeatureSpec]:
105        raise NotImplementedError
@abstractmethod
def activations( self, subjective_state: 'SubjectiveStateT') -> 'Mapping[FeatureId, float]':
107    @abstractmethod
108    def activations(
109        self,
110        subjective_state: SubjectiveStateT,
111    ) -> Mapping[FeatureId, float]:
112        """Return per-feature activation values for the given state.
113
114        Intended for `SubtaskGenerator` implementations, which receive
115        the `FeatureBank` and may use activations to decide which
116        features warrant new subtasks.
117        """
118        raise NotImplementedError

Return per-feature activation values for the given state.

Intended for SubtaskGenerator implementations, which receive the FeatureBank and may use activations to decide which features warrant new subtasks.

@abstractmethod
def add_candidates( self, candidates: 'Sequence[FeatureCandidate]') -> 'Sequence[FeatureSpec]':
120    @abstractmethod
121    def add_candidates(
122        self, candidates: Sequence[FeatureCandidate]
123    ) -> Sequence[FeatureSpec]:
124        raise NotImplementedError
@abstractmethod
def remove(self, feature_ids: 'Sequence[FeatureId]') -> 'None':
126    @abstractmethod
127    def remove(self, feature_ids: Sequence[FeatureId]) -> None:
128        raise NotImplementedError
class FeatureConstructor(abc.ABC, typing.Generic[~SubjectiveStateT]):
class FeatureConstructor(ABC, Generic[SubjectiveStateT]):
    """Proposes new candidate features."""

    @abstractmethod
    def propose(
        self,
        subjective_state: SubjectiveStateT,
        active_features: Sequence[FeatureSpec],
    ) -> Sequence[FeatureCandidate]:
        """Return feature candidates for *subjective_state*.

        *active_features* are the specs of the features already in use,
        supplied so an implementation can take them into account.
        """
        raise NotImplementedError

Proposes new candidate features.

@abstractmethod
def propose( self, subjective_state: 'SubjectiveStateT', active_features: 'Sequence[FeatureSpec]') -> 'Sequence[FeatureCandidate]':
134    @abstractmethod
135    def propose(
136        self,
137        subjective_state: SubjectiveStateT,
138        active_features: Sequence[FeatureSpec],
139    ) -> Sequence[FeatureCandidate]:
140        raise NotImplementedError
class FeatureRanker(abc.ABC):
class FeatureRanker(ABC):
    """Ranks features for downstream use."""

    @abstractmethod
    def rank(
        self,
        features: Sequence[FeatureSpec],
        utilities: Sequence[UtilityRecord],
        limit: int | None = None,
    ) -> Sequence[FeatureId]:
        """Return the ids of *features* in ranked order, given *utilities*.

        NOTE(review): *limit* presumably caps how many ids are returned,
        with the default ``None`` meaning no cap -- confirm with an
        implementation.
        """
        raise NotImplementedError

Ranks features for downstream use.

@abstractmethod
def rank( self, features: 'Sequence[FeatureSpec]', utilities: 'Sequence[UtilityRecord]', limit: 'int | None' = None) -> 'Sequence[FeatureId]':
146    @abstractmethod
147    def rank(
148        self,
149        features: Sequence[FeatureSpec],
150        utilities: Sequence[UtilityRecord],
151        limit: int | None = None,
152    ) -> Sequence[FeatureId]:
153        raise NotImplementedError
class GeneralValueFunctionLearner(abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
class GeneralValueFunctionLearner(ABC, Generic[SubjectiveStateT, ActionT, InfoT]):
    """Learns one General Value Function online."""

    @property
    @abstractmethod
    def spec(self) -> GeneralValueFunctionSpec[ActionT, SubjectiveStateT, InfoT]:
        """The `GeneralValueFunctionSpec` of the function being learned."""
        raise NotImplementedError

    @abstractmethod
    def predict(
        self,
        subjective_state: SubjectiveStateT,
        action: ActionT | None = None,
    ) -> float:
        """Return the scalar prediction for *subjective_state*.

        *action* is optional and defaults to ``None``.
        """
        raise NotImplementedError

    @abstractmethod
    def update(self, transition: Transition[ActionT, SubjectiveStateT, InfoT]) -> float:
        """Learn from one *transition* and return a float.

        NOTE(review): the returned float is presumably the TD error for
        this transition -- confirm with an implementation.
        """
        raise NotImplementedError

Learns one General Value Function online.

spec: 'GeneralValueFunctionSpec[ActionT, SubjectiveStateT, InfoT]'
267    @property
268    @abstractmethod
269    def spec(self) -> GeneralValueFunctionSpec[ActionT, SubjectiveStateT, InfoT]:
270        raise NotImplementedError
@abstractmethod
def predict( self, subjective_state: 'SubjectiveStateT', action: 'ActionT | None' = None) -> 'float':
272    @abstractmethod
273    def predict(
274        self,
275        subjective_state: SubjectiveStateT,
276        action: ActionT | None = None,
277    ) -> float:
278        raise NotImplementedError
@abstractmethod
def update( self, transition: 'Transition[ActionT, SubjectiveStateT, InfoT]') -> 'float':
280    @abstractmethod
281    def update(self, transition: Transition[ActionT, SubjectiveStateT, InfoT]) -> float:
282        raise NotImplementedError
class MetaStepSizeLearner(abc.ABC):
class MetaStepSizeLearner(ABC):
    """Adapts per-weight step sizes using meta-gradient methods.

    Implementations may use IDBD (Sutton 1992), Adam-IDBD
    (Degris et al. 2024), or other online cross-validation algorithms.
    Each learned weight in the target module gets a dedicated step-size
    parameter adapted by this learner.

    The agent loop passes error signals (TD errors, reward, etc.) to
    each module's `update_meta()`; composite implementations delegate
    to this learner.
    """

    @abstractmethod
    def update(self, error_signals: Mapping[str, float]) -> None:
        """Receive error signals and adapt per-weight step sizes.

        *error_signals* maps signal names to scalar values.
        """
        raise NotImplementedError

Adapts per-weight step sizes using meta-gradient methods.

Implementations may use IDBD (Sutton 1992), Adam-IDBD (Degris et al. 2024), or other online cross-validation algorithms. Each learned weight in the target module gets a dedicated step-size parameter adapted by this learner.

The agent loop passes error signals (TD errors, reward, etc.) to each module's update_meta(); composite implementations delegate to this learner.

@abstractmethod
def update(self, error_signals: 'Mapping[str, float]') -> 'None':
367    @abstractmethod
368    def update(self, error_signals: Mapping[str, float]) -> None:
369        """Receive error signals and adapt per-weight step sizes."""
370        raise NotImplementedError

Receive error signals and adapt per-weight step sizes.

class Option(abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT]):
class Option(ABC, Generic[SubjectiveStateT, ActionT]):
    """Temporal abstraction consisting of a policy and termination condition."""

    @property
    @abstractmethod
    def descriptor(self) -> OptionDescriptor:
        """The `OptionDescriptor` for this option (it carries the option id)."""
        raise NotImplementedError

    @abstractmethod
    def is_available(self, subjective_state: SubjectiveStateT) -> bool:
        """Whether this option can be initiated in the given state.

        Intended for `ActionSelector` implementations, which receive
        available options and may filter by initiation conditions.
        """
        raise NotImplementedError

    @abstractmethod
    def act(self, subjective_state: SubjectiveStateT) -> ActionT:
        """Return the action this option's policy takes in *subjective_state*."""
        raise NotImplementedError

    @abstractmethod
    def stop_probability(self, subjective_state: SubjectiveStateT) -> float:
        """Return the termination probability in *subjective_state*.

        NOTE(review): presumably a value in [0, 1] -- the range is not
        enforced here.
        """
        raise NotImplementedError

Temporal abstraction consisting of a policy and termination condition.

descriptor: 'OptionDescriptor'
381    @property
382    @abstractmethod
383    def descriptor(self) -> OptionDescriptor:
384        raise NotImplementedError
@abstractmethod
def is_available(self, subjective_state: 'SubjectiveStateT') -> 'bool':
386    @abstractmethod
387    def is_available(self, subjective_state: SubjectiveStateT) -> bool:
388        """Whether this option can be initiated in the given state.
389
390        Intended for `ActionSelector` implementations, which receive
391        available options and may filter by initiation conditions.
392        """
393        raise NotImplementedError

Whether this option can be initiated in the given state.

Intended for ActionSelector implementations, which receive available options and may filter by initiation conditions.

@abstractmethod
def act(self, subjective_state: 'SubjectiveStateT') -> 'ActionT':
395    @abstractmethod
396    def act(self, subjective_state: SubjectiveStateT) -> ActionT:
397        raise NotImplementedError
@abstractmethod
def stop_probability(self, subjective_state: 'SubjectiveStateT') -> 'float':
399    @abstractmethod
400    def stop_probability(self, subjective_state: SubjectiveStateT) -> float:
401        raise NotImplementedError
class OptionKeyboard(abc.ABC):
class OptionKeyboard(ABC):
    """Composes multiple options into a single blended behavior.

    Named after Sutton's analogy: each option is a key on a keyboard,
    and playing a "chord" (setting per-option intensities) produces a
    composed temporal abstraction.  The `ActionSelector` determines
    the intensities, then the keyboard produces a new option descriptor
    representing the blended behavior.

    Used by `CompositeReactivePolicy` when an `ActionSelector`
    returns a `PolicyDecision` with `option_intensities` set.
    """

    @abstractmethod
    def compose(self, intensities: Sequence[float]) -> OptionDescriptor:
        """Blend options according to *intensities* and return the result.

        NOTE(review): how positions in *intensities* map onto options is
        not defined by this interface -- confirm with an implementation.
        """
        raise NotImplementedError

Composes multiple options into a single blended behavior.

Named after Sutton's analogy: each option is a key on a keyboard, and playing a "chord" (setting per-option intensities) produces a composed temporal abstraction. The ActionSelector determines the intensities, then the keyboard produces a new option descriptor representing the blended behavior.

Used by CompositeReactivePolicy when an ActionSelector returns a PolicyDecision with option_intensities set.

@abstractmethod
def compose(self, intensities: 'Sequence[float]') -> 'OptionDescriptor':
487    @abstractmethod
488    def compose(self, intensities: Sequence[float]) -> OptionDescriptor:
489        """Blend options according to *intensities* and return the result."""
490        raise NotImplementedError

Blend options according to intensities and return the result.

class OptionLearner(abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
class OptionLearner(ABC, Generic[SubjectiveStateT, ActionT, InfoT]):
    """Learns options from subtasks and experience."""

    @abstractmethod
    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
        """Hand the given *subtasks* to the learner."""
        raise NotImplementedError

    @abstractmethod
    def update(self, transition: Transition[ActionT, SubjectiveStateT, InfoT]) -> None:
        """Learn from one *transition* of experience."""
        raise NotImplementedError

    @abstractmethod
    def export_options(self) -> Sequence[Option[SubjectiveStateT, ActionT]]:
        """Return the learner's options as `Option` objects."""
        raise NotImplementedError

    @abstractmethod
    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
        """Remove the subtasks identified by *subtask_ids*."""
        raise NotImplementedError

Learns options from subtasks and experience.

@abstractmethod
def ingest_subtasks(self, subtasks: 'Sequence[SubtaskSpec]') -> 'None':
427    @abstractmethod
428    def ingest_subtasks(self, subtasks: Sequence[SubtaskSpec]) -> None:
429        raise NotImplementedError
@abstractmethod
def update( self, transition: 'Transition[ActionT, SubjectiveStateT, InfoT]') -> 'None':
431    @abstractmethod
432    def update(self, transition: Transition[ActionT, SubjectiveStateT, InfoT]) -> None:
433        raise NotImplementedError
@abstractmethod
def export_options(self) -> 'Sequence[Option[SubjectiveStateT, ActionT]]':
435    @abstractmethod
436    def export_options(self) -> Sequence[Option[SubjectiveStateT, ActionT]]:
437        raise NotImplementedError
@abstractmethod
def remove_subtasks(self, subtask_ids: 'Sequence[SubtaskId]') -> 'None':
439    @abstractmethod
440    def remove_subtasks(self, subtask_ids: Sequence[SubtaskId]) -> None:
441        raise NotImplementedError
class OptionLibrary(abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT]):
class OptionLibrary(ABC, Generic[SubjectiveStateT, ActionT]):
    """Stores learned options."""

    @abstractmethod
    def list_options(self) -> Sequence[Option[SubjectiveStateT, ActionT]]:
        """Return the options held by the library."""
        raise NotImplementedError

    @abstractmethod
    def get(self, option_id: OptionId) -> Option[SubjectiveStateT, ActionT]:
        """Return the option identified by *option_id*.

        NOTE(review): behavior for an unknown id is not specified by
        this interface -- confirm with an implementation.
        """
        raise NotImplementedError

    @abstractmethod
    def add_or_replace(self, option: Option[SubjectiveStateT, ActionT]) -> None:
        """Add *option* to the library, or replace an existing entry.

        NOTE(review): presumably keyed by the option's descriptor id --
        confirm with an implementation.
        """
        raise NotImplementedError

    @abstractmethod
    def remove(self, option_ids: Sequence[OptionId]) -> None:
        """Remove the options identified by *option_ids*."""
        raise NotImplementedError

Stores learned options.

@abstractmethod
def list_options(self) -> 'Sequence[Option[SubjectiveStateT, ActionT]]':
407    @abstractmethod
408    def list_options(self) -> Sequence[Option[SubjectiveStateT, ActionT]]:
409        raise NotImplementedError
@abstractmethod
def get(self, option_id: 'OptionId') -> 'Option[SubjectiveStateT, ActionT]':
411    @abstractmethod
412    def get(self, option_id: OptionId) -> Option[SubjectiveStateT, ActionT]:
413        raise NotImplementedError
@abstractmethod
def add_or_replace(self, option: 'Option[SubjectiveStateT, ActionT]') -> 'None':
415    @abstractmethod
416    def add_or_replace(self, option: Option[SubjectiveStateT, ActionT]) -> None:
417        raise NotImplementedError
@abstractmethod
def remove(self, option_ids: 'Sequence[OptionId]') -> 'None':
419    @abstractmethod
420    def remove(self, option_ids: Sequence[OptionId]) -> None:
421        raise NotImplementedError
class OptionModel(abc.ABC, typing.Generic[~SubjectiveStateT]):
class OptionModel(ABC, Generic[SubjectiveStateT]):
    """Predictive model for one option."""

    @property
    @abstractmethod
    def option_id(self) -> OptionId:
        """Id of the option this model is for."""
        raise NotImplementedError

    @abstractmethod
    def predict(
        self,
        subjective_state: SubjectiveStateT,
    ) -> ModelPrediction[SubjectiveStateT]:
        """Return this model's `ModelPrediction` for *subjective_state*."""
        raise NotImplementedError

Predictive model for one option.

option_id: 'OptionId'
215    @property
216    @abstractmethod
217    def option_id(self) -> OptionId:
218        raise NotImplementedError
@abstractmethod
def predict( self, subjective_state: 'SubjectiveStateT') -> 'ModelPrediction[SubjectiveStateT]':
220    @abstractmethod
221    def predict(
222        self,
223        subjective_state: SubjectiveStateT,
224    ) -> ModelPrediction[SubjectiveStateT]:
225        raise NotImplementedError
class OptionModelLearner(abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
class OptionModelLearner(ABC, Generic[SubjectiveStateT, ActionT, InfoT]):
    """Learns option models from experience."""

    @abstractmethod
    def update(self, transition: Transition[ActionT, SubjectiveStateT, InfoT]) -> None:
        """Learn from one *transition* of experience."""
        raise NotImplementedError

    @abstractmethod
    def export_models(self) -> Sequence[OptionModel[SubjectiveStateT]]:
        """Return the learner's option models as `OptionModel` objects."""
        raise NotImplementedError

Learns option models from experience.

@abstractmethod
def update( self, transition: 'Transition[ActionT, SubjectiveStateT, InfoT]') -> 'None':
231    @abstractmethod
232    def update(self, transition: Transition[ActionT, SubjectiveStateT, InfoT]) -> None:
233        raise NotImplementedError
@abstractmethod
def export_models(self) -> 'Sequence[OptionModel[SubjectiveStateT]]':
235    @abstractmethod
236    def export_models(self) -> Sequence[OptionModel[SubjectiveStateT]]:
237        raise NotImplementedError
class Planner(abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
class Planner(ABC, Generic[SubjectiveStateT, ActionT, InfoT]):
    """Produces planning updates from the world model.

    The planner does not directly act in the world.  Instead it returns
    improvement signals, targets, or search statistics that the reactive
    policy and value learners can use.
    """

    @abstractmethod
    def plan_step(
        self,
        subjective_state: SubjectiveStateT,
        model: WorldModel[SubjectiveStateT, ActionT, InfoT],
        value_function: ValueEstimator[SubjectiveStateT, ActionT, InfoT],
        budget: int,
    ) -> PlanningUpdate[ActionT]:
        """Plan from *subjective_state* and return a `PlanningUpdate`.

        *model* and *value_function* are the world model and value
        estimator the planner may query.

        NOTE(review): *budget* presumably bounds the amount of planning
        work for this step; its unit is not defined by this interface.
        """
        raise NotImplementedError

Produces planning updates from the world model.

The planner does not directly act in the world. Instead it returns improvement signals, targets, or search statistics that the reactive policy and value learners can use.

@abstractmethod
def plan_step( self, subjective_state: 'SubjectiveStateT', model: 'WorldModel[SubjectiveStateT, ActionT, InfoT]', value_function: 'ValueEstimator[SubjectiveStateT, ActionT, InfoT]', budget: 'int') -> 'PlanningUpdate[ActionT]':
248    @abstractmethod
249    def plan_step(
250        self,
251        subjective_state: SubjectiveStateT,
252        model: WorldModel[SubjectiveStateT, ActionT, InfoT],
253        value_function: ValueEstimator[SubjectiveStateT, ActionT, InfoT],
254        budget: int,
255    ) -> PlanningUpdate[ActionT]:
256        raise NotImplementedError
class StateBuilder(abc.ABC, typing.Generic[~ObservationT, ~ActionT, ~SubjectiveStateT]):
class StateBuilder(ABC, Generic[ObservationT, ActionT, SubjectiveStateT]):
    """Builds and updates the subjective state seen by every other component.

    This is where an implementation decides what *subjective_state* means.
    For a simple domain it may be a hand-built summary; for a more ambitious
    project it may be the output of a learned encoder or recurrent memory.
    """

    @abstractmethod
    def reset(self) -> None:
        """Reset the builder.

        NOTE(review): presumably called at episode boundaries -- confirm
        with callers.
        """
        raise NotImplementedError

    @abstractmethod
    def update(
        self,
        observation: ObservationT,
        reward: float,
        last_action: ActionT | None,
    ) -> SubjectiveStateT:
        """Take in new input and return the resulting subjective state.

        The input is the latest *observation*, the scalar *reward*, and
        *last_action*, which may be ``None``.

        NOTE(review): ``None`` presumably means no action has been taken
        yet -- confirm with callers.
        """
        raise NotImplementedError

    @abstractmethod
    def current_subjective_state(self) -> SubjectiveStateT:
        """Return the current subjective state; takes no new input."""
        raise NotImplementedError

Builds and updates the subjective state seen by every other component.

This is where an implementation decides what subjective_state means. For a simple domain it may be a hand-built summary; for a more ambitious project it may be the output of a learned encoder or recurrent memory.

@abstractmethod
def reset(self) -> 'None':
82    @abstractmethod
83    def reset(self) -> None:
84        raise NotImplementedError
@abstractmethod
def update( self, observation: 'ObservationT', reward: 'float', last_action: 'ActionT | None') -> 'SubjectiveStateT':
86    @abstractmethod
87    def update(
88        self,
89        observation: ObservationT,
90        reward: float,
91        last_action: ActionT | None,
92    ) -> SubjectiveStateT:
93        raise NotImplementedError
@abstractmethod
def current_subjective_state(self) -> 'SubjectiveStateT':
95    @abstractmethod
96    def current_subjective_state(self) -> SubjectiveStateT:
97        raise NotImplementedError
class SubtaskGenerator(abc.ABC, typing.Generic[~SubjectiveStateT]):
class SubtaskGenerator(ABC, Generic[SubjectiveStateT]):
    """Maps ranked features to subtasks."""

    @abstractmethod
    def generate(
        self,
        ranked_feature_ids: Sequence[FeatureId],
        feature_bank: FeatureBank[SubjectiveStateT],
    ) -> Sequence[SubtaskSpec]:
        """Return `SubtaskSpec` records for *ranked_feature_ids*.

        *feature_bank* is supplied so implementations can consult it
        (for example through `FeatureBank.activations`).
        """
        raise NotImplementedError

Maps ranked features to subtasks.

@abstractmethod
def generate( self, ranked_feature_ids: 'Sequence[FeatureId]', feature_bank: 'FeatureBank[SubjectiveStateT]') -> 'Sequence[SubtaskSpec]':
159    @abstractmethod
160    def generate(
161        self,
162        ranked_feature_ids: Sequence[FeatureId],
163        feature_bank: FeatureBank[SubjectiveStateT],
164    ) -> Sequence[SubtaskSpec]:
165        raise NotImplementedError
class UtilityAssessor(abc.ABC):
class UtilityAssessor(ABC):
    """Aggregates usage signals into utility estimates."""

    @abstractmethod
    def observe(self, usage: Sequence[UsageRecord]) -> None:
        """Take in a batch of *usage* records."""
        raise NotImplementedError

    @abstractmethod
    def scores(self) -> Sequence[UtilityRecord]:
        """Return the utility estimates as `UtilityRecord` entries."""
        raise NotImplementedError

Aggregates usage signals into utility estimates.

@abstractmethod
def observe(self, usage: 'Sequence[UsageRecord]') -> 'None':
337    @abstractmethod
338    def observe(self, usage: Sequence[UsageRecord]) -> None:
339        raise NotImplementedError
@abstractmethod
def scores(self) -> 'Sequence[UtilityRecord]':
341    @abstractmethod
342    def scores(self) -> Sequence[UtilityRecord]:
343        raise NotImplementedError
class ValueEstimator(abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
class ValueEstimator(ABC, Generic[SubjectiveStateT, ActionT, InfoT]):
    """Owns the main and auxiliary value learners.

    A minimal implementation can expose a single predictive learner.  A
    richer implementation can maintain a bank of General Value Functions.
    """

    @abstractmethod
    def list_general_value_functions(
        self,
    ) -> Sequence[GeneralValueFunctionLearner[SubjectiveStateT, ActionT, InfoT]]:
        """Return all managed GVF learners.

        Intended for `Planner` implementations that need to inspect
        the GVF bank (e.g., to evaluate auxiliary predictions during
        planning).
        """
        raise NotImplementedError

    @abstractmethod
    def predict(
        self, subjective_state: SubjectiveStateT
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Return one prediction per GVF id for *subjective_state*."""
        raise NotImplementedError

    @abstractmethod
    def update(
        self, transition: Transition[ActionT, SubjectiveStateT, InfoT]
    ) -> Mapping[GeneralValueFunctionId, float]:
        """Learn from one *transition* and return one float per GVF id.

        NOTE(review): the floats are presumably TD errors, matching the
        ``td_errors`` mapping `ActionSelector.update_from_values`
        accepts -- confirm with an implementation.
        """
        raise NotImplementedError

    @abstractmethod
    def add_or_replace(
        self, learner: GeneralValueFunctionLearner[SubjectiveStateT, ActionT, InfoT]
    ) -> None:
        """Add or replace a GVF learner in the bank.

        Used for dynamic GVF management, e.g., creating new GVFs when
        new subtasks or options are discovered.
        """
        raise NotImplementedError

    @abstractmethod
    def remove(
        self, general_value_function_ids: Sequence[GeneralValueFunctionId]
    ) -> None:
        """Remove the learners identified by *general_value_function_ids*."""
        raise NotImplementedError

Owns the main and auxiliary value learners.

A minimal implementation can expose a single predictive learner. A richer implementation can maintain a bank of General Value Functions.

@abstractmethod
def list_general_value_functions( self) -> 'Sequence[GeneralValueFunctionLearner[SubjectiveStateT, ActionT, InfoT]]':
292    @abstractmethod
293    def list_general_value_functions(
294        self,
295    ) -> Sequence[GeneralValueFunctionLearner[SubjectiveStateT, ActionT, InfoT]]:
296        """Return all managed GVF learners.
297
298        Intended for `Planner` implementations that need to inspect
299        the GVF bank (e.g., to evaluate auxiliary predictions during
300        planning).
301        """
302        raise NotImplementedError

Return all managed GVF learners.

Intended for Planner implementations that need to inspect the GVF bank (e.g., to evaluate auxiliary predictions during planning).

@abstractmethod
def predict( self, subjective_state: 'SubjectiveStateT') -> 'Mapping[GeneralValueFunctionId, float]':
304    @abstractmethod
305    def predict(
306        self, subjective_state: SubjectiveStateT
307    ) -> Mapping[GeneralValueFunctionId, float]:
308        raise NotImplementedError
@abstractmethod
def update( self, transition: 'Transition[ActionT, SubjectiveStateT, InfoT]') -> 'Mapping[GeneralValueFunctionId, float]':
310    @abstractmethod
311    def update(
312        self, transition: Transition[ActionT, SubjectiveStateT, InfoT]
313    ) -> Mapping[GeneralValueFunctionId, float]:
314        raise NotImplementedError
@abstractmethod
def add_or_replace( self, learner: 'GeneralValueFunctionLearner[SubjectiveStateT, ActionT, InfoT]') -> 'None':
316    @abstractmethod
317    def add_or_replace(
318        self, learner: GeneralValueFunctionLearner[SubjectiveStateT, ActionT, InfoT]
319    ) -> None:
320        """Add or replace a GVF learner in the bank.
321
322        Used for dynamic GVF management, e.g., creating new GVFs when
323        new subtasks or options are discovered.
324        """
325        raise NotImplementedError

Add or replace a GVF learner in the bank.

Used for dynamic GVF management, e.g., creating new GVFs when new subtasks or options are discovered.

@abstractmethod
def remove( self, general_value_function_ids: 'Sequence[GeneralValueFunctionId]') -> 'None':
327    @abstractmethod
328    def remove(
329        self, general_value_function_ids: Sequence[GeneralValueFunctionId]
330    ) -> None:
331        raise NotImplementedError
class WorldModel(abc.ABC, typing.Generic[~SubjectiveStateT, ~ActionT, ~InfoT]):
class WorldModel(ABC, Generic[SubjectiveStateT, ActionT, InfoT]):
    """Predictive world model for actions and options.

    This is the planner-facing model of what will happen next.  It may be
    learned, analytic, approximate, or hybrid, as long as it can answer the
    bounded queries the planner needs.
    """

    @abstractmethod
    def update(self, transition: Transition[ActionT, SubjectiveStateT, InfoT]) -> None:
        """Update the model from one *transition*."""
        raise NotImplementedError

    @abstractmethod
    def predict_action(
        self,
        subjective_state: SubjectiveStateT,
        action: ActionT,
    ) -> ModelPrediction[SubjectiveStateT]:
        """Return the `ModelPrediction` for taking *action* in *subjective_state*."""
        raise NotImplementedError

    @abstractmethod
    def predict_option(
        self,
        subjective_state: SubjectiveStateT,
        option_id: OptionId,
    ) -> ModelPrediction[SubjectiveStateT]:
        """Return the `ModelPrediction` for the option *option_id* in *subjective_state*.

        NOTE(review): behavior when no model exists for *option_id* is
        not specified by this interface.
        """
        raise NotImplementedError

    @abstractmethod
    def add_or_replace_option_models(
        self, models: Sequence[OptionModel[SubjectiveStateT]]
    ) -> None:
        """Add *models* to the world model, or replace existing ones.

        NOTE(review): presumably matched by `OptionModel.option_id` --
        confirm with an implementation.
        """
        raise NotImplementedError

    @abstractmethod
    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
        """Remove the option models identified by *option_ids*."""
        raise NotImplementedError

Predictive world model for actions and options.

This is the planner-facing model of what will happen next. It may be learned, analytic, approximate, or hybrid, as long as it can answer the bounded queries the planner needs.

@abstractmethod
def update( self, transition: 'Transition[ActionT, SubjectiveStateT, InfoT]') -> 'None':
181    @abstractmethod
182    def update(self, transition: Transition[ActionT, SubjectiveStateT, InfoT]) -> None:
183        raise NotImplementedError
@abstractmethod
def predict_action( self, subjective_state: 'SubjectiveStateT', action: 'ActionT') -> 'ModelPrediction[SubjectiveStateT]':
185    @abstractmethod
186    def predict_action(
187        self,
188        subjective_state: SubjectiveStateT,
189        action: ActionT,
190    ) -> ModelPrediction[SubjectiveStateT]:
191        raise NotImplementedError
@abstractmethod
def predict_option( self, subjective_state: 'SubjectiveStateT', option_id: 'OptionId') -> 'ModelPrediction[SubjectiveStateT]':
193    @abstractmethod
194    def predict_option(
195        self,
196        subjective_state: SubjectiveStateT,
197        option_id: OptionId,
198    ) -> ModelPrediction[SubjectiveStateT]:
199        raise NotImplementedError
@abstractmethod
def add_or_replace_option_models(self, models: 'Sequence[OptionModel[SubjectiveStateT]]') -> 'None':
201    @abstractmethod
202    def add_or_replace_option_models(
203        self, models: Sequence[OptionModel[SubjectiveStateT]]
204    ) -> None:
205        raise NotImplementedError
@abstractmethod
def remove_option_models(self, option_ids: 'Sequence[OptionId]') -> 'None':
207    @abstractmethod
208    def remove_option_models(self, option_ids: Sequence[OptionId]) -> None:
209        raise NotImplementedError