Liste des articles

Ingérer une base de données vers AWS S3 en Python avec Pandas

Nous allons voir comment ingérer simplement le contenu d’une base de données avec Python et Pandas.

1 Solutions existantes

1.1 AWS DMS

Il est important de demander la solution DMS (Data Migration Service) proposée par AWS. Cette solution est plutôt simple à configurer si votre base de données est accessible depuis AWS via un VPC (Virtual Private Cloud) .

De plus, une tâche de migration correspond à une seule table, et il est tout à fait possible de migrer plusieurs tables en parallèle, tant que la base de données le supporte.

En revanche, la reprise sur erreur est, à ce jour, en version 3.4.5, très mal gérée, puisque lorsqu’une erreur quelconque se produit, survient, les données sont effacées et le processus de migration rétabli de zéro.

C’est pourquoi nous vous proposons, dans cet article, une autre solution, simple et efficace .

1.2 Talend Big Data

La solution Talend Big Data est elle aussi envisageable dans le cadre de migrations de bases de données volumineuses vers le Cloud.

En revanche, elle requiert à la fois de l’expertise et du budget.

2 Principe de la solution proposée

Tout l’intérêt de notre solution est de tirer profit d’un cache, qui peut aussi bien être on premise que sur AWS, afin de bien gérer la reprise sur panne ou sur erreur, sans reprendre inutilement tout le processus.

Voyons en détail comment se dérouler chaque étape du schéma ci-dessus, au travers d’un exemple depuis une base de données SQL Server.

2.1 Chargement des partitions

L’idée est d’utiliser la (ou les) colonne(s) utilisée(s) pour partitionner la table à migrer, ou bien un critère qui va permettre de récupérer les données par lots.

Le critère le plus utilisé est celui de date, que l’on pourra réutiliser pour le partitionnement S3 in fine.

Le code Python ci-dessous permet de récupérer les valeurs des partitions, avec des critères optionnels de valeur de début (from_partition) et de fin (to_partition).

La première a choisi de faire ici de vérifier l’existence du fichier de partitions mis en cache partitions.csv : si un tel fichier existe, alors on le charge tel quel, sinon on va récupérer les partitions dans la base de données

def load_partitions(cache_folder, server, port, db_name, username, password, table_name, partition_column, from_partition, to_partition):
    cache_partitions_file = join(cache_folder, "partitions.csv")
    # load partitions from the cache partition file if the cache exists
    if not isfile(cache_partitions_file):
        logger.info("Loading partitions from database")
        sql_load_partitions = "SELECT DISTINCT " + partition_column + " FROM " + table_name + " with (nolock) "
        if from_partition is not None:
            sql_load_partitions = sql_load_partitions + " WHERE " + partition_column + " >= '" + from_partition + "'"
        if to_partition is not None:
            if from_partition is None:
                sql_load_partitions = sql_load_partitions + " WHERE "
            else:
                sql_load_partitions = sql_load_partitions + " AND "
            sql_load_partitions = sql_load_partitions + partition_column + " <= '" + to_partition + "'"
        sql_load_partitions = sql_load_partitions + " ORDER BY " + partition_column
        logger.info("Loading partitions with query : " + sql_load_partitions)
        cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER=' + server + ',' + port
                              + ';DATABASE=' + db_name + ';UID=' + username + ';PWD=' + password)
        df_partitions = pd.read_sql(sql_load_partitions, cnxn)
        logger.info("Writing partitions to the cache")
        df_partitions.to_csv(cache_partitions_file, index=False)
        cnxn.close()
    logger.info("Loading partitions from the cache")
    df_partitions = pd.read_csv(cache_partitions_file)
    logger.info("Partitions loaded")
    return df_partitions

 

A ce niveau, les partitions sont transposées dans le dataframe Pandas df_partitions.

2.2 Récupération des données d’une partition

La fonction ci-dessous permet de charger les données correspondant à une partition, dont la valeur correspond au paramètre recovery_partition .

La même fonction prend aussi en paramètre la valeur de la clé primaire à partir de laquelle on démarrera le chargement : il s’agit du paramètre from_primary_key, qui prendra la valeur à partir de laquelle on voudra récupérer les données dans le cadre de la reprise sur erreur, que l’on étudiera plus loin.

Enfin, les paramètres contiennent :

  • les partitions chargées précédemment ( df_partitions ) ;
  • les paramètres de connexion à la base de données ( serveur, port, db_name, username, password ) ;
  • les données que l’on souhaite migrer ( nom_table, colonne_partition, valeur_partition, partition_récupérée, colonne_clé_primaire, from_clé_primaire )
  • le nombre de lignes par fichier CSV ingéré ( nb_rows_by_csv_file )
  • les informations relatives au cache ( cache_folder, csv_file_pattern, csv_file_pattern_replaced )
  • le client boto3 S3 ( s3_client )
  • les informations relatives à la cible S3 ( bucket_name, prefix )

 

