-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbench_python.py
More file actions
215 lines (186 loc) · 7.57 KB
/
bench_python.py
File metadata and controls
215 lines (186 loc) · 7.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
#!/usr/bin/env python3
# Usage:
#   python3 bench_python.py
#   python3 bench_python.py --repeat 5 --sieve-limit 3000000 --matmul-n 220 --regex-len 8000000 --io-kb 32768
import argparse
import os
import random
import re
import sys
import tempfile
import time
def now():
    """Return the current value of the high-resolution performance counter.

    The value comes from time.perf_counter() and is only meaningful as a
    difference between two readings.
    """
    reading = time.perf_counter()
    return reading
def time_call(fn, *args, **kwargs):
    """Call ``fn(*args, **kwargs)`` once and measure how long it takes.

    Returns:
        A ``(result, elapsed)`` tuple: the value returned by ``fn`` and the
        difference between the now() readings taken just after and just
        before the call.
    """
    started = now()
    outcome = fn(*args, **kwargs)
    finished = now()
    elapsed = finished - started
    return outcome, elapsed
# --- Tasks -------------------------------------------------------------
def sieve_primes(limit: int):
    """Return all primes p with 2 <= p <= limit, in ascending order.

    Implemented as a Sieve of Eratosthenes over a bytearray holding one flag
    byte per integer in [0, limit]. A limit below 2 yields an empty list.
    """
    if limit < 2:
        return []

    end = limit + 1
    flags = bytearray(b"\x01") * end
    # 0 and 1 are not prime.
    flags[0:2] = b"\x00\x00"

    root = int(limit ** 0.5)
    for prime in range(2, root + 1):
        if not flags[prime]:
            continue
        first = prime * prime
        # Number of multiples of `prime` in [first, limit]; the extended-slice
        # assignment below requires a replacement of exactly this length.
        multiples = (limit - first) // prime + 1
        flags[first:end:prime] = bytes(multiples)

    return [number for number, is_prime in enumerate(flags) if is_prime]
def matmul_naive(n: int):
    """Multiply two n x n integer matrices with a plain triple loop.

    The inputs are generated by fixed modular formulas, so the result is the
    same on every call for a given ``n``.

    Returns:
        The sum of all entries of the product matrix, modulo 10**9 + 7.
    """
    # Kept from the original: reseeds the global RNG on every call, although
    # the matrices below are formula-generated and do not draw from it.
    random.seed(42)

    size = range(n)
    left = [[(row * 31 + col * 17) % 97 for col in size] for row in size]
    right = [[(row * 13 + col * 7) % 89 for col in size] for row in size]
    product = [[0] * n for _ in size]

    # i-k-j loop order: each left[i][k] scales row k of `right` into row i
    # of `product`.
    for left_row, out_row in zip(left, product):
        for scale, right_row in zip(left_row, right):
            for col, value in enumerate(right_row):
                out_row[col] += scale * value

    total = 0
    for out_row in product:
        total += sum(out_row)
    return total % (10**9 + 7)
def regex_parse_count(total_len: int):
    """Generate a deterministic text of ``total_len`` characters and scan it for numbers.

    Text generation (per the original note, intended to mirror the Perl
    benchmark's generator):
    - LCG step: state = (1103515245*state + 12345) % 0x7fffffff, seeded with 1234
    - Each chunk is "val{v1}, foo{v2}; {v3}|bar{v4} " where every v is the
      next state modulo 1_000_000
    - Chunks are appended until the text is at least ``total_len`` long, then
      the text is cut to exactly ``total_len`` characters

    Returns:
        ``(count, checksum)``: the number of ``\\b\\d+\\b`` matches in the text
        and the sum of their integer values modulo 10**9 + 7.
    """
    modulus = 0x7fffffff
    state = 1234  # same seed as Perl (per the original comment)
    pieces = []
    produced = 0
    while produced < total_len:
        # Draw four consecutive LCG values for one chunk.
        fields = []
        for _ in range(4):
            state = (1103515245 * state + 12345) % modulus
            fields.append(state % 1_000_000)
        v1, v2, v3, v4 = fields
        piece = f"val{v1}, foo{v2}; {v3}|bar{v4} "
        pieces.append(piece)
        produced += len(piece)

    text = "".join(pieces)[:total_len]
    matches = re.findall(r"\b\d+\b", text)
    checksum = sum(int(token) for token in matches) % (10**9 + 7)
    return len(matches), checksum
