From 94a7b678cfd7abd7c20c48bfbbc866de086b6762 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Mon, 15 Jul 2024 10:46:58 -0700 Subject: [PATCH 1/5] Updating go version --- go.mod | 11 ++++++++--- go.sum | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 2c48b4c3..84bf78e2 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/bmeg/grip -go 1.18 +go 1.21 + +toolchain go1.21.5 require ( github.com/Shopify/sarama v1.22.1 @@ -41,7 +43,7 @@ require ( github.com/sirupsen/logrus v1.9.0 github.com/spf13/cast v1.3.0 github.com/spf13/cobra v1.0.1-0.20201006035406-b97b5ead31f7 - github.com/stretchr/testify v1.8.2 + github.com/stretchr/testify v1.9.0 github.com/syndtr/goleveldb v1.0.0 go.mongodb.org/mongo-driver v1.12.0 golang.org/x/crypto v0.18.0 @@ -110,6 +112,9 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect github.com/oklog/run v1.0.0 // indirect + github.com/opensearch-project/opensearch-go v1.1.0 // indirect + github.com/opensearch-project/opensearch-go/v2 v2.3.0 // indirect + github.com/opensearch-project/opensearch-go/v4 v4.0.0 // indirect github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -126,7 +131,7 @@ require ( github.com/xdg-go/stringprep v1.0.4 // indirect github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect go.opencensus.io v0.24.0 // indirect - golang.org/x/exp v0.0.0-20200513190911-00229845015e // indirect + golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect golang.org/x/oauth2 v0.16.0 // indirect golang.org/x/sys v0.16.0 // indirect golang.org/x/term v0.16.0 // indirect diff --git a/go.sum b/go.sum index b382fe13..702e6a05 100644 --- a/go.sum +++ b/go.sum @@ -80,6 +80,20 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/aws/aws-sdk-go v1.22.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48= +github.com/aws/aws-sdk-go v1.42.27/go.mod h1:OGr6lGMAKGlG9CVrYnWYDKIyb829c6EVBRjxqjmPepc= +github.com/aws/aws-sdk-go v1.44.263/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= +github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= +github.com/aws/aws-sdk-go-v2/config v1.18.25/go.mod h1:dZnYpD5wTW/dQF0rRNLVypB396zWCcPiBIvdvSWHEg4= +github.com/aws/aws-sdk-go-v2/credentials v1.13.24/go.mod h1:jYPYi99wUOPIFi0rhiOvXeSEReVOzBqFNOX5bXYoG2o= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3/go.mod h1:4Q0UFP0YJf0NrsEuEYHpM9fTSEVnD16Z3uyEF7J9JGM= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34/go.mod h1:Etz2dj6UHYuw+Xw830KfzCfWGMzqvUTCjUj5b76GVDc= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27/go.mod h1:EOwBD4J4S5qYszS5/3DpkejfuK+Z5/1uzICfPaZLtqw= +github.com/aws/aws-sdk-go-v2/service/sso v1.12.10/go.mod h1:ouy2P4z6sJN70fR3ka3wD3Ro3KezSxU6eKGQI2+2fjI= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.10/go.mod h1:AFvkxc8xfBe8XA+5St5XIHHrQQtkxqrRincx4hmMHOk= +github.com/aws/aws-sdk-go-v2/service/sts v1.19.0/go.mod h1:BgQOMsg8av8jset59jelyPW7NoZcZXLVpDsXunGDrk8= +github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -297,6 +311,7 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -545,6 +560,12 @@ github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/opensearch-project/opensearch-go v1.1.0 h1:eG5sh3843bbU1itPRjA9QXbxcg8LaZ+DjEzQH9aLN3M= +github.com/opensearch-project/opensearch-go v1.1.0/go.mod h1:+6/XHCuTH+fwsMJikZEWsucZ4eZMma3zNSeLrTtVGbo= +github.com/opensearch-project/opensearch-go/v2 v2.3.0 h1:nQIEMr+A92CkhHrZgUhcfsrZjibvB3APXf2a1VwCmMQ= +github.com/opensearch-project/opensearch-go/v2 v2.3.0/go.mod h1:8LDr9FCgUTVoT+5ESjc2+iaZuldqE+23Iq0r1XeNue8= +github.com/opensearch-project/opensearch-go/v4 v4.0.0 h1:Nrh30HhaknKcaPcIzlqA6Jf0CBgWP5XUaSp0HMsRBlA= +github.com/opensearch-project/opensearch-go/v4 v4.0.0/go.mod h1:amlBgHgAX9AwwW50eOuzYa5n/8aD18LoWO8eDLoe8KQ= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/paulbellamy/ratecounter v0.2.0 h1:2L/RhJq+HA8gBQImDXtLPrDXK5qAj6ozWVK/zFXVJGs= github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE= @@ -657,6 +678,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= @@ -742,6 +765,8 @@ golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EH golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= golang.org/x/exp v0.0.0-20200513190911-00229845015e h1:rMqLP+9XLy+LdbCXHjJHAmTfXCr93W7oruWA6Hq1Alc= golang.org/x/exp v0.0.0-20200513190911-00229845015e/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -806,7 +831,10 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211216030914-fe4d6282115f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -892,10 +920,14 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE= golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -906,6 +938,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= From 31cc38f79cf979d617dc238d1bdb3a70d27fb2fd Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Mon, 15 Jul 2024 10:49:13 -0700 Subject: [PATCH 2/5] Adding the start of an opensearch job driver --- jobstorage/open_search.go | 150 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 jobstorage/open_search.go diff --git a/jobstorage/open_search.go b/jobstorage/open_search.go new file mode 100644 index 00000000..93ee6cc8 --- /dev/null +++ b/jobstorage/open_search.go @@ -0,0 +1,150 @@ +package jobstorage + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "math/rand" + "net/http" + "path/filepath" + "time" + + "github.com/bmeg/grip/gripql" + "github.com/bmeg/grip/log" + opensearch "github.com/opensearch-project/opensearch-go/v4" + "github.com/opensearch-project/opensearch-go/v4/opensearchapi" +) + +type OpenSearchStorage struct { + client *opensearchapi.Client +} + +var OS_INDEX_LIST string = "gripql-job-tables" + +func NewOpenSearchStorage(addr string, username, password string) (JobStorage, error) { + client, err := opensearchapi.NewClient(opensearchapi.Config{ + Client: opensearch.Config{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + Addresses: []string{addr}, + Username: username, + Password: password, + }, + }) + if err != nil { + return nil, err + } + + resp, err := client.Indices.Exists([]string{OS_INDEX_LIST}) + if err != nil { + return nil, err + } + if resp.StatusCode == 404 { + //Create the job list index if it doesn't exist + _, err := client.Indices.Create(context.Background(), opensearchapi.IndicesCreateReq{Index: OS_INDEX_LIST}) + if err != nil { + return nil, err + } + } + return &OpenSearchStorage{ + client, + }, nil +} + +func (os *OpenSearchStorage) List(graph string) (chan string, error) { + cout := make(chan string, 5) + go func() { + defer close(cout) + searchResp, err := os.client.Search( + context.Background(), + &opensearchapi.SearchReq{ + Indices: []string{OS_INDEX_LIST}, + Params: opensearchapi.SearchParams{}, + }, + ) + if err == nil { + for _, i := range searchResp.Hits.Hits { + d := map[string]string{} + json.Unmarshal(i.Fields, &d) + if x, ok := d["index"]; ok { + cout <- x + } + } + } + }() + + return cout, nil + +} + +func (os *OpenSearchStorage) Search(graph string, Query []*gripql.GraphStatement) (chan *gripql.JobStatus, error) { + +} + +func (os *OpenSearchStorage) Spool(graph string, stream *Stream) (string, error) { + + tableName := fmt.Sprintf("grip-table-%10d", rand.Int()) + + _, err := os.client.Indices.Create(context.Background(), opensearchapi.IndicesCreateReq{Index: tableName}) + if err != nil { + return "", err + } + + cs, _ := TraversalChecksum(stream.Query) + job := &Job{ + Status: gripql.JobStatus{Query: stream.Query, Id: tableName, Graph: graph, Timestamp: time.Now().Format(time.RFC3339)}, + DataType: stream.DataType, + MarkTypes: stream.MarkTypes, + StepChecksums: cs, + } + + //fs.jobs.Store(jobKey(graph, tableName), job) + + ctx := context.Background() + + tbStream := MarshalStream(stream.Pipe, 4) //TODO: make worker count configurable + go func() { + job.Status.State = gripql.JobState_RUNNING + log.Infof("Starting Job: %#v", job) + //TODO: this could probably be accelerated using bulk insert + for i := range tbStream { + os.client.Index(ctx, + opensearchapi.IndexReq{ + Index: tableName, + Body: bytes.NewReader(i), + }) + job.Status.Count += 1 + } + statusPath := filepath.Join(spoolDir, "status") + statusFile, err := os.Create(statusPath) + if err == nil { + defer statusFile.Close() + job.Status.State = gripql.JobState_COMPLETE + out, err := json.Marshal(job) + if err == nil { + statusFile.Write([]byte(fmt.Sprintf("%s\n", out))) + } + log.Infof("Job Done: %s (%d results)", jobName, job.Status.Count) + } else { + job.Status.State = gripql.JobState_ERROR + log.Infof("Job Error: %s %s", jobName, err) + } + }() + return jobName, nil + +} + +func (os *OpenSearchStorage) Stream(ctx context.Context, graph, id string) (*Stream, error) { + +} + +func (os *OpenSearchStorage) Delete(graph, id string) error { + +} + +func (os *OpenSearchStorage) Status(graph, id string) (*gripql.JobStatus, error) { + +} From eb557020d59e5ee2952c2dcc237ae244c0b6c8ca Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Sat, 27 Jul 2024 22:26:09 -0700 Subject: [PATCH 3/5] Changing how the job drivers are configured --- config/server_config.go | 3 +- jobstorage/config.go | 12 +++++++ jobstorage/{storage.go => fs_storage.go} | 34 +++--------------- jobstorage/interface.go | 31 ++++++++++++++++ jobstorage/open_search.go | 46 ++++++++++++++++++++---- server/server.go | 36 +++++++++++++------ test/pebble.yml | 4 +++ 7 files changed, 118 insertions(+), 48 deletions(-) create mode 100644 jobstorage/config.go rename jobstorage/{storage.go => fs_storage.go} (89%) create mode 100644 jobstorage/interface.go diff --git a/config/server_config.go b/config/server_config.go index e77da04e..dddc3b20 100644 --- a/config/server_config.go +++ b/config/server_config.go @@ -4,6 +4,7 @@ import ( "time" "github.com/bmeg/grip/accounts" + "github.com/bmeg/grip/jobstorage" "github.com/bmeg/grip/util" "github.com/bmeg/grip/util/duration" ) @@ -18,7 +19,7 @@ type ServerConfig struct { ReadOnly bool EnablePlugins bool PluginDir string - NoJobs bool + JobsDriver *jobstorage.JobsConfig Accounts accounts.Config DisableHTTPCache bool // Should the server periodically build the graph schemas? diff --git a/jobstorage/config.go b/jobstorage/config.go new file mode 100644 index 00000000..d9c917ae --- /dev/null +++ b/jobstorage/config.go @@ -0,0 +1,12 @@ +package jobstorage + +type OpenSearchConfig struct { + Address string + Username string + Password string +} + +type JobsConfig struct { + File string + OpenSearch *OpenSearchConfig +} diff --git a/jobstorage/storage.go b/jobstorage/fs_storage.go similarity index 89% rename from jobstorage/storage.go rename to jobstorage/fs_storage.go index 3d263077..fa248ed3 100644 --- a/jobstorage/storage.go +++ b/jobstorage/fs_storage.go @@ -11,40 +11,21 @@ import ( "sync" "time" - "github.com/bmeg/grip/gdbi" "github.com/bmeg/grip/gripql" "github.com/bmeg/grip/log" "github.com/kennygrant/sanitize" ) -type Stream struct { - Pipe gdbi.InPipe - DataType gdbi.DataType - MarkTypes map[string]gdbi.DataType - Query []*gripql.GraphStatement -} - -type JobStorage interface { - List(graph string) (chan string, error) - Search(graph string, Query []*gripql.GraphStatement) (chan *gripql.JobStatus, error) - Spool(graph string, stream *Stream) (string, error) - Stream(ctx context.Context, graph, id string) (*Stream, error) - Delete(graph, id string) error - Status(graph, id string) (*gripql.JobStatus, error) -} - -type Job struct { - Status gripql.JobStatus - DataType gdbi.DataType - MarkTypes map[string]gdbi.DataType - StepChecksums []string -} - func jobKey(graph, job string) string { return fmt.Sprintf("%s/%s", sanitize.Name(graph), sanitize.Name(job)) } +type FSResults struct { + BaseDir string + jobs *sync.Map +} + func NewFSJobStorage(path string) *FSResults { out := FSResults{path, &sync.Map{}} if _, err := os.Stat(path); os.IsNotExist(err) { @@ -79,11 +60,6 @@ func NewFSJobStorage(path string) *FSResults { return &out } -type FSResults struct { - BaseDir string - jobs *sync.Map -} - func (fs *FSResults) List(graph string) (chan string, error) { out := make(chan string) go func() { diff --git a/jobstorage/interface.go b/jobstorage/interface.go new file mode 100644 index 00000000..c6938a8d --- /dev/null +++ b/jobstorage/interface.go @@ -0,0 +1,31 @@ +package jobstorage + +import ( + "context" + + "github.com/bmeg/grip/gdbi" + "github.com/bmeg/grip/gripql" +) + +type Stream struct { + Pipe gdbi.InPipe + DataType gdbi.DataType + MarkTypes map[string]gdbi.DataType + Query []*gripql.GraphStatement +} + +type JobStorage interface { + List(graph string) (chan string, error) + Search(graph string, Query []*gripql.GraphStatement) (chan *gripql.JobStatus, error) + Spool(graph string, stream *Stream) (string, error) + Stream(ctx context.Context, graph, id string) (*Stream, error) + Delete(graph, id string) error + Status(graph, id string) (*gripql.JobStatus, error) +} + +type Job struct { + Status gripql.JobStatus + DataType gdbi.DataType + MarkTypes map[string]gdbi.DataType + StepChecksums []string +} diff --git a/jobstorage/open_search.go b/jobstorage/open_search.go index 31b406d8..410f6bcf 100644 --- a/jobstorage/open_search.go +++ b/jobstorage/open_search.go @@ -1,23 +1,27 @@ package jobstorage import ( + "bytes" "context" "crypto/tls" "encoding/json" "fmt" "math/rand" "net/http" + "time" "github.com/bmeg/grip/gripql" + "github.com/bmeg/grip/log" opensearch "github.com/opensearch-project/opensearch-go/v4" "github.com/opensearch-project/opensearch-go/v4/opensearchapi" + "github.com/opensearch-project/opensearch-go/v4/opensearchutil" ) type OpenSearchStorage struct { client *opensearchapi.Client } -var OS_INDEX_LIST string = "gripql-job-tables" +var OS_INDEX_LIST string = "gripql-job-status" func NewOpenSearchStorage(addr string, username, password string) (JobStorage, error) { client, err := opensearchapi.NewClient(opensearchapi.Config{ @@ -71,20 +75,48 @@ func (os *OpenSearchStorage) List(graph string) (chan string, error) { } } }() - return cout, nil - } func (os *OpenSearchStorage) Search(graph string, Query []*gripql.GraphStatement) (chan *gripql.JobStatus, error) { return nil, nil } -func (os *OpenSearchStorage) Spool(graph string, stream *Stream) (string, error) { - tableName := fmt.Sprintf("grip-table-%10d", rand.Int()) - - return tableName, nil +func (os *OpenSearchStorage) putJob(id string, job *Job) error { + _, err := os.client.Index(context.Background(), opensearchapi.IndexReq{ + Index: OS_INDEX_LIST, + DocumentID: id, + Body: opensearchutil.NewJSONReader(job), + }) + return err +} +func (os *OpenSearchStorage) Spool(graph string, stream *Stream) (string, error) { + jobName := fmt.Sprintf("grip-%10d", rand.Int()) + + cs, _ := TraversalChecksum(stream.Query) + job := &Job{ + Status: gripql.JobStatus{Query: stream.Query, Id: jobName, Graph: graph, Timestamp: time.Now().Format(time.RFC3339)}, + DataType: stream.DataType, + MarkTypes: stream.MarkTypes, + StepChecksums: cs, + } + jobID := jobKey(graph, jobName) + os.putJob(jobID, job) + tbStream := MarshalStream(stream.Pipe, 4) //TODO: make worker count configurable + go func() { + job.Status.State = gripql.JobState_RUNNING + log.Infof("Starting Job: %#v", job) + for i := range tbStream { + os.client.Index(context.Background(), opensearchapi.IndexReq{ + Index: jobID, + Body: bytes.NewReader(i)}) + job.Status.Count += 1 + } + job.Status.State = gripql.JobState_COMPLETE + os.putJob(jobID, job) + }() + return jobName, nil } func (os *OpenSearchStorage) Stream(ctx context.Context, graph, id string) (*Stream, error) { diff --git a/server/server.go b/server/server.go index 2ffb33f2..f8eee347 100644 --- a/server/server.go +++ b/server/server.go @@ -112,7 +112,7 @@ func NewGripServer(conf *config.Config, baseDir string, drivers map[string]gdbi. if _, ok := gdbs[conf.Default]; !ok { return nil, fmt.Errorf("default driver '%s' does not exist", conf.Default) } - fmt.Printf("Default graph driver: %s\n", conf.Default) + log.Info("Default graph driver", "Driver", conf.Default) return server, nil } @@ -361,19 +361,33 @@ func (server *GripServer) Serve(pctx context.Context) error { } } - if !server.conf.Server.NoJobs { - gripql.RegisterJobServer(grpcServer, server) - err = gripql.RegisterJobHandlerClient(ctx, grpcMux, - gripql.NewJobDirectClient( - server, - gripql.DirectUnaryInterceptor(unaryAuthInt), - gripql.DirectStreamInterceptor(streamAuthInt), - )) + if server.conf.Server.JobsDriver != nil { + if server.conf.Server.JobsDriver.File != "" { + jobDir := filepath.Join(server.conf.Server.WorkDir, "jobs") + server.jStorage = jobstorage.NewFSJobStorage(jobDir) + } else if server.conf.Server.JobsDriver.OpenSearch != nil { + server.jStorage, err = jobstorage.NewOpenSearchStorage(server.conf.Server.JobsDriver.OpenSearch.Address, + server.conf.Server.JobsDriver.OpenSearch.Username, + server.conf.Server.JobsDriver.OpenSearch.Password) + } if err != nil { return fmt.Errorf("registering job endpoint: %v", err) } - jobDir := filepath.Join(server.conf.Server.WorkDir, "jobs") - server.jStorage = jobstorage.NewFSJobStorage(jobDir) + + if server.jStorage != nil { + gripql.RegisterJobServer(grpcServer, server) + err = gripql.RegisterJobHandlerClient(ctx, grpcMux, + gripql.NewJobDirectClient( + server, + gripql.DirectUnaryInterceptor(unaryAuthInt), + gripql.DirectStreamInterceptor(streamAuthInt), + )) + } + if err != nil { + return fmt.Errorf("registering job endpoint: %v", err) + } + } else { + log.Info("Jobs driver not configured") } if server.conf.Server.EnablePlugins { diff --git a/test/pebble.yml b/test/pebble.yml index fc185c22..e2f431b6 100644 --- a/test/pebble.yml +++ b/test/pebble.yml @@ -3,3 +3,7 @@ Default: pebble Drivers: pebble: Pebble: grip-pebble.db + +Server: + JobsDriver: + File: jobs \ No newline at end of file From 2acd71e318da342c52b7f946f1f15524e2031643 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Mon, 29 Jul 2024 14:10:52 -0700 Subject: [PATCH 4/5] Connecting OpenSearch job engine and fixing some unit test config files --- jobstorage/fs_storage.go | 22 ++++++--- jobstorage/interface.go | 7 --- jobstorage/open_search.go | 58 +++++++++++++++------- test/badger.yml | 4 ++ test/bolt.yml | 4 ++ test/elastic.yml | 4 ++ test/mongo-core-processor.yml | 4 ++ test/mongo.yml | 4 ++ test/open-search/README.md | 24 +++++++++ test/open-search/docker-compose.yml | 68 ++++++++++++++++++++++++++ test/open-search/pebble-opensearch.yml | 12 +++++ test/psql.yml | 4 ++ test/rocks.yml | 4 ++ 13 files changed, 188 insertions(+), 31 deletions(-) create mode 100644 test/open-search/README.md create mode 100644 test/open-search/docker-compose.yml create mode 100644 test/open-search/pebble-opensearch.yml diff --git a/jobstorage/fs_storage.go b/jobstorage/fs_storage.go index fa248ed3..f78e9ec1 100644 --- a/jobstorage/fs_storage.go +++ b/jobstorage/fs_storage.go @@ -11,12 +11,20 @@ import ( "sync" "time" + "github.com/bmeg/grip/gdbi" "github.com/bmeg/grip/gripql" "github.com/bmeg/grip/log" "github.com/kennygrant/sanitize" ) +type FileJob struct { + Status gripql.JobStatus + DataType gdbi.DataType + MarkTypes map[string]gdbi.DataType + StepChecksums []string +} + func jobKey(graph, job string) string { return fmt.Sprintf("%s/%s", sanitize.Name(graph), sanitize.Name(job)) } @@ -42,7 +50,7 @@ func NewFSJobStorage(path string) *FSResults { if err == nil { sData, err := io.ReadAll(file) if err == nil { - job := Job{} + job := FileJob{} err := json.Unmarshal(sData, &job) if err == nil { log.Infof("Found job %s %s", graphName, jobName) @@ -65,7 +73,7 @@ func (fs *FSResults) List(graph string) (chan string, error) { go func() { defer close(out) fs.jobs.Range(func(key, value interface{}) bool { - vJob := value.(*Job) + vJob := value.(*FileJob) if vJob.Status.Graph == graph { out <- vJob.Status.Id } @@ -81,7 +89,7 @@ func (fs *FSResults) Search(graph string, Query []*gripql.GraphStatement) (chan go func() { defer close(out) fs.jobs.Range(func(key, value interface{}) bool { - vJob := value.(*Job) + vJob := value.(*FileJob) if vJob.Status.Graph == graph { if JobMatch(qcs, vJob.StepChecksums) { out <- &vJob.Status @@ -113,7 +121,7 @@ func (fs *FSResults) Spool(graph string, stream *Stream) (string, error) { } cs, _ := TraversalChecksum(stream.Query) - job := &Job{ + job := &FileJob{ Status: gripql.JobStatus{Query: stream.Query, Id: jobName, Graph: graph, Timestamp: time.Now().Format(time.RFC3339)}, DataType: stream.DataType, MarkTypes: stream.MarkTypes, @@ -150,7 +158,7 @@ func (fs *FSResults) Spool(graph string, stream *Stream) (string, error) { func (fs *FSResults) Stream(ctx context.Context, graph, id string) (*Stream, error) { if v, ok := fs.jobs.Load(jobKey(graph, id)); ok { - vJob := v.(*Job) + vJob := v.(*FileJob) if vJob.Status.State == gripql.JobState_COMPLETE { resultFile := filepath.Join(fs.BaseDir, sanitize.Name(graph), sanitize.Name(id), "results") results, err := os.Open(resultFile) @@ -191,7 +199,7 @@ func (fs *FSResults) Stream(ctx context.Context, graph, id string) (*Stream, err func (fs *FSResults) Delete(graph, id string) error { if v, ok := fs.jobs.Load(jobKey(graph, id)); ok { - vJob := v.(*Job) + vJob := v.(*FileJob) if vJob.Status.State == gripql.JobState_RUNNING || vJob.Status.State == gripql.JobState_QUEUED { return fmt.Errorf("Job cancel not yet implemented") } @@ -204,7 +212,7 @@ func (fs *FSResults) Delete(graph, id string) error { func (fs *FSResults) Status(graph, id string) (*gripql.JobStatus, error) { if v, ok := fs.jobs.Load(jobKey(graph, id)); ok { - vJob := v.(*Job) + vJob := v.(*FileJob) a := vJob.Status return &a, nil } diff --git a/jobstorage/interface.go b/jobstorage/interface.go index c6938a8d..4cb8fd66 100644 --- a/jobstorage/interface.go +++ b/jobstorage/interface.go @@ -22,10 +22,3 @@ type JobStorage interface { Delete(graph, id string) error Status(graph, id string) (*gripql.JobStatus, error) } - -type Job struct { - Status gripql.JobStatus - DataType gdbi.DataType - MarkTypes map[string]gdbi.DataType - StepChecksums []string -} diff --git a/jobstorage/open_search.go b/jobstorage/open_search.go index 410f6bcf..827961c8 100644 --- a/jobstorage/open_search.go +++ b/jobstorage/open_search.go @@ -10,6 +10,7 @@ import ( "net/http" "time" + "github.com/bmeg/grip/gdbi" "github.com/bmeg/grip/gripql" "github.com/bmeg/grip/log" opensearch "github.com/opensearch-project/opensearch-go/v4" @@ -23,7 +24,17 @@ type OpenSearchStorage struct { var OS_INDEX_LIST string = "gripql-job-status" +type OpenSearchJob struct { + Index string + Graph string + Status gripql.JobStatus + DataType gdbi.DataType + MarkTypes map[string]gdbi.DataType + StepChecksums []string +} + func NewOpenSearchStorage(addr string, username, password string) (JobStorage, error) { + log.Infof("OpenSearch Job Storage: %s %s", addr, username) client, err := opensearchapi.NewClient(opensearchapi.Config{ Client: opensearch.Config{ Transport: &http.Transport{ @@ -40,17 +51,19 @@ func NewOpenSearchStorage(addr string, username, password string) (JobStorage, e resp, err := client.Indices.Exists(context.Background(), opensearchapi.IndicesExistsReq{Indices: []string{OS_INDEX_LIST}}) if err != nil { - return nil, err - } - if resp.StatusCode == 404 { - //Create the job list index if it doesn't exist - _, err := client.Indices.Create(context.Background(), opensearchapi.IndicesCreateReq{Index: OS_INDEX_LIST}) - if err != nil { + if resp.StatusCode == 404 { + //Create the job list index if it doesn't exist + _, err := client.Indices.Create(context.Background(), opensearchapi.IndicesCreateReq{Index: OS_INDEX_LIST}) + if err != nil { + return nil, err + } + } else { + log.Errorf("Contact error: %s %#v", err, resp) return nil, err } } return &OpenSearchStorage{ - client, + client: client, }, nil } @@ -62,17 +75,22 @@ func (os *OpenSearchStorage) List(graph string) (chan string, error) { context.Background(), &opensearchapi.SearchReq{ Indices: []string{OS_INDEX_LIST}, - Params: opensearchapi.SearchParams{}, + Params: opensearchapi.SearchParams{ + Query: fmt.Sprintf(`Graph: "%s"`, graph), + }, }, ) if err == nil { for _, i := range searchResp.Hits.Hits { - d := map[string]string{} - json.Unmarshal(i.Fields, &d) - if x, ok := d["index"]; ok { - cout <- x + d := map[string]any{} + json.Unmarshal(i.Source, &d) + //log.Infof("Search response: %#v", d) + if x, ok := d["Index"]; ok { + cout <- x.(string) } } + } else { + log.Errorf("JobList error: %s", err) } }() return cout, nil @@ -82,27 +100,33 @@ func (os *OpenSearchStorage) Search(graph string, Query []*gripql.GraphStatement return nil, nil } -func (os *OpenSearchStorage) putJob(id string, job *Job) error { - _, err := os.client.Index(context.Background(), opensearchapi.IndexReq{ +func (os *OpenSearchStorage) putJob(id string, job *OpenSearchJob) error { + resp, err := os.client.Index(context.Background(), opensearchapi.IndexReq{ Index: OS_INDEX_LIST, DocumentID: id, Body: opensearchutil.NewJSONReader(job), }) + log.Infof("Job Index resp: %#v %s", resp, err) return err } func (os *OpenSearchStorage) Spool(graph string, stream *Stream) (string, error) { jobName := fmt.Sprintf("grip-%10d", rand.Int()) + jobID := graph + "-" + jobName cs, _ := TraversalChecksum(stream.Query) - job := &Job{ + job := &OpenSearchJob{ + Index: jobID, + Graph: graph, Status: gripql.JobStatus{Query: stream.Query, Id: jobName, Graph: graph, Timestamp: time.Now().Format(time.RFC3339)}, DataType: stream.DataType, MarkTypes: stream.MarkTypes, StepChecksums: cs, } - jobID := jobKey(graph, jobName) - os.putJob(jobID, job) + err := os.putJob(jobID, job) + if err != nil { + return "", err + } tbStream := MarshalStream(stream.Pipe, 4) //TODO: make worker count configurable go func() { job.Status.State = gripql.JobState_RUNNING diff --git a/test/badger.yml b/test/badger.yml index b42729a5..2c458f34 100644 --- a/test/badger.yml +++ b/test/badger.yml @@ -3,3 +3,7 @@ Default: badger Drivers: badger: Badger: grip-badger.db + +Server: + JobsDriver: + File: jobs \ No newline at end of file diff --git a/test/bolt.yml b/test/bolt.yml index 34cd6d6d..7a6c47a6 100644 --- a/test/bolt.yml +++ b/test/bolt.yml @@ -3,3 +3,7 @@ Default: bolt Drivers: bolt: Bolt: grip-bolt.db + +Server: + JobsDriver: + File: jobs \ No newline at end of file diff --git a/test/elastic.yml b/test/elastic.yml index a639cdd8..1923beff 100644 --- a/test/elastic.yml +++ b/test/elastic.yml @@ -5,3 +5,7 @@ Drivers: Elasticsearch: URL: http://localhost:19200 Synchronous: true + +Server: + JobsDriver: + File: jobs \ No newline at end of file diff --git a/test/mongo-core-processor.yml b/test/mongo-core-processor.yml index c10e3973..468f6fa7 100644 --- a/test/mongo-core-processor.yml +++ b/test/mongo-core-processor.yml @@ -5,3 +5,7 @@ Drivers: MongoDB: URL: mongodb://localhost:27017 UseCorePipeline: true + +Server: + JobsDriver: + File: jobs \ No newline at end of file diff --git a/test/mongo.yml b/test/mongo.yml index 07c74f48..2c506006 100644 --- a/test/mongo.yml +++ b/test/mongo.yml @@ -4,3 +4,7 @@ Drivers: mongo: MongoDB: URL: mongodb://localhost:27017 + +Server: + JobsDriver: + File: jobs \ No newline at end of file diff --git a/test/open-search/README.md b/test/open-search/README.md new file mode 100644 index 00000000..0feb114f --- /dev/null +++ b/test/open-search/README.md @@ -0,0 +1,24 @@ + + +## Setup + +In this directory run server: +``` +export OPENSEARCH_INITIAL_ADMIN_PASSWORD=Test-Demo-42 +docker-compose up +``` + +Start up server: +``` +grip server --config pebble-opensearch.yml +``` + +Load example graph: +``` +grip load example-graph +``` + +Submit a job: +``` +grip job submit example-graph 'V().hasLabel("Movie").in_()' +``` \ No newline at end of file diff --git a/test/open-search/docker-compose.yml b/test/open-search/docker-compose.yml new file mode 100644 index 00000000..9a2f3500 --- /dev/null +++ b/test/open-search/docker-compose.yml @@ -0,0 +1,68 @@ +--- +version: '3' +services: + opensearch-node1: + image: opensearchproject/opensearch:latest + container_name: opensearch-node1 + environment: + - cluster.name=opensearch-cluster + - node.name=opensearch-node1 + - discovery.seed_hosts=opensearch-node1,opensearch-node2 + - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2 + - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping + - OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m # minimum and maximum Java heap size, recommend setting both to 50% of system RAM + - OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_INITIAL_ADMIN_PASSWORD} # Sets the demo admin user password when using demo configuration, required for OpenSearch 2.12 and higher + ulimits: + memlock: + soft: -1 + hard: -1 + nofile: + soft: 65536 # maximum number of open files for the OpenSearch user, set to at least 65536 on modern systems + hard: 65536 + volumes: + - opensearch-data1:/usr/share/opensearch/data + ports: + - 9200:9200 + - 9600:9600 # required for Performance Analyzer + networks: + - opensearch-net + opensearch-node2: + image: opensearchproject/opensearch:latest + container_name: opensearch-node2 + environment: + - cluster.name=opensearch-cluster + - node.name=opensearch-node2 + - discovery.seed_hosts=opensearch-node1,opensearch-node2 + - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2 + - bootstrap.memory_lock=true + - OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m + - OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_INITIAL_ADMIN_PASSWORD} + ulimits: + memlock: + soft: -1 + hard: -1 + nofile: + soft: 65536 + hard: 65536 + volumes: + - opensearch-data2:/usr/share/opensearch/data + networks: + - opensearch-net + opensearch-dashboards: + image: opensearchproject/opensearch-dashboards:latest + container_name: opensearch-dashboards + ports: + - 5601:5601 + expose: + - '5601' + environment: + OPENSEARCH_HOSTS: '["https://opensearch-node1:9200","https://opensearch-node2:9200"]' + networks: + - opensearch-net + +volumes: + opensearch-data1: + opensearch-data2: + +networks: + opensearch-net: diff --git a/test/open-search/pebble-opensearch.yml b/test/open-search/pebble-opensearch.yml new file mode 100644 index 00000000..0f9b5860 --- /dev/null +++ b/test/open-search/pebble-opensearch.yml @@ -0,0 +1,12 @@ +Default: pebble + +Drivers: + pebble: + Pebble: grip-pebble.db + +Server: + JobsDriver: + OpenSearch: + Username: admin + Password: Test-Demo-42 + Address: "https://localhost:9200" \ No newline at end of file diff --git a/test/psql.yml b/test/psql.yml index ee7134bd..5effc1f5 100644 --- a/test/psql.yml +++ b/test/psql.yml @@ -8,3 +8,7 @@ Drivers: User: postgres DBName: test SSLMode: disable + +Server: + JobsDriver: + File: jobs \ No newline at end of file diff --git a/test/rocks.yml b/test/rocks.yml index 198e4584..8b59dcc8 100644 --- a/test/rocks.yml +++ b/test/rocks.yml @@ -3,3 +3,7 @@ Default: rocks Drivers: rocks: Rocks: grip-rocks.db + +Server: + JobsDriver: + File: jobs \ No newline at end of file From 3f8b6fc50c97a795cb5925892de7d73abb367a60 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Tue, 30 Jul 2024 14:23:28 -0700 Subject: [PATCH 5/5] Updating testing config file --- gripper/test-graph/config.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/gripper/test-graph/config.yaml b/gripper/test-graph/config.yaml index e18c6767..0a76e33a 100644 --- a/gripper/test-graph/config.yaml +++ b/gripper/test-graph/config.yaml @@ -12,3 +12,7 @@ Drivers: Sources: tableServer: localhost:50051 + +Server: + JobsDriver: + File: jobs \ No newline at end of file