Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Salesforce Source #121

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Previous commit
Next commit
chore: address lint errors
  • Loading branch information
turtleDev committed Feb 11, 2025
commit 35f3c97358db44f44e0d2a3d121476a65518f2cf
15 changes: 8 additions & 7 deletions ingestr/main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2852,7 +2852,7 @@ def test_compound_table_name(dest_uri):
)
assert result.exit_code == 0
assert_rows(dest_uri, dest_table, 6)

def test_uri_precedence(dest_uri):
"""
When file glob is present in both URI and Source Table,
Expand All @@ -2868,7 +2868,7 @@ def test_uri_precedence(dest_uri):
dest_table = f"{schema_rand_prefix}.fs_{get_random_string(5)}"
result = invoke_ingest_command(
f"{protocol}://bucket/*.csv?{auth}",
"/path/to/file", # if this is used, it should result in an error
"/path/to/file", # if this is used, it should result in an error
dest_uri,
dest_table,
)
Expand Down Expand Up @@ -2919,6 +2919,7 @@ def test_s3(dest, test_case):
test_case(dest.start())
dest.stop()


def applovin_test_cases() -> Iterable[Callable]:
def missing_api_key():
result = invoke_ingest_command(
Expand All @@ -2929,12 +2930,13 @@ def missing_api_key():
)
assert result.exit_code != 0
assert has_exception(result.exception, MissingValueError)

def invalid_source_table():
result = invoke_ingest_command(
"applovin://?api_key=123",
"unknown-report",
"duckdb:///out.db",
"public.unknown_report"
"public.unknown_report",
)
assert result.exit_code != 0
assert has_exception(result.exception, UnsupportedResourceError)
Expand All @@ -2944,8 +2946,7 @@ def invalid_source_table():
invalid_source_table,
]

@pytest.mark.parametrize(
"testcase", applovin_test_cases()
)

@pytest.mark.parametrize("testcase", applovin_test_cases())
def test_applovin_source(testcase):
testcase()
testcase()
15 changes: 8 additions & 7 deletions ingestr/src/applovin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from datetime import datetime, timezone, timedelta
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Dict, List, Optional
from requests import Response

import dlt
from dlt.sources.rest_api import EndpointResource, RESTAPIConfig, rest_api_resources
from requests import Response


class InvalidCustomReportError(Exception):
Expand All @@ -13,9 +13,11 @@ def __init__(self):
"Custom report should be in the format 'custom:{endpoint}:{report_type}:{dimensions}"
)


class ClientError(Exception):
    """Error raised by ``http_error_handler`` when an HTTP response is not OK."""

    pass


TYPE_HINTS = {
"application_is_hidden": {"data_type": "bool"},
"average_cpa": {"data_type": "double"},
Expand Down Expand Up @@ -119,15 +121,14 @@ def applovin_source(
end_date: Optional[str],
custom: Optional[str],
):

backfill = False
if end_date is None:
backfill = True

# use the greatest of yesterday and start_date
end_date = max(
datetime.now(timezone.utc) - timedelta(days=1),
datetime.fromisoformat(start_date).replace(tzinfo=timezone.utc)
datetime.fromisoformat(start_date).replace(tzinfo=timezone.utc),
).strftime("%Y-%m-%d")

config: RESTAPIConfig = {
Expand Down Expand Up @@ -157,7 +158,7 @@ def applovin_source(
"paginator": "single_page",
"response_actions": [
http_error_handler,
]
],
},
},
"resources": [
Expand All @@ -177,8 +178,7 @@ def applovin_source(
"advertiser-probabilistic-report",
"probabilisticReport",
exclude(
REPORT_SCHEMA[ReportType.ADVERTISER],
PROBABILISTIC_REPORT_EXCLUDE
REPORT_SCHEMA[ReportType.ADVERTISER], PROBABILISTIC_REPORT_EXCLUDE
),
ReportType.ADVERTISER,
),
Expand Down Expand Up @@ -256,6 +256,7 @@ def exclude(source: List[str], exclude_list: List[str]) -> List[str]:
def build_type_hints(cols: List[str]) -> dict:
    """Return the ``TYPE_HINTS`` entries for the given columns.

    Args:
        cols: Column names to look up.

    Returns:
        A dict mapping each column in ``cols`` that has an entry in
        ``TYPE_HINTS`` to that entry; columns without an entry are omitted.
    """
    return {col: TYPE_HINTS[col] for col in cols if col in TYPE_HINTS}


def http_error_handler(resp: Response):
    """Raise ``ClientError`` for any response that is not OK.

    Args:
        resp: The HTTP response to check.

    Raises:
        ClientError: If ``resp.ok`` is false. The message carries the
            response's status code and body text.
    """
    if not resp.ok:
        raise ClientError(f"HTTP Status {resp.status_code}: {resp.text}")
2 changes: 1 addition & 1 deletion ingestr/src/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@
MongoDbSource,
NotionSource,
S3Source,
SalesforceSource,
ShopifySource,
SlackSource,
SqlSource,
StripeAnalyticsSource,
TikTokSource,
ZendeskSource,
SalesforceSource,
)

SQL_SOURCE_SCHEMES = [
Expand Down
27 changes: 12 additions & 15 deletions ingestr/src/salesforce/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
from dlt.sources import DltResource
from dlt.sources import incremental

from typing import Iterable

import dlt
from simple_salesforce import Salesforce
from dlt.common.typing import TDataItem

from dlt.sources import DltResource, incremental
from simple_salesforce import Salesforce

from .helpers import get_records


@dlt.source(name="salesforce")
def salesforce_source(
username: str,
username: str,
password: str,
token: str,
) -> Iterable[DltResource]:
Expand All @@ -22,8 +19,8 @@ def salesforce_source(

Args:
username (str): The username for authentication.
password (str): The password for authentication.
token (str): The security token for authentication.
password (str): The password for authentication.
token (str): The security token for authentication.

Yields:
DltResource: Data resources from Salesforce.
Expand All @@ -44,7 +41,7 @@ def user_role() -> Iterable[TDataItem]:
def opportunity(
last_timestamp: incremental[str] = dlt.sources.incremental(
"SystemModstamp", initial_value=None
)
),
) -> Iterable[TDataItem]:
yield get_records(
client, "Opportunity", last_timestamp.last_value, "SystemModstamp"
Expand All @@ -54,7 +51,7 @@ def opportunity(
def opportunity_line_item(
last_timestamp: incremental[str] = dlt.sources.incremental(
"SystemModstamp", initial_value=None
)
),
) -> Iterable[TDataItem]:
yield get_records(
client, "OpportunityLineItem", last_timestamp.last_value, "SystemModstamp"
Expand All @@ -64,7 +61,7 @@ def opportunity_line_item(
def opportunity_contact_role(
last_timestamp: incremental[str] = dlt.sources.incremental(
"SystemModstamp", initial_value=None
)
),
) -> Iterable[TDataItem]:
yield get_records(
client,
Expand All @@ -77,7 +74,7 @@ def opportunity_contact_role(
def account(
last_timestamp: incremental[str] = dlt.sources.incremental(
"LastModifiedDate", initial_value=None
)
),
) -> Iterable[TDataItem]:
yield get_records(
client, "Account", last_timestamp.last_value, "LastModifiedDate"
Expand All @@ -99,7 +96,7 @@ def campaign() -> Iterable[TDataItem]:
def campaign_member(
last_timestamp: incremental[str] = dlt.sources.incremental(
"SystemModstamp", initial_value=None
)
),
) -> Iterable[TDataItem]:
yield get_records(
client, "CampaignMember", last_timestamp.last_value, "SystemModstamp"
Expand All @@ -121,15 +118,15 @@ def pricebook_entry() -> Iterable[TDataItem]:
def task(
last_timestamp: incremental[str] = dlt.sources.incremental(
"SystemModstamp", initial_value=None
)
),
) -> Iterable[TDataItem]:
yield get_records(client, "Task", last_timestamp.last_value, "SystemModstamp")

@dlt.resource(write_disposition="merge")
def event(
last_timestamp: incremental[str] = dlt.sources.incremental(
"SystemModstamp", initial_value=None
)
),
) -> Iterable[TDataItem]:
yield get_records(client, "Event", last_timestamp.last_value, "SystemModstamp")

Expand Down
8 changes: 4 additions & 4 deletions ingestr/src/salesforce/helpers.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""Salesforce source helpers"""

import pendulum

from typing import Optional, Iterable
from typing import Iterable, Optional

from simple_salesforce import Salesforce
import pendulum
from dlt.common.typing import TDataItem
from simple_salesforce import Salesforce


def get_records(
sf: Salesforce,
Expand Down
15 changes: 8 additions & 7 deletions ingestr/src/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
)
from ingestr.src.mongodb import mongodb_collection
from ingestr.src.notion import notion_databases
from ingestr.src.salesforce import salesforce_source
from ingestr.src.shopify import shopify_source
from ingestr.src.slack import slack_source
from ingestr.src.sql_database.callbacks import (
Expand All @@ -100,7 +101,6 @@
ZendeskCredentialsOAuth,
ZendeskCredentialsToken,
)
from ingestr.src.salesforce import salesforce_source

TableBackend = Literal["sqlalchemy", "pyarrow", "pandas", "connectorx"]
TQueryAdapter = Callable[[SelectAny, Table], SelectAny]
Expand Down Expand Up @@ -1835,10 +1835,11 @@ def dlt_source(self, uri: str, table: str, **kwargs):
application=application[0],
).with_resources(table)


class SalesforceSource:
def handles_incrementality(self) -> bool:
return True

def dlt_source(self, uri: str, table: str, **kwargs):
if kwargs.get("incremental_key"):
raise ValueError(
Expand All @@ -1853,11 +1854,11 @@ def dlt_source(self, uri: str, table: str, **kwargs):
}
for k, v in creds.items():
if v is None:
raise MissingValueError(k)
src = salesforce_source(**creds)
raise MissingValueError(k, "Salesforce")

src = salesforce_source(**creds) # type: ignore

if table not in src.resources:
raise UnsupportedResourceError(table, "Salesforce")
return src.with_resources(table)

return src.with_resources(table)