def file_io(size_kb: int):
    """Write ``size_kb`` KiB of fixed text to a temporary file and read it back.

    Per the original note, this is intended to match the Perl benchmark:
    - Repeat a fixed ASCII line until >= target bytes
    - Slice to exact target length
    - Compute checksum over exact bytes

    Args:
        size_kb: Payload size in units of 1024 bytes.

    Returns:
        ``(byte_count, checksum_written, checksum_read)`` where each checksum
        is the sum of the byte values modulo 10**9 + 7.

    The temporary file is removed before returning, including when writing or
    reading raises.
    """
    line = "The quick brown fox jumps over the lazy dog. 1234567890\n".encode()
    target = size_kb * 1024
    blob = bytearray()
    while len(blob) < target:
        blob.extend(line)
    blob = bytes(blob[:target])  # slice to exact target
    checksum_w = sum(blob) % (10**9 + 7)

    # delete=False: the file is closed after writing and reopened by name,
    # so removal is handled explicitly in the finally clause below.
    tmp = tempfile.NamedTemporaryFile(delete=False)
    fname = tmp.name
    try:
        # The write is inside the try so a failed write (e.g. disk full)
        # cannot leave the temporary file behind.
        with tmp:
            tmp.write(blob)
        with open(fname, "rb") as f:
            content = f.read()
        checksum_r = sum(content) % (10**9 + 7)
    finally:
        try:
            os.remove(fname)
        except OSError:
            # Best-effort cleanup: a failure to delete must not mask the
            # benchmark result or an in-flight exception.
            pass
    return len(blob), checksum_w, checksum_r
# --- Runner ------------------------------------------------------------
def run_task(task_name, fn, *args):
    """Time a single call of ``fn(*args)`` and label it with ``task_name``.

    Returns:
        A ``(task_name, elapsed, result)`` tuple, where ``elapsed`` and
        ``result`` come from time_call().
    """
    timed = time_call(fn, *args)
    outcome = timed[0]
    seconds = timed[1]
    return task_name, seconds, outcome
