Apache Beam utilities.
| Kind | Name | Description |
| --- | --- | --- |
| Class | | Undocumented |
| Class | | Undocumented |
| Class | | Undocumented |
| Function | beam_export | Exports the given sample collection in the specified number of shards via Apache Beam |
| Function | beam_import | Imports the given samples into the dataset via Apache Beam |
| Function | beam_merge | Merges the given samples into the dataset via Apache Beam |
| Function | _pop | Undocumented |
Exports the given sample collection in the specified number of shards via Apache Beam.
This function is a parallelized alternative to
fiftyone.core.collections.SampleCollection.export() that effectively
performs the following sharded export in parallel:

```python
for idx, (first, last) in enumerate(shards, 1):
    _kwargs = render_kwargs(kwargs, idx)
    sample_collection[first:last].export(**_kwargs)
```
Example:
```python
from apache_beam.options.pipeline_options import PipelineOptions

import fiftyone as fo
import fiftyone.utils.beam as foub
import fiftyone.zoo as foz

dataset = foz.load_zoo_dataset("quickstart")

# Use multithreading instead of the default multiprocessing
options = PipelineOptions(
    runner="direct",
    direct_num_workers=10,
    direct_running_mode="multi_threading",
)

foub.beam_export(
    dataset,
    num_shards=20,
    options=options,
    dataset_type=fo.types.TFObjectDetectionDataset,
    label_field="ground_truth",
    tf_records_path="/tmp/beam/tf.records-%05d-of-00020",
)
```
Parameters

| Name | Description |
| --- | --- |
| sample_collection | a fiftyone.core.collections.SampleCollection |
| num_shards | the number of shards to write |
| options (default: None) | an apache_beam.options.pipeline_options.PipelineOptions that configures how to run the pipeline. By default, the pipeline will be run via Beam's direct runner using multiprocessing |
| verbose (default: False) | whether to log the Beam pipeline's messages |
| render_kwargs (default: None) | a function that renders kwargs for the current shard. The function should have signature def render_kwargs(kwargs, idx) -> kwargs, where idx in [1, num_shards] is the shard index. By default, any string-valued arguments that contain format patterns like %05d will be rendered via value % idx (see the sketch below) |
| **kwargs | keyword arguments for fiftyone.core.collections.SampleCollection.export() |
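To make the default rendering behavior concrete, here is a minimal sketch of a custom render_kwargs function that mimics the documented default. The helper name and the bare `"%" in value` pattern check are assumptions for illustration, not the module's actual internals:

```python
def render_shard_kwargs(kwargs, idx):
    """Hypothetical render_kwargs that mimics the documented default:
    string-valued arguments containing format patterns like ``%05d``
    are rendered via ``value % idx``.
    """
    rendered = {}
    for key, value in kwargs.items():
        if isinstance(value, str) and "%" in value:
            # Assumption: a bare "%" check stands in for real pattern detection
            # e.g., "tf.records-%05d-of-00020" % 3 -> "tf.records-00003-of-00020"
            rendered[key] = value % idx
        else:
            rendered[key] = value

    return rendered
```

Passing `render_kwargs=render_shard_kwargs` to `beam_export()` would then control each shard's export arguments, e.g., to give each shard its own output path.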
Imports the given samples into the dataset via Apache Beam.
This function is a parallelized alternative to
fiftyone.core.dataset.Dataset.add_samples().
Note
The insertion order of the samples is not guaranteed.
Example:
```python
import fiftyone as fo
import fiftyone.utils.beam as foub

samples = range(10000)

def make_sample(idx):
    return fo.Sample(filepath="image%d.png" % idx, uuid=idx)

#
# Option 1: build the samples on the workers
#

dataset = fo.Dataset()
foub.beam_import(dataset, samples, parse_fcn=make_sample)
print(dataset)

#
# Option 2: build the samples in the main thread
#
# This is generally not preferred but may be necessary if your
# ``parse_fcn`` is not serializable
#

dataset = fo.Dataset()
samples = map(make_sample, samples)
foub.beam_import(dataset, samples)
print(dataset)
```
Parameters

| Name | Description |
| --- | --- |
| dataset | a fiftyone.core.dataset.Dataset |
| samples | an iterable of samples. If no parse_fcn is provided, these must be fiftyone.core.sample.Sample instances. If a parse_fcn is provided, these are passed to it for parsing |
| parse_fcn (default: None) | an optional function that converts elements of samples to fiftyone.core.sample.Sample instances |
| expand_schema (default: True) | whether to dynamically add new sample fields encountered to the dataset schema. If False, an error is raised if a sample's schema is not a subset of the dataset schema |
| dynamic (default: False) | whether to declare dynamic attributes of embedded document fields that are encountered |
| validate (default: True) | whether to validate that the fields of each sample are compliant with the dataset schema before adding it |
| options (default: None) | an apache_beam.options.pipeline_options.PipelineOptions that configures how to run the pipeline. By default, the pipeline will be run via Beam's direct runner using threads |
| verbose (default: False) | whether to log the Beam pipeline's messages |
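By default, the import runs on Beam's direct runner using threads, but you can supply your own PipelineOptions. A hedged sketch, assuming the same direct-runner options shown in the `beam_export()` example apply here as well:

```python
from apache_beam.options.pipeline_options import PipelineOptions

import fiftyone as fo
import fiftyone.utils.beam as foub

def make_sample(idx):
    return fo.Sample(filepath="image%d.png" % idx)

# Assumption: switch the direct runner from threads to multiprocessing;
# ``parse_fcn`` must then be serializable so it can be shipped to workers
options = PipelineOptions(
    runner="direct",
    direct_num_workers=8,
    direct_running_mode="multi_processing",
)

dataset = fo.Dataset()
foub.beam_import(dataset, range(10000), parse_fcn=make_sample, options=options)
print(dataset)
```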
Merges the given samples into the dataset via Apache Beam.
This function is a parallelized alternative to
fiftyone.core.dataset.Dataset.merge_samples().
Note
This function is only useful for merging in-memory samples into a
dataset. If you are merging a sample collection, simply call
fiftyone.core.dataset.Dataset.merge_samples().
Example:
```python
import fiftyone as fo
import fiftyone.utils.beam as foub
import fiftyone.zoo as foz

dataset = foz.load_zoo_dataset("quickstart").clone()

samples = iter(dataset.select_fields("predictions"))

foub.beam_merge(dataset, samples, fields={"predictions": "predictions2"})

print(dataset.count("predictions.detections"))
print(dataset.count("predictions2.detections"))
```
Parameters

| Name | Description |
| --- | --- |
| dataset | a fiftyone.core.dataset.Dataset |
| samples | an iterable of samples. If no parse_fcn is provided, these must be fiftyone.core.sample.Sample instances. If a parse_fcn is provided, these are passed to it for parsing |
| parse_fcn (default: None) | an optional function that converts elements of samples to fiftyone.core.sample.Sample instances |
| options (default: None) | an apache_beam.options.pipeline_options.PipelineOptions that configures how to run the pipeline. By default, the pipeline will be run via Beam's direct runner using threads |
| verbose (default: False) | whether to log the Beam pipeline's messages |
| **kwargs | keyword arguments for fiftyone.core.dataset.Dataset.merge_samples() |
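Because the extra keyword arguments are forwarded to Dataset.merge_samples(), you can control how incoming samples are matched and merged. A minimal sketch, assuming your version of `merge_samples()` accepts a `key_field` argument for matching:

```python
import fiftyone as fo
import fiftyone.utils.beam as foub
import fiftyone.zoo as foz

dataset = foz.load_zoo_dataset("quickstart").clone()

# Build modified copies of the samples in memory
samples = []
for sample in dataset.select_fields("ground_truth"):
    sample = sample.copy()
    sample.tags.append("merged")  # example in-memory edit
    samples.append(sample)

# Assumption: ``key_field`` is forwarded to ``Dataset.merge_samples()``
# and controls how incoming samples are matched to existing ones
foub.beam_merge(dataset, samples, key_field="filepath")

print(dataset.count_sample_tags())
```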