cancel
Showing results for 
Search instead for 
Did you mean: 
Disclaimer
JUMPCLOUD EXPRESSLY DISCLAIMS ALL REPRESENTATIONS, WARRANTIES, CONDITIONS, AND LIABILITIES OF ANY KIND ARISING FROM OR RELATED TO THIRD-PARTY SOFTWARE, SCRIPTS, REPOSITORIES, AND APIS. JUMPCLOUD IS NOT REQUIRED TO SUPPORT ANY SUCH THIRD-PARTY MATERIALS AND ALL RISKS RELATED TO THIRD-PARTY MATERIALS ARE YOUR RESPONSIBILITY. PLEASE ALSO REVIEW THE JUMPCLOUD TOS.

Python script to get installed versions of software

simonbh-cbu
Novitiate II

Here is a Python script that I put together to gather the installed versions of specified applications on Windows, Linux, and macOS.

#!/usr/bin/env python3

import sqlite3
import requests
import json
import csv

import pathlib
import time

from datetime import date

# https://github.com/Textualize/rich
from rich.console import Console
from rich.progress import track


################################
## Define variables
################################

# When True, the functions in this script print their elapsed run time.
debug = False

# Application name to search for on each platform.  Each value is used as an
# exact-match ("eq") filter: against bundle_name for macOS apps, and against
# name for Windows programs and Linux packages.
ApplicationNameMacOS = "openssl"
ApplicationNameWindows = "openssl"
ApplicationNameLinux = "openssl"

################################

def main():
    #########################################################################
    # Script entry point.
    #
    # Collects the system and user lookups, builds the system -> users
    # association, queries each platform for the configured application name,
    # and hands everything to CreateApplicationToSystemOutput to write the
    # combined report.
    #########################################################################

    # Only the "simple" lookup tables and the system -> users map are needed
    # for the report; the other return values are discarded.
    _, SystemLookup = GetAllSystems()
    _, UserLookup = GetAllUsers()
    _, UsersBySystem = CreateUsersToSystemsAssociation(UserLookup)

    CreateApplicationToSystemOutput(
        ListApplicationInstancesMacOS(ApplicationNameMacOS),
        ListApplicationInstancesWindows(ApplicationNameWindows),
        ListApplicationInstancesLinux(ApplicationNameLinux),
        SystemLookup,
        UserLookup,
        UsersBySystem,
    )


def LoadAPIKey():
    #########################################################################
    # Read the JumpCloud API key from ../.apikey (relative to the current
    # working directory).
    #
    # Returns the key as a string with surrounding whitespace removed.
    # Raises SystemExit with an explanatory message when the file is missing
    # or contains nothing but whitespace.
    #########################################################################
    try:
        with open('../.apikey', 'r') as KeyFile:
            api = KeyFile.read().strip()
    except FileNotFoundError:
        # read() never returns None, so a missing file has to be detected
        # here; "from None" keeps the exit message free of the traceback chain.
        raise SystemExit('API Key File Not Found') from None

    if not api:
        # An empty key would only produce authentication errors later on.
        raise SystemExit('API Key File Is Empty')
    return api


def LoadOrgID():
    #########################################################################
    # Read the JumpCloud organization ID from ../.x_org_id (relative to the
    # current working directory).
    #
    # Returns the ID as a string with surrounding whitespace removed.
    # Raises SystemExit with an explanatory message when the file is missing
    # or contains nothing but whitespace.
    #########################################################################
    try:
        with open('../.x_org_id', 'r') as OrgFile:
            x_org_id = OrgFile.read().strip()
    except FileNotFoundError:
        # read() never returns None, so a missing file has to be detected
        # here; "from None" keeps the exit message free of the traceback chain.
        raise SystemExit('X Org ID File Not Found') from None

    if not x_org_id:
        # An empty org ID would only produce API errors later on.
        raise SystemExit('X Org ID File Is Empty')
    return x_org_id


