
Ingesting Nulab Backlog Data Into Snowflake Tables: Part IV

This tutorial has four parts in total. We will continue from where we left off in part three.

It is time to give our stored procedure the freedom to access the internet. By default, Snowflake does not allow stored procedures to access the internet; we must enable it by creating an external access integration and attaching it to our stored procedure.

External Access Integration

An external access integration is a Snowflake object that gives stored procedures and user-defined functions granular access to the internet. The integration itself is a container for network rules and secrets.

Network rules and secrets are the building blocks of external access control, and the ability to combine them makes that control flexible and granular.

Network Rule

A network rule is a Snowflake object that represents an external network location and the restrictions on accessing it. We can specify which domains, IP addresses, or IP address ranges are allowed or disallowed, and whether the communication is ingress (coming from the external source) or egress (going to the external source).

Let’s create our network rule:

code_sample_10.sql

CREATE NETWORK RULE external_access_network_rule_for_backlog
  TYPE = HOST_PORT
  MODE = EGRESS
  VALUE_LIST = ('*.backlog.com:0')
  COMMENT = 'Network rule for Backlog REST API endpoint';

In the domain, we can use a wildcard (*) for the subdomain. For example, instead of myorg.backlog.com, we can define *.backlog.com.

A domain can also include a port or a range of ports. If we do not specify a port, it defaults to 443. To allow access on all ports, we can define the port as 0, for example: my-website.com:0.
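For illustration, here is a hypothetical rule that pins one host to a non-default port (the host names are placeholders; the second entry falls back to the default port 443):

CREATE NETWORK RULE example_network_rule_with_port
  TYPE = HOST_PORT
  MODE = EGRESS
  VALUE_LIST = ('my-website.com:8080', 'api.example.com')
  COMMENT = 'Example rule with an explicit port; the second host defaults to 443';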

Secret

Instead of hard-coding API keys, we can use Snowflake's secrets feature. It is similar to GitHub Secrets or AWS Secrets Manager, if you have used either of them. Once a secret is created, we can access it from our stored procedure.

Snowflake requires us to attach the secret to the external access integration object if the external network specified by the network rule requires a password or API key for authentication.

code_sample_10.sql

CREATE OR REPLACE SECRET backlog_api_key
  TYPE = GENERIC_STRING
  SECRET_STRING = 'backlog_api_key_xyz';

Creating the External Access Integration

We have created a network rule and a secret: all the necessary components for an external access integration. Finally, we can create the external access integration itself.

code_sample_10.sql

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION backlog_api_access_integration
  ALLOWED_NETWORK_RULES = (external_access_network_rule_for_backlog)
  ALLOWED_AUTHENTICATION_SECRETS = (backlog_api_key)
  ENABLED = true;

Fetching Data From Backlog

We have created the external access integration, so our stored procedures can finally access the internet. Let's try it out.

code_sample_11.sql

Create or replace procedure fetch_data_from_backlog()
  returns varchar
  language python
  runtime_version = '3.13'
  secrets = ('api_key' = backlog_api_key)
  external_access_integrations = (backlog_api_access_integration)
  packages = ('snowflake-snowpark-python','requests')
  handler = 'main'
as
$$
import requests
from _snowflake import get_generic_secret_string

def main(session):
    backlog_api_key = get_generic_secret_string('api_key')
    endpoint = "https://<my-org>.backlog.com/api/v2/space/activities"
    with requests.Session() as s:
        response = s.get(url=endpoint, params = {"apiKey": backlog_api_key, "count": 2})
        if response.status_code != 200:
            return response.text
        # the procedure's return type is VARCHAR, so serialize the JSON payload to a string
        return str(response.json())
$$;

call fetch_data_from_backlog();

-- drop procedure fetch_data_from_backlog();

It is possible to attach more than one secret and more than one external access integration to a stored procedure.
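As a minimal sketch (the second secret other_api_key and the integration other_api_access_integration are hypothetical and assumed to already exist), both clauses simply take comma-separated lists:

create or replace procedure fetch_from_multiple_apis()
  returns varchar
  language python
  runtime_version = '3.13'
  packages = ('snowflake-snowpark-python', 'requests')
  external_access_integrations = (backlog_api_access_integration, other_api_access_integration)
  secrets = ('backlog_key' = backlog_api_key, 'other_key' = other_api_key)
  handler = 'main'
as
$$
from _snowflake import get_generic_secret_string

def main(session):
    # each secret is read by the alias given on the left-hand side of the secrets clause
    backlog_key = get_generic_secret_string('backlog_key')
    other_key = get_generic_secret_string('other_key')
    return 'both secrets were read'
$$;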

Inserting Fetched Data

Now, let's insert the Backlog data into our table.

code_sample_12.sql

CREATE OR REPLACE PROCEDURE backup_backlog_data(table_name VARCHAR)
    RETURNS STRING NOT NULL
    LANGUAGE PYTHON
    RUNTIME_VERSION = '3.13'
    PACKAGES = ('snowflake-snowpark-python', 'snowflake.core', 'requests')
    HANDLER = 'main' -- entry point to our handler
    EXTERNAL_ACCESS_INTEGRATIONS= (backlog_api_access_integration)
    SECRETS = ('backlog_key'= backlog_api_key)
AS
$$
from dataclasses import dataclass, asdict
from types import NoneType, UnionType
from typing import (
    Annotated,
    Any,
    Literal,
    NoReturn,
    TypedDict,
    Union,
    get_args,
    get_origin,
    get_type_hints,
)

import requests
from _snowflake import get_generic_secret_string
from snowflake.snowpark.types import StructType

# insert data into table

@dataclass(frozen=True, kw_only=True)
class BacklogActivity:
    activity_id: int
    activity_type_id: int
    activity_type: str
    project_id: int
    project_key: str
    project_name: str
    content: dict[str, Any]
    creator_id: int
    creator_name: str
    creator_email_address: str | None
    creator_nulab_account_id: str
    creator_nulab_unique_id: str
    created_at: Annotated[str, "timestamp_ntz"]


# ----------- fetch backlog data --------------


@dataclass(frozen=True)
class BacklogApi:
    key = get_generic_secret_string("backlog_key")
    subdomain = "subdomain"
    base_url = f"https://{subdomain}.backlog.com/api/v2"
    recent_activities: str = f"{base_url}/space/activities"
    issues: str = f"{base_url}/issues"


def parse_activity_type(*, activity_type_id: int) -> str:
    activity = {
        1: "Issue Created",
        2: "Issue Updated",
        3: "Issue Commented",
        4: "Issue Deleted",
        5: "Wiki Created",
        6: "Wiki Updated",
        7: "Wiki Deleted",
        8: "File Added",
        9: "File Updated",
        10: "File Deleted",
        11: "SVN Committed",
        12: "Git Pushed",
        13: "Git Repository Created",
        14: "Issue Multi Updated",
        15: "Project User Added",
        16: "Project User Deleted",
        17: "Comment Notification Added",
        18: "Pull Request Added",
        19: "Pull Request Updated",
        20: "Comment Added on Pull Request",
        21: "Pull Request Deleted",
        22: "Milestone Created",
        23: "Milestone Updated",
        24: "Milestone Deleted",
        25: "Project Group Added",
        26: "Project Group Deleted",
    }
    return activity.get(activity_type_id, "Unknown activity_type_id")


def fetch_backlog_data() -> list[BacklogActivity]:
    endpoints = BacklogApi()
    with requests.Session() as session:
        url_parameters: dict[str, str | int] = {
            "apiKey": endpoints.key,
            "count": 10,
            "order": "desc",
        }

        response = session.get(
            url=endpoints.recent_activities, params=url_parameters, verify=True
        )
        if response.status_code != 200:
            raise ValueError(
                f"wrong response: {response.status_code=}\n{response.text=}"
            )

        backlog_data: list[BacklogActivity] = []
        for data in response.json():
            activity_creator = data.get("createdUser")
            backlog_activity = BacklogActivity(
                activity_id=data["id"],
                activity_type=parse_activity_type(activity_type_id=data["type"]),
                activity_type_id=data["type"],
                project_id=data["project"]["id"],
                project_key=data["project"]["projectKey"],
                project_name=data["project"]["name"],
                content=data["content"],
                created_at=data["created"],
                creator_id=activity_creator["id"],
                creator_name=activity_creator["name"],
                creator_email_address=activity_creator["mailAddress"],
                creator_nulab_account_id=activity_creator["nulabAccount"]["nulabId"],
                creator_nulab_unique_id=activity_creator["nulabAccount"]["uniqueId"],
            )

            backlog_data.append(backlog_activity)
        return backlog_data


# ---- Create table definition from dataclass metadata -------


class ColumnDefinition(TypedDict):
    name: str
    type: str
    nullable: bool


class ClassParser:
    """
    Parses the class attributes that use type hinting and generates column definitions compatible with Snowflake Snowpark API.
    If class attribute does not use type hinting, it is omitted from generated column definitions.
    """

    def __init__(self):
        pass

    @property
    def snowflake_supported_types(self):
        return {
            "string",
            "integer",
            "float",
            "decimal",
            "double",
            "short",
            "long",
            "boolean",
            "variant",
            "timestamp",
            "timestamp_tz",
            "timestamp_ltz",
            "timestamp_ntz",
            # There are more supported types
        }

    @snowflake_supported_types.setter
    def snowflake_supported_types(self, value: Any) -> NoReturn:
        raise ValueError("Assigning value is not allowed")

    def parse_class(self, _class: object) -> dict[str, list[ColumnDefinition]]:
        """
        takes a class, not a class instance and returns a dictionary of column definition inferred from class attributes.

        class fields cannot contain more than two type hints.
        If a field contains two type hints, one of the types must be NoneType.

        To specify Snowflake specific field such as TIMESTAMP_NTZ, use Annotated type hints
        e.g.

        /```python
        class A:
            field1: Annotated[str, "timestamp_ntz"]
        /```

        """
        columns: list[ColumnDefinition] = []
        for fieldname, type_hint in get_type_hints(_class, include_extras=True).items():
            result = self._parse_type_hint(type_hint)
            datatype, nullable = result
            columns.append(
                ColumnDefinition(name=fieldname, type=datatype, nullable=nullable)
            )
        return {"fields": columns}

    def _parse_type_hint(self, type_hint: object | UnionType) -> tuple[str, bool]:
        """
        returns a tuple of (datatype, nullable): tuple[str, bool]
        """
        datatype = ""
        nullable = False

        if res := self._parse_annotated_type(type_hint):
            if res["snowflake_type"]:
                datatype = res["snowflake_type"]
                nullable = True if self._parse_union_type(res["origin_type"]) else False
                return (datatype, nullable)
            type_hint = res["origin_type"]

        # This supports generic types, Callable, Tuple, Union, Literal, Final, ClassVar,
        # Annotated, and others. Return None for unsupported types.
        # get_origin never returns Optional, instead it returns Union
        complex_type = get_origin(type_hint)
        if res := self._parse_union_type(type_hint):
            main_type, nullable = res[0], res[1]
            type_hint = main_type  # main type is any type hint except for None
            complex_type = get_origin(main_type)
        if complex_type is Literal:
            atomic_type = get_args(type_hint)[0]
            datatype = self._translate_atomic_type(atomic_type)
        if complex_type in [dict, list, TypedDict, tuple]:
            datatype = "variant"
        if not complex_type:
            datatype = self._translate_atomic_type(type_hint)
        return (datatype, nullable)

    def _parse_annotated_type(self, type_hint: Any) -> dict[str, Any] | None:
        """
        returns dict of snowflake_type:str and origin_type: Any

        returns None if given type_hint is not Annotated

        If Annotated does not contain Snowflake data types,
        it returns `{"snowflake_type": "", "origin_type": origin_type}`
        """
        if get_origin(type_hint) is not Annotated:
            return

        metadata = [
            metadata.strip().lower()
            for metadata in type_hint.__metadata__
            if isinstance(metadata, str)
        ]
        snow_types = self.snowflake_supported_types.intersection(metadata)

        if len(snow_types) > 1:
            raise ValueError(
                "Annotated cannot contain more than one snowflake data type"
            )

        snowflake_type = snow_types.pop() if snow_types else ""
        origin_type = type_hint.__origin__
        return {"snowflake_type": snowflake_type, "origin_type": origin_type}

    def _parse_union_type(self, type_hint: Any) -> tuple[object, bool] | None:
        """
        returns None if given type_hint is not UnionType

        UnionType must contain only two types and one of these types must be NoneType
        """
        types = get_args(type_hint)
        nullable = True
        if get_origin(type_hint) not in [Union, UnionType] or len(types) < 2:
            return
        if len(types) > 2:
            raise ValueError("Union type cannot contain more than two types")
        if NoneType not in types:
            raise ValueError("Union type must include NoneType")
        main_type = types[0] if types[0] is not NoneType else types[1]

        return (main_type, nullable)

    def _translate_atomic_type(self, atomic_type: object) -> str:
        snowflake_type = ""
        if atomic_type is str or isinstance(atomic_type, str):
            snowflake_type = "string"
        if atomic_type is int or isinstance(atomic_type, int):
            snowflake_type = "integer"
        if atomic_type is float or isinstance(atomic_type, float):
            snowflake_type = "float"
        if atomic_type is bool or isinstance(atomic_type, bool):
            snowflake_type = "boolean"
        if not snowflake_type:
            raise RuntimeError(
                f"{atomic_type=} is not defined in this method thus can't be translated"
            )
        return snowflake_type


def create_table_definition(_class: object) -> StructType:
    parser = ClassParser()
    column_definitions = parser.parse_class(_class)
    table_definition: StructType = StructType.from_json(column_definitions)
    return table_definition


def create_or_replace_table(session, table_name: str, table_definition: StructType):
    dataframe = session.create_dataframe([], schema=table_definition)
    if session.catalog.tableExists(table_name):
        dataframe.write.mode("overwrite").save_as_table(table_name)
        return
    # creates the table if it does not already exist;
    # otherwise, simply ignores this operation
    dataframe.write.mode("ignore").save_as_table(table_name)


def create_table(session, table_name: str, table_definition: StructType):
    dataframe = session.create_dataframe([], schema=table_definition)
    # Throws an exception if the table already exists.
    dataframe.write.mode("errorifexists").save_as_table(table_name)


def insert_data(
    session, table_name: str, table_definition: StructType, data: list[list[Any]]
):
    dataframe = session.create_dataframe(data, schema=table_definition)
    dataframe.write.mode("append").save_as_table(table_name)


def main(snowflake_session, table_name: str):
    session = snowflake_session
    table_definition = create_table_definition(BacklogActivity)

    if not session.catalog.tableExists(table_name):
        create_table(session, table_name, table_definition)

    backlog_data = fetch_backlog_data()
    values = [list(asdict(data).values()) for data in backlog_data]
    insert_data(session, table_name, table_definition, values)
    return f"data has been inserted into {table_name}"
$$;

call backup_backlog_data('backlog_data');

-- select * from backlog_data;

Fetching Fresh Data

Now we can fetch data from Backlog and insert it into our table. Depending on how frequently we run the stored procedure, there is a chance of fetching data we have already recorded. To avoid that, we should get the latest recorded activity_id from the table and pass it as the minId parameter to the Backlog API.

Getting the latest activity_id from the Snowflake table:

code_sample_13.sql

CREATE OR REPLACE PROCEDURE get_latest_record(table_name VARCHAR, column_name VARCHAR)
RETURNS INTEGER
LANGUAGE PYTHON
RUNTIME_VERSION = '3.13'
PACKAGES = ('snowflake-snowpark-python', 'snowflake.core')
HANDLER = 'main' -- entry point to our handler
AS
$$
from typing import Any
import snowflake.snowpark.functions as f


def main(session, table_name: str, column_name: str) -> int | None:
    column_name = column_name.strip().upper()
    dframe_backlog_data = session.table(table_name)
    if column_name not in dframe_backlog_data.columns:
        raise ValueError(
            f"{column_name} is not defined in the given table {table_name}"
        )
    column_alias = f"LAST_{column_name}"
    result: list[Any] = dframe_backlog_data.agg(
        f.max(column_name).alias(column_alias)
    ).collect()
    # returns None if there are no records
    return result[0][column_alias]

$$;

call get_latest_record('backlog_data', 'activity_id');

-- drop procedure get_latest_record(varchar, varchar);

session.table(name=table_name) returns a DataFrame that points to the underlying Snowflake table.

The snowflake.snowpark.functions module provides SQL functions such as aggregations, sorting, and other utilities that generate Column expressions you can pass to DataFrame transformation methods.
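As a minimal sketch (it assumes an active Snowpark session inside a stored procedure and the backlog_data table we created; the filter value reuses the "Issue Created" activity type from earlier), Column expressions built with this module are passed straight to DataFrame transformations:

import snowflake.snowpark.functions as f

# assumes `session` is an active Snowpark session and backlog_data has the columns we defined
df = session.table("backlog_data")
latest_issue_created = (
    df.filter(f.col("ACTIVITY_TYPE_ID") == 1)  # 1 = "Issue Created"
      .agg(f.max("ACTIVITY_ID").alias("LAST_ISSUE_CREATED_ID"))
      .collect()
)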

Putting Everything Together

Now we can put everything together.

code_sample_14.sql

CREATE OR REPLACE PROCEDURE backup_backlog_data(table_name VARCHAR)
RETURNS STRING NOT NULL
LANGUAGE PYTHON
RUNTIME_VERSION = '3.13'
PACKAGES = ('snowflake-snowpark-python', 'snowflake.core', 'requests')
HANDLER = 'main' -- entry point to our handler
EXTERNAL_ACCESS_INTEGRATIONS= (backlog_api_access_integration)
SECRETS = ('backlog_key'= backlog_api_key)
AS
$$
from dataclasses import dataclass, asdict
from types import NoneType, UnionType
from typing import (
    Annotated,
    Any,
    Literal,
    NoReturn,
    TypedDict,
    Union,
    get_args,
    get_origin,
    get_type_hints,
)

import requests
from _snowflake import get_generic_secret_string
from snowflake.snowpark.types import StructType
import snowflake.snowpark.functions as f

# insert data into table


@dataclass(frozen=True, kw_only=True)
class BacklogActivity:
    activity_id: int
    activity_type_id: int
    activity_type: str
    project_id: int
    project_key: str
    project_name: str
    content: dict[str, Any]
    creator_id: int
    creator_name: str
    creator_email_address: str | None
    creator_nulab_account_id: str
    creator_nulab_unique_id: str
    created_at: Annotated[str, "timestamp_ntz"]


# ----------- fetch backlog data --------------

@dataclass(frozen=True)
class BacklogApi:
    key = get_generic_secret_string("backlog_key")
    subdomain = "my-org"
    base_url = f"https://{subdomain}.backlog.com/api/v2"
    recent_activities: str = f"{base_url}/space/activities"
    issues: str = f"{base_url}/issues"


def parse_activity_type(*, activity_type_id: int) -> str:
    activity = {
        1: "Issue Created",
        2: "Issue Updated",
        3: "Issue Commented",
        4: "Issue Deleted",
        5: "Wiki Created",
        6: "Wiki Updated",
        7: "Wiki Deleted",
        8: "File Added",
        9: "File Updated",
        10: "File Deleted",
        11: "SVN Committed",
        12: "Git Pushed",
        13: "Git Repository Created",
        14: "Issue Multi Updated",
        15: "Project User Added",
        16: "Project User Deleted",
        17: "Comment Notification Added",
        18: "Pull Request Added",
        19: "Pull Request Updated",
        20: "Comment Added on Pull Request",
        21: "Pull Request Deleted",
        22: "Milestone Created",
        23: "Milestone Updated",
        24: "Milestone Deleted",
        25: "Project Group Added",
        26: "Project Group Deleted",
    }
    return activity.get(activity_type_id, "Unknown activity_type_id")


def fetch_backlog_data(last_activity_id: int) -> list[BacklogActivity]:
    endpoints = BacklogApi()
    with requests.Session() as session:
        url_parameters: dict[str, str | int] = {
            "apiKey": endpoints.key,
            "count": 100,
            "minId": last_activity_id,
            "order": "desc"
        }

        response = session.get(
            url=endpoints.recent_activities, params=url_parameters, verify=True
        )
        if response.status_code != 200:
            raise ValueError(
                f"wrong response: {response.status_code=}\n{response.text=}"
            )

        backlog_data: list[BacklogActivity] = []
        for data in response.json():
            activity_creator = data.get("createdUser")
            backlog_activity = BacklogActivity(
                activity_id=data["id"],
                activity_type_id=data["type"],
                activity_type=parse_activity_type(activity_type_id=data["type"]),
                project_id=data["project"]["id"],
                project_key=data["project"]["projectKey"],
                project_name=data["project"]["name"],
                content=data["content"],
                created_at=data["created"],
                creator_id=activity_creator["id"],
                creator_name=activity_creator["name"],
                creator_email_address=activity_creator["mailAddress"],
                creator_nulab_account_id=activity_creator["nulabAccount"]["nulabId"],
                creator_nulab_unique_id=activity_creator["nulabAccount"]["uniqueId"],
            )

            backlog_data.append(backlog_activity)
        return backlog_data


# ---- Create table definition from dataclass metadata -------


class ColumnDefinition(TypedDict):
    name: str
    type: str
    nullable: bool


class ClassParser:
    """
    Parses the class attributes that use type hinting and generates column definitions compatible with Snowflake Snowpark API.
    If class attribute does not use type hinting, it is omitted from generated column definitions.
    """

    def __init__(self):
        pass

    @property
    def snowflake_supported_types(self):
        return {
            "string",
            "integer",
            "float",
            "decimal",
            "double",
            "short",
            "long",
            "boolean",
            "variant",
            "timestamp",
            "timestamp_tz",
            "timestamp_ltz",
            "timestamp_ntz",
            # There are more supported types
        }

    @snowflake_supported_types.setter
    def snowflake_supported_types(self, value: Any) -> NoReturn:
        raise ValueError("Assigning value is not allowed")

    def parse_class(self, _class: object) -> dict[str, list[ColumnDefinition]]:
        """
        takes a class, not a class instance and returns a dictionary of column definition inferred from class attributes.

        class fields cannot contain more than two type hints.
        If a field contains two type hints, one of the types must be NoneType.

        To specify Snowflake specific field such as TIMESTAMP_NTZ, use Annotated type hints
        e.g.

        /```python
        class A:
            field1: Annotated[str, "timestamp_ntz"]
        /```

        """
        columns: list[ColumnDefinition] = []
        for fieldname, type_hint in get_type_hints(_class, include_extras=True).items():
            result = self._parse_type_hint(type_hint)
            datatype, nullable = result
            columns.append(
                ColumnDefinition(name=fieldname, type=datatype, nullable=nullable)
            )
        return {"fields": columns}

    def _parse_type_hint(self, type_hint: object | UnionType) -> tuple[str, bool]:
        """
        returns a tuple of (datatype, nullable): tuple[str, bool]
        """
        datatype = ""
        nullable = False

        if res := self._parse_annotated_type(type_hint):
            if res["snowflake_type"]:
                datatype = res["snowflake_type"]
                nullable = True if self._parse_union_type(res["origin_type"]) else False
                return (datatype, nullable)
            type_hint = res["origin_type"]

        # This supports generic types, Callable, Tuple, Union, Literal, Final, ClassVar,
        # Annotated, and others. Return None for unsupported types.
        # get_origin never returns Optional, instead it returns Union
        complex_type = get_origin(type_hint)
        if res := self._parse_union_type(type_hint):
            main_type, nullable = res[0], res[1]
            type_hint = main_type  # main type is any type hint except for None
            complex_type = get_origin(main_type)
        if complex_type is Literal:
            atomic_type = get_args(type_hint)[0]
            datatype = self._translate_atomic_type(atomic_type)
        if complex_type in [dict, list, TypedDict, tuple]:
            datatype = "variant"
        if not complex_type:
            datatype = self._translate_atomic_type(type_hint)
        return (datatype, nullable)

    def _parse_annotated_type(self, type_hint: Any) -> dict[str, Any] | None:
        """
        returns dict of snowflake_type:str and origin_type: Any

        returns None if given type_hint is not Annotated

        If Annotated does not contain Snowflake data types,
        it returns `{"snowflake_type": "", "origin_type": origin_type}`
        """
        if get_origin(type_hint) is not Annotated:
            return

        metadata = [
            metadata.strip().lower()
            for metadata in type_hint.__metadata__
            if isinstance(metadata, str)
        ]
        snow_types = self.snowflake_supported_types.intersection(metadata)

        if len(snow_types) > 1:
            raise ValueError(
                "Annotated cannot contain more than one snowflake data type"
            )

        snowflake_type = snow_types.pop() if snow_types else ""
        origin_type = type_hint.__origin__
        return {"snowflake_type": snowflake_type, "origin_type": origin_type}

    def _parse_union_type(self, type_hint: Any) -> tuple[object, bool] | None:
        """
        returns None if given type_hint is not UnionType

        UnionType must contain only two types and one of these types must be NoneType
        """
        types = get_args(type_hint)
        nullable = True
        if get_origin(type_hint) not in [Union, UnionType] or len(types) < 2:
            return
        if len(types) > 2:
            raise ValueError("Union type cannot contain more than two types")
        if NoneType not in types:
            raise ValueError("Union type must include NoneType")
        main_type = types[0] if types[0] is not NoneType else types[1]

        return (main_type, nullable)

    def _translate_atomic_type(self, atomic_type: object) -> str:
        snowflake_type = ""
        if atomic_type is str or isinstance(atomic_type, str):
            snowflake_type = "string"
        if atomic_type is int or isinstance(atomic_type, int):
            snowflake_type = "integer"
        if atomic_type is float or isinstance(atomic_type, float):
            snowflake_type = "float"
        if atomic_type is bool or isinstance(atomic_type, bool):
            snowflake_type = "boolean"
        if not snowflake_type:
            raise RuntimeError(
                f"{atomic_type=} is not defined in this method thus can't be translated"
            )
        return snowflake_type


def create_table_definition(_class: object) -> StructType:
    parser = ClassParser()
    column_definitions = parser.parse_class(_class)
    table_definition: StructType = StructType.from_json(column_definitions)
    return table_definition


def create_or_replace_table(session, table_name: str, table_definition: StructType):
    dataframe = session.create_dataframe([], schema=table_definition)
    if session.catalog.tableExists(table_name):
        dataframe.write.mode("overwrite").save_as_table(table_name)
        return
    # creates the table if it does not already exist;
    # otherwise, simply ignores this operation
    dataframe.write.mode("ignore").save_as_table(table_name)


def create_table(session, table_name: str, table_definition: StructType):
    dataframe = session.create_dataframe([], schema=table_definition)
    # Throws an exception if the table already exists.
    dataframe.write.mode("errorifexists").save_as_table(table_name)


def insert_data(
    session, table_name: str, table_definition: StructType, data: list[list[Any]]
):
    dataframe = session.create_dataframe(data, schema=table_definition)
    dataframe.write.mode("append").save_as_table(table_name)

def get_last_recorded_value(session, table_name: str, column_name: str) -> int | None:
    column_name = column_name.strip().upper()
    dframe_backlog_data = session.table(table_name)
    if column_name not in dframe_backlog_data.columns:
        raise ValueError(
            f"{column_name} is not defined in the given table {table_name}"
        )
    column_alias = f"LAST_{column_name}"
    result: list[Any] = dframe_backlog_data.agg(
        f.max(column_name).alias(column_alias)
    ).collect()
    # returns None if there are no records
    return result[0][column_alias]


def main(snowflake_session, table_name: str):
    session = snowflake_session
    table_definition = create_table_definition(BacklogActivity)

    if not session.catalog.tableExists(table_name):
        create_table(session, table_name, table_definition)

    last_activity_id = get_last_recorded_value(session, table_name, "activity_id") or 0
    count = 0
    api_requests = 0
    while backlog_data := fetch_backlog_data(last_activity_id):
        values = [list(asdict(data).values()) for data in backlog_data]
        insert_data(session, table_name, table_definition, values)
        last_activity_id = backlog_data[0].activity_id
        count += len(values)
        api_requests += 1
    return f"{count} rows of data have been inserted into {table_name} in {api_requests} api_requests"
$$;

call backup_backlog_data('backlog_data');

select * from backlog_data;

If you think our stored procedure has become too long, feel free to split it into multiple stored procedures, user-defined functions, or even modules (a stored procedure can import script files stored in stages).

If you split it into multiple stored procedures, you can call one stored procedure from another using the session.call() function.
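As a minimal sketch (the wrapper handler below is hypothetical; get_latest_record is the procedure we defined earlier), the handler of one procedure can invoke another by name:

def main(session, table_name: str):
    # session.call runs another stored procedure; positional arguments follow the procedure name
    last_activity_id = session.call("get_latest_record", table_name, "activity_id") or 0
    return f"latest recorded activity_id: {last_activity_id}"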

Scheduling Stored Procedure Execution

Our stored procedure is complete. We can now run it any time we want, but scheduling its execution is a much better idea. We can do so using Snowflake Tasks.

Tasks definition:

Tasks can run at scheduled times or can be triggered by events, such as when new data arrives in a stream. Tasks can run SQL commands and stored procedures.

When we create a task, it is in the "suspended" state by default: the task is defined and ready to run, but it will not run until we tell it to. In other words, the task is waiting for us to press the start button. We start the task by resuming it, as shown below.

Creating and starting a task:

code_sample_15.sql

CREATE TASK backup_backlog_data
  SCHEDULE='60 MINUTES'
  SERVERLESS_TASK_MAX_STATEMENT_SIZE='LARGE'
  SUSPEND_TASK_AFTER_NUM_FAILURES = 1
  AS
    call backup_backlog_data('backlog_data');

ALTER TASK backup_backlog_data resume; -- starts the task
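Tasks can also be triggered by events rather than running purely on a schedule. As a minimal sketch (the stream my_backlog_stream is hypothetical), a task can be made to run only when a stream has new data, and SHOW TASKS lets us verify whether a task is started or suspended:

CREATE TASK backup_backlog_data_on_change
  SCHEDULE='5 MINUTES'
  WHEN SYSTEM$STREAM_HAS_DATA('my_backlog_stream') -- hypothetical stream
  AS
    call backup_backlog_data('backlog_data');

SHOW TASKS LIKE 'backup_backlog_data%'; -- the "state" column shows started / suspended
-- ALTER TASK backup_backlog_data SUSPEND; -- pause the scheduled task when needed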

Complete Solution

complete_solution.sql

CREATE OR REPLACE SECRET backlog_api_key
  TYPE = GENERIC_STRING
  SECRET_STRING = 'backlog_api_key_xyz';

GRANT READ ON SECRET backlog_api_key TO ROLE <my_role>;
-- -------------------------------------------
USE ROLE SYSADMIN;
CREATE NETWORK RULE external_access_network_rule_for_backlog
TYPE = HOST_PORT
MODE = EGRESS
VALUE_LIST = ('*.backlog.com:0')
COMMENT = 'Network rule for Backlog REST API endpoint';
-- -------------------------------------------
USE ROLE ACCOUNTADMIN;
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION backlog_api_access_integration
ALLOWED_NETWORK_RULES = (external_access_network_rule_for_backlog)
ALLOWED_AUTHENTICATION_SECRETS = (backlog_api_key)
ENABLED = true;

GRANT USAGE ON INTEGRATION backlog_api_access_integration TO ROLE <my_role>;

-------------------------------------------
CREATE OR REPLACE PROCEDURE backup_backlog_data(table_name VARCHAR)
RETURNS STRING NOT NULL
LANGUAGE PYTHON
RUNTIME_VERSION = '3.13'
PACKAGES = ('snowflake-snowpark-python', 'snowflake.core', 'requests')
HANDLER = 'main' -- entry point to our handler
EXTERNAL_ACCESS_INTEGRATIONS= (backlog_api_access_integration)
SECRETS = ('backlog_key'= backlog_api_key)
AS
$$
from dataclasses import dataclass, asdict
from types import NoneType, UnionType
from typing import (
    Annotated,
    Any,
    Literal,
    NoReturn,
    TypedDict,
    Union,
    get_args,
    get_origin,
    get_type_hints,
)

import requests
from _snowflake import get_generic_secret_string
from snowflake.snowpark.types import StructType
import snowflake.snowpark.functions as f

# insert data into table


@dataclass(frozen=True, kw_only=True)
class BacklogActivity:
    activity_id: int
    activity_type_id: int
    activity_type: str
    project_id: int
    project_key: str
    project_name: str
    content: dict[str, Any]
    creator_id: int
    creator_name: str
    creator_email_address: str | None
    creator_nulab_account_id: str
    creator_nulab_unique_id: str
    created_at: Annotated[str, "timestamp_ntz"]


# ----------- fetch backlog data --------------

@dataclass(frozen=True)
class BacklogApi:
    key = get_generic_secret_string("backlog_key")
    subdomain = "my-org"
    base_url = f"https://{subdomain}.backlog.com/api/v2"
    recent_activities: str = f"{base_url}/space/activities"
    issues: str = f"{base_url}/issues"


def parse_activity_type(*, activity_type_id: int) -> str:
    activity = {
        1: "Issue Created",
        2: "Issue Updated",
        3: "Issue Commented",
        4: "Issue Deleted",
        5: "Wiki Created",
        6: "Wiki Updated",
        7: "Wiki Deleted",
        8: "File Added",
        9: "File Updated",
        10: "File Deleted",
        11: "SVN Committed",
        12: "Git Pushed",
        13: "Git Repository Created",
        14: "Issue Multi Updated",
        15: "Project User Added",
        16: "Project User Deleted",
        17: "Comment Notification Added",
        18: "Pull Request Added",
        19: "Pull Request Updated",
        20: "Comment Added on Pull Request",
        21: "Pull Request Deleted",
        22: "Milestone Created",
        23: "Milestone Updated",
        24: "Milestone Deleted",
        25: "Project Group Added",
        26: "Project Group Deleted",
    }
    return activity.get(activity_type_id, "Unknown activity_type_id")


def fetch_backlog_data(last_activity_id: int) -> list[BacklogActivity]:
    endpoints = BacklogApi()
    with requests.Session() as session:
        url_parameters: dict[str, str | int] = {
            "apiKey": endpoints.key,
            "count": 100,
            "minId": last_activity_id,
            "order": "desc"
        }

        response = session.get(
            url=endpoints.recent_activities, params=url_parameters, verify=True
        )
        if response.status_code != 200:
            raise ValueError(
                f"wrong response: {response.status_code=}\n{response.text=}"
            )

        backlog_data: list[BacklogActivity] = []
        for data in response.json():
            activity_creator = data.get("createdUser")
            backlog_activity = BacklogActivity(
                activity_id=data["id"],
                activity_type_id=data["type"],
                activity_type=parse_activity_type(activity_type_id=data["type"]),
                project_id=data["project"]["id"],
                project_key=data["project"]["projectKey"],
                project_name=data["project"]["name"],
                content=data["content"],
                created_at=data["created"],
                creator_id=activity_creator["id"],
                creator_name=activity_creator["name"],
                creator_email_address=activity_creator["mailAddress"],
                creator_nulab_account_id=activity_creator["nulabAccount"]["nulabId"],
                creator_nulab_unique_id=activity_creator["nulabAccount"]["uniqueId"],
            )

            backlog_data.append(backlog_activity)
        return backlog_data


# ---- Create table definition from dataclass metadata -------


class ColumnDefinition(TypedDict):
    name: str
    type: str
    nullable: bool


class ClassParser:
    """
    Parses the class attributes that use type hinting and generates column definitions compatible with Snowflake Snowpark API.
    If class attribute does not use type hinting, it is omitted from generated column definitions.
    """

    def __init__(self):
        pass

    @property
    def snowflake_supported_types(self):
        return {
            "string",
            "integer",
            "float",
            "decimal",
            "double",
            "short",
            "long",
            "boolean",
            "variant",
            "timestamp",
            "timestamp_tz",
            "timestamp_ltz",
            "timestamp_ntz",
            # There are more supported types
        }

    @snowflake_supported_types.setter
    def snowflake_supported_types(self, value: Any) -> NoReturn:
        raise ValueError("Assigning value is not allowed")

    def parse_class(self, _class: object) -> dict[str, list[ColumnDefinition]]:
        """
        takes a class, not a class instance and returns a dictionary of column definition inferred from class attributes.

        class fields cannot contain more than two type hints.
        If a field contains two type hints, one of the types must be NoneType.

        To specify Snowflake specific field such as TIMESTAMP_NTZ, use Annotated type hints
        e.g.

        /```python
        class A:
            field1: Annotated[str, "timestamp_ntz"]
        /```

        """
        columns: list[ColumnDefinition] = []
        for fieldname, type_hint in get_type_hints(_class, include_extras=True).items():
            result = self._parse_type_hint(type_hint)
            datatype, nullable = result
            columns.append(
                ColumnDefinition(name=fieldname, type=datatype, nullable=nullable)
            )
        return {"fields": columns}

    def _parse_type_hint(self, type_hint: object | UnionType) -> tuple[str, bool]:
        """
        returns a tuple of (datatype, nullable): tuple[str, bool]
        """
        datatype = ""
        nullable = False

        if res := self._parse_annotated_type(type_hint):
            if res["snowflake_type"]:
                datatype = res["snowflake_type"]
                nullable = True if self._parse_union_type(res["origin_type"]) else False
                return (datatype, nullable)
            type_hint = res["origin_type"]

        # This supports generic types, Callable, Tuple, Union, Literal, Final, ClassVar,
        # Annotated, and others. Return None for unsupported types.
        # get_origin never returns Optional, instead it returns Union
        complex_type = get_origin(type_hint)
        if res := self._parse_union_type(type_hint):
            main_type, nullable = res[0], res[1]
            type_hint = main_type  # main type is any type hint except for None
            complex_type = get_origin(main_type)
        if complex_type is Literal:
            atomic_type = get_args(type_hint)[0]
            datatype = self._translate_atomic_type(atomic_type)
        if complex_type in [dict, list, TypedDict, tuple]:
            datatype = "variant"
        if not complex_type:
            datatype = self._translate_atomic_type(type_hint)
        return (datatype, nullable)

    def _parse_annotated_type(self, type_hint: Any) -> dict[str, Any] | None:
        """
        returns dict of snowflake_type:str and origin_type: Any

        returns None if given type_hint is not Annotated

        If Annotated does not contain Snowflake data types,
        it returns `{"snowflake_type": "", "origin_type": origin_type}`
        """
        if get_origin(type_hint) is not Annotated:
            return

        metadata = [
            metadata.strip().lower()
            for metadata in type_hint.__metadata__
            if isinstance(metadata, str)
        ]
        snow_types = self.snowflake_supported_types.intersection(metadata)

        if len(snow_types) > 1:
            raise ValueError(
                "Annotated cannot contain more than one snowflake data type"
            )

        snowflake_type = snow_types.pop() if snow_types else ""
        origin_type = type_hint.__origin__
        return {"snowflake_type": snowflake_type, "origin_type": origin_type}

    def _parse_union_type(self, type_hint: Any) -> tuple[object, bool] | None:
        """
        returns None if given type_hint is not UnionType

        UnionType must contain only two types and one of these types must be NoneType
        """
        types = get_args(type_hint)
        nullable = True
        if get_origin(type_hint) not in [Union, UnionType] or len(types) < 2:
            return
        if len(types) > 2:
            raise ValueError("Union type cannot contain more than two types")
        if NoneType not in types:
            raise ValueError("Union type must include NoneType")
        main_type = types[0] if types[0] is not NoneType else types[1]

        return (main_type, nullable)

    def _translate_atomic_type(self, atomic_type: object) -> str:
        snowflake_type = ""
        if atomic_type is str or isinstance(atomic_type, str):
            snowflake_type = "string"
        if atomic_type is int or isinstance(atomic_type, int):
            snowflake_type = "integer"
        if atomic_type is float or isinstance(atomic_type, float):
            snowflake_type = "float"
        if atomic_type is bool or isinstance(atomic_type, bool):
            snowflake_type = "boolean"
        if not snowflake_type:
            raise RuntimeError(
                f"{atomic_type=} is not defined in this method thus can't be translated"
            )
        return snowflake_type


def create_table_definition(_class: object) -> StructType:
    parser = ClassParser()
    column_definitions = parser.parse_class(_class)
    table_definition: StructType = StructType.from_json(column_definitions)
    return table_definition


def create_or_replace_table(session, table_name: str, table_definition: StructType):
    dataframe = session.create_dataframe([], schema=table_definition)
    if session.catalog.tableExists(table_name):
        dataframe.write.mode("overwrite").save_as_table(table_name)
        return
    # creates the table if it does not already exist;
    # otherwise, simply ignores this operation
    dataframe.write.mode("ignore").save_as_table(table_name)


def create_table(session, table_name: str, table_definition: StructType):
    dataframe = session.create_dataframe([], schema=table_definition)
    # Throws an exception if the table already exists.
    dataframe.write.mode("errorifexists").save_as_table(table_name)


def insert_data(
    session, table_name: str, table_definition: StructType, data: list[list[Any]]
):
    dataframe = session.create_dataframe(data, schema=table_definition)
    dataframe.write.mode("append").save_as_table(table_name)

def get_last_recorded_value(session, table_name: str, column_name: str) -> int | None:
    column_name = column_name.strip().upper()
    dframe_backlog_data = session.table(table_name)
    if column_name not in dframe_backlog_data.columns:
        raise ValueError(
            f"{column_name} is not defined in the given table {table_name}"
        )
    column_alias = f"LAST_{column_name}"
    result: list[Any] = dframe_backlog_data.agg(
        f.max(column_name).alias(column_alias)
    ).collect()
    # returns None if there are no records
    return result[0][column_alias]


def main(snowflake_session, table_name: str):
    session = snowflake_session
    table_definition = create_table_definition(BacklogActivity)

    if not session.catalog.tableExists(table_name):
        create_table(session, table_name, table_definition)

    last_activity_id = get_last_recorded_value(session, table_name, "activity_id") or 0
    count = 0
    api_requests = 0
    while backlog_data := fetch_backlog_data(last_activity_id):
        values = [list(asdict(data).values()) for data in backlog_data]
        insert_data(session, table_name, table_definition, values)
        last_activity_id = backlog_data[0].activity_id
        count += len(values)
        api_requests += 1
    return f"{count} rows of data have been inserted into {table_name} in {api_requests} api_requests"
$$;

-- call backup_backlog_data('backlog_data');

-- select * from backlog_data;
-------------------------------------
CREATE TASK backup_backlog_data
  SCHEDULE='60 MINUTES'
  SERVERLESS_TASK_MAX_STATEMENT_SIZE='SMALL'
  SUSPEND_TASK_AFTER_NUM_FAILURES = 1
  AS
    call backup_backlog_data('backlog_data');

ALTER TASK backup_backlog_data resume;
