5 changes: 5 additions & 0 deletions .github/workflows/pr.yml
@@ -8,3 +8,8 @@ on:
jobs:
  semantic:
    uses: influxdata/validate-semantic-github-messages/.github/workflows/semantic.yml@main
    with:
      # When true:
      #   If there is exactly one commit, validate only its commit message (not the PR title).
      #   Otherwise, validate only the PR title (and skip the commit messages).
      CHECK_PR_TITLE_OR_ONE_COMMIT: true
22 changes: 13 additions & 9 deletions Cargo.lock


6 changes: 3 additions & 3 deletions k8s/operator/kustomization.yaml
@@ -15,9 +15,9 @@ resources:
# Additionally ensure that ./k8s/operator/manifests/operator.yaml uses Always as the imagePullPolicy
#
images:
  - name: keramik/operator
    newName: public.ecr.aws/r5b3e0r5/3box/keramik-operator
    newTag: latest
  - name: samika98/operator
    newName: samika98/operator
    newTag: lgen

# Uncomment for development
#
4 changes: 2 additions & 2 deletions k8s/operator/manifests/operator.yaml
@@ -111,8 +111,8 @@ spec:
        {}
      containers:
        - name: keramik-operator
          image: "keramik/operator"
          imagePullPolicy: Always # Should be IfNotPresent when using imageTag: dev, but Always if using imageTag: latest
          image: "samika98/operator"
          imagePullPolicy: IfNotPresent # Should be IfNotPresent when using imageTag: dev, but Always if using imageTag: latest
          command:
            - "/usr/bin/keramik-operator"
            - "daemon"
43 changes: 43 additions & 0 deletions keramik/src/ipfs.md
@@ -169,3 +169,46 @@ spec:
      commands:
        - ipfs config --json Swarm.RelayClient.Enabled false
```

## Migration from Kubo to Ceramic One

A Kubo blockstore can be migrated to Ceramic One by specifying the migration command in the IPFS configuration.

Example [network config](./setup_network.md) that uses Go-based IPFS (i.e., Kubo) with its defaults for Ceramic (including the default
blockstore path of `/data/ipfs`) and the Ceramic network set to `dev-unstable`:

```yaml
apiVersion: "keramik.3box.io/v1alpha1"
kind: Network
metadata:
  name: basic-network
spec:
  replicas: 5
  ceramic:
    - ipfs:
        go: {}
  networkType: dev-unstable
```

Example [network config](./setup_network.md) that uses Ceramic One and specifies the migration command to run before
starting the node:

```yaml
apiVersion: "keramik.3box.io/v1alpha1"
kind: Network
metadata:
  name: basic-network
spec:
  replicas: 5
  ceramic:
    - ipfs:
        rust:
          migrationCmd:
            - from-ipfs
            - -i
            - /data/ipfs/blocks
            - -o
            - /data/ipfs/
            - --network
            - dev-unstable
```
4 changes: 3 additions & 1 deletion keramik/src/simulation.md
@@ -10,6 +10,8 @@ To run a simulation, first define a simulation. Available simulation types are
- `recon-event-sync` - A simulation that creates events for Recon to sync at a fixed rate (~300/s by default). Designed for a 2-node network but should work with any number of nodes.
- `cas-benchmark` - A simulation that benchmarks the CAS network.
- `cas-anchoring-benchmark` - A simulation that benchmarks Ceramic with anchoring enabled.

Using one of these scenarios, we can then define the configuration for that scenario:

@@ -152,7 +154,7 @@ spec:
```

```shell
kubectl apply -f custom-cas-api.yaml
kubectl apply -f custom-ipfs.yaml
```

### Example Custom Simulation for Ceramic Anchoring Benchmark
193 changes: 193 additions & 0 deletions operator/src/lgen/controller.rs
@@ -0,0 +1,193 @@
use std::{sync::Arc, time::Duration};

use futures::stream::StreamExt;
use k8s_openapi::api::batch::v1::Job;
use kube::{
    api::{Patch, PatchParams},
    client::Client,
    core::object::HasSpec,
    runtime::{
        controller::Action,
        watcher::{self, Config},
        Controller,
    },
    Api, Resource, ResourceExt,
};
use opentelemetry::{global, KeyValue};
use rand::{distributions::Alphanumeric, thread_rng, Rng, RngCore};
use tracing::{debug, error, info};

use crate::{
    labels::MANAGED_BY_LABEL_SELECTOR,
    lgen::{
        job::{job_spec, JobConfig, JobImageConfig},
        spec::{LoadGenerator, LoadGeneratorStatus},
    },
    network::ipfs_rpc::{HttpRpcClient, IpfsRpcClient},
    simulation::controller::{get_num_peers, monitoring_ready},
    utils::{apply_job, Clock, Context},
};

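/// Name of the Job that runs the load generation scenario.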
pub const LOAD_GENERATOR_JOB_NAME: &str = "load-gen-job";

/// Handle errors during reconciliation.
fn on_error(
    _load_generator: Arc<LoadGenerator>,
    _error: &Error,
    _context: Arc<Context<impl IpfsRpcClient, impl RngCore, impl Clock>>,
) -> Action {
    Action::requeue(Duration::from_secs(5))
}

/// Errors produced by the reconcile function.
#[derive(Debug, thiserror::Error)]
enum Error {
    #[error("App error: {source}")]
    App {
        #[from]
        source: anyhow::Error,
    },
    #[error("Kube error: {source}")]
    Kube {
        #[from]
        source: kube::Error,
    },
}

/// Start a controller for the LoadGenerator CRD.
pub async fn run() {
    let k_client = Client::try_default().await.unwrap();
    let context = Arc::new(
        Context::new(k_client.clone(), HttpRpcClient).expect("should be able to create context"),
    );

    let load_generators: Api<LoadGenerator> = Api::all(k_client.clone());
    let jobs = Api::<Job>::all(k_client.clone());

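    // Watch all LoadGenerators and reconcile whenever one of them, or a Job they
    // own (selected via the managed-by label), changes.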
    Controller::new(load_generators.clone(), Config::default())
        .owns(
            jobs,
            watcher::Config::default().labels(MANAGED_BY_LABEL_SELECTOR),
        )
        .run(reconcile, on_error, context)
        .for_each(|rec_res| async move {
            match rec_res {
                Ok((load_generator, _)) => {
                    info!(load_generator.name, "reconcile success");
                }
                Err(err) => {
                    error!(?err, "reconcile error")
                }
            }
        })
        .await;
}

/// Perform a reconcile pass for the LoadGenerator CRD.
async fn reconcile(
    load_generator: Arc<LoadGenerator>,
    cx: Arc<Context<impl IpfsRpcClient, impl RngCore, impl Clock>>,
) -> Result<Action, Error> {
    let meter = global::meter("keramik");
    let runs = meter
        .u64_counter("load_generator_reconcile_count")
        .with_description("Number of load generator reconciles")
        .init();

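    // Delegate to the inner reconcile function and count the pass, labeled by outcome.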
    match reconcile_(load_generator, cx).await {
        Ok(action) => {
            runs.add(
                1,
                &[KeyValue {
                    key: "result".into(),
                    value: "ok".into(),
                }],
            );
            Ok(action)
        }
        Err(err) => {
            runs.add(
                1,
                &[KeyValue {
                    key: "result".into(),
                    value: "err".into(),
                }],
            );
            Err(err)
        }
    }
}

/// Inner reconcile logic for the LoadGenerator CRD, wrapped by `reconcile` for metrics.
async fn reconcile_(
    load_generator: Arc<LoadGenerator>,
    cx: Arc<Context<impl IpfsRpcClient, impl RngCore, impl Clock>>,
) -> Result<Action, Error> {
    let spec = load_generator.spec();

    let status = if let Some(status) = &load_generator.status {
        status.clone()
    } else {
        // Generate a new status with a random name and nonce
        LoadGeneratorStatus {
            nonce: thread_rng().gen(),
            name: "load-gen-"
                .chars()
                .chain(
                    thread_rng()
                        .sample_iter(&Alphanumeric)
                        .take(6)
                        .map(char::from),
                )
                .collect::<String>(),
        }
    };
    debug!(?spec, ?status, "reconcile");

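    // LoadGenerator is a namespaced resource, so namespace() is always Some here.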
    let ns = load_generator.namespace().unwrap();
    // The peer count is not used when building the Job; the lookup presumably
    // verifies that a network exists in this namespace.
    let _num_peers = get_num_peers(cx.clone(), &ns).await?;

    // The load generator does not deploy the monitoring resources, but they must
    // exist in order to collect the results of load generators.
    let ready = monitoring_ready(cx.clone(), &ns).await?;
    if !ready {
        return Ok(Action::requeue(Duration::from_secs(10)));
    }

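    // Build the Job configuration from the spec and the generated status.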
    let job_image_config = JobImageConfig::from(spec);

    let job_config = JobConfig {
        name: status.name.clone(),
        scenario: spec.scenario.to_owned(),
        users: spec.users.to_owned(),
        run_time: spec.run_time.to_owned(),
        nonce: status.nonce,
        job_image_config: job_image_config.clone(),
        throttle_requests: spec.throttle_requests,
    };
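    // Own the Job so it is garbage collected when the LoadGenerator is deleted.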
    let orefs = load_generator
        .controller_owner_ref(&())
        .map(|oref| vec![oref])
        .unwrap_or_default();

    apply_job(
        cx.clone(),
        &ns,
        orefs.clone(),
        LOAD_GENERATOR_JOB_NAME,
        job_spec(job_config),
    )
    .await?;

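    // Persist the generated status (name and nonce) so later reconciles reuse it.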
    let load_generators: Api<LoadGenerator> = Api::namespaced(cx.k_client.clone(), &ns);
    let _patched = load_generators
        .patch_status(
            &load_generator.name_any(),
            &PatchParams::default(),
            &Patch::Merge(serde_json::json!({ "status": status })),
        )
        .await?;

    Ok(Action::requeue(Duration::from_secs(10)))
}