Back to blog
elixir broadway data-pipeline legacy-integration

Syncing Legacy Veterinary Software: A Broadway Pipeline War Story

Alembic Labs ·

I joined Vetolib as a senior engineer. Six months later, the CTO left. Suddenly, I was the one responsible for solving our biggest technical challenge: syncing data from multiple Practice Management Systems—the kind of software that runs on a Windows machine in the back office, hasn’t been updated since 2015, and definitely doesn’t have a REST API.

“We’re flying blind,” the previous architecture told me. The PMS initiates sync, calls our API, then polls to check if we’re done. Zero visibility into what’s happening. No Grafana, no dashboards, nothing. When something fails, we only find out when a vet calls support.

This is the story of how I rebuilt the data pipeline with flat files, a Rust Lambda, and Elixir Broadway—and went from callback chaos to processing 20,000+ bookings per PMS with full observability.

The Original Sin: Callback-Based Sync

The existing architecture looked something like this:

┌─────────────────┐         ┌─────────────────┐
│   PMS (Legacy)  │────────▶│   Vetolib API   │
│   On-Premise    │ "sync!" │                 │
└─────────────────┘         └────────┬────────┘
        │                            │
        │ poll poll poll             │ ???
        │◀───────────────────────────┘
        │
   "is it done yet?"
   "is it done yet?"
   "is it done yet?"

The problems were numerous:

  1. No observability: The API was a black box. When sync failed, nobody knew why until angry phone calls came in.

  2. Callback hell: The PMS initiated everything. We were passive receivers with no control over timing, batching, or error handling.

  3. Tight coupling: Every PMS vendor had slightly different sync behaviors. The API had grown into a maze of vendor-specific conditionals.

  4. No backpressure: If a PMS decided to dump 50,000 records at once, we’d either crash or slow to a crawl.

The platform needed to sync bookings, but a booking isn’t just a booking. To create one, you need:

  • Owner (a pet belongs to someone)
  • Animal (one owner can have N animals)
  • Availability (when the vet is free)
  • Unavailability (when they’re not)
  • Reasons (why you’re booking: vaccination, surgery, checkup…)

All of these have foreign key relationships. All of them need to sync in the right order. And all of them could fail independently.

The Constraint That Changed Everything

Here’s the thing about legacy PMS software: you can’t reach it.

These systems run on-premise. Behind firewalls. On Windows machines that the vet clinic’s nephew set up five years ago. There’s no API to call, no webhook to configure, no way to “pull” data.

We had to flip the model. Instead of us waiting for callbacks, the PMS would push flat files.

The New Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                           DATA PIPELINE                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌──────────┐     ┌──────────────┐     ┌─────────────┐     ┌─────────────┐  │
│  │   PMS    │     │     S3       │     │    Rust     │     │   AWS SQS   │  │
│  │ (Legacy) │────▶│   Bucket     │────▶│   Lambda    │────▶│   Queues    │  │
│  │          │     │              │     │             │     │             │  │
│  └──────────┘     └──────────────┘     └─────────────┘     └─────────────┘  │
│    drops CSVs      /pms-001/             chunks &           one queue       │
│    per topic       /pms-002/             routes             per PMS         │
│                    /pms-003/                                                │
│                                                                             │
│                                                     │                       │
│                                                     ▼                       │
│                                          ┌─────────────────┐                │
│          ┌─────────────────┐             │     Elixir      │                │
│          │   Result Box    │◀────────────│    Broadway     │                │
│          │   (OK / KO)     │             │   Consumers     │                │
│          └─────────────────┘             └─────────────────┘                │
│           PMS reads back                   backpressure                     │
│           Vetolib IDs                      circuit breaker                  │
│                                            ordered processing               │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

Step 1: Flat Files with Enforced Ordering

Each PMS would deposit CSVs into an S3 bucket, organized by topic:

/pms-001/
  ├── 001_owners.csv
  ├── 002_animals.csv
  ├── 003_availability.csv
  ├── 004_unavailability.csv
  ├── 005_reasons.csv
  ├── 006_hospitals.csv
  └── 007_bookings.csv

The numeric prefix wasn’t decoration—it enforced processing order. You can’t create an animal without an owner. You can’t create a booking without an animal, availability, and reason. The file system became our state machine.

Step 2: Rust Lambda for Chunking and Routing

Why Rust for the Lambda? Two reasons:

  1. Cold start performance: Rust Lambdas start in ~10ms vs 500ms+ for JVM-based solutions
  2. Memory efficiency: We were processing large CSVs and needed predictable memory usage

The Lambda’s job was simple but critical:

// Pseudocode - actual implementation was more involved
fn handle_s3_event(event: S3Event) -> Result<()> {
    let pms_id = extract_pms_id(&event.bucket_key)?;
    let topic = extract_topic(&event.bucket_key)?;
    
    // Read CSV and chunk into manageable batches
    let records = read_csv(&event)?;
    let chunks = records.chunks(500);
    
    // Get or create dedicated queue for this PMS
    let queue_url = ensure_queue_exists(pms_id)?;
    
    for (index, chunk) in chunks.enumerate() {
        let message = Message {
            pms_id,
            topic,
            sequence: index,
            total_chunks: chunks.len(),
            records: chunk,
        };
        
        sqs.send_message(queue_url, message)?;
    }
    
    Ok(())
}

Each PMS got its own SQS queue. This gave us:

  • Isolation: A misbehaving PMS couldn’t poison the global queue
  • Visibility: We could see exactly how backed up each PMS was
  • Control: We could pause/resume individual PMS syncs

