
Commit 38390a4

programmer290399 authored and pdxjohnny committed
model: Archive support
Pulled `__aenter__()`/`__aexit__()` out of the context classes and moved them into parent classes. Added a tutorial for archive support. Spacy `model_name_or_path` was changed to `model_name`. Fixes: #662. Signed-off-by: John Andersen <johnandersenpdx@gmail.com>
1 parent 413bf3e commit 38390a4
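
The first change in the commit message, pulling `__aenter__()`/`__aexit__()` out of the individual context classes and into parent classes, is a classic pull-up refactor. A minimal sketch of the pattern (hypothetical class names, not the actual DFFML hierarchy):

```python
class BaseContext:
    """Parent class that owns the async context-manager protocol once."""

    async def __aenter__(self) -> "BaseContext":
        await self.open()  # shared setup lives here exactly once
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        await self.close()  # shared teardown, regardless of subclass

    async def open(self) -> None:
        pass

    async def close(self) -> None:
        pass


class ModelContext(BaseContext):
    """Subclasses override only their specific setup/teardown hooks."""

    async def open(self) -> None:
        self.session = "loaded"  # stand-in for real resource acquisition

    async def close(self) -> None:
        self.session = None


# Usage: async with ModelContext() as mctx: ...
```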

40 files changed: +1489 −826 lines

CHANGELOG.md (+3)

```diff
@@ -25,6 +25,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Usecase example notebook for "Saving and loading models"
 - Usecase example notebook for "Transfer Learning"
 - Usecase example notebook for "Ensemble by stacking"
+- Support for Archive Storage of Models
 - Support for Multi-Output models.
 - Usecase example notebook for "Working with Multi-Output models"
 - Optimizer `parameter_grid` for tuning models.
@@ -38,6 +39,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Config objects now support mutability/immutability at the property scope.
   See `docs/arch/0003-Config-Property-Mutable-vs-Immutable` for details.
 - high_level `accuracy()` now takes predict features as parameter.
+- Spacy `model_name_or_path` was changed to `model_name`. Functionality is the
+  same, it still accepts a name or a path.
 ### Fixed
 - Record object key properties are now always strings

```
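In user code the rename is mechanical; with a hypothetical Spacy model config class it looks like:

```python
# Before (old keyword):
# config = SpacyModelConfig(model_name_or_path="en_core_web_sm")

# After (new keyword), still accepting either a packaged model name or a path:
# config = SpacyModelConfig(model_name="en_core_web_sm")
# config = SpacyModelConfig(model_name="/path/to/trained/model")
```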

dffml/df/archive.py (new file, +204 lines)

The new module is reproduced below, function by function.

```python
import uuid
import pathlib
import zipfile
import tarfile
import mimetypes
from typing import Dict, Tuple, Any

from .types import DataFlow, Input, InputFlow, Operation
from ..operation.archive import (
    make_tar_archive,
    make_zip_archive,
    extract_tar_archive,
    extract_zip_archive,
)
from ..operation.compression import (
    gz_compress,
    gz_decompress,
    bz2_compress,
    bz2_decompress,
    xz_compress,
    xz_decompress,
)
```
```python
def get_key_substr(string: str, dict: dict, return_value: bool = True) -> Any:
    """
    A function to find dictionary items whose key matches a substring.
    """
    return [
        value if return_value else key
        for key, value in dict.items()
        if string in key.lower()
    ][0]
```
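A quick illustration of its behavior (assuming the module above is importable as `dffml.df.archive`); note it returns only the first match and raises `IndexError` when nothing matches:

```python
from dffml.df.archive import get_key_substr

inputs = {"input_file_path": "defn_a", "output_directory_path": "defn_b"}

# Returns the value of the first key containing the substring.
assert get_key_substr("input", inputs) == "defn_a"
# With return_value=False, returns the matching key instead.
assert get_key_substr("output", inputs, return_value=False) == "output_directory_path"
```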
```python
def get_archive_type(file_path: str) -> Tuple[str]:
    """
    A function to get archive type if the file exists.
    """
    archive_type, compression_type = None, None
    if zipfile.is_zipfile(file_path):
        archive_type = "zip"
    if tarfile.is_tarfile(file_path):
        archive_type = "tar"
        compression_type = mimetypes.guess_type(file_path)[1]
    return archive_type, compression_type
```
```python
def get_archive_path_info(path: str) -> Tuple[str]:
    """
    A function to find the type of archive from the given path
    if the file does not exist.
    """
    archive_type, compression_type = None, None
    file_type, compression_type = mimetypes.guess_type(path)
    file_subtype = file_type.split("/")[-1] if file_type is not None else None
    if file_subtype == "zip":
        archive_type = "zip"
    if file_subtype == "x-tar":
        archive_type = "tar"
    return archive_type, compression_type
```
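The two helpers cover both directions: `get_archive_type` inspects a file that already exists on disk, while `get_archive_path_info` guesses from the filename alone, for outputs that have not been created yet. A small sketch of the name-based variant:

```python
from dffml.df.archive import get_archive_path_info

# Detection is purely name-based; the paths need not exist.
print(get_archive_path_info("model.zip"))     # ('zip', None)
print(get_archive_path_info("model.tar"))     # ('tar', None)
print(get_archive_path_info("model.tar.gz"))  # ('tar', 'gzip')
print(get_archive_path_info("model.txt"))     # (None, None)
```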
```python
def get_operations(
    archive_action: str, archive_type: str, compression_type: str
) -> Tuple[Operation]:
    """
    A function to fetch relevant operations based on type of archive
    and compression if any.
    """
    operations = {
        "archive_ops": {
            "zip": {
                "extract": extract_zip_archive,
                "archive": make_zip_archive,
            },
            "tar": {
                "extract": extract_tar_archive,
                "archive": make_tar_archive,
            },
        },
        "compression_ops": {
            "gzip": {"compress": gz_compress, "decompress": gz_decompress},
            "xz": {"compress": xz_compress, "decompress": xz_decompress},
            "bzip2": {"compress": bz2_compress, "decompress": bz2_decompress},
        },
    }
    archive_op = operations["archive_ops"][archive_type][archive_action]
    compression_op = None
    if compression_type is not None:
        compression_action = (
            "compress" if archive_action == "archive" else "decompress"
        )
        compression_op = operations["compression_ops"][compression_type][
            compression_action
        ]
    return archive_op, compression_op
```
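`get_operations` is a pure table lookup: the archive half keys on type and action, and the compression half mirrors the action as compress/decompress. For instance:

```python
from dffml.df.archive import get_operations

# Extracting a .tar.gz pairs the tar extractor with the gzip decompressor.
archive_op, compression_op = get_operations("extract", "tar", "gzip")

# Creating a plain .zip needs no compression wrapper.
archive_op, compression_op = get_operations("archive", "zip", None)
assert compression_op is None
```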
```python
def deduce_archive_action(seed: Dict) -> Tuple[str]:
    """
    A function to deduce the archive action, 'extract' or 'archive',
    based on the seed, and to find the type and compression of the archive.
    """
    input_path, output_path = seed["input_path"], seed["output_path"]
    input_exists, input_is_file, input_is_dir = (
        input_path.exists(),
        input_path.is_file(),
        input_path.is_dir(),
    )
    output_exists, output_is_file, output_is_dir = (
        output_path.exists(),
        output_path.is_file(),
        output_path.is_dir(),
    )

    if all([input_exists, output_exists, output_is_dir, input_is_file]):
        action = "extract"
        archive_type, compression_type = get_archive_type(input_path)
    elif all([input_exists, output_exists, input_is_dir, output_is_file]):
        action = "archive"
        archive_type, compression_type = get_archive_type(output_path)
    elif all([input_exists, not output_exists, input_is_dir]):
        # Triggered on first time use
        action = "archive"
        archive_type, compression_type = get_archive_path_info(output_path)
    return action, archive_type, compression_type
```
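The branches map the filesystem state of the two seed paths to an action: existing archive file plus existing directory means extract, existing directory plus existing file means archive, and existing directory with a missing output is the first-time archive case, where the type must be guessed from the output filename. If no branch matches (for example, the input path does not exist), `action` is never bound and the function raises `UnboundLocalError`, so callers must supply at least one existing path. A runnable sketch of the first-time case:

```python
import pathlib
import tempfile

from dffml.df.archive import deduce_archive_action

# An existing input directory and a not-yet-created .tar.gz output
# trigger the "first time use" branch.
tmp = pathlib.Path(tempfile.mkdtemp())
(tmp / "saved_model").mkdir()
seed = {
    "input_path": tmp / "saved_model",
    "output_path": tmp / "saved_model.tar.gz",
}
print(deduce_archive_action(seed))  # ('archive', 'tar', 'gzip')
```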
```python
def create_chained_archive_dataflow(
    action, first_op, second_op, seed, temp_dir
) -> DataFlow:
    """
    A function to create chained dataflows for archive extraction/creation.
    """
    second_op_output_typ = "directory" if action == "extract" else "file"
    dataflow = DataFlow(
        operations={first_op.op.name: first_op, second_op.op.name: second_op},
        seed={
            Input(
                value=seed["input_path"],
                definition=get_key_substr("input", first_op.op.inputs),
            ),
            Input(
                value=temp_dir / f"{str(uuid.uuid4())}.tar",
                definition=get_key_substr("output", first_op.op.inputs),
            ),
            Input(
                value=seed["output_path"],
                definition=get_key_substr("output", second_op.op.inputs),
                origin="seed.final_output",
            ),
        },
    )
    dataflow.flow.update(
        {
            second_op.op.name: InputFlow(
                inputs={
                    "input_file_path": [{first_op.op.name: "output_path"}],
                    f"output_{second_op_output_typ}_path": [
                        "seed.final_output"
                    ],
                }
            )
        }
    )
    dataflow.update()
    return dataflow
```
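Per the `InputFlow` above, the second operation reads its `input_file_path` from the first operation's `output_path` and takes its final output from the `seed.final_output` origin. For extracting a `.tar.gz`, the wiring is roughly:

```python
# first_op  = gz_decompress:       model.tar.gz -> <temp_dir>/<uuid4>.tar
# second_op = extract_tar_archive: <uuid4>.tar  -> output directory ("seed.final_output")
```

The intermediate file is always suffixed `.tar`, which fits the only chained case that arises in practice: a tar archive wrapped in a compression layer (zip compresses internally, so it never chains).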
```python
def create_archive_dataflow(seed: set) -> DataFlow:
    """
    A function to create the appropriate dataflow to extract/create an
    archive if it is supported.
    """
    seed = {input_.origin: pathlib.Path(input_.value) for input_ in seed}
    action, archive_type, compression_type = deduce_archive_action(seed)
    archive_op, compression_op = get_operations(
        action, archive_type, compression_type
    )

    if compression_op is None:
        dataflow = DataFlow(
            operations={archive_op.op.name: archive_op},
            seed={
                Input(
                    value=seed["input_path"],
                    definition=get_key_substr("input", archive_op.op.inputs),
                ),
                Input(
                    value=seed["output_path"],
                    definition=get_key_substr("output", archive_op.op.inputs),
                ),
            },
        )
    else:
        first_op = compression_op if action == "extract" else archive_op
        second_op = (
            compression_op if first_op is not compression_op else archive_op
        )
        dataflow = create_chained_archive_dataflow(
            action, first_op, second_op, seed, seed["input_path"].parent
        )
    return dataflow
```
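
End to end, a caller hands `create_archive_dataflow` a set of seed `Input`s carrying `input_path`/`output_path` origins and gets back a ready-to-run dataflow. A hedged sketch (the `Definition` here is hypothetical; any path-typed definition from the caller's flow would do):

```python
from dffml.df.types import Definition, Input
from dffml.df.archive import create_archive_dataflow

# Hypothetical definition, for illustration only.
path_defn = Definition(name="path", primitive="str")

seed = {
    Input(value="saved_model", definition=path_defn, origin="input_path"),
    Input(value="saved_model.tar.gz", definition=path_defn, origin="output_path"),
}

dataflow = create_archive_dataflow(seed)
# The returned dataflow runs under the usual orchestrator, e.g.:
# async for ctx, results in run(dataflow, []):
#     ...
```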
