Ingérer une base de données vers AWS S3 en Python avec Pandas
Nous allons voir comment ingérer simplement le contenu d’une base de données avec Python et Pandas.
1 Solutions existantes
1.1 AWS DMS
Il est important de demander la solution DMS (Data Migration Service) proposée par AWS. Cette solution est plutôt simple à configurer si votre base de données est accessible depuis AWS via un VPC (Virtual Private Cloud) .
De plus, une tâche de migration correspond à une seule table, et il est tout à fait possible de migrer plusieurs tables en parallèle, tant que la base de données le supporte.
En revanche, la reprise sur erreur est, à ce jour, en version 3.4.5, très mal gérée, puisque lorsqu’une erreur quelconque se produit, survient, les données sont effacées et le processus de migration rétabli de zéro.
C’est pourquoi nous vous proposons, dans cet article, une autre solution, simple et efficace .
1.2 Talend Big Data
La solution Talend Big Data est elle aussi envisageable dans le cadre de migrations de bases de données volumineuses vers le Cloud.
En revanche, elle requiert à la fois de l’expertise et du budget.
2 Principe de la solution proposée
Tout l’intérêt de notre solution est de tirer profit d’un cache, qui peut aussi bien être on premise que sur AWS, afin de bien gérer la reprise sur panne ou sur erreur, sans reprendre inutilement tout le processus.
Voyons en détail comment se dérouler chaque étape du schéma ci-dessus, au travers d’un exemple depuis une base de données SQL Server.
2.1 Chargement des partitions
L’idée est d’utiliser la (ou les) colonne(s) utilisée(s) pour partitionner la table à migrer, ou bien un critère qui va permettre de récupérer les données par lots.
Le critère le plus utilisé est celui de date, que l’on pourra réutiliser pour le partitionnement S3 in fine.
Le code Python ci-dessous permet de récupérer les valeurs des partitions, avec des critères optionnels de valeur de début (from_partition) et de fin (to_partition).
La première a choisi de faire ici de vérifier l’existence du fichier de partitions mis en cache partitions.csv : si un tel fichier existe, alors on le charge tel quel, sinon on va récupérer les partitions dans la base de données
def load_partitions(cache_folder, server, port, db_name, username, password, table_name, partition_column, from_partition, to_partition):
cache_partitions_file = join(cache_folder, "partitions.csv")
# load partitions from the cache partition file if the cache exists
if not isfile(cache_partitions_file):
logger.info("Loading partitions from database")
sql_load_partitions = "SELECT DISTINCT " + partition_column + " FROM " + table_name + " with (nolock) "
if from_partition is not None:
sql_load_partitions = sql_load_partitions + " WHERE " + partition_column + " >= '" + from_partition + "'"
if to_partition is not None:
if from_partition is None:
sql_load_partitions = sql_load_partitions + " WHERE "
else:
sql_load_partitions = sql_load_partitions + " AND "
sql_load_partitions = sql_load_partitions + partition_column + " <= '" + to_partition + "'"
sql_load_partitions = sql_load_partitions + " ORDER BY " + partition_column
logger.info("Loading partitions with query : " + sql_load_partitions)
cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER=' + server + ',' + port
+ ';DATABASE=' + db_name + ';UID=' + username + ';PWD=' + password)
df_partitions = pd.read_sql(sql_load_partitions, cnxn)
logger.info("Writing partitions to the cache")
df_partitions.to_csv(cache_partitions_file, index=False)
cnxn.close()
logger.info("Loading partitions from the cache")
df_partitions = pd.read_csv(cache_partitions_file)
logger.info("Partitions loaded")
return df_partitions
A ce niveau, les partitions sont transposées dans le dataframe Pandas df_partitions.
2.2 Récupération des données d’une partition
La fonction ci-dessous permet de charger les données correspondant à une partition, dont la valeur correspond au paramètre recovery_partition .
La même fonction prend aussi en paramètre la valeur de la clé primaire à partir de laquelle on démarrera le chargement : il s’agit du paramètre from_primary_key, qui prendra la valeur à partir de laquelle on voudra récupérer les données dans le cadre de la reprise sur erreur, que l’on étudiera plus loin.
Enfin, les paramètres contiennent :
- les partitions chargées précédemment ( df_partitions ) ;
- les paramètres de connexion à la base de données ( serveur, port, db_name, username, password ) ;
- les données que l’on souhaite migrer ( nom_table, colonne_partition, valeur_partition, partition_récupérée, colonne_clé_primaire, from_clé_primaire )
- le nombre de lignes par fichier CSV ingéré ( nb_rows_by_csv_file )
- les informations relatives au cache ( cache_folder, csv_file_pattern, csv_file_pattern_replaced )
- le client boto3 S3 ( s3_client )
- les informations relatives à la cible S3 ( bucket_name, prefix )
def ingest_remaining_partitions(df_partitions, server, port, db_name, username, password, table_name, partition_column, partition_value, recovered_partition, primary_key_column, from_primary_key, nb_rows_by_csv_file, cache_folder, csv_file_pattern, csv_file_pattern_replaced, s3_client, bucket_name, prefix):
logger.info("Lets ingest remaining partitions")
try:
ingest_partition = False if recovered_partition else True
for cur_partition in df_partitions.iterrows():
cur_partition_value = cur_partition[1][partition_column]
if not ingest_partition:
if cur_partition_value == recovered_partition:
ingest_partition = True
else:
sql_load_partition = 'SELECT * FROM ' + table_name \
+ ' with (nolock) WHERE ' + partition_column + " = '" + cur_partition_value + "'"
if partition_value is not None and from_primary_key is not None and partition_value == cur_partition_value:
sql_load_partition = sql_load_partition + " AND " + primary_key_column + " >= " + from_primary_key
sql_load_partition = sql_load_partition + " ORDER BY " + primary_key_column
logger.info("Ingesting partition " + cur_partition_value + " with query " + sql_load_partition)
cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER=' + server + ',' + port
+ ';DATABASE=' + db_name + ';UID=' + username + ';PWD=' + password)
df = pd.read_sql_query(sql_load_partition, cnxn, chunksize=nb_rows_by_csv_file)
cur_nb_file = 0
for cur_chunk in df:
cur_nb_file = cur_nb_file + 1
cur_file_name = generate_file_name(csv_file_pattern, cur_partition_value, cur_nb_file)
cur_file_path = join(cache_folder, cur_file_name)
cur_chunk.to_csv(cur_file_path, index=False)
remove_other_cache_files(cache_folder, csv_file_pattern_replaced, cur_partition_value, cur_nb_file)
s3_path = prefix + "/" + cur_file_name
s3_client.upload_file(cur_file_path, bucket_name, s3_path)
cnxn.close()
logger.info("Partition " + cur_partition_value + " ingested")
logger.info("All partitions ingested successfully for table " + table_name)
return True
except Exception as e:
logger.error("Error during remaining partitions ingestion:")
logger.error(traceback.format_exc())
logger.error("Waiting " + str(sleep_nb_secs) + " seconds before continue")
time.sleep(sleep_nb_secs)
return False
La fonction ingest_remaining_partitions renvoie True si l’ingstion se termine bien, sinon False
La fonction generate_file_name ci-dessous permet de créer un nom de fichier correspondant à un pattern de partition et un numéro de fichier incrémenté au fil de l’ingestion d’une partition, et réinitialisé lors du traitement de la partition suivante :
def remove_other_cache_files(cache_folder, partition_file_pattern, partition_name, file_number):
for cur_file_name in listdir(cache_folder):
cur_file_path = join(cache_folder, cur_file_name)
if isfile(cur_file_path):
file_search = re.search(partition_file_pattern, cur_file_name, re.IGNORECASE)
if file_search:
cur_partition_name = file_search.group(1)
cur_file_number = int(file_search.group(2))
if cur_partition_name != partition_name or cur_file_number != file_number:
os.remove(cur_file_path)
3 Gestion de la reprise sur erreur
La fonction recover_on_error ci-dessous va charger les fichiers contenus dans le cache et terminer uniquement l’ingestion de la partition qui a été identifiée dans le cache.
def generate_file_name(csv_file_pattern, partition_name, file_number):
file_name = csv_file_pattern.replace('#PARTITION#', partition_name)
file_name = file_name.replace('#FILE_NUMBER#', str(file_number))
return file_name
Enfin, il est important de comprendre que lorsqu’un fichier est mis en cache, les autres fichiers du cache supposés être déjà ingérés sont supprimés, grâce à la fonction remove_other_cache_files, qui va utiliser les patterns d’expression régulière correspondant aux fichiers ingérés pour savoir le(s)quel(s) supprimer :
def recover_on_error(cache_folder, csv_file_pattern, server, port, db_name, username, password, table_name, primary_key_column, partition_column, csv_file_pattern_replaced, nb_rows_by_csv_file, s3_client, bucket_name, prefix):
logger.info("Lets recover possible previous error")
# 1 retrieve last loaded file in the cache folder
cur_nb_file = 1
partition_to_recover = None
cache_file_path = None
cache_file_name = None
for cur_file_name in listdir(cache_folder):
cur_file_path = join(cache_folder, cur_file_name)
if isfile(cur_file_path):
file_search = re.search(csv_file_pattern_replaced, cur_file_name, re.IGNORECASE)
if file_search:
partition_to_recover = file_search.group(1)
cur_nb_file = int(file_search.group(2))
cache_file_path = cur_file_path
cache_file_name = cur_file_name
# 2 IF a last ingested file exists in the cache
last_recovery_on_error_value = None
if cache_file_path:
logger.info("Recovering previous error using cache file " + cache_file_path)
# 2.1 ingest the last ingested file in S3 (in the case it should not have been ingested yet)
s3_path = prefix + "/" + cache_file_name
s3_client.upload_file(cache_file_path, bucket_name, s3_path)
# 2.2 ingest remaining data of the partition
df = pd.read_csv(cache_file_path)
# 2.3 Generate query to find the remaining data :
# - to add partition criteria : partition column = current partition value
# - to add recovery on error criteria : col used as primary key > primary key max (last of the file) value found
cache_last_row = df.tail(1)
last_recovery_on_error_value = cache_last_row[primary_key_column].item()
sql_recover_last_partition = 'SELECT * FROM ' + table_name \
+ ' with (nolock) WHERE ' + partition_column + " = '" + partition_to_recover + "'"\
+ ' AND ' + primary_key_column + ' > ' + str(last_recovery_on_error_value) \
+ " ORDER BY " + primary_key_column
logger.info("Lets try to recover previous error with query " + sql_recover_last_partition)
# 2.4 Load remaining data in the database, by chunks
ingestion_terminated = False
while not ingestion_terminated:
try:
cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER=' + server + ',' + port
+ ';DATABASE=' + db_name + ';UID=' + username + ';PWD=' + password)
df = pd.read_sql_query(sql_recover_last_partition, cnxn, chunksize=nb_rows_by_csv_file)
# 2.5 Ingest each chunk one by one
for cur_chunk in df:
cur_nb_file = cur_nb_file + 1
cur_file_name = generate_file_name(csv_file_pattern, partition_to_recover, cur_nb_file)
cur_file_path = join(cache_folder, cur_file_name)
cur_chunk.to_csv(cur_file_path, index=False)
remove_other_cache_files(cache_folder, csv_file_pattern_replaced, partition_to_recover, cur_nb_file)
s3_path = prefix + "/" + cur_file_name
s3_client.upload_file(cur_file_path, bucket_name, s3_path)
cnxn.close()
ingestion_terminated = True
logger.info("Previous error recovered successfully")
except Exception as e:
#2.7 IF an error occurs, just sleep some seconds, and retry
logger.error("Error recovering previous error :")
logger.error(traceback.format_exc())
logger.error("Waiting " + str(sleep_nb_secs) + " seconds before continue")
time.sleep(sleep_nb_secs)
else:
logger.info("No previous error to recover")
return partition_to_recover
La partition à terminer d’ingérer est renvoyée, ou bien None si rien n’a été trouvé dans le cache
4 Vue d’ensemble du processus
Nous venons de voir l’implémentation des différentes étapes de notre ingestion.
Il ne nous reste plus qu’à les assembler pour réaliser notre ingestion, en supposant qu’il est nécessaire de réaliser une potentielle reprise sur erreur à partir des fichiers contenus dans le cache :
# 1 - load partitions
df_partitions = load_partitions(cache_folder, server, port, db_name, username, password, table_name,
partition_column, from_partition, to_partition)
ingestion_terminated = False
while not ingestion_terminated:
# 2 - recovery on error
recovered_partition = recover_on_error(cache_folder, csv_file_pattern, server, port, db_name, username,
password, table_name, primary_key_column, partition_column,
csv_file_pattern_replaced, nb_rows_by_csv_file, s3_client, bucket_name,
prefix)
# 3 - ingest remaining partitions
ingestion_terminated = ingest_remaining_partitions(df_partitions, server, port, db_name, username, password,
table_name, partition_column, from_partition,
recovered_partition, primary_key_column, from_primary_key,
nb_rows_by_csv_file, cache_folder, csv_file_pattern,
csv_file_pattern_replaced, s3_client, bucket_name, prefix)
5 Conclusion
Cette solution fut utilisée avec succès pour migrer des bases contenant plusieurs dizaines de téraoctets de données.
Son principal avantage est qu’il est tout à fait possible de lancer plusieurs lots de partitions en parallèle sur une même table, tant que la base de données le supporte.
Des améliorations sont possibles, comme l’envoi de notifications en cas d’erreur ou la transformation au format parquet, potentiellement compressé, au lieu du CSV actuel.
Bien sûr il sera nécessaire d’importer les librairies Python nécessaires au bon fonctionnement du script, comme pyodbc ou Pandas.
Pour toute remarque, suggestion ou demande de migration de vos bases de données, nous vous préférons à nous contacter .