-
Notifications
You must be signed in to change notification settings - Fork 440
TEZ-4007: Introduce AmExtensions and Zookeeper-based FrameworkServices #427
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
This comment was marked as outdated.
This comment was marked as outdated.
| * Curator/Zookeeper impl of AMRegistryClient | ||
| */ | ||
| @InterfaceAudience.Public | ||
| public class ZkAMRegistryClient extends AMRegistryClient { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a question. What is the duty of ZooKeeper here? When I read this patch a while ago, I presumed ZooKeeper would be used for two reasons. My memory might be wrong.
- Generate a unique application ID
- Service discovery for application masters
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@okumin: both:
- ZkAMRegistry.generateNewId takes care of generating applicationId (backed by zookeeper)
- it also for service discovery (AM registers itself, client receives updates about that)
I'm happy to see that you're interested in the area!
please be aware that it's under refactor, I'll let you know, when it's ready
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Background: I wonder if it can be independent of ZooKeeper technically. The reliability and fault-tolerance of ZK are beneficial in many environments, but in other cases, high availability is not necessary. Trino does not provide fault-tolerance with its coordinator, and some users appreciate the ease of use.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
absolutely! be aware that these patches have been working for years in downstream distributions. This PR is about contributing them back, so apparently, it's ZK the long-term plan is to improve abstractions and make it more pluggable, and replace ZK with Kubernetes etcd, for example, but the default (yarn-based) behavior doesn't utilize an AM registry at all
can you please elaborate on how Trino's lack of fault tolerance comes to this picture? I mean, this ZK-based registry is more about AM service discovery, not fault tolerance
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi folks, sorry, I'd like to chime in with a question here. :)
From the description of TEZ-3991, this feature seems to allow the AM to manage its own lifecycle, while clients such as HiveServer2 only need to discover available AMs via ZooKeeper (ZK) and submit tasks.
Based on my current limited understanding, the benefits of this feature should include reducing the burden on HS2 and potentially making task submission more efficient.
However, I have a few questions:
-
Is this feature primarily beneficial for LLAP mode? Since LLAP requires long-running Tez AMs, and IMO this feature seems more useful for long-running tasks.
-
Could this functionality serve as preliminary groundwork for decoupling the Tez framework from YARN? For example, could it pave the way for Tez to run directly on Kubernetes without relying on YARN for resource management?
Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
be aware that these patches have been working for years in downstream distributions
Sounds excellent.
can you please elaborate on how Trino's lack of fault tolerance comes to this picture?
Trino must have SPOF, but still attracts users. Hive on Tez (with LLAP) without fault tolerance is easier to set up than Hive on Tez with fault tolerance. I expect the easy quick start should attract users.
4702ed6 to
c24caeb
Compare
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
d3c969a to
b3ae071
Compare
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
ec30327 to
db11fb6
Compare
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
2e44662 to
eec88ab
Compare
This comment was marked as outdated.
This comment was marked as outdated.
50a3c90 to
32070fc
Compare
This comment was marked as outdated.
This comment was marked as outdated.
32070fc to
67cc981
Compare
…s - checkstyle, spotbugs, javadoc improvements, refactor, test fixes
67cc981 to
021a52b
Compare
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
ayushtkn
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanx @abstractdog for the PR, Had a first pass, dropped some comments
| //externalId is optional, if not provided, convert to empty string | ||
| this.externalId = (externalId == null) ? "" : externalId; | ||
| this.computeName = (computeName == null) ? ZkConfig.DEFAULT_COMPUTE_GROUP_NAME : computeName; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit, can we do something like:
this.externalId = Optional.ofNullable(externalId).orElse("");
this.computeName = Optional.ofNullable(computeName).orElse(ZkConfig.DEFAULT_COMPUTE_GROUP_NAME);
| if (other instanceof AMRecord otherRecord) { | ||
| if (other instanceof AMRecord) { | ||
| AMRecord otherRecord = (AMRecord) other; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it can be done this way
if (other instanceof AMRecord otherRecord) {
return appId.equals(otherRecord.appId)
&& hostName.equals(otherRecord.hostName)
&& hostIp.equals(otherRecord.hostIp)
&& port == otherRecord.port
&& externalId.equals(otherRecord.externalId)
&& computeName.equals(otherRecord.computeName);
}
| amRegistry.init(conf); | ||
| amRegistry.start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
who does stop & close?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added javadoc comment to clarify this:
/**
* Returns a singleton {@link AMRegistry} instance backed by ZooKeeper.
*
* <p>If the registry has not yet been created, this method initializes and starts
* a new {@link ZkAMRegistry} using the external AM identifier obtained from the
* {@code TEZ_AM_EXTERNAL_ID} environment variable.</p>
*
* <p>When the registry is used as a service within the DAGAppMaster, the
* DAGAppMaster is responsible for managing its lifecycle, including closure.</p>
*
* @param conf the configuration used to initialize the registry; must not be null
* @return the initialized and started {@link AMRegistry} instance
* @throws IllegalStateException if the {@code TEZ_AM_EXTERNAL_ID} environment variable is not set
* @throws RuntimeException if an error occurs while creating, initializing, or starting the registry
*/
|
|
||
| @Override | ||
| public AMExtensions getAMExtensions() { | ||
| return new ZkStandaloneAMExtensions(this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we are creating new instance on every invocation? should we create once & then send back the same if not null?
|
|
||
| package org.apache.tez.client.registry.zookeeper; | ||
|
|
||
| import static org.junit.Assert.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
should avoid *
|
|
||
| private TezConfiguration getTezConfForZkDiscovery() { | ||
| TezConfiguration tezConf = new TezConfiguration(); | ||
| tezConf.set(TezConfiguration.TEZ_FRAMEWORK_MODE, "STANDALONE_ZOOKEEPER"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
STANDALONE_ZOOKEEPER.name()
| } | ||
| } | ||
| assertTrue("AM record should be in getAllRecords", found); | ||
| LOG.info("Test completed successfully. AM was discovered: {}", amRecord); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure what value does this log line add :-)
1b01383 to
7f72e4a
Compare
This comment was marked as outdated.
This comment was marked as outdated.
7f72e4a to
4c9ee7f
Compare
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
4c9ee7f to
731c4df
Compare
|
🎊 +1 overall
This message was automatically generated. |
731c4df to
045ff68
Compare
|
🎊 +1 overall
This message was automatically generated. |
|
@ayushtkn : all your comments have been addressed, fixed many things according to them, which made this area much better, thanks! |
|
🎊 +1 overall
This message was automatically generated. |
|
🎊 +1 overall
This message was automatically generated. |
|
🎊 +1 overall
This message was automatically generated. |
|
🎊 +1 overall
This message was automatically generated. |
ayushtkn
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor comments. Rest LGTM
| if (other instanceof AMRecord otherRecord) { | ||
| return appId.equals(otherRecord.appId) | ||
| && host.equals(otherRecord.host) | ||
| && hostName.equals(otherRecord.hostName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this lead to NPE, we don't have any default here
this.hostName = serviceRecord.get(HOST_NAME_RECORD_KEY);
|
|
||
| @Override | ||
| public String toString() { | ||
| return toServiceRecord().attributes().toString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
toString calls toServiceRecord, So, it can lead to initializing the serviceRecord = new ServiceRecord();, are we ok with it. This could lead to issues later, like someone logged it or so, before setting the values and then we have the service record cached as well
| AMRecord createAmRecord(ApplicationId appId, String hostName, String hostIp, int port, | ||
| String computeName); | ||
|
|
||
| void close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AutoCloseable already defines close, do we need to define again here?
| void add(AMRecord server) throws Exception; | ||
|
|
||
| void remove(AMRecord server) throws Exception; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should it be server or record
| * <p>Listeners may be registered to receive notifications when AM records | ||
| * appear or are removed.</p> | ||
| */ | ||
| public abstract class AMRegistryClient implements Closeable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we do implements AutoCloseable?
| @Override | ||
| public void start() { | ||
| try { | ||
| amRegistryClient.start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should return if isRunning is true here
|
|
||
| public class ZkFrameworkClient extends FrameworkClient { | ||
|
|
||
| private AMRecord amRecord; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be volatile
| } else if (modeInEnv != null) { | ||
| return getByMode(interfaze, modeInEnv); | ||
| } else if (defaultClazz != null) { | ||
| return (T) defaultClazz.newInstance(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is deprecated, does this work
defaultClazz.getDeclaredConstructor().newInstance();
| String modeInConf = conf != null ? conf.get(TezConfiguration.TEZ_FRAMEWORK_MODE) : null; | ||
| String modeInEnv = System.getenv(TezConstants.TEZ_FRAMEWORK_MODE); | ||
| try { | ||
| if (modeInConf != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should it be StringUtils.isNotEmpty()?
| .thenReturn(YarnApplicationState.RUNNING); | ||
|
|
||
| //Client 1 start | ||
| client.start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing close?
This PR contains a Zookeeper-based Framework service and a major refactor in this area.
Pieces of this PR: