Data Engineering Saga part 2
Cris Crawford
Posted on January 20, 2024
Today I was again extricated from a self-imposed ditch of error by the instructor and another guy.
I understood that I needed a "docker network" in order to run postgres in one container and pgAdmin in another. I knew that pgAdmin had to know the name of the postgres container, since that name serves as the hostname on the network. I could do this. I could log in to pgAdmin and see the postgres database. The next step was to watch a video where the instructor turned the python notebook into a python script, and then created a third docker container that ingested data into postgres.
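For anyone retracing these steps, here's roughly what that two-container setup looked like. Treat it as a sketch: the postgres:13 tag, the volume path, and the credentials are just what I happened to use, not requirements. The parts that matter are the shared --network and the --name pg-database, because that name is the hostname other containers on the network use to reach postgres.

docker network create pg-network

docker run -it \
  -e POSTGRES_USER="root" \
  -e POSTGRES_PASSWORD="root" \
  -e POSTGRES_DB="ny_taxi" \
  -v $(pwd)/ny_taxi_postgres_data:/var/lib/postgresql/data \
  -p 5432:5432 \
  --network=pg-network \
  --name pg-database \
  postgres:13

docker run -it \
  -e PGADMIN_DEFAULT_EMAIL="admin@admin.com" \
  -e PGADMIN_DEFAULT_PASSWORD="root" \
  -p 8080:80 \
  --network=pg-network \
  --name pgadmin \
  dpage/pgadmin4

With both running, pgAdmin is at localhost:8080, and you register a server there using "pg-database" as the host.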
I watched up to a certain point, then tried to run the docker container that I had built. Little did I know that the very next thing the instructor did in the video was add "--network=pg-network" to the arguments of "docker run". To make matters worse, I had also stopped the video just before he changed the host from "localhost" to the name of the postgres container.
Eventually I got it working, but it was very frustrating, and more so because it was completely avoidable. Next time watch the whole video from start to finish, and then go back and write the code.
Here's the docker command that worked:
docker run -it \
--network=pg-network \
taxi_ingest:v001 \
--user=root \
--password=root \
--host=pg-database \
--port=5432 \
--db=ny_taxi \
--table_name=yellow_taxi_data
taxi_ingest:v001 is the name of the docker image that I built, in which a python script reads a csv file and pushes it to postgres. I used argparse to parse the six arguments that follow the image name.
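For completeness, building the image is a one-liner, assuming the Dockerfile shown below and the csv file are in the current directory:

docker build -t taxi_ingest:v001 .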
Here's the python script:
import argparse
from time import time

import pandas as pd
from sqlalchemy import create_engine


def main(params):
    user = params.user
    password = params.password
    host = params.host
    port = params.port
    db = params.db
    table_name = params.table_name
    csv_name = 'yellow_tripdata_2021-01.csv'

    engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')

    # Read the csv in chunks so we never hold the whole file in memory.
    df_iter = pd.read_csv(csv_name, iterator=True, chunksize=100000)

    # First chunk: parse the timestamp columns, create the table from the
    # header, then insert the chunk's rows (not just the empty schema).
    df = next(df_iter)
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
    df.head(n=0).to_sql(name=table_name, con=engine, if_exists='replace')
    df.to_sql(name=table_name, con=engine, if_exists='append')

    # Remaining chunks: same conversion, appended to the same table.
    while (df := next(df_iter, None)) is not None:
        t_start = time()
        df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
        df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
        df.to_sql(name=table_name, con=engine, if_exists='append')
        t_end = time()
        print('inserted another chunk..., took %.3f seconds' % (t_end - t_start))


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Ingest CSV data to Postgres')
    parser.add_argument('--user', help='user name for postgres')
    parser.add_argument('--password', help='password for postgres')
    parser.add_argument('--host', help='host for postgres')
    parser.add_argument('--port', help='port for postgres')
    parser.add_argument('--db', help='database name for postgres')
    parser.add_argument('--table_name', help='name of the table that we write to')
    args = parser.parse_args()
    main(args)
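Before baking the script into an image, you can sanity-check it directly against the port-mapped database, assuming postgres is running as above and the csv file sits next to the script. Note that from outside docker, the host really is localhost:

python ingest_data.py \
  --user=root \
  --password=root \
  --host=localhost \
  --port=5432 \
  --db=ny_taxi \
  --table_name=yellow_taxi_data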
And here is the Dockerfile that packages the script and the csv file into the taxi_ingest:v001 image:
FROM python:3.9
RUN pip install pandas sqlalchemy psycopg2
WORKDIR /app
COPY ingest_data.py ingest_data.py
COPY yellow_tripdata_2021-01.csv yellow_tripdata_2021-01.csv
ENTRYPOINT [ "python", "ingest_data.py" ]
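Once the ingest container finishes, one quick check (my addition, not something from the video) is to count the rows straight from the running postgres container:

docker exec -it pg-database psql -U root -d ny_taxi -c "SELECT COUNT(1) FROM yellow_taxi_data;"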
These are very elementary steps to read one month of data into a local postgres database. In the coming weeks we'll read many more of these data files into Google Cloud Platform, and learn how to visualize and query them. There are hundreds of millions of taxi trips in New York City in any particular year. Anyone can join the class at any time, but the time for a newbie like me to ask questions and get answers is now. https://github.com/DataTalksClub/data-engineering-zoomcamp