fiftyone.utils.beam module documentation

Apache Beam utilities.

Copyright 2017-2025, Voxel51, Inc.

Class ExportBatch: Undocumented
Class ImportBatch: Undocumented
Class MergeBatch: Undocumented
Function beam_export: Exports the given sample collection in the specified number of shards via Apache Beam.
Function beam_import: Imports the given samples into the dataset via Apache Beam.
Function beam_merge: Merges the given samples into the dataset via Apache Beam.
Function _pop_first: Undocumented
def beam_export(sample_collection, num_shards, options=None, verbose=False, render_kwargs=None, **kwargs):

Exports the given sample collection in the specified number of shards via Apache Beam.

This function is a parallelized alternative to fiftyone.core.collections.SampleCollection.export that effectively performs the following sharded export in parallel:

# ``shards`` contains the (first, last) slice bounds that partition the
# collection into ``num_shards`` contiguous chunks
for idx, (first, last) in enumerate(shards, 1):
    _kwargs = render_kwargs(kwargs, idx)
    sample_collection[first:last].export(**_kwargs)

Example:

from apache_beam.options.pipeline_options import PipelineOptions

import fiftyone as fo
import fiftyone.utils.beam as foub
import fiftyone.zoo as foz

dataset = foz.load_zoo_dataset("quickstart")

# Use multithreading instead of the default multiprocessing
options = PipelineOptions(
    runner="direct",
    direct_num_workers=10,
    direct_running_mode="multi_threading",
)

foub.beam_export(
    dataset,
    num_shards=20,
    options=options,
    dataset_type=fo.types.TFObjectDetectionDataset,
    label_field="ground_truth",
    tf_records_path="/tmp/beam/tf.records-%05d-of-00020",
)
Parameters
sample_collection: a fiftyone.core.collections.SampleCollection
num_shards: the number of shards to write
options (None): an apache_beam.options.pipeline_options.PipelineOptions that configures how to run the pipeline. By default, the pipeline will be run via Beam's direct runner using multiprocessing
verbose (False): whether to log the Beam pipeline's messages
render_kwargs (None): a function that renders kwargs for the current shard. The function should have signature def render_kwargs(kwargs, idx) -> kwargs, where idx in [1, num_shards] is the shard index. By default, any string-valued arguments that contain format patterns like %05d are rendered via value % idx (see the sketch after this list)
**kwargs: keyword arguments for fiftyone.core.collections.SampleCollection.export
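
For reference, a minimal sketch of a custom ``render_kwargs`` that gives each shard its own export directory (the ``export_dir`` value below is a hypothetical illustration):

def render_kwargs(kwargs, idx):
    kwargs = dict(kwargs)  # don't mutate the caller's kwargs
    # Hypothetical: write shard ``idx`` to its own directory
    kwargs["export_dir"] = "/tmp/beam/shard-%d" % idx
    return kwargs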
def beam_import(dataset, samples, parse_fcn=None, expand_schema=True, dynamic=False, validate=True, options=None, verbose=False):

Imports the given samples into the dataset via Apache Beam.

This function is a parallelized alternative to fiftyone.core.dataset.Dataset.add_samples.

Note

The insertion order of the samples is not guaranteed.
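
If downstream code needs a deterministic order, one option is to store an explicit index on each sample (as the ``uuid`` field in the example below does) and sort on it after the import; a minimal sketch:

view = dataset.sort_by("uuid")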

Example:

import fiftyone as fo
import fiftyone.utils.beam as foub

samples = range(10000)

def make_sample(idx):
    return fo.Sample(filepath="image%d.png" % idx, uuid=idx)

#
# Option 1: build the samples on the workers
#

dataset = fo.Dataset()

foub.beam_import(dataset, samples, parse_fcn=make_sample)
print(dataset)

#
# Option 2: build the samples in the main thread
#
# This is generally not preferred but may be necessary if your
# ``parse_fcn`` is not serializable
#

dataset = fo.Dataset()

samples = map(make_sample, samples)

foub.beam_import(dataset, samples)
print(dataset)
Parameters
dataset: a fiftyone.core.dataset.Dataset
samples: an iterable of samples. If no parse_fcn is provided, these must be fiftyone.core.sample.Sample instances. If a parse_fcn is provided, these are passed to it for parsing
parse_fcn (None): an optional function that converts elements of samples to fiftyone.core.sample.Sample instances
expand_schema (True): whether to dynamically add new sample fields encountered to the dataset schema. If False, an error is raised if a sample's schema is not a subset of the dataset schema
dynamic (False): whether to declare dynamic attributes of embedded document fields that are encountered
validate (True): whether to validate that the fields of each sample are compliant with the dataset schema before adding it
options (None): an apache_beam.options.pipeline_options.PipelineOptions that configures how to run the pipeline. By default, the pipeline will be run via Beam's direct runner using threads (see the sketch after this list)
verbose (False): whether to log the Beam pipeline's messages
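
As with beam_export, you can customize how the pipeline runs via ``options``; a minimal sketch that swaps the default threads for Beam's multiprocessing mode (the worker count is a hypothetical choice):

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner="direct",
    direct_num_workers=8,
    direct_running_mode="multi_processing",
)

foub.beam_import(dataset, samples, parse_fcn=make_sample, options=options)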
def beam_merge(dataset, samples, parse_fcn=None, options=None, verbose=False, **kwargs):

Merges the given samples into the dataset via Apache Beam.

This function is a parallelized alternative to fiftyone.core.dataset.Dataset.merge_samples.

Note

This function is only useful for merging in-memory samples into a dataset. If you are merging a sample collection, simply call fiftyone.core.dataset.Dataset.merge_samples.
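
For instance, if the samples to merge already live in another collection, the direct call suffices (``other_dataset`` is a hypothetical collection):

dataset.merge_samples(other_dataset)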

Example:

import fiftyone as fo
import fiftyone.utils.beam as foub
import fiftyone.zoo as foz

dataset = foz.load_zoo_dataset("quickstart").clone()

samples = iter(dataset.select_fields("predictions"))

foub.beam_merge(dataset, samples, fields={"predictions": "predictions2"})

print(dataset.count("predictions.detections"))
print(dataset.count("predictions2.detections"))
Parameters
dataset: a fiftyone.core.dataset.Dataset
samples: an iterable of samples. If no parse_fcn is provided, these must be fiftyone.core.sample.Sample instances. If a parse_fcn is provided, these are passed to it for parsing
parse_fcn (None): an optional function that converts elements of samples to fiftyone.core.sample.Sample instances
options (None): an apache_beam.options.pipeline_options.PipelineOptions that configures how to run the pipeline. By default, the pipeline will be run via Beam's direct runner using threads
verbose (False): whether to log the Beam pipeline's messages
**kwargs: keyword arguments for fiftyone.core.dataset.Dataset.merge_samples
def _pop_first(x):

Undocumented