def IsJSONFileValid(FilePath):
    #########################################################################
    # Decide whether a cached JSON file can be reused.
    #
    # Returns True only when FilePath exists, was modified no more than
    # 3 calendar days ago, and parses as JSON.  Returns False otherwise.
    #
    # Side effect: the parent directory of FilePath is created if missing,
    # so callers can write the file afterwards without further checks.
    #########################################################################
    Candidate = pathlib.Path(f'{FilePath}')
    Candidate.parent.mkdir(exist_ok=True, parents=True)

    # Nothing cached yet.
    if not Candidate.exists():
        return False

    # Age is measured in whole calendar days between the modification date
    # and today.
    ModifiedOn = date.fromtimestamp(Candidate.stat().st_mtime)
    Age = date.today() - ModifiedOn
    if Age.days > 3:
        return False

    # Fresh enough; make sure the contents actually parse as JSON.
    with Candidate.open() as Handle:
        try:
            json.load(Handle)
        except ValueError:
            return False
    return True


def GetAllSystems():
    #########################################################################
    # Get a list of all of the SystemIDs for use later
    # https://docs.jumpcloud.com/api/1.0/index.html#tag/Search/operation/search_systems_post
    #
    # The results are cached in ./data/DetailedSystemInfo.json and
    # ./data/SimpleSystemInfo.json.  The API is only called when either cache
    # file is rejected by IsJSONFileValid().
    #
    # Returns two objects: DetailedSystemInfo and SimpleSystemInfo
    # DetailedSystemInfo is the raw JSON object from the API call
    # SimpleSystemInfo is a dict of dicts with the following format:
    #   dict 1 key is the SystemID
    #       The value for each key is a dict that contains displayName, hostname, os, and lastContact
    #
    # Raises requests.HTTPError when the API answers with an error status.
    #########################################################################
    console = Console()

    DetailedSystemInfoFilePath = './data/DetailedSystemInfo.json'
    SimpleSystemInfoFilePath   = './data/SimpleSystemInfo.json'

    start = time.time()
    with console.status("[green]Getting System Information..."):
        if IsJSONFileValid(DetailedSystemInfoFilePath) and IsJSONFileValid(SimpleSystemInfoFilePath):
            # Both cache files are usable; skip the API call entirely.
            with open(DetailedSystemInfoFilePath, "r") as DetailedSystemInfoFileHandler:
                DetailedSystemInfo = json.load(DetailedSystemInfoFileHandler)
            with open(SimpleSystemInfoFilePath, "r") as SimpleSystemInfoFileHandler:
                SimpleSystemInfo = json.load(SimpleSystemInfoFileHandler)
        else:
            api = LoadAPIKey()
            orgID = LoadOrgID()

            url = "https://console.jumpcloud.com/api/search/systems"

            payload = {
                'fields': "",
                'limit': '',
                'searchFilter': {},
                'filter': {},
                'skip': '0',
                'sort': ''
            }
            headers = {
                "Accept": "application/json",
                "Content-Type": "application/json",
                "x-org-id": orgID,
                "x-api-key": api
            }
            response = requests.post(url, headers=headers, json=payload)
            # Fail loudly on an HTTP error instead of caching an error payload.
            response.raise_for_status()

            DetailedSystemInfo = response.json()

            # Iterate over what was actually returned.  Indexing 'results' by
            # range(totalCount) raises IndexError whenever the API returns
            # fewer rows than the total.
            Results = DetailedSystemInfo.get('results', [])
            TotalCount = DetailedSystemInfo.get('totalCount', len(Results))
            if len(Results) < TotalCount:
                print(f'Warning: the API returned {len(Results)} of {TotalCount} systems; '
                      'the remaining systems will be missing from the report.')

            # Reduce each system record to the handful of fields the report needs.
            SimpleSystemInfo = {
                SystemRecord['id']: {
                    'displayName': SystemRecord.get('displayName', 'N/A'),
                    'hostname': SystemRecord.get('hostname', 'N/A'),
                    'os': SystemRecord.get('os', 'N/A'),
                    'lastContact': SystemRecord.get('lastContact', 'N/A')
                }
                for SystemRecord in Results
            }

            with open(DetailedSystemInfoFilePath, "w") as DetailedSystemInfoFileHandler:
                json.dump(DetailedSystemInfo, DetailedSystemInfoFileHandler)

            with open(SimpleSystemInfoFilePath, "w") as SimpleSystemInfoFileHandler:
                json.dump(SimpleSystemInfo, SimpleSystemInfoFileHandler)

    if debug:
        import inspect
        print("\n", inspect.stack()[0][3], ' elapsed time: ', round(time.time() - start, 2), " seconds. ")

    return DetailedSystemInfo, SimpleSystemInfo
        
        
