srmbsrg/ThreadingTemplate

ThreadingTemplate

Portfolio note: This repository demonstrates async I/O operations in a production-grade .NET pipeline — specifically how to move high-volume, I/O-bound document-processing work off the main thread without sacrificing control over concurrency or resource utilisation.


What this is

DocumentNOTDExportTask is a scheduled batch-processing engine built for a toll-notification mail-house system. Every night it ingests thousands of Notice of Toll Due (NOTD) records from a backend database, transforms them into structured JSON payloads, bundles associated vehicle-image files (full-frame + licence-plate ROI crops) into partitioned ZIP archives, and ships the whole batch outbound over FTP/SFTP to a downstream print-and-mail vendor.

The volume and I/O profile make synchronous, sequential processing impractical: each record requires multiple database round-trips, and each image bundle involves reading arbitrarily large binary assets from a file server before compressing them to disk. The codebase addresses this with a layered concurrency model that keeps every I/O stage non-blocking at the caller boundary while giving the operator runtime control over how aggressively threads are spun up.


Threading & Async Model

The design intentionally combines three levels of concurrency rather than choosing just one:

Level 1 — async/await + Task.Run (caller boundary)

ProcessBatchTask and ZipBundlesWImages are declared async Task<T>. Each one immediately offloads its body onto the thread-pool via Task.Run(...), freeing the calling thread (the scheduler) to continue or block with a simple task.Wait() at a well-defined join point.

public static async Task<SuccessDC> ProcessBatchTask(
    List<MHHeaderRecordDC> notdFilesToProcess, ...)
{
    var success = new SuccessDC();
    await Task.Run(() =>
    {
        // explicit Thread[] management lives here
        ...
    });
    return success;
}

This keeps the top-level RunTask() loop readable and deterministic: it fires a batch, waits, fires the next, waits. The heavy file and database I/O runs on dedicated threads rather than CLR thread-pool threads; only the single pool thread hosting the Task.Run body blocks while it joins the workers.
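The caller-side shape implied here can be sketched as follows. RunTask, ProcessBatchTask, and TaskBundleSize come from this README; the chunking loop, the int payloads, and the return values are illustrative assumptions, not the repository's actual code.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class SchedulerSketch
{
    const int TaskBundleSize = 50; // illustrative; the real value comes from app.config

    // Fire a batch, block at the join point, then start the next batch.
    public static int RunTask(List<int> records)
    {
        int processed = 0;
        for (int i = 0; i < records.Count; i += TaskBundleSize)
        {
            var chunk = records.Skip(i).Take(TaskBundleSize).ToList();
            Task<int> task = ProcessBatchTask(chunk);
            task.Wait();              // deterministic join: next chunk waits for this one
            processed += task.Result;
        }
        return processed;
    }

    // Thin async boundary: the body is offloaded to the pool via Task.Run.
    static async Task<int> ProcessBatchTask(List<int> chunk)
    {
        return await Task.Run(() => chunk.Count); // explicit Thread[] work would live here
    }
}
```

Because the only await is the Task.Run offload, the scheduler thread spends the batch duration parked in Wait(), which is exactly the one-batch-at-a-time behaviour described above.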

Level 2 — Explicit Thread[] arrays (worker layer)

Inside the Task.Run lambda, the code does not delegate further to the thread pool. Instead it allocates a fixed-size Thread[], starts each thread, and calls Join() on the full list before returning. This is a deliberate choice (discussed in Design Decisions below).

int threadIteration = 0;
var notdThreadList = new List<Thread>();
Thread[] notdThread = new Thread[threadCount];
for (int i = 0; i < notdFilesToProcess.Count; i += BatchSizeToProcessWImages)
{
    // a fresh chunk per iteration, so each thread closes over its own slice
    var chunk = notdFilesToProcess.Skip(i).Take(BatchSizeToProcessWImages).ToList();
    notdThread[threadIteration] = new Thread(
        () => notdBatchProcess(chunk, ...));
    notdThread[threadIteration].Start();
    notdThreadList.Add(notdThread[threadIteration]);
    threadIteration++;
}
foreach (Thread t in notdThreadList)
    t.Join();

The same pattern is used in ZipBundlesWImages for the image-compression workers.
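A minimal sketch of what a zipBatch worker body can look like with System.IO.Compression; the method name and entry-naming scheme are assumptions, since the repository's actual signature is not reproduced here.

```csharp
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;

static class ZipSketch
{
    // Each dedicated thread owns exactly one archive, so no cross-thread
    // coordination is needed while writing entries.
    public static void ZipBatch(string zipPath, IEnumerable<string> imagePaths)
    {
        using (var archive = ZipFile.Open(zipPath, ZipArchiveMode.Create))
        {
            foreach (var imagePath in imagePaths)
            {
                // Reads the source file and writes a compressed entry
                // named after the file itself.
                archive.CreateEntryFromFile(imagePath, Path.GetFileName(imagePath));
            }
        }
    }
}
```

On .NET Framework this requires assembly references to System.IO.Compression and System.IO.Compression.FileSystem (the latter provides CreateEntryFromFile).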

Level 3 — Parallel.ForEach (record-level data parallelism)

Inside each worker thread, ProcessSubSetOfFiles uses Parallel.ForEach with MaxDegreeOfParallelism = -1 (unbounded, deferred to the runtime) to fan out individual record processing across TPL's internal partitioner. This is appropriate here because each record's work is a short DB call — the overhead of spinning a full Thread per record would outweigh the gain.

Parallel.ForEach(filesToProcess,
    new ParallelOptions { MaxDegreeOfParallelism = -1 },
    file => ProcessFileData(file, _MHBO));

Architecture

RunTask()
│
├── GetDataFileListToProcess()          // DB: fetch pending NOTD header records
│   └── Filter by fileType, order, deduplicate
│
└── for each chunk of TaskBundleSize records:
    │
    └── ProcessBatchTask(chunk)         // async Task<SuccessDC>
        │                               // offloads to Task.Run → frees scheduler thread
        │
        ├── Compute threadCount = ceil(chunk.Count / BatchSizeToProcessWImages)
        │
        ├── Spin up Thread[threadCount], each running notdBatchProcess(subChunk)
        │   │
        │   ├── ProcessSubSetOfFiles()  // Parallel.ForEach → DB inserts per record
        │   │   └── ProcessFileData()  // switch on fileType → _MHBO.Insert*FromAMS()
        │   │
        │   ├── ExportSubSetOfFiles()  // serialize records → JSON bundles on disk
        │   │   └── CreateBundledFilesToTransfer() per fileType
        │   │
        │   └── ExportSubSetOfImages() // resolve image paths → call ZipBundlesWImages
        │       │
        │       └── ZipBundlesWImages() // async Task
        │           └── Spin up Thread[n], each running zipBatch(imageChunk)
        │               └── ZipArchive.CreateEntryFromFile() per image
        │
        └── t.Join() — all threads must complete before next chunk starts

    └── GC.Collect() × 2 — explicit pressure release after large image batch

Data flow summary: DB records → in-memory List<MHHeaderRecordDC> → parallel DB hydration → JSON serialisation to disk → parallel image ZIP bundling to disk → FTP/SFTP upload (UploadFiles()).

The FTP layer supports both standard FTP-over-SSL and SFTP via SSH private-key authentication, configured at runtime.
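The transport choice reduces to a small runtime decision over the useSshPrivateKey and OutgoingFTPEnableSSL flags. This is a sketch of that decision only; the actual upload implementation lives in tcore.FileServerManager and is not reproduced here.

```csharp
static class TransportSketch
{
    // SSH private-key auth wins; otherwise fall back to FTPS or plain FTP.
    public static string ChooseTransport(bool useSshPrivateKey, bool enableSsl)
    {
        if (useSshPrivateKey) return "sftp";
        return enableSsl ? "ftps" : "ftp";
    }
}
```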


Key Design Decisions & Tradeoffs

Why explicit Thread[] instead of Parallel.ForEach or more Task.Run calls?

The ZIP and image-export work is heavily I/O-bound (reading large binary files from a network share, writing to a local disk before FTP). The CLR thread pool is optimised for short CPU-bound tasks and will throttle injection for long-running I/O workers. By using new Thread() the code guarantees a dedicated OS thread per batch — the OS scheduler sees actual blocked I/O and can park the thread during waits without pool exhaustion affecting other work in the process.

The tradeoff is that thread creation is expensive. BatchSizeToProcessWImages must be tuned high enough that the per-thread setup cost is amortised across enough file work; misconfiguration results in thread-creation overhead dominating runtime.
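The sizing relationship is the integer-ceiling division from the architecture sketch above; a hypothetical helper makes the arithmetic explicit.

```csharp
static class ThreadCountSketch
{
    // ceil(recordCount / batchSize) without floating point:
    // 47 records at a batch size of 10 yields 5 threads.
    public static int ComputeThreadCount(int recordCount, int batchSize)
    {
        return (recordCount + batchSize - 1) / batchSize;
    }
}
```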

async/await wrapping synchronous Thread management

The async Task<T> boundary at ProcessBatchTask and ZipBundlesWImages is thin: both simply await Task.Run(lambda), where the lambda does synchronous Thread work. The value is at the call site: RunTask() can block at task.Wait() cleanly, and if the architecture ever evolves to fire multiple outer tasks concurrently, the async signature is already in place with no refactoring required.
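For example, overlapping two outer batches later would only touch the call site, because the Task<T> signatures already exist. A sketch with illustrative names and payloads:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

static class OverlapSketch
{
    // Stands in for the real batch body behind the same async signature.
    static async Task<int> ProcessBatchTask(List<int> chunk) =>
        await Task.Run(() => chunk.Count);

    // Both batches are in flight at once; the join point stays explicit.
    public static int RunTwoBatches(List<int> a, List<int> b)
    {
        var t1 = ProcessBatchTask(a);
        var t2 = ProcessBatchTask(b);
        Task.WaitAll(t1, t2);
        return t1.Result + t2.Result;
    }
}
```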

Configurable parallelism via app.config

All parallelism parameters (MaxDegreeOfParallel, BatchSizeToProcess, BatchSizeToProcessWImages, TaskBundleSize) are externalised to config rather than hardcoded. This allows operators to tune thread counts per deployment environment without recompiling — critical in production systems where the target server's core count and I/O throughput vary across environments (dev, staging, prod).
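A defensive parsing wrapper is a natural companion to externalised tuning knobs. This sketch takes the raw string value directly rather than calling tcore.Common.Utils.GetConfiguration(), whose API is not shown in this repository excerpt.

```csharp
static class ConfigSketch
{
    // Fall back to a safe default when the key is missing, non-numeric,
    // or nonsensical for a parallelism setting (zero or negative).
    public static int GetIntSetting(string raw, int fallback)
    {
        int value;
        return int.TryParse(raw, out value) && value > 0 ? value : fallback;
    }
}
```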

Two-level batching

The outer TaskBundleSize loop controls how many records are inflight across threads at any one time, acting as a soft backpressure gate. The inner BatchSizeToProcessWImages controls how many records each thread processes. This two-level model lets operators independently tune throughput vs. memory footprint: a large TaskBundleSize with a small BatchSizeToProcessWImages maximises parallelism but holds more records in memory simultaneously.
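With the Quickstart defaults (TaskBundleSize = 50, BatchSizeToProcessWImages = 10), each outer batch holds 50 records in memory and dispatches 5 worker threads. A sketch of the arithmetic behind the two knobs (helper name is hypothetical):

```csharp
static class BatchShapeSketch
{
    // recordsInFlight bounds memory footprint; threadsPerBatch bounds parallelism.
    public static void BatchShape(int taskBundleSize, int batchSizeWImages,
                                  out int recordsInFlight, out int threadsPerBatch)
    {
        recordsInFlight = taskBundleSize;
        threadsPerBatch = (taskBundleSize + batchSizeWImages - 1) / batchSizeWImages;
    }
}
```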

Error handling strategy: resilience over observability

Most catch blocks are written as catch { } with log.Error(...) rather than propagating exceptions up the call stack. This is a deliberate production tradeoff — a single bad record (corrupted image path, missing DB row) should not abort an entire nightly batch of tens of thousands of records. The erroredNOTD list tracks failed items for a subsequent retry pass (ExportMissedFiles config flag).

The downside is that silent swallowing makes root-cause analysis harder. A production hardening step would replace bare catch { } with catch (Exception ex) { log.Error(...); erroredNOTD.Add(id); } consistently.
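The hardened shape described above can be sketched like this; Record, its Process delegate, and the logger delegate are illustrative stand-ins for the repository's actual types.

```csharp
using System;
using System.Collections.Generic;

class Record
{
    public int Id;
    public Action Process = () => { };
}

static class ErrorHandlingSketch
{
    // Resilience AND observability: log the failure, queue the id for the
    // ExportMissedFiles retry pass, and let the rest of the batch continue.
    public static void ProcessOneRecord(Record record, List<int> erroredNOTD,
                                        Action<Exception> logError)
    {
        try
        {
            record.Process();
        }
        catch (Exception ex)
        {
            logError(ex);
            erroredNOTD.Add(record.Id);
        }
    }
}
```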

Explicit GC pressure management

After large image batches, the code calls GC.Collect() twice with GC.WaitForPendingFinalizers() between them. This forces collection of the large Dictionary<string, string> image maps and ZipArchive stream objects that might otherwise linger in Gen 1/2 and delay reclamation. In a server process with no UI, this is reasonable: the batch is a known high-watermark phase, and deterministic cleanup reduces the chance of a full collection landing mid-FTP-upload.
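The sequence itself is the standard full-cleanup idiom:

```csharp
using System;

static class GcSketch
{
    // Collect, let finalizers drain, then collect the objects those
    // finalizers just released.
    public static void ReleaseBatchMemory()
    {
        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();
    }
}
```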

Static locks

zipLock and notdLock are declared static readonly object fields used in the worker methods. This provides intra-process mutual exclusion without Mutex overhead, appropriate since all threads are within the same AppDomain. Note that the actual lock usage appears minimal in the critical paths reviewed — most shared-state isolation is achieved structurally by partitioning work into non-overlapping chunks before thread dispatch.
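Where a shared structure does need protection, the pattern is the usual Monitor-based lock over a static field. In this sketch the field name mirrors the README; the shared list it guards is illustrative.

```csharp
using System.Collections.Generic;

static class LockSketch
{
    static readonly object notdLock = new object();
    static readonly List<int> erroredNOTD = new List<int>();

    // Monitor-based mutual exclusion: intra-process only, far cheaper than
    // a kernel Mutex, and sufficient inside a single AppDomain.
    public static void RecordFailure(int id)
    {
        lock (notdLock)
        {
            erroredNOTD.Add(id);
        }
    }

    public static int FailureCount
    {
        get { lock (notdLock) { return erroredNOTD.Count; } }
    }
}
```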


Configuration Reference

All parameters are read from app.config via tcore.Common.Utils.GetConfiguration():

| Key | Type | Description |
| --- | --- | --- |
| OutDirectory | string | Root path for finished ZIP/JSON output |
| TempDirectory | string | Scratch space during processing |
| ArchiveDirectory | string | Destination for successfully processed files |
| FailedDirectory | string | Destination for files that errored |
| TaskBundleSize | int | Outer batch size (records per async task) |
| BatchSizeToProcess | int | Records per thread (text/JSON processing) |
| BatchSizeToProcessWImages | int | Records per thread (image/ZIP processing) |
| MaxDegreeOfParallel | int | Passed to ParallelOptions for Parallel.ForEach |
| ParallelProcess | bool | Master switch for parallel execution |
| ProcessSubsetInParallel | bool | Enable parallel record DB hydration |
| ExportSubsetInParallel | bool | Enable parallel JSON export |
| ExportMissedFiles | bool | Re-attempt previously errored files |
| FTPEnabled | bool | Enable outbound FTP upload |
| OutgoingFTPLocation | string | FTP host |
| OutgoingFTPUser | string | FTP username |
| OutgoingFTPPassword | string | FTP password |
| OutgoingFTPEnableSSL | bool | Enable FTP-over-SSL |
| useSshPrivateKey | bool | Use SFTP with SSH key auth |
| sshPrivateKeyPath | string | Path to .pem/.ppk private key file |
| sshPrivateKeyPassphrase | string | Passphrase for encrypted private key |
| NewNonNOTDFormat | bool | Switch between legacy NOTD mode and new summary-only format |
| NixieNCOASNeededToSend | int | Threshold for NCOA/Nixie processing |
| ImageLocalDirectory | string | Local cache path for vehicle images |
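As a shape reference only, an appSettings rendering of a few of these keys might look like the fragment below. The section layout actually expected by tcore.Common.Utils.GetConfiguration() is not shown in this excerpt, so treat the key names as authoritative and the XML structure as an assumption.

```xml
<configuration>
  <appSettings>
    <!-- values here are the Quickstart starting points, not recommendations -->
    <add key="TaskBundleSize" value="50" />
    <add key="BatchSizeToProcessWImages" value="10" />
    <add key="MaxDegreeOfParallel" value="4" />
    <add key="ParallelProcess" value="true" />
    <add key="FTPEnabled" value="true" />
    <add key="useSshPrivateKey" value="false" />
  </appSettings>
</configuration>
```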

Quickstart

This task is designed to run as a Windows scheduled job or as part of a task-runner host that calls RunTask(). There is no standalone CLI entry point in this repository — it is extracted from a larger tcore service solution.

To integrate:

  1. Reference the tcore.BusinessObjects, tcore.DataContract, tcore.Facades, and tcore.FileServerManager assemblies from your solution.
  2. Populate app.config with the keys from the table above.
  3. Instantiate and call:
var task = new DocumentNOTDExportTask();
task.RunTask();

Or wire it into a host that calls RunTask() on a cron or Windows Task Scheduler trigger.

Tuning guidance:

  • Start with TaskBundleSize = 50, BatchSizeToProcessWImages = 10, MaxDegreeOfParallel = 4.
  • Monitor disk I/O and thread count under load. If ZIP workers are bottlenecked on disk writes, increase BatchSizeToProcessWImages to amortise thread setup; if memory pressure rises, decrease TaskBundleSize.
  • Enable ExportMissedFiles = true in subsequent runs to catch any records that errored in the primary pass.

Where This Pattern Applies in Production

The hybrid async/await + explicit-thread + Parallel.ForEach model in this codebase directly addresses a class of problems common in enterprise backend services:

  • ETL pipelines processing large record sets with per-record DB I/O and file output
  • Document generation systems (PDF rendering, image bundling, ZIP packaging at scale)
  • Outbound data feeds to third-party vendors with FTP/SFTP delivery requirements
  • Nightly batch jobs in .NET Framework environments where async stream APIs are unavailable or impractical
  • Any system where work units are large enough that thread-pool saturation is a real risk and explicit thread lifecycle management is preferred over TPL's automatic scheduling

Tech Stack

| Component | Technology |
| --- | --- |
| Language | C# (.NET Framework) |
| Async model | System.Threading.Tasks (async/await, Task.Run) |
| Parallelism | System.Threading.Thread, Parallel.ForEach |
| Compression | System.IO.Compression.ZipArchive |
| Serialisation | System.Web.Script.Serialization.JavaScriptSerializer |
| Logging | log4net |
| File transfer | FTP-over-SSL / SFTP (SSH private key) via tcore.FileServerManager |
| Database access | tcore.Facades / tcore.BusinessObjects (custom DAL) |
| Configuration | app.config via tcore.Common.Utils.GetConfiguration() |
