Tile base class that powers every piece of functionality.

Tile

Bases: ABC

Base class to implement custom tiles.

Subclasses should define a unique name and implement execute. When async support is required override aexecute; otherwise the default implementation safely offloads the sync execute via asyncio.to_thread.

Source code in src/tileable/tile.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class Tile[PayloadT: TilePayload, ResultT: TileResult](ABC):
    """Base class to implement custom tiles.

    Subclasses should define a unique ``name`` and implement ``execute``. When
    async support is required override ``aexecute``; otherwise the default
    implementation safely offloads the sync ``execute`` via ``asyncio.to_thread``.
    """

    name: ClassVar[str]
    description: ClassVar[str | None] = None

    def __init__(self, *, context: TileContext | None = None) -> None:
        self._context = context

    @property
    def context(self) -> TileContext:
        if self._context is None:
            msg = "Tile context is not attached. Pass `context=` when instantiating or use `invoke_tile`."
            raise RuntimeError(msg)
        return self._context

    def set_context(self, context: TileContext | None) -> None:
        """Attach or detach the runtime context."""

        self._context = context

    @abstractmethod
    def execute(self, payload: PayloadT) -> ResultT:
        """Execute the tile synchronously."""

    async def aexecute(self, payload: PayloadT) -> ResultT:
        """Execute the tile asynchronously.

        The default behavior offloads ``execute`` to a worker thread, ensuring the
        event loop never blocks. Overriding tiles can provide a native async
        implementation if needed.
        """

        return await asyncio.to_thread(self.execute, payload)

aexecute(payload) async

Execute the tile asynchronously.

The default behavior offloads execute to a worker thread, ensuring the event loop never blocks. Overriding tiles can provide a native async implementation if needed.

Source code in src/tileable/tile.py
43
44
45
46
47
48
49
50
51
async def aexecute(self, payload: PayloadT) -> ResultT:
    """Execute the tile asynchronously.

    The default behavior offloads ``execute`` to a worker thread, ensuring the
    event loop never blocks. Overriding tiles can provide a native async
    implementation if needed.
    """

    return await asyncio.to_thread(self.execute, payload)

execute(payload) abstractmethod

Execute the tile synchronously.

Source code in src/tileable/tile.py
39
40
41
@abstractmethod
def execute(self, payload: PayloadT) -> ResultT:
    """Execute the tile synchronously."""

set_context(context)

Attach or detach the runtime context.

Source code in src/tileable/tile.py
34
35
36
37
def set_context(self, context: TileContext | None) -> None:
    """Attach or detach the runtime context."""

    self._context = context

Execution context injected into every tile instance.

TileContext

Runtime context that tiles can use to interact with the system.

Source code in src/tileable/context.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class TileContext:
    """Runtime context that tiles can use to interact with the system."""

    def __init__(
        self,
        *,
        event_bus: EventBus,
        services: Mapping[str, Any] | None = None,
        state: MutableMapping[str, Any] | None = None,
    ) -> None:
        self._event_bus = event_bus
        self._services: dict[str, Any] = dict(services or {})
        self._state: MutableMapping[str, Any] = state or {}

    @property
    def event_bus(self) -> EventBus:
        return self._event_bus

    @property
    def services(self) -> Mapping[str, Any]:
        """Read-only view of registered services."""

        return MappingProxyType(self._services)

    @property
    def state(self) -> MutableMapping[str, Any]:
        """Mutable state bag shared across the current invocation."""

        return self._state

    def emit(self, event: str, /, **payload: Any) -> None:
        """Emit ``event`` with ``payload`` through the event bus."""

        self._event_bus.emit(event, **payload)

    def get_service(self, name: str) -> Any:
        """Return a registered service or raise ``KeyError``."""

        try:
            return self._services[name]
        except KeyError as exc:  # pragma: no cover - defensive path
            raise KeyError(name) from exc

    def get_service_or(self, name: str, default: Any | None = None) -> Any | None:
        """Return a service if available, otherwise ``default``."""

        return self._services.get(name, default)

    def set_service(self, name: str, value: Any) -> None:
        """Register or override a service for the current invocation."""

        self._services[name] = value

