The following code is intended to be run in a Jupyter notebook with an active Spark session available as the variable spark.

Necessary Imports

# necessary imports
import pandas as pd
import datetime
from dateutil.relativedelta import relativedelta
import base64
from IPython.display import HTML

from pyspark.sql import functions as F
from sedona.register import SedonaRegistrator
# register Sedona's spatial (ST_*) SQL functions on the active spark session
SedonaRegistrator.registerAll(spark)


Helper Functions

Save a local CSV from the notebook

# helper functions
# save a local CSV from the notebook
def create_download_link(df, title="Download CSV file", filename="data.csv"):
    csv = df.to_csv()
    b64 = base64.b64encode(csv.encode())
    payload = b64.decode()
    html = '<a download="{filename}" href="data:text/csv;base64,{payload}" target="_blank">{title}</a>'
    html = html.format(payload=payload, title=title, filename=filename)
    return HTML(html)
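
For example, to offer a hypothetical pandas DataFrame results_df for download (results_df is a placeholder, not defined in this notebook):

# example usage: renders a clickable download link in the notebook output
create_download_link(results_df, title="Download results", filename="results.csv")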

Read in UNGP S3 data from a range of dates

# read in UNGP S3 data from a range of dates
def get_date_list(basepath, start_date, end_date):
    start_date = datetime.datetime.strptime(start_date, "%Y-%m-%d").date()
    end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d").date()
    delta = end_date - start_date
    days = []
    for i in range(delta.days + 1):
        day = start_date + datetime.timedelta(days=i)
        days.append(datetime.datetime.strftime(day, "%Y-%m-%d"))
    
    paths = [basepath + f"year={day[:4]}/month={day[5:7]}/day={day[8:10]}" for day in days]
    return paths
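
As an illustration, with a hypothetical base path the function expands a date range into one partition path per day:

# get_date_list("s3a://bucket/prefix/", "2022-01-30", "2022-02-01") returns:
# ['s3a://bucket/prefix/year=2022/month=01/day=30',
#  's3a://bucket/prefix/year=2022/month=01/day=31',
#  's3a://bucket/prefix/year=2022/month=02/day=01']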


Geographic locations

# geographic locations
locations = pd.read_csv("https://github.com/dhopp1-UNCTAD/ais_helper_files/raw/main/geographic_locations.csv")
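
The code below assumes this CSV contains at least the columns name, ISO3, longitude, and latitude, since those are the fields referenced in get_data and in the port filter further down; a quick sanity check:

# sanity check: inspect the columns used later in this notebook
print(locations[["name", "ISO3", "longitude", "latitude"]].head())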


def get_data(start_date, end_date, locations, distance_parameter="0.01"):
    # distance_parameter is the buffer radius in degrees;
    # 0.01 degrees is roughly a 1 kilometer radius near the equator
    
    # build the WHERE condition (position inside any buffer) and the CASE
    # expression labelling each position with its location name, so that
    # all geographies are handled in one query
    conditions = []
    cases = []
    for _, row in locations.iterrows():
        contains = f"ST_Contains(ST_Buffer(ST_Point({row['longitude']}, {row['latitude']}), {distance_parameter}), pos)"
        conditions.append(contains)
        cases.append(f"WHEN {contains} THEN '{row['name']}'")
    condition_string = " OR ".join(conditions)
    select_string = "CASE " + " ".join(cases) + " END AS geo_name"
    
    # step 1
    # read data
    basepath = "s3a://ungp-ais-data-historical-backup/exact-earth-data/transformed/prod/"
    dates = get_date_list(basepath, start_date, end_date)
    df = spark.read.parquet(*dates)

    # create temp view to be able to use spark SQL
    df.createOrReplaceTempView("df")

    # build point geometries, filter for cargo and tanker vessels,
    # and keep only positions inside at least one location buffer
    step_01 = spark.sql(f"""
                    SELECT * FROM
                    (
                        SELECT 
                            dt_pos_utc, 
                            mmsi, 
                            vessel_type,
                            longitude, 
                            latitude, 
                            ST_Point(cast(longitude as Decimal(24,20)), cast(latitude as Decimal(24,20))) as pos 
                        FROM df
                        WHERE vessel_type IN ('Cargo','Tanker')
                    ) AS subquery
                    WHERE {condition_string}
                    """)
    step_01.createOrReplaceTempView("step_01")
    
    # step 2
    # count distinct ships per location, date, and vessel type
    step_02 = spark.sql(f"""
                    SELECT
                        geo_name,
                        date,
                        vessel_type,
                        COUNT(DISTINCT mmsi) AS num_ships
                    FROM 
                        (SELECT
                            mmsi,
                            {select_string},
                            SUBSTRING(CAST(dt_pos_utc AS STRING), 1, 10) AS date,
                            vessel_type
                        FROM step_01
                        ) AS subquery
                    GROUP BY date, vessel_type, geo_name
                    """)
    
    return step_02

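Note that get_data returns a lazily evaluated Spark DataFrame, so no computation runs until an action is triggered. A minimal standalone sketch, reusing the locations table loaded above:

# sketch: daily ship counts for a single week across all locations
week_counts = get_data("2022-01-01", "2022-01-07", locations)
week_counts.show(5)  # triggers the Spark job and prints the first rows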

geo_names = list(locations.loc[locations.ISO3 == "RUS", "name"].values)  # Russian ports

# build month-by-month query windows (one start and end date per month)
start_month = datetime.datetime.strptime("2021-02-01", "%Y-%m-%d")
end_month = datetime.datetime.strptime("2022-07-01", "%Y-%m-%d")

start_dates = []
end_dates = []

while start_month <= end_month:
    start_dates.append(datetime.datetime.strftime(start_month, "%Y-%m-%d"))
    # cap the end date at two days ago so the query never requests dates for which files do not yet exist
    end_date = min(start_month + relativedelta(months=1) - relativedelta(days=1), datetime.datetime.today() - relativedelta(days=2))
    end_dates.append(datetime.datetime.strftime(end_date, "%Y-%m-%d"))
    start_month = start_month + relativedelta(months=1)

date_dict = {x: None for x in start_dates}

for i in range(len(start_dates)):
    date_dict[start_dates[i]] = get_data(start_dates[i], end_dates[i], locations.loc[locations.name.isin(geo_names), :].reset_index(drop=True))
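
To analyze all months together, the monthly Spark DataFrames can be pulled into a single pandas DataFrame (a sketch; each toPandas() call triggers the corresponding Spark job):

# sketch: combine the monthly results into one pandas DataFrame
monthly_frames = [sdf.toPandas() for sdf in date_dict.values()]
combined = pd.concat(monthly_frames, ignore_index=True)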



create_download_link(date_dict["2021-03-01"].toPandas(), filename="2021-03-01.csv")


