Creating a data warehouse with Apache Pinot and Debezium 2

In the previous post we saw how we can stream data from Debezium into Pinot. In the field-level transformations that we applied, we stored the primary key along with the entire “after” JSON generated by Debezium. In this post we’ll look at how to extract fields from the “after” payload so that the data looks more tabular.

Getting Started

The JSON stored in the source column contains the user agent of the device used to place the order. Let’s say we’d like to find out the most common user agents. To do this, we’d need to extract the value from the JSON and store it in a new column. This will be a two-step process. First, we’ll update the schema to add a new “user_agent” column. Second, we’ll update the table definition to reflect this change in schema.

Let’s start by updating the schema. We’ll add the following column to the dimensionFieldSpecs.

{
  "name": "user_agent",
  "dataType": "STRING",
  "notNull": false
}

Notice how we’ve set notNull to false. This allows the column to contain null values in case the user agent is unavailable. Let’s PUT this updated schema to Pinot by executing the following curl command.

curl -XPUT -F schemaName=@tables/002-orders/orders_schema.json localhost:9000/schemas/orders | jq .

If we open the query console after updating the schema, we’ll see an error on the screen. It mentions that the segments are invalid because they were created from an older version of the schema, and that to reflect the changes we’d need to reload the table’s segments. We’ll get to this in a minute.

Next, let’s update the table definition so that the new column is populated. We’ll add the following to the transformConfigs. Notice how we’re referencing the source column in the jsonPath transform function.

{
  "columnName": "user_agent",
  "transformFunction": "jsonPath(source, '$.user_agent')"
}

Let’s PUT this updated config.

curl -XPUT -H 'Content-Type: application/json' -d @tables/002-orders/orders_table.json localhost:9000/tables/orders | jq .

To ensure that the column is extracted for new rows as Debezium streams them, we’ll rerun the Python script which adds dummy data.

python faker/data.py

We’ll now reload the segments to let the changes take effect by executing the following command.

curl -XPOST localhost:9000/segments/orders/reload | jq .

We can now reload the query console to see the changes take effect.
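With the new column in place, we can answer the question we started with. A query along the following lines, run from the query console against the table and column defined above, should surface the most common user agents.

SELECT user_agent, COUNT(*) AS times_seen
FROM orders
GROUP BY user_agent
ORDER BY times_seen DESC
LIMIT 10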

Finally, a quick callout on schema evolution in Pinot. In Pinot schemas and tables, columns may only be added and not removed. This ensures backward compatibility.

That’s it. That’s how we can extract fields out of the source payload and into a column of their own.

Creating a data warehouse with Apache Pinot and Debezium

I’d previously written about creating a data warehouse using Debezium and Pinot. In the design of that system, I’d used a combination of Airflow and HDFS to overcome the limited SQL capabilities of Pinot. However, the newer version of Pinot has better SQL capabilities and, therefore, the design of the system can be simplified. In this post we’ll look at how to create a streaming data warehouse using just Pinot and Debezium.

Before We Begin

My setup is simple. It contains Docker containers for Pinot, Debezium, and Postgres. In a nutshell, we’ll stream data from Postgres into Pinot using Debezium. The newer version of Debezium makes it easy to run snapshots of tables using signals. This is handy when we need to run backfills. However, we’ll design a system where the need for backfills is reduced. Throughout the remainder of this blog post, we’ll look at how to ingest a table of orders placed by customers into Pinot.

Getting Started

We’ll begin by bringing up the Docker containers. I have a small script which creates a bunch of tables in the database and populates them with data. While there are many tables, the one we are interested in is “orders”. Let’s run the script.

python faker/data.py

A row from the table looks as follows.

| id | user_id | address_id | cafe_id | partner_id | created_at                 | updated_at | deleted_at | status | user_agent                                                                                                                                                       |
|----|---------|------------|---------|------------|----------------------------|------------|------------|--------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 | 2 | 2 | 34 | | 2024-12-17 19:03:23.018782 | | | 0 | Mozilla/5.0 (iPhone; CPU iPhone OS 17_3_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 EdgiOS/121.2277.107 Mobile/15E148 Safari/605.1.15 |

Each row of the table contains foreign keys to other tables along with the user agent of the device used to place the order.

Next, we’ll stream this data into Kafka using Debezium. The configuration of the connector is as follows.

{
  "name": "order",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "db",
    "database.user": "postgres",
    "database.password": "my-secret-pw",
    "database.dbname": "postgres",
    "database.server.name": "postgres",
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "filtered",
    "time.precision.mode": "connect",
    "tombstones.on.delete": "false",
    "snapshot.mode": "no_data",
    "heartbeat.interval.ms": "1000",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3",
    "event.processing.failure.handling.mode": "skip",
    "producer.override.compression.type": "snappy",
    "signal.data.collection": "debezium.signal",
    "topic.prefix": "microservice"
  }
}

There are a few properties to note here. First, the snapshot.mode property is set to no_data, which means that Debezium will not run snapshots on the tables it captures and will only stream the upcoming inserts, updates, and deletes. Second, we’ve set signal.data.collection to debezium.signal, which means that the table used to pass signals to Debezium is called signal and is located in the debezium schema. Finally, there is no explicit include or exclude list. This means Debezium will stream all tables from all schemas.

Before we send this configuration to Debezium, we’ll create the signalling table.

CREATE SCHEMA debezium;

CREATE TABLE debezium.signal (
    id TEXT,
    type TEXT,
    data TEXT
);

We’ll send this configuration to Debezium to spawn a connector which will stream these changes.

curl -H "Content-Type: application/json" -XPOST -d @tables/001-order/debezium.json localhost:8083/connectors | jq .

Now that we can stream this data into Kafka, we’ll configure Pinot to ingest it. We’ll begin by creating a schema for a table that stores the primary key of the source table and the entire JSON payload generated by Debezium. The schema looks as follows.

{
  "schemaName": "orders",
  "dimensionFieldSpecs": [
    {
      "name": "id",
      "dataType": "STRING"
    },
    {
      "name": "source",
      "dataType": "JSON"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "created_at",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ],
  "primaryKeyColumns": [
    "id"
  ],
  "metricFieldSpecs": []
}

Next, we’ll create a table which stores the data from Kafka. For brevity, only the ingestion config is shown below along with field-level transformations.

{
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "id",
        "transformFunction": "jsonPath(payload, '$.after.id')"
      },
      {
        "columnName": "source",
        "transformFunction": "jsonPath(payload, '$.after')"
      },
      {
        "columnName": "created_at",
        "transformFunction": "jsonPath(payload, '$.after.created_at')"
      }
    ]
  }
}

In the table above, we store the entire JSON payload generated by Debezium. As we’ll see, we can extract columns from this payload to create a tabular view of the data.

We’ll send these configurations to Pinot to create the schema and the table using the following curl commands.

curl -F schemaName=@tables/001-order/order_schema.json localhost:9000/schemas | jq .
curl -XPOST -H 'Content-Type: application/json' -d @tables/001-order/order_table.json localhost:9000/tables | jq .

Finally, to get Debezium to run a snapshot, we’ll enqueue a signal in the signalling table by executing the following SQL.

INSERT INTO debezium.signal
VALUES (
    gen_random_uuid()::TEXT,
    'execute-snapshot',
    '{"data-collections": [".*\\.orders"], "type": "incremental"}'
);

In the query above, we’ve specified a regular expression which will make Debezium stream the orders table from all schemas. This allows us to create a single Kafka stream containing data from all the tables. Once Debezium receives this signal, it will begin taking an incremental snapshot of the tables and streaming rows to Kafka.

We can now view this data by querying Pinot.

That’s it. That’s how we can stream data from a database into Pinot using Debezium. In the coming post we will see how to create tabular views of this data by extracting columns.

Property-based testing with Hypothesis

I’d previously written about property-based testing in Clojure. In this blog post I’d like to talk about how we can do the same in Python using the Hypothesis library. We’ll begin with a quick recap of what property-based testing is, and then dive head-first into writing some tests.

What is property-based testing?

Before we get into property-based testing, let’s talk about how we usually write tests. We provide a known input to the code under test, capture the resulting output, and write an assertion to check that it matches our expectations. This technique of writing tests is called example-based testing since we provide examples of inputs that the code has to work with.

While this technique works, there are some drawbacks to it. Example-based tests take longer to write since we need to come up with examples ourselves. Also, it’s possible to miss out on corner cases.

In contrast, property-based testing allows us to specify the properties of the code and test that they hold true under a wide range of inputs. For example, if we have a function f that takes an integer and performs some computation on it, a property-based test would test it for positive integers, negative integers, very large integers, and so on. These inputs are generated for us by the testing framework and we simply need to specify what kind of inputs we’re looking for.
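As a quick standalone illustration of the idea, the test below asserts that sorting is idempotent; Hypothesis generates the lists of integers, including empty and very large ones, for us.

from hypothesis import given, strategies as st


# Sorting twice gives the same result as sorting once, whatever the input.
@given(st.lists(st.integers()))
def test_sorting_is_idempotent(xs):
    assert sorted(sorted(xs)) == sorted(xs)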

Having briefly discussed what property-based testing is, let’s write some code.

Writing property-based tests

Testing a pure function

Let’s start with a function which computes the nth Fibonacci number.

import functools


@functools.cache
def fibonacci(n: int) -> int:
    """
    Computes the nth number in the Fibonacci sequence.
    """
    if n <= 1:
        return n

    return fibonacci(n - 1) + fibonacci(n - 2)

The sequence of numbers goes 0, 1, 1, 2, 3, etc. We can see that all of these numbers are greater than or equal to zero. Let’s write a property-based test to formalize this.

from hypothesis import given, strategies as st
from functions import fibonacci


@given(st.integers())
def test_fibonacci(n):
    assert fibonacci(n) >= 0

In the code snippet above, we’ve wrapped our test using the @given decorator. This makes it a property-based test. The argument to the decorator is a search strategy. A search strategy generates random data of a given type for us. Here we’ve specified that we need integers. We can now run the test using pytest as follows.

PYTHONPATH=. pytest .

The test fails with the following summary.

FAILED test/functions/test_fibonacci.py::test_fibonacci - ExceptionGroup: Hypothesis found 2 distinct failures. (2 sub-exceptions)

When looking at the logs, we find that the first failure is because the maximum recursion depth is reached when the value of n is large.

n = 453
... lines omitted ...
RecursionError: maximum recursion depth exceeded

The second failure is because the function returned a negative integer when the value of n is negative; in this case it is n=-1. This violates our assertion that the numbers in the Fibonacci sequence are non-negative.

+---------------- 2 ----------------
| Traceback (most recent call last):
| File "/Users/fasih/Personal/pytesting/test/functions/test_fibonacci.py", line 8, in test_fibonacci
| assert fibonacci(n) >= 0
| AssertionError: assert -1 >= 0
| + where -1 = fibonacci(-1)
| Falsifying example: test_fibonacci(
| n=-1,
| )
+------------------------------------

To remedy the two failures above, we’ll add an assertion at the top of the function which will ensure that the input n is in some specified range. The updated function is given below.

@functools.cache
def fibonacci(n: int) -> int:
    """
    Computes the nth number in the Fibonacci sequence.
    """
    assert 0 <= n <= 300, f"n must be between 0 and 300; {n} was passed."

    if n <= 1:
        return n

    return fibonacci(n - 1) + fibonacci(n - 2)

We’ll update our test cases to reflect this change in code. The first test case checks the function when n is between 0 and 300.

@given(st.integers(min_value=0, max_value=300))
def test_fibonacci(n):
    assert fibonacci(n) >= 0

The second case checks when n is large. In this case we check that the function raises an AssertionError.

import pytest


@given(st.integers(min_value=5000))
def test_fibonacci_large_n(n):
    with pytest.raises(AssertionError):
        fibonacci(n)

Finally, we’ll check the function with negative values of n. Similar to the previous test case, we’ll check that the function raises an AssertionError.

@given(st.integers(min_value=-2, max_value=-1))
def test_fibonacci_negative(n):
    with pytest.raises(AssertionError):
        fibonacci(n)

Testing persistent data

We’ll now use Hypothesis to generate data that we’d like to persist in the database. The snippet below shows a Person model with fields to store name and date of birth. The age property returns the current age of the person in years, and the MAX_AGE variable indicates that the maximum age we’d like to allow in the system is 120 years.

class Person(peewee.Model):

    MAX_AGE = 120

    class Meta:
        database = db

    id = peewee.BigAutoField(primary_key=True, null=False)
    name = peewee.CharField(null=False, max_length=120)
    dob = peewee.DateField(null=False)

    @property
    def age(self) -> int:
        return datetime.date.today().year - self.dob.year

We’ll add a helper function to create Person instances as follows.

def create(name: str, dob: datetime.date) -> Person:
    """
    Create a new person instance with the given name and date of birth.

    :param name: Name of the person.
    :param dob: Date of birth of the person.
    :return: A Person instance.
    """
    assert name, "name cannot be empty"
    return Person.create(name=name, dob=dob)

Like we did for the function which computes Fibonacci numbers, we’ll add a test case to formalize this expectation. This time we’re generating random names and dates of birth and passing them to the helper function.

@given(
    text=st.text(min_size=1),
    dob=st.dates(),
)
def test_create_person(text, dob, create_tables):
    person = pr.create(name=text, dob=dob)
    assert 0 <= person.age <= Person.MAX_AGE

I’m persisting this data in a Postgres table and the create_tables fixture ensures that the tables are created before the test runs.
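The fixture itself isn’t shown in this post. A minimal sketch of what it could look like with peewee, assuming a module-level db object like the one referenced by the Person model, is the following.

import pytest


@pytest.fixture
def create_tables():
    # Create the tables before the test and drop them afterwards so that
    # every test starts with a clean slate.
    db.create_tables([Person])
    yield
    db.drop_tables([Person])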

Upon running the test we find that it fails for two cases. The first case is when the input string contains a NUL character \x00. Postgres does not allow strings with NUL characters in them.

ValueError: A string literal cannot contain NUL (0x00) characters.
Falsifying example: test_create_person(
    create_tables=None,
    text='\x00',
    dob=datetime.date(2000, 1, 1),  # or any other generated value
)

The second case is when the date of birth is in the future.

AssertionError: assert 0 <= -1
 + where -1 = <Person: 5375>.age
Falsifying example: test_create_person(
    create_tables=None,
    text='0',  # or any other generated value
    dob=datetime.date(2025, 1, 1),
)

To remedy the first failure, we’ll have to sanitize the name input string that gets stored in the table. We’ll create a helper function which removes any NULL characters from the string. This will be called before name gets saved in the table.

def sanitize(s: str) -> str:
    return s.replace("\x00", "").strip()

To remedy the second failure, we’ll add an assertion ensuring that the age is less than or equal to 120. The updated create function is shown below.

def create(name: str, dob: datetime.date) -> Person:
    """
    Create a new person instance with the given name and date of birth.

    :param name: Name of the person.
    :param dob: Date of birth of the person.
    :return: A Person instance.
    """
    name = sanitize(name)

    assert name, "name cannot be empty"
    assert 0 <= (datetime.date.today().year - dob.year) <= Person.MAX_AGE

    return Person.create(name=name, dob=dob)

We’ll update the test cases to reflect these changes. Let’s start by creating two variables that will hold the minimum and maximum dates allowed.

MIN_DATE = datetime.date.today() - datetime.timedelta(days=Person.MAX_AGE * 365)
MAX_DATE = datetime.date.today()

Next, we’ll add a test to ensure that we raise an AssertionError when the string contains only NULL characters.

@given(text=st.text(alphabet=["\x00"]))
def test_create_person_null_text(text, create_tables):
    with pytest.raises(AssertionError):
        pr.create(name=text, dob=MIN_DATE)

Next, we’ll add a test to ensure that dates cannot be in the future.

@given(
    text=st.text(min_size=1),
    dob=st.dates(min_value=MAX_DATE + datetime.timedelta(days=365)),
)
def test_create_person_future_dob(text, dob, create_tables):
    with pytest.raises(AssertionError):
        pr.create(name=text, dob=dob)

Similarly, we’ll add a test to ensure that dates cannot be more than 120 years in the past.

@given(
    text=st.text(min_size=1),
    dob=st.dates(max_value=MIN_DATE - datetime.timedelta(days=365)),
)
def test_create_person_past_dob(text, dob, create_tables):
    with pytest.raises(AssertionError):
        pr.create(name=text, dob=dob)

Finally, we’ll add a test to ensure that in all other cases, the function creates a Person instance as expected.

@given(
    text=st.text(min_size=5),
    dob=st.dates(
        min_value=MIN_DATE,
        max_value=MAX_DATE,
    ),
)
def test_create_person(text, dob, create_tables):
    person = pr.create(name=text, dob=dob)
    assert 0 <= person.age <= Person.MAX_AGE

The tests pass when we rerun them so we can be sure that the function behaves as expected.

Testing a REST API

Finally, we’ll look at testing a REST API. We’ll create a small Flask app with an endpoint which allows us to create Person instances. The API endpoint is a simple wrapper around the create helper function and returns the created Person instance as a dictionary.

@api.route("/person", methods=["POST"])
def create_person():
    name = request.json["name"]

    dob = request.json["dob"]
    dob = parse(dob).date()

    person = pr.create(name, dob)

    return model_to_dict(person)

We’ll add a test to generate random JSON dictionaries which we’ll pass as the body of the POST request. The test is given below.

@given(
    json=st.fixed_dictionaries(
        {
            "name": st.text(min_size=5),
            "dob": st.dates(min_value=MIN_DATE, max_value=MAX_DATE),
        }
    )
)
def test_create_person(json, test_client):
    response = test_client.post(
        "/api/person",
        json=json,
        headers={"Content-Type": "application/json"},
    )

    assert response.status_code == 200

Similar to the tests for the create function, we test that the API returns a successful response when the inputs are proper.

That’s it. That’s how we can leverage Hypothesis to test Python code. You’ll find the code for this post in the Github repository.

Programming Puzzles 4

In the previous post we looked at computing the Fibonacci series both with and without dynamic programming. In this post we’ll look at another example where dynamic programming is applicable. The example is borrowed from ‘Introduction to Algorithms’ by CLRS and implemented in Python. By the end of this post we’ll try to develop an intuition for when dynamic programming applies.

Rod Cutting

The problem we are presented with is the following: given a steel rod, we’d like to find the optimal way to cut it into smaller rods. More formally, we’re presented with a rod of size $n$ inches and a table of prices $p_i$. We’d like to determine the maximum revenue $r_n$ that we can obtain by cutting the rod and selling it. If the price $p_n$ of the rod of length $n$ is large enough, we may sell the rod without making any cuts.

The table of prices that we’ll work with is given below.

| length $i$  | 1 | 2 | 3 | 4 | 5  | 6  | 7  | 8  | 9  | 10 |
|-------------|---|---|---|---|----|----|----|----|----|----|
| price $p_i$ | 1 | 5 | 8 | 9 | 10 | 17 | 17 | 20 | 24 | 30 |

Consider a rod of length 4 inches. The maximum revenue we can obtain is 10, by cutting the rod into two parts of length 2 inches each.

Given a rod of $n$ inches, we may sell it uncut or we may sell it by cutting it into smaller pieces. Since we do not know the size of the cuts to make, we will have to consider all possible sizes. Once we make a cut of size $i$ from the left end of the rod, we can view the remaining length of the rod, of size $n - i$, as an independent instance of the rod cutting problem. In other words, we are solving a smaller instance of the same problem. The equation below shows how we can mathematically formulate the problem.

$$r_n = \max_{1 \le i \le n} \left( p_i + r_{n - i} \right)$$

It states that the revenue $r_n$ is the maximum, over all cuts of size $i$, of the price $p_i$ plus the revenue $r_{n - i}$ obtained by cutting the remaining rod of size $n - i$. We can write a recursive function to obtain this value as follows:

def rod_cut(p: list[int], n: int) -> int:
    if n == 0:
        return 0

    q = float("-inf")

    for i in range(1, n + 1):
        q = max(q, p[i] + rod_cut(p, n - i))

    return q

We can verify the results by calling the function for a rod of size 4 and passing the table of prices.

p = [0, 1, 5, 8, 9, 10, 17, 17, 20, 24, 30]
assert 10 == rod_cut(p=p, n=4)

The recursive version, however, does redundant work. Consider the rod of size n = 4. We will have to consider cuts of size 1, 2, 3, and 4. When considering the remaining rod of size 3, we’d consider cuts of size 1, 2, and 3. In both of these cases we recompute, for example, the revenue obtained when the remainder of the rod is of size 2.

We can use dynamic programming to solve this problem by modifying the rod_cut function as follows.

def rod_cut(p: list[int], t: list[int | None], n: int) -> int:
    if n == 0:
        return 0

    if t[n] is not None:
        return t[n]

    q = float("-inf")

    for i in range(1, n + 1):
        q = max(q, p[i] + rod_cut(p, t, n - i))

    t[n] = q

    return q

Notice how we’ve introduced a table t which stores the maximum revenue obtained by cutting a rod of size n. This allows us to reuse previous computations. We can run this function and verify the result.

t = [None] * 11
p = [0, 1, 5, 8, 9, 10, 17, 17, 20, 24, 30]
assert 10 == rod_cut(p, t, 4)

The question that we’re left with is the following: how did we decide that the problem could be solved with dynamic programming? There are two key factors that help us in deciding if dynamic programming can be used. The first is overlapping subproblems and the second is optimal substructure.

For a dynamic programming algorithm to work, the number of subproblems must be small. This means that the recursive algorithm which solves the problem encounters the same subproblems over and over again. When this happens, we say that the problem we’re trying to solve has overlapping subproblems. In the rod cutting problem, when we try to cut a rod of size 4, we consider cuts of size 1, 2, 3, and 4. When we, then, consider the remaining rod of size 3, we consider cuts of size 1, 2, and 3. The smaller problem of optimally cutting the rod of size 2 is encountered again. In other words, it is an overlapping subproblem. Dynamic programming algorithms solve an overlapping subproblem once and store its result in a table so that it can be reused again.

The second factor is optimal substructure. When a problem exhibits optimal substructure, it means that the solution to the problem contains within it the optimal solutions to the subproblems; we build an optimal solution to the problem from optimal solutions to the subproblems. The rod-cutting problem exhibits optimal substructure because the optimal solution to cutting a rod of length $n$ involves finding the optimal solution to cutting the remaining rod, if a cut has been made.

Moving on. So far our algorithm has returned the optimal value of the solution. In other words, it returned the maximum revenue that can be obtained by optimally cutting the rod. We often need to store the choice that led to the optimal solution. In the context of the rod cutting problem, this would be the lengths of the cuts made. We can do this by keeping additional information in a separate table. The following code listing is a modification of the above function with an additional table s to store the value of the optimal cut.

def rod_cut(p: list[int], s: list[int | None], t: list[int | None], n: int) -> int:
    if n == 0:
        return 0

    if t[n] is not None:
        return t[n]

    q = float("-inf")
    j = None

    for i in range(1, n + 1):
        r = p[i] + rod_cut(p, s, t, n - i)

        if r > q:
            q = r
            j = i

    s[n] = j
    t[n] = q

    return q

In this version of the code, we store the size of the cut being made for a rod of length n in the table s. Once we have this information, we can reconstruct the optimal solution. The function that follows shows how to do that.

def optimal_cuts(s: list[int | None], n: int) -> list[int]:
    cuts = []

    while n:
        cut = s[n]
        n = n - s[n]
        cuts.append(cut)

    return cuts

Finally, we call the function to see the optimal solution. Since we know that the optimal solution for a rod of size 4 is to cut it into two equal halves, we’ll use $n = 4$.

n = 4
t = [None] * 11
s = [None] * 11
p = [0, 1, 5, 8, 9, 10, 17, 17, 20, 24, 30]
q = rod_cut(p, s, t, n)

assert [2, 2] == optimal_cuts(s, n)

That’s it. That’s how we can use dynamic programming to optimally cut a rod of size $n$ inches.

Programming Puzzles 3

In this post, and hopefully in the next few posts, I’d like to delve into the topic of dynamic programming. The aim is to develop an intuition for when it is applicable by solving a few puzzles. I’ll be referring to the chapter on dynamic programming in ‘Introduction to Algorithms’ by CLRS, and elucidating it in my own words.

Dynamic Programming

The chapter opens with the definition of dynamic programming: it is a technique for solving problems by combining solutions to subproblems. The subproblems may have subsubproblems that are common between them. A dynamic programming algorithm solves these subsubproblems only once and saves the result, thereby avoiding unnecessary work. The term “programming” refers to a tabular method in which the results of subsubproblems are saved in a table and reused when the same subsubproblem is encountered again.

All of this is abstract so let’s look at a concrete example of computing the Fibonacci series.

def fibonacci(n: int) -> int:
    return n if n < 2 else fibonacci(n - 1) + fibonacci(n - 2)

The call graph for fibonacci(4) is given below.
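Sketched in text form, where the children of each node are the recursive calls it makes, it looks like this.

fibonacci(4)
├── fibonacci(3)
│   ├── fibonacci(2)
│   │   ├── fibonacci(1)
│   │   └── fibonacci(0)
│   └── fibonacci(1)
└── fibonacci(2)
    ├── fibonacci(1)
    └── fibonacci(0)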

As we can see, we’re computing fibonacci(2) twice. In other words, the subsubproblem of computing fibonacci(2) is shared between fibonacci(4) and fibonacci(3). A dynamic programming algorithm would solve this subsubproblem only once and save the result in a table and reuse it. Let’s see what that looks like.

def fibonacci(n: int) -> int:
    T = [0, 1] + ([None] * (n - 2))
    return fibonacci_helper(n=n - 1, T=T)


def fibonacci_helper(n: int, T: list[int | None]) -> int:
    if T[n] is None:
        T[n] = fibonacci_helper(n - 1, T) + fibonacci_helper(n - 2, T)
    return T[n]

In the code above, we create a table T which stores the Fibonacci numbers. If an entry exists in the table, we return it immediately. Otherwise, we compute and store it. This recursive approach, with results of subsubproblems stored in a table, is called “top-down with memoization”; the table is called the “memo”. We begin with the original problem and then proceed to solve it by finding solutions to smaller subproblems. The procedure which computes the solution is said to be “memoized” as it remembers the previous computations.

Another approach is called “bottom-up” in which the solutions to the smaller subproblems are computed first. This depends on the notion that subproblems have “size”. In this approach, a solution to the subproblem is found only when the solutions to its smaller subsubproblems have been found. We can apply this approach when computing the Fibonacci series.

def fibonacci(n: int) -> int:
    T = [0, 1] + ([None] * (n - 2))

    for i in range(2, n):
        T[i] = T[i - 1] + T[i - 2]

    return T[n - 1]

As we can see, the larger numbers in the Fibonacci series are computed only when the smaller numbers have been computed.
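A quick check confirms the behaviour. Note that the function is 1-indexed: fibonacci(7) returns the seventh number of the sequence 0, 1, 1, 2, 3, 5, 8.

assert fibonacci(1) == 0
assert fibonacci(2) == 1
assert fibonacci(7) == 8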

This was a small example of how dynamic programming algorithms work. They are applied to problems where subproblems share subsubproblems. The solutions to these subsubproblems are stored in a table and reused when they are encountered again. This enables the algorithm to work more efficiently as it avoids the rework of solving the subsubproblems.

In the next post we’ll look at another example of dynamic programming that’s presented in the book and implement it in Python to further our understanding of the subject.

Low Level Design 1 - Todo List

I recently came across the “machine coding” round, also called “low level design” round, of the tech interview process. In this round of the interview, you’re presented with a real-life problem statement, and are expected to create a functioning system in under 90 minutes. The intent is to see how you structure your code, follow best practices, apply design patterns, and so on. In the spirit of practising for this round, this blog post looks at how to create a simple todo list application in Python.

Problem Statement

Let’s start with the problem statement: create a todo list. The very first requirement that comes to mind when creating a todo list is the ability to add tasks to the list. For example, getting groceries. It’s also possible to add subtasks to the list. For example, getting apples and oranges are subtasks when getting groceries. The todo list may be shared with family and friends. They receive notifications when items are added or removed from the list, and have the ability to add or remove items. Only the one who created the list and the ones with whom the list has been shared may modify the list. The todo list may be displayed in the console or in the webpage. Finally, the todo list may be persisted in the database.

With these requirements in mind, we’ll begin by looking at the classes and methods that comprise our system, the design patterns we can apply, and finally go ahead and create the system. Please note that since the implementation is going to be in Python, it may seem like I’m trying to do Java in Python by creating too many classes. For example, Python has the concept of “factory function” rather than “factory method” since functions are first-class citizens of the language. You’ve been warned.

Design Patterns

Let’s begin with a quick refresher on design patterns. In a nutshell, design patterns allow us to write reusable, maintainable, and testable code by providing patterns for solving common software engineering problems. These patterns can be categorized into three types: behavioral, creational, and structural. Behavioral patterns describe how objects interact with each other. Creational patterns describe how objects can be created. Finally, structural patterns describe how objects can be composed to create larger systems.

In the context of the todo list, we’ll see the following patterns being used. We’ll take a quick look at these patterns and then map them onto the classes we’ll create.

Composite Pattern

Composite pattern allows us to represent hierarchical, tree-like, data structures. The intention of this pattern is to ensure that composite objects (the non-leaf nodes), and individual objects (the leaf nodes) are treated the same. The UML diagram is given below.

As we can see, the non-leaf Composite class is itself a Component and has a collection of Components. These may be Composites or Leafs. In our todo list example, we can model the tasks and subtasks using this pattern. We begin by creating a top-level abstract class TodoItem which corresponds to the top-level Component. The subtask can be represented by a SubTask class and is the Leaf. Finally, we’ll create a Task class which is the Composite.

Adapter Pattern

The adapter pattern allows disparate components to work together seamlessly by reconciling differences between interfaces. It does so by providing a familiar interface that the client can talk to, and in turn using the interface of the underlying component. In other words, adapter pattern translates from one interface to another; the adapter calls specific methods on the adaptee when it is invoked. The UML diagram is given below.

In our todo list example, we can model connection to database using a DatabaseAdapter. This allows us to expose a familiar set of methods to perform operations like saving the todo list to the database while abstracting the nitty-gritties of the actual database. This class can then be subclassed to provide functionality for specific databases. For example, a MySQLDatabaseAdapter for using MySQL as the database.

The advantage of using this approach is that it allows seamless migration between databases; we may start small with SQLiteDatabaseAdapter and then switch to MySQLDatabaseAdapter by simply changing which class we use. Since the interface exposed is the same, there will be minimal refactoring in the client code to make this transition.

Strategy Pattern

The strategy pattern allows defining a family of algorithms, encapsulating each one, and making them interchangeable. This means we can define and pass algorithms around. There is a base class Strategy that defines the interface for the algorithm, and one or more concretions that provide the actual implementation. The UML diagram is given below.

In our todo list example, we can design the rendering of the list as a strategy. For example, we can have a markdown strategy, an HTML strategy, etc. These different strategies produce different representations of the todo list.

Proxy Pattern

The proxy pattern encapsulates an object and allows itself to be used in place of the original object. Since the proxy is used in place of the original object, it enables use cases such as controlling access. There can be methods on the proxy which require authentication before the operation is performed on the original object.

In our todolist example, we can create a proxy which ensures that only the owner of the list and those with whom it is shared can make changes to it. To do this, we’ll create an interface called TodoListProtocol which defines methods which the todo list must implement. This will be used by the TodoList class, which represents the actual todo list, and by the TodoListProxy which provides access control for the todo list. The proxy will require that, when a method call is made, the user making the call also be passed as an argument. Only if the user is the owner of the list or is one of the collaborators will the operation be performed.

Observer Pattern

The observer pattern allows objects, the observers, to be notified when the state of another object, the observable, changes.

In our todolist example, we’ll create observers which get notified when the state of the todo list changes. We’ll create an abstract class called Observable which will be inherited by the TodoList class. This makes the class “observable”. We’ll create another class TodoListObserver which inherits the Observer class. This makes it the “observer” and it’ll be notified when changes happen to the todo list.

Now that we’ve looked at the design patterns we’ll use, let’s look at the code.

Code

User

We’ll begin with the simplest class first, the User class which represents a user of the system. The owners and collaborators of the list will be represented by this class.

@dc.dataclass(frozen=True)
class User:
    email: str

Observer Pattern

Next, let’s add classes to create the observer pattern. We’ll begin by creating the abstract observable class. It’ll store the list of observers it has to notify, and require the subclasses to provide an implementation to return their state. Since the observers hold a reference to the observable they are observing, the method which returns the state will be used to find what’s changed.

The implementation for the Observable class is shown below.

@dc.dataclass(kw_only=True)
class Observable(abc.ABC):
    observers: list["Observer"] = dc.field(default_factory=list)

    def notify(self):
        for observer in self.observers:
            observer.update()

    def add_observer(self, observer: "Observer"):
        self.observers.append(observer)

    @property
    @abc.abstractmethod
    def state(self) -> typing.Any: ...

Similarly, we’ll implement the Observer class. It requires its subclasses to provide an implementation for the update method which will be called by the Observable it’s observing.

@dc.dataclass(kw_only=True)
class Observer(abc.ABC):
    observable: "Observable"

    @abc.abstractmethod
    def update(self): ...

Finally, we’ll create the concrete observer TodoListObserver. It notifies the user by sending them an email when the list is updated. For simplicity, however, we’ll just log a message to the console.

class TodoListObserver(Observer):

    def __init__(self, user: "User"):
        self.user = user

    def update(self):
        state = self.observable.state
        print(f"Notify {self.user.email} about changed state")

Composite Pattern

Next we’ll create the composite pattern. We’ll create a base class TodoItem which will be inherited by both the Task and SubTask class. It has basic fields that are required to define a task like an ID, a title, etc. and a couple of base methods to mark the item as complete or check if it’s complete.

@dc.dataclass(kw_only=True)
class TodoItem(abc.ABC):
    id: uuid.UUID = dc.field(default_factory=uuid.uuid4)
    title: str
    completed: bool = dc.field(default=False)

    @abc.abstractmethod
    def is_complete(self) -> bool: ...

    @abc.abstractmethod
    def complete(self) -> None: ...

The first of the subclasses is the Task class which we’ll implement next. A task is considered complete when it’s been marked completed and all of its subtasks are complete. Similarly, marking a task complete marks all the subtasks as complete.

@dc.dataclass(kw_only=True)
class Task(TodoItem):
    subtasks: list["SubTask"] = dc.field(default_factory=list)

    def is_complete(self) -> bool:
        return self.completed and all(subtask.completed for subtask in self.subtasks)

    def complete(self):
        self.completed = True

        for subtask in self.subtasks:
            subtask.complete()

        assert self.is_complete() is True

To complete the composite, we’ll add the SubTask class.

@dc.dataclass(kw_only=True)
class SubTask(TodoItem):

    def is_complete(self) -> bool:
        return self.completed

    def complete(self):
        self.completed = True

If we look at the TodoItem, Task, and SubTask classes, we can see the hierarchical structure where each Task, a non-leaf component, may have zero or more SubTasks, leaf components. Since both Task and SubTask are instances of TodoItem, they have the same interface and can be used interchangeably.

Adapter Pattern

Next we’ll create a simple database adapter with a single method to save the todo list. There is only one method for the sake of simplicity but it’s easy to see how there can be more of these methods. We’ll start with a protocol called DatabaseAdapter. Think of a Python protocol as similar to a Java interface.

class DatabaseAdapter(Protocol):

    def save_list(self, todolist: "TodoList"): ...

Next we’ll create two concrete classes which implement this protocol: an adapter for MySQL and another for SQLite. Both of these classes take an instance of their specific database and use it to persist the todo list. Since each database instance may have its own set of methods to save data, an adapter provides a familiar interface that can be used elsewhere in the code.

class MySQLAdapter:

    def __init__(self, db):
        self.db = db

    def save_list(self, todolist: "TodoList"): ...


class SQLiteAdapter:

    def __init__(self, db):
        self.db = db

    def save_list(self, todolist: "TodoList"): ...

Strategy Pattern

Next, we’ll create strategies to render the todo list. We’ll start by creating a protocol called RenderingStrategy with a single method called render which returns a string representation of the todo list.

class RenderingStrategy(Protocol):

    def render(self, todolist: "TodoList") -> str: ...

We’ll add a concrete strategy called TableRenderingStrategy which displays the tasks and subtasks in tabular format.

class TableRenderingStrategy:

    def render(self, todolist: "TodoList") -> str:
        headers = ["Title", "Status"]
        tasks_and_subtasks = []

        for task in todolist.tasks:
            tasks_and_subtasks.append([task.title, task.is_complete()])

            for subtask in task.subtasks:
                tasks_and_subtasks.append(
                    ["-> " + subtask.title, subtask.is_complete()]
                )

        return tabulate(tasks_and_subtasks, headers=headers)

Proxy Pattern

To create the proxy pattern, we’ll create the protocol, TodoListProtocol, for both the todo list and the proxy. Let’s start with the protocol which defines the methods that are common to both the todo list and the proxy. As we can see, there are methods to mark the list as complete, to search for tasks, to render the list, and so on.

class TodoListProtocol(Protocol):

    def complete(self, *args, **kwargs): ...

    def search(self, *args, **kwargs) -> list[Task]: ...

    def add(self, *args, **kwargs): ...

    def save(self, *args, **kwargs): ...

    def render(self, *args, **kwargs) -> str: ...

Next we’ll add the implementation for the TodoList which implements this protocol.

@dc.dataclass(kw_only=True)
class TodoList(Observable):
    id: uuid.UUID = dc.field(default_factory=uuid.uuid4)
    title: str
    tasks: list[Task] = dc.field(default_factory=list)
    owner: User
    collaborators: set[User] = dc.field(default_factory=set)

    def complete(self, *args, **kwargs):
        for task in self.tasks:
            task.complete()

    def search(self, title: str, *args, **kwargs) -> list[Task]:
        return [task for task in self.tasks if title.lower() in task.title.lower()]

    def add(self, task: Task, *args, **kwargs):
        self.tasks.append(task)
        self.notify()

    def save(self, adapter: DatabaseAdapter, *args, **kwargs):
        adapter.save_list(todolist=self)

    def render(self, strategy: RenderingStrategy, *args, **kwargs) -> str:
        return strategy.render(self)

    @property
    def state(self) -> list[Task]:
        return copy.copy(self.tasks)

Notice how it implements both TodoListProtocol and Observable. The TodoListProtocol is implemented by providing an implementation for all its methods even when there is no explicit declaration that the protocol has been implemented. In the add method, we call notify which updates all the observers for this list.

Finally, we’ll add the proxy for TodoList. The proxy authenticates each call to the underlying TodoList by checking whether the user trying to access the list is the owner or a collaborator.

@dc.dataclass(kw_only=True)
class TodoListProxy:
    todolist: TodoList

    def complete(self, user: User, *args, **kwargs):
        self.raise_if_not_authenticated(user=user)
        self.todolist.complete()

    def search(self, title: str, user: User, *args, **kwargs) -> list[Task]:
        self.raise_if_not_authenticated(user=user)
        return self.todolist.search(title=title)

    def add(self, task: Task, user: User, *args, **kwargs):
        self.raise_if_not_authenticated(user=user)
        self.todolist.add(task=task)

    def save(self, adapter: DatabaseAdapter, user: User, *args, **kwargs):
        self.raise_if_not_authenticated(user=user)
        self.todolist.save(adapter=adapter)

    def render(self, strategy: RenderingStrategy, user: User, *args, **kwargs) -> str:
        self.raise_if_not_authenticated(user=user)
        return self.todolist.render(strategy=strategy)

    def raise_if_not_authenticated(self, user: User):
        if not self.is_authenticated_user(user=user):
            raise Exception(f"User {user.email} is not authenticated.")

    def is_authenticated_user(self, user: User):
        return (self.todolist.owner == user) or (user in self.todolist.collaborators)

Running the Code

Let’s wire the pieces together and run them. We’ll create an adapter, a rendering strategy, and an observer. Since we’ll use dependency injection, we’ll pass these objects to the appropriate methods.

adapter: DatabaseAdapter = MySQLAdapter(db={})
strategy: RenderingStrategy = TableRenderingStrategy()
observer: Observer = TodoListObserver(user=User(email="jane.doe@gmail.com"))

Next, let’s create the todo list itself.

todolist = TodoList(
    title="...",
    owner=User("john.doe@gmail.com"),
    collaborators={
        User("jane.doe@gmail.com"),
    },
    tasks=[
        Task(
            title="Get Groceries",
            subtasks=[
                SubTask(title="Get Apples"),
                SubTask(title="Get Bananas"),
            ],
        ),
        Task(
            title="Do laundry",
            completed=False,
        ),
    ],
    observers=[
        observer,
    ],
)

Notice we’ve added tasks and subtasks to the list. We’ve also added a collaborator and an observer to the list. We’ll finish adding the observer by updating its observable property. This will allow it to fetch the state from the list when it gets updated.

observer.observable = todolist

Next, we’ll create a proxy for the list so that we can authenticate the calls.

proxy = TodoListProxy(todolist=todolist)

We can now make method calls to see the code in action. Let’s begin by adding a task.

proxy.add(
    task=Task(title="Walk the dog"),
    user=User("john.doe@gmail.com"),
)

This produces the following output in the console. Since John Doe added an item to the list, Jane Doe will be notified of the change.

Notify jane.doe@gmail.com about changed state

Next, we’ll render the list.

print(
    proxy.render(
        strategy=strategy,
        user=User("jane.doe@gmail.com"),
    )
)

The list is rendered as follows. The subtasks are marked with an arrow to indicate a level of nesting.

Title           Status
--------------  --------
Get Groceries   False
-> Get Apples   False
-> Get Bananas  False
Do laundry      False
Walk the dog    False

Finally, we’ll make an unauthenticated call to see the proxy in action.

proxy.add(
    task=Task(title="Walk the dog"),
    user=User("..."),
)

This raises an exception saying that the user is not authenticated.
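Given the raise_if_not_authenticated method above, the message looks like the following.

Exception: User ... is not authenticated.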

That’s it. That’s how to create a todo list.

Detecting disguised email addresses in a corpus of text

I’d recently written about an experimental library to detect PII. When discussing it with an acquaintance of mine, I was told that PII can also be disguised. For example, a corpus of text like a review or a comment can contain an email address in the form “johndoeatgmaildotcom”. This led me to update the library so that emails like these can also be flagged. In a nutshell, I had to update the regex which was used to find the email.
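The sketch below shows one way of writing such a pattern; it treats the literal words “at” and “dot” as stand-ins for “@” and “.”. It is illustrative only and not necessarily the exact pattern the library ended up with.

import re

# Illustrative pattern; the one in the library may differ.
DISGUISED_EMAIL = re.compile(
    r"[a-z0-9]+(?:(?:\.|dot)[a-z0-9]+)*"   # local part: "john.doe" or "johndotdoe"
    r"(?:@|at)"                            # separator: "@" or the word "at"
    r"[a-z0-9]+(?:(?:\.|dot)[a-z0-9]+)+",  # domain: "provider.com" or "providerdotcom"
    re.IGNORECASE,
)

assert DISGUISED_EMAIL.search("john.doe@provider.com")
assert DISGUISED_EMAIL.search("johndotdoeatproviderdotcodotuk")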

Example

This is best explained with a few examples. In all of the examples, we begin with a proper email and disguise it one step at a time.

column = Column(name="comment")
detector = ColumnValueRegexDetector()

assert detector.detect(column=column, sample="john.doe@provider.com") == Email()
assert detector.detect(column=column, sample="johndotdoe@provider.com") == Email()
assert detector.detect(column=column, sample="johndotdoeatproviderdotcom") == Email()
assert detector.detect(column=column, sample="john.doe@provider.co.uk") == Email()
assert detector.detect(column=column, sample="johndotdoeatproviderdotcodotuk") == Email()
assert detector.detect(column=column, sample="myemailis:john.doeatproviderdotcom") == Email()

All of these assertions pass and the regex detector is able to flag all of these examples as email.

An experimental library to detect PII

I recently created an experimental library, detectpii, to detect PII data in relational databases. In this post we’ll take a look at the rationale behind the library and its architecture.

Rationale

A common requirement in many software systems is to store PII information. For example, a food ordering system may store the user’s name, address, phone number, and email. This information may also be replicated into the data warehouse. As a matter of good data governance, you may want to restrict access to such information. Many data warehouses allow applying a masking policy that makes it easier to redact such values. However, you’d have to specify which columns to apply this policy to. detectpii makes it easier to identify such tables and columns.

My journey into creating this library began by looking for open-source projects that would help identify such information. After some research, I did find a few projects that do this. The first project is piicatcher. It allows comparing the column names of tables against regular expressions that represent common PII column names like user_name. The second project is CommonRegex which allows comparing column values against regular expression patterns like emails, IP addresses, etc.

detectpii combines these two libraries to allow finding column names and values that may potentially contain PII. Out of the box, the library allows scanning column names and a subset of its values for potentially PII information.

Architecture

At the heart of the library is the PiiDetectionPipeline. A pipeline consists of a Catalog, which represents the database we’d like to scan, and a number of Scanners, which perform the actual scan. The library ships with two scanners - the MetadataScanner and the DataScanner. The first compares column names of the tables in the catalog against known patterns for the ones which store PII information. The second compares the value of each column of the table by retrieving a subset of the rows and comparing them against patterns for PII. The result of the scan is a list of column names that may potentially be PII.

The design of the library is extensible and more scanners can be added; for example, a scanner that uses a proprietary machine learning algorithm instead of a regular expression match, as sketched below.
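Purely as an illustration, such a scanner could look like the following. The scan method’s signature and the catalog.tables, table.columns, and column.name attributes are assumptions made for the example, not the library’s actual interface.

class MachineLearningScanner:
    """Hypothetical scanner that flags a column when a classifier says so."""

    def __init__(self, model):
        self.model = model  # assumed to expose a predict() method

    def scan(self, catalog):
        # Attribute names are assumed; see the library for the real ones.
        return [
            column
            for table in catalog.tables
            for column in table.columns
            if self.model.predict(column.name) == "pii"
        ]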

Usage

To perform a scan, we create a pipeline and pass it a catalog and a list of scanners. To initiate the scan, we call the scan method on the pipeline to get back a list of PII columns.

from detectpii.catalog import PostgresCatalog
from detectpii.pipeline import PiiDetectionPipeline
from detectpii.scanner import DataScanner, MetadataScanner

# -- Create a catalog to connect to a database / warehouse
pg_catalog = PostgresCatalog(
    host="localhost",
    user="postgres",
    password="my-secret-pw",
    database="postgres",
    port=5432,
    schema="public",
)

# -- Create a pipeline to detect PII in the tables
pipeline = PiiDetectionPipeline(
    catalog=pg_catalog,
    scanners=[
        MetadataScanner(),
        DataScanner(percentage=20, times=2),
    ],
)

# -- Scan for PII columns.
pii_columns = pipeline.scan()

That’s it. That’s how to use the library to detect PII columns in tables.

A question on algebraic manipulations

In the exercises that follow the chapter on algebraic manipulations, there is a question that pertains to expressing an integer as the sum of two other integers squared. We are then asked to find expressions for the multiples of this integer, namely $2n$ and $5n$. In this post, we’ll take a look at the solution provided by the authors for the first half of the question, $2n$, and then come up with a method of our own to solve the second half, $5n$.

Question

If $n$ is an integer that can be expressed as the sum of two integer squares, show that both $2n$ and $5n$ can also be expressed as the sum of two integer squares.

Solution

From the question, $n = a^2 + b^2$ since it is the sum of two integer squares. This means $2n = 2a^2 + 2b^2$. We need to find two integers such that when their squares are summed, we end with $2a^2 + 2b^2$. From the solution, these are the numbers $(a + b)$ and $(a - b)$ because when they are squared and summed, we get $2a^2 + 2b^2$. This is the result of $(a + b)^2 + (a - b)^2$. Go ahead and expand them to verify the result.

What do we deduce from this? We find that both the expressions contributed an $a^2$ and a $b^2$. These were added together to get the final result. How do we use this to get $5n = 5a^2 + 5b^2$? Notice that we’re squaring the integers. This means that, for example, one of them would have to contribute an $a^2$ and the other would have to contribute a $4a^2$; similar logic applies for $b^2$.

This leaves us with two pairs of numbers: $(2a + b, a - 2b)$ and $(2a - b, a + 2b)$. Let’s square and sum both of these pairs one-by-one.
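Squaring and summing the first pair gives

$$(2a + b)^2 + (a - 2b)^2 = 4a^2 + 4ab + b^2 + a^2 - 4ab + 4b^2 = 5a^2 + 5b^2 = 5n$$

and the second pair gives the same result:

$$(2a - b)^2 + (a + 2b)^2 = 4a^2 - 4ab + b^2 + a^2 + 4ab + 4b^2 = 5a^2 + 5b^2 = 5n$$

Either pair therefore expresses $5n$ as the sum of two integer squares.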

What integers would we need for other multiples of $n$ that are themselves sums of two squares?

Setting up a data catalog with DataHub

In a previous post we’d seen how to create a realtime data platform with Pinot, Trino, Airflow, and Debezium. In this post we’ll see how to set up a data catalog using DataHub. A data catalog, as the name suggests, is an inventory of the data within the organization. Data catalogs make it easy to find data within the organisation, like tables, data sets, reports, etc.

Before we begin

My setup consists of the Docker containers required to run DataHub. While DataHub provides features like data lineage, column assertions, and much more, we will look at three of the simpler features. One, we’ll look at creating a glossary of the terms that will be used frequently in the organization. Two, we’ll catalog the datasets and views that we saw in the previous post. Three, we’ll create an inventory of dashboards and reports created for various departments within the organisation.

The rationale for this is as follows. Imagine a day in the life of a business analyst. Their responsibilities include creating reports and dashboards for various departments. For example, the marketing team may want to see an “orders by day” dashboard so that they can correlate the effects of advertising campaigns with an uptick in the volume of orders. Similarly, the product team may want a report of which features are being used by the users. The requests of both of these teams will be served by the business analysts using the data that’s been brought into the data platform. While they create these reports and dashboards, it’s common for them to receive queries asking where a team member can find a certain report or how to interpret a data point within a report. They may also have to search for tables and data sets to create new reports, acquaint themselves with the vocabulary of the various departments, and so on.

A data catalog makes all of this a more efficient process. In the following sections we’ll see how we can use DataHub to do it. For example, we’ll create the definition of the term “order”, create a list of reports created for the marketing department, and bring in the views and data sets so that they become searchable.

The work of data scientists is similar, too, because they create data sets that can be reused across various models. For example, data sets representing features for various customers can be stored in the platform, made searchable, and used with various models. They, too, benefit from having a data catalog.

Finally, it helps bring people up to speed with the data that is consumed by their department or team. For example, when someone joins the marketing team, pointing them to the data catalog helps them get productive quickly by finding the relevant reports, terminology, etc.

Ingesting data sets and views

To ingest the tables and views, we’ll create a data pipeline which ingests metadata from AWS Glue and writes it to the metadata service. This is done by creating a YAML configuration in DataHub that specifies where to ingest the metadata from, and where to write it. Once this is created, we can schedule it to run periodically so that it stays updated with Glue.

The image above shows how we define a “source” and how we ingest it into a “sink”. Here we’ve specified that we’d like to read from Glue and write it to DataHub’s metadata service.

Once the source and destination are defined, we can set a schedule to run the ingestion. This will bring in the metadata about the data sets and views we’ve created in Glue.

The image above shows that a successful run of the ingestion pipeline brings in the views and data sets. These are then browsable in the UI. Similarly, they are also searchable as shown in the following image.

This makes it possible for the analysts and the data scientists to quickly locate data sets.

Defining the vocabulary

Next, we’ll create the definition of the word “order”. This can be done from the UI as shown below. The definition can be added by editing the documentation.

Once created, this is available under “Glossary” and in search results.

Data products

Finally, we’ll create a data product. This is the catalog of reports and dashboards created for various departments. For example, the image below shows a dashboard created for the marketing team.

Expanding the dashboard allows us to look at the documentation for the report. This could contain the definition of the terms used in the report, as shown on the bottom right, a link to the dashboard in Superset, definitions of data points, report owners, and so on.

That’s it. That’s how a data catalog helps streamline working with data.

Programming Puzzles 2

As I continue working my way through the book on programming puzzles, I’ve come across those involving permutations. In this post I’ll collect puzzles with the same theme, both from the book and from the internet.

All permutations

The first puzzle is to compute all the permutations of a given array. By extension, it can be used to compute all the permutations of a string, too, if we view it as an array of characters. To do this we’ll implement Heap’s algorithm. The following is its recursive version.

def heap(permutations: list[list], A: list, n: int):
    # -- Base case: a single element is fixed, so record the current arrangement
    if n == 1:
        permutations.append(list(A))
    else:
        # -- Generate all permutations with the last element fixed
        heap(permutations, A, n - 1)

        for i in range(n - 1):
            # -- Choose the swap index based on the parity of n
            if n % 2 == 0:
                A[i], A[n - 1] = A[n - 1], A[i]
            else:
                A[0], A[n - 1] = A[n - 1], A[0]

            heap(permutations, A, n - 1)

The array permutations is the accumulator which will store all the permutations of the array. The initial arguments to the function would be an empty accumulator, the list to permute, and the length of the list.
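
As a quick check (assuming the heap function above is in scope), permuting a three-element list should produce all 3! = 6 orderings:

permutations: list[list] = []
heap(permutations, [1, 2, 3], 3)
print(len(permutations))  # 6
print(permutations[0])    # [1, 2, 3] -- the original order comes first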

Next permutation

The next puzzle we’ll look at is computing the next permutation of the array in lexicographical order. The following implementation has been taken from the book.

def next_permutation(perm: list[int]) -> list[int]:
    # -- Find the first index from the right whose entry is smaller
    # -- than its successor; the suffix after it is decreasing
    inversion_point = len(perm) - 2

    while (inversion_point >= 0) and (perm[inversion_point] >= perm[inversion_point + 1]):
        inversion_point = inversion_point - 1

    # -- The whole array is decreasing: this is the last permutation
    if inversion_point == -1:
        return []

    # -- Swap the inversion point with the smallest entry in the
    # -- suffix that is larger than it
    for i in reversed(range(inversion_point + 1, len(perm))):
        if perm[i] > perm[inversion_point]:
            perm[inversion_point], perm[i] = perm[i], perm[inversion_point]
            break

    # -- The suffix is decreasing; reversing it makes it increasing,
    # -- yielding the smallest permutation larger than the input
    perm[inversion_point + 1:] = reversed(perm[inversion_point + 1:])

    return perm
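
For example (a quick check, assuming next_permutation as defined above):

print(next_permutation([1, 3, 2]))  # [2, 1, 3]
print(next_permutation([3, 2, 1]))  # [] -- no next permutation exists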

Previous permutation

A variation of the puzzle is to compute the previous permutation of the array in lexicographical order. The idea is to “reverse” the logic for computing the next permutation. If we look closely, we’ll find that all we’re changing are the comparison operators.

def previous_permutation(perm: list[int]) -> list[int]:
    # -- Find the first index from the right whose entry is larger
    # -- than its successor; the suffix after it is increasing
    inversion_point = len(perm) - 2

    while (inversion_point >= 0) and (perm[inversion_point] <= perm[inversion_point + 1]):
        inversion_point = inversion_point - 1

    # -- The whole array is increasing: this is the first permutation
    if inversion_point == -1:
        return []

    # -- Swap the inversion point with the largest entry in the
    # -- suffix that is smaller than it
    for i in reversed(range(inversion_point + 1, len(perm))):
        if perm[i] < perm[inversion_point]:
            perm[inversion_point], perm[i] = perm[i], perm[inversion_point]
            break

    # -- Make the suffix decreasing, yielding the largest permutation
    # -- smaller than the input
    perm[inversion_point + 1:] = reversed(perm[inversion_point + 1:])

    return perm
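
And, symmetrically, stepping backwards (again assuming the function above):

print(previous_permutation([2, 1, 3]))  # [1, 3, 2]
print(previous_permutation([1, 2, 3]))  # [] -- no previous permutation exists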

kth smallest permutation

The final puzzle we’ll look at is the one where we need to compute the kth smallest permutation. The idea is to reuse the stepping functions we saw above. Note that calling previous_permutation k times on the lexicographically largest array, as one might first attempt, walks k steps down from the top and therefore yields the (k + 1)th largest permutation, not the kth smallest. Instead, we sort the array in increasing order, which yields the lexicographically smallest permutation, and advance k - 1 times using next_permutation.

def kth_smallest_permutation(perm: list[int], k: int) -> list[int]:
    # -- Arrange the numbers in increasing order,
    # -- thereby creating the lexicographically smallest permutation
    perm = sorted(perm)

    # -- Advance k - 1 steps to reach the kth smallest permutation
    for _ in range(k - 1):
        perm = next_permutation(perm)

    return perm
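
As a sanity check (assuming the functions above), the permutations of [1, 2, 3] in increasing order begin [1, 2, 3], [1, 3, 2], [2, 1, 3], so:

print(kth_smallest_permutation([3, 1, 2], 1))  # [1, 2, 3]
print(kth_smallest_permutation([3, 1, 2], 2))  # [1, 3, 2]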

That’s it. These are puzzles involving permutations.

Programming Puzzles 1

I am working my way through a classic book of programming puzzles. As I work through more of these puzzles, I’ll share what I discover to solidify my understanding and help others who are doing the same. If you’ve ever completed puzzles on a site like leetcode, you’ll notice that the sheer volume of puzzles is overwhelming. However, there are patterns to these puzzles, and becoming familiar with them makes it easier to solve them. In this post we’ll take a look at one such pattern - two pointers - and see how it can be used to solve puzzles involving arrays.

Two Pointers

The idea behind two pointers is that there are, as the name suggests, two pointers that traverse the array, with one pointer leading the other. Using these two pointers we update the array and solve the puzzle at hand. As an illustrative example, let us consider the puzzle where we’re given an array of even and odd numbers and we’d like to move all the even numbers to the front of the array.

Even and Odd

def even_odd(A: list[int]) -> None:
    """Move even numbers to the front of the array."""
    write_idx = 0
    idx = 0

    while idx < len(A):
        if A[idx] % 2 == 0:
            A[write_idx], A[idx] = A[idx], A[write_idx]
            write_idx = write_idx + 1

        idx = idx + 1

The two pointers here are idx and write_idx. While idx traverses the array and indicates the current element, write_idx indicates the position where the next even number should be written. Whenever idx points to an even number, it is written at the position indicated by write_idx. With this logic, if all the numbers in the array are even, idx and write_idx point to the same element i.e. the number is swapped with itself and the pointers are moved forward.
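
To see it in action (assuming even_odd as defined above):

A = [1, 2, 3, 4]
even_odd(A)
print(A)  # [2, 4, 3, 1] -- evens are at the front; relative order isn't preserved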

We’ll build upon this technique to remove duplicates from the array.

Remove Duplicates

Consider a sorted array containing duplicate numbers. We’d like to keep only one occurrence of each number and overwrite the rest. This can be solved using two pointers as follows.

def remove_duplicates(A: list[int]) -> int:
    """Remove all duplicates in the array."""
    write_idx, idx = 1, 1

    while idx < len(A):
        if A[write_idx - 1] != A[idx]:
            A[write_idx] = A[idx]
            write_idx = write_idx + 1

        idx = idx + 1

    return write_idx

In this solution, idx and write_idx start at index 1 instead of 0. The reason is that we’d like to look at the number to the left of write_idx, and starting at index 1 allows us to do that. Notice also how the if condition checks for duplicates in the vicinity of write_idx; the number to the left of write_idx should be different from the one that idx is presently pointing to.
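
For example (assuming the function above), the returned count tells us where the deduplicated prefix ends:

A = [1, 1, 2, 3, 3, 3, 4]
n = remove_duplicates(A)
print(A[:n])  # [1, 2, 3, 4]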

As a variation, move the duplicates to the end of the array instead of overwriting them.

As another variation, remove a given number from the array by moving it to the end.

With this same pattern, we can now change the puzzle to state that we want at most two instances of the number in the sorted array.

Remove Duplicates Variation

def remove_duplicates(A: list[int]) -> int:
    """Keep at most two instances of the number."""
    write_idx = 2

    for idx in range(2, len(A)):
        if A[write_idx - 2] != A[idx]:
            A[write_idx] = A[idx]
            write_idx = write_idx + 1

    return write_idx

Akin to the previous puzzle, we look for duplicates in the vicinity of write_idx. While in the previous puzzle the if condition checked for one number to the left, in this variation we look at two positions to the left of write_idx to keep at most two instances. The remainder of the logic is the same. As a variation, try keeping at most three instances of the number in the sorted array.
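
To see the at-most-two version at work (assuming the function above):

A = [1, 1, 1, 2, 2, 3]
n = remove_duplicates(A)
print(A[:n])  # [1, 1, 2, 2, 3]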

Finally, we’ll use the same pattern to solve the Dutch national flag problem.

Dutch National Flag

In this problem, we sort the array by dividing it into three distinct regions. The first region contains elements less than the pivot, the second region contains elements equal to the pivot, and the third region contains elements greater than the pivot.

def dutch_national_flag(A: list[int], pivot_idx: int) -> None:
    """Divide the array into three distinct regions."""
    pivot = A[pivot_idx]
    write_idx = 0
    idx = 0

    # -- Move all elements less than the pivot to the front
    while idx < len(A):
        if A[idx] < pivot:
            A[write_idx], A[idx] = A[idx], A[write_idx]
            write_idx = write_idx + 1
        idx = idx + 1

    idx = write_idx

    # -- Move all elements equal to the pivot to the middle
    while idx < len(A):
        if A[idx] == pivot:
            A[write_idx], A[idx] = A[idx], A[write_idx]
            write_idx = write_idx + 1
        idx = idx + 1

    # -- All elements greater than the pivot have been moved to the end

This problem combines everything we’ve seen so far about two pointers and divides the array into three distinct regions. As we compute the first two regions, the third region is computed as a side-effect.
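
For example (assuming the function above), partitioning around the value at index 1, which is 1, gives the regions “less than 1” (empty), “equal to 1”, and “greater than 1”:

A = [3, 1, 2, 1, 5, 1]
dutch_national_flag(A, 1)
print(A)  # [1, 1, 1, 3, 5, 2]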

We can now solve a variation of the Dutch national flag partitioning problem by accepting a list of pivot elements. In this variation, all the numbers within the list of pivots appear together, i.e. all the elements equal to the first pivot appear first, those equal to the second pivot appear second, and so on.

def dutch_national_flag(A: list[int], pivots: list[int]) -> None:
    """A variation in which all elements with the same key appear together."""
    write_idx = 0

    for pivot in pivots:
        idx = write_idx

        while idx < len(A):
            if A[idx] == pivot:
                A[write_idx], A[idx] = A[idx], A[write_idx]
                write_idx = write_idx + 1
            idx = idx + 1
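
A quick illustration (assuming the variation above), grouping the elements in the order given by the pivots:

A = [3, 1, 2, 1, 3, 2]
dutch_national_flag(A, pivots=[1, 2, 3])
print(A)  # [1, 1, 2, 2, 3, 3]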

That’s it, that’s how we can solve puzzles involving two pointers and arrays.