testing framework integration
bryansmith-db committed Oct 23, 2023
1 parent 8f96e66 commit 4b5ffdf
Showing 6 changed files with 434 additions and 181 deletions.
00_Intro & Config.py → 00_Intro_&_Config.py (3 changes: 2 additions & 1 deletion)
@@ -91,7 +91,8 @@

# DBTITLE 1,Directories
# path where files are stored
mount_path = '/tmp/zingg_ncvoters'
#mount_path = '/tmp/zingg_ncvoters'
mount_path = '/home/bryan.smithdatabricks.com/zingg_ncvoters'

config['dir'] = {}

01_Prepare Data.py → 01_Prepare_Data.py (12 changes: 7 additions & 5 deletions)
@@ -10,7 +10,7 @@
# COMMAND ----------

# DBTITLE 1,Get Config
# MAGIC %run "./00_Intro & Config"
# MAGIC %run "./00_Intro_&_Config"

# COMMAND ----------

@@ -32,9 +32,11 @@
# COMMAND ----------

# DBTITLE 1,Reset the Directories
for k,v in config['dir'].items(): # for each directory identified in config
  dbutils.fs.rm(v, recurse=True)  # remove the dir and all child content
  dbutils.fs.mkdirs(v)            # recreate empty dir
## CODE DISABLED TO ENABLE AUTOMATED TESTING

#for k,v in config['dir'].items(): # for each directory identified in config
#  dbutils.fs.rm(v, recurse=True)  # remove the dir and all child content
#  dbutils.fs.mkdirs(v)            # recreate empty dir

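If this reset logic is ever re-enabled, a guarded variant along the lines of the sketch below would keep automated test runs from wiping existing data; the RESET_DIRS flag is a hypothetical convention, not something defined in this repo:

# hypothetical guard: reset directories only when explicitly requested
RESET_DIRS = False # set True for a clean interactive run

if RESET_DIRS:
  for k, v in config['dir'].items(): # for each directory identified in config
    dbutils.fs.rm(v, recurse=True)   # remove the dir and all child content
    dbutils.fs.mkdirs(v)             # recreate empty dir
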
# COMMAND ----------

@@ -72,7 +74,7 @@
# MAGIC cd /$DOWNLOADS_FOLDER
# MAGIC
# MAGIC # download the data file
# MAGIC wget -q https://www.informatik.uni-leipzig.de/~saeedi/5Party-ocp20.tar.gz
# MAGIC wget -q https://dbs.uni-leipzig.de/ds/5Party-ocp20.tar.gz
# MAGIC
# MAGIC # decompress the data file
# MAGIC tar -xf 5Party-ocp20.tar.gz
02_Initial Workflow.py → 02_Initial_Workflow_Part_A.py (172 changes: 9 additions & 163 deletions)
@@ -1,24 +1,15 @@
# Databricks notebook source
# MAGIC %md The purpose of this notebook is to retrieve and label data as part of the initial workflow in the Zingg Person Entity-Resolution solution accelerator. This notebook is available on https://github.com/databricks-industry-solutions/customer-er.

# COMMAND ----------

# MAGIC %md ## Introduction
# MAGIC
# MAGIC The purpose of this notebook is to train a Zingg model using an initial dataset within which we know there are some duplicate records. With Zingg, this initial workload is executed through three sequential tasks (though the third task is optional). These tasks are:</p>
# MAGIC The purpose of this notebook is to label a number of duplicate records in preparation for the training of the Zingg model. This represents the first part of a two-step initial workflow. This first step is addressed through the execution of the Zingg *findTrainingData* task.
# MAGIC
# MAGIC 1. **findTrainingData** - identify pairs of records that are candidates for being a match. The user will label these pairs as a match, no match or uncertain.
# MAGIC 2. **train** - train a Zingg model using the labeled pairs resulting from user interactions on data coming from (multiple iterations of) the findTrainingData task.
# MAGIC 3. **match** - use the trained Zingg model to match duplicate records in the initial dataset.
# MAGIC </p>
# MAGIC
# MAGIC **NOTE** Because *train* and *match* are so frequently run together, Zingg provides a combined *trainMatch* task that we will employ to minimize the time to complete our work. We will continue to speak of these as two separate tasks but they will be executed as a combined task in this notebook.
# MAGIC
# MAGIC To enable this sequence of tasks, we will need to configure Zingg to understand where to read the initial data as its input. We will also need to specify an output location, though only the *match* task will write data to this location.
# MAGIC
# MAGIC While the *train* task does not write any explicit output, it does persist trained model assets to the *zingg* model folder. The *findTrainingData* task will also send intermediary data outputs to an *unmarked* folder under the *zingg* model folder. (It will also expect us to place some data in a *marked* folder location under that same *zingg* model folder.) Understanding where these assets reside can be helpful should you need to troubleshoot certain issues you might encounter during the *findTrainingData* or *train* tasks.
# MAGIC This first step is isolated in this notebook because the second step, *i.e.* training the Zingg model on the labeled data, runs more reliably on a cluster that has been restarted. Separating the two steps therefore ensures a more reliable run of these notebooks. In the real world, the two steps of the initial workflow would typically be run separately anyway, so this split reflects a natural break in the workflow.
# MAGIC
# MAGIC Before jumping into these steps, we first need to verify the Zingg application JAR is properly installed on this cluster and then install the Zingg Python wrapper which provides a simple API for using the application:
# MAGIC Before jumping into this step, we first need to verify the Zingg application JAR is properly installed on this cluster and then install the Zingg Python wrapper which provides a simple API for using the application:

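As a rough sketch of what that Python wrapper exposes, the *findTrainingData* phase is typically configured and launched as below; every model ID, path, and field definition here is an illustrative assumption rather than a value from this accelerator:

# a minimal sketch of the Zingg Python API (zingg package); all model IDs,
# paths, and field definitions are illustrative assumptions
from zingg.client import Arguments, ClientOptions, Zingg
from zingg.pipes import Pipe, FieldDefinition, MatchType

args = Arguments()
args.setModelId('ncvoters_demo')       # hypothetical model identifier
args.setZinggDir('/tmp/zingg/models')  # hypothetical home for model assets
args.setNumPartitions(8)
args.setLabelDataSampleSize(0.1)

# declare how each field contributes to matching
args.setFieldDefinition([
  FieldDefinition('givenname', 'string', MatchType.FUZZY),
  FieldDefinition('surname', 'string', MatchType.FUZZY),
  FieldDefinition('postcode', 'string', MatchType.FUZZY)
  ])

# register the input dataset via a pipe (format and location are placeholders;
# a real CSV pipe would also need header/schema properties)
input_pipe = Pipe('input', 'csv')
input_pipe.addProperty('location', '/tmp/zingg/input')
args.setData(input_pipe)

# run the findTrainingData phase to generate candidate pairs for labeling
options = ClientOptions([ClientOptions.PHASE, 'findTrainingData'])
Zingg(args, options).initAndExecute()

The notebook wraps this same pattern in task objects such as *findTrainingData* and *trainMatch*, so the cells below interact with those rather than with the raw client.
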
# COMMAND ----------

@@ -27,7 +18,7 @@
zingg_jar_path = None

# for each jar in the jars folder
for j in dbutils.fs.ls('/FileStore/jars') + dbutils.fs.ls('/tmp/solacc/customer_er/jar/'): # either manually extracted and uploaded jar or use the RUNME Notebook automation
for j in dbutils.fs.ls('/FileStore/jars'):
  # locate the zingg jar
  if '-zingg_' in j.path:
    zingg_jar_path = j.path
@@ -45,7 +36,7 @@
# COMMAND ----------

# DBTITLE 1,Initialize Config
# MAGIC %run "./00_Intro & Config"
# MAGIC %run "./00_Intro_&_Config"

# COMMAND ----------

@@ -165,7 +156,7 @@
args.setNumPartitions( sc.defaultParallelism * 20 ) # default parallelism reflects the Databricks cluster's capacity

# define sample size
args.setLabelDataSampleSize(0.05)
args.setLabelDataSampleSize(0.1)

# COMMAND ----------

@@ -217,6 +208,7 @@

# this is where you store your marked candidate pairs that will be read by the Zingg train task
MARKED_DIR = findTrainingData.getArguments().getZinggTrainingDataMarkedDir()
print(MARKED_DIR)

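As a quick illustration, the labeled pairs that accumulate in this folder can be inspected directly; the Parquet format and the *z_isMatch* label column follow Zingg's documented conventions, though details may vary by Zingg version:

# a sketch: review labeled pairs from prior labeling sessions
# (assumes Zingg's convention of writing marked data as Parquet
# with a z_isMatch label column; details may vary by version)
marked = spark.read.parquet(MARKED_DIR)
display( marked.groupBy('z_isMatch').count() )
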
# COMMAND ----------

@@ -475,153 +467,7 @@ def _get_candidate_pairs(depth=0):

# COMMAND ----------

# MAGIC %md ##Step 3: Train the Model & Perform Initial Match
# MAGIC
# MAGIC With a set of labeled data in place, we can now *train* our Zingg model. A common action performed immediately after this is the identification of duplicates (through a *match* operation) within the initial dataset. While we could implement these as two separate tasks, they so frequently occur together that Zingg provides an option to combine them into one action:

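Assuming the same hypothetical *args* configuration sketched in the introduction above, combining the two tasks comes down to the phase name passed to the client:

# a sketch: run train and match together as one combined phase
# (reuses the illustrative args object from the earlier sketch)
from zingg.client import ClientOptions, Zingg

options = ClientOptions([ClientOptions.PHASE, 'trainMatch'])
Zingg(args, options).initAndExecute()
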
# COMMAND ----------

# DBTITLE 1,Clear Old Outputs from Any Prior Runs
dbutils.fs.rm(output_dir, recurse=True)

# COMMAND ----------

# DBTITLE 1,Execute the trainMatch Step
trainMatch.execute()

# COMMAND ----------

# MAGIC %md The end result of the *train* portion of this task is a Zingg model, housed in the Zingg model directory. If we ever want to retrain this model, we can re-run the findTrainingData step to add more labeled pairs to our marked dataset and then re-run this action. We may need to do that if we notice a lot of poor-quality matches or if Zingg complains about having insufficient data to train our model. But once this is run, we should verify a model has been recorded in the Zingg model directory:
# MAGIC
# MAGIC **NOTE** If Zingg complains about *insufficient data*, you may want to restart your Databricks cluster and retry the step before creating additional labeled pairs.

# COMMAND ----------

# DBTITLE 1,Examine Zingg Model Folder
display(
  dbutils.fs.ls( trainMatch.getArguments().getZinggModelDir() )
  )

# COMMAND ----------

# MAGIC %md The end result of the *match* portion of the last task is a set of outputs representing potential duplicates in our dataset. We can examine those outputs as follows:

# COMMAND ----------

# DBTITLE 1,Examine Initial Output
matches = (
  spark
    .read
    .format('delta')
    .option('path', output_dir)
    .load()
    .orderBy('z_cluster')
  )

display(matches)

# COMMAND ----------

# MAGIC %md In the *match* output, we see records associated with one another through a *z_cluster* assignment. The strongest and weakest association of that record with any other member of the cluster is reflected in the *z_minScore* and *z_maxScore* values, respectively. We will want to consider these values as we decide which matches to accept and which to reject.
# MAGIC
# MAGIC Also, we can look at the number of records within each cluster. Most clusters will consist of 1 or 2 records. That said, there will be other clusters with abnormally large numbers of entries that deserve a bit more scrutiny:

# COMMAND ----------

# DBTITLE 1,Examine Number of Clusters by Record Count
display(
  matches
    .groupBy('z_cluster')
      .agg(
        fn.count('*').alias('records')
        )
    .groupBy('records')
      .agg(
        fn.count('*').alias('clusters')
        )
    .orderBy('records')
  )

# COMMAND ----------

# MAGIC %md As we review the match output, it seems it would be helpful if Zingg provided some high-level metrics and diagnostics to help us understand the performance of our model. The reality is that, outside of evaluation scenarios where we may have some form of ground truth against which to evaluate our results, it's very difficult to clearly identify the precision of a model such as this. Quite often, the best we can do is review the results and make a judgment call, based on the volume of identifiable errors and the patterns associated with those errors, to develop a sense of whether our model's performance is adequate for our needs.
# MAGIC
# MAGIC Even with a high-performing model, there will be some matches that we need to reject. For this exercise, we will accept all the cluster assignment suggestions, but in a real-world implementation you'd want to accept only those cluster and cluster-member assignments where all the entries are above a given threshold on *z_minScore*. Any clusters not meeting this criterion would require a manual review. A sketch of such threshold-based filtering follows this cell.
# MAGIC
# MAGIC If we are satisfied with our results, we can capture our cluster data to a table structure that will allow us to more easily perform incremental data processing. Please note that we are assigning our own unique identifier to the clusters as the *z_cluster* value Zingg assigns is specific to the output associated with a task:

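The sketch below illustrates one way to implement that threshold rule; the 0.9 cutoff is an assumed value that would need to be tuned against manually reviewed results:

# a sketch of threshold-based acceptance; the 0.9 cutoff is an assumption
from pyspark.sql import functions as fn
from pyspark.sql.window import Window

THRESHOLD = 0.9

# a cluster is accepted only when its weakest member link clears the threshold
accepted = (
  matches
    .withColumn('weakest_link', fn.min('z_minScore').over(Window.partitionBy('z_cluster')))
    .filter(fn.col('weakest_link') >= THRESHOLD)
    .drop('weakest_link')
  )

# everything else is routed to manual review
needs_review = matches.join(accepted.select('z_cluster').distinct(), on='z_cluster', how='left_anti')
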
# COMMAND ----------

# DBTITLE 1,Get Clusters & IDs
# custom function to generate a guid string
@fn.udf('string')
def guid():
  return str(uuid.uuid1())

# get distinct clusters and assign a guid to each
clusters = (
  matches
    .select('z_cluster')
    .distinct()
    .withColumn('cluster_id', guid())
    .cache() # cache to avoid re-generating guids with subsequent calls
  )

clusters.count() # force full dataset into memory

display( clusters )

# COMMAND ----------

# MAGIC %md And now we can persist our data to tables named *clusters* and *cluster_members*:

# COMMAND ----------

# DBTITLE 1,Write Clusters
_ = (
  clusters
    .select('cluster_id')
    .withColumn('datetime', fn.current_timestamp())
    .write
    .format('delta')
    .mode('overwrite')
    .option('overwriteSchema','true')
    .saveAsTable('clusters')
  )

display(spark.table('clusters'))

# COMMAND ----------

# DBTITLE 1,Write Cluster Members
_ = (
  matches
    .join(clusters, on='z_cluster')
    .selectExpr(
      'cluster_id',
      'recid',
      'givenname',
      'surname',
      'suburb',
      'postcode',
      'z_minScore',
      'z_maxScore',
      'current_timestamp() as datetime'
      )
    .write
    .format('delta')
    .mode('overwrite')
    .option('overwriteSchema','true')
    .saveAsTable('cluster_members')
  )

display(spark.table('cluster_members'))

# COMMAND ----------

# MAGIC %md Upon review of the persisted data, it's very likely we will encounter records we need to correct. Using standard [DELETE](https://docs.databricks.com/sql/language-manual/delta-delete-from.html), [UPDATE](https://docs.databricks.com/sql/language-manual/delta-update.html) and [INSERT](https://docs.databricks.com/sql/language-manual/sql-ref-syntax-dml-insert-into.html) statements, we can update the Delta Lake formatted tables in this environment to achieve the results we require.
# MAGIC
# MAGIC But what happens if we decide to retrain our model after we've set up these mappings? As mentioned above, retraining our model will cause new clusters with overlapping integer *z_cluster* identifiers to be created. In this scenario, you need to decide whether you wish to preserve any manually adjusted mappings from before or otherwise start over from scratch. If starting over, simply drop and recreate the *clusters* and *cluster_members* tables. If preserving manually adjusted records, the GUID value associated with each cluster will keep the cluster identifiers unique. You'll then need to decide how records assigned to clusters by the newly re-trained model should be merged with the preserved cluster data. It's a bit of a juggle, so this isn't something you'll want to do on a regular basis.
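For instance, a manual correction might look like the sketch below; the table and column names come from this notebook, while the GUID and *recid* values are hypothetical placeholders:

# a sketch: after manual review, move one record into a different cluster
# ('<target-guid>' and 12345 are hypothetical placeholder values)
spark.sql("""
  UPDATE cluster_members
  SET cluster_id = '<target-guid>',
      datetime = current_timestamp()
  WHERE recid = 12345
""")
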
# MAGIC %md With labeled data in place, we can now proceed to the next part of the initial workflow.

# COMMAND ----------
