
Feat: Store descriptive metrics identified by pipeline run id [cog-1260] #582

Merged
merged 8 commits into dev from feat/cog-1260-descriptive-metrics-module on Mar 3, 2025

Conversation

alekszievr
Contributor

@alekszievr commented Feb 26, 2025

Description

DCO Affirmation

I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin

Summary by CodeRabbit

  • New Features

    • Introduced a new analytic capability that calculates descriptive graph metrics for pipeline runs when enabled.
    • Updated the execution flow to include an option for activating the graph metrics step.
  • Chores

    • Removed the previous mechanism for storing descriptive metrics to streamline the system.

Contributor

coderabbitai bot commented Feb 26, 2025

Walkthrough

This update adds pipeline run metrics functionality. A new file with asynchronous functions calculates and stores descriptive metrics for pipeline runs. The main example script now assigns the result of cognify() to a variable, checks whether graph metrics are enabled, and conditionally calls the new metrics function. The previously available store_descriptive_metrics functionality has been removed from both the public interface and its source file.
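
For orientation, here is a minimal sketch of that flow as it might appear in the example script. The step names and the include_optional flag mirror the summary and sequence diagram below; the exact code in the PR may differ.

import asyncio

import cognee

# Hypothetical step toggles mirroring examples/python/dynamic_steps_example.py
steps_to_enable = {"cognify": True, "graph_metrics": True}

async def main(enable_steps):
    pipeline_runs = None

    # Create the knowledge graph; cognify() returns a list of PipelineRun
    # instances (see the reviewer discussion further down)
    if enable_steps.get("cognify"):
        pipeline_runs = await cognee.cognify()

    # New step: calculate descriptive metrics, keyed by pipeline run id
    if enable_steps.get("graph_metrics") and pipeline_runs:
        await cognee.get_pipeline_run_metrics(pipeline_runs, include_optional=True)

if __name__ == "__main__":
    asyncio.run(main(steps_to_enable))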

Changes

File(s) and change summary:

  • cognee/__init__.py: Added import for get_pipeline_run_metrics from .../operations/get_pipeline_run_metrics.py.
  • cognee/modules/data/methods/__init__.py, cognee/modules/data/methods/store_descriptive_metrics.py: Removed the store_descriptive_metrics function and its associated asynchronous methods.
  • examples/python/dynamic_steps_example.py: Modified the main function to assign pipeline_run, added a conditional check for "graph_metrics", replaced the previous comment with "Calculate descriptive metrics", and updated steps_to_enable with the "graph_metrics" key.
  • cognee/modules/data/operations/get_pipeline_run_metrics.py: Introduced as a new file containing the async functions fetch_token_count (to query token count) and get_pipeline_run_metrics (to compute and store graph metrics for pipeline runs).

Sequence Diagram(s)

sequenceDiagram
    participant Ex as "Dynamic Steps Example"
    participant Cog as "Cognee Module"
    participant Ops as "Metrics Operations"
    participant DB as "Database Engine"
    
    Ex->>Cog: await cognify()
    Cog-->>Ex: pipeline_run
    Ex->>Cog: Check if "graph_metrics" enabled
    Cog->>Ops: get_pipeline_run_metrics(pipeline_run, include_optional=True)
    Ops->>DB: execute fetch_token_count query
    DB-->>Ops: return token count
    Ops->>DB: commit new GraphMetrics record
    Ops-->>Cog: return metrics
    Cog-->>Ex: proceed with updated pipeline_run metrics

Poem

I'm a rabbit with a happy hop,
Skipping through lines of code non-stop.
New metrics now join our burrowed den,
While old ones vanish like last season's pen.
With whiskers twitching, I cheer the change so bright!



Contributor

@coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (3)
cognee/modules/data/operations/store_descriptive_metrics.py (3)

27-62: Store descriptive metrics implementation.

The implementation of store_descriptive_metrics follows good practices:

  • It properly handles the case where metrics already exist for a pipeline run
  • It fetches token count using the previously defined function
  • It populates a GraphMetrics object with all required metrics
  • It commits changes to the database and returns the metrics

One suggestion for improvement:

Consider adding error handling for database operations and the call to graph_engine.get_graph_metrics(). These external operations could fail and should be handled gracefully.

 async def store_descriptive_metrics(pipeline_runs: list[PipelineRun], include_optional: bool):
     db_engine = get_relational_engine()
     graph_engine = await get_graph_engine()
 
     metrics_for_pipeline_runs = []
 
     async with db_engine.get_async_session() as session:
-        for pipeline_run in pipeline_runs:
-            existing_metrics = await session.execute(
-                select(GraphMetrics).where(GraphMetrics.id == pipeline_run.pipeline_run_id)
-            )
-            existing_metrics = existing_metrics.scalars().first()
+        try:
+            for pipeline_run in pipeline_runs:
+                try:
+                    existing_metrics = await session.execute(
+                        select(GraphMetrics).where(GraphMetrics.id == pipeline_run.pipeline_run_id)
+                    )
+                    existing_metrics = existing_metrics.scalars().first()
 
-            if existing_metrics:
-                metrics_for_pipeline_runs.append(existing_metrics)
-            else:
-                graph_metrics = await graph_engine.get_graph_metrics(include_optional)
-                metrics = GraphMetrics(
-                    id=pipeline_run.pipeline_run_id,
-                    num_tokens=await fetch_token_count(db_engine),
-                    num_nodes=graph_metrics["num_nodes"],
-                    num_edges=graph_metrics["num_edges"],
-                    mean_degree=graph_metrics["mean_degree"],
-                    edge_density=graph_metrics["edge_density"],
-                    num_connected_components=graph_metrics["num_connected_components"],
-                    sizes_of_connected_components=graph_metrics["sizes_of_connected_components"],
-                    num_selfloops=graph_metrics["num_selfloops"],
-                    diameter=graph_metrics["diameter"],
-                    avg_shortest_path_length=graph_metrics["avg_shortest_path_length"],
-                    avg_clustering=graph_metrics["avg_clustering"],
-                )
-                metrics_for_pipeline_runs.append(metrics)
-                session.add(metrics)
-        await session.commit()
+                    if existing_metrics:
+                        metrics_for_pipeline_runs.append(existing_metrics)
+                    else:
+                        try:
+                            graph_metrics = await graph_engine.get_graph_metrics(include_optional)
+                            token_count = await fetch_token_count(db_engine)
+                            metrics = GraphMetrics(
+                                id=pipeline_run.pipeline_run_id,
+                                num_tokens=token_count,
+                                num_nodes=graph_metrics["num_nodes"],
+                                num_edges=graph_metrics["num_edges"],
+                                mean_degree=graph_metrics["mean_degree"],
+                                edge_density=graph_metrics["edge_density"],
+                                num_connected_components=graph_metrics["num_connected_components"],
+                                sizes_of_connected_components=graph_metrics["sizes_of_connected_components"],
+                                num_selfloops=graph_metrics["num_selfloops"],
+                                diameter=graph_metrics["diameter"],
+                                avg_shortest_path_length=graph_metrics["avg_shortest_path_length"],
+                                avg_clustering=graph_metrics["avg_clustering"],
+                            )
+                            metrics_for_pipeline_runs.append(metrics)
+                            session.add(metrics)
+                        except Exception as e:
+                            # Log the error but continue processing other pipeline runs
+                            print(f"Error fetching graph metrics for pipeline run {pipeline_run.pipeline_run_id}: {str(e)}")
+                except Exception as e:
+                    print(f"Error processing pipeline run {pipeline_run.pipeline_run_id}: {str(e)}")
+            await session.commit()
+        except Exception as e:
+            print(f"Database error: {str(e)}")
+            await session.rollback()

33-38: Consider adding pagination for large result sets.

When fetching metrics for many pipeline runs, consider paginating the query or processing the runs in batches so that large result sets are not held in memory all at once; a sketch follows.
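
For illustration only, a paginated query might look like the following. PAGE_SIZE, the helper name, and the GraphMetrics import path are assumptions, not code from this PR.

from sqlalchemy import select

# Assumed import path for the ORM model used in this PR
from cognee.modules.data.models import GraphMetrics

PAGE_SIZE = 500  # hypothetical page size

async def iter_existing_metrics(session, run_ids):
    """Yield existing GraphMetrics rows page by page to bound memory use."""
    offset = 0
    while True:
        result = await session.execute(
            select(GraphMetrics)
            .where(GraphMetrics.id.in_(run_ids))
            .order_by(GraphMetrics.id)
            .limit(PAGE_SIZE)
            .offset(offset)
        )
        rows = result.scalars().all()
        if not rows:
            break
        for row in rows:
            yield row
        offset += PAGE_SIZE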


40-60: Transaction handling considerations.

The function correctly uses a single transaction for all operations, which is efficient. However, if processing a large number of pipeline runs, consider adding a batch commit strategy.

If the function is expected to process many pipeline runs at once, consider implementing batch processing with intermediate commits to prevent large transactions:

BATCH_SIZE = 100  # Adjust based on expected load

async def store_descriptive_metrics(pipeline_runs: list[PipelineRun], include_optional: bool):
    db_engine = get_relational_engine()
    graph_engine = await get_graph_engine()
    
    metrics_for_pipeline_runs = []
    
    # Process pipeline runs in batches
    for i in range(0, len(pipeline_runs), BATCH_SIZE):
        batch = pipeline_runs[i:i+BATCH_SIZE]
        
        async with db_engine.get_async_session() as session:
            for pipeline_run in batch:
                ...  # existing per-run logic: query existing metrics, build GraphMetrics, session.add

            await session.commit()
    
    return metrics_for_pipeline_runs
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f6ced41 and c38fd9f.

📒 Files selected for processing (5)
  • cognee/__init__.py (1 hunks)
  • cognee/modules/data/methods/__init__.py (0 hunks)
  • cognee/modules/data/methods/store_descriptive_metrics.py (0 hunks)
  • cognee/modules/data/operations/store_descriptive_metrics.py (1 hunks)
  • examples/python/dynamic_steps_example.py (2 hunks)
💤 Files with no reviewable changes (2)
  • cognee/modules/data/methods/__init__.py
  • cognee/modules/data/methods/store_descriptive_metrics.py
⏰ Context from checks skipped due to timeout of 90000ms (27)
  • GitHub Check: run_multimedia_example_test / test
  • GitHub Check: run_networkx_metrics_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: Test on macos-15
  • GitHub Check: Test on macos-15
  • GitHub Check: run_dynamic_steps_example_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: run_simple_example_test / test
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: Test on macos-13
  • GitHub Check: Test on macos-15
  • GitHub Check: Test on macos-13
  • GitHub Check: run_notebook_test / test
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: run_eval_framework_test / test
  • GitHub Check: test
  • GitHub Check: Test on macos-13
  • GitHub Check: windows-latest
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: docker-compose-test
🔇 Additional comments (7)
cognee/__init__.py (1)

11-11: Import looks good.

The addition of this import makes the relocated store_descriptive_metrics function accessible from the top-level package, maintaining a clean public API.

examples/python/dynamic_steps_example.py (4)

183-183: Store pipeline_runs result for later use.

Good change to store the cognify() result as it's now needed for the new graph metrics functionality.


186-190: New step for calculating and storing descriptive metrics.

This is a good addition that implements the new functionality for calculating and storing descriptive metrics. The comments accurately reflect what the step does, and the conditional execution based on the graph_metrics flag follows the pattern of other steps in this file.


191-191: Updated step numbering.

The step numbering has been correctly updated to reflect the addition of the new step.


209-209: New step enabled in configuration.

The graph_metrics step has been properly added to the steps_to_enable dictionary and tied to the rebuild_kg flag, consistent with the pattern used for other related steps.
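
A sketch of what that configuration entry plausibly looks like (other steps omitted; names follow the review comment rather than the file verbatim):

rebuild_kg = True

steps_to_enable = {
    "cognify": rebuild_kg,
    "graph_metrics": rebuild_kg,  # new step introduced by this PR, tied to rebuild_kg
}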

cognee/modules/data/operations/store_descriptive_metrics.py (2)

1-10: Imports look well-structured.

The imports are organized logically, grouping related modules and avoiding unnecessary dependencies.


12-25: Well-documented token count function.

The fetch_token_count function is well-documented with a clear purpose and return type. It efficiently fetches and sums token counts using asynchronous database session handling.

@alekszievr self-assigned this Feb 26, 2025
@@ -180,10 +180,15 @@ async def main(enable_steps):

     # Step 3: Create knowledge graph
     if enable_steps.get("cognify"):
-        await cognee.cognify()
+        pipeline_runs = await cognee.cognify()
Contributor


pipeline_run, it's single instance right?

Contributor Author


In general, cognify returns a list of pipeline_run instances.
In this case, yes, it is a list of length 1. Sure, I can rename it here; I just wanted to match the next function's signature.

Contributor

@coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (1)
cognee/modules/data/operations/get_pipeline_run_metrics.py (1)

41-56: Add defensive programming for graph metrics dictionary access.

The code assumes all keys will be present in the graph_metrics dictionary. Consider adding validation or using .get() with default values to handle missing keys gracefully.

graph_metrics = await graph_engine.get_graph_metrics(include_optional)
metrics = GraphMetrics(
    id=pipeline_run.pipeline_run_id,
    num_tokens=await fetch_token_count(db_engine),
-   num_nodes=graph_metrics["num_nodes"],
-   num_edges=graph_metrics["num_edges"],
-   mean_degree=graph_metrics["mean_degree"],
-   edge_density=graph_metrics["edge_density"],
-   num_connected_components=graph_metrics["num_connected_components"],
-   sizes_of_connected_components=graph_metrics["sizes_of_connected_components"],
-   num_selfloops=graph_metrics["num_selfloops"],
-   diameter=graph_metrics["diameter"],
-   avg_shortest_path_length=graph_metrics["avg_shortest_path_length"],
-   avg_clustering=graph_metrics["avg_clustering"],
+   num_nodes=graph_metrics.get("num_nodes", 0),
+   num_edges=graph_metrics.get("num_edges", 0),
+   mean_degree=graph_metrics.get("mean_degree", 0.0),
+   edge_density=graph_metrics.get("edge_density", 0.0),
+   num_connected_components=graph_metrics.get("num_connected_components", 0),
+   sizes_of_connected_components=graph_metrics.get("sizes_of_connected_components", []),
+   num_selfloops=graph_metrics.get("num_selfloops", 0),
+   diameter=graph_metrics.get("diameter", 0.0),
+   avg_shortest_path_length=graph_metrics.get("avg_shortest_path_length", 0.0),
+   avg_clustering=graph_metrics.get("avg_clustering", 0.0),
)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c38fd9f and d1c9b20.

📒 Files selected for processing (3)
  • cognee/__init__.py (1 hunks)
  • cognee/modules/data/operations/get_pipeline_run_metrics.py (1 hunks)
  • examples/python/dynamic_steps_example.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • cognee/__init__.py
  • examples/python/dynamic_steps_example.py
⏰ Context from checks skipped due to timeout of 90000ms (29)
  • GitHub Check: Test on macos-15
  • GitHub Check: run_networkx_metrics_test / test
  • GitHub Check: run_simple_example_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: run_dynamic_steps_example_test / test
  • GitHub Check: run_multimedia_example_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: Test on macos-13
  • GitHub Check: Test on macos-15
  • GitHub Check: Test on macos-15
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: windows-latest
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: run_eval_framework_test / test
  • GitHub Check: Test on macos-13
  • GitHub Check: Test on macos-13
  • GitHub Check: test
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: Build Cognee Backend Docker App Image
  • GitHub Check: docker-compose-test

Comment on lines +10 to +22
async def fetch_token_count(db_engine) -> int:
    """
    Fetches and sums token counts from the database.

    Returns:
        int: The total number of tokens across all documents.
    """

    async with db_engine.get_async_session() as session:
        token_count_sum = await session.execute(select(func.sum(Data.token_count)))
        token_count_sum = token_count_sum.scalar()

    return token_count_sum
Contributor


🛠️ Refactor suggestion

Function needs error handling for database operations.

The fetch_token_count function could encounter database errors or return None if there are no records. Consider adding try/except blocks and handling the case when token_count_sum is None.

async def fetch_token_count(db_engine) -> int:
    """
    Fetches and sums token counts from the database.

    Returns:
        int: The total number of tokens across all documents.
    """

+    try:
        async with db_engine.get_async_session() as session:
            token_count_sum = await session.execute(select(func.sum(Data.token_count)))
            token_count_sum = token_count_sum.scalar()
+    except Exception as e:
+        # Log the error
+        raise RuntimeError(f"Error fetching token count: {str(e)}") from e

+    # Handle case when no data exists
+    if token_count_sum is None:
+        return 0

    return token_count_sum

Comment on lines +25 to +60
async def get_pipeline_run_metrics(pipeline_runs: list[PipelineRun], include_optional: bool):
    db_engine = get_relational_engine()
    graph_engine = await get_graph_engine()

    metrics_for_pipeline_runs = []

    async with db_engine.get_async_session() as session:
        for pipeline_run in pipeline_runs:
            existing_metrics = await session.execute(
                select(GraphMetrics).where(GraphMetrics.id == pipeline_run.pipeline_run_id)
            )
            existing_metrics = existing_metrics.scalars().first()

            if existing_metrics:
                metrics_for_pipeline_runs.append(existing_metrics)
            else:
                graph_metrics = await graph_engine.get_graph_metrics(include_optional)
                metrics = GraphMetrics(
                    id=pipeline_run.pipeline_run_id,
                    num_tokens=await fetch_token_count(db_engine),
                    num_nodes=graph_metrics["num_nodes"],
                    num_edges=graph_metrics["num_edges"],
                    mean_degree=graph_metrics["mean_degree"],
                    edge_density=graph_metrics["edge_density"],
                    num_connected_components=graph_metrics["num_connected_components"],
                    sizes_of_connected_components=graph_metrics["sizes_of_connected_components"],
                    num_selfloops=graph_metrics["num_selfloops"],
                    diameter=graph_metrics["diameter"],
                    avg_shortest_path_length=graph_metrics["avg_shortest_path_length"],
                    avg_clustering=graph_metrics["avg_clustering"],
                )
                metrics_for_pipeline_runs.append(metrics)
                session.add(metrics)
        await session.commit()

    return metrics_for_pipeline_runs
Contributor


🛠️ Refactor suggestion

Improve error handling and transaction management in pipeline metrics collection.

The function has several issues:

  1. No error handling for database or graph engine operations
  2. Single commit at the end means failure on one pipeline run affects all others
  3. No validation for empty pipeline_runs list
  4. Missing type annotation for the return value
async def get_pipeline_run_metrics(pipeline_runs: list[PipelineRun], include_optional: bool):
+    """
+    Retrieve or calculate metrics for a list of pipeline runs.
+
+    Args:
+        pipeline_runs: List of PipelineRun objects to process
+        include_optional: Whether to include optional metrics in the calculation
+
+    Returns:
+        list[GraphMetrics]: List of metrics for the provided pipeline runs
+    """
+    if not pipeline_runs:
+        return []

    db_engine = get_relational_engine()
    graph_engine = await get_graph_engine()

    metrics_for_pipeline_runs = []

    async with db_engine.get_async_session() as session:
        for pipeline_run in pipeline_runs:
+            try:
                existing_metrics = await session.execute(
                    select(GraphMetrics).where(GraphMetrics.id == pipeline_run.pipeline_run_id)
                )
                existing_metrics = existing_metrics.scalars().first()

                if existing_metrics:
                    metrics_for_pipeline_runs.append(existing_metrics)
                else:
                    graph_metrics = await graph_engine.get_graph_metrics(include_optional)
                    metrics = GraphMetrics(
                        id=pipeline_run.pipeline_run_id,
                        num_tokens=await fetch_token_count(db_engine),
                        num_nodes=graph_metrics["num_nodes"],
                        num_edges=graph_metrics["num_edges"],
                        mean_degree=graph_metrics["mean_degree"],
                        edge_density=graph_metrics["edge_density"],
                        num_connected_components=graph_metrics["num_connected_components"],
                        sizes_of_connected_components=graph_metrics["sizes_of_connected_components"],
                        num_selfloops=graph_metrics["num_selfloops"],
                        diameter=graph_metrics["diameter"],
                        avg_shortest_path_length=graph_metrics["avg_shortest_path_length"],
                        avg_clustering=graph_metrics["avg_clustering"],
                    )
                    metrics_for_pipeline_runs.append(metrics)
                    session.add(metrics)
+                # Commit after each successful pipeline run to ensure partial success
+                await session.commit()
+            except Exception as e:
+                # Log the error but continue processing other pipeline runs
+                await session.rollback()
+                print(f"Error processing pipeline run {pipeline_run.pipeline_run_id}: {str(e)}")
+                continue
-        await session.commit()

    return metrics_for_pipeline_runs

@borisarzentar merged commit 6d7a68d into dev on Mar 3, 2025
37 checks passed
@borisarzentar deleted the feat/cog-1260-descriptive-metrics-module branch on March 3, 2025 at 18:09