-
Notifications
You must be signed in to change notification settings - Fork 4
Support Client TLS #43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -19,13 +19,12 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using System; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using System.Collections.Generic; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using System.IO; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using System.Linq; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using System.Net.Sockets; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using System.Numerics; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using System.Threading; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using System.Threading.Tasks; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using System.Security.Cryptography.X509Certificates; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using Apache.IoTDB.DataStructure; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using Microsoft.Extensions.Configuration; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using Microsoft.Extensions.Logging; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using Thrift; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using Thrift.Protocol; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -47,6 +46,8 @@ public partial class SessionPool : IDisposable | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private readonly List<TEndPoint> _endPoints = new(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private readonly string _host; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private readonly int _port; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private readonly bool _useSsl; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private readonly string _certificatePath; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private readonly int _fetchSize; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// <summary> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// _timeout is the amount of time a Session will wait for a send operation to complete successfully. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -86,10 +87,10 @@ public SessionPool(string host, int port) : this(host, port, "root", "root", 102 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public SessionPool(string host, int port, string username, string password, int fetchSize, string zoneId, int poolSize, bool enableRpcCompression, int timeout) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| : this(host, port, username, password, fetchSize, zoneId, poolSize, enableRpcCompression, timeout, IoTDBConstant.TREE_SQL_DIALECT, "") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| : this(host, port, username, password, fetchSize, zoneId, poolSize, enableRpcCompression, timeout, false, null, IoTDBConstant.TREE_SQL_DIALECT, "") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| protected internal SessionPool(string host, int port, string username, string password, int fetchSize, string zoneId, int poolSize, bool enableRpcCompression, int timeout, string sqlDialect, string database) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| protected internal SessionPool(string host, int port, string username, string password, int fetchSize, string zoneId, int poolSize, bool enableRpcCompression, int timeout, bool useSsl, string certificatePath, string sqlDialect, string database) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _host = host; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _port = port; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -101,6 +102,8 @@ protected internal SessionPool(string host, int port, string username, string pa | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _poolSize = poolSize; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _enableRpcCompression = enableRpcCompression; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _timeout = timeout; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _useSsl = useSsl; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _certificatePath = certificatePath; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _sqlDialect = sqlDialect; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _database = database; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -126,11 +129,11 @@ public SessionPool(List<string> nodeUrls, string username, string password, int | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public SessionPool(List<string> nodeUrls, string username, string password, int fetchSize, string zoneId, int poolSize, bool enableRpcCompression, int timeout) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| : this(nodeUrls, username, password, fetchSize, zoneId, poolSize, enableRpcCompression, timeout, IoTDBConstant.TREE_SQL_DIALECT, "") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| : this(nodeUrls, username, password, fetchSize, zoneId, poolSize, enableRpcCompression, timeout, false, null, IoTDBConstant.TREE_SQL_DIALECT, "") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| protected internal SessionPool(List<string> nodeUrls, string username, string password, int fetchSize, string zoneId, int poolSize, bool enableRpcCompression, int timeout, string sqlDialect, string database) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| protected internal SessionPool(List<string> nodeUrls, string username, string password, int fetchSize, string zoneId, int poolSize, bool enableRpcCompression, int timeout, bool useSsl, string certificatePath, string sqlDialect, string database) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (nodeUrls.Count == 0) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -146,6 +149,8 @@ protected internal SessionPool(List<string> nodeUrls, string username, string pa | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _poolSize = poolSize; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _enableRpcCompression = enableRpcCompression; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _timeout = timeout; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _useSsl = useSsl; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _certificatePath = certificatePath; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _sqlDialect = sqlDialect; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _database = database; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -241,7 +246,7 @@ public async Task Open(CancellationToken cancellationToken = default) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _clients.Add(await CreateAndOpen(_host, _port, _enableRpcCompression, _timeout, _sqlDialect, _database, cancellationToken)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _clients.Add(await CreateAndOpen(_host, _port, _enableRpcCompression, _timeout, _useSsl, _certificatePath, _sqlDialect, _database, cancellationToken)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| catch (Exception e) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -264,7 +269,7 @@ public async Task Open(CancellationToken cancellationToken = default) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var endPoint = _endPoints[endPointIndex]; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var client = await CreateAndOpen(endPoint.Ip, endPoint.Port, _enableRpcCompression, _timeout, _sqlDialect, _database, cancellationToken); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var client = await CreateAndOpen(endPoint.Ip, endPoint.Port, _enableRpcCompression, _timeout, _useSsl, _certificatePath, _sqlDialect, _database, cancellationToken); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _clients.Add(client); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| isConnected = true; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| startIndex = (endPointIndex + 1) % _endPoints.Count; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -303,7 +308,7 @@ public async Task<Client> Reconnect(Client originalClient = null, CancellationTo | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var client = await CreateAndOpen(_host, _port, _enableRpcCompression, _timeout, _sqlDialect, _database, cancellationToken); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var client = await CreateAndOpen(_host, _port, _enableRpcCompression, _timeout, _useSsl, _certificatePath, _sqlDialect, _database, cancellationToken); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return client; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| catch (Exception e) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -330,7 +335,7 @@ public async Task<Client> Reconnect(Client originalClient = null, CancellationTo | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| int j = (startIndex + i) % _endPoints.Count; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var client = await CreateAndOpen(_endPoints[j].Ip, _endPoints[j].Port, _enableRpcCompression, _timeout, _sqlDialect, _database, cancellationToken); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var client = await CreateAndOpen(_endPoints[j].Ip, _endPoints[j].Port, _enableRpcCompression, _timeout, _useSsl, _certificatePath, _sqlDialect, _database, cancellationToken); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return client; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| catch (Exception e) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -423,12 +428,14 @@ public async Task<string> GetTimeZone() | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private async Task<Client> CreateAndOpen(string host, int port, bool enableRpcCompression, int timeout, string sqlDialect, string database, CancellationToken cancellationToken = default) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private async Task<Client> CreateAndOpen(string host, int port, bool enableRpcCompression, int timeout, bool useSsl, string cert, string sqlDialect, string database, CancellationToken cancellationToken = default) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var tcpClient = new TcpClient(host, port); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| tcpClient.SendTimeout = timeout; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| tcpClient.ReceiveTimeout = timeout; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var transport = new TFramedTransport(new TSocketTransport(tcpClient, null)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TTransport socket = useSsl ? | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| new TTlsSocketTransport(host, port, null, timeout, new X509Certificate2(File.ReadAllBytes(cert))) : | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| new TSocketTransport(host, port, null, timeout); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+434
to
+436
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+434
to
+437
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TTransport socket = useSsl ? | |
| new TTlsSocketTransport(host, port, null, timeout, new X509Certificate2(File.ReadAllBytes(cert))) : | |
| new TSocketTransport(host, port, null, timeout); | |
| TTransport socket; | |
| if (useSsl) | |
| { | |
| if (string.IsNullOrWhiteSpace(cert)) | |
| { | |
| throw new ArgumentException("Certificate path must be provided when SSL is enabled.", nameof(cert)); | |
| } | |
| if (!File.Exists(cert)) | |
| { | |
| throw new FileNotFoundException($"Certificate file not found at path '{cert}'.", cert); | |
| } | |
| X509Certificate2 certificate; | |
| try | |
| { | |
| var certificateBytes = File.ReadAllBytes(cert); | |
| certificate = new X509Certificate2(certificateBytes); | |
| } | |
| catch (Exception ex) | |
| { | |
| throw new InvalidOperationException($"Failed to load SSL certificate from path '{cert}'.", ex); | |
| } | |
| socket = new TTlsSocketTransport(host, port, null, timeout, certificate); | |
| } | |
| else | |
| { | |
| socket = new TSocketTransport(host, port, null, timeout); | |
| } |
Uh oh!
There was an error while loading. Please reload this page.