fp-ts pipe and flow composition

fp-ts Pipe and Flow Composition

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "fp-ts pipe and flow composition" with this command: npx skills add whatiskadudoing/fp-ts-skills/whatiskadudoing-fp-ts-skills-fp-ts-pipe-and-flow-composition

fp-ts Pipe and Flow Composition

Function composition is the heart of functional programming. fp-ts provides two powerful utilities for composing functions: pipe and flow . This guide covers everything you need to build elegant, type-safe pipelines.

Imports

import { pipe, flow, identity } from 'fp-ts/function'

Understanding Pipe vs Flow

pipe: Immediate Execution with a Starting Value

pipe takes a value and passes it through a series of functions, executing immediately.

// pipe(value, fn1, fn2, fn3) === fn3(fn2(fn1(value)))

const result = pipe( 5, n => n * 2, // 10 n => n + 1, // 11 n => Result: ${n} // "Result: 11" )

Use pipe when:

  • You have a value and want to transform it immediately

  • Building one-off transformations

  • Working with fp-ts data types (Option, Either, Task, etc.)

  • You need readable, top-to-bottom data flow

flow: Creating Reusable Pipelines

flow composes functions into a new function without executing them.

// flow(fn1, fn2, fn3) === (x) => fn3(fn2(fn1(x)))

const processNumber = flow( (n: number) => n * 2, n => n + 1, n => Result: ${n} )

processNumber(5) // "Result: 11" processNumber(10) // "Result: 21"

Use flow when:

  • Creating reusable transformations

  • Defining functions to pass as callbacks

  • Building composable utilities

  • You don't have the input value yet

Quick Comparison

Aspect pipe flow

Execution Immediate Deferred

First argument Value Function

Returns Transformed value New function

Use case Transform data now Create reusable transform

// These are equivalent: const result1 = pipe(5, double, increment, toString) const result2 = flow(double, increment, toString)(5)

Working with Different Arities

Unary Functions (Single Argument)

Most fp-ts operations return unary functions, making them ideal for composition.

import { pipe } from 'fp-ts/function' import * as A from 'fp-ts/Array'

const numbers = [1, 2, 3, 4, 5]

const result = pipe( numbers, A.filter(n => n % 2 === 0), A.map(n => n * 10) ) // [20, 40]

Handling Multi-Argument Functions

When you need to use functions with multiple arguments, use currying or partial application.

// Method 1: Inline arrow function const add = (a: number, b: number) => a + b

pipe( 5, n => add(n, 10), // Wrap in arrow function n => n * 2 )

// Method 2: Curried version const addCurried = (b: number) => (a: number) => a + b

pipe( 5, addCurried(10), // Clean composition n => n * 2 )

// Method 3: Using fp-ts curry utilities import { curry2 } from 'fp-ts-std/Function'

const addC = curry2(add)

pipe( 5, addC(10), n => n * 2 )

Data Transformation Pipelines

Array Transformations

import { pipe } from 'fp-ts/function' import * as A from 'fp-ts/Array' import * as NEA from 'fp-ts/NonEmptyArray'

interface User { id: number name: string age: number active: boolean }

const users: User[] = [ { id: 1, name: 'Alice', age: 30, active: true }, { id: 2, name: 'Bob', age: 25, active: false }, { id: 3, name: 'Charlie', age: 35, active: true }, ]

// Complex transformation pipeline const activeUserNames = pipe( users, A.filter(u => u.active), A.map(u => u.name), A.sort(S.Ord) ) // ['Alice', 'Charlie']

// With grouping import * as R from 'fp-ts/Record' import * as S from 'fp-ts/string'

const usersByActiveStatus = pipe( users, A.groupBy(u => u.active ? 'active' : 'inactive') ) // { active: [...], inactive: [...] }

Record/Object Transformations

import { pipe } from 'fp-ts/function' import * as R from 'fp-ts/Record'

const scores: Record<string, number> = { alice: 85, bob: 92, charlie: 78 }

const adjustedScores = pipe( scores, R.map(score => score * 1.1), R.filter(score => score >= 85) )

String Transformations

import { pipe, flow } from 'fp-ts/function' import * as S from 'fp-ts/string'

const normalizeString = flow( S.trim, S.toLowerCase, s => s.replace(/\s+/g, '-') )

const slug = normalizeString(' Hello World ') // 'hello-world'

// With validation import * as O from 'fp-ts/Option'

const safeSlug = flow( O.fromPredicate((s: string) => s.length > 0), O.map(normalizeString) )

Composing with fp-ts Data Types

With Option

import { pipe } from 'fp-ts/function' import * as O from 'fp-ts/Option'

interface Config { database?: { host?: string port?: number } }

const getConnectionString = (config: Config): O.Option<string> => pipe( O.fromNullable(config.database), O.flatMap(db => O.fromNullable(db.host)), O.map(host => postgresql://${host}) )

// Chaining multiple Option operations const findUser = (id: number): O.Option<User> => { /* ... / } const getUserEmail = (user: User): O.Option<string> => { / ... / } const validateEmail = (email: string): O.Option<string> => { / ... */ }

const getValidatedEmail = (userId: number): O.Option<string> => pipe( findUser(userId), O.flatMap(getUserEmail), O.flatMap(validateEmail) )

With Either

import { pipe } from 'fp-ts/function' import * as E from 'fp-ts/Either'

type ValidationError = { type: 'validation'; message: string } type NetworkError = { type: 'network'; message: string } type AppError = ValidationError | NetworkError

const validateAge = (age: number): E.Either<ValidationError, number> => age >= 0 && age <= 150 ? E.right(age) : E.left({ type: 'validation', message: 'Invalid age' })

const validateName = (name: string): E.Either<ValidationError, string> => name.length >= 2 ? E.right(name) : E.left({ type: 'validation', message: 'Name too short' })

// Sequential validation (fail on first error) const validateUser = (name: string, age: number) => pipe( E.Do, E.bind('name', () => validateName(name)), E.bind('age', () => validateAge(age)), E.map(({ name, age }) => ({ name, age, createdAt: new Date() })) )

// Accumulating errors with Validation import * as A from 'fp-ts/Apply' import * as NEA from 'fp-ts/NonEmptyArray'

type ValidationErrors = NEA.NonEmptyArray<string> type Validation<A> = E.Either<ValidationErrors, A>

const applicativeValidation = E.getApplicativeValidation(NEA.getSemigroup<string>())

const validateUserAll = (name: string, age: number) => pipe( A.sequenceS(applicativeValidation)({ name: validateName(name), age: validateAge(age) }), E.map(({ name, age }) => ({ name, age })) )

With Task and TaskEither

import { pipe } from 'fp-ts/function' import * as T from 'fp-ts/Task' import * as TE from 'fp-ts/TaskEither'

// Composing async operations const fetchUser = (id: number): TE.TaskEither<Error, User> => TE.tryCatch( () => fetch(/api/users/${id}).then(r => r.json()), (error) => new Error(String(error)) )

const fetchUserPosts = (userId: number): TE.TaskEither<Error, Post[]> => TE.tryCatch( () => fetch(/api/users/${userId}/posts).then(r => r.json()), (error) => new Error(String(error)) )

const getUserWithPosts = (id: number): TE.TaskEither<Error, UserWithPosts> => pipe( fetchUser(id), TE.flatMap(user => pipe( fetchUserPosts(user.id), TE.map(posts => ({ ...user, posts })) ) ) )

// Parallel execution import * as A from 'fp-ts/Array'

const fetchAllUsers = (ids: number[]): TE.TaskEither<Error, User[]> => pipe( ids, A.map(fetchUser), A.sequence(TE.ApplicativePar) // Parallel execution )

With Reader and ReaderTaskEither

import { pipe } from 'fp-ts/function' import * as RTE from 'fp-ts/ReaderTaskEither'

interface Dependencies { userRepo: UserRepository emailService: EmailService logger: Logger }

const getUser = (id: number): RTE.ReaderTaskEither<Dependencies, Error, User> => pipe( RTE.ask<Dependencies>(), RTE.flatMapTaskEither(deps => deps.userRepo.findById(id)) )

const sendWelcomeEmail = (user: User): RTE.ReaderTaskEither<Dependencies, Error, void> => pipe( RTE.ask<Dependencies>(), RTE.flatMapTaskEither(deps => deps.emailService.send(user.email, 'Welcome!')) )

const onboardUser = (id: number): RTE.ReaderTaskEither<Dependencies, Error, User> => pipe( getUser(id), RTE.tap(sendWelcomeEmail), RTE.tap(user => RTE.fromTask(deps => deps.logger.info(Onboarded: ${user.name})) ) )

Best Practices for Readable Pipelines

  1. Keep Functions Small and Focused

// Good: Each step does one thing const processOrder = pipe( order, validateOrder, calculateTotals, applyDiscounts, formatForDisplay )

// Avoid: Large inline functions const processOrder = pipe( order, o => { // 50 lines of validation, calculation, and formatting } )

  1. Extract Named Functions for Clarity

// Good: Named functions explain intent const isAdult = (user: User) => user.age >= 18 const formatName = (user: User) => ${user.firstName} ${user.lastName}

const adultNames = pipe( users, A.filter(isAdult), A.map(formatName) )

// Less clear: Anonymous functions inline const adultNames = pipe( users, A.filter(u => u.age >= 18), A.map(u => ${u.firstName} ${u.lastName}) )

  1. Use flow for Reusable Transformations

// Define reusable pipelines const normalizeEmail = flow( S.trim, S.toLowerCase )

const validateEmailFormat = flow( O.fromPredicate((s: string) => s.includes('@')), O.filter(s => s.length >= 5) )

// Compose them const processEmail = flow( normalizeEmail, validateEmailFormat )

  1. Group Related Operations

// Good: Logical grouping with comments const processUsers = pipe( users, // Filter A.filter(isActive), A.filter(isVerified), // Transform A.map(enrichWithMetadata), A.map(formatForAPI), // Sort A.sort(byCreatedAt) )

  1. Handle Errors at Appropriate Levels

// Good: Handle errors where you can meaningfully respond const getUserSafely = (id: number) => pipe( fetchUser(id), TE.mapLeft(toAppError), TE.orElse(error => error.type === 'not_found' ? TE.right(defaultUser) : TE.left(error) ) )

  1. Use Do Notation for Complex Dependencies

// Good: Clear when steps depend on previous results const createOrder = pipe( E.Do, E.bind('user', () => validateUser(userData)), E.bind('items', () => validateItems(itemsData)), E.bind('shipping', ({ user }) => calculateShipping(user.address)), E.bind('total', ({ items, shipping }) => calculateTotal(items, shipping)), E.map(({ user, items, total }) => ({ user, items, total })) )

Common Composition Patterns

Pattern 1: Transform and Validate

const processInput = flow( S.trim, O.fromPredicate(s => s.length > 0), O.map(S.toLowerCase), O.filter(isValidFormat) )

Pattern 2: Fetch, Transform, Persist

const syncUser = (id: number) => pipe( fetchExternalUser(id), TE.map(transformToInternalUser), TE.flatMap(saveUser), TE.map(user => ({ success: true, user })) )

Pattern 3: Parallel with Aggregation

const fetchDashboardData = pipe( TE.Do, TE.apS('users', fetchUsers()), TE.apS('orders', fetchOrders()), TE.apS('metrics', fetchMetrics()), TE.map(({ users, orders, metrics }) => buildDashboard(users, orders, metrics) ) )

Pattern 4: Sequential with Early Exit

const processPayment = (paymentData: PaymentData) => pipe( validatePayment(paymentData), TE.flatMap(checkFunds), TE.flatMap(reserveFunds), TE.flatMap(processTransaction), TE.flatMap(sendConfirmation) )

Pattern 5: Fallback Chain

const getConfig = pipe( getEnvConfig(), O.alt(() => getFileConfig()), O.alt(() => getDefaultConfig()), O.getOrElse(() => hardcodedDefaults) )

Pattern 6: Conditional Branching

const processUser = (user: User) => pipe( user, O.fromPredicate(isAdmin), O.match( () => processRegularUser(user), () => processAdminUser(user) ) )

Pattern 7: Accumulate Results

const processAll = (items: Item[]) => pipe( items, A.map(processItem), A.separate, // Split into { left: errors[], right: successes[] } ({ left: errors, right: successes }) => ({ successes, errors, successRate: successes.length / items.length }) )

Type Inference Tips

Let TypeScript Infer When Possible

// Good: Types flow through const result = pipe( users, A.filter(u => u.active), // TypeScript knows u is User A.map(u => u.name) // TypeScript knows result is string[] )

// Only annotate when necessary const processNumber = flow( (n: number) => n * 2, // First function needs annotation n => n + 1, // Rest are inferred String )

Use Explicit Types for Public APIs

// Good: Clear contract for public function const processOrder: (order: Order) => E.Either<OrderError, ProcessedOrder> = flow( validateOrder, E.flatMap(calculateTotals), E.map(formatOrder) )

Performance Considerations

pipe vs flow Performance

  • pipe has minimal overhead - essentially just function calls

  • flow creates a new function, slight overhead on creation but not on execution

  • For hot paths with known values, pipe is marginally faster

  • For reusable functions, flow avoids recreation

Avoiding Unnecessary Intermediate Arrays

// Less efficient: Multiple array iterations const result = pipe( largeArray, A.filter(predicate1), A.filter(predicate2), A.map(transform) )

// More efficient: Combine predicates const result = pipe( largeArray, A.filter(x => predicate1(x) && predicate2(x)), A.map(transform) )

// Or use filterMap for filter + map const result = pipe( largeArray, A.filterMap(x => predicate1(x) && predicate2(x) ? O.some(transform(x)) : O.none ) )

Debugging Pipelines

Using tap for Side Effects

import { tap } from 'fp-ts/function'

const debugPipeline = pipe( data, tap(x => console.log('Step 1:', x)), transform1, tap(x => console.log('Step 2:', x)), transform2 )

Using trace Helper

const trace = <A>(label: string) => (a: A): A => { console.log(label, a) return a }

const result = pipe( data, trace('input'), transform1, trace('after transform1'), transform2, trace('final') )

Summary

Use Case Use pipe

Use flow

Transform a value now Yes No

Create reusable function No Yes

Pass as callback No Yes

One-off transformation Yes No

Build utility library No Yes

fp-ts operations chain Yes Either

Remember:

  • pipe(value, f, g) executes immediately

  • flow(f, g) returns a function for later

  • Keep pipeline steps small and focused

  • Extract named functions for clarity

  • Use Do notation for complex dependencies

  • Let TypeScript infer types when possible

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

fp-ts-backend

No summary provided by upstream source.

Repository SourceNeeds Review
General

fp-immutable

No summary provided by upstream source.

Repository SourceNeeds Review
General

fp-refactor

No summary provided by upstream source.

Repository SourceNeeds Review
General

managing side effects functionally

No summary provided by upstream source.

Repository SourceNeeds Review