Skip to content

Weather Pipeline

SingletonMeta

Bases: type

A metaclass for creating singleton classes.

Source code in WeatherPipeline.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class SingletonMeta(type):
    """Metaclass that turns every class using it into a singleton.

    The first call to such a class builds an instance and caches it; every
    later call hands back that same cached object.
    """

    # Cache of created instances, keyed by the class they belong to.
    _instances: dict[Any, Any] = {}

    def __call__(cls, *args, **kwargs):
        """Return the single instance of ``cls``, creating it on first use.

        The arguments are forwarded to the regular construction path only when
        the instance is first created. On later calls they are ignored, so
        passing different values does not affect the returned instance.
        """
        if cls in cls._instances:
            return cls._instances[cls]
        cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]

__call__(*args, **kwargs)

Return the single shared instance of the class. If the class has not been instantiated yet, a new instance is created and cached; if it has, the existing instance is returned. Arguments passed on later calls (i.e. different `__init__` arguments) are ignored and do not affect the returned instance.

Source code in WeatherPipeline.py
23
24
25
26
27
28
29
30
31
32
33
def __call__(cls, *args, **kwargs):
    """Return the single instance of ``cls``, creating it on first use.

    The arguments are forwarded to the regular construction path only when
    the instance is first created. On later calls they are ignored, so
    passing different values does not affect the returned instance.
    """
    if cls in cls._instances:
        return cls._instances[cls]
    cls._instances[cls] = super().__call__(*args, **kwargs)
    return cls._instances[cls]

WeatherPipeline

A class to handle the weather data pipeline.

Source code in WeatherPipeline.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class WeatherPipeline(metaclass=SingletonMeta):
    """Manage the tracked weather locations and fetch forecast history for them.

    Locations (postcodes) are stored in a DuckDB table named
    ``weather_location``; forecast data is obtained through a ``WeatherData``
    instance. The class is built with ``SingletonMeta`` as its metaclass.
    """

    def __init__(self) -> None:
        """Initialize the WeatherPipeline class and set up the database connection.

        DuckDB is used to manage the weather data locations.

        The database path is read from the ``DUCK_DB_PATH`` environment
        variable. The ``weather_location`` table is created if it does not
        already exist, and a ``WeatherData`` instance is stored on
        ``self.weather_data``.
        """
        self.con = duckdb.connect(database=os.getenv("DUCK_DB_PATH"))
        self.con.sql(
            """
            CREATE TABLE IF NOT EXISTS weather_location (
                postcode VARCHAR UNIQUE NOT NULL
                );
            """
        )
        self.weather_data = WeatherData()

    def add_location(self, postcode: str) -> None:
        """Add a new location to the weather data pipeline.

        Args:
            postcode (str): The postcode of the location to add.

        Raises:
            Exception: If the location cannot be added due to a database error
                (the ``postcode`` column is declared ``UNIQUE NOT NULL``).
                The original DuckDB error is chained as ``__cause__``.
        """
        try:
            # execute() binds the list of values to the `?` placeholders;
            # sql() does not accept query parameters positionally.
            self.con.execute(
                """
                INSERT INTO weather_location (postcode)
                VALUES (?);
                """,
                [postcode],
            )
        # duckdb.Error is the base class of the DuckDB Python exceptions.
        except duckdb.Error as e:
            raise Exception(f"Failed to add location {postcode}: {e}") from e
        print(f"Location {postcode} added successfully.")

    def remove_location(self, postcode: str) -> None:
        """Remove a location from the weather data pipeline.

        Args:
            postcode (str): The postcode of the location to remove.

        Raises:
            Exception: If the location cannot be removed due to a database
                error. The original DuckDB error is chained as ``__cause__``.
        """
        try:
            # execute() binds the list of values to the `?` placeholders;
            # sql() does not accept query parameters positionally.
            self.con.execute(
                """
                DELETE FROM weather_location
                WHERE postcode = ?;
                """,
                [postcode],
            )
        # duckdb.Error is the base class of the DuckDB Python exceptions.
        except duckdb.Error as e:
            raise Exception(f"Failed to remove location {postcode}: {e}") from e
        print(f"Location {postcode} removed successfully.")

    def get_locations(self) -> list[str]:
        """Retrieve all locations from the weather data pipeline.

        Returns:
            list: The postcodes stored in the ``weather_location`` table.

        Raises:
            Exception: If the locations cannot be retrieved due to a database
                error. The original DuckDB error is chained as ``__cause__``.
        """
        try:
            forecast_locations = self.con.sql(
                "SELECT postcode FROM weather_location"
            ).fetchall()
        # duckdb.Error is the base class of the DuckDB Python exceptions.
        except duckdb.Error as e:
            raise Exception(f"Failed to retrieve locations: {e}") from e

        # Report how many rows came back (the count, not the rows themselves).
        print(f"Locations retrieved: {len(forecast_locations)}")
        # Each row is a one-element tuple holding the postcode.
        return [row[0] for row in forecast_locations]

    def get_and_write_forecast_history(
        self, date: str, location: str, writer: Writer, destination: str
    ) -> None:
        """Fetch and write the weather forecast history for a specific date and location.

        Args:
            date (str): The date for which to fetch the forecast history.
            location (str): The location for which to fetch the forecast history.
            writer (Writer): An instance of the Writer class to handle writing data.
            destination (str): The file path where the data should be written.

        Raises:
            Exception: If the fetch does not report status code 200. Nothing
                is written in that case.
        """

        result, status_code = self.weather_data.get_forecast_history(
            date=date, location=location
        )

        if status_code != 200:
            raise Exception(f"Failed to fetch data: {result}")

        # The result is written as its string representation.
        writer.write(str(result), destination)
        print(f"Data written to {destination} successfully.")

