Feat: Store descriptive metrics identified by pipeline run id [cog-1260] #582
Conversation
Walkthrough

This update adds new pipeline run metrics functionality. A new file with asynchronous functions is introduced to calculate and store descriptive metrics for pipeline runs. The main example script now assigns the output of the pipeline run process, checks whether graph metrics are enabled, and conditionally calls the new metrics function. In contrast, the previously available `store_descriptive_metrics` implementation under `cognee/modules/data/methods` and its export from that package have been removed.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Ex as "Dynamic Steps Example"
    participant Cog as "Cognee Module"
    participant Ops as "Metrics Operations"
    participant DB as "Database Engine"

    Ex->>Cog: await cognify()
    Cog-->>Ex: pipeline_run
    Ex->>Cog: Check if "graph_metrics" enabled
    Cog->>Ops: get_pipeline_run_metrics(pipeline_run, include_optional=True)
    Ops->>DB: execute fetch_token_count query
    DB-->>Ops: return token count
    Ops->>DB: commit new GraphMetrics record
    Ops-->>Cog: return metrics
    Cog-->>Ex: proceed with updated pipeline_run metrics
```
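For reference, the record stored per pipeline run roughly has the following shape. Field names are taken from the code under review; the types and the dataclass framing are assumptions for illustration only (the real model is the `GraphMetrics` SQLAlchemy model).

```python
# Illustrative sketch only; the actual persistence model is cognee's GraphMetrics table.
from dataclasses import dataclass, field
from uuid import UUID


@dataclass
class GraphMetricsRecord:
    id: UUID             # the pipeline_run_id this row describes
    num_tokens: int      # sum of Data.token_count across documents
    num_nodes: int
    num_edges: int
    mean_degree: float
    edge_density: float
    num_connected_components: int
    sizes_of_connected_components: list[int] = field(default_factory=list)
    num_selfloops: int = 0
    diameter: float = 0.0
    avg_shortest_path_length: float = 0.0
    avg_clustering: float = 0.0
```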
Actionable comments posted: 0
🧹 Nitpick comments (3)
cognee/modules/data/operations/store_descriptive_metrics.py (3)
27-62: Store descriptive metrics implementation.

The implementation of `store_descriptive_metrics` follows good practices:
- It properly handles the case where metrics already exist for a pipeline run
- It fetches token count using the previously defined function
- It populates a GraphMetrics object with all required metrics
- It commits changes to the database and returns the metrics
One suggestion for improvement:
Consider adding error handling for database operations and the call to `graph_engine.get_graph_metrics()`. These external operations could fail and should be handled gracefully.

```diff
 async def store_descriptive_metrics(pipeline_runs: list[PipelineRun], include_optional: bool):
     db_engine = get_relational_engine()
     graph_engine = await get_graph_engine()

     metrics_for_pipeline_runs = []

     async with db_engine.get_async_session() as session:
-        for pipeline_run in pipeline_runs:
-            existing_metrics = await session.execute(
-                select(GraphMetrics).where(GraphMetrics.id == pipeline_run.pipeline_run_id)
-            )
-            existing_metrics = existing_metrics.scalars().first()
+        try:
+            for pipeline_run in pipeline_runs:
+                try:
+                    existing_metrics = await session.execute(
+                        select(GraphMetrics).where(GraphMetrics.id == pipeline_run.pipeline_run_id)
+                    )
+                    existing_metrics = existing_metrics.scalars().first()

-            if existing_metrics:
-                metrics_for_pipeline_runs.append(existing_metrics)
-            else:
-                graph_metrics = await graph_engine.get_graph_metrics(include_optional)
-                metrics = GraphMetrics(
-                    id=pipeline_run.pipeline_run_id,
-                    num_tokens=await fetch_token_count(db_engine),
-                    num_nodes=graph_metrics["num_nodes"],
-                    num_edges=graph_metrics["num_edges"],
-                    mean_degree=graph_metrics["mean_degree"],
-                    edge_density=graph_metrics["edge_density"],
-                    num_connected_components=graph_metrics["num_connected_components"],
-                    sizes_of_connected_components=graph_metrics["sizes_of_connected_components"],
-                    num_selfloops=graph_metrics["num_selfloops"],
-                    diameter=graph_metrics["diameter"],
-                    avg_shortest_path_length=graph_metrics["avg_shortest_path_length"],
-                    avg_clustering=graph_metrics["avg_clustering"],
-                )
-                metrics_for_pipeline_runs.append(metrics)
-                session.add(metrics)
-        await session.commit()
+                    if existing_metrics:
+                        metrics_for_pipeline_runs.append(existing_metrics)
+                    else:
+                        try:
+                            graph_metrics = await graph_engine.get_graph_metrics(include_optional)
+                            token_count = await fetch_token_count(db_engine)
+                            metrics = GraphMetrics(
+                                id=pipeline_run.pipeline_run_id,
+                                num_tokens=token_count,
+                                num_nodes=graph_metrics["num_nodes"],
+                                num_edges=graph_metrics["num_edges"],
+                                mean_degree=graph_metrics["mean_degree"],
+                                edge_density=graph_metrics["edge_density"],
+                                num_connected_components=graph_metrics["num_connected_components"],
+                                sizes_of_connected_components=graph_metrics["sizes_of_connected_components"],
+                                num_selfloops=graph_metrics["num_selfloops"],
+                                diameter=graph_metrics["diameter"],
+                                avg_shortest_path_length=graph_metrics["avg_shortest_path_length"],
+                                avg_clustering=graph_metrics["avg_clustering"],
+                            )
+                            metrics_for_pipeline_runs.append(metrics)
+                            session.add(metrics)
+                        except Exception as e:
+                            # Log the error but continue processing other pipeline runs
+                            print(f"Error fetching graph metrics for pipeline run {pipeline_run.pipeline_run_id}: {str(e)}")
+                except Exception as e:
+                    print(f"Error processing pipeline run {pipeline_run.pipeline_run_id}: {str(e)}")
+            await session.commit()
+        except Exception as e:
+            print(f"Database error: {str(e)}")
+            await session.rollback()
```
33-38: Consider adding pagination for large result sets.

When fetching metrics for many pipeline runs, consider implementing pagination to avoid potential memory issues with large result sets.
For applications where the number of pipeline runs could be large, consider implementing a pagination mechanism or processing pipeline runs in batches to avoid memory issues.
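A rough sketch of one way to batch the lookups, assuming the same `GraphMetrics` model and async session as the module under review; the helper name and chunk size are hypothetical:

```python
from sqlalchemy import select

CHUNK_SIZE = 500  # hypothetical batch size


async def fetch_existing_metrics_in_chunks(session, pipeline_run_ids):
    """Look up existing GraphMetrics rows in fixed-size chunks instead of one query per run."""
    existing = {}
    for start in range(0, len(pipeline_run_ids), CHUNK_SIZE):
        chunk = pipeline_run_ids[start : start + CHUNK_SIZE]
        result = await session.execute(select(GraphMetrics).where(GraphMetrics.id.in_(chunk)))
        for row in result.scalars():
            existing[row.id] = row
    return existing
```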
40-60: Transaction handling considerations.

The function correctly uses a single transaction for all operations, which is efficient. However, if processing a large number of pipeline runs, consider adding a batch commit strategy.
If the function is expected to process many pipeline runs at once, consider implementing batch processing with intermediate commits to prevent large transactions:
```python
BATCH_SIZE = 100  # Adjust based on expected load


async def store_descriptive_metrics(pipeline_runs: list[PipelineRun], include_optional: bool):
    db_engine = get_relational_engine()
    graph_engine = await get_graph_engine()

    metrics_for_pipeline_runs = []

    # Process pipeline runs in batches
    for i in range(0, len(pipeline_runs), BATCH_SIZE):
        batch = pipeline_runs[i : i + BATCH_SIZE]
        async with db_engine.get_async_session() as session:
            for pipeline_run in batch:
                # Existing logic here
                ...
            await session.commit()

    return metrics_for_pipeline_runs
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- cognee/__init__.py (1 hunks)
- cognee/modules/data/methods/__init__.py (0 hunks)
- cognee/modules/data/methods/store_descriptive_metrics.py (0 hunks)
- cognee/modules/data/operations/store_descriptive_metrics.py (1 hunks)
- examples/python/dynamic_steps_example.py (2 hunks)
💤 Files with no reviewable changes (2)
- cognee/modules/data/methods/__init__.py
- cognee/modules/data/methods/store_descriptive_metrics.py
⏰ Context from checks skipped due to timeout of 90000ms (27)
- GitHub Check: run_multimedia_example_test / test
- GitHub Check: run_networkx_metrics_test / test
- GitHub Check: run_notebook_test / test
- GitHub Check: run_notebook_test / test
- GitHub Check: Test on macos-15
- GitHub Check: Test on macos-15
- GitHub Check: run_dynamic_steps_example_test / test
- GitHub Check: run_notebook_test / test
- GitHub Check: run_simple_example_test / test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: Test on macos-13
- GitHub Check: Test on macos-15
- GitHub Check: Test on macos-13
- GitHub Check: run_notebook_test / test
- GitHub Check: Test on ubuntu-22.04
- GitHub Check: test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: Test on ubuntu-22.04
- GitHub Check: run_eval_framework_test / test
- GitHub Check: test
- GitHub Check: Test on macos-13
- GitHub Check: windows-latest
- GitHub Check: Test on ubuntu-22.04
- GitHub Check: docker-compose-test
🔇 Additional comments (7)
cognee/__init__.py (1)

11-11: Import looks good.

The addition of this import makes the relocated `store_descriptive_metrics` function accessible from the top-level package, maintaining a clean public API.

examples/python/dynamic_steps_example.py (4)
183-183: Store pipeline_runs result for later use.

Good change to store the `cognify()` result, as it's now needed for the new graph metrics functionality.
186-190: New step for calculating and storing descriptive metrics.

This is a good addition that implements the new functionality for calculating and storing descriptive metrics. The comments accurately reflect what the step does, and the conditional execution based on the `graph_metrics` flag follows the pattern of other steps in this file; a minimal sketch follows below.
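A minimal sketch of such a step, assuming the helper is exposed at the package level as `cognee.get_pipeline_run_metrics` (the function name and the `include_optional=True` argument follow the sequence diagram above; the wrapper function here is hypothetical):

```python
import cognee


async def run_graph_metrics_step(enable_steps: dict, pipeline_runs) -> None:
    # Mirror the other steps in the example: only run when the flag is enabled.
    if enable_steps.get("graph_metrics"):
        await cognee.get_pipeline_run_metrics(pipeline_runs, include_optional=True)
```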
191-191: Updated step numbering.

The step numbering has been correctly updated to reflect the addition of the new step.
209-209: New step enabled in configuration.

The `graph_metrics` step has been properly added to the `steps_to_enable` dictionary and tied to the `rebuild_kg` flag, consistent with the pattern used for other related steps.

cognee/modules/data/operations/store_descriptive_metrics.py (2)
1-10: Imports look well-structured.

The imports are organized logically, grouping related modules and avoiding unnecessary dependencies.
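For context, the imports plausibly look something like the sketch below, reconstructed from the identifiers used elsewhere in this review; the exact module paths are assumptions:

```python
# Reconstructed sketch; actual import paths in cognee may differ.
from sqlalchemy import select, func

from cognee.infrastructure.databases.graph import get_graph_engine            # assumed path
from cognee.infrastructure.databases.relational import get_relational_engine  # assumed path
from cognee.modules.data.models import Data, GraphMetrics                     # assumed path
from cognee.modules.pipelines.models import PipelineRun                       # assumed path
```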
12-25: Well-documented token count function.

The `fetch_token_count` function is well-documented with a clear purpose and return type. It efficiently fetches and sums token counts using asynchronous database session handling.
```diff
@@ -180,10 +180,15 @@ async def main(enable_steps):

     # Step 3: Create knowledge graph
     if enable_steps.get("cognify"):
-        await cognee.cognify()
+        pipeline_runs = await cognee.cognify()
```
`pipeline_run`, it's a single instance, right?
In general, `cognify` returns a list of `pipeline_run` instances.
In this case, yes, it is a list of length 1. Sure, I can rename it here; I just wanted to match the next function's signature.
Actionable comments posted: 2
🧹 Nitpick comments (1)
cognee/modules/data/operations/get_pipeline_run_metrics.py (1)
41-56: Add defensive programming for graph metrics dictionary access.

The code assumes all keys will be present in the `graph_metrics` dictionary. Consider adding validation or using `.get()` with default values to handle missing keys gracefully.
```diff
                 graph_metrics = await graph_engine.get_graph_metrics(include_optional)
                 metrics = GraphMetrics(
                     id=pipeline_run.pipeline_run_id,
                     num_tokens=await fetch_token_count(db_engine),
-                    num_nodes=graph_metrics["num_nodes"],
-                    num_edges=graph_metrics["num_edges"],
-                    mean_degree=graph_metrics["mean_degree"],
-                    edge_density=graph_metrics["edge_density"],
-                    num_connected_components=graph_metrics["num_connected_components"],
-                    sizes_of_connected_components=graph_metrics["sizes_of_connected_components"],
-                    num_selfloops=graph_metrics["num_selfloops"],
-                    diameter=graph_metrics["diameter"],
-                    avg_shortest_path_length=graph_metrics["avg_shortest_path_length"],
-                    avg_clustering=graph_metrics["avg_clustering"],
+                    num_nodes=graph_metrics.get("num_nodes", 0),
+                    num_edges=graph_metrics.get("num_edges", 0),
+                    mean_degree=graph_metrics.get("mean_degree", 0.0),
+                    edge_density=graph_metrics.get("edge_density", 0.0),
+                    num_connected_components=graph_metrics.get("num_connected_components", 0),
+                    sizes_of_connected_components=graph_metrics.get("sizes_of_connected_components", []),
+                    num_selfloops=graph_metrics.get("num_selfloops", 0),
+                    diameter=graph_metrics.get("diameter", 0.0),
+                    avg_shortest_path_length=graph_metrics.get("avg_shortest_path_length", 0.0),
+                    avg_clustering=graph_metrics.get("avg_clustering", 0.0),
                 )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- cognee/__init__.py (1 hunks)
- cognee/modules/data/operations/get_pipeline_run_metrics.py (1 hunks)
- examples/python/dynamic_steps_example.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- cognee/__init__.py
- examples/python/dynamic_steps_example.py
⏰ Context from checks skipped due to timeout of 90000ms (29)
- GitHub Check: Test on macos-15
- GitHub Check: run_networkx_metrics_test / test
- GitHub Check: run_simple_example_test / test
- GitHub Check: run_notebook_test / test
- GitHub Check: run_dynamic_steps_example_test / test
- GitHub Check: run_multimedia_example_test / test
- GitHub Check: run_notebook_test / test
- GitHub Check: run_notebook_test / test
- GitHub Check: run_notebook_test / test
- GitHub Check: Test on macos-13
- GitHub Check: Test on macos-15
- GitHub Check: Test on macos-15
- GitHub Check: test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: windows-latest
- GitHub Check: Test on ubuntu-22.04
- GitHub Check: run_eval_framework_test / test
- GitHub Check: Test on macos-13
- GitHub Check: Test on macos-13
- GitHub Check: test
- GitHub Check: Test on ubuntu-22.04
- GitHub Check: Test on ubuntu-22.04
- GitHub Check: Test on ubuntu-22.04
- GitHub Check: test
- GitHub Check: test
- GitHub Check: test
- GitHub Check: Build Cognee Backend Docker App Image
- GitHub Check: docker-compose-test
Quoted hunk from cognee/modules/data/operations/get_pipeline_run_metrics.py:

```python
async def fetch_token_count(db_engine) -> int:
    """
    Fetches and sums token counts from the database.

    Returns:
        int: The total number of tokens across all documents.
    """

    async with db_engine.get_async_session() as session:
        token_count_sum = await session.execute(select(func.sum(Data.token_count)))
        token_count_sum = token_count_sum.scalar()

    return token_count_sum
```
🛠️ Refactor suggestion
Function needs error handling for database operations.
The `fetch_token_count` function could encounter database errors or return None if there are no records. Consider adding try/except blocks and handling the case when `token_count_sum` is None.
```diff
 async def fetch_token_count(db_engine) -> int:
     """
     Fetches and sums token counts from the database.

     Returns:
         int: The total number of tokens across all documents.
     """

-    async with db_engine.get_async_session() as session:
-        token_count_sum = await session.execute(select(func.sum(Data.token_count)))
-        token_count_sum = token_count_sum.scalar()
+    try:
+        async with db_engine.get_async_session() as session:
+            token_count_sum = await session.execute(select(func.sum(Data.token_count)))
+            token_count_sum = token_count_sum.scalar()
+    except Exception as e:
+        # Log the error
+        raise RuntimeError(f"Error fetching token count: {str(e)}") from e
+
+    # Handle case when no data exists
+    if token_count_sum is None:
+        return 0

     return token_count_sum
```
Quoted hunk from cognee/modules/data/operations/get_pipeline_run_metrics.py:

```python
async def get_pipeline_run_metrics(pipeline_runs: list[PipelineRun], include_optional: bool):
    db_engine = get_relational_engine()
    graph_engine = await get_graph_engine()

    metrics_for_pipeline_runs = []

    async with db_engine.get_async_session() as session:
        for pipeline_run in pipeline_runs:
            existing_metrics = await session.execute(
                select(GraphMetrics).where(GraphMetrics.id == pipeline_run.pipeline_run_id)
            )
            existing_metrics = existing_metrics.scalars().first()

            if existing_metrics:
                metrics_for_pipeline_runs.append(existing_metrics)
            else:
                graph_metrics = await graph_engine.get_graph_metrics(include_optional)
                metrics = GraphMetrics(
                    id=pipeline_run.pipeline_run_id,
                    num_tokens=await fetch_token_count(db_engine),
                    num_nodes=graph_metrics["num_nodes"],
                    num_edges=graph_metrics["num_edges"],
                    mean_degree=graph_metrics["mean_degree"],
                    edge_density=graph_metrics["edge_density"],
                    num_connected_components=graph_metrics["num_connected_components"],
                    sizes_of_connected_components=graph_metrics["sizes_of_connected_components"],
                    num_selfloops=graph_metrics["num_selfloops"],
                    diameter=graph_metrics["diameter"],
                    avg_shortest_path_length=graph_metrics["avg_shortest_path_length"],
                    avg_clustering=graph_metrics["avg_clustering"],
                )
                metrics_for_pipeline_runs.append(metrics)
                session.add(metrics)
        await session.commit()

    return metrics_for_pipeline_runs
```
🛠️ Refactor suggestion
Improve error handling and transaction management in pipeline metrics collection.
The function has several issues:
- No error handling for database or graph engine operations
- Single commit at the end means failure on one pipeline run affects all others
- No validation for empty pipeline_runs list
- Missing type annotation for the return value
```diff
 async def get_pipeline_run_metrics(pipeline_runs: list[PipelineRun], include_optional: bool):
+    """
+    Retrieve or calculate metrics for a list of pipeline runs.
+
+    Args:
+        pipeline_runs: List of PipelineRun objects to process
+        include_optional: Whether to include optional metrics in the calculation
+
+    Returns:
+        list[GraphMetrics]: List of metrics for the provided pipeline runs
+    """
+    if not pipeline_runs:
+        return []
+
     db_engine = get_relational_engine()
     graph_engine = await get_graph_engine()

     metrics_for_pipeline_runs = []

     async with db_engine.get_async_session() as session:
         for pipeline_run in pipeline_runs:
-            existing_metrics = await session.execute(
-                select(GraphMetrics).where(GraphMetrics.id == pipeline_run.pipeline_run_id)
-            )
-            existing_metrics = existing_metrics.scalars().first()
-
-            if existing_metrics:
-                metrics_for_pipeline_runs.append(existing_metrics)
-            else:
-                graph_metrics = await graph_engine.get_graph_metrics(include_optional)
-                metrics = GraphMetrics(
-                    id=pipeline_run.pipeline_run_id,
-                    num_tokens=await fetch_token_count(db_engine),
-                    num_nodes=graph_metrics["num_nodes"],
-                    num_edges=graph_metrics["num_edges"],
-                    mean_degree=graph_metrics["mean_degree"],
-                    edge_density=graph_metrics["edge_density"],
-                    num_connected_components=graph_metrics["num_connected_components"],
-                    sizes_of_connected_components=graph_metrics["sizes_of_connected_components"],
-                    num_selfloops=graph_metrics["num_selfloops"],
-                    diameter=graph_metrics["diameter"],
-                    avg_shortest_path_length=graph_metrics["avg_shortest_path_length"],
-                    avg_clustering=graph_metrics["avg_clustering"],
-                )
-                metrics_for_pipeline_runs.append(metrics)
-                session.add(metrics)
-        await session.commit()
+            try:
+                existing_metrics = await session.execute(
+                    select(GraphMetrics).where(GraphMetrics.id == pipeline_run.pipeline_run_id)
+                )
+                existing_metrics = existing_metrics.scalars().first()
+
+                if existing_metrics:
+                    metrics_for_pipeline_runs.append(existing_metrics)
+                else:
+                    graph_metrics = await graph_engine.get_graph_metrics(include_optional)
+                    metrics = GraphMetrics(
+                        id=pipeline_run.pipeline_run_id,
+                        num_tokens=await fetch_token_count(db_engine),
+                        num_nodes=graph_metrics["num_nodes"],
+                        num_edges=graph_metrics["num_edges"],
+                        mean_degree=graph_metrics["mean_degree"],
+                        edge_density=graph_metrics["edge_density"],
+                        num_connected_components=graph_metrics["num_connected_components"],
+                        sizes_of_connected_components=graph_metrics["sizes_of_connected_components"],
+                        num_selfloops=graph_metrics["num_selfloops"],
+                        diameter=graph_metrics["diameter"],
+                        avg_shortest_path_length=graph_metrics["avg_shortest_path_length"],
+                        avg_clustering=graph_metrics["avg_clustering"],
+                    )
+                    metrics_for_pipeline_runs.append(metrics)
+                    session.add(metrics)
+
+                # Commit after each successful pipeline run to ensure partial success
+                await session.commit()
+            except Exception as e:
+                # Log the error but continue processing other pipeline runs
+                await session.rollback()
+                print(f"Error processing pipeline run {pipeline_run.pipeline_run_id}: {str(e)}")
+                continue

     return metrics_for_pipeline_runs
```
Description
DCO Affirmation
I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin
Summary by CodeRabbit
New Features
- Descriptive graph metrics (token, node, and edge counts, degree, density, connected components, and related statistics) can now be calculated and stored per pipeline run, with a new optional step in the dynamic steps example.

Chores
- The descriptive metrics logic was relocated from cognee/modules/data/methods to cognee/modules/data/operations.