Saturday, December 15, 2018

Hey Vector, who do I look like?


I have played with Cozmo in the past, so when Vector came out...I knew I needed to do something with it ;)

So...what’s Vector?


Pretty much a black Cozmo? Well...yes and no :) Vector has a better processor with 4 cores, a microphone, almost double the number of parts, a better camera and a color display.

As you know...I’m a really big fan of SAP Leonardo Machine Learning APIs...as they allow you to easily consume Machine Learning services.

For this blog I wanted to do something that I have always liked...take a picture of someone and then compare it with photos of famous actors and actresses and see who this person resembles the most ;)

So, let’s start :D

Installing the Vector SDK

Make sure that Vector is connected to the Internet by using the Vector app on iPhone or Android. Here’s a nice video on how to do that.

Once your Vector is connected to the Internet...make sure to simply kill the Vector app on your phone.

The Vector SDK was only available to the people who backed Anki’s Kickstarter campaign.
...but since November 11th, the SDK is in Public Alpha! :D Which means...you can finally get your hands on it ;)

If by any chance you got the SDK installed before...remove it before moving forward…

python3 -m pip uninstall anki_vector

Then simply install it by doing this…

python3 -m pip install --user anki_vector

Then, you need to authenticate your Vector…

python3 -m anki_vector.configure

You will be asked for Vector’s name, IP address and serial number. You will also be asked for your Anki Cloud credentials.

To get this information simply put Vector on his charger...and double-press the button on his back. This will give you his name; then raise and lower his handle to get the IP address. The serial number is on Vector’s bottom.

The Learning Phase


First things first...we need a bunch of pictures of famous people...for that I relied on The Movie DB website...


I went and downloaded, almost at random, 100 images of both men and women. I didn’t go into each person’s page but rather saved the “thumbnails”.

Now, there’s an SAP Leonardo API called “Inference Service for Face Feature Extraction” which basically grabs an image, determines if there’s a face or not and then extracts its features...like the color of the eyes, the shape of the mouth, the hair and so on...and that information is returned in a nice, although pretty much impossible to decipher, vector of features. I mean...they look just like numbers...and they could mean anything :P
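By the way...if you want to poke at this API by hand before writing any code, a quick curl call from a terminal does the trick...this is just a sketch using the same endpoint, “files” field and APIKey header as the code below (the image name is made up)…

curl -X POST "https://sandbox.api.sap.com/ml/facefeatureextraction/face-feature-extraction" \
     -H "APIKey: YourAPIKey" \
     -H "Accept: application/json" \
     -F "files=@SomeActor.jpg"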

Anyway...I created a folder called “People” and dropped all 100 images in it. So, the next step is of course to get the features for all the images...and doing that manually is obviously not only hard but pointless...it’s way better to optimize the process ;)

One programming language that I have grown to love is Crystal...fast as C, slick as Ruby, yep...pretty much a better way of doing Ruby :)

Installation is pretty easy and you can find the instructions here, but I’m using Ubuntu on VMware, so here are the instructions for it…

On a terminal window copy and paste this…

curl -sSL https://dist.crystal-lang.org/apt/setup.sh | sudo bash

Then simply do this…

sudo apt-get update

sudo apt install crystal

Installation of the following modules is optional but recommended…

sudo apt install libssl-dev      # for using OpenSSL
sudo apt install libxml2-dev     # for using XML
sudo apt install libyaml-dev     # for using YAML
sudo apt install libgmp-dev      # for using Big numbers
sudo apt install libreadline-dev # for using Readline

Once we’re done...it’s time to write the application…first create a folder called “Features”.

Call your script PeopleGenerator.cr and copy and paste the following code…


PeopleGenerator.cr
require "http"
require "json"

class FaceFeature
  JSON.mapping({
    face_feature: Array(Float64)
  })
end

class Predictions
  JSON.mapping({
    faces: Array(FaceFeature)
  })
end

class Person
  JSON.mapping({
    id: String,
    predictions: Array(Predictions)
  })
end

folder = Dir.new("#{__DIR__}/People")
while photo = folder.read
  if photo != "." && photo != ".." && photo != "Features"
    io = IO::Memory.new
    builder = HTTP::FormData::Builder.new(io)

    # Add the image to the multipart body
    File.open("#{__DIR__}/People/" + photo) do |file|
      metadata = HTTP::FormData::FileMetadata.new(filename: photo)
      headers = HTTP::Headers{"Content-Type" => "image/jpg"}
      builder.file("files", file, metadata, headers)
    end
    builder.finish

    headers = HTTP::Headers{"Content-Type" => builder.content_type,
                            "APIKey"       => "YourAPIKey",
                            "Accept"       => "application/json"}
    response = HTTP::Client.post("https://sandbox.api.sap.com/ml/facefeatureextraction/face-feature-extraction",
                                 body: io.to_s, headers: headers)

    feature_name = "#{__DIR__}/Features/" + File.basename(photo, ".jpg") + ".txt"

    puts photo

    # Write the extracted feature vector to Features/<name>.txt
    File.write(feature_name, Person.from_json(response.body).predictions[0].faces[0].face_feature)
    sleep 2.seconds
  end
end

command = "zip -r -j features.zip #{__DIR__}/Features"
Process.run("sh", {"-c", command})

puts "Done."

Let’s explain the code before we check the results…

require "http"
require "json"

We need these two libraries to be able to call the SAP Leonardo API and also to be able to read and extract the results…

class FaceFeature
  JSON.mapping({
    face_feature: Array(Float64)
  })
end

class Predictions
  JSON.mapping({
    faces: Array(FaceFeature)
  })
end

class Person
  JSON.mapping({
    id: String,
    predictions: Array(Predictions)
  })
end

This is the JSON mapping that we need to use to extract the information coming back from the API.
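Just so the mapping makes sense, here’s roughly the shape of what the API sends back...this is a hand-written sketch based on the fields we map above, with made-up values (the real face_feature array has way more numbers)…

{
  "id": "some-request-id",
  "predictions": [
    {
      "faces": [
        { "face_feature": [0.0123, -0.0456, 0.0891] }
      ]
    }
  ]
}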

folder = Dir.new("#{__DIR__}/People")
while photo = folder.read
  if photo != "." && photo != ".." && photo != "Features"
    io = IO::Memory.new
    builder = HTTP::FormData::Builder.new(io)

    File.open("#{__DIR__}/People/" + photo) do |file|
      metadata = HTTP::FormData::FileMetadata.new(filename: photo)
      headers = HTTP::Headers{"Content-Type" => "image/jpg"}
      builder.file("files", file, metadata, headers)
    end
    builder.finish

    headers = HTTP::Headers{"Content-Type" => builder.content_type, "APIKey" => "YourAPIKey", "Accept" => "application/json"}
    response = HTTP::Client.post("https://sandbox.api.sap.com/ml/facefeatureextraction/face-feature-extraction", body: io.to_s, headers: headers)

    feature_name = "#{__DIR__}/Features/" + File.basename(photo, ".jpg") + ".txt"

    puts photo

    File.write(feature_name, Person.from_json(response.body).predictions[0].faces[0].face_feature)
    sleep 2.seconds
  end
end

command = "zip -r -j features.zip #{__DIR__}"
Process.run("sh", {"-c", command})

puts "Done."

This section is larger. First we specify the folder from which the images will be read. Then for each entry we check whether it’s a picture or part of the folder structure...of course we want images only…

Then, we create a FormData builder in order to avoid having to base64 encode the images...put them in a JSON payload and so on...this way it’s easier and native…
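If you have never seen one, the multipart body that the builder generates looks more or less like this (the boundary and filename are made up)…

Content-Type: multipart/form-data; boundary=AaB03x

--AaB03x
Content-Disposition: form-data; name="files"; filename="SomeActor.jpg"
Content-Type: image/jpg

(binary image bytes go here)
--AaB03x--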

We open each image and feed the FormData metadata and headers.

Also, we need to pass the extra “headers” required by SAP Leonardo.

Once that is done, we can simply call the REST API, and then we create a “Feature Name” which is going to be the name of the generated file...basically the image name with a “.txt” extension.

For each file we’re going to extract the feature vector from the JSON response, write the file and add a 2-second delay just so we don’t flood the API…

Once that’s done, we simply call a “zip” command from the terminal and zip it…


Now, the zip file will contain 100 files...each with the features of one of the images in our “People” folder.
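In case you’re wondering, each of those .txt files is just the feature vector written out as an array of floats...something like this, with made-up and far fewer values than the real thing…

[0.0123456, -0.0456789, 0.0891234]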

Simple as that...we have trained our application ;)

The Testing and Execution Phase


I know that usually you test your model first...but just this once...we can do both at the same time ;)

We’re going to create a Python script that will deal with taking our picture...call the Features API on that image and then call another API to determine who we look like…

Let’s create a script called GuessWho.py


GuessWho.py
import anki_vector
import threading
import requests
import os
import json
import time
import subprocess
import re
import math
from PIL import Image
from anki_vector.events import Events
from anki_vector.util import degrees

event_done = False
said_text = False
new_width  = 184
new_height = 96

def main():
    args = anki_vector.util.parse_command_args()
    with anki_vector.Robot(args.serial, enable_face_detection=True, 
                           enable_camera_feed=True) as robot:
        evt = threading.Event()

        def on_robot_observed_face(event_type, event):

            global said_text
            if not said_text:
                said_text = True
                robot.say_text("Taking Picture!")
                image = robot.camera.latest_image
                image.save("Temp.png")
                robot.say_text("Picture Taken!")
                evt.set()

        robot.behavior.set_head_angle(degrees(45.0))
        robot.behavior.set_lift_height(0.0)

        robot.events.subscribe(on_robot_observed_face, Events.robot_observed_face)

        try:
            if not evt.wait(timeout=10):
                print("---------------------------------")
        except KeyboardInterrupt:
            pass

def guess_who():
    args = anki_vector.util.parse_command_args()
    with anki_vector.Robot(args.serial) as robot: 
        url = "https://sandbox.api.sap.com/ml/facefeatureextraction/
               face-feature-extraction"
                
        img_path = "Temp.png"
        files = {'files': open (img_path, 'rb')}

        headers = {
            'APIKey': "YourAPIKey",
            'Accept': "application/json",
        }
    
        response = requests.post(url, files=files, headers=headers)
  
        robot.say_text("I'm processing your picture!")
    
        json_response = json.loads(response.text)
        json_text = json_response['predictions'][0]['faces'][0]['face_feature']
    
        f = open("myfile.txt", "w")
        f.write(str(json_text))
        f.close()
    
        time.sleep(1)
    
        p = subprocess.Popen('zip -u features.zip myfile.txt', shell=True)
    
        time.sleep(1)
    
        url = "https://sandbox.api.sap.com/ml/similarityscoring/similarity-scoring"
    
        files = {'files': ("features.zip", open ("features.zip", 'rb'), 
                 'application/zip')}
        params = {'options': '{"numSimilarVectors":100}'}
    
        response = requests.post(url, data=params, files=files, headers=headers)
        json_response = json.loads(response.text)

        robot.say_text("I'm comparing your picture with one hundred other pictures!")

        for x in range(len(json_response['predictions'])):
            if json_response['predictions'][x]['id'] == "myfile.txt":
                name, _ = os.path.splitext(json_response['predictions'][x]
                          ['similarVectors'][0]['id']) 
                name = re.findall('[A-Z][^A-Z]*', name)
                full_name = " ".join(name)
                pic_name = "People/" + "".join(name) + ".jpg"
                avg = json_response['predictions'][x]['similarVectors'][0]['score']
                robot.say_text("You look like " + full_name + 
                               " with a confidence of " + 
                                str(math.floor(avg * 100)) + " percent")
                image_file = Image.open(pic_name)
                image_file = image_file.resize((new_width, new_height), 
                                                Image.ANTIALIAS)  
                screen_data = anki_vector.screen.convert_image_to_screen_data(
                                                                   image_file)
                robot.behavior.set_head_angle(degrees(45.0))
                robot.conn.release_control()
                time.sleep(1)
                robot.conn.request_control()                
                robot.screen.set_screen_with_image_data(screen_data, 0.0)
                robot.screen.set_screen_with_image_data(screen_data, 25.0)
                
                print(full_name)
                print(str(math.floor(avg * 100)) + " percent")

                time.sleep(5)

if __name__ == '__main__':
    main()
    guess_who()

This script is bigger...so let’s make sure we understand everything that is going on…

import anki_vector
import threading
import requests
import os
import json
import time
import subprocess
import re
import math
from PIL import Image
from anki_vector.events import Events
from anki_vector.util import degrees

That’s a lot of libraries :) The first one is pretty obvious...it’s how we connect to Vector ;)

The second one is to handle “threads” as we need to do a couple of things asynchronously.

The third one is to handle the call to the APIs.

The fourth one is to handle folder access.

The fifth one is to handle the JSON response coming back from the API.

The sixth one is so that we can have a delay in the execution of the application.

The seventh is to be able to call terminal commands.

The eighth one is to use Regular Expressions.

The ninth one is to handle math operations.

The tenth one is to handle image operations.

The eleventh is to handle events as we want Vector to try to detect our face.

The last one is to be able to move Vector’s head.

def main():
    args = anki_vector.util.parse_command_args()
    with anki_vector.Robot(args.serial, enable_face_detection=True, enable_camera_feed=True) as robot:
        evt = threading.Event()

        def on_robot_observed_face(event_type, event):

            global said_text
            if not said_text:
                said_text = True
                robot.say_text("Taking Picture!")
                image = robot.camera.latest_image
                image.save("Temp.png")
                robot.say_text("Picture Taken!")
                evt.set()

        robot.behavior.set_head_angle(degrees(45.0))
        robot.behavior.set_lift_height(0.0)

        robot.events.subscribe(on_robot_observed_face, Events.robot_observed_face)

        try:
            if not evt.wait(timeout=10):
                print("---------------------------------")
        except KeyboardInterrupt:
            pass

This one is for sure...our main event :) Here we’re going to open a connection with Vector, and as we can have multiple Vectors...we need to grab the serial number to specify which one we want to use...also we need to activate both face detection and camera feed.

We’re going to use a threading Event, as we need to wait while Vector tries to detect our face. If he can see us, then he will say “Taking Picture!”...grab the image, save it and then say “Picture Taken!”. After that the event is done...but...while this is happening we can move his head and drop down his handle so that he can see us better.

As you can see, we subscribe to the robot_observed_face event, which fires once our face is there and visible to Vector…

def guess_who():
    args = anki_vector.util.parse_command_args()
    with anki_vector.Robot(args.serial) as robot:
        url = "https://sandbox.api.sap.com/ml/facefeatureextraction/face-feature-extraction"
                
        img_path = "Temp.png"
        files = {'files': open (img_path, 'rb')}

        headers = {
            'APIKey': "YourAPIKey",
            'Accept': "application/json",
        }

        response = requests.post(url, files=files, headers=headers)

        robot.say_text("I'm processing your picture!")

        json_response = json.loads(response.text)
        json_text = json_response['predictions'][0]['faces'][0]['face_feature']

        f = open("myfile.txt", "w")
        f.write(str(json_text))
        f.close()

        time.sleep(1)

        p = subprocess.Popen('zip -u features.zip myfile.txt', shell=True)

        time.sleep(1)

        url = "https://sandbox.api.sap.com/ml/similarityscoring/similarity-scoring"

        files = {'files': ("features.zip", open ("features.zip", 'rb'), 'application/zip')}
        params = {'options': '{"numSimilarVectors":100}'}

        response = requests.post(url, data=params, files=files, headers=headers)
        json_response = json.loads(response.text)

        robot.say_text("I'm comparing your picture with one hundred other pictures!")

        for x in range(len(json_response['predictions'])):
            if json_response['predictions'][x]['id'] == "myfile.txt":
                name, _ = os.path.splitext(json_response['predictions'][x]['similarVectors'][0]['id']) 
                name = re.findall('[A-Z][^A-Z]*', name)
                full_name = " ".join(name)
                pic_name = "People/" + "".join(name) + ".jpg"
                avg = json_response['predictions'][x]['similarVectors'][0]['score']
                robot.say_text("You look like " + full_name + " with a confidence of " + str(math.floor(avg * 100)) + " percent")
                image_file = Image.open(pic_name)
                image_file = image_file.resize((new_width, new_height), Image.ANTIALIAS)  
                screen_data = anki_vector.screen.convert_image_to_screen_data(image_file)
                robot.behavior.set_head_angle(degrees(45.0))
                robot.conn.release_control()
                time.sleep(1)
                robot.conn.request_control()                
                robot.screen.set_screen_with_image_data(screen_data, 0.0)
                robot.screen.set_screen_with_image_data(screen_data, 25.0)
                
                print(full_name)
                print(str(math.floor(avg * 100)) + " percent")

                time.sleep(5)

This method will handle the rough parts of our application…

We connect to Vector once again...although this time we don’t need to activate anything as the picture has already been taken.

We pass the URL for the features API.

Then we open our “Temp.png” file, which is the picture that Vector took of us.

We need to pass the extra header for the SAP Leonardo API.

We call the API and get the JSON response.

Again, we need to extract the feature information from the JSON response. This time however we’re going to create a single file called “myfile.txt”. We’re going to make the application sleep for a second and then call a terminal process to add “myfile.txt” to our features.zip file…

Then we sleep again for another second...and this is just so we don’t flood the API…

Here, we’re going to call a different API, which is called Inference Service for Similarity Scoring.

This API will read all 101 feature files and compute the cosine similarity (-1 to 1) between each pair of feature vectors. This way it can determine which files are closest to each other and hence whom we resemble the most...providing us with a percentage of confidence.
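If you want the intuition behind that score, this is conceptually what gets computed for each pair of vectors...a toy Python sketch with tiny 3-number “features” (the service’s real scoring logic is of course its own)…

import math

def cosine_similarity(a, b):
    # 1 means same direction, 0 means unrelated, -1 means opposite
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Two similar toy vectors...prints something close to 1
print(cosine_similarity([0.1, 0.5, -0.2], [0.1, 0.4, -0.3]))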

This call is a little bit more complicated than the previous one as we need to upload the zip file…

        files = {'files': ("features.zip", open ("features.zip", 'rb'), 'application/zip')}
        params = {'options': '{"numSimilarVectors":100}'}

        response = requests.post(url, data=params, files=files, headers=headers)
        json_response = json.loads(response.text)

Take into account that while we have 101 files...we need to compare 1 file against 100 others...so we pass 100 as the “numSimilarVectors”.
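And here’s roughly the shape of what comes back...a sketch based on the fields we read next, with made-up ids and score…

{
  "predictions": [
    {
      "id": "myfile.txt",
      "similarVectors": [
        { "id": "NicolasCage.txt", "score": 0.7562 }
      ]
    }
  ]
}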

Once we’ve done that, we need to read each section of the JSON response until we find the id that has the value “myfile.txt”. Once we have that, we drop the extension and use a Regular Expression to split the CamelCase file name into separate words. We also need the name of the image...so in the end we need to have something like this…

full_name = "Nicolas Cage"
pic_name = "People/NicolasCage.jpg"

We need to extract the percentage of confidence as well…

avg = json_response['predictions'][x]['similarVectors'][0]['score'] 

So, we can have Vector saying “You look like Nicolas Cage with a confidence of 75 percent”.

Now...here comes the fun part ;) We already know who we look like...but let’s say...we don’t really remember what Nicolas Cage looks like...so let’s take advantage of Vector’s fancy screen and display it there ;) By the way...we need to release control, gain it back and display the image for zero seconds and then re-display it...this is mainly because Vector’s eyes keep blocking the image on the screen...and this is a way to prevent that behavior ;)

                image_file = Image.open(pic_name)
                image_file = image_file.resize((new_width, new_height), Image.ANTIALIAS)  
                screen_data = anki_vector.screen.convert_image_to_screen_data(image_file)
                robot.behavior.set_head_angle(degrees(45.0))
                robot.conn.release_control()
                time.sleep(1)
                robot.conn.request_control()                
                robot.screen.set_screen_with_image_data(screen_data, 0.0)
                robot.screen.set_screen_with_image_data(screen_data, 25.0)

First we open the image, then we resize it so it fits on the screen, then we convert it to Vector’s format and finally we display it on the screen, specifying for how long we want it there…

                print(full_name)
                print(str(math.floor(avg * 100)) + " percent")

                time.sleep(5)

We print some information on the terminal and then send the script to sleep for 5 seconds so the image doesn’t disappear too quickly ;)

Finally! The most important part of the whole script...calling the functions :P

if __name__ == '__main__':
    main()
    guess_who()

And that’s pretty much it :) We open a terminal window and type…

python3 GuessWho.py

Vector is going to try to look at us and detect our face...he will take a picture...the SAP Leonardo APIs are going to be called...and we will hear and see who we look like ;)

Hope you enjoyed this blog...I obviously did :D

And just to wrap up things...here’s a small video…


BTW...this is the picture that Vector took of me...


Greetings,

Blag.
SAP Labs Network.