Skip to content

Commit 36d2bec

Browse files
committed
Merge branch 'main' into shared-mini-test-suite-follow-on
2 parents 3806a7b + 77078f3 commit 36d2bec

File tree

35 files changed

+972
-197
lines changed

35 files changed

+972
-197
lines changed

.asf.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@
1919

2020
# https://github.com/apache/infrastructure-asfyaml/blob/main/README.md
2121

22+
notifications:
23+
24+
25+
pullrequests: [email protected]
26+
jira_options: worklog
27+
2228
github:
2329
description: "Apache Accumulo"
2430
homepage: https://accumulo.apache.org

assemble/bin/accumulo-cluster

Lines changed: 190 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ $(cyan Commands):
5252
$(green stop) Stops Accumulo cluster services
5353
$(green restart) Restarts Accumulo cluster services
5454
$(green kill) Kills Accumulo cluster services
55+
$(green prune) Reomves zookeeper locks of extra processes
5556
5657
$(cyan Examples):
5758
$(purple 'accumulo-cluster start') $(blue '# start all servers')
@@ -62,6 +63,8 @@ $(cyan Examples):
6263
$(purple 'accumulo-cluster start --sservers=group1') $(blue '# start all group1 sservers')
6364
$(purple 'accumulo-cluster start --sservers="group1 group2"') $(blue '# start all group1 and group2 sservers')
6465
$(purple 'accumulo-cluster start --local --manager --tservers') $(blue '# Start the local manager and local tservers')
66+
$(purple 'accumulo-cluster prune --compactors') $(blue '# prune all extra compactors across all groups')
67+
$(purple 'accumulo-cluster prune --compactors="group1"') $(blue '# prune extra compactors running in group1')
6568
6669
EOF
6770
}
@@ -287,8 +290,14 @@ function parse_config() {
287290
exit 1
288291
fi
289292

290-
trap 'rm -f "$CONFIG_FILE"' EXIT
291-
CONFIG_FILE=$(mktemp --tmpdir "ClusterConfigParser-XXXXXXXX.out") || exit 1
293+
AC_TMP_DIR=$(mktemp -t -d "accumulo-cluster-XXXXXXXX") || exit 1
294+
if isDebug; then
295+
echo "$(blue DEBUG): Temporary files for this run are in $AC_TMP_DIR"
296+
else
297+
trap 'rm -rf -- "$AC_TMP_DIR"' EXIT
298+
fi
299+
300+
CONFIG_FILE="$AC_TMP_DIR/ClusterConfigParser.out"
292301
"$accumulo_cmd" org.apache.accumulo.core.conf.cluster.ClusterConfigParser "$conf/cluster.yaml" "$CONFIG_FILE" || parse_fail
293302
#shellcheck source=/dev/null
294303
. "$CONFIG_FILE"
@@ -590,9 +599,183 @@ function control_services() {
590599

591600
if [[ $ARG_LOCAL == 0 && $ARG_ALL == 1 && ($operation == "stop" || $operation == "kill") ]]; then
592601
debug "Cleaning all server entries in ZooKeeper"
593-
debugOrRun "$accumulo_cmd" org.apache.accumulo.server.util.ZooZap -verbose -manager -tservers -compactors -sservers
602+
debugOrRun "$accumulo_cmd" org.apache.accumulo.server.util.ZooZap -verbose -manager -tservers -compactors -sservers --gc --monitor
603+
fi
604+
605+
}
606+
607+
function prune_group() {
608+
local service_type=$1
609+
local group=$2
610+
local expectedCount=$3
611+
declare -a hosts
612+
read -r -a hosts <<<"$4"
613+
614+
if isDebug; then
615+
echo "$(blue DEBUG) starting prune for service:$service_type group:$group expected:$expectedCount"
616+
fi
617+
618+
if [ -z ${AC_TMP_DIR+x} ]; then
619+
echo "$(red ERROR): AC_TMP_DIR is not set"
620+
exit 1
621+
fi
622+
local exclude_file="$AC_TMP_DIR/accumulo-zoozap-exclude-$service_type-$group.txt"
623+
touch "$exclude_file"
624+
625+
# Determine the host:ports known by the accumulo cluster script, these should be kept
626+
for host in "${hosts[@]}"; do
627+
"${SSH[@]}" "$host" bash -c "'$bin/accumulo-service $service_type list'" | grep -E "^[a-zA-Z0-9]+_${group}_[0-9]+" | head -n "$expectedCount" | awk '{print $3}' | tr ',' '\n' | awk '{print "'"$host"':" $1}' >>"$exclude_file"
628+
done
629+
630+
local lockTypeOpt
631+
case $service_type in
632+
manager)
633+
lockTypeOpt="-manager"
634+
;;
635+
compaction-coordinator)
636+
lockTypeOpt="-compaction-coordinators"
637+
;;
638+
compactor)
639+
lockTypeOpt="-compactors"
640+
;;
641+
tserver)
642+
lockTypeOpt="-tservers"
643+
;;
644+
sserver)
645+
lockTypeOpt="-sservers"
646+
;;
647+
gc)
648+
lockTypeOpt="--gc"
649+
;;
650+
monitor)
651+
lockTypeOpt="--monitor"
652+
;;
653+
*)
654+
echo "Prune does not support $service_type"
655+
exit 1
656+
;;
657+
esac
658+
659+
if isDebug; then
660+
"$accumulo_cmd" org.apache.accumulo.server.util.ZooZap "$lockTypeOpt" -verbose --include-groups "$group" --exclude-host-ports "$exclude_file" --dry-run
661+
else
662+
"$accumulo_cmd" org.apache.accumulo.server.util.ZooZap "$lockTypeOpt" -verbose --include-groups "$group" --exclude-host-ports "$exclude_file"
663+
fi
664+
}
665+
666+
# Kills extra server processes that are not needed according to the
667+
# cluster.yaml file. Conceptually this code is trying to reconcile the
668+
# following three sets of servers.
669+
#
670+
# 1. The notional goal set of servers specified by cluster.yaml
671+
# 2. The set of servers processes seen in zookeeper
672+
# 3. The set of server processes known to the accumulo-cluster script. This
673+
# is derived from pid files on hosts in set 1.
674+
#
675+
# This function attempts to find extra servers in set 2 that are not specified
676+
# by set 1. When it does find extra servers it removes their zookeeper locks
677+
# avoiding removing locks of servers in set 3. The following are different
678+
# situations the code will see and handle.
679+
#
680+
# * When a host is not cluster.yaml but has some processes listed in
681+
# zookeeper. For this case all of the process with that host can be killed.
682+
# * When a resource group is not in cluster.yaml but has some processes listed
683+
# in zookeeper. For this case all of the processes with that resource group
684+
# can be killed.
685+
# * When a host is in cluster.yaml with a target of 3 processes but has 6
686+
# processes listed in zookeeper. For this case want to kill 3 processes that
687+
# do not have pid files on the host.
688+
#
689+
function prune() {
690+
if [[ $ARG_LOCAL == 1 ]]; then
691+
# Currently the code is structured to remove all extra servers in a single resource group. Finer granularity is not supported.
692+
echo "$(red ERROR): Prune does not support running locally"
693+
exit 1
694+
fi
695+
696+
if ! jq -h >&/dev/null; then
697+
echo "$(red ERROR:) Missing $(green jq). Unable to continue."
698+
exit 1
699+
fi
700+
701+
if [[ -z ${AC_TMP_DIR+x} ]]; then
702+
echo "AC_TMP_DIR is not set"
703+
exit 1
704+
fi
705+
local service_json="$AC_TMP_DIR/accumulo-service.json"
706+
"$accumulo_cmd" admin serviceStatus --json >"$service_json" 2>/dev/null || exit 1
707+
708+
local var_name
709+
local hosts
710+
declare -a groups
711+
712+
local manager
713+
if [[ $ARG_ALL == 1 || $ARG_MANAGER == 1 ]]; then
714+
prune_group "manager" "default" "1" "$MANAGER_HOSTS"
715+
fi
716+
717+
if [[ $ARG_ALL == 1 || $ARG_GC == 1 ]]; then
718+
prune_group "gc" "default" "1" "$GC_HOSTS"
594719
fi
595720

721+
if [[ $ARG_ALL == 1 || $ARG_MONITOR == 1 ]]; then
722+
prune_group "monitor" "default" "1" "$MONITOR_HOSTS"
723+
fi
724+
725+
if [[ $ARG_ALL == 1 || $ARG_TSERVER == 1 ]]; then
726+
groups=()
727+
if [[ -n $ARG_TSERVER_GROUP ]]; then
728+
read -r -a groups <<<"$ARG_TSERVER_GROUP"
729+
else
730+
# find all groups known in zookeeper, this will allow pruning entire groups that do not even exist in cluster.yaml
731+
readarray -t groups < <(jq -r ".summaries.T_SERVER.resourceGroups | .[] " "$service_json")
732+
fi
733+
734+
for group in "${groups[@]}"; do
735+
var_name="TSERVERS_PER_HOST_$group"
736+
local expected=${!var_name:-0}
737+
738+
hosts="TSERVER_HOSTS_$group"
739+
prune_group "tserver" "$group" "$expected" "${!hosts}"
740+
done
741+
fi
742+
743+
if [[ $ARG_ALL == 1 || $ARG_SSERVER == 1 ]]; then
744+
groups=()
745+
if [[ -n $ARG_SSERVER_GROUP ]]; then
746+
read -r -a groups <<<"$ARG_SSERVER_GROUP"
747+
else
748+
# find all groups known in zookeeper, this will allow pruning entire groups that do not even exist in cluster.yaml
749+
readarray -t groups < <(jq -r ".summaries.S_SERVER.resourceGroups | .[] " "$service_json")
750+
fi
751+
752+
for group in "${groups[@]}"; do
753+
var_name="SSERVERS_PER_HOST_$group"
754+
local expected=${!var_name:-0}
755+
756+
hosts="SSERVER_HOSTS_$group"
757+
prune_group "sserver" "$group" "$expected" "${!hosts}"
758+
done
759+
760+
fi
761+
762+
if [[ $ARG_ALL == 1 || $ARG_COMPACTOR == 1 ]]; then
763+
groups=()
764+
if [[ -n $ARG_COMPACTOR_GROUP ]]; then
765+
read -r -a groups <<<"$ARG_COMPACTOR_GROUP"
766+
else
767+
# find all groups known in zookeeper, this will allow pruning entire groups that do not even exist in cluster.yaml
768+
readarray -t groups < <(jq -r ".summaries.COMPACTOR.resourceGroups | .[] " "$service_json")
769+
fi
770+
771+
for group in "${groups[@]}"; do
772+
var_name="COMPACTORS_PER_HOST_$group"
773+
local expected=${!var_name:-0}
774+
775+
hosts="COMPACTOR_HOSTS_$group"
776+
prune_group "compactor" "$group" "$expected" "${!hosts}"
777+
done
778+
fi
596779
}
597780

