-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsqlite.py
74 lines (66 loc) · 1.94 KB
/
sqlite.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import pathlib
import sqlite3
from datetime import datetime
from typing import Optional, Union, cast
from cachepot.backend import CacheBackendProtocol
from cachepot.expire import ExpireSeconds, to_timedelta
# Accepted ways to hand a database to SQLiteCacheBackend: a path given as
# ``str`` or ``pathlib.Path``, or an already-open ``sqlite3.Connection``.
ConnectionLike = Union[str, pathlib.Path, sqlite3.Connection]
class SQLiteCacheBackend(CacheBackendProtocol):
    """Cache backend that keeps entries in an SQLite table named ``cachepot``.

    Each row holds a key, a value and an expiry timestamp. Timestamps are
    naive ``datetime.now()`` values stored as ISO-8601 text with a space
    separator, so lexicographic order matches chronological order.
    """

    conn: sqlite3.Connection

    def __init__(self, conn: ConnectionLike):
        """Open the database if a path was given and ensure the schema exists.

        Args:
            conn: A database path (``str`` or ``pathlib.Path``) to open, or an
                existing ``sqlite3.Connection`` to reuse. Either way the
                connection's ``text_factory`` is set to ``bytes``.
        """
        if isinstance(conn, (str, pathlib.Path)):
            conn = sqlite3.connect(conn)
        conn.text_factory = bytes
        conn.execute(
            """\
            CREATE TABLE IF NOT EXISTS cachepot
            ( key BLOB PRIMARY KEY
            , value BLOB
            , expire_at timestamp
            )"""
        )
        # NOTE(review): ``key`` is already the primary key, so this index
        # looks redundant; it is kept so the schema matches databases that
        # were created by earlier versions.
        conn.execute(
            """\
            CREATE UNIQUE INDEX IF NOT EXISTS idx_cachepot
            ON cachepot
            ( key
            , expire_at
            )"""
        )
        self.conn = conn

    @staticmethod
    def _format_timestamp(moment: datetime) -> str:
        """Return *moment* as the text stored in / compared with ``expire_at``.

        This is the same ``isoformat(" ")`` text that the ``sqlite3`` default
        datetime adapter produced, so rows written by earlier versions remain
        comparable. Converting explicitly avoids relying on that adapter,
        which is deprecated since Python 3.12.
        """
        return moment.isoformat(" ")

    def save(
        self, key: bytes, value: bytes, *, expire_seconds: ExpireSeconds
    ) -> None:
        """Insert or overwrite the entry for *key* and commit.

        Args:
            key: Cache key.
            value: Payload to store.
            expire_seconds: Lifetime of the entry; converted with
                ``to_timedelta`` and added to the current time.
        """
        # NOTE(review): naive local time is used for expiry, so wall-clock
        # jumps (e.g. DST changes) can shift when an entry expires.
        expire_at = datetime.now() + to_timedelta(expire_seconds)
        # The connection context manager commits on success and rolls back
        # if the statement raises, so no transaction is left dangling.
        with self.conn:
            self.conn.execute(
                """\
                INSERT OR REPLACE INTO cachepot
                (key, value, expire_at)
                VALUES (?, ?, ?)""",
                (key, value, self._format_timestamp(expire_at)),
            )

    def load(self, key: bytes) -> Optional[bytes]:
        """Return the stored value for *key*, or ``None``.

        ``None`` is returned when no row exists for *key* or when the row's
        ``expire_at`` is not later than the current time.
        """
        now_text = self._format_timestamp(datetime.now())
        row = self.conn.execute(
            """\
            SELECT value
            FROM cachepot
            WHERE key = ?
            AND expire_at > ?""",
            (key, now_text),
        ).fetchone()
        if row is None:
            return None
        return cast(bytes, row[0])

    def delete(self, key: bytes) -> None:
        """Remove the entry for *key* (if any) and commit."""
        with self.conn:
            self.conn.execute(
                """\
                DELETE
                FROM cachepot
                WHERE key = ?""",
                (key,),
            )