def main(argv=None):
    """Parse options, run the selected benchmarks, and print a timing report.

    Args:
        argv: Optional list of argument strings. When None (the default),
            argparse reads the arguments from sys.argv[1:].

    Each selected task is run once untimed as a warm-up (unless --no-warmup),
    then timed --repeat times. The report lists min/avg/max seconds per task,
    followed by one "(check)" line per task showing the last timed result.

    Exits through parser.error() (status 2) when --repeat is below 1.
    """
    task_names = ["sieve", "matmul", "regex", "fileio"]

    parser = argparse.ArgumentParser(description="Python micro-benchmarks comparable to pure Perl.")
    parser.add_argument("--repeat", type=int, default=3, help="Timed iterations per task (default: 3)")
    parser.add_argument("--no-warmup", action="store_true", help="Disable warm-up run")
    parser.add_argument("--sieve-limit", type=int, default=2_000_000, help="Prime sieve limit")
    parser.add_argument("--matmul-n", type=int, default=180, help="Matrix size N for NxN naive multiply")
    parser.add_argument("--regex-len", type=int, default=5_000_000, help="Target string length for regex parse")
    parser.add_argument("--io-kb", type=int, default=16_384, help="File I/O size in KB (default: 16384 = 16 MB)")
    parser.add_argument("--verbose", action="store_true", help="Print per-step progress")
    parser.add_argument("--only", choices=task_names, nargs="*", help="Run only these tasks")
    parser.add_argument("--skip", choices=task_names, nargs="*", help="Skip these tasks")
    parser.add_argument("--quick", action="store_true", help="Use smaller sizes for a quick smoke test")
    args = parser.parse_args(argv)

    # With no timed iterations there is nothing to summarise: min()/max() of
    # an empty list and the average's division would both raise.
    if args.repeat < 1:
        parser.error("--repeat must be at least 1")

    if args.quick:
        # --quick only ever shrinks the workload; smaller user values win.
        args.repeat = min(args.repeat, 1)
        args.sieve_limit = min(args.sieve_limit, 200_000)
        args.matmul_n = min(args.matmul_n, 100)
        args.regex_len = min(args.regex_len, 1_000_000)
        args.io_kb = min(args.io_kb, 1024)

    header = f"Python {sys.version.split()[0]} | repeats={args.repeat} | warmup={'off' if args.no_warmup else 'on'}"
    print(header, flush=True)  # force immediate output

    task_specs = [
        ("sieve", sieve_primes, (args.sieve_limit,)),
        ("matmul", matmul_naive, (args.matmul_n,)),
        ("regex", regex_parse_count, (args.regex_len,)),
        ("fileio", file_io, (args.io_kb,)),
    ]

    # Filter by --only / --skip
    if args.only:
        only_set = set(args.only)
        task_specs = [t for t in task_specs if t[0] in only_set]
    if args.skip:
        skip_set = set(args.skip)
        task_specs = [t for t in task_specs if t[0] not in skip_set]
    if not task_specs:
        print("No tasks selected.", flush=True)
        return

    if args.verbose:
        print(f"[info] tasks: {', '.join(t[0] for t in task_specs)}", flush=True)

    # Warm-up: one untimed call per task
    if not args.no_warmup:
        for name, fn, fn_args in task_specs:
            if args.verbose:
                print(f"[warmup] {name}...", flush=True)
            fn(*fn_args)

    # Timed runs
    results = {}
    for name, fn, fn_args in task_specs:
        times = []
        last_result = None
        if args.verbose:
            print(f"[run] {name} x{args.repeat}", flush=True)
        for iteration in range(args.repeat):
            if args.verbose:
                print(f" - iter {iteration+1}/{args.repeat}", flush=True)
            last_result, elapsed = time_call(fn, *fn_args)
            times.append(elapsed)
        results[name] = {
            "min": min(times),
            "avg": sum(times) / len(times),
            "max": max(times),
            "last_result": last_result,
        }

    # Report
    print("\n=== Results (seconds) ===", flush=True)
    width = max(len(name) for name, _, _ in task_specs) + 2
    print(f"{'task'.ljust(width)} min avg max", flush=True)
    print("-" * (width + 28), flush=True)
    for name, _, _ in task_specs:
        stats = results[name]
        print(f"{name.ljust(width)} {stats['min']:.6f} {stats['avg']:.6f} {stats['max']:.6f}", flush=True)

    # Correctness crumbs: reuse the last timed result of each task instead of
    # recomputing anything after the measurements.
    if "sieve" in results:
        sieve_count = len(results["sieve"]["last_result"])
        print(f"\n(check) primes <= {args.sieve_limit}: {sieve_count}", flush=True)
    if "matmul" in results:
        print(f"(check) matmul N={args.matmul_n} checksum: {results['matmul']['last_result']}", flush=True)
    if "regex" in results:
        rr = results['regex']['last_result']
        print(f"(check) regex: count={rr[0]}, checksum={rr[1]}", flush=True)
    if "fileio" in results:
        fio = results['fileio']['last_result']
        print(f"(check) fileio bytes={fio[0]}, checksum write={fio[1]}, read={fio[2]}", flush=True)
# Script entry point: run the benchmarks only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()