From 0241971051dfc90aa3b95f6caf2118068fba3bbf Mon Sep 17 00:00:00 2001 From: turtleDev Date: Mon, 10 Feb 2025 23:12:43 +0530 Subject: [PATCH 1/5] add salesforce source --- ingestr/src/salesforce/__init__.py | 152 +++++++++++++++++++++++++++++ ingestr/src/salesforce/helpers.py | 64 ++++++++++++ ingestr/src/sources.py | 30 +++++- requirements.txt | 1 + 4 files changed, 246 insertions(+), 1 deletion(-) create mode 100644 ingestr/src/salesforce/__init__.py create mode 100644 ingestr/src/salesforce/helpers.py diff --git a/ingestr/src/salesforce/__init__.py b/ingestr/src/salesforce/__init__.py new file mode 100644 index 00000000..5496ef74 --- /dev/null +++ b/ingestr/src/salesforce/__init__.py @@ -0,0 +1,152 @@ +from dlt.sources import DltResource +from dlt.sources import incremental + +from typing import Iterable + +import dlt +from simple_salesforce import Salesforce +from dlt.common.typing import TDataItem + + +from .helpers import get_records + + +@dlt.source(name="salesforce") +def salesforce_source( + username: str, + password: str, + token: str, +) -> Iterable[DltResource]: + """ + Retrieves data from Salesforce using the Salesforce API. + + Args: + username (str): The username for authentication. + password (str): The password for authentication. + token (str): The security token for authentication. + + Yields: + DltResource: Data resources from Salesforce. + """ + + client = Salesforce(username, password, token) + + # define resources + @dlt.resource(write_disposition="replace") + def sf_user() -> Iterable[TDataItem]: + yield get_records(client, "User") + + @dlt.resource(write_disposition="replace") + def user_role() -> Iterable[TDataItem]: + yield get_records(client, "UserRole") + + @dlt.resource(write_disposition="merge") + def opportunity( + last_timestamp: incremental[str] = dlt.sources.incremental( + "SystemModstamp", initial_value=None + ) + ) -> Iterable[TDataItem]: + yield get_records( + client, "Opportunity", last_timestamp.last_value, "SystemModstamp" + ) + + @dlt.resource(write_disposition="merge") + def opportunity_line_item( + last_timestamp: incremental[str] = dlt.sources.incremental( + "SystemModstamp", initial_value=None + ) + ) -> Iterable[TDataItem]: + yield get_records( + client, "OpportunityLineItem", last_timestamp.last_value, "SystemModstamp" + ) + + @dlt.resource(write_disposition="merge") + def opportunity_contact_role( + last_timestamp: incremental[str] = dlt.sources.incremental( + "SystemModstamp", initial_value=None + ) + ) -> Iterable[TDataItem]: + yield get_records( + client, + "OpportunityContactRole", + last_timestamp.last_value, + "SystemModstamp", + ) + + @dlt.resource(write_disposition="merge") + def account( + last_timestamp: incremental[str] = dlt.sources.incremental( + "LastModifiedDate", initial_value=None + ) + ) -> Iterable[TDataItem]: + yield get_records( + client, "Account", last_timestamp.last_value, "LastModifiedDate" + ) + + @dlt.resource(write_disposition="replace") + def contact() -> Iterable[TDataItem]: + yield get_records(client, "Contact") + + @dlt.resource(write_disposition="replace") + def lead() -> Iterable[TDataItem]: + yield get_records(client, "Lead") + + @dlt.resource(write_disposition="replace") + def campaign() -> Iterable[TDataItem]: + yield get_records(client, "Campaign") + + @dlt.resource(write_disposition="merge") + def campaign_member( + last_timestamp: incremental[str] = dlt.sources.incremental( + "SystemModstamp", initial_value=None + ) + ) -> Iterable[TDataItem]: + yield get_records( + client, "CampaignMember", last_timestamp.last_value, "SystemModstamp" + ) + + @dlt.resource(write_disposition="replace") + def product_2() -> Iterable[TDataItem]: + yield get_records(client, "Product2") + + @dlt.resource(write_disposition="replace") + def pricebook_2() -> Iterable[TDataItem]: + yield get_records(client, "Pricebook2") + + @dlt.resource(write_disposition="replace") + def pricebook_entry() -> Iterable[TDataItem]: + yield get_records(client, "PricebookEntry") + + @dlt.resource(write_disposition="merge") + def task( + last_timestamp: incremental[str] = dlt.sources.incremental( + "SystemModstamp", initial_value=None + ) + ) -> Iterable[TDataItem]: + yield get_records(client, "Task", last_timestamp.last_value, "SystemModstamp") + + @dlt.resource(write_disposition="merge") + def event( + last_timestamp: incremental[str] = dlt.sources.incremental( + "SystemModstamp", initial_value=None + ) + ) -> Iterable[TDataItem]: + yield get_records(client, "Event", last_timestamp.last_value, "SystemModstamp") + + return ( + sf_user, + user_role, + opportunity, + opportunity_line_item, + opportunity_contact_role, + account, + contact, + lead, + campaign, + campaign_member, + product_2, + pricebook_2, + pricebook_entry, + task, + event, + ) diff --git a/ingestr/src/salesforce/helpers.py b/ingestr/src/salesforce/helpers.py new file mode 100644 index 00000000..5cd8c4f8 --- /dev/null +++ b/ingestr/src/salesforce/helpers.py @@ -0,0 +1,64 @@ +"""Salesforce source helpers""" + +import pendulum + +from typing import Optional, Iterable + +from simple_salesforce import Salesforce +from dlt.common.typing import TDataItem + +def get_records( + sf: Salesforce, + sobject: str, + last_state: Optional[str] = None, + replication_key: Optional[str] = None, +) -> Iterable[TDataItem]: + """ + Retrieves records from Salesforce for a specified sObject. + + Args: + sf (Salesforce): An instance of the Salesforce API client. + sobject (str): The name of the sObject to retrieve records from. + last_state (str, optional): The last known state for incremental loading. Defaults to None. + replication_key (str, optional): The replication key for incremental loading. Defaults to None. + + Yields: + Dict[TDataItem]: A dictionary representing a record from the Salesforce sObject. + """ + + # Get all fields for the sobject + desc = getattr(sf, sobject).describe() + # Salesforce returns compound fields as separate fields, so we need to filter them out + compound_fields = { + f["compoundFieldName"] + for f in desc["fields"] + if f["compoundFieldName"] is not None + } - {"Name"} + # Salesforce returns datetime fields as timestamps, so we need to convert them + date_fields = { + f["name"] for f in desc["fields"] if f["type"] in ("datetime",) and f["name"] + } + # If no fields are specified, use all fields except compound fields + fields = [f["name"] for f in desc["fields"] if f["name"] not in compound_fields] + + # Generate a predicate to filter records by the replication key + predicate, order_by, n_records = "", "", 0 + if replication_key: + if last_state: + predicate = f"WHERE {replication_key} > {last_state}" + order_by = f"ORDER BY {replication_key} ASC" + query = f"SELECT {', '.join(fields)} FROM {sobject} {predicate} {order_by}" + + # Query all records in batches + for page in getattr(sf.bulk, sobject).query_all(query, lazy_operation=True): + for record in page: + # Strip out the attributes field + record.pop("attributes", None) + for field in date_fields: + # Convert Salesforce timestamps to ISO 8601 + if record.get(field): + record[field] = pendulum.from_timestamp( + record[field] / 1000, + ).strftime("%Y-%m-%dT%H:%M:%S.%fZ") + yield from page + n_records += len(page) diff --git a/ingestr/src/sources.py b/ingestr/src/sources.py index ac04a87d..69ad1f7f 100644 --- a/ingestr/src/sources.py +++ b/ingestr/src/sources.py @@ -100,6 +100,7 @@ ZendeskCredentialsOAuth, ZendeskCredentialsToken, ) +from ingestr.src.salesforce import salesforce_source TableBackend = Literal["sqlalchemy", "pyarrow", "pandas", "connectorx"] TQueryAdapter = Callable[[SelectAny, Table], SelectAny] @@ -1753,7 +1754,7 @@ def handles_incrementality(self) -> bool: def dlt_source(self, uri: str, table: str, **kwargs): if kwargs.get("incremental_key") is not None: raise ValueError( - "Google Ads takes care of incrementality on its own, you should not provide incremental_key" + "Applovin takes care of incrementality on its own, you should not provide incremental_key" ) parsed_uri = urlparse(uri) @@ -1833,3 +1834,30 @@ def dlt_source(self, uri: str, table: str, **kwargs): api_key=api_key[0], application=application[0], ).with_resources(table) + +class SalesforceSource: + def handles_incrementality(self) -> bool: + return True + + def dlt_source(self, uri: str, table: str, **kwargs): + if kwargs.get("incremental_key"): + raise ValueError( + "Salesforce takes care of incrementality on its own, you should not provide incremental_key" + ) + + params = parse_qs(urlparse(uri).query) + creds = { + "username": params.get("username", [None])[0], + "password": params.get("password", [None])[0], + "token": params.get("token", [None])[0], + } + for k, v in creds.items(): + if v is None: + raise MissingValueError(k) + + src = salesforce_source(**creds) + + if table not in src.resources: + raise UnsupportedResourceError(table, "Salesforce") + + return src.with_resources(table) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 62aa95df..8907e352 100644 --- a/requirements.txt +++ b/requirements.txt @@ -46,3 +46,4 @@ gcsfs==2024.10.0 clickhouse-connect==0.8.14 clickhouse-driver==0.2.9 clickhouse-sqlalchemy==0.2.7 +simple-salesforce==1.12.6 From 2d1457d1c02a2bbd97f8548b452ea4d2167aa976 Mon Sep 17 00:00:00 2001 From: turtleDev Date: Mon, 10 Feb 2025 23:14:38 +0530 Subject: [PATCH 2/5] factory: add support for salesforce --- ingestr/src/factory.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ingestr/src/factory.py b/ingestr/src/factory.py index 0ba85e45..2ac16763 100644 --- a/ingestr/src/factory.py +++ b/ingestr/src/factory.py @@ -48,6 +48,7 @@ StripeAnalyticsSource, TikTokSource, ZendeskSource, + SalesforceSource, ) SQL_SOURCE_SCHEMES = [ @@ -136,6 +137,7 @@ class SourceDestinationFactory: "linkedinads": LinkedInAdsSource, "applovin": AppLovinSource, "applovinmax": ApplovinMaxSource, + "salesforce": SalesforceSource, } destinations: Dict[str, Type[DestinationProtocol]] = { "bigquery": BigQueryDestination, From 435c851fe447f27d11687e30b16e16f6799ad82f Mon Sep 17 00:00:00 2001 From: turtleDev Date: Mon, 10 Feb 2025 23:45:10 +0530 Subject: [PATCH 3/5] docs: add salesforce --- docs/.vitepress/config.mjs | 1 + docs/supported-sources/salesforce.md | 68 ++++++++++++++++++++++++++++ ingestr/src/salesforce/__init__.py | 12 ++--- 3 files changed, 75 insertions(+), 6 deletions(-) create mode 100644 docs/supported-sources/salesforce.md diff --git a/docs/.vitepress/config.mjs b/docs/.vitepress/config.mjs index ccf9cec3..0346022d 100644 --- a/docs/.vitepress/config.mjs +++ b/docs/.vitepress/config.mjs @@ -116,6 +116,7 @@ export default defineConfig({ { text: "LinkedIn Ads", link: "/supported-sources/linkedin_ads.md" }, { text: "Notion", link: "/supported-sources/notion.md" }, { text: "S3", link: "/supported-sources/s3.md" }, + { text: "Salesforce", link: "/supported-sources/salesforce.md" }, { text: "Shopify", link: "/supported-sources/shopify.md" }, { text: "Slack", link: "/supported-sources/slack.md" }, { text: "Stripe", link: "/supported-sources/stripe.md" }, diff --git a/docs/supported-sources/salesforce.md b/docs/supported-sources/salesforce.md new file mode 100644 index 00000000..2468ff31 --- /dev/null +++ b/docs/supported-sources/salesforce.md @@ -0,0 +1,68 @@ +# Salesforce +[Salesforce](https://www.salesforce.com/) is a cloud-based customer relationship management (CRM) platform that helps businesses manage sales, customer interactions, and business processes. It provides tools for sales automation, customer service, marketing, analytics, and application development. + +Ingestr supports Salesforce as a source. + +## URI format + +The URI format for Salesforce is as follows: +``` +salesforce://?username=&password=&token= +``` + +URI parameters: +- `username` is your Salesforce account username. +- `password` is your Salesforce account password. +- `token` is your Salesforce security token. + +You can obtain your security token by logging into your Salesforce account and navigating to the user settings under "Reset My Security Token." + +## Setting up a Salesforce Integration + +You can obtain an OAuth access token by setting up a connected app in Salesforce and using OAuth 2.0 authentication. For more information, see [Salesforce API Authentication](https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/quickstart_oauth.htm). + +## Example + +Let's say: +* Your Salesforce username is `user`. +* Your password is `password123`. +* Your security token is `fake_token`. +* You want to ingest `account` data from your salesforce account +* You want to save this data in a duckdb database `sf.db` under the table `public.account` + +You can run the following command to achieve this: +```sh +ingestr ingest \ + --source-uri "salesforce://?username=user&password=password123&token=fake_token" \ + --source-table "account" \ + --dest-uri "duckdb:///sf.db" \ + --dest-table "public.account" +``` + +## Tables + +Salesforce source allows ingesting the following objects into separate tables: + +| **Table** | **Mode** | **Description** | +|---------------------------|-----------|----------------| +| `user` | replace | Refers to an individual who has access to a Salesforce org or instance. | +| `user_role` | replace | A standard object that represents a role within the organization's hierarchy. | +| `opportunity` | merge | Represents a sales opportunity for a specific account or contact. | +| `opportunity_line_item` | merge | Represents individual line items or products associated with an Opportunity. | +| `opportunity_contact_role` | merge | Represents the association between an Opportunity and a Contact. | +| `account` | merge | Individual or organization that interacts with your business. | +| `contact` | replace | An individual person associated with an account or organization. | +| `lead` | replace | Prospective customer/individual/org. that has shown interest in a company's products/services. | +| `campaign` | replace | Marketing initiative or project designed to achieve specific goals, such as generating leads. | +| `campaign_member` | merge | Association between a Contact or Lead and a Campaign. | +| `product` | replace | For managing and organizing your product-related data within the Salesforce ecosystem. | +| `pricebook` | replace | Used to manage product pricing and create price books. | +| `pricebook_entry` | replace | Represents a specific price for a product in a price book. | +| `task` | merge | Used to track and manage various activities and tasks within the Salesforce platform. | +| `event` | merge | Used to track and manage calendar-based events, such as meetings, appointments, or calls. | + +Use these as `--source-table` parameters in the `ingestr ingest` command. + +> [!WARNING] +> Salesforce API limits may affect the frequency and volume of data ingestion. Incremental loading is supported for objects with the `SystemModstamp` field, but some objects may require full-refresh loads. This is indicated by `mode` in the tables above. Tables with mode `replace` don't support incremental loads, while the ones with `merge` do. + diff --git a/ingestr/src/salesforce/__init__.py b/ingestr/src/salesforce/__init__.py index 5496ef74..bb4b005a 100644 --- a/ingestr/src/salesforce/__init__.py +++ b/ingestr/src/salesforce/__init__.py @@ -33,7 +33,7 @@ def salesforce_source( # define resources @dlt.resource(write_disposition="replace") - def sf_user() -> Iterable[TDataItem]: + def user() -> Iterable[TDataItem]: yield get_records(client, "User") @dlt.resource(write_disposition="replace") @@ -106,11 +106,11 @@ def campaign_member( ) @dlt.resource(write_disposition="replace") - def product_2() -> Iterable[TDataItem]: + def product() -> Iterable[TDataItem]: yield get_records(client, "Product2") @dlt.resource(write_disposition="replace") - def pricebook_2() -> Iterable[TDataItem]: + def pricebook() -> Iterable[TDataItem]: yield get_records(client, "Pricebook2") @dlt.resource(write_disposition="replace") @@ -134,7 +134,7 @@ def event( yield get_records(client, "Event", last_timestamp.last_value, "SystemModstamp") return ( - sf_user, + user, user_role, opportunity, opportunity_line_item, @@ -144,8 +144,8 @@ def event( lead, campaign, campaign_member, - product_2, - pricebook_2, + product, + pricebook, pricebook_entry, task, event, From 35f3c97358db44f44e0d2a3d121476a65518f2cf Mon Sep 17 00:00:00 2001 From: turtleDev Date: Tue, 11 Feb 2025 17:55:14 +0530 Subject: [PATCH 4/5] chore: address lint errors --- ingestr/main_test.py | 15 ++++++++------- ingestr/src/applovin/__init__.py | 15 ++++++++------- ingestr/src/factory.py | 2 +- ingestr/src/salesforce/__init__.py | 27 ++++++++++++--------------- ingestr/src/salesforce/helpers.py | 8 ++++---- ingestr/src/sources.py | 15 ++++++++------- 6 files changed, 41 insertions(+), 41 deletions(-) diff --git a/ingestr/main_test.py b/ingestr/main_test.py index 9ac949e4..3b4f6563 100644 --- a/ingestr/main_test.py +++ b/ingestr/main_test.py @@ -2852,7 +2852,7 @@ def test_compound_table_name(dest_uri): ) assert result.exit_code == 0 assert_rows(dest_uri, dest_table, 6) - + def test_uri_precedence(dest_uri): """ When file glob is present in both URI and Source Table, @@ -2868,7 +2868,7 @@ def test_uri_precedence(dest_uri): dest_table = f"{schema_rand_prefix}.fs_{get_random_string(5)}" result = invoke_ingest_command( f"{protocol}://bucket/*.csv?{auth}", - "/path/to/file", # if this is used, it should result in an error + "/path/to/file", # if this is used, it should result in an error dest_uri, dest_table, ) @@ -2919,6 +2919,7 @@ def test_s3(dest, test_case): test_case(dest.start()) dest.stop() + def applovin_test_cases() -> Iterable[Callable]: def missing_api_key(): result = invoke_ingest_command( @@ -2929,12 +2930,13 @@ def missing_api_key(): ) assert result.exit_code != 0 assert has_exception(result.exception, MissingValueError) + def invalid_source_table(): result = invoke_ingest_command( "applovin://?api_key=123", "unknown-report", "duckdb:///out.db", - "public.unknown_report" + "public.unknown_report", ) assert result.exit_code != 0 assert has_exception(result.exception, UnsupportedResourceError) @@ -2944,8 +2946,7 @@ def invalid_source_table(): invalid_source_table, ] -@pytest.mark.parametrize( - "testcase", applovin_test_cases() -) + +@pytest.mark.parametrize("testcase", applovin_test_cases()) def test_applovin_source(testcase): - testcase() \ No newline at end of file + testcase() diff --git a/ingestr/src/applovin/__init__.py b/ingestr/src/applovin/__init__.py index 22f03bdb..f0e34589 100644 --- a/ingestr/src/applovin/__init__.py +++ b/ingestr/src/applovin/__init__.py @@ -1,10 +1,10 @@ -from datetime import datetime, timezone, timedelta +from datetime import datetime, timedelta, timezone from enum import Enum from typing import Dict, List, Optional -from requests import Response import dlt from dlt.sources.rest_api import EndpointResource, RESTAPIConfig, rest_api_resources +from requests import Response class InvalidCustomReportError(Exception): @@ -13,9 +13,11 @@ def __init__(self): "Custom report should be in the format 'custom:{endpoint}:{report_type}:{dimensions}" ) + class ClientError(Exception): pass + TYPE_HINTS = { "application_is_hidden": {"data_type": "bool"}, "average_cpa": {"data_type": "double"}, @@ -119,7 +121,6 @@ def applovin_source( end_date: Optional[str], custom: Optional[str], ): - backfill = False if end_date is None: backfill = True @@ -127,7 +128,7 @@ def applovin_source( # use the greatest of yesterday and start_date end_date = max( datetime.now(timezone.utc) - timedelta(days=1), - datetime.fromisoformat(start_date).replace(tzinfo=timezone.utc) + datetime.fromisoformat(start_date).replace(tzinfo=timezone.utc), ).strftime("%Y-%m-%d") config: RESTAPIConfig = { @@ -157,7 +158,7 @@ def applovin_source( "paginator": "single_page", "response_actions": [ http_error_handler, - ] + ], }, }, "resources": [ @@ -177,8 +178,7 @@ def applovin_source( "advertiser-probabilistic-report", "probabilisticReport", exclude( - REPORT_SCHEMA[ReportType.ADVERTISER], - PROBABILISTIC_REPORT_EXCLUDE + REPORT_SCHEMA[ReportType.ADVERTISER], PROBABILISTIC_REPORT_EXCLUDE ), ReportType.ADVERTISER, ), @@ -256,6 +256,7 @@ def exclude(source: List[str], exclude_list: List[str]) -> List[str]: def build_type_hints(cols: List[str]) -> dict: return {col: TYPE_HINTS[col] for col in cols if col in TYPE_HINTS} + def http_error_handler(resp: Response): if not resp.ok: raise ClientError(f"HTTP Status {resp.status_code}: {resp.text}") diff --git a/ingestr/src/factory.py b/ingestr/src/factory.py index 2ac16763..bea50577 100644 --- a/ingestr/src/factory.py +++ b/ingestr/src/factory.py @@ -42,13 +42,13 @@ MongoDbSource, NotionSource, S3Source, + SalesforceSource, ShopifySource, SlackSource, SqlSource, StripeAnalyticsSource, TikTokSource, ZendeskSource, - SalesforceSource, ) SQL_SOURCE_SCHEMES = [ diff --git a/ingestr/src/salesforce/__init__.py b/ingestr/src/salesforce/__init__.py index bb4b005a..c2a8a5b2 100644 --- a/ingestr/src/salesforce/__init__.py +++ b/ingestr/src/salesforce/__init__.py @@ -1,19 +1,16 @@ -from dlt.sources import DltResource -from dlt.sources import incremental - from typing import Iterable import dlt -from simple_salesforce import Salesforce from dlt.common.typing import TDataItem - +from dlt.sources import DltResource, incremental +from simple_salesforce import Salesforce from .helpers import get_records @dlt.source(name="salesforce") def salesforce_source( - username: str, + username: str, password: str, token: str, ) -> Iterable[DltResource]: @@ -22,8 +19,8 @@ def salesforce_source( Args: username (str): The username for authentication. - password (str): The password for authentication. - token (str): The security token for authentication. + password (str): The password for authentication. + token (str): The security token for authentication. Yields: DltResource: Data resources from Salesforce. @@ -44,7 +41,7 @@ def user_role() -> Iterable[TDataItem]: def opportunity( last_timestamp: incremental[str] = dlt.sources.incremental( "SystemModstamp", initial_value=None - ) + ), ) -> Iterable[TDataItem]: yield get_records( client, "Opportunity", last_timestamp.last_value, "SystemModstamp" @@ -54,7 +51,7 @@ def opportunity( def opportunity_line_item( last_timestamp: incremental[str] = dlt.sources.incremental( "SystemModstamp", initial_value=None - ) + ), ) -> Iterable[TDataItem]: yield get_records( client, "OpportunityLineItem", last_timestamp.last_value, "SystemModstamp" @@ -64,7 +61,7 @@ def opportunity_line_item( def opportunity_contact_role( last_timestamp: incremental[str] = dlt.sources.incremental( "SystemModstamp", initial_value=None - ) + ), ) -> Iterable[TDataItem]: yield get_records( client, @@ -77,7 +74,7 @@ def opportunity_contact_role( def account( last_timestamp: incremental[str] = dlt.sources.incremental( "LastModifiedDate", initial_value=None - ) + ), ) -> Iterable[TDataItem]: yield get_records( client, "Account", last_timestamp.last_value, "LastModifiedDate" @@ -99,7 +96,7 @@ def campaign() -> Iterable[TDataItem]: def campaign_member( last_timestamp: incremental[str] = dlt.sources.incremental( "SystemModstamp", initial_value=None - ) + ), ) -> Iterable[TDataItem]: yield get_records( client, "CampaignMember", last_timestamp.last_value, "SystemModstamp" @@ -121,7 +118,7 @@ def pricebook_entry() -> Iterable[TDataItem]: def task( last_timestamp: incremental[str] = dlt.sources.incremental( "SystemModstamp", initial_value=None - ) + ), ) -> Iterable[TDataItem]: yield get_records(client, "Task", last_timestamp.last_value, "SystemModstamp") @@ -129,7 +126,7 @@ def task( def event( last_timestamp: incremental[str] = dlt.sources.incremental( "SystemModstamp", initial_value=None - ) + ), ) -> Iterable[TDataItem]: yield get_records(client, "Event", last_timestamp.last_value, "SystemModstamp") diff --git a/ingestr/src/salesforce/helpers.py b/ingestr/src/salesforce/helpers.py index 5cd8c4f8..27c8af7e 100644 --- a/ingestr/src/salesforce/helpers.py +++ b/ingestr/src/salesforce/helpers.py @@ -1,11 +1,11 @@ """Salesforce source helpers""" -import pendulum - -from typing import Optional, Iterable +from typing import Iterable, Optional -from simple_salesforce import Salesforce +import pendulum from dlt.common.typing import TDataItem +from simple_salesforce import Salesforce + def get_records( sf: Salesforce, diff --git a/ingestr/src/sources.py b/ingestr/src/sources.py index 69ad1f7f..091faf8f 100644 --- a/ingestr/src/sources.py +++ b/ingestr/src/sources.py @@ -83,6 +83,7 @@ ) from ingestr.src.mongodb import mongodb_collection from ingestr.src.notion import notion_databases +from ingestr.src.salesforce import salesforce_source from ingestr.src.shopify import shopify_source from ingestr.src.slack import slack_source from ingestr.src.sql_database.callbacks import ( @@ -100,7 +101,6 @@ ZendeskCredentialsOAuth, ZendeskCredentialsToken, ) -from ingestr.src.salesforce import salesforce_source TableBackend = Literal["sqlalchemy", "pyarrow", "pandas", "connectorx"] TQueryAdapter = Callable[[SelectAny, Table], SelectAny] @@ -1835,10 +1835,11 @@ def dlt_source(self, uri: str, table: str, **kwargs): application=application[0], ).with_resources(table) + class SalesforceSource: def handles_incrementality(self) -> bool: return True - + def dlt_source(self, uri: str, table: str, **kwargs): if kwargs.get("incremental_key"): raise ValueError( @@ -1853,11 +1854,11 @@ def dlt_source(self, uri: str, table: str, **kwargs): } for k, v in creds.items(): if v is None: - raise MissingValueError(k) - - src = salesforce_source(**creds) + raise MissingValueError(k, "Salesforce") + + src = salesforce_source(**creds) # type: ignore if table not in src.resources: raise UnsupportedResourceError(table, "Salesforce") - - return src.with_resources(table) \ No newline at end of file + + return src.with_resources(table) From 7ca11e22fe7c2b693c13125f3d3b13cb335054ce Mon Sep 17 00:00:00 2001 From: turtleDev Date: Tue, 11 Feb 2025 17:56:25 +0530 Subject: [PATCH 5/5] readme: add salesforce to supported platforms --- README.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 8d4cf006..1a5bc3ab 100644 --- a/README.md +++ b/README.md @@ -238,11 +238,16 @@ Pull requests are welcome. However, please open an issue first to discuss what y ✅ - - + S3 ✅ - + + Salesforce + ✅ + - + Shopify ✅