Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/cloudflare kv support #2642

Draft
wants to merge 35 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
7e5e7ee
Added initial Cloudflare KV storage provider
Mrinank-Bhowmick Feb 25, 2025
e0d5667
Merge branch 'mastra-ai:main' into feat/cloudflare-kv-support
Mrinank-Bhowmick Feb 25, 2025
2b0de39
Merge branch 'mastra-ai:main' into feat/cloudflare-kv-support
Mrinank-Bhowmick Feb 26, 2025
4fda684
Merge branch 'mastra-ai:main' into feat/cloudflare-kv-support
Mrinank-Bhowmick Feb 28, 2025
d9eb0ab
rewrite
Mrinank-Bhowmick Mar 4, 2025
582ce3d
Update index.ts
Mrinank-Bhowmick Mar 5, 2025
04d1ec4
Update index.ts
Mrinank-Bhowmick Mar 5, 2025
6ff92cf
Merge branch 'main' into feat/cloudflare-kv-support
Mrinank-Bhowmick Mar 5, 2025
6441695
Merge branch 'main' into feat/cloudflare-kv-support
NikAiyer Mar 11, 2025
f04e542
Merge branch 'main' into feat/cloudflare-kv-support
Mrinank-Bhowmick Mar 11, 2025
ee03615
Merge branch 'main' into feat/cloudflare-kv-support
NikAiyer Mar 14, 2025
a20c15c
Merge branch 'main' into feat/cloudflare-kv-support
NikAiyer Mar 14, 2025
4e38054
added changeset
NikAiyer Mar 14, 2025
e55839d
initial test commit
NikAiyer Mar 14, 2025
9e7445c
Merge branch 'main' into feat/cloudflare-kv-support
NikAiyer Mar 14, 2025
bf8fc62
updated implementation and tests
NikAiyer Mar 15, 2025
65920dd
updated tests
NikAiyer Mar 15, 2025
a27f3ed
update testing to prepare for bindings
NikAiyer Mar 15, 2025
5986833
update test
NikAiyer Mar 15, 2025
738fc35
add binding support to kv
NikAiyer Mar 16, 2025
c3ab31e
simplified schema creation
NikAiyer Mar 16, 2025
72f772f
update kv
NikAiyer Mar 16, 2025
f28b525
updated error handling
NikAiyer Mar 16, 2025
1fd99ef
updated validation
NikAiyer Mar 16, 2025
98f26b0
more validation
NikAiyer Mar 16, 2025
8797fe7
update tests
NikAiyer Mar 16, 2025
6d73796
updated tests
NikAiyer Mar 16, 2025
0e70d6a
updated waiting for test
NikAiyer Mar 16, 2025
b41fb81
update tests
NikAiyer Mar 16, 2025
5a4f405
added miniflare
NikAiyer Mar 16, 2025
cb74140
updated tests
NikAiyer Mar 17, 2025
8b59392
updated testing and schema validation
NikAiyer Mar 17, 2025
85b278c
updated message ordering
NikAiyer Mar 17, 2025
1109584
updated testing
NikAiyer Mar 17, 2025
b550643
merge
NikAiyer Mar 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
simplified schema creation
  • Loading branch information
