List of articles

Ingest a database to AWS S3 with Python and Pandas

We will see how to ingest to AWS S3 easily the data contained in a table of a database in Python with Pandas.

1  The existing solutions

1.1 AWS DMS

It is important to ask for the DMS (Data Migration Service) solution offered by AWS. This solution is rather simple to configure if your database is accessible from AWS via a VPC (Virtual Private Cloud).

Moreover, a migration task corresponds to a single table, and it is quite possible to migrate several tables in parallel, as long as the database supports it.

On the other hand, error recovery is, to date, in version 3.4.5, very poorly managed, since when any error occurs, the data is erased and the migration process restarted from scratch.

That’s why we propose, in this article, another solution, simple and effective.

1.2 Talend Big Data

Talend’s Big Data solution can also be used to migrate large databases to the cloud.

However, it requires both expertise and budget.

2 Principle of the proposed solution

The whole point of our solution is to take advantage of a cache, which can be on premise as well as on AWS, in order to manage failure or error recovery, without unnecessarily repeating the whole process.

Let’s see in detail how each step of the above scheme works, through an example from a SQL Server database.

2.1 Loading partitions

The idea is to use the column(s) used to partition the table to be migrated, or a criterion that will allow the data to be recovered in batches.

The most used criterion is the date criterion, which can be reused for the S3 partitioning in fine.

The Python code below retrieves partition values, with optional start (from_partition) and end (to_partition) value criterian).


The first one has chosen to check the existence of the cached partitions.csv : if such a file exists, then we load it as it is, otherwise we will retrieve the partitions from the database

def load_partitions(cache_folder, server, port, db_name, username, password, table_name, partition_column, from_partition, to_partition):
    cache_partitions_file = join(cache_folder, "partitions.csv")
    # load partitions from the cache partition file if the cache exists
    if not isfile(cache_partitions_file):
        logger.info("Loading partitions from database")
        sql_load_partitions = "SELECT DISTINCT " + partition_column + " FROM " + table_name + " with (nolock) "
        if from_partition is not None:
            sql_load_partitions = sql_load_partitions + " WHERE " + partition_column + " >= '" + from_partition + "'"
        if to_partition is not None:
            if from_partition is None:
                sql_load_partitions = sql_load_partitions + " WHERE "
            else:
                sql_load_partitions = sql_load_partitions + " AND "
            sql_load_partitions = sql_load_partitions + partition_column + " <= '" + to_partition + "'"
        sql_load_partitions = sql_load_partitions + " ORDER BY " + partition_column
        logger.info("Loading partitions with query : " + sql_load_partitions)
        cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER=' + server + ',' + port
                              + ';DATABASE=' + db_name + ';UID=' + username + ';PWD=' + password)
        df_partitions = pd.read_sql(sql_load_partitions, cnxn)
        logger.info("Writing partitions to the cache")
        df_partitions.to_csv(cache_partitions_file, index=False)
        cnxn.close()
    logger.info("Loading partitions from the cache")
    df_partitions = pd.read_csv(cache_partitions_file)
    logger.info("Partitions loaded")
    return df_partitions

 

At this level, the partitions are transposed into the Pandas dataframe df_partitions.

2.2 Recovering data from a partition

The function below allows to load the data corresponding to a partition, whose value corresponds to the parameter recovered_partition.

The same function also takes as a parameter the value of the primary key from which to start the loadin : this is the from_primary_key, which will take the value from which we want to recover the data in the context of error recovery, which we will study later.

Finally, the parameters contain :

  • the previously loaded partitions (df_partitions) ;
  • database connection parameters (server, port, db_name, username, password) ;
  • the data you want to migrate (table_name, partition_column, partition_value, recovered_partition, primary_key_column, from_primary_key)
  • the number of rows per CSV file ingested (nb_rows_by_csv_file)
  • cache information (cache_folder, csv_file_pattern, csv_file_pattern_replaced)
  • the client boto3 S3 (s3_client)
  • information about the target S3 (bucket_name, prefix)

 

