Skip to content

Commit

Permalink
Initial implementation of HTTP connector
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Apr 28, 2023
1 parent 662266a commit 6ab2dc8
Show file tree
Hide file tree
Showing 8 changed files with 313 additions and 19 deletions.
50 changes: 50 additions & 0 deletions singer_sdk/authenticators.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from requests.auth import AuthBase

from singer_sdk.helpers._util import utc_now

Expand Down Expand Up @@ -589,3 +590,52 @@ def oauth_request_payload(self) -> dict:
"RS256",
),
}


class NoopAuth(AuthBase):
"""No-op authenticator."""

def __call__(self, r: requests.PreparedRequest) -> requests.PreparedRequest:
"""Do nothing.
Args:
r: The prepared request.
Returns:
The unmodified prepared request.
"""
return r


class HeaderAuth(AuthBase):
"""Header-based authenticator."""

def __init__(
self,
keyword: str,
value: str,
header: str = "Authorization",
) -> None:
"""Initialize the authenticator.
Args:
keyword: The keyword to use in the header, e.g. "Bearer".
value: The value to use in the header, e.g. "my-token".
header: The header to add the keyword and value to, defaults to
``"Authorization"``.
"""
self.keyword = keyword
self.value = value
self.header = header

def __call__(self, r: requests.PreparedRequest) -> requests.PreparedRequest:
"""Add the header to the request.
Args:
r: The prepared request.
Returns:
The prepared request with the header added.
"""
r.headers[self.header] = f"{self.keyword} {self.value}"
return r
3 changes: 2 additions & 1 deletion singer_sdk/connectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

from ._http import HTTPConnector
from .sql import SQLConnector

__all__ = ["SQLConnector"]
__all__ = ["HTTPConnector", "SQLConnector"]
112 changes: 112 additions & 0 deletions singer_sdk/connectors/_http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""HTTP-based tap class for Singer SDK."""

from __future__ import annotations

import typing as t

import requests

from singer_sdk.authenticators import NoopAuth
from singer_sdk.connectors.base import BaseConnector

if t.TYPE_CHECKING:
import sys

from requests.adapters import BaseAdapter

if sys.version_info >= (3, 10):
from typing import TypeAlias # noqa: ICN003
else:
from typing_extensions import TypeAlias

_Auth: TypeAlias = t.Callable[[requests.PreparedRequest], requests.PreparedRequest]


class HTTPConnector(BaseConnector[requests.Session]):
"""Base class for all HTTP-based connectors."""

def __init__(self, config: t.Mapping[str, t.Any] | None) -> None:
"""Initialize the HTTP connector.
Args:
config: Connector configuration parameters.
"""
super().__init__(config)
self._session = self.get_session()
self.refresh_auth()

def get_connection(self, *, authenticate: bool = True) -> requests.Session:
"""Return a new HTTP session object.
Adds adapters and optionally authenticates the session.
Args:
authenticate: Whether to authenticate the request.
Returns:
A new HTTP session object.
"""
for prefix, adapter in self.adapters.items():
self._session.mount(prefix, adapter)

self._session.auth = self._auth if authenticate else None

return self._session

def get_session(self) -> requests.Session:
"""Return a new HTTP session object.
Returns:
A new HTTP session object.
"""
return requests.Session()

def get_authenticator(self) -> _Auth:
"""Authenticate the HTTP session.
Returns:
An auth callable.
"""
return NoopAuth()

def refresh_auth(self) -> None:
"""Refresh the HTTP session authentication."""
self._auth = self.get_authenticator()

@property
def adapters(self) -> dict[str, BaseAdapter]:
"""Return a mapping of URL prefixes to adapter objects.
Returns:
A mapping of URL prefixes to adapter objects.
"""
return {}

@property
def default_request_kwargs(self) -> dict[str, t.Any]:
"""Return default kwargs for HTTP requests.
Returns:
A mapping of default kwargs for HTTP requests.
"""
return {}

def request(
self,
*args: t.Any,
authenticate: bool = True,
**kwargs: t.Any,
) -> requests.Response:
"""Make an HTTP request.
Args:
*args: Positional arguments to pass to the request method.
authenticate: Whether to authenticate the request.
**kwargs: Keyword arguments to pass to the request method.
Returns:
The HTTP response object.
"""
with self._connect(authenticate=authenticate) as session:
kwargs = {**self.default_request_kwargs, **kwargs}
return session.request(*args, **kwargs)
67 changes: 67 additions & 0 deletions singer_sdk/connectors/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""Base class for all connectors."""

