nucleus.model

Model

A model that can be used to upload predictions to a dataset.

class nucleus.model.Model(model_id, name, reference_id, metadata, client, bundle_name=None, tags=None, trained_slice_ids=None)

By uploading model predictions to Nucleus, you can compare your predictions to ground truth annotations and discover problems with your Models or Dataset.

You can also upload predictions for unannotated images, letting you query them based on model predictions. This can help you prioritize which unlabeled data to label next.

Within Nucleus, Models work in the following way:

  1. You first create a Model. You can do this just once and reuse the model on multiple datasets.

  2. You then upload predictions to a dataset.

  3. You then trigger the calculation of metrics to view model debugging insights.

The steps above let you visualize model performance within Nucleus, or compare multiple models that have been run on the same Dataset.

Note that you can always add more predictions to a dataset, but you will then need to re-run the metrics calculation so that the metrics reflect the new predictions.
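
One way to follow this note when predictions arrive in several batches is to upload every batch first and recalculate metrics once at the end. The sketch below is a minimal illustration, not part of the Nucleus API: `chunked`, `upload_in_batches`, and the default `batch_size` of 1000 are hypothetical, and the demo uses stand-in callables so it runs without an API key.

```python
from typing import Callable, Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of `items` holding at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


def upload_in_batches(
    predictions: Sequence[T],
    upload: Callable[[List[T]], object],
    recalculate: Callable[[], object],
    batch_size: int = 1000,  # arbitrary illustrative default
) -> int:
    """Upload every batch, then recalculate metrics once. Returns the batch count."""
    batches = 0
    for batch in chunked(predictions, batch_size):
        upload(batch)
        batches += 1
    if batches:
        # Metrics are stale after any upload, so refresh them a single time.
        recalculate()
    return batches


# Demo with stand-in callables (hypothetical, no network access needed).
uploaded: List[List[int]] = []
recalculations: List[str] = []
count = upload_in_batches(
    list(range(5)),
    upload=uploaded.append,
    recalculate=lambda: recalculations.append("done"),
    batch_size=2,
)
print(count, uploaded, recalculations)  # 3 [[0, 1], [2, 3], [4]] ['done']
```

With real objects, `upload` could be `lambda batch: dataset.upload_predictions(model, batch)` and `recalculate` could be `lambda: dataset.calculate_evaluation_metrics(model)`.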

import nucleus

client = nucleus.NucleusClient("YOUR_SCALE_API_KEY")
dataset = client.get_dataset("YOUR_DATASET_ID")

prediction_1 = nucleus.BoxPrediction(
    label="label",
    x=0,
    y=0,
    width=10,
    height=10,
    reference_id="1",
    confidence=0.9,
    class_pdf={"label": 0.9, "other_label": 0.1},
)
prediction_2 = nucleus.BoxPrediction(
    label="label",
    x=0,
    y=0,
    width=10,
    height=10,
    reference_id="2",
    confidence=0.2,
    class_pdf={"label": 0.2, "other_label": 0.8},
)

model = client.create_model(
    name="My Model", reference_id="My-CNN", metadata={"timestamp": "121012401"}
)

# For small ingestions, we recommend synchronous ingestion
response = dataset.upload_predictions(model, [prediction_1, prediction_2])

# For large ingestions, we recommend asynchronous ingestion
job = dataset.upload_predictions(
    model, [prediction_1, prediction_2], asynchronous=True
)
# Check current status
job.status()
# Sleep until ingestion is done
job.sleep_until_complete()
# Check errors
job.errors()

dataset.calculate_evaluation_metrics(model)
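
In the example above, each prediction's `class_pdf` sums to 1 and includes the predicted label. A local sanity check along these lines can catch malformed values before an upload. This is a sketch under assumptions: `check_class_pdf` is a hypothetical helper, and whether Nucleus itself enforces any of these constraints is not stated here.

```python
import math
from typing import Dict, List


def check_class_pdf(
    label: str, confidence: float, class_pdf: Dict[str, float]
) -> List[str]:
    """Return a list of problems found; an empty list means every check passed."""
    problems = []
    if not 0.0 <= confidence <= 1.0:
        problems.append(f"confidence {confidence} is outside [0, 1]")
    if label not in class_pdf:
        problems.append(f"label {label!r} is missing from class_pdf")
    if any(p < 0.0 or p > 1.0 for p in class_pdf.values()):
        problems.append("class_pdf has a probability outside [0, 1]")
    total = sum(class_pdf.values())
    # Assumed convention: the distribution should sum to 1 within a small tolerance.
    if not math.isclose(total, 1.0, abs_tol=1e-6):
        problems.append(f"class_pdf sums to {total}, expected 1.0")
    return problems


ok = check_class_pdf("label", 0.9, {"label": 0.9, "other_label": 0.1})
bad = check_class_pdf("label", 0.2, {"label": 0.2, "other_label": 0.5})
print(ok)   # []
print(bad)  # one problem: the probabilities do not sum to 1
```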

Models cannot be instantiated directly; they must be created through the API using NucleusClient.create_model().

add_tags(tags)

Tag the model with custom tag names.

import nucleus
client = nucleus.NucleusClient("YOUR_SCALE_API_KEY")
model = client.list_models()[0]

model.add_tags(["tag_A", "tag_B"])

Parameters:

tags (List[str]) – list of tag names

add_trained_slice_ids(slice_ids)

Add trained slice id(s) to the model.

import nucleus
client = nucleus.NucleusClient("YOUR_SCALE_API_KEY")
model = client.list_models()[0]

model.add_trained_slice_ids(["slc_...", "slc_..."])

Parameters:

slice_ids (List[str]) – list of trained slice ids

evaluate(scenario_test_names)

Evaluates this model on the specified scenario tests.

import nucleus
client = nucleus.NucleusClient("YOUR_SCALE_API_KEY")
model = client.list_models()[0]
scenario_test = client.validate.create_scenario_test(
    "sample_scenario_test", "YOUR_SLICE_ID"
)

model.evaluate(["sample_scenario_test"])

Parameters:

scenario_test_names (List[str]) – list of scenario test names to evaluate

Returns:

AsyncJob object of evaluation job

Return type:

nucleus.async_job.AsyncJob
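
The returned job can be handled with the same `sleep_until_complete()` and `errors()` calls shown in the class-level example. The sketch below wraps them in a small helper. `wait_and_collect_errors` and `StubJob` are hypothetical names; the stub only mimics those two calls so the code runs offline, and its error message is made up. How a real job reports failure is not covered here.

```python
from typing import Any, List


class StubJob:
    """Hypothetical stand-in that mimics the two job calls used below."""

    def __init__(self, errors: List[str]) -> None:
        self._errors = errors
        self.waited = False

    def sleep_until_complete(self) -> None:
        # A real job would block here until the work finishes.
        self.waited = True

    def errors(self) -> List[str]:
        return self._errors


def wait_and_collect_errors(job: Any) -> Any:
    """Block until the job finishes, then return whatever job.errors() reports."""
    job.sleep_until_complete()
    return job.errors()


job = StubJob(["example error message"])  # made-up error text
found = wait_and_collect_errors(job)
print(found)  # ['example error message']
```

With a real model, `job` would be the value returned by `model.evaluate([...])`.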

classmethod from_json(payload, client)

Instantiates a Model object from a schematized JSON dict payload.

Parameters:

payload (dict)

remove_tags(tags)

Remove tag(s) from the model.

import nucleus
client = nucleus.NucleusClient("YOUR_SCALE_API_KEY")
model = client.list_models()[0]

model.remove_tags(["tag_x"])

Parameters:

tags (List[str]) – list of tag names to remove

remove_trained_slice_ids(slide_ids)

Remove trained slice id(s) from the model.

import nucleus
client = nucleus.NucleusClient("YOUR_SCALE_API_KEY")
model = client.list_models()[0]

model.remove_trained_slice_ids(["slc_...", "slc_..."])

Parameters:

slide_ids (List[str]) – list of trained slice ids to remove
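
When a model's trained slices need to match a target list, a set difference gives the ids to pass to add_trained_slice_ids() and remove_trained_slice_ids(). This is a minimal sketch: `diff_ids` is a hypothetical helper, the slice ids are made-up placeholders, and where the current ids come from is left to the caller.

```python
from typing import Iterable, List, Tuple


def diff_ids(
    current: Iterable[str], desired: Iterable[str]
) -> Tuple[List[str], List[str]]:
    """Return (to_add, to_remove) so that applying both turns current into desired."""
    current_set, desired_set = set(current), set(desired)
    to_add = sorted(desired_set - current_set)
    to_remove = sorted(current_set - desired_set)
    return to_add, to_remove


# Placeholder ids for illustration only.
to_add, to_remove = diff_ids(["slc_a", "slc_b"], ["slc_b", "slc_c"])
print(to_add, to_remove)  # ['slc_c'] ['slc_a']
```

With a real model, a non-empty `to_add` would go to `model.add_trained_slice_ids(to_add)` and a non-empty `to_remove` to `model.remove_trained_slice_ids(to_remove)`. The same helper works for tag names with add_tags() and remove_tags().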

run(dataset_id, model_run_name, slice_id)

Runs inference on the bundle associated with the model on the dataset.

import nucleus
client = nucleus.NucleusClient("YOUR_SCALE_API_KEY")
model = client.list_models()[0]

model.run("ds_123456", "YOUR_MODEL_RUN_NAME", None)

Parameters:
  • dataset_id (str) – The ID of the dataset to run inference on.

  • model_run_name (str) – The name of the model run.

  • slice_id (Optional[str]) – The ID of the slice of the dataset to run inference on.

Returns:

The ID of the AsyncJob used to track job progress.

Return type:

str