def GetAllUsers():
    #########################################################################
    # Get a list of all of the Users for use later
    # https://docs.jumpcloud.com/api/1.0/index.html#tag/Systemusers/operation/systemusers_list
    #
    # The results are cached in ./data/DetailedUsersInfo.json and
    # ./data/SimpleUsersInfo.json.  The API is only called when either cache
    # file is rejected by IsJSONFileValid().
    #
    # Returns two objects: DetailedUsersInfo and SimpleUsersInfo
    # DetailedUsersInfo is the list of user records returned by the API
    # SimpleUsersInfo is a dict keyed by UserID; each value is a dict that
    #   contains username, displayname, email, and state
    #
    # Raises requests.HTTPError when the API answers with an error status.
    #########################################################################
    console = Console()

    DetailedUsersInfoFilePath = './data/DetailedUsersInfo.json'
    SimpleUsersInfoFilePath   = './data/SimpleUsersInfo.json'

    # Users are pulled in pages of this size.
    PageSize = 100

    start = time.time()
    with console.status("[green]Getting User information..."):
        if IsJSONFileValid(DetailedUsersInfoFilePath) and IsJSONFileValid(SimpleUsersInfoFilePath):
            # Both cache files are usable; skip the API calls entirely.
            with open(DetailedUsersInfoFilePath, "r") as DetailedUsersInfoFileHandler:
                DetailedUsersInfo = json.load(DetailedUsersInfoFileHandler)
            with open(SimpleUsersInfoFilePath, "r") as SimpleUsersInfoFileHandler:
                SimpleUsersInfo = json.load(SimpleUsersInfoFileHandler)
        else:
            # Credentials are only needed when the API is actually called.
            api = LoadAPIKey()
            orgID = LoadOrgID()

            url = "https://console.jumpcloud.com/api/systemusers"
            headers = {
                "x-org-id": orgID,
                "x-api-key": api
            }

            DetailedUsersInfo = []
            # One session is reused for every request and closed afterwards.
            with requests.Session() as session:
                # Ask for a single user first, purely to learn totalCount.
                response = session.get(url, headers=headers, params={'limit': '1', 'skip': 0})
                response.raise_for_status()
                TotalUsers = response.json()['totalCount']

                # Skip is the offset of the next page
                skip = 0
                while skip < TotalUsers:
                    payload = {
                        'fields': '_id username displayname email state',
                        'limit': str(PageSize),
                        'skip': skip
                    }
                    response = session.get(url, headers=headers, params=payload)
                    response.raise_for_status()
                    DetailedUsersInfo.extend(response.json()['results'])
                    skip += PageSize

            # Create a dictionary that has the UserID as the key and the
            # user's basic details as the value
            SimpleUsersInfo = {
                UserInfo['_id']: {
                    'username': UserInfo.get('username'),
                    'displayname': UserInfo.get('displayname', 'N/A'),
                    'email': UserInfo.get('email', 'N/A'),
                    'state': UserInfo.get('state', 'N/A')
                }
                for UserInfo in DetailedUsersInfo
            }

            with open(DetailedUsersInfoFilePath, "w") as DetailedUsersInfoFileHandler:
                json.dump(DetailedUsersInfo, DetailedUsersInfoFileHandler)

            with open(SimpleUsersInfoFilePath, "w") as SimpleUsersInfoFileHandler:
                json.dump(SimpleUsersInfo, SimpleUsersInfoFileHandler)

    if debug:
        import inspect
        print("\n", inspect.stack()[0][3], ' elapsed time: ', round(time.time() - start, 2), " seconds. ")

    return DetailedUsersInfo, SimpleUsersInfo