def ingest_remaining_partitions(df_partitions, server, port, db_name, username, password, table_name, partition_column, partition_value, recovered_partition, primary_key_column, from_primary_key, nb_rows_by_csv_file, cache_folder, csv_file_pattern, csv_file_pattern_replaced, s3_client, bucket_name, prefix):
    logger.info("Lets ingest remaining partitions")
    try:
        ingest_partition = False if recovered_partition else True
        for cur_partition in df_partitions.iterrows():
            cur_partition_value = cur_partition[1][partition_column]
            if not ingest_partition:
                if cur_partition_value == recovered_partition:
                    ingest_partition = True
            else:
                sql_load_partition = 'SELECT * FROM ' + table_name \
                                         + ' with (nolock) WHERE ' + partition_column + " = '" + cur_partition_value + "'"
                if partition_value is not None and from_primary_key is not None and partition_value == cur_partition_value:
                    sql_load_partition = sql_load_partition + " AND " + primary_key_column + " >= " + from_primary_key
                sql_load_partition = sql_load_partition + " ORDER BY " + primary_key_column
                logger.info("Ingesting partition " + cur_partition_value + " with query " + sql_load_partition)
                cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER=' + server + ',' + port
                                      + ';DATABASE=' + db_name + ';UID=' + username + ';PWD=' + password)
                df = pd.read_sql_query(sql_load_partition, cnxn, chunksize=nb_rows_by_csv_file)

                cur_nb_file = 0
                for cur_chunk in df:
                    cur_nb_file = cur_nb_file + 1
                    cur_file_name = generate_file_name(csv_file_pattern, cur_partition_value, cur_nb_file)
                    cur_file_path = join(cache_folder, cur_file_name)
                    cur_chunk.to_csv(cur_file_path, index=False)
                    remove_other_cache_files(cache_folder, csv_file_pattern_replaced, cur_partition_value, cur_nb_file)
                    s3_path = prefix + "/" + cur_file_name
                    s3_client.upload_file(cur_file_path, bucket_name, s3_path)
                cnxn.close()
                logger.info("Partition " + cur_partition_value + " ingested")
        logger.info("All partitions ingested successfully for table " + table_name)
        return True
    except Exception as e:
        logger.error("Error during remaining partitions ingestion:")
        logger.error(traceback.format_exc())
        logger.error("Waiting " + str(sleep_nb_secs) + " seconds before continue")
        time.sleep(sleep_nb_secs)
        return False

The  ingest_remaining_partitions function returns True if the ingstion ends well, otherwise False

The generate_file_name function below creates a file name corresponding to a partition pattern and a file number that is incremented as a partition is ingested, and reset when the next partition is processed :

def remove_other_cache_files(cache_folder, partition_file_pattern, partition_name, file_number):
    for cur_file_name in listdir(cache_folder):
        cur_file_path = join(cache_folder, cur_file_name)
        if isfile(cur_file_path):
            file_search = re.search(partition_file_pattern, cur_file_name, re.IGNORECASE)
            if file_search:
                cur_partition_name = file_search.group(1)
                cur_file_number = int(file_search.group(2))
                if cur_partition_name != partition_name or cur_file_number != file_number:
                    os.remove(cur_file_path)

3 Error recovery management

The recover_on_error function below will load the files contained in the cache and terminate the ingestion of only the partition that has been identified in the cache.

def generate_file_name(csv_file_pattern, partition_name, file_number):
    file_name = csv_file_pattern.replace('#PARTITION#', partition_name)
    file_name = file_name.replace('#FILE_NUMBER#', str(file_number))
    return file_name

Finally, it is important to understand that when a file is cached, the other files in the cache that are supposed to be already ingested are removed, thanks to the  remove_other_cache_files, which will use the regular expression patterns corresponding to the ingested files to know which one(s) to remove :

