Ingesting Nulab Backlog Data Into Snowflake Tables: Part III

This tutorial has four parts in total.

We will continue from where we left off in part two of this tutorial.

Before diving into creating Snowflake objects, let’s see what the final result looks like once again:

  1. A Snowflake Task calls the stored procedure every few hours
  2. The stored procedure fetches data from the Backlog API
  3. The stored procedure records the fetched data into a Snowflake table

In this tutorial we will create a Python stored procedure. Instead of dumping all the code at once, I will build up our solution incrementally.

Creating Python Stored Procedure

Ultimately, our stored procedure will fetch data from Backlog and record it into a given table. If the table does not exist, it should create it. First, let’s build the part of our stored procedure that checks whether the table exists.

code_sample_4.sql

CREATE OR REPLACE PROCEDURE backup_backlog_data(table_name VARCHAR)
RETURNS STRING NOT NULL
LANGUAGE PYTHON
RUNTIME_VERSION = '3.13'
PACKAGES = ('snowflake-snowpark-python', 'snowflake.core')
HANDLER = 'main' -- entry point to our handler
AS
$$
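# Snowflake passes an active Snowpark Session as the first argument of the handler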
def main(snowflake_session, table_name: str) -> str:
    session = snowflake_session
    if session.catalog.tableExists(table_name):
        return f"table {table_name} already exists"
    return f"table {table_name} does not exist"
$$;

call backup_backlog_data('backlog_data');

In the above code, since we do not provide database and schema names, the stored procedure is created in the current database and schema. Likewise, when checking whether the table exists, the stored procedure checks against the current database and schema. If you want, you can qualify both with explicit database and schema names.

Unsurprisingly, the table does not exist yet. As the next step, our stored procedure should create the table.

code_sample_5.sql

CREATE OR REPLACE PROCEDURE backup_backlog_data(table_name VARCHAR)
RETURNS STRING NOT NULL
LANGUAGE PYTHON
RUNTIME_VERSION = '3.13'
PACKAGES = ('snowflake-snowpark-python', 'snowflake.core')
HANDLER = 'main' -- entry point to our handler
AS
$$
from snowflake.snowpark.types import StructType


def create_table_definition() -> StructType:
    json_data = {
        "fields": [
            {"name": "col_1", "type": "string", "nullable": True},
            {"name": "col_2", "type": "string", "nullable": False},
            {"name": "col_3", "type": "variant", "nullable": True},
        ]
    }
    table_definition: StructType = StructType.from_json(json_data)
    return table_definition


def main(snowflake_session, table_name: str) -> str:
    session = snowflake_session

    if session.catalog.tableExists(table_name):
        return f"table {table_name} already exists"

    table_definition = create_table_definition()
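    # An empty row list: we only need the schema to create the table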
    dataframe = session.create_dataframe([], schema=table_definition)
    dataframe.write.mode("ignore").save_as_table(table_name)
    return f"table {table_name} has been created"
$$;

call backup_backlog_data('backlog_data');

select * from backlog_data;

All of a sudden there is a lot of new stuff in our code. Let me go over it one piece at a time.

A StructType instance represents a table schema (DDL). StructType is a class; when we instantiate it, we get an instance whose fields represent columns via StructField objects. Subjectively, the easiest way to build a StructType is with its from_json method.
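For reference, the same schema can also be built explicitly with StructField objects instead of from_json. Here is a minimal sketch that produces a StructType equivalent to the one in create_table_definition above:

from snowflake.snowpark.types import StringType, StructField, StructType, VariantType

# Built field by field: column name, data type, nullability
table_definition = StructType(
    [
        StructField("col_1", StringType(), nullable=True),
        StructField("col_2", StringType(), nullable=False),
        StructField("col_3", VariantType(), nullable=True),
    ]
)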

Snowflake extensively uses dataframes. If you know them, that’s great; if you don’t, don’t sweat it. You can simply think of a dataframe as an object representing a table.

The session.create_dataframe method creates a dataframe in memory with the given column names, data types, and null constraints. On the next line we save the dataframe to the database as a table. (It is also possible to write a dataframe to a Snowflake stage, for example as CSV.)
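For instance, unloading the same dataframe to a stage as CSV could look like the sketch below, using the Snowpark writer’s copy_into_location method and assuming a stage named my_stage already exists:

# Write the dataframe to a stage as CSV files instead of saving it as a table
dataframe.write.copy_into_location(
    "@my_stage/backlog_data/",
    file_format_type="csv",
    header=True,
    overwrite=True,  # passed through as the COPY option OVERWRITE = TRUE
)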

You may be wondering where I got the mode("ignore") part. The docs page gives almost no information on available options.

Snowflake documentation is poor. To get more detailed information on any of the Snowflake concepts discussed in this post and beyond, click the “[source]” link in the SIGNATURE section of the documentation page. The link takes you to the GitHub source page.

mode can take any of the following values:

  • “append”: Append the data of this DataFrame to the existing table. Creates the table if it does not exist.
  • “overwrite”: Overwrite the existing table by dropping the old table.
  • “truncate”: Overwrite the existing table by truncating the old table.
  • “errorifexists”: Throw an exception if the table already exists.
  • “ignore”: Ignore this operation if the table already exists.

The default value is “errorifexists”.

Because ignore mode creates the table only if it doesn’t already exist, we don’t actually need the existence check in the if statement of the previous example. I decided to keep it for readability.
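To illustrate, here is a minimal sketch of the handler without the existence check, with equivalent behavior (it assumes the create_table_definition function from the previous example):

def main(snowflake_session, table_name: str) -> str:
    table_definition = create_table_definition()
    dataframe = snowflake_session.create_dataframe([], schema=table_definition)
    # "ignore" creates the table only when it is absent, so no explicit check is needed
    dataframe.write.mode("ignore").save_as_table(table_name)
    return f"table {table_name} is ready"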

Converting Python Dataclass to Snowflake StructType

We learnt how to create tables using dataframes, but we haven’t created the table we need yet. We need a table with columns like activity_id, type_id, activity_type, and so on: basically the same as the fields of our BacklogActivity dataclass.

As we saw in the previous example, we define the table structure using StructType, and StructType needs information about the columns.

json_data = {
    "fields": [
        {"name": "col_1", "type": "string", "nullable": True},
        {"name": "col_2", "type": "string", "nullable": False},
        {"name": "col_3", "type": "variant", "nullable": True},
    ]
}
table_definition: StructType = StructType.from_json(json_data)

When we initialize StructType, we specify each column’s name, data type, and whether it is nullable. The exact same data is defined in our BacklogActivity dataclass, though in a slightly different format.

@dataclass(frozen=True, kw_only=True)
class BacklogActivity:
    activity_id: int
    type_id: int
    activity_type: str
    project_id: int
    project_key: str
    project_name: str
    content: dict[str, Any]
    creator_id: int
    creator_name: str
    creator_email_address: str | None # nullable
    creator_nulab_account_id: str
    creator_nulab_unique_id: str
    created_at: Annotated[str, "timestamp_ntz"]

Our next goal is to extract the necessary data (field names, data types, and nullability) from the BacklogActivity dataclass in JSON format and use it to create the StructType (table definition).
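Before writing the parser, it helps to look at the raw material. typing.get_type_hints with include_extras=True returns each field’s annotation and preserves Annotated metadata; our parser will map these annotations to column definitions. A quick sketch:

from typing import get_type_hints

hints = get_type_hints(BacklogActivity, include_extras=True)
print(hints["creator_email_address"])  # str | None
print(hints["created_at"])  # typing.Annotated[str, 'timestamp_ntz']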

code_sample_6.py

from dataclasses import dataclass
from pprint import pprint
from types import NoneType, UnionType
from typing import (
    Annotated,
    Any,
    Literal,
    NoReturn,
    TypedDict,
    Union,
    get_args,
    get_origin,
    get_type_hints,
)

# Create table definition from dataclass metadata


@dataclass(frozen=True, kw_only=True)
class BacklogActivity:
    activity_id: int
    type_id: int
    activity_type: str
    project_id: int
    project_key: str
    project_name: str
    content: dict[str, Any]
    creator_id: int
    creator_name: str
    creator_email_address: str | None
    creator_nulab_account_id: str
    creator_nulab_unique_id: str
    created_at: Annotated[str, "timestamp_ntz"]


class ColumnDefinition(TypedDict):
    name: str
    type: str
    nullable: bool


class ClassParser:
    """
    Parses the class attributes that use type hinting and generates column definitions compatible with the Snowflake Snowpark API.
    If a class attribute does not use type hinting, it is omitted from the generated column definitions.
    """

    def __init__(self):
        pass

    @property
    def snowflake_supported_types(self):
        return {
            "string",
            "integer",
            "float",
            "decimal",
            "double",
            "short",
            "long",
            "boolean",
            "variant",
            "timestamp",
            "timestamp_tz",
            "timestamp_ltz",
            "timestamp_ntz",
            # There are more supported types
        }

    @snowflake_supported_types.setter
    def snowflake_supported_types(self, value: Any) -> NoReturn:
        raise ValueError("Assigning value is not allowed")

    def parse_class(self, _class: object) -> dict[str, list[ColumnDefinition]]:
        """
        takes a class, not a class instance, and returns a dictionary of column definitions inferred from class attributes.

        class fields cannot contain more than two types.
        If a field contains two types, one of the types must be NoneType.

        To specify Snowflake specific field such as TIMESTAMP_NTZ, use Annotated type hints
        e.g.

        ```python
        class A:
            field1: Annotated[str, "timestamp_ntz"]
        ```
        """

        columns: list[ColumnDefinition] = []
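        # include_extras=True preserves Annotated metadata (PEP 593)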
        for fieldname, type_hint in get_type_hints(_class, include_extras=True).items():
            result = self._parse_type_hint(type_hint)
            datatype, nullable = result
            columns.append(
                ColumnDefinition(name=fieldname, type=datatype, nullable=nullable)
            )
        return {"fields": columns}

    def _parse_type_hint(self, type_hint: object | UnionType) -> tuple[str, bool]:
        """
        returns a tuple of (datatype, nullable): tuple[str, bool]
        """
        datatype = ""
        nullable = False

        if res := self._parse_annotated_type(type_hint):
            if res["snowflake_type"]:
                datatype = res["snowflake_type"]
                nullable = True if self._parse_union_type(res["origin_type"]) else False
                return (datatype, nullable)
            type_hint = res["origin_type"]

        # This supports generic types, Callable, Tuple, Union, Literal, Final, ClassVar,
        # Annotated, and others. Return None for unsupported types.
        # get_origin never returns Optional, instead it returns Union
        complex_type = get_origin(type_hint)
        if res := self._parse_union_type(type_hint):
            main_type, nullable = res[0], res[1]
            type_hint = main_type # main type is any type hint except for None
            complex_type = get_origin(main_type)
        if complex_type is Literal:
            atomic_type = get_args(type_hint)[0]
            datatype = self._translate_atomic_type(atomic_type)
        if complex_type in [dict, list, TypedDict, tuple]:
            datatype = "variant"
        if not complex_type:
            datatype = self._translate_atomic_type(type_hint)
        return (datatype, nullable)

    def _parse_annotated_type(self, type_hint: Any) -> dict[str, Any] | None:
        """
        returns dict of snowflake_type:str and origin_type: Any

        returns None if given type_hint is not Annotated

        If Annotated does not contain Snowflake data types,
        it returns `{"snowflake_type": "", "origin_type": origin_type}`
        """
        if get_origin(type_hint) is not Annotated:
            return

        metadata = [
            metadata.strip().lower()
            for metadata in type_hint.__metadata__
            if isinstance(metadata, str)
        ]
        snow_types = self.snowflake_supported_types.intersection(metadata)

        if len(snow_types) > 1:
            raise ValueError(
                "Annotated cannot contain more than one snowflake data type"
            )

        snowflake_type = snow_types.pop() if snow_types else ""
        origin_type = type_hint.__origin__
        return {"snowflake_type": snowflake_type, "origin_type": origin_type}

    def _parse_union_type(self, type_hint: Any) -> tuple[object, bool] | None:
        """
        returns None if given type_hint is not UnionType

        UnionType must contain only two types and one of these types must be NoneType.

        Raises exception if UnionType does not include NoneType.
        """
        types = get_args(type_hint)
        nullable = True
        if get_origin(type_hint) not in [Union, UnionType] or len(types) < 2:
            return
        if len(types) > 2:
            raise ValueError("Union type cannot contain more than two types")
        if NoneType not in types:
            raise ValueError("Union type must include NoneType")
        main_type = types[0] if types[0] is not NoneType else types[1]

        return (main_type, nullable)

    def _translate_atomic_type(self, atomic_type: object) -> str:
        snowflake_type = ""
        if atomic_type is str or isinstance(atomic_type, str):
            snowflake_type = "string"
        if atomic_type is int or isinstance(atomic_type, int):
            snowflake_type = "integer"
        if atomic_type is float or isinstance(atomic_type, float):
            snowflake_type = "float"
        if atomic_type is bool or isinstance(atomic_type, bool):
            snowflake_type = "boolean"
        if not snowflake_type:
            raise RuntimeError(
                f"{atomic_type=} is not defined in this method thus can't be translated"
            )
        return snowflake_type


def create_column_definitions():
    parser = ClassParser()
    pprint(parser.parse_class(BacklogActivity), sort_dicts=False)


if __name__ == "__main__":
    create_column_definitions()

The above code outputs this:

{'fields': [{'name': 'activity_id', 'type': 'integer', 'nullable': False},
            {'name': 'type_id', 'type': 'integer', 'nullable': False},
            {'name': 'activity_type', 'type': 'string', 'nullable': False},
            {'name': 'project_id', 'type': 'integer', 'nullable': False},
            {'name': 'project_key', 'type': 'string', 'nullable': False},
            {'name': 'project_name', 'type': 'string', 'nullable': False},
            {'name': 'content', 'type': 'variant', 'nullable': False},
            {'name': 'creator_id', 'type': 'integer', 'nullable': False},
            {'name': 'creator_name', 'type': 'string', 'nullable': False},
            {'name': 'creator_email_address', 'type': 'string', 'nullable': True},
            {'name': 'creator_nulab_account_id', 'type': 'string', 'nullable': False},
            {'name': 'creator_nulab_unique_id', 'type': 'string', 'nullable': False},
            {'name': 'created_at', 'type': 'timestamp_ntz', 'nullable': False}]}

The types that Snowflake’s StructField accepts include the following:

  • string
  • binary
  • boolean
  • decimal
  • float
  • double
  • byte
  • short
  • integer
  • long
  • date
  • null
  • timestamp
  • timestamp_ltz
  • timestamp_ntz
  • array
  • map
  • variant (for objects, dictionaries and lists)
  • and more

You can check these and other accepted data types on the GitHub source page of StructField.

Now let’s put it all together and create a proper table using the dataclass metadata.

If you haven’t dropped the previously created table, drop it now:

drop table backlog_data;

code_sample_7.sql

CREATE OR REPLACE PROCEDURE backup_backlog_data(table_name VARCHAR)
RETURNS STRING NOT NULL
LANGUAGE PYTHON
RUNTIME_VERSION = '3.13'
PACKAGES = ('snowflake-snowpark-python', 'snowflake.core')
HANDLER = 'main' -- entry point to our handler
AS
$$
# Create table from dataclass metadata
from dataclasses import dataclass
from types import NoneType, UnionType
from typing import (
    Annotated,
    Any,
    Literal,
    NoReturn,
    TypedDict,
    Union,
    get_args,
    get_origin,
    get_type_hints,
)

from snowflake.snowpark.types import StructType


@dataclass(frozen=True, kw_only=True)
class BacklogActivity:
    activity_id: int
    type_id: int
    activity_type: str
    project_id: int
    project_key: str
    project_name: str
    content: dict[str, Any]
    creator_id: int
    creator_name: str
    creator_email_address: str | None
    creator_nulab_account_id: str
    creator_nulab_unique_id: str
    created_at: Annotated[str, "timestamp_ntz"]


class ColumnDefinition(TypedDict):
    name: str
    type: str
    nullable: bool


class ClassParser:
    """
    Parses the class attributes that use type hinting and generates column definitions compatible with the Snowflake Snowpark API.
    If a class attribute does not use type hinting, it is omitted from the generated column definitions.
    """

    def __init__(self):
        pass

    @property
    def snowflake_supported_types(self):
        return {
            "string",
            "integer",
            "float",
            "decimal",
            "double",
            "short",
            "long",
            "boolean",
            "variant",
            "timestamp",
            "timestamp_tz",
            "timestamp_ltz",
            "timestamp_ntz",
            # There are more supported types
        }

    @snowflake_supported_types.setter
    def snowflake_supported_types(self, value: Any) -> NoReturn:
        raise ValueError("Assigning value is not allowed")

    def parse_class(self, _class: object) -> dict[str, list[ColumnDefinition]]:
        """
        takes a class, not a class instance, and returns a dictionary of column definitions inferred from class attributes.

        class fields cannot contain more than two type hints.
        If a field contains two type hints, one of the types must be NoneType.

        To specify Snowflake specific field such as TIMESTAMP_NTZ, use Annotated type hints
        e.g.

        ```python
        class A:
            field1: Annotated[str, "timestamp_ntz"]
        ```

        """
        columns: list[ColumnDefinition] = []
        for fieldname, type_hint in get_type_hints(_class, include_extras=True).items():
            result = self._parse_type_hint(type_hint)
            datatype, nullable = result
            columns.append(
                ColumnDefinition(name=fieldname, type=datatype, nullable=nullable)
            )
        return {"fields": columns}

    def _parse_type_hint(self, type_hint: object | UnionType) -> tuple[str, bool]:
        """
        returns a tuple of (datatype, nullable): tuple[str, bool]
        """
        datatype = ""
        nullable = False

        if res := self._parse_annotated_type(type_hint):
            if res["snowflake_type"]:
                datatype = res["snowflake_type"]
                nullable = True if self._parse_union_type(res["origin_type"]) else False
                return (datatype, nullable)
            type_hint = res["origin_type"]

        # This supports generic types, Callable, Tuple, Union, Literal, Final, ClassVar,
        # Annotated, and others. Return None for unsupported types.
        # get_origin never returns Optional, instead it returns Union
        complex_type = get_origin(type_hint)
        if res := self._parse_union_type(type_hint):
            main_type, nullable = res[0], res[1]
            type_hint = main_type  # main type is any type hint except for None
            complex_type = get_origin(main_type)
        if complex_type is Literal:
            atomic_type = get_args(type_hint)[0]
            datatype = self._translate_atomic_type(atomic_type)
        if complex_type in [dict, list, TypedDict, tuple]:
            datatype = "variant"
        if not complex_type:
            datatype = self._translate_atomic_type(type_hint)
        return (datatype, nullable)

    def _parse_annotated_type(self, type_hint: Any) -> dict[str, Any] | None:
        """
        returns dict of snowflake_type:str and origin_type: Any

        returns None if given type_hint is not Annotated

        If Annotated does not contain Snowflake data types,
        it returns `{"snowflake_type": "", "origin_type": origin_type}`
        """
        if get_origin(type_hint) is not Annotated:
            return

        metadata = [
            metadata.strip().lower()
            for metadata in type_hint.__metadata__
            if isinstance(metadata, str)
        ]
        snow_types = self.snowflake_supported_types.intersection(metadata)

        if len(snow_types) > 1:
            raise ValueError(
                "Annotated cannot contain more than one snowflake data type"
            )

        snowflake_type = snow_types.pop() if snow_types else ""
        origin_type = type_hint.__origin__
        return {"snowflake_type": snowflake_type, "origin_type": origin_type}

    def _parse_union_type(self, type_hint: Any) -> tuple[object, bool] | None:
        """
        returns None if given type_hint is not UnionType

        UnionType must contain only two types and one of these types must be NoneType.

        Raises exception if UnionType does not include NoneType.
        """
        types = get_args(type_hint)
        nullable = True
        if get_origin(type_hint) not in [Union, UnionType] or len(types) < 2:
            return
        if len(types) > 2:
            raise ValueError("Union type cannot contain more than two types")
        if NoneType not in types:
            raise ValueError("Union type must include NoneType")
        main_type = types[0] if types[0] is not NoneType else types[1]

        return (main_type, nullable)

    def _translate_atomic_type(self, atomic_type: object) -> str:
        snowflake_type = ""
        if atomic_type is str or isinstance(atomic_type, str):
            snowflake_type = "string"
        if atomic_type is int or isinstance(atomic_type, int):
            snowflake_type = "integer"
        if atomic_type is float or isinstance(atomic_type, float):
            snowflake_type = "float"
        if atomic_type is bool or isinstance(atomic_type, bool):
            snowflake_type = "boolean"
        if not snowflake_type:
            raise RuntimeError(
                f"{atomic_type=} is not defined in this method thus can't be translated"
            )
        return snowflake_type


def create_table_definition(_class: object) -> StructType:
    parser = ClassParser()
    column_definitions = parser.parse_class(_class)
    table_definition: StructType = StructType.from_json(column_definitions)
    return table_definition


def create_or_replace_table(session, table_name: str, table_definition: StructType):
    dataframe = session.create_dataframe([], schema=table_definition)
    if session.catalog.tableExists(table_name):
        dataframe.write.mode("overwrite").save_as_table(table_name)
        return
    # creates the table if it does not already exist;
    # otherwise, simply ignores this operation
    dataframe.write.mode("ignore").save_as_table(table_name)


def create_table(session, table_name: str, table_definition: StructType):
    dataframe = session.create_dataframe([], schema=table_definition)
    # Throws an exception if the table already exists.
    dataframe.write.mode("errorifexists").save_as_table(table_name)


def main(snowflake_session, table_name: str):
    session = snowflake_session
    table_definition = create_table_definition(BacklogActivity)

    if not session.catalog.tableExists(table_name):
        create_table(session, table_name, table_definition)

    return f"table {table_name} has been created"

$$;

call backup_backlog_data('backlog_data');

select * from backlog_data;

Inserting Data

The table has been created. Now let’s learn how to insert data into it. Snowflake procedures cannot access the internet by default; to enable internet access we need to create an external access integration. But before that, let’s just create an insert_data function and test it with sample data.

First, let’s prepare the sample data:

code_sample_8.py

from dataclasses import dataclass, asdict
from typing import Annotated, Any

@dataclass(frozen=True, kw_only=True)
class BacklogActivity:
    activity_id: int
    type_id: int
    activity_type: str
    project_id: int
    project_key: str
    project_name: str
    content: dict[str, Any]
    creator_id: int
    creator_name: str
    creator_email_address: str | None
    creator_nulab_account_id: str
    creator_nulab_unique_id: str
    created_at: Annotated[str, "timestamp_ntz"]


def create_sample_data() -> list[BacklogActivity]:
    data1 = BacklogActivity(
        activity_id=630,
        type_id=2,
        activity_type="Issue Updated",
        project_id=61,
        project_key="MY_IT_006",
        project_name="XYZ",
        content={
            "id": 200,
            "key_id": 36,
            "summary": "loreum",
            "description": "lorem ipsum",
            "comment": {"id": 76, "content": "Great"},
            "changes": [
                {
                    "field": "description",
                    "field_text": "Description",
                    "new_value": "description version 2",
                    "old_value": "description version 1",
                    "type": "standard",
                }
            ],
            "attachments": [],
            "shared_files": [],
            "external_file_links": [],
        },
        created_at="2026-01-18T06:22:30Z",
        creator_id=80,
        creator_name="John Doe",
        creator_email_address=None,
        creator_nulab_account_id="xyzt",
        creator_nulab_unique_id="JohnDoe007",
    )
    data2 = BacklogActivity(
        activity_id=631,  # <<--- different activity_id
        type_id=2,
        activity_type="Issue Updated",
        project_id=11,
        project_key="MY_IT_006",
        project_name="XYZ",
        content={
            "id": 200,
            "key_id": 36,
            "summary": "loreum",
            "description": "lorem ipsum",
            "comment": {"id": 76, "content": "Great"},
            "changes": [
                {
                    "field": "description",
                    "field_text": "Description",
                    "new_value": "description version 2",
                    "old_value": "description version 1",
                    "type": "standard",
                }
            ],
            "attachments": [],
            "shared_files": [],
            "external_file_links": [],
        },
        created_at="2026-01-19T06:22:30Z",
        creator_id=80,
        creator_name="John Doe",
        creator_email_address=None,
        creator_nulab_account_id="xyzt",
        creator_nulab_unique_id="JohnDoe007",
    )
    return [data1, data2]

def main():
    sample_data = create_sample_data()
    values = [list(asdict(data).values()) for data in sample_data]
    print(values)

if __name__ == "__main__":
    main()

output:

[
    [630, 2, 'Issue Updated', 61, 'MY_IT_006', 'XYZ', {'id': 200, 'key_id': 36, 'summary': 'loreum', 'description': 'lorem ipsum', 'comment': {'id': 76, 'content': 'Great'}, 'changes': [{'field': 'description', 'field_text': 'Description', 'new_value': 'description version 2', 'old_value': 'description version 1', 'type': 'standard'}], 'attachments': [], 'shared_files': [], 'external_file_links': []}, 80, 'John Doe', None, 'xyzt', 'JohnDoe007', '2026-01-18T06:22:30Z'],
    [631, 2, 'Issue Updated', 11, 'MY_IT_006', 'XYZ', {'id': 200, 'key_id': 36, 'summary': 'loreum', 'description': 'lorem ipsum', 'comment': {'id': 76, 'content': 'Great'}, 'changes': [{'field': 'description', 'field_text': 'Description', 'new_value': 'description version 2', 'old_value': 'description version 1', 'type': 'standard'}], 'attachments': [], 'shared_files': [], 'external_file_links': []}, 80, 'John Doe', None, 'xyzt', 'JohnDoe007', '2026-01-19T06:22:30Z']
]

Now let’s insert the sample data into our table. Note that asdict preserves the dataclass field order, so each row’s values line up with the column order of our table definition:

code_sample_9.sql

CREATE OR REPLACE PROCEDURE backup_backlog_data(table_name VARCHAR)
RETURNS STRING NOT NULL
LANGUAGE PYTHON
RUNTIME_VERSION = '3.13'
PACKAGES = ('snowflake-snowpark-python', 'snowflake.core')
HANDLER = 'main' -- entry point to our handler
AS
$$
from dataclasses import dataclass, asdict
from types import NoneType, UnionType
from typing import (
    Annotated,
    Any,
    Literal,
    NoReturn,
    TypedDict,
    Union,
    get_args,
    get_origin,
    get_type_hints,
)

from snowflake.snowpark.types import StructType

# insert data into table

@dataclass(frozen=True, kw_only=True)
class BacklogActivity:
    activity_id: int
    type_id: int
    activity_type: str
    project_id: int
    project_key: str
    project_name: str
    content: dict[str, Any]
    creator_id: int
    creator_name: str
    creator_email_address: str | None
    creator_nulab_account_id: str
    creator_nulab_unique_id: str
    created_at: Annotated[str, "timestamp_ntz"]


def create_sample_data() -> list[BacklogActivity]:
    data1 = BacklogActivity(
        activity_id=630,
        type_id=2,
        activity_type="Issue Updated",
        project_id=61,
        project_key="MY_IT_006",
        project_name="XYZ",
        content={
            "id": 200,
            "key_id": 36,
            "summary": "loreum",
            "description": "lorem ipsum",
            "comment": {"id": 76, "content": "Great"},
            "changes": [
                {
                    "field": "description",
                    "field_text": "Description",
                    "new_value": "description version 2",
                    "old_value": "description version 1",
                    "type": "standard",
                }
            ],
            "attachments": [],
            "shared_files": [],
            "external_file_links": [],
        },
        created_at="2026-01-18T06:22:30Z",
        creator_id=80,
        creator_name="John Doe",
        creator_email_address=None,
        creator_nulab_account_id="xyzt",
        creator_nulab_unique_id="JohnDoe007",
    )
    data2 = BacklogActivity(
        activity_id=631,  # <<--- different activity_id
        type_id=2,
        activity_type="Issue Updated",
        project_id=11,
        project_key="MY_IT_006",
        project_name="XYZ",
        content={
            "id": 200,
            "key_id": 36,
            "summary": "loreum",
            "description": "lorem ipsum",
            "comment": {"id": 76, "content": "Great"},
            "changes": [
                {
                    "field": "description",
                    "field_text": "Description",
                    "new_value": "description version 2",
                    "old_value": "description version 1",
                    "type": "standard",
                }
            ],
            "attachments": [],
            "shared_files": [],
            "external_file_links": [],
        },
        created_at="2026-01-19T06:22:30Z",
        creator_id=80,
        creator_name="John Doe",
        creator_email_address=None,
        creator_nulab_account_id="xyzt",
        creator_nulab_unique_id="JohnDoe007",
    )
    return [data1, data2]


# Create table definition from dataclass metadata


class ColumnDefinition(TypedDict):
    name: str
    type: str
    nullable: bool


class ClassParser:
    """
    Parses the class attributes that use type hinting and generates column definitions compatible with the Snowflake Snowpark API.
    If a class attribute does not use type hinting, it is omitted from the generated column definitions.
    """

    def __init__(self):
        pass

    @property
    def snowflake_supported_types(self):
        return {
            "string",
            "integer",
            "float",
            "decimal",
            "double",
            "short",
            "long",
            "boolean",
            "variant",
            "timestamp",
            "timestamp_tz",
            "timestamp_ltz",
            "timestamp_ntz",
            # There are more supported types
        }

    @snowflake_supported_types.setter
    def snowflake_supported_types(self, value: Any) -> NoReturn:
        raise ValueError("Assigning value is not allowed")

    def parse_class(self, _class: object) -> dict[str, list[ColumnDefinition]]:
        """
        takes a class, not a class instance, and returns a dictionary of column definitions inferred from class attributes.

        class fields cannot contain more than two type hints.
        If a field contains two type hints, one of the types must be NoneType.

        To specify Snowflake specific field such as TIMESTAMP_NTZ, use Annotated type hints
        e.g.

        ```python
        class A:
            field1: Annotated[str, "timestamp_ntz"]
        ```

        """
        columns: list[ColumnDefinition] = []
        for fieldname, type_hint in get_type_hints(_class, include_extras=True).items():
            result = self._parse_type_hint(type_hint)
            datatype, nullable = result
            columns.append(
                ColumnDefinition(name=fieldname, type=datatype, nullable=nullable)
            )
        return {"fields": columns}

    def _parse_type_hint(self, type_hint: object | UnionType) -> tuple[str, bool]:
        """
        returns a tuple of (datatype, nullable): tuple[str, bool]
        """
        datatype = ""
        nullable = False

        if res := self._parse_annotated_type(type_hint):
            if res["snowflake_type"]:
                datatype = res["snowflake_type"]
                nullable = True if self._parse_union_type(res["origin_type"]) else False
                return (datatype, nullable)
            type_hint = res["origin_type"]

        # This supports generic types, Callable, Tuple, Union, Literal, Final, ClassVar,
        # Annotated, and others. Return None for unsupported types.
        # get_origin never returns Optional, instead it returns Union
        complex_type = get_origin(type_hint)
        if res := self._parse_union_type(type_hint):
            main_type, nullable = res[0], res[1]
            type_hint = main_type  # main type is any type hint except for None
            complex_type = get_origin(main_type)
        if complex_type is Literal:
            atomic_type = get_args(type_hint)[0]
            datatype = self._translate_atomic_type(atomic_type)
        if complex_type in [dict, list, TypedDict, tuple]:
            datatype = "variant"
        if not complex_type:
            datatype = self._translate_atomic_type(type_hint)
        return (datatype, nullable)

    def _parse_annotated_type(self, type_hint: Any) -> dict[str, Any] | None:
        """
        returns dict of snowflake_type:str and origin_type: Any

        returns None if given type_hint is not Annotated

        If Annotated does not contain Snowflake data types,
        it returns `{"snowflake_type": "", "origin_type": origin_type}`
        """
        if get_origin(type_hint) is not Annotated:
            return

        metadata = [
            metadata.strip().lower()
            for metadata in type_hint.__metadata__
            if isinstance(metadata, str)
        ]
        snow_types = self.snowflake_supported_types.intersection(metadata)

        if len(snow_types) > 1:
            raise ValueError(
                "Annotated cannot contain more than one snowflake data type"
            )

        snowflake_type = snow_types.pop() if snow_types else ""
        origin_type = type_hint.__origin__
        return {"snowflake_type": snowflake_type, "origin_type": origin_type}

    def _parse_union_type(self, type_hint: Any) -> tuple[object, bool] | None:
        """
        returns None if given type_hint is not UnionType

        UnionType must contain only two types and one of these types must be NoneType.

        Raises exception if UnionType does not include NoneType.
        """
        types = get_args(type_hint)
        nullable = True
        if get_origin(type_hint) not in [Union, UnionType] or len(types) < 2:
            return
        if len(types) > 2:
            raise ValueError("Union type cannot contain more than two types")
        if NoneType not in types:
            raise ValueError("Union type must include NoneType")
        main_type = types[0] if types[0] is not NoneType else types[1]

        return (main_type, nullable)

    def _translate_atomic_type(self, atomic_type: object) -> str:
        snowflake_type = ""
        if atomic_type is str or isinstance(atomic_type, str):
            snowflake_type = "string"
        if atomic_type is int or isinstance(atomic_type, int):
            snowflake_type = "integer"
        if atomic_type is float or isinstance(atomic_type, float):
            snowflake_type = "float"
        if atomic_type is bool or isinstance(atomic_type, bool):
            snowflake_type = "boolean"
        if not snowflake_type:
            raise RuntimeError(
                f"{atomic_type=} is not defined in this method thus can't be translated"
            )
        return snowflake_type


def create_table_definition(_class: object) -> StructType:
    parser = ClassParser()
    column_definitions = parser.parse_class(_class)
    table_definition: StructType = StructType.from_json(column_definitions)
    return table_definition


def create_or_replace_table(session, table_name: str, table_definition: StructType):
    dataframe = session.create_dataframe([], schema=table_definition)
    if session.catalog.tableExists(table_name):
        dataframe.write.mode("overwrite").save_as_table(table_name)
        return
    # creates the table if it does not already exist;
    # otherwise, simply ignores this operation
    dataframe.write.mode("ignore").save_as_table(table_name)


def create_table(session, table_name: str, table_definition: StructType):
    dataframe = session.create_dataframe([], schema=table_definition)
    # Throws an exception if the table already exists.
    dataframe.write.mode("errorifexists").save_as_table(table_name)


def insert_data(
    session, table_name: str, table_definition: StructType, data: list[list[Any]]
):
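    # Build a dataframe from the rows using our table definition, then append it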
    dataframe = session.create_dataframe(data, schema=table_definition)
    dataframe.write.mode("append").save_as_table(table_name)


def main(snowflake_session, table_name: str):
    session = snowflake_session
    table_definition = create_table_definition(BacklogActivity)

    if not session.catalog.tableExists(table_name):
        create_table(session, table_name, table_definition)

    sample_data = create_sample_data()
    values = [list(asdict(data).values()) for data in sample_data]
    insert_data(session, table_name, table_definition, values)
    return f"data has been inserted into {table_name}"

$$;

call backup_backlog_data('backlog_data');

select * from backlog_data;

We have confirmed that our stored procedure creates the table if it doesn’t exist and inserts data into it. Now we need to implement fetching data from the Backlog API.

The append mode inserts data into a table and creates the table if it does not exist. In essence, we don’t have to check for the table’s existence, nor create it separately. But for modularity and readability I am leaving the code as it is.
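In other words, the whole handler could shrink to a few lines. A minimal sketch, reusing the helpers defined above:

def main(snowflake_session, table_name: str):
    table_definition = create_table_definition(BacklogActivity)
    sample_data = create_sample_data()
    values = [list(asdict(data).values()) for data in sample_data]
    dataframe = snowflake_session.create_dataframe(values, schema=table_definition)
    # "append" inserts the rows and creates the table when it is missing
    dataframe.write.mode("append").save_as_table(table_name)
    return f"data has been inserted into {table_name}"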

This post has already become too long. Let’s finish the final piece of our project in the next post.
