#weather #meteorology #climate #qc #quality-control

rove

天气数据的实时空间和时间序列质量控制系统

2 个版本

0.1.1 2023 年 10 月 19 日
0.1.0 2023 年 10 月 19 日

#123 in 地理空间

AGPL-3.0-only

64KB
980

实时观测验证引擎

什么是 ROVE?

ROVE 是一个针对大规模天气数据进行实时质量控制的系统(空间和时间)。它是在 Met Norway 的 CONFIDENT 项目下为满足内部质量控制需求而创建的,旨在取代旧系统。然而,它被设计成模块化和通用,足以满足其他需求,我们希望它能得到更广泛的应用。

谁负责?

Ingrid Abraham

状态

处于 alpha 测试阶段。

基准测试

基准测试代码可在此处找到。

有三个基准测试

  • 单次,向 ROVE 发送请求以在每个数据上运行 dip_check 和 step_check
  • 系列,与单次相同,但每个请求有 10k 个数据点
  • 空间,对 10k 个数据点进行 buddy_check 和 sct 请求,这些数据点分布在约 350x350km 的区域内

以下是 M1 mac 上运行的结果

single_benchmark thrpt:  53.036 Kelem/s
series_benchmark thrpt:  5.4106 Melem/s
spatial_benchmark thrpt:  194.01 Kelem/s

Kelem/s = 每秒千个数据点,M 为百万。

值得注意的是,ROVE 可扩展。如果您需要的吞吐量超过一个节点可以提供的,您可以在负载均衡器后面设置尽可能多的节点,尽管在大多数情况下,您的瓶颈可能是数据源。

试试看

要使用 ROVE,您需要为想要使用的语言生成绑定。API 定义可在此处找到。

API 有两个端点

  1. ValidateSeries,执行时间序列 QC 测试,并接受以下参数
    • series_id:资源定位器,形式为 :例如,oda:123456,其中 oda 是系统所知的数据源,123456 是特定于 oda 的时间序列 ID
    • start_time:表示子序列开始时间的戳。如果未提供,则使用时间序列的开始时间
    • end_time:表示子序列结束时间的戳。如果未提供,则使用时间序列的结束时间
    • tests:要在数据上运行的测试名称列表
  2. ValidateSpatial,执行空间 QC 测试,并接受以下参数
    • spatial_id:资源定位器,形式为 :例如,frost:air_temperature,其中 frost 是系统所知的数据源,air_temperature 是 frost 中的“元素”,指定我们只获取空气温度数据
    • 后备数据源:提供数据以帮助对第一个数据源进行质量控制,但这些数据源的数据本身不会进行质量控制
    • 时间:要质量控制的空间数据的时间戳
    • tests:要在数据上运行的测试名称列表
    • 多边形:定义多边形的经纬度点列表,用于限制要质量控制的空间切片。如果没有提供,则将全面质量控制整个切片

设置绑定后,以下是一个如何在Python中使用它们向挪威气象局的ROVE服务器发送请求的示例

import grpc
import proto.rove_pb2 as rove
import proto.rove_pb2_grpc as rove_grpc
from proto.rove_pb2 import google_dot_protobuf_dot_timestamp__pb2 as ts
from datetime import datetime, timezone


def send_series(stub):
    request = rove.ValidateSeriesRequest(
        series_id="frost:18700/air_temperature",
        start_time=ts.Timestamp(
            seconds=int(datetime(2023, 6, 26, hour=14, tzinfo=timezone.utc).timestamp())
        ),
        end_time=ts.Timestamp(
            seconds=int(datetime(2023, 6, 26, hour=16, tzinfo=timezone.utc).timestamp())
        ),
        tests=["dip_check", "step_check"],
    )

    print("Sending ValidateSeries request")
    responses = stub.ValidateSeries(request)

    print("Response:\n")
    for response in responses:
        print("Test name: ", response.test, "\n")
        for result in response.results:
            print(
                "    Time: ",
                datetime.fromtimestamp(result.time.seconds, tz=timezone.utc),
            )
            print("    Flag: ", rove.Flag.Name(result.flag), "\n")


def send_spatial(stub):
    request = rove.ValidateSpatialRequest(
        spatial_id="frost:air_temperature",
        time=ts.Timestamp(
            seconds=int(datetime(2023, 6, 26, hour=14, tzinfo=timezone.utc).timestamp())
        ),
        tests=["buddy_check", "sct"],
        polygon=[
            rove.GeoPoint(lat=59.93, lon=10.05),
            rove.GeoPoint(lat=59.93, lon=11.0),
            rove.GeoPoint(lat=60.25, lon=10.77),
        ],
    )

    print("Sending ValidateSpatial request")
    responses = stub.ValidateSpatial(request)

    print("Response:\n")
    for response in responses:
        print("Test name: ", response.test, "\n")
        for result in response.results:
            print(
                "    location: (lat: ",
                result.location.lat,
                " lon: ",
                result.location.lon,
                ")",
            )
            print("    Flag: ", rove.Flag.Name(result.flag), "\n")


def main():
    channel = grpc.insecure_channel("157.249.77.242:1337")
    stub = rove_grpc.RoveStub(channel)

    send_series(stub)
    send_spatial(stub)


if __name__ == "__main__":
    main()

用于生产

警告:ROVE尚未准备就绪用于生产。

您可以设置自己的ROVE实例,连接到您自己的数据源。它可以作为一个通过网络接收请求的gRPC服务器,或者是一个更大服务中的组件,其中质量控制运行由函数调用触发。两种示例都在文档中提供。

特别值得注意的是,您需要提供DataConnector特质的实现,以便ROVE知道如何与您的数据源通信。可以在met_connectors下找到一些数据连接器的实现的真实世界示例,其中frost通过http REST API进行通信,而lustre_netatmo通过网络文件系统读取csv文件中的数据。

架构概述

component diagram

文档

Crate文档可在此处找到。

如何作为开发者贡献

欢迎贡献,请联系Ingrid([email protected])。

依赖关系

~15–25MB
~368K SLoC