In this post we will go through how to upload data from a CSV file to a PostgreSQL database using Python. We will use pandas DataFrames to clean the data and build a table schema, and then upload the CSV file to a new table in a PostgreSQL database using the psycopg2 module. Pushing and pulling data from a database is a routine task at many companies, and I will review the basics here.

For practice purposes we will get our data from JSONPlaceholder, a free online REST API for testing. We will use the requests module to make an HTTP request and fetch the data in JSON format. This is a first, simple look at getting a response from a REST API; I will cover working with APIs in a different post.

I will use a pre-set PostgreSQL database, but you can use any other PostgreSQL instance of your choice. Amazon RDS for PostgreSQL is another option you can play with (it may be free, depending on your AWS free-tier eligibility).

Table of Contents

  1. Getting Data
  2. Parsing JSON with pandas
  3. Table schema
  4. Connecting to PostgreSQL
  5. Creating a table
  6. Upload CSV file

We will start with importing the modules we need:

import requests
import pandas as pd
import psycopg2

Let’s GET() the Data

The GET method is used to retrieve information from a server at a given URL. Here, we are connecting to 'https://jsonplaceholder.typicode.com/users' to get a list of users.

# Getting the data and normalizing the JSON to a pandas data frame 
url = 'https://jsonplaceholder.typicode.com/users'
response = requests.get(url)
json_response = response.json()
df = pd.json_normalize(json_response)
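The request above assumes the server answers quickly and successfully. As a minimal sketch of a more defensive version, the hypothetical helper below (fetch_json is not part of the original script) adds a timeout and raises on HTTP error codes. The `get` parameter exists only so the function can be tried without a network connection.

```python
import requests


def fetch_json(url, get=requests.get, timeout=10):
    """Fetch a URL and return the decoded JSON body.

    `fetch_json` and its `get` parameter are illustrative assumptions;
    `timeout` and `raise_for_status()` are standard requests features.
    """
    response = get(url, timeout=timeout)
    # Raise requests.HTTPError for 4xx/5xx answers instead of parsing an error page
    response.raise_for_status()
    return response.json()


# Usage (needs network access):
# json_response = fetch_json('https://jsonplaceholder.typicode.com/users')
```

If the call fails, an exception is raised before any DataFrame is built, which is easier to debug than a half-empty table later on.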

Parsing Nested JSON with Pandas

The response object can be used to access features such as the content, headers, etc. To retrieve the data from the response object, we need to decode the raw response content, which is formatted as JSON (JavaScript Object Notation). JSON is a way to encode data structures and is the primary format in which data is passed back and forth to APIs; most API servers send their responses in JSON format.

We will use the json() method of the response object to decode the result into Python objects (here, a list of dictionaries). Below are the first 2 objects of the JSON response.

{'id': 1, 'name': 'Leanne Graham', 'username': 'Bret', 'email': 'Sincere@april.biz', 'address': {'street': 'Kulas Light', 'suite': 'Apt. 556', 'city': 'Gwenborough', 'zipcode': '92998-3874', 'geo': {'lat': '-37.3159', 'lng': '81.1496'}}, 'phone': '1-770-736-8031 x56442', 'website': 'hildegard.org', 'company': {'name': 'Romaguera-Crona', 'catchPhrase': 'Multi-layered client-server neural-net', 'bs': 'harness real-time e-markets'}},
{'id': 2, 'name': 'Ervin Howell', 'username': 'Antonette', 'email': 'Shanna@melissa.tv', 'address': {'street': 'Victor Plains', 'suite': 'Suite 879', 'city': 'Wisokyburgh', 'zipcode': '90566-7771', 'geo': {'lat': '-43.9509', 'lng': '-34.4618'}}, 'phone': '010-692-6593 x09125', 'website': 'anastasia.net', 'company': {'name': 'Deckow-Crist', 'catchPhrase': 'Proactive didactic contingency', 'bs': 'synergize scalable supply-chains'}},...

Finally, pandas' json_normalize(), which we called in the snippet above, re-structures the JSON data and turns our array of nested JSON objects into a flat DataFrame with dotted column names (for example, address.geo.lat).
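To see the flattening in isolation, here is a small sketch that runs json_normalize() on one shortened record taken from the response above. The `records` and `flat` names are only for illustration.

```python
import pandas as pd

# One record, shortened from the JSONPlaceholder response shown above
records = [
    {"id": 1, "address": {"city": "Gwenborough", "geo": {"lat": "-37.3159"}}},
]

flat = pd.json_normalize(records)

# Nested keys are joined with dots, one column per leaf value
print(sorted(flat.columns))  # ['address.city', 'address.geo.lat', 'id']
```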

Creating a Table Schema

Next, we use a list comprehension to clean the column names.

df.columns = [item.lower().replace(" ", "_").replace(".", "_") for item in df.columns]

A list comprehension has three elements: [expression for item in iterable]

  1. Expression is the item itself, a call to a method, or any other valid expression that returns a value. In our use case, the expression: item.lower().replace(" ", "_").replace(".", "_") cleans and modifies each column name.
  2. Item is the object or value in the list or iterable. In our example, the item is the column name.
  3. Iterable is a list, set, sequence, or any other object that can return its elements one at a time. In our example, the iterable is df.columns .
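The three elements can be tried on a plain list of names, without a DataFrame. In this sketch `raw_columns` is a made-up stand-in for df.columns, and the loop underneath shows what the comprehension does step by step.

```python
# Stand-in for df.columns (illustrative values based on the API response)
raw_columns = ["id", "address.geo.lat", "company.catchPhrase"]

# [expression for item in iterable]
clean_columns = [item.lower().replace(" ", "_").replace(".", "_") for item in raw_columns]
print(clean_columns)  # ['id', 'address_geo_lat', 'company_catchphrase']

# The same result written as an explicit loop
looped = []
for item in raw_columns:
    looped.append(item.lower().replace(" ", "_").replace(".", "_"))
```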

Next, we create a replacements dictionary that maps pandas data types to SQL data types.





# Replacements dictionary to map pandas dtypes to SQL dtypes
replacements = {
    'object': 'varchar',
    'float64': 'float',
    'int64': 'int',
    'datetime64[ns]': 'timestamp',
    'timedelta64[ns]': 'varchar'
}

Then we create the final table schema in the form <column name1> <column name1 data type>, <column name2> <column name2 data type>, …, using zip() to pair each column name with its SQL data type as a tuple.

We will also save the data frame as a CSV in our local directory to load it later to our database.

# Creating a table schema
# astype(str) turns each dtype into its name so it matches the dictionary keys
col_str = ", ".join("{} {}".format(n, d) for (n, d) in zip(df.columns, df.dtypes.astype(str).replace(replacements)))

# Save df to csv
df.to_csv('post_users.csv', header=df.columns, index=False, encoding='utf-8')
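To check what the schema string looks like, here is a small self-contained sketch on a three-column sample frame. The build_schema helper and the `default` fallback are assumptions added for illustration: any dtype missing from the dictionary falls back to varchar instead of leaking a pandas dtype name into the SQL.

```python
import pandas as pd

# Mapping from pandas dtype names to SQL types, as in the post
replacements = {
    "object": "varchar",
    "float64": "float",
    "int64": "int",
    "datetime64[ns]": "timestamp",
    "timedelta64[ns]": "varchar",
}


def build_schema(df, default="varchar"):
    """Return '<col> <sql type>, ...' for df (hypothetical helper)."""
    # Pair each column name with the name of its dtype
    pairs = zip(df.columns, df.dtypes.astype(str))
    return ", ".join(f"{name} {replacements.get(dtype, default)}" for name, dtype in pairs)


sample = pd.DataFrame({
    "id": pd.Series([1, 2], dtype="int64"),
    "name": ["Leanne", "Ervin"],
    "score": pd.Series([0.5, 1.5], dtype="float64"),
})
print(build_schema(sample))  # id int, name varchar, score float
```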

Connecting to PostgreSQL

First, we need to log in to the PostgreSQL database server using a client tool such as pgAdmin and create or locate the database to which we would like to upload our data. pgAdmin is also the go-to tool when we want to check the results of our actions within the database.

To create a database, you can use the following statement:

CREATE DATABASE <database_name>;

While establishing a connection with the PostgreSQL database server, we need a few parameters:

dbname  The name of the database.

user  The username used to authenticate.

password  The password used to authenticate.

host  Database server address (e.g. localhost or an IP address).

port  The port number; 5432 unless mentioned otherwise.

To connect to our database, we’ll use psycopg2's connect() function, which returns a connection object. Using the connection object, we can create a cursor that will allow us to execute any SQL statement.

# db connection
host = 'IP_Address'
dbname = 'database_name'
user = 'username'
password = 'password'

Although for now we keep our credentials within the script, this is usually not the best practice. A better solution might be to store them in a separate Python file (e.g. user_credentials.py), import it as a module, and then read the credentials from it (e.g. host = user_credentials.host).
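Another common option, not used in this post, is to read the credentials from environment variables so they never appear in a source file. The sketch below assumes the standard libpq variable names (PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD); the load_db_params helper itself is hypothetical.

```python
import os


def load_db_params(env=os.environ):
    """Collect connection parameters from environment variables.

    Hypothetical helper: host and port fall back to defaults, while a
    missing database name, user or password raises KeyError.
    """
    return {
        "host": env.get("PGHOST", "localhost"),
        "port": int(env.get("PGPORT", "5432")),
        "dbname": env["PGDATABASE"],
        "user": env["PGUSER"],
        "password": env["PGPASSWORD"],
    }


# Usage, with the variables exported in the shell beforehand:
# conn = psycopg2.connect(**load_db_params())
```

Passing the parameters as keyword arguments also avoids building the connection string by hand.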

Another best practice when connecting to a resource is to use Python's try...except...finally statement:

  • The try block enables us to test a block of code for errors.
  • The except block lets us handle the error.
  • And the finally block lets us execute code, regardless of the result of the try and except blocks. It is usually used to close all resources and connections made in the code.
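The behaviour of the three blocks can be checked without a database. In this sketch, FakeConnection and run are made-up stand-ins for illustration; the point is only that the finally block runs both when the action succeeds and when it fails.

```python
class FakeConnection:
    """Stand-in for a real connection; it only remembers whether it was closed."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def run(action):
    """Run action() and record which blocks were executed."""
    conn = FakeConnection()
    events = []
    try:
        action()
        events.append("ok")
    except Exception as error:
        events.append(f"error: {error}")
    finally:
        # Runs whether or not action() raised
        conn.close()
        events.append("closed")
    return conn, events


print(run(lambda: None)[1])   # ['ok', 'closed']
print(run(lambda: 1 / 0)[1])  # ['error: division by zero', 'closed']
```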

Creating a Table

Let’s write our code using the try...except...finally statement. We will connect to the database using our credentials and create a cursor with which we will execute all our SQL queries.

conn = None
cur = None

try:
    conn_str = "host=%s dbname=%s user=%s password=%s" % (host, dbname, user, password)
    conn = psycopg2.connect(conn_str)
    print('PostgreSQL server information')
    print(conn.get_dsn_parameters(), "\n")

    # Cursor
    cur = conn.cursor()
    print('Database opened successfully')

    # Drop table if exists
    cur.execute("DROP TABLE IF EXISTS post_users_test")

    # Create table with the created table schema 
    cur.execute("CREATE TABLE post_users_test (%s)" % col_str)
    conn.commit()
    print('Table created successfully')

except (Exception, psycopg2.DatabaseError) as error:
    print(error)

finally:
    if cur is not None:
        cur.close()
    if conn is not None:
        conn.close()
    print('PostgreSQL connection is closed')

After creating the cursor, we execute 2 SQL queries: the first drops the table if it already exists, and the second creates the table post_users_test. With conn.commit() we commit the changes made in the database.

Lastly, in the finally block, we close the cursor and the connection, whether or not an error occurred.
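Because the column names are placed into the CREATE TABLE statement with plain string formatting, it can be worth checking them first. The following is a minimal sketch of such a check, assuming the names have already been lower-cased as above. The check_identifiers helper is hypothetical, and it does not detect SQL reserved words.

```python
import re

# Lower-case letters, digits and underscores, not starting with a digit
_IDENTIFIER = re.compile(r"[a-z_][a-z0-9_]*")


def check_identifiers(names):
    """Return the names as a list, or raise ValueError on the first unsafe one."""
    for name in names:
        if not _IDENTIFIER.fullmatch(name):
            raise ValueError(f"unsafe column name: {name!r}")
    return list(names)


print(check_identifiers(["id", "address_geo_lat", "company_bs"]))
# ['id', 'address_geo_lat', 'company_bs']
```

Calling check_identifiers(df.columns) before building col_str would stop the script early if the API ever returned an unexpected key.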

Finally: Uploading our CSV file

Now that our table has been created, we are ready to push our CSV file with the users data. For that we will construct our SQL query and use copy_expert() to load the data into the database.

# Preparing to upload csv to db:
# SQL copy statement
SQL = """
COPY post_users_test FROM STDIN WITH
    CSV
    HEADER
    DELIMITER AS ','
"""

# Open the csv file as a file object and stream it to the table;
# the with block closes the file when the copy is done
with open('post_users.csv', encoding='utf-8') as csv_file:
    print('file opened')
    cur.copy_expert(sql=SQL, file=csv_file)
conn.commit()
print('file copied to db')

We opened the CSV file as a file object, and then copied its contents to the table we created.
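copy_expert() only needs a readable file-like object, so the data does not have to come from a file on disk. As a sketch of an alternative (not what this post's script does), the hypothetical helper below writes rows into an in-memory buffer with the standard csv module and rewinds it so it can be read from the start.

```python
import csv
import io


def rows_to_csv_buffer(header, rows):
    """Write header and rows into an in-memory CSV buffer (hypothetical helper)."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(header)
    writer.writerows(rows)
    # Rewind, otherwise a reader would start at the end and see no data
    buffer.seek(0)
    return buffer


buffer = rows_to_csv_buffer(["id", "name"], [[1, "Leanne Graham"], [2, "Ervin Howell"]])
print(buffer.getvalue())

# With an open cursor, the buffer could replace the file object:
# cur.copy_expert(sql=SQL, file=buffer)
```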

Since the cursor and the connection must still be open at this point, this last step belongs inside our try...except...finally statement, as in the full script below.

And so, our script is done! Here it is in full:

import requests
import pandas as pd
import psycopg2


# Getting Data
url = 'https://jsonplaceholder.typicode.com/users'

response = requests.get(url)
json_response = response.json()
df = pd.json_normalize(json_response)

# Creating a table schema
df.columns = [item.lower().replace(" ", "_").replace(".", "_") for item in df.columns]
print(df.columns)

# Replacements dictionary to map pandas dtypes to SQL dtypes
replacements = {
    'object': 'varchar',
    'float64': 'float',
    'int64': 'int',
    'datetime64[ns]': 'timestamp',
    'timedelta64[ns]': 'varchar'
}

# astype(str) turns each dtype into its name so it matches the dictionary keys
col_str = ", ".join("{} {}".format(n, d) for (n, d) in zip(df.columns, df.dtypes.astype(str).replace(replacements)))


# Save df to csv
df.to_csv('post_users.csv', header=df.columns, index=False, encoding='utf-8')
print('csv file created')

# db connection
host = 'IP_Address'
dbname = 'database_name'
user = 'username'
password = 'password'

conn = None


try:
    conn_str = "host=%s dbname=%s user=%s password=%s" % (host, dbname, user, password)
    with psycopg2.connect(conn_str) as conn:
        print("PostgreSQL server information")
        print(conn.get_dsn_parameters(), "\n")

        # Cursor
        with conn.cursor() as cur:
            print('database opened successfully')

            # Drop table if exists
            cur.execute("DROP TABLE IF EXISTS post_users_test")

            # Create table
            cur.execute("CREATE TABLE post_users_test (%s)" % col_str)
            print('table created successfully')

            # Preparing to upload csv to db:
            # SQL copy statement
            SQL = """
            COPY post_users_test FROM STDIN WITH
                CSV
                HEADER
                DELIMITER AS ','
            """

            # Open the csv file as a file object and stream it to the table;
            # the with block closes the file when the copy is done
            with open('post_users.csv', encoding='utf-8') as csv_file:
                print('file opened')
                cur.copy_expert(sql=SQL, file=csv_file)
            print('file copied to db')

            # Checking the data from PostgreSQL 
            cur.execute("SELECT * FROM post_users_test")
            for row in cur.fetchall():
                print(row)

except (Exception, psycopg2.DatabaseError) as error:
    print(error)


finally:
    if conn is not None:
        conn.close()
    print('PostgreSQL connection is closed')
    print('table post_users_test import completed')

If you have followed along so far, you might have noticed an extra snippet of code at the end of the try block, where we query the database and fetch all rows from the post_users_test table to print them to the console.

# Checking the data from PostgreSQL
cur.execute("SELECT * FROM post_users_test")
for row in cur.fetchall():
    print(row)

Another ‘better code’ modification I have made is to use a context manager (the with statement). It is commonly used when opening files, and we can also use it when creating the connection and the cursor. In psycopg2, the with statement on the cursor closes the cursor automatically, and the with statement on the connection commits the transaction when the block finishes without an error (and rolls it back otherwise). It does not close the connection, which is why we still call conn.close() in the finally block.
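To make the commit-or-rollback behaviour concrete, here is a toy sketch. ToyConnection is a made-up class that only imitates what the with statement does on a psycopg2 connection; it is not psycopg2's implementation and talks to no database.

```python
class ToyConnection:
    """Toy stand-in: logs 'commit' or 'rollback' when a with block ends."""

    def __init__(self):
        self.log = []
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Commit when the block ended normally, roll back when it raised
        self.log.append("rollback" if exc_type else "commit")
        return False  # let the exception propagate to the caller

    def close(self):
        self.closed = True


conn = ToyConnection()
try:
    with conn:
        pass  # block succeeds
    with conn:
        raise RuntimeError("boom")  # block fails
except RuntimeError:
    pass
finally:
    # Leaving the with block never closed the connection
    conn.close()

print(conn.log)  # ['commit', 'rollback']
```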

How cool is that!

Summary

In this post we connected to a practice API to fetch data, used pandas to parse it and create a table schema, and finally used psycopg2 to connect to our database and upload a CSV file with the data to populate a table.

Although we are basically done with the script, there are surely a few things we can improve on. In a future post, we will restructure our code into functions for each task, making it more robust and reusable in other similar projects.