influxdb-client
Advanced tools
| Metadata-Version: 2.1 | ||
| Name: influxdb-client | ||
| Version: 1.46.0 | ||
| Version: 1.47.0 | ||
| Summary: InfluxDB 2.0 Python client library | ||
@@ -5,0 +5,0 @@ Home-page: https://github.com/influxdata/influxdb-client-python |
@@ -56,2 +56,4 @@ """Commons function for Sync and Async client.""" | ||
| self.conf = _Configuration() | ||
| if not isinstance(self.url, str): | ||
| raise ValueError('"url" attribute is not str instance') | ||
| if self.url.endswith("/"): | ||
@@ -58,0 +60,0 @@ self.conf.host = self.url[:-1] |
@@ -49,4 +49,4 @@ """ | ||
| """Initialize defaults.""" | ||
| self.columns = [] | ||
| self.records = [] | ||
| self.columns: List[FluxColumn] = [] | ||
| self.records: List[FluxRecord] = [] | ||
@@ -53,0 +53,0 @@ def get_group_key(self): |
| """Collect and async write time series data to InfluxDB Cloud or InfluxDB OSS.""" | ||
| import logging | ||
| from asyncio import ensure_future, gather | ||
| from collections import defaultdict | ||
@@ -117,10 +118,18 @@ from typing import Union, Iterable, NamedTuple | ||
| payloads = defaultdict(list) | ||
| self._serialize(record, write_precision, payloads, precision_from_point=False, **kwargs) | ||
| self._serialize(record, write_precision, payloads, precision_from_point=True, **kwargs) | ||
| # joint list by \n | ||
| body = b'\n'.join(payloads[write_precision]) | ||
| response = await self._write_service.post_write_async(org=org, bucket=bucket, body=body, | ||
| precision=write_precision, async_req=False, | ||
| _return_http_data_only=False, | ||
| content_type="text/plain; charset=utf-8") | ||
| return response[1] in (201, 204) | ||
| futures = [] | ||
| for payload_precision, payload_line in payloads.items(): | ||
| futures.append(ensure_future | ||
| (self._write_service.post_write_async(org=org, bucket=bucket, | ||
| body=b'\n'.join(payload_line), | ||
| precision=payload_precision, async_req=False, | ||
| _return_http_data_only=False, | ||
| content_type="text/plain; charset=utf-8"))) | ||
| results = await gather(*futures, return_exceptions=True) | ||
| for result in results: | ||
| if isinstance(result, Exception): | ||
| raise result | ||
| return False not in [re[1] in (201, 204) for re in results] |
| """Version of the Client that is used in User-Agent header.""" | ||
| VERSION = '1.46.0' | ||
| VERSION = '1.47.0' |
+1
-1
| Metadata-Version: 2.1 | ||
| Name: influxdb_client | ||
| Version: 1.46.0 | ||
| Version: 1.47.0 | ||
| Summary: InfluxDB 2.0 Python client library | ||
@@ -5,0 +5,0 @@ Home-page: https://github.com/influxdata/influxdb-client-python |
+0
-1
@@ -47,3 +47,2 @@ #!/usr/bin/env python | ||
| from pathlib import Path | ||
| this_directory = Path(__file__).parent | ||
@@ -50,0 +49,0 @@ long_description = (this_directory / "README.md").read_text() |
@@ -326,2 +326,31 @@ import codecs | ||
| def test_url_attribute(self): | ||
| # Wrong URL attribute | ||
| wrong_types = [ | ||
| None, | ||
| True, False, | ||
| 123, 123.5, | ||
| dict({"url" : "http://localhost:8086"}), | ||
| list(["http://localhost:8086"]), | ||
| tuple(("http://localhost:8086")) | ||
| ] | ||
| correct_types = [ | ||
| "http://localhost:8086" | ||
| ] | ||
| for url_type in wrong_types: | ||
| try: | ||
| client_not_running = InfluxDBClient(url=url_type, token="my-token", debug=True) | ||
| status = True | ||
| except ValueError as e: | ||
| status = False | ||
| self.assertFalse(status) | ||
| for url_type in correct_types: | ||
| try: | ||
| client_not_running = InfluxDBClient(url=url_type, token="my-token", debug=True) | ||
| status = True | ||
| except ValueError as e: | ||
| status = False | ||
| self.assertTrue(status) | ||
| def test_build(self): | ||
@@ -328,0 +357,0 @@ build = self.client.build() |
| import asyncio | ||
| import dateutil.parser | ||
| import logging | ||
| import math | ||
| import re | ||
| import time | ||
| import unittest | ||
@@ -9,2 +12,3 @@ import os | ||
| import pandas | ||
| import pytest | ||
@@ -203,12 +207,99 @@ import warnings | ||
| def gen_fractional_utc(self, nano, precision) -> str: | ||
| raw_sec = nano / 1_000_000_000 | ||
| if precision == WritePrecision.NS: | ||
| rem = f"{nano % 1_000_000_000}".rjust(9,"0").rstrip("0") | ||
| return (datetime.fromtimestamp(math.floor(raw_sec), tz=timezone.utc) | ||
| .isoformat() | ||
| .replace("+00:00", "") + f".{rem}Z") | ||
| #f".{rem}Z")) | ||
| elif precision == WritePrecision.US: | ||
| # rem = f"{round(nano / 1_000) % 1_000_000}"#.ljust(6,"0") | ||
| return (datetime.fromtimestamp(round(raw_sec,6), tz=timezone.utc) | ||
| .isoformat() | ||
| .replace("+00:00","") | ||
| .strip("0") + "Z" | ||
| ) | ||
| elif precision == WritePrecision.MS: | ||
| #rem = f"{round(nano / 1_000_000) % 1_000}".rjust(3, "0") | ||
| return (datetime.fromtimestamp(round(raw_sec,3), tz=timezone.utc) | ||
| .isoformat() | ||
| .replace("+00:00","") | ||
| .strip("0") + "Z" | ||
| ) | ||
| elif precision == WritePrecision.S: | ||
| return (datetime.fromtimestamp(round(raw_sec), tz=timezone.utc) | ||
| .isoformat() | ||
| .replace("+00:00","Z")) | ||
| else: | ||
| raise ValueError(f"Unknown precision: {precision}") | ||
| @async_test | ||
| async def test_write_points_different_precision(self): | ||
| now_ns = time.time_ns() | ||
| now_us = now_ns / 1_000 | ||
| now_ms = now_us / 1_000 | ||
| now_s = now_ms / 1_000 | ||
| now_date_s = self.gen_fractional_utc(now_ns, WritePrecision.S) | ||
| now_date_ms = self.gen_fractional_utc(now_ns, WritePrecision.MS) | ||
| now_date_us = self.gen_fractional_utc(now_ns, WritePrecision.US) | ||
| now_date_ns = self.gen_fractional_utc(now_ns, WritePrecision.NS) | ||
| points = { | ||
| WritePrecision.S: [], | ||
| WritePrecision.MS: [], | ||
| WritePrecision.US: [], | ||
| WritePrecision.NS: [] | ||
| } | ||
| expected = {} | ||
| measurement = generate_name("measurement") | ||
| _point1 = Point(measurement).tag("location", "Prague").field("temperature", 25.3) \ | ||
| .time(datetime.fromtimestamp(0, tz=timezone.utc), write_precision=WritePrecision.S) | ||
| _point2 = Point(measurement).tag("location", "New York").field("temperature", 24.3) \ | ||
| .time(datetime.fromtimestamp(1, tz=timezone.utc), write_precision=WritePrecision.MS) | ||
| _point3 = Point(measurement).tag("location", "Berlin").field("temperature", 24.3) \ | ||
| .time(datetime.fromtimestamp(2, tz=timezone.utc), write_precision=WritePrecision.NS) | ||
| await self.client.write_api().write(bucket="my-bucket", record=[_point1, _point2, _point3], | ||
| # basic date-time value | ||
| points[WritePrecision.S].append(Point(measurement).tag("method", "SecDateTime").field("temperature", 25.3) \ | ||
| .time(datetime.fromtimestamp(round(now_s), tz=timezone.utc), write_precision=WritePrecision.S)) | ||
| expected['SecDateTime'] = now_date_s | ||
| points[WritePrecision.MS].append(Point(measurement).tag("method", "MilDateTime").field("temperature", 24.3) \ | ||
| .time(datetime.fromtimestamp(round(now_s,3), tz=timezone.utc), write_precision=WritePrecision.MS)) | ||
| expected['MilDateTime'] = now_date_ms | ||
| points[WritePrecision.US].append(Point(measurement).tag("method", "MicDateTime").field("temperature", 24.3) \ | ||
| .time(datetime.fromtimestamp(round(now_s,6), tz=timezone.utc), write_precision=WritePrecision.US)) | ||
| expected['MicDateTime'] = now_date_us | ||
| # N.B. datetime does not handle nanoseconds | ||
| # points[WritePrecision.NS].append(Point(measurement).tag("method", "NanDateTime").field("temperature", 24.3) \ | ||
| # .time(datetime.fromtimestamp(now_s, tz=timezone.utc), write_precision=WritePrecision.NS)) | ||
| # long timestamps based on POSIX time | ||
| points[WritePrecision.S].append(Point(measurement).tag("method", "SecPosix").field("temperature", 24.3) \ | ||
| .time(round(now_s), write_precision=WritePrecision.S)) | ||
| expected['SecPosix'] = now_date_s | ||
| points[WritePrecision.MS].append(Point(measurement).tag("method", "MilPosix").field("temperature", 24.3) \ | ||
| .time(round(now_ms), write_precision=WritePrecision.MS)) | ||
| expected['MilPosix'] = now_date_ms | ||
| points[WritePrecision.US].append(Point(measurement).tag("method", "MicPosix").field("temperature", 24.3) \ | ||
| .time(round(now_us), write_precision=WritePrecision.US)) | ||
| expected['MicPosix'] = now_date_us | ||
| points[WritePrecision.NS].append(Point(measurement).tag("method", "NanPosix").field("temperature", 24.3) \ | ||
| .time(now_ns, write_precision=WritePrecision.NS)) | ||
| expected['NanPosix'] = now_date_ns | ||
| # ISO Zulu datetime with ms, us and ns e.g. "2024-09-27T13:17:16.412399728Z" | ||
| points[WritePrecision.S].append(Point(measurement).tag("method", "SecDTZulu").field("temperature", 24.3) \ | ||
| .time(now_date_s, write_precision=WritePrecision.S)) | ||
| expected['SecDTZulu'] = now_date_s | ||
| points[WritePrecision.MS].append(Point(measurement).tag("method", "MilDTZulu").field("temperature", 24.3) \ | ||
| .time(now_date_ms, write_precision=WritePrecision.MS)) | ||
| expected['MilDTZulu'] = now_date_ms | ||
| points[WritePrecision.US].append(Point(measurement).tag("method", "MicDTZulu").field("temperature", 24.3) \ | ||
| .time(now_date_us, write_precision=WritePrecision.US)) | ||
| expected['MicDTZulu'] = now_date_us | ||
| # This keeps resulting in micro second resolution in response | ||
| # points[WritePrecision.NS].append(Point(measurement).tag("method", "NanDTZulu").field("temperature", 24.3) \ | ||
| # .time(now_date_ns, write_precision=WritePrecision.NS)) | ||
| recs = [x for x in [v for v in points.values()]] | ||
| await self.client.write_api().write(bucket="my-bucket", record=recs, | ||
| write_precision=WritePrecision.NS) | ||
@@ -219,12 +310,46 @@ query = f''' | ||
| |> filter(fn: (r) => r["_measurement"] == "{measurement}") | ||
| |> keep(columns: ["_time"]) | ||
| |> keep(columns: ["method","_time"]) | ||
| ''' | ||
| query_api = self.client.query_api() | ||
| # ensure calls fully processed on server | ||
| await asyncio.sleep(1) | ||
| raw = await query_api.query_raw(query) | ||
| self.assertEqual(8, len(raw.splitlines())) | ||
| self.assertEqual(',,0,1970-01-01T00:00:02Z', raw.splitlines()[4]) | ||
| self.assertEqual(',,0,1970-01-01T00:00:01Z', raw.splitlines()[5]) | ||
| self.assertEqual(',,0,1970-01-01T00:00:00Z', raw.splitlines()[6]) | ||
| linesRaw = raw.splitlines()[4:] | ||
| lines = [] | ||
| for lnr in linesRaw: | ||
| lines.append(lnr[2:].split(",")) | ||
| def get_time_for_method(lines, method): | ||
| for l in lines: | ||
| if l[2] == method: | ||
| return l[1] | ||
| return "" | ||
| self.assertEqual(15, len(raw.splitlines())) | ||
| for key in expected: | ||
| t = get_time_for_method(lines,key) | ||
| comp_time = dateutil.parser.isoparse(get_time_for_method(lines,key)) | ||
| target_time = dateutil.parser.isoparse(expected[key]) | ||
| self.assertEqual(target_time.date(), comp_time.date()) | ||
| self.assertEqual(target_time.hour, comp_time.hour) | ||
| self.assertEqual(target_time.second,comp_time.second) | ||
| dif = abs(target_time.microsecond - comp_time.microsecond) | ||
| if key[:3] == "Sec": | ||
| # Already tested | ||
| pass | ||
| elif key[:3] == "Mil": | ||
| # may be slight rounding differences | ||
| self.assertLess(dif, 1500, f"failed to match timestamp for {key} {target_time} != {comp_time}") | ||
| elif key[:3] == "Mic": | ||
| # may be slight rounding differences | ||
| self.assertLess(dif, 150, f"failed to match timestamp for {key} {target_time} != {comp_time}") | ||
| elif key[:3] == "Nan": | ||
| self.assertEqual(expected[key], get_time_for_method(lines, key)) | ||
| else: | ||
| raise Exception(f"Unhandled key {key}") | ||
| @async_test | ||
@@ -231,0 +356,0 @@ async def test_delete_api(self): |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
4418098
0.17%88373
0.16%