Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 57 additions & 26 deletions kandji2snipe
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# the README at https://github.com/grokability/kandji2snipe
#

version = "1.0.0"
version = "1.1.0"

# Standard Library Imports
import json
Expand Down Expand Up @@ -115,25 +115,25 @@ def get_settings():
logging.debug("Checking the settings.conf file for valid values.")

if config['kandji']['tenant'] == "TENANTNAME" or config['kandji']['tenant'] == "":
logging.error('Error: Invalid Kandji Tenant, check your settings.conf and try again.')
logging.error('Invalid Kandji Tenant - check your settings.conf and try again')
sys.exit(exit_error_message)

if config['kandji']['region'] != "us" and config['kandji']['region'] != "eu":
logging.error('Invalid Kandji Region, check your settings.conf and try again.')
logging.error('Invalid Kandji Region - check your settings.conf and try again')
sys.exit(exit_error_message)

if os.environ.get("KANDJI_APITOKEN") == "":
if config['kandji']['apitoken'] == "kandji-api-bearer-token-here" or config['kandji']['apitoken'] == "" :
logging.error('Invalid Kandji API Token, check your settings.conf or environment variables and try again.')
logging.error('Invalid Kandji API Token - check your settings.conf or environment variables and try again')
sys.exit(exit_error_message)

if config['snipe-it']['url'] == "https://your_snipe_instance.com"or config['snipe-it']['url'] == "":
logging.error('Invalid Snipe-IT URL, check your settings.conf and try again.')
logging.error('Invalid Snipe-IT URL - check your settings.conf and try again')
sys.exit(exit_error_message)

if os.environ.get("SNIPE_APIKEY") == "":
if config['snipe-it']['apikey'] == "snipe-api-key-here" or config['snipe-it']['apikey'] == "" :
logging.error('Invalid Snipe-IT API Key, check your settings.conf or environment variables and try again.')
logging.error('Invalid Snipe-IT API Key - check your settings.conf or environment variables and try again')
sys.exit(exit_error_message)

if config['snipe-it']['mac_custom_fieldset_id'] != "":
Expand Down Expand Up @@ -437,8 +437,29 @@ def get_kandji_device_details(kandji_id):
# Function to lookup last activity date and time for a Kandji asset.
def get_kandji_device_activity_date(kandji_id):
endpoint=f"/api/v1/devices/{kandji_id}/activity"
limit = 300

# First call to get total count and determine pagination
params = {"limit": limit}
logging.debug('Calling for device activity in Kandji against: {}'.format(kandji_base + endpoint))
response = kandji_api(method="GET", endpoint=endpoint)
response = kandji_api(method="GET", endpoint=endpoint, params=params)

activity_data = response['activity']
total_count = activity_data.get('count', 0)

# If we got all results in the first call, return as is
if total_count <= limit:
return response

# Calculate the offset to get the last page
# We want to ensure we get the very last entry
last_page_offset = max(0, total_count - limit)

# Fetch the last page
params = {"limit": limit, "offset": last_page_offset}
logging.debug('Fetching last page of activity (total: {}, offset: {})'.format(total_count, last_page_offset))
response = kandji_api(method="GET", endpoint=endpoint, params=params)

return response

