Snowflake Snowpipe API in Python

Snowflake Snowpipe


Snowflake is a cloud data warehouse, similar to Redshift, but it separates computing resources from storage. Snowflake offers a service called Snowpipe to copy files from AWS S3 into Snowflake. There are several ways to trigger it, e.g. setting up AWS SQS notifications, but I would like to discuss a more stable approach: calling the Snowpipe REST API through the snowflake-ingest Python package.
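
Before diving into my wrapper class, here is a minimal sketch of how the package is typically set up. The Snowpipe REST API uses key-pair authentication, so the client needs the user's RSA private key; the account, user and key path below are placeholders to replace with your own values.

from cryptography.hazmat.primitives import serialization
from snowflake.ingest import SimpleIngestManager

# Load the RSA private key registered for the Snowflake user
# (the Snowpipe REST API requires key-pair authentication).
with open('/path/to/rsa_key.p8', 'rb') as key_file:
    private_key = serialization.load_pem_private_key(
        key_file.read(), password=None)

private_key_pem = private_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()).decode('utf-8')

# All identifiers below are placeholders.
ingest_manager = SimpleIngestManager(
    account='myaccount',
    host='myaccount.snowflakecomputing.com',
    user='ingest_user',
    pipe='DATA_LAKE.RPT.SNOWPIPE_PIPE_FOR_X',
    private_key=private_key_pem)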


Python code setup


I have created a class called SnowpipeAPI, shown below:


import datetime
import re
import sys
import time

from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from snowflake.ingest.error import IngestResponseError

class SnowpipeAPI(object):

    # ...

    def uploadFileList(self, file_list):
        self.logger.info("We are going to load: \n" + str(file_list))

        staged_file_list = [StagedFile(file_name, None)
                            for file_name in file_list]

        self.logger.info("loading pipe: "+self.table['pipe'])

        try:
            resp = self.ingest_manager.ingest_files(staged_file_list)
        except IngestResponseError as e:
            self.logger.info(e)
            self.slackbot.warn(e)
            sys.exit(1)

        uploaded_files = None

        if resp['responseCode'] == 'SUCCESS':
            self.logger.info(
                "Snowflake has received files and will start loading")
            self.logger.info("Request id is : " + resp['requestId'])

            try:
                uploaded_files = self.checkPipeStatus(
                    datetime.datetime.now(), file_list)
            except RecursionError as e:
                self.logger.info(
                    "Fail to continue fetching upload status as {} ".format(str(e)))
                self.slackbot.warn(
                    "Fail to continue fetching upload status as {} ".format(str(e)))

            self.getHistoryReport()

        else:
            self.logger.info("Request from snowpipe api has failed")
            self.slackbot.warn("Request from snowpipe api has failed")

        return uploaded_files

    def checkPipeStatus(self, startTime, file_list, uploaded_file_list=None):

        # a mutable default argument would be shared across calls
        if uploaded_file_list is None:
            uploaded_file_list = []

        if (datetime.datetime.now() - startTime).total_seconds() > self.config['timeout']:
            return uploaded_file_list

        history_resp = self.ingest_manager.get_history()

        uploaded_file_count = len(uploaded_file_list)

        if len(history_resp['files']) > 0:
            uploaded_file_list_per_request = [
                f for f in history_resp['files']
                if f['status'] == 'LOADED' and f['complete']]
            uploaded_file_count += len(uploaded_file_list_per_request)
            self.logger.info('Processed {} files in total'.format(
                str(uploaded_file_count)))
            uploaded_file_list += uploaded_file_list_per_request

        filepathWithHour = False
        if re.search(r"\d\d\d\d/\d\d/\d\d/\d\d", file_list[0]):
            filepathWithHour = True

        if (((uploaded_file_count + 1) == len(file_list) and not filepathWithHour)
                or ((uploaded_file_count + 24) == len(file_list) and filepathWithHour)
                or (uploaded_file_count * 2 == len(file_list))
                or (uploaded_file_count == len(file_list))):
            self.logger.info('Upload finished')
            self.logger.info('Ingest Report:\n')
            self.logger.info(history_resp)
            return uploaded_file_list
        else:
            # wait 5 seconds before polling again
            self.logger.info("waiting for 5 sec")
            time.sleep(5)

        return self.checkPipeStatus(startTime, file_list, uploaded_file_list)

    def getHistoryReport(self):
        hour = datetime.timedelta(hours=1)
        date = datetime.datetime.utcnow() - hour
        history_range_resp = self.ingest_manager.get_history_range(
            date.isoformat() + 'Z')

        self.logger.info('\nHistory scan report: \n')
        self.logger.info(history_range_resp)

    # ...


There are three important methods in the class: uploadFileList, checkPipeStatus and getHistoryReport. I collect the files in S3 and pass them into the uploadFileList method. Then I use tail recursion in checkPipeStatus to keep polling the status of the upload process; since CPython does not optimize tail calls, a long-running poll can hit the interpreter's recursion limit, which is why the code catches RecursionError. Once everything is finished, getHistoryReport prints out a report of the processed files with their status.
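
If the recursion limit is a concern, the same polling logic can be written as a plain loop. Here is a minimal sketch (the function name is hypothetical, and the finish condition is simplified to the exact-match case):

import datetime
import time

def check_pipe_status_iterative(ingest_manager, start_time, file_list,
                                timeout_seconds, logger):
    # Iterative equivalent of checkPipeStatus: no RecursionError possible.
    uploaded_file_list = []
    while (datetime.datetime.now() - start_time).total_seconds() <= timeout_seconds:
        history_resp = ingest_manager.get_history()
        uploaded_file_list += [
            f for f in history_resp['files']
            if f['status'] == 'LOADED' and f['complete']]
        if len(uploaded_file_list) >= len(file_list):
            logger.info('Upload finished')
            return uploaded_file_list
        time.sleep(5)  # wait before polling again
    return uploaded_file_list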


How to use this class


Below is a function that passes files into the API method uploadFileList. It is a higher-order function: it takes a get_file_list function that collects the list of S3 files for a Snowflake stage table. For example, you can put a set of files that share the same column layout in one S3 bucket, and this function will load them all into the stage table through a ‘pipe’, which I will talk about in the next section.



# ...

def process_snowpipe_api(get_file_list, table):

    logger.info("Creating snowpipe API client")
    apiClient = SnowpipeAPI(config.data, table)

    file_list = get_file_list(table)

    logger.info("Loading file list " + str(file_list))

    if len(file_list) > 0:
        uploaded_files = apiClient.uploadFileList(file_list)
    else:
        logger.info("No new file to load")
        uploaded_files = None

    return uploaded_files

# ...
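
To show how the pieces fit together, here is a hypothetical get_file_list built with boto3; the bucket name, prefix, and the shape of the table dict are assumptions for illustration. Note that Snowpipe expects file paths relative to the stage URL, so the prefix is stripped from each key.

import boto3

def get_file_list(table):
    # List the staged objects under the table's S3 prefix.
    # 'bucket' and 'prefix' are assumed fields on the table dict.
    s3 = boto3.client('s3')
    resp = s3.list_objects_v2(Bucket=table['bucket'], Prefix=table['prefix'])
    # Snowpipe wants paths relative to the stage URL, so drop the prefix.
    return [obj['Key'][len(table['prefix']):]
            for obj in resp.get('Contents', [])]

table = {'bucket': 'datalake-bucket', 'prefix': 'folder/',
         'pipe': 'DATA_LAKE.RPT.SNOWPIPE_PIPE_FOR_X'}

uploaded_files = process_snowpipe_api(get_file_list, table)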


Snowflake


For example, I have created a database role in Snowflake called snowpipe1, and then followed the steps below:


  1. Grant the required privileges on the database objects:

grant usage on database data_lake to role snowpipe1;

grant usage on schema data_lake.xxx to role snowpipe1;

grant insert, select on DATA_LAKE.xxx.table_name to role snowpipe1;

  2. Create a file format and grant usage on it to role SNOWPIPE1.
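
A minimal sketch, assuming the files are CSV; the format name SNOWPIPE_CSV_FOR_X matches the stage definition in the next step, but the options are assumptions to adapt to your own files.

create file format SNOWPIPE_CSV_FOR_X
  type = 'CSV'           -- assumed; adjust to your file type
  field_delimiter = ','
  skip_header = 1;

grant usage on file format SNOWPIPE_CSV_FOR_X to role snowpipe1;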


  3. Create the stage in SQL, and then grant usage on it to role SNOWPIPE1.

create stage SNOWPIPE_STAGE_FOR_X
  file_format = SNOWPIPE_CSV_FOR_X
  url = 's3://datalake-bucket/folder/'
  credentials = (aws_key_id='xxx' aws_secret_key='xxx');

grant usage on stage SNOWPIPE_STAGE_FOR_X to role snowpipe1;

  4. Create the pipe and grant ownership. Please note that once you grant ownership, your own role can no longer see the pipe; you need to switch to the snowpipe1 role.

create pipe SNOWPIPE_PIPE_FOR_X as copy into DATA_LAKE.RPT.X from @SNOWPIPE_STAGE_FOR_X;
// pause the pipe first in order to switch ownership
ALTER PIPE SNOWPIPE_PIPE_FOR_X SET PIPE_EXECUTION_PAUSED=true;
grant ownership on pipe data_lake.rpt.SNOWPIPE_PIPE_FOR_X to role snowpipe1;
// change the pipe back to running status
// (run this as the snowpipe1 role, since it now owns the pipe)
select SYSTEM$PIPE_STATUS( 'SNOWPIPE_PIPE_FOR_X' );
ALTER PIPE SNOWPIPE_PIPE_FOR_X SET PIPE_EXECUTION_PAUSED=false;
select SYSTEM$PIPE_STATUS( 'SNOWPIPE_PIPE_FOR_X' );
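
After the ownership transfer, you can verify everything from the snowpipe1 side; a quick sanity check might look like:

use role snowpipe1;
show pipes in schema data_lake.rpt;
select SYSTEM$PIPE_STATUS('DATA_LAKE.RPT.SNOWPIPE_PIPE_FOR_X');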