Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ian/mastra slim #2972

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
89ea537
add generic to base storage to match
ianmacartney Mar 15, 2025
6c4a9cd
bypass libsql by using an in-memory store by default
ianmacartney Mar 15, 2025
e83bd25
generic second pass
ianmacartney Mar 15, 2025
96aae97
move the in memory out of the proxy
ianmacartney Mar 15, 2025
3689aea
add proxy vector like for storage
ianmacartney Mar 15, 2025
3ac98ff
use the existing db if there is one, or from the env var if set
ianmacartney Mar 15, 2025
a0fa794
use in-memory vector
ianmacartney Mar 15, 2025
f7fef13
more targeted imports
ianmacartney Mar 15, 2025
d6d5479
obfuscate import to avoid bundling
ianmacartney Mar 15, 2025
5d9e803
mark @libsql/client as optional
ianmacartney Mar 15, 2025
5b2afb1
make in-memory store transactional on write
ianmacartney Mar 15, 2025
8173028
move types back out
ianmacartney Mar 15, 2025
229bcc8
default proxy storage is where in memory is decided
ianmacartney Mar 15, 2025
e184af7
make default-proxy-vector decide between in-memory or not
ianmacartney Mar 15, 2025
b976769
move env vars up
ianmacartney Mar 16, 2025
28bcdc6
Revert "mark @libsql/client as optional"
ianmacartney Mar 16, 2025
720095c
minimize changes
ianmacartney Mar 16, 2025
4cc8613
fixup move env vars up
ianmacartney Mar 16, 2025
ae7bbc5
use in-memory for file::memory:* as well
ianmacartney Mar 16, 2025
4c64f13
resolve default
ianmacartney Mar 16, 2025
cc7cf39
dynamic import without obfuscation
ianmacartney Mar 16, 2025
2b821ab
dynamic import of libsql
ianmacartney Mar 16, 2025
828d9c6
move libsql type back in
ianmacartney Mar 16, 2025
8623aa5
remove unnecessary change
ianmacartney Mar 16, 2025
aec7e05
use @libsql/core/web
ianmacartney Mar 16, 2025
dc54ea4
remove vector proxy obfuscation
ianmacartney Mar 16, 2025
5e603b6
Revert "use @libsql/core/web"
ianmacartney Mar 16, 2025
a9cbc19
Revert "dynamic import of libsql"
ianmacartney Mar 16, 2025
084e827
remove unnecessary change
ianmacartney Mar 16, 2025
7419171
move from in-memory implementations back to just libsql for the defau…
ianmacartney Mar 18, 2025
09db6de
Merge branch 'main' into ian/mastra-slim
ianmacartney Mar 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/cli/src/commands/dev/dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const startServer = async (dotMastraPath: string, port: number, env: Map<string,
...Object.fromEntries(env),
PORT: port.toString() || process.env.PORT || '4111',
MASTRA_DEFAULT_STORAGE_URL: `file:${join(dotMastraPath, '..', 'mastra.db')}`,
MASTRA_DEFAULT_VECTOR_URL: `file:${join(dotMastraPath, '..', 'memory.db')}`,
},
stdio: 'inherit',
reject: false,
Expand Down
17 changes: 5 additions & 12 deletions packages/core/src/memory/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import { DefaultProxyStorage } from '../storage/default-proxy-storage';
import type { CoreTool } from '../tools';
import { deepMerge } from '../utils';
import type { MastraVector } from '../vector';
import { DefaultProxyVector } from '../vector/default-proxy-vector';
import { defaultEmbedder } from '../vector/fastembed';
import { DefaultVectorDB } from '../vector/libsql';

import type { MessageType, SharedMemoryConfig, StorageThreadType, MemoryConfig, AiMessageType } from './types';

Expand All @@ -39,17 +39,11 @@ export abstract class MastraMemory extends MastraBase {
generateTitle: true, // TODO: should we disable this by default to reduce latency?
},
};

