Script Acceso Libre 20 Feb, 2026

Script Python: ETL Pipeline — CSV a PostgreSQL con pandas

Pipeline de extracción, transformación y carga de datos desde archivos CSV hacia PostgreSQL. Incluye validación, limpieza de datos y manejo de duplicados.

#python #etl #pandas #postgresql #datos

Contenido

ETL Pipeline: CSV → PostgreSQL

Un pipeline ETL sencillo pero production-ready usando pandas y SQLAlchemy.

Requisitos

pip install pandas sqlalchemy psycopg2-binary python-dotenv

Código

import pandas as pd
from sqlalchemy import create_engine, text
from dotenv import load_dotenv
import os

load_dotenv()

class ETLPipeline:
    def __init__(self, db_url=None):
        self.db_url = db_url or os.getenv('DATABASE_URL')
        self.engine = create_engine(self.db_url)

    def extract(self, filepath, encoding='utf-8'):
        """Lee CSV con detección automática de separador"""
        df = pd.read_csv(filepath, encoding=encoding, sep=None, engine='python')
        print(f"Extraídos: {len(df)} registros, {len(df.columns)} columnas")
        return df

    def transform(self, df):
        """Limpia y transforma los datos"""
        # Eliminar duplicados
        initial = len(df)
        df = df.drop_duplicates()
        print(f"Duplicados eliminados: {initial - len(df)}")

        # Normalizar columnas
        df.columns = [c.lower().strip().replace(' ', '_') for c in df.columns]

        # Eliminar filas completamente vacías
        df = df.dropna(how='all')

        # Convertir fechas si existen
        for col in df.columns:
            if 'fecha' in col or 'date' in col:
                df[col] = pd.to_datetime(df[col], errors='coerce')

        return df

    def load(self, df, table_name, if_exists='append'):
        """Carga a PostgreSQL"""
        df.to_sql(table_name, self.engine, if_exists=if_exists, index=False)
        print(f"Cargados: {len(df)} registros → tabla '{table_name}'")

    def run(self, filepath, table_name):
        """Ejecuta el pipeline completo"""
        df = self.extract(filepath)
        df = self.transform(df)
        self.load(df, table_name)
        return df

Uso

pipeline = ETLPipeline()
df = pipeline.run('ventas_2024.csv', 'ventas')

Recurso Externo

Este recurso incluye un enlace externo. Regístrate para acceder.

Inicia Sesión para Acceder

Únete a la Comunidad

Regístrate gratis para descargar archivos, guardar recursos en favoritos, ganar XP y acceder a cursos y el foro de la comunidad.

¿Ya tienes cuenta? Inicia sesión

Erik Taveras

Autor

Erik Taveras

Recursos Relacionados