def CreateUsersToSystemsAssociation(SimpleUsersInfo):
    #########################################################################
    # Build the User -> Systems and System -> Users association maps
    # https://docs.jumpcloud.com/api/2.0/index.html#tag/Users/operation/graph_userTraverseSystem
    #
    # SimpleUsersInfo is a dict keyed by UserID whose values are dicts that
    # contain 'username', 'displayname', 'email', and 'state'.  The systems
    # API is called once for every UserID in it.
    #
    # Both maps are cached in ./data/UsersToSystemsAssociation.json and
    # ./data/SystemsToUsersAssociation.json and reused while
    # IsJSONFileValid() accepts them.
    #
    # Returns two dicts:
    #   UsersToSystemsDict: UserID   -> list of SystemIDs bound to that user
    #   SystemsToUsersDict: SystemID -> list of UserIDs bound to that system
    # Users whose API call fails are reported on stdout and left out.
    #########################################################################
    UsersToSystemsAssociationFilePath = './data/UsersToSystemsAssociation.json'
    SystemsToUsersAssociationFilePath = './data/SystemsToUsersAssociation.json'

    start = time.time()

    RebuiltUsersToSystems = not IsJSONFileValid(UsersToSystemsAssociationFilePath)
    if RebuiltUsersToSystems:
        api = LoadAPIKey()
        orgID = LoadOrgID()

        # The request headers and paging parameters are the same for every user.
        headers = {
            "Accept": "application/json",
            "Content-Type": "application/json",
            "x-org-id": orgID,
            "x-api-key": api
        }
        payload = {
            'limit': '100',
            'skip': '0'
        }

        UsersToSystemsDict = {}

        # One session is shared by all users so connections can be reused.
        with requests.Session() as session:
            for UserID, UserDetails in track(SimpleUsersInfo.items(), description="Creating the Users to System data..."):
                url = f'https://console.jumpcloud.com/api/v2/users/{UserID}/systems'

                response = session.get(url, headers=headers, params=payload)
                if not response.ok:
                    UserEmail = UserDetails.get('email', 'N/A')
                    print(f'An error has occurred when calling the Jumpcloud API for UserID {UserID} with UserEmail {UserEmail}')
                    print(f'HTTP {response.status_code}: {response.reason}')
                    continue

                # The API returns a list of dictionaries with the fields 'id'
                # and 'type'; 'id' is the SystemID.  Only the SystemIDs are kept.
                UsersToSystemsDict[UserID] = [System['id'] for System in response.json()]

        # Write the contents of UsersToSystemsDict to a json file
        with open(UsersToSystemsAssociationFilePath, 'w') as UsersToSystemsAssociationFileHandler:
            json.dump(UsersToSystemsDict, UsersToSystemsAssociationFileHandler)
    else:
        with open(UsersToSystemsAssociationFilePath, "r") as UsersToSystemsAssociationFileHandler:
            UsersToSystemsDict = json.load(UsersToSystemsAssociationFileHandler)

    # The inverse map is rebuilt whenever the forward map was rebuilt, so the
    # two cache files can never describe different data.
    if RebuiltUsersToSystems or not IsJSONFileValid(SystemsToUsersAssociationFilePath):
        # Instead of having to call another API endpoint, take the UsersToSystemsDict and
        # invert it to create SystemsToUsersDict
        SystemsToUsersDict = {}
        for UserID, SystemList in track(UsersToSystemsDict.items(), description="Creating the System to Users data..."):
            for System in SystemList:
                SystemsToUsersDict.setdefault(System, []).append(UserID)

        # Write the contents of SystemsToUsersDict to a json file
        with open(SystemsToUsersAssociationFilePath, 'w') as SystemsToUsersAssociationFileHandler:
            json.dump(SystemsToUsersDict, SystemsToUsersAssociationFileHandler)
    else:
        with open(SystemsToUsersAssociationFilePath, "r") as SystemsToUsersAssociationFileHandler:
            SystemsToUsersDict = json.load(SystemsToUsersAssociationFileHandler)

    if debug:
        import inspect
        print("\n", inspect.stack()[0][3], ' elapsed time: ', round(time.time() - start, 2), " seconds. ")

    return UsersToSystemsDict, SystemsToUsersDict


def ListApplicationInstancesMacOS(ApplicationName):
    #########################################################################
    # List of all of the instances of an Application on MacOS
    # Lists all apps for macOS devices. For Windows devices, use List System Insights Programs.
    # https://docs.jumpcloud.com/api/2.0/index.html#tag/System-Insights/operation/systeminsights_list_apps
    #
    # ApplicationName is matched exactly against the bundle_name field.
    #
    # Returns the decoded JSON body of the response.
    # Raises requests.HTTPError when the API answers with an error status.
    #########################################################################

    start = time.time()

    api = LoadAPIKey()
    orgID = LoadOrgID()

    url = 'https://console.jumpcloud.com/api/v2/systeminsights/apps'

    payload = {
        'limit': '10000',
        # A list, not a set literal: requests sends one "filter=" query
        # parameter per list element.
        'filter': [f'bundle_name:eq:{ApplicationName}'],
        'skip': '0',
        'sort': []
    }
    headers = {
        'x-org-id': orgID,
        'x-api-key': api
    }

    with requests.Session() as session:
        response = session.get(url, headers=headers, params=payload)
        # Fail here rather than hand an error payload to the report builder.
        response.raise_for_status()
        responseJSON = response.json()

    if debug:
        import inspect
        print(inspect.stack()[0][3], ' elapsed time: ', time.time() - start)

    return responseJSON


def ListApplicationInstancesWindows(ApplicationName):
    #########################################################################
    # List of all of the instances of an Application on Windows
    # https://docs.jumpcloud.com/api/2.0/index.html#tag/System-Insights/operation/systeminsights_list_programs
    #
    # ApplicationName is matched exactly against the name field.
    #
    # Returns the decoded JSON body of the response.
    # Raises requests.HTTPError when the API answers with an error status.
    #########################################################################

    start = time.time()

    api = LoadAPIKey()
    orgID = LoadOrgID()

    url = 'https://console.jumpcloud.com/api/v2/systeminsights/programs'

    payload = {
        'limit': '10000',
        # A list, not a set literal: requests sends one "filter=" query
        # parameter per list element.
        'filter': [f'name:eq:{ApplicationName}'],
        'skip': '0',
        'sort': []
    }
    headers = {
        'x-org-id': orgID,
        'x-api-key': api
    }

    with requests.Session() as session:
        response = session.get(url, headers=headers, params=payload)
        # Fail here rather than hand an error payload to the report builder.
        response.raise_for_status()
        responseJSON = response.json()

    if debug:
        import inspect
        print(inspect.stack()[0][3], ' elapsed time: ', time.time() - start)

    return responseJSON


def ListApplicationInstancesLinux(ApplicationName):
    #########################################################################
    # List of all of the instances of an Application on Linux
    # https://docs.jumpcloud.com/api/2.0/index.html#tag/System-Insights/operation/systeminsights_list_linux_packages
    #
    # ApplicationName is matched exactly against the name field.
    #
    # Returns the decoded JSON body of the response.
    # Raises requests.HTTPError when the API answers with an error status.
    #########################################################################

    start = time.time()

    api = LoadAPIKey()
    orgID = LoadOrgID()

    url = 'https://console.jumpcloud.com/api/v2/systeminsights/linux_packages'

    payload = {
        'limit': '1000',
        'filter': f'name:eq:{ApplicationName}',
        'skip': '0',
        'sort': []
    }
    headers = {
        'x-org-id': orgID,
        'x-api-key': api
    }

    with requests.Session() as session:
        response = session.get(url, headers=headers, params=payload)
        # Fail here rather than hand an error payload to the report builder.
        response.raise_for_status()
        responseJSON = response.json()

    if debug:
        import inspect
        print(inspect.stack()[0][3], ' elapsed time: ', time.time() - start)

    return responseJSON