# Function to update the asset tag of devices in Kandji with an number passed from Snipe-IT.
Expand All @@ -463,7 +484,7 @@ def search_snipe_asset(serial):
logging.info("No assets match {}".format(serial))
return "NoMatch"
else:
logging.warning('FOUND {} matching assets while searching for: {}'.format(jsonresponse['total'], serial))
logging.warning('Found {} matching assets while searching for: {}'.format(jsonresponse['total'], serial))
return "MultiMatch"
else:
logging.info("No assets match {}".format(serial))
Expand Down Expand Up @@ -600,7 +621,7 @@ def update_snipe_asset(snipe_id, payload):
logging.warning('Unable to update ID: {}. We failed to update the {} field with "{}"'.format(snipe_id, key, payload[key]))
goodupdate = False
else:
logging.info("Sucessfully updated {} with: {}".format(key, payload[key]))
logging.info("Successfully updated {} with: {}".format(key, payload[key]))
return goodupdate
else:
logging.error('Whoops. Got an error status code while updating ID {}: {} - {}'.format(snipe_id, response.status_code, response.content))
Expand Down Expand Up @@ -628,25 +649,27 @@ def checkout_snipe_asset(user, asset_id, asset_name, checked_out_user=None):
if user_id == 'NotFound':
logging.info("User {} not found in Snipe-IT, skipping check out".format(user))
return "NotFound"
if checked_out_user == None:
logging.info("Not checked out, checking out {} (ID: {}) to {}.".format(asset_name,asset_id,user))
elif checked_out_user == "NewAsset":
logging.info("First time this asset will be checked out, checking out to {}".format(user))
if checked_out_user == None or checked_out_user == "NewAsset":
logging.info("Checking out {} (ID: {}) to {}".format(asset_name,asset_id,user))
else:
logging.info("Checking in {} (ID: {}) to check it out to {}".format(asset_name,asset_id,user))
checkin_snipe_asset(asset_id)
api_url = '{}/api/v1/hardware/{}/checkout'.format(snipe_base, asset_id)
logging.info("Checking out {} (ID: {}) to {}.".format(asset_name,asset_id,user))
payload = {
'checkout_to_type':'user',
'assigned_user':user_id,
'note':'Checked out by kandji2snipe.'
}
logging.debug('The payload for the Snipe-IT checkin is: {}'.format(payload))
logging.debug('The payload for the Snipe-IT checkout is: {}'.format(payload))
response = requests.post(api_url, headers=snipeheaders, json=payload, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler})
logging.debug('The response from Snipe-IT is: {}'.format(response.json()))
if response.status_code == 200:
logging.debug("Got back status code: 200 - {}".format(response.content))
was_checked_in = checked_out_user not in [None, "NewAsset"]
if was_checked_in:
logging.info("Successfully checked in and checked out {} (ID: {}) to {}".format(asset_name,asset_id,user))
else:
logging.info("Successfully checked out {} (ID: {}) to {}".format(asset_name,asset_id,user))
return "CheckedOut"
else:
logging.error('Asset checkout failed for asset {} with error {}'.format(asset_id,response.text))
Expand Down Expand Up @@ -877,29 +900,30 @@ for kandji_device_type in kandji_device_types:

# Log an error if there's an issue, or more than once match.
elif snipe == 'MultiMatch':
logging.warning("WARN: You need to resolve multiple assets with the same serial number in your inventory. If you can't find them in your inventory, you might need to purge your deleted records. You can find that in the Snipe-IT Admin settings. Skipping serial number {} for now.".format(kandji['hardware_overview']['serial_number']))
logging.warning("Multiple assets with serial number {} found - resolve duplicates in Snipe-IT (check Admin > Purge Deleted Records if needed)".format(kandji['hardware_overview']['serial_number']))
elif snipe == 'ERROR':
logging.error("We got an error when looking up serial number {} in Snipe-IT, which shouldn't happen at this point. Check your Snipe-IT instance and setup. Skipping for now.".format(kandji['hardware_overview']['serial_number']))

else:
# Only update if Kandji has more recent info.
snipe_id = snipe['rows'][0]['id']
snipe_time = snipe['rows'][0]['updated_at']['datetime']
kandji_device_activity = get_kandji_device_activity_date(kandji['general']['device_id'])
kandji_device_activity = get_kandji_device_activity_date(kandji['general']['device_id'])
kandji_time_conversion = datetime.strptime(kandji_device_activity['activity']['results'][-1]['created_at'], '%Y-%m-%dT%H:%M:%S.%fZ')
kandji_time_conversion = pytz.timezone('UTC').localize(kandji_time_conversion)
kandji_time_conversion = kandji_time_conversion.astimezone(pytz.timezone(config['snipe-it']['timezone']))
kandji_time = kandji_time_conversion.strftime('%Y-%m-%d %H:%M:%S')