services property

Read-only view of registered services.

state property

Mutable state bag shared across the current invocation.

emit(event, /, **payload)

Emit event with payload through the event bus.

Source code in src/tileable/context.py
42
43
44
45
def emit(self, event: str, /, **payload: Any) -> None:
    """Emit ``event`` with ``payload`` through the event bus."""

    self._event_bus.emit(event, **payload)

get_service(name)

Return a registered service or raise KeyError.

Source code in src/tileable/context.py
47
48
49
50
51
52
53
def get_service(self, name: str) -> Any:
    """Return a registered service or raise ``KeyError``."""

    try:
        return self._services[name]
    except KeyError as exc:  # pragma: no cover - defensive path
        raise KeyError(name) from exc

get_service_or(name, default=None)

Return a service if available, otherwise default.

Source code in src/tileable/context.py
55
56
57
58
def get_service_or(self, name: str, default: Any | None = None) -> Any | None:
    """Return a service if available, otherwise ``default``."""

    return self._services.get(name, default)

set_service(name, value)

Register or override a service for the current invocation.

Source code in src/tileable/context.py
60
61
62
63
def set_service(self, name: str, value: Any) -> None:
    """Register or override a service for the current invocation."""

    self._services[name] = value

Event bus abstraction built on top of :mod:blinker.

CapturedEvent dataclass

Structured representation of an emitted event.

Source code in src/tileable/events.py
22
23
24
25
26
27
28
@dataclass(frozen=True)
class CapturedEvent:
    """Structured representation of an emitted event."""

    name: str
    payload: dict[str, Any]
    sender: Any | None = None

EventBus

Small helper around :class:blinker.Namespace.

The bus exposes emit/subscribe/unsubscribe methods to keep the public API tiny while still giving developers full control over the underlying signal system.

Source code in src/tileable/events.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class EventBus:
    """Small helper around :class:`blinker.Namespace`.

    The bus exposes ``emit``/``subscribe``/``unsubscribe`` methods to keep the
    public API tiny while still giving developers full control over the
    underlying signal system.
    """

    def __init__(self) -> None:
        self._namespace = Namespace()

    def emit(self, event: str, /, **payload: Any) -> None:
        """Broadcast ``payload`` to all subscribers of ``event``."""

        signal = self._namespace.signal(event)
        signal.send(event, **payload)

    def subscribe(self, event: str, handler: Callable[..., Any], *, weak: bool = False) -> Callable[[], None]:
        """Register ``handler`` for ``event`` and return an unsubscribe callback."""

        signal = self._namespace.signal(event)
        signal.connect(handler, weak=weak)
        return lambda: signal.disconnect(handler)

    def unsubscribe(self, event: str, handler: Callable[..., Any]) -> None:
        """Remove ``handler`` from ``event`` subscribers if previously registered."""

        signal = self._namespace.signal(event)
        signal.disconnect(handler)

    def record(self, *events: str | Iterable[str], include_sender: bool = False) -> EventRecorder:
        """Capture emissions for ``events`` inside a context manager.

        When ``events`` is omitted the recorder listens to
        :data:`STANDARD_EVENTS`. The returned object exposes captured
        payloads for assertions and debugging.
        """

        names = _normalize_event_names(events)
        return EventRecorder(self, names, include_sender=include_sender)

emit(event, /, **payload)

Broadcast payload to all subscribers of event.

Source code in src/tileable/events.py
42
43
44
45
46
def emit(self, event: str, /, **payload: Any) -> None:
    """Broadcast ``payload`` to all subscribers of ``event``."""

    signal = self._namespace.signal(event)
    signal.send(event, **payload)

record(*events, include_sender=False)

Capture emissions for events inside a context manager.

When events is omitted the recorder listens to :data:STANDARD_EVENTS. The returned object exposes captured payloads for assertions and debugging.

Source code in src/tileable/events.py
61
62
63
64
65
66
67
68
69
70
def record(self, *events: str | Iterable[str], include_sender: bool = False) -> EventRecorder:
    """Capture emissions for ``events`` inside a context manager.

    When ``events`` is omitted the recorder listens to
    :data:`STANDARD_EVENTS`. The returned object exposes captured
    payloads for assertions and debugging.
    """

    names = _normalize_event_names(events)
    return EventRecorder(self, names, include_sender=include_sender)

subscribe(event, handler, *, weak=False)

Register handler for event and return an unsubscribe callback.

Source code in src/tileable/events.py
48
49
50
51
52
53
def subscribe(self, event: str, handler: Callable[..., Any], *, weak: bool = False) -> Callable[[], None]:
    """Register ``handler`` for ``event`` and return an unsubscribe callback."""

    signal = self._namespace.signal(event)
    signal.connect(handler, weak=weak)
    return lambda: signal.disconnect(handler)

unsubscribe(event, handler)

Remove handler from event subscribers if previously registered.

Source code in src/tileable/events.py
55
56
57
58
59
def unsubscribe(self, event: str, handler: Callable[..., Any]) -> None:
    """Remove ``handler`` from ``event`` subscribers if previously registered."""

    signal = self._namespace.signal(event)
    signal.disconnect(handler)

EventRecorder

Context manager that records events emitted by :class:EventBus.

Source code in src/tileable/events.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
class EventRecorder:
    """Context manager that records events emitted by :class:`EventBus`."""

    def __init__(self, bus: EventBus, events: Iterable[str], *, include_sender: bool) -> None:
        event_names = tuple(dict.fromkeys(events))
        if not event_names:
            msg = "EventRecorder requires at least one event name"
            raise ValueError(msg)

        self._bus = bus
        self._event_names = event_names
        self._include_sender = include_sender
        self._records: list[CapturedEvent] = []
        self._subscriptions: list[Callable[[], None]] = []

    def __enter__(self) -> EventRecorder:
        for name in self._event_names:
            handler = self._create_handler(name)
            unsubscribe = self._bus.subscribe(name, handler, weak=False)
            self._subscriptions.append(unsubscribe)
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        while self._subscriptions:
            unsubscribe = self._subscriptions.pop()
            unsubscribe()

    def __iter__(self) -> Iterator[CapturedEvent]:
        return iter(self._records)

    def __len__(self) -> int:
        return len(self._records)

    @property
    def events(self) -> tuple[str, ...]:
        """Names of the events being recorded."""

        return self._event_names

    def records(self) -> list[CapturedEvent]:
        """Return captured events in chronological order."""

        return list(self._records)

    def payloads(self, name: str | None = None) -> list[dict[str, Any]]:
        """Return recorded payloads.

        When ``name`` is provided only events matching the name are
        returned. Copies of the payload dictionaries are produced so callers
        can mutate them freely.
        """

        entries = self._records if name is None else [record for record in self._records if record.name == name]
        return [record.payload.copy() for record in entries]

    def last(self, name: str | None = None) -> CapturedEvent | None:
        """Return the most recent captured event, optionally filtered by name."""

        if name is None:
            return self._records[-1] if self._records else None
        for record in reversed(self._records):
            if record.name == name:
                return record
        return None

    def clear(self) -> None:
        """Drop any previously captured events."""

        self._records.clear()

    def _create_handler(self, name: str) -> Callable[..., None]:
        def handler(sender: Any, **payload: Any) -> None:
            captured = CapturedEvent(name=name, payload=dict(payload), sender=sender if self._include_sender else None)
            self._records.append(captured)

        return handler

events property

Names of the events being recorded.

clear()

Drop any previously captured events.

