Added functionality to upload remote Excel files into the DOT DB. THIS REPLACES THE SYNCHRONIZATION FROM A REMOTE DB; the data is now pulled from these files instead.
JanPeterDatakind committed Jun 21, 2023
1 parent 7d1466f commit d671d77
Showing 2 changed files with 76 additions and 37 deletions.
86 changes: 60 additions & 26 deletions docker/airflow/dags/run_dot_project.py
@@ -8,6 +8,8 @@
"""
import json
from os import system
+from os import environ
+import os
from datetime import datetime, timedelta
import pandas as pd
from airflow.models import DAG  # pylint: disable=import-error
@@ -17,6 +19,7 @@
from airflow.hooks.base import BaseHook  # pylint: disable=import-error
from airflow.models import Variable  # pylint: disable=import-error
from sqlalchemy import create_engine
+import requests


def get_object(
@@ -269,6 +272,7 @@ def sync_object(
        object_name_in, target_conn_in, data, column_list, type_list, source_conn_in
    )

+
def drop_tables_in_dot_tests_schema(target_conn_in, schema_to_drop_from):
    """
    We are syncing new data where new columns and columns types might change.
@@ -303,6 +307,7 @@ def drop_tables_in_dot_tests_schema(target_conn_in, schema_to_drop_from):
        cur.execute(query1)
        cur.execute(query2)

+
def run_dot_app(project_id_in):
    """
    Method to run the DOT.
@@ -333,6 +338,52 @@ def default_config():
    file = open("./dags/dot_projects.json")
    return file

+
+def sync_daily_commcare_report(target_conn_in, report):
+    """
+    Downloads a scheduled CommCare report as an Excel file and loads it
+    into the DOT DB, replacing any existing table of the same name.
+    """
+    username = environ.get("COMMCARE_USER")
+    password = environ.get("COMMCARE_PW")
+    # log the user but never the password
+    print(f"Downloading report '{report}' as CommCare user {username}")
+    url = Variable.get(f"{report}_report")
+    response = requests.get(url, auth=(username, password))
+    # fail early if the download did not succeed
+    response.raise_for_status()
+
+    with open(f'./dags/{report}.xlsx', 'wb') as f:
+        f.write(response.content)
+
+    connection = BaseHook.get_connection(target_conn_in)
+    connection_string = (
+        "postgresql://"
+        + str(connection.login)
+        + ":"
+        + str(connection.password)
+        + "@"
+        + str(connection.host)
+        + ":"
+        + str(connection.port)
+        + "/"
+        + target_conn_in
+    )
+
+    engine = create_engine(
+        connection_string,
+        paramstyle="format",
+        executemany_mode="values",
+        executemany_values_page_size=1000,
+        executemany_batch_page_size=200,
+    )
+    # read the downloaded file into a pandas dataframe and save it to postgres
+    report_excel = pd.read_excel(f'./dags/{report}.xlsx')
+    # replace the "---" placeholder with 0 so SQL and DOT can handle the columns
+    report_excel = report_excel.replace('---', 0)
+    # parse the "Date & Time" column into datetime format
+    report_excel['Date & Time'] = pd.to_datetime(report_excel['Date & Time'])
+    # transport logs additionally get a date-only column
+    if report == 'transportlogs':
+        report_excel['date'] = report_excel['Date & Time'].dt.date
+
+    report_excel.to_sql(report, engine, if_exists='replace', index=False, schema='public')
+
+    # remove the temporary file from the local directory
+    os.remove(f'./dags/{report}.xlsx')


def set_earliest_sync_date():
    """
    Sets the earliest date to sync for a project.
@@ -345,8 +396,8 @@ def set_earliest_sync_date():
    config = json.load(file)

    # Make edits to the Python object
-    #ToDo: Make this more dynamic, so that the max date of the table to be scanned is pulled into
-    #the setting instead of the current date - this gives more flexibility and wont cause
+    # ToDo: Make this more dynamic, so that the max date of the table to be scanned is pulled into
+    # the setting instead of the current date - this gives more flexibility and won't cause
    # records not to be scanned if the source DB is updated on a different schedule than DOT runs
    # Alternatively, always use two weeks to deal with delayed syncing (aka. if a record with
    # date 01-01-2023 is only synced into the DB on 01-05-2023)
@@ -359,6 +410,7 @@ def set_earliest_sync_date():
    # Write the modified Python object back to the file in JSON format
    json.dump(config, file)

+
with DAG(
    dag_id="run_dot_project",
    schedule_interval="@weekly",
@@ -405,35 +457,17 @@ def set_earliest_sync_date():
            )
        )

-    # Sync data and link to dot.
-    for i in range(len(objects_to_sync)):
-
-        object_name = objects_to_sync[i]["object"]
-        if "date_field" in objects_to_sync[i] and objects_to_sync[i]["date_field"] != "":
-            date_field = objects_to_sync[i]["date_field"]
-        else:
-            date_field = None
-        id_field = objects_to_sync[i]["id_field"]
-        columns_to_exclude = (
-            objects_to_sync[i]["columns_to_exclude"]
-            if "columns_to_exclude" in objects_to_sync[i]
-            else []
-        )
-
-        # Get the data from a object in Postgres and copy to target DB
+    for element in ["transportlogs", "fuellogs"]:
+        # Get the data from a scheduled report in CommCare, transform it, and upload it into DOT
        af_tasks.append(
            PythonOperator(
-                task_id=f"sync_object_{project_id}_{object_name}",
-                python_callable=sync_object,
+                task_id=f'sync_{element}_from_commcare',
+                python_callable=sync_daily_commcare_report,
                op_kwargs={
-                    "object_name_in": object_name,
-                    "earliest_date_to_sync": earliest_date_to_sync,
-                    "date_field": date_field,
-                    "source_conn_in": source_conn,
                    "target_conn_in": target_conn,
-                    "columns_to_exclude": columns_to_exclude,
+                    "report": element,
                },
-                dag=dag,
+                dag=dag
            )
        )

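Before relying on the weekly DAG run, the download step can be smoke-tested outside Airflow. The sketch below is illustrative only, not part of the commit: it assumes COMMCARE_USER and COMMCARE_PW are exported in the shell, and uses a hypothetical TRANSPORTLOGS_REPORT_URL environment variable in place of the transportlogs_report Airflow Variable.

# Minimal local smoke test for the CommCare download step (illustrative).
# TRANSPORTLOGS_REPORT_URL is a hypothetical stand-in for the Airflow Variable.
import os

import pandas as pd
import requests

url = os.environ["TRANSPORTLOGS_REPORT_URL"]
response = requests.get(
    url, auth=(os.environ["COMMCARE_USER"], os.environ["COMMCARE_PW"])
)
response.raise_for_status()

with open("transportlogs.xlsx", "wb") as f:
    f.write(response.content)

# confirm the sheet parses and the column the DAG transforms is present
df = pd.read_excel("transportlogs.xlsx")
print(df.head())
print("Date & Time" in df.columns)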
27 changes: 16 additions & 11 deletions docker/docker-compose-with-airflow.yml
@@ -57,6 +57,8 @@ x-airflow-common:
    AIRFLOW__CORE__ENABLE_XCOM_PICKLING: 'true'
    PYTHON_BASE_IMAGE: "python:3:8-slim-buster"
    POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
+    COMMCARE_USER: ${COMMCARE_USER}
+    COMMCARE_PW: ${COMMCARE_PW}
    SSH_AUTH_SOCK: /ssh-agent
  volumes:
    - ./airflow/dags:/opt/airflow/dags
@@ -86,6 +88,8 @@ services:
      POSTGRES_DB: dot_db
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
+      COMMCARE_USER: ${COMMCARE_USER}
+      COMMCARE_PW: ${COMMCARE_PW}
    volumes:
      - .:/db_dumps
      - ../db/dot:/docker-entrypoint-initdb.d
@@ -126,17 +130,14 @@ services:
#    - ../webapp/server:/usr/src/app
#    - ./webapp/server/env:/usr/src/app/.env

-#  dot-webapp-frontend:
-#    build:
-#      context: ..
-#      dockerfile: ./docker/webapp/frontend/Dockerfile
-#    image: dot-tool-webapp-frontend
-#    container_name: dot-dot-tool-web-app-frontend
-#    ports:
-#      - "3000:3000"
-#    volumes:
-#      - ../webapp/frontend:/usr/src/app
-#      - ./webapp/frontend/env:/usr/src/app/.env
+  appsmith:
+    image: datakind/dot_appsmith:latest
+    container_name: appsmith
+    ports:
+      - "82:80"
+      - "446:443"
+    volumes:
+      - ./demo/appsmith/stacks:/appsmith-stacks

# ================================== AIRFLOW ===============================

@@ -149,6 +150,8 @@ services:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
+      COMMCARE_USER: ${COMMCARE_USER}
+      COMMCARE_PW: ${COMMCARE_PW}
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
@@ -199,6 +202,8 @@ services:
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
+      COMMCARE_USER: ${COMMCARE_USER}
+      COMMCARE_PW: ${COMMCARE_PW}

  flower:
    <<: *airflow-common
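The compose file now expects COMMCARE_USER and COMMCARE_PW in the shell environment (or a .env file next to it), in addition to POSTGRES_PASSWORD. A small optional pre-flight check, again a sketch rather than part of the commit, can catch a missing variable before the stack starts:

# Optional pre-flight check, run from the shell that will invoke docker compose.
# Variable names are taken from docker-compose-with-airflow.yml above.
import os
import sys

required = ["POSTGRES_PASSWORD", "COMMCARE_USER", "COMMCARE_PW"]
missing = [name for name in required if not os.environ.get(name)]
if missing:
    sys.exit(f"Missing variables for docker compose: {', '.join(missing)}")
print("All required environment variables are set.")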
