Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 242 additions & 0 deletions stream_chat/async_chat/channel_batch_updater.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
from typing import TYPE_CHECKING, List

from stream_chat.types.channel_batch import (
ChannelBatchMemberRequest,
ChannelDataUpdate,
ChannelsBatchFilters,
ChannelsBatchOptions,
)
from stream_chat.types.stream_response import StreamResponse

if TYPE_CHECKING:
from stream_chat.async_chat.client import StreamChatAsync


class ChannelBatchUpdater:
"""
Provides convenience methods for batch channel operations (async).
"""

def __init__(self, client: "StreamChatAsync") -> None:
self.client = client

async def add_members(
self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest]
) -> StreamResponse:
"""
Adds members to channels matching the filter.

:param filter: The filter to match channels.
:param members: List of members to add.
:return: StreamResponse containing task_id.
"""
options: ChannelsBatchOptions = {
"operation": "addMembers",
"filter": filter,
"members": members,
}
return await self.client.update_channels_batch(options)

async def remove_members(
self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest]
) -> StreamResponse:
"""
Removes members from channels matching the filter.

:param filter: The filter to match channels.
:param members: List of members to remove.
:return: StreamResponse containing task_id.
"""
options: ChannelsBatchOptions = {
"operation": "removeMembers",
"filter": filter,
"members": members,
}
return await self.client.update_channels_batch(options)

async def invite_members(
self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest]
) -> StreamResponse:
"""
Invites members to channels matching the filter.

:param filter: The filter to match channels.
:param members: List of members to invite.
:return: StreamResponse containing task_id.
"""
options: ChannelsBatchOptions = {
"operation": "inviteMembers",
"filter": filter,
"members": members,
}
return await self.client.update_channels_batch(options)

async def add_moderators(
self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest]
) -> StreamResponse:
"""
Adds moderators to channels matching the filter.

:param filter: The filter to match channels.
:param members: List of members to add as moderators.
:return: StreamResponse containing task_id.
"""
options: ChannelsBatchOptions = {
"operation": "addModerators",
"filter": filter,
"members": members,
}
return await self.client.update_channels_batch(options)

async def demote_moderators(
self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest]
) -> StreamResponse:
"""
Removes moderator role from members in channels matching the filter.

:param filter: The filter to match channels.
:param members: List of members to demote from moderators.
:return: StreamResponse containing task_id.
"""
options: ChannelsBatchOptions = {
"operation": "demoteModerators",
"filter": filter,
"members": members,
}
return await self.client.update_channels_batch(options)

async def assign_roles(
self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest]
) -> StreamResponse:
"""
Assigns roles to members in channels matching the filter.

:param filter: The filter to match channels.
:param members: List of members with roles to assign.
:return: StreamResponse containing task_id.
"""
options: ChannelsBatchOptions = {
"operation": "assignRoles",
"filter": filter,
"members": members,
}
return await self.client.update_channels_batch(options)

async def hide(
self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest]
) -> StreamResponse:
"""
Hides channels matching the filter for the specified members.

:param filter: The filter to match channels.
:param members: List of members for whom to hide channels.
:return: StreamResponse containing task_id.
"""
options: ChannelsBatchOptions = {
"operation": "hide",
"filter": filter,
"members": members,
}
return await self.client.update_channels_batch(options)

async def show(
self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest]
) -> StreamResponse:
"""
Shows channels matching the filter for the specified members.

:param filter: The filter to match channels.
:param members: List of members for whom to show channels.
:return: StreamResponse containing task_id.
"""
options: ChannelsBatchOptions = {
"operation": "show",
"filter": filter,
"members": members,
}
return await self.client.update_channels_batch(options)

async def archive(
self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest]
) -> StreamResponse:
"""
Archives channels matching the filter for the specified members.

:param filter: The filter to match channels.
:param members: List of members for whom to archive channels.
:return: StreamResponse containing task_id.
"""
options: ChannelsBatchOptions = {
"operation": "archive",
"filter": filter,
"members": members,
}
return await self.client.update_channels_batch(options)

