This tutorial has four parts in total.
We will continue from where we left off in part three of this tutorial.
It is time to give our stored procedure the freedom to access the internet. By default, Snowflake does not allow stored procedures to access the internet. We must enable it by creating an external access integration and attaching it to our stored procedure.
External Access Integration
An external access integration is a Snowflake object. It gives stored procedures and user-defined functions granular access to the internet. The integration itself is a container that holds network rules and secrets.
Network rules and secrets are the building blocks of external access control. The ability to combine network rules (and secrets) is what makes external access control flexible and granular.
Network Rule
A network rule is a Snowflake object. It describes an external network location and the restrictions for accessing it. We can specify which domains, IP addresses, or IP address ranges are allowed or disallowed, and whether the communication is ingress (coming from the external source) or egress (going to the external source).
Let’s create our network rule:
code_sample_10.sql
CREATE NETWORK RULE external_access_network_rule_for_backlog
TYPE = HOST_PORT
MODE = EGRESS
VALUE_LIST = ('*.backlog.com:0')
COMMENT = 'Network rule for Backlog REST API endpoint';
In the domain, we can use a wildcard (*) for the subdomain. For example, instead of myorg.backlog.com, we can define *.backlog.com.
Domains can also include a port or a range of ports. If we do not specify a port, it defaults to 443. To allow access to all ports, we can define the port as 0, for example my-website.com:0.
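As a sketch (the rule name and my-website.com below are placeholders, not objects used in this tutorial), a rule that allows egress only to specific ports could look like this:
CREATE OR REPLACE NETWORK RULE example_specific_ports_rule
TYPE = HOST_PORT
MODE = EGRESS
VALUE_LIST = ('my-website.com:443', 'my-website.com:8443')
COMMENT = 'Example only: allow egress to two specific ports';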
Secret
Instead of hard-coding API keys, we can use Snowflake's secrets feature. It is similar to GitHub Secrets or AWS Secrets Manager, if you have used either of them. Once a secret is created, we can access it from our stored procedure.
Snowflake requires us to attach the secret to the external access integration object if the external network specified by the network rule needs a password or API key for authentication.
code_sample_10.sql
CREATE OR REPLACE SECRET backlog_api_key
TYPE = GENERIC_STRING
SECRET_STRING = 'backlog_api_key_xyz';
Creating the External Access Integration
We have created a network rule and a secret: all the necessary components for an external access integration. Finally, we can create the integration itself.
code_sample_10.sql
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION backlog_api_access_integration
ALLOWED_NETWORK_RULES = (external_access_network_rule_for_backlog)
ALLOWED_AUTHENTICATION_SECRETS = (backlog_api_key)
ENABLED = true;
Fetching Data From Backlog
We have created the external access integration. Now we can finally access the internet. Let's try it out.
code_sample_11.sql
Create or replace procedure fetch_data_from_backlog()
returns varchar
language python
runtime_version = '3.13'
secrets = ('api_key' = backlog_api_key)
external_access_integrations = (backlog_api_access_integration)
packages = ('snowflake-snowpark-python','requests')
handler = 'main'
as
$$
import requests
from _snowflake import get_generic_secret_string
def main(session):
backlog_api_key = get_generic_secret_string('api_key')
endpoint = "https://<my-org>.backlog.com/api/v2/space/activities"
with requests.Session() as s:
response = s.get(url=endpoint, params = {"apiKey": backlog_api_key, "count": 2})
if response.status_code != 200:
return response.text
        # return the parsed JSON as a string, since the procedure returns varchar
        return str(response.json())
$$;
call fetch_data_from_backlog();
-- drop procedure fetch_data_from_backlog();
It is possible to attach more than one secret and external access integration to a stored procedure.
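For example, the header of a procedure that talks to two APIs might look like the sketch below; other_api_access_integration and other_api_key are made-up names used only for illustration:
CREATE OR REPLACE PROCEDURE fetch_from_two_apis()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = '3.13'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'main'
EXTERNAL_ACCESS_INTEGRATIONS = (backlog_api_access_integration, other_api_access_integration)
SECRETS = ('backlog_key' = backlog_api_key, 'other_key' = other_api_key)
AS
$$
from _snowflake import get_generic_secret_string

def main(session):
    # each secret is read by the name given to it in the SECRETS clause
    backlog_key = get_generic_secret_string('backlog_key')
    other_key = get_generic_secret_string('other_key')
    return "both secrets are available to this procedure"
$$;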
Inserting Fetched Data
Now, let's insert the fetched Backlog data into our table.
code_sample_12.sql
CREATE OR REPLACE PROCEDURE backup_backlog_data(table_name VARCHAR)
RETURNS STRING NOT NULL
LANGUAGE PYTHON
RUNTIME_VERSION = '3.13'
PACKAGES = ('snowflake-snowpark-python', 'snowflake.core', 'requests')
HANDLER = 'main' -- entry point to our handler
EXTERNAL_ACCESS_INTEGRATIONS= (backlog_api_access_integration)
SECRETS = ('backlog_key'= backlog_api_key)
AS
$$
from dataclasses import dataclass, asdict
from types import NoneType, UnionType
from typing import (
Annotated,
Any,
Literal,
NoReturn,
TypedDict,
Union,
get_args,
get_origin,
get_type_hints,
)
import requests
from snowflake.snowpark.types import StructType
from _snowflake import get_generic_secret_string
# insert data into table
@dataclass(frozen=True, kw_only=True)
class BacklogActivity:
activity_id: int
    activity_type_id: int
activity_type: str
project_id: int
project_key: str
project_name: str
content: dict[str, Any]
creator_id: int
creator_name: str
creator_email_address: str | None
creator_nulab_account_id: str
creator_nulab_unique_id: str
created_at: Annotated[str, "timestamp_ntz"]
# ----------- fetch backlog data --------------
@dataclass(frozen=True)
class BacklogApi:
key = get_generic_secret_string("backlog_key")
subdomain = "subdomain"
base_url = f"https://{subdomain}.backlog.com/api/v2"
recent_activities: str = f"{base_url}/space/activities"
issues: str = f"{base_url}/issues"
def parse_activity_type(*, activity_type_id: int) -> str:
activity = {
1: "Issue Created",
2: "Issue Updated",
3: "Issue Commented",
4: "Issue Deleted",
5: "Wiki Created",
6: "Wiki Updated",
7: "Wiki Deleted",
8: "File Added",
9: "File Updated",
10: "File Deleted",
11: "SVN Committed",
12: "Git Pushed",
13: "Git Repository Created",
14: "Issue Multi Updated",
15: "Project User Added",
16: "Project User Deleted",
17: "Comment Notification Added",
18: "Pull Request Added",
19: "Pull Request Updated",
20: "Comment Added on Pull Request",
21: "Pull Request Deleted",
22: "Milestone Created",
23: "Milestone Updated",
24: "Milestone Deleted",
25: "Project Group Added",
26: "Project Group Deleted",
}
return activity.get(activity_type_id, "Unknown activity_type_id")
def fetch_backlog_data() -> list[BacklogActivity]:
endpoints = BacklogApi()
with requests.Session() as session:
url_parameters: dict[str, str | int] = {
"apiKey": endpoints.key,
"count": 10,
"order": "desc",
}
response = session.get(
url=endpoints.recent_activities, params=url_parameters, verify=True
)
if response.status_code != 200:
raise ValueError(
f"wrong response: {response.status_code=}\n{response.text=}"
)
backlog_data: list[BacklogActivity] = []
for data in response.json():
activity_creator = data.get("createdUser")
backlog_activity = BacklogActivity(
activity_id=data["id"],
activity_type=parse_activity_type(activity_type_id=data["type"]),
activity_type_id=data["type"],
project_id=data["project"]["id"],
project_key=data["project"]["projectKey"],
project_name=data["project"]["name"],
content=data["content"],
created_at=data["created"],
creator_id=activity_creator["id"],
creator_name=activity_creator["name"],
creator_email_address=activity_creator["mailAddress"],
creator_nulab_account_id=activity_creator["nulabAccount"]["nulabId"],
creator_nulab_unique_id=activity_creator["nulabAccount"]["uniqueId"],
)
backlog_data.append(backlog_activity)
return backlog_data
# ---- Create table definition from dataclass metadata -------
class ColumnDefinition(TypedDict):
name: str
type: str
nullable: bool
class ClassParser:
"""
Parses the class attributes that use type hinting and generates column definitions compatible with Snowflake Snowpark API.
If class attribute does not use type hinting, it is omitted from generated column definitions.
"""
def __init__(self):
pass
@property
def snowflake_supported_types(self):
return {
"string",
"integer",
"float",
"decimal",
"double",
"short",
"long",
"boolean",
"variant",
"timestamp",
"timestamp_tz",
"timestamp_ltz",
"timestamp_ntz",
# There are more supported types
}
@snowflake_supported_types.setter
def snowflake_supported_types(self, value: Any) -> NoReturn:
raise ValueError("Assigning value is not allowed")
def parse_class(self, _class: object) -> dict[str, list[ColumnDefinition]]:
"""
takes a class, not a class instance and returns a dictionary of column definition inferred from class attributes.
class fields cannot contain more than two type hints.
If a field contains two type hints, one of the types must be NoneType.
To specify Snowflake specific field such as TIMESTAMP_NTZ, use Annotated type hints
e.g.
/```python
class A:
field1: Annotated[str, "timestamp_ntz"]
/```
"""
columns: list[ColumnDefinition] = []
for fieldname, type_hint in get_type_hints(_class, include_extras=True).items():
result = self._parse_type_hint(type_hint)
datatype, nullable = result
columns.append(
ColumnDefinition(name=fieldname, type=datatype, nullable=nullable)
)
return {"fields": columns}
def _parse_type_hint(self, type_hint: object | UnionType) -> tuple[str, bool]:
"""
returns a tuple of (datatype, nullable): tuple[str, bool]
"""
datatype = ""
nullable = False
if res := self._parse_annotated_type(type_hint):
if res["snowflake_type"]:
datatype = res["snowflake_type"]
nullable = True if self._parse_union_type(res["origin_type"]) else False
return (datatype, nullable)
type_hint = res["origin_type"]
# This supports generic types, Callable, Tuple, Union, Literal, Final, ClassVar,
# Annotated, and others. Return None for unsupported types.
# get_origin never returns Optional, instead it returns Union
complex_type = get_origin(type_hint)
if res := self._parse_union_type(type_hint):
main_type, nullable = res[0], res[1]
type_hint = main_type # main type is any type hint except for None
complex_type = get_origin(main_type)
if complex_type is Literal:
atomic_type = get_args(type_hint)[0]
datatype = self._translate_atomic_type(atomic_type)
if complex_type in [dict, list, TypedDict, tuple]:
datatype = "variant"
if not complex_type:
datatype = self._translate_atomic_type(type_hint)
return (datatype, nullable)
def _parse_annotated_type(self, type_hint: Any) -> dict[str, Any] | None:
"""
returns dict of snowflake_type:str and origin_type: Any
returns None if given type_hint is not Annotated
If Annotated does not contain Snowflake data types,
it returns `{"snowflake_type": "", "original_type": origin_type}`
"""
if get_origin(type_hint) is not Annotated:
return
metadata = [
metadata.strip().lower()
for metadata in type_hint.__metadata__
if isinstance(metadata, str)
]
snow_types = self.snowflake_supported_types.intersection(metadata)
if len(snow_types) > 1:
raise ValueError(
"Annotated cannot contain more than one snowflake data type"
)
snowflake_type = snow_types.pop() if snow_types else ""
origin_type = type_hint.__origin__
return {"snowflake_type": snowflake_type, "origin_type": origin_type}
def _parse_union_type(self, type_hint: Any) -> tuple[object, bool] | None:
"""
returns None if given type_hint is not UnionType
UnionType must contain only two types and one of these types must be NoneType
"""
types = get_args(type_hint)
nullable = True
if get_origin(type_hint) not in [Union, UnionType] or len(types) < 2:
return
if len(types) > 2:
raise ValueError("Union type cannot contain more than two types")
if NoneType not in types:
raise ValueError("Union type must include NoneType")
main_type = types[0] if types[0] is not NoneType else types[1]
return (main_type, nullable)
def _translate_atomic_type(self, atomic_type: object) -> str:
snowflake_type = ""
if atomic_type is str or isinstance(atomic_type, str):
snowflake_type = "string"
if atomic_type is int or isinstance(atomic_type, int):
snowflake_type = "integer"
if atomic_type is float or isinstance(atomic_type, float):
snowflake_type = "float"
if atomic_type is bool or isinstance(atomic_type, bool):
snowflake_type = "boolean"
if not snowflake_type:
raise RuntimeError(
f"{atomic_type=} is not defined in this method thus can't be translated"
)
return snowflake_type
def create_table_definition(_class: object) -> StructType:
parser = ClassParser()
column_definitions = parser.parse_class(_class)
table_definition: StructType = StructType.from_json(column_definitions)
return table_definition
def create_or_replace_table(session, table_name: str, table_definition: StructType):
dataframe = session.create_dataframe([], schema=table_definition)
if session.catalog.tableExists(table_name):
dataframe.write.mode("overwrite").save_as_table(table_name)
return
    # creates the table if it does not already exist;
    # otherwise, simply ignores this operation
dataframe.write.mode("ignore").save_as_table(table_name)
def create_table(session, table_name: str, table_definition: StructType):
dataframe = session.create_dataframe([], schema=table_definition)
# Throws an exception if the table already exists.
dataframe.write.mode("errorifexists").save_as_table(table_name)
def insert_data(
session, table_name: str, table_definition: StructType, data: list[list[Any]]
):
dataframe = session.create_dataframe(data, schema=table_definition)
dataframe.write.mode("append").save_as_table(table_name)
def main(snowflake_session, table_name: str):
session = snowflake_session
table_definition = create_table_definition(BacklogActivity)
if not session.catalog.tableExists(table_name):
create_table(session, table_name, table_definition)
backlog_data = fetch_backlog_data()
values = [list(asdict(data).values()) for data in backlog_data]
insert_data(session, table_name, table_definition, values)
return f"data has been inserted into {table_name}"
$$;
call backup_backlog_data('backlog_data');
-- select * from backlog_data;
Fetching Fresh Data
Now we can fetch data from Backlog and insert it into our table. Depending on how frequently we run the stored procedure, there is a chance of fetching already-recorded data. To avoid that, we should get the latest recorded activity_id from the table and provide it as the minId parameter to the Backlog API.
Getting the latest recorded activity_id from the Snowflake table:
code_sample_13.sql
CREATE OR REPLACE PROCEDURE get_latest_record(table_name VARCHAR, column_name VARCHAR)
RETURNS INTEGER
LANGUAGE PYTHON
RUNTIME_VERSION = '3.13'
PACKAGES = ('snowflake-snowpark-python', 'snowflake.core')
HANDLER = 'main' -- entry point to our handler
AS
$$
from typing import Any
import snowflake.snowpark.functions as f
def main(session, table_name: str, column_name: str) -> int | None:
column_name = column_name.strip().upper()
dframe_backlog_data = session.table(table_name)
if column_name not in dframe_backlog_data.columns:
raise ValueError(
f"{column_name} is not defined in the given table {table_name}"
)
column_alias = f"LAST_{column_name}"
result: list[Any] = dframe_backlog_data.agg(
f.max(column_name).alias(column_alias)
).collect()
# returns none if there are no records
return result[0][column_alias]
$$;
call get_latest_record('backlog_data', 'activity_id');
-- drop procedure get_latest_record(varchar, varchar);
session.table(name=table_name) returns a DataFrame that points to the underlying Snowflake table.
The snowflake.snowpark.functions module provides SQL functions (aggregations, sorting, and other utilities) that generate Column expressions you can pass to DataFrame transformation methods.
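Under the hood, the aggregation above produces the same result as running a plain SQL query against the backlog_data table we created:
SELECT MAX(ACTIVITY_ID) AS LAST_ACTIVITY_ID FROM backlog_data;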
Putting Everything Together
Now we can put everything together.
code_sample_14.sql
CREATE OR REPLACE PROCEDURE backup_backlog_data(table_name VARCHAR)
RETURNS STRING NOT NULL
LANGUAGE PYTHON
RUNTIME_VERSION = '3.13'
PACKAGES = ('snowflake-snowpark-python', 'snowflake.core', 'requests')
HANDLER = 'main' -- entry point to our handler
EXTERNAL_ACCESS_INTEGRATIONS= (backlog_api_access_integration)
SECRETS = ('backlog_key'= backlog_api_key)
AS
$$
from dataclasses import dataclass, asdict
from types import NoneType, UnionType
from typing import (
Annotated,
Any,
Literal,
NoReturn,
TypedDict,
Union,
get_args,
get_origin,
get_type_hints,
)
import requests
from _snowflake import get_generic_secret_string
from snowflake.snowpark.types import StructType
import snowflake.snowpark.functions as f
# insert data into table
@dataclass(frozen=True, kw_only=True)
class BacklogActivity:
activity_id: int
activity_type_id: int
activity_type: str
project_id: int
project_key: str
project_name: str
content: dict[str, Any]
creator_id: int
creator_name: str
creator_email_address: str | None
creator_nulab_account_id: str
creator_nulab_unique_id: str
created_at: Annotated[str, "timestamp_ntz"]
# ----------- fetch backlog data --------------
@dataclass(frozen=True)
class BacklogApi:
key = get_generic_secret_string("backlog_key")
    subdomain = "my-org"  # replace with your Backlog subdomain
    base_url = f"https://{subdomain}.backlog.com/api/v2"
recent_activities: str = f"{base_url}/space/activities"
issues: str = f"{base_url}/issues"
def parse_activity_type(*, activity_type_id: int) -> str:
activity = {
1: "Issue Created",
2: "Issue Updated",
3: "Issue Commented",
4: "Issue Deleted",
5: "Wiki Created",
6: "Wiki Updated",
7: "Wiki Deleted",
8: "File Added",
9: "File Updated",
10: "File Deleted",
11: "SVN Committed",
12: "Git Pushed",
13: "Git Repository Created",
14: "Issue Multi Updated",
15: "Project User Added",
16: "Project User Deleted",
17: "Comment Notification Added",
18: "Pull Request Added",
19: "Pull Request Updated",
20: "Comment Added on Pull Request",
21: "Pull Request Deleted",
22: "Milestone Created",
23: "Milestone Updated",
24: "Milestone Deleted",
25: "Project Group Added",
26: "Project Group Deleted",
}
return activity.get(activity_type_id, "Unknown activity_type_id")
def fetch_backlog_data(last_activity_id: int) -> list[BacklogActivity]:
endpoints = BacklogApi()
with requests.Session() as session:
url_parameters: dict[str, str | int] = {
"apiKey": endpoints.key,
"count": 100,
"minId": last_activity_id,
"order": "desc"
}
response = session.get(
url=endpoints.recent_activities, params=url_parameters, verify=True
)
if response.status_code != 200:
raise ValueError(
f"wrong response: {response.status_code=}\n{response.text=}"
)
backlog_data: list[BacklogActivity] = []
for data in response.json():
activity_creator = data.get("createdUser")
backlog_activity = BacklogActivity(
activity_id=data["id"],
activity_type_id=data["type"],
activity_type=parse_activity_type(activity_type_id=data["type"]),
project_id=data["project"]["id"],
project_key=data["project"]["projectKey"],
project_name=data["project"]["name"],
content=data["content"],
created_at=data["created"],
creator_id=activity_creator["id"],
creator_name=activity_creator["name"],
creator_email_address=activity_creator["mailAddress"],
creator_nulab_account_id=activity_creator["nulabAccount"]["nulabId"],
creator_nulab_unique_id=activity_creator["nulabAccount"]["uniqueId"],
)
backlog_data.append(backlog_activity)
return backlog_data
# ---- Create table definition from dataclass metadata -------
class ColumnDefinition(TypedDict):
name: str
type: str
nullable: bool
class ClassParser:
"""
Parses the class attributes that use type hinting and generates column definitions compatible with Snowflake Snowpark API.
If class attribute does not use type hinting, it is omitted from generated column definitions.
"""
def __init__(self):
pass
@property
def snowflake_supported_types(self):
return {
"string",
"integer",
"float",
"decimal",
"double",
"short",
"long",
"boolean",
"variant",
"timestamp",
"timestamp_tz",
"timestamp_ltz",
"timestamp_ntz",
# There are more supported types
}
@snowflake_supported_types.setter
def snowflake_supported_types(self, value: Any) -> NoReturn:
raise ValueError("Assigning value is not allowed")
def parse_class(self, _class: object) -> dict[str, list[ColumnDefinition]]:
"""
takes a class, not a class instance and returns a dictionary of column definition inferred from class attributes.
class fields cannot contain more than two type hints.
If a field contains two type hints, one of the types must be NoneType.
To specify Snowflake specific field such as TIMESTAMP_NTZ, use Annotated type hints
e.g.
/```python
class A:
field1: Annotated[str, "timestamp_ntz"]
/```
"""
columns: list[ColumnDefinition] = []
for fieldname, type_hint in get_type_hints(_class, include_extras=True).items():
result = self._parse_type_hint(type_hint)
datatype, nullable = result
columns.append(
ColumnDefinition(name=fieldname, type=datatype, nullable=nullable)
)
return {"fields": columns}
def _parse_type_hint(self, type_hint: object | UnionType) -> tuple[str, bool]:
"""
returns a tuple of (datatype, nullable): tuple[str, bool]
"""
datatype = ""
nullable = False
if res := self._parse_annotated_type(type_hint):
if res["snowflake_type"]:
datatype = res["snowflake_type"]
nullable = True if self._parse_union_type(res["origin_type"]) else False
return (datatype, nullable)
type_hint = res["origin_type"]
# This supports generic types, Callable, Tuple, Union, Literal, Final, ClassVar,
# Annotated, and others. Return None for unsupported types.
# get_origin never returns Optional, instead it returns Union
complex_type = get_origin(type_hint)
if res := self._parse_union_type(type_hint):
main_type, nullable = res[0], res[1]
type_hint = main_type # main type is any type hint except for None
complex_type = get_origin(main_type)
if complex_type is Literal:
atomic_type = get_args(type_hint)[0]
datatype = self._translate_atomic_type(atomic_type)
if complex_type in [dict, list, TypedDict, tuple]:
datatype = "variant"
if not complex_type:
datatype = self._translate_atomic_type(type_hint)
return (datatype, nullable)
def _parse_annotated_type(self, type_hint: Any) -> dict[str, Any] | None:
"""
returns dict of snowflake_type:str and origin_type: Any
returns None if given type_hint is not Annotated
If Annotated does not contain Snowflake data types,
it returns `{"snowflake_type": "", "original_type": origin_type}`
"""
if get_origin(type_hint) is not Annotated:
return
metadata = [
metadata.strip().lower()
for metadata in type_hint.__metadata__
if isinstance(metadata, str)
]
snow_types = self.snowflake_supported_types.intersection(metadata)
if len(snow_types) > 1:
raise ValueError(
"Annotated cannot contain more than one snowflake data type"
)
snowflake_type = snow_types.pop() if snow_types else ""
origin_type = type_hint.__origin__
return {"snowflake_type": snowflake_type, "origin_type": origin_type}
def _parse_union_type(self, type_hint: Any) -> tuple[object, bool] | None:
"""
returns None if given type_hint is not UnionType
UnionType must contain only two types and one of these types must be NoneType
"""
types = get_args(type_hint)
nullable = True
if get_origin(type_hint) not in [Union, UnionType] or len(types) < 2:
return
if len(types) > 2:
raise ValueError("Union type cannot contain more than two types")
if NoneType not in types:
raise ValueError("Union type must include NoneType")
main_type = types[0] if types[0] is not NoneType else types[1]
return (main_type, nullable)
def _translate_atomic_type(self, atomic_type: object) -> str:
snowflake_type = ""
if atomic_type is str or isinstance(atomic_type, str):
snowflake_type = "string"
if atomic_type is int or isinstance(atomic_type, int):
snowflake_type = "integer"
if atomic_type is float or isinstance(atomic_type, float):
snowflake_type = "float"
if atomic_type is bool or isinstance(atomic_type, bool):
snowflake_type = "boolean"
if not snowflake_type:
raise RuntimeError(
f"{atomic_type=} is not defined in this method thus can't be translated"
)
return snowflake_type
def create_table_definition(_class: object) -> StructType:
parser = ClassParser()
column_definitions = parser.parse_class(_class)
table_definition: StructType = StructType.from_json(column_definitions)
return table_definition
def create_or_replace_table(session, table_name: str, table_definition: StructType):
dataframe = session.create_dataframe([], schema=table_definition)
if session.catalog.tableExists(table_name):
dataframe.write.mode("overwrite").save_as_table(table_name)
return
    # creates the table if it does not already exist;
    # otherwise, simply ignores this operation
dataframe.write.mode("ignore").save_as_table(table_name)
def create_table(session, table_name: str, table_definition: StructType):
dataframe = session.create_dataframe([], schema=table_definition)
# Throws an exception if the table already exists.
dataframe.write.mode("errorifexists").save_as_table(table_name)
def insert_data(
session, table_name: str, table_definition: StructType, data: list[list[Any]]
):
dataframe = session.create_dataframe(data, schema=table_definition)
dataframe.write.mode("append").save_as_table(table_name)
def get_last_recorded_value(session, table_name: str, column_name: str) -> int | None:
column_name = column_name.strip().upper()
dframe_backlog_data = session.table(table_name)
if column_name not in dframe_backlog_data.columns:
raise ValueError(
f"{column_name} is not defined in the given table {table_name}"
)
column_alias = f"LAST_{column_name}"
result: list[Any] = dframe_backlog_data.agg(
f.max(column_name).alias(column_alias)
).collect()
    # returns None if there are no records
return result[0][column_alias]
def main(snowflake_session, table_name: str):
session = snowflake_session
table_definition = create_table_definition(BacklogActivity)
if not session.catalog.tableExists(table_name):
create_table(session, table_name, table_definition)
last_activity_id = get_last_recorded_value(session, table_name, "activity_id") or 0
count = 0
api_requests = 0
while backlog_data := fetch_backlog_data(last_activity_id):
values = [list(asdict(data).values()) for data in backlog_data]
insert_data(session, table_name, table_definition, values)
last_activity_id = backlog_data[0].activity_id
count += len(values)
api_requests += 1
return f"{count} rows of data have been inserted into {table_name} in {api_requests} api_requests"
$$;
call backup_backlog_data('backlog_data');
select * from backlog_data;
If you think our stored procedure has become too long, feel free to split it into multiple stored procedures, user-defined functions, or even modules (a stored procedure can use script files stored in stages).
If you split it into multiple stored procedures, you can call one stored procedure from within another using the session.call() function.
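As a minimal sketch (orchestrate_backlog_backup is a made-up name, and the example assumes the procedures we defined earlier still exist), calling one stored procedure from another looks like this:
CREATE OR REPLACE PROCEDURE orchestrate_backlog_backup(table_name VARCHAR)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.13'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'main'
AS
$$
def main(session, table_name: str):
    # session.call executes another stored procedure and returns its result
    last_id = session.call("get_latest_record", table_name, "activity_id")
    result = session.call("backup_backlog_data", table_name)
    return f"latest activity_id before backup: {last_id}; {result}"
$$;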
Scheduling Stored Procedure Execution
Our stored procedure is complete. We can now run it at any time we want, but scheduling its execution is a much better idea. We can do so using Snowflake Tasks.
Task definition:
Tasks can run at scheduled times or can be triggered by events, such as when new data arrives in a stream. Tasks can run SQL commands and stored procedures.
When we create a task, it has the "suspended" status by default. This means the task is defined and ready to run, but it will not run until we tell it to; in other words, the task is waiting for us to press the start button. We start the task by resuming it, which puts it into the "started" state.
Creating and starting a task:
code_sample_15.sql
CREATE TASK backup_backlog_data
SCHEDULE='60 MINUTES'
SERVERLESS_TASK_MAX_STATEMENT_SIZE='LARGE'
SUSPEND_TASK_AFTER_NUM_FAILURES = 1
AS
call backup_backlog_data('backlog_data');
ALTER TASK backup_backlog_data resume; -- starts the task
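Once the task is running, we can check its state, pause it, or inspect its recent runs:
SHOW TASKS LIKE 'backup_backlog_data';  -- check whether the task is started or suspended
ALTER TASK backup_backlog_data SUSPEND; -- pause the schedule
SELECT * FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'BACKUP_BACKLOG_DATA')); -- recent runs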
Complete Solution
complete_solution.sql
CREATE OR REPLACE SECRET backlog_api_key
TYPE = GENERIC_STRING
SECRET_STRING = 'backlog_api_key_xyz';
GRANT READ ON SECRET backlog_api_key TO ROLE <my_role>;
-- -------------------------------------------
USE ROLE SYSADMIN;
CREATE NETWORK RULE external_access_network_rule_for_backlog
TYPE = HOST_PORT
MODE = EGRESS
VALUE_LIST = ('*.backlog.com:0')
COMMENT = 'Network rule for Backlog REST API endpoint';
-- -------------------------------------------
USE ROLE ACCOUNTADMIN;
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION backlog_api_access_integration
ALLOWED_NETWORK_RULES = (external_access_network_rule_for_backlog)
ALLOWED_AUTHENTICATION_SECRETS = (backlog_api_key)
ENABLED = true;
GRANT USAGE ON INTEGRATION backlog_api_access_integration TO ROLE <my_role>;
-- -------------------------------------------
CREATE OR REPLACE PROCEDURE backup_backlog_data(table_name VARCHAR)
RETURNS STRING NOT NULL
LANGUAGE PYTHON
RUNTIME_VERSION = '3.13'
PACKAGES = ('snowflake-snowpark-python', 'snowflake.core', 'requests')
HANDLER = 'main' -- entry point to our handler
EXTERNAL_ACCESS_INTEGRATIONS= (backlog_api_access_integration)
SECRETS = ('backlog_key'= backlog_api_key)
AS
$$
from dataclasses import dataclass, asdict
from types import NoneType, UnionType
from typing import (
Annotated,
Any,
Literal,
NoReturn,
TypedDict,
Union,
get_args,
get_origin,
get_type_hints,
)
import requests
from _snowflake import get_generic_secret_string
from snowflake.snowpark.types import StructType
import snowflake.snowpark.functions as f
# insert data into table
@dataclass(frozen=True, kw_only=True)
class BacklogActivity:
activity_id: int
activity_type_id: int
activity_type: str
project_id: int
project_key: str
project_name: str
content: dict[str, Any]
creator_id: int
creator_name: str
creator_email_address: str | None
creator_nulab_account_id: str
creator_nulab_unique_id: str
created_at: Annotated[str, "timestamp_ntz"]
# ----------- fetch backlog data --------------
@dataclass(frozen=True)
class BacklogApi:
key = get_generic_secret_string("backlog_key")
    subdomain = "my-org"  # replace with your Backlog subdomain
    base_url = f"https://{subdomain}.backlog.com/api/v2"
recent_activities: str = f"{base_url}/space/activities"
issues: str = f"{base_url}/issues"
def parse_activity_type(*, activity_type_id: int) -> str:
activity = {
1: "Issue Created",
2: "Issue Updated",
3: "Issue Commented",
4: "Issue Deleted",
5: "Wiki Created",
6: "Wiki Updated",
7: "Wiki Deleted",
8: "File Added",
9: "File Updated",
10: "File Deleted",
11: "SVN Committed",
12: "Git Pushed",
13: "Git Repository Created",
14: "Issue Multi Updated",
15: "Project User Added",
16: "Project User Deleted",
17: "Comment Notification Added",
18: "Pull Request Added",
19: "Pull Request Updated",
20: "Comment Added on Pull Request",
21: "Pull Request Deleted",
22: "Milestone Created",
23: "Milestone Updated",
24: "Milestone Deleted",
25: "Project Group Added",
26: "Project Group Deleted",
}
return activity.get(activity_type_id, "Unknown activity_type_id")
def fetch_backlog_data(last_activity_id: int) -> list[BacklogActivity]:
endpoints = BacklogApi()
with requests.Session() as session:
url_parameters: dict[str, str | int] = {
"apiKey": endpoints.key,
"count": 100,
"minId": last_activity_id,
"order": "desc"
}
response = session.get(
url=endpoints.recent_activities, params=url_parameters, verify=True
)
if response.status_code != 200:
raise ValueError(
f"wrong response: {response.status_code=}\n{response.text=}"
)
backlog_data: list[BacklogActivity] = []
for data in response.json():
activity_creator = data.get("createdUser")
backlog_activity = BacklogActivity(
activity_id=data["id"],
activity_type_id=data["type"],
activity_type=parse_activity_type(activity_type_id=data["type"]),
project_id=data["project"]["id"],
project_key=data["project"]["projectKey"],
project_name=data["project"]["name"],
content=data["content"],
created_at=data["created"],
creator_id=activity_creator["id"],
creator_name=activity_creator["name"],
creator_email_address=activity_creator["mailAddress"],
creator_nulab_account_id=activity_creator["nulabAccount"]["nulabId"],
creator_nulab_unique_id=activity_creator["nulabAccount"]["uniqueId"],
)
backlog_data.append(backlog_activity)
return backlog_data
# ---- Create table definition from dataclass metadata -------
class ColumnDefinition(TypedDict):
name: str
type: str
nullable: bool
class ClassParser:
"""
Parses the class attributes that use type hinting and generates column definitions compatible with Snowflake Snowpark API.
If class attribute does not use type hinting, it is omitted from generated column definitions.
"""
def __init__(self):
pass
@property
def snowflake_supported_types(self):
return {
"string",
"integer",
"float",
"decimal",
"double",
"short",
"long",
"boolean",
"variant",
"timestamp",
"timestamp_tz",
"timestamp_ltz",
"timestamp_ntz",
# There are more supported types
}
@snowflake_supported_types.setter
def snowflake_supported_types(self, value: Any) -> NoReturn:
raise ValueError("Assigning value is not allowed")
def parse_class(self, _class: object) -> dict[str, list[ColumnDefinition]]:
"""
takes a class, not a class instance and returns a dictionary of column definition inferred from class attributes.
class fields cannot contain more than two type hints.
If a field contains two type hints, one of the types must be NoneType.
To specify Snowflake specific field such as TIMESTAMP_NTZ, use Annotated type hints
e.g.
/```python
class A:
field1: Annotated[str, "timestamp_ntz"]
/```
"""
columns: list[ColumnDefinition] = []
for fieldname, type_hint in get_type_hints(_class, include_extras=True).items():
result = self._parse_type_hint(type_hint)
datatype, nullable = result
columns.append(
ColumnDefinition(name=fieldname, type=datatype, nullable=nullable)
)
return {"fields": columns}
def _parse_type_hint(self, type_hint: object | UnionType) -> tuple[str, bool]:
"""
returns a tuple of (datatype, nullable): tuple[str, bool]
"""
datatype = ""
nullable = False
if res := self._parse_annotated_type(type_hint):
if res["snowflake_type"]:
datatype = res["snowflake_type"]
nullable = True if self._parse_union_type(res["origin_type"]) else False
return (datatype, nullable)
type_hint = res["origin_type"]
# This supports generic types, Callable, Tuple, Union, Literal, Final, ClassVar,
# Annotated, and others. Return None for unsupported types.
# get_origin never returns Optional, instead it returns Union
complex_type = get_origin(type_hint)
if res := self._parse_union_type(type_hint):
main_type, nullable = res[0], res[1]
type_hint = main_type # main type is any type hint except for None
complex_type = get_origin(main_type)
if complex_type is Literal:
atomic_type = get_args(type_hint)[0]
datatype = self._translate_atomic_type(atomic_type)
if complex_type in [dict, list, TypedDict, tuple]:
datatype = "variant"
if not complex_type:
datatype = self._translate_atomic_type(type_hint)
return (datatype, nullable)
def _parse_annotated_type(self, type_hint: Any) -> dict[str, Any] | None:
"""
returns dict of snowflake_type:str and origin_type: Any
returns None if given type_hint is not Annotated
If Annotated does not contain Snowflake data types,
it returns `{"snowflake_type": "", "original_type": origin_type}`
"""
if get_origin(type_hint) is not Annotated:
return
metadata = [
metadata.strip().lower()
for metadata in type_hint.__metadata__
if isinstance(metadata, str)
]
snow_types = self.snowflake_supported_types.intersection(metadata)
if len(snow_types) > 1:
raise ValueError(
"Annotated cannot contain more than one snowflake data type"
)
snowflake_type = snow_types.pop() if snow_types else ""
origin_type = type_hint.__origin__
return {"snowflake_type": snowflake_type, "origin_type": origin_type}
def _parse_union_type(self, type_hint: Any) -> tuple[object, bool] | None:
"""
returns None if given type_hint is not UnionType
UnionType must contain only two types and one of these types must be NoneType
"""
types = get_args(type_hint)
nullable = True
if get_origin(type_hint) not in [Union, UnionType] or len(types) < 2:
return
if len(types) > 2:
raise ValueError("Union type cannot contain more than two types")
if NoneType not in types:
raise ValueError("Union type must include NoneType")
main_type = types[0] if types[0] is not NoneType else types[1]
return (main_type, nullable)
def _translate_atomic_type(self, atomic_type: object) -> str:
snowflake_type = ""
if atomic_type is str or isinstance(atomic_type, str):
snowflake_type = "string"
if atomic_type is int or isinstance(atomic_type, int):
snowflake_type = "integer"
if atomic_type is float or isinstance(atomic_type, float):
snowflake_type = "float"
if atomic_type is bool or isinstance(atomic_type, bool):
snowflake_type = "boolean"
if not snowflake_type:
raise RuntimeError(
f"{atomic_type=} is not defined in this method thus can't be translated"
)
return snowflake_type
def create_table_definition(_class: object) -> StructType:
parser = ClassParser()
column_definitions = parser.parse_class(_class)
table_definition: StructType = StructType.from_json(column_definitions)
return table_definition
def create_or_replace_table(session, table_name: str, table_definition: StructType):
dataframe = session.create_dataframe([], schema=table_definition)
if session.catalog.tableExists(table_name):
dataframe.write.mode("overwrite").save_as_table(table_name)
return
    # creates the table if it does not already exist;
    # otherwise, simply ignores this operation
dataframe.write.mode("ignore").save_as_table(table_name)
def create_table(session, table_name: str, table_definition: StructType):
dataframe = session.create_dataframe([], schema=table_definition)
# Throws an exception if the table already exists.
dataframe.write.mode("errorifexists").save_as_table(table_name)
def insert_data(
session, table_name: str, table_definition: StructType, data: list[list[Any]]
):
dataframe = session.create_dataframe(data, schema=table_definition)
dataframe.write.mode("append").save_as_table(table_name)
def get_last_recorded_value(session, table_name: str, column_name: str) -> int | None:
column_name = column_name.strip().upper()
dframe_backlog_data = session.table(table_name)
if column_name not in dframe_backlog_data.columns:
raise ValueError(
f"{column_name} is not defined in the given table {table_name}"
)
column_alias = f"LAST_{column_name}"
result: list[Any] = dframe_backlog_data.agg(
f.max(column_name).alias(column_alias)
).collect()
    # returns None if there are no records
return result[0][column_alias]
def main(snowflake_session, table_name: str):
session = snowflake_session
table_definition = create_table_definition(BacklogActivity)
if not session.catalog.tableExists(table_name):
create_table(session, table_name, table_definition)
last_activity_id = get_last_recorded_value(session, table_name, "activity_id") or 0
count = 0
api_requests = 0
while backlog_data := fetch_backlog_data(last_activity_id):
values = [list(asdict(data).values()) for data in backlog_data]
insert_data(session, table_name, table_definition, values)
last_activity_id = backlog_data[0].activity_id
count += len(values)
api_requests += 1
return f"{count} rows of data have been inserted into {table_name} in {api_requests} api_requests"
$$;
-- call backup_backlog_data('backlog_data');
-- select * from backlog_data;
-- -------------------------------------
CREATE TASK backup_backlog_data
SCHEDULE='60 MINUTES'
SERVERLESS_TASK_MAX_STATEMENT_SIZE='SMALL'
SUSPEND_TASK_AFTER_NUM_FAILURES = 1
AS
call backup_backlog_data('backlog_data');
ALTER TASK backup_backlog_data resume;