diff --git a/accounting/filters/BaseFilter.py b/accounting/filters/BaseFilter.py index 5b40956..cf13cf5 100644 --- a/accounting/filters/BaseFilter.py +++ b/accounting/filters/BaseFilter.py @@ -7,6 +7,25 @@ import importlib +RESOURCE_TYPES = ["Cpus", "Memory", "Disk", "Gpus"] +FLOORED_RESOURCE_FIELD = "FlooredRequest{resource}" +FLOORED_RESOURCE_SCRIPT = """ +if ( + doc.containsKey("{resource}Provisioned") && + doc["{resource}Provisioned"].size() > 0 && + doc.containsKey("Request{resource}") && + doc["Request{resource}"].size() > 0 && + doc["{resource}Provisioned"].value < doc["Request{resource}"].value + ) {{ + emit(doc["{resource}Provisioned"].value); +}} else if ( + doc.containsKey("Request{resource}") && + doc["Request{resource}"].size() > 0) {{ + emit(doc["Request{resource}"].value); +}} +""" + + class BaseFilter: name = "job history" @@ -96,6 +115,20 @@ def get_query(self, index, start_ts, end_ts, scroll=None, size=500): } } } + + # Add floored resource requests (INF-3590) + fields = [] + runtime_mappings = {} + for resource in RESOURCE_TYPES: + fields.append(FLOORED_RESOURCE_FIELD.format(resource=resource)) + runtime_mappings[FLOORED_RESOURCE_FIELD.format(resource=resource)] = { + "type": "long", + "script": { + "source": FLOORED_RESOURCE_SCRIPT.format(resource=resource) + } + } + query["fields"] = fields + query["runtime_mappings"] = runtime_mappings return query def user_filter(self, data, doc): diff --git a/accounting/filters/ChtcScheddCpuFilter.py b/accounting/filters/ChtcScheddCpuFilter.py index ee9903b..d52f554 100644 --- a/accounting/filters/ChtcScheddCpuFilter.py +++ b/accounting/filters/ChtcScheddCpuFilter.py @@ -145,6 +145,9 @@ def schedd_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this schedd schedd = i.get("ScheddName", "UNKNOWN") or "UNKNOWN" o = data["Schedds"][schedd] @@ -188,9 +191,9 @@ 
def schedd_filter(self, data, doc): # Compute job units if i.get("RemoteWallClockTime", 0) > 0: o["NumJobUnits"].append(get_job_units( - cpus=i.get("RequestCpus", 1), - memory_gb=i.get("RequestMemory", 1024)/1024, - disk_gb=i.get("RequestDisk", 1024**2)/1024**2, + cpus=f.get("FlooredRequestCpus", 1), + memory_gb=f.get("FlooredRequestMemory", 1024)/1024, + disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2, )) else: o["NumJobUnits"].append(None) @@ -202,6 +205,8 @@ def schedd_filter(self, data, doc): o[attr].append(int(float(i.get(attr)))) except TypeError: o[attr].append(None) + elif attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) else: o[attr].append(i.get(attr, None)) @@ -210,6 +215,9 @@ def user_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this user user = i.get("User", "UNKNOWN") or "UNKNOWN" o = data["Users"][user] @@ -254,9 +262,9 @@ def user_filter(self, data, doc): # Compute job units if i.get("RemoteWallClockTime", 0) > 0: o["NumJobUnits"].append(get_job_units( - cpus=i.get("RequestCpus", 1), - memory_gb=i.get("RequestMemory", 1024)/1024, - disk_gb=i.get("RequestDisk", 1024**2)/1024**2, + cpus=f.get("FlooredRequestCpus", 1), + memory_gb=f.get("FlooredRequestMemory", 1024)/1024, + disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2, )) else: o["NumJobUnits"].append(None) @@ -271,6 +279,8 @@ def user_filter(self, data, doc): o[attr].append(int(float(i.get(attr)))) except TypeError: o[attr].append(None) + elif attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) else: o[attr].append(i.get(attr, None)) @@ -280,6 +290,9 @@ def project_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output 
dict for this project project = i.get("ProjectName", i.get("projectname", "UNKNOWN")) or "UNKNOWN" o = data["Projects"][project] @@ -324,9 +337,9 @@ def project_filter(self, data, doc): # Compute job units if i.get("RemoteWallClockTime", 0) > 0: o["NumJobUnits"].append(get_job_units( - cpus=i.get("RequestCpus", 1), - memory_gb=i.get("RequestMemory", 1024)/1024, - disk_gb=i.get("RequestDisk", 1024**2)/1024**2, + cpus=f.get("FlooredRequestCpus", 1), + memory_gb=f.get("FlooredRequestMemory", 1024)/1024, + disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2, )) else: o["NumJobUnits"].append(None) @@ -338,6 +351,8 @@ def project_filter(self, data, doc): o[attr].append(int(float(i.get(attr)))) except TypeError: o[attr].append(None) + elif attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) else: o[attr].append(i.get(attr, None)) diff --git a/accounting/filters/ChtcScheddCpuMonthlyFilter.py b/accounting/filters/ChtcScheddCpuMonthlyFilter.py index 36d7c4b..a46bdb6 100644 --- a/accounting/filters/ChtcScheddCpuMonthlyFilter.py +++ b/accounting/filters/ChtcScheddCpuMonthlyFilter.py @@ -77,40 +77,36 @@ def get_query(self, index, start_ts, end_ts, **kwargs): # (Dict has same structure as the REST API query language) query = super().get_query(index, start_ts, end_ts, **kwargs) - query.update({ - "body": { - "query": { - "bool": { - "filter": [ - {"range": { - "RecordTime": { - "gte": start_ts, - "lt": end_ts, - } - }}, - {"regexp": { - "ScheddName.keyword": ".*[.]chtc[.]wisc[.]edu" - }} - ], - "must_not": [ - {"terms": { - "JobUniverse": [7, 12] - }}, - ], - } - } + query["body"]["query"].update({ + "bool": { + "filter": [ + {"range": { + "RecordTime": { + "gte": start_ts, + "lt": end_ts, + } + }}, + {"regexp": { + "ScheddName.keyword": ".*[.]chtc[.]wisc[.]edu" + }} + ], + "must_not": [ + {"terms": { + "JobUniverse": [7, 12] + }}, + ], } }) return query - def reduce_data(self, i, o, t, is_site=False): + def reduce_data(self, i, f, o, t, 
is_site=False): is_removed = i.get("JobStatus") == 3 is_dagnode = i.get("DAGNodeName") is not None is_exec = i.get("NumJobStarts", 0) >= 1 is_multiexec = i.get("NumJobStarts", 0) > 1 has_holds = i.get("NumHolds", 0) > 0 - is_over_rqst_disk = i.get("DiskUsage", 0) > i.get("RequestDisk", 1000) + is_over_rqst_disk = i.get("DiskUsage", 0) > f.get("FlooredRequestDisk", 1000) is_singularity_job = i.get("SingularityImage") is not None has_activation_duration = i.get("activationduration") is not None if has_activation_duration: @@ -143,9 +139,9 @@ def reduce_data(self, i, o, t, is_site=False): elif not is_removed: goodput_time = int(float(i.get("lastremotewallclocktime", i.get("CommittedTime", 0)))) job_units = get_job_units( - cpus=i.get("RequestCpus", 1), - memory_gb=i.get("RequestMemory", 1024)/1024, - disk_gb=i.get("RequestDisk", 1024**2)/1024**2, + cpus=f.get("FlooredRequestCpus", 1), + memory_gb=f.get("FlooredRequestMemory", 1024)/1024, + disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2, ) input_files = 0 input_bytes = 0 @@ -221,9 +217,9 @@ def reduce_data(self, i, o, t, is_site=False): sum_cols["TotalActivationSetupDuration"] = int(activation_setup_duration) sum_cols["TotalLongJobWallClockTime"] = long_job_wallclock_time - sum_cols["GoodCpuTime"] = (goodput_time * max(i.get("RequestCpus", 1), 1)) - sum_cols["CpuTime"] = (i.get("RemoteWallClockTime", 0) * max(i.get("RequestCpus", 1), 1)) - sum_cols["BadCpuTime"] = ((i.get("RemoteWallClockTime", 0) - goodput_time) * max(i.get("RequestCpus", 1), 1)) + sum_cols["GoodCpuTime"] = (goodput_time * max(f.get("FlooredRequestCpus", 1), 1)) + sum_cols["CpuTime"] = (i.get("RemoteWallClockTime", 0) * max(f.get("FlooredRequestCpus", 1), 1)) + sum_cols["BadCpuTime"] = ((i.get("RemoteWallClockTime", 0) - goodput_time) * max(f.get("FlooredRequestCpus", 1), 1)) sum_cols["JobUnitTime"] = job_units * i.get("RemoteWallClockTime", 0) sum_cols["NumShadowStarts"] = i.get("NumShadowStarts", 0) sum_cols["NumJobStarts"] = 
i.get("NumJobStarts", 0) @@ -245,11 +241,11 @@ def reduce_data(self, i, o, t, is_site=False): max_cols = {} max_cols["MaxLongJobWallClockTime"] = long_job_wallclock_time - max_cols["MaxRequestMemory"] = i.get("RequestMemory", 0) + max_cols["MaxRequestMemory"] = f.get("FlooredRequestMemory", 0) max_cols["MaxMemoryUsage"] = i.get("MemoryUsage", 0) - max_cols["MaxRequestDisk"] = i.get("RequestDisk", 0) + max_cols["MaxRequestDisk"] = f.get("FlooredRequestDisk", 0) max_cols["MaxDiskUsage"] = i.get("DiskUsage", 0) - max_cols["MaxRequestCpus"] = i.get("RequestCpus", 1) + max_cols["MaxRequestCpus"] = f.get("FlooredRequestCpus", 1) max_cols["MaxJobUnits"] = job_units min_cols = {} @@ -270,24 +266,30 @@ def schedd_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this schedd schedd = i.get("ScheddName", "UNKNOWN") or "UNKNOWN" output = data["Schedds"][schedd] total = data["Schedds"]["TOTAL"] - self.reduce_data(i, output, total) + self.reduce_data(i, f, output, total) def user_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this user user = i.get("User", "UNKNOWN") or "UNKNOWN" output = data["Users"][user] total = data["Users"]["TOTAL"] - self.reduce_data(i, output, total) + self.reduce_data(i, f, output, total) counter_cols = {} counter_cols["ScheddNames"] = i.get("ScheddName", "UNKNOWN") or "UNKNOWN" @@ -307,12 +309,15 @@ def project_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this project project = i.get("ProjectName", i.get("projectname", i.get("projectname", "UNKNOWN"))) or "UNKNOWN" output = 
data["Projects"][project] total = data["Projects"]["TOTAL"] - self.reduce_data(i, output, total) + self.reduce_data(i, f, output, total) dict_cols = {} dict_cols["Users"] = i.get("User", "UNKNOWN") or "UNKNOWN" diff --git a/accounting/filters/ChtcScheddCpuOspoolFilter.py b/accounting/filters/ChtcScheddCpuOspoolFilter.py index 09b9530..3086fff 100644 --- a/accounting/filters/ChtcScheddCpuOspoolFilter.py +++ b/accounting/filters/ChtcScheddCpuOspoolFilter.py @@ -241,6 +241,9 @@ def schedd_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this schedd schedd = i.get("ScheddName", "UNKNOWN") or "UNKNOWN" o = data["Schedds"][schedd] @@ -288,9 +291,9 @@ def schedd_filter(self, data, doc): # Compute job units if i.get("RemoteWallClockTime", 0) > 0: o["NumJobUnits"].append(get_job_units( - cpus=i.get("RequestCpus", 1), - memory_gb=i.get("RequestMemory", 1024)/1024, - disk_gb=i.get("RequestDisk", 1024**2)/1024**2, + cpus=f.get("FlooredRequestCpus", 1), + memory_gb=f.get("FlooredRequestMemory", 1024)/1024, + disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2, )) else: o["NumJobUnits"].append(None) @@ -302,6 +305,8 @@ def schedd_filter(self, data, doc): o[attr].append(int(float(i.get(attr)))) except TypeError: o[attr].append(None) + elif attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) else: o[attr].append(i.get(attr, None)) @@ -310,6 +315,9 @@ def user_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this user user = i.get("User", "UNKNOWN") or "UNKNOWN" o = data["Users"][user] @@ -359,9 +367,9 @@ def user_filter(self, data, doc): # Compute job units if i.get("RemoteWallClockTime", 0) > 0: o["NumJobUnits"].append(get_job_units( - 
cpus=i.get("RequestCpus", 1), - memory_gb=i.get("RequestMemory", 1024)/1024, - disk_gb=i.get("RequestDisk", 1024**2)/1024**2, + cpus=f.get("FlooredRequestCpus", 1), + memory_gb=f.get("FlooredRequestMemory", 1024)/1024, + disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2, )) else: o["NumJobUnits"].append(None) @@ -376,6 +384,8 @@ def user_filter(self, data, doc): o[attr].append(int(float(i.get(attr)))) except TypeError: o[attr].append(None) + elif attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) else: o[attr].append(i.get(attr, None)) @@ -385,6 +395,9 @@ def project_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this project project = i.get("ProjectName", i.get("projectname", "UNKNOWN")) or "UNKNOWN" o = data["Projects"][project] @@ -433,9 +446,9 @@ def project_filter(self, data, doc): # Compute job units if i.get("RemoteWallClockTime", 0) > 0: o["NumJobUnits"].append(get_job_units( - cpus=i.get("RequestCpus", 1), - memory_gb=i.get("RequestMemory", 1024)/1024, - disk_gb=i.get("RequestDisk", 1024**2)/1024**2, + cpus=f.get("FlooredRequestCpus", 1), + memory_gb=f.get("FlooredRequestMemory", 1024)/1024, + disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2, )) else: o["NumJobUnits"].append(None) @@ -447,6 +460,8 @@ def project_filter(self, data, doc): o[attr].append(int(float(i.get(attr)))) except TypeError: o[attr].append(None) + elif attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) else: o[attr].append(i.get(attr, None)) diff --git a/accounting/filters/ChtcScheddCpuOspoolMonthlyFilter.py b/accounting/filters/ChtcScheddCpuOspoolMonthlyFilter.py index 94c881b..7ba8a03 100644 --- a/accounting/filters/ChtcScheddCpuOspoolMonthlyFilter.py +++ b/accounting/filters/ChtcScheddCpuOspoolMonthlyFilter.py @@ -152,40 +152,36 @@ def 
get_query(self, index, start_ts, end_ts, **kwargs): # (Dict has same structure as the REST API query language) query = super().get_query(index, start_ts, end_ts, **kwargs) - query.update({ - "body": { - "query": { - "bool": { - "filter": [ - {"range": { - "RecordTime": { - "gte": start_ts, - "lt": end_ts, - } - }}, - {"regexp": { - "ScheddName.keyword": ".*[.]chtc[.]wisc[.]edu" - }} - ], - "must_not": [ - {"terms": { - "JobUniverse": [7, 12] - }}, - ], - } - } + query["body"]["query"].update({ + "bool": { + "filter": [ + {"range": { + "RecordTime": { + "gte": start_ts, + "lt": end_ts, + } + }}, + {"regexp": { + "ScheddName.keyword": ".*[.]chtc[.]wisc[.]edu" + }} + ], + "must_not": [ + {"terms": { + "JobUniverse": [7, 12] + }}, + ], } }) return query - def reduce_data(self, i, o, t, is_site=False): + def reduce_data(self, i, f, o, t, is_site=False): is_removed = i.get("JobStatus") == 3 is_dagnode = i.get("DAGNodeName") is not None is_exec = i.get("NumJobStarts", 0) >= 1 is_multiexec = i.get("NumJobStarts", 0) > 1 has_holds = i.get("NumHolds", 0) > 0 - is_over_rqst_disk = i.get("DiskUsage", 0) > i.get("RequestDisk", 1000) + is_over_rqst_disk = i.get("DiskUsage", 0) > f.get("FlooredRequestDisk", 1000) is_singularity_job = i.get("SingularityImage") is not None has_activation_duration = i.get("activationduration") is not None if has_activation_duration: @@ -218,9 +214,9 @@ def reduce_data(self, i, o, t, is_site=False): elif not is_removed: goodput_time = int(float(i.get("lastremotewallclocktime", i.get("CommittedTime", 0)))) job_units = get_job_units( - cpus=i.get("RequestCpus", 1), - memory_gb=i.get("RequestMemory", 1024)/1024, - disk_gb=i.get("RequestDisk", 1024**2)/1024**2, + cpus=f.get("FlooredRequestCpus", 1), + memory_gb=f.get("FlooredRequestMemory", 1024)/1024, + disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2, ) input_files = 0 input_bytes = 0 @@ -296,9 +292,9 @@ def reduce_data(self, i, o, t, is_site=False): sum_cols["TotalActivationSetupDuration"] = 
int(activation_setup_duration) sum_cols["TotalLongJobWallClockTime"] = long_job_wallclock_time - sum_cols["GoodCpuTime"] = (goodput_time * max(i.get("RequestCpus", 1), 1)) - sum_cols["CpuTime"] = (i.get("RemoteWallClockTime", 0) * max(i.get("RequestCpus", 1), 1)) - sum_cols["BadCpuTime"] = ((i.get("RemoteWallClockTime", 0) - goodput_time) * max(i.get("RequestCpus", 1), 1)) + sum_cols["GoodCpuTime"] = (goodput_time * max(f.get("FlooredRequestCpus", 1), 1)) + sum_cols["CpuTime"] = (i.get("RemoteWallClockTime", 0) * max(f.get("FlooredRequestCpus", 1), 1)) + sum_cols["BadCpuTime"] = ((i.get("RemoteWallClockTime", 0) - goodput_time) * max(f.get("FlooredRequestCpus", 1), 1)) sum_cols["JobUnitTime"] = job_units * i.get("RemoteWallClockTime", 0) sum_cols["NumShadowStarts"] = i.get("NumShadowStarts", 0) sum_cols["NumJobStarts"] = i.get("NumJobStarts", 0) @@ -320,11 +316,11 @@ def reduce_data(self, i, o, t, is_site=False): max_cols = {} max_cols["MaxLongJobWallClockTime"] = long_job_wallclock_time - max_cols["MaxRequestMemory"] = i.get("RequestMemory", 0) + max_cols["MaxRequestMemory"] = f.get("FlooredRequestMemory", 0) max_cols["MaxMemoryUsage"] = i.get("MemoryUsage", 0) - max_cols["MaxRequestDisk"] = i.get("RequestDisk", 0) + max_cols["MaxRequestDisk"] = f.get("FlooredRequestDisk", 0) max_cols["MaxDiskUsage"] = i.get("DiskUsage", 0) - max_cols["MaxRequestCpus"] = i.get("RequestCpus", 1) + max_cols["MaxRequestCpus"] = f.get("FlooredRequestCpus", 1) max_cols["MaxJobUnits"] = job_units min_cols = {} @@ -345,6 +341,9 @@ def schedd_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Filter out jobs that did not run in the OS pool if not self.is_ospool_job(i.get("ScheddName"), i.get("LastRemotePool")): return @@ -354,13 +353,16 @@ def schedd_filter(self, data, doc): output = data["Schedds"][schedd] total = data["Schedds"]["TOTAL"] - 
self.reduce_data(i, output, total) + self.reduce_data(i, f, output, total) def user_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Filter out jobs that did not run in the OS pool if not self.is_ospool_job(i.get("ScheddName"), i.get("LastRemotePool")): return @@ -370,7 +372,7 @@ def user_filter(self, data, doc): output = data["Users"][user] total = data["Users"]["TOTAL"] - self.reduce_data(i, output, total) + self.reduce_data(i, f, output, total) counter_cols = {} counter_cols["ScheddNames"] = i.get("ScheddName", "UNKNOWN") or "UNKNOWN" @@ -390,6 +392,9 @@ def project_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Filter out jobs that did not run in the OS pool if not self.is_ospool_job(i.get("ScheddName"), i.get("LastRemotePool")): return @@ -399,7 +404,7 @@ def project_filter(self, data, doc): output = data["Projects"][project] total = data["Projects"]["TOTAL"] - self.reduce_data(i, output, total) + self.reduce_data(i, f, output, total) dict_cols = {} dict_cols["Users"] = i.get("User", "UNKNOWN") or "UNKNOWN" diff --git a/accounting/filters/ChtcScheddCpuRemovedFilter.py b/accounting/filters/ChtcScheddCpuRemovedFilter.py index 9f4702d..8944810 100644 --- a/accounting/filters/ChtcScheddCpuRemovedFilter.py +++ b/accounting/filters/ChtcScheddCpuRemovedFilter.py @@ -95,6 +95,9 @@ def schedd_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this schedd schedd = i.get("ScheddName", "UNKNOWN") or "UNKNOWN" o = data["Schedds"][schedd] @@ -133,13 +136,19 @@ def schedd_filter(self, data, doc): # Add attr values to the output dict, use None if missing for 
attr in filter_attrs: - o[attr].append(i.get(attr, None)) + if attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) + else: + o[attr].append(i.get(attr, None)) def user_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this user user = i.get("User", "UNKNOWN") or "UNKNOWN" o = data["Users"][user] @@ -182,6 +191,8 @@ def user_filter(self, data, doc): # Use UNKNOWN for missing or blank ProjectName and ScheddName if attr in {"ScheddName", "ProjectName"}: o[attr].append(i.get(attr, i.get(attr.lower(), "UNKNOWN")) or "UNKNOWN") + elif attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) else: o[attr].append(i.get(attr, None)) @@ -190,6 +201,9 @@ def project_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this project project = i.get("ProjectName", i.get("projectname", "UNKNOWN")) or "UNKNOWN" o = data["Projects"][project] @@ -229,7 +243,10 @@ def project_filter(self, data, doc): # Add attr values to the output dict, use None if missing for attr in filter_attrs: - o[attr].append(i.get(attr, None)) + if attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) + else: + o[attr].append(i.get(attr, None)) def get_filters(self): # Add all filter methods to a list diff --git a/accounting/filters/ChtcScheddDSIGpuFilter.py b/accounting/filters/ChtcScheddDSIGpuFilter.py index b48b8ad..c593046 100644 --- a/accounting/filters/ChtcScheddDSIGpuFilter.py +++ b/accounting/filters/ChtcScheddDSIGpuFilter.py @@ -80,6 +80,9 @@ def project_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v 
in doc.get("fields", {}).items()} + # Get output dict for this project project = i.get("ProjectName", i.get("projectname", "UNKNOWN")) or "UNKNOWN" o = data["Projects"][project] @@ -130,6 +133,8 @@ def project_filter(self, data, doc): o[attr].append(int(float(i.get(attr)))) except TypeError: o[attr].append(None) + elif attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) else: o[attr].append(i.get(attr, None)) @@ -139,6 +144,9 @@ def machine_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Filter out jobs that were removed if i.get("JobStatus", 4) == 3: return @@ -161,7 +169,10 @@ def machine_filter(self, data, doc): # Add attr values to the output dict, use None if missing for attr in filter_attrs: - o[attr].append(i.get(attr, None)) + if attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) + else: + o[attr].append(i.get(attr, None)) def get_filters(self): # Add all filter methods to a list diff --git a/accounting/filters/ChtcScheddGpuFilter.py b/accounting/filters/ChtcScheddGpuFilter.py index 0d014a3..1af2322 100644 --- a/accounting/filters/ChtcScheddGpuFilter.py +++ b/accounting/filters/ChtcScheddGpuFilter.py @@ -142,6 +142,9 @@ def schedd_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this schedd schedd = i.get("ScheddName", "UNKNOWN") or "UNKNOWN" o = data["Schedds"][schedd] @@ -184,13 +187,19 @@ def schedd_filter(self, data, doc): # Add attr values to the output dict, use None if missing for attr in filter_attrs: - o[attr].append(i.get(attr, None)) + if attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) + else: + o[attr].append(i.get(attr, None)) def 
user_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this user user = i.get("User", "UNKNOWN") or "UNKNOWN" o = data["Users"][user] @@ -237,6 +246,8 @@ def user_filter(self, data, doc): # Use UNKNOWN for missing or blank ScheddName if attr in {"ScheddName", "ProjectName"}: o[attr].append(i.get(attr, i.get(attr.lower(), "UNKNOWN")) or "UNKNOWN") + elif attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) else: o[attr].append(i.get(attr, None)) @@ -246,6 +257,9 @@ def project_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this project project = i.get("ProjectName", i.get("projectname", "UNKNOWN")) or "UNKNOWN" o = data["Projects"][project] @@ -294,6 +308,8 @@ def project_filter(self, data, doc): o[attr].append(int(float(i.get(attr)))) except TypeError: o[attr].append(None) + elif attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) else: o[attr].append(i.get(attr, None)) @@ -303,6 +319,9 @@ def machine_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Filter out jobs that were removed if i.get("JobStatus", 4) == 3: return @@ -325,7 +344,10 @@ def machine_filter(self, data, doc): # Add attr values to the output dict, use None if missing for attr in filter_attrs: - o[attr].append(i.get(attr, None)) + if attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) + else: + o[attr].append(i.get(attr, None)) def get_filters(self): # Add all filter methods to a list diff --git a/accounting/filters/ChtcScheddJobDistroFilter.py 
b/accounting/filters/ChtcScheddJobDistroFilter.py index ccec136..8616d84 100644 --- a/accounting/filters/ChtcScheddJobDistroFilter.py +++ b/accounting/filters/ChtcScheddJobDistroFilter.py @@ -163,13 +163,16 @@ def job_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict requests = data["JobRequests"] usages = data["JobUsages"] # Check for missing attrs - request_disk = i.get("RequestDisk") - request_memory = i.get("RequestMemory") + request_disk = f.get("FlooredRequestDisk") + request_memory = f.get("FlooredRequestMemory") skip_requests = None in [request_disk, request_memory] usage_disk = i.get("DiskUsage_RAW", i.get("DiskUsage")) @@ -180,7 +183,7 @@ def job_filter(self, data, doc): total_jobs = requests.get("TotalJobs", 0) requests["TotalJobs"] = total_jobs + 1 # Filter out jobs that request more than one core - if not i.get("RequestCpus", 1) > 1: + if not f.get("FlooredRequestCpus", 1) > 1: histogram = requests.get("Histogram", defaultdict(int)) q_request_disk = self.quantize_disk(request_disk) q_request_memory = self.quantize_memory(request_memory) @@ -193,7 +196,7 @@ def job_filter(self, data, doc): total_jobs = usages.get("TotalJobs", 0) usages["TotalJobs"] = total_jobs + 1 # Filter out jobs that request more than one core - if not i.get("RequestCpus", 1) > 1: + if not f.get("FlooredRequestCpus", 1) > 1: histogram = usages.get("Histogram", defaultdict(int)) q_usage_disk = self.quantize_disk(usage_disk) q_usage_memory = self.quantize_memory(usage_memory) diff --git a/accounting/filters/IgwnScheddCpuFilter.py b/accounting/filters/IgwnScheddCpuFilter.py index ccf0f39..3fa82f6 100644 --- a/accounting/filters/IgwnScheddCpuFilter.py +++ b/accounting/filters/IgwnScheddCpuFilter.py @@ -144,6 +144,9 @@ def schedd_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead 
of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this schedd schedd = i.get("ScheddName", "UNKNOWN") or "UNKNOWN" o = data["Schedds"][schedd] @@ -187,9 +190,9 @@ def schedd_filter(self, data, doc): # Compute job units if i.get("RemoteWallClockTime", 0) > 0: o["NumJobUnits"].append(get_job_units( - cpus=i.get("RequestCpus", 1), - memory_gb=i.get("RequestMemory", 1024)/1024, - disk_gb=i.get("RequestDisk", 1024**2)/1024**2, + cpus=f.get("FlooredRequestCpus", 1), + memory_gb=f.get("FlooredRequestMemory", 1024)/1024, + disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2, )) else: o["NumJobUnits"].append(None) @@ -201,6 +204,8 @@ def schedd_filter(self, data, doc): o[attr].append(int(float(i.get(attr)))) except TypeError: o[attr].append(None) + elif attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) else: o[attr].append(i.get(attr, None)) @@ -209,6 +214,9 @@ def user_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this user user = i.get("User", "UNKNOWN") or "UNKNOWN" o = data["Users"][user] @@ -253,9 +261,9 @@ def user_filter(self, data, doc): # Compute job units if i.get("RemoteWallClockTime", 0) > 0: o["NumJobUnits"].append(get_job_units( - cpus=i.get("RequestCpus", 1), - memory_gb=i.get("RequestMemory", 1024)/1024, - disk_gb=i.get("RequestDisk", 1024**2)/1024**2, + cpus=f.get("FlooredRequestCpus", 1), + memory_gb=f.get("FlooredRequestMemory", 1024)/1024, + disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2, )) else: o["NumJobUnits"].append(None) @@ -270,6 +278,8 @@ def user_filter(self, data, doc): o[attr].append(int(float(i.get(attr)))) except TypeError: o[attr].append(None) + elif attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) else: o[attr].append(i.get(attr, None)) diff --git 
a/accounting/filters/IgwnScheddCpuMonthlyFilter.py b/accounting/filters/IgwnScheddCpuMonthlyFilter.py index 5619425..de25cce 100644 --- a/accounting/filters/IgwnScheddCpuMonthlyFilter.py +++ b/accounting/filters/IgwnScheddCpuMonthlyFilter.py @@ -116,7 +116,7 @@ def reduce_data(self, i, o, t, is_site=False): i.get("SuccessCheckpointExitBySignal", False) or i.get("SuccessCheckpointExitCode") is not None )) - is_over_disk_request = i.get("DiskUsage", 0) > i.get("RequestDisk", 1) + is_over_disk_request = i.get("DiskUsage", 0) > i.get("FlooredRequestDisk", 1) goodput_time = 0 if not is_removed: goodput_time = int(float(i.get("lastremotewallclockTime", i.get("CommittedTime", 0)))) @@ -166,9 +166,9 @@ def reduce_data(self, i, o, t, is_site=False): input_bytes += i.get("BytesRecvd", 0) output_bytes += i.get("BytesSent", 0) job_units = get_job_units( - cpus=i.get("RequestCpus", 1), - memory_gb=i.get("RequestMemory", 1024)/1024, - disk_gb=i.get("RequestDisk", 1024**2)/1024**2, + cpus=i.get("FlooredRequestCpus", 1), + memory_gb=i.get("FlooredRequestMemory", 1024)/1024, + disk_gb=i.get("FlooredRequestDisk", 1024**2)/1024**2, ) long_job_wallclock_time = int(is_long) * int(float(i.get("lastremotewallclocktime", i.get("CommittedTime", 60)))) @@ -185,9 +185,9 @@ def reduce_data(self, i, o, t, is_site=False): sum_cols["OverDiskJobs"] = int(is_over_disk_request) sum_cols["TotalLongJobWallClockTime"] = long_job_wallclock_time - sum_cols["GoodCpuTime"] = (goodput_time * max(i.get("RequestCpus", 1), 1)) - sum_cols["CpuTime"] = (i.get("RemoteWallClockTime", 0) * max(i.get("RequestCpus", 1), 1)) - sum_cols["BadCpuTime"] = ((i.get("RemoteWallClockTime", 0) - goodput_time) * 
max(i.get("FlooredRequestCpus", 1), 1)) + sum_cols["GoodCpuTime"] = (goodput_time * max(i.get("FlooredRequestCpus", 1), 1)) + sum_cols["CpuTime"] = (i.get("RemoteWallClockTime", 0) * max(i.get("FlooredRequestCpus", 1), 1)) + sum_cols["BadCpuTime"] = ((i.get("RemoteWallClockTime", 0) - goodput_time) * max(i.get("FlooredRequestCpus", 1), 1)) sum_cols["JobUnitTime"] = job_units * i.get("RemoteWallClockTime", 0) sum_cols["NumShadowStarts"] = i.get("NumShadowStarts", 0) sum_cols["NumJobStarts"] = i.get("NumJobStarts", 0) @@ -208,11 +208,11 @@ def reduce_data(self, i, o, t, is_site=False): max_cols = {} max_cols["MaxLongJobWallClockTime"] = long_job_wallclock_time - max_cols["MaxRequestMemory"] = i.get("RequestMemory", 0) + max_cols["MaxRequestMemory"] = i.get("FlooredRequestMemory", 0) max_cols["MaxMemoryUsage"] = i.get("MemoryUsage", 0) - max_cols["MaxRequestDisk"] = i.get("RequestDisk", 0) + max_cols["MaxRequestDisk"] = i.get("FlooredRequestDisk", 0) max_cols["MaxDiskUsage"] = i.get("DiskUsage", 0) - max_cols["MaxRequestCpus"] = i.get("RequestCpus", 1) + max_cols["MaxRequestCpus"] = i.get("FlooredRequestCpus", 1) max_cols["MaxJobUnits"] = job_units min_cols = {} @@ -233,6 +233,9 @@ def schedd_filter(self, data, doc): # Get input dict i = doc["_source"] + # Merge computed floored fields (as single values) into the input dict + i.update({k: v[0] for k, v in doc.get("fields", {}).items()}) + # Get output dict for this schedd schedd = i.get("ScheddName", "UNKNOWN") or "UNKNOWN" output = data["Schedds"][schedd] @@ -245,6 +248,9 @@ def user_filter(self, data, doc): # Get input dict i = doc["_source"] + # Merge computed floored fields (as single values) into the input dict + i.update({k: v[0] for k, v in doc.get("fields", {}).items()}) + # Get output dict for this user user = i.get("User", "UNKNOWN") or "UNKNOWN" output = data["Users"][user] diff --git a/accounting/filters/OsgScheddCpuFilter.py b/accounting/filters/OsgScheddCpuFilter.py index 2782f19..0419887 100644 --- a/accounting/filters/OsgScheddCpuFilter.py +++ b/accounting/filters/OsgScheddCpuFilter.py @@ -310,6 +310,9 @@ def schedd_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this schedd 
schedd = i.get("ScheddName", "UNKNOWN") or "UNKNOWN" o = data["Schedds"][schedd] @@ -354,22 +357,28 @@ def schedd_filter(self, data, doc): # Compute job units if i.get("RemoteWallClockTime", 0) > 0: o["NumJobUnits"].append(get_job_units( - cpus=i.get("RequestCpus", 1), - memory_gb=i.get("RequestMemory", 1024)/1024, - disk_gb=i.get("RequestDisk", 1024**2)/1024**2, + cpus=f.get("FlooredRequestCpus", 1), + memory_gb=f.get("FlooredRequestMemory", 1024)/1024, + disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2, )) else: o["NumJobUnits"].append(None) # Add attr values to the output dict, use None if missing for attr in filter_attrs: - o[attr].append(i.get(attr, None)) + if attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) + else: + o[attr].append(i.get(attr, None)) def user_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this user user = i.get("User", "UNKNOWN") or "UNKNOWN" o = data["Users"][user] @@ -415,9 +424,9 @@ def user_filter(self, data, doc): # Compute job units if i.get("RemoteWallClockTime", 0) > 0: o["NumJobUnits"].append(get_job_units( - cpus=i.get("RequestCpus", 1), - memory_gb=i.get("RequestMemory", 1024)/1024, - disk_gb=i.get("RequestDisk", 1024**2)/1024**2, + cpus=f.get("FlooredRequestCpus", 1), + memory_gb=f.get("FlooredRequestMemory", 1024)/1024, + disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2, )) else: o["NumJobUnits"].append(None) @@ -427,6 +436,8 @@ def user_filter(self, data, doc): # Use UNKNOWN for missing or blank ProjectName and ScheddName if attr in {"ScheddName", "ProjectName"}: o[attr].append(i.get(attr, i.get(attr.lower(), "UNKNOWN")) or "UNKNOWN") + elif attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) else: o[attr].append(i.get(attr, None)) @@ -435,6 +446,9 @@ def project_filter(self, data, doc): # 
Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this project project = i.get("ProjectName", i.get("projectname", "UNKNOWN")) or "UNKNOWN" o = data["Projects"][project] @@ -480,9 +494,9 @@ def project_filter(self, data, doc): # Compute job units if i.get("RemoteWallClockTime", 0) > 0: o["NumJobUnits"].append(get_job_units( - cpus=i.get("RequestCpus", 1), - memory_gb=i.get("RequestMemory", 1024)/1024, - disk_gb=i.get("RequestDisk", 1024**2)/1024**2, + cpus=f.get("FlooredRequestCpus", 1), + memory_gb=f.get("FlooredRequestMemory", 1024)/1024, + disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2, )) else: o["NumJobUnits"].append(None) @@ -501,7 +515,10 @@ def project_filter(self, data, doc): # Add attr values to the output dict, use None if missing for attr in filter_attrs: - o[attr].append(i.get(attr, None)) + if attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) + else: + o[attr].append(i.get(attr, None)) def institution_filter(self, data, doc): @@ -509,6 +526,9 @@ def institution_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Filter out jobs that did not run in the OS pool if not self.is_ospool_job(i): return @@ -545,16 +565,19 @@ def institution_filter(self, data, doc): # Compute job units if i.get("RemoteWallClockTime", 0) > 0: o["NumJobUnits"].append(get_job_units( - cpus=i.get("RequestCpus", 1), - memory_gb=i.get("RequestMemory", 1024)/1024, - disk_gb=i.get("RequestDisk", 1024**2)/1024**2, + cpus=f.get("FlooredRequestCpus", 1), + memory_gb=f.get("FlooredRequestMemory", 1024)/1024, + disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2, )) else: o["NumJobUnits"].append(None) # Add attr values to the output dict, use None if missing for attr in filter_attrs: - 
o[attr].append(i.get(attr, None)) + if attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) + else: + o[attr].append(i.get(attr, None)) def get_filters(self): # Add all filter methods to a list diff --git a/accounting/filters/OsgScheddCpuHeldFilter.py b/accounting/filters/OsgScheddCpuHeldFilter.py index a80df22..88a9f6f 100644 --- a/accounting/filters/OsgScheddCpuHeldFilter.py +++ b/accounting/filters/OsgScheddCpuHeldFilter.py @@ -237,6 +237,9 @@ def schedd_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this schedd schedd = i.get("ScheddName", "UNKNOWN") or "UNKNOWN" o = data["Schedds"][schedd] @@ -271,13 +274,19 @@ def schedd_filter(self, data, doc): # Add attr values to the output dict, use None if missing for attr in filter_attrs: - o[attr].append(i.get(attr, None)) + if attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) + else: + o[attr].append(i.get(attr, None)) def user_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this user user = i.get("User", "UNKNOWN") or "UNKNOWN" o = data["Users"][user] @@ -316,6 +325,8 @@ def user_filter(self, data, doc): # Use UNKNOWN for missing or blank ProjectName and ScheddName if attr in {"ScheddName", "ProjectName"}: o[attr].append(i.get(attr, i.get(attr.lower(), "UNKNOWN")) or "UNKNOWN") + elif attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) else: o[attr].append(i.get(attr, None)) @@ -324,6 +335,9 @@ def project_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output 
dict for this project project = i.get("ProjectName", i.get("projectname", "UNKNOWN")) or "UNKNOWN" o = data["Projects"][project] @@ -359,7 +373,10 @@ def project_filter(self, data, doc): # Add attr values to the output dict, use None if missing for attr in filter_attrs: - o[attr].append(i.get(attr, None)) + if attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) + else: + o[attr].append(i.get(attr, None)) def get_filters(self): # Add all filter methods to a list diff --git a/accounting/filters/OsgScheddCpuMonthlyFilter.py b/accounting/filters/OsgScheddCpuMonthlyFilter.py index 0d33502..e0c433a 100644 --- a/accounting/filters/OsgScheddCpuMonthlyFilter.py +++ b/accounting/filters/OsgScheddCpuMonthlyFilter.py @@ -239,7 +239,7 @@ def is_ospool_job(self, schedd_name, last_remote_pool): return bool(remote_pools & self.collector_hosts) return False - def reduce_data(self, i, o, t, is_site=False): + def reduce_data(self, i, f, o, t, is_site=False): is_removed = i.get("JobStatus") == 3 is_dagnode = i.get("DAGNodeName") is not None @@ -253,7 +253,7 @@ def reduce_data(self, i, o, t, is_site=False): i.get("SuccessCheckpointExitBySignal", False) or i.get("SuccessCheckpointExitCode") is not None ) - is_over_disk_request = i.get("DiskUsage", 0) > i.get("RequestDisk", 1) + is_over_disk_request = i.get("DiskUsage", 0) > f.get("FlooredRequestDisk", 1) goodput_time = 0 if not is_removed: goodput_time = i.get("LastRemoteWallClockTime", i.get("CommittedTime", 0)) @@ -303,9 +303,9 @@ def reduce_data(self, i, o, t, is_site=False): input_bytes += i.get("BytesRecvd", 0) output_bytes += i.get("BytesSent", 0) job_units = get_job_units( - cpus=i.get("RequestCpus", 1), - memory_gb=i.get("RequestMemory", 1024)/1024, - disk_gb=i.get("RequestDisk", 1024**2)/1024**2, + cpus=f.get("FlooredRequestCpus", 1), + memory_gb=f.get("FlooredRequestMemory", 1024)/1024, + disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2, ) long_job_wallclock_time = int(is_long) * 
i.get("LastRemoteWallClockTime", i.get("CommittedTime", 60)) has_num_vacates_by_reason = False @@ -327,9 +327,9 @@ def reduce_data(self, i, o, t, is_site=False): sum_cols["OverDiskJobs"] = int(is_over_disk_request) # sum_cols["TotalLongJobWallClockTime"] = long_job_wallclock_time - sum_cols["GoodCpuTime"] = (goodput_time * max(i.get("RequestCpus", 1), 1)) - sum_cols["CpuTime"] = (i.get("RemoteWallClockTime", 0) * max(i.get("RequestCpus", 1), 1)) - sum_cols["BadCpuTime"] = ((i.get("RemoteWallClockTime", 0) - goodput_time) * max(i.get("RequestCpus", 1), 1)) + sum_cols["GoodCpuTime"] = (goodput_time * max(f.get("FlooredRequestCpus", 1), 1)) + sum_cols["CpuTime"] = (i.get("RemoteWallClockTime", 0) * max(f.get("FlooredRequestCpus", 1), 1)) + sum_cols["BadCpuTime"] = ((i.get("RemoteWallClockTime", 0) - goodput_time) * max(f.get("FlooredRequestCpus", 1), 1)) sum_cols["JobUnitTime"] = job_units * i.get("RemoteWallClockTime", 0) sum_cols["NumShadowStarts"] = i.get("NumShadowStarts", 0) sum_cols["NumJobStarts"] = i.get("NumJobStarts", 0) @@ -362,11 +362,11 @@ def reduce_data(self, i, o, t, is_site=False): max_cols = {} # max_cols["MaxLongJobWallClockTime"] = long_job_wallclock_time - max_cols["MaxRequestMemory"] = i.get("RequestMemory", 0) + max_cols["MaxRequestMemory"] = f.get("FlooredRequestMemory", 0) max_cols["MaxMemoryUsage"] = i.get("MemoryUsage", 0) - max_cols["MaxRequestDisk"] = i.get("RequestDisk", 0) + max_cols["MaxRequestDisk"] = f.get("FlooredRequestDisk", 0) max_cols["MaxDiskUsage"] = i.get("DiskUsage", 0) - max_cols["MaxRequestCpus"] = i.get("RequestCpus", 1) + max_cols["MaxRequestCpus"] = f.get("FlooredRequestCpus", 1) max_cols["MaxJobUnits"] = job_units min_cols = {} @@ -387,6 +387,9 @@ def schedd_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Filter out jobs that did not run in the OS pool if not 
self.is_ospool_job(i.get("ScheddName"), i.get("LastRemotePool")): return @@ -396,13 +399,16 @@ def schedd_filter(self, data, doc): output = data["Schedds"][schedd] total = data["Schedds"]["TOTAL"] - self.reduce_data(i, output, total) + self.reduce_data(i, f, output, total) def user_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Filter out jobs that did not run in the OS pool if not self.is_ospool_job(i.get("ScheddName"), i.get("LastRemotePool")): return @@ -412,7 +418,7 @@ def user_filter(self, data, doc): output = data["Users"][user] total = data["Users"]["TOTAL"] - self.reduce_data(i, output, total) + self.reduce_data(i, f, output, total) counter_cols = {} counter_cols["ScheddNames"] = i.get("ScheddName", "UNKNOWN") or "UNKNOWN" @@ -431,6 +437,9 @@ def project_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Filter out jobs that did not run in the OS pool if not self.is_ospool_job(i.get("ScheddName"), i.get("LastRemotePool")): return @@ -440,7 +449,7 @@ def project_filter(self, data, doc): output = data["Projects"][project] total = data["Projects"]["TOTAL"] - self.reduce_data(i, output, total) + self.reduce_data(i, f, output, total) dict_cols = {} dict_cols["Users"] = i.get("User", "UNKNOWN") or "UNKNOWN" @@ -467,6 +476,9 @@ def institution_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Filter out jobs that did not run in the OS pool if not self.is_ospool_job(i.get("ScheddName"), i.get("LastRemotePool")): return @@ -487,7 +499,7 @@ def institution_filter(self, data, doc): output = data["Institution"][institution] total = data["Institution"]["TOTAL"] - 
self.reduce_data(i, output, total) + self.reduce_data(i, f, output, total) dict_cols = {} dict_cols["Users"] = i.get("User", "UNKNOWN") or "UNKNOWN" diff --git a/accounting/filters/OsgScheddCpuRemovedFilter.py b/accounting/filters/OsgScheddCpuRemovedFilter.py index 2757a8f..ddff232 100644 --- a/accounting/filters/OsgScheddCpuRemovedFilter.py +++ b/accounting/filters/OsgScheddCpuRemovedFilter.py @@ -165,6 +165,9 @@ def schedd_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this schedd schedd = i.get("ScheddName", "UNKNOWN") or "UNKNOWN" o = data["Schedds"][schedd] @@ -207,13 +210,19 @@ def schedd_filter(self, data, doc): # Add attr values to the output dict, use None if missing for attr in filter_attrs: - o[attr].append(i.get(attr, None)) + if attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) + else: + o[attr].append(i.get(attr, None)) def user_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this user user = i.get("User", "UNKNOWN") or "UNKNOWN" o = data["Users"][user] @@ -260,6 +269,8 @@ def user_filter(self, data, doc): # Use UNKNOWN for missing or blank ProjectName and ScheddName if attr in {"ScheddName", "ProjectName"}: o[attr].append(i.get(attr, i.get(attr.lower(), "UNKNOWN")) or "UNKNOWN") + elif attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) else: o[attr].append(i.get(attr, None)) @@ -268,6 +279,9 @@ def project_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this project project = i.get("ProjectName", 
i.get("projectname", "UNKNOWN")) or "UNKNOWN" o = data["Projects"][project] @@ -311,7 +325,10 @@ def project_filter(self, data, doc): # Add attr values to the output dict, use None if missing for attr in filter_attrs: - o[attr].append(i.get(attr, None)) + if attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) + else: + o[attr].append(i.get(attr, None)) def get_filters(self): # Add all filter methods to a list diff --git a/accounting/filters/OsgScheddCpuRetryFilter.py b/accounting/filters/OsgScheddCpuRetryFilter.py index c0ef45b..c5c6469 100644 --- a/accounting/filters/OsgScheddCpuRetryFilter.py +++ b/accounting/filters/OsgScheddCpuRetryFilter.py @@ -137,6 +137,9 @@ def schedd_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this schedd schedd = i.get("ScheddName", "UNKNOWN") or "UNKNOWN" o = data["Schedds"][schedd] @@ -153,13 +156,19 @@ def schedd_filter(self, data, doc): # Add attr values to the output dict, use None if missing for attr in filter_attrs: - o[attr].append(i.get(attr, None)) + if attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) + else: + o[attr].append(i.get(attr, None)) def user_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this user user = i.get("User", "UNKNOWN") or "UNKNOWN" o = data["Users"][user] @@ -180,6 +189,8 @@ def user_filter(self, data, doc): # Use UNKNOWN for missing or blank ProjectName and ScheddName if attr in {"ScheddName", "ProjectName"}: o[attr].append(i.get(attr, i.get(attr.lower(), "UNKNOWN")) or "UNKNOWN") + elif attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) else: o[attr].append(i.get(attr, None)) @@ 
-188,6 +199,9 @@ def project_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this project project = i.get("ProjectName", i.get("projectname", "UNKNOWN")) or "UNKNOWN" o = data["Projects"][project] @@ -205,7 +219,10 @@ def project_filter(self, data, doc): # Add attr values to the output dict, use None if missing for attr in filter_attrs: - o[attr].append(i.get(attr, None)) + if attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) + else: + o[attr].append(i.get(attr, None)) def get_filters(self): # Add all filter methods to a list diff --git a/accounting/filters/OsgScheddGpuFilter.py b/accounting/filters/OsgScheddGpuFilter.py index d40bdd2..40ac751 100644 --- a/accounting/filters/OsgScheddGpuFilter.py +++ b/accounting/filters/OsgScheddGpuFilter.py @@ -328,6 +328,9 @@ def schedd_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this schedd schedd = i.get("ScheddName", "UNKNOWN") or "UNKNOWN" o = data["Schedds"][schedd] @@ -371,13 +374,19 @@ def schedd_filter(self, data, doc): # Add attr values to the output dict, use None if missing for attr in filter_attrs: - o[attr].append(i.get(attr, None)) + if attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) + else: + o[attr].append(i.get(attr, None)) def user_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this user user = i.get("User", "UNKNOWN") or "UNKNOWN" o = data["Users"][user] @@ -425,6 +434,8 @@ def user_filter(self, data, doc): # Use UNKNOWN for missing or blank ProjectName and ScheddName 
if attr in {"ScheddName", "ProjectName"}: o[attr].append(i.get(attr, i.get(attr.lower(), "UNKNOWN")) or "UNKNOWN") + elif attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) else: o[attr].append(i.get(attr, None)) @@ -433,6 +444,9 @@ def project_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this project project = i.get("ProjectName", i.get("projectname", "UNKNOWN")) or "UNKNOWN" o = data["Projects"][project] @@ -489,7 +503,10 @@ def project_filter(self, data, doc): # Add attr values to the output dict, use None if missing for attr in filter_attrs: - o[attr].append(i.get(attr, None)) + if attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) + else: + o[attr].append(i.get(attr, None)) def institution_filter(self, data, doc): @@ -497,6 +514,9 @@ def institution_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Filter out jobs that did not run in the OS pool if not self.is_ospool_job(i): return @@ -532,7 +552,10 @@ def institution_filter(self, data, doc): # Add attr values to the output dict, use None if missing for attr in filter_attrs: - o[attr].append(i.get(attr, None)) + if attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) + else: + o[attr].append(i.get(attr, None)) def get_filters(self): # Add all filter methods to a list diff --git a/accounting/filters/OsgScheddJobDistroFilter.py b/accounting/filters/OsgScheddJobDistroFilter.py index 0dfb1a2..eaf6c1e 100644 --- a/accounting/filters/OsgScheddJobDistroFilter.py +++ b/accounting/filters/OsgScheddJobDistroFilter.py @@ -228,6 +228,9 @@ def job_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed 
fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Filter out jobs that did not run in the OS pool if not self.is_ospool_job(i.get("ScheddName"), i.get("LastRemotePool")): return @@ -237,8 +240,8 @@ def job_filter(self, data, doc): usages = data["JobUsages"] # Check for missing attrs - request_disk = i.get("RequestDisk") - request_memory = i.get("RequestMemory") + request_disk = f.get("FlooredRequestDisk") + request_memory = f.get("FlooredRequestMemory") skip_requests = None in [request_disk, request_memory] usage_disk = i.get("DiskUsage_RAW", i.get("DiskUsage")) @@ -249,7 +252,7 @@ def job_filter(self, data, doc): total_jobs = requests.get("TotalJobs", 0) requests["TotalJobs"] = total_jobs + 1 # Filter out jobs that request more than one core - if not i.get("RequestCpus", 1) > 1: + if not f.get("FlooredRequestCpus", 1) > 1: histogram = requests.get("Histogram", defaultdict(int)) q_request_disk = self.quantize_disk(request_disk) q_request_memory = self.quantize_memory(request_memory) @@ -262,7 +265,7 @@ def job_filter(self, data, doc): total_jobs = usages.get("TotalJobs", 0) usages["TotalJobs"] = total_jobs + 1 # Filter out jobs that request more than one core - if not i.get("RequestCpus", 1) > 1: + if not f.get("FlooredRequestCpus", 1) > 1: histogram = usages.get("Histogram", defaultdict(int)) q_usage_disk = self.quantize_disk(usage_disk) q_usage_memory = self.quantize_memory(usage_memory) diff --git a/accounting/filters/OsgScheddLongJobFilter.py b/accounting/filters/OsgScheddLongJobFilter.py index 020b4e2..7a64eca 100644 --- a/accounting/filters/OsgScheddLongJobFilter.py +++ b/accounting/filters/OsgScheddLongJobFilter.py @@ -180,6 +180,9 @@ def user_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this user user = i.get("User", "UNKNOWN") or "UNKNOWN" o = 
data["Users"][user] @@ -208,7 +211,9 @@ def user_filter(self, data, doc): "LastRemoteHost"}: o[attr][0] = i.get(attr, i.get(attr.lower(), "UNKNOWN")) or "UNKNOWN" elif attr in {"RequestGpus"}: - o[attr][0] = i.get(attr, 0) + o[attr][0] = f.get("FlooredRequestGpus", i.get("RequestGpus", 0)) + elif attr.startswith("Request"): + o[attr][0] = f.get(f"Floored{attr}", i.get(attr, None)) else: o[attr][0] = i.get(attr, None) diff --git a/accounting/filters/OsgScheddResourcesFilter.py b/accounting/filters/OsgScheddResourcesFilter.py index 9ab856b..989ea76 100644 --- a/accounting/filters/OsgScheddResourcesFilter.py +++ b/accounting/filters/OsgScheddResourcesFilter.py @@ -108,6 +108,25 @@ ] +RESOURCE_TYPES = ["Cpus", "Memory", "Disk", "Gpus"] +FLOORED_RESOURCE_FIELD = "FlooredRequest{resource}" +FLOORED_RESOURCE_SCRIPT = """ +if ( + doc.containsKey("{resource}Provisioned") && + doc["{resource}Provisioned"].size() > 0 && + doc.containsKey("Request{resource}") && + doc["Request{resource}"].size() > 0 && + doc["{resource}Provisioned"].value < doc["Request{resource}"].value + ) {{ + emit(doc["{resource}Provisioned"].value); +}} else if ( + doc.containsKey("Request{resource}") && + doc["Request{resource}"].size() > 0) {{ + emit(doc["Request{resource}"].value); +}} +""" + + # INSTITUTION_DB = get_institution_database() # RESOURCE_DATA = get_topology_resource_data() @@ -259,6 +278,19 @@ def get_query(self, index, start_ts, end_ts, scroll=None, size=500): } } } + # Add floored resource requests (INF-3590) + fields = [] + runtime_mappings = {} + for resource in RESOURCE_TYPES: + fields.append(FLOORED_RESOURCE_FIELD.format(resource=resource)) + runtime_mappings[FLOORED_RESOURCE_FIELD.format(resource=resource)] = { + "type": "long", + "script": { + "source": FLOORED_RESOURCE_SCRIPT.format(resource=resource) + } + } + query["fields"] = fields + query["runtime_mappings"] = runtime_mappings return query @@ -267,6 +299,9 @@ def get_query(self, index, start_ts, end_ts, scroll=None, 
size=500): # # Get input dict # i = doc["_source"] + # # Get computed fields (as single values instead of arrays) + # f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # # Get output dict for this schedd # schedd = i.get("ScheddName", "UNKNOWN") or "UNKNOWN" # o = data["Schedds"][schedd] @@ -311,22 +346,28 @@ def get_query(self, index, start_ts, end_ts, scroll=None, size=500): # # Compute job units # if i.get("RemoteWallClockTime", 0) > 0: # o["NumJobUnits"].append(get_job_units( - # cpus=i.get("RequestCpus", 1), - # memory_gb=i.get("RequestMemory", 1024)/1024, - # disk_gb=i.get("RequestDisk", 1024**2)/1024**2, + # cpus=f.get("FlooredRequestCpus", 1), + # memory_gb=f.get("FlooredRequestMemory", 1024)/1024, + # disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2, # )) # else: # o["NumJobUnits"].append(None) # # Add attr values to the output dict, use None if missing # for attr in filter_attrs: - # o[attr].append(i.get(attr, None)) + # if attr.startswith("Request"): + # o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) + # else: + # o[attr].append(i.get(attr, None)) # def user_filter(self, data, doc): # # Get input dict # i = doc["_source"] + # # Get computed fields (as single values instead of arrays) + # f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # # Get output dict for this user # user = i.get("User", "UNKNOWN") or "UNKNOWN" # o = data["Users"][user] @@ -372,9 +413,9 @@ def get_query(self, index, start_ts, end_ts, scroll=None, size=500): # # Compute job units # if i.get("RemoteWallClockTime", 0) > 0: # o["NumJobUnits"].append(get_job_units( - # cpus=i.get("RequestCpus", 1), - # memory_gb=i.get("RequestMemory", 1024)/1024, - # disk_gb=i.get("RequestDisk", 1024**2)/1024**2, + # cpus=f.get("FlooredRequestCpus", 1), + # memory_gb=f.get("FlooredRequestMemory", 1024)/1024, + # disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2, # )) # else: # o["NumJobUnits"].append(None) @@ -384,6 +425,8 @@ def get_query(self, index, start_ts, end_ts, 
scroll=None, size=500): # # Use UNKNOWN for missing or blank ProjectName and ScheddName # if attr in {"ScheddName", "ProjectName"}: # o[attr].append(i.get(attr, i.get(attr.lower(), "UNKNOWN")) or "UNKNOWN") + # elif attr.startswith("Request"): + # o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) # else: # o[attr].append(i.get(attr, None)) @@ -392,6 +435,9 @@ def project_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this project project = i.get("ProjectName", i.get("projectname", "UNKNOWN")) or "UNKNOWN" o = data["Projects"][project] @@ -427,13 +473,13 @@ def project_filter(self, data, doc): increased_memory = False if can_increase_memory: try: - request_memory = int(i.get("RequestMemory")) + request_memory = int(f.get("FlooredRequestMemory", i.get("RequestMemory"))) target_memory = int(TRANSFORM_REQUEST_MEMORY_RE.match(increase_memory_expr).group(1)) if target_memory > request_memory: memory_increase_factor = target_memory / request_memory else: increased_memory = True - except (ValueError, AttributeError,): + except (ValueError, AttributeError, TypeError,): increased_memory = possible_vacates_due_to_resources > 0 o["_NumJobsCanBumpRequestMemory"].append(int(can_increase_memory)) @@ -442,7 +488,10 @@ def project_filter(self, data, doc): # Add attr values to the output dict, use None if missing for attr in filter_attrs: - o[attr].append(i.get(attr, None)) + if attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) + else: + o[attr].append(i.get(attr, None)) # def institution_filter(self, data, doc): @@ -450,6 +499,9 @@ def project_filter(self, data, doc): # # Get input dict # i = doc["_source"] + # # Get computed fields (as single values instead of arrays) + # f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # # Filter out jobs that did not run in the OS pool 
# if not self.is_ospool_job(i): # return @@ -486,16 +538,19 @@ def project_filter(self, data, doc): # # Compute job units # if i.get("RemoteWallClockTime", 0) > 0: # o["NumJobUnits"].append(get_job_units( - # cpus=i.get("RequestCpus", 1), - # memory_gb=i.get("RequestMemory", 1024)/1024, - # disk_gb=i.get("RequestDisk", 1024**2)/1024**2, + # cpus=f.get("FlooredRequestCpus", 1), + # memory_gb=f.get("FlooredRequestMemory", 1024)/1024, + # disk_gb=f.get("FlooredRequestDisk", 1024**2)/1024**2, # )) # else: # o["NumJobUnits"].append(None) # # Add attr values to the output dict, use None if missing # for attr in filter_attrs: - # o[attr].append(i.get(attr, None)) + # if attr.startswith("Request"): + # o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) + # else: + # o[attr].append(i.get(attr, None)) def get_filters(self): # Add all filter methods to a list @@ -602,7 +657,7 @@ def merge_filtered_data(self, data, agg): # row["Num Jobs Over Rqst Disk"] = sum([(usage or 0) > (request or 1) # for (usage, request) in zip(data["DiskUsage"], data["RequestDisk"])]) # row["Num Short Jobs"] = sum(self.clean(is_short_job)) - # row["Max Rqst Mem MB"] = max(self.clean(data['RequestMemory'], allow_empty_list=False)) + # row["Max Rqst Mem MB"] = max(self.clean(data["RequestMemory"], allow_empty_list=False)) # row["Med Used Mem MB"] = stats.median(self.clean(data["MemoryUsage"], allow_empty_list=False)) # row["Max Used Mem MB"] = max(self.clean(data["MemoryUsage"], allow_empty_list=False)) # row["Max Rqst Disk GB"] = max(self.clean(data["RequestDisk"], allow_empty_list=False)) / (1000*1000) @@ -741,7 +796,7 @@ def compute_custom_columns(self, data, agg, agg_name): # There is no variance if there is only one value row["Stdv Hrs"] = 0 - memory_requests_sorted = self.clean(data['RequestMemory']) + memory_requests_sorted = self.clean(data["RequestMemory"]) memory_requests_sorted.sort() if len(memory_requests_sorted) > 0: row["Min Allo Mem"] = memory_requests_sorted[ 0] / 1024 @@ -779,7 
+834,7 @@ def compute_custom_columns(self, data, agg, agg_name): # There is no variance if there is only one value row["Stdv Allo Mem GBh"] = 0 - memory_usages_sorted = self.clean(data['MemoryUsage']) + memory_usages_sorted = self.clean(data["MemoryUsage"]) memory_usages_sorted.sort() if len(memory_usages_sorted) > 0: row["Min Use Mem"] = memory_usages_sorted[ 0] / 1024 @@ -855,7 +910,7 @@ def compute_custom_columns(self, data, agg, agg_name): # There is no variance if there is only one value row["Stdv Unuse Mem GBh"] = 0 - memory_utility_sorted = self.clean([100*x/y if ((None not in (x, y,)) and (y > 0)) else None for x, y in zip(data['MemoryUsage'], data['RequestMemory'])]) + memory_utility_sorted = self.clean([100*x/y if ((None not in (x, y,)) and (y > 0)) else None for x, y in zip(data["MemoryUsage"], data["RequestMemory"])]) memory_utility_sorted.sort() if len(memory_utility_sorted) > 0: row["Min Util% Mem"] = memory_utility_sorted[ 0] diff --git a/accounting/filters/PathScheddCpuFilter.py b/accounting/filters/PathScheddCpuFilter.py index fdcdf0b..29d99ec 100644 --- a/accounting/filters/PathScheddCpuFilter.py +++ b/accounting/filters/PathScheddCpuFilter.py @@ -113,6 +113,9 @@ def project_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this project project = i.get("ProjectName", i.get("projectname", "UNKNOWN")) or "UNKNOWN" o = data["Projects"][project] @@ -161,6 +164,8 @@ def project_filter(self, data, doc): o[attr].append(int(float(i.get(attr)))) except TypeError: o[attr].append(None) + elif attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) else: o[attr].append(i.get(attr, None)) @@ -169,6 +174,9 @@ def schedd_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in 
doc.get("fields", {}).items()} + # Get output dict for this schedd schedd = i.get("ScheddName", "UNKNOWN") or "UNKNOWN" o = data["Schedds"][schedd] @@ -216,6 +224,8 @@ def schedd_filter(self, data, doc): o[attr].append(int(float(i.get(attr)))) except TypeError: o[attr].append(None) + elif attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) else: o[attr].append(i.get(attr, None)) @@ -224,6 +234,9 @@ def user_filter(self, data, doc): # Get input dict i = doc["_source"] + # Get computed fields (as single values instead of arrays) + f = {k: v[0] for k, v in doc.get("fields", {}).items()} + # Get output dict for this user user = i.get("User", "UNKNOWN") or "UNKNOWN" o = data["Users"][user] @@ -275,6 +288,8 @@ def user_filter(self, data, doc): o[attr].append(int(float(i.get(attr)))) except TypeError: o[attr].append(None) + elif attr.startswith("Request"): + o[attr].append(f.get(f"Floored{attr}", i.get(attr, None))) else: o[attr].append(i.get(attr, None))