Step 3: Broadway for the Heavy Lifting

This is where Elixir shines. Broadway gave us everything we needed out of the box:

defmodule Vetolib.SyncPipeline do
  use Broadway

  alias Broadway.Message

  @impl true
  def start_link(opts) do
    pms_id = Keyword.fetch!(opts, :pms_id)
    queue_url = Keyword.fetch!(opts, :queue_url)

    Broadway.start_link(__MODULE__,
      name: :"sync_pipeline_#{pms_id}",
      producer: [
        module: {
          BroadwaySQS.Producer,
          queue_url: queue_url,
          config: [region: "eu-west-1"]
        },
        concurrency: 1  # Preserve ordering within PMS
      ],
      processors: [
        default: [concurrency: 10]
      ],
      batchers: [
        ok: [concurrency: 2, batch_size: 100],
        ko: [concurrency: 1, batch_size: 50]
      ],
      context: %{pms_id: pms_id}
    )
  end

  @impl true
  def handle_message(_, %Message{data: data} = message, context) do
    case process_record(data, context.pms_id) do
      {:ok, vetolib_id} ->
        message
        |> Message.update_data(fn _ -> {data, vetolib_id} end)
        |> Message.put_batcher(:ok)

      {:error, reason} ->
        message
        |> Message.update_data(fn _ -> {data, reason} end)
        |> Message.put_batcher(:ko)
    end
  end

  @impl true
  def handle_batch(:ok, messages, _batch_info, context) do
    # Write successful syncs to OK mailbox
    results = Enum.map(messages, fn %{data: {original, vetolib_id}} ->
      %{original_id: original.id, vetolib_id: vetolib_id}
    end)

    write_to_mailbox(context.pms_id, :ok, results)
    messages
  end

  @impl true
  def handle_batch(:ko, messages, _batch_info, context) do
    # Write failures to KO mailbox for retry/investigation
    errors = Enum.map(messages, fn %{data: {original, reason}} ->
      %{original_id: original.id, error: reason}
    end)

    write_to_mailbox(context.pms_id, :ko, errors)
    messages
  end
end

The beauty of Broadway here:

  • Backpressure: If we can’t keep up, the producer slows down automatically
  • Batching: We batch database writes for efficiency
  • Error isolation: A bad record doesn’t crash the pipeline

Step 4: Circuit Breaker for Rotten Data

Legacy data is messy. We learned this the hard way when a PMS sent us animals with birth dates in 1842.

defmodule Vetolib.CircuitBreaker do
  use GenServer

  @error_threshold 0.3  # 30% error rate triggers circuit
  @window_size 100      # Look at last 100 records

  def check(pms_id) do
    case get_error_rate(pms_id) do
      rate when rate > @error_threshold ->
        Logger.warning("Circuit OPEN for #{pms_id}: #{rate * 100}% error rate")
        pause_pipeline(pms_id)
        notify_ops_team(pms_id, rate)
        :circuit_open

      _ ->
        :ok
    end
  end

  defp pause_pipeline(pms_id) do
    # Broadway supports graceful pause
    Broadway.Topology.ProducerStage.drain(:"sync_pipeline_#{pms_id}")
  end
end

When error rate exceeded 30%, we’d pause that PMS’s pipeline and alert us. Usually it meant someone had exported the wrong file or the PMS had corrupted data.

The Mailbox System

Remember, the PMS needs to know what Vetolib IDs were assigned to their records. Without this, they can’t update or reference bookings later.

We deposited results in S3 “mailboxes”:

/results/pms-001/
  ├── ok/
  │   ├── 2024-01-15T10:30:00_owners.json
  │   ├── 2024-01-15T10:30:05_animals.json
  │   └── 2024-01-15T10:31:00_bookings.json
  └── ko/
      └── 2024-01-15T10:30:02_animals_errors.json

The PMS would poll their mailbox (much better than polling our API) and process the ID mappings at their own pace.

The Results

Before:

  • Sync initiated by PMS (no control)
  • Zero observability
  • Failures discovered via angry phone calls
  • No backpressure (occasional crashes)

After:

  • 3+ PMS actively syncing through the pipeline
  • Initial sync: 8,000 to 20,000 bookings per PMS (plus all prerequisite entities)
  • Ongoing sync: 100-500 bookings per cycle
  • Full observability via CloudWatch + Grafana dashboards
  • Circuit breaker catching bad data before it pollutes the database
  • Backpressure handling burst loads gracefully

The real number is much higher than “just bookings” because every booking requires owner + animal + availability + unavailability + reason to exist first. A “20k booking sync” is really closer to 100k+ total records when you count dependencies.

Lessons Learned

1. Sometimes flat files are the answer. When you can’t call an API, make the client push files. It’s not sexy, but it works with literally any system that can write to disk.

2. One queue per tenant is worth the overhead. The operational visibility alone justifies it. When PMS-002 is backed up, you know immediately without digging through logs.

3. Broadway’s backpressure is a superpower. We never had to think about rate limiting or queue overflow. Broadway just… handles it.

4. Circuit breakers aren’t optional for legacy integrations. Legacy data will surprise you. Plan for it.

5. Rust + Elixir is a powerful combo. Rust for the sharp, performance-critical edges (Lambda cold starts, CSV parsing). Elixir for the stateful, concurrent, observable core.


Need help building resilient data pipelines? We specialize in Elixir systems that tame legacy integrations. Get in touch.