-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add async cluster support #96
Conversation
WalkthroughThis pull request introduces cluster connection support for Redis in the asynchronous FalkorDB implementation. A new file, Changes
Possibly related PRs
Suggested reviewers
Poem
Tip ⚡🧪 Multi-step agentic review comment chat (experimental)
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (1)
✅ Files skipped from review due to trivial changes (1)
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello @gkorland, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
Summary of Changes
This pull request adds support for asynchronous Redis Cluster functionality to the falkordb library. It introduces a new module falkordb/asyncio/cluster.py
which contains functions to detect if a connection is a cluster and to create a cluster connection from a standard Redis connection. The changes also modify falkordb/asyncio/falkordb.py
to incorporate cluster-specific parameters and logic during initialization, allowing the library to connect to and interact with Redis clusters.
Highlights
- Async Cluster Support: Adds the ability to connect to and interact with Redis clusters asynchronously.
- New Cluster Module: Introduces
falkordb/asyncio/cluster.py
with functions for cluster detection and connection creation. - Falkordb Initialization: Modifies
falkordb/asyncio/falkordb.py
to include cluster-specific parameters and connection logic.
Changelog
- falkordb/asyncio/cluster.py
- Created a new module to handle asynchronous Redis Cluster connections.
- Implemented
Is_Cluster
function to detect if a connection is a Redis Cluster. - Implemented
Cluster_Conn
function to create a RedisCluster instance from a Redis connection, extracting connection details and setting appropriate cluster parameters.
- falkordb/asyncio/falkordb.py
- Imported the new cluster module.
- Added cluster-specific parameters to the FalkorDB class constructor, including
cluster_error_retry_attempts
,startup_nodes
,require_full_coverage
,reinitialize_steps
,read_from_replicas
, andaddress_remap
. - Modified the initialization logic to check if the connection is a cluster using
Is_Cluster
and, if so, create aRedisCluster
connection usingCluster_Conn
.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command>
or @gemini-code-assist <command>
. Below is a summary of the supported commands.
Feature | Command | Description |
---|---|---|
Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/
folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
In realms of data, where clusters reside,
Async connections, a powerful stride.
FalkorDB awakens, with newfound might,
To weave through the nodes, day and night.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces asynchronous cluster support for FalkorDB, which is a valuable enhancement. The changes involve creating cluster connections from Redis connections and detecting if a connection is a cluster. Overall, the implementation seems reasonable, but there are a few areas that could be improved for clarity and robustness.
Summary of Findings
- Error Handling in
Is_Cluster
: TheIs_Cluster
function does not handle potential exceptions that might occur when callingconn.info()
. Adding error handling would make the function more robust. - Parameter Handling in
Cluster_Conn
: TheCluster_Conn
function pops parameters fromconnection_kwargs
without checking if they exist. This could lead to errors if the connection does not have these parameters. Consider adding checks or usingget
with default values. - Inconsistent Parameter Passing: In
falkordb.py
, thessl
parameter is passed directly toCluster_Conn
, but other parameters likeretry
are not. This inconsistency should be addressed for better code maintainability.
Merge Readiness
The pull request introduces important functionality, but there are a few issues that should be addressed before merging. Specifically, the error handling in Is_Cluster
and parameter handling in Cluster_Conn
should be improved. Additionally, the parameter passing in falkordb.py
should be made consistent. I am unable to approve this pull request, and recommend that others review and approve this code before merging.
falkordb/asyncio/cluster.py
Outdated
async def Is_Cluster(conn): | ||
info = await conn.info(section="server") | ||
return "redis_mode" in info and info["redis_mode"] == "cluster" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Is_Cluster
function does not handle potential exceptions that might occur when calling conn.info()
. Consider adding a try-except block to catch and log any exceptions, or return a default value (e.g., False
) if an error occurs. This will make the function more robust.
try:
info = await conn.info(section="server")
return "redis_mode" in info and info["redis_mode"] == "cluster"
except Exception:
return False
host = connection_kwargs.pop("host") | ||
port = connection_kwargs.pop("port") | ||
username = connection_kwargs.pop("username") | ||
password = connection_kwargs.pop("password") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Cluster_Conn
function pops parameters from connection_kwargs
without checking if they exist. If the connection does not have these parameters, it will raise an exception. Consider using connection_kwargs.get('host')
with a default value or checking if the key exists before popping.
host = connection_kwargs.get("host")
port = connection_kwargs.get("port")
username = connection_kwargs.get("username")
password = connection_kwargs.get("password")
conn = Cluster_Conn( | ||
conn, | ||
ssl, | ||
cluster_error_retry_attempts, | ||
startup_nodes, | ||
require_full_coverage, | ||
reinitialize_steps, | ||
read_from_replicas, | ||
address_remap, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ssl
parameter is passed directly to Cluster_Conn
, but other parameters like retry
are not. To maintain consistency and make the code more maintainable, consider passing all relevant parameters explicitly.
conn = Cluster_Conn( | |
conn, | |
ssl, | |
cluster_error_retry_attempts, | |
startup_nodes, | |
require_full_coverage, | |
reinitialize_steps, | |
read_from_replicas, | |
address_remap, | |
conn = Cluster_Conn( | |
conn, | |
ssl, | |
cluster_error_retry_attempts=cluster_error_retry_attempts, | |
startup_nodes=startup_nodes, | |
require_full_coverage=require_full_coverage, | |
reinitialize_steps=reinitialize_steps, | |
read_from_replicas=read_from_replicas, | |
address_remap=address_remap, | |
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🔭 Outside diff range comments (1)
falkordb/asyncio/falkordb.py (1)
110-142
: 🛠️ Refactor suggestionAdd cluster support to the
from_url
methodThe
from_url
method creates a Redis connection but doesn't check if it's a cluster or handle it appropriately like the__init__
method does. This could lead to inconsistent behavior when connecting through URLs.def from_url(cls, url: str, **kwargs) -> "FalkorDB": """ Creates a new FalkorDB instance from a URL. Args: cls: The class itself. url (str): The URL. kwargs: Additional keyword arguments to pass to the ``DB.from_url`` function. Returns: DB: A new DB instance. Usage example:: db = FalkorDB.from_url("falkor://[[username]:[password]]@localhost:6379") db = FalkorDB.from_url("falkors://[[username]:[password]]@localhost:6379") db = FalkorDB.from_url("unix://[username@]/path/to/socket.sock?db=0[&password=password]") """ db = cls() # switch from redis:// to falkordb:// if url.startswith('falkor://'): url = 'redis://' + url[len('falkor://'):] elif url.startswith('falkors://'): url = 'rediss://' + url[len('falkors://'):] conn = redis.from_url(url, **kwargs) + + # Check if the connection is a cluster and initialize it accordingly + if await Is_Cluster(conn): + ssl = url.startswith('rediss://') or url.startswith('falkors://') + conn = Cluster_Conn( + conn, + ssl, + **{k: v for k, v in kwargs.items() if k in [ + 'cluster_error_retry_attempts', + 'startup_nodes', + 'require_full_coverage', + 'reinitialize_steps', + 'read_from_replicas', + 'address_remap' + ]} + ) + db.connection = conn db.flushdb = conn.flushdb db.execute_command = conn.execute_command return db
🧹 Nitpick comments (4)
falkordb/asyncio/cluster.py (2)
5-8
: Update function name and comment to follow Python conventionsThe comment "detect if a connection is a sentinel" doesn't match the function's actual purpose (detecting cluster mode). Additionally, the function name uses PascalCase instead of snake_case, which is inconsistent with Python naming conventions (PEP 8).
- # detect if a connection is a sentinel - async def Is_Cluster(conn): + # detect if a connection is in cluster mode + async def is_cluster(conn): info = await conn.info(section="server") return "redis_mode" in info and info["redis_mode"] == "cluster"
11-57
: Improve function naming and remove commented out codeThe function uses PascalCase instead of snake_case, which is inconsistent with Python conventions. There are also commented out parameters and code that should either be removed or implemented.
- def Cluster_Conn( + def cluster_conn( conn, ssl, cluster_error_retry_attempts=3, startup_nodes=None, require_full_coverage=False, reinitialize_steps=5, read_from_replicas=False, - # dynamic_startup_nodes=True, - # url=None, address_remap=None, ): connection_kwargs = conn.connection_pool.connection_kwargs host = connection_kwargs.pop("host") port = connection_kwargs.pop("port") username = connection_kwargs.pop("username") password = connection_kwargs.pop("password") retry = connection_kwargs.pop("retry", None) - # retry_on_timeout = connection_kwargs.pop("retry_on_timeout", None) + retry_on_timeout = connection_kwargs.pop("retry_on_timeout", None) retry_on_error = connection_kwargs.pop( "retry_on_error",Additionally, consider whether this function should be async since it's part of an asyncio module, although the underlying RedisCluster constructor might not be async.
falkordb/asyncio/falkordb.py (2)
2-2
: Avoid wildcard importsWildcard imports make it difficult to determine which names are present in the namespace and can lead to unexpected name conflicts. They're also discouraged by PEP 8.
- from .cluster import * + from .cluster import Is_Cluster, Cluster_Conn🧰 Tools
🪛 Ruff (0.8.2)
2-2:
from .cluster import *
used; unable to detect undefined names(F403)
11-27
: Add documentation for cluster parametersThe class docstring doesn't include any information about the newly added cluster parameters. Update the documentation to include these parameters and their purpose.
class FalkorDB(): """ Asynchronous FalkorDB Class for interacting with a FalkorDB server. Usage example:: from falkordb.asyncio import FalkorDB # connect to the database and select the 'social' graph db = FalkorDB() graph = db.select_graph("social") # get a single 'Person' node from the graph and print its name response = await graph.query("MATCH (n:Person) RETURN n LIMIT 1") result = response.result_set person = result[0][0] print(node.properties['name']) + + Cluster Configuration: + For Redis Cluster support, the following parameters can be used: + - cluster_error_retry_attempts: Number of attempts to retry on cluster errors + - startup_nodes: List of nodes to initially connect to + - require_full_coverage: If True, requires full cluster coverage for operations + - reinitialize_steps: Number of steps to reinitialize the cluster + - read_from_replicas: If True, enables read from replica nodes + - address_remap: Dictionary of address remapping {old_addr: new_addr} """
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
falkordb/asyncio/cluster.py
(1 hunks)falkordb/asyncio/falkordb.py
(3 hunks)
🧰 Additional context used
🪛 Ruff (0.8.2)
falkordb/asyncio/falkordb.py
2-2: from .cluster import *
used; unable to detect undefined names
(F403)
94-94: Is_Cluster
may be undefined, or defined from star imports
(F405)
95-95: Cluster_Conn
may be undefined, or defined from star imports
(F405)
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #96 +/- ##
==========================================
- Coverage 92.26% 91.98% -0.28%
==========================================
Files 35 36 +1
Lines 2612 2634 +22
==========================================
+ Hits 2410 2423 +13
- Misses 202 211 +9 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (2)
falkordb/asyncio/cluster.py (2)
8-17
: 🛠️ Refactor suggestionHandle potential exceptions and resource management
The
Is_Cluster
function doesn't handle exceptions and doesn't properly close the Redis connection. This can lead to resource leaks and unexpected errors.def Is_Cluster(conn: redis.Redis): pool = conn.connection_pool kwargs = pool.connection_kwargs.copy() - # Create a synchronous Redis client with the same parameters - # as the connection pool just to keep Is_Cluster synchronous - info = sync_redis.Redis(**kwargs).info(section="server") - - return "redis_mode" in info and info["redis_mode"] == "cluster" + # Create a synchronous Redis client with the same parameters + # as the connection pool just to keep Is_Cluster synchronous + sync_client = None + try: + sync_client = sync_redis.Redis(**kwargs) + info = sync_client.info(section="server") + return "redis_mode" in info and info["redis_mode"] == "cluster" + except Exception: + return False + finally: + if sync_client: + sync_client.close()
31-36
: 🛠️ Refactor suggestionUse get() instead of pop() for safer parameter extraction
The function pops parameters from
connection_kwargs
without checking if they exist. This could lead to exceptions if any of these keys are missing.connection_kwargs = conn.connection_pool.connection_kwargs - host = connection_kwargs.pop("host") - port = connection_kwargs.pop("port") - username = connection_kwargs.pop("username") - password = connection_kwargs.pop("password") + host = connection_kwargs.get("host") + port = connection_kwargs.get("port") + username = connection_kwargs.get("username") + password = connection_kwargs.get("password") + + # Remove used keys if they exist + for key in ["host", "port", "username", "password"]: + connection_kwargs.pop(key, None)
🧹 Nitpick comments (4)
falkordb/asyncio/cluster.py (4)
7-8
: Fix misleading commentThe comment mentions "sentinel" but the function actually checks for cluster mode. Update the comment to accurately reflect the function's purpose.
-# detect if a connection is a sentinel +# detect if a connection is in cluster mode def Is_Cluster(conn: redis.Redis):
20-31
: Add docstring with parameter descriptionsThe
Cluster_Conn
function lacks a docstring explaining its purpose and parameters. This makes it harder for other developers to understand how to use this function properly.# create a cluster connection from a Redis connection def Cluster_Conn( conn, ssl, cluster_error_retry_attempts=3, startup_nodes=None, require_full_coverage=False, reinitialize_steps=5, read_from_replicas=False, address_remap=None, ): + """ + Create a Redis Cluster connection from a standard Redis connection. + + Args: + conn: The existing Redis connection + ssl: Whether to use SSL for the cluster connection + cluster_error_retry_attempts: Number of attempts to retry on cluster errors + startup_nodes: List of nodes to try to connect to initially + require_full_coverage: Whether to require all slots to be covered by the cluster + reinitialize_steps: Number of steps to try before reinitializing connection + read_from_replicas: Whether read commands can be sent to replicas + address_remap: Dictionary of remapped node addresses (for NAT environments) + + Returns: + A configured RedisCluster instance + """ connection_kwargs = conn.connection_pool.connection_kwargs
37-47
: Add validation for retry parameterThe code extracts
retry
andretry_on_error
but doesn't validate if they're appropriate for the RedisCluster constructor. Consider adding validation or documentation about expected types.- retry = connection_kwargs.pop("retry", None) + # Extract retry configuration with validation + retry = connection_kwargs.pop("retry", None) + # Validate retry is compatible with RedisCluster if specified + if retry is not None and not hasattr(retry, 'retry'): + raise ValueError("retry must be a valid Retry object with retry method") + retry_on_error = connection_kwargs.pop( "retry_on_error", [
1-6
: Consider organizing imports according to standard conventionsThe imports are not organized according to common Python conventions. Consider organizing them by standard library, third-party, and local imports with alphabetical ordering within each group.
+import socket + from redis.asyncio.cluster import RedisCluster import redis.exceptions as redis_exceptions import redis.asyncio as redis import redis as sync_redis -import socket
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
.gitignore
(1 hunks)falkordb/asyncio/cluster.py
(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- .gitignore
return RedisCluster( | ||
host=host, | ||
port=port, | ||
username=username, | ||
password=password, | ||
decode_responses=True, | ||
ssl=ssl, | ||
retry=retry, | ||
retry_on_error=retry_on_error, | ||
require_full_coverage=require_full_coverage, | ||
reinitialize_steps=reinitialize_steps, | ||
read_from_replicas=read_from_replicas, | ||
address_remap=address_remap, | ||
startup_nodes=startup_nodes, | ||
cluster_error_retry_attempts=cluster_error_retry_attempts, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add error handling for cluster connection creation
The function doesn't handle potential exceptions when creating the RedisCluster instance. This makes error diagnosis harder and doesn't provide a graceful fallback.
- return RedisCluster(
- host=host,
- port=port,
- username=username,
- password=password,
- decode_responses=True,
- ssl=ssl,
- retry=retry,
- retry_on_error=retry_on_error,
- require_full_coverage=require_full_coverage,
- reinitialize_steps=reinitialize_steps,
- read_from_replicas=read_from_replicas,
- address_remap=address_remap,
- startup_nodes=startup_nodes,
- cluster_error_retry_attempts=cluster_error_retry_attempts,
- )
+ try:
+ return RedisCluster(
+ host=host,
+ port=port,
+ username=username,
+ password=password,
+ decode_responses=True,
+ ssl=ssl,
+ retry=retry,
+ retry_on_error=retry_on_error,
+ require_full_coverage=require_full_coverage,
+ reinitialize_steps=reinitialize_steps,
+ read_from_replicas=read_from_replicas,
+ address_remap=address_remap,
+ startup_nodes=startup_nodes,
+ cluster_error_retry_attempts=cluster_error_retry_attempts,
+ )
+ except Exception as e:
+ # Provide more context for the error
+ raise redis_exceptions.ConnectionError(
+ f"Failed to initialize Redis cluster connection: {str(e)}"
+ ) from e
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
return RedisCluster( | |
host=host, | |
port=port, | |
username=username, | |
password=password, | |
decode_responses=True, | |
ssl=ssl, | |
retry=retry, | |
retry_on_error=retry_on_error, | |
require_full_coverage=require_full_coverage, | |
reinitialize_steps=reinitialize_steps, | |
read_from_replicas=read_from_replicas, | |
address_remap=address_remap, | |
startup_nodes=startup_nodes, | |
cluster_error_retry_attempts=cluster_error_retry_attempts, | |
) | |
try: | |
return RedisCluster( | |
host=host, | |
port=port, | |
username=username, | |
password=password, | |
decode_responses=True, | |
ssl=ssl, | |
retry=retry, | |
retry_on_error=retry_on_error, | |
require_full_coverage=require_full_coverage, | |
reinitialize_steps=reinitialize_steps, | |
read_from_replicas=read_from_replicas, | |
address_remap=address_remap, | |
startup_nodes=startup_nodes, | |
cluster_error_retry_attempts=cluster_error_retry_attempts, | |
) | |
except Exception as e: | |
# Provide more context for the error | |
raise redis_exceptions.ConnectionError( | |
f"Failed to initialize Redis cluster connection: {str(e)}" | |
) from e |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No tests ?
[ | ||
ConnectionRefusedError, | ||
ConnectionError, | ||
TimeoutError, | ||
socket.timeout, | ||
redis_exceptions.ConnectionError, | ||
], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's this ?
startup_nodes=None, | ||
require_full_coverage=False, | ||
reinitialize_steps=5, | ||
read_from_replicas=False, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why False ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was following the none async configuration
There are no tests for cluster in general, I tested it manually. |
fix #95
Summary by CodeRabbit
.gitignore
to includepoetry.lock
and retain__pycache__
.