Snowflake Snowpipe API in Python to ingest data from AWS S3 to Snowflake stage tables


Snowflake Snowpipe


Snowflake is a cloud data warehouse like Redshift, but it stands out for separating compute resources from storage. Snowflake offers a service called Snowpipe to copy files from AWS S3 into Snowflake. There are several ways to trigger it, e.g. setting up AWS SQS notifications, but I would like to discuss an approach I have found more stable: calling the Snowpipe API through the snowflake-ingest Python package.
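Before the class itself, here is a minimal sketch of how the client from snowflake-ingest might be created. The helper names qualified_pipe_name and make_ingest_manager are hypothetical, and the host pattern is an assumption (your account URL may include a region or cloud segment). The package authenticates with a key pair, so private_key_pem is assumed to hold the PEM text of the private key registered for the user.

```python
def qualified_pipe_name(database: str, schema: str, pipe: str) -> str:
    """Join database, schema and pipe into a fully qualified pipe name."""
    return ".".join([database, schema, pipe])


def make_ingest_manager(account: str, user: str, pipe: str, private_key_pem: str):
    """Build a SimpleIngestManager for one pipe (hypothetical helper)."""
    # Imported lazily so this module loads even where the package is missing.
    from snowflake.ingest import SimpleIngestManager

    return SimpleIngestManager(
        account=account,
        # Assumption: plain account host; adjust if your URL has a region part.
        host="{}.snowflakecomputing.com".format(account),
        user=user,
        pipe=pipe,
        private_key=private_key_pem,
    )
```

For the pipe created later in this post, the name would be qualified_pipe_name("DATA_LAKE", "RPT", "SNOWPIPE_PIPE_FOR_X").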


Python code setup


I have created a class called SnowpipeAPI, shown below:


import datetime
import re
import time

from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from snowflake.ingest.error import IngestResponseError

class SnowpipeAPI(object):

    # ...

    def uploadFileList(self, file_list):
        self.logger.info("We are going to load: \n" + str(file_list))

        staged_file_list = [StagedFile(file_name, None)
                            for file_name in file_list]

        self.logger.info("loading pipe: "+self.table['pipe'])

        # Submit the files once; a failed request is reported and aborts the run.
        try:
            resp = self.ingest_manager.ingest_files(staged_file_list)
        except IngestResponseError as e:
            self.logger.info(e)
            self.slackbot.warn(e)
            raise SystemExit(1)

        uploaded_files = None

        if resp['responseCode'] == 'SUCCESS':
            self.logger.info(
                "Snowflake has received files and will start loading")
            self.logger.info("Request id is : " + resp['requestId'])

            try:
                uploaded_files = self.checkPipeStatus(
                    datetime.datetime.now(), file_list)
            except RecursionError as e:
                self.logger.info(
                    "Fail to continue fetching upload status as {} ".format(str(e)))
                self.slackbot.warn(
                    "Fail to continue fetching upload status as {} ".format(str(e)))

            self.getHistoryReport()

        else:
            self.logger.info("Request from snowpipe api has failed")
            self.slackbot.warn("Request from snowpipe api has failed")

        return uploaded_files

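    def checkPipeStatus(self, startTime, file_list, uploaded_file_list=None):
        # Avoid a shared mutable default: start a fresh list on the first call.
        if uploaded_file_list is None:
            uploaded_file_list = []

        if (datetime.datetime.now() - startTime).total_seconds() > self.config['timeout']:
            return uploaded_file_list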

        history_resp = self.ingest_manager.get_history()

        uploaded_file_count = len(uploaded_file_list)

        if len(history_resp['files']) > 0:
            uploaded_file_list_per_request = [
                f for f in history_resp['files'] if f['status'] == 'LOADED'
                and f['complete']]
            uploaded_file_count += len(uploaded_file_list_per_request)
            self.logger.info('Totally processed {} files'.format(
                str(uploaded_file_count)))
            uploaded_file_list += uploaded_file_list_per_request

        filepathWithHour = False
        if re.search(r"\d\d\d\d/\d\d/\d\d/\d\d", file_list[0]):
            filepathWithHour = True

        if (((uploaded_file_count + 1) == len(file_list) and not filepathWithHour)
                or ((uploaded_file_count + 24) == len(file_list) and filepathWithHour)
                or (uploaded_file_count * 2 == len(file_list))
                or (uploaded_file_count == len(file_list))):
            self.logger.info('Upload finished')
            self.logger.info('Ingest Report:\n')
            self.logger.info(history_resp)
            return uploaded_file_list
        else:
            # wait 5 seconds before polling again
            self.logger.info("waiting for 5 sec")
            time.sleep(5)

        return self.checkPipeStatus(startTime, file_list, uploaded_file_list)

    def getHistoryReport(self):
        hour = datetime.timedelta(hours=1)
        date = datetime.datetime.utcnow() - hour
        history_range_resp = self.ingest_manager.get_history_range(
            date.isoformat() + 'Z')

        self.logger.info('\nHistory scan report: \n')
        self.logger.info(history_range_resp)

    # ...

There are three important methods in the class: uploadFileList, checkPipeStatus and getHistoryReport. I collect the files in S3 and pass them to uploadFileList. I then use the tail-recursive checkPipeStatus to keep polling the status of the upload. Python does not optimize tail calls, so a long wait can exhaust the recursion limit, which is why uploadFileList catches RecursionError. Once everything is finished, getHistoryReport prints a report of the processed files with their status.
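If the recursion limit is a concern, the same polling can be written as a loop. The following is only a sketch of that alternative, not the class's actual method: poll_until_loaded is a hypothetical name, get_history stands in for self.ingest_manager.get_history, and I am assuming each entry in the history response carries a path field next to the status and complete fields used above. It also decides completion by matching paths instead of comparing counts.

```python
import time
from typing import Callable, Dict, Iterable, List


def poll_until_loaded(get_history: Callable[[], Dict],
                      file_list: Iterable[str],
                      timeout: float = 60.0,
                      interval: float = 5.0,
                      sleep: Callable[[float], None] = time.sleep) -> List[str]:
    """Poll the load history until every file is loaded or the timeout passes."""
    pending = set(file_list)
    loaded = []
    deadline = time.monotonic() + timeout

    while pending and time.monotonic() < deadline:
        for entry in get_history().get("files", []):
            # Assumption: each history entry exposes the staged file's path.
            path = entry.get("path")
            if (path in pending and entry.get("status") == "LOADED"
                    and entry.get("complete")):
                pending.discard(path)
                loaded.append(path)
        if pending:
            # Wait before asking again, as the recursive version does.
            sleep(interval)

    return loaded
```

Inside the class this could be called as poll_until_loaded(self.ingest_manager.get_history, file_list, self.config['timeout']). Files that never show up as loaded are simply missing from the returned list, so the caller can compare it with file_list.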


How to use this class


Below is a function that passes the files to upload to the API method uploadFileList, using a higher-order function get_file_list to collect the list of S3 files for a Snowflake stage table. For example, you can put a set of files that share the same column layout in one S3 bucket. This function then loads all the files in the bucket into the stage table through a 'pipe', which I will talk about in the next section.


# ...

def process_snowpipe_api(get_file_list, table):

    logger.info("Creating snowpipe API client")
    apiCLient = SnowpipeAPI(config.data, table)

    file_list = get_file_list(table)

    logger.info("Loading file list " + str(file_list))

    if len(file_list) > 0:
        uploaded_files = apiCLient.uploadFileList(file_list)
    else:
        logger.info("No new file to load")
        uploaded_files = None

    return uploaded_files

# ...
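The get_file_list argument is not shown in this post, so here is a minimal sketch of what such a function could look like. Everything in it is an assumption for illustration: make_get_file_list, the list_keys callable (in practice it might wrap an S3 listing call) and the s3_prefix key on the table dict are hypothetical names. It also strips the stage's folder from each key, since Snowpipe expects file paths relative to the stage location.

```python
from typing import Callable, Dict, Iterable, List


def make_get_file_list(list_keys: Callable[[str], Iterable[str]],
                       stage_prefix: str = "") -> Callable[[Dict[str, str]], List[str]]:
    """Build a get_file_list function around any key-listing callable."""

    def get_file_list(table: Dict[str, str]) -> List[str]:
        files = []
        # Assumption: the table dict names the S3 prefix holding its files.
        for key in list_keys(table["s3_prefix"]):
            if key.endswith("/"):
                # Skip folder placeholder objects.
                continue
            if stage_prefix and key.startswith(stage_prefix):
                # Make the path relative to the stage location.
                key = key[len(stage_prefix):]
            files.append(key)
        return sorted(files)

    return get_file_list
```

The result can be passed straight in, for example process_snowpipe_api(make_get_file_list(my_lister, "folder/"), table), where my_lister is whatever listing function you use.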

Snowflake


For example, I have created a database role in Snowflake called snowpipe1. Then follow the steps below:


  1. Grant the required privileges on the database objects

grant usage on database data_lake to role snowpipe1;

grant usage on schema data_lake.xxx to role snowpipe1;

grant insert, select on DATA_LAKE.xxx.table_name to role snowpipe1;

  2. Create a file format and grant usage on it to role SNOWPIPE1

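create file format SNOWPIPE_CSV_FOR_X type = 'CSV';  // adjust the format options to your files
grant usage on file format SNOWPIPE_CSV_FOR_X to role snowpipe1;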


  3. Create a stage in SQL and grant usage on it to role SNOWPIPE1

create stage SNOWPIPE_STAGE_FOR_X
  file_format = SNOWPIPE_CSV_FOR_X
  url = 's3://datalake-bucket/folder/'
  credentials = (aws_key_id='xxx' aws_secret_key='xxx');
grant usage on stage SNOWPIPE_STAGE_FOR_X to role snowpipe1;

  4. Create the pipe and grant ownership. Please note that once you grant ownership, your account can no longer see the pipe. You need to switch to the snowpipe account.

create pipe SNOWPIPE_PIPE_FOR_X as copy into DATA_LAKE.RPT.X from @SNOWPIPE_STAGE_FOR_X;
// pause the pipe first in order to switch ownership
ALTER PIPE SNOWPIPE_PIPE_FOR_X SET PIPE_EXECUTION_PAUSED=true;
grant ownership on pipe data_lake.rpt.SNOWPIPE_PIPE_FOR_X to role snowpipe1;
// change back the pipe to running status
select SYSTEM$PIPE_STATUS( 'SNOWPIPE_PIPE_FOR_X' );
ALTER PIPE SNOWPIPE_PIPE_FOR_X SET PIPE_EXECUTION_PAUSED=False;
select SYSTEM$PIPE_STATUS( 'SNOWPIPE_PIPE_FOR_X' );
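
SYSTEM$PIPE_STATUS returns its result as a JSON string, so the Python side can check it before submitting files. Below is a small sketch of that check; pipe_is_running is a hypothetical helper, and how you run the query and fetch the string is left to whatever connector you use.

```python
import json


def pipe_is_running(status_json: str) -> bool:
    """Return True when the pipe status JSON reports a running pipe."""
    status = json.loads(status_json)
    # executionState is RUNNING for an active pipe and PAUSED after a pause.
    return status.get("executionState") == "RUNNING"
```

Feeding it the string returned by the select above lets the loader skip or alert on a pipe that is still paused after the ownership switch.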
