Skip to content

Conversation

@praateekmahajan
Copy link
Contributor

@praateekmahajan praateekmahajan commented Nov 25, 2025

Description

This pull request refactors the workflow interfaces for the deduplication pipelines (exact, fuzzy, and semantic) to standardize their outputs and improve usability.

Core API and Interface Refactoring

  • Introduced a new WorkflowRunResult dataclass in nemo_curator/pipeline/workflow.py to encapsulate workflow outputs, pipeline task mappings, and metadata. Also added an abstract WorkflowBase class to standardize workflow interfaces.
  • Updated all deduplication workflow classes (ExactDeduplicationWorkflow, FuzzyDeduplicationWorkflow, SemanticDeduplicationWorkflow) to inherit from WorkflowBase and to return a WorkflowRunResult from their run methods, instead of returning None or a dictionary.

Workflow Output and Metadata Improvements

  • Refactored the run methods of all workflows to collect and record detailed timing and result metadata (such as per-stage execution times and duplicate counts) into the WorkflowRunResult object.
  • Each pipeline stage now adds its results and timing to the result object.

Usage

# Add snippet demonstrating usage

Checklist

  • I am familiar with the Contributing Guide.
  • New or Existing tests cover these changes.
  • The documentation is up to date with these changes.

Signed-off-by: Praateek <praateekm@gmail.com>
…dd-workflow-results

Signed-off-by: Praateek <praateekm@gmail.com>
@copy-pr-bot
Copy link

copy-pr-bot bot commented Nov 25, 2025

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.

Contributors can view more details about this message here.

Signed-off-by: Praateek <praateekm@gmail.com>
Signed-off-by: Praateek <praateekm@gmail.com>
Signed-off-by: Praateek <praateekm@gmail.com>
Signed-off-by: Praateek <praateekm@gmail.com>
Signed-off-by: Praateek <praateekm@gmail.com>
Signed-off-by: Praateek <praateekm@gmail.com>
Signed-off-by: Praateek <praateekm@gmail.com>
Signed-off-by: Praateek <praateekm@gmail.com>
@praateekmahajan
Copy link
Contributor Author

/ok to test af0787c

Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3 files reviewed, 3 comments

Edit Code Review Agent Settings | Greptile

Comment on lines 233 to 256
num_duplicates_identified = sum(
task._metadata.get("num_removal_ids", 0) for task in removal_id_tasks or []
)
if num_duplicates_identified == 0:
logger.info("No exact duplicates found in the dataset.")

if self.assign_id:
id_generator_path = os.path.join(self.output_path, ID_GENERATOR_OUTPUT_FILENAME)
write_id_generator_to_disk(
id_generator_path,
storage_options=self.write_kwargs.get("storage_options")
if self.write_kwargs is not None
else None,
)
logger.info(f"Id generator written to {id_generator_path}")
end_time = time.time()
logger.info(f"Exact deduplication pipeline completed in {(end_time - start_time):.2f} seconds")
finally:
if self.assign_id:
kill_id_generator_actor()

total_end_time = time.time()
total_time = total_end_time - total_start_time
workflow_summary = {
"total_time": total_time,
"num_duplicates": num_duplicates_identified,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable num_duplicates_identified is defined inside the try block (line 233) but is referenced outside the try-finally blocks at line 256 in workflow_summary. If an exception occurs before line 233 is executed (e.g., during input filegroups pipeline or identification pipeline setup), the variable will be undefined, causing a NameError when line 256 tries to access it.

Fix: Initialize num_duplicates_identified = 0 before the try block (after line 212):

Suggested change
id_generator_path = None
num_duplicates_identified = 0
try:

@greptile-apps
Copy link
Contributor

greptile-apps bot commented Jan 10, 2026

Additional Comments (2)

nemo_curator/stages/text/deduplication/semantic.py
Multiple timing variables (embedding_time at line 472, semantic_time at line 495, removal_time at line 514) are defined inside the try block but are referenced outside it in the metadata dictionary construction (lines 550-554).

If an exception occurs during workflow execution before these variables are assigned (e.g., during embedding generation, semantic deduplication, or removal), the code at lines 547-562 (which executes after the except block re-raises) will attempt to access undefined variables, causing a NameError.

The except block at lines 543-545 re-raises the exception, but lines 547-562 still execute afterward.

Fix: Initialize all timing variables before the try block:

        total_start_time = time.time()
        workflow_result = WorkflowRunResult(workflow_name="text_semantic_deduplication")
        num_duplicates_identified = 0
        embedding_time = 0.0
        semantic_time = 0.0
        removal_time = 0.0

        try:

nemo_curator/stages/text/deduplication/semantic.py
TextSemanticDeduplicationWorkflow does not inherit from WorkflowBase, unlike the other deduplication workflows (ExactDeduplicationWorkflow, FuzzyDeduplicationWorkflow, SemanticDeduplicationWorkflow, TextDuplicatesRemovalWorkflow).

This creates inconsistency in the workflow interface - all workflows should inherit from WorkflowBase to standardize the interface as mentioned in the PR description.

class TextSemanticDeduplicationWorkflow(WorkflowBase):

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants