Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds StorageDescriptor and tests #2109

Merged
merged 2 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions google/cloud/bigquery/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,3 +644,121 @@ def from_api_repr(cls, api_repr: dict) -> SerDeInfo:
config = cls("PLACEHOLDER")
config._properties = api_repr
return config


class StorageDescriptor:
    """Describe how a table's data is stored and accessed by open source
    query engines.

    All values are kept in a single ``_properties`` dict keyed by the
    camelCase API field names, so the API representation is that dict.

    Args:
        input_format (Optional[str]): Fully qualified class name of the
            InputFormat (e.g.
            "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"). The maximum
            length is 128 characters.
        location_uri (Optional[str]): Physical location of the table (e.g.
            'gs://spark-dataproc-data/pangea-data/case_sensitive/' or
            'gs://spark-dataproc-data/pangea-data/'). The maximum length is
            2056 bytes.
        output_format (Optional[str]): Fully qualified class name of the
            OutputFormat (e.g.
            "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"). The maximum
            length is 128 characters.
        serde_info (Union[SerDeInfo, dict, None]): Serializer and
            deserializer information.
    """

    def __init__(
        self,
        input_format: Optional[str] = None,
        location_uri: Optional[str] = None,
        output_format: Optional[str] = None,
        serde_info: Union[SerDeInfo, dict, None] = None,
    ):
        self._properties: Dict[str, Any] = {}
        # Every assignment below goes through the matching property setter,
        # so each argument is type-checked before it is stored.
        self.input_format = input_format
        self.location_uri = location_uri
        self.output_format = output_format
        # typing.cast() is needed because mypy cannot model a property whose
        # setter accepts Union[SerDeInfo, dict, None] while its getter only
        # ever returns Optional[SerDeInfo].
        self.serde_info = typing.cast(Optional[SerDeInfo], serde_info)

    @property
    def input_format(self) -> Optional[str]:
        """Optional. Fully qualified class name of the InputFormat (e.g.
        "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"). The maximum
        length is 128 characters.
        """
        return self._properties.get("inputFormat")

    @input_format.setter
    def input_format(self, value: Optional[str]):
        self._properties["inputFormat"] = _helpers._isinstance_or_raise(
            value, str, none_allowed=True
        )

    @property
    def location_uri(self) -> Optional[str]:
        """Optional. Physical location of the table (e.g.
        'gs://spark-dataproc-data/pangea-data/case_sensitive/' or
        'gs://spark-dataproc-data/pangea-data/'). The maximum length is
        2056 bytes.
        """
        return self._properties.get("locationUri")

    @location_uri.setter
    def location_uri(self, value: Optional[str]):
        self._properties["locationUri"] = _helpers._isinstance_or_raise(
            value, str, none_allowed=True
        )

    @property
    def output_format(self) -> Optional[str]:
        """Optional. Fully qualified class name of the OutputFormat (e.g.
        "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"). The maximum
        length is 128 characters.
        """
        return self._properties.get("outputFormat")

    @output_format.setter
    def output_format(self, value: Optional[str]):
        self._properties["outputFormat"] = _helpers._isinstance_or_raise(
            value, str, none_allowed=True
        )

    @property
    def serde_info(self) -> Optional[SerDeInfo]:
        """Optional. Serializer and deserializer information."""
        stored = _helpers._get_sub_prop(self._properties, ["serDeInfo"])
        if stored is None:
            return None
        # The value is stored in API (dict) form; wrap it on the way out.
        return typing.cast(SerDeInfo, SerDeInfo.from_api_repr(stored))

    @serde_info.setter
    def serde_info(self, value: Union[SerDeInfo, dict, None]):
        checked = _helpers._isinstance_or_raise(
            value, (SerDeInfo, dict), none_allowed=True
        )
        # SerDeInfo objects are converted to their API form; a dict or None
        # is stored as given.
        self._properties["serDeInfo"] = (
            checked.to_api_repr() if isinstance(checked, SerDeInfo) else checked
        )

    def to_api_repr(self) -> dict:
        """Build an API representation of this object.

        Returns:
            Dict[str, Any]:
                A dictionary in the format used by the BigQuery API. This is
                the object's own property dict, not a copy.
        """
        return self._properties

    @classmethod
    def from_api_repr(cls, resource: dict) -> StorageDescriptor:
        """Factory: construct an instance of the class (cls) from its API
        representation.

        Args:
            resource (Dict[str, Any]):
                API representation of the object to be instantiated. The
                dict is adopted as the instance's property store as-is.

        Returns:
            An instance of the class initialized with data from 'resource'.
        """
        instance = cls()
        instance._properties = resource
        return instance
128 changes: 128 additions & 0 deletions tests/unit/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1213,3 +1213,131 @@ def test_from_api_repr(self):
# We convert both to dict format because these classes do not have a
# __eq__() method to facilitate direct equality comparisons.
assert result.to_api_repr() == expected.to_api_repr()


