-
Notifications
You must be signed in to change notification settings - Fork 19.5k
fix: reuse httpx client for plugin daemon requests #30684
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
remove unnecessary time.sleep in dispatcher
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This pull request addresses performance degradation in workflows with many plugin calls by implementing TCP connection pooling and removing unnecessary sleep delays.
Key changes:
- Introduced a shared httpx.Client instance for plugin daemon requests to enable connection reuse
- Removed the sleep(0.1) call in the dispatcher when the event queue is empty
- Updated all test mocks to patch the new client instance methods
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
api/core/plugin/impl/base.py |
Introduces module-level _httpx_client instance and updates _request() and _stream_request() methods to use it instead of creating new connections per request |
api/core/workflow/graph_engine/orchestration/dispatcher.py |
Removes unnecessary time.sleep(0.1) call in the empty queue exception handler since queue.get(timeout=0.1) already provides the wait mechanism |
api/tests/unit_tests/core/plugin/test_plugin_runtime.py |
Updates all test patches from httpx.request and httpx.stream to core.plugin.impl.base._httpx_client.request and core.plugin.impl.base._httpx_client.stream to match the new implementation |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| T = TypeVar("T", bound=(BaseModel | dict[str, Any] | list[Any] | bool | str)) | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
| _httpx_client = httpx.Client(timeout=plugin_daemon_request_timeout) |
Copilot
AI
Jan 7, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The module-level httpx.Client instance is created without a cleanup mechanism. The client should be properly closed when the application shuts down to avoid resource leaks. Consider using the http_client_pooling utility (similar to code_executor.py and ssrf_proxy.py) which provides automatic cleanup via atexit hooks, or add an atexit.register call to close this client.
| T = TypeVar("T", bound=(BaseModel | dict[str, Any] | list[Any] | bool | str)) | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
| _httpx_client = httpx.Client(timeout=plugin_daemon_request_timeout) |
Copilot
AI
Jan 7, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The httpx.Client is created without connection limits configuration. Similar to other HTTP client instances in the codebase (see code_executor.py and ssrf_proxy.py), this client should have httpx.Limits configured to control max_connections, max_keepalive_connections, and keepalive_expiry. This helps prevent resource exhaustion under high load and ensures predictable connection pooling behavior.
| _httpx_client = httpx.Client(timeout=plugin_daemon_request_timeout) | |
| _httpx_client = httpx.Client( | |
| timeout=plugin_daemon_request_timeout, | |
| limits=httpx.Limits( | |
| max_connections=100, | |
| max_keepalive_connections=20, | |
| keepalive_expiry=30.0, | |
| ), | |
| ) |
|
|
||
| try: | ||
| response = httpx.request(**request_kwargs) | ||
| response = _httpx_client.request(**request_kwargs) |
Copilot
AI
Jan 7, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The timeout parameter is being passed redundantly in request_kwargs since the httpx.Client was already initialized with a timeout at line 55. This creates duplicate timeout configuration. Consider removing the timeout from request_kwargs to avoid confusion, or document why both are needed if the per-request timeout is intended to override the client-level default.
|
|
||
| try: | ||
| with httpx.stream(**stream_kwargs) as response: | ||
| with _httpx_client.stream(**stream_kwargs) as response: |
Copilot
AI
Jan 7, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The timeout parameter is being passed redundantly in stream_kwargs since the httpx.Client was already initialized with a timeout at line 55. This creates duplicate timeout configuration. Consider removing the timeout from stream_kwargs to avoid confusion, or document why both are needed if the per-request timeout is intended to override the client-level default.
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request effectively improves performance by reusing a single httpx.Client instance for all plugin daemon requests, which enables TCP connection pooling. The removal of the unnecessary time.sleep call in the dispatcher is also a good optimization. The tests have been updated comprehensively to reflect these changes.
My main feedback is to ensure the shared httpx.Client is properly closed on application shutdown to prevent potential resource leaks. I've added a specific suggestion in the code to handle this using the atexit module.
| T = TypeVar("T", bound=(BaseModel | dict[str, Any] | list[Any] | bool | str)) | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
| _httpx_client = httpx.Client(timeout=plugin_daemon_request_timeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While creating a shared httpx.Client is great for performance and connection pooling, it's important to ensure that the client is properly closed when the application shuts down to release network resources. A module-level client that is never closed can lead to resource leaks.
You can ensure cleanup by registering a closing function using the atexit module. Please add import atexit at the top of the file and then apply this suggestion.
_httpx_client = httpx.Client(timeout=plugin_daemon_request_timeout)
@atexit.register
def _close_httpx_client():
"""Close the httpx client on application exit."""
if not _httpx_client.is_closed:
_httpx_client.close()
Important
Fixes #<issue number>.Summary
Fixes: #30673
Introduce TCP connection pooling by replacing individual httx.request calls with a shared httx.Client instance, improving performance and reducing overhead for repeated HTTP requests.
Remove unnecessary sleep calls in the dispatcher when handling an empty event queue, preventing needless delays and optimizing throughput in idle scenarios.
Screenshots
Checklist
make lintandmake type-check(backend) andcd web && npx lint-staged(frontend) to appease the lint gods