This document details how to employ custom Java code within Adeptia for Snowflake bulk operations and runtime file cleanup. The custom JAR enables bulk loading of CSV files into Snowflake using PUT, COPY, and MERGE operations, and also facilitates the cleanup of temporary or operational files created during processing.
Requirements:
- Java JDK 17
- Add the following Java argument to the JAVA_OPTS of the Runtime microservice:
`--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED`
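How JAVA_OPTS is set depends on your deployment (systemd unit, container environment, Kubernetes manifest, and so on). As one illustrative example, on a Linux host the argument could be appended to any existing JAVA_OPTS value like this:

```shell
# Append the --add-opens flag to any existing JAVA_OPTS value.
# This is only one way to set it; adapt to how your Runtime
# microservice is actually launched.
export JAVA_OPTS="${JAVA_OPTS:-} --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"
echo "$JAVA_OPTS"
```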
Deploying the Custom JAR in Adeptia:
- Place the downloaded JAR file into the `shared/customClasses` directory. Rename the JAR file if necessary.
- Restart the Runtime, Portal, and Webrunner services.
Using the Custom JAR in Process Design for Bulk Load:
Following JAR deployment, its classes and methods can be integrated into Adeptia Process flows as demonstrated below.
Creating Custom Plugin:
To configure Snowflake bulk load after restarting the necessary PODs:
- Create a custom plugin.
- Set the Interpreter Type to “Spring Bean”.
- From the Spring Bean dropdown, choose the “SnowflakeBulkInsert” class.
- Save the custom plugin.
Using Custom Plugin in Process Flow:
Snowflake Bulk Load Process:
- Process Flow Setup: Design a process flow that includes a source, a Data Mapping component, and a CSV text layout as the destination.
- CSV File Output: Configure the process to write the generated CSV file to a specified LAN target.
- Custom Plugin Integration: Following the successful writing of the CSV file to the LAN target, attach the custom plugin as illustrated below.
- Consume Stream - false
- Generate Stream - false
- The custom Snowflake bulk load plugin requires the following parameters to be provided as process flow variables. These values can either be hardcoded or retrieved from the context:
- `connectionInfoID`: The ID of the Snowflake database connection configured in Adeptia.
- `targetTableName`: The name of the table where data will be written.
- `targetSchemaName`: The name of the schema containing the target table.
- `databaseName`: The name of the database where the table is located.
- `filePath`: The path to the generated CSV file.
- `keyColumn`: (Required only for update/MERGE operations) The primary key column of the target table; leave it null for insert-only loads.
Key Notes:
- When defining the Text Schema for Snowflake bulk loads, ensure the following advanced properties are enabled:
- Quotes Handling: On
- Enclose Target Field Values In Double Quotes
- To prevent embedded new line characters or double quotes from corrupting the CSV file, replace them in the mapper, along with any other special characters that could break the layout.
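The replacement logic can be sketched as a plain Java helper. This is not the mapper function Adeptia ships; the class and method names are illustrative, and doubling embedded quotes is one common CSV-escaping convention (your mapper may simply strip them instead):

```java
public class CsvSanitizer {
    // Makes a field value safe for a double-quote-enclosed CSV column:
    // embedded double quotes are doubled (standard CSV escaping) and
    // line breaks are replaced with spaces.
    public static String sanitize(String field) {
        if (field == null) {
            return "";
        }
        return field
                .replace("\"", "\"\"")  // escape embedded double quotes
                .replace("\r\n", " ")   // Windows line breaks
                .replace("\n", " ")     // Unix line breaks
                .replace("\r", " ");    // bare carriage returns
    }

    public static void main(String[] args) {
        // → line1 line2 says ""hi""
        System.out.println(sanitize("line1\nline2 says \"hi\""));
    }
}
```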
How the Custom Plugin Works
Snowflake bulk loading can be achieved through the following steps:
- The custom plugin reads parameters and creates a temporary stage.
- The PUT command is executed to upload the CSV file to the temporary stage.
- A temporary table, mirroring the target table's structure, is created.
- The COPY INTO command loads data from the CSV file in the temporary stage into the temporary table.
- Finally, a MERGE INTO operation inserts or updates data from the temporary table into the target table.
- To prevent unnecessary storage usage in Snowflake, the plugin cleans up the temporary objects (stage and table) after execution completes, regardless of whether it succeeded or encountered an error.
For optimization:
- If the `keyColumn` value is null, only an INSERT INTO operation is performed.
- If a primary key value is provided in `keyColumn`, the MERGE INTO command is executed.
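The sequence above can be sketched as the ordered SQL the plugin would issue over a JDBC connection. The builder below only assembles the statements; the stage and temporary table names are illustrative, the MERGE column lists are elided, and the shipped plugin's internals may differ:

```java
import java.util.ArrayList;
import java.util.List;

public class SnowflakeBulkSql {
    // Builds the ordered list of SQL commands for a bulk load.
    // If keyColumn is null the final step is a plain INSERT;
    // otherwise a MERGE keyed on keyColumn is used.
    public static List<String> buildStatements(String databaseName, String targetSchemaName,
                                               String targetTableName, String filePath,
                                               String keyColumn) {
        String target = databaseName + "." + targetSchemaName + "." + targetTableName;
        String tempTable = target + "_TMP";          // illustrative name
        String stage = "TMP_STAGE_" + targetTableName; // illustrative name
        List<String> sql = new ArrayList<>();
        // 1. Create a temporary stage and PUT the CSV file into it.
        sql.add("CREATE TEMPORARY STAGE " + stage);
        sql.add("PUT file://" + filePath + " @" + stage);
        // 2. Create a temporary table mirroring the target's structure.
        sql.add("CREATE TEMPORARY TABLE " + tempTable + " LIKE " + target);
        // 3. COPY the staged CSV into the temporary table.
        sql.add("COPY INTO " + tempTable + " FROM @" + stage
                + " FILE_FORMAT = (TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY = '\"')");
        // 4. Insert-only when no key column is given; MERGE otherwise.
        if (keyColumn == null) {
            sql.add("INSERT INTO " + target + " SELECT * FROM " + tempTable);
        } else {
            sql.add("MERGE INTO " + target + " t USING " + tempTable + " s"
                    + " ON t." + keyColumn + " = s." + keyColumn
                    + " WHEN MATCHED THEN UPDATE SET /* columns */"
                    + " WHEN NOT MATCHED THEN INSERT /* columns */");
        }
        // 5. Clean up temporary objects (the plugin does this even on failure).
        sql.add("DROP TABLE IF EXISTS " + tempTable);
        sql.add("DROP STAGE IF EXISTS " + stage);
        return sql;
    }
}
```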
Troubleshooting
If you experience problems with the custom JAR:
- Enable INFO mode for the process flow to get detailed, step-by-step logs.
- Check the "BulkInsertCompleted" context variable. A value of true indicates the bulk operation finished successfully. If it's false, the "errorMessage" variable will contain the reason for the failure.
- The INFO logging level provides a detailed breakdown of the custom plugin's operations.
Using the Custom JAR in Process Design for Temporary File Cleanup:
Following JAR deployment, its classes and methods can be integrated into Adeptia Process flows as demonstrated below.
Creating Custom Plugin:
To configure folder cleanup after restarting the necessary PODs:
- Create a custom plugin.
- Set the Interpreter Type to “Spring Bean”.
- From the Spring Bean dropdown, choose the “FolderCleanup” class.
- Save the custom plugin.
Using Custom Plugin in Process Flow:
- Integrate your custom plugin within your process flow
- Consume Stream - false
- Generate Stream - false
- The custom plugin requires the following parameters to be passed as process flow variables:
- `folderPath`: The path of the folder from which the files should be deleted.
- `format`: The time unit, one of minutes/hours/days.
- `duration`: A number, interpreted in the unit given by `format`.
Please note that this will delete files older than the specified time.
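The "delete files older than the specified time" rule can be sketched in plain Java. This is not the shipped FolderCleanup class; the class and method names below are illustrative, and the returned count mirrors what the plugin exposes as `deletedFileCount`:

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;

public class FolderCleanupSketch {
    // Deletes regular files in folderPath whose last-modified time is older
    // than duration (in the unit named by format) and returns how many
    // files were deleted.
    public static int deleteOlderThan(Path folderPath, String format, long duration)
            throws IOException {
        Duration age = switch (format) {
            case "minutes" -> Duration.ofMinutes(duration);
            case "hours"   -> Duration.ofHours(duration);
            case "days"    -> Duration.ofDays(duration);
            default -> throw new IllegalArgumentException("format must be minutes, hours, or days");
        };
        Instant cutoff = Instant.now().minus(age);
        int deleted = 0;
        try (DirectoryStream<Path> files = Files.newDirectoryStream(folderPath)) {
            for (Path file : files) {
                if (Files.isRegularFile(file)
                        && Files.getLastModifiedTime(file).toInstant().isBefore(cutoff)) {
                    Files.delete(file);
                    deleted++;
                }
            }
        }
        return deleted;
    }
}
```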
Troubleshooting:
If you experience problems with the custom JAR:
- Enable INFO mode for the process flow to get detailed, step-by-step logs.
- Check the "CleanupCompleted" context variable. A value of true indicates the operation finished successfully. If it's false, the "errorMessage" variable will contain the reason for the failure.
- The context variable “deletedFileCount” gives you the number of files deleted.
- The INFO logging level provides a detailed breakdown of the custom plugin's operations.