11-11-2022 12:32 AM
Here is a Python script that I put together to gather the installed versions of specified applications for Windows, Linux, and MacOS.
#!/usr/bin/env python3
import sqlite3
import requests
import json
import csv
import pathlib
import time
from datetime import date
# https://github.com/Textualize/rich
from rich.console import Console
from rich.progress import track
################################
## Define variables
################################
debug = False
ApplicationNameMacOS = "openssl"
ApplicationNameWindows = "openssl"
ApplicationNameLinux = "openssl"
################################
def main():
DetailedSystemInfo, SimpleSystemInfo = GetAllSystems()
DetailedUsersInfo, SimpleUsersInfo = GetAllUsers()
UsersToSystemsAssociation, SystemsToUsersAssociation = CreateUsersToSystemsAssociation(SimpleUsersInfo)
MacOSApplicationInstances = ListApplicationInstancesMacOS(ApplicationNameMacOS)
WindowsApplicationInstances = ListApplicationInstancesWindows(ApplicationNameWindows)
LinuxApplicationInstances = ListApplicationInstancesLinux(ApplicationNameLinux)
CreateApplicationToSystemOutput(
MacOSApplicationInstances,
WindowsApplicationInstances,
LinuxApplicationInstances,
SimpleSystemInfo,
SimpleUsersInfo,
SystemsToUsersAssociation
)
def LoadAPIKey():
api = open('../.apikey', 'r').read().strip()
if api is None:
print('API Key File Not Found')
print(quit)
quit
return api
def LoadOrgID():
x_org_id = open('../.x_org_id', 'r').read().strip()
if x_org_id is None:
print('X Org ID File Not Found')
print(quit)
quit
return x_org_id
def IsJSONFileValid(FilePath):
# Check if there is an existing file that is newer than 3 days and has valid JSON format
IsValid = False
f_path = pathlib.Path(f'{FilePath}')
f_path.parent.mkdir(exist_ok=True, parents=True)
if f_path.exists():
m_timestamp = f_path.stat().st_mtime
m_time = date.fromtimestamp(m_timestamp)
today = date.today()
delta = today - m_time
if delta.days > 3:
#print('\nFile is older than 3 days. Creating data.')
IsValid = False
return IsValid
else:
#print('\nFile is newer than 3 days. Using the file.')
IsValid = True
with f_path.open() as json_file:
# try reading the json file using the json interpreter
try:
JSONTesting = json.load(json_file)
IsValid = True
except ValueError as e:
# if the file is not valid, print the error
# and add the file to the list of invalid files
#print("JSON object issue: %s" % e)
IsValid = False
return IsValid
else:
#print('\nCreating data.')
IsValid = False
return IsValid
def GetAllSystems():
#########################################################################
# Get a list of all of the SystemIDs for use later
# https://docs.jumpcloud.com/api/1.0/index.html#tag/Search/operation/search_systems_post
#
# Returns two objects: DetailedSystemInfo and SimpleSystemInfo
# DetailedSystemInfo is the raw JSON object from the API call
# SimpleSystemInfo is a dict of dicts with the following format:
# dict 1 key is the SystemID
# The value for each key is a dict that contains displayName, hostname, os, and lastContact
#########################################################################
console = Console()
DetailedSystemInfoFilePath = './data/DetailedSystemInfo.json'
SimpleSystemInfoFilePath = './data/SimpleSystemInfo.json'
start = time.time()
with console.status("[green]Getting System Information...") as status:
if IsJSONFileValid(DetailedSystemInfoFilePath) == False or IsJSONFileValid(SimpleSystemInfoFilePath) == False:
api = LoadAPIKey()
orgID = LoadOrgID()
url = "https://console.jumpcloud.com/api/search/systems"
fields = ""
#filter = { 'and': [ {'os': 'Mac OS X'} ] }
filter = {}
searchFilter = {}
limit = ''
skip = '0'
sort = ''
payload = {
'fields':fields,
'limit':limit,
'searchFilter':searchFilter,
'filter':filter,
'skip':skip,
'sort':sort
}
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
"x-org-id": orgID,
"x-api-key": api
}
response = requests.post(url, headers=headers, json=payload)
DetailedSystemInfo = response.json()
SimpleSystemInfo = {}
for i in range(DetailedSystemInfo['totalCount']):
TempValue = DetailedSystemInfo['results'][i]
system_id = TempValue['id']
displayName = TempValue.get('displayName', 'N/A')
hostname = TempValue.get('hostname', 'N/A')
os = TempValue.get('os', 'N/A')
lastContact = TempValue.get('lastContact', 'N/A')
# Add the details to the SimpleSystemInfo dictionary
SimpleSystemInfo[system_id] = {
'displayName':displayName,
'hostname':hostname,
'os':os,
'lastContact':lastContact
}
with open(DetailedSystemInfoFilePath, "w") as DetailedSystemInfoFileHandler:
json.dump(DetailedSystemInfo, DetailedSystemInfoFileHandler)
with open(SimpleSystemInfoFilePath, "w") as SimpleSystemInfoFileHandler:
json.dump(SimpleSystemInfo, SimpleSystemInfoFileHandler)
else:
DetailedSystemInfo = json.loads(open(DetailedSystemInfoFilePath, "r").read())
SimpleSystemInfo = json.loads(open(SimpleSystemInfoFilePath, "r").read())
#console.log("System information obtained.")
if debug:
import inspect
print("\n", inspect.stack()[0][3], ' elapsed time: ', round(time.time() - start, 2), " seconds. ")
return DetailedSystemInfo, SimpleSystemInfo
def GetAllUsers():
#########################################################################
# Get a list of all of the Users for use later
# https://docs.jumpcloud.com/api/1.0/index.html#tag/Systemusers/operation/systemusers_list
#########################################################################
console = Console()
api = LoadAPIKey()
orgID = LoadOrgID()
DetailedUsersInfoFilePath = './data/DetailedUsersInfo.json'
SimpleUsersInfoFilePath = './data/SimpleUsersInfo.json'
start = time.time()
with console.status("[green]Getting User information...") as status:
if IsJSONFileValid(DetailedUsersInfoFilePath) == False or IsJSONFileValid(SimpleUsersInfoFilePath) == False:
console = Console()
with console.status("[green]Getting System Information...") as status:
url = "https://console.jumpcloud.com/api/systemusers"
# There is a 100 user limit on the API, so we first have to determine how many users there are and
# pull 100 users at a time. Start with the limit of 1 to get the totalCount
limit = "1"
# Skip is the offset
skip = 0
payload = {
'limit':limit,
'skip':skip,
}
headers = {
"x-org-id": orgID,
"x-api-key": api
}
session = requests.Session()
response = session.get(url, headers=headers, params=payload)
# Determine the total number of users
TotalUsers = response.json()['totalCount']
# Create an empty list that will contain our users and set the limit and offset
DetailedUsersInfo = []
limit = "100"
# Skip is the offset
skip = 0
session = requests.Session()
while( skip < TotalUsers ):
payload = {
'fields':'_id username displayname email state',
'limit':limit,
'skip':skip
}
response = session.get(url, headers=headers, params=payload)
#response = requests.get(url, headers=headers, params=payload)
TempDictionary = response.json()['results']
[DetailedUsersInfo.append(item) for item in TempDictionary]
skip += 100
# Create a dictionary that has the UserID as the key and the Email address as the value
SimpleUsersInfo = {}
for UserInfo in DetailedUsersInfo:
username = UserInfo.get('username')
displayname = UserInfo.get('displayname', 'N/A')
email = UserInfo.get('email', 'N/A')
state = UserInfo.get('state', 'N/A')
SimpleUsersInfo[UserInfo['_id']] = {
'username':username,
'displayname':displayname,
'email':email,
'state':state
}
with open(DetailedUsersInfoFilePath, "w") as DetailedUsersInfoFileHandler:
json.dump(DetailedUsersInfo, DetailedUsersInfoFileHandler)
with open(SimpleUsersInfoFilePath, "w") as SimpleUsersInfoFileHandler:
json.dump(SimpleUsersInfo, SimpleUsersInfoFileHandler)
else:
DetailedUsersInfo = json.loads(open(DetailedUsersInfoFilePath, "r").read())
SimpleUsersInfo = json.loads(open(SimpleUsersInfoFilePath, "r").read())
#console.log("User information obtained.")
if debug:
import inspect
print("\n", inspect.stack()[0][3], ' elapsed time: ', round(time.time() - start, 2), " seconds. ")
return DetailedUsersInfo, SimpleUsersInfo
def CreateUsersToSystemsAssociation(SimpleUsersInfo):
#########################################################################
#
# https://docs.jumpcloud.com/api/2.0/index.html#tag/Users/operation/graph_userTraverseSystem
#
# This function takes in SimpleSystemsInfo and SimpleUsersInfo and uses the data from those to call the UserID to systems API
#
# Returns an object
#########################################################################
console = Console()
UsersToSystemsAssociationFilePath = './data/UsersToSystemsAssociation.json'
SystemsToUsersAssociationFilePath = './data/SystemsToUsersAssociation.json'
start = time.time()
if IsJSONFileValid(UsersToSystemsAssociationFilePath) == False:
api = LoadAPIKey()
orgID = LoadOrgID()
# Create a list named UsersToSystemList to contain our results
UsersToSystemsDict = {}
# This for loop goes through each User in the SimpleUsersInfo dictionary and places the details of the
# user in the UserInfo tuple.
# UserInfo[0] is a string and contains the UserID
# UserInfo[1] is a dictionary and contains 'username', 'displayname', 'email', and 'state'
for UserInfo in track(SimpleUsersInfo.items(), description="Creating the Users to System data..."):
UserID = UserInfo[0]
UserEmail = UserInfo[1]['email']
UserDisplayName = UserInfo[1]['displayname']
UserState = UserInfo[1]['state']
url = f'https://console.jumpcloud.com/api/v2/users/{UserID}/systems'
limit = '100'
skip = '0'
payload = {
'limit':limit,
'skip':skip
}
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
"x-org-id": orgID,
"x-api-key": api
}
session = requests.Session()
response = session.get(url, headers=headers, params=payload)
if not response.ok:
print('An error has occurred when calling the Jumpcloud API for UserID {UserID} with UserEmail {UserEmail}')
err = requests.exceptions.HTTPError
print(err)
else:
# This API call returns a list named SystemsBoundToUser that contains all of the systems that this UserID is bound to.
# Each element of the list is a dictionary that contains the following fields, 'id' and 'type'.
# 'id' is the SystemID and 'type' is always system
SystemsBoundToUser = json.loads(response.text)
#print(json.dumps(responseJSON, indent=4, sort_keys=True))
# Now iterate over the list named SystemsBoundToUser in case a user is bound to more than one system.
TempSystemList = []
for System in SystemsBoundToUser:
# Since SystemsBoundToUser only contains a SystemID, it is not useful for a human readable report.
# We need to look up the System ID to get the system's information, such as displayName
# To do this, the dictionary named SimpleSystemsInfo is used.
# SimpleSystemsInfo's keys are SystemIDs, so we can match the SystemID contained in SystemsUsers['id']
# to gather details of the System
# Each System in SimpleSystemsInfo is a dictionary containing keys for displayName, hostname, os, and lastContact
# When iterating through SimpleSystemsInfo, the .items() function is used to create a tuple with
# System[0] containing the SystemID and
# System[1] containing a dictionary that contains displayName, hostname, os, and lastContact
# Add the System ID to the UserID's key in the dictionary UsersToSystemsDict
TempSystemList.append(System['id'])
UsersToSystemsDict[UserID] = TempSystemList
# Write the contents of UsersToSystemsDict to a json file
with open(UsersToSystemsAssociationFilePath, 'w') as UsersToSystemsAssociationFileHandler:
json.dump(UsersToSystemsDict, UsersToSystemsAssociationFileHandler)
else:
UsersToSystemsDict = json.loads(open(UsersToSystemsAssociationFilePath, "r").read())
if IsJSONFileValid(SystemsToUsersAssociationFilePath) == False:
# Instead of having to call another API endpoint, take the UsersToSystemsDict and
# invert it to create SystemsToUsersDict
SystemsToUsersDict = {}
ExistingValue = []
for UserSystems in track(UsersToSystemsDict.items(), description="Creating the System to Users data..."):
# UserSystems[0] is the UserID of User that we want to add
UserID = UserSystems[0]
# UserSystems[1] contains a list of the SystemIDs that a User belongs to
SystemList = UserSystems[1]
for System in SystemList:
# Pull the existing value, if any, and place into ExistingValue so that we can append to it.
ExistingValue = SystemsToUsersDict.get(System)
# If there is no key for System, then create an empty list and then add the UserID to that list
if ExistingValue == None:
SystemsToUsersDict[System] = []
SystemsToUsersDict[System] = [UserID]
else:
# If there is an existing value, we need to copy it out first. Otherwise, you will overwrite the existing value
# with the new value.
TempValue = SystemsToUsersDict[System]
TempValue.append(UserID)
SystemsToUsersDict[System] = TempValue
# Write the contents of UsersToSystemsDict to a json file
with open(SystemsToUsersAssociationFilePath, 'w') as SystemsToUsersAssociationFileHandler:
json.dump(SystemsToUsersDict, SystemsToUsersAssociationFileHandler)
else:
SystemsToUsersDict = json.loads(open(SystemsToUsersAssociationFilePath, "r").read())
if debug:
import inspect
print("\n", inspect.stack()[0][3], ' elapsed time: ', round(time.time() - start, 2), " seconds. ")
return UsersToSystemsDict, SystemsToUsersDict
def ListApplicationInstancesMacOS(ApplicationName):
#########################################################################
# List of all of the instances of an Application on MacOS
# Lists all apps for macOS devices. For Windows devices, use List System Insights Programs.
# https://docs.jumpcloud.com/api/2.0/index.html#tag/System-Insights/operation/systeminsights_list_apps
# #########################################################################
start = time.time()
api = LoadAPIKey()
orgID = LoadOrgID()
url = 'https://console.jumpcloud.com/api/v2/systeminsights/apps'
filter = { f'bundle_name:eq:{ApplicationName}' }
limit = '10000'
skip = '0'
sort = []
payload = {
'limit':limit,
'filter':filter,
'skip':skip,
'sort':sort
}
headers = {
'x-org-id': orgID,
'x-api-key': api
}
session = requests.Session()
response = session.get(url, headers=headers, params=payload)
responseJSON = json.loads(response.text)
#print(json.dumps(responseJSON, indent=4, sort_keys=True))
if debug:
import inspect
print(inspect.stack()[0][3], ' elapsed time: ', time.time() - start)
return responseJSON
def ListApplicationInstancesWindows(ApplicationName):
#########################################################################
# List of all of the instances of an Application on Windows
# https://docs.jumpcloud.com/api/2.0/index.html#tag/System-Insights/operation/systeminsights_list_programs
##########################################################################
start = time.time()
api = LoadAPIKey()
orgID = LoadOrgID()
url = 'https://console.jumpcloud.com/api/v2/systeminsights/programs'
filter = { f'name:eq:{ApplicationName}' }
#filter = {}
limit = '10000'
skip = '0'
sort = []
payload = {
'limit':limit,
'filter':filter,
'skip':skip,
'sort':sort
}
headers = {
'x-org-id': orgID,
'x-api-key': api
}
session = requests.Session()
response = session.get(url, headers=headers, params=payload)
responseJSON = json.loads(response.text)
#print(json.dumps(responseJSON, indent=4, sort_keys=True))
if debug:
import inspect
print(inspect.stack()[0][3], ' elapsed time: ', time.time() - start)
return responseJSON
def ListApplicationInstancesLinux(ApplicationName):
#########################################################################
# List of all of the instances of an Application on Linux
# https://docs.jumpcloud.com/api/2.0/index.html#tag/System-Insights/operation/systeminsights_list_linux_packages
##########################################################################
start = time.time()
api = LoadAPIKey()
orgID = LoadOrgID()
url = 'https://console.jumpcloud.com/api/v2/systeminsights/linux_packages'
filter = f'name:eq:{ApplicationName}'
#filter = ''
limit = '1000'
skip = '0'
sort = []
payload = {
'limit':limit,
'filter':filter,
'skip':skip,
'sort':sort
}
headers = {
'x-org-id': orgID,
'x-api-key': api
}
session = requests.Session()
response = session.get(url, headers=headers, params=payload)
responseJSON = json.loads(response.text)
#print(json.dumps(responseJSON, indent=4, sort_keys=True))
if debug:
import inspect
print(inspect.stack()[0][3], ' elapsed time: ', time.time() - start)
return responseJSON
def CreateApplicationToSystemOutput(MacOSApplicationInstances,WindowsApplicationInstances,LinuxApplicationInstances,SimpleSystemInfo,SimpleUsersInfo,SystemsToUsersAssociation):
CombinedApplicationListJSONFilePath = './data/CombinedApplicationList.json'
CombinedApplicationListCSVFilePath = './data/CombinedApplicationList.csv'
start = time.time()
CombinedApplicationList = []
for System in MacOSApplicationInstances:
TempSystemInfo = {}
TempUsersList = []
UserID = ''
SystemIDtoLookup = System['system_id']
TempSystemInfo['SystemDisplayName'] = SimpleSystemInfo[SystemIDtoLookup]['displayName']
TempSystemInfo['OS'] = 'MacOS'
TempSystemInfo['ApplicationName'] = System['bundle_name']
TempSystemInfo['Version'] = System['bundle_version']
TempSystemInfo['CollectionDateTime'] = System['collection_time']
# Need to convert the UserIDs to User email address
# for UserID in SystemsToUsersAssociation[SystemIDtoLookup]:
# TempUsersList.append(SimpleUsersInfo[UserID]['email'])
SystemUsers = SystemsToUsersAssociation.get(SystemIDtoLookup)
if SystemUsers != None:
for UserID in SystemUsers:
TempUsersList.append(SimpleUsersInfo[UserID]['email'])
else:
TempUsersList.append('no_user_found')
# Create a semicolon delimited string from the user email addresses
TempSystemInfo['Users'] = ';'.join(TempUsersList)
TempSystemInfo['SystemID'] = SystemIDtoLookup
CombinedApplicationList.append(TempSystemInfo)
for System in WindowsApplicationInstances:
TempSystemInfo = {}
TempUsersList = []
UserID = ''
SystemIDtoLookup = System['system_id']
TempSystemInfo['SystemDisplayName'] = SimpleSystemInfo[SystemIDtoLookup]['displayName']
TempSystemInfo['OS'] = 'Windows'
TempSystemInfo['ApplicationName'] = System['name']
TempSystemInfo['Version'] = System['version']
TempSystemInfo['CollectionDateTime'] = System['collection_time']
# Need to convert the UserIDs to User email address
SystemUsers = SystemsToUsersAssociation.get(SystemIDtoLookup)
if SystemUsers != None:
for UserID in SystemUsers:
TempUsersList.append(SimpleUsersInfo[UserID]['email'])
else:
TempUsersList.append('no_user_found')
# Create a semicolon delimited string from the user email addresses
TempSystemInfo['Users'] = ';'.join(TempUsersList)
TempSystemInfo['SystemID'] = SystemIDtoLookup
CombinedApplicationList.append(TempSystemInfo)
for System in LinuxApplicationInstances:
TempSystemInfo = {}
TempUsersList = []
UserID = ''
SystemIDtoLookup = System['system_id']
TempSystemInfo['SystemDisplayName'] = SimpleSystemInfo[SystemIDtoLookup]['displayName']
TempSystemInfo['OS'] = 'Linux'
TempSystemInfo['ApplicationName'] = System['name']
TempSystemInfo['Version'] = System['version']
TempSystemInfo['CollectionDateTime'] = System['collection_time']
# Need to convert the UserIDs to User email address
SystemUsers = SystemsToUsersAssociation.get(SystemIDtoLookup)
if SystemUsers != None:
for UserID in SystemUsers:
TempUsersList.append(SimpleUsersInfo[UserID]['email'])
else:
TempUsersList.append('no_user_found')
# Create a semicolon delimited string from the user email addresses
TempSystemInfo['Users'] = ';'.join(TempUsersList)
TempSystemInfo['SystemID'] = SystemIDtoLookup
CombinedApplicationList.append(TempSystemInfo)
if debug:
import inspect
print("\n", inspect.stack()[0][3], ' elapsed time: ', round(time.time() - start, 2), " seconds. ")
# Write the contents of UsersToSystemsDict to a json file
with open(CombinedApplicationListJSONFilePath, 'w') as CombinedApplicationListFileHandler:
json.dump(CombinedApplicationList, CombinedApplicationListFileHandler)
WriteListOfDictionariesToCSV(CombinedApplicationList,CombinedApplicationListCSVFilePath)
return CombinedApplicationList
def WriteListOfDictionariesToCSV(ContentsToWrite, FileName):
start = time.time()
keys = ContentsToWrite[0].keys()
# with open(FileName, 'w', newline='') as CSVFile:
# writer = csv.writer(CSVFile)
# writer.writerow(keys)
# writer.writerows(ContentsToWrite)
with open(FileName, 'w', encoding='utf8', newline='') as CSVFile:
fc = csv.DictWriter(CSVFile, fieldnames=keys)
fc.writeheader()
fc.writerows(ContentsToWrite)
if debug:
import inspect
print(inspect.stack()[0][3], ' elapsed time: ', time.time() - start)
if __name__ == '__main__':
main()
New to the site? Take a look at these additional resources:
Ready to join us? You can register here.