Building ETL pipelines with ORM

One of the most common tasks for anyone working in software is, at some point, working with persisted data: data that is permanently stored, either in a database (relational or NoSQL) or in files.

In this post, we will examine how to interact with a database (more specifically, Postgres) from Python by using an Object Relational Mapper (ORM). There are a ton of articles on this topic, but the vast majority of them use SQLite as the database, so I wanted to demonstrate how this works in a scenario closer to reality, with Postgres as our database.

Object Relational Mapping

First things first: we need to make clear what an Object Relational Mapper is, and to do that we need to clarify the concept of Object Relational Mapping. As per Wikipedia:

Object-relational mapping (ORM, O/RM, and O/R mapping tool) in computer science is a programming technique for converting data between incompatible type systems using object-oriented programming languages. This creates, in effect, a “virtual object database” that can be used from within the programming language. There are both free and commercial packages available that perform object-relational mapping, although some programmers opt to construct their own ORM tools.


Long story short, mapping is the concept and the mapper is the tool that implements it. ORM keeps two different representations of the same data in sync: in a relational database, data is represented as tuples grouped into relations (tables), while in an application it is represented as objects.

For example, imagine that we have a ‘cars’ table in our database. To retrieve all of its rows with SQL, we would write something like:

SELECT * FROM cars;

Using an ORM (a hypothetical one, for this example’s sake), the same query would be written in our codebase as:

Car.find_all()

Here Car is a class that represents the table (entity) in our database, and find_all() is a method that retrieves all of its rows.
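To make the idea concrete, here is a deliberately tiny, hand-rolled mapper built on Python’s standard sqlite3 module. It only illustrates what a method like Car.find_all() might do under the hood and is not how SQLAlchemy works internally. The Car dataclass, its columns and the in-memory SQLite table are assumptions, and unlike the one-liner above, this find_all() takes the database connection as an argument.

```python
import sqlite3
from dataclasses import dataclass


@dataclass
class Car:
    # Hypothetical toy mapper: one attribute per column of the cars table.
    id: int
    make: str
    model: str

    @classmethod
    def find_all(cls, conn):
        # The "mapper" still runs plain SQL under the hood...
        rows = conn.execute("SELECT id, make, model FROM cars ORDER BY id").fetchall()
        # ...and turns every relational tuple into an object.
        return [cls(*row) for row in rows]


conn = sqlite3.connect(":memory:")  # in-memory database, only for this demo
conn.execute("CREATE TABLE cars (id INTEGER PRIMARY KEY, make TEXT, model TEXT)")
conn.executemany(
    "INSERT INTO cars (make, model) VALUES (?, ?)",
    [("Fiat", "Panda"), ("Lancia", "Ypsilon")],
)

cars = Car.find_all(conn)
print(cars)
```

A real ORM adds much more on top of this (sessions, identity tracking, relationships, SQL generation), but the core job, turning rows into objects and back, is the same.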

Most developers would not even consider building an application without an ORM tool. However, as with anything in software engineering, using an ORM instead of native SQL has its pros and cons, and vice versa.

| ORM: pros | ORM: cons | Native SQL: pros | Native SQL: cons |
|---|---|---|---|
| Great for simple CRUD | Slower | Faster and more expressive than an ORM | Requires manual care to prevent SQL injection |
| Easy to switch or upgrade the database | Memory usage | Easy to store in source control | Sends raw SQL over the network |
| Easy to store in source control | Learning curve | | Context switching |
| Speeds up development | Not available for all languages | | |

There are scenarios where we need both in our codebase: an ORM for the simpler parts of the interaction with the database, and SQL scripts for queries or aggregations where we need to be more expressive. Application-wise, using an ORM means adding one more layer to our application.
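As a sketch of keeping both approaches side by side, the snippet below uses the ORM for simple inserts and filters, and SQLAlchemy’s text() construct for a hand-written aggregation, both through the same session. It assumes SQLAlchemy 1.4 or newer and uses an in-memory SQLite database purely so it runs without a server; the Car model and its columns are hypothetical.

```python
from sqlalchemy import Column, Integer, String, create_engine, select, text
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Car(Base):
    __tablename__ = "cars"

    id = Column(Integer, primary_key=True)
    make = Column(String(50))
    price = Column(Integer)


# In-memory SQLite keeps the sketch self-contained; a real app would use its Postgres URL.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # Simple CRUD: let the ORM write the SQL.
    session.add_all([
        Car(make="Fiat", price=10000),
        Car(make="Fiat", price=14000),
        Car(make="Lancia", price=18000),
    ])
    session.commit()
    fiats = session.execute(select(Car).where(Car.make == "Fiat")).scalars().all()
    fiat_count = len(fiats)

    # Aggregation: hand-written SQL through text(), still using bound parameters.
    averages = [
        tuple(row)
        for row in session.execute(
            text(
                "SELECT make, AVG(price) FROM cars "
                "WHERE price >= :min_price GROUP BY make ORDER BY make"
            ),
            {"min_price": 10000},
        )
    ]

print(fiat_count, averages)
```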

SQLAlchemy

Enough theory for now; let us dive into the code. One of the most commonly used, well-maintained, well-documented and feature-rich mappers available for Python is SQLAlchemy, and it is the ORM we will use in this tutorial as well.

We can install it globally, together with psycopg2 (the Postgres driver SQLAlchemy uses by default), with:

pip install sqlalchemy psycopg2-binary

If you intend to use it with the Flask framework, there is an extension for it:

pip install Flask-SQLAlchemy

Application Example

For our example, we will build an ETL pipeline that extracts data from an online source (a REST API) and stores it, normalized, in our database. We will use PostgreSQL as our database.

To accomplish this, we will use SQLAlchemy. Note that we will use the standalone package and not the Flask extension; the implementation differs in a few places if you use the extension. Let us first examine the source we need to integrate, a countries API (https://restcountries.eu/). Each record looks like this:

  {
    "name": "Italy",
    "topLevelDomain": [
      ".it"
    ],
    "alpha2Code": "IT",
    "alpha3Code": "ITA",
    "callingCodes": [
      "39"
    ],
    "capital": "Rome",
    "altSpellings": [
      "IT",
      "Italian Republic",
      "Repubblica italiana"
    ],
    "region": "Europe",
    "subregion": "Southern Europe",
    "population": 60665551,
    "latlng": [
      42.83333333,
      12.83333333
    ],
    "demonym": "Italian",
    "area": 301336.0,
    "gini": 36.0,
    "timezones": [
      "UTC+01:00"
    ],
    "borders": [
      "AUT",
      "FRA",
      "SMR",
      "SVN",
      "CHE",
      "VAT"
    ],
    "nativeName": "Italia",
    "numericCode": "380",
    "currencies": [
      {
        "code": "EUR",
        "name": "Euro",
        "symbol": "€"
      }
    ],
    "languages": [
      {
        "iso639_1": "it",
        "iso639_2": "ita",
        "name": "Italian",
        "nativeName": "Italiano"
      }
    ],
    "translations": {
      "de": "Italien",
      "es": "Italia",
      "fr": "Italie",
      "ja": "イタリア",
      "it": "Italia",
      "br": "Itália",
      "pt": "Itália",
      "nl": "Italië",
      "hr": "Italija",
      "fa": "ایتالیا"
    },
    "flag": "https://restcountries.eu/data/ita.svg",
    "regionalBlocs": [
      {
        "acronym": "EU",
        "name": "European Union",
        "otherAcronyms": [
          
        ],
        "otherNames": [
          
        ]
      }
    ],
    "cioc": "ITA"
  }

This data contains several entities, where an entity in our context means both a table and an object. For example, we can easily identify a Country entity that holds the high-level details of each country provided by the API. There is also a Language entity: since languages is an array, a country can be related to several languages. The same holds for the Currency entity. Let us start with these three by designing our database.
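A minimal, standard-library sketch of how one API record breaks down into those entities. split_record() is a hypothetical helper and the record is a trimmed copy of the Italy example above; the output keys follow the column names used later in this post, and the (country, currency) and (country, language) pairs are the kind of rows a many-to-many link table would hold.

```python
# Trimmed copy of the Italy record shown above.
italy = {
    "name": "Italy",
    "alpha3Code": "ITA",
    "capital": "Rome",
    "population": 60665551,
    "nativeName": "Italia",
    "currencies": [{"code": "EUR", "name": "Euro", "symbol": "€"}],
    "languages": [
        {"iso639_1": "it", "iso639_2": "ita", "name": "Italian", "nativeName": "Italiano"}
    ],
}


def split_record(record):
    """Hypothetical helper: split one API record into country, currency and language rows."""
    country = {
        "country_tlc": record.get("alpha3Code"),
        "name": record.get("name"),
        "capital": record.get("capital"),
        "population": record.get("population"),
        "native_name": record.get("nativeName"),
    }
    # A country may list several currencies; entries without a code cannot be keyed, so skip them.
    currencies = [
        {"code": cur.get("code"), "name": cur.get("name"), "symbol": cur.get("symbol")}
        for cur in record.get("currencies") or []
        if cur.get("code")
    ]
    # Likewise, a country may list several languages.
    languages = [
        {
            "iso639_2": lang.get("iso639_2"),
            "iso639_1": lang.get("iso639_1"),
            "name": lang.get("name"),
            "native_name": lang.get("nativeName"),
        }
        for lang in record.get("languages") or []
    ]
    # The many-to-many links are just pairs of keys.
    country_currency = [(country["country_tlc"], cur["code"]) for cur in currencies]
    country_language = [(country["country_tlc"], lang["iso639_2"]) for lang in languages]
    return country, currencies, languages, country_currency, country_language


country, currencies, languages, country_currency, country_language = split_record(italy)
print(country_currency, country_language)
```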

Database Design

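From the entities we identified earlier, we arrive at a schema like this:

[Figure: database schema with the country, currency and language tables and the country_currency and country_language intermediate tables]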

This is pretty straightforward. We use some “main” tables to hold the necessary information, and two intermediate tables to hold the many-to-many relationships.
Now that we know what we want to do with our data, let us proceed with the actual implementation of the application.
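Since a schema is easy to read as DDL, here is a sketch that describes the five tables with SQLAlchemy Core Table objects and compiles the CREATE TABLE statements for the PostgreSQL dialect, without needing a database connection. It assumes SQLAlchemy 1.4 or newer; the column names and types mirror the models defined later in this post.

```python
from sqlalchemy import Column, ForeignKey, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import CreateTable

SCHEMA = "orm_example_countries"
metadata = MetaData()

# "Main" tables.
country = Table(
    "country", metadata,
    Column("country_tlc", String(3), primary_key=True),
    Column("name", String(150)),
    Column("capital", String(30)),
    Column("population", Integer),
    Column("native_name", String(150)),
    schema=SCHEMA,
)
currency = Table(
    "currency", metadata,
    Column("code", String(6), primary_key=True),
    Column("name", String),
    Column("symbol", String(50)),
    schema=SCHEMA,
)
language = Table(
    "language", metadata,
    Column("iso639_2", String(3), primary_key=True),
    Column("iso639_1", String(2)),
    Column("name", String),
    Column("native_name", String(50)),
    schema=SCHEMA,
)

# Intermediate (many-to-many) tables: a composite primary key made of two foreign keys.
country_currency = Table(
    "country_currency", metadata,
    Column("country_tlc", String, ForeignKey(f"{SCHEMA}.country.country_tlc"), primary_key=True),
    Column("currency_code", String, ForeignKey(f"{SCHEMA}.currency.code"), primary_key=True),
    schema=SCHEMA,
)
country_language = Table(
    "country_language", metadata,
    Column("country_tlc", String, ForeignKey(f"{SCHEMA}.country.country_tlc"), primary_key=True),
    Column("language_id", String, ForeignKey(f"{SCHEMA}.language.iso639_2"), primary_key=True),
    schema=SCHEMA,
)

# sorted_tables lists referenced tables before the tables that point at them.
ddl = "\n".join(
    str(CreateTable(table).compile(dialect=postgresql.dialect()))
    for table in metadata.sorted_tables
)
print(ddl)
```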

Implementation

As mentioned earlier, we will extract some data from the web and load it into our database. The idea is to use an object-oriented approach, so we will use SQLAlchemy to help us achieve this.

Example

Before we dive into the actual implementation, let us look at a sample usage of SQLAlchemy.

from sqlalchemy import create_engine, event, DDL
from sqlalchemy import Column, String
from sqlalchemy.orm import declarative_base  # lives in sqlalchemy.ext.declarative before 1.4
from sqlalchemy.orm import sessionmaker

conn_string = "postgresql://user:password@host:port/db"  # SQLAlchemy 1.4+ rejects postgres://
db = create_engine(conn_string, echo=True)
Base = declarative_base()

event.listen(
    Base.metadata, "before_create", DDL("CREATE SCHEMA IF NOT EXISTS orm_example")
)


class Test(Base):
    __tablename__ = "test"
    __table_args__ = {"schema": "orm_example"}

    title = Column(String, primary_key=True)


Session = sessionmaker(db)
session = Session()

Base.metadata.create_all(db)

new_entry = Test(title="hello")

session.add(new_entry)
session.commit()

The first four lines are the imports we need. Next, we set up the connection string for our database and call create_engine() to get an engine that interacts with it. After the connection string, create_engine() accepts many keyword arguments; here we opted for echo=True, which logs every SQL statement our program sends to the database and helps with debugging.
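One detail worth knowing about the connection string: SQLAlchemy 1.4 and later no longer accept the postgres:// scheme (only postgresql://, optionally with a driver suffix such as postgresql+psycopg2://), and passwords containing characters like @ or : must be escaped. A minimal sketch using URL.create(), which handles the escaping for us; the credentials, host and database name are placeholders, and the create_engine() call is left commented out because it needs the psycopg2 driver and a running server.

```python
from sqlalchemy.engine import URL, make_url

# Placeholder credentials: replace them with your own.
url = URL.create(
    drivername="postgresql+psycopg2",
    username="user",
    password="p@ss:word",  # special characters are escaped when rendered
    host="localhost",
    port=5432,
    database="db",
)

conn_string = url.render_as_string(hide_password=False)
print(conn_string)

# db = create_engine(url, echo=True)  # needs psycopg2 and a reachable Postgres server
```

Passing the URL object straight to create_engine() avoids hand-building the string at all.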

Then the declarative_base() call gives us a base class for declarative class definitions: every class that inherits from it is automatically mapped to a Table object.

The event.listen() call registers a command that runs just before a specific SQLAlchemy operation, in this case just before our tables are created by create_all(). Such hooks are called events. In this scenario, we use one to create the schema we want to work in if it does not already exist.
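A listener does not have to be a DDL statement; any Python function with the right signature works. The sketch below, assuming SQLAlchemy 1.4+ and an in-memory SQLite database, registers hypothetical before_create and after_create listeners on the metadata to show when they fire relative to create_all().

```python
from sqlalchemy import Column, String, create_engine, event
from sqlalchemy.orm import declarative_base

Base = declarative_base()
calls = []  # records the order in which the listeners run


class Test(Base):
    __tablename__ = "test"
    title = Column(String, primary_key=True)


@event.listens_for(Base.metadata, "before_create")
def before_create(target, connection, **kw):
    # Runs once, before any CREATE TABLE is emitted (where CREATE SCHEMA would go).
    calls.append("before_create")


@event.listens_for(Base.metadata, "after_create")
def after_create(target, connection, **kw):
    # Runs once, after all tables have been created.
    calls.append("after_create")


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
print(calls)
```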

Next, we declare the class Test, which represents a database table; its attributes are the table’s columns. Because it inherits from the Base class we created earlier, this simple class is fully mapped by the SQLAlchemy ORM, so its instances can be created, read, updated and deleted through a session. Note the two dunder attributes __tablename__ and __table_args__: __tablename__ is the actual name of the table in the database, and __table_args__ passes any additional keyword arguments to the underlying Table. In our case, we provide the table’s schema.
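A quick way to see what the declarative base generates from Test: the mapped class exposes the resulting Table object as __table__, and the contents of __table_args__ end up as arguments of that Table. This sketch assumes SQLAlchemy 1.4+ and needs no database connection; it also shows the keyword-argument constructor that the declarative base provides for free.

```python
from sqlalchemy import Column, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Test(Base):
    __tablename__ = "test"                      # table name in the database
    __table_args__ = {"schema": "orm_example"}  # forwarded to the Table constructor

    title = Column(String, primary_key=True)


table = Test.__table__  # the Table object generated by the declarative base
print(table.fullname, table.schema, list(table.columns.keys()))

entry = Test(title="hello")  # default constructor accepts mapped attributes as keywords
```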

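We then use sessionmaker() to get a Session factory and create a session to interact with our database. Next, create_all() creates all the tables we declared earlier (essentially everything that inherits from Base), firing the before_create event first. Finally, we create a Test instance, add it to the session and commit, which inserts a new row.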

From this point onward, we can easily interact with our database: all we need is to create instances of our classes and use the session’s methods (add(), query(), delete() and commit()) to perform the respective CRUD operations.
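A minimal sketch of the four CRUD operations through a session, assuming SQLAlchemy 1.4 or newer. To keep it runnable without a server it uses in-memory SQLite, so the schema argument is dropped (SQLite has no CREATE SCHEMA), and the extra body column is hypothetical.

```python
from sqlalchemy import Column, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Test(Base):
    __tablename__ = "test"  # no schema here: SQLite has no CREATE SCHEMA
    title = Column(String, primary_key=True)
    body = Column(String)  # extra column, added only for this sketch


engine = create_engine("sqlite://")  # in-memory stand-in for Postgres
Base.metadata.create_all(engine)

with Session(engine) as session:
    # Create: add a new instance and commit.
    session.add(Test(title="hello", body="first draft"))
    session.commit()

    # Read: by primary key, or with a select().
    entry = session.get(Test, "hello")
    titles = session.execute(select(Test.title)).scalars().all()

    # Update: change an attribute; the UPDATE is emitted on commit.
    entry.body = "edited"
    session.commit()
    body_after_update = session.get(Test, "hello").body

    # Delete: mark the instance for deletion and commit.
    session.delete(entry)
    session.commit()
    remaining = session.execute(select(Test)).scalars().all()

print(titles, body_after_update, remaining)
```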

This should be enough to get our mini project going. Let us start by creating the models (classes) that will represent the database tables, based on the schema we designed earlier.

Modeling

The difference now is that we have associations between our tables. Specifically, we need to represent many-to-many relationships with our classes, both for country-currency and for country-language. The two are similar, so let us create the model classes for country-currency.

from sqlalchemy import Column, String, create_engine, event, DDL, Integer, ForeignKey
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import declarative_base  # lives in sqlalchemy.ext.declarative before 1.4
from sqlalchemy.orm import sessionmaker, relationship

conn_string = "postgresql://user:password@host:port/db"  # SQLAlchemy 1.4+ rejects postgres://
db = create_engine(conn_string, echo=True)
Base = declarative_base()


event.listen(
    Base.metadata, "before_create", DDL("CREATE SCHEMA IF NOT EXISTS orm_example_countries")
)


class Country(Base):
    __tablename__ = "country"
    __table_args__ = {"schema": "orm_example_countries"}

    country_tlc = Column(String(3), primary_key=True)
    name = Column(String(150))
    capital = Column(String(30))
    population = Column(Integer)
    native_name = Column(String(150))

    country_currencies = relationship("CountryCurrency", cascade="all, delete-orphan", backref="country")
    currencies = association_proxy('country_currencies', 'currency')

class Currency(Base):
    __tablename__ = "currency"
    __table_args__ = {"schema": "orm_example_countries"}

    code = Column(String(6), primary_key=True)
    name = Column(String)
    symbol = Column(String(50))

    def __init__(self, code=None, name=None, symbol=None):
        self.code = code
        self.name = name
        self.symbol = symbol


class CountryCurrency(Base):
    __tablename__ = "country_currency"
    __table_args__ = {"schema": "orm_example_countries"}

    country_tlc = Column(String, ForeignKey(Country.country_tlc), primary_key=True)
    currency_code = Column(String, ForeignKey(Currency.code), primary_key=True)

    currency = relationship("Currency")

    def __init__(self, currency_to_store):
        self.currency_code = currency_to_store.code


Session = sessionmaker(db)
session = Session()

Base.metadata.drop_all(db)
Base.metadata.create_all(db)


There are a couple of things to note in this code. Let us start with the Country model. Its table name, schema and columns are declared just as in the previous example. The country_currencies attribute establishes the relationship with the intermediate many-to-many table (CountryCurrency) and also declares the cascade behavior. The currencies attribute is an association_proxy, one of the ways to build associations in SQLAlchemy. From the docs:

association_proxy is used to create a read/write view of a target attribute across a relationship. It essentially conceals the usage of a “middle” attribute between two endpoints, and can be used to cherry-pick fields from a collection of related objects or to reduce the verbosity of using the association object pattern. Applied creatively, the association proxy allows the construction of sophisticated collections and dictionary views of virtually any geometry, persisted to the database using standard, transparently configured relational patterns

Code-wise, this means we can access the currencies of a country as an attribute of the country. For this to work as we want, with each country holding one list of currencies, the association class needs a suitable constructor: when we append a currency to country.currencies, association_proxy creates the intermediate object by calling CountryCurrency(currency) with a single positional argument. That is why the CountryCurrency constructor accepts only the currency: the country side is filled in by the country backref as soon as the new object is appended to the country’s country_currencies collection. We create the language model and its association in the same way.
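To see the association proxy at work, the sketch below trims the three models to a couple of columns and runs them against in-memory SQLite (schema dropped, SQLAlchemy 1.4+ assumed). One deliberate difference from the code above: this CountryCurrency constructor assigns the currency relationship instead of copying only the code, so a brand-new Currency is saved through the cascade together with the country.

```python
from sqlalchemy import Column, ForeignKey, String, create_engine, select
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class Country(Base):
    __tablename__ = "country"
    country_tlc = Column(String(3), primary_key=True)
    name = Column(String(150))

    country_currencies = relationship("CountryCurrency", cascade="all, delete-orphan", backref="country")
    # Appending to .currencies calls CountryCurrency(currency) behind the scenes.
    currencies = association_proxy("country_currencies", "currency")


class Currency(Base):
    __tablename__ = "currency"
    code = Column(String(6), primary_key=True)
    name = Column(String)


class CountryCurrency(Base):
    __tablename__ = "country_currency"
    country_tlc = Column(String, ForeignKey(Country.country_tlc), primary_key=True)
    currency_code = Column(String, ForeignKey(Currency.code), primary_key=True)
    currency = relationship("Currency")

    def __init__(self, currency):
        # Single positional argument: the signature association_proxy calls by default.
        self.currency = currency


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    euro = Currency(code="EUR", name="Euro")
    italy = Country(country_tlc="ITA", name="Italy")
    italy.currencies.append(euro)  # creates the CountryCurrency object implicitly
    session.add(italy)             # cascades to the association row and, through it, to euro
    session.commit()

    links = [(row.country_tlc, row.currency_code)
             for row in session.execute(select(CountryCurrency)).scalars()]
    codes = [currency.code for currency in italy.currencies]

print(links, codes)
```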

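The last new thing in this code is Base.metadata.drop_all(db). It drops every table registered on Base.metadata (in dependency order) before create_all() recreates them. Note that it drops the tables, not the schema itself, so it is not equivalent to DROP SCHEMA orm_example_countries CASCADE. You can use it while developing to test your program end to end on a clean database.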

Let us now proceed with consuming the API and saving the data in the database, in other words developing the ETL pipeline.

ETL

To consume the API, we will simply use the requests library. As you may have noticed, the API returns a JSON list. For each element of this list, we want to save the country, its languages and its currencies.

response = requests.get("https://restcountries.eu/rest/v2/all")

countries_data = response.json()

for country in countries_data:

    country_to_save = Country(
        country_tlc=country.get("alpha3Code"),
        name=country.get("name"),
        capital=country.get("capital"),
        population=country.get("population"),
        native_name=country.get("nativeName"),
    )

    currencies_from_api = country.get("currencies") or []  # tolerate a missing list
    for currency in currencies_from_api:
        currency_to_save = Currency(
            code=currency.get("code"),
            name=currency.get("name"),
            symbol=currency.get("symbol"),
        )

        if currency_to_save.code is None:
            continue

        currency_exists = session.query(Currency).filter_by(code=currency_to_save.code).first()
        if currency_exists:
            country_to_save.currencies.append(currency_exists)  # reuse the stored row
            continue
        else:
            session.add(currency_to_save)
            session.commit()
            country_to_save.currencies.append(currency_to_save)

    languages_from_api = country.get("languages") or []  # tolerate a missing list
    for language in languages_from_api:
        language_to_save = Language(
            iso639_1=language.get("iso639_1"),
            iso639_2=language.get("iso639_2"),
            name=language.get("name"),
            native_name=language.get("nativeName")
        )
        language_exists = session.query(Language).filter_by(iso639_2=language_to_save.iso639_2).first()
        if language_exists:
            country_to_save.languages.append(language_exists)  # reuse the stored row
            continue
        else:
            session.add(language_to_save)
            session.commit()
            country_to_save.languages.append(language_to_save)

    session.add(country_to_save)
    session.commit()

For each element of the returned list, we create an instance of the Country class. Each element also holds lists of currencies and languages, and for each of those we create an object as well. We then check whether that currency or language already exists in the database: if it does, we simply attach it to the country and continue with the next element; if it does not, we first save it to the database and then attach it to the country’s respective attribute. Finally, we save the country object; committing it also saves all of its association rows, thanks to the cascade on the relationships.
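The exists/else branches can be folded into a single call. Below is a sketch of a hypothetical get_or_create() helper: Session.get() looks the primary key up in the session’s identity map before querying the database, and the flush makes a newly added row visible to later lookups in the same transaction, so one commit at the end is enough. It assumes SQLAlchemy 1.4+ and uses in-memory SQLite and a trimmed Currency model for the demo. Note that check-then-insert is not safe against concurrent writers; on Postgres, an INSERT ... ON CONFLICT DO NOTHING statement is the usual alternative.

```python
from sqlalchemy import Column, String, create_engine, func, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Currency(Base):
    __tablename__ = "currency"
    code = Column(String(6), primary_key=True)
    name = Column(String)


def get_or_create(session, model, pk, **fields):
    """Hypothetical helper: return the row with this primary key, creating it if needed."""
    instance = session.get(model, pk)  # identity map first, then a SELECT
    if instance is None:
        instance = model(**fields)
        session.add(instance)
        session.flush()  # INSERT now, so later lookups in this transaction find it
    return instance


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

# Currencies as they might arrive from several countries, with a duplicate.
api_currencies = [("EUR", "Euro"), ("USD", "United States dollar"), ("EUR", "Euro")]

with Session(engine) as session:
    stored = [get_or_create(session, Currency, code, code=code, name=name)
              for code, name in api_currencies]
    session.commit()  # a single commit instead of one per new currency
    row_count = session.scalar(select(func.count()).select_from(Currency))
    same_object = stored[0] is stored[2]

print(row_count, same_object)
```

In the ETL loop, something like country_to_save.currencies.append(get_or_create(session, Currency, code, code=code, name=name, symbol=symbol)) could then replace each if/else block.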

The complete code can be found below:

import requests
from sqlalchemy import Column, String, create_engine, event, DDL, Integer, ForeignKey
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import declarative_base  # lives in sqlalchemy.ext.declarative before 1.4
from sqlalchemy.orm import sessionmaker, relationship

conn_string = "postgresql://user:password@host:port/db"  # SQLAlchemy 1.4+ rejects postgres://
db = create_engine(conn_string, echo=True)
Base = declarative_base()

event.listen(
    Base.metadata, "before_create", DDL("CREATE SCHEMA IF NOT EXISTS orm_example_countries")
)


class Country(Base):
    __tablename__ = "country"
    __table_args__ = {"schema": "orm_example_countries"}

    country_tlc = Column(String(3), primary_key=True)
    name = Column(String(150))
    capital = Column(String(30))
    population = Column(Integer)
    native_name = Column(String(150))

    country_currencies = relationship("CountryCurrency", cascade="all, delete-orphan", backref="country")
    currencies = association_proxy('country_currencies', 'currency')

    country_languages = relationship("CountryLanguage", cascade="all, delete-orphan", backref="country")
    languages = association_proxy('country_languages', 'language')


class Currency(Base):
    __tablename__ = "currency"
    __table_args__ = {"schema": "orm_example_countries"}

    code = Column(String(6), primary_key=True)
    name = Column(String)
    symbol = Column(String(50))

    def __init__(self, code=None, name=None, symbol=None):
        self.code = code
        self.name = name
        self.symbol = symbol


class Language(Base):
    __tablename__ = "language"
    __table_args__ = {"schema": "orm_example_countries"}

    iso639_2 = Column(String(3), primary_key=True)
    iso639_1 = Column(String(2))
    name = Column(String)
    native_name = Column(String(50))

    def __init__(self, iso639_1, iso639_2, name, native_name):
        self.iso639_1 = iso639_1
        self.iso639_2 = iso639_2
        self.name = name
        self.native_name = native_name


class CountryCurrency(Base):
    __tablename__ = "country_currency"
    __table_args__ = {"schema": "orm_example_countries"}

    country_tlc = Column(String, ForeignKey(Country.country_tlc), primary_key=True)
    currency_code = Column(String, ForeignKey(Currency.code), primary_key=True)

    currency = relationship("Currency")

    def __init__(self, currency_to_store):
        self.currency_code = currency_to_store.code


class CountryLanguage(Base):
    __tablename__ = "country_language"
    __table_args__ = {"schema": "orm_example_countries"}

    country_tlc = Column(String, ForeignKey(Country.country_tlc), primary_key=True)
    language_id = Column(String, ForeignKey(Language.iso639_2), primary_key=True)

    language = relationship("Language")

    def __init__(self, language_to_store):
        self.language_id = language_to_store.iso639_2


Session = sessionmaker(db)
session = Session()

Base.metadata.drop_all(db)
Base.metadata.create_all(db)

response = requests.get("https://restcountries.eu/rest/v2/all")

countries_data = response.json()

for country in countries_data:

    country_to_save = Country(
        country_tlc=country.get("alpha3Code"),
        name=country.get("name"),
        capital=country.get("capital"),
        population=country.get("population"),
        native_name=country.get("nativeName"),
    )

    currencies_from_api = country.get("currencies") or []  # tolerate a missing list
    for currency in currencies_from_api:
        currency_to_save = Currency(
            code=currency.get("code"),
            name=currency.get("name"),
            symbol=currency.get("symbol"),
        )

        if currency_to_save.code is None:
            continue

        currency_exists = session.query(Currency).filter_by(code=currency_to_save.code).first()
        if currency_exists:
            country_to_save.currencies.append(currency_exists)  # reuse the stored row
            continue
        else:
            session.add(currency_to_save)
            session.commit()
            country_to_save.currencies.append(currency_to_save)

    languages_from_api = country.get("languages") or []  # tolerate a missing list
    for language in languages_from_api:
        language_to_save = Language(
            iso639_1=language.get("iso639_1"),
            iso639_2=language.get("iso639_2"),
            name=language.get("name"),
            native_name=language.get("nativeName")
        )
        language_exists = session.query(Language).filter_by(iso639_2=language_to_save.iso639_2).first()
        if language_exists:
            country_to_save.languages.append(language_exists)  # reuse the stored row
            continue
        else:
            session.add(language_to_save)
            session.commit()
            country_to_save.languages.append(language_to_save)

    session.add(country_to_save)
    session.commit()

It is also available on GitHub.

This is a complete ETL pipeline developed with an ORM. However, as you can see, there is some repeated code, and all of it lives in a single file. Creating modular (packaged) ORM applications with SQLAlchemy is a bit tricky because of potential circular imports. Reducing duplication and making the code base more readable and easier to maintain are goals we should always pursue; we will tackle both issues in a future post.