def ingest_remaining_partitions(df_partitions, server, port, db_name, username, password, table_name, partition_column, partition_value, recovered_partition, primary_key_column, from_primary_key, nb_rows_by_csv_file, cache_folder, csv_file_pattern, csv_file_pattern_replaced, s3_client, bucket_name, prefix):
    logger.info("Lets ingest remaining partitions")
    try:
        ingest_partition = False if recovered_partition else True
        for cur_partition in df_partitions.iterrows():
            cur_partition_value = cur_partition[1][partition_column]
            if not ingest_partition:
                if cur_partition_value == recovered_partition:
                    ingest_partition = True
            else:
                sql_load_partition = 'SELECT * FROM ' + table_name \
                                         + ' with (nolock) WHERE ' + partition_column + " = '" + cur_partition_value + "'"
                if partition_value is not None and from_primary_key is not None and partition_value == cur_partition_value:
                    sql_load_partition = sql_load_partition + " AND " + primary_key_column + " >= " + from_primary_key
                sql_load_partition = sql_load_partition + " ORDER BY " + primary_key_column
                logger.info("Ingesting partition " + cur_partition_value + " with query " + sql_load_partition)
                cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER=' + server + ',' + port
                                      + ';DATABASE=' + db_name + ';UID=' + username + ';PWD=' + password)
                df = pd.read_sql_query(sql_load_partition, cnxn, chunksize=nb_rows_by_csv_file)

                cur_nb_file = 0
                for cur_chunk in df:
                    cur_nb_file = cur_nb_file + 1
                    cur_file_name = generate_file_name(csv_file_pattern, cur_partition_value, cur_nb_file)
                    cur_file_path = join(cache_folder, cur_file_name)
                    cur_chunk.to_csv(cur_file_path, index=False)
                    remove_other_cache_files(cache_folder, csv_file_pattern_replaced, cur_partition_value, cur_nb_file)
                    s3_path = prefix + "/" + cur_file_name
                    s3_client.upload_file(cur_file_path, bucket_name, s3_path)
                cnxn.close()
                logger.info("Partition " + cur_partition_value + " ingested")
        logger.info("All partitions ingested successfully for table " + table_name)
        return True
    except Exception as e:
        logger.error("Error during remaining partitions ingestion:")
        logger.error(traceback.format_exc())
        logger.error("Waiting " + str(sleep_nb_secs) + " seconds before continue")
        time.sleep(sleep_nb_secs)
        return False

La fonction  ingest_remaining_partitions renvoie True si l’ingstion se termine bien, sinon False

La fonction generate_file_name ci-dessous permet de créer un nom de fichier correspondant à un pattern de partition et un numéro de fichier incrémenté au fil de l’ingestion d’une partition, et réinitialisé lors du traitement de la partition suivante :

def remove_other_cache_files(cache_folder, partition_file_pattern, partition_name, file_number):
    for cur_file_name in listdir(cache_folder):
        cur_file_path = join(cache_folder, cur_file_name)
        if isfile(cur_file_path):
            file_search = re.search(partition_file_pattern, cur_file_name, re.IGNORECASE)
            if file_search:
                cur_partition_name = file_search.group(1)
                cur_file_number = int(file_search.group(2))
                if cur_partition_name != partition_name or cur_file_number != file_number:
                    os.remove(cur_file_path)

3 Gestion de la reprise sur erreur

La fonction recover_on_error ci-dessous va charger les fichiers contenus dans le cache et terminer uniquement l’ingestion de la partition qui a été identifiée dans le cache.

def generate_file_name(csv_file_pattern, partition_name, file_number):
    file_name = csv_file_pattern.replace('#PARTITION#', partition_name)
    file_name = file_name.replace('#FILE_NUMBER#', str(file_number))
    return file_name

Enfin, il est important de comprendre que lorsqu’un fichier est mis en cache, les autres fichiers du cache supposés être déjà ingérés sont supprimés, grâce à la fonction  remove_other_cache_files, qui va utiliser les patterns d’expression régulière correspondant aux fichiers ingérés pour savoir le(s)quel(s) supprimer :

