Skip to content

Conversation

@mscolnick
Copy link
Contributor

@mscolnick mscolnick commented Dec 19, 2025

This is further refactoring to make SessionConsumer a type of extension (plus some).

@vercel
Copy link

vercel bot commented Dec 19, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Review Updated (UTC)
marimo-docs Ready Ready Preview, Comment Dec 22, 2025 4:19pm

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors SessionConsumer to become a type of SessionExtension, enabling a more composable architecture where consumers can be treated as extensions to the session lifecycle. Key changes include renaming lifecycle methods (on_starton_attach, on_stopon_detach, write_operationnotify) and introducing a new NotificationListenerExtension to handle kernel message distribution.

  • Moved SessionConsumer from marimo._server.model to marimo._server.consumer as an abstract base class extending SessionExtension
  • Introduced NotificationListenerExtension to handle kernel notification distribution, extracting this logic from the session core
  • Updated all consumers and tests to use the new lifecycle method names and notify signature

Reviewed changes

Copilot reviewed 23 out of 23 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
marimo/_server/consumer.py New file defining SessionConsumer as an ABC extending SessionExtension with updated method signatures
marimo/_server/sessions/extensions/types.py New file defining the SessionExtension protocol with on_attach/on_detach lifecycle hooks
marimo/_server/sessions/extensions/extensions.py Moved SessionExtension definition to types.py and added NotificationListenerExtension to handle kernel message distribution
marimo/_server/sessions/session.py Updated session to treat consumers as extensions, removed message_distributor logic (moved to extension), and renamed write_operation to notify
marimo/_server/sessions/room.py Simplified room.close() and updated broadcast to work with KernelMessage instead of MessageOperation
marimo/_server/api/endpoints/ws.py Updated WebSocketHandler to implement new SessionConsumer interface with on_attach/on_detach and notify methods
marimo/_server/export/init.py Updated RunUntilCompletionSessionConsumer to implement new SessionConsumer interface
marimo/_server/sessions/utils.py Updated to serialize operations before notifying consumers
marimo/_server/sessions/types.py Updated Session protocol to use notify instead of write_operation, moved imports to TYPE_CHECKING
tests/_server/test_sessions.py Updated all test assertions to use new method names (on_attach, on_detach, notify)
tests/_server/test_session_manager.py Updated imports to use SessionConsumer from new location
tests/_server/sessions/test_file_change_handler.py Updated mock session to use notify instead of write_operation
marimo/_server/start.py Updated to use session.notify instead of session.write_operation
marimo/_server/sessions/file_change_handler.py Updated to use session.notify and moved Session import to TYPE_CHECKING
marimo/_server/sessions/session_manager.py Updated to use session.notify and moved SessionConsumer import
marimo/_server/sessions/events.py Added on_notification_sent event and emit_notification_sent method
marimo/_server/model.py Removed SessionConsumer protocol (moved to consumer.py)
marimo/_server/api/endpoints/execution.py Updated to use session.notify
marimo/_server/api/endpoints/editing.py Updated to use session.notify
marimo/_server/api/deps.py Moved Session import to TYPE_CHECKING
marimo/_ai/_tools/tools/datasource.py Moved Session import to TYPE_CHECKING
marimo/_ai/_tools/base.py Moved Session and SessionManager imports to TYPE_CHECKING
marimo/_server/sessions/extensions/init.py New file with copyright header

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.



class WebsocketHandler(SessionConsumer):
class WebsocketHandler(SessionConsumer, SessionExtension):
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WebsocketHandler inherits from both SessionConsumer and SessionExtension, but SessionConsumer already inherits from SessionExtension (as defined in marimo/_server/consumer.py). The explicit SessionExtension inheritance is redundant and can be removed.