Source code in src/tileable/events.py
169
170
171
172
def clear(self) -> None:
    """Drop any previously captured events."""

    self._records.clear()

last(name=None)

Return the most recent captured event, optionally filtered by name.

Source code in src/tileable/events.py
159
160
161
162
163
164
165
166
167
def last(self, name: str | None = None) -> CapturedEvent | None:
    """Return the most recent captured event, optionally filtered by name."""

    if name is None:
        return self._records[-1] if self._records else None
    for record in reversed(self._records):
        if record.name == name:
            return record
    return None

payloads(name=None)

Return recorded payloads.

When name is provided only events matching the name are returned. Copies of the payload dictionaries are produced so callers can mutate them freely.

Source code in src/tileable/events.py
148
149
150
151
152
153
154
155
156
157
def payloads(self, name: str | None = None) -> list[dict[str, Any]]:
    """Return recorded payloads.

    When ``name`` is provided only events matching the name are
    returned. Copies of the payload dictionaries are produced so callers
    can mutate them freely.
    """

    entries = self._records if name is None else [record for record in self._records if record.name == name]
    return [record.payload.copy() for record in entries]

records()

Return captured events in chronological order.

Source code in src/tileable/events.py
143
144
145
146
def records(self) -> list[CapturedEvent]:
    """Return captured events in chronological order."""

    return list(self._records)

get_event_bus()

Get a shared :class:EventBus instance.

The helper keeps the footprint tiny for users who prefer not to manually manage bus instances yet allows passing a custom bus everywhere if desired.

Source code in src/tileable/events.py
76
77
78
79
80
81
82
83
84
85
86
87
def get_event_bus() -> EventBus:
    """Get a shared :class:`EventBus` instance.

    The helper keeps the footprint tiny for users who prefer not to manually
    manage bus instances yet allows passing a custom bus everywhere if desired.
    """

    global _default_bus

    if _default_bus is None:
        _default_bus = EventBus()
    return _default_bus

Tile registry keeping track of available tile classes.

TileRecord dataclass

Metadata describing a registered tile.

Source code in src/tileable/registry.py
13
14
15
16
17
18
19
20
@dataclass(frozen=True)
class TileRecord:
    """Metadata describing a registered tile."""

    name: str
    tile_cls: type[Tile[Any, Any]]
    description: str | None
    source: str | None = None

TileRegistry

Central registry for tile classes.

Source code in src/tileable/registry.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class TileRegistry:
    """Central registry for tile classes."""

    def __init__(self) -> None:
        self._tiles: dict[str, TileRecord] = {}

    def register(self, tile_cls: type[Tile[Any, Any]], *, source: str | None = None) -> None:
        if not issubclass(tile_cls, Tile):
            raise TileRegistrationError.not_subclass(tile_cls)
        name = getattr(tile_cls, "name", None)
        if not name:
            raise TileRegistrationError.missing_name(tile_cls)
        if name in self._tiles:
            raise TileRegistrationError.duplicate(name)
        description = getattr(tile_cls, "description", None)
        self._tiles[name] = TileRecord(name=name, tile_cls=tile_cls, description=description, source=source)

    def bulk_register(self, tiles: Iterable[type[Tile[Any, Any]]], *, source: str | None = None) -> None:
        for tile_cls in tiles:
            self.register(tile_cls, source=source)

    def get(self, name: str) -> type[Tile[Any, Any]]:
        try:
            return self._tiles[name].tile_cls
        except KeyError as exc:
            raise TileLookupError.missing(name) from exc

    def info(self, name: str) -> TileRecord:
        try:
            return self._tiles[name]
        except KeyError as exc:
            raise TileLookupError.missing(name) from exc

    def list(self) -> list[TileRecord]:
        return list(self._tiles.values())

    def __contains__(self, name: object) -> bool:
        return name in self._tiles

Plugin integration powered by :mod:pluggy.

HookSpecs

Hook specification container registered with :class:pluggy.PluginManager.

