Skip to content

Commit 21f0768

Browse files
leoyvensJannis
authored andcommitted
provider: startup failure does not alter subgraph state
1 parent 3b7d462 commit 21f0768

2 files changed

Lines changed: 33 additions & 92 deletions

File tree

core/src/subgraph/provider.rs

Lines changed: 33 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -5,37 +5,28 @@ use std::sync::Mutex;
55
use async_trait::async_trait;
66
use futures01::sync::mpsc::{channel, Receiver, Sender};
77

8-
use graph::data::subgraph::schema::SubgraphError;
98
use graph::prelude::{
10-
DataSourceLoader as _, GraphQlRunner,
11-
SubgraphAssignmentProvider as SubgraphAssignmentProviderTrait, *,
9+
DataSourceLoader as _, SubgraphAssignmentProvider as SubgraphAssignmentProviderTrait, *,
1210
};
1311

1412
use crate::subgraph::registrar::IPFS_SUBGRAPH_LOADING_TIMEOUT;
1513
use crate::DataSourceLoader;
1614

17-
pub struct SubgraphAssignmentProvider<L, Q, S> {
15+
pub struct SubgraphAssignmentProvider<L, S> {
1816
logger_factory: LoggerFactory,
1917
event_stream: Option<Receiver<SubgraphAssignmentProviderEvent>>,
2018
event_sink: Sender<SubgraphAssignmentProviderEvent>,
2119
resolver: Arc<L>,
2220
subgraphs_running: Arc<Mutex<HashSet<SubgraphDeploymentId>>>,
2321
store: Arc<S>,
24-
graphql_runner: Arc<Q>,
2522
}
2623

27-
impl<L, Q, S> SubgraphAssignmentProvider<L, Q, S>
24+
impl<L, S> SubgraphAssignmentProvider<L, S>
2825
where
2926
L: LinkResolver + Clone,
30-
Q: GraphQlRunner,
3127
S: Store,
3228
{
33-
pub fn new(
34-
logger_factory: &LoggerFactory,
35-
resolver: Arc<L>,
36-
store: Arc<S>,
37-
graphql_runner: Arc<Q>,
38-
) -> Self {
29+
pub fn new(logger_factory: &LoggerFactory, resolver: Arc<L>, store: Arc<S>) -> Self {
3930
let (event_sink, event_stream) = channel(100);
4031

4132
let logger = logger_factory.component_logger("SubgraphAssignmentProvider", None);
@@ -55,110 +46,63 @@ where
5546
),
5647
subgraphs_running: Arc::new(Mutex::new(HashSet::new())),
5748
store,
58-
graphql_runner,
59-
}
60-
}
61-
62-
/// Clones but forcing receivers to `None`.
63-
fn clone_no_receivers(&self) -> Self {
64-
SubgraphAssignmentProvider {
65-
event_stream: None,
66-
event_sink: self.event_sink.clone(),
67-
resolver: self.resolver.clone(),
68-
subgraphs_running: self.subgraphs_running.clone(),
69-
store: self.store.clone(),
70-
graphql_runner: self.graphql_runner.clone(),
71-
logger_factory: self.logger_factory.clone(),
7249
}
7350
}
7451
}
7552