def recover_on_error(cache_folder, csv_file_pattern, server, port, db_name, username, password, table_name, primary_key_column, partition_column, csv_file_pattern_replaced, nb_rows_by_csv_file, s3_client, bucket_name, prefix):
    logger.info("Lets recover possible previous error")
    # 1 retrieve last loaded file in the cache folder

    cur_nb_file = 1
    partition_to_recover = None
    cache_file_path = None
    cache_file_name = None
    for cur_file_name in listdir(cache_folder):
        cur_file_path = join(cache_folder, cur_file_name)
        if isfile(cur_file_path):
            file_search = re.search(csv_file_pattern_replaced, cur_file_name, re.IGNORECASE)
            if file_search:
                partition_to_recover = file_search.group(1)
                cur_nb_file = int(file_search.group(2))
                cache_file_path = cur_file_path
                cache_file_name = cur_file_name

    # 2 IF a last ingested file exists in the cache
    last_recovery_on_error_value = None
    if cache_file_path:
        logger.info("Recovering previous error using cache file " + cache_file_path)
        # 2.1 ingest the last ingested file in S3 (in the case it should not have been ingested yet)
        s3_path = prefix + "/" + cache_file_name
        s3_client.upload_file(cache_file_path, bucket_name, s3_path)
        # 2.2 ingest remaining data of the partition
        df = pd.read_csv(cache_file_path)
        # 2.3 Generate query to find the remaining data :
        # - to add partition criteria : partition column = current partition value
        # - to add recovery on error criteria : col used as primary key > primary key max (last of the file) value found
        cache_last_row = df.tail(1)
        last_recovery_on_error_value = cache_last_row[primary_key_column].item()
        sql_recover_last_partition = 'SELECT * FROM ' + table_name \
                                     + ' with (nolock) WHERE ' + partition_column + " = '" + partition_to_recover + "'"\
                                     + ' AND ' + primary_key_column + ' > ' + str(last_recovery_on_error_value) \
                                     + " ORDER BY " + primary_key_column
        logger.info("Lets try to recover previous error with query " + sql_recover_last_partition)
        # 2.4 Load remaining data in the database, by chunks
        ingestion_terminated = False
        while not ingestion_terminated:
            try:
                cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER=' + server + ',' + port
                                  + ';DATABASE=' + db_name + ';UID=' + username + ';PWD=' + password)
                df = pd.read_sql_query(sql_recover_last_partition, cnxn, chunksize=nb_rows_by_csv_file)
                # 2.5 Ingest each chunk one by one
                for cur_chunk in df:
                    cur_nb_file = cur_nb_file + 1
                    cur_file_name = generate_file_name(csv_file_pattern, partition_to_recover, cur_nb_file)
                    cur_file_path = join(cache_folder, cur_file_name)
                    cur_chunk.to_csv(cur_file_path, index=False)
                    remove_other_cache_files(cache_folder, csv_file_pattern_replaced, partition_to_recover, cur_nb_file)
                    s3_path = prefix + "/" + cur_file_name
                    s3_client.upload_file(cur_file_path, bucket_name, s3_path)
                cnxn.close()
                ingestion_terminated = True
                logger.info("Previous error recovered successfully")
            except Exception as e:
                #2.7 IF an error occurs, just sleep some seconds, and retry
                logger.error("Error recovering previous error :")
                logger.error(traceback.format_exc())
                logger.error("Waiting " + str(sleep_nb_secs) + " seconds before continue")
                time.sleep(sleep_nb_secs)
    else:
        logger.info("No previous error to recover")
    return partition_to_recover

The partition to finish ingesting is returned, or None if nothing was found in the cache

4 Overview of the processs

We have just seen the implementation of the different steps of our ingestion.

We only need to assemble them to perform our ingestion, assuming that it is necessary to perform a potential error recovery from the files contained in the cache:

    # 1 - load partitions
    df_partitions = load_partitions(cache_folder, server, port, db_name, username, password, table_name,
                                    partition_column, from_partition, to_partition)

    ingestion_terminated = False
    while not ingestion_terminated:
        # 2 - recovery on error
        recovered_partition = recover_on_error(cache_folder, csv_file_pattern, server, port, db_name, username,
                                               password, table_name, primary_key_column, partition_column,
                                               csv_file_pattern_replaced, nb_rows_by_csv_file, s3_client, bucket_name,
                                               prefix)

        # 3 - ingest remaining partitions
        ingestion_terminated = ingest_remaining_partitions(df_partitions, server, port, db_name, username, password,
                                                           table_name, partition_column, from_partition,
                                                           recovered_partition, primary_key_column, from_primary_key,
                                                           nb_rows_by_csv_file, cache_folder, csv_file_pattern,
                                                           csv_file_pattern_replaced, s3_client, bucket_name, prefix)

5 Conclusion

This solution was successfully used to migrate databases containing several tens of terabytes of data.

Its main advantage is that it is possible to run several batches of partitions in parallel on the same table, as long as the database supports it.

Improvements are possible, such as sending notifications in case of errors or transforming to parquet format, potentially compressed, instead of the current CSV.

Of course it will be necessary to import the Python libraries needed to run the script, such as pyodbc or Pandas.

For any remark, suggestion or request of migration of your databases we prefer you to contact us.