async def unarchive(
self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest]
) -> StreamResponse:
"""
Unarchives channels matching the filter for the specified members.

:param filter: The filter to match channels.
:param members: List of members for whom to unarchive channels.
:return: StreamResponse containing task_id.
"""
options: ChannelsBatchOptions = {
"operation": "unarchive",
"filter": filter,
"members": members,
}
return await self.client.update_channels_batch(options)

async def update_data(
self, filter: ChannelsBatchFilters, data: ChannelDataUpdate
) -> StreamResponse:
"""
Updates data on channels matching the filter.

:param filter: The filter to match channels.
:param data: Channel data to update.
:return: StreamResponse containing task_id.
"""
options: ChannelsBatchOptions = {
"operation": "updateData",
"filter": filter,
"data": data,
}
return await self.client.update_channels_batch(options)

async def add_filter_tags(
self, filter: ChannelsBatchFilters, tags: List[str]
) -> StreamResponse:
"""
Adds filter tags to channels matching the filter.

:param filter: The filter to match channels.
:param tags: List of filter tags to add.
:return: StreamResponse containing task_id.
"""
options: ChannelsBatchOptions = {
"operation": "addFilterTags",
"filter": filter,
"filter_tags_update": tags,
}
return await self.client.update_channels_batch(options)

async def remove_filter_tags(
self, filter: ChannelsBatchFilters, tags: List[str]
) -> StreamResponse:
"""
Removes filter tags from channels matching the filter.

:param filter: The filter to match channels.
:param tags: List of filter tags to remove.
:return: StreamResponse containing task_id.
"""
options: ChannelsBatchOptions = {
"operation": "removeFilterTags",
"filter": filter,
"filter_tags_update": tags,
}
return await self.client.update_channels_batch(options)
30 changes: 30 additions & 0 deletions stream_chat/async_chat/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import warnings
from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
AsyncContextManager,
Callable,
Expand All @@ -15,12 +16,17 @@
Union,
cast,
)

if TYPE_CHECKING:
from stream_chat.async_chat.channel_batch_updater import ChannelBatchUpdater

from urllib.parse import urlparse

from stream_chat.async_chat.campaign import Campaign
from stream_chat.async_chat.segment import Segment
from stream_chat.types.base import SortParam
from stream_chat.types.campaign import CampaignData, QueryCampaignsOptions
from stream_chat.types.channel_batch import ChannelsBatchOptions
from stream_chat.types.draft import QueryDraftsFilter, QueryDraftsOptions
from stream_chat.types.segment import (
QuerySegmentsOptions,
Expand Down Expand Up @@ -1028,6 +1034,30 @@ async def mark_delivered_simple(

return await self.mark_delivered(data)

async def update_channels_batch(
self, options: ChannelsBatchOptions
) -> StreamResponse:
"""
Updates channels in batch based on the provided options.

:param options: ChannelsBatchOptions containing operation, filter, and operation-specific data.
:return: StreamResponse containing task_id.
"""
if options is None:
raise ValueError("options must not be None")

return await self.put("channels/batch", data=options)

def channel_batch_updater(self) -> "ChannelBatchUpdater":
"""
Returns a ChannelBatchUpdater instance for batch channel operations.

:return: ChannelBatchUpdater instance.
"""
from stream_chat.async_chat.channel_batch_updater import ChannelBatchUpdater

return ChannelBatchUpdater(self)

async def close(self) -> None:
await self.session.close()

Expand Down
21 changes: 21 additions & 0 deletions stream_chat/base/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1302,6 +1302,27 @@ def get_task(
"""
pass

@abc.abstractmethod
def update_channels_batch(
self, options: Any
) -> Union[StreamResponse, Awaitable[StreamResponse]]:
"""
Updates channels in batch based on the provided options.

:param options: ChannelsBatchOptions containing operation, filter, and operation-specific data.
:return: StreamResponse containing task_id.
"""
pass

@abc.abstractmethod
def channel_batch_updater(self) -> Any:
"""
Returns a ChannelBatchUpdater instance for batch channel operations.

:return: ChannelBatchUpdater instance.
"""
pass

@abc.abstractmethod
def send_user_custom_event(
self, user_id: str, event: Dict
Expand Down
Loading