from __future__ import annotations

import abc
import typing as t
from contextlib import contextmanager

_T = t.TypeVar("_T", covariant=True)


class ContextManagerProtocol(t.Protocol[_T]):
"""Protocol for context manager enter/exit."""

def __enter__(self) -> _T: # noqa: D105
...

def __exit__(self, *args: t.Any) -> None: # noqa: D105
...


_C = t.TypeVar("_C", bound=ContextManagerProtocol)


class BaseConnector(abc.ABC, t.Generic[_C]):
"""Base class for all connectors."""

def __init__(self, config: t.Mapping[str, t.Any] | None) -> None:
"""Initialize the connector.
Args:
config: Plugin configuration parameters.
"""
self._config = config or {}

@property
def config(self) -> t.Mapping:
"""Return the connector configuration.
Returns:
A mapping of configuration parameters.
"""
return self._config

@contextmanager
def _connect(self, *args: t.Any, **kwargs: t.Any) -> t.Generator[_C, None, None]:
"""Connect to the destination.
Args:
args: Positional arguments to pass to the connection method.
kwargs: Keyword arguments to pass to the connection method.
Yields:
A connection object.
"""
with self.get_connection(*args, **kwargs) as connection:
yield connection

@abc.abstractmethod
def get_connection(self, *args: t.Any, **kwargs: t.Any) -> _C:
"""Connect to the destination.
Args:
args: Positional arguments to pass to the connection method.
kwargs: Keyword arguments to pass to the connection method.
"""
...
37 changes: 19 additions & 18 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import logging
import typing as t
import warnings
from contextlib import contextmanager
from datetime import datetime
from functools import lru_cache

Expand All @@ -14,13 +13,14 @@

from singer_sdk import typing as th
from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema
from singer_sdk.connectors.base import BaseConnector
from singer_sdk.exceptions import ConfigValidationError

if t.TYPE_CHECKING:
from sqlalchemy.engine.reflection import Inspector


class SQLConnector:
class SQLConnector(BaseConnector):
"""Base class for SQLAlchemy-based connectors.
The connector class serves as a wrapper around the SQL connection.
Expand All @@ -42,7 +42,7 @@ class SQLConnector:

def __init__(
self,
config: dict | None = None,
config: t.Mapping[str, t.Any] | None = None,
sqlalchemy_url: str | None = None,
) -> None:
"""Initialize the SQL connector.
Expand All @@ -51,18 +51,9 @@ def __init__(
config: The parent tap or target object's config.
sqlalchemy_url: Optional URL for the connection.
"""
self._config: dict[str, t.Any] = config or {}
super().__init__(config=config)
self._sqlalchemy_url: str | None = sqlalchemy_url or None

@property
def config(self) -> dict:
"""If set, provides access to the tap or target config.
Returns:
The settings as a dict.
"""
return self._config

@property
def logger(self) -> logging.Logger:
"""Get logger.
Expand All @@ -72,10 +63,20 @@ def logger(self) -> logging.Logger:
"""
return logging.getLogger("sqlconnector")

@contextmanager
def _connect(self) -> t.Iterator[sqlalchemy.engine.Connection]:
with self._engine.connect().execution_options(stream_results=True) as conn:
yield conn
def get_connection(
self,
*,
stream_results: bool = True,
) -> sqlalchemy.engine.Connection:
"""Return a new SQLAlchemy connection using the provided config.
Args:
stream_results: Whether to stream results from the database.
Returns:
A newly created SQLAlchemy connection object.
"""
return self._engine.connect().execution_options(stream_results=stream_results)

def create_sqlalchemy_connection(self) -> sqlalchemy.engine.Connection:
"""(DEPRECATED) Return a new SQLAlchemy connection using the provided config.
Expand Down Expand Up @@ -155,7 +156,7 @@ def sqlalchemy_url(self) -> str:

return self._sqlalchemy_url

def get_sqlalchemy_url(self, config: dict[str, t.Any]) -> str:
def get_sqlalchemy_url(self, config: t.Mapping[str, t.Any]) -> str:
"""Return the SQLAlchemy URL string.
Developers can generally override just one of the following:
Expand Down
Empty file.
Loading

0 comments on commit 6ab2dc8

Please sign in to comment.