Transformations
SQL Providers
Register a SQL transformation source.
The name of the function is the name of the resulting source.
Reference the input sources in the query by wrapping each name and variant in double curly braces, as in '{{ name.variant }}'. The matching source is substituted when the query is run.
Examples:
postgres = client.get_provider("my_postgres")

@postgres.sql_transformation(variant="quickstart")
def average_user_transaction():
    return "SELECT CustomerID as user_id, avg(TransactionAmount) as avg_transaction_amt from {{transactions.v1}} GROUP BY user_id"
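To make the placeholder substitution described above concrete, here is a minimal stand-alone sketch. It does not use the library itself: the `substitute_sources` helper, the regular expression, and the table name `featureform_transactions_v1` are all hypothetical, and the real resolution logic may differ.

```python
import re

# Matches '{{ name.variant }}', with optional whitespace inside the braces.
PLACEHOLDER = re.compile(r"\{\{\s*([^.\s{}]+)\.([^\s{}]+)\s*\}\}")


def substitute_sources(query, tables):
    """Replace each placeholder with the table stored for (name, variant)."""
    def lookup(match):
        key = (match.group(1), match.group(2))
        return tables[key]  # raises KeyError if the source is unknown
    return PLACEHOLDER.sub(lookup, query)


# Hypothetical mapping from (name, variant) to a physical table name.
tables = {("transactions", "v1"): "featureform_transactions_v1"}

query = "SELECT CustomerID AS user_id FROM {{transactions.v1}}"
resolved = substitute_sources(query, tables)
print(resolved)
```

The point of the sketch is only that the query author writes logical names, and the physical table is filled in at run time.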
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of source | '' |
variant | str | Name of variant | '' |
schedule | str | The frequency at which the transformation is run as a cron expression | '' |
owner | Union[str, UserRegistrar] | Owner | '' |
description | str | Description of primary data to be registered | '' |
inputs | list | A list of Source NameVariant Tuples to input into the transformation | None |
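The `schedule` parameter above takes a cron expression. As a rough illustration of what such a string looks like, the sketch below checks the common five-field layout (minute, hour, day of month, month, day of week). The `looks_like_cron` helper is hypothetical, and whether the library expects exactly this five-field dialect is an assumption.

```python
def looks_like_cron(expr):
    """Rough structural check: five whitespace-separated fields, each made
    only of digits and the characters '*', ',', '-' and '/'.
    Named values such as MON or JAN are deliberately not handled here."""
    fields = expr.split()
    if len(fields) != 5:
        return False
    allowed = set("0123456789*,-/")
    return all(set(field) <= allowed for field in fields)


hourly = "0 * * * *"     # at minute 0 of every hour
nightly = "30 2 * * *"   # at 02:30 every day

print(looks_like_cron(hourly), looks_like_cron(nightly))
```

A string such as `hourly` could then be passed as `schedule=` to the decorator shown in the example above.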
Returns:
Name | Type | Description |
---|---|---|
source | ColumnSourceRegistrar | Source |
Spark
SQL Transformation
Register a SQL transformation source. The spark.sql_transformation decorator executes the string returned by the decorated function as a SQL query.
The name of the function is the name of the resulting source.
Reference the input sources in the query by wrapping each name and variant in double curly braces, as in '{{ name.variant }}'. The matching source is substituted when the query is run.
Examples:
@spark.sql_transformation(variant="quickstart")
def average_user_transaction():
    return "SELECT CustomerID as user_id, avg(TransactionAmount) as avg_transaction_amt from {{transactions.v1}} GROUP BY user_id"
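The naming rule above (the function name becomes the source name) can be illustrated with a toy decorator. This is a sketch under assumptions, not the library's implementation: the module-level `registry` dict and this local `sql_transformation` function are hypothetical stand-ins, and falling back to the function name when `name` is empty is inferred from the `''` default in the parameter table.

```python
# Hypothetical in-memory registry: (name, variant) -> SQL text.
registry = {}


def sql_transformation(variant, name=""):
    """Toy decorator that records the returned query under (name, variant)."""
    def decorator(fn):
        # Use the function's own name when no explicit name is given.
        source_name = name or fn.__name__
        registry[(source_name, variant)] = fn()
        return fn
    return decorator


@sql_transformation(variant="quickstart")
def average_user_transaction():
    return "SELECT CustomerID AS user_id FROM {{transactions.v1}}"


print(sorted(registry))
```

After the decorator runs, the query is stored under the key `("average_user_transaction", "quickstart")`.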
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of source | '' |
variant | str | Name of variant | '' |
owner | Union[str, UserRegistrar] | Owner | '' |
description | str | Description of primary data to be registered | '' |
Returns:
Name | Type | Description |
---|---|---|
source | ColumnSourceRegistrar | Source |
Dataframe Transformation
Register a Dataframe transformation source. The spark.df_transformation decorator takes the decorated function and executes its body at serving time.
The name of the function is used as the name of the source when being registered.
The specified inputs are loaded into dataframes that can be accessed using the function parameters.
Examples:
@spark.df_transformation(inputs=[("source", "one")])  # Sources are added as inputs
def average_user_transaction(df):  # Sources can be manipulated by adding them as params
    return df
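The example above returns its input unchanged. The sketch below shows the same idea with an actual computation, using plain Python lists of dicts as stand-ins for dataframes so that it runs without Spark. The `SOURCES` dict, the local `df_transformation` decorator, and the `run` attribute are hypothetical and only mimic how each `inputs` entry could be loaded and passed to the function as a parameter.

```python
# Stand-in "dataframes": lists of row dicts keyed by (name, variant).
SOURCES = {
    ("transactions", "v1"): [
        {"CustomerID": "C1", "TransactionAmount": 10.0},
        {"CustomerID": "C1", "TransactionAmount": 30.0},
        {"CustomerID": "C2", "TransactionAmount": 5.0},
    ],
}


def df_transformation(inputs):
    """Toy decorator: load every input and pass it as a positional argument."""
    def decorator(fn):
        def run():
            frames = [SOURCES[key] for key in inputs]
            return fn(*frames)
        fn.run = run  # attach a runner so the sketch can be executed
        return fn
    return decorator


@df_transformation(inputs=[("transactions", "v1")])
def average_user_transaction(df):
    # Group amounts by customer, then average each group.
    amounts = {}
    for row in df:
        amounts.setdefault(row["CustomerID"], []).append(row["TransactionAmount"])
    return {user: sum(vals) / len(vals) for user, vals in amounts.items()}


result = average_user_transaction.run()
print(result)
```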
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of source | '' |
variant | str | Name of variant | '' |
owner | Union[str, UserRegistrar] | Owner | '' |
description | str | Description of primary data to be registered | '' |
inputs | list[Tuple[str, str]] | A list of Source NameVariant Tuples to input into the transformation | [] |
Returns:
Name | Type | Description |
---|---|---|
source | ColumnSourceRegistrar | Source |
Kubernetes Pandas Runner
SQL Transformation
Register a SQL transformation source. The k8s.sql_transformation decorator executes the string returned by the decorated function as a SQL query.
The name of the function is the name of the resulting source.
Reference the input sources in the query by wrapping each name and variant in double curly braces, as in '{{ name.variant }}'. The matching source is substituted when the query is run.
Examples:
@k8s.sql_transformation(variant="quickstart")
def average_user_transaction():
    return "SELECT CustomerID as user_id, avg(TransactionAmount) as avg_transaction_amt from {{transactions.v1}} GROUP BY user_id"
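Because sources are named inside the query text, a query can reference more than one of them. The sketch below lists every (name, variant) pair that appears in a query string. The `referenced_sources` helper and the `users.default` source are hypothetical, and the library may discover dependencies differently.

```python
import re

# Matches '{{ name.variant }}', with optional whitespace inside the braces.
PLACEHOLDER = re.compile(r"\{\{\s*([^.\s{}]+)\.([^\s{}]+)\s*\}\}")


def referenced_sources(query):
    """Return the distinct (name, variant) pairs in order of first appearance."""
    seen = []
    for name, variant in PLACEHOLDER.findall(query):
        if (name, variant) not in seen:
            seen.append((name, variant))
    return seen


query = (
    "SELECT t.CustomerID, u.Region "
    "FROM {{ transactions.v1 }} t "
    "JOIN {{users.default}} u ON t.CustomerID = u.CustomerID"
)
deps = referenced_sources(query)
print(deps)
```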
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of source | '' |
variant | str | Name of variant | '' |
owner | Union[str, UserRegistrar] | Owner | '' |
inputs | list | A list of Source NameVariant Tuples to input into the transformation | None |
description | str | Description of primary data to be registered | '' |
docker_image | str | A custom Docker image to run the transformation | '' |
resource_specs | K8sResourceSpecs | Custom resource requests and limits | None |
Returns:
Name | Type | Description |
---|---|---|
source | ColumnSourceRegistrar | Source |
Dataframe Transformation
Register a Dataframe transformation source. The k8s.df_transformation decorator takes the decorated function and executes its body at serving time.
The name of the function is used as the name of the source when being registered.
The specified inputs are loaded into dataframes that can be accessed using the function parameters.
Examples:
@k8s.df_transformation(inputs=[("source", "one")])  # Sources are added as inputs
def average_user_transaction(df):  # Sources can be manipulated by adding them as params
    return df
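With several inputs, each entry in `inputs` needs a matching function parameter. The sketch below pairs them up by position and rejects a mismatch in count. Both the positional pairing and the `bind_inputs` helper are assumptions made for illustration, and the `users.default` source is hypothetical; the library may bind or validate inputs differently.

```python
import inspect


def bind_inputs(fn, inputs):
    """Pair each function parameter with the input at the same position."""
    params = list(inspect.signature(fn).parameters)
    if len(params) != len(inputs):
        raise ValueError(
            f"{fn.__name__} takes {len(params)} parameter(s) "
            f"but {len(inputs)} input(s) were given"
        )
    return dict(zip(params, inputs))


def join_sources(transactions, users):
    # Placeholder body; a real transformation would join the two frames.
    return transactions


mapping = bind_inputs(
    join_sources,
    [("transactions", "v1"), ("users", "default")],
)
print(mapping)
```

Under this assumption, reordering the `inputs` list changes which dataframe each parameter receives, so the two are best kept in the same order.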
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of source | '' |
variant | str | Name of variant | '' |
owner | Union[str, UserRegistrar] | Owner | '' |
description | str | Description of primary data to be registered | '' |
inputs | list[Tuple[str, str]] | A list of Source NameVariant Tuples to input into the transformation | [] |
docker_image | str | A custom Docker image to run the transformation | '' |
resource_specs | K8sResourceSpecs | Custom resource requests and limits | None |
Returns:
Name | Type | Description |
---|---|---|
source | ColumnSourceRegistrar | Source |