constructor(config: { name: string } & SharedMemoryConfig) {
constructor(config: { name: string; defaultVectorUrl?: string; defaultStorageUrl?: string } & SharedMemoryConfig) {
super({ component: 'MEMORY', name: config.name });

this.storage =
config.storage ||
new DefaultProxyStorage({
config: {
url: 'file:memory.db',
},
});
config.storage || new DefaultProxyStorage({ config: { url: config.defaultStorageUrl || 'file:memory.db' } });

if (config.vector) {
this.vector = config.vector;
Expand All @@ -67,9 +61,8 @@ export abstract class MastraMemory extends MastraBase {
`Found deprecated Memory vector db file ${oldDb} this db is now merged with the default ${newDb} file. Delete the old one to use the new one. You will need to migrate any data if that's important to you. For now the deprecated path will be used but in a future breaking change we will only use the new db file path.`,
);
}

this.vector = new DefaultVectorDB({
connectionUrl: hasOldDb ? `file:${oldDb}` : `file:${newDb}`,
this.vector = new DefaultProxyVector({
connectionUrl: (hasOldDb && `file:${oldDb}`) || config.defaultVectorUrl || `file:${newDb}`,
});
}

Expand Down
6 changes: 5 additions & 1 deletion packages/core/src/storage/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ export abstract class MastraStorage extends MastraBase {
return this.deleteThread({ threadId });
}

abstract getMessages({ threadId, selectBy, threadConfig }: StorageGetMessagesArg): Promise<MessageType[]>;
abstract getMessages<T extends MessageType>({
threadId,
selectBy,
threadConfig,
}: StorageGetMessagesArg): Promise<T[]>;

async __getMessages({ threadId, selectBy, threadConfig }: StorageGetMessagesArg): Promise<MessageType[]> {
await this.init();
Expand Down
106 changes: 36 additions & 70 deletions packages/core/src/storage/default-proxy-storage.ts
Original file line number Diff line number Diff line change
@@ -1,114 +1,81 @@
import type { MessageType, StorageThreadType } from '../memory/types';
import { MastraStorage } from './base';
import type { TABLE_NAMES } from './constants';
import type { DefaultStorage, type LibSQLConfig } from './libsql';
import type { LibSQLConfig } from './libsql';
import type { EvalRow, StorageColumn, StorageGetMessagesArg } from './types';

/**
* A proxy for the DefaultStorage (LibSQLStore) to allow for dynamically loading the storage in a constructor
* If the storage is in-memory, it will use the InMemoryStorage.
*/
export class DefaultProxyStorage extends MastraStorage {
private storage: DefaultStorage | null = null;
private storageConfig: LibSQLConfig;
private isInitializingPromise: Promise<void> | null = null;
private storage: Promise<MastraStorage>;

constructor({ config }: { config: LibSQLConfig }) {
super({ name: 'DefaultStorage' });
this.storageConfig = config;
this.storage = new Promise((resolve, reject) => {
import('./libsql')
.then(({ DefaultStorage }) => {
resolve(new DefaultStorage({ config }));
})
.catch(reject);
});
}

private setupStorage() {
if (!this.isInitializingPromise) {
this.isInitializingPromise = new Promise((resolve, reject) => {
import('./libsql')
.then(({ DefaultStorage }) => {
this.storage = new DefaultStorage({ config: this.storageConfig });
resolve();
})
.catch(reject);
});
}

return this.isInitializingPromise;
async createTable(args: { tableName: TABLE_NAMES; schema: Record<string, StorageColumn> }): Promise<void> {
return (await this.storage).createTable(args);
}

async createTable({
tableName,
schema,
}: {
tableName: TABLE_NAMES;
schema: Record<string, StorageColumn>;
}): Promise<void> {
await this.setupStorage();
return this.storage!.createTable({ tableName, schema });
async clearTable(args: { tableName: TABLE_NAMES }): Promise<void> {
return (await this.storage).clearTable(args);
}

async clearTable({ tableName }: { tableName: TABLE_NAMES }): Promise<void> {
await this.setupStorage();
return this.storage!.clearTable({ tableName });
async insert(args: { tableName: TABLE_NAMES; record: Record<string, any> }): Promise<void> {
return (await this.storage).insert(args);
}

async insert({ tableName, record }: { tableName: TABLE_NAMES; record: Record<string, any> }): Promise<void> {
await this.setupStorage();
return this.storage!.insert({ tableName, record });
async batchInsert(args: { tableName: TABLE_NAMES; records: Record<string, any>[] }): Promise<void> {
return (await this.storage).batchInsert(args);
}

async batchInsert({ tableName, records }: { tableName: TABLE_NAMES; records: Record<string, any>[] }): Promise<void> {
await this.setupStorage();
return this.storage!.batchInsert({ tableName, records });
async load<R>(args: { tableName: TABLE_NAMES; keys: Record<string, string> }): Promise<R | null> {
return (await this.storage).load<R>(args);
}

async load<R>({ tableName, keys }: { tableName: TABLE_NAMES; keys: Record<string, string> }): Promise<R | null> {
await this.setupStorage();
return this.storage!.load<R>({ tableName, keys });
async getThreadById(args: { threadId: string }): Promise<StorageThreadType | null> {
return (await this.storage).getThreadById(args);
}

async getThreadById({ threadId }: { threadId: string }): Promise<StorageThreadType | null> {
await this.setupStorage();
return this.storage!.getThreadById({ threadId });
async getThreadsByResourceId(args: { resourceId: string }): Promise<StorageThreadType[]> {
return (await this.storage).getThreadsByResourceId(args);
}

async getThreadsByResourceId({ resourceId }: { resourceId: string }): Promise<StorageThreadType[]> {
await this.setupStorage();
return this.storage!.getThreadsByResourceId({ resourceId });
async saveThread(args: { thread: StorageThreadType }): Promise<StorageThreadType> {
return (await this.storage).saveThread(args);
}

async saveThread({ thread }: { thread: StorageThreadType }): Promise<StorageThreadType> {
await this.setupStorage();
return this.storage!.saveThread({ thread });
}

async updateThread({
id,
title,
metadata,
}: {
async updateThread(args: {
id: string;
title: string;
metadata: Record<string, unknown>;
}): Promise<StorageThreadType> {
await this.setupStorage();
return this.storage!.updateThread({ id, title, metadata });
return (await this.storage).updateThread(args);
}

async deleteThread({ threadId }: { threadId: string }): Promise<void> {
await this.setupStorage();
return this.storage!.deleteThread({ threadId });
async deleteThread(args: { threadId: string }): Promise<void> {
return (await this.storage).deleteThread(args);
}

async getMessages<T extends MessageType[]>({ threadId, selectBy }: StorageGetMessagesArg): Promise<T> {
await this.setupStorage();
return this.storage!.getMessages<T>({ threadId, selectBy });
async getMessages<T extends MessageType>(args: StorageGetMessagesArg): Promise<T[]> {
return (await this.storage).getMessages<T>(args);
}

async saveMessages({ messages }: { messages: MessageType[] }): Promise<MessageType[]> {
await this.setupStorage();
return this.storage!.saveMessages({ messages });
async saveMessages(args: { messages: MessageType[] }): Promise<MessageType[]> {
return (await this.storage).saveMessages(args);
}

async getEvalsByAgentName(agentName: string, type?: 'test' | 'live'): Promise<EvalRow[]> {
await this.setupStorage();
return this.storage!.getEvalsByAgentName(agentName, type);
return (await this.storage).getEvalsByAgentName(agentName, type);
}

async getTraces(options?: {
Expand All @@ -118,7 +85,6 @@ export class DefaultProxyStorage extends MastraStorage {
perPage: number;
attributes?: Record<string, string>;
}): Promise<any[]> {
await this.setupStorage();
return this.storage!.getTraces(options);
return (await this.storage).getTraces(options ?? { page: 0, perPage: 100 });
}
}
4 changes: 2 additions & 2 deletions packages/core/src/storage/libsql/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ export class LibSQLStore extends MastraStorage {
} as MessageType;
}

async getMessages<T extends MessageType[]>({ threadId, selectBy }: StorageGetMessagesArg): Promise<T> {
async getMessages<T extends MessageType>({ threadId, selectBy }: StorageGetMessagesArg): Promise<T[]> {
try {
const messages: MessageType[] = [];
const limit = typeof selectBy?.last === `number` ? selectBy.last : 40;
Expand Down Expand Up @@ -388,7 +388,7 @@ export class LibSQLStore extends MastraStorage {
// Sort all messages by creation date
messages.sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime());

return messages as T;
return messages as T[];
} catch (error) {
this.logger.error('Error getting messages:', error as Error);
throw error;
Expand Down
62 changes: 62 additions & 0 deletions packages/core/src/vector/default-proxy-vector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import type { LibSQLVectorConfig } from './libsql';
import type {
CreateIndexParams,
UpsertVectorParams,
QueryVectorParams,
IndexStats,
ParamsToArgs,
QueryResult,
CreateIndexArgs,
UpsertVectorArgs,
QueryVectorArgs,
} from './types';
import { MastraVector } from './vector';

/**
* A proxy for the DefaultVector (LibSQLStore) to allow for dynamically loading the vectorDB in a constructor
* If the vectorDB is in-memory, it will use the InMemoryVector.
*/
export class DefaultProxyVector extends MastraVector {
private vectorDB: Promise<MastraVector>;

constructor(config: LibSQLVectorConfig) {
super();
this.vectorDB = new Promise((resolve, reject) => {
import('./libsql')
.then(({ DefaultVectorDB }) => {
resolve(new DefaultVectorDB(config));
})
.catch(reject);
});
}

async query<E extends QueryVectorArgs = QueryVectorArgs>(
...args: ParamsToArgs<QueryVectorParams> | E
): Promise<QueryResult[]> {
return (await this.vectorDB).query(...args);
}
// Adds type checks for positional arguments if used
async upsert<E extends UpsertVectorArgs = UpsertVectorArgs>(
...args: ParamsToArgs<UpsertVectorParams> | E
): Promise<string[]> {
return (await this.vectorDB).upsert(...args);
}
// Adds type checks for positional arguments if used
async createIndex<E extends CreateIndexArgs = CreateIndexArgs>(
...args: ParamsToArgs<CreateIndexParams> | E
): Promise<void> {
return (await this.vectorDB).createIndex(...args);
}

async listIndexes(): Promise<string[]> {
return (await this.vectorDB).listIndexes();
}

async describeIndex(indexName: string): Promise<IndexStats> {
return (await this.vectorDB).describeIndex(indexName);
}

async deleteIndex(indexName: string): Promise<void> {
return (await this.vectorDB).deleteIndex(indexName);
}
}
28 changes: 11 additions & 17 deletions packages/core/src/vector/libsql/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,22 @@ interface LibSQLQueryParams extends QueryVectorParams {

type LibSQLQueryArgs = [...QueryVectorArgs, number?];

export type LibSQLVectorConfig = {
connectionUrl: string;
authToken?: string;
syncUrl?: string;
syncInterval?: number;
};

export class LibSQLVector extends MastraVector {
private turso: TursoClient;

constructor({
connectionUrl,
authToken,
syncUrl,
syncInterval,
}: {
connectionUrl: string;
authToken?: string;
syncUrl?: string;
syncInterval?: number;
}) {
constructor(config: LibSQLVectorConfig) {
super();
const { connectionUrl, authToken, syncUrl, syncInterval } = config;
const url = this.rewriteDbUrl(connectionUrl);

this.turso = createClient({
url: this.rewriteDbUrl(connectionUrl),
syncUrl: syncUrl,
authToken,
syncInterval,
});
this.turso = createClient({ url, syncUrl, authToken, syncInterval });
}

// If we're in the .mastra/output directory, use the dir outside .mastra dir
Expand Down
7 changes: 6 additions & 1 deletion packages/memory/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ import { updateWorkingMemoryTool } from './tools/working-memory';
*/
export class Memory extends MastraMemory {
constructor(config: SharedMemoryConfig = {}) {
super({ name: 'Memory', ...config });
super({
name: 'Memory',
defaultStorageUrl: process.env.MASTRA_DEFAULT_STORAGE_URL,
defaultVectorUrl: process.env.MASTRA_DEFAULT_VECTOR_URL,
...config,
});

const mergedConfig = this.getMergedThreadConfig({
workingMemory: config.options?.workingMemory || {
Expand Down
10 changes: 5 additions & 5 deletions stores/upstash/src/storage/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,13 @@ export class UpstashStore extends MastraStorage {
return messages;
}

async getMessages<T = unknown>({ threadId, selectBy }: StorageGetMessagesArg): Promise<T[]> {
async getMessages<T extends MessageType>({ threadId, selectBy }: StorageGetMessagesArg): Promise<T[]> {
const limit = typeof selectBy?.last === `number` ? selectBy.last : 40;
const messageIds = new Set<string>();
const threadMessagesKey = this.getThreadMessagesKey(threadId);

if (limit === 0 && !selectBy?.include) {
return [];
return [] as T[];
}

// First, get specifically included messages and their context
Expand Down Expand Up @@ -253,17 +253,17 @@ export class UpstashStore extends MastraStorage {
const messages = (
await Promise.all(
Array.from(messageIds).map(async id =>
this.redis.get<MessageType & { _index?: number }>(this.getMessageKey(threadId, id)),
this.redis.get<T & { _index?: number }>(this.getMessageKey(threadId, id)),
),
)
).filter(msg => msg !== null) as (MessageType & { _index?: number })[];
).filter(msg => msg !== null) as (T & { _index?: number })[];

// Sort messages by their position in the sorted set
const messageOrder = await this.redis.zrange(threadMessagesKey, 0, -1);
messages.sort((a, b) => messageOrder.indexOf(a!.id) - messageOrder.indexOf(b!.id));

// Remove _index before returning
return messages.map(({ _index, ...message }) => message as unknown as T);
return messages.map(({ _index, ...message }) => message) as T[];
}

async persistWorkflowSnapshot(params: {
Expand Down