Source code in src/tileable/plugins.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class HookSpecs:
    """Hook specification container registered with :class:`pluggy.PluginManager`."""

    @hookspec
    def tile_specs(self) -> Iterable[type]:  # pragma: no cover - executed via Pluggy
        """Yield or return an iterable of tile classes to register."""
        raise NotImplementedError

    @hookspec
    def tile_startup(self, ctx: Any, tile: Any) -> None:  # pragma: no cover - executed via Pluggy
        """Run before a tile executes."""
        raise NotImplementedError

    @hookspec
    def tile_shutdown(
        self, ctx: Any, tile: Any, error: BaseException | None
    ) -> None:  # pragma: no cover - executed via Pluggy
        """Run after a tile executes, regardless of success."""
        raise NotImplementedError

tile_shutdown(ctx, tile, error)

Run after a tile executes, regardless of success.

Source code in src/tileable/plugins.py
29
30
31
32
33
34
@hookspec
def tile_shutdown(
    self, ctx: Any, tile: Any, error: BaseException | None
) -> None:  # pragma: no cover - executed via Pluggy
    """Run after a tile executes, regardless of success."""
    raise NotImplementedError

tile_specs()

Yield or return an iterable of tile classes to register.

Source code in src/tileable/plugins.py
19
20
21
22
@hookspec
def tile_specs(self) -> Iterable[type]:  # pragma: no cover - executed via Pluggy
    """Yield or return an iterable of tile classes to register."""
    raise NotImplementedError

tile_startup(ctx, tile)

Run before a tile executes.

Source code in src/tileable/plugins.py
24
25
26
27
@hookspec
def tile_startup(self, ctx: Any, tile: Any) -> None:  # pragma: no cover - executed via Pluggy
    """Run before a tile executes."""
    raise NotImplementedError

TilePluginManager

Thin wrapper around :class:pluggy.PluginManager with helpful utilities.

Source code in src/tileable/plugins.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
class TilePluginManager:
    """Thin wrapper around :class:`pluggy.PluginManager` with helpful utilities."""

    def __init__(self) -> None:
        self._manager = pluggy.PluginManager("tileable")
        self._manager.add_hookspecs(HookSpecs)

    @property
    def hook(self) -> pluggy.PluginManager.hook:  # type: ignore[valid-type]
        return self._manager.hook

    def register(self, plugin: Any, name: str | None = None) -> None:
        if self._manager.is_registered(plugin):
            raise PluginError("register", ValueError(f"Plugin {plugin!r} is already registered"))
        if name is not None and self._manager.has_plugin(name):
            raise PluginError("register", ValueError(f"Plugin name '{name}' is already registered"))
        self._manager.register(plugin, name=name)

    def iter_tiles(self) -> Iterable[type]:
        hook = self._manager.hook.tile_specs
        try:
            contributions = hook()
        except Exception as exc:  # pragma: no cover - defensive path
            raise PluginError("tile_specs", exc) from exc

        implementations = hook.get_hookimpls()
        for impl, contribution in zip(implementations, contributions, strict=False):
            if contribution is None:
                continue
            if not isinstance(contribution, Iterable):
                plugin_name = impl.plugin_name or repr(impl.plugin)
                error = TypeError(f"Plugin {plugin_name} returned non-iterable tile_specs result")
                raise PluginError("tile_specs", error) from error
            yield from contribution

    def startup(self, *, ctx: Any, tile: Any) -> None:
        try:
            self._manager.hook.tile_startup(ctx=ctx, tile=tile)
        except Exception as exc:
            raise PluginError("tile_startup", exc) from exc

    def shutdown(self, *, ctx: Any, tile: Any, error: BaseException | None) -> None:
        try:
            self._manager.hook.tile_shutdown(ctx=ctx, tile=tile, error=error)
        except Exception as exc:
            raise PluginError("tile_shutdown", exc) from exc

Execution helpers for running tiles.