598781
function main() {
@@ -674,6 +857,10 @@ EOF
674857
parse_config
675858
control_services kill
676859
;;
860+
prune)
861+
parse_config
862+
prune
863+
;;
677864
*)
678865
invalid_args "'$ARG_CMD' is an invalid <command>"
679866
;;

core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Objects;
2727
import java.util.function.Predicate;
2828

29+
import org.apache.accumulo.core.client.PluginEnvironment;
2930
import org.apache.accumulo.core.client.Scanner;
3031
import org.apache.accumulo.core.client.admin.TableOperations;
3132
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
@@ -37,6 +38,8 @@
3738
import org.apache.accumulo.core.data.Key;
3839
import org.apache.accumulo.core.data.LoadPlan;
3940
import org.apache.accumulo.core.data.Range;
41+
import org.apache.accumulo.core.data.TableId;
42+
import org.apache.accumulo.core.iterators.IteratorEnvironment;
4043
import org.apache.accumulo.core.security.Authorizations;
4144
import org.apache.accumulo.core.util.RowRangeUtil;
4245
import org.apache.hadoop.fs.FileSystem;
@@ -208,17 +211,25 @@ public interface ScannerOptions {
208211
* {@link Property#TABLE_PREFIX} may be accepted and used. For example, cache and crypto
209212
* properties could be passed here.
210213
*
214+
* <p>
215+
* Configured iterators will have access to these properties via the
216+
* {@link PluginEnvironment#getConfiguration(TableId)} (obtained by
217+
* {@link IteratorEnvironment#getPluginEnv()}). The tableId used to get the configuration should
218+
* be the one returned programmatically from {@link IteratorEnvironment#getTableId()}.
219+
*
211220
* @param props iterable over Accumulo table key value properties.
212221
* @return this
213222
*/
214223
ScannerOptions withTableProperties(Iterable<Entry<String,String>> props);
215224

216225
/**
217-
* @see #withTableProperties(Iterable) Any property that impacts file behavior regardless of
218-
* whether it has the {@link Property#TABLE_PREFIX} may be accepted and used. For example,
219-
* cache and crypto properties could be passed here.
226+
* Any property that impacts file behavior regardless of whether it has the
227+
* {@link Property#TABLE_PREFIX} may be accepted and used. For example, cache and crypto
228+
* properties could be passed here.
229+
*
220230
* @param props a map instead of an Iterable
221231
* @return this
232+
* @see #withTableProperties(Iterable)
222233
*/
223234
ScannerOptions withTableProperties(Map<String,String> props);
224235

@@ -296,11 +307,13 @@ public interface SummaryOptions {
296307
SummaryOptions withTableProperties(Iterable<Entry<String,String>> props);
297308

298309
/**
299-
* @see #withTableProperties(Iterable) Any property that impacts file behavior regardless of
300-
* whether it has the {@link Property#TABLE_PREFIX} may be accepted and used. For example,
301-
* cache and crypto properties could be passed here.
310+
* Any property that impacts file behavior regardless of whether it has the
311+
* {@link Property#TABLE_PREFIX} may be accepted and used. For example, cache and crypto
312+
* properties could be passed here.
313+
*
302314
* @param props a map instead of an Iterable
303315
* @return this
316+
* @see #withTableProperties(Iterable)
304317
*/
305318
SummaryOptions withTableProperties(Map<String,String> props);
306319

core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.accumulo.core.fate.FateStore;
2525
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
2626
import org.apache.accumulo.core.fate.Repo;
27+
import org.apache.accumulo.core.fate.user.schema.FateSchema;
2728

2829
public interface FateMutator<T> {
2930

@@ -57,19 +58,17 @@ public interface FateMutator<T> {
5758
FateMutator<T> requireAbsentKey();
5859

5960
/**
60-
* Add a conditional mutation to
61-
* {@link org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily#RESERVATION_COLUMN}
62-
* that will put the reservation if there is not already a reservation present
61+
* Add a conditional mutation to {@link FateSchema.TxAdminColumnFamily#RESERVATION_COLUMN} that
62+
* will put the reservation if there is not already a reservation present
6363
*
6464
* @param reservation the reservation to attempt to put
6565
* @return the FateMutator with this added mutation
6666
*/
6767
FateMutator<T> putReservedTx(FateStore.FateReservation reservation);
6868

6969
/**
70-
* Add a conditional mutation to
71-
* {@link org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily#RESERVATION_COLUMN}
72-
* that will delete the column if the column value matches the given reservation
70+
* Add a conditional mutation to {@link FateSchema.TxAdminColumnFamily#RESERVATION_COLUMN} that
71+
* will delete the column if the column value matches the given reservation
7372
*
7473
* @param reservation the reservation to attempt to remove
7574
* @return the FateMutator with this added mutation

core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
4545
import org.apache.accumulo.core.fate.Repo;
4646
import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily;
47+
import org.apache.accumulo.core.fate.user.schema.FateSchema.TxAdminColumnFamily;
4748
import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily;
4849
import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily;
4950
import org.apache.accumulo.core.security.Authorizations;
@@ -69,7 +70,7 @@ public FateMutatorImpl(ClientContext context, String tableName, FateId fateId) {
6970

7071
@Override
7172
public FateMutator<T> putStatus(TStatus status) {
72-
TxColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name()));
73+
TxAdminColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name()));
7374
return this;
7475
}
7576

@@ -96,8 +97,8 @@ public FateMutator<T> requireAbsent() {
9697
@Override
9798
public FateMutator<T> requireUnreserved() {
9899
Preconditions.checkState(!requiredUnreserved);
99-
Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
100-
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
100+
Condition condition = new Condition(TxAdminColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
101+
TxAdminColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
101102
mutation.addCondition(condition);
102103
requiredUnreserved = true;
103104
return this;
@@ -114,23 +115,23 @@ public FateMutator<T> requireAbsentKey() {
114115
@Override
115116
public FateMutator<T> putReservedTx(FateStore.FateReservation reservation) {
116117
requireUnreserved();
117-
TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(reservation.getSerialized()));
118+
TxAdminColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(reservation.getSerialized()));
118119
return this;
119120
}
120121

121122
@Override
122123
public FateMutator<T> putUnreserveTx(FateStore.FateReservation reservation) {
123-
Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
124-
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier())
124+
Condition condition = new Condition(TxAdminColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
125+
TxAdminColumnFamily.RESERVATION_COLUMN.getColumnQualifier())
125126
.setValue(reservation.getSerialized());
126127
mutation.addCondition(condition);
127-
TxColumnFamily.RESERVATION_COLUMN.putDelete(mutation);
128+
TxAdminColumnFamily.RESERVATION_COLUMN.putDelete(mutation);
128129
return this;
129130
}
130131

131132
@Override
132133
public FateMutator<T> putFateOp(byte[] data) {
133-
TxInfoColumnFamily.FATE_OP_COLUMN.put(mutation, new Value(data));
134+
TxAdminColumnFamily.FATE_OP_COLUMN.put(mutation, new Value(data));
134135
return this;
135136
}
136137

0 commit comments

Comments
 (0)