def _BuildApplicationRows(Instances, OSLabel, NameKey, VersionKey, SimpleSystemInfo, SimpleUsersInfo, SystemsToUsersAssociation):
    # Turn one platform's application instances into report rows (one dict per instance).
    Rows = []
    for Instance in Instances:
        SystemID = Instance['system_id']

        # Convert the UserIDs bound to this system to email addresses.
        UserIDs = SystemsToUsersAssociation.get(SystemID)
        if UserIDs:
            # A user missing from the lookup, or one without an email, is
            # reported as 'N/A' instead of raising an error.
            Emails = [SimpleUsersInfo.get(UserID, {}).get('email') or 'N/A' for UserID in UserIDs]
        else:
            Emails = ['no_user_found']

        Rows.append({
            # A system missing from the lookup is reported as 'N/A' instead
            # of raising an error.
            'SystemDisplayName': SimpleSystemInfo.get(SystemID, {}).get('displayName', 'N/A'),
            'OS': OSLabel,
            'ApplicationName': Instance[NameKey],
            'Version': Instance[VersionKey],
            'CollectionDateTime': Instance['collection_time'],
            # Create a semicolon delimited string from the user email addresses
            'Users': ';'.join(Emails),
            'SystemID': SystemID,
        })
    return Rows


def CreateApplicationToSystemOutput(MacOSApplicationInstances,WindowsApplicationInstances,LinuxApplicationInstances,SimpleSystemInfo,SimpleUsersInfo,SystemsToUsersAssociation):
    #########################################################################
    # Combine the per-platform application instances into a single report.
    #
    # Each *ApplicationInstances argument is a list of dicts that contain
    # 'system_id' and 'collection_time', plus 'bundle_name'/'bundle_version'
    # for macOS or 'name'/'version' for Windows and Linux.
    # SimpleSystemInfo maps SystemID -> dict with 'displayName'.
    # SimpleUsersInfo maps UserID -> dict with 'email'.
    # SystemsToUsersAssociation maps SystemID -> list of UserIDs.
    #
    # Writes ./data/CombinedApplicationList.json and
    # ./data/CombinedApplicationList.csv.
    #
    # Returns the combined list of row dicts, in the order macOS, Windows,
    # Linux.
    #########################################################################

    CombinedApplicationListJSONFilePath = './data/CombinedApplicationList.json'
    CombinedApplicationListCSVFilePath = './data/CombinedApplicationList.csv'

    start = time.time()

    # (instances, OS label, key holding the name, key holding the version)
    Platforms = [
        (MacOSApplicationInstances, 'MacOS', 'bundle_name', 'bundle_version'),
        (WindowsApplicationInstances, 'Windows', 'name', 'version'),
        (LinuxApplicationInstances, 'Linux', 'name', 'version'),
    ]

    CombinedApplicationList = []
    for Instances, OSLabel, NameKey, VersionKey in Platforms:
        CombinedApplicationList.extend(
            _BuildApplicationRows(
                Instances,
                OSLabel,
                NameKey,
                VersionKey,
                SimpleSystemInfo,
                SimpleUsersInfo,
                SystemsToUsersAssociation,
            )
        )

    if debug:
        import inspect
        print("\n", inspect.stack()[0][3], ' elapsed time: ', round(time.time() - start, 2), " seconds. ")

    # Write the contents of CombinedApplicationList to a json file
    with open(CombinedApplicationListJSONFilePath, 'w') as CombinedApplicationListFileHandler:
        json.dump(CombinedApplicationList, CombinedApplicationListFileHandler)

    WriteListOfDictionariesToCSV(CombinedApplicationList,CombinedApplicationListCSVFilePath)

    return CombinedApplicationList


def WriteListOfDictionariesToCSV(ContentsToWrite, FileName):
    #########################################################################
    # Write a list of dictionaries to FileName as a UTF-8 CSV file.
    #
    # The header row is the union of the keys of all rows, in first-seen
    # order.  A row that lacks a column gets an empty cell for it.
    # An empty list produces an empty file (no header).
    #########################################################################

    start = time.time()

    # dict.fromkeys() removes duplicates while keeping first-seen order.
    # Collecting keys from every row (not only the first) avoids an
    # IndexError on an empty list and a ValueError when a later row has an
    # extra key.
    FieldNames = list(dict.fromkeys(Key for Row in ContentsToWrite for Key in Row))

    with open(FileName, 'w', encoding='utf8', newline='') as CSVFile:
        if FieldNames:
            Writer = csv.DictWriter(CSVFile, fieldnames=FieldNames)
            Writer.writeheader()
            Writer.writerows(ContentsToWrite)

    if debug:
        import inspect
        print(inspect.stack()[0][3], ' elapsed time: ', time.time() - start)

    
# Run the report only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()

 

0 REPLIES 0