Copilot uses AI. Check for mistakes.
"""
Consumer for a session. This extends the SessionExtension interface.
This allows use to communicate with a session via different
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in docstring: "use to" should be "us to"

Suggested change
This allows use to communicate with a session via different
This allows us to communicate with a session via different

Copilot uses AI. Check for mistakes.
consumer.write_operation(operation)
consumer.notify(notification)

def close(self) -> None:
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The close method no longer calls on_detach for consumers, making it inconsistent with remove_consumer which does call on_detach. While this works correctly in practice because session.close() calls _detach_extensions() first, the room.close() method is now asymmetric with room.add_consumer/remove_consumer, which could be confusing for maintainability.

Suggested change
def close(self) -> None:
def close(self) -> None:
# Detach all consumers before clearing the room to keep behavior
# consistent with remove_consumer.
for consumer in list(self.consumers.keys()):
consumer.on_detach()

Copilot uses AI. Check for mistakes.
Comment on lines +305 to +307
# Consumers are also extensions, so we want to attach them to the session
self.extensions.append(session_consumer)
session_consumer.on_attach(self, self._event_bus)
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The on_attach method is called twice for the initial session consumer during initialization. The flow is: init calls connect_consumer (line 169) which adds the consumer to extensions and explicitly calls on_attach (lines 306-307), then _attach_extensions is called (line 173) which calls on_attach again for all extensions including the newly added consumer. This double attachment could cause issues such as duplicate event subscriptions or resource initialization.

Suggested change
# Consumers are also extensions, so we want to attach them to the session
self.extensions.append(session_consumer)
session_consumer.on_attach(self, self._event_bus)
# Consumers are also extensions; register them so the extension
# attachment mechanism can attach them appropriately.
self.extensions.append(session_consumer)

Copilot uses AI. Check for mistakes.
# HACK: Ideally we don't need to reach into this extension directly
for extension in self.extensions:
if isinstance(extension, NotificationListenerExtension):
extension.distributor.flush()
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flush_messages method accesses extension.distributor without checking if the attribute exists. If on_attach fails or hasn't been called yet, this will raise an AttributeError. Consider adding a check or using getattr with a default value.

Suggested change
extension.distributor.flush()
distributor = getattr(extension, "distributor", None)
if distributor is not None:
distributor.flush()

Copilot uses AI. Check for mistakes.
def on_attach(self, session: Session, event_bus: SessionEventBus) -> None:
del session
del event_bus
return None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return None

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably something that should be caught by linters instead of humans

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://docs.astral.sh/ruff/rules/useless-return/, can add this, but need to clean up some old code

) -> None:
del session
del event_bus
return None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return None

def notify(self, notification: KernelMessage) -> None: ...

@abstractmethod
def connection_state(self) -> ConnectionState: ...
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
def connection_state(self) -> ConnectionState: ...
def get_connection_state(self) -> ConnectionState: ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i can do this in a followup for renaming

Light2Dark
Light2Dark previously approved these changes Dec 22, 2025
@mscolnick mscolnick merged commit 5b14c18 into main Dec 22, 2025
38 of 41 checks passed
@mscolnick mscolnick deleted the ms/session-consumer-as-extension branch December 22, 2025 16:38
Comment on lines -88 to -97
session = app_state.require_current_session()
prev_path = session.app_file_manager.path

session.app_file_manager.rename(body.filename)
new_path = session.app_file_manager.path

if prev_path and new_path:
app_state.session_manager.recents.rename(prev_path, new_path)
elif new_path:
app_state.session_manager.recents.touch(new_path)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice to see these conditionals moved

Comment on lines 89 to +90
ttl_seconds: Optional[int],
extensions: list[SessionExtension] | None = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tangent: are we fine to use | syntax now for Optional? Would be nice to be consistent with that across the codebase.

Copy link
Contributor Author

@mscolnick mscolnick Dec 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally we can move it all to | at some point. (easy by flipping on with a ruff rule). some type inference stuff like our custom parse_raw may require Optional in the dataclasses

@dmadisetti dmadisetti added the internal A refactor or improvement that is not user facing label Jan 8, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

internal A refactor or improvement that is not user facing

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants