Add workflow results #1275
base: main
…dd-workflow-results Signed-off-by: Praateek <praateekm@gmail.com>
Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.
Signed-off-by: Praateek <praateekm@gmail.com>
/ok to test af0787c
3 files reviewed, 3 comments
            num_duplicates_identified = sum(
                task._metadata.get("num_removal_ids", 0) for task in removal_id_tasks or []
            )
            if num_duplicates_identified == 0:
                logger.info("No exact duplicates found in the dataset.")

            if self.assign_id:
                id_generator_path = os.path.join(self.output_path, ID_GENERATOR_OUTPUT_FILENAME)
                write_id_generator_to_disk(
                    id_generator_path,
                    storage_options=self.write_kwargs.get("storage_options")
                    if self.write_kwargs is not None
                    else None,
                )
                logger.info(f"Id generator written to {id_generator_path}")
            end_time = time.time()
            logger.info(f"Exact deduplication pipeline completed in {(end_time - start_time):.2f} seconds")
        finally:
            if self.assign_id:
                kill_id_generator_actor()

        total_end_time = time.time()
        total_time = total_end_time - total_start_time
        workflow_summary = {
            "total_time": total_time,
            "num_duplicates": num_duplicates_identified,
The variable num_duplicates_identified is defined inside the try block (line 233) but is referenced outside the try-finally blocks at line 256 in workflow_summary. If an exception occurs before line 233 is executed (e.g., during input filegroups pipeline or identification pipeline setup), the variable will be undefined, causing a NameError when line 256 tries to access it.
Fix: Initialize num_duplicates_identified = 0 before the try block (after line 212):
        id_generator_path = None
        num_duplicates_identified = 0
        try:
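A minimal sketch of the suggested pattern, using a hypothetical `run_workflow` function rather than the PR's code. One caveat worth checking against the real method: with a plain try/finally, an exception propagates once the finally block finishes and the statements after it are skipped, so the unbound name is only reachable when a handler swallows the exception or when the finally block itself reads the variable. Initializing up front is cheap and keeps both paths safe.

```python
import time


def run_workflow(fail_early: bool) -> dict:
    """Toy stand-in for a workflow's run method (all names are hypothetical)."""
    start = time.time()
    # Bind the counter before the try block so every later read is safe.
    num_duplicates_identified = 0
    error = None
    try:
        if fail_early:
            raise RuntimeError("identification stage failed")
        num_duplicates_identified = 3  # pretend three duplicates were found
    except RuntimeError as exc:
        # This handler records the failure instead of re-raising, so the
        # summary below still runs and needs the counter to be bound.
        error = str(exc)
    finally:
        pass  # cleanup (for example, stopping an actor) would go here
    return {
        "total_time": time.time() - start,
        "num_duplicates": num_duplicates_identified,
        "error": error,
    }
```

Calling `run_workflow(True)` returns a summary with `num_duplicates` equal to 0 and the error message recorded, instead of failing on an unbound name.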
Additional Comments (2)
If an exception occurs during workflow execution before these variables are assigned (e.g., during embedding generation, semantic deduplication, or removal), the code at lines 547-562 (which executes after the except block re-raises) will attempt to access undefined variables, causing a …
The except block at lines 543-545 re-raises the exception, but lines 547-562 still execute afterward.
Fix: Initialize all timing variables before the try block:
This creates inconsistency in the workflow interface - all workflows should inherit from …
Description
This pull request refactors the workflow interfaces for the deduplication pipelines (exact, fuzzy, and semantic) to standardize their outputs and improve usability.
Core API and Interface Refactoring
- Added a WorkflowRunResult dataclass in nemo_curator/pipeline/workflow.py to encapsulate workflow outputs, pipeline task mappings, and metadata. Also added an abstract WorkflowBase class to standardize workflow interfaces.
- Updated the deduplication workflows (ExactDeduplicationWorkflow, FuzzyDeduplicationWorkflow, SemanticDeduplicationWorkflow) to inherit from WorkflowBase and to return a WorkflowRunResult from their run methods, instead of returning None or a dictionary.
Workflow Output and Metadata Improvements
- Updated the run methods of all workflows to collect and record detailed timing and result metadata (such as per-stage execution times and duplicate counts) into the WorkflowRunResult object.
Usage
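As a rough illustration of the interface described above, and not the PR's actual code, the sketch below shows one way a result dataclass, an abstract base class, and a workflow that records timing and duplicate counts could fit together. The field names (`workflow_name`, `pipeline_tasks`, `metadata`), the metadata keys, and `ToyDedupWorkflow` are assumptions made for this example; the real definitions in nemo_curator/pipeline/workflow.py may differ.

```python
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any


@dataclass
class WorkflowRunResult:
    """Hypothetical shape; the real field names in the PR may differ."""

    workflow_name: str
    pipeline_tasks: dict[str, list[Any]] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)


class WorkflowBase(ABC):
    """Common interface: every workflow returns a WorkflowRunResult."""

    @abstractmethod
    def run(self) -> WorkflowRunResult:
        ...


class ToyDedupWorkflow(WorkflowBase):
    """In-memory stand-in for a deduplication workflow (illustrative only)."""

    def __init__(self, docs: list[str]) -> None:
        self.docs = docs

    def run(self) -> WorkflowRunResult:
        result = WorkflowRunResult(workflow_name="toy_exact_dedup")
        total_start = time.perf_counter()

        # Stage: identify exact duplicates and time the stage.
        stage_start = time.perf_counter()
        seen: set[str] = set()
        duplicates: list[str] = []
        for doc in self.docs:
            if doc in seen:
                duplicates.append(doc)
            else:
                seen.add(doc)
        result.pipeline_tasks["identification"] = duplicates
        result.metadata["identification_time"] = time.perf_counter() - stage_start

        # Summary metadata, analogous to the workflow_summary dict in the diff.
        result.metadata["num_duplicates"] = len(duplicates)
        result.metadata["total_time"] = time.perf_counter() - total_start
        return result


result = ToyDedupWorkflow(["a", "b", "a", "c", "b"]).run()
print(result.workflow_name, result.metadata["num_duplicates"])
```

Returning a single typed object instead of None or a loose dictionary lets callers read timings and counts the same way for every workflow.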
# Add snippet demonstrating usage
Checklist