__init__()

Initialize the WeatherPipeline class and set up the database connection.

DuckDB is used to manage the weather data locations.

This constructor initializes the database connection and creates the necessary table for storing weather locations if it does not already exist.

Source code in WeatherPipeline.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def __init__(self) -> None:
    """Open the DuckDB connection and prepare the pipeline's state.

    The database location is read from the ``DUCK_DB_PATH`` environment
    variable. The ``weather_location`` table, which stores the tracked
    postcodes, is created when it does not exist yet. A ``WeatherData``
    instance is then stored on ``self.weather_data``.
    """
    # NOTE(review): os.getenv returns None when DUCK_DB_PATH is unset;
    # confirm that duckdb.connect handles that as intended.
    database_path = os.getenv("DUCK_DB_PATH")
    self.con = duckdb.connect(database=database_path)

    create_table_statement = """
        CREATE TABLE IF NOT EXISTS weather_location (
            postcode VARCHAR UNIQUE NOT NULL
            );
        """
    self.con.sql(create_table_statement)

    self.weather_data = WeatherData()

add_location(postcode)

Add a new location to the weather data pipeline.

Parameters:

Name Type Description Default
postcode str

The postcode of the location to add.

required

Raises:

Type Description
Exception

If the location cannot be added due to a database error.

Source code in WeatherPipeline.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def add_location(self, postcode: str) -> None:
    """Add a new location to the weather data pipeline.

    Args:
        postcode (str): The postcode of the location to add.

    Raises:
        Exception: If the location cannot be added due to a database error.
            The original DuckDB error is chained as ``__cause__``.
    """
    try:
        # execute() binds the list of values to the `?` placeholders;
        # sql() does not accept query parameters positionally.
        self.con.execute(
            """
            INSERT INTO weather_location (postcode)
            VALUES (?);
            """,
            [postcode],
        )
    # duckdb.Error is the base class of the DuckDB Python exceptions.
    except duckdb.Error as e:
        raise Exception(f"Failed to add location {postcode}: {e}") from e
    print(f"Location {postcode} added successfully.")

get_and_write_forecast_history(date, location, writer, destination)

Fetch and write the weather forecast history for a specific date and location.

Parameters:

Name Type Description Default
date str

The date for which to fetch the forecast history.

required
location str

The location for which to fetch the forecast history.

required
writer Writer

An instance of the Writer class to handle writing data.

required
destination str

The file path where the data should be written.

required

Raises:

Type Description
Exception

If the data cannot be fetched or written successfully.

Source code in WeatherPipeline.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def get_and_write_forecast_history(
    self, date: str, location: str, writer: Writer, destination: str
) -> None:
    """Fetch the forecast history for one date/location and write it out.

    Args:
        date (str): The date whose forecast history is requested.
        location (str): The location whose forecast history is requested.
        writer (Writer): Object whose ``write`` method stores the data.
        destination (str): Where the writer should put the data.

    Raises:
        Exception: If the fetch reports a status code other than 200.
            Nothing is written in that case.
    """
    fetched = self.weather_data.get_forecast_history(date=date, location=location)
    payload, http_status = fetched

    # Anything other than 200 is treated as a failed fetch.
    if http_status != 200:
        failure_message = f"Failed to fetch data: {payload}"
        raise Exception(failure_message)

    # The payload is written as its string representation.
    serialized = str(payload)
    writer.write(serialized, destination)
    print(f"Data written to {destination} successfully.")

get_locations()

Retrieve all locations from the weather data pipeline.

Returns:

Name Type Description
list list

A list of postcodes for the locations stored in the database.

Raises:

Type Description
Exception

If the locations cannot be retrieved due to a database error.

Source code in WeatherPipeline.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def get_locations(self) -> list[str]:
    """Retrieve all locations from the weather data pipeline.

    Returns:
        list: The postcodes stored in the ``weather_location`` table.

    Raises:
        Exception: If the locations cannot be retrieved due to a database
            error. The original DuckDB error is chained as ``__cause__``.
    """
    try:
        forecast_locations = self.con.sql(
            "SELECT postcode FROM weather_location"
        ).fetchall()
    # duckdb.Error is the base class of the DuckDB Python exceptions.
    except duckdb.Error as e:
        raise Exception(f"Failed to retrieve locations: {e}") from e

    # Report how many rows came back (the count, not the rows themselves).
    print(f"Locations retrieved: {len(forecast_locations)}")
    # Each row is a one-element tuple holding the postcode.
    return [row[0] for row in forecast_locations]

remove_location(postcode)

Remove a location from the weather data pipeline.

Parameters:

Name Type Description Default
postcode str

The postcode of the location to remove.

required

Raises:

Type Description
Exception

If the location cannot be removed due to a database error.

Source code in WeatherPipeline.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def remove_location(self, postcode: str) -> None:
    """Remove a location from the weather data pipeline.

    Args:
        postcode (str): The postcode of the location to remove.

    Raises:
        Exception: If the location cannot be removed due to a database error.
            The original DuckDB error is chained as ``__cause__``.
    """
    try:
        # execute() binds the list of values to the `?` placeholders;
        # sql() does not accept query parameters positionally.
        self.con.execute(
            """
            DELETE FROM weather_location
            WHERE postcode = ?;
            """,
            [postcode],
        )
    # duckdb.Error is the base class of the DuckDB Python exceptions.
    except duckdb.Error as e:
        raise Exception(f"Failed to remove location {postcode}: {e}") from e
    print(f"Location {postcode} removed successfully.")