class TestStorageDescriptor:
    """Tests for the StorageDescriptor class."""

    @staticmethod
    def _get_target_class():
        return schema.StorageDescriptor

    def _make_one(self, *args, **kwargs):
        return self._get_target_class()(*args, **kwargs)

    # SerDeInfo in API resource (dict) form. Keys use the camelCase field
    # names of the BigQuery API resource.
    serdeinfo_resource = {
        "serializationLibrary": "testpath.to.LazySimpleSerDe",
        "name": "serde_lib_name",
        "parameters": {"key": "value"},
    }

    # from_api_repr() is a classmethod, so it is called on the class itself
    # rather than on a throwaway placeholder instance.
    SERDEINFO = schema.SerDeInfo.from_api_repr(serdeinfo_resource)

    # StorageDescriptor in API resource (dict) form.
    STORAGEDESCRIPTOR = {
        "inputFormat": "testpath.to.OrcInputFormat",
        "locationUri": "gs://test/path/",
        "outputFormat": "testpath.to.OrcOutputFormat",
        "serDeInfo": SERDEINFO.to_api_repr(),
    }

    @pytest.mark.parametrize(
        "input_format,location_uri,output_format,serde_info",
        [
            (None, None, None, None),
            ("testpath.to.OrcInputFormat", None, None, None),
            (None, "gs://test/path/", None, None),
            (None, None, "testpath.to.OrcOutputFormat", None),
            (None, None, None, SERDEINFO),
            (
                "testpath.to.OrcInputFormat",
                "gs://test/path/",
                "testpath.to.OrcOutputFormat",
                SERDEINFO,  # uses SERDEINFO class format
            ),
            (
                "testpath.to.OrcInputFormat",
                "gs://test/path/",
                "testpath.to.OrcOutputFormat",
                serdeinfo_resource,  # uses api resource format (dict)
            ),
        ],
    )
    def test_ctor_valid_input(
        self, input_format, location_uri, output_format, serde_info
    ):
        """Each accepted argument is readable back through its property."""
        storage_descriptor = self._make_one(
            input_format=input_format,
            location_uri=location_uri,
            output_format=output_format,
            serde_info=serde_info,
        )
        assert storage_descriptor.input_format == input_format
        assert storage_descriptor.location_uri == location_uri
        assert storage_descriptor.output_format == output_format
        # serde_info always comes back as a SerDeInfo (or None), so compare
        # on the dict form regardless of how it was passed in.
        if isinstance(serde_info, schema.SerDeInfo):
            assert (
                storage_descriptor.serde_info.to_api_repr() == serde_info.to_api_repr()
            )
        elif isinstance(serde_info, dict):
            assert storage_descriptor.serde_info.to_api_repr() == serde_info
        else:
            assert storage_descriptor.serde_info is None

    @pytest.mark.parametrize(
        "input_format,location_uri,output_format,serde_info",
        [
            (123, None, None, None),
            (None, 123, None, None),
            (None, None, 123, None),
            (None, None, None, 123),
        ],
    )
    def test_ctor_invalid_input(
        self, input_format, location_uri, output_format, serde_info
    ):
        """A wrongly typed value for any single argument raises TypeError."""
        with pytest.raises(TypeError) as e:
            self._make_one(
                input_format=input_format,
                location_uri=location_uri,
                output_format=output_format,
                serde_info=serde_info,
            )

        # Looking for the first word from the string "Pass <variable> as..."
        assert "Pass " in str(e.value)

    def test_to_api_repr(self):
        """to_api_repr() yields the camelCase dict of everything that was set."""
        storage_descriptor = self._make_one(
            input_format="input_format",
            location_uri="location_uri",
            output_format="output_format",
            serde_info=self.SERDEINFO,
        )
        expected_repr = {
            "inputFormat": "input_format",
            "locationUri": "location_uri",
            "outputFormat": "output_format",
            "serDeInfo": self.SERDEINFO.to_api_repr(),
        }
        assert storage_descriptor.to_api_repr() == expected_repr

    def test_from_api_repr(self):
        """GIVEN an api representation of a StorageDescriptor (i.e. STORAGEDESCRIPTOR)
        WHEN converted into a StorageDescriptor using from_api_repr() and
        displayed as a dict
        THEN it will have the same representation as a StorageDescriptor
        created directly (via the _make_one() func) and displayed as a dict.
        """

        # generate via STORAGEDESCRIPTOR
        resource = self.STORAGEDESCRIPTOR
        result = self._get_target_class().from_api_repr(resource)

        expected = self._make_one(
            input_format="testpath.to.OrcInputFormat",
            location_uri="gs://test/path/",
            output_format="testpath.to.OrcOutputFormat",
            serde_info=self.SERDEINFO,
        )
        # Compare dict forms because the class defines no __eq__().
        assert result.to_api_repr() == expected.to_api_repr()
Loading