From 8c422429c4cadf66c989cd5b84ce19e9f9f38f7e Mon Sep 17 00:00:00 2001 From: dbbnicole Date: Mon, 28 Aug 2023 15:56:20 +0000 Subject: [PATCH] Commit new notebooks --- 00.0_ Intro & Config.py | 270 --------------- 00.1_ Prepare Data.py | 206 ------------ 00.2_ Prepare Jobs.py | 412 ----------------------- 00_Intro & Config.py | 119 +++++++ 01_ Initial Workflow.py | 559 ------------------------------- 01_Prepare Data.py | 284 ++++++++++++++++ 02_ Incremental Workflow.py | 497 ---------------------------- 02_Initial Workflow.py | 634 ++++++++++++++++++++++++++++++++++++ 03_Incremental Workflow.py | 407 +++++++++++++++++++++++ RUNME.py | 50 ++- config/setup.py | 39 --- 11 files changed, 1465 insertions(+), 2012 deletions(-) delete mode 100644 00.0_ Intro & Config.py delete mode 100644 00.1_ Prepare Data.py delete mode 100644 00.2_ Prepare Jobs.py create mode 100644 00_Intro & Config.py delete mode 100644 01_ Initial Workflow.py create mode 100644 01_Prepare Data.py delete mode 100644 02_ Incremental Workflow.py create mode 100644 02_Initial Workflow.py create mode 100644 03_Incremental Workflow.py diff --git a/00.0_ Intro & Config.py b/00.0_ Intro & Config.py deleted file mode 100644 index 2e595e2..0000000 --- a/00.0_ Intro & Config.py +++ /dev/null @@ -1,270 +0,0 @@ -# Databricks notebook source -# MAGIC %md -# MAGIC You may find this series of notebooks at https://github.com/databricks-industry-solutions/customer-er. For more information about this solution accelerator, visit https://www.databricks.com/solutions/accelerators/customer-entity-resolution. - -# COMMAND ---------- - -# MAGIC %md The purpose of this notebook is provided an overview of the content in the other notebooks in this accelerator and to make accessible to these notebooks a set of standard configuration values. - -# COMMAND ---------- - -# MAGIC %md ## Introduction -# MAGIC -# MAGIC The process of matching records to one another is known as entity-resolution. When dealing with entities such as persons, the process often requires the comparison of name and address information which is subject to inconsistencies and errors. In these scenarios, we often rely on probabilistic (*fuzzy*) matching techniques that identify likely matches based on degrees of similarity between these elements. -# MAGIC -# MAGIC There are a wide range of techniques which can be employed to perform such matching. The challenge is not just to identify which of these techniques provide the best matches but how to compare one record to all the other records that make up the dataset in an efficient manner. Data Scientists specializing in entity-resolution often employ specialized *blocking* techniques that limit which customers should be compared to one another using mathematical short-cuts. -# MAGIC -# MAGIC In a [prior solution accelerator](https://databricks.com/blog/2021/05/24/machine-learning-based-item-matching-for-retailers-and-brands.html), we explored how some of these techniques may be employed (in a product matching scenario). In this solution accelerator, we'll take a look at how the [Zingg](https://github.com/zinggAI/zingg) library can be used to simplify an person matching implementation that takes advantage of a fuller range of techniques. - -# COMMAND ---------- - -# MAGIC %md ### The Zingg Workflow -# MAGIC -# MAGIC Zingg is a library that provides the building blocks for ML-based entity-resolution using industry-recognized best practices. 
It is not an application, but it provides the capabilities required to the assemble a robust application. When run in combination with Databricks, Zingg provides the application the scalability that's often needed to perform entity-resolution on enterprise-scaled datasets. -# MAGIC -# MAGIC To build a Zingg-enabled application, it's easiest to think of Zingg as being deployed in two phases. In the first phase, candidate pairs of potential duplicates are extracted from an initial dataset and labeled by expert users. These labeled pairs are then used to train a model capable of scoring likely matches. -# MAGIC -# MAGIC In the second phase, the model trained in the first phase is applied to newly arrived data. Those data are compared to data processed in prior runs to identify likely matches between in incoming and previously processed dataset. The application engineer is responsible for how matched and unmatched data will be handled, but typically information about groups of matching records are updated with each incremental run to identify all the record variations believed to represent the same entity. - -# COMMAND ---------- - -# MAGIC %md ### Building a Solution -# MAGIC -# MAGIC A typical entity-resolution application will provide a nice UI for end-user interactions with the data and an accessible database from which downstream applications can access deduplicated data. To handle the back-office processing supported by Zingg, Databricks jobs (*workflows*) representing various tasks performed in the Zingg-enabled workflows are implemented and triggered through [Databricks REST API](https://docs.databricks.com/dev-tools/api/latest/index.html) calls originating from the UI. To keep our deployment simple, we'll implement the jobs that would be expected in a typical deployment but then leverage Databricks notebooks to provide the UI experience. The UI experience provided by notebooks is less than ideal for the type of user who would typically be performing expert review in a customer identity resolution scenario but is sufficient to work out the steps in the workflow prior to a more robust UI implementation. -# MAGIC -# MAGIC The Zingg-aligned jobs we will setup in the Databricks environment make use of a JAR file which must be uploaded into the Databricks workspace as a *workspace library* prior to their execution. To access this JAR and create the required workspace library, you can perform the following steps manually:

-# MAGIC -# MAGIC 1. Navigate to the [Zingg GitHub repo](https://github.com/zinggAI/zingg) -# MAGIC 2. Click on *Releases* (located on the right-hand side of repository page) -# MAGIC 3. Locate the latest release for your version of Spark (which was *zingg-0.3.3-SNAPSHOT-spark-3.1.2* at the time this notebook was written) -# MAGIC 4. Download the compiled, gzipped *tar* file (found under the *Assets* heading) to your local machine -# MAGIC 5. Unzip the *tar.gz* file and retrieve the *jar* file -# MAGIC 6. Use the file to create a JAR-based library in your Databricks workspace following [these steps](https://docs.databricks.com/libraries/workspace-libraries.html) -# MAGIC -# MAGIC We provided a script in `./config/setup` that automates these steps and sets up the jar file and source data for you. The script is executed as part of the next notebook (`00.1`). - -# COMMAND ---------- - -# MAGIC %md ### The Solution Accelerator Assets -# MAGIC -# MAGIC This accelerator is divided into two high-level parts. In the first part, we focus on the setup of the environment required by the application. In the second part, we implement the Zingg-enabled application workflow. -# MAGIC -# MAGIC The notebooks that make up the first part are:

-# MAGIC -# MAGIC * **00.0 Intro & Config** - used to provide access to a common set of configuration settings -# MAGIC * **00.1 Setup 01: Prepare Data** - used to setup the data that will be matched/linked in the application -# MAGIC * **00.2 Setup 02: Prepare Jobs** - used to setup the Zingg jobs -# MAGIC -# MAGIC The notebooks that make up the second part are:

-# MAGIC -# MAGIC * **01: Initial Workflow** - implements the process of identifying candidate pairs and assigning labels to them. From the labeled pairs, a model is trained and database structures are initialized. -# MAGIC * **02: Incremental Workflow** - implements the process of incrementally updating the database based on newly arriving records. - -# COMMAND ---------- - -# MAGIC %md ## Configuration -# MAGIC -# MAGIC To enable consistent settings across the notebooks in this accelerator, we establish the following configuration settings: - -# COMMAND ---------- - -# DBTITLE 1,Initialize Configuration Variable -if 'config' not in locals(): - config = {} - -# COMMAND ---------- - -# DBTITLE 1,Initialize Database -# set database name -config['database name'] = 'ncvoters' - -# create database to house mappings -_ = spark.sql('CREATE DATABASE IF NOT EXISTS {0}'.format(config['database name'])) - -# set database as default for queries -_ = spark.catalog.setCurrentDatabase(config['database name'] ) - -# COMMAND ---------- - -# DBTITLE 1,Set Zingg Model Name -config['model name'] = 'ncvoters' - -# COMMAND ---------- - -# MAGIC %md We'll need to house quite a bit of data in specific locations to support different stages of our work. We'll target a folder structure as follows:

-# MAGIC -# MAGIC -# MAGIC -# MAGIC **NOTE** We will create several additional subfolders under many of the directories in this folder structure. This will be done within the various notebooks as needed. - -# COMMAND ---------- - -# DBTITLE 1,Initialize Folder Structure -# mount path where files are stored -mount_path = '/tmp/ncvoters' - -config['dir'] = {} -config['dir']['config'] = f'{mount_path}/config' -config['dir']['downloads'] = f'{mount_path}/downloads' # original unzipped data files -config['dir']['input'] = f'{mount_path}/input' -config['dir']['output'] = f'{mount_path}/output' -config['dir']['staging'] = f'{mount_path}/staging' # staging area for incremental files -config['dir']['zingg'] = f'{mount_path}/zingg' # zingg models and temp data - -# make sure directories exist -for dir in config['dir'].values(): - dbutils.fs.mkdirs(dir) - -# COMMAND ---------- - -# MAGIC %md The Zingg jobs will be separated into those that support the initial workflow and those that support the incremental workflow. While some jobs such as *zingg_initial_match* and *zingg_incremental_match* are fundamentally the same, we separate the jobs in this manner to simplify this deployment and to illustrate how you might support differing job configurations with each phase:

-# MAGIC -# MAGIC Initial -# MAGIC * zingg_initial_findTrainingData -# MAGIC * zingg_initial_train -# MAGIC * zingg_initial_match -# MAGIC -# MAGIC Incremental -# MAGIC * zingg_incremental_link -# MAGIC * zingg_incremental_match -# MAGIC -# MAGIC Please note that in order to trigger these jobs, calls to the REST API will need to supply a Personal Access Token with appropriate permissions. You will need to enter that PAT in the configuration below. More information on creating Personal Access Tokens can be found [here](https://docs.databricks.com/dev-tools/api/latest/authentication.html). - -# COMMAND ---------- - -# DBTITLE 1,Initialize Job Settings -# job names -config['job'] = {} -config['job']['initial'] = {} -config['job']['initial']['findTrainingData'] = 'zingg_initial_findTrainingData' -config['job']['initial']['train'] = 'zingg_initial_train' -config['job']['initial']['match'] = 'zingg_initial_match' -config['job']['incremental'] = {} -config['job']['incremental']['link'] = 'zingg_incremental_link' -config['job']['incremental']['match'] = 'zingg_incremental_match' - -# parameters used to setup job configurations -config['job']['config']={} -config['job']['config']['spark version'] = sc.getConf().get('spark.databricks.clusterUsageTags.effectiveSparkVersion') -config['job']['config']['node type id'] = sc.getConf().get('spark.databricks.clusterUsageTags.clusterNodeType') -config['job']['config']['driver node type id'] = sc.getConf().get('spark.databricks.clusterUsageTags.clusterNodeType') -config['job']['config']['num workers'] = 8 # feel free to adjust the cluster size here -config['job']['config']['num partitions'] = sc.defaultParallelism * 10 - -config['job']['zingg jar path'] = "dbfs:/tmp/solacc/customer_er/jar/zingg-0.3.3-SNAPSHOT/zingg-0.3.3-SNAPSHOT.jar" - -# settings to launch zingg jobs via rest api -config['job']['databricks workspace url'] = spark.sparkContext.getConf().get('spark.databricks.workspaceUrl') -config['job']['api token'] = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().getOrElse(None) - -# COMMAND ---------- - -# MAGIC %md In addition, we will define a ZinggJob class which will make executing the workflow jobs via the REST API easier: - -# COMMAND ---------- - -# DBTITLE 1,Define ZinggJob Class -import requests -import json -import time - -class ZinggJob: - - name = None - id = None - url = None - _headers = None - - - def __init__(self, name, databricks_workspace_url, api_token): - - # attribute assignments - self.name = name - self.url = databricks_workspace_url - self._headers = {'Authorization': f'Bearer {api_token}', 'User-Agent':'zinggai_zingg'} - - # get job id (based on job name) - self.id = self._get_job_id() - if self.id is None: - self = None # invalidate self - raise ValueError(f"A job with the name '{name}' was not found. 
Please create the required jobs before attempting to proceed.") - - def _get_job_id(self): - - job_id = None - - # get list of jobs in databricks workspace - job_resp = requests.get(f'https://{self.url}/api/2.0/jobs/list', headers=self._headers) - - # Handle edge case where no jobs are present in the workspace, otherwise attempting to iterate over job_resp will throw an error - if len(job_resp.json()) == 0 or job_resp.json().get('jobs') is None: - return None - - # find job by name - for job in job_resp.json().get('jobs'): - if job.get('settings').get('name')==self.name: - job_id = job.get('job_id') - break - return job_id - - def run(self): - post_body = {'job_id': self.id} - run_resp = requests.post(f'https://{self.url}/api/2.0/jobs/run-now', json=post_body, headers=self._headers) - run_id = run_resp.json().get('run_id') - return run_id - - def wait_for_completion(self, run_id): - - # seconds to sleep between checks - sleep_seconds = 30 - start_time = time.time() - - # loop indefinitely - while True: - - # retrieve job info - resp = requests.get(f'https://{self.url}/api/2.0/jobs/runs/get?run_id={run_id}', headers=self._headers) - - #calculate elapsed seconds - elapsed_seconds = int(time.time()-start_time) - - # get job lfe cycle state - life_cycle_state = resp.json().get('state').get('life_cycle_state') - - # if terminated, then get result state & break loop - if life_cycle_state == 'TERMINATED': - result_state = resp.json().get('state').get('result_state') - break - - # else, report to user and sleep - else: - if elapsed_seconds > 0: - print(f'Job in {life_cycle_state} state at { elapsed_seconds } seconds since launch. Waiting {sleep_seconds} seconds before checking again.', end='\r') - - time.sleep(sleep_seconds) - - # return results - print(f'Job completed in {result_state} state after { elapsed_seconds } seconds. Please proceed with next steps to process the records identified by the job.') - print('\n') - - return result_state - - def run_and_wait(self): - return self.wait_for_completion(self.run()) - - -# COMMAND ---------- - -# MAGIC %md -# MAGIC -# MAGIC © 2022 Databricks, Inc. All rights reserved. The source in this notebook is provided subject to the [Databricks License](https://databricks.com/db-license-source). All included or referenced third party libraries are subject to the licenses set forth below. -# MAGIC -# MAGIC | library | description | license | source | -# MAGIC |----------------------------------------|-------------------------|------------|-----------------------------------------------------| -# MAGIC | zingg | entity resolution library | GNU Affero General Public License v3.0 | https://github.com/zinggAI/zingg/ | -# MAGIC | tabulate | pretty-print tabular data in Python | MIT License | https://pypi.org/project/tabulate/ | -# MAGIC | filesplit | Python module that is capable of splitting files and merging it back | MIT License | https://pypi.org/project/filesplit/ | - -# COMMAND ---------- - - diff --git a/00.1_ Prepare Data.py b/00.1_ Prepare Data.py deleted file mode 100644 index cea1002..0000000 --- a/00.1_ Prepare Data.py +++ /dev/null @@ -1,206 +0,0 @@ -# Databricks notebook source -# MAGIC %md -# MAGIC You may find this series of notebooks at https://github.com/databricks-industry-solutions/customer-er. For more information about this solution accelerator, visit https://www.databricks.com/solutions/accelerators/customer-entity-resolution. 
- -# COMMAND ---------- - -# MAGIC %md The purpose of this notebook is to setup the data assets needed by the remaining notebooks in the customer entity-resolution solution accelerator. - -# COMMAND ---------- - -# MAGIC %md ## Introduction -# MAGIC -# MAGIC For this solution accelerator, we'll make use of the [North Carolina Voters 5M](https://dbs.uni-leipzig.de/research/projects/object_matching/benchmark_datasets_for_entity_resolution) dataset made available by the [Database Group Leipzig](https://dbs.uni-leipzig.de/en). This dataset, more fully documented in [this paper](https://dbs.uni-leipzig.de/file/famer-adbis2017.pdf), contains name and limited address information for several million registered voters within the state of North Carolina. There are purposefully duplicate records inserted in the set with specific adjustments to make them fuzzy matchable bringing the total number of records in the dataset to around 5-million, hence the name of the dataset. -# MAGIC -# MAGIC The dataset is made available for download as a gzipped TAR file which needs to be downloaded, unzipped, untarred and uploaded to a folder named *downloads* under a [mount point](https://docs.databricks.com/data/databricks-file-system.html#mount-object-storage-to-dbfs) in your environment before running the remainder of these notebooks. In our environment, we've used a default name of */mnt/ncvoters* for our mount point. You can alter this in the *ER Setup 00* notebook if you've elected to create a mount point under a different name. - -# COMMAND ---------- - -# DBTITLE 1,Install Required Libraries -# MAGIC %pip install filesplit - -# COMMAND ---------- - -# DBTITLE 1,Get Config -# MAGIC %run "./00.0_ Intro & Config" - -# COMMAND ---------- - -# DBTITLE 1,Download data and Zingg jar -# MAGIC %run "./config/setup" - -# COMMAND ---------- - -# DBTITLE 1,Import Required Libraries -import pyspark.sql.functions as f - -from filesplit.split import Split - -# COMMAND ---------- - -# MAGIC %md ## Step 1: Reset the Environment -# MAGIC -# MAGIC Zingg depends on having just the right data in the right place. To ensure we maintain a clean environment, we'll reset all the directories housing input, output and transient data. In most environments, such a step should not be necessary. This is just a precaution: - -# COMMAND ---------- - -# DBTITLE 1,Reset the Data Environment -dbutils.fs.rm(config['dir']['input'], recurse=True) -dbutils.fs.rm(config['dir']['output'], recurse=True) -dbutils.fs.rm(config['dir']['staging'], recurse=True) - -# COMMAND ---------- - -# MAGIC %md ## Step 2: Separate Data into Initial and Incremental Sets -# MAGIC -# MAGIC The North Carolina Voters 5M dataset consists of 5 files containing roughly 1-million records each. 
We will use the data in the first 4 files as our initial dataset and then split the 5th file into incremental sets of approximately 10,000 records each: - -# COMMAND ---------- - -# DBTITLE 1,Verify File Count -# count the files in the downloads directory -file_count = len(dbutils.fs.ls(config['dir']['downloads'])) - -print('Expecting 5 files in {0}'.format(config['dir']['downloads'])) -print('Found {0} files in {1}'.format(file_count, config['dir']['downloads'])) - -# COMMAND ---------- - -# DBTITLE 1,Move Raw Inputs into Initial & Incremental Folders -# function to help with file copy -def copy_file_with_overwrite(from_file_path, to_file_path): - - # remove to-file if already exists - try: - dbutils.fs.rm(to_file_path) - except: - pass - - # copy from-file to intended destination - dbutils.fs.cp(from_file_path, to_file_path) - - -# for each file in downloaded dataset -for file in dbutils.fs.ls(config['dir']['downloads']): - - # determine file number (ncvr_numrec_1000000_modrec_2_ocp_20_myp__nump_5.csv) - file_num = int(file.name.split('_')[-3]) - - # if 0 - 3: copy to initial folder - if file_num < 4: - copy_file_with_overwrite(file.path, config['dir']['input'] + '/initial/' + file.name) - - # if 4: split into files with n customers at time copy to incremental folder - elif file_num == 4: - - dbutils.fs.mkdirs(config['dir']['staging']) - - # split file into files of 10,000 records each - split = Split('/'+ file.path.replace(':',''), '/dbfs' + config['dir']['staging']) - split.bylinecount(10000, includeheader=True) - - # cleanup manifest file created by Split - dbutils.fs.rm(config['dir']['staging']+'/manifest') - -# COMMAND ---------- - -# DBTITLE 1,Verify Initial Dataset Has 4 Files -display( - dbutils.fs.ls(config['dir']['input'] + '/initial') - ) - -# COMMAND ---------- - -# DBTITLE 1,Verify Staging Area for Incremental Data Is Populated -display( - dbutils.fs.ls(config['dir']['staging']) - ) - -# COMMAND ---------- - -# MAGIC %md ## Step 2: Examine the Data -# MAGIC -# MAGIC To get a sense of the data, we'll examine the records in the original dataset: - -# COMMAND ---------- - -# DBTITLE 1,Access Initial Data Set -data = ( - spark - .read - .csv( - path=config['dir']['downloads'], - sep=',', - header=True, - inferSchema=True - ) - .createOrReplaceTempView('ncvoters') - ) - -display(spark.table('ncvoters')) - -# COMMAND ---------- - -# MAGIC %md In the dataset, voters are identified based on the following fields:

-# MAGIC -# MAGIC * givenname -# MAGIC * surname -# MAGIC * suburb -# MAGIC * postcode -# MAGIC -# MAGIC A unique identifier, *recid*, was used in the original dataset to identify unique records. -# MAGIC -# MAGIC To create duplicates in the dataset, the team responsible for creating it simply re-inserted some number of the rows back into it without any modifications. Another set of duplicates was created by re-inserting rows while *corrupting* one or multiple of the 4 fields identified above. Corruptions take the form of the removal, replacement or reversal of some number or characters from within a string as would be typical of a poor data entry process. These duplicates are identifiable by their duplicate *recid* values: - -# COMMAND ---------- - -# DBTITLE 1,Identify Author-Generated Duplicates -# MAGIC %sql -# MAGIC -# MAGIC SELECT * -# MAGIC FROM ncvoters -# MAGIC WHERE recid IN ( -# MAGIC SELECT recid -- duplicate recid values -# MAGIC FROM ncvoters a -# MAGIC GROUP BY recid -# MAGIC HAVING COUNT(*) > 1 -# MAGIC ) -# MAGIC ORDER BY recid - -# COMMAND ---------- - -# MAGIC %md Still other duplicates are naturally occuring in the dataset. With a dataset of this size, it's not unexpected that some errors were not caught following data entry. For example, consider these records which appear to be exact duplicates but which have separate *recid* values. -# MAGIC -# MAGIC It is possible that two individuals within a given zip code have the same first and last name so that some of these records only appear to be duplicates given the lack of additional identifying data in the dataset. However, the uniqueness of some of these names would indicate that some are true duplicates in the original dataset: - -# COMMAND ---------- - -# DBTITLE 1,Identify Apparent Duplicates In Original Dataset -# MAGIC %sql -# MAGIC -# MAGIC SELECT -# MAGIC a.* -# MAGIC FROM ncvoters a -# MAGIC INNER JOIN ncvoters b -# MAGIC ON a.givenname=b.givenname AND a.surname=b.surname AND a.suburb=b.suburb AND a.postcode=b.postcode -# MAGIC WHERE a.recid != b.recid -# MAGIC UNION -# MAGIC SELECT -# MAGIC b.* -# MAGIC FROM ncvoters a -# MAGIC INNER JOIN ncvoters b -# MAGIC ON a.givenname=b.givenname AND a.surname=b.surname AND a.suburb=b.suburb AND a.postcode=b.postcode -# MAGIC WHERE a.recid != b.recid -# MAGIC ORDER BY givenname, surname, suburb, postcode - -# COMMAND ---------- - -# MAGIC %md -# MAGIC -# MAGIC © 2022 Databricks, Inc. All rights reserved. The source in this notebook is provided subject to the [Databricks License](https://databricks.com/db-license-source). All included or referenced third party libraries are subject to the licenses set forth below. 
-# MAGIC -# MAGIC | library | description | license | source | -# MAGIC |----------------------------------------|-------------------------|------------|-----------------------------------------------------| -# MAGIC | zingg | entity resolution library | GNU Affero General Public License v3.0 | https://github.com/zinggAI/zingg/ | -# MAGIC | tabulate | pretty-print tabular data in Python | MIT License | https://pypi.org/project/tabulate/ | -# MAGIC | filesplit | Python module that is capable of splitting files and merging it back | MIT License | https://pypi.org/project/filesplit/ | diff --git a/00.2_ Prepare Jobs.py b/00.2_ Prepare Jobs.py deleted file mode 100644 index 2a75ff6..0000000 --- a/00.2_ Prepare Jobs.py +++ /dev/null @@ -1,412 +0,0 @@ -# Databricks notebook source -# MAGIC %md -# MAGIC You may find this series of notebooks at https://github.com/databricks-industry-solutions/customer-er. For more information about this solution accelerator, visit https://www.databricks.com/solutions/accelerators/customer-entity-resolution. - -# COMMAND ---------- - -# MAGIC %md The purpose of this notebook is to setup the jobs needed by the remaining notebooks in the customer entity-resolution solution accelerator. - -# COMMAND ---------- - -# MAGIC %md ## Introduction -# MAGIC -# MAGIC In this notebook, we will be setting up the jobs that will be used to implement each of the two-phases of the Zingg workflow. Each job is a [Spark Submit job](https://spark.apache.org/docs/latest/submitting-applications.html#launching-applications-with-spark-submit) referencing the Zingg JAR installed as described in the *ER Setup 00* notebook. Each will have access to its own [configuration file](https://docs.zingg.ai/zingg/stepbystep/configuration) based on specs provided by Zingg. - -# COMMAND ---------- - -# DBTITLE 1,Get Config -# MAGIC %run "./00.0_ Intro & Config" - -# COMMAND ---------- - -# DBTITLE 1,Import Required Libraries -import requests, json, time -from copy import deepcopy - -from pprint import PrettyPrinter -pp = PrettyPrinter() - -# COMMAND ---------- - -# DBTITLE 1,Remove Any Prior Config Files -dbutils.fs.rm( config['dir']['config'], recurse=True) - -# COMMAND ---------- - -# MAGIC %md ## Step 1: Verify Zingg JAR Installed -# MAGIC -# MAGIC Before proceeding with job setup, be sure to have downloaded the Zingg JAR as described in *ER Setup 00*. Here, we will quickly verify it's location: - -# COMMAND ---------- - -# DBTITLE 1,Verify the Zingg JAR Installed -display(dbutils.fs.ls(config['job']['zingg jar path'])) - -# COMMAND ---------- - -# MAGIC %md ## Step 2: Build Config Files -# MAGIC -# MAGIC Each Zingg job makes use of a config file. The first element of the configuration file that we will address will be the *data* element that defines how input files will be read and fields in those files interpreted. To understand the attributes we will specify for this element, it helps to examine one of the input files: - -# COMMAND ---------- - -# DBTITLE 1,Examine Input Data File -file_name = dbutils.fs.ls(config['dir']['input'] + '/initial')[0].path - -pp.pprint( dbutils.fs.head(file_name, 500) ) - -# COMMAND ---------- - -# MAGIC %md Our input files are a simple comma-separated value (CSV) file with fields of:

-# MAGIC -# MAGIC * recid - the unique identifier for a voter on the voter registration system -# MAGIC * givenname - the given (first) name associated with a voter -# MAGIC * surname - the family (last) name associated with a voter -# MAGIC * suburb - the city within which the voter lives -# MAGIC * postcode - the postal code within which the voter lives -# MAGIC -# MAGIC Leveraging this information, we might define our input files as follows: -# MAGIC -# MAGIC **NOTE** The schema information associated with the input files is encoded as a multi-line string that will be evaluated to a dictionary by Zingg during job execution. - -# COMMAND ---------- - -# DBTITLE 1,Define Inputs -data = [ # defined as a list as we may have multiple sets of input data - { - 'name':'input', # name you assign to the dataset - 'format':'csv', # format of the dataset: csv or parquet - 'props': { # various properties associated with the file - 'delimiter':',', # comma delimiter - 'header':'true', # has a header - 'location':config['dir']['input'] # path to folder holding files - }, - 'schema': # schema to apply to the data when read - """{ - 'type': 'struct', - 'fields':[ - {'name':'recid', 'type':'integer', 'nullable': true, 'metadata': {}}, - {'name':'givenname', 'type':'string', 'nullable': true, 'metadata': {}}, - {'name':'surname', 'type':'string', 'nullable': true, 'metadata': {}}, - {'name':'suburb', 'type':'string', 'nullable': true, 'metadata': {}}, - {'name':'postcode', 'type':'string', 'nullable': true, 'metadata': {}} - ] - }""" - } - ] - -# COMMAND ---------- - -# MAGIC %md You may have noticed that we've defined the input folder path as the top-level input folder. This is something we will adjust below based on the needs of the specific job. The same goes for the output folder location in the output configuration: - -# COMMAND ---------- - -# DBTITLE 1,Define Outputs -output = [ - { - 'name':'output', # name to assign to data outputs - 'format':'parquet', # format to employ with data outputs: csv or parquet - 'props':{ # output file properties - 'location':config['dir']['output'], # folder where to deposit outputs - 'overwrite':'true' # overwrite any existing data in output dir - } - } - ] - -# COMMAND ---------- - -# MAGIC %md Now that we have the basic configuration inputs and outputs defined, let's specify how incoming fields will be matched against other records and the output they will produce. These input-output field mappings are captured in the *fieldDefinition* element. -# MAGIC -# MAGIC Our incoming dataset has *recid*, *givenname*, *surname*, *suburb* and *postcode* fields. As explained in the last notebook, the *recid* is a unique identifier left over from the original dataset from which this dataset was created. Duplicate entries inserted by the dataset's authors can be identified through matching *recid* values (though there appear to be duplicates also in the original dataset). -# MAGIC -# MAGIC We won't use the *recid* ID to match records and instead will focus on voters' names and place of residence. Supported match types are *FUZZY*, *EXACT*, *PINCODE*, *EMAIL*, *TEXT*, *NUMERIC*, and [many others](https://github.com/zinggAI/zingg/blob/031ed56945e8112c4c772b122cb0c40c67e59662/client/src/main/java/zingg/client/MatchType.java). 
A match type of *DONT USE* will cause Zingg to ignore a field for matching purposes: -# MAGIC -# MAGIC **NOTE** *fieldName* refers to the input field as defined in the *data* element and *fields* and *dataType* refer to the field to be produced as part of the defined *output*: - -# COMMAND ---------- - -# DBTITLE 1,Define Field Mappings -fieldDefinition = [ - {'fieldName':'recid', 'matchType':'DONT USE', 'fields':'recid', 'dataType':'"integer"'}, - {'fieldName':'givenname', 'matchType':'FUZZY', 'fields':'first_name', 'dataType':'"string"'}, - {'fieldName':'surname', 'matchType':'FUZZY', 'fields':'last_name', 'dataType':'"string"'}, - {'fieldName':'suburb', 'matchType':'FUZZY', 'fields':'city', 'dataType':'"string"'}, - {'fieldName':'postcode', 'matchType':'FUZZY', 'fields':'zipcode', 'dataType':'"string"'} - ] - -# COMMAND ---------- - -# MAGIC %md And now we can put the configuration data together, incorporating various elements that will control the Zingg job. These elements include the *labelDataSampleSize* which controls the random sampling rate for training the Zingg model, the *numPartitions* which controls the degree of distribution to apply to the Zingg job, and the *modelId* which assigns a name to the Zingg model: -# MAGIC -# MAGIC **NOTE** We are defining the number of partitions to use based on the size of the current cluster. If you adjust the cluster specs of your jobs, you may want to adjust the *numPartitions* configuration setting to align with your cluster size in order to maximize processing efficiency. - -# COMMAND ---------- - -# DBTITLE 1,Assemble Base Config File -job_config = { - 'labelDataSampleSize':0.05, # fraction of records to sample during model training exercises - 'numPartitions':config['job']['config']['num partitions'], # number of partitions against which to scale out data processing & model training - 'modelId': config['model name'], # friendly name of model to be produced - 'zinggDir': config['dir']['zingg'], # folder within which to persist the model between steps - 'data': data, # the input data - 'output': output, # the output data - 'fieldDefinition': fieldDefinition # the input-output field mappings - } - -# COMMAND ---------- - -# MAGIC %md The config file definition we've assembled can now be adjusted to address the slightly different needs of our initial and incremental data processing steps. For the initial steps, *i.e.* the steps of identifying candidate pairs to label and then training a model, data output will be sent to an *initial* output folder. For the incremental step, *i.e.* the step by which incoming data is linked to previously processed data, data output will be sent to an *incremental* output folder. 
In addition, the incremental step will read both initial and incremental data files which have the same structure but which are found in different folder locations: - -# COMMAND ---------- - -# DBTITLE 1,Define Config for Initial Steps -# copy job config dictionary so that changes don't impact the base config -initial_job_config = deepcopy(job_config) - -# adjust input settings -initial_job_config['data'][0]['name'] = initial_job_config['data'][0]['name'] + '_initial' -initial_job_config['data'][0]['props']['location'] = initial_job_config['data'][0]['props']['location'] + '/initial' - -# adjust output settings -initial_job_config['output'][0]['name'] = initial_job_config['output'][0]['name'] + '_initial' -initial_job_config['output'][0]['props']['location'] = initial_job_config['output'][0]['props']['location'] + '/initial' - -# display config -pp.pprint(initial_job_config) - -# COMMAND ---------- - -# DBTITLE 1,Define Config for Incremental Match -# copy job config dictionary so that changes don't impact the base config -incremental_match_job_config = deepcopy(job_config) - -# adjust input settings -incremental_match_job_config['data'][0]['name'] = incremental_match_job_config['data'][0]['name'] + '_incremental_match' -incremental_match_job_config['data'][0]['props']['location'] = incremental_match_job_config['data'][0]['props']['location'] + '/incremental/incoming' - -# adjust output settings -incremental_match_job_config['output'][0]['name'] = incremental_match_job_config['output'][0]['name'] + '_incremental_match' -incremental_match_job_config['output'][0]['props']['location'] = incremental_match_job_config['output'][0]['props']['location'] + '/incremental/match' - -# display config -pp.pprint(incremental_match_job_config) - -# COMMAND ---------- - -# DBTITLE 1,Define Config for Incremental Link -# copy job config so that changes don't impact the base config -incremental_link_job_config = deepcopy(job_config) - -# adjust input settings to support two inputs -incremental_link_job_config['data'] += deepcopy(incremental_link_job_config['data']) - -# first input is priors -incremental_link_job_config['data'][0]['name'] = incremental_link_job_config['data'][0]['name'] + '_incremental_link_prior' -incremental_link_job_config['data'][0]['props']['location'] = incremental_link_job_config['data'][0]['props']['location'] + '/incremental/prior' - -# second input is (new) incoming -incremental_link_job_config['data'][1]['name'] = incremental_link_job_config['data'][1]['name'] + '_incremental_link_incoming' -incremental_link_job_config['data'][1]['props']['location'] = incremental_link_job_config['data'][1]['props']['location'] + '/incremental/incoming' - -# adjust output settings -incremental_link_job_config['output'][0]['name'] = incremental_link_job_config['output'][0]['name'] + '_incremental_link' -incremental_link_job_config['output'][0]['props']['location'] = incremental_link_job_config['output'][0]['props']['location'] + '/incremental/link' - -# display incremental config -pp.pprint(incremental_link_job_config) - -# COMMAND ---------- - -# MAGIC %md And now we can save our config data to actual file outputs: - -# COMMAND ---------- - -# DBTITLE 1,Write Configs to Files -dbutils.fs.mkdirs(config['dir']['config']) - -# define function to facilitate creation of json files -def write_config_file(config, local_file_path): - with open(local_file_path, 'w') as fp: - fp.write(json.dumps(config).replace("'", '\\"')) - -# write config output -write_config_file(initial_job_config, '/dbfs'+ 
config['dir']['config'] + '/initial.json') -write_config_file(incremental_match_job_config, '/dbfs'+ config['dir']['config'] + '/incremental_match.json') -write_config_file(incremental_link_job_config, '/dbfs'+ config['dir']['config'] + '/incremental_link.json') - -display(dbutils.fs.ls(config['dir']['config'])) - -# COMMAND ---------- - -# MAGIC %md ##Step 3: Setup Jobs -# MAGIC -# MAGIC With the configuration files in place, we can now define the workflows (*jobs*) that will execute the Zingg logic. Each job will run on a dedicated jobs cluster and will be governed by a set of parameters defined as follows: -# MAGIC -# MAGIC **NOTE** Some elements of the job specification are specific to the cloud environment on which you are running. The easiest way to identify which elements are cloud-specific and which settings you may prefer to assign to each is to [manually create a temporary job](https://docs.databricks.com/data-engineering/jobs/jobs.html) and then review its JSON definition. - -# COMMAND ---------- - -# DBTITLE 1,Define Generic Job Specification -job_spec = { - 'new_cluster': { - 'spark_version': config['job']['config']['spark version'], - 'spark_conf': { - 'spark.databricks.delta.preview.enabled': 'true' - }, - 'node_type_id': config['job']['config']['node type id'], - 'spark_env_vars': { - 'PYSPARK_PYTHON': '/databricks/python3/bin/python3' - }, - 'enable_elastic_disk': 'true', - 'num_workers': int(config['job']['config']['num workers']) - }, - 'timeout_seconds': 0, - 'email_notifications': {}, - 'name': 'find_training_job', - 'max_concurrent_runs': 1 - } - -# COMMAND ---------- - -# MAGIC %md We can now complete the job spec definition for each of our three jobs. These jobs will run the Zingg library as a [Spark Submit job](https://spark.apache.org/docs/latest/submitting-applications.html#launching-applications-with-spark-submit). 
The parameters submitted with this job are captured as strings within a list as follows: - -# COMMAND ---------- - -# DBTITLE 1,Initial Find Training Data Job Spec -initial_findTraining_jobspec = deepcopy(job_spec) - -initial_findTraining_jobspec['name'] = config['job']['initial']['findTrainingData'] - -initial_findTraining_jobspec['spark_submit_task'] = { - 'parameters': [ - '--class', 'zingg.client.Client', # class within the zingg library (jar) to employ - config['job']['zingg jar path'], # path to zingg jar file - '--phase=findTrainingData', # job phase (to be employed by zingg) - '--conf={0}'.format('/dbfs' + config['dir']['config'] + '/initial.json'), # local path to zingg conf file - '--license=abc' # license to associate with zingg - ] - } - -# COMMAND ---------- - -# DBTITLE 1,Initial Train Job Spec -initial_train_jobspec = deepcopy(job_spec) - -initial_train_jobspec['name'] = config['job']['initial']['train'] - -initial_train_jobspec['spark_submit_task'] = { - 'parameters': [ - '--class', - 'zingg.client.Client', - config['job']['zingg jar path'], - '--phase=train', - '--conf={0}'.format('/dbfs' + config['dir']['config'] + '/initial.json'), - '--license=abc' - ] - } - -# COMMAND ---------- - -# DBTITLE 1,Initial Match Job Spec -initial_match_jobspec = deepcopy(job_spec) - -initial_match_jobspec['name'] = config['job']['initial']['match'] - -initial_match_jobspec['spark_submit_task'] = { - 'parameters': [ - '--class', - 'zingg.client.Client', - config['job']['zingg jar path'], - '--phase=match', - '--conf={0}'.format('/dbfs' + config['dir']['config'] + '/initial.json'), - '--license=abc' - ] - } - -# COMMAND ---------- - -# DBTITLE 1,Incremental Link Job Spec -incremental_link_jobspec = deepcopy(job_spec) - -incremental_link_jobspec['name'] = config['job']['incremental']['link'] - -incremental_link_jobspec['spark_submit_task'] = { - 'parameters': [ - '--class', - 'zingg.client.Client', - config['job']['zingg jar path'], - '--phase=link', - '--conf={0}'.format('/dbfs' + config['dir']['config'] + '/incremental_link.json'), - '--license=abc' - ] - } - -# COMMAND ---------- - -# DBTITLE 1,Incremental Match Job Spec -incremental_match_jobspec = deepcopy(job_spec) - -incremental_match_jobspec['name'] = config['job']['incremental']['match'] - -incremental_match_jobspec['spark_submit_task'] = { - 'parameters': [ - '--class', - 'zingg.client.Client', - config['job']['zingg jar path'], - '--phase=match', - '--conf={0}'.format('/dbfs' + config['dir']['config'] + '/incremental_match.json'), - '--license=abc' - ] - } - -# COMMAND ---------- - -# MAGIC %md Now that we have the job specs defined, we can create the associated jobs as follows: - -# COMMAND ---------- - -# DBTITLE 1,Create the Jobs -job_create_url = 'https://{0}/api/2.1/jobs/create'.format(config['job']['databricks workspace url']) -job_update_url = 'https://{0}/api/2.1/jobs/reset'.format(config['job']['databricks workspace url']) -headers = {"Authorization":"Bearer {0}".format(config['job']['api token'])} - -for spec in [ - initial_findTraining_jobspec, - initial_train_jobspec, - initial_match_jobspec, - incremental_link_jobspec, - incremental_match_jobspec - ]: - - # find job with this name - try: - job = ZinggJob(spec['name'], config['job']['databricks workspace url'], config['job']['api token']) - except ValueError: - pass - job = None - - # create or update job: - if job is None: - # create new job - resp = requests.post(job_create_url, headers=headers, json=spec) - else: - # update the job with new settings - resp = 
requests.post(job_update_url, headers=headers, json={'job_id':job.id, 'new_settings': spec}) - - # get results of create/update - try: - resp.raise_for_status() - except: - print(resp.text) - -# COMMAND ---------- - -# MAGIC %md -# MAGIC -# MAGIC © 2022 Databricks, Inc. All rights reserved. The source in this notebook is provided subject to the [Databricks License](https://databricks.com/db-license-source). All included or referenced third party libraries are subject to the licenses set forth below. -# MAGIC -# MAGIC | library | description | license | source | -# MAGIC |----------------------------------------|-------------------------|------------|-----------------------------------------------------| -# MAGIC | zingg | entity resolution library | GNU Affero General Public License v3.0 | https://github.com/zinggAI/zingg/ | -# MAGIC | tabulate | pretty-print tabular data in Python | MIT License | https://pypi.org/project/tabulate/ | -# MAGIC | filesplit | Python module that is capable of splitting files and merging it back | MIT License | https://pypi.org/project/filesplit/ | diff --git a/00_Intro & Config.py b/00_Intro & Config.py new file mode 100644 index 0000000..541e0af --- /dev/null +++ b/00_Intro & Config.py @@ -0,0 +1,119 @@ +# Databricks notebook source +# MAGIC %md The purpose of this notebook is provided an overview of the Zingg Person Entity-Resolution solution accelerator and to set the configuration values to be used throughout the remaining notebooks. This notebook is available on https://github.com/databricks-industry-solutions/customer-er. + +# COMMAND ---------- + +# MAGIC %md ## Introduction +# MAGIC +# MAGIC The process of matching records related to key business concepts (such as customers, products, *etc.*) to one another is known as entity-resolution. When dealing with entities such as persons, the process often requires the comparison of name and address information which is subject to inconsistencies and errors. In this scenario, we often rely on probabilistic (*fuzzy*) matching techniques that identify likely matches based on degrees of similarity between these attributes. +# MAGIC +# MAGIC There are a wide range of techniques which can be employed to perform such matching. The challenge is not just to identify which of these techniques provide the best matches but how to compare one record to all the other records that make up the dataset in an efficient manner. Data Scientists specializing in entity-resolution often employ *blocking* techniques that limit which records are similar enough to one another to warrent a more detailed evaluation. As a result entity-resolution problems require familiarity with a breadth of techniques coupled with some domain knowledge, making this a challenge space for most non-specialists to enter. +# MAGIC +# MAGIC Fortunately, the [Zingg](https://www.zingg.ai/) library encapsulates these techniques, allowing Data Scientists and Engineers with limited experience to quickly make use of industry-recognized best practices and techniques for entity-resolution. When run on Databricks, Zingg can tap into the scalabilty of the platform to make relatively quick work of large data matching workloads. +# MAGIC +# MAGIC + +# COMMAND ---------- + +# MAGIC %md ###Understanding Zingg +# MAGIC +# MAGIC To build a Zingg-enabled application, it's easiest to think of Zingg as being deployed in two phases. 
In the first phase, which we will refer to as the initial workflow, candidate pairs of potential duplicates are extracted from an initial dataset and labeled by expert users. These labeled pairs are then used to train a model capable of scoring likely matches.
+# MAGIC
+# MAGIC In the second phase, which we will refer to as the incremental workflow, the trained model is applied to newly arrived data. Those data are compared to data processed in prior runs to identify likely matches between the incoming and previously processed datasets. The application engineer is responsible for how matched and unmatched data will be handled, but typically information about groups of matching records is updated with each incremental run to identify all the record variations believed to represent the same entity.
+# MAGIC
+# MAGIC The initial workflow must be run before we can proceed with the incremental workflow. The incremental workflow is run whenever new data arrive that require linking and deduplication. If we feel that the model could be improved through additional training, we can perform additional cycles of record labeling by rerunning the initial workflow. The retrained model will then be picked up in the next incremental run.
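# COMMAND ----------

# MAGIC %md To make the incremental idea concrete, the following cell sketches (with toy data) one way the output of an incremental run might be folded into a running table of cluster assignments. The table layout, column names and values are illustrative assumptions for this sketch only; they are not the accelerator's schema or Zingg's actual output format.

# COMMAND ----------

# DBTITLE 1,Illustrative Sketch: Folding Incremental Matches into Cluster Assignments
import pyspark.sql.functions as f

# cluster assignments accumulated from prior runs (toy data, assumed layout)
prior_assignments = spark.createDataFrame(
  [(101, 'c-001'), (102, 'c-001'), (103, 'c-002')],
  schema='recid int, cluster_id string'
  )

# hypothetical link results for newly arrived records: each new record points at the prior record it matched
incoming_links = spark.createDataFrame(
  [(201, 101), (202, 103)],
  schema='new_recid int, matched_prior_recid int'
  )

# new records inherit the cluster of the prior record they were linked to
new_assignments = (
  incoming_links
    .join(prior_assignments, incoming_links.matched_prior_recid == prior_assignments.recid)
    .select(f.col('new_recid').alias('recid'), 'cluster_id')
  )

# append the new memberships to the running set (a real implementation would persist this, e.g. with a Delta merge)
updated_assignments = prior_assignments.unionByName(new_assignments)

display(updated_assignments)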

+# MAGIC +# MAGIC A typical entity-resolution application will provide a nice UI for end-user interactions with the data and an accesible database from which downstream applications can access deduplicated data. Our focus here is on the backend processes triggered by those user interactions. We will use [ipywidgets](https://ipywidgets.readthedocs.io/) to enable some limited UI-capabilities, but in a real-world deployment, you should work with an application developer to provide user experience better aligned with the needs of a business-aligned user. + +# COMMAND ---------- + +# MAGIC %md ### Installing Zingg +# MAGIC +# MAGIC To leverage Zingg, we'll need to first install the Zingg JAR file as a *workspace library* on our cluster. To do this, please follow these steps: +# MAGIC

+# MAGIC +# MAGIC 1. Navigate to the releases associated with the [Zingg GitHub repo](https://github.com/zinggAI/zingg/releases) +# MAGIC 2. Click on *Releases* (located on the right-hand side of repository page) +# MAGIC 3. Locate the latest release for your version of Spark (which was *zingg-0.3.4-SNAPSHOT-spark-3.1.2* at the time this notebook was written) +# MAGIC 4. Download the compiled, gzipped *tar* file (found under the *Assets* heading) to your local machine +# MAGIC 5. Unzip the *tar.gz* file and retrieve the *jar* file +# MAGIC 6. Use the file to create a JAR-based library in your Databricks workspace following [these steps](https://docs.databricks.com/libraries/workspace-libraries.html) +# MAGIC +# MAGIC Alternatively you can run the **./RUNME** notebook and use the Workflow and Cluster created in that notebook to run this accelerator. The **RUNME** notebook automated the download, extraction and installation of the Zingg jar. +# MAGIC +# MAGIC At the top of each notebook in this accelerator, we will verify the JAR file has been properly installed just to make sure you don't encounter unexplained errors later in the code. You will also see where we perform a *pip install* of the Zingg Python library. This serves as a wrapper for the JAR which makes triggering Zingg jobs easier in Python. + +# COMMAND ---------- + +# MAGIC %md ### Solution Accelerator Assets +# MAGIC +# MAGIC This accelerator is divided into a series of notebooks. The role of these in the solution accelerator is as follows

+# MAGIC +# MAGIC * **00 Intro & Config** - implements configuration values used throughout the other notebooks and provides an overview of the solution accelerator. +# MAGIC * **01 Data Prep** - setups the data assets used in the solution accelerator. +# MAGIC * **02 Initial Workflow** - implements the process of identifying candidate pairs and assigning labels to them. From the labeled pairs, a model is trained and database structures are initialized. +# MAGIC * **03 Incremental Workflow** - implements the process of incrementally updating the database based on newly arriving records. + +# COMMAND ---------- + +# MAGIC %md ## Configuration +# MAGIC +# MAGIC To enable consistent settings across the notebooks in this accelerator, we establish the following configuration settings: + +# COMMAND ---------- + +# DBTITLE 1,Initialize Configuration Variable +if 'config' not in locals(): + config = {} + +# COMMAND ---------- + +# DBTITLE 1,Database +# set database name +config['database name'] = 'zingg_ncvoters' + +# create database to house mappings +_ = spark.sql('CREATE DATABASE IF NOT EXISTS {0}'.format(config['database name'])) + +# set database as default for queries +_ = spark.catalog.setCurrentDatabase(config['database name'] ) + +# COMMAND ---------- + +# DBTITLE 1,Zingg Model +config['model name'] = 'zingg_ncvoters' + +# COMMAND ---------- + +# MAGIC %md The Zingg workflow depends on access to various folder locations where the trained model and intermediary assets can be placed between various steps. The purpose of these locations will be explained in the subsequent notebooks: + +# COMMAND ---------- + +# DBTITLE 1,Directories +# path where files are stored +mount_path = '/tmp/zingg_ncvoters' + +config['dir'] = {} + +# folder locations where you place your data +config['dir']['downloads'] = f'{mount_path}/downloads' # original unzipped data files that you will upload to the environment +config['dir']['input'] = f'{mount_path}/inputs' # folder where downloaded files will be seperated into initial and incremental data assets +config['dir']['tables'] = f'{mount_path}/tables' # location where external tables based on the data files will house data + +# folder locations Zingg writes data +config['dir']['zingg'] = f'{mount_path}/zingg' # zingg models and temp data +config['dir']['output'] = f'{mount_path}/output' + +# make sure directories exist +for dir in config['dir'].values(): + dbutils.fs.mkdirs(dir) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC +# MAGIC © 2023 Databricks, Inc. All rights reserved. The source in this notebook is provided subject to the [Databricks License](https://databricks.com/db-license-source). All included or referenced third party libraries are subject to the licenses set forth below. +# MAGIC +# MAGIC | library | description | license | source | +# MAGIC |----------------------------------------|-------------------------|------------|-----------------------------------------------------| +# MAGIC | zingg | entity resolution library | GNU Affero General Public License v3.0 | https://github.com/zinggAI/zingg/ | diff --git a/01_ Initial Workflow.py b/01_ Initial Workflow.py deleted file mode 100644 index 6969efc..0000000 --- a/01_ Initial Workflow.py +++ /dev/null @@ -1,559 +0,0 @@ -# Databricks notebook source -# MAGIC %md -# MAGIC You may find this series of notebooks at https://github.com/databricks-industry-solutions/customer-er. 
For more information about this solution accelerator, visit https://www.databricks.com/solutions/accelerators/customer-entity-resolution. - -# COMMAND ---------- - -# MAGIC %md The purpose of this notebook is to demonstrate the initial workflow by which Zingg trains a model to be used for (later) incremental data processing. - -# COMMAND ---------- - -# MAGIC %md ## Introduction -# MAGIC -# MAGIC The initial Zingg workflow consists of two primary steps plus one additional step that is often performed to initialize the environment. These three steps are:

-# MAGIC -# MAGIC 1. Label Training Data -# MAGIC 2. Train Model on Labeled Data -# MAGIC 3. Perform Initial Deduplication -# MAGIC -# MAGIC The end result of this workflow is a trained model and tables capturing which records in the initial dataset match with each other. -# MAGIC -# MAGIC Each of the three steps is facilitated by a separate job that was setup in the *ER Setup 02* notebook. If you haven't successfully run that notebook (as well as the *ER Setup 01* notebook), please do so before proceeding with this one. - -# COMMAND ---------- - -# DBTITLE 1,Install Required LIbraries -# MAGIC %pip install tabulate - -# COMMAND ---------- - -# DBTITLE 1,Initialize Config -# MAGIC %run "./00.0_ Intro & Config" - -# COMMAND ---------- - -# DBTITLE 1,Set Additional Configurations -# folders housing candidate pairs, labeled (marked) and unlabeled (unmarked) -MARKED_DIR = config['dir']['zingg'] + '/' + config['model name'] + '/trainingData/marked' -UNMARKED_DIR = config['dir']['zingg'] + '/' + config['model name'] + '/trainingData/unmarked' -OUTPUT_DIR = config['dir']['output'] + '/initial' - -# COMMAND ---------- - -# DBTITLE 1,Import Required Libraries -import pandas as pd -import numpy as np - -import time -import uuid - -from tabulate import tabulate - -# COMMAND ---------- - -# MAGIC %md ##Step 0: Setup Helper Functions & Widget -# MAGIC -# MAGIC As mentioned in the *ER Setup 00* notebook, Zingg provides building block components for the construction of an entity-resolution workflow application. To keep things simple, we will attempt to emulate an application workflow from within this notebook, recognizing that most applications would provide users with a specialized client UI for the labeling and data interpretation work performed here. -# MAGIC -# MAGIC To assist us in implementing an in-notebook experience, we'll define a few helper functions now: - -# COMMAND ---------- - -# DBTITLE 1,Reset Label Stage -def reset_labels(reset_model=True): - ''' - The purpose of this function is to reset the labeled data record in - prior iterations of the label stage. Because the Zingg model trained - on these data is invalidated by this step, the reset_model argument - instructs the function to delete the model information as well. - ''' - - # drop marked (labeled) data - dbutils.fs.rm(MARKED_DIR, recurse=True) - - # drop unmarked data - dbutils.fs.rm(UNMARKED_DIR, recurse=True) - - if reset_model: - dbutils.fs.rm('{0}/{1}'.format(config['dir']['zingg'], config['model name']), recurse=True) - dbutils.fs.rm('{0}'.format(config['dir']['output']), recurse=True) - - return - -# COMMAND ---------- - -# DBTITLE 1,Generate Candidate Pairs -# run the find training job -def run_find_training_job(): - ''' - The purpose of this function is to run the Zingg findTraining job that generates - candidate pairs from the initial set of data specified in the job's configuration - ''' - - # identify the find training job - find_training_job = ZinggJob( config['job']['initial']['findTrainingData'], config['job']['databricks workspace url'], config['job']['api token']) - - # run the job and wait for its completion - find_training_job.run_and_wait() - - return - - -# retrieve candidate pairs -def get_candidate_pairs(): - ''' - The purpose of this function is to retrieve candidate pairs that need labeling. 
- The function compares the content of the unmarked folder within which the Zingg - findTraining job deposits candidate paris with those of the marked folder where - we persist labeled pairs so that no previously labeled pairs are returned. - ''' - unmarked_pd = pd.DataFrame({'z_cluster':[]}) - marked_pd = pd.DataFrame({'z_cluster':[]}) - - # read unmarked pairs - try: - tmp_pd = pd.read_parquet( - '/dbfs'+ UNMARKED_DIR, - engine='pyarrow' - ) - if tmp_pd.shape[0] != 0: unmarked_pd = tmp_pd - except: - pass - - # read marked pairs - try: - tmp_pd = pd.read_parquet( - '/dbfs'+ MARKED_DIR, - engine='pyarrow' - ) - if tmp_pd.shape[0] != 0: marked_pd = tmp_pd - except: - pass - - # get unmarked not in marked - candidate_pairs_pd = unmarked_pd[~unmarked_pd['z_cluster'].isin(marked_pd['z_cluster'])] - - return candidate_pairs_pd - -# COMMAND ---------- - -# DBTITLE 1,Assign Labels to Candidate Pairs -# assign label to candidate pair -def assign_label(candidate_pairs_pd, z_cluster, label): - ''' - The purpose of this function is to assign a label to a candidate pair - identified by its z_cluster value. Valid labels include: - 0 - not matched - 1 - matched - 2 - uncertain - ''' - - # assign label - candidate_pairs_pd.loc[ candidate_pairs_pd['z_cluster']==z_cluster, 'z_isMatch'] = label - - return - -# persist labels to marked folder -def save_labels(candidate_pairs_pd): - ''' - The purpose of this function is to save labeled pairs to the unmarked folder. - ''' - - # make dir if not exists - dbutils.fs.mkdirs(MARKED_DIR) - - # save labeled data to file - candidate_pairs_pd.to_parquet( - '/dbfs/' + MARKED_DIR +'/markedRecords_'+ str(time.time_ns()/1000) + '.parquet', - compression='snappy', - index=False # do not include index - ) - - return - - -def count_labeled_pairs(): - ''' - The purpose of this function is to count the labeled pairs in the marked folder. - ''' - - # create initial dataframes - marked_pd = pd.DataFrame({'z_cluster':[]}) - - # read unmarked pairs - try: - marked_pd = pd.read_parquet( - '/dbfs'+ MARKED_DIR, - engine='pyarrow' - ) - except: - pass - - n_total = len(np.unique(marked_pd['z_cluster'])) - n_positive = len(np.unique(marked_pd[marked_pd['z_isMatch']==1]['z_cluster'])) - n_negative = len(np.unique(marked_pd[marked_pd['z_isMatch']==0]['z_cluster'])) - - return n_positive, n_negative, n_total - -# COMMAND ---------- - -# DBTITLE 1,Setup DropDown Widget -# setup widget -available_labels = { - 'No Match':0, - 'Match':1, - 'Uncertain':2 - } -dbutils.widgets.dropdown('label', 'Uncertain', available_labels.keys(), 'Is this pair a match?') - -# COMMAND ---------- - -# MAGIC %md ##Step 1: Label Training Data -# MAGIC -# MAGIC With our helper function in place, we can now implement the first step of the initial workflow. Within this step, Zingg will read an initial set of input data and from it generate a set of record pairs that it believes may be duplicates. As "expert data reviewers", we will review each pair and label it as either a *Match* or *No Match*. (We may also label it as *Uncertain* if we cannot determine if the records are a match.) The labeled data will be persisted to facilitate the model training step that follows.

-# MAGIC -# MAGIC -# MAGIC -# MAGIC The Zingg job called for this step is *zingg_initial_findTrainingData*. This job uses a set of *blocking* techniques to identify potential duplicates in the initial dataset. Some techniques work better than others so as you perform the multiple cycles of candidate pair generation and labeling required before we can proceed to model training, you will notice some candidate pairs suggestions are better than others. It is common that the quality of the suggestions ebbs and flows as you move through various pair generation cycles. Please understand that this is part of the process by which Zingg learns. - -# COMMAND ---------- - -# DBTITLE 1,Reset Label Data (Optional) -# only enable this if you want to -# delete all previously labeled data -if True: - reset_labels(reset_model=True) - print('Labeled data deleted') - -# COMMAND ---------- - -# MAGIC %md To get started, we generate our candidate pairs: -# MAGIC -# MAGIC **NOTE** This step will trigger the *zingg_initial_findTrainingData* job only if no unlabeled pairs exist from prior runs. If unlabeled pairs exist from prior runs, the routine will return those to be labeled before triggering the job. Because different algorithms are used to identify potential duplicates between runs, some instances of the will run noticeably longer than others. - -# COMMAND ---------- - -# DBTITLE 1,Get Data (Run Once Per Cycle) -# get candidate pairs -candidate_pairs_pd = get_candidate_pairs() - -# if no candidate pairs, run job and wait -if candidate_pairs_pd.shape[0] == 0: - print('No unlabeled candidate pairs found. Running findTraining job ...') - run_find_training_job() - candidate_pairs_pd = get_candidate_pairs() - -# get list of pairs (as identified by z_cluster) to label -z_clusters = list(np.unique(candidate_pairs_pd['z_cluster'])) - -# identify last reviewed cluster -last_z_cluster = '' # none yet - -# print candidate pair stats -print('{0} candidate pairs found for labeling'.format(len(z_clusters))) - -# COMMAND ---------- - -# MAGIC %md With candidate pairs now available for labeling, we are presented with one pair at a time and are tasked with using the drop-down widget at the top of this notebook to assign a label to each one. As you consider each pair, remember that pairs with a shared *recid* values are definitely duplicates but some duplicates may exist for which the *recid* values differ. (Per our job configurations, the *recid* field is not used for record matching.) -# MAGIC -# MAGIC Once the widget reflects your label assignment, re-run the cell to assign the label and bring up the next pair: -# MAGIC -# MAGIC **NOTE** Changing the value of the widget will trigger the following cell to re-execute. You can disable this functionality by clicking ion the settings icon in the widget bar and changing *On Widget Change* to *Do Nothing*. 
- -# COMMAND ---------- - -# DBTITLE 1,Perform Labeling (Run Repeatedly Until All Candidate Pairs Labeled) -# get current label setting (which is from last cluster) -last_label = available_labels[dbutils.widgets.get('label')] - -# assign label to last cluster -if last_z_cluster != '': - assign_label(candidate_pairs_pd, last_z_cluster, last_label) - -# get next cluster to label -try: - z_cluster = candidate_pairs_pd[(candidate_pairs_pd['z_isMatch']==-1) & (candidate_pairs_pd['z_cluster'] != last_z_cluster)].head(1)['z_cluster'].values[0] -except: - pass - z_cluster = '' - -# present the next pair -if z_cluster != '': - print('IS THIS PAIR A MATCH?') - print(f"Current widget setting will label this as '{dbutils.widgets.get('label')}'.") - print('Change widget value if different label required.\n') - print( - tabulate( - candidate_pairs_pd[candidate_pairs_pd['z_cluster']==z_cluster][['recid','givenname','surname','suburb','postcode']], - headers = 'keys', - tablefmt = 'psql' - ) - ) -else: - print('All candidate pairs have been labeled.\n') - -# hold last items for assignnment in next run -last_z_cluster = z_cluster - -# if no more to label -if last_z_cluster == '': - - # save labels - save_labels(candidate_pairs_pd) - - # count labels accumulated - n_pos, n_neg, n_tot = count_labeled_pairs() - print(f'You have accumulated {n_pos} pairs labeled as positive matches.') - print("If you need more pairs to label, re-run the cell titled 'Get Data (Run Once Per Cycle).'") - -# COMMAND ---------- - -# MAGIC %md You are encouraged to execute the block above repeatedly until all candidate pairs are labeled. Here we provide some marked data to make sure the labeling outcome is consistent between runs. Please remove the following block if you would like to use your own marked data to train your model. 
- -# COMMAND ---------- - -dbutils.fs.rm(MARKED_DIR, True) -dbutils.fs.rm(UNMARKED_DIR, True) -dbutils.fs.cp("s3://db-gtm-industry-solutions/data/rcg/customer_er/data/marked/", MARKED_DIR, True) -dbutils.fs.cp("s3://db-gtm-industry-solutions/data/rcg/customer_er/data/unmarked/", UNMARKED_DIR, True) - -# COMMAND ---------- - -# MAGIC %md Before moving on to the next phase, it's a good idea to review the labels assigned to the candidate pairs for errors: - -# COMMAND ---------- - -# DBTITLE 1,Review Labeled Pairs -marked_pd = pd.read_parquet( - '/dbfs'+ MARKED_DIR, - engine='pyarrow' - ) - -display(marked_pd) - -# COMMAND ---------- - -# MAGIC %md Should you have any mislabeled pairs, simply run the following with the appropriate substitutions for each pair you wish to correct: -# MAGIC -# MAGIC ``` -# MAGIC -# MAGIC # set values here -# MAGIC z_cluster = 'Z_CLUSTER VALUE ASSOCIATED WITH PAIR TO RELABEL' -# MAGIC new_label = available_labels['VALUE FROM WIDGET DROP DOWN TO ASSIGN'] -# MAGIC -# MAGIC -# MAGIC # read existing data -# MAGIC marked_pd = pd.read_parquet( -# MAGIC '/dbfs'+ MARKED_DIR, -# MAGIC engine='pyarrow' -# MAGIC ) -# MAGIC -# MAGIC # assign new label -# MAGIC marked_pd.loc[ marked_pd['z_cluster']==z_cluster, 'z_isMatch'] = label -# MAGIC -# MAGIC # delete old records -# MAGIC dbutils.fs.rm(MARKED_DIR, recurse=True) -# MAGIC dbutils.fs.mkdirs(MARKED_DIR) -# MAGIC -# MAGIC # write updated records -# MAGIC marked_pd.to_parquet( -# MAGIC '/dbfs' + MARKED_DIR +'/markedRecords_'+ str(time.time_ns()/1000) + '.parquet', -# MAGIC compression='snappy', -# MAGIC index=False # do not include index -# MAGIC ) -# MAGIC -# MAGIC ``` - -# COMMAND ---------- - -# MAGIC %md ## Step 2: Train Model on Labeled Data -# MAGIC -# MAGIC To train the model against the labeled pairs, we simply kickoff the *zingg_initial_train* job which call's Zingg's *train* logic. In this job, the labeled pairs are used to train a model which scores candidate pairs (generated by Zingg in later stages) for the probability of a match:

-# MAGIC -# MAGIC - -# COMMAND ---------- - -# DBTITLE 1,Train the Model -train_job = ZinggJob( config['job']['initial']['train'], config['job']['databricks workspace url'], config['job']['api token']) -train_job.run_and_wait() - -# COMMAND ---------- - -# MAGIC %md ## Step 3: Perform Initial Deduplication -# MAGIC -# MAGIC Using the trained model, we can examine the initial dataset to identify clusters of records. A cluster is a group of records that are believed to be duplicates of one another. The Zingg *match* logic combines both blocking and candidate pair scoring to generate the clustered results:

-# MAGIC -# MAGIC -# MAGIC -# MAGIC **NOTE** Given the volume of initial data and the scale of the Databricks cluster assigned to the *match* job, this step may take a while to run. -# MAGIC -# MAGIC Once the matched data are generated, we will capture the output to a set of tables that will enable incremental processing. This table structure is as follows:

-# MAGIC -# MAGIC - -# COMMAND ---------- - -# DBTITLE 1,Clear Output from Any Prior Runs -dbutils.fs.rm(OUTPUT_DIR, recurse=True) - -# COMMAND ---------- - -# DBTITLE 1,Run Match Job -match_job = ZinggJob( config['job']['initial']['match'], config['job']['databricks workspace url'], config['job']['api token']) -match_job.run_and_wait() - -# COMMAND ---------- - -# MAGIC %md Exploring the output of the match job, we can see how our model groups potentially duplicate records. The output groups duplicates around a shared *z_cluster* value. A min and max score, *i.e.* *z_minScore* and *z_maxScore*, is assigned to each cluster to help us understand the certainty with which records are assigned within a given cluster. For clusters comprised of only one record, *i.e.* no duplicates were believed to have been found, these scores are omitted: - -# COMMAND ---------- - -# DBTITLE 1,Review Matches -# retrieve initial matches -results = ( - spark - .read - .parquet(OUTPUT_DIR) - .orderBy('z_cluster', ascending=True) - ) - -# persist results to temp view -results.createOrReplaceTempView('matches') - -# retrieve results from temp view -display(spark.table('matches')) - -# COMMAND ---------- - -# MAGIC %md A quick spot check reveals that most clusters consist of 1 or 2 records. However, there may be clusters with many more records based on how the duplicates were engineered for this dataset. To get a quick sense of the cohesion of our clusters, we might take a quick look at the number of clusters associated with different counts of records: - -# COMMAND ---------- - -# DBTITLE 1,Examine Number of Clusters by Record Count -# MAGIC %sql -# MAGIC -# MAGIC SELECT -# MAGIC record_count, -# MAGIC COUNT(z_cluster) as clusters -# MAGIC FROM ( -# MAGIC SELECT -# MAGIC z_cluster, -# MAGIC count(*) as record_count -# MAGIC FROM matches -# MAGIC GROUP BY z_cluster -# MAGIC ) -# MAGIC GROUP BY record_count -# MAGIC ORDER BY record_count - -# COMMAND ---------- - -# MAGIC %md A review the number of clusters by cluster member count shows that the majority of our clusters are believed to contain just a few duplicate records. However, there are quite a few clusters with a large number of records associated with them. It might be worth examining these to understand what may be happening here but it's important to keep in mind that we never expect perfection from our model. If we feel our model could be better at defining clusters, it's important we return to the labeling phase of our work and then re-train and re-match our data. -# MAGIC -# MAGIC With that in mind, it would be helpful if Zingg provided some high-level metrics and diagnostics to help us understand the performance of our model. The reality is that outside of evaluation scenarios where we may have some form of ground-truth against to evaluate our results, its very difficult to clearly identify the precision of a model such as this. Quite often, the best we can do is review the results and make a judgement call based on the volume of identifiable errors and the patterns associated with those errors to develop a sense of whether our model's performance is adequate for our needs. - -# COMMAND ---------- - -# MAGIC %md If we are satisfied with our results, we can capture our cluster data to a table structure that will allow us to more easily perform incremental data processing. 
- -# COMMAND ---------- - -# DBTITLE 1,Create Table Structures -# MAGIC %sql -# MAGIC -# MAGIC DROP TABLE IF EXISTS clusters; -# MAGIC -# MAGIC CREATE TABLE IF NOT EXISTS clusters ( -# MAGIC cluster_id bigint GENERATED ALWAYS AS IDENTITY, -# MAGIC z_cluster string -# MAGIC ) -# MAGIC USING DELTA; -# MAGIC -# MAGIC DROP TABLE IF EXISTS cluster_members; -# MAGIC -# MAGIC CREATE TABLE IF NOT EXISTS cluster_members ( -# MAGIC cluster_id integer, -# MAGIC givenname string, -# MAGIC surname string, -# MAGIC suburb string, -# MAGIC postcode string -# MAGIC ) -# MAGIC USING DELTA; - -# COMMAND ---------- - -# MAGIC %md Now we can insert our cluster data into the appropriate table. Note that we are modifying the integer-based cluster identifier generated during the matching process to be a string with an appended unique identifier. If we chose to re-run the match step, the *z_cluster* values generated will overlap with those generated in prior runs. If we wish to hold-on to some prior cluster assignments, it might be helpful to distinguish between clusters generated in different *match* cycles: - -# COMMAND ---------- - -# DBTITLE 1,Get Unique Identifier -# create a unique identifier -guid = str(uuid.uuid4()) -print(f"A unique identifier of '{guid}' will be assigned to clusters from this run.") - -# COMMAND ---------- - -# DBTITLE 1,Insert Clusters Data -_ = spark.sql(f""" -INSERT INTO clusters (z_cluster) -SELECT DISTINCT - CONCAT('{guid}',':',CAST(z_cluster as string)) -FROM matches -""") - -display(spark.table('clusters')) - -# COMMAND ---------- - -# MAGIC %md And now we can insert the cluster members: - -# COMMAND ---------- - -# DBTITLE 1,Insert Record-to-Cluster Mapping Table -_ = spark.sql(f""" -INSERT INTO cluster_members (cluster_id, givenname, surname, suburb, postcode) -SELECT DISTINCT - a.cluster_id, - b.givenname, - b.surname, - b.suburb, - b.postcode -FROM clusters a -INNER JOIN matches b - ON a.z_cluster=CONCAT('{guid}',':',CAST(b.z_cluster as string)) -""") - -display(spark.table('cluster_members').orderBy('cluster_id')) - -# COMMAND ---------- - -# MAGIC %md Of course, we may have some clusters we might want to manually correct. Using standard DELETE, UPDATE and INSERT statements, we can update the delta lake formatted tables in this environment to achieve the results we require. If we create new clusters to which to assign users as part of a manual correction, we might create a new entry in the *clusters* table by simply inserting a value as follows: -# MAGIC -# MAGIC ``` -# MAGIC import uuid -# MAGIC -# MAGIC # create new entry -# MAGIC guid= str(uuid.uuid4()) -# MAGIC _ = spark.sql(f"INSERT INTO clusters (z_cluster) VALUES('{guid}')") -# MAGIC -# MAGIC # get cluster id for new entry -# MAGIC cluster_id = spark.sql(f"SELECT cluster_id FROM clusters WHERE z_cluster='{guid}'").collect()[0]['cluster_id'] -# MAGIC print(f"New z_cluster '{guid}' created with cluster_id of {cluster_id}") -# MAGIC -# MAGIC ``` - -# COMMAND ---------- - -# MAGIC %md But what happens if we decide to retrain our model after we've setup these mappings? As mentioned above, retraining our model will cause new clusters with overlapping integer *z_cluster* identifiers to be created. In this scenario, you need to decide whether you wish to preserve any manually adjusted mappings from before or otherwise start over from scratch. If starting over, then simply drop and recreate the *clusters* and *cluster_members* tables. 
If preserving manually adjusted records, the GUID value associated with each cluster will keep the cluster identifiers unique. You'll need to then decide how records assigned to clusters by the newly re-trained model should be merged with the preserved cluster data. It's a bit of a juggle so this isn't something you'll want to do on a regular basis. - -# COMMAND ---------- - -# MAGIC %md -# MAGIC -# MAGIC © 2022 Databricks, Inc. All rights reserved. The source in this notebook is provided subject to the [Databricks License](https://databricks.com/db-license-source). All included or referenced third party libraries are subject to the licenses set forth below. -# MAGIC -# MAGIC | library | description | license | source | -# MAGIC |----------------------------------------|-------------------------|------------|-----------------------------------------------------| -# MAGIC | zingg | entity resolution library | GNU Affero General Public License v3.0 | https://github.com/zinggAI/zingg/ | -# MAGIC | tabulate | pretty-print tabular data in Python | MIT License | https://pypi.org/project/tabulate/ | -# MAGIC | filesplit | Python module that is capable of splitting files and merging it back | MIT License | https://pypi.org/project/filesplit/ | diff --git a/01_Prepare Data.py b/01_Prepare Data.py new file mode 100644 index 0000000..a9e4056 --- /dev/null +++ b/01_Prepare Data.py @@ -0,0 +1,284 @@ +# Databricks notebook source +# MAGIC %md The purpose of this notebook is to setup the data assets used in the Zingg Person Entity-Resolution solution accelerator. This notebook is available on https://github.com/databricks-industry-solutions/customer-er. + +# COMMAND ---------- + +# MAGIC %md ## Introduction +# MAGIC +# MAGIC In this notebook, we will download a dataset, break it up into records representing an initial data load and an incremental data load, and then persist the data to a database table. We also will take a moment to explore the data before proceeding to the entity resolution work taking place in the subsequent notebooks. + +# COMMAND ---------- + +# DBTITLE 1,Get Config +# MAGIC %run "./00_Intro & Config" + +# COMMAND ---------- + +# DBTITLE 1,Import Required Libraries +import pyspark.sql.functions as fn + +import os + +# COMMAND ---------- + +# MAGIC %md ## Step 1: Reset the Environment +# MAGIC +# MAGIC Zingg depends on having the right data in just the right place. To ensure we maintain a clean environment, we'll reset all the directories housing input, output and transient data. In most environments, such a step should not be necessary. This is just a precaution to ensure you get valid results should you run this series of notebooks multiple times: +# MAGIC +# MAGIC **NOTE** Running this step resets this solution accelerator. Do not run the following code unless you are reinitializing the solution accelerator in its entirity. +# MAGIC +# MAGIC **NOTE** The cell below depends on you having already created the mount point discussed at the top of Step 2 within this notebook. 
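+
+# COMMAND ----------
+
+# MAGIC %md Before running the directory reset below, make sure that mount point actually exists in your workspace. The next cell is a minimal, illustrative sketch of how such a mount might be created with `dbutils.fs.mount`; the storage URI and (empty) `extra_configs` shown here are placeholders that you would replace with the location and authentication settings appropriate to your cloud account, and the mount point name should match the value referenced in the *00* notebook's configuration.
+
+# COMMAND ----------
+
+# DBTITLE 1,Example Mount Point Creation (Illustrative Only)
+# hypothetical values; replace with your own storage location and credentials
+example_source = 's3a://your-bucket/zingg_ncvoters'   # placeholder storage URI
+example_mount_point = '/mnt/zingg_ncvoters'           # default mount point name used by this accelerator
+
+# create the mount only if it does not already exist
+if not any(m.mountPoint == example_mount_point for m in dbutils.fs.mounts()):
+  dbutils.fs.mount(
+    source = example_source,
+    mount_point = example_mount_point,
+    extra_configs = {} # supply auth settings (e.g. storage keys or OAuth configs) as needed
+  )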
+
+# COMMAND ----------
+
+# DBTITLE 1,Reset the Directories
+for k,v in config['dir'].items(): # for each directory identified in config
+  dbutils.fs.rm(v, recurse=True) # remove the dir and all child content
+  dbutils.fs.mkdirs(v) # recreate empty dir
+
+# COMMAND ----------
+
+# DBTITLE 1,Reset the Database
+# delete database
+_ = spark.sql('DROP DATABASE IF EXISTS {0} CASCADE'.format(config['database name']))
+
+# create database to house mappings
+_ = spark.sql('CREATE DATABASE IF NOT EXISTS {0}'.format(config['database name']))
+
+# set database as default for queries
+_ = spark.catalog.setCurrentDatabase(config['database name'] )
+
+# COMMAND ----------
+
+# MAGIC %md ## Step 2: Access Source Data
+# MAGIC
+# MAGIC For this solution accelerator, we'll make use of the [North Carolina Voters 5M](https://dbs.uni-leipzig.de/research/projects/object_matching/benchmark_datasets_for_entity_resolution) dataset made available by the [Database Group Leipzig](https://dbs.uni-leipzig.de/en). This dataset, more fully documented in [this paper](https://dbs.uni-leipzig.de/file/famer-adbis2017.pdf), contains name and limited address information for several million registered voters within the state of North Carolina. There are duplicate records purposefully inserted into the set with specific adjustments to make them fuzzy matchable, bringing the total number of records in the dataset to around 5-million and, hence, the name of the dataset.
+# MAGIC
+# MAGIC The dataset is made available for download as a gzipped TAR file which needs to be downloaded, unzipped, and untarred to a folder named *downloads* under a [mount point](https://docs.databricks.com/data/databricks-file-system.html#mount-object-storage-to-dbfs) you will need to setup in advance in your environment. In our environment, we've used a default name of */mnt/zingg_ncvoters* for our mount point. You can alter this in the *00* notebook if you've elected to create a mount point using a different name:
+
+# COMMAND ----------
+
+# DBTITLE 1,Make Downloads Folder Location Accessible to Shell Script
+os.environ['DOWNLOADS_FOLDER'] = '/dbfs' + config['dir']['downloads']
+
+# COMMAND ----------
+
+# DBTITLE 1,Download Data Assets
+# MAGIC %sh
+# MAGIC
+# MAGIC # move to the downloads folder
+# MAGIC rm -rf $DOWNLOADS_FOLDER
+# MAGIC mkdir -p $DOWNLOADS_FOLDER
+# MAGIC cd /$DOWNLOADS_FOLDER
+# MAGIC
+# MAGIC # download the data file
+# MAGIC wget -q https://www.informatik.uni-leipzig.de/~saeedi/5Party-ocp20.tar.gz
+# MAGIC
+# MAGIC # decompress the data file
+# MAGIC tar -xf 5Party-ocp20.tar.gz
+# MAGIC mv ./5Party-ocp20/*.csv ./
+# MAGIC rm 5Party-ocp20.tar.gz
+# MAGIC rm -r 5Party-ocp20
+# MAGIC
+# MAGIC # list download folder contents
+# MAGIC ls -l
+
+# COMMAND ----------
+
+# MAGIC %md The North Carolina Voters 5M dataset consists of 5 files containing roughly 1-million records each. 
We will use the data in the first 4 files as our initial dataset and the 5th file for our incremental dataset: + +# COMMAND ---------- + +# DBTITLE 1,Verify File Count +# count the files in the downloads directory +file_count = len(dbutils.fs.ls(config['dir']['downloads'])) + +print('Expecting 5 files in {0}'.format(config['dir']['downloads'])) +print('Found {0} files in {1}'.format(file_count, config['dir']['downloads'])) + +# COMMAND ---------- + +# DBTITLE 1,Move Raw Inputs into Initial & Incremental Folders +# function to help with file copy +def copy_file_with_overwrite(from_file_path, to_file_path): + + # remove to-file if already exists + try: + dbutils.fs.rm(to_file_path) + except: + pass + + # copy from-file to intended destination + dbutils.fs.cp(from_file_path, to_file_path) + + +# for each file in downloaded dataset +for file in dbutils.fs.ls(config['dir']['downloads']): + + # determine file number (ncvr_numrec_1000000_modrec_2_ocp_20_myp__nump_5.csv) + file_num = int(file.name.split('_')[-3]) + + # if 0 - 3: copy to initial folder + if file_num < 4: + copy_file_with_overwrite(file.path, config['dir']['input'] + '/initial/' + file.name) + + # if 4: copy to incremental folder + elif file_num == 4: + copy_file_with_overwrite(file.path, config['dir']['input'] + '/incremental/' + file.name) + +# COMMAND ---------- + +# DBTITLE 1,Verify Initial Dataset Has 4 Files +display( + dbutils.fs.ls(config['dir']['input'] + '/initial') + ) + +# COMMAND ---------- + +# DBTITLE 1,Verify Incremental Dataset Has 1 File +display( + dbutils.fs.ls(config['dir']['input'] + '/incremental') + ) + +# COMMAND ---------- + +# MAGIC %md ##Step 3: Persist Data to Database Tables +# MAGIC +# MAGIC Next, we will move our initial and staging data into delta tables to enable easier access in later notebooks. It's important to note that to make use of a JAR-based library like Zingg within a [Unity Catalog](https://docs.databricks.com/data-governance/unity-catalog/index.html) enabled Databricks workspace, you will need to make these tables external tables at this point in time. 
You will more clearly see why in the subsequent notebooks as we define our Zingg pipelines: + +# COMMAND ---------- + +# DBTITLE 1,Define Logic to Write CSV Inputs to Delta Table +def csv_to_delta_table( + dbfs_folder_path, + table_name, + make_external=False + ): + + # read folder data to data frame + df = ( + spark + .read + .csv( + path=dbfs_folder_path, + sep=',', + header=True, + inferSchema=True + ) + ) + + # establish base writing logic + df_writer = df.write.format('delta').mode('overwrite').option('overwriteSchema','true') + + # make table external if required + if make_external: + df_writer = df_writer.option('path', f"{config['dir']['tables']}/{table_name}") + + # write data to table + _ = df_writer.saveAsTable(table_name) + +# COMMAND ---------- + +# DBTITLE 1,Write Initial Data to Delta Table +csv_to_delta_table( + dbfs_folder_path=config['dir']['input'] + '/initial/', + table_name='initial', + make_external=True + ) + +display( + spark.sql(f"DESCRIBE EXTENDED initial") + ) + +# COMMAND ---------- + +# DBTITLE 1,Write Incremental Data to Delta Table +csv_to_delta_table( + dbfs_folder_path=config['dir']['input'] + '/incremental/', + table_name='incremental', + make_external=True + ) + +display( + spark.sql(f"DESCRIBE EXTENDED incremental") + ) + +# COMMAND ---------- + +# MAGIC %md ## Step 4: Examine the Data +# MAGIC +# MAGIC To get a sense of the data, we'll examine the records in the initial dataset: + +# COMMAND ---------- + +# DBTITLE 1,Access Initial Data Set +initial = ( + spark + .table('initial') + ) + +display(initial) + +# COMMAND ---------- + +# MAGIC %md In the dataset, voters are identified based on the following fields:

+# MAGIC +# MAGIC * **givenname** - the first or *given* name of the person +# MAGIC * **surname** - the family or *surname* of the person +# MAGIC * **suburb** - the town, city or other municipal entity associated with the person +# MAGIC * **postcode** - the postal (zip) code associated with the person +# MAGIC +# MAGIC A unique identifier, *recid*, is used in the original dataset to identify unique records. This identifier enables us to validate some portion of our work but we will need to make sure Zingg ignores this field so that it does not learn that two rows with the same *recid* are a match. +# MAGIC +# MAGIC To create duplicates in the dataset, the team responsible for creating it simply re-inserted some number of the rows back into it without any modifications. Another set of duplicates was created by re-inserting rows while *corrupting* one or multiple of the 4 fields identified above. Corruptions take the form of the removal, replacement or reversal of some number or characters from within a string as would be typical of a poor data entry process. These duplicates are identifiable by their matching *recid* values: + +# COMMAND ---------- + +# DBTITLE 1,Identify Author-Generated Duplicates +# rec ids of records with multiple entries +dups = ( + initial + .select('recid') + .groupBy('recid') + .agg(fn.count('*').alias('recs')) + .filter('recs>1') + .select('recid') + ) + +# displa full record for identified dups +display( + initial + .join(dups, on='recid') + .orderBy('recid') + ) + +# COMMAND ---------- + +# MAGIC %md Still other duplicates are naturally occuring in the dataset. With a dataset of this size, it's not unexpected that some errors were not caught following data entry. For example, consider these records which appear to be exact duplicates but which have separate *recid* values. It is possible that two individuals within a given zip code have the same first and last name so that some of these records only appear to be duplicates given the lack of additional identifying data in the dataset. However, the uniqueness of some of these names would indicate that some are true duplicates in the original dataset: + +# COMMAND ---------- + +# DBTITLE 1,Identify Apparent Duplicates In Original Dataset +dups = ( + initial.alias('a') + .join( + initial.alias('b'), + on=['givenname','surname','suburb','postcode'] + ) + .filter('a.recid != b.recid') + .select('a.recid','a.givenname','a.surname','a.suburb','a.postcode') + .distinct() + .orderBy('givenname','surname','suburb','postcode') + ) + +display(dups) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC +# MAGIC © 2023 Databricks, Inc. All rights reserved. The source in this notebook is provided subject to the [Databricks License](https://databricks.com/db-license-source). All included or referenced third party libraries are subject to the licenses set forth below. +# MAGIC +# MAGIC | library | description | license | source | +# MAGIC |----------------------------------------|-------------------------|------------|-----------------------------------------------------| +# MAGIC | zingg | entity resolution library | GNU Affero General Public License v3.0 | https://github.com/zinggAI/zingg/ | diff --git a/02_ Incremental Workflow.py b/02_ Incremental Workflow.py deleted file mode 100644 index aef584c..0000000 --- a/02_ Incremental Workflow.py +++ /dev/null @@ -1,497 +0,0 @@ -# Databricks notebook source -# MAGIC %md -# MAGIC You may find this series of notebooks at https://github.com/databricks-industry-solutions/customer-er. 
For more information about this solution accelerator, visit https://www.databricks.com/solutions/accelerators/customer-entity-resolution. - -# COMMAND ---------- - -# MAGIC %md The purpose of this notebook is to demonstrate the incremental workflow by which Zingg applies a trained model to newly arriving data. - -# COMMAND ---------- - -# MAGIC %md ## Introduction -# MAGIC -# MAGIC The incremental Zingg workflow consists of two steps, each of which is intended to examine incoming data for the inclusion of duplicate records. These steps are:

-# MAGIC -# MAGIC 1. Identify duplicates between incoming and previously observed records (linking) -# MAGIC 2. Identify duplicates within the incoming dataset (matching) -# MAGIC -# MAGIC As part of this workflow, newly observed linked and matched records are appended to the set of previously observed data. This updated dataset then forms the basis for the next incremental update cycle. - -# COMMAND ---------- - -# DBTITLE 1,Initialize Config -# MAGIC %run "./00.0_ Intro & Config" - -# COMMAND ---------- - -# MAGIC %md **NOTE** This workflow is highly dependent upon data residing in the folder locations specified in the configuration files associated with the relevant Zingg jobs. If you change the following values, be sure to update the corresponding values in the appropriate configuration files. - -# COMMAND ---------- - -# DBTITLE 1,Set Additional Configurations -INCOMING_INPUT_DIR = config['dir']['input'] + '/incremental/incoming' -PRIOR_INPUT_DIR = config['dir']['input'] + '/incremental/prior' - -LINK_OUTPUT_DIR = config['dir']['output'] + '/incremental/link' -MATCH_OUTPUT_DIR = config['dir']['output'] + '/incremental/match' - -# COMMAND ---------- - -# DBTITLE 1,Import Required Libraries -import pandas as pd -import uuid -import pyspark.sql.functions as fn - -# COMMAND ---------- - -# MAGIC %md ## Step 0: Setup Required Data -# MAGIC -# MAGIC The incremental workflow is dependent upon the arrival of new records needing deduplication. To simulate this, we withheld one of the 5 data files in the downloaded dataset and split it into a smaller number of files with roughly 10,000 records in each. These files are currently housed in the staging folder: - -# COMMAND ---------- - -# DBTITLE 1,Examine Staging Folder Contents -display(dbutils.fs.ls(config['dir']['staging'])) - -# COMMAND ---------- - -# MAGIC %md To simulate the 'arrival' of one of these files, we'll simply move it into an incremental input folder: - -# COMMAND ---------- - -# DBTITLE 1,Moving Incoming Data into Input Folders -# delete any previous incremental data -dbutils.fs.rm(INCOMING_INPUT_DIR, recurse=True) - -# get name of next incremental file -for incr_file in dbutils.fs.ls(config['dir']['staging']): - if incr_file.size > 0: # skip subdirectories - break - -# copy file into position -dbutils.fs.cp(incr_file.path, INCOMING_INPUT_DIR + '/' + incr_file.name) - -# archive this incremental file -arch_file_name = config['dir']['staging'] + '/archived/' + incr_file.name -try: - dbutils.fs.rm(arch_file_name) -except: - pass - -dbutils.fs.mv(incr_file.path, arch_file_name) - -# display incoming data folder contents -display(dbutils.fs.ls(INCOMING_INPUT_DIR)) - -# COMMAND ---------- - -# MAGIC %md In addition to having access to incoming data, the linking step of this workflow requires access to data previously observed. We have been housing this data inside of a set of tables in a Databricks catalog. As this data may be modified through user activity, we will need to export this data to our input folder at the start of each incremental run: -# MAGIC -# MAGIC **NOTE** The link and match Zingg job's configuration files specify that an integer *recid* is present (but not used) in the incoming data. We did not capture this field as part of the *cluster_members* table. To meet the schema requirements, we'll put a dummy value of 0 in place of the *recid* as we setup the prior dataset. 
- -# COMMAND ---------- - -# DBTITLE 1,Move Prior Data into Link Input Folder -# clean up the prior input folder -dbutils.fs.rm(PRIOR_INPUT_DIR, recurse=True) - -# write latest version of priors to folder as csv (per specs in config file) -_ = ( - spark - .table('cluster_members') - .selectExpr('0 as recid','givenname','surname','suburb','postcode') - .write - .csv( - path=PRIOR_INPUT_DIR, - mode='overwrite', - sep=',', - header=True - ) - ) - -# COMMAND ---------- - -# MAGIC %md ## Step 1: Identify Duplicates Between Incoming & Previous Data -# MAGIC -# MAGIC With the required data in position, we can now link our incoming data to our priors by calling the appropriate Zingg job. The *link* logic executed through the *zingg_incremental_link* job compares the incoming and prior data to identify matches between the two using the model trained in the initial workflow:

-# MAGIC -# MAGIC - -# COMMAND ---------- - -# DBTITLE 1,Link Incoming to Prior Data -link_job = ZinggJob( config['job']['incremental']['link'], config['job']['databricks workspace url'], config['job']['api token']) -link_job.run_and_wait() - -# COMMAND ---------- - -# MAGIC %md The result of the link job is an output dataset identifying the records in the incoming data that are likely to match those in the prior data: -# MAGIC -# MAGIC **NOTE** With only 10,000 records per incremental file, it is possible that no linkages were detected between the incoming data and priors. -# MAGIC -# MAGIC **NOTE** We are coalescing all null values to empty strings to make comparisons in the code that follows easier to implement. - -# COMMAND ---------- - -# DBTITLE 1,Review Link Job Output -linked = ( - spark - .read - .parquet(LINK_OUTPUT_DIR) - .selectExpr( - "COALESCE(givenname,'') as givenname", - "COALESCE(surname,'') as surname", - "COALESCE(suburb,'') as suburb", - "COALESCE(postcode,'') as postcode", - 'z_score', - 'z_cluster', - 'z_source' - ) - ) - -display(linked.orderBy('z_cluster')) - -# COMMAND ---------- - -# MAGIC %md The link job output assigns a *z_cluster* value to records in the incoming dataset likely to match a record in the prior dataset. A *z_score* helps us understand the probability of that match. The *z_source* field differentiates between records coming from the prior and the incoming datasets. -# MAGIC -# MAGIC It's important to note that an incoming record may be linked to more than one prior records. Also, incoming records that do not have likely matches in the prior dataset (as determined by the blocking portion of the Zingg logic), will not appear in the linking output. This knowledge needs to be taken into the data processing steps that follow. -# MAGIC -# MAGIC To help us work with the linked data, we might separate those records from the prior dataset from those in the incoming dataset. For the prior dataset, we can lookup the *cluster_id* in our *cluster_members* table to make the appending of new data to that table easier in later steps: - -# COMMAND ---------- - -# DBTITLE 1,Get Linked Priors -linked_prior = ( - linked - .alias('a') - .filter(fn.expr("a.z_source = 'input_incremental_link_prior'")) - .join( - spark.table('cluster_members').alias('b'), - on=fn.expr(""" - a.givenname=COALESCE(b.givenname,'') AND - a.surname=COALESCE(b.surname,'') AND - a.suburb=COALESCE(b.suburb,'') AND - a.postcode=COALESCE(b.postcode,'') - """) - ) - .selectExpr( - 'a.givenname', - 'a.surname', - 'a.suburb', - 'a.postcode', - 'b.cluster_id', - 'a.z_cluster', - 'a.z_score', - 'a.z_source' - ) - ) -linked_prior.createOrReplaceTempView('linked_prior') - -# COMMAND ---------- - -# DBTITLE 1,Get Linked Incoming -linked_incoming = linked.filter(fn.expr("z_source = 'input_incremental_link_incoming'")) -linked_incoming.createOrReplaceTempView('linked_incoming') - -# COMMAND ---------- - -# MAGIC %md We can now handle this data through the following *actions*:

-# MAGIC -# MAGIC 1. For those prior records linked to incoming records with a probability above a given threshold, add the record to the *cluster_members* table (assuming the record is not identical to the one already in the table). -# MAGIC 2. For any priors assigned a linked incoming record not addressed in the prior step, hand those records over for expert review. - -# COMMAND ---------- - -# DBTITLE 1,Action 1: Accept Good, High Scoring Linkages -# MAGIC %sql -# MAGIC -# MAGIC INSERT INTO cluster_members (cluster_id, givenname, surname, suburb, postcode) -# MAGIC SELECT -- cluster assignment for these records -# MAGIC o.cluster_id, -# MAGIC m.givenname, -# MAGIC m.surname, -# MAGIC m.suburb, -# MAGIC m.postcode -# MAGIC FROM ( -- matching records with this high score -# MAGIC SELECT -# MAGIC x.givenname, -# MAGIC x.surname, -# MAGIC x.suburb, -# MAGIC x.postcode, -# MAGIC x.z_score -# MAGIC FROM linked_incoming x -# MAGIC INNER JOIN ( -# MAGIC SELECT -- highest scoring record for each unique entry -# MAGIC givenname, -# MAGIC surname, -# MAGIC suburb, -# MAGIC postcode, -# MAGIC max(z_score) as max_z_score -# MAGIC FROM linked_incoming -# MAGIC WHERE z_score >= 0.90 -- threshold -# MAGIC GROUP BY givenname, surname, suburb, postcode -# MAGIC ) y -# MAGIC ON x.givenname=y.givenname AND x.surname=y.surname AND x.suburb=y.suburb AND x.postcode=y.postcode AND x.z_score=y.max_z_score -# MAGIC GROUP BY -# MAGIC x.givenname, -# MAGIC x.surname, -# MAGIC x.suburb, -# MAGIC x.postcode, -# MAGIC x.z_score -# MAGIC HAVING COUNT(*) = 1 -- make sure only one record matches high score -# MAGIC ) m -# MAGIC INNER JOIN linked_incoming n -- locate the record and find its z_cluster value -# MAGIC ON m.givenname=n.givenname AND m.surname=n.surname AND m.suburb=n.suburb AND m.postcode=n.postcode AND m.z_score=n.z_score -# MAGIC INNER JOIN linked_prior o -- find the prior record for this z_cluster -# MAGIC ON n.z_cluster=o.z_cluster -# MAGIC WHERE -- something in the incoming record is different from what's already in prior -# MAGIC m.givenname != o.givenname OR -# MAGIC m.surname != o.surname OR -# MAGIC m.suburb != o.suburb OR -# MAGIC m.postcode != o.postcode - -# COMMAND ---------- - -# DBTITLE 1,Action 2: Apply Expert Review to Remaining Linkages -# MAGIC %sql -# MAGIC -# MAGIC WITH linked_incoming_not_in_cluster_members AS ( -# MAGIC SELECT -# MAGIC a.z_cluster, -# MAGIC a.givenname, -# MAGIC a.surname, -# MAGIC a.suburb, -# MAGIC a.postcode, -# MAGIC a.z_source, -# MAGIC a.z_score -# MAGIC FROM linked_incoming a -# MAGIC LEFT OUTER JOIN cluster_members b -# MAGIC ON a.givenname=COALESCE(b.givenname,'') AND a.surname=COALESCE(b.surname,'') AND a.suburb=COALESCE(b.suburb,'') AND a.postcode=COALESCE(b.postcode,'') -# MAGIC WHERE b.cluster_id Is Null -# MAGIC ) -# MAGIC SELECT * FROM linked_incoming_not_in_cluster_members -# MAGIC UNION ALL -# MAGIC SELECT -# MAGIC z_cluster, -# MAGIC givenname, -# MAGIC surname, -# MAGIC suburb, -# MAGIC postcode, -# MAGIC z_source, -# MAGIC z_score -# MAGIC FROM linked_prior -# MAGIC WHERE -# MAGIC z_cluster IN (SELECT z_cluster FROM linked_incoming_not_in_cluster_members) -# MAGIC ORDER BY z_cluster, z_source DESC - -# COMMAND ---------- - -# MAGIC %md ## Step 2: Identify Duplicates within Incoming Data -# MAGIC -# MAGIC The *link* logic identifies matches between the incoming and the prior data. Records not linked to prior data can be assumed to not be in the prior dataset. 
But before simply inserting these new records into the database tables, we need to identify any duplicates in the incoming records themselves. This is handled by applying the *match* logic to the incoming data.

-# MAGIC -# MAGIC - -# COMMAND ---------- - -# DBTITLE 1,Match Records within Incoming Data -match_job = ZinggJob( config['job']['incremental']['match'], config['job']['databricks workspace url'], config['job']['api token']) -match_job.run_and_wait() - -# COMMAND ---------- - -# DBTITLE 1,Review Clusters -# retrieve matches -matches = ( - spark - .read - .parquet(MATCH_OUTPUT_DIR) - .orderBy('z_cluster', ascending=True) - ) - -# persist results to temp view -matches.createOrReplaceTempView('matches') - -# retrieve results from temp view -display(spark.table('matches')) - -# COMMAND ---------- - -# MAGIC %md With clusters assigned within the dataset, we can break down out data persistence actions as follows:

-# MAGIC -# MAGIC 1. If a matching cluster has no records already in *cluster_members*, we can simply insert these clusters into the *clusters* and *cluster_members* tables. -# MAGIC 2. If a matching cluster has records already in *cluster_members* where there is linkage to one and only one cluster as recorded in the *clusters* table, then we can move all the matching cluster members under that cluster id. -# MAGIC 3. Any remaining matching clusters will require a manual review. - -# COMMAND ---------- - -# DBTITLE 1,Get Unique Identifier -# create a unique identifier -guid = str(uuid.uuid4()) -print(f"A unique identifier of '{guid}' will be assigned to clusters from this run.") - -# COMMAND ---------- - -# DBTITLE 1,Action 1: No Linkage Exists -_ = ( # insert cluster records - spark - .sql( - f""" - INSERT INTO clusters (z_cluster) - WITH z_clusters_with_records_in_cluster_members AS ( - SELECT DISTINCT - a.z_cluster - FROM matches a - INNER JOIN cluster_members b - ON - COALESCE(a.givenname,'')=COALESCE(b.givenname,'') AND - COALESCE(a.surname,'')=COALESCE(b.surname,'') AND - COALESCE(a.suburb,'')=COALESCE(b.suburb,'') AND - COALESCE(a.postcode,'')=COALESCE(b.postcode,'') - ) - SELECT DISTINCT - CONCAT('{guid}',':', CAST(z_cluster as string)) - FROM matches - WHERE - z_cluster NOT IN ( - SELECT z_cluster FROM z_clusters_with_records_in_cluster_members - ) AND - CONCAT('{guid}',':', CAST(z_cluster as string)) NOT IN (SELECT z_cluster FROM clusters) -- avoid repeat inserts - """ - ) - ) - -_ = ( # insert cluster members - spark - .sql( - f""" - INSERT INTO cluster_members (cluster_id, givenname, surname, suburb, postcode) - WITH z_clusters_with_records_in_cluster_members AS ( - SELECT DISTINCT - a.z_cluster - FROM matches a - INNER JOIN cluster_members b - ON - COALESCE(a.givenname,'')=COALESCE(b.givenname,'') AND - COALESCE(a.surname,'')=COALESCE(b.surname,'') AND - COALESCE(a.suburb,'')=COALESCE(b.suburb,'') AND - COALESCE(a.postcode,'')=COALESCE(b.postcode,'') - ) - SELECT p.* - FROM ( - SELECT DISTINCT - n.cluster_id, - m.givenname, - m.surname, - m.suburb, - m.postcode - FROM matches m - INNER JOIN ( - SELECT DISTINCT - y.cluster_id, - x.z_cluster - FROM matches x - INNER JOIN clusters y - ON CONCAT('{guid}',':', CAST(x.z_cluster as string))=y.z_cluster - WHERE - x.z_cluster NOT IN ( - SELECT z_cluster FROM z_clusters_with_records_in_cluster_members - ) - ) n - ON m.z_cluster=n.z_cluster - ) p - LEFT OUTER JOIN cluster_members q -- avoid repeat inserts - ON - COALESCE(p.givenname,'')=COALESCE(q.givenname,'') AND - COALESCE(p.surname,'')=COALESCE(q.surname,'') AND - COALESCE(p.suburb,'')=COALESCE(q.suburb,'') AND - COALESCE(p.postcode,'')=COALESCE(q.postcode,'') - WHERE q.cluster_id Is Null - """ - ) - ) - -# COMMAND ---------- - -# DBTITLE 1,Action 2: Linkage through Match Cluster -# MAGIC %sql -# MAGIC -# MAGIC INSERT INTO cluster_members (cluster_id, givenname, surname, suburb, postcode) -# MAGIC WITH z_clusters_with_linkage ( -# MAGIC SELECT DISTINCT -# MAGIC a.z_cluster, -# MAGIC b.cluster_id -# MAGIC FROM matches a -# MAGIC INNER JOIN cluster_members b -# MAGIC ON -# MAGIC COALESCE(a.givenname,'')=COALESCE(b.givenname,'') AND -# MAGIC COALESCE(a.surname,'')=COALESCE(b.surname,'') AND -# MAGIC COALESCE(a.suburb,'')=COALESCE(b.suburb,'') AND -# MAGIC COALESCE(a.postcode,'')=COALESCE(b.postcode,'') -# MAGIC ), -# MAGIC z_clusters_only_one_cluster_linkage AS ( -# MAGIC SELECT -# MAGIC z_cluster -# MAGIC FROM z_clusters_with_linkage -# MAGIC GROUP BY z_cluster -# MAGIC HAVING 
COUNT(*)=1 -# MAGIC ) -# MAGIC SELECT DISTINCT -# MAGIC y.cluster_id, -# MAGIC x.givenname, -# MAGIC x.surname, -# MAGIC x.suburb, -# MAGIC x.postcode -# MAGIC FROM matches x -# MAGIC INNER JOIN z_clusters_with_linkage y -# MAGIC ON x.z_cluster=y.z_cluster -# MAGIC LEFT OUTER JOIN cluster_members z -# MAGIC ON -# MAGIC y.cluster_id=z.cluster_id AND -# MAGIC COALESCE(x.givenname,'')=COALESCE(z.givenname,'') AND -# MAGIC COALESCE(x.surname,'')=COALESCE(z.surname,'') AND -# MAGIC COALESCE(x.suburb,'')=COALESCE(z.suburb,'') AND -# MAGIC COALESCE(x.postcode,'')=COALESCE(z.postcode,'') -# MAGIC WHERE -# MAGIC x.z_cluster IN (SELECT z_cluster FROM z_clusters_only_one_cluster_linkage) AND -# MAGIC z.cluster_id Is Null - -# COMMAND ---------- - -# DBTITLE 1,Action 3: Matches Requiring Manual Review -# MAGIC %sql -# MAGIC -# MAGIC WITH z_clusters_with_records_NOT_in_cluster_members AS ( -# MAGIC SELECT DISTINCT -# MAGIC a.z_cluster -# MAGIC FROM matches a -# MAGIC LEFT OUTER JOIN cluster_members b -# MAGIC ON -# MAGIC COALESCE(a.givenname,'')=COALESCE(b.givenname,'') AND -# MAGIC COALESCE(a.surname,'')=COALESCE(b.surname,'') AND -# MAGIC COALESCE(a.suburb,'')=COALESCE(b.suburb,'') AND -# MAGIC COALESCE(a.postcode,'')=COALESCE(b.postcode,'') -# MAGIC WHERE -# MAGIC b.cluster_id Is Null -# MAGIC ) -# MAGIC SELECT * -# MAGIC FROM matches -# MAGIC WHERE -# MAGIC z_cluster IN (SELECT z_cluster FROM z_clusters_with_records_NOT_in_cluster_members) - -# COMMAND ---------- - -# MAGIC %md With the incremental data processed and the *cluster_members* table updated, we are ready to repeat this workflow for the next incoming set of data. - -# COMMAND ---------- - -# MAGIC %md -# MAGIC -# MAGIC © 2022 Databricks, Inc. All rights reserved. The source in this notebook is provided subject to the [Databricks License](https://databricks.com/db-license-source). All included or referenced third party libraries are subject to the licenses set forth below. -# MAGIC -# MAGIC | library | description | license | source | -# MAGIC |----------------------------------------|-------------------------|------------|-----------------------------------------------------| -# MAGIC | zingg | entity resolution library | GNU Affero General Public License v3.0 | https://github.com/zinggAI/zingg/ | -# MAGIC | tabulate | pretty-print tabular data in Python | MIT License | https://pypi.org/project/tabulate/ | -# MAGIC | filesplit | Python module that is capable of splitting files and merging it back | MIT License | https://pypi.org/project/filesplit/ | diff --git a/02_Initial Workflow.py b/02_Initial Workflow.py new file mode 100644 index 0000000..48730b7 --- /dev/null +++ b/02_Initial Workflow.py @@ -0,0 +1,634 @@ +# Databricks notebook source +# MAGIC %md The purpose of this notebook is to retrieve and label data as part of the the initial workflow in the Zingg Person Entity-Resolution solution accelerator. This notebook is available on https://github.com/databricks-industry-solutions/customer-er. + +# COMMAND ---------- + +# MAGIC %md ## Introduction +# MAGIC +# MAGIC The purpose of this notebook is to train a Zingg model using an initial dataset within which we know there are some duplicate records. With Zingg, this initial workload is executed through three sequential tasks (though the third task is optional). These tasks are:

+# MAGIC +# MAGIC 1. **findTrainingData** - identify pairs of records that are candidates for being a match. The user will label these pairs as a match, no match or uncertain. +# MAGIC 2. **train** - train a Zingg model using the labeled pairs resulting from user interactions on data coming from (multiple iterations of) the findTrainingData task. +# MAGIC 3. **match** - use the trained Zingg model to match duplicate records in the initial dataset. +# MAGIC

+# MAGIC
+# MAGIC **NOTE** Because *train* and *match* are so frequently run together, Zingg provides a combined *trainMatch* task that we will employ to minimize the time to complete our work. We will continue to speak of these as two separate tasks but they will be executed as a combined task in this notebook.
+# MAGIC
+# MAGIC To enable this sequence of tasks, we will need to configure Zingg to understand where to read the initial data as its input. We will also need to specify an output location, though only the *match* task will write data to this location.
+# MAGIC
+# MAGIC While the *train* task does not write any explicit output, it does persist trained model assets to the *zingg* model folder. The *findTrainingData* task will also send intermediary data outputs to an *unmarked* folder under the *zingg* model folder. (It will also expect us to place some data in a *marked* folder location under that same *zingg* model folder.) Understanding where these assets reside can be helpful should you need to troubleshoot certain issues you might encounter during the *findTrainingData* or *train* tasks.
+# MAGIC
+# MAGIC Before jumping into these steps, we first need to verify the Zingg application JAR is properly installed on this cluster and then install the Zingg Python wrapper which provides a simple API for using the application:
+
+# COMMAND ----------
+
+# DBTITLE 1,Verify Zingg JAR Installed
+# set default zingg path
+zingg_jar_path = None
+
+# for each jar in the jars folder
+for j in dbutils.fs.ls('/FileStore/jars') + dbutils.fs.ls('/tmp/solacc/customer_er/jar/'): # either manually extracted and uploaded jar or use the RUNME Notebook automation
+  # locate the zingg jar
+  if '-zingg_' in j.path:
+    zingg_jar_path = j.path
+    print(f'Zingg JAR found at {zingg_jar_path}')
+    break
+
+if zingg_jar_path is None:
+  raise Exception('The Zingg JAR was NOT found. Please install the JAR file before proceeding.')
+
+# COMMAND ----------
+
+# DBTITLE 1,Install Zingg Python Library
+# MAGIC %pip install zingg
+
+# COMMAND ----------
+
+# DBTITLE 1,Initialize Config
+# MAGIC %run "./00_Intro & Config"
+
+# COMMAND ----------
+
+# DBTITLE 1,Import Required Libraries
+import pandas as pd
+import numpy as np
+
+import time
+import uuid
+
+from zingg.client import Arguments, ClientOptions, ZinggWithSpark
+from zingg.pipes import Pipe, FieldDefinition, MatchType
+
+from ipywidgets import widgets, interact
+
+import pyspark.sql.functions as fn
+
+# COMMAND ----------
+
+# MAGIC %md ##Step 1: Configure the Zingg Client
+# MAGIC
+# MAGIC With Zingg's new [Python API](https://docs.zingg.ai/zingg/working-with-python), we can configure the Zingg tasks to read data from a given input data set, perform matching based on logic assigned to each field in the dataset, and generate output results (as part of the *match* task) in a specific format and structure. 
These inputs are captured as a collection of arguments, the first of which we will assign being those associated with the model and its folder location: + +# COMMAND ---------- + +# DBTITLE 1,Initialize Zingg Arguments +args = Arguments() + +# COMMAND ---------- + +# DBTITLE 1,Assign Model Arguments + +# this is where zingg models and intermediary assets will be stored +args.setZinggDir(config['dir']['zingg'] ) + +# this uniquely identifies the model you are training +args.setModelId(config['model name']) + +# COMMAND ---------- + +# MAGIC %md Our next arguments are the input and output [pipes](https://docs.zingg.ai/zingg/connectors/pipes) which define where and in what format data is read or written. +# MAGIC +# MAGIC For our input pipe, we are reading from a table in the [delta lake format](https://delta.io/). Because this format captures schema information, we do not need to provide any additional structural details. However, because Zingg doesn't have the ability to read this as a table in a Unity Catalog-enabled Databricks workspace, we've implemented the input table as an [external table](https://docs.databricks.com/sql/language-manual/sql-ref-external-tables.html) and are pointing our input pipe to the location where that table houses its data: + +# COMMAND ---------- + +# DBTITLE 1,Config Model Inputs +# get location of initial table's data +input_path = spark.sql("DESCRIBE DETAIL initial").select('location').collect()[0]['location'] + +# configure Zingg input pipe +inputPipe = Pipe(name='initial', format='delta') +inputPipe.addProperty('path', input_path ) + +# add input pipe to arguments collection +args.setData(inputPipe) + +# COMMAND ---------- + +# DBTITLE 1,Config Model Outputs +output_dir = config['dir']['output'] + '/initial' + +# configure Zingg output pipe +outputPipe = Pipe(name='initial_matched', format='delta') +outputPipe.addProperty('path', output_dir) + +# add output pipe to arguments collection +args.setOutput(outputPipe) + +# COMMAND ---------- + +# MAGIC %md Next, we need to define how each field from our input pipe will be used by Zingg. This is what Zingg refers to as a [field definition](https://docs.zingg.ai/zingg/stepbystep/configuration/field-definitions). The logic accessible to Zingg depends on the Zingg MatchType assigned to each field. The MatchTypes supported by Zingg at the time this notebook was developed were: +# MAGIC

+# MAGIC +# MAGIC * **DONT_USE** - appears in the output but no computation is done on these +# MAGIC * **EMAIL** - matches only the id part of the email before the @ character +# MAGIC * **EXACT** - no tolerance with variations, Preferable for country codes, pin codes, and other categorical variables where you expect no variations +# MAGIC * **FUZZY** - broad matches with typos, abbreviations, and other variations +# MAGIC * **NULL_OR_BLANK** - by default Zingg marks matches as +# MAGIC * **NUMERIC** - extracts numbers from strings and compares how many of them are same across both strings +# MAGIC * **NUMERIC_WITH_UNITS** - extracts product codes or numbers with units, for example 16gb from strings and compares how many are same across both strings +# MAGIC * **ONLY_ALPHABETS_EXACT** - only looks at the alphabetical characters and compares if they are exactly the same +# MAGIC * **ONLY_ALPHABETS_FUZZY** - ignores any numbers in the strings and then does a fuzzy comparison +# MAGIC * **PINCODE** - matches pin codes like xxxxx-xxxx with xxxxx +# MAGIC * **TEXT** - compares words overlap between two strings +# MAGIC +# MAGIC For our needs, we'll make use of fuzzy matching on most of our fields while instructing Zingg to ignore the *recid* field as discussed in notebook *01*: + +# COMMAND ---------- + +# DBTITLE 1,Configure Field Definitions +# define logic for each field in incoming dataset +recid = FieldDefinition('recid', 'integer', MatchType.DONT_USE) +givenname = FieldDefinition("givenname", 'string', MatchType.FUZZY) +surname = FieldDefinition('surname', 'string', MatchType.FUZZY) +suburb = FieldDefinition('suburb', 'string', MatchType.FUZZY) +postcode = FieldDefinition('postcode', 'string', MatchType.FUZZY) + +# define sequence of fields to receive +field_defs = [recid, givenname, surname, suburb, postcode] + +# add field definitions to arguments collection +args.setFieldDefinition(field_defs) + +# COMMAND ---------- + +# MAGIC %md Lastly, we need to configure a few settings that affect Zingg performance. +# MAGIC +# MAGIC On Databricks, Zingg runs as a distributed process. We want to ensure that Zingg can more fully take advantage of the distributed processing capabilities of the platform by dividing the data across some number of partitions aligned with the computational resources of our Databricks cluster. +# MAGIC +# MAGIC We also want Zingg to sample our data at various stages. Too big of a sample and Zingg will run slowly. Too small a sample and Zingg will struggle to find enough samples to learn. We typically will use a sample size between 0.0001 and 0.1, but finding the right value for a given dataset is more art that science: + +# COMMAND ---------- + +# DBTITLE 1,Config Performance Settings +# define number of partitions to distribute data across +args.setNumPartitions( sc.defaultParallelism * 20 ) # default parallelism reflects databricks's cluster capacity + +# define sample size +args.setLabelDataSampleSize(0.1) + +# COMMAND ---------- + +# MAGIC %md With all our Zingg configurations defined, we can now setup the Zingg client. The client is configured for specific tasks. 
The first task we will focus on is the [findTrainingData](https://docs.zingg.ai/zingg/stepbystep/createtrainingdata/findtrainingdata) task which tests various techniques for identifying matching data:
+
+# COMMAND ----------
+
+# DBTITLE 1,Define findTrainingData Task
+# define task
+findTrainingData_options = ClientOptions([ClientOptions.PHASE, 'findTrainingData'])
+
+# configure findTrainingData task
+findTrainingData = ZinggWithSpark(args, findTrainingData_options)
+
+# initialize task
+findTrainingData.init()
+
+# COMMAND ----------
+
+# MAGIC %md When we are done labeling the data generated through (multiple iterations of) the *findTrainingData* task, we will need to launch the *trainMatch* task. This task combines two smaller tasks, *i.e.* *[train](https://docs.zingg.ai/zingg/stepbystep/train)* and *[match](https://docs.zingg.ai/zingg/stepbystep/match)*, which train the Zingg model using the labeled data and generate output containing potential matches from the initial (input) dataset:
+
+# COMMAND ----------
+
+# DBTITLE 1,Define trainMatch Task
+# define task
+trainMatch_options = ClientOptions([ClientOptions.PHASE, 'trainMatch'])
+
+# configure trainMatch task
+trainMatch = ZinggWithSpark(args, trainMatch_options)
+
+# initialize task
+trainMatch.init()
+
+# COMMAND ----------
+
+# MAGIC %md ##Step 2: Label Training Data
+# MAGIC
+# MAGIC With our tasks defined, we can now focus on our first task, *i.e.* *findTrainingData*, and the labeling of the candidate pairs it produces. Within this step, Zingg will read an initial set of input data and from it generate a set of record pairs that it believes may be duplicates. As "expert data reviewers", we will review each pair and label it as either a *Match* or *No Match*. (We may also label it as *Uncertain* if we cannot determine if the records are a match.)
+# MAGIC
+# MAGIC In order to learn which techniques tend to lead to good matching, we will need to perform the labeling step numerous times. You will notice that some runs generate better results than others. This is Zingg testing out different approaches. You will want to iterate through this step numerous times until you accumulate enough labeled pairs to produce good model results. We suggest starting with 40 or more matches before attempting to train your model, but if you find after training that you aren't getting good results, you can always re-run this step to add more labeled matches to the original set of labeled records.
+# MAGIC
+# MAGIC That said, if you ever need to start over with a given Zingg model, you will want to either change the Zingg directory being used to persist these labeled pairs or delete the Zingg directory altogether. We have provided a function to assist with that. 
+# MAGIC Be sure to set the *reset* flag used by the function appropriately for your needs:
+
+# COMMAND ----------
+
+# DBTITLE 1,Get Unmarked & Marked Folder Locations
+# this is where Zingg stores unmarked candidate pairs produced by the findTrainingData task
+UNMARKED_DIR = findTrainingData.getArguments().getZinggTrainingDataUnmarkedDir()
+
+# this is where you store your marked candidate pairs that will be read by the Zingg train task
+MARKED_DIR = findTrainingData.getArguments().getZinggTrainingDataMarkedDir()
+
+# COMMAND ----------
+
+# DBTITLE 1,Reset the Zingg Dir
+def reset_zingg():
+  # drop entire zingg dir (including marked and unmarked data)
+  dbutils.fs.rm(findTrainingData.getArguments().getZinggDir(), recurse=True)
+  # drop output data
+  dbutils.fs.rm(output_dir, recurse=True)
+  return
+
+# determine whether to reset the environment
+reset = False
+
+if reset:
+  reset_zingg()
+
+# COMMAND ----------
+
+# MAGIC %md To assist with the reading of unmarked and marked pairs, we have defined a simple function. It's called at the top of the label assignment logic (later) to produce the pairs that will be presented to the user. If no data is found that requires labeling, it triggers the Zingg *findTrainingData* task to generate new candidate pairs. That task can take a while to complete depending on the volume of data and the performance-relevant settings assigned in the task's configuration (above):
+
+# COMMAND ----------
+
+# DBTITLE 1,Define Candidate Pair Retrieval Function
+# retrieve candidate pairs
+def get_candidate_pairs():
+
+  # define internal function to restrict recursive calls
+  def _get_candidate_pairs(depth=0):
+
+    # initialize marked and unmarked dataframes to enable
+    # comparisons (even when there is no data on either side)
+    unmarked_pd = pd.DataFrame({'z_cluster':[]})
+    marked_pd = pd.DataFrame({'z_cluster':[]})
+
+    # read unmarked pairs
+    try:
+      tmp_pd = pd.read_parquet(
+        '/dbfs'+ findTrainingData.getArguments().getZinggTrainingDataUnmarkedDir(),
+        engine='pyarrow'
+        )
+      if tmp_pd.shape[0] != 0: unmarked_pd = tmp_pd
+    except:
+      pass # no unmarked pairs available yet
+
+    # read marked pairs
+    try:
+      tmp_pd = pd.read_parquet(
+        '/dbfs'+ findTrainingData.getArguments().getZinggTrainingDataMarkedDir(),
+        engine='pyarrow'
+        )
+      if tmp_pd.shape[0] != 0: marked_pd = tmp_pd
+    except:
+      pass # no marked pairs saved yet
+
+    # get unmarked pairs not already in the marked set
+    candidate_pairs_pd = unmarked_pd[~unmarked_pd['z_cluster'].isin(marked_pd['z_cluster'])]
+    candidate_pairs_pd.reset_index(drop=True, inplace=True)
+
+    # test to see if anything was found to label
+    if depth > 1: # too deep, leave
+      return candidate_pairs_pd
+
+    elif candidate_pairs_pd.shape[0] == 0: # nothing found, trigger zingg and try again
+
+      print('No unmarked candidate pairs found. Running the findTrainingData task ...','\n')
+      findTrainingData.execute()
+
+      candidate_pairs_pd = _get_candidate_pairs(depth+1)
+
+    return candidate_pairs_pd
+
+  return _get_candidate_pairs()
+
+# COMMAND ----------
+
+# MAGIC %md Now we can present our candidate pairs for labeling. We are using [ipywidgets](https://ipywidgets.readthedocs.io/en/stable/) to make the presentation of these data and the assignment of labels a bit more user-friendly, but a Databricks notebook is not intended to replace a proper end-user UI.
+# MAGIC
+# MAGIC To assign labels to a pair, run the cell below. Once the data are presented, you can use the provided buttons to mark each pair. When you are done, you can save your label assignments by running the cell that immediately follows.
+# MAGIC Once you have accumulated a sufficient number of matches - 40 should be used as a minimum for most datasets - you can move on to subsequent steps. Until you have accumulated the required amount, you will need to repeatedly run these cells (remembering to save after each round of label assignment) until you've hit your goal:
+
+# COMMAND ----------
+
+# DBTITLE 1,Label Training Set
+# define variable to avoid duplicate saves
+ready_for_save = False
+
+# user-friendly labels and corresponding zingg numerical values
+# (the order in the dictionary affects how the buttons are displayed below)
+LABELS = {
+  'Uncertain':2,
+  'Match':1,
+  'No Match':0
+  }
+
+# GET CANDIDATE PAIRS
+# ========================================================
+candidate_pairs_pd = get_candidate_pairs()
+n_pairs = int(candidate_pairs_pd.shape[0]/2)
+# ========================================================
+
+# DEFINE IPYWIDGET DISPLAY
+# ========================================================
+display_pd = candidate_pairs_pd.drop(labels=['z_zid', 'z_prediction', 'z_score', 'z_isMatch'], axis=1)
+
+# simple html markup used to format each displayed pair
+html_prefix = "<p>"
+html_suffix = "</p>"
+
+# define header to be used with each displayed pair
+header = widgets.HTML(value=f"{html_prefix}" + "<br/>".join([str(i)+"  " for i in display_pd.columns.to_list()]) + f"{html_suffix}")
+
+# initialize display
+vContainers = []
+vContainers.append(widgets.HTML(value=f'<p><b>Indicate if each of the {n_pairs} record pairs is a match or not</b></p>'))
+
+# for each set of pairs
+for n in range(n_pairs):
+
+  # get candidate records
+  candidate_left = display_pd.loc[2*n].to_list()
+  candidate_right = display_pd.loc[(2*n)+1].to_list()
+
+  # reformat candidate records for html
+  left = widgets.HTML(value=html_prefix + "<br/>".join([str(i) for i in candidate_left]) + html_suffix)
+  right = widgets.HTML(value=html_prefix + "<br/>".join([str(i) for i in candidate_right]) + html_suffix)
+
+  # define pair for presentation
+  presented_pair = widgets.HBox(children=[header, left, right])
+
+  # assign label options to pair
+  label = widgets.ToggleButtons(
+    options=LABELS.keys(),
+    button_style='info'
+    )
+
+  # define divider between displayed pair and next
+  blankLine = widgets.HTML(value='-'.rjust(105,"-"))
+
+  # append pair, label and divider to widget structure
+  vContainers.append(widgets.VBox(children=[presented_pair, label, blankLine]))
+
+# present widget
+display(widgets.VBox(children=vContainers))
+# ========================================================
+
+# mark flag to allow save
+ready_for_save = True
+
+# COMMAND ----------
+
+# DBTITLE 1,Save Assigned Labels
+if not ready_for_save:
+  print('No labels have been assigned. Run the previous cell to create candidate pairs and assign labels to them before re-running this cell.')
+
+else:
+
+  # ASSIGN LABEL VALUE TO CANDIDATE PAIRS IN DATAFRAME
+  # ========================================================
+  # for each pair in displayed widget
+  for pair in vContainers[1:]:
+
+    # get pair and assigned label
+    html_content = pair.children[0].children[1].get_interact_value() # the displayed pair as html
+    user_assigned_label = pair.children[1].get_interact_value() # the assigned label
+
+    # extract candidate pair id (z_cluster) from html pair content
+    str_beg = len(html_prefix)
+    str_end = html_content.index("<br/>")
+    pair_id = html_content[str_beg:str_end] # aka z_cluster
+
+    # assign label to candidate pair entry in dataframe
+    candidate_pairs_pd.loc[candidate_pairs_pd['z_cluster']==pair_id, 'z_isMatch'] = LABELS.get(user_assigned_label)
+  # ========================================================
+
+  # SAVE LABELED DATA TO ZINGG FOLDER
+  # ========================================================
+  # make target directory if needed
+  dbutils.fs.mkdirs(MARKED_DIR)
+
+  # save label assignments
+  candidate_pairs_pd.to_parquet(
+    '/dbfs' + MARKED_DIR + '/markedRecords_'+ str(time.time_ns()/1000) + '.parquet',
+    compression='snappy',
+    index=False, # do not include index
+    engine='pyarrow'
+    )
+  # ========================================================
+
+  # COUNT MARKED MATCHES
+  # ========================================================
+  marked_matches = 0
+  try:
+    tmp_pd = pd.read_parquet( '/dbfs' + MARKED_DIR, engine='pyarrow')
+    marked_matches = int(tmp_pd[tmp_pd['z_isMatch'] == LABELS['Match']].shape[0] / 2)
+  except:
+    pass # no marked records saved yet
+
+  # show current status of process
+  print('Labels saved','\n')
+  print(f'You have now labeled {marked_matches} matches.')
+  print("If you need more pairs to label, re-run the previous cell and assign more labels.")
+  # ========================================================
+
+  # save completed
+  ready_for_save = False
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC
+# MAGIC Before moving on to the next phase, it's a good idea to review the labels assigned to the candidate pairs for errors:
+
+# COMMAND ----------
+
+# DBTITLE 1,Review Labeled Pairs
+marked_pd = pd.read_parquet(
+  '/dbfs'+ MARKED_DIR,
+  engine='pyarrow'
+  )
+
+display(marked_pd)
+
+# COMMAND ----------
+
+# MAGIC %md Should you have any mislabeled pairs, simply run the following with the appropriate substitutions for each pair you wish to correct:
+# MAGIC
+# MAGIC ```
+# MAGIC # set values here
+# MAGIC z_cluster = 'Z_CLUSTER VALUE ASSOCIATED WITH PAIR TO RELABEL'
+# MAGIC user_assigned_label = 'Match | No Match | Uncertain'
+# MAGIC
+# MAGIC # read existing data
+# MAGIC marked_pd = pd.read_parquet(
+# MAGIC   '/dbfs'+ MARKED_DIR,
+# MAGIC   engine='pyarrow'
+# MAGIC   )
+# MAGIC
+# MAGIC # assign new label
+# MAGIC marked_pd.loc[ marked_pd['z_cluster']==z_cluster, 'z_isMatch'] = LABELS.get(user_assigned_label)
+# MAGIC
+# MAGIC # delete old records
+# MAGIC dbutils.fs.rm(MARKED_DIR, recurse=True)
+# MAGIC dbutils.fs.mkdirs(MARKED_DIR)
+# MAGIC
+# MAGIC # write updated records
+# MAGIC marked_pd.to_parquet(
+# MAGIC   '/dbfs' + MARKED_DIR +'/markedRecords_'+ str(time.time_ns()/1000) + '.parquet',
+# MAGIC   compression='snappy',
+# MAGIC   index=False, # do not include index
+# MAGIC   engine='pyarrow'
+# MAGIC   )
+# MAGIC ```
+
+# COMMAND ----------
+
+# MAGIC %md ##Step 3: Train the Model & Perform Initial Match
+# MAGIC
+# MAGIC With a set of labeled data in place, we can now *train* our Zingg model. A common action performed immediately after training is the identification of duplicates (through a *match* operation) within the initial dataset. While we could implement these as two separate tasks, they occur together so frequently that Zingg provides an option to combine them into a single action:
+
+# COMMAND ----------
+
+# DBTITLE 1,Clear Old Outputs from Any Prior Runs
+dbutils.fs.rm(output_dir, recurse=True)
+
+# COMMAND ----------
+
+# DBTITLE 1,Execute the trainMatch Step
+trainMatch.execute()
+
+# COMMAND ----------
+
+# MAGIC %md The end result of the *train* portion of this task is a Zingg model, housed in the Zingg model directory. If we ever want to retrain this model, we can re-run the *findTrainingData* step to add more labeled pairs to our marked dataset and then re-run this action. We may need to do that if we notice a lot of poor quality matches or if Zingg complains about having insufficient data to train our model. But once this is run, we should verify a model has been recorded in the Zingg model directory:
+# MAGIC
+# MAGIC **NOTE** If Zingg complains about *insufficient data*, you may want to restart your Databricks cluster and retry the step before creating additional labeled pairs.
+
+# COMMAND ----------
+
+# DBTITLE 1,Examine Zingg Model Folder
+display(
+  dbutils.fs.ls( trainMatch.getArguments().getZinggModelDir() )
+  )
+
+# COMMAND ----------
+
+# MAGIC %md The end result of the *match* portion of the last task is a set of outputs representing potential duplicates in our dataset. We can examine those outputs as follows:
+
+# COMMAND ----------
+
+# DBTITLE 1,Examine Initial Output
+matches = (
+  spark
+    .read
+    .format('delta')
+    .option('path', output_dir)
+    .load()
+    .orderBy('z_cluster')
+  )
+
+display(matches)
+
+# COMMAND ----------
+
+# MAGIC %md In the *match* output, we see records associated with one another through a *z_cluster* assignment. The weakest and strongest associations of a record with the other members of its cluster are reflected in the *z_minScore* and *z_maxScore* values, respectively. We will want to consider these values as we decide which matches to accept and which to reject.
+# MAGIC
+# MAGIC We can also look at the number of records within each cluster. Most clusters will consist of 1 or 2 records. That said, there will be other clusters with abnormally large numbers of entries that deserve a bit more scrutiny:
+
+# COMMAND ----------
+
+# DBTITLE 1,Examine Number of Clusters by Record Count
+display(
+  matches
+    .groupBy('z_cluster')
+      .agg(
+        fn.count('*').alias('records')
+        )
+    .groupBy('records')
+      .agg(
+        fn.count('*').alias('clusters')
+        )
+    .orderBy('records')
+  )
+
+# COMMAND ----------
+
+# MAGIC %md As we review the match output, it may seem that it would be helpful if Zingg provided some high-level metrics and diagnostics to help us understand the performance of our model. The reality is that, outside of evaluation scenarios where we have some form of ground truth against which to evaluate our results, it's very difficult to clearly quantify the precision of a model such as this. Quite often, the best we can do is review the results and make a judgement call, based on the volume of identifiable errors and the patterns associated with those errors, as to whether our model's performance is adequate for our needs.
+# MAGIC
+# MAGIC Even with a high-performing model, there will be some suggested matches that we need to reject. For this exercise, we will accept all of the suggested cluster assignments, but in a real-world implementation you would want to accept only those cluster and cluster-member assignments where all the entries score above a given threshold on *z_minScore*. Any clusters not meeting this criterion would require a manual review.
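+
+# COMMAND ----------
+
+# MAGIC %md The cell below is a minimal, illustrative sketch of that kind of threshold-based split; it is not part of the core workflow, and the 0.90 cutoff on *z_minScore* is a placeholder you would tune for your own data and error tolerance:
+
+# COMMAND ----------
+
+# DBTITLE 1,Example: Split Clusters by Minimum Score (Illustrative)
+# hypothetical acceptance threshold; tune for your data
+accept_threshold = 0.90
+
+# clusters whose weakest member-association still clears the threshold
+accepted_clusters = (
+  matches
+    .groupBy('z_cluster')
+      .agg(fn.min('z_minScore').alias('weakest_score'))
+    .filter(fn.col('weakest_score') >= accept_threshold)
+  )
+
+# everything else would be routed to manual review
+review_clusters = (
+  matches
+    .select('z_cluster')
+    .distinct()
+    .join(accepted_clusters, on='z_cluster', how='left_anti')
+  )
+
+print(f'clusters auto-accepted: {accepted_clusters.count()}, clusters for manual review: {review_clusters.count()}')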
+
+# COMMAND ----------
+
+# MAGIC %md If we are satisfied with our results, we can capture our cluster data in a table structure that will allow us to more easily perform incremental data processing. Please note that we are assigning our own unique identifier to each cluster, as the *z_cluster* value Zingg assigns is specific to the output associated with a given task run:
+
+# COMMAND ----------
+
+# DBTITLE 1,Get Clusters & IDs
+# custom function to generate a guid string
+@fn.udf('string')
+def guid():
+  return str(uuid.uuid1())
+
+# get distinct clusters and assign a guid to each
+clusters = (
+  matches
+    .select('z_cluster')
+    .distinct()
+    .withColumn('cluster_id', guid())
+    .cache() # cache to avoid re-generating guids with subsequent calls
+  )
+
+clusters.count() # force full dataset into memory
+
+display( clusters )
+
+# COMMAND ----------
+
+# MAGIC %md And now we can persist our data to tables named *clusters* and *cluster_members*:
+
+# COMMAND ----------
+
+# DBTITLE 1,Write Clusters
+_ = (
+  clusters
+    .select('cluster_id')
+    .withColumn('datetime', fn.current_timestamp())
+    .write
+    .format('delta')
+    .mode('overwrite')
+    .option('overwriteSchema','true')
+    .saveAsTable('clusters')
+  )
+
+display(spark.table('clusters'))
+
+# COMMAND ----------
+
+# DBTITLE 1,Write Cluster Members
+_ = (
+  matches
+    .join(clusters, on='z_cluster')
+    .selectExpr(
+      'cluster_id',
+      'recid',
+      'givenname',
+      'surname',
+      'suburb',
+      'postcode',
+      'z_minScore',
+      'z_maxScore',
+      'current_timestamp() as datetime'
+      )
+    .write
+    .format('delta')
+    .mode('overwrite')
+    .option('overwriteSchema','true')
+    .saveAsTable('cluster_members')
+  )
+
+display(spark.table('cluster_members'))
+
+# COMMAND ----------
+
+# MAGIC %md Upon review of the persisted data, it's very likely we will encounter records we need to correct. Using standard [DELETE](https://docs.databricks.com/sql/language-manual/delta-delete-from.html), [UPDATE](https://docs.databricks.com/sql/language-manual/delta-update.html) and [INSERT](https://docs.databricks.com/sql/language-manual/sql-ref-syntax-dml-insert-into.html) statements, we can update the Delta Lake formatted tables in this environment to achieve the results we require; an illustrative correction is sketched below.
+# MAGIC
+# MAGIC But what happens if we decide to retrain our model after we've set up these mappings? As mentioned above, retraining our model will cause new clusters with overlapping integer *z_cluster* identifiers to be created. In this scenario, you need to decide whether you wish to preserve any manually adjusted mappings from before or otherwise start over from scratch. If starting over, simply drop and recreate the *clusters* and *cluster_members* tables. If preserving manually adjusted records, the GUID value associated with each cluster will keep the cluster identifiers unique, but you'll then need to decide how records assigned to clusters by the newly re-trained model should be merged with the preserved cluster data. It's a bit of a juggle, so this isn't something you'll want to do on a regular basis.
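+
+# COMMAND ----------
+
+# MAGIC %md As a small, illustrative sketch of the kind of manual correction described above (the *recid* and *cluster_id* values here are placeholders, not values from this dataset), we might move one record to a different cluster and remove another record from the table entirely:
+
+# COMMAND ----------
+
+# DBTITLE 1,Example: Manually Correct Cluster Assignments (Illustrative)
+# move a single record to a different (already existing) cluster
+spark.sql("""
+  UPDATE cluster_members
+  SET cluster_id = '00000000-0000-0000-0000-000000000000' -- placeholder guid
+  WHERE recid = 12345                                      -- placeholder record id
+  """)
+
+# remove a record that should not have been clustered at all
+spark.sql("""
+  DELETE FROM cluster_members
+  WHERE recid = 67890                                      -- placeholder record id
+  """)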
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC
+# MAGIC © 2023 Databricks, Inc. All rights reserved. The source in this notebook is provided subject to the [Databricks License](https://databricks.com/db-license-source). All included or referenced third party libraries are subject to the licenses set forth below.
+# MAGIC
+# MAGIC | library | description | license | source |
+# MAGIC |----------------------------------------|-------------------------|------------|-----------------------------------------------------|
+# MAGIC | zingg | entity resolution library | GNU Affero General Public License v3.0 | https://github.com/zinggAI/zingg/ |
diff --git a/03_Incremental Workflow.py b/03_Incremental Workflow.py
new file mode 100644
index 0000000..2c23e25
--- /dev/null
+++ b/03_Incremental Workflow.py
@@ -0,0 +1,407 @@
+# Databricks notebook source
+# MAGIC %md The purpose of this notebook is to perform incremental processing of *incoming* data against existing records using a previously trained model as part of the Zingg Person Entity-Resolution solution accelerator. This notebook is available on https://github.com/databricks-industry-solutions/customer-er.
+
+# COMMAND ----------
+
+# MAGIC %md ## Introduction
+# MAGIC
+# MAGIC The incremental Zingg workflow consists of two tasks, each of which examines the incoming data for duplicate records. These tasks are:
+# MAGIC
+# MAGIC 1. **link** - identify duplicates between incoming and previously observed records
+# MAGIC 2. **match** - identify duplicates within the incoming dataset
+# MAGIC
+# MAGIC These tasks are performed on a portion of the data withheld from the initial workflow, referred to earlier as our *incremental* dataset.
+# MAGIC
+# MAGIC As before, we need to verify the installation of the Zingg JAR file, install the Zingg Python API, and complete configurations:
+
+# COMMAND ----------
+
+# DBTITLE 1,Verify Zingg JAR Installed
+# set default zingg path
+zingg_jar_path = None
+
+# for each jar in the jars folder
+for j in dbutils.fs.ls('/FileStore/jars') + dbutils.fs.ls('/tmp/solacc/customer_er/jar/'): # either manually extracted and uploaded jar or use the RUNME Notebook automation
+  # locate the zingg jar
+  if '-zingg_' in j.path:
+    zingg_jar_path = j.path
+    print(f'Zingg JAR found at {zingg_jar_path}')
+    break
+
+if zingg_jar_path is None:
+  raise Exception('The Zingg JAR was NOT found. Please install the JAR file before proceeding.')
+
+# COMMAND ----------
+
+# DBTITLE 1,Install Zingg Python Library
+# MAGIC %pip install zingg
+
+# COMMAND ----------
+
+# DBTITLE 1,Initialize Config
+# MAGIC %run "./00_Intro & Config"
+
+# COMMAND ----------
+
+# DBTITLE 1,Import Required Libraries
+from zingg.client import Arguments, ClientOptions, ZinggWithSpark
+from zingg.pipes import Pipe, FieldDefinition, MatchType
+
+import pyspark.sql.functions as fn
+
+# COMMAND ----------
+
+# MAGIC %md ##Step 1: Perform Record Linking
+# MAGIC
+# MAGIC The *[link](https://docs.zingg.ai/zingg/stepbystep/link)* task performs a comparison of two different datasets to determine which members of one are likely matches for the other. In our incremental workflow, the incoming (incremental) dataset serves as one of these datasets, and the previously processed data against which we wish to compare it serves as the other.
+# MAGIC
+# MAGIC Both datasets will serve as inputs into our *link* task. Because we need both datasets to have the same data structure, we won't simply point to the previously processed data in the *cluster_members* table but instead will extract the relevant fields as follows:
+
+# COMMAND ----------
+
+# DBTITLE 1,Record Prior Members
+# define path to which the prior data will be saved
+prior_data_dir = config['dir']['input'] + '/prior'
+
+# save the data to a file location
+_ = (
+  spark
+    .table('cluster_members')
+    .selectExpr('recid','givenname','surname','suburb','postcode')
+    .write
+    .format('delta')
+    .mode('overwrite')
+    .option('overwriteSchema','true')
+    .save(prior_data_dir)
+  )
+
+# display data
+display(
+  spark
+    .read
+    .format('delta')
+    .load(prior_data_dir)
+  )
+
+# COMMAND ----------
+
+# MAGIC %md With our priors saved as such, we can then define the *link* task. Notice that we are defining two input pipes, the first of which is the priors and the second of which is the incoming/incremental dataset.
+# MAGIC The order in which they are added to our task configuration does not matter:
+
+# COMMAND ----------
+
+# DBTITLE 1,Initialize Zingg Arguments
+args = Arguments()
+
+# COMMAND ----------
+
+# DBTITLE 1,Assign Model Arguments
+# this is where zingg models, labels, and other data will be stored
+args.setZinggDir(config['dir']['zingg'] )
+
+# this uniquely identifies the model you are training
+args.setModelId(config['model name'])
+
+# COMMAND ----------
+
+# DBTITLE 1,Config Model Inputs
+# configure priors Zingg input pipe
+priors_inputPipe = Pipe(name='priors', format='delta')
+priors_inputPipe.addProperty('path', prior_data_dir)
+
+# configure incoming Zingg input pipe
+incoming_input_path = spark.sql("DESCRIBE DETAIL incremental").select('location').collect()[0]['location']
+incoming_inputPipe = Pipe(name='incoming', format='delta')
+incoming_inputPipe.addProperty('path', incoming_input_path )
+
+# set input data pipelines
+args.setData(priors_inputPipe, incoming_inputPipe)
+
+# COMMAND ----------
+
+# DBTITLE 1,Config Model Output
+linked_output_dir = config['dir']['output'] + '/incremental/linked'
+
+# configure Zingg output pipe
+outputPipe = Pipe(name='linked', format='delta')
+outputPipe.addProperty('path', linked_output_dir)
+
+# add output pipe to arguments collection
+args.setOutput(outputPipe)
+
+# COMMAND ----------
+
+# DBTITLE 1,Configure Field Definitions
+# define matching logic for each field in the incoming dataset
+recid = FieldDefinition('recid', 'integer', MatchType.DONT_USE)
+givenname = FieldDefinition('givenname', 'string', MatchType.FUZZY)
+surname = FieldDefinition('surname', 'string', MatchType.FUZZY)
+suburb = FieldDefinition('suburb', 'string', MatchType.FUZZY)
+postcode = FieldDefinition('postcode', 'string', MatchType.FUZZY)
+
+# assemble the full set of field definitions
+field_defs = [recid, givenname, surname, suburb, postcode]
+
+# add field definitions to arguments collection
+args.setFieldDefinition(field_defs)
+
+# COMMAND ----------
+
+# DBTITLE 1,Config Performance Settings
+# define number of partitions to distribute data across
+args.setNumPartitions( sc.defaultParallelism * 20 ) # default parallelism reflects the Databricks cluster's capacity
+
+# define sample size
+args.setLabelDataSampleSize(0.1)
+
+# COMMAND ----------
+
+# DBTITLE 1,Define Link Task
+# define task
+link_options = ClientOptions([ClientOptions.PHASE, 'link'])
+
+# configure link task
+link = ZinggWithSpark(args, link_options)
+
+# initialize task
+link.init()
+
+# COMMAND ----------
+
+# MAGIC %md We can now run the *link* task as follows:
+
+# COMMAND ----------
+
+# DBTITLE 1,Link Incoming & Prior Records
+link.execute()
+
+# COMMAND ----------
+
+# MAGIC %md With the task completed, we can examine the output to see which incoming records are associated with which priors:
+
+# COMMAND ----------
+
+# DBTITLE 1,Review Linked Record Output
+linked = (
+  spark
+    .read
+    .format('delta')
+    .load(linked_output_dir)
+    .selectExpr(
+      'z_score',
+      'z_cluster',
+      'z_source',
+      'COALESCE(recid,-1) as recid',
+      "COALESCE(givenname,'') as givenname",
+      "COALESCE(surname,'') as surname",
+      "COALESCE(suburb,'') as suburb",
+      "COALESCE(postcode,'') as postcode"
+      )
+  )
+
+display(linked.orderBy(['z_cluster', 'z_source'], ascending=[True, False]))
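+
+# COMMAND ----------
+
+# MAGIC %md Note that not every incoming record is guaranteed to appear in this output (more on this below). As a quick, illustrative check - not part of the core workflow - we can compare the number of distinct incoming records that received at least one candidate link against the total number of incoming records:
+
+# COMMAND ----------
+
+# DBTITLE 1,Example: Link Coverage of Incoming Records (Illustrative)
+# total number of records in the incoming (incremental) dataset
+incoming_total = spark.table('incremental').count()
+
+# distinct incoming records that received at least one candidate link
+incoming_linked = (
+  linked
+    .filter("z_source = 'incoming'")
+    .select('recid','givenname','surname','suburb','postcode')
+    .distinct()
+    .count()
+  )
+
+print(f'{incoming_linked} of {incoming_total} incoming records have at least one candidate link to a prior record')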
+
+# COMMAND ----------
+
+# MAGIC %md The link job output assigns a *z_cluster* value to records in the incoming dataset likely to match a record in the prior dataset. A *z_score* helps us understand the probability of that match. The *z_source* field differentiates between records coming from the prior and the incoming datasets. Please note that if a prior record is in a cluster with multiple incoming records, its *z_score* reflects the highest-scored incoming match.
+# MAGIC
+# MAGIC It's important to note that an incoming record may be linked to more than one prior record. Also, incoming records that do not have likely matches in the prior dataset (as determined by the blocking portion of the Zingg logic) will not appear in the linking output. This knowledge needs to be taken into account in the data processing steps that follow.
+# MAGIC
+# MAGIC To help us work with the linked data, we might separate the records coming from the prior dataset from those in the incoming dataset. For the prior dataset, we can look up the *cluster_id* in our *cluster_members* table to make the appending of new data to that table easier in later steps:
+
+# COMMAND ----------
+
+# DBTITLE 1,Get Linked Priors
+linked_prior = (
+  linked
+    .alias('a')
+    .filter(fn.expr("a.z_source = 'priors'"))
+    .join(
+      spark.table('cluster_members').alias('b'),
+      on=fn.expr("""
+        a.recid=COALESCE(b.recid,-1) AND
+        a.givenname=COALESCE(b.givenname,'') AND
+        a.surname=COALESCE(b.surname,'') AND
+        a.suburb=COALESCE(b.suburb,'') AND
+        a.postcode=COALESCE(b.postcode,'')
+        """)
+      )
+    .selectExpr(
+      'b.cluster_id',
+      'a.z_cluster',
+      'COALESCE(a.recid,-1) as recid',
+      "COALESCE(a.givenname,'') as givenname",
+      "COALESCE(a.surname,'') as surname",
+      "COALESCE(a.suburb,'') as suburb",
+      "COALESCE(a.postcode,'') as postcode"
+      )
+  )
+
+display(
+  linked_prior
+  )
+
+# COMMAND ----------
+
+# DBTITLE 1,Get Linked Incoming
+# get incoming records
+linked_incoming = (
+  linked
+    .filter(fn.expr("z_source = 'incoming'"))
+    .selectExpr(
+      'z_cluster',
+      'z_score',
+      'COALESCE(recid,-1) as recid',
+      "COALESCE(givenname,'') as givenname",
+      "COALESCE(surname,'') as surname",
+      "COALESCE(suburb,'') as suburb",
+      "COALESCE(postcode,'') as postcode"
+      )
+  )
+
+# get highest score for each incoming record if there are multiple entries
+max_linked_incoming = (
+  linked_incoming
+    .groupBy(
+      'recid','givenname','surname','suburb','postcode'
+      )
+      .agg(fn.max('z_score').alias('z_score'))
+  )
+
+# restrict to just the highest-scored links
+linked_incoming = (
+  linked_incoming
+    .join(max_linked_incoming, on=['recid','givenname','surname','suburb','postcode','z_score'])
+  )
+
+display(
+  linked_incoming
+  )
+
+# COMMAND ----------
+
+# MAGIC %md We now have our set of prior records to which one or more incoming records have a linkage. We also have the highest-scored version of each incoming record and its link-cluster assignment. It is possible that an incoming record could be linked to two different prior records with identical scores, so we'll need to take that into consideration as we design our persistence logic. That logic should then be something like the following (an illustrative sketch follows the list below):
+# MAGIC
+# MAGIC 1. If there is only one link for an incoming record and the score for that record is above a given upper threshold, assign that record to its linked prior's cluster.
+# MAGIC 2. If there are multiple links for an incoming record and the score for those records is above a given upper threshold, send those records to manual review.
+# MAGIC 3. If a record is below a given lower threshold, reject the record as a possible match.
+# MAGIC 4. If a record is between a given lower and upper threshold, send that record to manual review.
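+
+# COMMAND ----------
+
+# MAGIC %md The cell below is a minimal, illustrative sketch of that classification logic, not part of the core workflow. The 0.90 upper and 0.50 lower thresholds are placeholders you would tune for your own data, and the actual persistence of each group (appending to *cluster_members*, queueing for review, or creating new clusters for incoming records absent from the link output) is left out:
+
+# COMMAND ----------
+
+# DBTITLE 1,Example: Classify Linked Incoming Records (Illustrative)
+# placeholder thresholds; tune for your data
+upper_threshold = 0.90
+lower_threshold = 0.50
+
+# count how many prior clusters each incoming record is linked to
+link_counts = (
+  linked_incoming
+    .groupBy('recid','givenname','surname','suburb','postcode')
+      .agg(fn.countDistinct('z_cluster').alias('links'))
+  )
+
+# classify each linked incoming record per the rules above
+classified = (
+  linked_incoming
+    .join(link_counts, on=['recid','givenname','surname','suburb','postcode'])
+    .withColumn(
+      'action',
+      fn.expr(f"""
+        CASE
+          WHEN z_score < {lower_threshold} THEN 'reject'
+          WHEN z_score >= {upper_threshold} AND links = 1 THEN 'assign to linked cluster'
+          ELSE 'manual review'
+        END""")
+      )
+  )
+
+display(classified.groupBy('action').count())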
+
+# COMMAND ----------
+
+# MAGIC %md ##Step 2: Perform Record Matching
+# MAGIC
+# MAGIC The *[match](https://docs.zingg.ai/zingg/stepbystep/match)* task is now used to examine potential matches within the *incoming* dataset. The configuration for this task is more straightforward than for *link*, as we are only dealing with one input dataset. It closely mirrors the configuration used in the last notebook, though our input here is the *incoming* dataset:
+
+# COMMAND ----------
+
+# DBTITLE 1,Initialize Zingg Arguments
+args = Arguments()
+
+# COMMAND ----------
+
+# DBTITLE 1,Assign Model Arguments
+# this is where zingg models, labels, and other data will be stored
+args.setZinggDir(config['dir']['zingg'] )
+
+# this uniquely identifies the model you are training
+args.setModelId(config['model name'])
+
+# COMMAND ----------
+
+# DBTITLE 1,Config Model Inputs
+# configure incoming Zingg input pipe
+incoming_input_path = spark.sql("DESCRIBE DETAIL incremental").select('location').collect()[0]['location']
+incoming_inputPipe = Pipe(name='incoming', format='delta')
+incoming_inputPipe.addProperty('path', incoming_input_path )
+
+# set input data pipelines
+args.setData(incoming_inputPipe)
+
+# COMMAND ----------
+
+# DBTITLE 1,Config Model Outputs
+matched_output_dir = config['dir']['output'] + '/incremental/matched'
+
+# configure Zingg output pipe
+outputPipe = Pipe(name='matched', format='delta')
+outputPipe.addProperty('path', matched_output_dir)
+
+# add output pipe to arguments collection
+args.setOutput(outputPipe)
+
+# COMMAND ----------
+
+# DBTITLE 1,Configure Field Definitions
+# define matching logic for each field in the incoming dataset
+recid = FieldDefinition('recid', 'integer', MatchType.DONT_USE)
+givenname = FieldDefinition('givenname', 'string', MatchType.FUZZY)
+surname = FieldDefinition('surname', 'string', MatchType.FUZZY)
+suburb = FieldDefinition('suburb', 'string', MatchType.FUZZY)
+postcode = FieldDefinition('postcode', 'string', MatchType.FUZZY)
+
+# assemble the full set of field definitions
+field_defs = [recid, givenname, surname, suburb, postcode]
+
+# add field definitions to arguments collection
+args.setFieldDefinition(field_defs)
+
+# COMMAND ----------
+
+# DBTITLE 1,Define Match Task
+# define task
+match_options = ClientOptions([ClientOptions.PHASE, 'match'])
+
+# configure match task
+match = ZinggWithSpark(args, match_options)
+
+# initialize task
+match.init()
+
+# COMMAND ----------
+
+# MAGIC %md We can now run the *match* task to look for matches within the incoming dataset:
+
+# COMMAND ----------
+
+# DBTITLE 1,Identify Matches in Incoming Dataset
+match.execute()
+
+# COMMAND ----------
+
+# DBTITLE 1,Review Matched Records
+# retrieve matches
+matches = (
+  spark
+    .read
+    .format('delta')
+    .load(matched_output_dir)
+    .selectExpr( # reorder fields for easier interpretation
+      'z_cluster',
+      'z_minScore',
+      'z_maxScore',
+      "COALESCE(recid,-1) as recid",
+      "COALESCE(givenname,'') as givenname",
+
"COALESCE(surname,'') as surname", + "COALESCE(suburb,'') as suburb", + "COALESCE(postcode,'') as postcode" + ) + .orderBy('z_cluster') + ) + +# retrieve results from temp view +display(matches) + +# COMMAND ---------- + +# MAGIC %md The output of the incremental match is the same as the output of the initial match (though the records involved are the same). As before, you'll want to carefully consider which matches to accept and which to reject. And once you've got that sorted, you'll want to insert records in the matched output not already in the the *cluster_members* table into it under new *cluster_id* values. + +# COMMAND ---------- + +# MAGIC %md +# MAGIC +# MAGIC © 2023 Databricks, Inc. All rights reserved. The source in this notebook is provided subject to the [Databricks License](https://databricks.com/db-license-source). All included or referenced third party libraries are subject to the licenses set forth below. +# MAGIC +# MAGIC | library | description | license | source | +# MAGIC |----------------------------------------|-------------------------|------------|-----------------------------------------------------| +# MAGIC | zingg | entity resolution library | GNU Affero General Public License v3.0 | https://github.com/zinggAI/zingg/ | diff --git a/RUNME.py b/RUNME.py index 30539b2..6d51c2e 100644 --- a/RUNME.py +++ b/RUNME.py @@ -32,6 +32,15 @@ # COMMAND ---------- +# MAGIC %sh +# MAGIC rm -rf /dbfs/tmp/solacc/customer_er/jar/ +# MAGIC mkdir -p /dbfs/tmp/solacc/customer_er/jar/ +# MAGIC cd /dbfs/tmp/solacc/customer_er/jar/ +# MAGIC wget https://github.com/zinggAI/zingg/releases/download/v0.3.4/zingg-0.3.4-SNAPSHOT-spark-3.1.2.tar.gz +# MAGIC tar -xvf zingg-0.3.4-SNAPSHOT-spark-3.1.2.tar.gz + +# COMMAND ---------- + job_json = { "timeout_seconds": 28800, "max_concurrent_runs": 1, @@ -44,10 +53,7 @@ "job_cluster_key": "zingg_cluster", "libraries": [], "notebook_task": { - "notebook_path": f"00.0_ Intro & Config", - "base_parameters": { - "holdout days": "90" - } + "notebook_path": f"00_Intro & Config" }, "task_key": "zingg_01", "description": "" @@ -55,10 +61,7 @@ { "job_cluster_key": "zingg_cluster", "notebook_task": { - "notebook_path": f"00.1_ Prepare Data", - "base_parameters": { - "holdout days": "90" - } + "notebook_path": f"01_Prepare Data" }, "task_key": "zingg_02", "depends_on": [ @@ -70,45 +73,34 @@ { "job_cluster_key": "zingg_cluster", "notebook_task": { - "notebook_path": f"00.2_ Prepare Jobs", - "base_parameters": { - "holdout days": "90" - } + "notebook_path": f"02_Initial Workflow" }, "task_key": "zingg_03", "depends_on": [ { "task_key": "zingg_02" } + ], + "libraries": [ + { + "jar": "dbfs:/tmp/solacc/customer_er/jar/zingg-0.3.4-SNAPSHOT/zingg-0.3.4-SNAPSHOT.jar" + } ] }, { "job_cluster_key": "zingg_cluster", "notebook_task": { - "notebook_path": f"01_ Initial Workflow", - "base_parameters": { - "holdout days": "90" - } + "notebook_path": f"03_Incremental Workflow" }, "task_key": "zingg_04", "depends_on": [ { "task_key": "zingg_03" } - ] - }, - { - "job_cluster_key": "zingg_cluster", - "notebook_task": { - "notebook_path": f"02_ Incremental Workflow", - "base_parameters": { - "holdout days": "90" - } - }, - "task_key": "zingg_05", - "depends_on": [ + ], + "libraries": [ { - "task_key": "zingg_04" + "jar": "dbfs:/tmp/solacc/customer_er/jar/zingg-0.3.4-SNAPSHOT/zingg-0.3.4-SNAPSHOT.jar" } ] } diff --git a/config/setup.py b/config/setup.py index a0e4279..cdad790 100644 --- a/config/setup.py +++ b/config/setup.py @@ -1,41 +1,2 @@ # Databricks notebook source -# 
MAGIC %fs -# MAGIC rm -r /tmp/solacc/customer_er/jar/ - -# COMMAND ---------- - -# MAGIC %fs -# MAGIC mkdirs /tmp/solacc/customer_er/jar/ - -# COMMAND ---------- - -# MAGIC %fs -# MAGIC rm -r /tmp/ncvoters/downloads/ - -# COMMAND ---------- - -# MAGIC %fs mkdirs /tmp/ncvoters/downloads/ - -# COMMAND ---------- - -# MAGIC %sh -# MAGIC cd /dbfs/tmp/solacc/customer_er/jar/ -# MAGIC wget https://github.com/zinggAI/zingg/releases/download/v0.3.3/zingg-0.3.3-SNAPSHOT-spark-3.1.2.tar.gz -# MAGIC tar -xvf zingg-0.3.3-SNAPSHOT-spark-3.1.2.tar.gz - -# COMMAND ---------- - -# MAGIC %sh -e -# MAGIC rm -r /tmp/downloads -# MAGIC mkdir /tmp/downloads -# MAGIC cd /tmp/downloads -# MAGIC wget https://www.informatik.uni-leipzig.de/~saeedi/5Party-ocp20.tar.gz -# MAGIC tar -xvf 5Party-ocp20.tar.gz -# MAGIC rm -r /dbfs/tmp/ncvoters/downloads/ -# MAGIC mkdir /dbfs/tmp/ncvoters/downloads/ -# MAGIC cp -a /tmp/downloads/5Party-ocp20/. /dbfs/tmp/ncvoters/downloads/ -# MAGIC ls /dbfs/tmp/ncvoters/downloads/ - -# COMMAND ---------- -