Functionally chaotic

Sometime in July 2019, the middle of summer break, I decided to take a deep dive into the wondrous world of functional programming in Scala. A friend had been using it for a while now, and the syntax was too weird to pass up. However, I didn’t know any Scala, nor did I know any functional programming.

I scoured the net for learning materials and found two books: “Essential Scala” and “Functional Programming in Scala”. The former functioned as a primer to basic Scala concepts and syntax, while the latter would introduce me to functional programming through Scala.

Both books were wonderful, clearly written, and to the point. However, I never finished the second book. While essential to understanding the concepts, the exercises were very rigorous, and with a few months of free time on my hands, my interests eventually drifted elsewhere.

Several months later, while travelling for Thanksgiving in November, I decided to revisit functional Scala. This time, I had another book in my arsenal: “Scala with Cats”. While I didn’t finish that book either, I got further into it, and it taught me some of the fundamental Cats concepts that I would need later on.

Fast forward to January of this year (last time skip, I promise!), and I have written my first program using cats and cats-effect:

import cats._
import cats.implicits._
import cats.effect._

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, Path}

object ReadFile extends IOApp {
  def readFile[F[_]: Sync](path: Path): F[Either[Throwable, String]] =
    Sync[F].delay(Files.readString(path, StandardCharsets.UTF_8)).attempt

  def program[F[_]: Sync](path: String): F[Unit] =
    for {
      fileResult <- readFile[F](Paths.get(path))
      text = fileResult match {
        case Left(error) => s"failed to read file: $error"
        case Right(text) => text
      }
      _ <- Sync[F].delay(println(text))
    } yield ()

  def run(args: List[String]): IO[ExitCode] =
    program[IO](args.headOption.getOrElse("build.sbt")).as(ExitCode.Success)
}

This code reads a file into memory and prints its contents to standard output, followed by a newline. It reads build.sbt by default, but you can specify a path as the first argument. Here’s some Python code that achieves the same thing:

import sys
from pathlib import Path

path = Path(sys.argv[1] if len(sys.argv) > 1 else "build.sbt")

try:
    print(path.read_text())
except Exception as error:
    print(f"failed to read file: {error}")

Here’s how I would write the Scala version today:

import cats._
import cats.effect._
import cats.implicits._
import fs2._

import java.nio.file._

object ReadFileBetter extends IOApp {
  def eputs(text: String): IO[Unit] = IO {
    Console.err.println(text)
  }

  def run(args: List[String]): IO[ExitCode] = {
    val target = Paths.get(args.headOption.getOrElse("build.sbt"))

    Blocker[IO].use { blocker =>
      io.file
        .readAll[IO](target, blocker, 1024)
        .through(io.stdout(blocker))
        .compile
        .drain
        .as(ExitCode.Success)
        .handleErrorWith { error =>
          eputs(s"something went wrong: $error").as(ExitCode.Error)
        }
    }
  }
}

What’s important about this new iteration is that the file is streamed to standard output instead of being read into memory all at once, as in the first example. fs2 helps us with this.

The readAll call returns a Stream[IO, Byte] (a stream of bytes), representing the contents of the file. This is then piped directly to stdout using through.

Furthermore, the contents aren’t decoded at all, allowing us to read binaries.

Because the contents of the file are streamed in 1 KiB chunks, this operation takes roughly constant memory, no matter how large the file is. For example, we can stream a 100 MiB file to standard output with only 5 MiB of Java heap space. (Which does work, by the way.)
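To make the memory claim concrete, here is a rough Python sketch of the same idea. It is an analogy, not how fs2 works internally: bytes are copied one fixed-size chunk at a time, so only one chunk is ever held in memory. The copy_in_chunks name and its parameters are made up for this illustration.

```python
import io

def copy_in_chunks(source, sink, chunk_size=1024):
    """Copy bytes from source to sink, holding at most one chunk in memory."""
    total = 0
    while True:
        chunk = source.read(chunk_size)
        if not chunk:  # an empty read means end of input
            break
        sink.write(chunk)
        total += len(chunk)
    return total

# In-memory stand-ins for a file and for standard output.
source = io.BytesIO(b"\x00\x01" * 5000)  # 10,000 raw bytes, never decoded
sink = io.BytesIO()
copied = copy_in_chunks(source, sink)
```

With a real file, the call would look like copy_in_chunks(open(path, "rb"), sys.stdout.buffer).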

Some other notes:

  • A Blocker is used to explicitly encapsulate blocking IO tasks.
  • Instead of returning Eithers to indicate errors, we handle errors through the IO monad (handleErrorWith).
    • Also, a nonzero exit code is properly issued upon an error.
  • The code is no longer polymorphic (generic) on the effect monad, instead opting for IO all the way for the sake of brevity.

chaos

After that short experiment, I decided to tackle something more ambitious: an experimental Discord build scraper written in functional Scala, named chaos. It’s nearly April now, and I’ve been working on it since February. It’s been a great way to practice and apply what I learned in the FP books.

The Concept

A Discord build scraper can be boiled down to a fancy cURL invocation wrapped in a while True + sleep loop:

while (true) {
  // Make some requests to Discord...
  let builds = scrapeBuilds();

  if (!newBuilds(builds)) {
    sleep(60 * 5);
    continue;
  }

  // Publish builds wherever necessary...
  publishBuilds(builds);

  // Remember latest builds between program restarts...
  saveState(builds);

  sleep(60 * 5);
}

I figured that something with relatively low complexity would be a good target to channel my energy into for the coming weeks.

Planning

From the beginning of the project, I decided to use fs2 streams wherever possible. In fact, that entire while (true) loop above is modeled as an infinite stream, and I wanted an easy way to get a build stream from a Discord branch (Stream[F, Build]). First, some terminology:

  • source: something that has an infinite build stream
  • publisher: something that consumes builds

Now here’s a simplified rundown of what chaos does:

  1. A source-to-publisher mapping is derived from the configuration file.
    • This involves parsing and “selecting” sources from the config file with syntax like fe:canary, fe:{ptb,stable}, etc.
  2. For each source, an infinite build stream is created.
    • Each build stream makes periodic requests to Discord, emitting a thin Build object that has a Vector[Asset] and other information such as the build number.
  3. The build stream is “watched” for changes in build metadata in order to detect new builds.
  4. At this point, we have a stream of streams. This is then joined into one stream that runs the inner streams concurrently, giving us a stream that emits builds from across all sources.
  5. This stream of all builds is continuously serialized into a key-value mapping and piped to a file on disk, so we can read it later when the program restarts for whatever reason.

Because this stream is infinite, it runs forever, polling builds from Discord at a set interval and publishing them.
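Step 3 (the “watching”) is the part that turns a stream of repeated scrape results into a stream of new builds. Here is a loose Python sketch of that idea using generators. The watch function and the fake build numbers are invented for illustration, and chaos’s real change detection compares build metadata rather than bare numbers.

```python
import itertools

def watch(builds):
    """Emit a build only when it differs from the previous one seen."""
    previous = None
    for build in builds:
        if build != previous:
            yield build
        previous = build

# Fake scrape results: pretend Discord served these build numbers over time.
scraped = itertools.cycle([100, 100, 101, 101, 101, 102])
new_builds = watch(scraped)

# The stream is infinite, so only take the first few items.
first_three = list(itertools.islice(new_builds, 3))
```

Nothing here sleeps between scrapes; the sketch only shows how an endless stream of repeated results collapses into a stream of changes.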

“One Big Stream”

Here is the actual code that the rundown above describes. Surprisingly, one huge stream glues most of that logic together in around 30 lines of code. Let’s do an incremental deep dive. I don’t expect you to understand the code without some familiarity with Scala, but you should be able to grasp the general idea. (I hope.) What I’m trying to show here is how an impure while (true) loop, like the one above, can be modeled in a pure way in functional Scala land.

It’s interesting because functional programming forces you to model your program in a pure, referentially transparent way, which can be difficult if you are used to programming imperatively.

for {
  // Get our initial state from the state file, otherwise fall back to the
  // default initial state.
  initialState <- readStateFile.map(_.getOrElse(defaultInitialState))

First, we start a “for-comprehension”. If you are familiar with Haskell, this is very similar to do-notation. We then read the initial state file, falling back to the default initial state (which is just an empty map).

We call map because readStateFile evaluates to an F[Option[Map[String, Int]]], which can be read as “an eFfectful operation that gives a possibly absent map of strings to ints”.

The map call allows us to “modify” the Option inside. In this case, we transform it to its inner value, opting for defaultInitialState if it isn’t present. This means that we get an F[Map[String, Int]] in the end. The <- “operator” (for-comprehension syntax sugar) boils it down to a mere Map[String, Int].

It is somewhat similar to this JavaScript snippet:

let initialState = readStateFile();

if (initialState == null) {
  initialState = defaultInitialState;
}

Now it’s time to create the actual stream which will end up handling the build-to-publisher pipeline that drives the entire application:

  // Create an initial stream containing the source to publisher mapping.
  poll = Stream
    .emits(mapping.toList)

We start with a stream that simply emits the entries of the source-to-publisher mapping that was derived earlier in the code. Using further combinators (methods that transform the stream), we can outline the desired behavior.

We convert the mapping into a list so we have a Stream[(SelectedSource, Set[Publisher])]: a stream of tuples of a SelectedSource and a set of Publishers.

If you wanted to publish new Discord Canary builds to a Discord webhook for example, this is what the mapping would look like:

Map(
  SelectedSource("fe:canary", FrontendSource(...)) -> Set(WebhookPublisher(...))
)

And the resulting stream:

Stream(
  (SelectedSource(...), Set(WebhookPublisher(...)))
)

Next, we transform this stream of tuples into a stream of build streams. We started with a stream in the first place because fs2 lets us concurrently join a stream of streams into a singular stream, a surprise tool that will help us later.

    .map({
      // For each source and its associated publishers, poll the build stream,
      // publishing new builds.
      case (selectedSource @ SelectedSource(_, source), publishers) =>
        // Get the initial build that the poller should start at.
        val initialBuild = initialState
          .get(selectedSource.selector)
          .map { number =>
            // The branch is irrelevant because only the number is
            // compared.
            Build(Branch.Stable, "", number, AssetBundle.empty)
          }

        source
          .poll(config.interval, initialBuild)
          // Tap into the poll stream to publish builds.
          .evalTap(pollTap(source, publishers) _)
          // Add the selected source to any emitted builds, so we have the
          // context when saving.
          .map((selectedSource, _))
    })

This monstrous map call turns each (source, publishers) tuple into a Stream[(SelectedSource, Either[Throwable, Poll])].

For each (source, publishers) tuple:

  1. Pattern match on this tuple (see the line with case) to get what we need out of the data structure.
  2. Get the initial build that the polling build stream should start at. Use the initial state that we initialized earlier.
  3. Start polling.
    • The poll call constructs an internal infinite build stream and watches it for changes, only emitting new builds.
    • This new stream emits Either[Throwable, Poll]. It can either be a Throwable (an error) or a Poll object, which represents a singular “poll result”. It contains the new build and the one before that.
  4. Use evalTap to tap into the stream of new builds. The pollTap does some internal work, then publishes the new build.
  5. Inject the source into a tuple alongside the build itself so we have some context when we join all of the streams together.

    .parJoinUnbounded

parJoinUnbounded runs all of the inner streams concurrently and merges their output into a single stream. This is significant because we need access to all builds at the same time in order to save the latest ones to the state file.
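If “concurrently joining a stream of streams” sounds abstract, here is a small Python asyncio sketch of the general technique. It is an analogy only: par_join, pump, and the fake sources are names I made up, and fs2’s real implementation also handles errors and cancellation, which this does not.

```python
import asyncio

async def source(name, numbers):
    """A fake build stream: emits (name, number) pairs, yielding control between them."""
    for number in numbers:
        await asyncio.sleep(0)
        yield (name, number)

async def par_join(streams):
    """Run every stream concurrently and funnel their items into one stream."""
    queue = asyncio.Queue()
    done = object()  # sentinel marking the end of one inner stream

    async def pump(stream):
        async for item in stream:
            await queue.put(item)
        await queue.put(done)

    tasks = [asyncio.create_task(pump(stream)) for stream in streams]
    remaining = len(tasks)
    while remaining:
        item = await queue.get()
        if item is done:
            remaining -= 1
        else:
            yield item

async def collect():
    streams = [source("fe:canary", [1, 2]), source("fe:ptb", [7])]
    return [item async for item in par_join(streams)]

merged = asyncio.run(collect())
```

The order of items across different sources is not guaranteed, which is exactly why each build is paired with its source before the join.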

    // Maintain a map of the latest builds for each source.
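    .scan(Map[String, Int]()) {
      case (accumulator, (selectedSource, Right(Poll(build, _)))) =>
        accumulator + (selectedSource.selector -> build.number)
      // A failed poll (Left) leaves the map unchanged; without this case,
      // a Left would raise a MatchError.
      case (accumulator, _) =>
        accumulator
    }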

The scan method accumulates a mapping of source string to latest build number, emitting each new mapping. If you are familiar with Array.prototype.reduce in JavaScript, it’s similar to that, only additionally emitting each new value:

const numbers = [1, 2, 3];

// 0 + 1 + 2 + 3 = 6
numbers.reduce((accumulator, number) => accumulator + number, 0);

// Imagine that JavaScript has an `Array.prototype.scan`...
numbers.scan((accumulator, number) => accumulator + number, 0);

// Returns [0, 0 + 1, 0 + 1 + 2, 0 + 1 + 2 + 3];
//         ↓
//         [0, 1, 3, 6];

Since this is an infinite stream, the scan call never actually “finishes” like it does in the JavaScript example above. It’s useful to be able to take a combinator that seems to make the most sense on finite data and apply it to an infinite stream. It lets us model this behavior purely:

let latestBuilds = {};

while (true) {
  // ...

  for (const build of newBuilds) {
    latestBuilds[build.selector] = build.number;
  }

  // ...
}
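For a scan that you can actually run on an infinite stream, Python’s itertools.accumulate with an initial value behaves the same way: it emits the seed and then every intermediate result, lazily. The polls generator and the remember function below are hypothetical stand-ins for the poll stream and the accumulator step.

```python
import itertools

def polls():
    """Infinite fake stream of (selector, build number) pairs."""
    for number in itertools.count(1):
        yield ("fe:canary", number)
        yield ("fe:ptb", number * 10)

def remember(latest, poll):
    """Return a new map with this poll's build recorded; the old map is untouched."""
    selector, number = poll
    return {**latest, selector: number}

# accumulate with `initial` is a scan: it emits the seed, then each new map.
states = itertools.accumulate(polls(), remember, initial={})

# The stream never ends, so only look at the first four states.
first_four = list(itertools.islice(states, 4))
```

Each emitted map is a fresh value, so nothing is mutated in place the way latestBuilds is in the loop above.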

We’re almost done. Now, we encode the Map[String, Int] to a String and pipe it to our state file to be written on disk.

    .map(encodeStateStore)
    // Every time a new build is published, save the store into a file.
    .through(continuouslyOverwrite(stateFilePath))

The continuouslyOverwrite function returns an fs2.Pipe, which is a function from one Stream to another (Stream => Stream). Instead of writing pipe(stream), we can call stream.through(pipe). In this case, the pipe writes to a file, truncating it before each write.
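The “pipe is just a function from stream to stream” idea translates directly to Python generators. This is a sketch under my own assumptions: through and continuously_overwrite are illustrative names, and this version re-emits each item after writing it, which may not match what the real pipe emits.

```python
import os
import tempfile

def through(stream, pipe):
    """Apply a pipe (a function from stream to stream) to a stream."""
    return pipe(stream)

def continuously_overwrite(path):
    """Build a pipe that rewrites the file at `path` with every item it sees."""
    def pipe(stream):
        for contents in stream:
            # Mode "w" truncates the file before writing.
            with open(path, "w", encoding="utf-8") as file:
                file.write(contents)
            yield contents
    return pipe

state_path = os.path.join(tempfile.mkdtemp(), "state")
states = iter(["fe:canary=1", "fe:canary=2"])
written = list(through(states, continuously_overwrite(state_path)))

with open(state_path, encoding="utf-8") as file:
    on_disk = file.read()
```

After both items pass through, the file holds only the most recent state.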

For fun, let’s inspect the code that handles both encoding and decoding the state file:

def decodeStateStore(contents: String): Map[String, Int] =
  contents
    .linesIterator
    .collect {
      case s"$selector=$numberString" => (selector, numberString.toInt)
    }
    .toMap

def encodeStateStore(store: Map[String, Int]): String =
  store
    .map {
      case selector -> number => s"$selector=$number"
    }
    .mkString("\n")

Scala’s expressive syntax and featureful standard library really shine here.
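To show what ends up on disk, here is a rough Python rendition of the same pair of functions, along with a round trip. The build numbers are invented, and splitting on the first “=” is my own choice for the sketch rather than a claim about how the Scala pattern treats unusual input.

```python
def encode_state_store(store):
    """Render the map as one `selector=number` line per entry."""
    return "\n".join(f"{selector}={number}" for selector, number in store.items())

def decode_state_store(contents):
    """Parse `selector=number` lines back into a map, skipping lines without '='."""
    store = {}
    for line in contents.splitlines():
        selector, separator, number = line.partition("=")
        if separator:
            store[selector] = int(number)
    return store

store = {"fe:canary": 56789, "fe:ptb": 56123}  # made-up build numbers
encoded = encode_state_store(store)
decoded = decode_state_store(encoded)
```

Encoding and then decoding gives back the original map, and an empty file decodes to an empty map, which is the same as the default initial state.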

Back to the stream:

    .compile
    .drain

These two calls convert the infinite stream into something that we can “run” (an F[Unit]).
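A loose Python analogy for the split between describing a stream and running it: building a generator pipeline does nothing until something pulls items through it. The publish stage and the log list are invented for this sketch, and the analogy is imperfect, since a Python generator is not a pure value the way an F[Unit] is.

```python
from collections import deque

log = []

def publish(builds):
    """A stage with a side effect: record each build as it passes through."""
    for build in builds:
        log.append(build)
        yield build

# Building the pipeline runs nothing; generators are lazy.
pipeline = publish(iter([101, 102, 103]))
ran_before_drain = list(log)

# "Draining": pull every item for its effects and throw the values away.
deque(pipeline, maxlen=0)
ran_after_drain = list(log)
```

With an infinite stream like the one in chaos, the draining step would simply never return.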

And now, we just gotta use the for-comprehension sugar to “run” it:

  _ <- L.info(s"Scraping ${mapping.size} source(s): $mapping")

  // "Run" the "big stream".
  _ <- poll
} yield ()

So, there you go. The core source-to-build-to-publisher-to-save logic is modeled as an infinite Stream, with the combinators that modify it spanning around 30 lines.

Interestingly, this entire process allowed me to stick with my mental model of the program from the beginning: that builds should originate from an infinite stream of builds (Stream[F, Build]). In chaos, no for loops are involved; just streams. In the source.poll call above, sleeps are inserted between scrapes using Stream.awakeDelay.

The infiniteness comes from this line of code in the FrontendSource class:

  def builds: Stream[F, Build] =
    Stream.repeatEval(scrape(variant))

Stream.repeatEval creates an infinite stream that repeatedly runs an effect. In this case, the effect is scraping Discord for the relevant variant (Canary, PTB, or Stable).
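Here is how those two ingredients (repeat an effect forever, and wait between repetitions) might look with Python generators. This is a sketch of the idea, not of fs2: repeat_eval, spaced, and the fake scrape are hypothetical names, and how chaos actually combines repeatEval with awakeDelay isn’t shown in this post. The sleep function is passed in so the example runs without really waiting.

```python
import itertools

def repeat_eval(effect):
    """Infinite stream that runs `effect` again for every element requested."""
    while True:
        yield effect()

def spaced(stream, seconds, sleep):
    """Call `sleep(seconds)` before each element is pulled from `stream`."""
    iterator = iter(stream)
    while True:
        sleep(seconds)
        try:
            item = next(iterator)
        except StopIteration:
            return
        yield item

counter = itertools.count(1)

def scrape():
    """Stand-in for scraping Discord: returns a new fake build number each call."""
    return next(counter)

naps = []  # records each requested delay instead of actually sleeping
builds = spaced(repeat_eval(scrape), 60 * 5, naps.append)
first_three = list(itertools.islice(builds, 3))
```

In real use, time.sleep would take the place of naps.append.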

Reflections

To reiterate, the point of this was to practice functional Scala. It has been an interesting journey for me as it has forced me to think in different ways. The combination of purity and static types has resulted in virtually no unexpected or strange bugs (well, yet, at least). It’s a unique way of programming that has some benefits.

I plan to go back and finish the books that I gave up on before.

Also, I still have more features to implement for chaos. It’s been a pleasure to apply these newfangled concepts to something that has ended up working quite well!

You can check out the source code here: slice/chaos