I/O Connectors in Apache Beam


Overview

I/O connectors enable reading from and writing to external data sources. Beam provides 51+ Java I/O connectors and several Python connectors.

Java I/O Connectors Location

sdks/java/io/

Available Connectors

Cloud Storage: google-cloud-platform (BigQuery, Bigtable, Spanner, Pub/Sub, GCS), amazon-web-services2, azure, azure-cosmos

Databases: jdbc, mongodb, cassandra, hbase, redis, neo4j, clickhouse, influxdb, singlestore, elasticsearch

Messaging: kafka, pulsar, rabbitmq, amqp, jms, mqtt, solace

File Formats: parquet, csv, json, xml, thrift, iceberg

Other: snowflake, splunk, cdap, debezium, hadoop-format, kudu, solr, tika

Testing I/O Connectors

Unit Tests

./gradlew :sdks:java:io:kafka:test
./gradlew :sdks:java:io:jdbc:test

Integration Tests

On Direct Runner

./gradlew :sdks:java:io:google-cloud-platform:integrationTest

With Custom GCP Settings

./gradlew :sdks:java:io:google-cloud-platform:integrationTest \
  -PgcpProject=<project> \
  -PgcpTempRoot=gs://<bucket>/path

With Explicit Pipeline Options

./gradlew :sdks:java:io:jdbc:integrationTest \
  -DbeamTestPipelineOptions='["--runner=TestDirectRunner"]'

Integration Test Framework

Located in the it/ directory:

  • it/common/ - Common test utilities

  • it/google-cloud-platform/ - GCP-specific test infrastructure

  • it/jdbc/ - JDBC test infrastructure

  • it/kafka/ - Kafka test infrastructure

  • it/testcontainers/ - Testcontainers support

Writing Integration Tests

Basic Structure

@RunWith(JUnit4.class)
public class MyIOIT {
  @Rule public TestPipeline readPipeline = TestPipeline.create();
  @Rule public TestPipeline writePipeline = TestPipeline.create();

  @Test
  public void testWriteAndRead() {
    // Write data
    writePipeline
        .apply(Create.of(testData))
        .apply(MyIO.write().to(destination));
    writePipeline.run().waitUntilFinish();

    // Read and verify
    PCollection<String> results =
        readPipeline.apply(MyIO.read().from(destination));
    PAssert.that(results).containsInAnyOrder(expectedData);
    readPipeline.run().waitUntilFinish();
  }
}
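The same write-then-read-and-verify pattern can be sketched in plain Python, using a temporary file as a stand-in for the external destination (the helper name and file-based "pipelines" here are illustrative, not Beam APIs):

```python
import os
import tempfile

def write_then_read(test_data):
    """Write records to a destination, then read them back for verification."""
    with tempfile.TemporaryDirectory() as d:
        dest = os.path.join(d, "out.txt")
        # "Write pipeline": persist the records to the destination
        with open(dest, "w") as f:
            f.write("\n".join(test_data))
        # "Read pipeline": read them back
        with open(dest) as f:
            return f.read().splitlines()

# Order-insensitive check, like PAssert.containsInAnyOrder
assert sorted(write_then_read(["b", "a", "c"])) == ["a", "b", "c"]
```

The two-pipeline split mirrors the Java structure above: the write pipeline must finish before the read pipeline verifies the results.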

Using TestPipeline

@Rule public TestPipeline pipeline = TestPipeline.create();

TestPipeline:

  • Blocks on run by default (on TestDataflowRunner)

  • Has 15-minute default timeout

  • Reads options from beamTestPipelineOptions system property
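The beamTestPipelineOptions property carries a JSON array of `--name=value` strings. A minimal Python sketch of how such a value can be interpreted (this parsing helper is illustrative, not Beam's actual implementation):

```python
import json

def parse_test_pipeline_options(raw):
    """Parse a beamTestPipelineOptions-style JSON array into a dict."""
    options = {}
    for arg in json.loads(raw):
        # Each entry looks like "--runner=TestDirectRunner"
        name, _, value = arg.lstrip("-").partition("=")
        # A bare flag with no value is treated as boolean True
        options[name] = value if value else True
    return options

opts = parse_test_pipeline_options('["--runner=TestDirectRunner"]')
# opts == {"runner": "TestDirectRunner"}
```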

GCP I/O Connectors

BigQuery

// Read
pipeline.apply(BigQueryIO.readTableRows().from("project:dataset.table"));

// Write
data.apply(BigQueryIO.writeTableRows()
    .to("project:dataset.table")
    .withSchema(schema)
    .withWriteDisposition(WriteDisposition.WRITE_APPEND));
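The table references above use the `project:dataset.table` form. A small Python sketch of splitting such a spec into its parts (this helper is illustrative; Beam has its own table-reference parsing):

```python
def parse_table_spec(spec):
    """Split a "project:dataset.table" reference into its components."""
    project, _, rest = spec.partition(":")
    dataset, _, table = rest.partition(".")
    return {"project": project, "dataset": dataset, "table": table}

ref = parse_table_spec("project:dataset.table")
# ref == {"project": "project", "dataset": "dataset", "table": "table"}
```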

Pub/Sub

// Read
pipeline.apply(PubsubIO.readStrings().fromTopic("projects/project/topics/topic"));

// Write
data.apply(PubsubIO.writeStrings().to("projects/project/topics/topic"));

Cloud Storage (TextIO)

// Read
pipeline.apply(TextIO.read().from("gs://bucket/path/*.txt"));

// Write
data.apply(TextIO.write().to("gs://bucket/output").withSuffix(".txt"));
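TextIO.read() accepts file patterns such as gs://bucket/path/*.txt and expands them against the filesystem. As a stdlib-only illustration of the matching semantics (fnmatch stands in here for Beam's own filesystem match logic; the paths are made up):

```python
from fnmatch import fnmatch

paths = [
    "gs://bucket/path/a.txt",
    "gs://bucket/path/b.txt",
    "gs://bucket/path/c.csv",
]
# Only the .txt files under path/ match the read pattern
matched = [p for p in paths if fnmatch(p, "gs://bucket/path/*.txt")]
# matched == ["gs://bucket/path/a.txt", "gs://bucket/path/b.txt"]
```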

Kafka Connector

// Read
pipeline.apply(KafkaIO.<String, String>read()
    .withBootstrapServers("localhost:9092")
    .withTopic("topic")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class));

// Write
data.apply(KafkaIO.<String, String>write()
    .withBootstrapServers("localhost:9092")
    .withTopic("topic")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(StringSerializer.class));

JDBC Connector

// Read (a RowMapper is also required to convert each ResultSet row)
pipeline.apply(JdbcIO.<Row>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
        .create("org.postgresql.Driver", "jdbc:postgresql://host/db"))
    .withQuery("SELECT * FROM table"));

// Write
data.apply(JdbcIO.<Row>write()
    .withDataSourceConfiguration(config)
    .withStatement("INSERT INTO table VALUES (?, ?)"));
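The write statement uses `?` placeholders that are bound once per element. A Python sketch of the same parameterized-insert idea using the stdlib sqlite3 module (the table and rows are illustrative, and sqlite3 stands in for the JDBC driver):

```python
import sqlite3

# In-memory database standing in for the JDBC destination
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE kv (k TEXT, v TEXT)")

rows = [("a", "1"), ("b", "2")]
# Each row's fields are bound to the ? placeholders, one statement per element
conn.executemany("INSERT INTO kv VALUES (?, ?)", rows)

count = conn.execute("SELECT COUNT(*) FROM kv").fetchone()[0]
# count == 2
```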

Python I/O Location

sdks/python/apache_beam/io/

Common Python I/Os

  • textio - Text files

  • fileio - General file operations

  • avroio - Avro files

  • parquetio - Parquet files

  • gcp/ - GCP connectors (BigQuery, Pub/Sub, Datastore, etc.)

Cross-language I/O

Beam supports using I/O connectors from one SDK in another via the expansion service.

Start Java expansion service

./gradlew :sdks:java:io:expansion-service:runExpansionService

Creating New Connectors

See the "Developing I/O connectors" guide in the Beam documentation.

Key components:

  • Source - Reads data (bounded or unbounded)

  • Sink - Writes data

  • Read/Write transforms - User-facing API
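As a rough sketch of how these pieces relate, a user-facing Read transform can wrap a bounded source that knows how to enumerate its records (the class and method names below are illustrative, not the actual Beam connector API):

```python
class BoundedSource:
    """Illustrative bounded source: knows how to produce its records."""
    def __init__(self, records):
        self._records = records

    def read(self):
        # A real source would read from an external system
        yield from self._records

class Read:
    """Illustrative user-facing read transform wrapping a source."""
    def __init__(self, source):
        self.source = source

    def expand(self):
        # A real transform would produce a PCollection via the runner
        return list(self.source.read())

output = Read(BoundedSource(["r1", "r2"])).expand()
# output == ["r1", "r2"]
```

The split matters because the transform is the stable public API, while the source or sink encapsulates connection details and can evolve independently.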

