Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 4 additions & 13 deletions src/oceanum/datamesh/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,33 +168,24 @@ def _check_info(self):
"""
Check if there are any infos available that need to be displayed.
Typically will ask to update the client if the version is outdated.
Also will try to guess gateway address if not provided.
Also will set gateway address to service address if not provided.
"""
_gateway = self._gateway or f"{self._proto}://{self._host}"
self._is_v1 = True
self._gateway = self._gateway or f"{self._proto}://{self._host}"
try:
resp = self._retried_request(
f"{_gateway}/info/oceanum_python/{__version__}",
f"{self._gateway}/info/oceanum_python/{__version__}",
retries=5,
)
if resp.status_code == 200:
r = resp.json()
if "message" in r:
print(r["message"])
self._gateway = _gateway
return
elif resp.status_code == 404:
print("Using datamesh API version 0")
self._is_v1 = False
self._gateway = self._gateway or f"{self._proto}://gateway.{self._host}"
return
raise DatameshConnectError(
f"Failed to reach datamesh: {resp.status_code}-{resp.text}"
)
except Exception as e:
warnings.warn(f"Failed to reach datamesh gateway at {_gateway}: {e}")
warnings.warn("Assuming datamesh API version 1")
self._gateway = _gateway
warnings.warn(f"Failed to reach datamesh gateway at {self._gateway}: {e}")

def _validate_response(self, resp):
if resp.status_code >= 400:
Expand Down
28 changes: 1 addition & 27 deletions src/oceanum/datamesh/session.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from pydantic import BaseModel
from typing import Optional
from datetime import datetime, timedelta
from datetime import datetime
from .exceptions import DatameshConnectError, DatameshSessionError
from .utils import retried_request, HTTPSession
import atexit
Expand Down Expand Up @@ -39,21 +39,6 @@ def acquire(
the connection session duration if set or 3600 (1 hour)
"""

# Back-compatibility with beta version (returning dummy session object)
if not connection._is_v1:
session = cls(
id="dummy_session",
user="dummy_user",
creation_time=datetime.now(),
end_time=datetime.now()
+ timedelta(seconds=connection._session_params.get("duration", 3600)),
write=False,
verified=False,
)
session._connection = connection
atexit.register(session.close)
return session
# v1
try:
headers = {
"Cache-Control": "no-store"
Expand Down Expand Up @@ -120,7 +105,6 @@ def from_proxy(
session = cls(**res.json())
session._connection = lambda: None
session._connection._gateway = os.environ["DATAMESH_ZARR_PROXY"]
session._connection._is_v1 = True
session._connection.http_session = http_session
atexit.register(session.close)
return session
Expand All @@ -142,12 +126,6 @@ def from_session_id(cls, connection, session_id: str):
Session id to acquire.
"""

# Back-compatibility with beta version (returning dummy session object)
if not connection._is_v1:
raise DatameshSessionError(
"Cannot acquire session from id when using datamesh v0"
)
# v1
try:
res = retried_request(
f"{connection._gateway}/session/{session_id}",
Expand All @@ -171,10 +149,6 @@ def add_header(self, headers: dict):
return {**headers, **self.header}

def close(self, finalise_write: bool = False):
# Back-compatibility with beta version (ignoring)
if not self._connection._is_v1:
return
# datamesh v1
try:
atexit.unregister(self.close)
except:
Expand Down
5 changes: 2 additions & 3 deletions src/oceanum/datamesh/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ def __init__(
self.datasource = datasource
self.session = session
self.method = method
self._is_v1 = connection._is_v1
self.api = api if connection._is_v1 else "zarr"
self.api = api
self.headers = {**connection._auth_headers}
self.headers = session.add_header(self.headers)
if nocache:
Expand Down Expand Up @@ -142,7 +141,7 @@ def __contains__(self, item):
encoded_item = urllib.parse.quote(item, safe="/")
resp = self._retried_request(
f"{self._proxy}/{self.datasource}/{encoded_item}",
method="HEAD" if self._is_v1 else "GET",
method="HEAD",
connect_timeout=self.connect_timeout,
read_timeout=self.read_timeout,
)
Expand Down