ainvoke_tile(tile, payload, *, registry=None, event_bus=None, plugins=None, services=None, state=None, return_context=False) async

Async counterpart to :func:invoke_tile.

Source code in src/tileable/runtime.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
async def ainvoke_tile(
    tile: TileRef,
    payload: Any,
    *,
    registry: TileRegistry | None = None,
    event_bus: EventBus | None = None,
    plugins: TilePluginManager | None = None,
    services: Mapping[str, Any] | None = None,
    state: MutableMapping[str, Any] | None = None,
    return_context: bool = False,
) -> Any:
    """Async counterpart to :func:`invoke_tile`."""

    tile_cls, tile_obj, ctx, resolved_bus, resolved_plugins = _resolve_invocation(
        tile,
        payload,
        registry=registry,
        event_bus=event_bus,
        plugins=plugins,
        services=services,
        state=state,
    )

    started = False
    try:
        _start_invocation(
            tile_cls=tile_cls,
            tile_obj=tile_obj,
            payload=payload,
            ctx=ctx,
            event_bus=resolved_bus,
            plugins=resolved_plugins,
        )
        started = True
        result = await tile_obj.aexecute(payload)
    except TileExecutionError:
        raise
    except Exception as exc:
        _handle_execution_failure(
            tile_cls=tile_cls,
            tile_obj=tile_obj,
            payload=payload,
            ctx=ctx,
            event_bus=resolved_bus,
            plugins=resolved_plugins,
            error=exc,
        )
    else:
        completed = _complete_invocation(
            tile_cls=tile_cls,
            tile_obj=tile_obj,
            payload=payload,
            result=result,
            ctx=ctx,
            event_bus=resolved_bus,
            plugins=resolved_plugins,
        )
        return (completed, ctx) if return_context else completed
    finally:
        _finalize_invocation(
            tile_cls=tile_cls,
            tile_obj=tile_obj,
            payload=payload,
            event_bus=resolved_bus,
            started=started,
        )

invoke_tile(tile, payload, *, registry=None, event_bus=None, plugins=None, services=None, state=None, return_context=False)

Execute tile synchronously and return the result.

Source code in src/tileable/runtime.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
def invoke_tile(
    tile: TileRef,
    payload: Any,
    *,
    registry: TileRegistry | None = None,
    event_bus: EventBus | None = None,
    plugins: TilePluginManager | None = None,
    services: Mapping[str, Any] | None = None,
    state: MutableMapping[str, Any] | None = None,
    return_context: bool = False,
) -> Any:
    """Execute ``tile`` synchronously and return the result."""

    tile_cls, tile_obj, ctx, resolved_bus, resolved_plugins = _resolve_invocation(
        tile,
        payload,
        registry=registry,
        event_bus=event_bus,
        plugins=plugins,
        services=services,
        state=state,
    )

    started = False
    try:
        _start_invocation(
            tile_cls=tile_cls,
            tile_obj=tile_obj,
            payload=payload,
            ctx=ctx,
            event_bus=resolved_bus,
            plugins=resolved_plugins,
        )
        started = True
        result = tile_obj.execute(payload)
    except TileExecutionError:
        raise
    except Exception as exc:
        _handle_execution_failure(
            tile_cls=tile_cls,
            tile_obj=tile_obj,
            payload=payload,
            ctx=ctx,
            event_bus=resolved_bus,
            plugins=resolved_plugins,
            error=exc,
        )
    else:
        completed = _complete_invocation(
            tile_cls=tile_cls,
            tile_obj=tile_obj,
            payload=payload,
            result=result,
            ctx=ctx,
            event_bus=resolved_bus,
            plugins=resolved_plugins,
        )
        return (completed, ctx) if return_context else completed
    finally:
        _finalize_invocation(
            tile_cls=tile_cls,
            tile_obj=tile_obj,
            payload=payload,
            event_bus=resolved_bus,
            started=started,
        )