Categories
apache-spark apache-spark-sql pandas pyspark python

How do you create merge_asof functionality in PySpark?

Table A has many columns with a date column, Table B has a datetime and a value. The data in both tables are generated sporadically with no regular interval. Table A is small, table B is massive.

I need to join B to A under the condition that a given element a of A.datetime corresponds to

B[B['datetime'] <= a]]['datetime'].max()

There are a couple ways to do this, but I would like the most efficient way.

Option 1

Broadcast the small dataset as a Pandas DataFrame. Set up a Spark UDF that creates a pandas DataFrame for each row merges with the large dataset using merge_asof.

Option 2

Use the broadcast join functionality of Spark SQL: set up a theta join on the following condition

B['datetime'] <= A['datetime']

Then eliminate all the superfluous rows.

Option B seems pretty terrible… but please let me know if the first way is efficient or if there is another way.

EDIT: Here is the sample input and expected output:

A =
+---------+----------+
| Column1 | Datetime |
+---------+----------+
| A |2019-02-03|
| B |2019-03-14|
+---------+----------+
B =
+---------+----------+
| Key | Datetime |
+---------+----------+
| 0 |2019-01-01|
| 1 |2019-01-15|
| 2 |2019-02-01|
| 3 |2019-02-15|
| 4 |2019-03-01|
| 5 |2019-03-15|
+---------+----------+
custom_join(A,B) =
+---------+----------+
| Column1 | Key |
+---------+----------+
| A | 2 |
| B | 4 |
+---------+----------+