def recover_on_error(cache_folder, csv_file_pattern, server, port, db_name, username, password, table_name, primary_key_column, partition_column, csv_file_pattern_replaced, nb_rows_by_csv_file, s3_client, bucket_name, prefix):
    logger.info("Lets recover possible previous error")
    # 1 retrieve last loaded file in the cache folder

    cur_nb_file = 1
    partition_to_recover = None
    cache_file_path = None
    cache_file_name = None
    for cur_file_name in listdir(cache_folder):
        cur_file_path = join(cache_folder, cur_file_name)
        if isfile(cur_file_path):
            file_search = re.search(csv_file_pattern_replaced, cur_file_name, re.IGNORECASE)
            if file_search:
                partition_to_recover = file_search.group(1)
                cur_nb_file = int(file_search.group(2))
                cache_file_path = cur_file_path
                cache_file_name = cur_file_name

    # 2 IF a last ingested file exists in the cache
    last_recovery_on_error_value = None
    if cache_file_path:
        logger.info("Recovering previous error using cache file " + cache_file_path)
        # 2.1 ingest the last ingested file in S3 (in the case it should not have been ingested yet)
        s3_path = prefix + "/" + cache_file_name
        s3_client.upload_file(cache_file_path, bucket_name, s3_path)
        # 2.2 ingest remaining data of the partition
        df = pd.read_csv(cache_file_path)
        # 2.3 Generate query to find the remaining data :
        # - to add partition criteria : partition column = current partition value
        # - to add recovery on error criteria : col used as primary key > primary key max (last of the file) value found
        cache_last_row = df.tail(1)
        last_recovery_on_error_value = cache_last_row[primary_key_column].item()
        sql_recover_last_partition = 'SELECT * FROM ' + table_name \
                                     + ' with (nolock) WHERE ' + partition_column + " = '" + partition_to_recover + "'"\
                                     + ' AND ' + primary_key_column + ' > ' + str(last_recovery_on_error_value) \
                                     + " ORDER BY " + primary_key_column
        logger.info("Lets try to recover previous error with query " + sql_recover_last_partition)
        # 2.4 Load remaining data in the database, by chunks
        ingestion_terminated = False
        while not ingestion_terminated:
            try:
                cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER=' + server + ',' + port
                                  + ';DATABASE=' + db_name + ';UID=' + username + ';PWD=' + password)
                df = pd.read_sql_query(sql_recover_last_partition, cnxn, chunksize=nb_rows_by_csv_file)
                # 2.5 Ingest each chunk one by one
                for cur_chunk in df:
                    cur_nb_file = cur_nb_file + 1
                    cur_file_name = generate_file_name(csv_file_pattern, partition_to_recover, cur_nb_file)
                    cur_file_path = join(cache_folder, cur_file_name)
                    cur_chunk.to_csv(cur_file_path, index=False)
                    remove_other_cache_files(cache_folder, csv_file_pattern_replaced, partition_to_recover, cur_nb_file)
                    s3_path = prefix + "/" + cur_file_name
                    s3_client.upload_file(cur_file_path, bucket_name, s3_path)
                cnxn.close()
                ingestion_terminated = True
                logger.info("Previous error recovered successfully")
            except Exception as e:
                #2.7 IF an error occurs, just sleep some seconds, and retry
                logger.error("Error recovering previous error :")
                logger.error(traceback.format_exc())
                logger.error("Waiting " + str(sleep_nb_secs) + " seconds before continue")
                time.sleep(sleep_nb_secs)
    else:
        logger.info("No previous error to recover")
    return partition_to_recover

La partition à terminer d’ingérer est renvoyée, ou bien None si rien n’a été trouvé dans le cache

4 Vue d’ensemble du processus

Nous venons de voir l’implémentation des différentes étapes de notre ingestion.

Il ne nous reste plus qu’à les assembler pour réaliser notre ingestion, en supposant qu’il est nécessaire de réaliser une potentielle reprise sur erreur à partir des fichiers contenus dans le cache :

    # 1 - load partitions
    df_partitions = load_partitions(cache_folder, server, port, db_name, username, password, table_name,
                                    partition_column, from_partition, to_partition)

    ingestion_terminated = False
    while not ingestion_terminated:
        # 2 - recovery on error
        recovered_partition = recover_on_error(cache_folder, csv_file_pattern, server, port, db_name, username,
                                               password, table_name, primary_key_column, partition_column,
                                               csv_file_pattern_replaced, nb_rows_by_csv_file, s3_client, bucket_name,
                                               prefix)

        # 3 - ingest remaining partitions
        ingestion_terminated = ingest_remaining_partitions(df_partitions, server, port, db_name, username, password,
                                                           table_name, partition_column, from_partition,
                                                           recovered_partition, primary_key_column, from_primary_key,
                                                           nb_rows_by_csv_file, cache_folder, csv_file_pattern,
                                                           csv_file_pattern_replaced, s3_client, bucket_name, prefix)

5 Conclusion

Cette solution fut utilisée avec succès pour migrer des bases contenant plusieurs dizaines de téraoctets de données.

Son principal avantage est qu’il est tout à fait possible de lancer plusieurs lots de partitions en parallèle sur une même table, tant que la base de données le supporte.

Des améliorations sont possibles, comme l’envoi de notifications en cas d’erreur ou la transformation au format parquet, potentiellement compressé, au lieu du CSV actuel.

Bien sûr il sera nécessaire d’importer les librairies Python nécessaires au bon fonctionnement du script, comme pyodbc ou Pandas.

Pour toute remarque, suggestion ou demande de migration de vos bases de données, nous vous préférons à nous contacter .