Task Engine



Hof’s task engine allows you to write dynamic workflows with CUE. Build modular and composable DAGs that can

  • work with files, call APIs, prompt for user input, and much more
  • run scripts and containers, use any language for a custom task
  • run a server that calls a workflow per request or bulk process a set of inputs
  • create additional tasks based on the results of other tasks

Hof’s task engine is an extension of cue/flow with

  • extra tasks for key-value cache and advanced structural operations on CUE values
  • additional control over parallel execution, synchronization of tasks, and message passing
  • support for workflows alongside regular CUE files, so they can be imported and shared as modules

Example


A workflow for calling an LLM from the command line.

hof flow vertex.cue -t msg='What is the CUE language?'

vertex.cue

package examples

import "strings"

// inputs supplied via tags
inputs: {
  model: string @tag(model)
  prompt: string @tag(prompt)
  msg: string @tag(msg)
}

vertex_chat: {
  @flow() // define a flow

  steps: {

    // task: get auth from external command
    gcp: {
      @task(os.Exec)
      cmd: ["gcloud", "auth", "print-access-token"]
      stdout: string
      key:    strings.TrimSpace(stdout)
    }

    // task: api call via reusable task
    call: _gemini & {
      apikey: gcp.key

      model: inputs.model
      prompt: inputs.prompt
      msg: inputs.msg

      resp: body: _
    }

    // task: print text to std output
    out: {
      @task(os.Stdout)
      text: call.final.text
    }

  }
}

// reusable task
_gemini: {
  @task(api.Call)

  apikey: string
  model: string | *"gemini-1.5-pro-001:generateContent"
  prompt: string | *"You are an assistant who is very concise when responding."
  msg: string

  req: {
    host: "https://us-central1-aiplatform.googleapis.com"
    path: "/v1/projects/hof-io--develop/locations/us-central1/publishers/google/models/\(model)"
    headers: {
      "Content-Type": "application/json"
      Authorization:  "Bearer \(apikey)"
    }
    data: {
      systemInstruction: {
        role: "MODEL"
        parts: [{
          text: prompt
        }]
      }

      contents: [{
        role: "USER"
        parts: [{
          text: msg
        }]
      }]
    }
    method: "POST"
  }

  resp: {
    body: {}
  }
  // @print(resp.body)

  // task-local ETL
  final: {
    cand: resp.body.candidates[0]
    text: cand.content.parts[0].text
  }
}
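
Because `_gemini` is an ordinary CUE value, it can be specialized through unification before it is used in a flow. The following is a minimal sketch, assuming it lives in the same package as vertex.cue so that `_gemini` and `inputs` are in scope. The names `_haiku`, `haiku_chat`, and the flow name `haiku` are hypothetical and chosen only for illustration.

```cue
package examples

import "strings"

// hypothetical specialization of _gemini with a fixed system prompt;
// unifying a concrete string with `string | *"..."` selects the concrete value
_haiku: _gemini & {
  prompt: "Respond only with a haiku."
}

// hypothetical flow that reuses the specialized task
haiku_chat: {
  @flow(haiku)

  // task: get auth from external command (same pattern as vertex_chat)
  gcp: {
    @task(os.Exec)
    cmd: ["gcloud", "auth", "print-access-token"]
    stdout: string
    key:    strings.TrimSpace(stdout)
  }

  // task: api call via the specialized reusable task
  call: _haiku & {
    apikey: gcp.key
    msg:    inputs.msg
  }

  // task: print text to std output
  out: {
    @task(os.Stdout)
    text: call.final.text
  }
}
```

Following the command patterns used on this page, such a flow might be run as `hof flow @haiku -t msg='What is the CUE language?'`. The `model` default and the request shape are inherited from `_gemini` unchanged.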

Wrapping the workflow in an API server.

hof flow @server

With both CUE files in the same directory, we can omit the filenames.

server.cue

package examples

import "strings"

server: {
  @flow(server)

  // task: get auth from external command
  gcp: {
    @task(os.Exec)
    cmd: ["gcloud", "auth", "print-access-token"]
    stdout: string
    key:    strings.TrimSpace(stdout)
  }

  run: {
    @task(api.Serve)

    port: "8080"
    routes: {

      // simple hello route
      "/hello": {
        method: "GET"
        resp: {
          status: 200
          body: "hallo chat!"
        }
      }

      // echo request object back as json
      "/jsonecho": {
        method: ["GET", "POST"]
        req: body: {}
        resp: json: req
      }

      // our gemini call wrapped in an API
      "/chat": {
        @flow()

        method: "POST"

        // input schema
        req: {
          body: {
            msg: string
          }
        }

        // task: api call via reusable task
        call: _gemini & {
          apikey: gcp.key
          msg: req.body.msg
        }

        // the response to user
        resp: {
          status: 200
          body: call.final.text
        }
      }

    }
  }
}

We now have a workflow we can call from the CLI or as an API.

Command

hof flow help

run workflows and tasks powered by CUE

Usage:
  hof flow [cue files...] [@flow/name...] [+key=value] [flags]
  hof flow [command]

Aliases:
  flow, f

Available Commands:
  list        print available flows

Flags:
  -B, --bulk string        exprs for inputs to run workflow in bulk mode
  -F, --flow stringArray   flow labels to match and run
  -h, --help               help for flow
  -P, --parallel int       global flow parallelism (default 1)
      --progress           print task progress as it happens

Global Flags:
  ...

Tasks & Schemas

You can find the schemas and examples for all tasks in the hof/flow reference section.

  • api
    • Call
    • Serve
  • csp (communicating sequential processes)
    • Chan
    • Send
    • Recv
  • cue
    • Format (print incomplete to concrete CUE values)
  • gen (generate random values)
    • Seed
    • Now
    • Str
    • Int
    • Float
    • Norm
    • UUID
    • CUID
    • Slug
  • hof
    • Template (render a hof text/template)
  • kv
    • Mem (in-memory cache)
  • os
    • Exec
    • FileLock
    • FileUnlock
    • Getenv
    • Glob
    • Mkdir
    • ReadFile
    • ReadGlobs
    • Sleep
    • Stdin
    • Stdout
    • Watch
    • WriteFile
  • prompt
    • Prompt (interactive user prompts, like creators)
  • st (structural)
    • Mask
    • Pick
    • Insert
    • Replace
    • Upsert
    • Diff
    • Patch
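
As a rough illustration of how tasks from this list compose, here is a minimal sketch that chains two of them. It uses only the `os.Exec` and `os.Stdout` fields that appear in the example earlier on this page (`cmd`, `stdout`, `text`); the flow name `hello`, the field names `who` and `name`, and the `whoami` command are assumptions made for illustration.

```cue
package examples

import "strings"

hello: {
  @flow(hello)

  // task: run an external command and capture its output
  who: {
    @task(os.Exec)
    cmd: ["whoami"]
    stdout: string
    name:   strings.TrimSpace(stdout)
  }

  // task: print a greeting; the reference to who.name
  // makes this task depend on the one above
  out: {
    @task(os.Stdout)
    text: "hello, \(who.name)\n"
  }
}
```

Saved under a hypothetical filename such as hello.cue, this could be run with `hof flow hello.cue`. The dependency between the two tasks is not declared explicitly; it follows from the CUE reference, which is how cue/flow orders tasks in the DAG.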

Examples

You can find many examples throughout hof’s codebase and in other projects.