# Check to see that the Kandji record is newer than the previous Snipe-IT update, or if it is a new record in Snipe-IT
# Check to see that the Kandji record is newer than the previous Snipe-IT update and has different data
if ( kandji_time > snipe_time ) or ( user_args.force ):
if user_args.force:
logging.info("Forcing the update regardless of the timestamps due to -f being used.")
logging.debug("Updating the Snipe-IT asset because Kandji has a more recent timestamp: {} > {} or the Snipe-IT record is new".format(kandji_time, snipe_time))
logging.info("Forcing field comparison regardless of timestamps (-f flag used)")
else:
logging.info("Kandji record has newer timestamp than Snipe-IT - checking for field differences".format(kandji_time, snipe_time))
updates = {}

if html.unescape(snipe['rows'][0]['name']) != kandji['general']['device_name']:
logging.info('Device name changed in Kandji... Updating Snipe-IT')
logging.info('Device name changed in Kandji - updating Snipe-IT')
updates={'name': kandji['general']['device_name']}

for snipekey in config['{}-api-mapping'.format(kandji_device_type)]:
Expand Down Expand Up @@ -941,7 +965,11 @@ for kandji_device_type in kandji_device_types:
logging.debug("Skipping the payload, because it already exists, or the Snipe-IT key we're mapping to doesn't.")

if updates:
logging.info("Found {} field difference(s) - updating Snipe-IT: {}".format(len(updates), list(updates.keys())))
logging.debug("Updates to be made: {}".format(updates))
update_snipe_asset(snipe_id, updates)
else:
logging.info("All fields match - no updates needed despite newer timestamp")

if user_args.users:
if snipe['rows'][0]['status_label']['status_meta'] in ('deployable', 'deployed'):
Expand All @@ -963,16 +991,19 @@ for kandji_device_type in kandji_device_types:
logging.info("Can't checkout {} since the status isn't set to deployable".format(kandji['general']['device_name']))

else:
logging.info("Snipe-IT record is newer than the Kandji record. Nothing to sync. If this wrong, then force an inventory update in Kandji")
logging.debug("Not updating the Snipe-IT asset because Snipe-IT has a more recent timestamp: {} < {}".format(kandji_time, snipe_time))
logging.info("Skipping Kandji → Snipe-IT sync (Snipe-IT record is newer)".format(kandji_time, snipe_time))
logging.debug("To force sync from Kandji to Snipe-IT despite timestamps, use the -f flag")

# Sync the Snipe-IT Asset Tag Number back to Kandji if needed
# The user arg below is set to false if it's called, so this would fail if the user called it.
if (kandji['general']['asset_tag'] != snipe['rows'][0]['asset_tag']) and user_args.do_not_update_kandji :
logging.info("Asset tag changed in Snipe-IT... Updating Kandji")
logging.info("Syncing asset tag from Snipe-IT Kandji (asset tag differs)")
if snipe['rows'][0]['asset_tag'][0]:
update_kandji_asset_tag("{}".format(kandji['general']['device_id']), '{}'.format(snipe['rows'][0]['asset_tag']))
logging.info("Updating device record")
kandji_update_response = update_kandji_asset_tag("{}".format(kandji['general']['device_id']), '{}'.format(snipe['rows'][0]['asset_tag']))
if kandji_update_response and 'error' not in kandji_update_response:
logging.info("Successfully updated asset tag to '{}' in Kandji".format(snipe['rows'][0]['asset_tag']))
else:
logging.error("Failed to update asset tag in Kandji: {}".format(kandji_update_response))

if user_args.ratelimited:
logging.debug('Total amount of API calls made: {}'.format(snipe_api_count))