7653
#[async_trait]
77-
impl<L, Q, S> SubgraphAssignmentProviderTrait for SubgraphAssignmentProvider<L, Q, S>
54+
impl<L, S> SubgraphAssignmentProviderTrait for SubgraphAssignmentProvider<L, S>
7855
where
7956
L: LinkResolver + Clone,
80-
Q: GraphQlRunner,
8157
S: Store,
8258
{
8359
async fn start(
8460
&self,
8561
id: &SubgraphDeploymentId,
8662
) -> Result<(), SubgraphAssignmentProviderError> {
87-
let self_clone = self.clone_no_receivers();
88-
let store = self.store.clone();
89-
let subgraph_id = id.clone();
90-
91-
let loader = Arc::new(DataSourceLoader::new(store.clone()));
63+
let loader = Arc::new(DataSourceLoader::new(self.store.clone()));
9264

9365
let link = format!("/ipfs/{}", id);
9466

9567
let logger = self.logger_factory.subgraph_logger(id);
96-
let logger_for_resolve = logger.clone();
97-
let logger_for_err = logger.clone();
9868

9969
info!(logger, "Resolve subgraph files using IPFS");
10070

101-
if let Err(e) = async move {
102-
let mut subgraph = SubgraphManifest::resolve(
103-
Link { link },
104-
self.resolver.deref(),
105-
&logger_for_resolve,
106-
)
71+
let mut subgraph = SubgraphManifest::resolve(Link { link }, self.resolver.deref(), &logger)
10772
.map_err(SubgraphAssignmentProviderError::ResolveError)
10873
.await?;
10974

110-
let data_sources = loader
111-
.load_dynamic_data_sources(id.clone(), logger.clone(), subgraph.clone())
112-
.map_err(SubgraphAssignmentProviderError::DynamicDataSourcesError)
113-
.await?;
114-
115-
info!(logger, "Successfully resolved subgraph files using IPFS");
75+
let data_sources = loader
76+
.load_dynamic_data_sources(id.clone(), logger.clone(), subgraph.clone())
77+
.map_err(SubgraphAssignmentProviderError::DynamicDataSourcesError)
78+
.await?;
11679

117-
// Add dynamic data sources to the subgraph
118-
subgraph.data_sources.extend(data_sources);
80+
info!(logger, "Successfully resolved subgraph files using IPFS");
11981

120-
// If subgraph ID already in set
121-
if !self_clone
122-
.subgraphs_running
123-
.lock()
124-
.unwrap()
125-
.insert(subgraph.id.clone())
126-
{
127-
info!(logger, "Subgraph deployment is already running");
82+
// Add dynamic data sources to the subgraph
83+
subgraph.data_sources.extend(data_sources);
12884

129-
return Err(SubgraphAssignmentProviderError::AlreadyRunning(subgraph.id));
130-
}
85+
// If subgraph ID already in set
86+
if !self
87+
.subgraphs_running
88+
.lock()
89+
.unwrap()
90+
.insert(subgraph.id.clone())
91+
{
92+
info!(logger, "Subgraph deployment is already running");
13193

132-
// Send events to trigger subgraph processing
133-
if let Err(e) = self_clone
134-
.event_sink
135-
.clone()
136-
.send(SubgraphAssignmentProviderEvent::SubgraphStart(subgraph))
137-
.compat()
138-
.await
139-
{
140-
panic!("failed to forward subgraph: {}", e);
141-
}
142-
Ok(())
94+
return Err(SubgraphAssignmentProviderError::AlreadyRunning(subgraph.id));
14395
}
144-
.await
96+
97+
// Send events to trigger subgraph processing
98+
if let Err(e) = self
99+
.event_sink
100+
.clone()
101+
.send(SubgraphAssignmentProviderEvent::SubgraphStart(subgraph))
102+
.compat()
103+
.await
145104
{
146-
error!(
147-
logger_for_err,
148-
"Failed to resolve subgraph files using IPFS";
149-
"error" => format!("{}", e)
150-
);
151-
152-
let error = SubgraphError {
153-
subgraph_id: subgraph_id.clone(),
154-
message: e.to_string(),
155-
block_ptr: None,
156-
handler: None,
157-
deterministic: false,
158-
};
159-
160-
let _ignore_error = store.fail_subgraph(subgraph_id, error).await;
161-
return Err(e);
105+
panic!("failed to forward subgraph: {}", e);
162106
}
163107

164108
Ok(())
@@ -181,9 +125,7 @@ where
181125
}
182126
}
183127

184-
impl<L, Q, S> EventProducer<SubgraphAssignmentProviderEvent>
185-
for SubgraphAssignmentProvider<L, Q, S>
186-
{
128+
impl<L, S> EventProducer<SubgraphAssignmentProviderEvent> for SubgraphAssignmentProvider<L, S> {
187129
fn take_event_stream(
188130
&mut self,
189131
) -> Option<Box<dyn Stream<Item = SubgraphAssignmentProviderEvent, Error = ()> + Send>> {

node/src/main.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,6 @@ async fn main() {
361361
&logger_factory,
362362
link_resolver.clone(),
363363
store_builder.store(),
364-
graphql_runner.clone(),
365364
);
366365

367366
// Forward subgraph events from the subgraph provider to the subgraph instance manager

0 commit comments

Comments
 (0)