NikAiyer committed Mar 16, 2025
commit c3ab31eb922e64a6b6cae8ee4960eed9bf79b13d
1 change: 0 additions & 1 deletion stores/cloudflare/src/storage/binding-api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ describe.skip('CloudflareStore Workers Binding', () => {
`${TEST_CONFIG.namespacePrefix}_mastra_threads`,
`${TEST_CONFIG.namespacePrefix}_mastra_workflows`,
`${TEST_CONFIG.namespacePrefix}_mastra_evals`,
`${TEST_CONFIG.namespacePrefix}_mastra_schemas`,
];

for (const namespace of namespaces) {
Expand Down
92 changes: 51 additions & 41 deletions stores/cloudflare/src/storage/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,18 @@ export class CloudflareStore extends MastraStorage {

private async listNamespaces() {
if (this.bindings) {
// For Workers API, we already have the namespaces bound
// For Workers API, return a response matching REST API format
return {
result: Object.keys(this.bindings).map(name => ({
id: name,
title: name,
// Other fields that REST API returns but we don't need
created_on: new Date().toISOString(), // Current time as creation date
modified_on: new Date().toISOString(),
supports_url_encoding: true, // Match REST API behavior
})),
success: true,
errors: [],
messages: [],
};
}
// For REST API
Expand All @@ -67,7 +72,7 @@ export class CloudflareStore extends MastraStorage {
if (!result) return null;
return JSON.stringify(result);
} else {
const namespaceId = await this.getNamespaceId({ tableName });
const namespaceId = await this.getNamespaceId(tableName);
const response = await this.client!.kv.namespaces.values.get(namespaceId, key, {
account_id: this.accountId!,
});
Expand All @@ -81,7 +86,7 @@ export class CloudflareStore extends MastraStorage {
// Instead, clear all keys in the namespace
await this.clearTable({ tableName });
} else {
const namespaceId = await this.getNamespaceId({ tableName });
const namespaceId = await this.getNamespaceId(tableName);
await this.client!.kv.namespaces.delete(namespaceId, {
account_id: this.accountId!,
});
Expand All @@ -93,20 +98,24 @@ export class CloudflareStore extends MastraStorage {
key,
value,
metadata,
schema,
}: {
tableName: TABLE_NAMES;
key: string;
value: string;
metadata?: string;
schema?: boolean;
}) {
// Ensure consistent serialization
const serializedValue = this.safeSerialize(value);
const serializedMetadata = metadata ? this.safeSerialize(metadata) : undefined;
if (this.bindings) {
const binding = this.getBinding(tableName);
if (serializedMetadata) {
await binding.put(key, serializedValue, { metadata: serializedMetadata });
} else {
await binding.put(key, serializedValue);
}
} else {
const namespaceId = await this.getNamespaceId({ tableName, schema });
const namespaceId = await this.getNamespaceId(tableName);
await this.client!.kv.namespaces.values.update(namespaceId, key, {
account_id: this.accountId!,
value: serializedValue,
Expand All @@ -120,32 +129,37 @@ export class CloudflareStore extends MastraStorage {
const binding = this.getBinding(tableName);
await binding.delete(key);
} else {
const namespaceId = await this.getNamespaceId({ tableName });
const namespaceId = await this.getNamespaceId(tableName);
await this.client!.kv.namespaces.values.delete(namespaceId, key, {
account_id: this.accountId!,
});
}
}

async listNamespaceKeys(tableName: TABLE_NAMES, options?: { limit?: number; prefix?: string }) {
if (this.bindings) {
const binding = this.getBinding(tableName);
const response = await binding.list({
limit: options?.limit || 1000,
prefix: options?.prefix,
});
try {
if (this.bindings) {
const binding = this.getBinding(tableName);
const response = await binding.list({
limit: options?.limit || 1000,
prefix: options?.prefix,
});

// Convert Workers API response to match REST API format
return response.keys;
} else {
const namespaceId = await this.getNamespaceId({ tableName });
// Use REST API
const response = await this.client!.kv.namespaces.keys.list(namespaceId, {
account_id: this.accountId!,
limit: options?.limit || 1000,
prefix: options?.prefix,
});
return response.result;
// Convert Workers API response to match REST API format
return response.keys;
} else {
const namespaceId = await this.getNamespaceId(tableName);
// Use REST API
const response = await this.client!.kv.namespaces.keys.list(namespaceId, {
account_id: this.accountId!,
limit: options?.limit || 1000,
prefix: options?.prefix,
});
return response.result;
}
} catch (error: any) {
console.error(`Error listing keys for namespace ${tableName}:`, error);
throw new Error(`Failed to list namespace keys: ${error.message}`);
}
}

Expand All @@ -154,8 +168,10 @@ export class CloudflareStore extends MastraStorage {
// For Workers API, namespaces are created at deploy time
// Return a mock response that matches REST API shape
return {
id: title, // Use title as ID since that's what we need
title: title,
result: {
id: title, // Use title as ID since that's what we need
title: title,
},
};
}
return await this.client!.kv.namespaces.create({
Expand Down Expand Up @@ -188,6 +204,10 @@ export class CloudflareStore extends MastraStorage {
private async createNamespace(namespaceName: string): Promise<string> {
try {
const response = await this.createNamespaceById(namespaceName);
// Handle both REST API and Workers API response shapes
if ('result' in response) {
return response.result.id;
}
return response.id;
} catch (error: any) {
// Check if the error is because it already exists
Expand All @@ -210,19 +230,11 @@ export class CloudflareStore extends MastraStorage {
return namespaceId;
}

private async getNamespaceId({
tableName,
schema = false,
}: {
tableName: TABLE_NAMES;
schema?: boolean;
}): Promise<string> {
private async getNamespaceId(tableName: TABLE_NAMES): Promise<string> {
const prefix = this.namespacePrefix;

try {
if (schema) {
return await this.getOrCreateNamespaceId(`${prefix}_mastra_schemas`);
} else if (tableName === TABLE_MESSAGES || tableName === TABLE_THREADS) {
if (tableName === TABLE_MESSAGES || tableName === TABLE_THREADS) {
return await this.getOrCreateNamespaceId(`${prefix}_mastra_threads`);
} else if (tableName === TABLE_WORKFLOW_SNAPSHOT) {
return await this.getOrCreateNamespaceId(`${prefix}_mastra_workflows`);
Expand Down Expand Up @@ -263,16 +275,14 @@ export class CloudflareStore extends MastraStorage {
key,
value,
metadata,
schema,
}: {
tableName: TABLE_NAMES;
key: string;
value: any;
metadata?: any;
schema?: boolean;
}): Promise<void> {
try {
await this.putNamespaceValue({ tableName, key, value, metadata, schema });
await this.putNamespaceValue({ tableName, key, value, metadata });
} catch (error: any) {
console.error(`Error putting KV for table ${tableName}, key ${key}:`, error);
throw new Error(`Failed to put KV: ${error.message}`);
Expand Down Expand Up @@ -551,7 +561,7 @@ export class CloudflareStore extends MastraStorage {
tableName,
createdAt: new Date().toISOString(),
};
await this.putKV({ tableName, key: schemaKey, value: schema, metadata, schema: true });
await this.putKV({ tableName, key: schemaKey, value: schema, metadata });
} catch (error) {
const errorMessage = `Failed to store schema for table ${tableName}`;
console.error(errorMessage, error);
Expand Down
1 change: 0 additions & 1 deletion stores/cloudflare/src/storage/rest-api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ describe('CloudflareStore REST API', () => {
`${TEST_CONFIG.namespacePrefix}_mastra_threads`,
`${TEST_CONFIG.namespacePrefix}_mastra_workflows`,
`${TEST_CONFIG.namespacePrefix}_mastra_evals`,
`${TEST_CONFIG.namespacePrefix}_mastra_schemas`,
];

for (const namespace of namespaces) {
Expand Down